このコネクタはApache Kafkaによって提供されるイベントストリームへのアクセスを提供します。
FlinkはデータをKafkaトピックから/への読み込みおよび書き込みのための特別なKafkaコネクタを提供します。Flink Kafka コンシューマは確実に1回の処理のセマンティクスを提供するためにFlinkのチェックポイント機構を統合します。これを行うために、FlinkはKafkaのコンシューマグループのオフセットトラッキングに完全に依存しているわけではありませんが、内部的にもこれらのオフセットを追跡およびチェックポイントを行います。
ユースケースと環境のために、パッケージ(maven artifact id)とクラス名を選択してください。ほとんどのユーザについては、FlinkKafkaConsumer08
(flink-connector-kafka
の一部) が適切でしょう。
Maven 依存 | 以下からサポートされています | コンシューマと プロデューサ クラス名 |
Kafka のバージョン | 備考 |
---|---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 |
0.8.x | KafkaのSimpleConsumer API を内部的に使います。オフセットはFlinkによって ZK にコミットされます。 |
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 |
0.9.x | 新しいコンシューマ API Kafka を使います。 |
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 |
0.10.x | このコネクタは生成と消費の両方のために、タイムスタンプを持つKafkaメッセージをサポートします。 |
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 |
0.11.x | 0.11.xからKafkaはscala 2.10をサポートしません。このコネクタはプロデューサのための確実に1回のセマンティックを提供するためのKafka トランザクション メッセージをサポートします。 |
そして、mavenプロジェクトにコネクタをインポートします:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます
config/server.properties
ファイル内のadvertised.host.name
設定がマシーンのIPアドレスに設定されている必要があります。FlinkのKafka コンシューマは FlinkKafkaConsumer08
(あるいはKafka 0.9.0.xバージョンについては 09
など) と呼ばれます。1つ以上のKafkaトピックへのアクセスを提供します。
コンストラクタは以下の引数を受け付けます:
例:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print
DeserializationSchema
Flink Kafka コンシューマはKafka内のバイナリデータを Java/Scala オブジェクトに変換する方法を知る必要があります。DeserializationSchema
によりユーザはそのような方法を指定することができます。Kafkaから値を渡す時に、T deserialize(byte[] message)
メソッドは各Kafkaメッセージごとに呼ばれます。
通常はAbstractDeserializationSchema
から始めるのが役に立ちます。これは 生成された Java/Scala 型の Flink の型システムに説明する面倒をみます。vanilla DeserializationSchema
を実装するユーザはgetProducedType(...)
メソッド自身を実装する必要があります。
Kafkaメッセージのキーおよび値の両方にアクセスするために、KeyedDeserializationSchema
は以下のデシリアライズ メソッド` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` を持ちます。
利便性のために、Flinkは以下のスキーマを提供します:
TypeInformationSerializationSchema
(および TypeInformationKeyValueSerializationSchema
) はFlinkのTypeInformation
に基づいたスキーマを生成します。これはデータがFlinkによって書き込みおよび読み込みの両方をされる場合に有用です。このスキーマは他の一般的なシリアライズ化の方法に代わる効率的なFlink固有のものです。
JsonDeserializationSchema
(および JSONKeyValueDeserializationSchema
) はシリアライズ化された JSON を ObjectNode オブジェクトに変換します。これは objectNode.get(“field”).as(Int/String/…)() を使ってアクセスすることができます。KeyValue objectNode は 全てのフィールドを含む “key” および “value” フィールドと、このメッセージについてのオフセット/パーティション/トピックを公開する任意の “metadata” フィールドを含みます。
何らかの理由でデシリアライズできない不正なメッセージに遭遇した場合、2つの選択肢があります - ジョブを失敗し再起動させるだろうdeserialize(...)
メソッドから例外を投げるか、Flink Kafka コンシューマが不正なメッセージを静かにスキップできるように null
を返すかのどちらかです。コンシューマの耐障害性(詳細は下の章を見てください)のために、不正メッセージのジョブの障害はコンシューマに再びメッセージをデシリアライズをさせようとするでしょう。従って、もしデシリアライズ化がまだ失敗する場合、コンシューマは非停止の再起動に陥り、その不正なメッセージで失敗し続けるでしょう。
Flink Kafkaコンシューマにより、Kafkaパーティションの開始位置がどのように決定されるかを設定することができます。
例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
全てのバージョンの Flink Kafka コンシューマは開始位置についての上の明示的な設定メソッドを持ちます。
setStartFromGroupOffsets
(デフォルトの挙動): Kafkaブローカー(あるいはKafka 0.8については Zookeeper)内のコンシューマグループ (コンシューマプロパティ内のgroup.id
設定) のコミットされたオフセットからパーティションを読み込みます。もしパーティションについてのオフセットが見つけることができない場合は、パーティション内のauto.offset.reset
が使われるでしょう。setStartFromEarliest()
/ setStartFromLatest()
: 最も早い/最新のレコードから開始します。これらのモード下では、Kafka内でコミットされたオフセットは無視され、開始位置としては使われないでしょう。各パーティションについてコンシューマが開始しなければならない性格なオフセットを指定することができます:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上の例はトピックmyTopic
のパーティション 0,1,2 について、コンシューマが指定されたオフセットから開始するように設定します。オフセットの値はコンシューマが各パーティションについて読み込まなければならない次のレコードでなければなりません。もしコンシューマが提供されたオフセットマップ内の指定のオフセットを持たないパーティションを読み込む必要がある場合は、特定のパーティションについてのデフォルトのグループオフセットの挙動(つまり setStartFromGroupOffsets()
) に戻るだろうことに注意してください。
これら開始位置の設定メソッドは、障害時に自動的にジョブが回復される場合やセーブポイントを使って手動で回復される場合には、開始位置に影響しないことに注意してください。回復時には、各Kafkaパーティションの開始位置はセーブポイントあるいはチェックポイント内に格納されたオフセットから決定されます (コンシューマのための耐障害性を有効にするためのチェックポイントについての情報は次の章を見てください)。
Flinkのチェックポイントを有効にしながら、FlinkのKafkaコンシューマはトピックからのレコードを消費し、定期的に全てのKafkaのオフセットを他のオペレーションの状態と一緒に矛盾の無い方法でチェックポイントするでしょう。ジョブの障害時の場合は、Flinkはストリーミングプログラムを最新のチェックポイントの状態へ回復し、チェックポイント内に格納されたオフセットから開始してKafkaからのレコードを再消費するでしょう。
従ってチェックポイントの取り出しの間隔は、障害時にプログラムが最大でどれだけ戻らなければならないかもしれないかを定義します。
Kafkaコンシューマの耐障害性を使うために、実行環境でトポロジのチェックポイントが有効にされなければなりません。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
もしトポロジを再起動するために十分な処理スロットが利用可能な場合は、Flinkはトポロジを再起動だけすることができることにも注意してください。つまり、もしTaskManagerの喪失のためにトポロジが失敗する場合は、後で利用可能な十分のスロットがそれでもまだなければなりません。YARN上のFlinkは喪失したYARNコンテナの自動的な再起動をサポートします。
チェックポイントが有効ではない場合は、Kafkaコンシューマは定期的にZooKeeperにオフセットをコミットするでしょう。
Flink Kafka コンシューマは動的に生成されたKafkaパーティションの発見をサポートし、確実に1回の保証でそれらを消費します。パーティションのメタデータの初期検索(つまり、ジョブが実行を始めた時)の後で発見された全てのパーティションは最も早い可能なオフセットから消費されるでしょう。
デフォルトでは、パーティションの発見は無効です。有効にするには、指定されたプロパティ設定の中でミリ秒単位の発見間隔を表すflink.partition-discovery.interval-millis
に非負の値を設定します。
制限 コンシューマがFlink 1.3.x より前のFlinkバージョンからセーブポイントを回復する場合、パーティションの発見は回復の実行上で有効にすることができません。有効にした場合、回復は理恵ギアで失敗するでしょう。この場合、パーティションの発見を有効にするためには、Flink 1.3.x でセーブポイントをまず取り、それから再び回復します。
高レベルにおいて、Flink Kafkaコンシューマは、正規表現を使ってトピック名に一致するパターンに基づくトピックを発見することもできます。例については以下を見てください:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)
val stream = env.addSource(myConsumer)
...
上の例では、指定された正規表現(test-topic-
から始まり、1つの数字で終わる)に一致する名前を持つ全てのトピックは、ジョブの実行が開始された時にコンシューマによって購読されるでしょう。
ジョブの実行が開始された後で動的に生成されたトピックがコンシューマによって発見できるように、flink.partition-discovery.interval-millis
に非負数の値を設定します。これにより、コンシューマは指定されたパターンにも一致する名前を持つ新しいトピックのパーティションを見つけることができます。
Flink Kafka コンシューマはオフセットがどのようにKafkaブローカー(あるいは 0.8ではZookeeper)にコミットされるかの挙動を設定することができます。Flink Kafka コンシューマは耐障害性の保証のためにコミットされたオフセットに依存しないことに注意してください。コミットされたオフセットは監視目的のためにコンシューマの進捗を公開するための方法にすぎません。
チェックポイントがジョブのために有効であるかどうかに依存して、オフセットのコミットの挙動を設定する方法が異なります。
チェックポイントの無効化: もしチェックポイントが無効化されると、Flink Kafka コンシューマは内部的に使用されるKafkaクライアントの自動的な定期的なオフセットのコミット機能を頼りにします。従って、オフセットのコミットを有効または無効にするには、単純にenable.auto.commit
(あるいはKafka 0.8については auto.commit.enable
) / 指定されたProperties
設定内でauto.commit.interval.ms
キーを適切な値に設定します。
Checkpointing enabled: もしチェックポイントが有効であれば、Flink Kafkaコンシューマはチェックポイントが完了した時にチェックポイントの状態に格納されたオフセットをコミットするでしょう。これはKafkaブローカー内でコミットされたオフセットがチェックポイントの状態の中のオフセットと一貫性があることを保証します。ユーザはコンシューマ上でsetCommitOffsetsOnCheckpoints(boolean)
メソッドを呼び出すことでオフセットのコミットを無効または有効にすることを選択することができます(デフォルトでは、挙動はtrue
です)。このシナリオでは、Properties
内の自動的な定期的なオフセットのコミットの設定は完全に無視されます。
多くのシナリオでレコードのタイムスタンプがレコード自身に(明示的にあるいは暗黙的に)埋め込まれます。更に、ユーザは定期的に、あるいは不規則な方法のどちらかで、ウォーターマークを発行したいと思うかもしれません。例えば、現在のイベント時間のウォーターマークを含むKafkaストリーム内の特別なレコードに基づいて。このような場合には、Flink KafkaコンシューマはAssignerWithPeriodicWatermarks
あるいは AssignerWithPunctuatedWatermarks
の仕様を許容します。
ここで説明されるような独自のタイムスタンプのエクストラクタ/ウォータマークのエミッタを指定するか、あるいは事前定義されたものから1つを使うことができます。そうした後で、それを以下の方法でコンシューマに渡すことができます:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env
.addSource(myConsumer)
.print();
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
stream = env
.addSource(myConsumer)
.print
内部的には、アサイナーのインスタンスがKafkaパーティションごとに実行されます。そのようなアサイナーが指定された場合、Kafkaから読み込まれた各レコードについて、タイムスタンプをレコードに割り当てるためにextractTimestamp(T element, long previousElementTimestamp)
が呼び出され、新しいウォーターマークが発行されるべきか、そしてどのタイムスタンプと一緒に発行されるべきかを決めるために、Watermark getCurrentWatermark()
(定期的) あるいは Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)
(時々中断して) が呼ばれます。
Flinkの Kafka プロデューサは FlinkKafkaProducer08
(Kafka 0.9.0.xバージョンなどでは 09
) と呼ばれます。1つ以上のKafkaトピックへレコードのストリームを書き込むことができます。
例:
DataStream<String> stream = ...;
FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false); // "false" by default
myProducer.setFlushOnCheckpoint(true); // "false" by default
stream.addSink(myProducer);
DataStream<String> stream = ...;
FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream, // input stream
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties); // custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false); // "false" by default
myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer08[String](
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema) // serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false) // "false" by default
myProducer.setFlushOnCheckpoint(true) // "false" by default
stream.addSink(myProducer)
val stream: DataStream[String] = ...
val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream, // input stream
"my-topic", // target topic
new SimpleStringSchema, // serialization schema
properties) // custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false) // "false" by default
myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
上の例は1つのKafkaの目的トピックにストリームを書き込むためにFlink Kafkaプロデューサを作成する基本的な方法を説明します。もっと進んだ使い方については、以下を提供することができる他のコンストラクタの変異種があります:
KafkaProducer
のための独自のプロパティ設定を提供することができます。Kafkaプロデューサを設定する方法についての詳細はApache Kafka ドキュメントを参照してください。FlinkKafkaPartitioner
の実装を提供することができます。このパーティショナーはレコードが送信されなければならない目的のトピックの正確なパーティションを決定するために、ストリーム内の各レコードについて呼び出されるでしょう。KeyedSerializationSchema
と呼ばれる進んだシリアライザ化スキーマを使うことができます。これによりキーと値をそれぞれシリアライズ化することができます。1つのプロデューサのインスタンスがデータを複数のトピックに送信できるように、目的のトピックを上書きすることもできます。0.9より前のKafkaは少なくとも1回あるいは確実に1回のセマンティクスを保証するための仕組みを提供しませんでした。
Flinkのチェックポイントを有効にすることで、FlinkKafkaProducer09
とFlinkKafkaProducer010
は少なくとも1回の配送保証を提供することができます。
Flinkのチェックポイントに加えて、前の章の例で見られるように、セッター メソッドsetLogFailuresOnly(boolean)
とsetFlushOnCheckpoint(boolean)
を適切に設定する必要もあります:
setLogFailuresOnly(boolean)
: これを有効にすると、プロデューサは障害をキャッチおよび再び投げるだけの代わりに、記録するでしょう。これは、もし目的のKafkaトピックに一度も書き込まれなかったとしても、原則的にレコードが成功したと見なします。これは少なくとも1回のためには無効にされるべきです。setFlushOnCheckpoint(boolean)
: これを有効にすると、Flinkのチェックポイントはチェックポイントを成功する前にKafkaによって通知されるチェックポイントの時間のその場でのレコードを待つでしょう。これは全てのレコードがチェックポイントの前にKafkaに書き込まれることを保証します。これは少なくとも1回のために有効にされるべきです。注意: デフォルトでは、再試行の数は “0” に設定されます。このことは、setLogFailuresOnly
がfalse
に設定された場合、プロデューサはリーダーの変更を伴ってエラー時にすぐに失敗することを意味します。デフォルトで値が “0” に設定されることにより、再試行によって目的のトピック内でメッセージが複製されることを防ぎます。ブローカーの変更がたびたび起こるほとんどのプロダクション環境については、再試行の数を大きな値にすることをお勧めします。
注意: 現在のところKafkaのためのトランザクションなプロデューサは無いため、FlinkはKafkaトピックへの確実に1回の配送の保証をすることができません。
Flinkのチェックポイントを有効にすることで、FlinkKafkaProducer011
は確実に1回の配送保証を提供することができます。
Flinkのチェックポイントの有効化に加えて、適切なsemantic
パラメータを FlinkKafkaProducer011
に渡すことで選択される操作の異なる3つのモードを選択することもできます:
Semantic.NONE
: Flinkは何も保証しないでしょう。生成されるレコードは紛失されるかあるいは重複されるかもしれません。Semantic.AT_LEAST_ONCE
(デフォルトの設定): FlinkKafkaProducer010
でのsetFlushOnCheckpoint(true)
に似ています。これはレコードが紛失されない(しかしそれらは重複するかもしれない)ことを保証します。Semantic.EXACTLY_ONCE
: 確実に1回のセマンティックを提供するためにKafkaのトランザクションを使用します。トランザクションを使ってKafkaに書き込む時はいつでも、Kafkaからレコードを消費する全てのアプリケーションについて望ましい isolation.level
(read_committed
あるいは read_uncommitted
- 後者はデフォルト値です) を設定することを忘れないでください。Semantic.EXACTLY_ONCE
モードは、上述のチェックポイントから回復した後で、チェックポイントを取る前に開始されたトランザクションをコミットする機能を頼りにします。Flinkのアプリケーションキャッシュと完了した再起動の期間がKafkaのトランザクションのタイムアウトより大きい場合は、データが紛失されるでしょう(Kafkaはタイムアウト時間を超過したトランザクションを自動的に中断するでしょう)。これを覚えておき、期待するダウンタイムに適切なトランザクションタイムアウトを設定してください。
Kafka のブローカーはデフォルトでtransaction.max.timeout.ms
を15分に設定します。このプロパティはその値より大きなトランザクションタイムアウトをプロデューサについて設定することができないでしょう。FlinkKafkaProducer011
はデフォルトでプロデューサの設定内の transaction.timeout.ms
プロパティを1時間に設定します。従って、transaction.max.timeout.ms
はSemantic.EXACTLY_ONCE
モードを使う前に増やす必要があります。
KafkaConsumer
のread_committed
モードでは、完了していない(中止でもなく、完了でもない)全てのトランザクションは指定されたKafkaトピックの以前の未完了のトランザクションからの全ての読み込みをブロックするでしょう。つまり以下のイベントのシーケンスの後:
transaction1
を開始し、それを使ってなんらかのレコードを書き込むtransaction2
を開始し、それを使って更になんらかのレコードを書き込むtransaction2
をコミットするtransaction2
からのレコードがすでにコミットされていた場合でも、transaction1
がコミットされるか中止されるまで、それらはコンシューマに見えないでしょう。これには2つの実装があります:
注意: Semantic.EXACTLY_ONCE
モードは各FlinkKafkaProducer011
インスタンス毎にKafkaProducerの固定サイズのプールを使用します。これらのプロデューサのそれぞれは1つのチェックポイント毎に使われます。同時に起こるチェックポイントの数がプールサイズを超えた場合、FlinkKafkaProducer011
は例外を投げ、アプリケーション全体を失敗するでしょう。そのため、最大のプールサイズと最大数の同時接続のチェックポイントの数を設定してください。
注意: Semantic.EXACTLY_ONCE
はコンシューマのKafkaトピックからの読み込みを必要以上にブロックするかもしれない長引いている全てのトランザクションを残さないように可能な全ての手を打ちます。しかし、最初のチェックポイントの前のFlinkアプリケーションの障害時には、そのようなアプリケーションを再起動した後で、システム内には以前のプールサイズについての情報が何もありません。従って、最初のチェックポイントが完了する前にFlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR
より大きな要素によってFlinkのアプリケーションをスケールダウンすることは安全ではありません。
Apache Kafka 0.10+ から、Kafkaのメッセージは、イベントが起こった時間(Apache Flink の “event time”を見てください)あるいはメッセージがKafka ブローカーに書き込まれた時間を示す、timestampsを伝えることができます。
FlinkKafkaConsumer010
は、もしFlink内のタイムスタンプの特徴がTimeCharacteristic.EventTime
(StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
) に設定されている場合、タイムスタンプを付けてレコードを発行するでしょう。
Kafkaのコンシューマはウォーターマークを発行しません。ウォーターマークを発行するには、assignTimestampsAndWatermarks
メソッドを使って上の“Kafka コンシューマとタイムスタンプの抽出/ウォーターマークの発行” で説明されたものと同じ仕組みが適用可能です。
Kafkaからのタイムスタンプを使う時にタイムスタンプのエクストラクタを定義する必要はありません。extractTimestamp()
メソッドのpreviousElementTimestamp
引数はKafkaメッセージによって運ばれるタイムスタンプを含みます。
Kafkaコンシューマのためのタイムスタンプ エクストラクタはこのように見えるでしょう:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
FlinkKafkaProducer010
は、setWriteTimestampToKafka(true)
が設定された場合、タイムスタンプのレコードのみを発行します。
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
Flinkの Kafka コネクタは、コネクタの挙動を解析するためにFlinkのメトリクス システムを使って幾つかのメトリクスを提供します。プロデューサはFlinkのメトリック システムを使ってサポートされる全てのバージョンについて、Kafkaの内部メトリクスを出力します。コンシューマはKafkaバージョン 0.9から全てのメトリクスを出力します。Kafkaのドキュメントはドキュメントの中で全ての出力されるメトリクスをリスト化します。
これらのメトリクスに加えて、全てのコンシューマは各トピックのパーティションについて現在のオフセット
と コミットされたオフセット
を出力します。現在のオフセット
はパーティション内の現在のオフセットを参照します。これは取り出しおよび発行に成功した最後の要素のオフセットを参照します。コミットされたオフセット
は最後にコミットされたオフセットです。
FlinkでのKafka のコンシューマはオフセットを Zookeeper (Kafka 0.8) あるいは Kafka ブローカー (Kafka 0.9+) に返します。チェックポイントが無効の場合、オフセットは定期的にコミットされます。一旦ストリーミング トポロジ内の全てのオペレータがそれらの状態のチェックポイントを作成したと確信すると、チェックポイントを使ってコミットが起こります。これはユーザにZookeeperあるいはブローカーにコミットされたオフセットについての少なくとも1回のセマンティクスを提供します。Flinkにチェックポイントされたオフセットについては、システムは確実に1回の保証を提供します。
ZKあるいはブローカーにコミットされたオフセットはKafkaコンシューマの読み込みの進捗を追跡するために使うこともできます。各パーティション内でのコミットされたオフセットと最も最近のオフセットの違いはconsumer lagと呼ばれます。Flinkのトポロジが新しいデータが追加されるよりもトピックからの消費が遅い場合は、遅延が増え、コンシューマは遅延するでしょう。大規模なプロダクション配備については、レイテンシが増加することを防ぐためにメトリックを監視することをお勧めします。
Flink はKerberosのために設定されたKafkaインストレーションへの認証のために、Kafkaコネクタを使ってファーストクラスのサポートを提供します。KafkaがKerberos認証を有効にするように、以下のようにしてflink-conf.yaml
内でFlinkを単純に設定します:
security.kerberos.login.use-ticket-cache
: デフォルトでは、これはtrue
で、Flinkは kinit
.によって管理されるチケットキャッシュ内でKerberos証明書を使おうとするでしょう。YARN上に配備されたFlinkジョブ内のKafkaコネクタを使う場合は、チケットキャッシュを使ったKerberos認証は動作しないだろうことに注意してください。チケットキャッシュを使った認証はMesos配備についてサポートされないため、これはMesosを使った場合にも同じです。security.kerberos.login.keytab
と security.kerberos.login.principal
: 代わりにKerberos keytabを使うには、これらのプロパティの両方の値を設定します。KafkaClient
を security.kerberos.login.contexts
に追加します: これはFlinkにKafka認証に使われるKafkaログイン コンテキストに設定されたKerberos証明書を提供するように伝えます。一度KerberosベースのFlinkセキュリティが有効になると、内部的なKafkaクライアントに渡される指定されたプロパティ設定内で以下の2つの設定を単純に含めることで、Flink Kafkaコンシューマあるいはプロデューサのどちらかを使ってKafkaを認証することができます。
security.protocol
を SASL_PLAINTEXT
(デフォルト NONE
)に設定します: Kafkaブローカーと通信するために使われるプロトコル。スタンドアローンのFlink配備を使う場合、SASL_SSL
を使うこともできます; SSLのためにKafkaクライアントをどうやって設定するかはここを見てくださいsasl.kerberos.service.name
を kafka
(デフォルト kafka
) に設定します: これの値はKafkaブローカー設定に使われるsasl.kerberos.service.name
に一致しなければなりません。クライアントとサーバ設定の間のサービス名の不一致は認証を失敗させるでしょう。KerberosセキュリティのためのFlinkの設定についての詳しい情報はここを見てください。Flinkがどのように内部的にKerberosベースのセキュリティをセットアップするかについての更なる詳細はここで 見つけることもできます。