構造化ストリーミングプログラミングガイド

概要

構造化ストリーミングはSpark SQLエンジン上に構築されたスケーラブルで耐障害性のあるストリーミング処理エンジンです。静的なデータ上のバッチ計算を表すのと同じ方法でストリーミングの計算を表現することができます。Spark SQL エンジンはそれを逐次および継続的に実行し、最終的な結果を到着し続けるストリーミングデータとして更新するように注意するでしょう。ストリーミング集約、イベントタイム ウィンドウ、ストリーム-to-バッチ joinなどを表現するために、Scala、Java、PythonあるいはRでDataset/DataFrame APIを使うことができます。計算は同じ最適化されたSpark SQLエンジン上で実行されます。結果的に、システムはチェックポイントと先行書き込みログを使って end-to-endの確実に一回の耐障害性を保証します。簡単に言うと、ユーザがストリーミングについて推論する必要無く、高速、スケーラブル、耐障害性、end-to-endの確実に一回のストリーミング処理を提供します。

デフォルトでは、構造化ストリーミング クエリは内部的にmicro-batch processingエンジンを使って処理されます。これはデータ巣トリムを小さなバッチジョブとして処理し、従って100ミリ秒ぐらいの end-to-endレイテンシと確実に1回の耐障害性の保証を実現します。しかし、Spark 2.3 から連続処理と呼ばれる新しい低レンテンシの処理モードを導入しました。これは少なくとも一回の保証の1ミリ秒ぐらいのend-to-endレイテンシを実現することができます。クエリ内の Dataset/DataFrame 走査の変更無しで、アプリケーションの要求に応じてモードを選択することができるでしょう。

このガイドでは、プログラミングモデルとAPIを渡り歩いてみようと思います。ほとんどはデフォルトのマイクロバッチ処理モデルを使って概念を説明し、後で連続処理モデルについて議論しようと思います。まず、構造化ストリーミング クエリの簡単な例 - ストリーミング ワード カウント から始めましょう。

クリックな例

TCPソケット上でlistenしているデータサーバから受け取ったテキストデータの単語のカウントの実行を続けたいとします。構造化ストリーミングを使ってこれをどう表現できるかを見てみましょう。完全なコードはScala/Java/Python/Rで見ることができます。そしてSparkをダウンロードした場合は、直接例を実行することができます。どの場合でも、一歩ずつ例を見てそれがどのように動くかを理解しましょう。まず、必要なクラスをインポートし、Sparkに関する全ての機能の開始点となるローカルSparkセッションを作成しなければなりません。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
sparkR.session(appName = "StructuredNetworkWordCount")

次に、localhost:9999上でlistenするサーバから受け取るテキストデータを表現するストリーミングデータフレームを作成して、word countを計算するためにデータフレームを変換してみましょう。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

このlines データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、各行を複数の単語に分割するためにflatMap を適用できるように、.as[String] を使ってデータフレームを文字列のデータセットに変換しました。結果のwords データセットは全ての単語を含みます。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

このlines データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、各行を複数の単語に分割するためにflatMap を適用できるように、.as(Encoders.STRING()) を使ってデータフレームを文字列のデータセットに変換しました。結果のwords データセットは全ての単語を含みます。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

このlines データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、二つの組み込みのSQL関数を使いました - 各行をそれぞれの単語を持つ複数の行に分割する split と explode。更に、新しいカラムに“word”という名前を付けるために関数aliasを使います。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

このlines SparkDataFrameはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、二つのSQL関数を持つSQL表現があります - 各行をそれぞれの単語を持つ複数の行に分割する split と explode。更に、新しいカラムに “word” という名前を付けます。最後に、SparkDataFrame内のユニークな値によってグルーピングし、それらをカウントすることで、wordCounts SparkDataFrame を定義しました。これはストリームの実行中のword countを表すSparkDataFrameであることに注意してください。

これで、ストリームデータ上にクエリを設定しました。最後に残っているのは、実際にデータの受信を開始し、カウントを計算することです。これをするために、カウントの完全なセット(outputMode("complete")によって指定されます)が更新される度に、コンソールにそれらの出力を設定します。そして、start()を使ってストリーミングの計算を開始します。

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

コードが実行された後で、ストリーミングの計算がバックグラウンドで開始されるでしょう。query オブジェクトは有効なストリーミングのクエリへのハンドラで、クエリが有効な間プロセスが終了することを避けるために awaitTermination() を使ってクエリの終了を待つと決めました。

この例のコードを実際に実行するために、独自の Spark アプリケーション内でコードをコンパイルするか、一旦Sparkをダウンロードして単純に例を実行することができます。後者を示します。以下のようにして最初にNetcat(ほとんどのUnix系のシステムで見つけることができる小さなユーティリティ)をデータサーバとして実行する必要があるでしょう。

$ nc -lk 9999

それから、違うターミナルで、以下のようにして例を実行することができます。

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

それから、netcatサーバを実行しているターミナルの中で入力される全ての行がカウントされ、毎秒ごとに画面に出力されるでしょう。それは以下のように見えるでしょう。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop



















...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

プログラミング モデル

構造化ストリーミングの重要なアイデアは、活気のあるデータストリームを連続して追加されるテーブルとして扱うことです。これによりバッチ処理モデルにとても良く似た新しいストリーム処理となります。ストリーミング計算を静的なテーブル上の標準的なバッチのようなクエリとして表し、Sparkはそれを無限の入力テーブル上の 増加するクエリとして実行するでしょう。このモデルをもっと詳細に理解しましょう。

基本概念

入力データストリームを “入力 テーブル”と見なします。ストリーム上で到着する各データ項目は、入力テーブルに追加される新しい行のようなものです。

テーブルのようなストリーム

入力上のクエリは “結果テーブル”を生成するでしょう。各引き起こされる間隔(例えば、各1秒)ごとに、新しい行が入力テーブルに追加されます。これは結果的に結果テーブルを更新します。結果テーブルが更新されるといつも変更された結果の行を外部のsinkに書き込みたいでしょう。

モデル

“出力” は外部ストレージに書き込まれたものとして定義されます。出力は異なるモードで定義することができます:

各モードはクエリの特定の型に適用可能です。詳細は後で議論されます。

このモデルの使い方を説明するために、以前のQuick Exampleのコンテキストのモデルを理解しましょう。最初のlines データフレームは入力テーブルで、最後のwordCounts データフレームは結果テーブルです。wordCountsを生成するためのストリーミングlines データフレーム上のクエリは、それが静的なデータフレームだろうということで、完全に同じです。しかし、このクエリが開始されると、Sparkはソケット接続から新しいデータを連続して調べるでしょう。以下で示すように、もし新しいデータがあれば、Sparkは更新カウントを計算するために、前の実行中のカウントと新しいデータを組み合わせる “incremental” クエリを実行するでしょう。

モデル

構造化ストリーミングは表全体を実現しないことに注意してください。ストリーミングのデータソースから利用可能な最新のデータを読み込み、結果を更新するために逐次的に処理し、それからソースのデータを破棄します。結果を更新するために必要とされる最小限の中間のstateデータのみを保持します (例えば、前の例の中間カウント)。

このモデルは他の多くのストリーミング処理エンジンと極めて異なります。多くのストリーミングシステムはユーザが実行中の集約を自身で保持することを必要とします。従って耐障害性およびデータの一貫性(少なくとも1回、あるいは最大で1回、あるいは確実に1回)について判断しなければなりません。このモデルでは、新しいデータがある場合Sparkは結果テーブルを更新する責任があります。従ってユーザがそれについて推論する必要がありません。例として、このモデルがどうやってイベント時間に基づいた処理と後でやってくるデータを扱うかを見てみましょう。

Event-time と Late Dataの扱い

イベント時間はデータ自身の中に埋め込まれている時間です。多くのアプリケーションで、このイベント時間上で操作したいと思うかも知れません。例えば毎秒IoTデバイスによって生成されるイベントの数を取得したい場合、Sparkが受け取った時間よりもおそらくデータが生成された時間を使いたいでしょう。このイベント時間はこのモデルでとても自然に表現されます – デバイスからの各イベントは表内の行で、イベント時間は行内の列の値です。これにより、ウィンドウに基づく集約(例えば、各秒毎のイベントの数)がイベント時間の列上の単なるグルーピングと集約の特別な形になります – 各時間ウィンドウはグループで、各行は複数のウィンドウ/グループに所属するかも知れません。従って、そのようなイベント時間のウィンドウに基づいた集約のクエリは、静的なデータセット(例えば、デバイスのイベントログを集めたもの)とデータストリームの両方で首尾一貫して定義することができ、ユーザの生活をもっと簡単にします。

更に、このモデルは本質的にイベント時間に基づいて期待したよりも遅く到着したデータを処理します。Sparkは結果テーブルを更新するため、遅れたデータがある場合は古い集約の更新と、中間状態データのサイズを制限するために古い集約の掃除について完全な制御をします。Spark 2.1からは、ユーザが遅れたデータの閾値を指定することができるウォーターマークをサポートし、そのためエンジンが古い状態を掃除することができます。後でウィンドウ操作 の章の中で詳細が説明されます。

耐障害性semantics

端から端まで確実に1つのセマンティクスの実現が構造化ストリーミングの設計の背景にある主要な目的の一つです。これを実現するために、再起動および/あるいは再処理によるあらゆる種類の障害を処理できるように、処理の確実な進捗を信頼して追跡できるように構造化ストリーミング ソース、シンクおよび実行エンジンを設計しました。各ストリーミングソースはストリーム内で読み込みの場所を追跡するためにオフセット(Kafkaのオフセット、あるいはKinesisのシーケンス番号に似ています)を持つと仮定します。エンジンは各トリガー内で処理されるデータのオフセット範囲を記録するために、チェックポイントと先行書き込みログを使用します。ストリーミング シンクは再処理の扱いが等冪であるように設計されています。再生ソースと等冪シンクの使用と合わせて、構造化ストリーミングはどのような障害下でも 端から端まで確実に一回のセマンティクス を保証することができます。

