Amazon AWS Kinesis ストリーム コネクタ

Kinesis コネクタは Amazon AWS Kinesis ストリームへのアクセスを提供します。

コネクタを使うには、以下のMaven依存物をプロジェクトに追加してください:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

flink-connector-kinesis_2.10Amazon Software License (ASL) 下でライセンスされたコードに依存します。flink-connector-kinesis をリンクすると、あなたのアプリケーションにASLライセンスされたコードを含むでしょう。

このライセンス問題のため、flink-connector-kinesis_2.10 アーティファクトはFlinkリリースの一部としてMavenセントラルに配備されていません。従って、ソースからあなた自身でコネクタをビルドする必要があります。

Flinkのソースコードをダウンロード、あるいはgitリポジトリからチェックアウトします。そして、モジュールをビルドするために以下のMavenコマンドを使います:

mvn clean install -Pinclude-kinesis -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

ストリーミング コネクタはバイナリ配布物の一部ではありません。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます

Amazon Kinesis ストリーム サービスの使用

KinessストリームをセットアップするにはAmazon Kinesis ストリーム開発者ガイド の説明に従います。Kinesisストリームをread/writeするために適切なIAMポリシーとユーザを作成するようにしてください。

Kinesis コンシューマ

FlinkKinesisConsumer は同じAWSサービスリージョン内で複数のAWS Kinesisストリームを購読する確実に1回の並行ストリーミングデータソースで、ストリーミングの再シャーディングを処理することができます。コンシューマの各サブタスクは複数のKinesisシャードからレコードを取り出す責任があります。各サブタスクによって取り出されたシャードの数は、シャードがKinesisによって閉じられ生成されることで変わるでしょう。

Kinesisストリームからのデータを消費する前に、全てのストリームがAWSダッシュボード内でステータス “ACTIVE” で生成されるようにしてください。

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
val consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

val env = StreamExecutionEnvironment.getEnvironment

val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))

上はコンシューマを使う簡単な例です。Configuration for the consumer is supplied with a java.util.Properties instance, the configuration keys for which can be found in ConsumerConfigConstants. 例はAWS リージョン “us-east-1” 内の1つのKinesisストリームの消費を説明します。AWSの証明書はAWS access key ID 内の基本的な方法を使って提供され、秘密アクセスキーは設定内で直接提供されます (他のオプションは ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDERENV_VAR, SYS_PROP, PROFILE および AUTOにしています)。また、データはKinesisストリーム内の最新の位置から消費されます (他のオプションはConsumerConfigConstants.STREAM_INITIAL_POSITIONTRIM_HORIZONに設定し、これはコンシューマに可能な限り最も早いレコードからKinesisストリームの読み込みを開始させるでしょう)。

コンシューマのための他の任意の設定キーはConsumerConfigConstantsで見つけることができます。

注意: もし何もしないコンシューマのサブタスクがある場合、現在のところ再シャーディングは透過的(つまりジョブの失敗や再起動無し)に処理することができません。これはシャードの総数が設定されたコンシューマの並行度より少ない場合に起こります。ジョブが回復された後で、再シャーディングによる新しいシャードが正しく取り上げられ、Kinesisコンシューマによって消費されるように、ジョブはチェックポイントが可能なように設定されなければなりません。これは一時的な制限で、将来のバージョンでは解決されているでしょう。詳細はFLINK-4341 を見てください。

開始位置の設定

The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting ConsumerConfigConstants.STREAM_INITIAL_POSITION to one of the following values in the provided configuration properties (the naming of the options identically follows the namings used by the AWS Kinesis Streams service):

  • LATEST: 最新のレコードから始まる全てのストリームの全てのシャードを読み込む。
  • TRIM_HORIZON: 可能な限り最も早いレコードから始まる全てのストリームの全てのシャードを読み込む (維持設定に依存してデータはKinesisによってトリムされているかもしれません)。
  • AT_TIMESTAMP: 指定されたタイムスタンプから始まる全てのストリームの全てのシャードを読み込む。タイムスタンプもConsumerConfigConstants.STREAM_INITIAL_TIMESTAMPの値を提供することで設定プロパティ内で指定されなければなりません。日付パターン yyyy-MM-dd'T'HH:mm:ss.SSSXXX (例えば、2016-04-04T19:58:46.480-00:00)、あるいは Unix epochから経過した秒数で表される非負のdouble値 (例えば、1459799926.480)のどちらか。

