構造化ストリーミングプログラミングガイド

概要

構造化ストリーミングはSpark SQLエンジン上に構築されたスケーラブルで耐障害性のあるストリーミング処理エンジンです。静的なデータ上のバッチ計算を表すのと同じ方法でストリーミングの計算を表現することができます。Spark SQL エンジンはそれを逐次および継続的に実行し、最終的な結果を到着し続けるストリーミングデータとして更新するように注意するでしょう。ストリーミング集約、イベントタイム ウィンドウ、ストリーム-to-バッチ joinなどを表現するために、Scala、Java、PythonあるいはRでDataset/DataFrame APIを使うことができます。計算は同じ最適化されたSpark SQLエンジン上で実行されます。結果的に、システムはチェックポイントと先行書き込みログを使って end-to-endの確実に一回の耐障害性を保証します。簡単に言うと、ユーザがストリーミングについて推論する必要無く、高速、スケーラブル、耐障害性、end-to-endの確実に一回のストリーミング処理を提供します。

このガイドでは、プログラミングモデルとAPIを渡り歩いてみようと思います。まず、簡単な例 - ストリーミングのワードカウントを始めましょう。

クリックな例

TCPソケット上でlistenしているデータサーバから受け取ったテキストデータの単語のカウントの実行を続けたいとします。構造化ストリーミングを使ってこれをどう表現できるかを見てみましょう。完全なコードはScala/Java/Python/Rで見ることができます。そしてSparkをダウンロードした場合は、直接例を実行することができます。どの場合でも、一歩ずつ例を見てそれがどのように動くかを理解しましょう。まず、必要なクラスをインポートし、Sparkに関する全ての機能の開始点となるローカルSparkセッションを作成しなければなりません。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
sparkR.session(appName = "StructuredNetworkWordCount")

次に、localhost:9999上でlistenするサーバから受け取るテキストデータを表現するストリーミングデータフレームを作成して、word countを計算するためにデータフレームを変換してみましょう。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

このlines データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、各行を複数の単語に分割するためにflatMap を適用できるように、.as[String] を使ってデータフレームを文字列のデータセットに変換しました。結果のwords データセットは全ての単語を含みます。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

このlines データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、各行を複数の単語に分割するためにflatMap を適用できるように、.as(Encoders.STRING()) を使ってデータフレームを文字列のデータセットに変換しました。結果のwords データセットは全ての単語を含みます。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

このlines データフレームはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、二つの組み込みのSQL関数を使いました - 各行をそれぞれの単語を持つ複数の行に分割する split と explode。更に、新しいカラムに“word”という名前を付けるために関数aliasを使います。結果的に、データセット内のユニークな値によってグループ化し、それらを数えることでwordCounts データフレームを定義しました。これはストリームの実行中のword countを表すストリーミングデータフレームであることに注意してください。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

このlines SparkDataFrameはストリーミングテキストデータを含む境界の無いテーブルを表します。このテーブルは “value” という名前の文字列の1つのカラムを含み、ストリーミングテキストデータ内の各行はテーブル内の行になります。変換をちょうど設定し、まだそれを開始していないため、どのようなデータも今のところ受け取っていないことに注意してください。次に、二つのSQL関数を持つSQL表現があります - 各行をそれぞれの単語を持つ複数の行に分割する split と explode。更に、新しいカラムに “word” という名前を付けます。最後に、SparkDataFrame内のユニークな値によってグルーピングし、それらをカウントすることで、wordCounts SparkDataFrame を定義しました。これはストリームの実行中のword countを表すSparkDataFrameであることに注意してください。

これで、ストリームデータ上にクエリを設定しました。最後に残っているのは、実際にデータの受信を開始し、カウントを計算することです。これをするために、カウントの完全なセット(outputMode("complete")によって指定されます)が更新される度に、コンソールにそれらの出力を設定します。そして、start()を使ってストリーミングの計算を開始します。

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

コードが実行された後で、ストリーミングの計算がバックグラウンドで開始されるでしょう。query オブジェクトは有効なストリーミングのクエリへのハンドラで、クエリが有効な間プロセスが終了することを避けるために awaitTermination() を使ってクエリの終了を待つと決めました。

この例のコードを実際に実行するために、独自の Spark アプリケーション内でコードをコンパイルするか、一旦Sparkをダウンロードして単純に例を実行することができます。後者を示します。以下のようにして最初にNetcat(ほとんどのUnix系のシステムで見つけることができる小さなユーティリティ)をデータサーバとして実行する必要があるでしょう。

$ nc -lk 9999

それから、違うターミナルで、以下のようにして例を実行することができます。

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

