中核となる概念

Kafka Streams はKafka内に格納されているデータの処理と解析のためのクライアントライブラリです。それは、イベント時間と処理時間の間の適切な区分、ウィンドウのサポート、およびアプリケーション状態の単純だが効果的な管理とリアルタイムのクエリのような、重要なストリーム処理の概念に基づいています。

Kafkaストリームは敷居が低いです: 1つのマシーン上で小さな規模の概念実装を素早く書いて実行することができます; そして大容量のプロダクションの作業へスケールアップするために複数のマシーン上でアプリケーションの追加のインスタンスを実行する必要があるだけです。KafkaストリームはKafkaの並行モデルを利用することで同じアプリケーションの複数のインスタンスのロードバランシングを透過的に処理します。

Kafkaストリームの幾つかの重要な部分:

  • 単純および軽量なクライアントライブラリとして設計されています。Javaアプリケーションに簡単に埋め込むことができ、ユーザがストリーミングアプリケーションのために持つ既存のパッケージ、配備およびオペレーションのツールと統合することができます。
  • 内部メッセージング層としてApache Kafka自身以外のシステムに外部依存しない; 特に、強く順番の保証をしながら水平スケール処理のためにKafkaのパーティション モデルを使用します。
  • 耐障害性ローカル状態をサポートします。これはウィンドウ化された結合と集約のようなとても高速で効率的なステートフル操作を可能にします。
  • 各レコードが1回、処理の途中でストリームクライアントあるいはKafkaブローカーのどちらかで障害ある場合でさえも1回だけ処理される確実に1回の処理のセマンティクスをサポートします。
  • ミリ秒の処理レイテンシを達成するために1度に1回の処理を最小し、順不同のレコードに対してイベント時間ベースのウィンドウ操作をサポートします。
  • 高レベルストリームDSL低レベルプロセッサAPIと同調して、必要なストリーム処理のプリミティブを提供します。

最初にKafkaストリームの主要な概念について要約します。

ストリーム処理のトポロジ

  • stream はKafkaストリームによって提供される最も重要な抽象化です: それは無限の絶え間ないデータセットの更新を表します。ストリームはデータレコードはキー-値ストアとして定義される、順番に並べられ、再生可能で、不変のデータレコードの耐障害性のある順列です。
  • ストリーム処理アプリケーションはKafkaストリームライブラリを利用するプログラムです。それは1つ以上のプロセッサのトポロジを使ってコンピュータ的なロジックを定義します。この時プロセッサのトポロジはストリーム(エッジ)によって接続されたストリームのプロセッサー(ノード)のグラフです。
  • ストリーム プロセッサrはプロセッサのトポロジ内のノードです; 1度に1つの入力レコードをトポロジ内のupstreamプロセッサから受け取り、それにオペレーションを適用し、その後で1つ以上の出力ストリームをdownstreamプロセッサに生成することによってストリーム内のデータを変換するためのステップの処理を表します。
トポロジには2つの特別なプロセッサがあります:
  • ソース プロセッサ: ソース プロセッサはupstreamプロセッサを持たない特別な型のストリームプロセッサです。1つ以上のKafkaトピックからレコードを消費しそれらをdownstreamプロセッサに転送することで、トポロジへの入力ストリームを生成します。
  • シンク プロセッサ: シンク プロセッサはdown-streamプロセッサを持たない特別な型のストリームプロセッサです。upstreamプロセッサからの全ての受信したレコードを特定のKafkaトピックに送信します。
通常のプロセッサ ノードでは、他のリモートシステムも現在のレコードの処理中にアクセスされるかもしれないことに注意してください。従って処理された結果はKafkaにストリームされるか、外部システムに書き込まれるかもしれません。

Kafkaストリームはストリーム処理トポロジを定義する2つの方法を提供します: KafkaストリームDSLはそのままの状態でmap, filter, join および aggregationsのような最も一般的なデータ転送操作を提供します; 低レベル プロセッサ API により開発者は state storesとやり取りするために独自のプロセッサを定義および接続することができます。