データセットとデータフレームを使うAPI

Spark 2.0から、データフレームとデータセットはストリーミング、境界無しデータと同様に、静的、境界ありのデータを表すことができます。静的なデータセット/データフレームと似て、ストリーミング データフレーム/データセットをストリーミングソースから生成するために、一般的なエントリポイントSparkSession (Scala/Java/Python/R ドキュメント) を使うことができ、それらに同じオペレーションを静的なデータフレーム/データセットとして適用することができます。データセット/データフレームに馴染みが無い場合は、DataFrame/Dataset プログラミング ガイドを使ってそれらに慣れることを強くお勧めします。

ストリーミングデータフレームとストリーミングデータセットの生成

ストリーミングデータフレームはSparkSession.readStream()によって返されるDataStreamReader インタフェース(Scala/Java/Python ドキュメント) を使って生成されるかも知れません。Rでは、read.stream() メソッドを使います。静的データフレームを生成するための読み込みインタフェースと似て、ソースの詳細 – データフォーマット、スキーマ、オプションなどを指定することができます。

入力ソース

2,3の組み込みのソースがあります。

幾つかのソースは、障害の後でのチェックポイント オフセットを使ったデータの再生がされないかも知れないので、耐障害性がありません。耐障害性のセマンティクスの前の章を見てください。Sparkの全てのソースの詳細がここにあります。

ソース オプション 耐障害性 備考
ファイル ソース path: 入力ディレクトリへのパス。全てのファイルフォーマットで共通です。
maxFilesPerTrigger: 各トリガーごとに判断する新しいファイルの最大の数 (デフォルト: 上限無し)
latestFirst: 最新の新しいファイルを最初に処理するかどうか。大きなバックログファイルがある場合に便利です (デフォルト: false)
latestFirst: フルパスの代わりにファイル名だけに基づいて新しいファイルをチェックするかどうか (デフォルト: false)。これを `true` にすることで、ファイル名"dataset.txt"が同じなので、以下のファイルは同じファイルとみなされるでしょう:
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
maxFileAge: 無視されるまでの、このディレクトリにあるファイルの最大保持期間。最初のバッチでは、全てのファイルが有効とみなされます。latestFirst が `true` に設定され、maxFilesPerTrigger が設定された場合は、有効で処理する必要がある古いファイルが無視される可能性があるため、このパラメータは無視されます。最大経過時間は、現在のシステムのタイムスタンプではなく、最新のファイルのタイムスタンプを基準に指定されます。(デフォルト: 一週間)
cleanSource: 処理の後で完了したファイルを掃除するオプション。
利用可能なオプションは、"archive", "delete", "off" です。オプションが指定されていない場合、デフォルト値は "off" です。
"archive" が指定された場合、追加のオプション sourceArchiveDir も指定される必要があります。"sourceArchiveDir" の値は、深さ (ルートディレクトリからのディレクトリの数)のソースパスと一致してはなりません。ここで深さは両方のパスの深さの最小です。これにより、圧縮ファイルは新しいソースファイルとして含まれなくなります。
例えば、ソースパターンとして '/hello?/spark/*' を指定すると、'/hello?/spark/*' と '/hello1/spark/archive' は一致しないため、'/hello1/spark/archive/dir' は "sourceArchiveDir" の値として使えません。'/hello1/spark' も、'/hello?/spark' と '/hello1/spark' は一致しないため、"sourceArchiveDir" の値として使えません。'/archived/here' は一致しないため、問題ありません。Spark は、独自のパスを考慮してソースファイルを移動します。例えば、ソースファイルのパスが /a/b/dataset.txt で、圧縮ディレクトリのパスが /archived/here の場合、ファイルは /archived/here/a/b/dataset.txt に移動されます。
注意: 圧縮(移動による)と完了ファイルの削除の両方で、各マイクロバッチにオーバーヘッド(別のスレッドで発生している場合でも速度が低下します)が発生するため、このオプションを有効にする前に、ファイルシステムの各操作のコストを理解する必要があります。一方、このオプションを有効にすると、コストのかかる操作になる可能性があるソースファイルを一覧表示するコストが削減されます。
完了したファイルクリーナーで使われるスレッドの数は、spark.sql.streaming.fileSource.cleaner.numThreads (デフォルト: 1) で設定できます。
注意 2: このオプションを有効にする場合は、ソースパスを複数のソースあるいはクエリから使わないでください。同様に、ソースパスがファイルストリームシンクの出力ディレクトリのどのファイルとも一致しないことを確認してください。
注意 3: 削除アクションと移動アクションはどちらもベストエフォートです。ファイルの削除あるいは移動に失敗しても、ストリーミングクエリは失敗しません。Spark は、特定の状況で一部のソースファイルをクリーンアップしない場合があります - 例えば、アプリケーションは正常にシャットダウンせず、多くのファイルがクリーンアップのためにキューに入れられます。

ファイル形式に固有のオプションについては、DataStreamReader の関連メソッドを見てください (Scala/Java/Python/R)。例えば、"parquet" 形式のオプションについては、DataStreamReader.parquet() を見てください。

更に、特定のファイル形式に影響するセッション設定があります。詳細はSQL プログラミング ガイド を見てください。例えば、"parquet" については Parquet の設定を見てください。
Yes globパスをサポートしますが、複合的なカンマ区切りのパス/globはサポートしません。
ソケット ソース host: 接続するホスト。指定されなければなりません
port: 接続するポート。指定されなければなりません
いいえ
レートのソース rowsPerSecond (例えば 100、デフォルト: 1): 秒間あたりどれだけの行が生成されなければならないか。

rampUpTime (例えば 5秒、デフォルト: 0s): 生成のスピードがrowsPerSecondになるまで立ち上げにどれだけかかるか。秒より細かい粒度の使用は整数の秒に切り捨てられるでしょう。

numPartitions (例えば 10、デフォルト: Sparkのデフォルトの並行度): 生成された行のためのパーティション数。

ソースはrowsPerSecondに達するように最善を尽くすでしょうが、クエリはリソースに制限されるかもしれず、numPartitionsは望ましいスピードに達するのを手助けするために調整することができます。
Yes
Kafka ソース Kafka 統合ガイドを見てください。 Yes

幾つかの例です。

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

これらの例は型無しのストリーミング データフレームを生成します。データフレームのスキーマはコンパイル時にチェックされず、クエリがサブミットされた時にのみ実行時にチェックされることを意味します。map, flatMap などの幾つかの操作は、コンパイル時に型が知られている必要があります。そうするために、これらの型無しのストリーミング データフレームを、静的なデータフレームとして同じメソッドを使って型有りのストリーミング データセットに変換することができます。詳細はSQL プログラミング ガイド を見てください。更に、サポートされるストリーミングソースについて更に詳しくこのドキュメントの後で議論されます。

Spark 3.1から、DataStreamReader.table()を使ってテーブルからストリーミングデータフレームを生成することもできます。詳細は、ストリーミングテーブル APIsを見てください。

スキーマ推論とストリーミング データフレーム/データセットのパーティション

デフォルトで、ファイルベースソースの構造化ストリーミングは、自動的に推測するためにSparkに頼るではなく、スキーマを指定することを必要とします。この制限は、障害時の場合においても、ストリーミング クエリのために一貫したスキーマが使われるだろうことを保証します。その場限りのために、spark.sql.streaming.schemaInferencetrue に設定することでスキーマの推測を再び有効にすることができます。

/key=value/ という名前のサブディレクトリが存在する場合はパーティションの調査は起こらず、リスト化はこれらのディレクトリ内に再帰的に起こるでしょう。これらのカラムがユーザが提供したスキーマの中で現れる場合、それらはファイルが読み込まれたパスに基づいたSparkによって情報が与えられるでしょう。パーティションスキーマを構成するディレクトリは、クエリが開始される時に存在しなければならず、静的なままでなければなりません。例えば、/data/year=2015/が存在する時に/data/year=2016/ を追加することは問題無いですが、パーティションカラムを変更(つまり、/data/date=2016-04-17/ディレクトリを生成)することは正しくありません。

ストリーミング データフレーム/データセット での操作

ストリーミング データフレーム/データセット に全ての種類の操作を適用することができます – 型無し SQL風の操作 (例えば select, where, groupBy) から型有りのRDD風の操作 (例えば map, filter, flatMap)まで。詳細はSQL プログラミング ガイド を見てください。使用できる2,3の操作の例を見てみましょう。

基本的なオペレーション - 選択、射影、集約

データフレーム/データセット上のほとんどの共通の操作がストリーミングのためにサポートされます。サポートされない少数の操作についてはこの章の後で議論されます。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

一時的なビューとしてストリーミングデータフレーム/データセットを登録し、それにSQLコマンドを適用することもできます。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

DataFrame/Dataset がストリーミングデータを持つかどうかdf.isStreamingを使って識別することができることに注意してください。

df.isStreaming
df.isStreaming()
df.isStreaming()
isStreaming(df)

Sparkはストリーミングデータセットに対するSQLステートメントの解析中にステートフル操作を挿入する可能性があるため、クエリのクエリプランを確認することをお勧めします。ステートフル操作がクエリプランに挿入されたら、ステートフル操作を考慮してクエリを確認する必要がある場合があります。(例えば、出力モード、ウォーターマーク、状態ストアサイズのメンテナンスなど)

イベントタイムのウィンドウ操作

スライドするイベント時間のウィンドウ上での集約は構造化ストリーミングでは簡単で、グループ化集約にとても似ています。グループ化された集約では、集約値(例えば count)はユーザ定義のグループ化カラムの中のそれぞれのユニークな値について保持されます。ウィンドウベースの集約の場合、集約値はイベント時間の行が分類される各ウィンドウについて保持されます。これを図を使って理解しましょう。

