Spark ストリーミング + Kafka 統合ガイド

Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. SparkストリーミングがKafkaからデータを受信するための方法を説明します。これをするには2つの方法があります - レシーバーとKafkaの高レベルAPIを使った古いやり方と、レシーバーを使わない新しい実験的なやり方(Spark 1.3で導入されました)。それらは異なるプログラミングモデル、パフォーマンス特性、そして意味の保証を持ちますので、詳細は読み続けてください。

方法 1: レシーバーベースの方法

この方法はデータを受け取るためにレシーバーを使います。レシーバーはKafka 高レベルconsumer APIを使って実装されます。全てのレシーバーと同じように、レシーバーを経由したKafkaから受け取ったデータはSpark executorに格納され、Sparkストリーミングによって起動されたジョブはデータを処理します。

しかし、デフォルトの設定では、このやりかたは障害時にデータを損失します(レシーバーの信頼性を見てください)。データの損失ゼロを確実にするには、更にSparkストリーミングの先行書き込みログを有効にする必要があります(Spark 1.2 で導入されました)。これは同期的に全てのKafkaから受信したデータを分散したファイルシステム(例えば、HDFS)上の先行書き込みログに書き込みます。それにより全てのデータは障害時に復旧されます。先行書き込みログについての詳細はストリーミングプログラミングガイドのDeploying section を見てください。

