Amazon AWS Kinesis ストリーム コネクタ

Kinesis コネクタは Amazon AWS Kinesis ストリームへのアクセスを提供します。

コネクタを使うには、以下のMaven依存物をプロジェクトに追加してください:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.11</artifactId>
  <version>1.5-SNAPSHOT</version>
</dependency>

flink-connector-kinesis_2.11Amazon Software License (ASL) 下でライセンスされたコードに依存します。flink-connector-kinesis をリンクすると、あなたのアプリケーションにASLライセンスされたコードを含むでしょう。

このライセンス問題のため、flink-connector-kinesis_2.11 アーティファクトはFlinkリリースの一部としてMavenセントラルに配備されていません。従って、ソースからあなた自身でコネクタをビルドする必要があります。

Flinkのソースコードをダウンロード、あるいはgitリポジトリからチェックアウトします。そして、モジュールをビルドするために以下のMavenコマンドを使います:

mvn clean install -Pinclude-kinesis -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

ストリーミング コネクタはバイナリ配布物の一部ではありません。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます

Amazon Kinesis ストリーム サービスの使用

KinessストリームをセットアップするにはAmazon Kinesis ストリーム開発者ガイド の説明に従います。Kinesisストリームをread/writeするために適切なIAMポリシーとユーザを作成するようにしてください。

Kinesis コンシューマ

FlinkKinesisConsumer は同じAWSサービスリージョン内で複数のAWS Kinesisストリームを購読する確実に1回の並行ストリーミングデータソースで、ストリーミングの再シャーディングをジョブの実行中に透過的に処理することができます。コンシューマの各サブタスクは複数のKinesisシャードからレコードを取り出す責任があります。各サブタスクによって取り出されたシャードの数は、シャードがKinesisによって閉じられ生成されることで変わるでしょう。

Kinesisストリームからのデータを消費する前に、全てのストリームがAWSダッシュボード内でステータス “ACTIVE” で生成されるようにしてください。

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
val consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

val env = StreamExecutionEnvironment.getEnvironment

val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))

上はコンシューマを使う簡単な例です。コンシューマのための設定はjava.util.Properties インスタンスを使って提供されます。キーは ConsumerConfigConstants で見つけることができます。例はAWS リージョン “us-east-1” 内の1つのKinesisストリームの消費を説明します。AWSの証明書はAWS access key ID 内の基本的な方法を使って提供され、秘密アクセスキーは設定内で直接提供されます (他のオプションは ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDERENV_VAR, SYS_PROP, PROFILE および AUTOにしています)。また、データはKinesisストリーム内の最新の位置から消費されます (他のオプションはConsumerConfigConstants.STREAM_INITIAL_POSITIONTRIM_HORIZONに設定し、これはコンシューマに可能な限り最も早いレコードからKinesisストリームの読み込みを開始させるでしょう)。

コンシューマのための他の任意の設定キーはConsumerConfigConstantsで見つけることができます。

Flink Kinesisコンシューマ ソースの設定された並行度はKinesisストリーム内のシャードの総数と完全に依存しないことに注意してください。シャードの数がコンシューマの並行度より大きい場合、各コンシューマのサブタスクは複数のシャードの会員かもしれません; そうではなくもしシャードの数がコンシューマの並行度より少ない場合、いくつかのコンシューマのサブタスクは単純に仕事をせず、新しいシャードに割り当てられるまで待つでしょう (つまり、ストリームは共有されたKinesisサービスのスループットを高くするためにシャードの数を増やすために再シャードされた時)。

開始位置の設定