プロセッサのトポロジは単にシステム処理コードのための論理的な抽象化にすぎません。実行時に、論理的なトポロジはインスタンス化され並行処理のためにアプリケーションの内部にレプリケートされます (詳細はストリーム パーティションとタスクを見てください)。

時間

ストリーム処理で重要な特徴はtimeの記法と、それがどのようにモデル化され統合されるかです。例えば、windowingのような幾つかのオペレーションは時間の境界に基づいて定義されます。

ストリームでの一般的な時間の記法:

  • イベント時間 - イベントあるいはデータレコードが発生した時間の点。つまり元々"ソースでの"生成。例: もしイベントが車でのGPSセンサーによって報告された地理位置の変更の場合、関連するイベント時間はGPSセンサーが位置の変更を捕らえた時間でしょう。
  • 処理時間 - イベントあるいはデータレコードが結果的にストリーム処理アプリケーションによって処理された時間の点。つまり、レコードが消費された時間。処理時間は、元のイベント時間では無く、ミリ秒、時間、あるいは日などかもしれません。例: 車のセンサーから報告された地理位置のデータを航行管理ダッシュボードへ表示するために読み込みおよび処理する解析アプリケーションを考えてみてください。ここで解析アプリケーションでの処理時間は、イベント時間の後の、ミリ秒あるいは秒 (例えば、Apache KafkaおよびKafkaストリームに基づくリアルタイム パイプラインのため)、あるいは時間 (例えば、Apache HadoopあるいはApache Sparkに基づくバッチ パイプラインのため)かもしれません。
  • 摂取時間 - イベントあるいはデータレコードがKafkaブローカーによってトピックパーティションに格納された時間の点。イベント時間との違いは、レコードが"ソースで"生成された時ではなく、この摂取タイムスタンプはKafkaブローカーによってレコードが目的のトピックに追加された時に生成されるということです。処理時間との違いは、処理時間はストリーム処理アプリケーションがレコードを処理する時の処理時間ということです。例えば、 もしレコードが処理されない場合、それについての処理時間という表記は無いですが、摂取時間はまだあります。

イベント時間と摂取時間の間の選択は、実際には(Kafkaストリームではなく)Kafkaの設定によって行われます: Kafka 0.10.x 以降、タイムスタンプは自動的にKafkaメッセージに埋め込まれます。Kafkaの設定に依存して、これらのタイムスタンプはイベント時間あるいは摂取時間を表します。それぞれのKafka構成設定はブローカーレベルあるいはトピック単位で指定されるかもしれません。Kafkaストリームでのデフォルトのタイムスタンプのエクストラクタはこれらの埋め込みのタイムスタンプをそのまま扱うでしょう。従って、アプリケーションの効果的な時間のセマンティクスはこれらの埋め込みのタイムスタンプについての有効なKafka設定に依存します。

KafkaストリームはタイムスタンプTimestampExtractor インタフェースを使ってそれぞれのデータレコードに割り当てます。これらのレコード単位のタイムスタンプは時間についてはストリームの進捗を表し、ウィンドウ オペレーションのような時間に依存するオペレーションによって利用されます。結果として、この時間は新しいレコードがプロセッサに到着した時に進むでしょう。このデータ駆動の時間をアプリケーションが実際に実行される時の壁時計の時間と区別するために、アプリケーションのストリーム時間と呼びます。そのためTimestampExtractorインスタンスの具体的な実装はストリーム時間の定義に異なる瀬万tぇイクスを提供するでしょう。例えば、埋め込みタイムスタンプフィールドなどのデータレコードの実際の内容に基づいてタイムスタンプを取得あるいは計算してイベント時間のセマンティクスを提供し、現在の実時間を返すことで、処理時間をストリーム時間に譲り渡します。従って開発者は仕事の必要性に応じて異なる時間の記法を強制することができます。