手っ取り早い例が修正され、ストリームの行は行が生成された時に時間とともに含まれると仮定します。word countを実行する代わりに、各5秒ごとに更新する10秒のウィンドウの中でのwordを数えたいとします。つまり、12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 などの10秒のウィンドウの間で受信した複数のwordの中のwordをカウントします。12:00 - 12:10 は12:00以降で12:10より前に到着したデータを意味することに注意してください。ここで、12:07に到着したwordを考えてみましょう。このwordは二つのウィンドウ 12:00 - 12:10 と 12:05 - 12:15 に対応するカウントを増やす必要があります。つまり、そのカウントはグループキー (つまり word) とウィンドウ(イベント時間から計算することができます)の両方によってインデックスされるでしょう。

結果のテーブルは以下のようなものになるでしょう。

ウィンドウ オペレーション

このウィンドウはコード上はグルーピングに似ているため、ウィンドウされた集約を表現するためにgroupBy() および window() 操作を使うことができます。以下の例のScala/Java/Pythonでの完全なコードを見ることができます

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

Late Dataとウォーターマークの扱い

ここで、イベントの一つがアプリケーションに遅れて到着すると何が起きるかを考えてみましょう。例えば、12:04(つまりイベント時間)に生成されたwordがアプリケーションによって12:11に受け取られるかも知れません。アプリケーションはウィンドウ 12:00 - 12:10の古いカウントを更新するために12:11ではなく、12:04を使うべきです。これはウィンドウベースのグルーピングでは普通に起こります - 構造化ストリーミングは、以下で示すように遅れたデータが古いウィンドウの集約を正しく更新することができるように、長い時間部分的な集約について中間状態を維持することができます。

遅れたデータの処理

しかし、このクエリを数日の間実行するには、システムが計算する中間のメモリの状態の量を制限することが必要です。このことは、アプリケーションが集約についてもう遅れたデータを受け取らないだろうということで、システムが古い集約をメモリの状態から削除することができるかを知る必要があることを意味します。これを有効にするには、Spark 2.1でwatermarkingを導入しました。これはエンジンが自動的にデータ内の現在のイベント時間を追跡し、古い状態を適宜掃除します。イベント時間のカラムと、イベント時間の点でデータがどれくらい遅れるかを予想をする閾値を指定することでクエリのwatermarkを定義することができます。時間 Tで終わる特定のウィンドウについて、エンジンは状態を維持し、遅れたデータが(エンジンによって観測された最大のイベント時間 - 遅延の閾値 > T)まで状態を更新することができるでしょう。別の言い方をすると、閾値内の遅れたデータは集約されますが、閾値より遅れたデータは取り零され始めるでしょう (確実な保証については後のの章をみてください)。例を使ってこれを理解してみましょう。以下で示すようにwithWatermark()を使って以前の例でのwatermarkを簡単に定義することができます。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

この例では、カラム“timestamp”の値にクエリのウォーターマークを定義し、データがどれだけ遅れることができるかの閾値を “10 分” に定義しました。このクエリがUpdate出力モード(後で出力モード の章で議論されます)で実行された場合、エンジンはウィンドウがウォーターマークより古くなるまでResultテーブル内のウィンドウの数を更新し続けるでしょう。カラム “timestamp” の現在のイベントタイムからの遅れは10分です。図での説明です。

Updateモードでのウォーターマーク

図で示されるように、エンジンによって追跡される最大のイベント時間はblue dashed lineで、ウォーターマークは各トリガーの開始時に (max event time - '10 mins') に設定されます。例えば、エンジンがデータ (12:14, dog)を観測すると、次のトリガーのためのウォーターマークを 12:04 に設定します。このウォーターマークは遅れたデータがカウントされるようにエンジンが更に10分間中間状態を維持します。例えば、データ(12:09, cat) は順番がバラバラで遅れており、ウィンドウ12:00 - 12:1012:05 - 12:15にあります。従って、それはトリガーのウォーターマーク12:04の前にまだあり、エンジンは状態として中間カウントをまだ維持していて、現在のところ関連するウィンドウのカウントを更新します。しかし、ウォーターマークが12:11に更新された場合、ウィンドウ(12:00 - 12:10)の中間状態が消去され、全てのそれに続くデータ(例えば (12:04, donkey))は“遅すぎる” と見なされ、従って無視されます。各トリガーの後で、更新されたカウント(つまり purple rows) はUpdateモードで命令されたようにトリガーの出力としてsinkに書き込まれます。

f幾つかのsink (例えば ファイル) はUpdateモードが必要とするfine-grained updateをサポートしないかも知れません。それらと連携するために、final countsだけがsinkに書き込まれるAppendモードをサポートします。これは以下で説明されます。

非ストリーミングデータセット上でwithWatermarkを使うと何もしないことに注意してください。ウォーターマークはどのようなやり方でもどのバッチクエリにも影響すべきではないため、それを直接無視するでしょう。

Appendモードでのウォーターマーク

前のUpdateモードと似ていて、エンジンは各ウィンドウのための中間カウントを維持します。しかし、部分的なカウントはResultテーブルへ更新されず、sinkに書き込まれません。エンジンは遅い日付がカウントされるように “10 分” 待ち、それから ウィンドウ < ウォーターマーク の中間状態を削除し、最終のカウントをResultテーブル/sinkに追加します。例えば、ウィンドウ 12:00 - 12:10 の最終カウントはウォーターマークが 12:11 に更新された後でのみResultテーブルに追加されます。

時間ウィンドウの型

Sparkは、タンブリング(固定)、スライド、セッションの3つの時間ウィンドウをサポートします。

時間ウィンドウの型

タンブリングウィンドウは、一連の固定サイズ、重複なし、連続した時間間隔です。入力は単一のウィンドウにのみバインドできます。

スライディングウィンドウは、“固定サイズ”の点でタンブリングウィンドウに似ていますが、スライドの期間がウィンドウの期間よりも短い場合、ウィンドウが重複する場合があります。この場合、入力を複数のウィンドウにバインドできます。

タンブリングとスライディングウィンドウは上記の例で説明したwindow関数を使います。

セッションウィンドウは、前の2つの型とは異なる特性を持っています。セッションウィンドウは、入力に応じてウィンドウの長さの動的なサイズがあります。セッションウィンドウは入力から始まり、ギャップ期間内に次の入力が受信されると自動的に拡大します。静的ギャップ期間の場合、最新の入力を受信した後、ギャップ期間内に入力が受信されないと、セッションウィンドウが閉じます。

セッションウィンドウはsession_window関数を使います。関数の使い方は、window関数に似ています。

import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("userId"))
    .count();
events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()

静的な値の代わりに、入力行に基づいてギャップ期間を動的に指定する式を提供することもできます。ギャップ期間が負またはゼロの行は、集計から除外されることに注意してください。

動的ギャップ期間を使うと、セッションウィンドウの終了は最新の入力に依存しなくなります。セッションウィンドウの範囲は、全てのウィンドウ範囲の和集合であり、イベントの開始期間とクエリ実行中の評価されたギャップ期間によって決定されます。