Flink Kinesis コンシューマは現在のところKinesisストリームのどこから読み込むかを設定するための以下のオプションを提供します。単純に提供された設定プロパティの中で以下の値の1つを ConsumerConfigConstants.STREAM_INITIAL_POSITION に設定します (オプションの名前はAWS Kinesis ストリーム サービスで使われる名前と同様に従います):

  • LATEST: 最新のレコードから始まる全てのストリームの全てのシャードを読み込む。
  • TRIM_HORIZON: 可能な限り最も早いレコードから始まる全てのストリームの全てのシャードを読み込む (維持設定に依存してデータはKinesisによってトリムされているかもしれません)。
  • AT_TIMESTAMP: 指定されたタイムスタンプから始まる全てのストリームの全てのシャードを読み込む。タイムスタンプもConsumerConfigConstants.STREAM_INITIAL_TIMESTAMPのための値を提供することで、設定プロパティの中で指定されなければなりません。以下の日付パターンのうちの1つです:
    • Unix epochから経過した秒数を表す非負のdouble値(例えば1459799926.480)。
    • ユーザ定義のパターン。これはConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMATによって提供される SimpleDateFormatについての有効なパターンです。もしConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMATが定義されていない場合、デフォルトのパターンはyyyy-MM-dd'T'HH:mm:ss.SSSXXXでしょう (例えば、ユーザによって指定されるタイムスタンプの値は 2016-04-04 でパターンは yyyy-MM-dd、あるいはパターン指定無しのタイムスタンプ2016-04-04T19:58:46.480-00:00)。

確実に1回のユーザ定義状態の更新セマンティクスのための耐障害性

Flinkのチェックポイントを有効にして、FlinkのKinesisコンシューマはKinesisストリームのシャードからのレコードを消費し、定期的に各シャードの進捗をチェックポイントするでしょう。ジョブの障害の場合は、Flinkはストリーミングプログラムを最新の完了したチェックポイントの状態に回復し、チェックポイント内に格納された進捗から初めてKinesisシャードからのレコードを再消費するでしょう。

従ってチェックポイントの取り出しの間隔は、障害時にプログラムが最大でどれだけ戻らなければならないかもしれないかを定義します。

Kinesisコンシューマの耐障害性を使うために、実行環境でトポロジのチェックポイントが有効にされなければなりません。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

もしトポロジを再起動するために十分な処理スロットが利用可能な場合は、Flinkはトポロジを再起動だけすることができることにも注意してください。したがって、もしTaskManagerの喪失のためにトポロジが失敗する場合は、後で利用可能な十分のスロットがそれでもまだなければなりません。YARN上のFlinkは喪失したYARNコンテナの自動的な再起動をサポートします。

消費されたレコードのためのイベント時間

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

もしストリーミング トポロジがレコードのタイムスタンプのためのイベント時間の概念を使用する場合、デフォルトでは近似到着タイムスタンプ が使われるでしょう。will be used by default. このタイムスタンプは、レコードの受信が成功しストリームによって格納されると、Kinesisによってレコードに付けられます。このタイムスタンプは一般的にKinesisのサーバ側のタイムスタンプとして参照され、精度あるいは順番の正しさに何も保証がない (つまり、タイムスタンプは常に増えないかもしれない)ことに注意してください。

ユーザはこのデフォルトをここで説明したように独自のタイムスタンプで上書きすることを選択するか、あるいは事前定義されたものから1つを使うことができます。そうした後で、以下の方法でコンシューマに渡すことができます:

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)

スレッド モデル

Flink Kinesis コンシューマはシャードの発見とデータの消費に複数のスレッドを使います。

シャードの発見のために、コンシューマが開始された時にたとえサブタスクが最初に読み込むシャードが無くても、各並行コンシューマのサブタスクは絶え間なくシャード情報をKinesisに質問する1つのスレッドを持つでしょう。別の言い方をすると、もしコンシューマが並行度10で実行する場合、購買されているストリーム内のシャードの総数に関係なく合計10のスレッドが絶え間なくKinesisに質問するでしょう。

データの消費のために、発見された各シャードを消費するために1つのスレッドが生成されるでしょう。ストリームの再シャードの結果、消費する責任があるシャードが閉じられた時に、スレッドが終了するでしょう。別の言い方をすると、1つの開かれたシャードごとに常に1つのスレッドがあるでしょう。

