Spark ストリーミング + Kinesis 統合

Amazon Kinesis は大規模のストリーミングデータのリアルタイム処理のための完全に管理されたサービスです。Kinesis レシーバーは、Amazon Software License (ASL)のもとにAmazonによって提供されるKinesis Client Library (KCL)を使って入力DStreamを生成します。KCLはAWS Java SDKにライセンスされたApache2.0上に構築され、ロードバランシング、ワーカーの概念を通じてのチェックポイント、チェックポイント、およびShard Leasesを提供します。SparkストリーミングがKinesisからデータを受信するための方法を説明します。

Kinesisの設定

Kinesis ストリームは以下のguideごとに1つ以上のshardを使って1つの有効なKinesisエンドポイント上にセットアップすることができます。

Sparkストリーミングアプリケーションの設定

  1. Linking: SBT/Maven プロジェクト定義を使ったScala/Javaアプリケーションについては、ストリーミングアプリケーションを以下のアーチファクトにリンクします (更に詳しい情報はメインのプログラミングガイドのLinking sectionを見てください)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.11
     version = 2.1.1
    

    Pythonアプリケーションに関しては、アプリケーションをデプロイする時に、この上のライブラリとその依存物を追加する必要があるでしょう。以下のDeployingのサブセクションを見てください。このライブラリにリンクすることで、アプリケーション内にASLにライセンスされたコードを含むことになることに注意してください。

  2. プログラミング ストリーミングアプリケーションコードの中で、KinesisUtilsをインポートし、以下のようにバイト配列の入力DStreamを生成します:

     import org.apache.spark.streaming.Duration
     import org.apache.spark.streaming.kinesis._
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    
     val kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
    

    API ドキュメントを見てください。例を実行する方法についての説明は、例の実行のサブセクションを参照してください。

     import org.apache.spark.streaming.Duration;
     import org.apache.spark.streaming.kinesis.*;
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    
     JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
    

    API ドキュメントを見てください。例を実行する方法についての説明は、例の実行のサブセクションを参照してください。

     from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
     kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
    

    API ドキュメントを見てください。例を実行する方法についての説明は、例の実行のサブセクションを参照してください。

    パーティションキーのような,c2>Record内に含まれる他のデータを使いたい場合には、Kinesis Record を取り一般的なオブジェクトTを返す"メッセージハンドラー関数"も提供するかも知れません。これは現在のところScalaとJavaでのみサポートされています。

     import org.apache.spark.streaming.Duration
     import org.apache.spark.streaming.kinesis._
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    
     val kinesisStream = KinesisUtils.createStream[T](
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
         [message handler])
    
     import org.apache.spark.streaming.Duration;
     import org.apache.spark.streaming.kinesis.*;
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    
     JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
         [message handler], [class T]);
    
    • streamingContext: StreamingContext はこのKinesisアプリケーションをKinesisストリームに結びつけるために、Kinesisで使用されるアプリケーション名を含みます。

    • [Kinesis app name]: DynamoDBテーブル内でKinesisシーケンス番号をチェックポイントするために使われるだろうアプリケーション名。
      • アプリケーション名は指定されたアカウントおよび地域でユニークでなければなりません。
      • テーブルが存在するが正しくないチェックポイント情報を持つ(異なるストリームあるいは古くて期限が切れたシーケンス番号)場合、一時的なエラーになるかも知れません。
    • [Kinesis stream name]: このストリーミングアプリケーションがデータを取り出すKinesisストリーム。

    • [endpoint URL]: 有効なKinesisエンドポイントURLはここで見つかるでしょう。

    • [region name]: 有効なKinesis地域名はここで見つかるでしょう。

    • [checkpoint interval]: Kinesis Client Library がストリーム内の場所を保存する間隔 (例えば、Duration(2000) = 2 秒)。初めての人は、それをストリーミングアプリケーションのバッチの間隔と同じに背手地します。

    • [initial position]: InitialPositionInStream.TRIM_HORIZON あるいは InitialPositionInStream.LATESTのどちらかです (詳細は、Kinesis チェックポイントの章と、Amazon Kinesis API ドキュメントを見てください)。

    • [message handler]: KinesisのRecordを取り、一般的な Tを出力する関数。

    APIの他のバージョンでは、AWSアクセスキーと秘密キーを直接指定することもできます。

  3. Deploying: Sparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit が使われます。しかし、Scala/JavaアプリケーションとPythonアプリケーションについては、詳細が少し異なります。

    Scala と Java アプリケーションについては、もしSBTあるいはMavenをプロジェクト管理に使っている場合、spark-streaming-kinesis-asl_2.11 とその依存物をアプリケーションJARにパッケージします。spark-core_2.11spark-streaming_2.11 は既にSparkインストレーションの中に存在するため、それらが provided 依存物として印がつけられるようにしてください。そして、アプリケーションを起動するためにspark-submit を使います (メインプログラムのDeploying section を見てください)。

    SBT/Mavenプロジェクト管理が欠けているPythonアプリケーションについては、spark-streaming-kinesis-asl_2.11とその依存物は--packagesを使ってspark-submitに直接追加することができます (アプリケーション submissionガイドを見てください)。つまり、

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.1 ...
    

    もう一つの方法として、Maven repository からMaven artifactspark-streaming-kinesis-asl-assemblyのJARをダウンロードし、それを--jarsを使ってspark-submitに追加することもできます。

    Spark ストリーミング Kinesis 構造

    実行時に覚えておくべきこと:

    • Kinesis データ処理はパーティションごとに整列され、メッセージごとに少なくとも1回起こります。

    • 複数のアプリケーションが同じKinesisストリームから読み込むことができます。Kinesisはアプリケーション固有の破片とチェックポイントの情報をDynamoDBに保持するでしょう。

    • 1つのKinesis ストリームの破片は、一度に一つの入力DStreamによって処理されます。

    • 1つのKinesis入力DStreamは、複数のKinesisRecordProcessorスレッドを作成することで複数のKinesisストリームの破片から読み込むことができます。

    • 別々のプロセス/インスタンス内で実行中の複数の入力DStreamは、1つのKinesisストリームから読み込むことができます。

    • 各入力DStreamが1つのshardを処理する少なくとも1つの KinesisRecordProcessor スレッドを生成するため、Kinesisストリームshardより多くのKinesis入力DStreamを必要としません。

    • 水平方向のスケーリングはKinesis入力DStream(1つのプロセス内、あるいは複数のプロセス/インスタンスを横断して)の追加/削除によって行うことができます - 以前のポイントあたりの総Kinesisストリーム破片まで。

    • Kinesis入力Dstreamは全てのDStream間で負荷をバランスするでしょう - たとえプロセス/インスタンスを横断しても。

    • Kinesis入力DStreamは、ロードの変化によって再破片イベント(マージと分割)の間に負荷をバランスするでしょう

    • ベストプラクティスとして、可能であればオーバープロビジョニングによる再破片のジッターを避けることをお勧めします。

    • 各Kinesis入力DStreamは独自のチェックポイント情報を保持します。詳細はKinesisチェックポイントの章を見てください。

    • Kinesisストリーム破片と、入力DStream処理中のSparkクラスタを横断して生成されたRDDパーティション/破片の間に関連はありません。2つの独立したパーティションスキーマがあります。

例の実行

例を実行するには、

Record De-aggregation

データが Kinesis Producer Library (KPL)を使って生成される場合、メッセージはコストを減らすために集約されるかも知れません。Spark ストリーミングは消費中に自動的にレコードを反集約するでしょう。

Kinesis チェックポイント

TOP
inserted by FC2 system