Spark ストリーミング + Kinesis 統合
Amazon Kinesis は大規模のストリーミングデータのリアルタイム処理のための完全に管理されたサービスです。Kinesis レシーバーは、Amazon Software License (ASL)のもとにAmazonによって提供されるKinesis Client Library (KCL)を使って入力DStreamを生成します。KCLはAWS Java SDKにライセンスされたApache2.0上に構築され、ロードバランシング、ワーカーの概念を通じてのチェックポイント、チェックポイント、およびShard Leasesを提供します。SparkストリーミングがKinesisからデータを受信するための方法を説明します。
Kinesisの設定
Kinesis ストリームは以下のguideごとに1つ以上のshardを使って1つの有効なKinesisエンドポイント上にセットアップすることができます。
Sparkストリーミングアプリケーションの設定
-
Linking: SBT/Maven プロジェクト定義を使ったScala/Javaアプリケーションについては、ストリーミングアプリケーションを以下のアーチファクトにリンクします (更に詳しい情報はメインのプログラミングガイドのLinking sectionを見てください)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.11 version = 2.1.1
Pythonアプリケーションに関しては、アプリケーションをデプロイする時に、この上のライブラリとその依存物を追加する必要があるでしょう。以下のDeployingのサブセクションを見てください。このライブラリにリンクすることで、アプリケーション内にASLにライセンスされたコードを含むことになることに注意してください。
-
プログラミング ストリーミングアプリケーションコードの中で、
KinesisUtils
をインポートし、以下のようにバイト配列の入力DStreamを生成します:import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis._ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream val kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
API ドキュメント と例を見てください。例を実行する方法についての説明は、例の実行のサブセクションを参照してください。
import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.kinesis.*; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
API ドキュメント と例を見てください。例を実行する方法についての説明は、例の実行のサブセクションを参照してください。
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
API ドキュメント と例を見てください。例を実行する方法についての説明は、例の実行のサブセクションを参照してください。
パーティションキーのような,c2>Record内に含まれる他のデータを使いたい場合には、Kinesis
Record
を取り一般的なオブジェクトT
を返す"メッセージハンドラー関数"も提供するかも知れません。これは現在のところScalaとJavaでのみサポートされています。import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis._ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream val kinesisStream = KinesisUtils.createStream[T]( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, [message handler])
import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.kinesis.*; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, [message handler], [class T]);
-
streamingContext
: StreamingContext はこのKinesisアプリケーションをKinesisストリームに結びつけるために、Kinesisで使用されるアプリケーション名を含みます。 [Kinesis app name]
: DynamoDBテーブル内でKinesisシーケンス番号をチェックポイントするために使われるだろうアプリケーション名。- アプリケーション名は指定されたアカウントおよび地域でユニークでなければなりません。
- テーブルが存在するが正しくないチェックポイント情報を持つ(異なるストリームあるいは古くて期限が切れたシーケンス番号)場合、一時的なエラーになるかも知れません。
-
[Kinesis stream name]
: このストリーミングアプリケーションがデータを取り出すKinesisストリーム。 -
[endpoint URL]
: 有効なKinesisエンドポイントURLはここで見つかるでしょう。 -
[region name]
: 有効なKinesis地域名はここで見つかるでしょう。 -
[checkpoint interval]
: Kinesis Client Library がストリーム内の場所を保存する間隔 (例えば、Duration(2000) = 2 秒)。初めての人は、それをストリーミングアプリケーションのバッチの間隔と同じに背手地します。 -
[initial position]
:InitialPositionInStream.TRIM_HORIZON
あるいはInitialPositionInStream.LATEST
のどちらかです (詳細は、Kinesis チェックポイント
の章と、Amazon Kinesis API ドキュメント
を見てください)。 [message handler]
: KinesisのRecord
を取り、一般的なT
を出力する関数。
APIの他のバージョンでは、AWSアクセスキーと秘密キーを直接指定することもできます。
-
-
Deploying: Sparkアプリケーションと同様に、アプリケーションを起動するために
spark-submit
が使われます。しかし、Scala/JavaアプリケーションとPythonアプリケーションについては、詳細が少し異なります。Scala と Java アプリケーションについては、もしSBTあるいはMavenをプロジェクト管理に使っている場合、
spark-streaming-kinesis-asl_2.11
とその依存物をアプリケーションJARにパッケージします。spark-core_2.11
とspark-streaming_2.11
は既にSparkインストレーションの中に存在するため、それらがprovided
依存物として印がつけられるようにしてください。そして、アプリケーションを起動するためにspark-submit
を使います (メインプログラムのDeploying section を見てください)。SBT/Mavenプロジェクト管理が欠けているPythonアプリケーションについては、
spark-streaming-kinesis-asl_2.11
とその依存物は--packages
を使ってspark-submit
に直接追加することができます (アプリケーション submissionガイドを見てください)。つまり、./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.1 ...
もう一つの方法として、Maven repository からMaven artifact
spark-streaming-kinesis-asl-assembly
のJARをダウンロードし、それを--jars
を使ってspark-submit
に追加することもできます。実行時に覚えておくべきこと:
-
Kinesis データ処理はパーティションごとに整列され、メッセージごとに少なくとも1回起こります。
-
複数のアプリケーションが同じKinesisストリームから読み込むことができます。Kinesisはアプリケーション固有の破片とチェックポイントの情報をDynamoDBに保持するでしょう。
-
1つのKinesis ストリームの破片は、一度に一つの入力DStreamによって処理されます。
-
1つのKinesis入力DStreamは、複数のKinesisRecordProcessorスレッドを作成することで複数のKinesisストリームの破片から読み込むことができます。
-
別々のプロセス/インスタンス内で実行中の複数の入力DStreamは、1つのKinesisストリームから読み込むことができます。
-
各入力DStreamが1つのshardを処理する少なくとも1つの KinesisRecordProcessor スレッドを生成するため、Kinesisストリームshardより多くのKinesis入力DStreamを必要としません。
-
水平方向のスケーリングはKinesis入力DStream(1つのプロセス内、あるいは複数のプロセス/インスタンスを横断して)の追加/削除によって行うことができます - 以前のポイントあたりの総Kinesisストリーム破片まで。
-
Kinesis入力Dstreamは全てのDStream間で負荷をバランスするでしょう - たとえプロセス/インスタンスを横断しても。
-
Kinesis入力DStreamは、ロードの変化によって再破片イベント(マージと分割)の間に負荷をバランスするでしょう
-
ベストプラクティスとして、可能であればオーバープロビジョニングによる再破片のジッターを避けることをお勧めします。
-
各Kinesis入力DStreamは独自のチェックポイント情報を保持します。詳細はKinesisチェックポイントの章を見てください。
-
Kinesisストリーム破片と、入力DStream処理中のSparkクラスタを横断して生成されたRDDパーティション/破片の間に関連はありません。2つの独立したパーティションスキーマがあります。
-
例の実行
例を実行するには、
-
ダウンロードサイトからSparkのバイナリをダウンロードします。
-
AWS内でKinesisストリーム(前の章を見てください)をセットアップします。Kinesisストリームの名前と、ストリームが生成された地域へ対応するエンドポイントURLに注意してください。
-
環境変数
AWS_ACCESS_KEY_ID
とAWS_SECRET_KEY
をAWS証明を使ってセットアップします。 -
Sparkのルートディレクトリで、以下のように例を実行します
bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.1 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.1 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
bin/spark-submit --jars external/kinesis-asl/target/scala-*/\ spark-streaming-kinesis-asl-assembly_*.jar \ external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
これはデータがKinesisストリームから受け取られるのを待つでしょう。
-
Kinesisストリームにプットするためにランダムな文字列データを生成するには、他の端末で関連したKinesisデータプロデューサを実行します。
bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
これは秒間あたり1000行の、行あたり10のランダムな数値をKinesisストリームにプッシュします。このデータは実行中の例によって受信され処理されなければなりません。
Record De-aggregation
データが Kinesis Producer Library (KPL)を使って生成される場合、メッセージはコストを減らすために集約されるかも知れません。Spark ストリーミングは消費中に自動的にレコードを反集約するでしょう。
Kinesis チェックポイント
-
各Kinesis入力DStreamは定期的に現在のストリームの場所を、背後にあるDynamoDBテーブルに保存します。これによりシステムは障害から回復することができ、DStreamが終了するまで処理を続けることができます。
-
あまりに頻繁にチェックポイントをすると、AWSチェックポイントストレージ層に過剰な負荷を起こし、AWSを抑圧することになるかも知れません。提供された例はこの抑圧をrandom-backoff-retryストラテジを使って処理します。
-
もしKinesisチェックポイント情報が入力DStreamが開始した時になければ、利用可能な最も古いレコード(
InitialPositionInStream.TRIM_HORIZON
)、あるいは最新のtip(InitialPositionInStream.LATEST
)のどちらかから開始するでしょう。これは設定可能です。InitialPositionInStream.LATEST
は、入力DStreamが実行していない(そしてチェックポイント情報が格納されていない)間にもしデータが追加された場合はレコードの損失に繋がるかも知れません。- 効果がチェックポイントの頻度と処理の冪等性に依存する場合、
InitialPositionInStream.TRIM_HORIZON
はレコードの処理の重複に繋がるかも知れません。