それから、netcatサーバを実行しているターミナルの中で入力される全ての行がカウントされ、毎秒ごとに画面に出力されるでしょう。それは以下のように見えるでしょう。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop



















...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

プログラミング モデル

構造化ストリーミングの重要なアイデアは、活気のあるデータストリームを連続して追加されるテーブルとして扱うことです。これによりバッチ処理モデルにとても良く似た新しいストリーム処理となります。ストリーミング計算を静的なテーブル上の標準的なバッチのようなクエリとして表し、Sparkはそれを無限の入力テーブル上の 増加するクエリとして実行するでしょう。このモデルをもっと詳細に理解しましょう。

基本概念

入力データストリームを “入力 テーブル”と見なします。ストリーム上で到着する各データ項目は、入力テーブルに追加される新しい行のようなものです。

テーブルのようなストリーム

入力上のクエリは “結果テーブル”を生成するでしょう。各引き起こされる間隔(例えば、各1秒)ごとに、新しい行が入力テーブルに追加されます。これは結果的に結果テーブルを更新します。結果テーブルが更新されるといつも変更された結果の行を外部のsinkに書き込みたいでしょう。

モデル

“出力” は外部ストレージに書き込まれたものとして定義されます。出力は異なるモードで定義することができます:

各モードはクエリの特定の型に適用可能です。詳細は後で議論されます。

このモデルの使い方を説明するために、以前のQuick Exampleのコンテキストのモデルを理解しましょう。最初のlines データフレームは入力テーブルで、最後のwordCounts データフレームは結果テーブルです。wordCountsを生成するためのストリーミングlines データフレーム上のクエリは、それが静的なデータフレームだろうということで、完全に同じです。しかし、このクエリが開始されると、Sparkはソケット接続から新しいデータを連続して調べるでしょう。以下で示すように、もし新しいデータがあれば、Sparkは更新カウントを計算するために、前の実行中のカウントと新しいデータを組み合わせる “incremental” クエリを実行するでしょう。

モデル

このモデルは他の多くのストリーミング処理エンジンと極めて異なります。Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). このモデルでは、新しいデータがある場合Sparkは結果テーブルを更新する責任があります。従ってユーザがそれについて推論する必要がありません。例として、このモデルがどうやってイベント時間に基づいた処理と後でやってくるデータを扱うかを見てみましょう。

Event-time と Late Dataの扱い

イベント時間はデータ自身の中に埋め込まれている時間です。多くのアプリケーションで、このイベント時間上で操作したいと思うかも知れません。例えば毎秒IoTデバイスによって生成されるイベントの数を取得したい場合、Sparkが受け取った時間よりもおそらくデータが生成された時間を使いたいでしょう。このイベント時間はこのモデルでとても自然に表現されます – デバイスからの各イベントは表内の行で、イベント時間は行内の列の値です。これにより、ウィンドウに基づく集約(例えば、各秒毎のイベントの数)がイベント時間の列上の単なるグルーピングと集約の特別な形になります – 各時間ウィンドウはグループで、各行は複数のウィンドウ/グループに所属するかも知れません。従って、そのようなイベント時間のウィンドウに基づいた集約のクエリは、静的なデータセット(例えば、デバイスのイベントログを集めたもの)とデータストリームの両方で首尾一貫して定義することができ、ユーザの生活をもっと簡単にします。

更に、このモデルは本質的にイベント時間に基づいて期待したよりも遅く到着したデータを処理します。Sparkは結果テーブルを更新するため、遅れたデータがある場合は古い集約の更新と、中間状態データのサイズを制限するために古い集約の掃除について完全な制御をします。Spark 2.1からは、ユーザが遅れたデータの閾値を指定することができるウォーターマークをサポートし、そのためエンジンが古い状態を掃除することができます。後でウィンドウ操作 の章の中で詳細が説明されます。

耐障害性semantics

端から端まで確実に1つのセマンティクスの実現が構造化ストリーミングの設計の背景にある主要な目的の一つです。To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. 各ストリーミングソースはストリーム内で読み込みの場所を追跡するためにオフセット(Kafkaのオフセット、あるいはKinesisのシーケンス番号に似ています)を持つと仮定します。エンジンは各トリガー内で処理されるデータのオフセット範囲を記録するために、チェックポイントと先行書き込みログを使用します。ストリーミング シンクは再処理の扱いが等冪であるように設計されています。再生ソースと等冪シンクの使用と合わせて、構造化ストリーミングはどのような障害下でも 端から端まで確実に一回のセマンティクス を保証することができます。

データセットとデータフレームを使うAPI

