構造化 ストリーミング + Kafka 統合ガイド (Kafkaブローカーバージョン 0.10.0 以上)

Kafkaからデータを読み込みおよび書き込みするためのKafka 0.10のための構造化ストリーミング統合。

リンク

SBT/Maven プロジェクト定義を使用するScala/Javaアプリケーションのために、ストリーミングアプリケーションを以下のartifactとリンクします:

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0

Pythonアプリケーションに関しては、アプリケーションをデプロイする時に、この上のライブラリとその依存物を追加する必要があるでしょう。以下の配備 サブセクションを見てください。

Kafkaからのデータの読み込み

ストリーミング クエリのためにKafkaソースを生成

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to multiple topics
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to a pattern
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

バッチ クエリのためにKafkaソースを生成

バッチ処理にもっと適した利用法がある場合、オフセットの定義された範囲のためにデータセット/データフレームを作成することができます。

// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

ソース内の各行は以下のスキーマを持ちます:

カラム種類
キー binary
binary
トピック 文字列
パーティション int
オフセット long
timestamp long
timestampType int

以下のオプションはバッチとストリーミングクエリの両方についてのKafkaソースのために設定されなければなりません。

オプション意味
assign json string {"topicA":[0,1],"topicB":[2,4]} 消費のための特定のトピックパーティションKafkaソースについては、"assign", "subscribe" あるいは "subscribePattern" オプションのうちの1つだけが指定することができます。
購読 トピックのカンマ区切りのリスト 購読するトピックのリストKafkaソースについては、"assign", "subscribe" あるいは "subscribePattern" オプションのうちの1つだけが指定することができます。
subscribePattern Java regex 文字列 トピックを購読するために使われるパターン。Kafkaソースについては、"assign", "subscribe" あるいは "subscribePattern" オプションのうちの1つだけが指定することができます。
kafka.bootstrap.servers host:portのカンマ区切りのリスト Kafka "bootstrap.servers" 設定

以下の設定は任意です:

オプションデフォルト:クエリの型意味
startingOffsets "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ ストリーミングについては "latest"、バッチについては "earliest" ストリーミングとバッチ クエリが開始される開始点。元も早いオフセットからの"earliest"、最新のオフセットからの"latest"、あるいは各トピックパーティションについての開始オフセットを指定するjson文字列。jsonでは、earliestを参照するためにオフセットとして-2が、latestには-1が使われます。注意: バッチのクエリについては、latest(明示的あるいはjson内で-1を使って)は使えません。ストリーミングのクエリについては、これは新しいクエリが開始された時にのみ適用され、再開は常にクエリが中止された箇所から取り上げられるでしょう。クエリの間で新しく発見されたパーティションはearliestで開始されるでしょう。
endingOffsets latest あるいは json 文字列 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest バッチのクエリ バッチのクエリが終了される終了点。最新が参照される"earliest"、あるいは各トピックパーティションについての終了オフセットを指定するjson文字列。jsonでは、latestを参照するためにオフセットとして-1を使うことができ、オフセットとして-2 (earliest) は使うことができません。
failOnDataLoss true あるいは false true ストリーミングのクエリ データが紛失したかも知れない時にクエリを失敗するかどうか(例えば、トピックが削除された、あるいはオフセットが範囲外)。これは間違ったアラームかも知れません。期待した通りに動作しない場合は無効にすることができます。もしバッチのクエリがデータの紛失により指定されたオフセットからのデータの読み込みに失敗する場合は、バッチのクエリは常に失敗するでしょう。
kafkaConsumer.pollTimeoutMs long 512 ストリーミングとバッチ executor内のKafkaからデータをポーリングするタイムアウトのミリ秒。
fetchOffset.numRetries int 3 ストリーミングとバッチ Kafkaの最新のオフセットを取り出すのを諦めるまでの試行回数。
fetchOffset.retryIntervalMs long 10 ストリーミングとバッチ Kafkaのオフセットの取り出し試行までに待つミリ秒
maxOffsetsPerTrigger long none ストリーミングとバッチ レートはトリガーの間隔あたりで処理されるオフセットの最大数を制限します。オフセットの指定された総数は異なるボリュームのtopicPartitionに渡って均等に分割されます。

データのKafkaへの書き込み

ここで、Kafkaへのストリーミングクエリとバッチクエリの書き込みのサポートについて説明します。Apache Kafkaは少なくとも一回の書き込みセマンティクスのみをサポートすることに注意してください。その結果、Kafkaへの書き込む—ストリーミングクエリあるいはバッチクエリのどちらか—場合、いくつかのレコードは重複するかもしれません: 例えば、ブローカーがメッセージレコードを受信し書き込みをしたにも関わらず、Kafkaがブローカーによって知らされなかったメッセージを再試行する必要がある場合にこれがありえます。Kafkaの書き込みセマンティクスにより、構造化ストリーミングはそのような重複が起こることを避けることができません。しかし、もしクエリの書き込みが成功した場合、クエリ出力が少なくとも一回書き込まれたと仮定することができます。書き込まれたデータを読み込む時の有り得そうな解決策は、読み込み時に重複を無くすために使われるプライマリ(ユニーク)キーの導入でしょう。

Kafkaに書き込まれるデータフレームはスキーマ内に以下のカラムを持つべきです:

カラム種類
キー (任意) 文字列あるいはバイナリ
値 (必須) 文字列あるいはバイナリ
トピック (*任意) 文字列

* “topic” 設定オプションが指定されない場合はトピックのカラムが必要です。

値のカラムだけが必須のオプションです。キーのカラムが指定されない場合、null の値のキーカラムが自動的に追加されます (null の値のキーの値がどう扱われるかはKafkaのセマンティクスを見てください)。“topic”設定オプションが設定されていない場合、もしトピックカラムが存在する場合は、指定された行がKafkaに書き込む時にその値がトピックとして使われます。つまり、“topic” 設定オプションはトピックカラムを上書きします。

以下のオプションはバッチとストリーミングクエリの両方についてのKafkaのシンクのために設定されなければなりません。

オプション意味
kafka.bootstrap.servers host:portのカンマ区切りのリスト Kafka "bootstrap.servers" 設定

以下の設定は任意です:

オプションデフォルト:クエリの型意味
トピック 文字列 none ストリーミングとバッチ Kafka内で全ての行が書き込まれるトピックを設定します。このオプションはデータ内に存在する可能性がある全てのトピックカラムを上書きします。

ストリーミング クエリのためのKafkaのシンクを作成

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()

バッチ クエリの出力をKafkaに書き込む

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()

Kafka 固有の設定

Kafka固有の設定はkafka.のプレフィックスを持つDataStreamReader.optionを使って設定することができます。例えばstream.option("kafka.bootstrap.servers", "host:port")。可能なkafkaパラメータについては、データの読み込みに関するパラメータはKafka コンシューマ設定ドキュメントを、データの書き込みに関するパラメータはKafka プロデューサ設定ドキュメント を見てください。

以下のKafkaパラメータは設定することができず、Kafkaのソースあるいはシンクは例外を投げるだろうことに注意してください:

配備

Sparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit が使われます。spark-sql-kafka-0-10_2.11 とその依存物は--packagesを使って直接 spark-submit に追加されるかも知れません。以下のように。

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...

外部的な依存を持つアプリケーションをサブミットすることについての詳細は、アプリケーションのサブミット ガイド を見てください。

TOP
inserted by FC2 system