import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
  .when($"userId" === "user2", "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        Column(sessionWindow),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
  .when(col("userId").equalTo("user2"), "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        new Column(sessionWindow),
        col("userId"))
    .count();
from pyspark.sql import functions as F

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

session_window = session_window(events.timestamp, \
    F.when(events.userId == "user1", "5 seconds") \
    .when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window,
        events.userId) \
    .count()

ストリーミングクエリでセッションウィンドウを使う場合、以下のようにいくつかの制限があることに注してください:

  • 出力モードとしての“更新モード”はサポートされません。
  • グループ化キーには、session_windowに加えて少なくとも1つの列が必要です。

バッチクエリの場合、グローバルウィンドウ(グループ化キーにsession_windowだけが含まれる)がサポートされます。

ウォーターマークが集約状態を掃除する条件

以下の条件がウォーターマークが集約クエリ内の状態を掃除するために満たされるべきだと注意することが重要です (Spark 2.1.1 の時点の題名、将来には変わります)

  • Output モードは Append あるいは Update でなければなりません。 Complete モードは全ての集約データが保持されることを必要とし、従って中間データを削除するためにウォーターマークを使うことができません。各出力モードのセマンティクスの詳細な説明はOutput モード の章を見てください。

  • 集約はイベント時間カラムあるいはイベント時間カラム上のwindowのどちらからを持つ必要があります。

  • 集約で使われるtimestampカラムとしてwithWatermarkが同じカラムで呼ばれる必要があります。例えば、ウォーターマークは集約カラムからの異なるカラム上で定義されるため、df.withWatermark("time", "1 min").groupBy("time2").count() は Append 出力モードでは無効です。

  • ウォーターマークの詳細を使うには、集約の前にwithWatermark を呼び出す必要があります。例えば、Append出力モードではdf.groupBy("time").count().withWatermark("time", "1 min") は無効です。

ウォーターマークを使った集約の保証のセマンテック
  • “2 hours”のウォーターマークの遅延(withWatermarkで設定される) はエンジンが2時間未満の遅延した全てのデータを取り零さないだろうことを保証します。別の言い方をすると、最新の処理されたデータから(イベント時間の観点から)2時間未満の遅れた全てのデータは集約されることが保証されます。

  • しかし、その保証は1つの方向のみに制限されます。2時間より多く遅延したデータは取り零されることが保証されません; それは集約されるかもしれませんし、されないかもしれません。もっと遅れたデータは、おそらくエンジンが処理しようとしないでしょう。

join オペレーション

構造化ストリーミングは他のストリーミング Dataset/DataFrame と同様に、ストリーミング Dataset/DataFrame と静的なDataset/DataFrame とのjoinをサポートします。ストリーミングのjoinの結果は、前の章でのストリーミングの集約の結果に似て、逐次的に生成されます。この章では上の場合でどの型のjoin(つまり、inner, outer, semiなど)がサポートされるかを調査します。全てのサポートされるjoinの型において、ストリーミング Dataset/DataFrame とのjoinの結果は、ストリーム内の同じデータを含む静的な Dataset/DataFrame と一緒だった場合と確実に同じになるだろうことに注意してください。

Stream-static Joins

Spark 2.0 での導入から、構造化ストリーミングは ストリーミングと静的なDataFrame/Datasetとの join (inner join と幾つかの種類の outer join) をサポートしています。以下は簡単な例です。

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

ストリーム-静的なjoinはステートフルではないため、状態の管理は必要ではないことに注意してください。しかし、2,3のストリーム-静的な outer joinはまだサポートされません。それらはこの join の章の最後でリスト化されます。

Stream-stream Joins

Spark 2.3でストリーム-ストリームのjoinのサポートを追加しました。つまり、2つのストリーミング Datasets/DataFrame をjoinすることができます。2つのデータストリームの間で結合結果を生成する際の課題は、どの時点においてもデータセットのビューが結合の両側で不完全であり、入力間の一致を見つけることが遥かに困難になることです。ある入力ストリームから受け取った行は、他の入力ストリームから受信する将来の未受信の行と一致するかもしれません。従って両方の入力ストリームについて、過去の入力をストリーミング状態としてバッファリングするため、将来の全ての入力を過去の入力と一致させ、それに応じて結合結果を生成できます。さらに、ストリーミング集約と同様に、遅延、順不同のデータを自動的に処理し、ウォーターマークを使って状態を制限することができます。サポートされるストリーム-ストリームjoinの異なる型とそれらをどう使うかを議論しましょう。

任意のウォーターマークを使った Inner Join

全ての種類のjoin条件と一緒の全ての種類のカラムとのinner joinがサポートされます。しかし、ストリームが実行されると、新しい入力は過去の入力と一致する可能性があるため、全ての過去の入力を保存する必要があるため、ストリーミング状態のサイズが無限に増え続けます。無制限の状態を避けるために、無期限に古い入力が将来の入力と一致しないようにして、状態から削除できるように追加の結合条件を定義する必要があります。別の言い方をすると、joinで以下の追加のステップを行う必要があるでしょう。

  1. (ストリーミングの集約と似て)入力がどれぐらい遅れるかもしれないかをエンジンが知るように、両方の入力上のウォーターマークの遅延を定義します。

  2. 2つの入力に渡るイベント時間の制約を定義して、一方の入力の古い行がもう一方の入力との一致に必要とされない(つまり、時間の制約を満たさない)時期をエンジンが把握できるようにします。この制約は2つのうちの1つの方法で定義することができます。

    1. 時間範囲のjoin条件 (例えば、...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR)、

    2. イベント時間のウィンドウのjoin (例えば ...JOIN ON leftTimeWindow = rightTimeWindow)。

例を使ってこれを理解してみましょう。

広告のインプレッション(広告が表示された時)のストリームと、貨幣化可能なクリックに繋がるインプレッションに関連する広告上のユーザのクリックの他のストリームとのストリームのjoinをしたいとします。このストリーム-ストリーム joinでの片付けができるように、以下のようにウォーターマークの遅延と時間の制限を指定する必要があるでしょう。

  1. ウォーターマークの遅延: 例えば、インプレッションと対応するクリックは、それぞれ最大2および3時間のイベント時間での遅延/順番違いが起こり得ます。

  2. イベント時間の範囲の条件: 例えば、クリックは対応するインプレッションの後で 0秒から1時間の時間範囲内で発生するかもしれません。

コードはこのようになるでしょう。

import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
ウォーターマークを持つストリーム-ストリームのinner joinのセマンテックな保証

これは集約でのウォーターマークによって提供される保証に似ています。ウォーターマークの遅延の“2時間”の保証は、エンジンが2時間未満の遅れの全てのデータを取り零さないでしょう。しかし、2時間より大きな遅延は処理されるかあるいはされないかもしれません。

ウォーターマークを使ったOuter Join

ウォーターマーク+イベント時間の制約は、inner joinについては任意ですが、outer joinについては指定されなければなりません。これはouter joinでNULLの結果を生成するには、エンジンは入力行が将来何にも一致しないだろう時を知る必要があるからです。従って、ウォーターマーク + イベント時間の制限は正しい結果を生成するために指定されなければなりません。従って、outer-joinになるためにそれを指定する追加のパラメータがあることを除いて、outer-joinを持つクエリは前の広告の貨幣化にとても似ているでしょう。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
ウォーターマークを持つストリーム-ストリームのouter joinのセマンテックな保証

outer joinは、ウォーターマークの遅延とデータが零れるかどうかに関して、inner joins と同じ保証を持ちます。

警告

outerの結果が生成される方法について、気を付けるべき2,3の重要な特徴があります。

  • outer NULLの結果は指定されたウォーターマークの遅延と時間範囲の制限に依存する遅延によって生成されるでしょう。 これは一致が存在せず、将来これ以上の一致が無いだろうことを確認するためにエンジンがそれだけ長く待つ必要があるからです。

  • マイクロ バッチ エンジンの現在の実装では、ウォーターマークはマイクロバッチの最後まで進み、次のマイクロバッチが状態を掃除しouterの結果を出力するために更新されたウォーターマークを使用します。処理をする新しいデータがある時にのみマイクロバッチを起動するため、ストリーム内で受信された新しいデータが無い場合はouterの結果の生成は遅れるかもしれません。簡単に言えば、もしjoinされる2つの入力ストリームのいずれかがしばらくの間データを受信しない場合、outer (left あるいは rightの両方の場合)の出力は遅れるかもしれません。

ウォーターマークーとのSemi join

semi join は右側と一致するリレーションの左側から値を返します。left semi joiin とも呼ばれます。outer joinに似て、ウォーターマーク+イベント時間の制約はsemi joinについて指定されなければなりません。これは、左側の一致しない入力行を削除するためです。エンジンは、左側の入力行が将来の右側の入力行と一致しなくなる時期を認識している必要があります。

ウォーターマークを持つストリーム-ストリームのsemi joinのセマンテックな保証

semi joinは、ウォーターマークの遅延とデータが零れるかどうかに関して、inner joins と同じ保証を持ちます。

ストリーミング クエリでのjoinについてのサポートの表
Left Input Right Input Join Type
Static Static 全ての型 ストリーミング クエリの中にあったとしても、それはストリーミングデータでは無いため、サポートされます。
ストリーム Static Inner サポートされます。ステートフルではありません
Left Outer サポートされます。ステートフルではありません
Right Outer サポートされません。
Full Outer サポートされません。
Left Semi サポートされます。ステートフルではありません
Static ストリーム Inner サポートされます。ステートフルではありません
Left Outer サポートされません。
Right Outer サポートされます。ステートフルではありません
Full Outer サポートされません。
Left Semi サポートされません。
ストリーム ストリーム Inner サポートされます。任意で状態の清掃のために両側のウォーターマーク + 時間の制限を指定します。
Left Outer 条件付きでサポートされます。結果を集めるために右側のウォーターマーク + 時間制限を指定する必要があります。任意で全ての状態を清掃するために左側のウォーターマークを指定します。
Right Outer 条件付きでサポートされます。結果を集めるために左側のウォーターマーク + 時間制限を指定する必要があります。任意で全ての状態を清掃するために右側のウォーターマークを指定します。
Full Outer 条件付きでサポートされます。結果を集めるために片側のウォーターマーク + 時間制限を指定する必要があります。任意で全ての状態を清掃するためにもう片方のウォーターマークを指定します。
Left Semi 条件付きでサポートされます。結果を集めるために右側のウォーターマーク + 時間制限を指定する必要があります。任意で全ての状態を清掃するために左側のウォーターマークを指定します。

サポートされるjoinの追加の詳細:

  • joinはカスケードすることができます。つまり、df1.join(df2, ...).join(df3, ...).join(df4, ....)

  • Spark 2.4 の時点で、クエリが追加出力モードの時のみjoinを使うことができます。他の出力モードはまだサポートされません。

  • Spark 2.4 の時点で、joinの前に他の非map形式のオペレーションを使うことができません。何が使うことができないかの2,3の例があります。

    • joinの前にストリーミングの集約を使うことができません。

    • joinの前で更新モードの mapGroupsWithState と flatMapGroupsWithState を使うことができません。

ストリーミングの非重複

イベント内でユニークな識別子を使ってデータストリーム内のレコードを非重複にすることができます。これはユニークな識別子のカラムを使った静的なものの非重複と完全に同じです。クエリは重複レコードをフィルタできるような以前のレコードから必要な量のデータを格納するでしょう。集約と似て、ウォーターマークの有り無しの非重複を使うことができます。

  • ウォーターマークあり - 重複したレコードが到着するかも知れない頻度について上限があります。そしてイベント時間のカラムにウォーターマークを定義するこができ、guidとイベント時間カラムの両方を使って、非重複にすることができます。クエリは重複するレコードがもうないと思われる以前のレコードから古い状態のデータを削除するためにウォーターマークを使うでしょう。これはクエリが維持しなければならない状態の量を制限します。

  • ウォーターマーク無し - 重複したレコードが到達するかも知れない時間に制限が無いため、クエリは状態として以前のレコード全てからデータを格納します。

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates("guid")

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("guid", "eventTime")
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

複数のウォーターマークを処理するためのポリシー

ストリーミング クエリは、unionまたはjoinされた複数の入力ストリームを持つことができます。各入力ストリームはステートフル操作で許容される必要がある遅延データの異なる閾値を持つことができます。各入力ストリームでwithWatermarks("eventTime", delay) を使ってそれらの閾値を指定することができます。盾叔母、inputStream1inputStream2 の間のストリーム-ストリームjoinを含むクエリを考えてみます。

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

クエリの実行の間、構造化ストリームは各入力スートリームで見られる最大イベント時間を個々に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作のために使われる単一のグローバル ウォーターマークを選択します。デフォルトでは、ストリームの1つが他のストリームより遅れた場合(例えば、ストリームの1つがupstreamの障害によってデータの受信を停止)に、誤ってデータが零れないようにするために、最小値がグローバル ウォータマークとして選択されます。別の言い方をすると、グローバル ウォータマークは最も遅いストリームのペースで安全に移動し、クエリの出力はそれに応じて遅延します。

ただし、場合によっては最も遅いストリームからデータを取りこぼすことがあっても高速な結果を得たい場合があります。Spark 2.4 から、SQL設定 spark.sql.streaming.multipleWatermarkPolicymax (デフォルトは min)に設定することで、複数のウォーターマーク ポリシーにグローバル ウォーターマークとして最大値を設定することができます。これによりグローバル ウォーターマークは最も高速なストリームのペースで移動できます。ただし、副作用として、遅いストリームからのデータは積極的に取りこぼされます。従ってこの設定を慎重に使ってください。

任意のステートフル操作

多くの利用法で、集約よりもっと進化したステートフル操作を必要とします。例えば、多くの利用法の中で、イベントのデータストリームからセッションを追跡しなければなりません。そのようなセッション化を行う場合、任意の型のデータを状態として保存する必要があり、各トリガーでのデータストリームイベントを使って状態への任意の操作を実施するでしょう。Spark 2.2 から、これはオペレータmapGroupsWithStateともっと強力な操作 flatMapGroupsWithStateを使って行うことができます。両方の操作により、ユーザはユーザ定義の状態を更新するためにグループ化されたデータセット上でユーザ定義のコードを適用することができます。もっと具体的な詳細については、API ドキュメント(Scala/Java) と例 (Scala/Java) を見てください。

Sparkはそれをチェックして強制することはできませんが、状態関数は出力モードのセマンティクスに関して実装する必要があります。例えば、更新モードでは、Spark は状態関数が現在のウォーターマークと許容されるレコードの遅延よりも古い行を出力することを期待しませんが、追加モードでは状態関数はこれらの行を出力できます。

サポートされないオペレーション

ストリーミング データフレーム/データセットでサポートされない2,3のデータフレーム/データセット 操作があります。それらの幾つかは以下のようなものです。

  • 多段ストリーミング集約 (つまり、ストリーミングDF上の集約のチェーン)はまだストリーミングデータセットではサポートされません。

  • ストリーミング データセット上の Limit および 最初のN行の取得はサポートされません。

  • ストリーミング データセット上のdistinct操作はサポートされません。

  • ストリーミングデータセットでの集約後の重複排除操作はサポートされません。

  • 集約の後、およびComplete出力モードでのみ、ストリーミングデータセット上でのSort操作がサポートされます。

  • ストリーミング データセット上の少数のouter joinの型がサポートされません。詳細はjoinオペレーションの章のサポート マトリックスを見てください。

更に、ストリーミング データセット上で動作しないだろう幾つかのデータセット メソッドがあります。それらはすぐにクエリを実行して結果を返すだろうアクションで、ストリーミング データセット上では意味を為しません。むしろ、それらの機能はストリーミングクエリの明示的な開始によって行うことができます (それについては次の章を見てください)。

  • count() - ストリーミング データセットから1つのcountを返すことができません。代わりに、実行中のcountを含むストリーミング データセットを返すds.groupBy().count()を使ってください。

  • foreach() - 代わりにds.writeStream.foreach(...)を使ってください (次の章を見てください)。

  • show() - 代わりにconsole sinkを使ってください (次の章を見てください)。

もしこれらの操作のどれかを試すと、“operation XYZ is not supported with streaming DataFrames/Datasets” のようなAnalysisExceptionを見るでしょう。将来のリリースでそれらのうちの幾つかがサポートされるかも知れませんが、ストリーミングデータ上で効果的に実装することが本質的に困難な他のことがあります。例えば、入力ストリーム上でのソートは、ストリーム内で受け取った全てのデータを追跡し続けることが必要なため、サポートされません。従って、これは効率的に実行することが本質的に難しいです。

グローバル ウォーターマークの制限

追加モードで、ステートフル操作が現在のウォーターマークと許容されるレコードの遅延よりも古い行を発行する場合、(Spark はグローバルウォーターマークを使うので)それらはダウンストリームのステートフル操作の中で “late rows” になります。これらの行は破棄される場合があることに注意してください。これはグローバルウォーターマークの制限で、正確性の問題を引き起こす可能性があります。

Spark はクエリの論理プランをチェックし、Spark がそのようなパターンを検出すると、警告をログに記録します。

以下のステートフル操作の後のステートフル操作は、この問題が発生する可能性があります:

  • 追加モードでのストリーミング集約
  • stream-stream 外部結合
  • 追加モードの mapGroupsWithStateflatMapGroupsWithState (状態関数の実装に依存します)

Spark は mapGroupsWithState/flatMapGroupsWithState の状態関数をチェックできないため、オペレータが追加モードを使うと、Spark は状態関数が遅れた行を出力すると想定します。

Sparkには、問題を特定するために役立つステートフルオペレータ演算子の遅延の数を確認する2つの方法があります。

  1. Spark UIの場合: SQLタブのクエリ実行詳細のページでステートフルオペレータノードのメトリクスを確認します。
  2. ストリーミングクエリリスナーの場合: QueryProcessEventの“stateOperators”の“numRowsDroppedByWatermark”を確認します。

“numRowsDroppedByWatermark”はウォーターマークによって“dropped”された行の数を表すことに注意してください。これはオペレータの“late input rows”の数と常に同じとは限りません。オペレータの実装によって異なります - 例えば、ストリーミング集約は入力行を事前に集約し、遅延入力を事前集約入力と照合するため、その数は元の入力行の数と同じではありません。値が0か0以外かを確認したいだけです。

既知の回避策があります: ストリーミングクエリをステートフルオペレータごとに複数のクエリに分割し、エンドツーエンドをクエリごとに1回だけ確実にします。最後のクエリに対してエンドツーエンドを1回だけ確実にすることは、オプションです。

状態ストア

状態ストアは、読み取り操作と書き込み操作の両方を提供するバージョン管理されたkey-valueストアです。構造化ストリーミングでは、ステートストアプロバイダを使用して、バッチ間でステートフル操作を処理します。2つの組み込みの状態ストアプロバイダの実装があります。エンドユーザは、StateStoreProviderインタフェースを拡張することで独自の状態ストアプロバイダを実装することもできます。

HDFS 状態ストアプロバイダ

HDFS バックエンド状態ストアプロバイダは、[[StateStoreProvider]]と[[StateStore]]のデフォルトの実装で、全てのデータが最初のステージでメモリマップに格納され、HDFS互換ファイルシステムのファイルによってバックアップされます。ストアへの全ての更新はトランザクションでセットで実行する必要があり、更新の各セットはストアのバージョンをインクリメントします。これらのバージョンを利用して、ストアの正しいバージョンで(RDDオペレータを再試行して)更新を再実行し、ストアのバージョンを再生成できます。

RocksDB状態ストア実装

Spark 3.2の時点で、新しい組み込みの状態ストア実装であるRocksDB状態ストアプロバイダを追加します。

ストリーミングクエリにステートフル操作(例えば、ストリーミング集約、ストリーミングdropDuplicates、ストリーム-ストリーム join、mapGroupsWithStateやflatMapGroupsWithState)があり、状態に数百万のキーを維持したい場合は、マイクロバッチ処理時間に大きなばらつきを生じさせる大規模なガベージコレクション(GC)に関連する問題に直面する可能性があります。これは、HDFSBackedStateStoreの実装により、状態データがexecutorのJVMメモリに保持され、多数の状態オブジェクトがJVMにメモリ圧迫をかけ、GCの一時停止が大きくなるために発生します。

このような場合、RocksDBに基づくより最適化された状態管理ソリューションを使うことを選択できます。このソリューションはJVMメモリに状態を保持するのではなく、RocksDBを使ってネイティブメモリとローカルディスクの状態を効率的に管理します。さらに、この状態の変更は、構造化ストリーミングによって指定したチェックポイントの場所に自動的に保存されるため、完全な耐障害性が保証されます(デフォルトの状態管理と同じです)。

新しい組み込みの状態ストアの実装を有効にするには、spark.sql.streaming.stateStore.providerClassorg.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProviderに設定します。

状態ストアプロバイダのRocksDBインスタンスに関する設定は以下の通りです:

設定名 説明 デフォルト値
spark.sql.streaming.stateStore.rocksdb.compactOnCommit コミット操作のためにRocksDBインスタンスの範囲圧縮を実行するかどうか False
spark.sql.streaming.stateStore.rocksdb.blockSizeKB RocksDBのデフォルトのSSTファイル形式であるRocksDBのBlockBasedTableのブロックごとに圧縮されたユーザデータのおおよそのサイズのKB。 4
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB ブロックのキャッシュのサイズ容量のMB。 8
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs RocksDBインスタンスのロード操作でロックを取得するためのミリ秒単位の待ち時間。 60000
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad ロードジにRocksDBの全てのティッカーとヒストグラムの統計をリセットするかどうか。 True

状態ストアとタスクのローカリティ

ステートフル操作は、executorの状態ストアにイベントの状態を格納します。状態ストアは状態を格納するために、メモリやディスクスペースなどのリソースを占有します。したがって、状態ストアプロバイダを異なるストリーミングバッチ間で同じexecutorで実行し続ける方が効率的です。状態ストアプロバイダの場所を変更するには、チェックポイントされた状態をロードするための追加のオーバーヘッドが必要です。チェックポイントから状態をロードするオーバーヘッドは、外部ストレージと状態のサイズに依存します。これは、マイクロバッチ実行のレイテンシを損なう傾向があります。非常に大きな状態データの処理などの一部のユースケースでは、チェックポイントされた状態から新しい状態ストアプロバイダをロードすると非常に時間がかかり、非効率になる可能性があります。

構造化ストリーミングクエリのステートフル操作は、SparkのRDDの優先ロケーション機能に依存して、同じexecutorで状態ストアプロバイダを実行します。次のバッチで対応する状態ストアプロバイダが再度スケジューリングされた場合、以前の状態を再利用してチェックポイントされた状態をロードする時間を節約できます。

ただし、一般的に優先ロケーションは難しい要件ではなく、Sparkが優先されるタスク以外のexecutorにタスクをスケジュールする可能性があります。この場合、Sparkは新しいexecutorのチェックポイントされた状態から状態ストアプロバイダをロードします。前のバッチで実行された状態ストアプロバイダは、すぐにはアンロードされません。Sparkはexecutorで非アクティブになっている状態ストアプロバイダを調査してアンロードするメンテナンスタスクを実行します。

spark.locality.waitなどのタスクスケジューリングに関連するSpark設定を変更することにより、ユーザはデータローカルタスクの起動を待機する時間をSparkに設定できます。構造化ストリーミングでのステートフル操作の場合、これを使って状態ストアプロバイダがバッチ間で同じexecutorで実行されるようにすることができます。

特に組み込みのHDFS状態ストアプロバイダの場合、ユーザはloadedMapCacheHitCountloadedMapCacheMissCountのような状態ストアメトリクスを確認できます。理想的には、キャッシュの欠落数を最小限に抑えることが最善です。これは、Sparkがチェックポイントされた状態のロードに多くの時間を使わないことを意味します。ユーザはSparkのローカリティ待機設定を増やして、バッチ間で異なるexecutorに状態ストアプロバイダをロードしないようにすることができます。

ストリーミング クエリの開始

一旦最終の結果データフレーム/データセットを定義すると、後に残っているのはストリーミング計算の開始です。そうするには、Dataset.writeStream()を経由して帰ってくるDataStreamWriter (Scala/Java/Python ドキュメント) を使う必要があります。このインタフェース内で1つ以上の以下のものを指定する必要があります。

  • Details of the output sink: データフォーマット、locationなど。

  • Output mode: 出力sinkに書き込まれるものを指定する。

  • Query name: 任意で、識別のためのクエリのユニークな名前を指定する。

  • Trigger interval: 任意で、トリガーの間隔を指定する。指定されない場合、システムは以前の処理が完了したらすぐに新しいデータが利用可能かを調べるでしょう。以前の処理が完了していないためにトリガーの時間に失敗した場合は、システムはすぐに処理を起動するでしょう。

  • チェックポイントの location: 端から端まで耐障害性を保証できる出力sinkについては、システムが全てのチェックポイントの情報を書き込むだろうlocationを指定します。これはHDFS互換の耐障害性ファイルシステムの中のディレクトリでなければなりません。チェックポイントのセマンティクスは次の章で詳細に議論されます。

出力モード

出力モードには2,3の種類があります。

  • Append モード (デフォルト) - これはデフォルトのモードで、最後のトリガーで結果テーブルに追加された新しい行だけがsinkに書き込まれるでしょう。これは結果テーブルに追加された行が変更されないだろうクエリのためだけにサポートされます。従って、このモードは各行が一度だけ出力されるだろうことを保証します (耐障害性sinkを仮定します)。例えば、select, where, map, flatMap, filter, join などだけを持つクエリがAppendモードをサポートするでしょう。

  • Complete モード - 各トリガーの後で、結果テーブル全体がsinkに出力されるでしょう。これは集約クエリのためにサポートされます。

  • Update モード - (Spark 2.1.1 から利用可能) 最後のトリガーから更新された結果テーブル内の行だけがsinkに出力されるでしょう。詳細な情報は将来のリリースで追加されるでしょう。

異なる型のストリーミングクエリは、異なる出力モードをサポートします。互換性のマトリックスです。

クエリの型 サポートされる出力モード 備考
集約有りのクエリ ウォーターマークを持つイベント時間の集約 Append, Update, Complete Append モードは古い集約状態を削除するためにウォーターマークを使います。しかし、モード セマンティクスによってウィンドウ化された集計の出力は、withWatermark() で指定された遅延閾値よりも遅れます。行はそれらが最終化 (例えばウォーターマークが交差した)後でのみ結果テーブルに追加できます。詳細は Late Data の章を見てください。

Update モードは古い集約状態を削除するためにウォーターマークを使います。

定義によりこのモードは結果テーブル内の全てのデータを保持するため、Complete モードは古い集約状態を削除しません。
他の集約 Complete, Update ウォーターマークが定義されていない(他のカテゴリの中でのみ定義されている)ため、古い集約状態は削除されません。

集約は更新することができ、従ってこのモードのセマンティクスに違反するため、Append モードはサポートされません。
mapGroupsWithState を使ったクエリ Update 集約はmapGroupsWithStateを使ったクエリで許可されません。
flatMapGroupsWithState を使ったクエリ 追加操作モード Append 集約は flatMapGroupsWithState の後に可能です。
更新操作モード Update 集約はflatMapGroupsWithStateを使ったクエリで許可されません。
joinsを使ったクエリ Append 更新と完了モードはまだサポートされません。どの種類のjoinがサポートされるかについての詳細は、joinオペレーションの章のサポート マトリックス を見てください。
他のクエリ Append, Update 結果テーブル内の全て集約されていないデータを保持することは実現不可能なため、Complete モードはサポートされません。

出力シンク

2,3の組み込みの出力sinkがあります。

  • ファイル sink - 出力をディレクトリに格納します。
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
  • Kafka sink - Kafka内の1つ以上のトピックに出力を格納します。
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
  • Foreach sink - 出力内のレコードの任意の計算を実行します。詳細はこの章の後の方を見てください。
writeStream
    .foreach(...)
    .start()
  • コンソール sink (デバッグのため) - トリガーがある度に出力をコンソール/標準出力に出力します。Append と Complete の出力モードの両方がサポートされます。各トリガーの後でドライバーのメモリ内に出力全体が格納されるため、多くないデータの量のデバッグの目的のために使われるべきです。
writeStream
    .format("console")
    .start()
  • メモリ sink (for debugging) - 出力はメモリ内にインメモリテーブルとして格納されます。Append と Complete の出力モードの両方がサポートされます。ドライバーのメモリ内に出力全体が格納されるため、多くないデータの量のデバッグの目的のために使われるべきです。従って、慎重に使ってください。
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

幾つかのsinkは出力の永続性を保証せず、デバッグの目的のために予定されているため、耐障害性がありません。耐障害性のセマンティクスの前の章を見てください。Sparkでの全てのsinkの詳細です。

シンク サポートされる出力モード オプション 耐障害性 備考
ファイル Sink Append path: path to the output directory, must be specified.
retention: time to live (TTL) for output files. TTLよりも古いバッチがコミットされた出力ファイルは、最終的にメタデータログから削除されます。これは、シンクの出力ディレクトリを読み取るリーダークエリがそれを処理しない可能性があることを意味します。時間の文字列形式として値を指定できます。("12h", "7d"など) デフォルトでは無効です。

ファイルフォーマットに固有のオプションについては、DataFrameWriter (Scala/Java/Python/R)ないの関係するメソッドを見てください。例えば、"parquet" フォーマットのオプションについては、DataFrameWriter.parquet()を見てください
はい (確実に1回) パーティション テーブルへの書き込みをサポートします。時間によるパーティションが有用かも知れません。
Kafka Sink Append, Update, Complete Kafka 統合ガイドを見てください はい (少なくとも1回) 詳細は Kafka 統合ガイドにあります
Foreach シンク Append, Update, Complete None はい (少なくとも1回) 詳細は次の章にあります
ForeachBatch Sink Append, Update, Complete None 実装に依存 詳細は次の章にあります
コンソール Sink Append, Update, Complete numRows: 各トリガー毎に出力する行の数 (デフォルト: 20)
truncate: 出力が長い場合に切り捨てるかどうか (デフォルト: true)
いいえ
メモリ Sink Append, Complete None いいえ。しかし、Complete モードでは、再起動されたクエリは完全なテーブルを再生成するでしょう。 テーブル名はクエリ名です。

実際にクエリの実行を開始するにはstart() を呼び出す必要があることに注意してください。これは連続して実行中の実行へのハンドラである StreamingQuery オブジェクトを返します。このオブジェクトをクエリを管理するために使うことができます。これについては次のサブセクションで議論します。今は、2,3の例を使ってこれについて理解しましょう。

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")   

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))
Foreach と ForeachBatch の使用