Spark 2.0から、データフレームとデータセットはストリーミング、境界無しデータと同様に、静的、境界ありのデータを表すことができます。静的なデータセット/データフレームと似て、ストリーミング データフレーム/データセットをストリーミングソースから生成するために、一般的なエントリポイントSparkSession (Scala/Java/Python/R ドキュメント) を使うことができ、それらに同じオペレーションを静的なデータフレーム/データセットとして適用することができます。データセット/データフレームに馴染みが無い場合は、DataFrame/Dataset プログラミング ガイドを使ってそれらに慣れることを強くお勧めします。

ストリーミングデータフレームとストリーミングデータセットの生成

ストリーミングデータフレームはSparkSession.readStream()によって返されるDataStreamReader インタフェース(Scala/Java/Python ドキュメント) を使って生成されるかも知れません。Rでは、read.stream() メソッドを使います。静的データフレームを生成するための読み込みインタフェースと似て、ソースの詳細 – データフォーマット、スキーマ、オプションなどを指定することができます。

入力ソース

Spark 2.0には、2、3の組み込みのソースがあります。

幾つかのソースは、障害の後でのチェックポイント オフセットを使ったデータの再生がされないかも知れないので、耐障害性がありません。耐障害性のセマンティクスの前の章を見てください。Sparkの全てのソースの詳細がここにあります。

ソース オプション 耐障害性 備考
ファイル ソース path: 入力ディレクトリへのパス。全てのファイルフォーマットで共通です。
maxFilesPerTrigger: 各トリガーごとに判断する新しいファイルの最大の数 (デフォルト: 上限無し)
latestFirst: 最新の新しいファイルを最初に処理するかどうか。大きなバックログファイルがある場合に便利です (デフォルト: false)
latestFirst: フルパスの代わりにファイル名だけに基づいて新しいファイルをチェックするかどうか (デフォルト: false)。これを `true` にすることで、ファイル名"dataset.txt"が同じなので、以下のファイルは同じファイルとみなされるでしょう:
· "file:///dataset.txt"
· "s3://a/dataset.txt"
· "s3n://a/b/dataset.txt"
· "s3a://a/b/c/dataset.txt"


ファイル形式固有のオプションについては、DataStreamReader (Scala/Java/Python/R) 内の関連するメソッドを見てください。例えば、"parquet" 形式のオプションについては、DataStreamReader.parquet()を見てください。
Yes globパスをサポートしますが、複合的なカンマ区切りのパス/globはサポートしません。
ソケット ソース host: 接続するホスト。指定されなければなりません
port: 接続するポート。指定されなければなりません
いいえ
Kafka ソース Kafka 統合ガイドを見てください。 Yes

幾つかの例です。

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

これらの例は型無しのストリーミング データフレームを生成します。データフレームのスキーマはコンパイル時にチェックされず、クエリがサブミットされた時にのみ実行時にチェックされることを意味します。map, flatMap などの幾つかの操作は、コンパイル時に型が知られている必要があります。そうするために、これらの型無しのストリーミング データフレームを、静的なデータフレームとして同じメソッドを使って型有りのストリーミング データセットに変換することができます。詳細はSQL プログラミング ガイド を見てください。更に、サポートされるストリーミングソースについて更に詳しくこのドキュメントの後で議論されます。

スキーマ推論とストリーミング データフレーム/データセットのパーティション

デフォルトで、ファイルベースソースの構造化ストリーミングは、自動的に推測するためにSparkに頼るではなく、スキーマを指定することを必要とします。この制限は、障害時の場合においても、ストリーミング クエリのために一貫したスキーマが使われるだろうことを保証します。その場限りのために、spark.sql.streaming.schemaInferencetrue に設定することでスキーマの推測を再び有効にすることができます。

/key=value/ という名前のサブディレクトリが存在する場合はパーティションの調査は起こらず、リスト化はこれらのディレクトリ内に再帰的に起こるでしょう。これらのカラムがユーザが提供したスキーマの中で現れる場合、それらはファイルが読み込まれたパスに基づいたSparkによって情報が与えられるでしょう。パーティションスキーマを構成するディレクトリは、クエリが開始される時に存在しなければならず、静的なままでなければなりません。例えば、/data/year=2015/が存在する時に/data/year=2016/ を追加することは問題無いですが、パーティションカラムを変更(つまり、/data/date=2016-04-17/ディレクトリを生成)することは正しくありません。

ストリーミング データフレーム/データセット での操作

ストリーミング データフレーム/データセット に全ての種類の操作を適用することができます – 型無し SQL風の操作 (例えば select, where, groupBy) から型有りのRDD風の操作 (例えば map, filter, flatMap)まで。詳細はSQL プログラミング ガイド を見てください。使用できる2,3の操作の例を見てみましょう。