結局、KafkaストリームアプリケーションがKafkaにレコードを書き込む時はいつでもこれらの新しいレコードにタイムスタンプも割り当てるでしょう。タイムスタンプが割り当てられる方法は内容に依存します:

  • 新しい出力が何からの入力レコードを処理することで生成される場合、例えばprocess() 関数呼び出しの中で引き起こされるcontext.forward()、出力のレコードのタイムスタンプは入力レコードのタイムスタンプから直接継承されます。
  • 新しいレコードはPunctuator#punctuate()のような定期的な関数によって生成される場合、出力のレコードのタイムスタンプはストリーム タスクの現在の間隔時間(context.timestamp()によって取得される)として定義されます。
  • 集約の場合、結果の更新レコードのタイムスタンプは、結果に関与する全ての入力レコードの最大のタイムスタンプになります。

#forward()の呼び出し時にタイムスタンプを明示的にレコードを出力するように割り当てることで、プロセッサAPI内でデフォルトの挙動を変更することができます。

集約と結合の場合、タイムスタンプは以下のルールを使って計算されます。

  • 左と右の入力レコードを持つ結合(ストリーム-ストリーム、テーブル-テーブル)の場合、出力レコードのタイムスタンプは max(left.ts, right.ts) に割り当てられます。
  • ストリーム-テーブル結合の場合、出力レコードはタイムスタンプからストリームレコードに割り当てられます。
  • 集約の場合、Kafka ストリームはグローバル(ウィンドウ化されていない場合)またはウィンドウごとに、キーごとに全てのレコードの max タイムスタンプも計算します。
  • ステートレス操作の場合、入力レコードタイムスタンプはそのままになります。For flatMap and siblings that emit multiple records, all output records inherit the timestamp from the corresponding input record.

ストリームとテーブルの双対性

実際にストリーム処理のユースケースを実装する場合、通常はstreamsdatabases の両方が必要です。実際にとても一般的な使用例は、顧客トランザクションの受信streamを、database tableからの最新の顧客情報で強化する電子商取引アプリケーションです。つまり、ストリームはどこにでもありますが、データベースもどこにでもあります。

従って全てのストリーム処理技術は、ストリームおよびテーブルのファーストクラス サポートを提供する必要があります。Kafkaのストリーム API は、streams および tables のコア抽象化を通じて、このような機能を提供します。これについては後で説明します。さて、実際にストリームとテーブルの間に密接な関係がある、いわゆるストリーム-テーブルの双対性がある、という興味深い観察結果があります。Kafkaはこの双対性を様々な方法で利用します: 例えば、アプリケーションを elastic にしたり、耐障害性のあるステートフル処理をサポートしたり、アプリケーションの細心の処理結果に対して対話的なクエリを実行したりします。内部使用以外にも、KafkaストリームAPIは開発者に独自のアプリケーション内でこの双対性を利用できるようにもします。

Kafkaストリームでのaggregations のような概念を議論する前に、まず tablesを詳細に紹介し、前述のストリーム-テーブルの双対性について話す必要があります。基本的に、この双対性はストリームをテーブルとして表示することができ、テーブルはストリームとして表示することができることを意味します。Kafka's log compaction feature, for example, exploits this duality.

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

The stream-table duality describes the close relationship between streams and tables.
  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table.

Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column).

Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream, KTable, and GlobalKTable interfaces.

集約

集約操作は1つの入力ストリームあるいはテーブルを受け取り、複数の入力レコードを1つの出力レコードに結合することで新しいテーブルを生成します。集約の例は、カウントまたは合計の計算です。

Kafka Streams DSLでは、aggregationの入力ストリームは KStream あるいは KTable ですが、出力ストリームは常に KTable です。これにより、Kafka ストリームは、値が生成されて発行された後、追加のレコードが順不同で到着した時に集約値を更新することができます。そのような順不同の到着が発生すると、集約 KStream または KTable は新しい集約値を発行します。出力は KTable であるため、新しい値は後続の処理ステップで古い値を同じキーで上書きすると見なされます。

ウィンドウ

ウィンドウ化により、aggregations あるいは joinsのようなステートフルな操作のために同じキーを持つレコードをグループ化してwindowsに入れる方法を制御できます。ウィンドウはレコードのキーごとに追跡されます。