foreachforeachBatch 操作により、任意の操作と書き込みロジックをストリーミング クエリの出力に適用することができます。それらは僅かに異なるユースケースを持ちます - foreach は各行への独自の書き込みロジックを許可します、foreachBatch は各マイクロ バッチの出力への任意の操作と独自のロジックを許可します。この使い方をもっと詳細に理解しましょう。

ForeachBatch

foreachBatch(...) により、ストリーミング クエリの各マイクロ バッチの出力データで実行される関数を指定することができます。Spark 2.4から、これはScala, Java および Pythonでサポートされます。2つのパラメータを取ります: マイクロ バッチの出力データを持つデータフレームあるいはデータセットと、マイクロ バッチのユニークなID。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF 
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }    
  }
).start();
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass
  
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()   

R はまだサポートされません。

foreachBatchを使って、以下を行うことができます。

  • 既存のバッチ データ ソースの再利用 - 多くのストレージ システムでは、ストリーミング シンクはまだ利用できないかもしれないですが、バッチ クエリ用のデータ ライタが既にあるかもしれません。foreachBatchを使うと、各マイクロ バッチの出力にバッチ データ ライタを使うことができます。
  • 複数の場所への書き込み - 複数の場所にストリーミング クエリの出力を書き込みたい場合、単純に出力データフレーム/データセットを複数回書き込むことができます。ただし、書き込みを試みるたびに出力データが再計算されます(入力データの再読み込みの可能性を含む)。再計算を回避するには、出力のデータフレーム/データセットをキャッシュし、それを複数の場所に書き、それからキャッシュを解除する必要がります。以下は概要です。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}
  • 追加のデータフレームの操作の適用 - Sparkはストリーミングのデータフレームの逐次的な生成プランをサポートしないため、多くのデータフレームとデータセットの操作はストリーミングデータフレームでサポートされません。foreachBatchを使うと、これらの操作を各マイクロバッチの出力に適用できます。ただし、その操作を自分自分で行う場合のエンドツーエンドのセマンティクスについて推論する必要があります。

