初めてのGearpumpアプリケーションを書く

Geapumpアプリケーションを書く方法を説明するために、例としてwordcount を使うつもりです。

Maven/Sbt 設定

Maven Settingで、リポジトリとライブラリの依存物を見つけることができます。

IDE セットアップ (任意)

このガイドに従ってGearpumpの準備が整った好みのIDEを取得できます。

プロセッサ(タスク) クラスとパーティショナークラスを定義する

アプリケーションはプロセッサーの有向非循環グラフ(DAG)です。wordcountの例で、まず最初に2つのプロセッサSplitSumを定義し、それらを撚り合わせるつもりです。

メッセージタイプについて

ユーザは任意の種類のメッセージ(NULL、Nothing および Unit を除くどのような種類でも)を送信することができます。

case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)

Split プロセッサ

Splitプロセッサ内で、単純に先に定義されたテキスト(内容は簡潔さのために単純化されています)を分割し、各分割された単語をSumに送信します。

Scala:

class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.output

  override def onStart(startTime : StartTime) : Unit = {
    self ! Message("start")
  }

  override def onNext(msg : Message) : Unit = {
    Split.TEXT_TO_SPLIT.lines.foreach { line =>
      line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
        output(new Message(msg, System.currentTimeMillis()))
      }
    }
    self ! Message("continue", System.currentTimeMillis())
  }
}

object Split {
  val TEXT_TO_SPLIT = "some text"
}

Splitのように、各プロセッサは TaskActorを継承します。onStartメソッドは全てのメッセージがやってくる前に一度だけ呼ばれます; 各メッセージを処理するためにonNextメソッドが呼ばれます。Gearpumpはメッセージ駆動モデルを採用していることに注意してください。、次のメッセージの処理を引き起こすためにSplitがonStartonNextの最後でメッセージを送信しているのはそのためです。

Sum プロセッサ

Sumプロセッサの構造は似たようなものです。SumはSplitからメッセージを受信するため、自身へはメッセージを送信する必要はありません。

Scala:

class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  private[wordcount] val map : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()

  private[wordcount] var wordCount : Long = 0
  private var snapShotTime : Long = System.currentTimeMillis()
  private var snapShotWordCount : Long = 0

  private var scheduler : Cancellable = null

  override def onStart(startTime : StartTime) : Unit = {
    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
      new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount)
  }

  override def onNext(msg : Message) : Unit = {
    if (null == msg) {
      return
    }
    val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
    wordCount += 1
    map.put(msg.msg.asInstanceOf[String], current + 1)
  }

  override def onStop() : Unit = {
    if (scheduler != null) {
      scheduler.cancel()
    }
  }

  def reportWordCount() : Unit = {
    val current : Long = System.currentTimeMillis()
    LOG.info(s"Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
    snapShotWordCount = wordCount
    snapShotTime = current
  }
}

合計の計算に加えて、5秒毎にレポートをするスケジュールも定義します。スケジューラは計算が完了した時点でキャンセルされなければなりません。これはonStopメソッドを上書きすることで達成されます。onStop のデフォルトの実装は no-op です。

パーティショナー

プロセッサーはタスクのリストに並行化されます。Partitioner はデータがSplitとSumのタスクの間にどうやってシャッフルされるかを定義します。Gearpump はすでに2つのパーティショナーを持っています

Partitioner trait を拡張し、getPartition メソッドを上書きすることで、独自のパーティショナーを定義することができます。

trait Partitioner extends Serializable {
  def getPartition(msg : Message, partitionNum : Int) : Int
}

TaskDescription と AppDescription を定義する

これで、上のコンポーネントを一緒に組み込んで独自のアプリケーションクラスを書くことができます。

アプリケーションクラスは、App と、引数のパースとmain関数の実行を容易にする ArgumentsParserを拡張します。

object WordCount extends App with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  val RUN_FOR_EVER = -1

  override val options: Array[(String, CLIOption[Any])] = Array(
    "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
  )

  def application(config: ParseResult) : StreamApplication = {
    val splitNum = config.getInt("split")
    val sumNum = config.getInt("sum")
    val partitioner = new HashPartitioner()
    val split = Processor[Split](splitNum)
    val sum = Processor[Sum](sumNum)
    val app = StreamApplication("wordCount", Graph[Processor[_ <: Task], Partitioner](split ~ partitioner ~> sum), UserConfig.empty)
    app
  }

  val config = parse(args)
  val context = ClientContext()
  val appId = context.submit(application(config))
  context.close()
}

options の値を上書きし、パースするコマンドライン引数の配列を定義します。マスターのホストとポート、splitとsumタスクの並行化、例をどれだけ長く実行するかを、アプリケーションのユーザに渡して欲しいです。オプションが必須 かどうかを指定し、幾つかの引数のdefaultValueも提供します。

コマンドライン引数のParseResultという条件で、SplitとSumプロセッサのためのTaskDescriptionを生成し、DAG APIを使ってそれらを HashPartitioner に接続します。グラフはAppDescrition にラップされます。これは最終的にマスターにサブミットされます。

アプリケーションをサブミットする

それらが全て終わった後で、全てをuber jarにパッケージ化し、そのjarをGearpumpクラスタにサブミットする必要があります。コマンドラインツールの構文についてはアプリケーション サブミットツール をチェックしてください。

上級のトピック

実際のアプリケーションについては、プロセッサ間で渡す独自のメッセージを定義する必要が確実にあるでしょう。Customized message needs customized serializer to help message passing over wire. シリアライザをカスタマイズする方法はこのガイド をチェックしてください。

非ストリーミングの使用のためのGearpump

Gearpump は基本のプラットフォームとして非ストリーミングアプリケーションを開発することもできます。分散シェルを開発するためにGearpumpを使う方法は このガイド を見てください。

TOP
inserted by FC2 system