Bagel プログラミング ガイド
Bagel は非推奨で、GraphXに取って変わられます。
Bagel はgoogleの Pregel グラフ処理フレームワークのSpark実装です。Bagelは今のところ基本的な計算、結合および集約をサポートします。
Pregel のプログラミング モデルでは、ジョブはsupersteps と呼ばれる繰り返しの系列として実行されます。各superstepにおいて、グラフの各頂点は頂点に関する状態を更新し 次の繰り返しで使われる他の頂点へのメッセージを送信するユーザ定義関数を実行します。
このガイドでは、BagelでのPageRankの実装例を一通り行うことで、プログラミングモデルおよびBagelの特徴を説明します。
Bagelを使ったリンク
プログラム内でBagelを使用するには、以下の依存するSBTあるいMavenを追加します:
groupId = org.apache.spark
artifactId = spark-bagel_2.10
version = 1.6.0
プログラミング モデル
Bagelは(K, V)ペアの分散データセットとして表現されるグラフを操作します。ここでキーは頂点IDで、値は頂点およびそれに関する状態です。各superstepでは、Bagelは現在の頂点の状態と全開のsuperstepの間に頂点に送信されたメッセージのリストを入力とするユーザ定義計算関数を各頂点で実行し、新しい頂点の状態と出力メッセージのリストを返します。
例えば、PageRankの実装にBagelを使うことができます。ここで、頂点はページを表し、辺はページ間のリンクを表し、メッセージはあるページがリンクしているページへ送信されるPageRankの割り当てを表します。
まずデフォルトの 頂点
クラスを頂点の現在のページランクを表すDouble
を格納するために拡張子、同じようにメッセージ
および 辺
クラスを拡張します。Sparkkがそれらをマシーン間で転送できるように@serializable
と印をつける必要があることに注意してください。また、Bagelタイプと明示的な変換を導入します。
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._
@serializable class PREdge(val targetId: String) extends Edge
@serializable class PRVertex(
val id: String, val rank: Double, val outEdges: Seq[Edge],
val active: Boolean) extends Vertex
@serializable class PRMessage(
val targetId: String, val rankShare: Double) extends Message
次に、テキストファイルから分散データセットとしてサンプルのグラフをロードし、それをPRVertex
オブジェクトにパッケージします。Bagelはその分散データセットを複数回使うかもしれないので再計算を防ぐためにそれらをキャッシュします。
val input = sc.textFile("data/mllib/pagerank_data.txt")
val numVerts = input.count()
val verts = input.map(line => {
val fields = line.split('\t')
val (id, linksStr) = (fields(0), fields(1))
val links = linksStr.split(',').map(new PREdge(_))
(id, new PRVertex(id, 1.0 / numVerts, links, true))
}).cache
We run the Bagel job, passing in verts
, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.
val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
: (PRVertex, Iterable[PRMessage]) = {
val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
val newRank =
if (msgSum != 0)
0.15 / numVerts + 0.85 * msgSum
else
self.rank
val halt = superstep >= 10
val msgsOut =
if (!halt)
self.outEdges.map(edge =>
new PRMessage(edge.targetId, newRank / self.outEdges.size))
else
List()
(new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
}
val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
最後に、結果を出力します。
println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
Combiners
他の頂点にメッセージを送信することは、一般的にネットワーク上の高くつく通信が必要になります。特定のアルゴリズムについては、combinersを使用して通信量を減らすことが可能です。例えば、計算関数が数値メッセージを受け取りそれらの合計値のみを使用する場合は、Bagelが同じ頂点への複数のメッセージを合計することが可能です。
combinerのサポートのために、Bagelはメッセージを組み合わせ形式に変換する一連のbombiner関数を任意で取ることができます。
例: combinerを使ったページランク
Aggregators
Aggregatorは各superstepの後で全ての頂点での削減を行い、次のsuperstepで各頂点の結果を提供します。
aggregator のサポートのために、Bagelは各頂点に渡って削減を行うaggregator関数を任意に取る事ができます。
例
操作
以下はBagel APIでのアクションとタイプです。詳細はBagel.scala を見てください。
アクション
/*** Full form ***/
Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
/*** Abbreviated forms ***/
Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
Bagel.run(sc, vertices, messages, numSplits)(compute)
// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
タイプ
trait Combiner[M, C] {
def createCombiner(msg: M): C
def mergeMsg(combiner: C, msg: M): C
def mergeCombiners(a: C, b: C): C
}
trait Aggregator[V, A] {
def createAggregator(vert: V): A
def mergeAggregators(a: A, b: A): A
}
trait Vertex {
def active: Boolean
}
trait Message[K] {
def targetId: K
}