注意:

  • デフォルトでは、foreachBatch は少なくとも1回の書き込みの保証のみを提供します。ただし、出力に重複制御を行い、確実に一回の保証を取得する方法として、関数に提供されたバッチIDを使うことができます。
  • foreachBatchは、基本的にストリーミング クエリのマイクロバッチ実行に依存するため、連続処理モードでは動作しません。連続モードでデータを書き込む場合は、代わりにforeachを使ってください。
Foreach

foreachBatchがオプションでは無い場合 (例えば、対応するバッチライタが存在しないか、連続処理モード)、独自のライターロジックをforeachを使って表現することができます。具体的には、データ書き込みロジックを3つのメソッドに分割することで表現することができます: open, process および close。Spark 2.4 から、Scala, Java およびPythonでforeach が利用可能です。

Scala では、ForeachWriter (ドキュメント) を拡張する必要があります。

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

Java では、ForeachWriter (ドキュメント) を拡張する必要があります。

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

Pythonでは、2つの方法でforeachを呼び出すことができます: 関数内あるいはオブジェクト内。関数は処理ロジックを表現するための簡単な方法を提供しますが、障害が一部の入力データの再処理を引き起こした場合に生成されたデータを重複排除することができません。その状況では、オブジェクト内で処理ロジックを指定する必要があります。

  • 最初に、関数は入力として行を取ります。
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()  
  • 次に、オブジェクトは処理メソッド、オプションのopenおよびcloseメソッドを持ちます:
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. このメソッドは Python ではオプションです。
        pass

    def process(self, row):
        # Write row to connection. このメソッドは Python ではオプションではありません。
        pass

    def close(self, error):
        # Close the connection. このメソッドは Python ではオプションです。
        pass
      