基本的なオペレーション - 選択、射影、集約

データフレーム/データセット上のほとんどの共通の操作がストリーミングのためにサポートされます。サポートされない少数の操作についてはこの章の後で議論されます。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

イベントタイムのウィンドウ操作

スライドするイベント時間のウィンドウ上での集約は構造化ストリーミングでは簡単で、グループ化集約にとても似ています。グループ化された集約では、集約値(例えば count)はユーザ定義のグループ化カラムの中のそれぞれのユニークな値について保持されます。ウィンドウベースの集約の場合、集約値はイベント時間の行が分類される各ウィンドウについて保持されます。これを図を使って理解しましょう。

手っ取り早い例が修正され、ストリームの行は行が生成された時に時間とともに含まれると仮定します。word countを実行する代わりに、各5秒ごとに更新する10秒のウィンドウの中でのwordを数えたいとします。つまり、12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 などの10秒のウィンドウの間で受信した複数のwordの中のwordをカウントします。12:00 - 12:10 は12:00以降で12:10より前に到着したデータを意味することに注意してください。ここで、12:07に到着したwordを考えてみましょう。このwordは二つのウィンドウ 12:00 - 12:10 と 12:05 - 12:15 に対応するカウントを増やす必要があります。つまり、そのカウントはグループキー (つまり word) とウィンドウ(イベント時間から計算することができます)の両方によってインデックスされるでしょう。

結果のテーブルは以下のようなものになるでしょう。

ウィンドウ オペレーション

このウィンドウはコード上はグルーピングに似ているため、ウィンドウされた集約を表現するためにgroupBy() および window() 操作を使うことができます。以下の例のScala/Java/Pythonでの完全なコードを見ることができます

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

Late Dataとウォーターマークの扱い

ここで、イベントの一つがアプリケーションに遅れて到着すると何が起きるかを考えてみましょう。例えば、12:04(つまりイベント時間)に生成されたwordがアプリケーションによって12:11に受け取られるかも知れません。アプリケーションはウィンドウ 12:00 - 12:10の古いカウントを更新するために12:11ではなく、12:04を使うべきです。これはウィンドウベースのグルーピングでは普通に起こります - 構造化ストリーミングは、以下で示すように遅れたデータが古いウィンドウの集約を正しく更新することができるように、長い時間部分的な集約について中間状態を維持することができます。

遅れたデータの処理

しかし、このクエリを数日の間実行するには、システムが計算する中間のメモリの状態の量を制限することが必要です。このことは、アプリケーションが集約についてもう遅れたデータを受け取らないだろうということで、システムが古い集約をメモリの状態から削除することができるかを知る必要があることを意味します。これを有効にするには、Spark 2.1でwatermarkingを導入しました。これはエンジンが自動的にデータ内の現在のイベント時間を追跡し、古い状態を適宜掃除します。イベント時間のカラムと、イベント時間の点でデータがどれくらい遅れるかを予想をする閾値を指定することでクエリのwatermarkを定義することができます。時間 Tに始まる特定のウィンドウについて、エンジンは状態を維持し、遅れたデータが(エンジンによって観測された最大のイベント時間 - 遅延の閾値 > T)まで状態を更新することができるでしょう。別の言い方をすると、閾値内の遅れたデータは集約されますが、閾値より遅れたデータは取り零されるでしょう。例を使ってこれを理解してみましょう。以下で示すようにwithWatermark()を使って以前の例でのwatermarkを簡単に定義することができます。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

この例では、カラム“timestamp”の値にクエリのウォーターマークを定義し、データがどれだけ遅れることができるかの閾値を “10 分” に定義しました。このクエリがUpdate出力モード(後で出力モード の章で議論されます)で実行された場合、エンジンはウィンドウがウォーターマークより古くなるまでResultテーブル内のウィンドウの数を更新し続けるでしょう。カラム “timestamp” の現在のイベントタイムからの遅れは10分です。図での説明です。

Updateモードでのウォーターマーク