内部的に使われるKinesis API

Flink Kinesis コンシューマは、シャードの発見とデータの消費のために内部的にKinesis APIを呼び出すために、AWS Java SDKを使います。Amazon のAPI上のKinesisストリームのサービス制限により、コンシューマはユーザが実行しているかもしれない非Flinkの消費アプリケーションと競争するでしょう。以下は、コンシューマがAPIをどうやって使うかの説明があるコンシューマによって呼ばれるAPIのリストと、これらのサービス制限によりFlink Kinesisコンシューマが持つかもしれないエラーあるいは警告をどうやって扱うかの情報です。

  • DescribeStream: これは、ストリームの再シャードの結果として新しいシャードを発見するために各並行コンシューマのサブタスク内の1つのスレッドによって絶え間なく呼ばれます。デフォルトでは、コンシューマは10秒の間隔でシャードの捜索を行い、Kinesisから結果を取得するまで永久に繰り返すでしょう。もしこれが他の非Flinkの消費アプリケーションと干渉する場合、ユーザは提供された設定プロパティ内のConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLISの値を設定することで、このAPIの呼び出しのコンシューマを低速にすることができます。これは異なる値の発見間隔を設定します。シャードはこの間隔の間発見されないため、この設定は新しいシャードの発見とそれの消費の開始の最大遅延に直接影響することに注意してください。

  • GetShardIterator: シャードごとの消費スレッドが開始された時に一度だけ呼ばれます。そしてもしKinesisがAPIのトランザクションの上限を超えたと訴えた場合にデフォルトの3回まで再試行するでしょう。このAPIのレートの上限はシャード(ストリームあたりでは無く)あたりのため、コンシューマ自身はこの上限を超えないことに注意してください。通常、これが起きた場合はユーザはこのAPIを呼んでいる他の非Flink消費アプリケーションを低速にするか、提供された設定プロパティの内のConsumerConfigConstants.SHARD_GETITERATOR_*が前につくキーを設定することでコンシューマ内のこのAPI呼び出しの再試行の挙動を修正することができます。

  • GetRecords: これはKinesisからレコードを取得するためのシャード消費スレッドごとに絶え間なく呼ばれます。シャードが複数の同時に起こるコンシューマを持つ場合(他の非Flink消費アプリケーションが動いている場合)、シャードあたりのレートの制限を超えるかもしれません。デフォルトではこのAPIの各呼び出し時に、もしKinesisがAPIのデータサイズ/トランザクションの上限を超えたと訴えた場合に、コンシューマはデフォルトの3回まで再試行するでしょう。ユーザは他の非Flink消費アプリケーションを低速にするか、提供された設定プロパティの中のConsumerConfigConstants.SHARD_GETRECORDS_MAXConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS キーを設定することで、コンシューマのスループットを調整することができます。前者の設定はレコードの最大数を調整し各消費スレッドは各呼び出し毎にシャードから読み込もうとします(デフォルトは10,000)。一方で後者は各呼び出し毎のsleep間隔を修正します(デフォルトは200)。このAPIを呼ぶときのコンシューマの再試行の挙動はConsumerConfigConstants.SHARD_GETRECORDS_*が前につく他のキーを使って修正することもできます。

Kinesis プロデューサ

FlinkKinesisProducer はデータをFlinkストリームからKinesisストリームに置くためにKinesis Producer Library (KPL) を使います。

プロデューサはFlinkのチェックポイントに参加しておらず、確実に1回の処理の保証を提供しないことに注意してください。また、Kinesisのプロデューサはレコードをシャードに順番に書き込む保証がありません (詳細はここここを見てください)。

障害あるいは再シャードの場合、データはKinesisに再び書き込まれ、重複に繋がります。この挙動は通常 “at-least-once” セマンティクスと呼ばれます。

データをKinesisストリームに持っていくためには、ストリームがAWSダッシュボード内で “ACTIVE” としてマークされているようにしてください。