確実に1回のユーザ定義状態の更新セマンティクスのための耐障害性

Flinkのチェックポイントを有効にして、FlinkのKinesisコンシューマはKinesisストリームのシャードからのレコードを消費し、定期的に各シャードの進捗をチェックポイントするでしょう。ジョブの障害の場合は、Flinkはストリーミングプログラムを最新の完了したチェックポイントの状態に回復し、チェックポイント内に格納された進捗から初めてKinesisシャードからのレコードを再消費するでしょう。

従ってチェックポイントの取り出しの間隔は、障害時にプログラムが最大でどれだけ戻らなければならないかもしれないかを定義します。

Kinesisコンシューマの耐障害性を使うために、実行環境でトポロジのチェックポイントが有効にされなければなりません。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

もしトポロジを再起動するために十分な処理スロットが利用可能な場合は、Flinkはトポロジを再起動だけすることができることにも注意してください。したがって、もしTaskManagerの喪失のためにトポロジが失敗する場合は、後で利用可能な十分のスロットがそれでもまだなければなりません。YARN上のFlinkは喪失したYARNコンテナの自動的な再起動をサポートします。

消費されたレコードのためのイベント時間

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

もしストリーミング トポロジがレコードのタイムスタンプのためのイベント時間の概念を使用する場合、デフォルトでは近似到着タイムスタンプ が使われるでしょう。will be used by default. このタイムスタンプは、レコードの受信が成功しストリームによって格納されると、Kinesisによってレコードに付けられます。このタイムスタンプは一般的にKinesisのサーバ側のタイムスタンプとして参照され、精度あるいは順番の正しさに何も保証がない (つまり、タイムスタンプは常に増えないかもしれない)ことに注意してください。

ユーザはこのデフォルトをここで説明したように独自のタイムスタンプで上書きすることを選択するか、あるいは事前定義されたものから1つを使うことができます。そうした後で、以下の方法でコンシューマに渡すことができます:

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)

スレッド モデル

Flink Kinesis コンシューマはシャードの発見とデータの消費に複数のスレッドを使います。

シャードの発見のために、コンシューマが開始された時にたとえサブタスクが最初に読み込むシャードが無くても、各並行コンシューマのサブタスクは絶え間なくシャード情報をKinesisに質問する1つのスレッドを持つでしょう。別の言い方をすると、もしコンシューマが並行度10で実行する場合、購買されているストリーム内のシャードの総数に関係なく合計10のスレッドが絶え間なくKinesisに質問するでしょう。

データの消費のために、発見された各シャードを消費するために1つのスレッドが生成されるでしょう。ストリームの再シャードの結果、消費する責任があるシャードが閉じられた時に、スレッドが終了するでしょう。別の言い方をすると、1つの開かれたシャードごとに常に1つのスレッドがあるでしょう。

内部的に使われるKinesis API