図で示されるように、エンジンによって追跡される最大のイベント時間はblue dashed lineで、ウォーターマークは各トリガーの開始時に (max event time - '10 mins') に設定されます。例えば、エンジンがデータ (12:14, dog)を観測すると、次のトリガーのためのウォーターマークを 12:04 に設定します。このウォーターマークは遅れたデータがカウントされるようにエンジンが更に10分間中間状態を維持します。例えば、データ(12:09, cat) は順番がバラバラで遅れており、ウィンドウ12:05 - 12:1512:10 - 12:20にあります。従って、それはトリガーのウォーターマーク12:04の前にまだあり、エンジンは状態として中間カウントをまだ維持していて、現在のところ関連するウィンドウのカウントを更新します。しかし、ウォーターマークが12:11に更新された場合、ウィンドウ(12:00 - 12:10)の中間状態が消去され、全てのそれに続くデータ(例えば (12:04, donkey))は“遅すぎる” と見なされ、従って無視されます。各トリガーの後で、更新されたカウント(つまり purple rows) はUpdateモードで命令されたようにトリガーの出力としてsinkに書き込まれます。

f幾つかのsink (例えば ファイル) はUpdateモードが必要とするfine-grained updateをサポートしないかも知れません。それらと連携するために、final countsだけがsinkに書き込まれるAppendモードをサポートします。これは以下で説明されます。

非ストリーミングデータセット上でwithWatermarkを使うと何もしないことに注意してください。ウォーターマークはどのようなやり方でもどのバッチクエリにも影響すべきではないため、それを直接無視するでしょう。

Appendモードでのウォーターマーク

前のUpdateモードと似ていて、エンジンは各ウィンドウのための中間カウントを維持します。しかし、部分的なカウントはResultテーブルへ更新されず、sinkに書き込まれません。エンジンは遅い日付がカウントされるように “10 分” 待ち、それから ウィンドウ < ウォーターマーク の中間状態を削除し、最終のカウントをResultテーブル/sinkに追加します。例えば、ウィンドウ 12:00 - 12:10 の最終カウントはウォーターマークが 12:11 に更新された後でのみResultテーブルに追加されます。

ウォーターマークが集約状態を掃除するための条件 以下の条件がウォーターマークが集約クエリ内の状態を掃除するために満たされるべきだと注意することが重要です (Spark 2.1.1 の時点の題名、将来には変わります)

  • Output モードは Append あるいは Update でなければなりません。 Complete モードは全ての集約データが保持されることを必要とし、従って中間データを削除するためにウォーターマークを使うことができません。各出力モードのセマンティクスの詳細な説明はOutput モード の章を見てください。

  • 集約はイベント時間カラムあるいはイベント時間カラム上のwindowのどちらからを持つ必要があります。

  • 集約で使われるtimestampカラムとしてwithWatermarkが同じカラムで呼ばれる必要があります。例えば、ウォーターマークは集約カラムからの異なるカラム上で定義されるため、df.withWatermark("time", "1 min").groupBy("time2").count() は Append 出力モードでは無効です。

  • withWatermark must be called before the aggregation for the watermark details to be used. 例えば、Append出力モードではdf.groupBy("time").count().withWatermark("time", "1 min") は無効です。

join オペレーション

ストリーミング データフレームは新しいストリーミング データフレームを作成するために静的データフレームを使ってjoinすることができます。2,3の例です。

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")  // right outer join with a static DF
Dataset<Row> staticDf = spark.read. ...;
Dataset<Row> streamingDf = spark.readStream. ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join");  // right outer join with a static DF
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")  # right outer join with a static DF

ストリーミングの非重複

イベント内でユニークな識別子を使ってデータストリーム内のレコードを非重複にすることができます。これはユニークな識別子のカラムを使った静的なものの非重複と完全に同じです。クエリは重複レコードをフィルタできるような以前のレコードから必要な量のデータを格納するでしょう。集約と似て、ウォーターマークの有り無しの非重複を使うことができます。

  • ウォーターマークあり - 重複したレコードが到着するかも知れない頻度について上限があります。そしてイベント時間のカラムにウォーターマークを定義するこができ、guidとイベント時間カラムの両方を使って、非重複にすることができます。クエリは重複するレコードがもうないと思われる以前のレコードから古い状態のデータを削除するためにウォーターマークを使うでしょう。これはクエリが維持しなければならない状態の量を制限します。

  • ウォーターマーク無し - 重複したレコードが到達するかも知れない時間に制限が無いため、クエリは状態として以前のレコード全てからデータを格納します。

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream. ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf = spark.readStream. ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("guid", "eventTime")

任意のステートフル操作

多くの利用法で、集約よりもっと進化したステートフル操作を必要とします。例えば、多くの利用法の中で、イベントのデータストリームからセッションを追跡しなければなりません。そのようなセッション化を行う場合、任意の型のデータを状態として保存する必要があり、各トリガーでのデータストリームイベントを使って状態への任意の操作を実施するでしょう。Spark 2.2 から、これはオペレータmapGroupsWithStateともっと強力な操作 flatMapGroupsWithStateを使って行うことができます。両方の操作により、ユーザはユーザ定義の状態を更新するためにグループ化されたデータセット上でユーザ定義のコードを適用することができます。もっと具体的な詳細については、API ドキュメント(Scala/Java) と例 (Scala/Java) を見てください。