動作の監視のために、ストリームにアクセスするユーザはクラウド ウォッチ サービスへのアクセスを必要とします。

Properties producerConfig = new Properties();
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");
producerConfig.put("ThreadPoolSize", "15");

// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis_stream_name");
kinesis.setDefaultPartition("0");

DataStream<String> simpleStringStream = ...;
simpleStringStream.addSink(kinesis);
val producerConfig = new Properties();
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional KPL configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");
producerConfig.put("ThreadPoolSize", "15");

// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST");

val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis_stream_name");
kinesis.setDefaultPartition("0");

val simpleStringStream = ...;
simpleStringStream.addSink(kinesis);

上はプロデューサを使う簡単な例です。FlinkKinesisProducerを初期化するには、ユーザはjava.util.Propertiesインスタンスを使って AWS_REGION, AWS_ACCESS_KEY_IDおよび AWS_SECRET_ACCESS_KEY を渡す必要があります。ユーザは背後で動作する FlinkKinesisProducer KPLをカスタマイズするために任意のパラメータとしてKPLの設定を渡すこともできます。KPLの完全なリストと説明は ここで見つかります。例はAWS リージョン “us-east-1” 内の1つのKinesisストリームの生成を説明します。

ユーザがKPL設定と値を指定しない場合、FlinkKinesisProducerRateLimitを除くデフォルトのKPLの値を使うでしょう。RateLimit バックエンドの制限のパーセンテージとして、シャードのための最大許可されるputレートを制限します。KPLのデフォルト値は150ですが、頻度が高いとKPLはRateLimitExceededExceptionを投げ、結果としてFlinkシンクは壊れます。従って、FlinkKinesisProducerはKPLのデフォルト値を100に上書きします。

SerializationSchemaの代わりに、KinesisSerializationSchemaもサポートします。KinesisSerializationSchemaを使ってデータを複数のストリームに送信することができます。これはKinesisSerializationSchema.getTargetStream(T element)メソッドを使って行われます。nullの返却はプロデューサに要素をデフォルトのストリームに書き込むことを指示します。そうでなければ、返却されるストリーム名が使われます。

スレッド モデル

Flink 1.4.0 から FlinkKinesisProducer は背後で動作するデフォルトのKPLを1スレッド1リクエストからスレッドプールモードに切り替えます。スレッドプールモード内のKPLはKinesisへのリクエストを実行するためにキューとスレッドプールを使います。これはKPLのネイティブプロセスが生成することができるスレッドの数を制限し、従ってCPUの使用率を多いに低下し、効率を改善します。従って、Flinkユーザがスレッドプールモードを使うことを大いにお勧めします。 デフォルトのスレッドプールのサイズは10です。上の例で示すように、ThreadPoolSizeキーを使ってjava.util.Propertiesインスタンス内のプールサイズを設定することができます。

上の例でコメントアウトされたコード内で示すように、java.util.Properties内のThreadingModelPER_REQUEST のキー-値ペアの設定によって1スレッド1リクエストに切り替えることができます。

テストのための非AWS Kinesisエンドポイントの使用

時にはKinesaliteのような非AWS Kinesisエンドポイントに対してFlink操作をコンシューマあるいはプロデューサとして持つことが望ましいことがあります; これは特にFlinkアプリケーションの機能テストを実施する時に便利です。通常Flink設定の中で設定されるAWSリージョンによって推測されるAWS エンドポイントは、設定プロパティによって上書きされなければなりません。

AWSエンドポイントを上書きするには、例としてプロデューサを取り上げると、 Flinkによって要求されるAWSConfigConstants.AWS_REGIONに加えて、Flink設定内のAWSConfigConstants.AWS_ENDPOINT を設定します。リージョンは必須ですが、それはAWSエンドポイントのURLを決定するためには使われないでしょう。

以下の例は、どうやってAWSConfigConstants.AWS_ENDPOINT 設定プロパティを提供できるかを示します:

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
val producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");

上に戻る

TOP
inserted by FC2 system