次に、このやり方をストリーミングアプリケーション内で使う方法を議論します。

  1. Linking: SBT/Maven プロジェクト定義を使用するScala/Javaアプリケーションのために、ストリーミングアプリケーションを以下のartifactとリンクします(更に詳しい情報はメインのプログラミングガイドのLinking sectionを見てください)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kafka_2.10
     version = 1.6.0
    

    Pythonアプリケーションに関しては、アプリケーションをデプロイする時に、この上のライブラリとその依存物を追加する必要があるでしょう。以下のDeployingのサブセクションを見てください。

  2. プログラミング ストリーミングアプリケーションコードの中で、KafkaUtilsをインポートし、以下のように入力DStreamを生成します。

     import org.apache.spark.streaming.kafka._
    
     val kafkaStream = KafkaUtils.createStream(streamingContext, 
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    

    様々なcreateStreamを使ってキーと値のクラス、およびそれらに対応するデコーダクラスを指定することもできます。API ドキュメントを見てください。

     import org.apache.spark.streaming.kafka.*;
    
     JavaPairReceiverInputDStream<String, String> kafkaStream = 
         KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
    

    様々なcreateStreamを使ってキーと値のクラス、およびそれらに対応するデコーダクラスを指定することもできます。API ドキュメントを見てください。

     from pyspark.streaming.kafka import KafkaUtils
    
     kafkaStream = KafkaUtils.createStream(streamingContext, \
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    

    デフォルトでは、Python APIはKafkaデータをUTF8エンコード文字列としてデコードするでしょう。Kafkaレコード内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメントを見てください。

    覚えておくべきこと:

    • Kafka内のトピックパーティションはSparkストリーミングで生成されたRDDのパーティションと関係ありません。つまり、KafkaUtils.createStream()内のトピック固有のパーティションの数の増加は、1つのレシーバー内で消費されるトピックを使うスレッドの数のみを増加します。データの処理時のSparkの平行性を増加しません。それについての詳細な情報はメインドキュメントを参照してください。

    • 複数のKafka 入力DStreamは、複数のレシーバーを使ってデータの並行受信のための異なるグループとトピックを使って生成することができます。

    • HDFSのようなリプリケートされたファイルシステムを使って先行書き込みログを有効にした場合は、受信したデータは既にログ内にリプリケートされています。 Hence, the storage level in storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER (that is, use KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

  3. Deploying: どのようなSparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit が使われます。しかし、Scala/JavaアプリケーションとPythonアプリケーションについては、詳細が少し異なります。

    Scala と Java アプリケーションについては、もしSBTあるいはMavenをプロジェクト管理に使っている場合、spark-streaming-kafka_2.10 とその依存物をアプリケーションJARにパッケージします。spark-core_2.10spark-streaming_2.10 は既にSparkインストレーションの中に存在するため、それらが provided 依存物として印がつけられるようにしてください。そして、アプリケーションを起動するためにspark-submit を使います (メインプログラムのDeploying section を見てください)。

    SBT/Mavenプロジェクト管理が欠けているPythonアプリケーションについては、spark-streaming-kafka_2.10とその依存物は--packagesを使ってspark-submitに直接追加することができます (アプリケーション submissionガイドを見てください)。つまり、

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ...
    

    もう一つの方法として、Maven repository からMaven artifactspark-streaming-kafka-assemblyのJARをダウンロードし、それを--jarsを使ってspark-submitに追加することもできます。

方法 2: 直接的な方法 (レシーバー無し)

このレシーバーが無い"direct"方法はストレージの端から端までの保証を確実にするためにSpark 1.3.で導入されました。レシーバをデータを受信するために使う代わりに、このやり方は定期的にKafkaに各トピック+パーティション内の最新のオフセットを質問し、それに応じて各バッチ内で処理するためにオフセットの領域を定義します。データを処理するジョブが起動される場合に、Kafkaの単純な消費APIがKafkaから規定の範囲のオフセットを読み込むために使われます(ファイルシステムからの読み込みに似ています)。これはScalaとJava APIのためにSpark1.3で、Python APIのためにSpark1.4で導入された実験的な機能であることに注意してください。

このやり方はレシーバーベースのやり方(つまり方法1)に比べて以下の利点があります。

この方法の一つの不利な点は、Zookeeper内のオフセットを更新しないことです。従って、ZookeeperベースのKafka監視ツールは進捗を表示しないでしょう。しかし、このやり方によって各バッチ内で処理されたオフセットにアクセスすることができ、Zookeeper自身を更新することができます(以下を見てください)。

次に、このやり方をストリーミングアプリケーション内で使う方法を議論します。

  1. Linking: このやり方はScala/Javaアプリケーションでのみサポートされます。SBT/Maveプロジェクトを以下のartfifactにリンクします(更に詳しい情報はメインプログラミングガイドのLinking sectionを見てください)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kafka_2.10
     version = 1.6.0
    
  2. プログラミング ストリーミングアプリケーションコードの中で、KafkaUtilsをインポートし、以下のように入力DStreamを生成します。

     import org.apache.spark.streaming.kafka._
    
     val directKafkaStream = KafkaUtils.createDirectStream[
         [key class], [value class], [key decoder class], [value decoder class] ](
         streamingContext, [map of Kafka parameters], [set of topics to consume])
    

    API ドキュメントを見てください。

     import org.apache.spark.streaming.kafka.*;
    
     JavaPairReceiverInputDStream<String, String> directKafkaStream = 
         KafkaUtils.createDirectStream(streamingContext,
             [key class], [value class], [key decoder class], [value decoder class],
             [map of Kafka parameters], [set of topics to consume]);
    

    API ドキュメントを見てください。

     from pyspark.streaming.kafka import KafkaUtils
     directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    

    デフォルトでは、Python APIはKafkaデータをUTF8エンコード文字列としてデコードするでしょう。Kafkaレコード内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメントを見てください。

    Kakfaパラメータ内で、metadata.broker.list あるいは bootstrap.serversのどちらかを指定する必要があります。デフォルトでは、各Kafkaパーティションの最新のオフセットから消費が開始されます。Kafkaパラメータの中でauto.offset.resetsmallestに設定すると、一番小さいオフセットから消費を開始するでしょう。

    KafkaUtils.createDirectStreamの他の葉生物を使うことで、任意のオフセットから消費を開始することもできます。更に、各バッチ内で消費されたKafkaオフセットにアクセスしたい場合は、以下を行うことができます。

     // Hold a reference to the current offset ranges, so it can be used downstream
     var offsetRanges = Array[OffsetRange]()
    	
     directKafkaStream.transform { rdd =>
       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       rdd
     }.map {
               ...
     }.foreachRDD { rdd =>
       for (o <- offsetRanges) {
         println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
       }
       ...
     }
    
     // Hold a reference to the current offset ranges, so it can be used downstream
     final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
    	
     directKafkaStream.transformToPair(
       new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
         @Override
         public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
           OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
           offsetRanges.set(offsets);
           return rdd;
         }
       }
     ).map(
       ...
     ).foreachRDD(
       new Function<JavaPairRDD<String, String>, Void>() {
         @Override
         public Void call(JavaPairRDD<String, String> rdd) throws IOException {
           for (OffsetRange o : offsetRanges.get()) {
             System.out.println(
               o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
             );
           }
           ...
           return null;
         }
       }
     );
    
     offsetRanges = []
    
     def storeOffsetRanges(rdd):
         global offsetRanges
         offsetRanges = rdd.offsetRanges()
         return rdd
    
     def printOffsetRanges(rdd):
         for o in offsetRanges:
             print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
    
     directKafkaStream\
         .transform(storeOffsetRanges)\
         .foreachRDD(printOffsetRanges)
    

    ZookeeperベースのKafka監視ツールにストリーミングアプリケーションの進捗を表示したい場合は、自分でZookeeperを更新するためにこれを使うことができます。

    Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. オフセットにアクセスするために呼び出す最初のメソッドとして、foreachRDD()の代わりにtransform()を使うことができます。 そして、さらにSparkのメソッドを呼びます。しかし、RDDパーティションとKafkaパーティション間の1対1のマッピングは、シャッフルあるいは再パーティションのどのようなメソッドの後でも維持されないことに注意してください。例えば、reduceByKey() or window()。

    他に注意しなければならないこととしては、このやり方はレシーバーを使わないため、レシーバーに関する基準(つまりspark.streaming.receiver.*configurations)は(他の入力DStreamには適用されるでしょうが)このやり方によって生成された入力DStreamには適用されません。代わりにspark.streaming.kafka.*configurations を使ってください。An important one is spark.streaming.kafka.maxRatePerPartition which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.

  3. Deploying: Scala、Java および Pythonに関して、これは最初のやり方と同じです。

TOP
inserted by FC2 system