サポートされないオペレーション

ストリーミング データフレーム/データセットでサポートされない2,3のデータフレーム/データセット 操作があります。それらの幾つかは以下のようなものです。

  • 多段ストリーミング集約 (つまり、ストリーミングDF上の集約のチェーン)はまだストリーミングデータセットではサポートされません。

  • ストリーミング データセット上の Limit および 最初のN行の取得はサポートされません。

  • ストリーミング データセット上のdistinct操作はサポートされません。

  • 集約の後、およびComplete出力モードでのみ、ストリーミングデータセット上でのSort操作がサポートされます。

  • ストリーミングと静的データセット間のouter joinは条件付きでサポートされます。

    • ストリーミングデータセットとのfull outer joinはサポートされません

    • 右にあるストリーミング データセットとのleft outer joinはサポートされません

    • 左にあるストリーミング データセットとの right outer join はサポートされません

  • 二つのストリーミング データセット間のどのような種類のjoinもまだサポートされません。

更に、ストリーミング データセット上で動作しないだろう幾つかのデータセット メソッドがあります。それらはすぐにクエリを実行して結果を返すだろうアクションで、ストリーミング データセット上では意味を為しません。むしろ、それらの機能はストリーミングクエリの明示的な開始によって行うことができます (それについては次の章を見てください)。

  • count() - ストリーミング データセットから1つのcountを返すことができません。代わりに、実行中のcountを含むストリーミング データセットを返すds.groupBy().count()を使ってください。

  • foreach() - 代わりにds.writeStream.foreach(...)を使ってください (次の章を見てください)。

  • show() - 代わりにconsole sinkを使ってください (次の章を見てください)。

もしこれらの操作のどれかを試すと、“operation XYZ is not supported with streaming DataFrames/Datasets” のようなAnalysisExceptionを見るでしょう。将来のリリースでそれらのうちの幾つかがサポートされるかも知れませんが、ストリーミングデータ上で効果的に実装することが本質的に困難な他のことがあります。例えば、入力ストリーム上でのソートは、ストリーム内で受け取った全てのデータを追跡し続けることが必要なため、サポートされません。従って、これは効率的に実行することが本質的に難しいです。

ストリーミング クエリの開始

一旦最終の結果データフレーム/データセットを定義すると、後に残っているのはストリーミング計算の開始です。そうするには、Dataset.writeStream()を経由して帰ってくるDataStreamWriter (Scala/Java/Python ドキュメント) を使う必要があります。このインタフェース内で1つ以上の以下のものを指定する必要があります。

  • Details of the output sink: データフォーマット、locationなど。

  • Output mode: 出力sinkに書き込まれるものを指定する。

  • Query name: 任意で、識別のためのクエリのユニークな名前を指定する。

  • Trigger interval: 任意で、トリガーの間隔を指定する。指定されない場合、システムは以前の処理が完了したらすぐに新しいデータが利用可能かを調べるでしょう。以前の処理が完了していないためにトリガーの時間に失敗した場合は、システムは処理が完了した後ですぐにではなく、次のトリガーの場所でトリガーしようとするでしょう。

  • チェックポイントの location: 端から端まで耐障害性を保証できる出力sinkについては、システムが全てのチェックポイントの情報を書き込むだろうlocationを指定します。これはHDFS互換の耐障害性ファイルシステムの中のディレクトリでなければなりません。チェックポイントのセマンティクスは次の章で詳細に議論されます。

出力モード

出力モードには2,3の種類があります。

  • Append モード (デフォルト) - これはデフォルトのモードで、最後のトリガーで結果テーブルに追加された新しい行だけがsinkに書き込まれるでしょう。これは結果テーブルに追加された行が変更されないだろうクエリのためだけにサポートされます。従って、このモードは各行が一度だけ出力されるだろうことを保証します (耐障害性sinkを仮定します)。例えば、select, where, map, flatMap, filter, join などだけを持つクエリがAppendモードをサポートするでしょう。

  • Complete モード - 各トリガーの後で、結果テーブル全体がsinkに出力されるでしょう。これは集約クエリのためにサポートされます。

  • Update モード - (Spark 2.1.1 から利用可能) 最後のトリガーから更新された結果テーブル内の行だけがsinkに出力されるでしょう。詳細な情報は将来のリリースで追加されるでしょう。

異なる型のストリーミングクエリは、異なる出力モードをサポートします。互換性のマトリックスです。