query = streamingDF.writeStream.foreach(ForeachWriter()).start()

R はまだサポートされません。

実行セマンティクス ストリーミング クエリが開始されると、Sparkは関数またはオブジェクトのメソッドを以下の方法で呼びます:

  • このオブジェクトの単一のコピーはクエリ内の単一のタスクによって生成された全てのデータに責任があります。別の言い方をすると、1つのインスタンスが分散方式で生成されたデータの1つのパーティションの処理に責任があります。

  • 各タスクは提供されたオブジェクトの新しいシリアライズ化-デシリアライズ化されたコピーを取得するため、このオブジェクトはシリアライズ化が可能でなければなりません。従って、データを書き込むための初期化 (例えば、接続を開く、トランザクションの開始)は、open()メソッドが呼ばれた後に実行することをお勧めします。これはタスクがデータの生成の準備ができたことを意味します。

  • メソッドのライフサイクルは以下の通りです:

    • partition_id を持つ各パーティションについて:

      • epoch_id を持つストリーミングデータの各batch/epochについて:

        • メソッド open(partitionId, epochId) が呼ばれます。

        • もし open(…) がtrueを返す場合、パーティションおよびbatch/epock内の各行について、メソッド process(row) が呼ばれます。

        • メソッド close(error)が呼ばれ、行の処理中にエラー(ある場合)が表示されます。

  • open()メソッドが存在し、JVMあるいはPythonプロセスが途中でクラッシュする場合を除いて正常に返る(戻り値に関係なく)場合、close() メソッド (存在する場合)が呼ばれます。

  • 注意: Spark は (partitionId, epochId) の同じ出力を保証しないため、(partitionId, epochId) では重複排除を達成できません。例えば、ソースはなんらかの理由で異なる数のパーティションを提供し、Spark の最適化はパーティションの数などを変更します。詳細は SPARK-28650 を見てください。出力の重複排除が必要な場合は、代わりに foreachBatch を試してください。

Streaming Table APIs

Spark 3.1以降、DataStreamReader.table()を使ってテーブルをストリーミングデータフレームとして読み込むこともでき、DataStreamWriter.toTable()を使ってテーブルとしてストリーミングデータフレームを書き込むことができます。

val spark: SparkSession = ...

// Create a streaming DataFrame
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Write the streaming DataFrame to a table
df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")

// Check the table result
spark.read.table("myTable").show()

// Transform the source dataset and write to a new table
spark.readStream
  .table("myTable")
  .select("value")
  .writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable")

// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...

// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Write the streaming DataFrame to a table
df.writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable");

// Check the table result
spark.read().table("myTable").show();

// Transform the source dataset and write to a new table
spark.readStream()
  .table("myTable")
  .select("value")
  .writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable");

// Check the new table result
spark.read().table("newTable").show();
spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()

Rでは利用できません。

詳細については、DataStreamReader (Scala/Java/Python docs)とDataStreamWriter (Scala/Java/Python docs)のドキュメントを調べてください。

トリガー

ストリーミング クエリのトリガー設定はストリーミング データ処理のタイミングを定義します。固定のバッチ間隔を持つマイクロバッチ クエリ、あるいは連続するクエリ処理として実行されるかどうか。サポートされる異なる種類のトリガーがあります。

トリガーの型 説明
無指定 (デフォルト) トリガーの設定が明示的に指定されない場合、デフォルトではクエリはマイクロバッチモードで実行されるでしょう。マイクロバッチは前のマイクロバッチが処理を完了するとすぐに生成されるでしょう。
固定間隔のマイクロ バッチ クエリはマイクロバッチモードで実行されるでしょう。マイクロバッチはユーザ定義の間隔で開始されるでしょう。
  • 前のマイクロバッチが間隔内で完了する場合、次のマイクロバッチが開始される前にエンジンは間隔を超えるまで待つでしょう。
  • もし前のマイクロバッチが終了するまでに間隔より長く掛かった場合(つまり、間隔の境界を抜けた場合)、次のマクロバッチは前のものが完了するとすぐに開始するでしょう (つまり、次の間隔の境界を待たないでしょう)。
  • もし新しいデータが利用可能では無い場合、マイクロバッチは開始されないでしょう。
1回だけのマイクロバッチ クエリは全ての利用可能なデータを処理するために1回だけマイクロバッチを実行し、自力で停止するでしょう。これは定期的にクラスタを分離新設し、最後の期間から利用可能な全てを処理し、そしてクラスタを終了するシナリオで便利です。場合によっては、これが極度にコストを節約することに繋がるかもしれません。
固定のチェックポイント間隔の連続
(実験的l)
クエリは新しい低レンテンシ、連続処理モードで実行されるでしょう。これについての詳細は以下の連続処理の章を読んでください。

2,3のコードの例があります。

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();
# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(continuous='1 second')
  .start()
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported

ストリーミング クエリの管理

クエリが開始された時に生成されたStreamingQueryオブジェクトはクエリを監視および管理するために使うことができます。

val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query = df.writeStream.format("console").start()   # get the query object

query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId()       # get the unique id of this run of the query, which will be generated at every start/restart

query.name()        # get the name of the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress  # a list of the most recent progress updates for this query

query.lastProgress    # the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the name of the auto-generated or user-specified name

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

1つのSparkSession内で任意の数のクエリを開始することができます。それら全ては同時にクラスタのリソースを共有して並行で実行するでしょう。現在アクティブなクエリを管理するために使うことができるStreamingQueryManager (Scala/Java/Python docs) を取得するために、sparkSession.streams() を使うことができます。

val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
R in 利用できません。

ストリーミングクエリの監視

アクティブなストリーミング クエリを監視する複数の方法があります。SparkのDropwizardメトリクスサポートを使った外部システムへメトリクスを追い出すか、プログラム的にそれらにアクセスするかのどちらかを行うことができます。

対話的にメトリクスを読み込む

streamingQuery.lastProgress()streamingQuery.status() を使ってアクティブなクエリの現在の状態とメトリクスを直接取得することができます。lastProgress()Scala および Java でのStreamingQueryProgress オブジェクトとPythonでの同じフィールドを持つディクショナリを返します。ストリームの最後のトリガーで行われた進捗についての全ての情報を持ちます - 何のデータが処理されたか、処理のレートがどれくらいか、レンテンシなど。最後の2,3の進捗の配列を返すstreamingQuery.recentProgress もあります。

更に、streamingQuery.status()Scala および JavaでのStreamingQueryStatus オブジェクトとPythonでの同じフィールドを持つディクショナリを返します。それはクエリがすぐに何をするかについての情報を与えます - トリガーを有効にする、データが処理されるなど。

2,3の例です。

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
''' 
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
query <- ...  # a StreamingQuery
lastProgress(query)

'''
Will print something like the following.