ウィンドウ操作Kafka Streams DSLのなかで利用可能です。ウィンドウを使う場合は、ウィンドウの猶予期間を指定することができます。この猶予期間は Kafka ストリームが指定されたウィンドウの順不同データレコードをどれだけ待つかを制御します。もしレコードがウィンドウの猶予期間が過ぎた後で到着した場合、レコードは破棄され、ウィンドウ内で処理されません。具体的には、タイムスタンプがウィンドウに属している場合はレコードが削除されますが、現在のストリーム時間はウィンドウの終了と猶予期間を足したものよりも長くなります。

順不同のレコードは実世界で常にあり得るため、アプリケーションで適切に計上する必要があります。順不同レコードがどのように扱われるかは、有効な 時間セマンティクス に依存します。処理時間の場合、セマンティクスは "レコードが処理される時" です。順不同レコードの概念は定義上適用できないため、レコードは順不同にできません。従って、順不同のレコードはイベント時間の場合のみそのように見なされます。両方の場合で、Kafka Stream は適切に順不同レコードを処理することができます。

状態

幾つかのストリーム処理アプリケーションは状態を必要としません。このことはメッセージの処理は全ての他のメッセージの処理から独立していることを意味します。しかし状態を維持できるようにすることは洗練されたストリーム処理アプリケーションのための多くの可能性を広げます: 入力ストリームを結合、あるいはデータのレコードをグループ化および集約することができます。多くのそのようなステートフルのオペレーションがKafka ストリーム DSLで提供されています。

Kafka ストリームは状態ストアと呼ばれるものを提供します。これはストリーム処理アプリケーションによってデータを格納およびクエリするために使うことができます。これはステートフル オペレーションを実装する時に重要な機能です。Kafkaストリームでの各タスクは処理のために必要とされるデータの格納およびクエリのためにAPIを使ってアクセスできる1つ以上の状態ストアを埋め込みます。これらの状態ストアは永続的なキー-値ストア、インメモリのハッシュマップ、あるいは他の便利なデータ構造のいずれかでしょう。Kafkaストリームはローカル状態ストアのために対障害性および自動的な回復を提供します。

Kafkaストリームは、状態ストアを作成したストリーム処理アプリケーションの外部への直接の状態ストアの読み取り専用クエリを、メソッド、スレッド、プロセスあるいはアプリケーションによって可能にします。これはInteractive Queriesと呼ばれる機能を使って提供されます。全てのストアは名前が付けられ、Interactive Queries は背後にある実装の読み込み操作のみを公開します。


処理の保証

ストリーム処理で最も良く聞かれる質問の1つが、"処理の途中でなんらかの障害にあった時でもストリーム処理システムは各レコードが1度そして一度だけ処理される保証がありますか?" です。確実に1回のストリーム処理の保証の失敗はデータの喪失あるいはデータの重複に耐えられない多くのアプリケーションにとって合意を壊すもので、ストリーム処理パイプラインに加えてバッチ指向のフレームワークが通常使われます。Lambda Architectureとして知られています。0.11.0.0より前は、Kafkaは少なくとも1回の配送保証のみを提供し、従ってバックエンド ストレージとしてそれを利用するストリーム処理システムは end-to-end の確実に1回のセマンティクスを保証できませんでした。実際、確実に1回の処理をサポートすると主張するそれらのストリーム処理システムについて、それらがソース/シンクとしてKafkaから読み込み/へ書き込みをしている限り、それらのアプリケーションは実際にはパイプライン全体に渡って重複が生成されない保証はできません。
0.11.0.0 リリースから、Kafkaはプロデューサがトランザクション的および等冪的な方法で異なるトピックパーティションにメッセージを送信できるようにサポートを追加し、Kafkaストリームは今後これらの機能を使ってend-to-endの確実に1回のセマンティクスを追加しました。もっと具体的には、ソースKafkaトピックから読み込まれたレコードについて、その処理結果は出力のKafkaトピックの中とステートフルオペレーションのための状態ストアの中で確実に1回繁栄されるでしょう。Kafkaストリームのend-to-endの確実に1回の保証と、他のストリーム処理のフレームワークが主張する保証との主要な違いは、Kafkaストリームは厳重に背後のKafkaストレージシステムと統合し、入力トピックのオフセット上のコミット、状態ストア上の更新、そして出力トピックへの書き込みが、副作用があるかもしれない外部システムとしてKafkaを使う代わりに自動的に完了されるだろうということです。これが Kafka ストリームでどのように行われるかの詳細については、KIP-129を見てください。
2.6.0 リリース以降、Kafka ストリームは、ブローカーバージョン 2.5.0 以降を必要とする "exactly-once beta" という名前の確実に1回の処理の改良された実装をサポートします。この実装は、クライアントスレッドと使用されたネットワーク接続のようなクライアントとブローカのリソース使用率を削減し、スループットの向上とスケーラビリティの向上を可能にするため、より効率的です。これが、ブローカと Kafka ストリーム内でどのように行われるかの詳細については、KIP-447 を見てください。
Kakfa ストリームアプリケーションの実行時に確実に1回の実行のセマンティクスを有効にするには、processing.guarantee 構成値(デフォルト値は at_least_once) を exactly_once (ブローカバージョン 0.11.0 以降が必要) またはexactly_once_beta (ブローカバージョン 2.5 以降が必要)をせっていします。詳細は、Kafka ストリーム構成セクションを見てください。