クエリの型 サポートされる出力モード 備考
集約有りのクエリ ウォーターマークを持つイベント時間の集約 Append, Update, Complete Append モードは古い集約状態を削除するためにウォーターマークを使います。But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). 詳細はLate Data の章を見てください。

Update モードは古い集約状態を削除するためにウォーターマークを使います。

定義によりこのモードは結果テーブル内の全てのデータを保持するため、Complete モードは古い集約状態を削除しません。
他の集約 Complete, Update ウォーターマークが定義されていない(他のカテゴリの中でのみ定義されている)ため、古い集約状態は削除されません。

集約は更新することができ、従ってこのモードのセマンティクスに違反するため、Append モードはサポートされません。
mapGroupsWithState を使ったクエリ Update
flatMapGroupsWithState を使ったクエリ 追加操作モード Append 集約は flatMapGroupsWithState の後に可能です。
更新操作モード Update 集約は flatMapGroupsWithStateの後では不可能です。
他のクエリ Append, Update 結果テーブル内の全て集約されていないデータを保持することは実現不可能なため、Complete モードはサポートされません。

出力シンク

2,3の組み込みの出力sinkがあります。

  • ファイル sink - 出力をディレクトリに格納します。
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
  • Foreach sink - 出力内のレコードの任意の計算を実行します。詳細はこの章の後の方を見てください。
writeStream
    .foreach(...)
    .start()
  • コンソール sink (デバッグのため) - トリガーがある度に出力をコンソール/標準出力に出力します。Append と Complete の出力モードの両方がサポートされます。各トリガーの後でドライバーのメモリ内に出力全体が格納されるため、多くないデータの量のデバッグの目的のために使われるべきです。
writeStream
    .format("console")
    .start()
  • メモリ sink (for debugging) - 出力はメモリ内にインメモリテーブルとして格納されます。Append と Complete の出力モードの両方がサポートされます。ドライバーのメモリ内に出力全体が格納されるため、多くないデータの量のデバッグの目的のために使われるべきです。従って、慎重に使ってください。
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

幾つかのsinkは出力の永続性を保証せず、デバッグの目的のために予定されているため、耐障害性がありません。耐障害性のセマンティクスの前の章を見てください。Sparkでの全てのsinkの詳細です。

Sink サポートされる出力モード オプション 耐障害性 備考
ファイル Sink Append path: 出力ディレクトリのパス。指定しなければなりません。

ファイルフォーマットに固有のオプションについては、DataFrameWriter内の関係するメソッドを見てください (Scala/Java/Python/R)。例えば、"parquet" フォーマットのオプションについては、DataFrameWriter.parquet()を見てください
Yes パーティション テーブルへの書き込みをサポートします。時間によるパーティションが有用かも知れません。
Foreach シンク Append, Update, Compelete None ForeachWriterの実装によります 詳細は次の章にあります
コンソール Sink Append, Update, Complete numRows: 各トリガー毎に出力する行の数 (デフォルト: 20)
truncate: 出力が長い場合に切り捨てるかどうか (デフォルト: true)
いいえ
メモリ Sink Append, Complete None いいえ。しかし、Complete モードでは、再起動されたクエリは完全なテーブルを再生成するでしょう。 テーブル名はクエリ名です。

実際にクエリの実行を開始するにはstart() を呼び出す必要があることに注意してください。これは連続して実行中の実行へのハンドラである StreamingQuery オブジェクトを返します。このオブジェクトをクエリを管理するために使うことができます。これについては次のサブセクションで議論します。今は、2,3の例を使ってこれについて理解しましょう。

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")   

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))

Foreachの使用

