初めてのGearpumpアプリケーションを書く
Geapumpアプリケーションを書く方法を説明するために、例としてwordcount を使うつもりです。
Maven/Sbt 設定
Maven Settingで、リポジトリとライブラリの依存物を見つけることができます。
IDE セットアップ (任意)
このガイドに従ってGearpumpの準備が整った好みのIDEを取得できます。
プロセッサ(タスク) クラスとパーティショナークラスを定義する
アプリケーションはプロセッサーの有向非循環グラフ(DAG)です。wordcountの例で、まず最初に2つのプロセッサSplit
と Sum
を定義し、それらを撚り合わせるつもりです。
メッセージタイプについて
ユーザは任意の種類のメッセージ(NULL、Nothing および Unit を除くどのような種類でも)を送信することができます。
case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
Split プロセッサ
Splitプロセッサ内で、単純に先に定義されたテキスト(内容は簡潔さのために単純化されています)を分割し、各分割された単語をSumに送信します。
Scala:
class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
override def onStart(startTime : StartTime) : Unit = {
self ! Message("start")
}
override def onNext(msg : Message) : Unit = {
Split.TEXT_TO_SPLIT.lines.foreach { line =>
line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
output(new Message(msg, System.currentTimeMillis()))
}
}
self ! Message("continue", System.currentTimeMillis())
}
}
object Split {
val TEXT_TO_SPLIT = "some text"
}
Splitのように、各プロセッサは TaskActor
を継承します。onStart
メソッドは全てのメッセージがやってくる前に一度だけ呼ばれます; 各メッセージを処理するためにonNext
メソッドが呼ばれます。Gearpumpはメッセージ駆動モデルを採用していることに注意してください。、次のメッセージの処理を引き起こすためにSplitがonStart
とonNext
の最後でメッセージを送信しているのはそのためです。
Sum プロセッサ
Sumプロセッサの構造は似たようなものです。SumはSplitからメッセージを受信するため、自身へはメッセージを送信する必要はありません。
Scala:
class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private[wordcount] val map : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
private[wordcount] var wordCount : Long = 0
private var snapShotTime : Long = System.currentTimeMillis()
private var snapShotWordCount : Long = 0
private var scheduler : Cancellable = null
override def onStart(startTime : StartTime) : Unit = {
scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount)
}
override def onNext(msg : Message) : Unit = {
if (null == msg) {
return
}
val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
wordCount += 1
map.put(msg.msg.asInstanceOf[String], current + 1)
}
override def onStop() : Unit = {
if (scheduler != null) {
scheduler.cancel()
}
}
def reportWordCount() : Unit = {
val current : Long = System.currentTimeMillis()
LOG.info(s"Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
snapShotWordCount = wordCount
snapShotTime = current
}
}
合計の計算に加えて、5秒毎にレポートをするスケジュールも定義します。スケジューラは計算が完了した時点でキャンセルされなければなりません。これはonStop
メソッドを上書きすることで達成されます。onStop
のデフォルトの実装は no-op です。
パーティショナー
プロセッサーはタスクのリストに並行化されます。Partitioner
はデータがSplitとSumのタスクの間にどうやってシャッフルされるかを定義します。Gearpump はすでに2つのパーティショナーを持っています
HashPartitioner
: メッセージのハッシュコードに基づいたパーティションデータShufflePartitioner
: ラウンドロビンの方法のパーティションデータ。
Partitioner
trait を拡張し、getPartition
メソッドを上書きすることで、独自のパーティショナーを定義することができます。
trait Partitioner extends Serializable {
def getPartition(msg : Message, partitionNum : Int) : Int
}
TaskDescription と AppDescription を定義する
これで、上のコンポーネントを一緒に組み込んで独自のアプリケーションクラスを書くことができます。
アプリケーションクラスは、App
と、引数のパースとmain関数の実行を容易にする ArgumentsParserを拡張します。
object WordCount extends App with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
val RUN_FOR_EVER = -1
override val options: Array[(String, CLIOption[Any])] = Array(
"split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
"sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
)
def application(config: ParseResult) : StreamApplication = {
val splitNum = config.getInt("split")
val sumNum = config.getInt("sum")
val partitioner = new HashPartitioner()
val split = Processor[Split](splitNum)
val sum = Processor[Sum](sumNum)
val app = StreamApplication("wordCount", Graph[Processor[_ <: Task], Partitioner](split ~ partitioner ~> sum), UserConfig.empty)
app
}
val config = parse(args)
val context = ClientContext()
val appId = context.submit(application(config))
context.close()
}
options
の値を上書きし、パースするコマンドライン引数の配列を定義します。マスターのホストとポート、splitとsumタスクの並行化、例をどれだけ長く実行するかを、アプリケーションのユーザに渡して欲しいです。オプションが必須
かどうかを指定し、幾つかの引数のdefaultValue
も提供します。
コマンドライン引数のParseResult
という条件で、SplitとSumプロセッサのためのTaskDescription
を生成し、DAG APIを使ってそれらを HashPartitioner
に接続します。グラフはAppDescrition
にラップされます。これは最終的にマスターにサブミットされます。
アプリケーションをサブミットする
それらが全て終わった後で、全てをuber jarにパッケージ化し、そのjarをGearpumpクラスタにサブミットする必要があります。コマンドラインツールの構文についてはアプリケーション サブミットツール をチェックしてください。
上級のトピック
実際のアプリケーションについては、プロセッサ間で渡す独自のメッセージを定義する必要が確実にあるでしょう。Customized message needs customized serializer to help message passing over wire. シリアライザをカスタマイズする方法はこのガイド をチェックしてください。
非ストリーミングの使用のためのGearpump
Gearpump は基本のプラットフォームとして非ストリーミングアプリケーションを開発することもできます。分散シェルを開発するためにGearpumpを使う方法は このガイド を見てください。