Spark ストリーミング + Kafka 統合ガイド
Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. SparkストリーミングがKafkaからデータを受信するための方法を説明します。これをするには2つの方法があります - レシーバーとKafkaの高レベルAPIを使った古いやり方と、レシーバーを使わない新しい実験的なやり方(Spark 1.3で導入されました)。それらは異なるプログラミングモデル、パフォーマンス特性、そして意味の保証を持ちますので、詳細は読み続けてください。
方法 1: レシーバーベースの方法
この方法はデータを受け取るためにレシーバーを使います。レシーバーはKafka 高レベルconsumer APIを使って実装されます。全てのレシーバーと同じように、レシーバーを経由したKafkaから受け取ったデータはSpark executorに格納され、Sparkストリーミングによって起動されたジョブはデータを処理します。
しかし、デフォルトの設定では、このやりかたは障害時にデータを損失します(レシーバーの信頼性を見てください)。データの損失ゼロを確実にするには、更にSparkストリーミングの先行書き込みログを有効にする必要があります(Spark 1.2 で導入されました)。これは同期的に全てのKafkaから受信したデータを分散したファイルシステム(例えば、HDFS)上の先行書き込みログに書き込みます。それにより全てのデータは障害時に復旧されます。先行書き込みログについての詳細はストリーミングプログラミングガイドのDeploying section を見てください。
次に、このやり方をストリーミングアプリケーション内で使う方法を議論します。
-
Linking: SBT/Maven プロジェクト定義を使用するScala/Javaアプリケーションのために、ストリーミングアプリケーションを以下のartifactとリンクします(更に詳しい情報はメインのプログラミングガイドのLinking sectionを見てください)。
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.0.0
Pythonアプリケーションに関しては、アプリケーションをデプロイする時に、この上のライブラリとその依存物を追加する必要があるでしょう。以下のDeployingのサブセクションを見てください。
-
プログラミング ストリーミングアプリケーションコードの中で、
KafkaUtils
をインポートし、以下のように入力DStreamを生成します。import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
様々な
createStream
を使ってキーと値のクラス、およびそれらに対応するデコーダクラスを指定することもできます。API ドキュメント と例を見てください。import org.apache.spark.streaming.kafka.*; JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
様々な
createStream
を使ってキーと値のクラス、およびそれらに対応するデコーダクラスを指定することもできます。API ドキュメント と例を見てください。from pyspark.streaming.kafka import KafkaUtils kafkaStream = KafkaUtils.createStream(streamingContext, \ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
デフォルトでは、Python APIはKafkaデータをUTF8エンコード文字列としてデコードするでしょう。Kafkaレコード内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメント と例を見てください。
覚えておくべきこと:
-
Kafka内のトピックパーティションはSparkストリーミングで生成されたRDDのパーティションと関係ありません。つまり、
KafkaUtils.createStream()
内のトピック固有のパーティションの数の増加は、1つのレシーバー内で消費されるトピックを使うスレッドの数のみを増加します。データの処理時のSparkの平行性を増加しません。それについての詳細な情報はメインドキュメントを参照してください。 -
複数のKafka 入力DStreamは、複数のレシーバーを使ってデータの並行受信のための異なるグループとトピックを使って生成することができます。
-
HDFSのようなリプリケートされたファイルシステムを使って先行書き込みログを有効にした場合は、受信したデータは既にログ内にリプリケートされています。 Hence, the storage level in storage level for the input stream to
StorageLevel.MEMORY_AND_DISK_SER
(that is, useKafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
).
-
-
Deploying: Sparkアプリケーションと同様に、アプリケーションを起動するために
spark-submit
が使われます。しかし、Scala/JavaアプリケーションとPythonアプリケーションについては、詳細が少し異なります。Scala と Java アプリケーションについては、もしSBTあるいはMavenをプロジェクト管理に使っている場合、
spark-streaming-kafka-0-8_2.11
とその依存物をアプリケーションJARにパッケージします。spark-core_2.11
とspark-streaming_2.11
は既にSparkインストレーションの中に存在するため、それらがprovided
依存物として印がつけられるようにしてください。そして、アプリケーションを起動するためにspark-submit
を使います (メインプログラムのDeploying section を見てください)。SBT/Mavenプロジェクト管理が欠けているPythonアプリケーションについては、
spark-streaming-kafka-0-8_2.11
とその依存物は--packages
を使ってspark-submit
に直接追加することができます (アプリケーション submissionガイドを見てください)。つまり、./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 ...
もう一つの方法として、Maven repository からMaven artifact
spark-streaming-kafka-0-8-assembly
のJARをダウンロードし、それを--jars
を使ってspark-submit
に追加することもできます。
方法 2: 直接的な方法 (レシーバー無し)
このレシーバーが無い"direct"方法はストレージの端から端までの保証を確実にするためにSpark 1.3.で導入されました。レシーバをデータを受信するために使う代わりに、このやり方は定期的にKafkaに各トピック+パーティション内の最新のオフセットを質問し、それに応じて各バッチ内で処理するためにオフセットの領域を定義します。データを処理するジョブが起動される場合に、Kafkaの単純な消費APIがKafkaから規定の範囲のオフセットを読み込むために使われます(ファイルシステムからの読み込みに似ています)。これはScalaとJava APIのためにSpark1.3で、Python APIのためにSpark1.4で導入された実験的な機能であることに注意してください。
このやり方はレシーバーベースのやり方(つまり方法1)に比べて以下の利点があります。
-
単純化された並行性: 複数の入力Kafkaストリームを生成しそれを結合する必要はありません。
directStream
を使って、SparkストリーミングはKafkaパーティションが消費するだけのRDDパーティションを生成するでしょう。これらはKafkaから並行して全てのデータを読み込むでしょう。つまり、KafkaとRDDのパーティションの間に1対1のマッピングがあり、これは理解しやすく、調整しやすいです。 -
Efficiency: 1つ目の方法でのデータ損ゼロを達成するためには、データが先行書き込みログに格納されている必要があります。これはデータを更にリプリケートします。一度目はKafka、二度目は先行書き込みログによって、データが事実上2回リプリケートされるため、実際のところこれは効率的ではありません。2つ目の方法はレシーバーが無いため問題を削除し、従って先行書き込みログの必要がありません。Kafkaの遅延に差支えが無い限り、メッセージはKafkaから回復することができます。
-
確実に1回のsemantics: 最初のやり方は、Zookeeper内に消費されたoffsetを格納するためにKafkaの高レベルAPIを使用します。 これはKafkaからのデータを消費する伝統的な方法です。このやり方(先行書き込みログとの組み合わせ)は、データの損失を確実にゼロにすることができます(つまり、少なくとも1回のsemantics)が、幾つかの障害の際に幾つかのレコードを2回消費する可能性がわずかにあります。これはSparkストリーミングによる信頼できるデータの受信と、Zookeeperによって追跡されるoffsetの間の不一致のために起こります。従って、この二つ目のやり方では、Zookeeperを使わない単純なKafka APIを使います。offsetsはSparkストリーミングによってそのチェックポイント内で追跡されます。これはSparkストリーミングとZookeeper/Kafkaの間の不一致を取り除き、従ってSparkストリーミングによって受け取られる各レコードは障害にも関わらず実際上確実に1回です。結果の出力について確実に一回のsemanticsを達成するために、データを外部のデータストアに保存する出力操作は等冪、あるいは結果およびオフセットを保存するアトミックなトランザクションのどちらかでなければなりません。(更に詳しい情報はメインプログラミングガイドの出力操作のsemanticsを見てください)。
この方法の一つの不利な点は、Zookeeper内のオフセットを更新しないことです。従って、ZookeeperベースのKafka監視ツールは進捗を表示しないでしょう。しかし、このやり方によって各バッチ内で処理されたオフセットにアクセスすることができ、Zookeeper自身を更新することができます(以下を見てください)。
次に、このやり方をストリーミングアプリケーション内で使う方法を議論します。
-
Linking: このやり方はScala/Javaアプリケーションでのみサポートされます。SBT/Maveプロジェクトを以下のartfifactにリンクします(更に詳しい情報はメインプログラミングガイドのLinking sectionを見てください)。
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.0.0
-
プログラミング ストリーミングアプリケーションコードの中で、
KafkaUtils
をインポートし、以下のように入力DStreamを生成します。import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
現在のメッセージについてのメタデータを格納する
MessageAndMetadata
へアクセスするためにmessageHandler
をcreateDirectStream
に渡し、それを求まるタイプに変換することができます。API ドキュメント と例を見てください。import org.apache.spark.streaming.kafka.*; JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);
現在のメッセージについてのメタデータを格納する
MessageAndMetadata
へアクセスするためにmessageHandler
をcreateDirectStream
に渡し、それを求まるタイプに変換することができます。API ドキュメント と例を見てください。from pyspark.streaming.kafka import KafkaUtils directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
現在のメッセージについてのメタデータを格納する
KafkaMessageAndMetadata
へアクセスするためにmessageHandler
をcreateDirectStream
に渡し、それを求まるタイプに変換することができます。デフォルトでは、Python APIはKafkaデータをUTF8エンコード文字列としてデコードするでしょう。Kafkaレコード内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメント と例を見てください。Kakfaパラメータ内で、
metadata.broker.list
あるいはbootstrap.servers
のどちらかを指定する必要があります。デフォルトでは、各Kafkaパーティションの最新のオフセットから消費が開始されます。Kafkaパラメータの中でauto.offset.reset
をsmallest
に設定すると、一番小さいオフセットから消費を開始するでしょう。KafkaUtils.createDirectStream
の他の葉生物を使うことで、任意のオフセットから消費を開始することもできます。更に、各バッチ内で消費されたKafkaオフセットにアクセスしたい場合は、以下を行うことができます。// Hold a reference to the current offset ranges, so it can be used downstream var offsetRanges = Array[OffsetRange]() directKafkaStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.map { ... }.foreachRDD { rdd => for (o <- offsetRanges) { println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } ... }
// Hold a reference to the current offset ranges, so it can be used downstream final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); directKafkaStream.transformToPair( new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); return rdd; } } ).map( ... ).foreachRDD( new Function<JavaPairRDD<String, String>, Void>() { @Override public Void call(JavaPairRDD<String, String> rdd) throws IOException { for (OffsetRange o : offsetRanges.get()) { System.out.println( o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() ); } ... return null; } } );
offsetRanges = [] def storeOffsetRanges(rdd): global offsetRanges offsetRanges = rdd.offsetRanges() return rdd def printOffsetRanges(rdd): for o in offsetRanges: print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset) directKafkaStream\ .transform(storeOffsetRanges)\ .foreachRDD(printOffsetRanges)
ZookeeperベースのKafka監視ツールにストリーミングアプリケーションの進捗を表示したい場合は、自分でZookeeperを更新するためにこれを使うことができます。
HasOffsetRangesへのタイプキャストはそれがdirectKafkaStreamで呼ばれる最初のメソッドで行われる場合のみ成功し、メソッドの下流では成功しないだろうことに注意してください。オフセットにアクセスするために呼び出す最初のメソッドとして、foreachRDD()の代わりにtransform()を使うことができます。 そして、さらにSparkのメソッドを呼びます。しかし、RDDパーティションとKafkaパーティション間の1対1のマッピングは、シャッフルあるいは再パーティションのどのようなメソッドの後でも維持されないことに注意してください。例えば、reduceByKey() or window()。
他に注意しなければならないこととしては、このやり方はレシーバーを使わないため、レシーバーに関する基準(つまり
spark.streaming.receiver.*
のconfigurations)は(他の入力DStreamには適用されるでしょうが)このやり方によって生成された入力DStreamには適用されません。代わりにspark.streaming.kafka.*
のconfigurations を使ってください。重要なことは、各Kafkaパーティションでの最大レート(秒間あたりのメッセージ数)spark.streaming.kafka.maxRatePerPartition
がこの直接のAPIによって読み込まれるだろうということです。 -
配備: これは最初のやり方と同じです。