foreach オペレータは任意のオペレーションを出力データ上で計算することができます。Spark 2.1 の時点では、これは Scala と Java のみで利用可能です。これを使うには、インタフェース ForeachWriterを実装する必要があるでしょう (Scala/Java docs)。これはトリガーの後で出力として生成される行の系列がある場合にはいつも呼ばれるメソッドを持ちます。以下の重要なポイントに注意してください。

  • writer はシリアライズ化され実行のためにexecutorに送信されるため、シリアライズ化される必要があります。

  • executor上では、open, process および close の3つの全てのメソッドが呼ばれるでしょう。

  • writerは openメソッドが呼ばれた時のみ、全ての初期化 (例えば、接続の開始、トランザクションの開始など)をする必要があります。オブジェクトが生成されてすぐにクラス内でなんらかの初期化がある場合、その初期化がドライバ内(なぜならインスタンスが生成される場所なので)で起きるだろうことに注意してください。これは意図したものでは無いかも知れません。

  • 出力するために必要な行のセットをユニークに表すopen内のversionpartition の二つのパラメータ。version は各トリガー毎に増加する単調増加のidです。partition は出力のパーティションを表すidです。出力は分散されるので、複数のexecutor上で処理されるでしょう。

  • open は行の系列を書き込む必要があるかどうかを選択するために、versionpartition を使うことができます。従って、true (書き込んで進める)、あるいはfalse (書き込む必要が無い)を返す可能性があります。false が返った場合、process はどの行においても呼ばれないでしょう。例えば、部分的な障害の後で、障害のあったトリガーの出力パーティションの一部がデータベースへ既にコミットされているかも知れません。データベース内に格納されているmetadataに基づいて、writerは既にコミットされたパーティションを識別することができ、それに応じて再びそれらをコミットすることをスキップするためにfalseを返すことができます。

  • open が呼ばれる時はいつも、close も呼ばれるでしょう (なんらかのエラーでJVMが終了しない場合)。もしopen がfalseを返す場合でもこれはtrueです。もしデータの処理および書き込みでエラーがある場合、close はエラーで呼ばれるでしょう。リソースのリークが無いように open内で生成された状態(例えば、接続、トランザクションなど)を掃除することに責任があります。

ストリーミング クエリの管理

クエリが開始された時に生成されたStreamingQueryオブジェクトはクエリを監視および管理するために使うことができます。

val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query = df.writeStream.format("console").start()   # get the query object

query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId()       # get the unique id of this run of the query, which will be generated at every start/restart

query.name()        # get the name of the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress()  # an array of the most recent progress updates for this query

query.lastProgress()    # the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the name of the auto-generated or user-specified name

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

1つのSparkSession内で任意の数のクエリを開始することができます。それら全ては同時にクラスタのリソースを共有して並行で実行するでしょう。現在アクティブなクエリを管理するために使うことができるStreamingQueryManager (Scala/Java/Python docs) を取得するために、sparkSession.streams() を使うことができます。

val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
spark = ...  # spark session

spark.streams().active  # get the list of currently active streaming queries

spark.streams().get(id)  # get a query object by its unique id

spark.streams().awaitAnyTermination()  # block until any one of them terminates
Rでは利用できません。

ストリーミングクエリの監視

アクティブなクエリを監視及びデバッグするための2つのAPIがあります - 対話的で非同期です。

インタラクティブ API

streamingQuery.lastProgress()streamingQuery.status() を使ってアクティブなクエリの現在の状態とメトリクスを直接取得することができます。lastProgress() returns a StreamingQueryProgress object in Scala and Java and a dictionary with the same fields in Python. ストリームの最後のトリガーで行われた進捗についての全ての情報を持ちます - 何のデータが処理されたか、処理のレートがどれくらいか、レンテンシなど。最後の2,3の進捗の配列を返すstreamingQuery.recentProgress もあります。

更に、streamingQuery.status()Scala および JavaでのStreamingQueryStatus オブジェクトとPythonでの同じフィールドを持つディクショナリを返します。それはクエリがすぐに何をするかについての情報を与えます - トリガーを有効にする、データが処理されるなど。

2,3の例です。

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
''' 
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
query <- ...  # a StreamingQuery
lastProgress(query)

'''
Will print something like the following.

{
  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
  "name" : null,
  "timestamp" : "2017-04-26T08:27:28.835Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 0
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
  }
}
'''

status(query)
'''
Will print something like the following.

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
'''

非同期API

StreamingQueryListener をアタッチすることでSparkSession に関連する全てのクエリを非同期で監視することもできます (Scala/Java ドキュメント)。sparkSession.streams.attachListener()を使って独自のStreamingQueryListener オブジェクトを一度アタッチすると、クエリが開始および停止した時とアクティブなクエリ内で進捗があった時のコールバックを得ます。例は以下のようになります。

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Pythonでは利用できません。
Rでは利用できません。

チェックポイントを使った障害からの回復

障害あるいは計画的なシャットダウンの場合、前の進捗と前のクエリの状態を回復することができ、中止したところから再開することができます。これはチェックポイントと書き込み先行ログを使って行われます。チェックポイントのlocationを持つクエリを設定することができ、クエリは全ての進捗の情報(つまり、各トリガーで処理されたオフセットの範囲)と実行中の集約(例えば、quick exampleでのword count)をチェックポイントのlocationに保存するでしょう。このチェックポイントのlocationはHDFS互換ファイルシステムのパスでなければならず、クエリを実行する時にDataStreamWriterでのオプションとして設定することができます。

aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

この後どうすればいいか

TOP
inserted by FC2 system