順不同の処理

各レコードが確実に1回だけ処理されるという保証に加えて、多くのストリームアプリケーションが直面する別の問題は、ビジネス ロジックに影響を与える可能性のある順不同のデータをどう扱うかということです。Kafka ストリームでは、タイムスタンプに関して順不同のデータが到着する可能性がある2つの原因があります:

  • トピックパーティション内では、レコードのタイムスタンプはオフセットと共に単調に増加しない場合があります。Kafkaストリームは常にオフセットの順番に従うためにトピックパーティション内のレコードを処理しようとするため、同じトピックパーティション内の小さいタイムスタンプ (ただし大きなオフセット)よりも大きなタイムスタンプ(ただし小さなオフセット)が早く処理される可能性があります。
  • 複数のトピックパーティションを処理する可能性のある ストリーム タスク内で、ユーザがバッファデータが含まれるのを待つために全てのパーティションを待たず、次のレコードを処理するために最小のタイムスタンプを持つパーティションから選択する場合、その後に一部のレコードが他のトピックパーティションのためにフェッチされると、それらのタイムスタンプは別のトピックパーティションからフェッチされた処理済みのレコードよりも小さくなるかもしれません。

ステートレス操作の場合、過去に処理されたレコードの履歴を調べず、1度に1つのレコードのみが考慮されるため、順不同のデータは処理ロジックに影響しません; しかし、集約および結合のようなステートフル操作の場合、順不同のデータは処理ロジックが不正確になる可能性があります。ユーザがそのような順不同のデータを処理したい場合は、一般的に待機時間中に状態を記録しながらアプリケーションがより長い時間待つことを許可する必要があります。つまり、待ち時間、コストおよび正確さの間でトレードオフの決定を行います。特にKafkaストリームでは、ユーザがそのようなトレードオフを実現するためにウィンドウ集計用にウィンドウ操作を設定することができます (詳細は 開発者ガイドで見つけることができます)。結合については、ユーザはストリームの待ち時間およびコストを増やすことで、順不同のデータの一部を処理することができないことに注意しなければなりません:

  • ストリーム-ストリーム結合の場合、3つのタイプ(inner, outer, left) は全て順不同のレコードを正しく処理しますが、結果のストリームには、left結合の場合には不要な左レコードのnull、outer結合の場合は左レコードのnullあるいは右レコードのnullを含む可能性があります。
  • ストリーム-テーブル結合の場合、順不同のレコードは処理されません (つまり、ストリームアプリケーションは順不同レコードをチェックせず、全てのレコードをオフセット順に処理するだけです)。従って予測できない結果が生じる可能性があります。
  • テーブル-テーブル結合の場合、順不同のレコードは処理されません (つまり、ストリームアプリケーションは順不同レコードをチェックせず、全てのレコードをオフセット順に処理するだけです)。ただし、結合結果は変更ログのストリームのため、結果的に一貫性が保たれます。
inserted by FC2 system