信頼できるメッセージ配送
少なくとも一回のメッセージ配送とは何か?
メッセージはネットワークの分割のために配送途中で紛失することが有り得ます。少なくとも一回のメッセージ配送 (at least once) は、at least one がワークフローに処理されackされるように紛失されたメッセージが1度以上配送されることを意味します。
Gearpump は以前のタイムスタンプからメッセージを再生できるどのようなソースに対しても、少なくとも一回を保証します。Gearpumpでは、各メッセージはタイムスタンプでタグ付けされ、システムは全ての待っているメッセージの最小のタイムスタンプを追跡します(グローバルな最小クロック)。メッセージが喪失した場合にアプリケーションはグローバルな最小のクロックに再起動するでしょう。ソースはグローバルな最小のクロックから再生できるため、再起動前の全ての待っているメッセージが再生されるでしょう。Gearpump calls that kind of source TimeReplayableSource
and already provides a built in KafkaSource. KafkaSourceにデータをGearpumpに取り込むようにすると、ユーザは少なくとも一回のメッセージの配送を保証されます。
確実に一回のメッセージの配送とは何か?
少なくとも一回の配送はアプリケーションの結果の正確さを保証しません。例えば、受信したメッセージの数を保持するタスクについて、重複したメッセージを超過カウントし、その数はタスクの障害時に紛失されます。そのような場合、確実に一回のメッセージの配送 (exactly once) が必要とされ、確実に一回iのメッセージによって状態が更新されます。これはさらに重複したメッセージがフィルタで除外され、メモリ内の状態が維持されることを必要とします。
データを取り込むためにTimeReplayableSource
を使用し、メモリの状態内でそれらを管理するためにPersistent APIを使う場合に、Gearpump内で確実に一回が保証されます。Persistent API を使って、ユーザの状態は定期的にチェックポイントタイムと一緒にpersistentストア(例えば HDFS)にチェックポイントされます。Gearpump は全ての待っている状態の最小のチェックポイントタイムスタンプ(グローバル最小チェックポイントクロック)を追跡します。これも残ります。アプリケーションの再起動時には、システムはグローバル最小チェックポイントクロックでの状態を回復し、ソースはそのクロックからメッセージを再生します。これによりメッセージが全ての状態を確実に一回更新することを確実にします。
Persistent API
Persistent API PersistentTask
とPersistentState
からできています。
以下はそれらを使ってやってくるメッセージの数を維持する例です。
class CountProcessor(taskContext: TaskContext, conf: UserConfig)
extends PersistentTask[Long](taskContext, conf) {
override def persistentState: PersistentState[Long] = {
import com.twitter.algebird.Monoid.longMonoid
new NonWindowState[Long](new AlgebirdMonoid(longMonoid), new ChillSerializer[Long])
}
override def processMessage(state: PersistentState[Long], message: Message): Unit = {
state.update(message.timestamp, 1L)
}
}
CountProcessor
はPersistentTaks
によって管理されるだろうカスタマイズされたPersistentState
を生成し、新しいメッセージでどうやって状態を更新するかを定義する processMessage
メソッドを上書きします(新しい各メッセージは1
としてカウントされます、これは既存の値に追加されます)
Gearpumpはすでに二種類の状態を提供しています
- NonWindowState - 時間あるいは他の境界が無い状態
- WindowState - 各状態が時間ウィンドウによって制限されている
それらはモノイド則を満たす状態を対象としています。
+
のようなバイナリ結合演算子を持ち、0
のような単位元を持ちます
上の例では、便利なモノイドの一群を提供するTwitter’s Algebirdライブラリから、longMonoid
を利用します。