構造化 ストリーミング + Kafka 統合ガイド (Kafkaブローカーバージョン 0.10.0 以上)
Kafkaからデータを読み込みおよび書き込みするためのKafka 0.10のための構造化ストリーミング統合。
リンク
SBT/Maven プロジェクト定義を使用するScala/Javaアプリケーションのために、ストリーミングアプリケーションを以下のartifactとリンクします:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0
Pythonアプリケーションに関しては、アプリケーションをデプロイする時に、この上のライブラリとその依存物を追加する必要があるでしょう。以下の配備 サブセクションを見てください。
Kafkaからのデータの読み込み
ストリーミング クエリのためにKafkaソースを生成
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to multiple topics
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to a pattern
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
バッチ クエリのためにKafkaソースを生成
バッチ処理にもっと適した利用法がある場合、オフセットの定義された範囲のためにデータセット/データフレームを作成することができます。
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
DataFrame<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets
DataFrame<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, at the earliest and latest offsets
DataFrame<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
ソース内の各行は以下のスキーマを持ちます:
カラム | 種類 |
---|---|
キー | binary |
値 | binary |
トピック | 文字列 |
パーティション | int |
オフセット | long |
timestamp | long |
timestampType | int |
以下のオプションはバッチとストリーミングクエリの両方についてのKafkaソースのために設定されなければなりません。
オプション | 値 | 意味 |
---|---|---|
assign | json string {"topicA":[0,1],"topicB":[2,4]} | 消費のための特定のトピックパーティションKafkaソースについては、"assign", "subscribe" あるいは "subscribePattern" オプションのうちの1つだけが指定することができます。 |
購読 | トピックのカンマ区切りのリスト | 購読するトピックのリストKafkaソースについては、"assign", "subscribe" あるいは "subscribePattern" オプションのうちの1つだけが指定することができます。 |
subscribePattern | Java regex 文字列 | トピックを購読するために使われるパターン。Kafkaソースについては、"assign", "subscribe" あるいは "subscribePattern" オプションのうちの1つだけが指定することができます。 |
kafka.bootstrap.servers | host:portのカンマ区切りのリスト | Kafka "bootstrap.servers" 設定 |
以下の設定は任意です:
オプション | 値 | デフォルト: | クエリの型 | 意味 |
---|---|---|---|---|
startingOffsets | "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | ストリーミングについては "latest"、バッチについては "earliest" | ストリーミングとバッチ | クエリが開始される開始点。元も早いオフセットからの"earliest"、最新のオフセットからの"latest"、あるいは各トピックパーティションについての開始オフセットを指定するjson文字列。jsonでは、earliestを参照するためにオフセットとして-2が、latestには-1が使われます。注意: バッチのクエリについては、latest(明示的あるいはjson内で-1を使って)は使えません。ストリーミングのクエリについては、これは新しいクエリが開始された時にのみ適用され、再開は常にクエリが中止された箇所から取り上げられるでしょう。クエリの間で新しく発見されたパーティションはearliestで開始されるでしょう。 |
endingOffsets | latest あるいは json 文字列 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | バッチのクエリ | バッチのクエリが終了される終了点。最新が参照される"earliest"、あるいは各トピックパーティションについての終了オフセットを指定するjson文字列。jsonでは、latestを参照するためにオフセットとして-1を使うことができ、オフセットとして-2 (earliest) は使うことができません。 |
failOnDataLoss | true あるいは false | true | ストリーミングのクエリ | データが紛失したかも知れない時にクエリを失敗するかどうか(例えば、トピックが削除された、あるいはオフセットが範囲外)。これは間違ったアラームかも知れません。期待した通りに動作しない場合は無効にすることができます。もしバッチのクエリがデータの紛失により指定されたオフセットからのデータの読み込みに失敗する場合は、バッチのクエリは常に失敗するでしょう。 |
kafkaConsumer.pollTimeoutMs | long | 512 | ストリーミングとバッチ | executor内のKafkaからデータをポーリングするタイムアウトのミリ秒。 |
fetchOffset.numRetries | int | 3 | ストリーミングとバッチ | Kafkaの最新のオフセットを取り出すのを諦めるまでの試行回数。 |
fetchOffset.retryIntervalMs | long | 10 | ストリーミングとバッチ | Kafkaのオフセットの取り出し試行までに待つミリ秒 |
maxOffsetsPerTrigger | long | none | ストリーミングとバッチ | レートはトリガーの間隔あたりで処理されるオフセットの最大数を制限します。オフセットの指定された総数は異なるボリュームのtopicPartitionに渡って均等に分割されます。 |
データのKafkaへの書き込み
ここで、Kafkaへのストリーミングクエリとバッチクエリの書き込みのサポートについて説明します。Apache Kafkaは少なくとも一回の書き込みセマンティクスのみをサポートすることに注意してください。その結果、Kafkaへの書き込む—ストリーミングクエリあるいはバッチクエリのどちらか—場合、いくつかのレコードは重複するかもしれません: 例えば、ブローカーがメッセージレコードを受信し書き込みをしたにも関わらず、Kafkaがブローカーによって知らされなかったメッセージを再試行する必要がある場合にこれがありえます。Kafkaの書き込みセマンティクスにより、構造化ストリーミングはそのような重複が起こることを避けることができません。しかし、もしクエリの書き込みが成功した場合、クエリ出力が少なくとも一回書き込まれたと仮定することができます。書き込まれたデータを読み込む時の有り得そうな解決策は、読み込み時に重複を無くすために使われるプライマリ(ユニーク)キーの導入でしょう。
Kafkaに書き込まれるデータフレームはスキーマ内に以下のカラムを持つべきです:
カラム | 種類 |
---|---|
キー (任意) | 文字列あるいはバイナリ |
値 (必須) | 文字列あるいはバイナリ |
トピック (*任意) | 文字列 |
* “topic” 設定オプションが指定されない場合はトピックのカラムが必要です。
値のカラムだけが必須のオプションです。キーのカラムが指定されない場合、null
の値のキーカラムが自動的に追加されます (null
の値のキーの値がどう扱われるかはKafkaのセマンティクスを見てください)。“topic”設定オプションが設定されていない場合、もしトピックカラムが存在する場合は、指定された行がKafkaに書き込む時にその値がトピックとして使われます。つまり、“topic” 設定オプションはトピックカラムを上書きします。
以下のオプションはバッチとストリーミングクエリの両方についてのKafkaのシンクのために設定されなければなりません。
オプション | 値 | 意味 |
---|---|---|
kafka.bootstrap.servers | host:portのカンマ区切りのリスト | Kafka "bootstrap.servers" 設定 |
以下の設定は任意です:
オプション | 値 | デフォルト: | クエリの型 | 意味 |
---|---|---|---|---|
トピック | 文字列 | none | ストリーミングとバッチ | Kafka内で全ての行が書き込まれるトピックを設定します。このオプションはデータ内に存在する可能性がある全てのトピックカラムを上書きします。 |
ストリーミング クエリのためのKafkaのシンクを作成
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
バッチ クエリの出力をKafkaに書き込む
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
Kafka 固有の設定
Kafka固有の設定はkafka.
のプレフィックスを持つDataStreamReader.option
を使って設定することができます。例えばstream.option("kafka.bootstrap.servers", "host:port")
。可能なkafkaパラメータについては、データの読み込みに関するパラメータはKafka コンシューマ設定ドキュメントを、データの書き込みに関するパラメータはKafka プロデューサ設定ドキュメント を見てください。
以下のKafkaパラメータは設定することができず、Kafkaのソースあるいはシンクは例外を投げるだろうことに注意してください:
- group.id: Kafkaのソースは各クエリについてのユニークなグループidを自動的に生成するでしょう。
- auto.offset.reset: 代わりにどこから開始するかを指定するためにソースオプション
startingOffsets
を設定します。構造化ストリーミングは、Kafkaコンシューマの管理に頼らず、内部的にどのオフセットが消費されるかを管理しますこれにより新しいトピック/パーティションが動的に購読される時にデータが失われないことが保証されるでしょう。startingOffsets
は新しいストリーミングクエリが開始された時にのみ適用され、再開は常にクエリが中止された箇所から取り上げられるだろうことに注意してください。 - key.deserializer: キーは常に ByteArrayDeserializer を使ってバイトの配列にデシリアライズされます。明示的にキーをデシリアライズするためにはDataFrame操作を使ってください。
- value.deserializer: 値は常に ByteArrayDeserializer を使ってバイトの配列にデシリアライズされます。明示的に値をデシリアライズするためにはDataFrame操作を使ってください。
- key.serializer: キーは常に ByteArraySerializer あるいは StringSerializer を使ってシリアライズされます。明示的にキーを文字列あるいはバイト配列にシリアライズするには、DataFrame操作を使ってください。
- value.serializer: 値は常に ByteArraySerializer あるいは StringSerializer を使ってシリアライズされます。明示的に値を文字列あるいはバイト配列にシリアライズするには、DataFrame操作を使ってください。
- enable.auto.commit: Kafkaのソースはオフセットをコミットしません。
- interceptor.classes: Kafkaのソースは常にキーと値をバイト配列として読み込みます。クエリを破壊するかも知れないので ConsumerInterceptor の使用は安全ではありません。
配備
Sparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit
が使われます。spark-sql-kafka-0-10_2.11
とその依存物は--packages
を使って直接 spark-submit
に追加されるかも知れません。以下のように。
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...
外部的な依存を持つアプリケーションをサブミットすることについての詳細は、アプリケーションのサブミット ガイド を見てください。