構造化ストリーミングプログラミングガイド
- 概要
- クリックな例
- プログラミング モデル
- データセットとデータフレームを使うAPI
- 連続的な処理
- 追加の情報
概要
構造化ストリーミングはSpark SQLエンジン上に構築されたスケーラブルで耐障害性のあるストリーミング処理エンジンです。静的なデータ上のバッチ計算を表すのと同じ方法でストリーミングの計算を表現することができます。Spark SQL エンジンはそれを逐次および継続的に実行し、最終的な結果を到着し続けるストリーミングデータとして更新するように注意するでしょう。ストリーミング集約、イベントタイム ウィンドウ、ストリーム-to-バッチ joinなどを表現するために、Scala、Java、PythonあるいはRでDataset/DataFrame APIを使うことができます。計算は同じ最適化されたSpark SQLエンジン上で実行されます。結果的に、システムはチェックポイントと先行書き込みログを使って end-to-endの確実に一回の耐障害性を保証します。簡単に言うと、ユーザがストリーミングについて推論する必要無く、高速、スケーラブル、耐障害性、end-to-endの確実に一回のストリーミング処理を提供します。
デフォルトでは、構造化ストリーミング クエリは内部的にmicro-batch processingエンジンを使って処理されます。これはデータ巣トリムを小さなバッチジョブとして処理し、従って100ミリ秒ぐらいの end-to-endレイテンシと確実に1回の耐障害性の保証を実現します。しかし、Spark 2.3 から連続処理と呼ばれる新しい低レンテンシの処理モードを導入しました。これは少なくとも一回の保証の1ミリ秒ぐらいのend-to-endレイテンシを実現することができます。クエリ内の Dataset/DataFrame 走査の変更無しで、アプリケーションの要求に応じてモードを選択することができるでしょう。
このガイドでは、プログラミングモデルとAPIを渡り歩いてみようと思います。ほとんどはデフォルトのマイクロバッチ処理モデルを使って概念を説明し、後で連続処理モデルについて議論しようと思います。まず、構造化ストリーミング クエリの簡単な例 - ストリーミング ワード カウント から始めましょう。
クリックな例
TCPソケット上でlistenしているデータサーバから受け取ったテキストデータの単語のカウントの実行を続けたいとします。構造化ストリーミングを使ってこれをどう表現できるかを見てみましょう。完全なコードはScala/Java/Python/Rで見ることができます。そしてSparkをダウンロードした場合は、直接例を実行することができます。どの場合でも、一歩ずつ例を見てそれがどのように動くかを理解しましょう。まず、必要なクラスをインポートし、Sparkに関する全ての機能の開始点となるローカルSparkセッションを作成しなければなりません。
次に、localhost:9999上でlistenするサーバから受け取るテキストデータを表現するストリーミングデータフレームを作成して、word countを計算するためにデータフレームを変換してみましょう。
このlines
データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、各行を複数の単語に分割するためにflatMap
を適用できるように、.as[String]
を使ってデータフレームを文字列のデータセットに変換しました。結果のwords
データセットは全ての単語を含みます。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts
データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。
このlines
データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、各行を複数の単語に分割するためにflatMap
を適用できるように、.as(Encoders.STRING())
を使ってデータフレームを文字列のデータセットに変換しました。結果のwords
データセットは全ての単語を含みます。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts
データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。
このlines
データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、二つの組み込みのSQL関数を使いました - 各行をそれぞれの単語を持つ複数の行に分割する split と explode。更に、新しいカラムに“word”という名前を付けるために関数alias
を使います。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts
データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。
このlines
SparkDataFrameはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、二つのSQL関数を持つSQL表現があります - 各行をそれぞれの単語を持つ複数の行に分割する split と explode。更に、新しいカラムに “word” という名前を付けます。最後に、SparkDataFrame内のユニークな値によってグルーピングし、それらをカウントすることで、wordCounts
SparkDataFrame を定義しました。これはストリームの実行中のword countを表すSparkDataFrameであることに注意してください。
これで、ストリームデータ上にクエリを設定しました。最後に残っているのは、実際にデータの受信を開始し、カウントを計算することです。これをするために、カウントの完全なセット(outputMode("complete")
によって指定されます)が更新される度に、コンソールにそれらの出力を設定します。そして、start()
を使ってストリーミングの計算を開始します。
コードが実行された後で、ストリーミングの計算がバックグラウンドで開始されるでしょう。query
オブジェクトは有効なストリーミングのクエリへのハンドラで、クエリが有効な間プロセスが終了することを避けるために awaitTermination()
を使ってクエリの終了を待つと決めました。
この例のコードを実際に実行するために、独自の Spark アプリケーション内でコードをコンパイルするか、一旦Sparkをダウンロードして単純に例を実行することができます。後者を示します。以下のようにして最初にNetcat(ほとんどのUnix系のシステムで見つけることができる小さなユーティリティ)をデータサーバとして実行する必要があるでしょう。
$ nc -lk 9999
それから、違うターミナルで、以下のようにして例を実行することができます。
それから、netcatサーバを実行しているターミナルの中で入力される全ての行がカウントされ、毎秒ごとに画面に出力されるでしょう。それは以下のように見えるでしょう。
|
プログラミング モデル
構造化ストリーミングの重要なアイデアは、活気のあるデータストリームを連続して追加されるテーブルとして扱うことです。これによりバッチ処理モデルにとても良く似た新しいストリーム処理となります。ストリーミング計算を静的なテーブル上の標準的なバッチのようなクエリとして表し、Sparkはそれを無限の入力テーブル上の 増加するクエリとして実行するでしょう。このモデルをもっと詳細に理解しましょう。
基本概念
入力データストリームを “入力 テーブル”と見なします。ストリーム上で到着する各データ項目は、入力テーブルに追加される新しい行のようなものです。
入力上のクエリは “結果テーブル”を生成するでしょう。各引き起こされる間隔(例えば、各1秒)ごとに、新しい行が入力テーブルに追加されます。これは結果的に結果テーブルを更新します。結果テーブルが更新されるといつも変更された結果の行を外部のsinkに書き込みたいでしょう。
“出力” は外部ストレージに書き込まれたものとして定義されます。出力は異なるモードで定義することができます:
-
完全モード - 更新された結果テーブル全体は外部ストレージに書き込まれるでしょう。テーブル全体の書き込みをどう扱うかはストレージのコネクタによります。
-
追加モード - 最後のトリガーから結果テーブルに追加された新しい行だけが外部ストレージに書き込まれるでしょう。これは結果テーブルの既存行が変更されないだろうクエリ上でのみ適用可能です。
-
更新モード - 最後のトリガーから結果テーブルに更新された行だけが外部ストレージに書き込まれるでしょう(Spark 2.1.1から利用可能)。このモードは最後のトリガーから変更された行のみ出力する点で、完全モードとは異なることに注意してください。クエリが集約を含まない場合、追加モードと同じです。
各モードはクエリの特定の型に適用可能です。詳細は後で議論されます。
このモデルの使い方を説明するために、以前のQuick Exampleのコンテキストのモデルを理解しましょう。最初のlines
データフレームは入力テーブルで、最後のwordCounts
データフレームは結果テーブルです。wordCounts
を生成するためのストリーミングlines
データフレーム上のクエリは、それが静的なデータフレームだろうということで、完全に同じです。しかし、このクエリが開始されると、Sparkはソケット接続から新しいデータを連続して調べるでしょう。以下で示すように、もし新しいデータがあれば、Sparkは更新カウントを計算するために、前の実行中のカウントと新しいデータを組み合わせる “incremental” クエリを実行するでしょう。
構造化ストリーミングは表全体を実現しないことに注意してください。ストリーミングのデータソースから利用可能な最新のデータを読み込み、結果を更新するために逐次的に処理し、それからソースのデータを破棄します。結果を更新するために必要とされる最小限の中間のstateデータのみを保持します (例えば、前の例の中間カウント)。
このモデルは他の多くのストリーミング処理エンジンと極めて異なります。多くのストリーミングシステムはユーザが実行中の集約を自身で保持することを必要とします。従って耐障害性およびデータの一貫性(少なくとも1回、あるいは最大で1回、あるいは確実に1回)について判断しなければなりません。このモデルでは、新しいデータがある場合Sparkは結果テーブルを更新する責任があります。従ってユーザがそれについて推論する必要がありません。例として、このモデルがどうやってイベント時間に基づいた処理と後でやってくるデータを扱うかを見てみましょう。
Event-time と Late Dataの扱い
イベント時間はデータ自身の中に埋め込まれている時間です。多くのアプリケーションで、このイベント時間上で操作したいと思うかも知れません。例えば毎秒IoTデバイスによって生成されるイベントの数を取得したい場合、Sparkが受け取った時間よりもおそらくデータが生成された時間を使いたいでしょう。このイベント時間はこのモデルでとても自然に表現されます – デバイスからの各イベントは表内の行で、イベント時間は行内の列の値です。これにより、ウィンドウに基づく集約(例えば、各秒毎のイベントの数)がイベント時間の列上の単なるグルーピングと集約の特別な形になります – 各時間ウィンドウはグループで、各行は複数のウィンドウ/グループに所属するかも知れません。従って、そのようなイベント時間のウィンドウに基づいた集約のクエリは、静的なデータセット(例えば、デバイスのイベントログを集めたもの)とデータストリームの両方で首尾一貫して定義することができ、ユーザの生活をもっと簡単にします。
更に、このモデルは本質的にイベント時間に基づいて期待したよりも遅く到着したデータを処理します。Sparkは結果テーブルを更新するため、遅れたデータがある場合は古い集約の更新と、中間状態データのサイズを制限するために古い集約の掃除について完全な制御をします。Spark 2.1からは、ユーザが遅れたデータの閾値を指定することができるウォーターマークをサポートし、そのためエンジンが古い状態を掃除することができます。後でウィンドウ操作 の章の中で詳細が説明されます。
耐障害性semantics
端から端まで確実に1つのセマンティクスの実現が構造化ストリーミングの設計の背景にある主要な目的の一つです。これを実現するために、再起動および/あるいは再処理によるあらゆる種類の障害を処理できるように、処理の確実な進捗を信頼して追跡できるように構造化ストリーミング ソース、シンクおよび実行エンジンを設計しました。各ストリーミングソースはストリーム内で読み込みの場所を追跡するためにオフセット(Kafkaのオフセット、あるいはKinesisのシーケンス番号に似ています)を持つと仮定します。エンジンは各トリガー内で処理されるデータのオフセット範囲を記録するために、チェックポイントと先行書き込みログを使用します。ストリーミング シンクは再処理の扱いが等冪であるように設計されています。再生ソースと等冪シンクの使用と合わせて、構造化ストリーミングはどのような障害下でも 端から端まで確実に一回のセマンティクス を保証することができます。
データセットとデータフレームを使うAPI
Spark 2.0から、データフレームとデータセットはストリーミング、境界無しデータと同様に、静的、境界ありのデータを表すことができます。静的なデータセット/データフレームと似て、ストリーミング データフレーム/データセットをストリーミングソースから生成するために、一般的なエントリポイントSparkSession
(Scala/Java/Python/R ドキュメント) を使うことができ、それらに同じオペレーションを静的なデータフレーム/データセットとして適用することができます。データセット/データフレームに馴染みが無い場合は、DataFrame/Dataset プログラミング ガイドを使ってそれらに慣れることを強くお勧めします。
ストリーミングデータフレームとストリーミングデータセットの生成
ストリーミングデータフレームはSparkSession.readStream()
によって返されるDataStreamReader
インタフェース(Scala/Java/Python ドキュメント) を使って生成されるかも知れません。Rでは、read.stream()
メソッドを使います。静的データフレームを生成するための読み込みインタフェースと似て、ソースの詳細 – データフォーマット、スキーマ、オプションなどを指定することができます。
入力ソース
2,3の組み込みのソースがあります。
- ファイルソース - ディレクトリ内に書き込まれたファイルをデータのストリームとして読み込みます。ファイルは修正時間の順番に処理されます。もし
latestFirst
が設定された場合は、順番が保持されます。サポートされるファイルのフォーマットは、CSV, JSON, ORC, Parquet です。最新のリスト、および各ファイルフォーマットでサポートされるオプションのために、DataStreamReader インタフェースのドキュメントを見てください。ファイルは自動的に指定されたディレクトリに配置されなければならない事に注意してください。それらはファイルの移動操作によって行うことができます。 -
Kafka ソース - Kafkaからデータを読み込みます。Kafka ブローカーのバージョン 0.10.0 以上と互換性があります。詳細はKafka 統合ガイドを見てください。
-
ソケット ソース (テストのため) - ソケット接続からUTF8テキストデータを読み込みます。listenしているサーバソケットがドライバです。これは端から端までの耐障害性の保証を提供しないため、テストのためだけに使われるべきだということに注意してください。
- ソースのレート (テストのため) - 秒間あたり指定された数の行でデータを生成します。出力の各行は
timestamp
とvalue
を含みます。timestamp
はメッセージの発行の時間を含むTimestamp
型で、value
は最初の行として0から始まるメッセージのカウント を含むLong
型です。このソースはテストとベンチマークを目的としています。
幾つかのソースは、障害の後でのチェックポイント オフセットを使ったデータの再生がされないかも知れないので、耐障害性がありません。耐障害性のセマンティクスの前の章を見てください。Sparkの全てのソースの詳細がここにあります。
ソース | オプション | 耐障害性 | 備考 |
---|---|---|---|
ファイル ソース |
path : 入力ディレクトリへのパス。全てのファイルフォーマットで共通です。maxFilesPerTrigger : 各トリガーごとに判断する新しいファイルの最大の数 (デフォルト: 上限無し)latestFirst : 最新の新しいファイルを最初に処理するかどうか。大きなバックログファイルがある場合に便利です (デフォルト: false)latestFirst : フルパスの代わりにファイル名だけに基づいて新しいファイルをチェックするかどうか (デフォルト: false)。これを `true` にすることで、ファイル名"dataset.txt"が同じなので、以下のファイルは同じファイルとみなされるでしょう: maxFileAge : 無視されるまでの、このディレクトリにあるファイルの最大保持期間。最初のバッチでは、全てのファイルが有効とみなされます。latestFirst が `true` に設定され、maxFilesPerTrigger が設定された場合は、有効で処理する必要がある古いファイルが無視される可能性があるため、このパラメータは無視されます。最大経過時間は、現在のシステムのタイムスタンプではなく、最新のファイルのタイムスタンプを基準に指定されます。(デフォルト: 一週間)"file:///dataset.txt" "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" cleanSource : 処理の後で完了したファイルを掃除するオプション。利用可能なオプションは、"archive", "delete", "off" です。オプションが指定されていない場合、デフォルト値は "off" です。 "archive" が指定された場合、追加のオプション sourceArchiveDir も指定される必要があります。"sourceArchiveDir" の値は、深さ (ルートディレクトリからのディレクトリの数)のソースパスと一致してはなりません。ここで深さは両方のパスの深さの最小です。これにより、圧縮ファイルは新しいソースファイルとして含まれなくなります。例えば、ソースパターンとして '/hello?/spark/*' を指定すると、'/hello?/spark/*' と '/hello1/spark/archive' は一致しないため、'/hello1/spark/archive/dir' は "sourceArchiveDir" の値として使えません。'/hello1/spark' も、'/hello?/spark' と '/hello1/spark' は一致しないため、"sourceArchiveDir" の値として使えません。'/archived/here' は一致しないため、問題ありません。Spark は、独自のパスを考慮してソースファイルを移動します。例えば、ソースファイルのパスが /a/b/dataset.txt で、圧縮ディレクトリのパスが /archived/here の場合、ファイルは /archived/here/a/b/dataset.txt に移動されます。注意: 圧縮(移動による)と完了ファイルの削除の両方で、各マイクロバッチにオーバーヘッド(別のスレッドで発生している場合でも速度が低下します)が発生するため、このオプションを有効にする前に、ファイルシステムの各操作のコストを理解する必要があります。一方、このオプションを有効にすると、コストのかかる操作になる可能性があるソースファイルを一覧表示するコストが削減されます。 完了したファイルクリーナーで使われるスレッドの数は、 spark.sql.streaming.fileSource.cleaner.numThreads (デフォルト: 1) で設定できます。注意 2: このオプションを有効にする場合は、ソースパスを複数のソースあるいはクエリから使わないでください。同様に、ソースパスがファイルストリームシンクの出力ディレクトリのどのファイルとも一致しないことを確認してください。 注意 3: 削除アクションと移動アクションはどちらもベストエフォートです。ファイルの削除あるいは移動に失敗しても、ストリーミングクエリは失敗しません。Spark は、特定の状況で一部のソースファイルをクリーンアップしない場合があります - 例えば、アプリケーションは正常にシャットダウンせず、多くのファイルがクリーンアップのためにキューに入れられます。 ファイル形式に固有のオプションについては、 DataStreamReader の関連メソッドを見てください (Scala/Java/Python/R)。例えば、"parquet" 形式のオプションについては、DataStreamReader.parquet() を見てください。更に、特定のファイル形式に影響するセッション設定があります。詳細はSQL プログラミング ガイド を見てください。例えば、"parquet" については Parquet の設定を見てください。 |
Yes | globパスをサポートしますが、複合的なカンマ区切りのパス/globはサポートしません。 |
ソケット ソース |
host : 接続するホスト。指定されなければなりませんport : 接続するポート。指定されなければなりません
|
いいえ | |
レートのソース |
rowsPerSecond (例えば 100、デフォルト: 1): 秒間あたりどれだけの行が生成されなければならないか。rampUpTime (例えば 5秒、デフォルト: 0s): 生成のスピードがrowsPerSecond になるまで立ち上げにどれだけかかるか。秒より細かい粒度の使用は整数の秒に切り捨てられるでしょう。numPartitions (例えば 10、デフォルト: Sparkのデフォルトの並行度): 生成された行のためのパーティション数。ソースは rowsPerSecond に達するように最善を尽くすでしょうが、クエリはリソースに制限されるかもしれず、numPartitions は望ましいスピードに達するのを手助けするために調整することができます。
|
Yes | |
Kafka ソース | Kafka 統合ガイドを見てください。 | Yes | |
幾つかの例です。
これらの例は型無しのストリーミング データフレームを生成します。データフレームのスキーマはコンパイル時にチェックされず、クエリがサブミットされた時にのみ実行時にチェックされることを意味します。map
, flatMap
などの幾つかの操作は、コンパイル時に型が知られている必要があります。そうするために、これらの型無しのストリーミング データフレームを、静的なデータフレームとして同じメソッドを使って型有りのストリーミング データセットに変換することができます。詳細はSQL プログラミング ガイド を見てください。更に、サポートされるストリーミングソースについて更に詳しくこのドキュメントの後で議論されます。
スキーマ推論とストリーミング データフレーム/データセットのパーティション
デフォルトで、ファイルベースソースの構造化ストリーミングは、自動的に推測するためにSparkに頼るではなく、スキーマを指定することを必要とします。この制限は、障害時の場合においても、ストリーミング クエリのために一貫したスキーマが使われるだろうことを保証します。その場限りのために、spark.sql.streaming.schemaInference
を true
に設定することでスキーマの推測を再び有効にすることができます。
/key=value/
という名前のサブディレクトリが存在する場合はパーティションの調査は起こらず、リスト化はこれらのディレクトリ内に再帰的に起こるでしょう。これらのカラムがユーザが提供したスキーマの中で現れる場合、それらはファイルが読み込まれたパスに基づいたSparkによって情報が与えられるでしょう。パーティションスキーマを構成するディレクトリは、クエリが開始される時に存在しなければならず、静的なままでなければなりません。例えば、/data/year=2015/
が存在する時に/data/year=2016/
を追加することは問題無いですが、パーティションカラムを変更(つまり、/data/date=2016-04-17/
ディレクトリを生成)することは正しくありません。
ストリーミング データフレーム/データセット での操作
ストリーミング データフレーム/データセット に全ての種類の操作を適用することができます – 型無し SQL風の操作 (例えば select
, where
, groupBy
) から型有りのRDD風の操作 (例えば map
, filter
, flatMap
)まで。詳細はSQL プログラミング ガイド を見てください。使用できる2,3の操作の例を見てみましょう。
基本的なオペレーション - 選択、射影、集約
データフレーム/データセット上のほとんどの共通の操作がストリーミングのためにサポートされます。サポートされない少数の操作についてはこの章の後で議論されます。
You can also register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands on it.
DataFrame/Dataset がストリーミングデータを持つかどうかdf.isStreaming
を使って識別することができることに注意してください。
イベントタイムのウィンドウ操作
スライドするイベント時間のウィンドウ上での集約は構造化ストリーミングでは簡単で、グループ化集約にとても似ています。グループ化された集約では、集約値(例えば count)はユーザ定義のグループ化カラムの中のそれぞれのユニークな値について保持されます。ウィンドウベースの集約の場合、集約値はイベント時間の行が分類される各ウィンドウについて保持されます。これを図を使って理解しましょう。
手っ取り早い例が修正され、ストリームの行は行が生成された時に時間とともに含まれると仮定します。word countを実行する代わりに、各5秒ごとに更新する10秒のウィンドウの中でのwordを数えたいとします。つまり、12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 などの10秒のウィンドウの間で受信した複数のwordの中のwordをカウントします。12:00 - 12:10 は12:00以降で12:10より前に到着したデータを意味することに注意してください。ここで、12:07に到着したwordを考えてみましょう。このwordは二つのウィンドウ 12:00 - 12:10 と 12:05 - 12:15 に対応するカウントを増やす必要があります。つまり、そのカウントはグループキー (つまり word) とウィンドウ(イベント時間から計算することができます)の両方によってインデックスされるでしょう。
結果のテーブルは以下のようなものになるでしょう。
このウィンドウはコード上はグルーピングに似ているため、ウィンドウされた集約を表現するためにgroupBy()
および window()
操作を使うことができます。以下の例のScala/Java/Pythonでの完全なコードを見ることができます
Late Dataとウォーターマークの扱い
ここで、イベントの一つがアプリケーションに遅れて到着すると何が起きるかを考えてみましょう。例えば、12:04(つまりイベント時間)に生成されたwordがアプリケーションによって12:11に受け取られるかも知れません。アプリケーションはウィンドウ 12:00 - 12:10
の古いカウントを更新するために12:11ではなく、12:04を使うべきです。これはウィンドウベースのグルーピングでは普通に起こります - 構造化ストリーミングは、以下で示すように遅れたデータが古いウィンドウの集約を正しく更新することができるように、長い時間部分的な集約について中間状態を維持することができます。
しかし、このクエリを数日の間実行するには、システムが計算する中間のメモリの状態の量を制限することが必要です。このことは、アプリケーションが集約についてもう遅れたデータを受け取らないだろうということで、システムが古い集約をメモリの状態から削除することができるかを知る必要があることを意味します。これを有効にするには、Spark 2.1でwatermarkingを導入しました。これはエンジンが自動的にデータ内の現在のイベント時間を追跡し、古い状態を適宜掃除します。イベント時間のカラムと、イベント時間の点でデータがどれくらい遅れるかを予想をする閾値を指定することでクエリのwatermarkを定義することができます。時間 T
で終わる特定のウィンドウについて、エンジンは状態を維持し、遅れたデータが(エンジンによって観測された最大のイベント時間 - 遅延の閾値 > T)
まで状態を更新することができるでしょう。別の言い方をすると、閾値内の遅れたデータは集約されますが、閾値より遅れたデータは取り零され始めるでしょう (確実な保証については後のの章をみてください)。例を使ってこれを理解してみましょう。以下で示すようにwithWatermark()
を使って以前の例でのwatermarkを簡単に定義することができます。
この例では、カラム“timestamp”の値にクエリのウォーターマークを定義し、データがどれだけ遅れることができるかの閾値を “10 分” に定義しました。このクエリがUpdate出力モード(後で出力モード の章で議論されます)で実行された場合、エンジンはウィンドウがウォーターマークより古くなるまでResultテーブル内のウィンドウの数を更新し続けるでしょう。カラム “timestamp” の現在のイベントタイムからの遅れは10分です。図での説明です。
図で示されるように、エンジンによって追跡される最大のイベント時間はblue dashed lineで、ウォーターマークは各トリガーの開始時に (max event time - '10 mins')
に設定されます。例えば、エンジンがデータ (12:14, dog)
を観測すると、次のトリガーのためのウォーターマークを 12:04
に設定します。このウォーターマークは遅れたデータがカウントされるようにエンジンが更に10分間中間状態を維持します。例えば、データ(12:09, cat)
は順番がバラバラで遅れており、ウィンドウ12:00 - 12:10
と 12:05 - 12:15
にあります。従って、それはトリガーのウォーターマーク12:04
の前にまだあり、エンジンは状態として中間カウントをまだ維持していて、現在のところ関連するウィンドウのカウントを更新します。しかし、ウォーターマークが12:11
に更新された場合、ウィンドウ(12:00 - 12:10)
の中間状態が消去され、全てのそれに続くデータ(例えば
(12:04, donkey)
)は“遅すぎる” と見なされ、従って無視されます。各トリガーの後で、更新されたカウント(つまり purple rows) はUpdateモードで命令されたようにトリガーの出力としてsinkに書き込まれます。
f幾つかのsink (例えば ファイル) はUpdateモードが必要とするfine-grained updateをサポートしないかも知れません。それらと連携するために、final countsだけがsinkに書き込まれるAppendモードをサポートします。これは以下で説明されます。
非ストリーミングデータセット上でwithWatermark
を使うと何もしないことに注意してください。ウォーターマークはどのようなやり方でもどのバッチクエリにも影響すべきではないため、それを直接無視するでしょう。
前のUpdateモードと似ていて、エンジンは各ウィンドウのための中間カウントを維持します。しかし、部分的なカウントはResultテーブルへ更新されず、sinkに書き込まれません。エンジンは遅い日付がカウントされるように “10 分” 待ち、それから ウィンドウ < ウォーターマーク の中間状態を削除し、最終のカウントをResultテーブル/sinkに追加します。例えば、ウィンドウ 12:00 - 12:10
の最終カウントはウォーターマークが 12:11
に更新された後でのみResultテーブルに追加されます。
ウォーターマークが集約状態を掃除する条件
以下の条件がウォーターマークが集約クエリ内の状態を掃除するために満たされるべきだと注意することが重要です (Spark 2.1.1 の時点の題名、将来には変わります)
-
Output モードは Append あるいは Update でなければなりません。 Complete モードは全ての集約データが保持されることを必要とし、従って中間データを削除するためにウォーターマークを使うことができません。各出力モードのセマンティクスの詳細な説明はOutput モード の章を見てください。
-
集約はイベント時間カラムあるいはイベント時間カラム上の
window
のどちらからを持つ必要があります。 -
集約で使われるtimestampカラムとして
withWatermark
が同じカラムで呼ばれる必要があります。例えば、ウォーターマークは集約カラムからの異なるカラム上で定義されるため、df.withWatermark("time", "1 min").groupBy("time2").count()
は Append 出力モードでは無効です。 -
ウォーターマークの詳細を使うには、集約の前に
withWatermark
を呼び出す必要があります。例えば、Append出力モードではdf.groupBy("time").count().withWatermark("time", "1 min")
は無効です。
ウォーターマークを使った集約の保証のセマンテック
-
“2 hours”のウォーターマークの遅延(
withWatermark
で設定される) はエンジンが2時間未満の遅延した全てのデータを取り零さないだろうことを保証します。別の言い方をすると、最新の処理されたデータから(イベント時間の観点から)2時間未満の遅れた全てのデータは集約されることが保証されます。 -
しかし、その保証は1つの方向のみに制限されます。2時間より多く遅延したデータは取り零されることが保証されません; それは集約されるかもしれませんし、されないかもしれません。もっと遅れたデータは、おそらくエンジンが処理しようとしないでしょう。
join オペレーション
構造化ストリーミングは他のストリーミング Dataset/DataFrame と同様に、ストリーミング Dataset/DataFrame と静的なDataset/DataFrame とのjoinをサポートします。ストリーミングのjoinの結果は、前の章でのストリーミングの集約の結果に似て、逐次的に生成されます。この章では上の場合でどの型のjoin(つまり、inner, outerなど)がサポートされるかを調査します。全てのサポートされるjoinの型において、ストリーミング Dataset/DataFrame とのjoinの結果は、ストリーム内の同じデータを含む静的な Dataset/DataFrame と一緒だった場合と確実に同じになるだろうことに注意してください。
Stream-static Joins
Spark 2.0 での導入から、構造化ストリーミングは ストリーミングと静的なDataFrame/Datasetとの join (inner join と幾つかの種類の outer join) をサポートしています。以下は簡単な例です。
ストリーム-静的なjoinはステートフルではないため、状態の管理は必要ではないことに注意してください。しかし、2,3のストリーム-静的な outer joinはまだサポートされません。それらはこの join の章の最後でリスト化されます。
Stream-stream Joins
Spark 2.3でストリーム-ストリームのjoinのサポートを追加しました。つまり、2つのストリーミング Datasets/DataFrame をjoinすることができます。2つのデータストリームの間で結合結果を生成する際の課題は、どの時点においてもデータセットのビューが結合の両側で不完全であり、入力間の一致を見つけることが遥かに困難になることです。ある入力ストリームから受け取った行は、他の入力ストリームから受信する将来の未受信の行と一致するかもしれません。従って両方の入力ストリームについて、過去の入力をストリーミング状態としてバッファリングするため、将来の全ての入力を過去の入力と一致させ、それに応じて結合結果を生成できます。さらに、ストリーミング集約と同様に、遅延、順不同のデータを自動的に処理し、ウォーターマークを使って状態を制限することができます。サポートされるストリーム-ストリームjoinの異なる型とそれらをどう使うかを議論しましょう。
任意のウォーターマークを使った Inner Join
全ての種類のjoin条件と一緒の全ての種類のカラムとのinner joinがサポートされます。しかし、ストリームが実行されると、新しい入力は過去の入力と一致する可能性があるため、全ての過去の入力を保存する必要があるため、ストリーミング状態のサイズが無限に増え続けます。無制限の状態を避けるために、無期限に古い入力が将来の入力と一致しないようにして、状態から削除できるように追加の結合条件を定義する必要があります。別の言い方をすると、joinで以下の追加のステップを行う必要があるでしょう。
-
(ストリーミングの集約と似て)入力がどれぐらい遅れるかもしれないかをエンジンが知るように、両方の入力上のウォーターマークの遅延を定義します。
-
Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input is not going to be required (i.e. will not satisfy the time constraint) for matches with the other input. この制約は2つのうちの1つの方法で定義することができます。
-
時間範囲のjoin条件 (例えば、
...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
)、 -
イベント時間のウィンドウのjoin (例えば
...JOIN ON leftTimeWindow = rightTimeWindow
)。
-
例を使ってこれを理解してみましょう。
広告のインプレッション(広告が表示された時)のストリームと、貨幣化可能なクリックに繋がるインプレッションに関連する広告上のユーザのクリックの他のストリームとのストリームのjoinをしたいとします。このストリーム-ストリーム joinでの片付けができるように、以下のようにウォーターマークの遅延と時間の制限を指定する必要があるでしょう。
-
ウォーターマークの遅延: 例えば、インプレッションと対応するクリックは、それぞれ最大2および3時間のイベント時間での遅延/順番違いが起こり得ます。
-
イベント時間の範囲の条件: 例えば、クリックは対応するインプレッションの後で 0秒から1時間の時間範囲内で発生するかもしれません。
コードはこのようになるでしょう。
ウォーターマークを持つストリーム-ストリームのinner joinのセマンテックな保証
これは集約でのウォーターマークによって提供される保証に似ています。ウォーターマークの遅延の“2時間”の保証は、エンジンが2時間未満の遅れの全てのデータを取り零さないでしょう。しかし、2時間より大きな遅延は処理されるかあるいはされないかもしれません。
ウォーターマークを使ったOuter Join
ウォーターマーク + イベント時間の制限はinner joinについては任意ですが、left および right outer join については、指定されなければなりません。これはouter joinでNULLの結果を生成するには、エンジンは入力行が将来何にも一致しないだろう時を知る必要があるからです。従って、ウォーターマーク + イベント時間の制限は正しい結果を生成するために指定されなければなりません。従って、outer-joinになるためにそれを指定する追加のパラメータがあることを除いて、outer-joinを持つクエリは前の広告の貨幣化にとても似ているでしょう。
ウォーターマークを持つストリーム-ストリームのouter joinのセマンテックな保証
outer joinは、ウォーターマークの遅延とデータが零れるかどうかに関して、inner joins と同じ保証を持ちます。
警告
outerの結果が生成される方法について、気を付けるべき2,3の重要な特徴があります。
-
outer NULLの結果は指定されたウォーターマークの遅延と時間範囲の制限に依存する遅延によって生成されるでしょう。 これは一致が存在せず、将来これ以上の一致が無いだろうことを確認するためにエンジンがそれだけ長く待つ必要があるからです。
-
マイクロ バッチ エンジンの現在の実装では、ウォーターマークはマイクロバッチの最後まで進み、次のマイクロバッチが状態を掃除しouterの結果を出力するために更新されたウォーターマークを使用します。処理をする新しいデータがある時にのみマイクロバッチを起動するため、ストリーム内で受信された新しいデータが無い場合はouterの結果の生成は遅れるかもしれません。簡単に言えば、もしjoinされる2つの入力ストリームのいずれかがしばらくの間データを受信しない場合、outer (left あるいは rightの両方の場合)の出力は遅れるかもしれません。
ストリーミング クエリでのjoinについてのサポートの表
Left Input | Right Input | Join Type | |
---|---|---|---|
Static | Static | 全ての型 | ストリーミング クエリの中にあったとしても、それはストリーミングデータでは無いため、サポートされます。 |
ストリーム | Static | Inner | サポートされます。ステートフルではありません |
Left Outer | サポートされます。ステートフルではありません | ||
Right Outer | サポートされません。 | ||
Full Outer | サポートされません。 | ||
Static | ストリーム | Inner | サポートされます。ステートフルではありません |
Left Outer | サポートされません。 | ||
Right Outer | サポートされます。ステートフルではありません | ||
Full Outer | サポートされません。 | ||
ストリーム | ストリーム | Inner | サポートされます。任意で状態の清掃のために両側のウォーターマーク + 時間の制限を指定します。 |
Left Outer | 条件付きでサポートされます。結果を集めるために右側のウォーターマーク + 時間制限を指定する必要があります。任意で全ての状態を清掃するために左側のウォーターマークを指定します。 | ||
Right Outer | 条件付きでサポートされます。結果を集めるために左側のウォーターマーク + 時間制限を指定する必要があります。任意で全ての状態を清掃するために右側のウォーターマークを指定します。 | ||
Full Outer | サポートされません。 | ||
サポートされるjoinの追加の詳細:
-
joinはカスケードすることができます。つまり、
df1.join(df2, ...).join(df3, ...).join(df4, ....)
。 -
Spark 2.4 の時点で、クエリが追加出力モードの時のみjoinを使うことができます。他の出力モードはまだサポートされません。
-
Spark 2.4 の時点で、joinの前に他の非map形式のオペレーションを使うことができません。何が使うことができないかの2,3の例があります。
-
joinの前にストリーミングの集約を使うことができません。
-
joinの前で更新モードの mapGroupsWithState と flatMapGroupsWithState を使うことができません。
-
ストリーミングの非重複
イベント内でユニークな識別子を使ってデータストリーム内のレコードを非重複にすることができます。これはユニークな識別子のカラムを使った静的なものの非重複と完全に同じです。クエリは重複レコードをフィルタできるような以前のレコードから必要な量のデータを格納するでしょう。集約と似て、ウォーターマークの有り無しの非重複を使うことができます。
-
ウォーターマークあり - 重複したレコードが到着するかも知れない頻度について上限があります。そしてイベント時間のカラムにウォーターマークを定義するこができ、guidとイベント時間カラムの両方を使って、非重複にすることができます。クエリは重複するレコードがもうないと思われる以前のレコードから古い状態のデータを削除するためにウォーターマークを使うでしょう。これはクエリが維持しなければならない状態の量を制限します。
-
ウォーターマーク無し - 重複したレコードが到達するかも知れない時間に制限が無いため、クエリは状態として以前のレコード全てからデータを格納します。
複数のウォーターマークを処理するためのポリシー
ストリーミング クエリは、unionまたはjoinされた複数の入力ストリームを持つことができます。各入力ストリームはステートフル操作で許容される必要がある遅延データの異なる閾値を持つことができます。各入力ストリームでwithWatermarks("eventTime", delay)
を使ってそれらの閾値を指定することができます。盾叔母、inputStream1
と inputStream2
の間のストリーム-ストリームjoinを含むクエリを考えてみます。
クエリの実行の間、構造化ストリームは各入力スートリームで見られる最大イベント時間を個々に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作のために使われる単一のグローバル ウォーターマークを選択します。デフォルトでは、ストリームの1つが他のストリームより遅れた場合(例えば、ストリームの1つがupstreamの障害によってデータの受信を停止)に、誤ってデータが零れないようにするために、最小値がグローバル ウォータマークとして選択されます。別の言い方をすると、グローバル ウォータマークは最も遅いストリームのペースで安全に移動し、クエリの出力はそれに応じて遅延します。
ただし、場合によっては最も遅いストリームからデータを取りこぼすことがあっても高速な結果を得たい場合があります。Spark 2.4 から、SQL設定 spark.sql.streaming.multipleWatermarkPolicy
を max
(デフォルトは min
)に設定することで、複数のウォーターマーク ポリシーにグローバル ウォーターマークとして最大値を設定することができます。これによりグローバル ウォーターマークは最も高速なストリームのペースで移動できます。ただし、副作用として、遅いストリームからのデータは積極的に取りこぼされます。従ってこの設定を慎重に使ってください。
任意のステートフル操作
多くの利用法で、集約よりもっと進化したステートフル操作を必要とします。例えば、多くの利用法の中で、イベントのデータストリームからセッションを追跡しなければなりません。そのようなセッション化を行う場合、任意の型のデータを状態として保存する必要があり、各トリガーでのデータストリームイベントを使って状態への任意の操作を実施するでしょう。Spark 2.2 から、これはオペレータmapGroupsWithState
ともっと強力な操作 flatMapGroupsWithState
を使って行うことができます。両方の操作により、ユーザはユーザ定義の状態を更新するためにグループ化されたデータセット上でユーザ定義のコードを適用することができます。もっと具体的な詳細については、API ドキュメント(Scala/Java) と例 (Scala/Java) を見てください。
Sparkはそれをチェックして強制することはできませんが、状態関数は出力モードのセマンティクスに関して実装する必要があります。例えば、更新モードでは、Spark は状態関数が現在のウォーターマークと許容されるレコードの遅延よりも古い行を出力することを期待しませんが、追加モードでは状態関数はこれらの行を出力できます。
サポートされないオペレーション
ストリーミング データフレーム/データセットでサポートされない2,3のデータフレーム/データセット 操作があります。それらの幾つかは以下のようなものです。
-
多段ストリーミング集約 (つまり、ストリーミングDF上の集約のチェーン)はまだストリーミングデータセットではサポートされません。
-
ストリーミング データセット上の Limit および 最初のN行の取得はサポートされません。
-
ストリーミング データセット上のdistinct操作はサポートされません。
-
集約の後、およびComplete出力モードでのみ、ストリーミングデータセット上でのSort操作がサポートされます。
-
ストリーミング データセット上の少数のouter joinの型がサポートされません。詳細はjoinオペレーションの章のサポート マトリックスを見てください。
更に、ストリーミング データセット上で動作しないだろう幾つかのデータセット メソッドがあります。それらはすぐにクエリを実行して結果を返すだろうアクションで、ストリーミング データセット上では意味を為しません。むしろ、それらの機能はストリーミングクエリの明示的な開始によって行うことができます (それについては次の章を見てください)。
-
count()
- ストリーミング データセットから1つのcountを返すことができません。代わりに、実行中のcountを含むストリーミング データセットを返すds.groupBy().count()
を使ってください。 -
foreach()
- 代わりにds.writeStream.foreach(...)
を使ってください (次の章を見てください)。 -
show()
- 代わりにconsole sinkを使ってください (次の章を見てください)。
もしこれらの操作のどれかを試すと、“operation XYZ is not supported with streaming DataFrames/Datasets” のようなAnalysisException
を見るでしょう。将来のリリースでそれらのうちの幾つかがサポートされるかも知れませんが、ストリーミングデータ上で効果的に実装することが本質的に困難な他のことがあります。例えば、入力ストリーム上でのソートは、ストリーム内で受け取った全てのデータを追跡し続けることが必要なため、サポートされません。従って、これは効率的に実行することが本質的に難しいです。
グローバル ウォーターマークの制限
追加モードで、ステートフル操作が現在のウォーターマークと許容されるレコードの遅延よりも古い行を発行する場合、(Spark はグローバルウォーターマークを使うので)それらはダウンストリームのステートフル操作の中で “late rows” になります。これらの行は破棄される場合があることに注意してください。これはグローバルウォーターマークの制限で、正確性の問題を引き起こす可能性があります。
Spark はクエリの論理プランをチェックし、Spark がそのようなパターンを検出すると、警告をログに記録します。
以下のステートフル操作の後のステートフル操作は、この問題が発生する可能性があります:
- 追加モードでのストリーミング集約
- stream-stream 外部結合
- 追加モードの
mapGroupsWithState
とflatMapGroupsWithState
(状態関数の実装に依存します)
Spark は mapGroupsWithState
/flatMapGroupsWithState
の状態関数をチェックできないため、オペレータが追加モードを使うと、Spark は状態関数が遅れた行を出力すると想定します。
既知の回避策があります: ストリーミングクエリをステートフルオペレータごとに複数のクエリに分割し、エンドツーエンドをクエリごとに1回だけ確実にします。最後のクエリに対してエンドツーエンドを1回だけ確実にすることは、オプションです。
ストリーミング クエリの開始
一旦最終の結果データフレーム/データセットを定義すると、後に残っているのはストリーミング計算の開始です。そうするには、Dataset.writeStream()
を経由して帰ってくるDataStreamWriter
(Scala/Java/Python ドキュメント) を使う必要があります。このインタフェース内で1つ以上の以下のものを指定する必要があります。
-
Details of the output sink: データフォーマット、locationなど。
-
Output mode: 出力sinkに書き込まれるものを指定する。
-
Query name: 任意で、識別のためのクエリのユニークな名前を指定する。
-
Trigger interval: 任意で、トリガーの間隔を指定する。指定されない場合、システムは以前の処理が完了したらすぐに新しいデータが利用可能かを調べるでしょう。以前の処理が完了していないためにトリガーの時間に失敗した場合は、システムはすぐに処理を起動するでしょう。
-
チェックポイントの location: 端から端まで耐障害性を保証できる出力sinkについては、システムが全てのチェックポイントの情報を書き込むだろうlocationを指定します。これはHDFS互換の耐障害性ファイルシステムの中のディレクトリでなければなりません。チェックポイントのセマンティクスは次の章で詳細に議論されます。
出力モード
出力モードには2,3の種類があります。
-
Append モード (デフォルト) - これはデフォルトのモードで、最後のトリガーで結果テーブルに追加された新しい行だけがsinkに書き込まれるでしょう。これは結果テーブルに追加された行が変更されないだろうクエリのためだけにサポートされます。従って、このモードは各行が一度だけ出力されるだろうことを保証します (耐障害性sinkを仮定します)。例えば、
select
,where
,map
,flatMap
,filter
,join
などだけを持つクエリがAppendモードをサポートするでしょう。 -
Complete モード - 各トリガーの後で、結果テーブル全体がsinkに出力されるでしょう。これは集約クエリのためにサポートされます。
-
Update モード - (Spark 2.1.1 から利用可能) 最後のトリガーから更新された結果テーブル内の行だけがsinkに出力されるでしょう。詳細な情報は将来のリリースで追加されるでしょう。
異なる型のストリーミングクエリは、異なる出力モードをサポートします。互換性のマトリックスです。
クエリの型 | サポートされる出力モード | 備考 | |
---|---|---|---|
集約有りのクエリ | ウォーターマークを持つイベント時間の集約 | Append, Update, Complete |
Append モードは古い集約状態を削除するためにウォーターマークを使います。しかし、モード セマンティクスによってウィンドウ化された集計の出力は、withWatermark() で指定された遅延閾値よりも遅れます。行はそれらが最終化 (例えばウォーターマークが交差した)後でのみ結果テーブルに追加できます。詳細は Late Data の章を見てください。Update モードは古い集約状態を削除するためにウォーターマークを使います。 定義によりこのモードは結果テーブル内の全てのデータを保持するため、Complete モードは古い集約状態を削除しません。 |
他の集約 | Complete, Update |
ウォーターマークが定義されていない(他のカテゴリの中でのみ定義されている)ため、古い集約状態は削除されません。 集約は更新することができ、従ってこのモードのセマンティクスに違反するため、Append モードはサポートされません。 |
|
mapGroupsWithState を使ったクエリ |
Update | ||
flatMapGroupsWithState を使ったクエリ |
追加操作モード | Append |
集約は flatMapGroupsWithState の後に可能です。
|
更新操作モード | Update |
集約は flatMapGroupsWithState の後では不可能です。
|
|
joins を使ったクエリ |
Append | 更新と完了モードはまだサポートされません。どの種類のjoinがサポートされるかについての詳細は、joinオペレーションの章のサポート マトリックス を見てください。 | |
他のクエリ | Append, Update | 結果テーブル内の全て集約されていないデータを保持することは実現不可能なため、Complete モードはサポートされません。 | |
出力シンク
2,3の組み込みの出力sinkがあります。
- ファイル sink - 出力をディレクトリに格納します。
- Kafka sink - Kafka内の1つ以上のトピックに出力を格納します。
- Foreach sink - 出力内のレコードの任意の計算を実行します。詳細はこの章の後の方を見てください。
- コンソール sink (デバッグのため) - トリガーがある度に出力をコンソール/標準出力に出力します。Append と Complete の出力モードの両方がサポートされます。各トリガーの後でドライバーのメモリ内に出力全体が格納されるため、多くないデータの量のデバッグの目的のために使われるべきです。
- メモリ sink (for debugging) - 出力はメモリ内にインメモリテーブルとして格納されます。Append と Complete の出力モードの両方がサポートされます。ドライバーのメモリ内に出力全体が格納されるため、多くないデータの量のデバッグの目的のために使われるべきです。従って、慎重に使ってください。
幾つかのsinkは出力の永続性を保証せず、デバッグの目的のために予定されているため、耐障害性がありません。耐障害性のセマンティクスの前の章を見てください。Sparkでの全てのsinkの詳細です。
シンク | サポートされる出力モード | オプション | 耐障害性 | 備考 |
---|---|---|---|---|
ファイル Sink | Append |
path : 出力ディレクトリのパス。指定しなければなりません。ファイルフォーマットに固有のオプションについては、DataFrameWriter内の関係するメソッドを見てください (Scala/Java/Python/R)。例えば、"parquet" フォーマットのオプションについては、 DataFrameWriter.parquet() を見てください
|
はい (確実に1回) | パーティション テーブルへの書き込みをサポートします。時間によるパーティションが有用かも知れません。 |
Kafka Sink | Append, Update, Complete | Kafka 統合ガイドを見てください | はい (少なくとも1回) | 詳細は Kafka 統合ガイドにあります |
Foreach シンク | Append, Update, Complete | None | はい (少なくとも1回) | 詳細は次の章にあります |
ForeachBatch Sink | Append, Update, Complete | None | 実装に依存 | 詳細は次の章にあります |
コンソール Sink | Append, Update, Complete |
numRows : 各トリガー毎に出力する行の数 (デフォルト: 20) truncate : 出力が長い場合に切り捨てるかどうか (デフォルト: true)
|
いいえ | |
メモリ Sink | Append, Complete | None | いいえ。しかし、Complete モードでは、再起動されたクエリは完全なテーブルを再生成するでしょう。 | テーブル名はクエリ名です。 |
実際にクエリの実行を開始するにはstart()
を呼び出す必要があることに注意してください。これは連続して実行中の実行へのハンドラである StreamingQuery オブジェクトを返します。このオブジェクトをクエリを管理するために使うことができます。これについては次のサブセクションで議論します。今は、2,3の例を使ってこれについて理解しましょう。
Foreach と ForeachBatch の使用
foreach
と foreachBatch
操作により、任意の操作と書き込みロジックをストリーミング クエリの出力に適用することができます。それらは僅かに異なるユースケースを持ちます - foreach
は各行への独自の書き込みロジックを許可します、foreachBatch
は各マイクロ バッチの出力への任意の操作と独自のロジックを許可します。この使い方をもっと詳細に理解しましょう。
ForeachBatch
foreachBatch(...)
により、ストリーミング クエリの各マイクロ バッチの出力データで実行される関数を指定することができます。Spark 2.4から、これはScala, Java および Pythonでサポートされます。2つのパラメータを取ります: マイクロ バッチの出力データを持つデータフレームあるいはデータセットと、マイクロ バッチのユニークなID。
R はまだサポートされません。
foreachBatch
を使って、以下を行うことができます。
- 既存のバッチ データ ソースの再利用 - 多くのストレージ システムでは、ストリーミング シンクはまだ利用できないかもしれないですが、バッチ クエリ用のデータ ライタが既にあるかもしれません。
foreachBatch
を使うと、各マイクロ バッチの出力にバッチ データ ライタを使うことができます。 - 複数の場所への書き込み - 複数の場所にストリーミング クエリの出力を書き込みたい場合、単純に出力データフレーム/データセットを複数回書き込むことができます。ただし、書き込みを試みるたびに出力データが再計算されます(入力データの再読み込みの可能性を含む)。再計算を回避するには、出力のデータフレーム/データセットをキャッシュし、それを複数の場所に書き、それからキャッシュを解除する必要がります。以下は概要です。
- 追加のデータフレームの操作の適用 - Sparkはストリーミングのデータフレームの逐次的な生成プランをサポートしないため、多くのデータフレームとデータセットの操作はストリーミングデータフレームでサポートされません。
foreachBatch
を使うと、これらの操作を各マイクロバッチの出力に適用できます。ただし、その操作を自分自分で行う場合のエンドツーエンドのセマンティクスについて推論する必要があります。
注意:
- デフォルトでは、
foreachBatch
は少なくとも1回の書き込みの保証のみを提供します。ただし、出力に重複制御を行い、確実に一回の保証を取得する方法として、関数に提供されたバッチIDを使うことができます。 foreachBatch
は、基本的にストリーミング クエリのマイクロバッチ実行に依存するため、連続処理モードでは動作しません。連続モードでデータを書き込む場合は、代わりにforeach
を使ってください。
Foreach
foreachBatch
がオプションでは無い場合 (例えば、対応するバッチライタが存在しないか、連続処理モード)、独自のライターロジックをforeach
を使って表現することができます。具体的には、データ書き込みロジックを3つのメソッドに分割することで表現することができます: open
, process
および close
。Spark 2.4 から、Scala, Java およびPythonでforeach
が利用可能です。
Scala では、ForeachWriter
(ドキュメント) を拡張する必要があります。
Java では、ForeachWriter
(ドキュメント) を拡張する必要があります。
Pythonでは、2つの方法でforeachを呼び出すことができます: 関数内あるいはオブジェクト内。関数は処理ロジックを表現するための簡単な方法を提供しますが、障害が一部の入力データの再処理を引き起こした場合に生成されたデータを重複排除することができません。その状況では、オブジェクト内で処理ロジックを指定する必要があります。
- 最初に、関数は入力として行を取ります。
- 次に、オブジェクトは処理メソッド、オプションのopenおよびcloseメソッドを持ちます:
R はまだサポートされません。
実行セマンティクス ストリーミング クエリが開始されると、Sparkは関数またはオブジェクトのメソッドを以下の方法で呼びます:
-
このオブジェクトの単一のコピーはクエリ内の単一のタスクによって生成された全てのデータに責任があります。別の言い方をすると、1つのインスタンスが分散方式で生成されたデータの1つのパーティションの処理に責任があります。
-
各タスクは提供されたオブジェクトの新しいシリアライズ化-デシリアライズ化されたコピーを取得するため、このオブジェクトはシリアライズ化が可能でなければなりません。従って、データを書き込むための初期化 (例えば、接続を開く、トランザクションの開始)は、open()メソッドが呼ばれた後に実行することをお勧めします。これはタスクがデータの生成の準備ができたことを意味します。
-
メソッドのライフサイクルは以下の通りです:
-
partition_id を持つ各パーティションについて:
-
epoch_id を持つストリーミングデータの各batch/epochについて:
-
メソッド open(partitionId, epochId) が呼ばれます。
-
もし open(…) がtrueを返す場合、パーティションおよびbatch/epock内の各行について、メソッド process(row) が呼ばれます。
-
メソッド close(error)が呼ばれ、行の処理中にエラー(ある場合)が表示されます。
-
-
-
-
open()メソッドが存在し、JVMあるいはPythonプロセスが途中でクラッシュする場合を除いて正常に返る(戻り値に関係なく)場合、close() メソッド (存在する場合)が呼ばれます。
-
注意: Spark は (partitionId, epochId) の同じ出力を保証しないため、(partitionId, epochId) では重複排除を達成できません。例えば、ソースはなんらかの理由で異なる数のパーティションを提供し、Spark の最適化はパーティションの数などを変更します。詳細は SPARK-28650 を見てください。出力の重複排除が必要な場合は、代わりに
foreachBatch
を試してください。
トリガー
ストリーミング クエリのトリガー設定はストリーミング データ処理のタイミングを定義します。固定のバッチ間隔を持つマイクロバッチ クエリ、あるいは連続するクエリ処理として実行されるかどうか。サポートされる異なる種類のトリガーがあります。
トリガーの型 | 説明 |
---|---|
無指定 (デフォルト) | トリガーの設定が明示的に指定されない場合、デフォルトではクエリはマイクロバッチモードで実行されるでしょう。マイクロバッチは前のマイクロバッチが処理を完了するとすぐに生成されるでしょう。 |
固定間隔のマイクロ バッチ |
クエリはマイクロバッチモードで実行されるでしょう。マイクロバッチはユーザ定義の間隔で開始されるでしょう。
|
1回だけのマイクロバッチ | クエリは全ての利用可能なデータを処理するために1回だけマイクロバッチを実行し、自力で停止するでしょう。これは定期的にクラスタを分離新設し、最後の期間から利用可能な全てを処理し、そしてクラスタを終了するシナリオで便利です。場合によっては、これが極度にコストを節約することに繋がるかもしれません。 |
固定のチェックポイント間隔の連続 (実験的l) |
クエリは新しい低レンテンシ、連続処理モードで実行されるでしょう。これについての詳細は以下の連続処理の章を読んでください。 |
2,3のコードの例があります。
ストリーミング クエリの管理
クエリが開始された時に生成されたStreamingQuery
オブジェクトはクエリを監視および管理するために使うことができます。
1つのSparkSession内で任意の数のクエリを開始することができます。それら全ては同時にクラスタのリソースを共有して並行で実行するでしょう。現在アクティブなクエリを管理するために使うことができるStreamingQueryManager
(Scala/Java/Python docs) を取得するために、sparkSession.streams()
を使うことができます。
ストリーミングクエリの監視
アクティブなストリーミング クエリを監視する複数の方法があります。SparkのDropwizardメトリクスサポートを使った外部システムへメトリクスを追い出すか、プログラム的にそれらにアクセスするかのどちらかを行うことができます。
対話的にメトリクスを読み込む
streamingQuery.lastProgress()
と streamingQuery.status()
を使ってアクティブなクエリの現在の状態とメトリクスを直接取得することができます。lastProgress()
はScala および Java でのStreamingQueryProgress
オブジェクトとPythonでの同じフィールドを持つディクショナリを返します。ストリームの最後のトリガーで行われた進捗についての全ての情報を持ちます - 何のデータが処理されたか、処理のレートがどれくらいか、レンテンシなど。最後の2,3の進捗の配列を返すstreamingQuery.recentProgress
もあります。
更に、streamingQuery.status()
はScala および JavaでのStreamingQueryStatus
オブジェクトとPythonでの同じフィールドを持つディクショナリを返します。それはクエリがすぐに何をするかについての情報を与えます - トリガーを有効にする、データが処理されるなど。
2,3の例です。
非同期APIを使ったプログラム的なメトリクスのレポート
StreamingQueryListener
をアタッチすることでSparkSession
に関連する全てのクエリを非同期で監視することもできます (Scala/Java ドキュメント)。sparkSession.streams.attachListener()
を使って独自のStreamingQueryListener
オブジェクトを一度アタッチすると、クエリが開始および停止した時とアクティブなクエリ内で進捗があった時のコールバックを得ます。例は以下のようになります。
Dropwizardを使ったメトリクスのレポート
SparkはDropwizard ライブラリを使ったレポーティング メトリクスをサポートします。構造化ストリーミングクエリのメトリクスもレポートされるようにするには、明示的にSparkSession内の設定spark.sql.streaming.metricsEnabled
を有効にする必要があります。
この設定が有効にされた後のSparkSessionで開始された全てのクエリはDropwizardを使って設定されたどのようなsinks (例えば Ganglia, Graphite, JMXなど)にも報告されるでしょう。
チェックポイントを使った障害からの回復
障害あるいは計画的なシャットダウンの場合、前の進捗と前のクエリの状態を回復することができ、中止したところから再開することができます。これはチェックポイントと書き込み先行ログを使って行われます。チェックポイントのlocationを持つクエリを設定することができ、クエリは全ての進捗の情報(つまり、各トリガーで処理されたオフセットの範囲)と実行中の集約(例えば、quick exampleでのword count)をチェックポイントのlocationに保存するでしょう。このチェックポイントのlocationはHDFS互換ファイルシステムのパスでなければならず、クエリを実行する時にDataStreamWriterでのオプションとして設定することができます。
ストリーミング クエリ内での変更の後のセマンティクスの回復
同じチェックポイントの場所からの再起動の間に許可されるストリーミング クエリの変更には制限があります。許可されていないか、あるいは変更の効果が明確に定義されていない変更の幾つかを以下に示します。それらの全てについて:
-
allowed という用語は、指定された変更を行うことができるが、その効果のセマンティクスが明確に定義されているかどうかはクエリとその変更によって異なることを意味します。
-
not allowed という用語は、指定された変更は再起動されたクエリが予測不可能なエラーで失敗する可能性があるため、するべきではないことを意味します。
sdf
は sparkSession.readStream を使って生成されたストリーミング データフレーム/データセットを表します。
変更の種類
-
入力ソースの数または型(つまり異なるソース)の変更: これは許可されません。
-
入力ソースのパラメータ内の変更: これが許可されているかどうか、変更のセマンティクスが明確に定義されているかどうかは、ソースとクエリに依存します。2,3の例です。
-
レート制限の追加/削除/変更が許可されます:
spark.readStream.format("kafka").option("subscribe", "topic")
tospark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
-
購読されたトピック/ファイルの変更は、結果が予想不可能なため通常は許可されません:
spark.readStream.format("kafka").option("subscribe", "topic")
tospark.readStream.format("kafka").option("subscribe", "newTopic")
-
-
出力シンクの型の変更: シンクの幾つかの特定の組み合わせ間の変更は許可されます。これはケースバイケースで検証される必要があります。2,3の例です。
-
Kafkaシンクへのファイル シンクは許可されます。Kafkaは新しいデータのみを表示します。
-
ファイル シンクへのKafka シンクは許可されません。
-
foreachへ変更されたKafkaシンク、あるいはその逆は許可されます。
-
-
出力シンクのパラメータ内の変更: これが許可されているかどうか、変更のセマンティクスが明確に定義されているかどうかは、シンクとクエリに依存します。2,3の例です。
-
ファイル シンクの出力ディレクトリの変更は許可されません:
sdf.writeStream.format("parquet").option("path", "/somePath")
tosdf.writeStream.format("parquet").option("path", "/anotherPath")
-
出力トピックの変更は許可されます:
sdf.writeStream.format("kafka").option("topic", "someTopic")
tosdf.writeStream.format("kafka").option("topic", "anotherTopic")
-
ユーザ定義のforeachシンク(つまり、
ForeachWriter
コード) の変更は許可されますが、変更のセマンティクスはコードに依存します。
-
-
投影/フィルタ/マップのような操作の変更: 幾つかのケースは許可されます。例えば:
-
フィルタの 追加 / 削除 は許可されます:
sdf.selectExpr("a")
tosdf.where(...).selectExpr("a").filter(...)
. -
同じ出力スキーマの投影の変更は許可されます:
sdf.selectExpr("stringColumn AS json").writeStream
tosdf.selectExpr("anotherStringColumn AS json").writeStream
-
異なる出力スキーマの投影の変更は、条件付きで許可されます:
sdf.selectExpr("a").writeStream
からsdf.selectExpr("b").writeStream
へは、出力シンクが"a"
から"b"
へのスキーマ変更を許可する場合のみ、許可されます。
-
-
ステートフル操作の変更: ストリーミング クエリ内の一部の操作は結果を継続的に更新するために状態データを維持する必要があります。構造化ストリーミングは状態データを耐障害性のあるストレージ (例えば、HDFS, AWS S3, Azure Blob ストレージ) に自動的にチェックポイントし、それを再起動後に復元します。ただし、これは状態データのスキーマが再起動後も同じままであることを前提とします。ストリーミング クエリのステートフル操作への変更 (つまり、追加、削除 あるいはスキーマの変更)は再起動の間に許可されません。状態の回復を保証するために、再起動間でスキーマを変更しないステートフル操作のリストを以下に示します:
-
ストリーミング集約: 例えば、
sdf.groupBy("a").agg(...)
。グルーピング キーまたは集約の数または型の変更は許可されません。 -
ストリーミング重複排除: 例えば、
sdf.dropDuplicates("a")
。グルーピング キーまたは集約の数または型の変更は許可されません。 -
ストリーム - ストリーム join: 例えば、
sdf1.join(sdf2, ...)
(つまり、両方の入力はsparkSession.readStream
を使って生成されます)。スキーマあるいは等結合のカラムの変更は許可されません。joinの型(outer あるいは inner) の変更は許可されません。join条件の他の変更は不明確です。 -
任意のステートフル操作: 例えば、
sdf.groupByKey(...).mapGroupsWithState(...)
あるいはsdf.groupByKey(...).flatMapGroupsWithState(...)
。ユーザ定義の状態のスキーマとタイムアウトの型を変更することはできません。ユーザ定義の状態マッピング関数内の変更は許可されますが、変更のセマンティックな効果はユーザ定義のロジックに依存します。状態スキーマの変更を本当にサポートしたい場合は、スキーマの移行をサポートするエンコーディング/デコーディングを使って、複雑な状態データ構造を明示的にバイトにエンコード/デコードすることができます。例えば、状態を Avro-encoded バイトとして保存したい場合、バイナリの状態は常に正常に復元されるため、クエリの再起動間で Avro-state-schema を自由に変更することができます。
-
連続的な処理
[実験的]
連続的な処理は新しい、Spark2.3で導入された少なくとも1回の保証の低レンテンシ(~1ms)のend-to-endが可能な実験的なストリーミング実行モードです。これに比べて、デフォルトのマイクロバッチ処理エンジンは、確実に1回の保証を実現するが、せいぜい ~100msのレイテンシを実現します。幾つかの種類のクエリ(以下で議論されます)について、アプリケーションのロジックの変更無し(つまりデータフレーム/データセットの操作無し)にそれらを実行するモードを選択することができます。
連続的な処理モードでサポートされるクエリを実行するのに必要なのは、パラメータとして望ましいチェックポイントの間隔と一緒に連続的なトリガを指定することだけです。例えば:
1秒のチェックポイントの間隔は連続的な処理のエンジンが毎秒ごとにクエリの進捗を記録するだろうことを意味します。結果のチェックポイントはマイクロバッチ エンジンと互換性のある形式で、従ってどのようなトリガーを使っても全てのクエリを再開することができます。例えば、マイクロバッチモードで開始されたサポートされるクエリは連続的なモードで再開することができます。逆もまた然りです。連続的なモードに切り替える時はいつでも、少なくとも1回の耐障害性の保証になるだろうことに注意してください。
サポートされるクエリ
Spark 2.4 の時点で、連続的な処理モードでは以下のクエリの種類だけがサポートされます。
- 操作: マップのような Dataset/DataFrame オペレーションだけが連続的なモードでサポートされます。つまり、写像 (
select
,map
,flatMap
,mapPartitions
など) と選択 (where
,filter
など)だけ。- 集約関数 (集約はまだサポートされないため)、
current_timestamp()
とcurrent_date()
(時間を使う決定論的な計算はやりがいがあります)を除く全てのSQL関数がサポートされます。
- 集約関数 (集約はまだサポートされないため)、
- ソース:
- Kafka ソース: 全てのオプションがサポートされます。
- Rate ソース: テストには良いです。連続的なモードでのみサポートされるオプションは
numPartitions
とrowsPerSecond
です。
- シンク:
- Kafka シンク: 全てのオプションがサポートされます。
- Memory シンク: デバッグには良いです。
- Console シンク: デバッグには良いです。全てのオプションがサポートされます。コンソールは連続的なトリガー内で指定した各チェックポイントの間隔で出力するだろうことに注意してください。
それらの詳細については入力ソース と 出力シンクを見てください。コンソールシンクはテストに適していますが、エンドツーエンドの低レイテンシ処理はソースとシンクが Kafka の場合に最もよく観察できます。これによりエンジンがデータを処理し、入力トピック内で利用可能な入力データのミリ秒内に出力トピック内で結果を利用可能にします。
警告
- 連続的な処理エンジンは、連続的にでーたをソースから読み込み、それを処理し、連続的にシンクに書き込む複数の長く実行するタスクを起動します。クエリによって必要とされるタスクの数はクエリがソースから並行してどれだけ多くのパーティションから読み込むことができるかに依存します。従って、連続的な処理のクエリが開始する前に、クラスタ内に並行して全てのタスクにとって十分なコアがあるようにしてください。例えば、もし10パーティションを持つKafkaトピックから読み込む場合は、クエリが進捗するのにクラスタは少なくとも10のコアを持つ必要があります。
- 連続的なストリームの処理の停止は偽のタスクの終了警告を生成するかもしれません。これらは安全に無視することができます。
- 現在のところ失敗したタスクの自動的な再試行はありません。全ての失敗はクエリを停止することに繋がり、手動でチェックポイントから再開する必要があります。
追加の情報
備考
- クエリの実行後は、幾つかの設定は変更できません。それらを変更するには、チェックポイントを破棄し、新しいクエリを開始します。これらの設定には以下が含まれます:
spark.sql.shuffle.partitions
- これは状態の物理的な分割によるものです: 状態はキーへのハッシュ関数を適用することで分割されるため、状態の分割数は変更されません。
- ステートフル操作で実行するタスクの数を減らしたい場合は、
coalesce
を使って不必要なパーティション分割を回避することができます。coalesce
の後で、別のシャッフルが発生しない限り、(削減された)タスクの数は保持されます。
spark.sql.streaming.stateStore.providerClass
: クエリの以前の状態を正しく読み取るには、状態ストアプロバイダのクラスを変更しないでください。spark.sql.streaming.multipleWatermarkPolicy
: クエリに複数のウォーターマークが含まれる場合、これを変更すると、ウォーターマークの値に一貫性が無くなるため、ポリシーは変更しないでください。
更なる読み物
- Scala/Java/Python/R の例を見て実行してください。
- 紹介 Sparkの例を実行する方法
- 構造化ストリーミング Kafka統合ガイドでKafkaとの統合について読んでください
- Spark SQL プログラミング ガイドでデータフレーム/データセットの使用についての詳細を読んでください
- サードパーティのブログの投稿
トーク
- Spark サミット Europe 2017
- Apache Sparkの構造化ストリーミングを使った簡単、スケーラブル、耐障害性のストリーミング処理 - Part 1 slides/video, Part 2 slides/video
- 構造化ストリーミングでのステートフル ストリーム処理へのディープ ダイブ - slides/video
- Spark サミット 2016
- 構造化ストリーミングへの深みへのディープ ダイブ - slides/video