Flink Kinesis コンシューマは、シャードの発見とデータの消費のために内部的にKinesis APIを呼び出すために、AWS Java SDKを使います。Amazon のAPI上のKinesisストリームのサービス制限により、コンシューマはユーザが実行しているかもしれない非Flinkの消費アプリケーションと競争するでしょう。以下は、コンシューマがAPIをどうやって使うかの説明があるコンシューマによって呼ばれるAPIのリストと、これらのサービス制限によりFlink Kinesisコンシューマが持つかもしれないエラーあるいは警告をどうやって扱うかの情報です。

  • DescribeStream: これは、ストリームの再シャードの結果として新しいシャードを発見するために各並行コンシューマのサブタスク内の1つのスレッドによって絶え間なく呼ばれます。デフォルトでは、コンシューマは10秒の間隔でシャードの捜索を行い、Kinesisから結果を取得するまで永久に繰り返すでしょう。もしこれが他の非Flinkの消費アプリケーションと干渉する場合、ユーザは提供された設定プロパティ内のConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLISの値を設定することで、このAPIの呼び出しのコンシューマを低速にすることができます。これは異なる値の発見間隔を設定します。シャードはこの間隔の間発見されないため、この設定は新しいシャードの発見とそれの消費の開始の最大遅延に直接影響することに注意してください。

  • GetShardIterator: シャードごとの消費スレッドが開始された時に一度だけ呼ばれます。そしてもしKinesisがAPIのトランザクションの上限を超えたと訴えた場合にデフォルトの3回まで再試行するでしょう。このAPIのレートの上限はシャード(ストリームあたりでは無く)あたりのため、コンシューマ自身はこの上限を超えないことに注意してください。通常、これが起きた場合はユーザはこのAPIを呼んでいる他の非Flink消費アプリケーションを低速にするか、提供された設定プロパティの内のConsumerConfigConstants.SHARD_GETITERATOR_*が前につくキーを設定することでコンシューマ内のこのAPI呼び出しの再試行の挙動を修正することができます。

  • GetRecords: これはKinesisからレコードを取得するためのシャード消費スレッドごとに絶え間なく呼ばれます。シャードが複数の同時に起こるコンシューマを持つ場合(他の非Flink消費アプリケーションが動いている場合)、シャードあたりのレートの制限を超えるかもしれません。デフォルトではこのAPIの各呼び出し時に、もしKinesisがAPIのデータサイズ/トランザクションの上限を超えたと訴えた場合に、コンシューマはデフォルトの3回まで再試行するでしょう。ユーザは他の非Flink消費アプリケーションを低速にするか、提供された設定プロパティの中のConsumerConfigConstants.SHARD_GETRECORDS_MAXConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS キーを設定することで、コンシューマのスループットを調整することができます。Setting the former adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while the latter modifies the sleep interval between each fetch (there will be no sleep by default). このAPIを呼ぶときのコンシューマの再試行の挙動はConsumerConfigConstants.SHARD_GETRECORDS_*が前につく他のキーを使って修正することもできます。

Kinesis プロデューサ

FlinkKinesisProducer はデータをFlinkストリームからKinesisストリームに持っていく時に使うことができます。プロデューサはFlinkのチェックポイントに参加しておらず、確実に1回の処理の保証を提供しないことに注意してください。また、Kinesisのプロデューサはレコードをシャードに順番に書き込む保証がありません (詳細はここここを見てください)。

障害あるいは再シャードの場合、データはKinesisに再び書き込まれ、重複に繋がります。この挙動は通常 “at-least-once” セマンティクスと呼ばれます。

データをKinesisストリームに持っていくためには、ストリームがAWSダッシュボード内で “ACTIVE” としてマークされているようにしてください。

動作の監視のために、ストリームにアクセスするユーザはクラウド ウォッチ サービスへのアクセスを必要とします。

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis_stream_name");
kinesis.setDefaultPartition("0");

DataStream<String> simpleStringStream = ...;
simpleStringStream.addSink(kinesis);
val producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis_stream_name");
kinesis.setDefaultPartition("0");

val simpleStringStream = ...;
simpleStringStream.addSink(kinesis);

上はプロデューサを使う簡単な例です。コンシューマについて上で説明されたように、必須の設定値を持つプロデューサのための設定は java.util.Properties を使って提供されます。例はAWS リージョン “us-east-1” 内の1つのKinesisストリームの生成を説明します。

SerializationSchemaの代わりに、KinesisSerializationSchemaもサポートします。KinesisSerializationSchemaを使ってデータを複数のストリームに送信することができます。これはKinesisSerializationSchema.getTargetStream(T element)メソッドを使って行われます。nullの返却はプロデューサに要素をデフォルトのストリームに書き込むことを指示します。そうでなければ、返却されるストリーム名が使われます。

プロデューサのための他の任意の設定キーはProducerConfigConstantsで見つかります。

テストのための非AWS Kinesisエンドポイントの使用

時にはKinesaliteのような非AWS Kinesisエンドポイントに対してFlink操作をコンシューマあるいはプロデューサとして持つことが望ましいことがあります; これは特にFlinkアプリケーションの機能テストを実施する時に便利です。通常Flink設定の中で設定されるAWSリージョンによって推測されるAWS エンドポイントは、設定プロパティによって上書きされなければなりません。

AWSエンドポイントを上書きするには、例としてプロデューサを取り上げると、 Flinkによって要求されるProducerConfigConstants.AWS_REGIONに加えて、Flink内のProducerConfigConstants.AWS_ENDPOINT を設定します。リージョンは必須ですが、それはAWSエンドポイントのURLを決定するためには使われないでしょう。

以下の例は、どうやってProducerConfigConstants.AWS_ENDPOINT 設定プロパティを提供できるかを示します:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
val producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
TOP
inserted by FC2 system