{
  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
  "name" : null,
  "timestamp" : "2017-04-26T08:27:28.835Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 0
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
  }
}
'''

status(query)
'''
Will print something like the following.

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
'''

非同期APIを使ったプログラム的なメトリクスのレポート

StreamingQueryListener をアタッチすることでSparkSession に関連する全てのクエリを非同期で監視することもできます (Scala/Java ドキュメント)。sparkSession.streams.attachListener()を使って独自のStreamingQueryListener オブジェクトを一度アタッチすると、クエリが開始および停止した時とアクティブなクエリ内で進捗があった時のコールバックを得ます。例は以下のようになります。

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Python では利用できません。
R in 利用できません。

Dropwizardを使ったメトリクスのレポート

SparkはDropwizard ライブラリを使ったレポーティング メトリクスをサポートします。構造化ストリーミングクエリのメトリクスもレポートされるようにするには、明示的にSparkSession内の設定spark.sql.streaming.metricsEnabledを有効にする必要があります。

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
sql("SET spark.sql.streaming.metricsEnabled=true")

この設定が有効にされた後のSparkSessionで開始された全てのクエリはDropwizardを使って設定されたどのようなsinks (例えば Ganglia, Graphite, JMXなど)にも報告されるでしょう。

チェックポイントを使った障害からの回復

障害あるいは計画的なシャットダウンの場合、前の進捗と前のクエリの状態を回復することができ、中止したところから再開することができます。これはチェックポイントと書き込み先行ログを使って行われます。チェックポイントのlocationを持つクエリを設定することができ、クエリは全ての進捗の情報(つまり、各トリガーで処理されたオフセットの範囲)と実行中の集約(例えば、quick exampleでのword count)をチェックポイントのlocationに保存するでしょう。このチェックポイントのlocationはHDFS互換ファイルシステムのパスでなければならず、クエリを実行する時にDataStreamWriterでのオプションとして設定することができます。

aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

ストリーミング クエリ内での変更の後のセマンティクスの回復

同じチェックポイントの場所からの再起動の間に許可されるストリーミング クエリの変更には制限があります。許可されていないか、あるいは変更の効果が明確に定義されていない変更の幾つかを以下に示します。それらの全てについて:

  • allowed という用語は、指定された変更を行うことができるが、その効果のセマンティクスが明確に定義されているかどうかはクエリとその変更によって異なることを意味します。

  • not allowed という用語は、指定された変更は再起動されたクエリが予測不可能なエラーで失敗する可能性があるため、するべきではないことを意味します。sdf は sparkSession.readStream を使って生成されたストリーミング データフレーム/データセットを表します。

変更の種類

  • 入力ソースの数または型(つまり異なるソース)の変更: これは許可されません。

  • 入力ソースのパラメータ内の変更: これが許可されているかどうか、変更のセマンティクスが明確に定義されているかどうかは、ソースとクエリに依存します。2,3の例です。

    • レート制限の追加/削除/変更が許可されます: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)

    • 購読されたトピック/ファイルの変更は、結果が予想不可能なため通常は許可されません: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "newTopic")

  • 出力シンクの型の変更: シンクの幾つかの特定の組み合わせ間の変更は許可されます。これはケースバイケースで検証される必要があります。2,3の例です。

    • Kafkaシンクへのファイル シンクは許可されます。Kafkaは新しいデータのみを表示します。

    • ファイル シンクへのKafka シンクは許可されません。

    • foreachへ変更されたKafkaシンク、あるいはその逆は許可されます。

  • 出力シンクのパラメータ内の変更: これが許可されているかどうか、変更のセマンティクスが明確に定義されているかどうかは、シンクとクエリに依存します。2,3の例です。

    • ファイル シンクの出力ディレクトリの変更は許可されません: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")

    • 出力トピックの変更は許可されます: sdf.writeStream.format("kafka").option("topic", "someTopic") to sdf.writeStream.format("kafka").option("topic", "anotherTopic")

    • ユーザ定義のforeachシンク(つまり、ForeachWriter コード) の変更は許可されますが、変更のセマンティクスはコードに依存します。

  • 投影/フィルタ/マップのような操作の変更: 幾つかのケースは許可されます。例えば:

    • フィルタの 追加 / 削除 は許可されます: sdf.selectExpr("a") to sdf.where(...).selectExpr("a").filter(...).

    • 同じ出力スキーマの投影の変更は許可されます: sdf.selectExpr("stringColumn AS json").writeStream to sdf.selectExpr("anotherStringColumn AS json").writeStream

    • 異なる出力スキーマの投影の変更は、条件付きで許可されます: sdf.selectExpr("a").writeStream から sdf.selectExpr("b").writeStream へは、出力シンクが"a" から "b"へのスキーマ変更を許可する場合のみ、許可されます。

  • ステートフル操作の変更: ストリーミング クエリ内の一部の操作は結果を継続的に更新するために状態データを維持する必要があります。構造化ストリーミングは状態データを耐障害性のあるストレージ (例えば、HDFS, AWS S3, Azure Blob ストレージ) に自動的にチェックポイントし、それを再起動後に復元します。ただし、これは状態データのスキーマが再起動後も同じままであることを前提とします。ストリーミング クエリのステートフル操作への変更 (つまり、追加、削除 あるいはスキーマの変更)は再起動の間に許可されません。状態の回復を保証するために、再起動間でスキーマを変更しないステートフル操作のリストを以下に示します:

    • ストリーミング集約: 例えば、sdf.groupBy("a").agg(...)。グルーピング キーまたは集約の数または型の変更は許可されません。

    • ストリーミング重複排除: 例えば、sdf.dropDuplicates("a")。重複排除カラムの数や型を変更することはできません。

    • ストリーム - ストリーム join: 例えば、sdf1.join(sdf2, ...) (つまり、両方の入力はsparkSession.readStreamを使って生成されます)。スキーマあるいは等結合のカラムの変更は許可されません。joinの型(outer あるいは inner) の変更は許可されません。join条件の他の変更は不明確です。

    • 任意のステートフル操作: 例えば、sdf.groupByKey(...).mapGroupsWithState(...) あるいは sdf.groupByKey(...).flatMapGroupsWithState(...)。ユーザ定義の状態のスキーマとタイムアウトの型を変更することはできません。ユーザ定義の状態マッピング関数内の変更は許可されますが、変更のセマンティックな効果はユーザ定義のロジックに依存します。状態スキーマの変更を本当にサポートしたい場合は、スキーマの移行をサポートするエンコーディング/デコーディングを使って、複雑な状態データ構造を明示的にバイトにエンコード/デコードすることができます。例えば、状態を Avro-encoded バイトとして保存したい場合、バイナリの状態は常に正常に復元されるため、クエリの再起動間で Avro-state-schema を自由に変更することができます。

連続的な処理

[実験的]

連続的な処理は新しい、Spark2.3で導入された少なくとも1回の保証の低レンテンシ(~1ms)のend-to-endが可能な実験的なストリーミング実行モードです。これに比べて、デフォルトのマイクロバッチ処理エンジンは、確実に1回の保証を実現するが、せいぜい ~100msのレイテンシを実現します。幾つかの種類のクエリ(以下で議論されます)について、アプリケーションのロジックの変更無し(つまりデータフレーム/データセットの操作無し)にそれらを実行するモードを選択することができます。

連続的な処理モードでサポートされるクエリを実行するのに必要なのは、パラメータとして望ましいチェックポイントの間隔と一緒に連続的なトリガを指定することだけです。例えば:

import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();
spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .trigger(continuous="1 second") \     # only change in query
  .start()

1秒のチェックポイントの間隔は連続的な処理のエンジンが毎秒ごとにクエリの進捗を記録するだろうことを意味します。結果のチェックポイントはマイクロバッチ エンジンと互換性のある形式で、従ってどのようなトリガーを使っても全てのクエリを再開することができます。例えば、マイクロバッチモードで開始されたサポートされるクエリは連続的なモードで再開することができます。逆もまた然りです。連続的なモードに切り替える時はいつでも、少なくとも1回の耐障害性の保証になるだろうことに注意してください。

サポートされるクエリ

Spark 2.4 の時点で、連続的な処理モードでは以下のクエリの種類だけがサポートされます。

  • 操作: マップのような Dataset/DataFrame オペレーションだけが連続的なモードでサポートされます。つまり、写像 (select, map, flatMap, mapPartitionsなど) と選択 (where, filter など)だけ。
    • 集約関数 (集約はまだサポートされないため)、current_timestamp()current_date() (時間を使う決定論的な計算はやりがいがあります)を除く全てのSQL関数がサポートされます。
  • ソース:
    • Kafka ソース: 全てのオプションがサポートされます。
    • Rate ソース: テストには良いです。連続的なモードでのみサポートされるオプションは numPartitionsrowsPerSecondです。
  • シンク:
    • Kafka シンク: 全てのオプションがサポートされます。
    • Memory シンク: デバッグには良いです。
    • Console シンク: デバッグには良いです。全てのオプションがサポートされます。コンソールは連続的なトリガー内で指定した各チェックポイントの間隔で出力するだろうことに注意してください。

それらの詳細については入力ソース出力シンクを見てください。コンソールシンクはテストに適していますが、エンドツーエンドの低レイテンシ処理はソースとシンクが Kafka の場合に最もよく観察できます。これによりエンジンがデータを処理し、入力トピック内で利用可能な入力データのミリ秒内に出力トピック内で結果を利用可能にします。

警告

  • 連続的な処理エンジンは、連続的にでーたをソースから読み込み、それを処理し、連続的にシンクに書き込む複数の長く実行するタスクを起動します。クエリによって必要とされるタスクの数はクエリがソースから並行してどれだけ多くのパーティションから読み込むことができるかに依存します。従って、連続的な処理のクエリが開始する前に、クラスタ内に並行して全てのタスクにとって十分なコアがあるようにしてください。例えば、もし10パーティションを持つKafkaトピックから読み込む場合は、クエリが進捗するのにクラスタは少なくとも10のコアを持つ必要があります。
  • 連続的なストリームの処理の停止は偽のタスクの終了警告を生成するかもしれません。これらは安全に無視することができます。
  • 現在のところ失敗したタスクの自動的な再試行はありません。全ての失敗はクエリを停止することに繋がり、手動でチェックポイントから再開する必要があります。

追加の情報

備考

  • クエリの実行後は、幾つかの設定は変更できません。それらを変更するには、チェックポイントを破棄し、新しいクエリを開始します。これらの設定には以下が含まれます:
    • spark.sql.shuffle.partitions
      • これは状態の物理的な分割によるものです: 状態はキーへのハッシュ関数を適用することで分割されるため、状態の分割数は変更されません。
      • ステートフル操作で実行するタスクの数を減らしたい場合は、coalesce を使って不必要なパーティション分割を回避することができます。
        • coalesce の後で、別のシャッフルが発生しない限り、(削減された)タスクの数は保持されます。
    • spark.sql.streaming.stateStore.providerClass: クエリの以前の状態を正しく読み取るには、状態ストアプロバイダのクラスを変更しないでください。
    • spark.sql.streaming.multipleWatermarkPolicy: クエリに複数のウォーターマークが含まれる場合、これを変更すると、ウォーターマークの値に一貫性が無くなるため、ポリシーは変更しないでください。

更なる読み物

トーク

  • Spark サミット Europe 2017
    • Apache Sparkの構造化ストリーミングを使った簡単、スケーラブル、耐障害性のストリーミング処理 - Part 1 slides/video, Part 2 slides/video
    • 構造化ストリーミングでのステートフル ストリーム処理へのディープ ダイブ - slides/video
  • Spark サミット 2016
    • 構造化ストリーミングへの深みへのディープ ダイブ - slides/video

移行ガイド

移行ガイドは今はこのページに保管されています。

TOP
inserted by FC2 system