Apache Kafka コネクタ

このコネクタはApache Kafkaによって提供されるイベントストリームへのアクセスを提供します。

FlinkはデータをKafkaトピックから/への読み込みおよび書き込みのための特別なKafkaコネクタを提供します。Flink Kafka コンシューマは確実に1回の処理のセマンティクスを提供するためにFlinkのチェックポイント機構を統合します。これを行うために、FlinkはKafkaのコンシューマグループのオフセットトラッキングに完全に依存しているわけではありませんが、内部的にもこれらのオフセットを追跡およびチェックポイントを行います。

ユースケースと環境のために、パッケージ(maven artifact id)とクラス名を選択してください。ほとんどのユーザについては、FlinkKafkaConsumer08 (flink-connector-kafkaの一部) が適切でしょう。

Maven 依存 以下からサポートされています コンシューマと
プロデューサ クラス名
Kafka のバージョン 備考
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x KafkaのSimpleConsumer API を内部的に使います。オフセットはFlinkによって ZK にコミットされます。
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x 新しいコンシューマ API Kafka を使います。
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x このコネクタは生成と消費の両方のために、タイムスタンプを持つKafkaメッセージをサポートします。
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x 0.11.xからKafkaはscala 2.10をサポートしません。このコネクタはプロデューサのための確実に1回のセマンティックを提供するためのKafka トランザクション メッセージをサポートします。

そして、mavenプロジェクトにコネクタをインポートします:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
  <version>1.6-SNAPSHOT</version>
</dependency>

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます

Apache Kafkaのインストール

  • コードをダウンロードしサーバを起動する(アプリケーションを開始する前に毎回ZookeeperとKafkaサーバを起動する必要があります)ために、Kafkaのクイックスタート の説明に従います。
  • Kafka および Zookeeper サーバがリモートマシーン上で動いている場合、config/server.propertiesファイル内のadvertised.host.name 設定がマシーンのIPアドレスに設定されている必要があります。

Kafka コンシューマ

FlinkのKafka コンシューマは FlinkKafkaConsumer08 (あるいはKafka 0.9.0.xバージョンについては 09 など) と呼ばれます。1つ以上のKafkaトピックへのアクセスを提供します。

コンストラクタは以下の引数を受け付けます:

  1. トピック名 / トピック名のリスト
  2. Kafkaからのデータをデシリアライズするための DeserializationSchema / KeyedDeserializationSchema
  3. Kafkaコンシューマに関するプロパティ。以下のプロパティが必要です:
    • “bootstrap.servers” (Kafkaブローカーのカンマ区切りのリスト)
    • “zookeeper.connect” (Zookeeper サーバのカンマ区切りのリスト) (Kafka 0.8 でのみ必要です)
    • “group.id” コンシューマグループのid

例:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
	.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
    .print()

DeserializationSchema

Flink Kafka コンシューマはKafka内のバイナリデータを Java/Scala オブジェクトに変換する方法を知る必要があります。DeserializationSchema によりユーザはそのような方法を指定することができます。Kafkaから値を渡す時に、T deserialize(byte[] message) メソッドは各Kafkaメッセージごとに呼ばれます。

通常はAbstractDeserializationSchemaから始めるのが役に立ちます。これは 生成された Java/Scala 型の Flink の型システムに説明する面倒をみます。vanilla DeserializationSchema を実装するユーザはgetProducedType(...) メソッド自身を実装する必要があります。

Kafkaメッセージのキーおよび値の両方にアクセスするために、KeyedDeserializationSchema は以下のデシリアライズ メソッド` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` を持ちます。

利便性のために、Flinkは以下のスキーマを提供します:

  1. TypeInformationSerializationSchema (および TypeInformationKeyValueSerializationSchema) はFlinkのTypeInformationに基づいたスキーマを生成します。これはデータがFlinkによって書き込みおよび読み込みの両方をされる場合に有用です。このスキーマは他の一般的なシリアライズ化の方法に代わる効率的なFlink固有のものです。

  2. JsonDeserializationSchema (および JSONKeyValueDeserializationSchema) はシリアライズ化された JSON を ObjectNode オブジェクトに変換します。これは objectNode.get(“field”).as(Int/String/…)() を使ってアクセスすることができます。KeyValue objectNode は 全てのフィールドを含む “key” および “value” フィールドと、このメッセージについてのオフセット/パーティション/トピックを公開する任意の “metadata” フィールドを含みます。

何らかの理由でデシリアライズできない不正なメッセージに遭遇した場合、2つの選択肢があります - ジョブを失敗し再起動させるだろうdeserialize(...)メソッドから例外を投げるか、Flink Kafka コンシューマが不正なメッセージを静かにスキップできるように null を返すかのどちらかです。コンシューマの耐障害性(詳細は下の章を見てください)のために、不正メッセージのジョブの障害はコンシューマに再びメッセージをデシリアライズをさせようとするでしょう。従って、もしデシリアライズ化がまだ失敗する場合、コンシューマは非停止の再起動に陥り、その不正なメッセージで失敗し続けるでしょう。

Kafka コンシューマのスタート位置の設定

Flink Kafkaコンシューマにより、Kafkaパーティションの開始位置がどのように決定されるかを設定することができます。

例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)
...

全てのバージョンの Flink Kafka コンシューマは開始位置についての上の明示的な設定メソッドを持ちます。

  • setStartFromGroupOffsets (デフォルトの挙動): Kafkaブローカー(あるいはKafka 0.8については Zookeeper)内のコンシューマグループ (コンシューマプロパティ内のgroup.id 設定) のコミットされたオフセットからパーティションを読み込みます。もしパーティションについてのオフセットが見つけることができない場合は、パーティション内のauto.offset.reset が使われるでしょう。
  • setStartFromEarliest() / setStartFromLatest(): 最も早い/最新のレコードから開始します。これらのモード下では、Kafka内でコミットされたオフセットは無視され、開始位置としては使われないでしょう。
  • setStartFromTimestamp(long): 指定されたタイムスタンプから開始します。各パーティションについて、タイムスタンプが指定されたタイムスタンプ以上のレコードが開始位置として使われるでしょう。もしパーティションの最新レコードがタイムスタンプより前であれば、パーティションは単純に最新のレコードから読み込まれるでしょう。このモードでは、Kafka内でコミットされたオフセットは無視され、開始位置としては使われないでしょう。

各パーティションについてコンシューマが開始しなければならない性格なオフセットを指定することができます:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

上の例はトピックmyTopicのパーティション 0,1,2 について、コンシューマが指定されたオフセットから開始するように設定します。オフセットの値はコンシューマが各パーティションについて読み込まなければならない次のレコードでなければなりません。もしコンシューマが提供されたオフセットマップ内の指定のオフセットを持たないパーティションを読み込む必要がある場合は、特定のパーティションについてのデフォルトのグループオフセットの挙動(つまり setStartFromGroupOffsets()) に戻るだろうことに注意してください。

これら開始位置の設定メソッドは、障害時に自動的にジョブが回復される場合やセーブポイントを使って手動で回復される場合には、開始位置に影響しないことに注意してください。回復時には、各Kafkaパーティションの開始位置はセーブポイントあるいはチェックポイント内に格納されたオフセットから決定されます (コンシューマのための耐障害性を有効にするためのチェックポイントについての情報は次の章を見てください)。

Kafka コンシューマと耐障害性

Flinkのチェックポイントを有効にしながら、FlinkのKafkaコンシューマはトピックからのレコードを消費し、定期的に全てのKafkaのオフセットを他のオペレーションの状態と一緒に矛盾の無い方法でチェックポイントするでしょう。ジョブの障害時の場合は、Flinkはストリーミングプログラムを最新のチェックポイントの状態へ回復し、チェックポイント内に格納されたオフセットから開始してKafkaからのレコードを再消費するでしょう。

従ってチェックポイントの取り出しの間隔は、障害時にプログラムが最大でどれだけ戻らなければならないかもしれないかを定義します。

Kafkaコンシューマの耐障害性を使うために、実行環境でトポロジのチェックポイントが有効にされなければなりません。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

もしトポロジを再起動するために十分な処理スロットが利用可能な場合は、Flinkはトポロジを再起動だけすることができることにも注意してください。つまり、もしTaskManagerの喪失のためにトポロジが失敗する場合は、後で利用可能な十分のスロットがそれでもまだなければなりません。YARN上のFlinkは喪失したYARNコンテナの自動的な再起動をサポートします。

チェックポイントが有効ではない場合は、Kafkaコンシューマは定期的にZooKeeperにオフセットをコミットするでしょう。

Kafka コンシューマのトピックとパーティションの捜索

パーティションの発見

Flink Kafka コンシューマは動的に生成されたKafkaパーティションの発見をサポートし、確実に1回の保証でそれらを消費します。パーティションのメタデータの初期検索(つまり、ジョブが実行を始めた時)の後で発見された全てのパーティションは最も早い可能なオフセットから消費されるでしょう。

デフォルトでは、パーティションの発見は無効です。有効にするには、指定されたプロパティ設定の中でミリ秒単位の発見間隔を表すflink.partition-discovery.interval-millisに非負の値を設定します。

制限 コンシューマがFlink 1.3.x より前のFlinkバージョンからセーブポイントを回復する場合、パーティションの発見は回復の実行上で有効にすることができません。有効にした場合、回復は理恵ギアで失敗するでしょう。この場合、パーティションの発見を有効にするためには、Flink 1.3.x でセーブポイントをまず取り、それから再び回復します。

トピックの発見

高レベルにおいて、Flink Kafkaコンシューマは、正規表現を使ってトピック名に一致するパターンに基づくトピックを発見することもできます。例については以下を見てください:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
...

上の例では、指定された正規表現(test-topic- から始まり、1つの数字で終わる)に一致する名前を持つ全てのトピックは、ジョブの実行が開始された時にコンシューマによって購読されるでしょう。

ジョブの実行が開始された後で動的に生成されたトピックがコンシューマによって発見できるように、flink.partition-discovery.interval-millis に非負数の値を設定します。これにより、コンシューマは指定されたパターンにも一致する名前を持つ新しいトピックのパーティションを見つけることができます。

Kafka コンシューマのオフセットのコミット挙動の設定

Flink Kafka コンシューマはオフセットがどのようにKafkaブローカー(あるいは 0.8ではZookeeper)にコミットされるかの挙動を設定することができます。Flink Kafka コンシューマは耐障害性の保証のためにコミットされたオフセットに依存しないことに注意してください。コミットされたオフセットは監視目的のためにコンシューマの進捗を公開するための方法にすぎません。

チェックポイントがジョブのために有効であるかどうかに依存して、オフセットのコミットの挙動を設定する方法が異なります。

  • チェックポイントの無効化: もしチェックポイントが無効化されると、Flink Kafka コンシューマは内部的に使用されるKafkaクライアントの自動的な定期的なオフセットのコミット機能を頼りにします。従って、オフセットのコミットを有効または無効にするには、単純にenable.auto.commit (あるいはKafka 0.8については auto.commit.enable) / 指定されたProperties設定内でauto.commit.interval.ms キーを適切な値に設定します。

  • Checkpointing enabled: もしチェックポイントが有効であれば、Flink Kafkaコンシューマはチェックポイントが完了した時にチェックポイントの状態に格納されたオフセットをコミットするでしょう。これはKafkaブローカー内でコミットされたオフセットがチェックポイントの状態の中のオフセットと一貫性があることを保証します。ユーザはコンシューマ上でsetCommitOffsetsOnCheckpoints(boolean)メソッドを呼び出すことでオフセットのコミットを無効または有効にすることを選択することができます(デフォルトでは、挙動はtrueです)。このシナリオでは、Properties内の自動的な定期的なオフセットのコミットの設定は完全に無視されます。

Kafka コンシューマとタイムスタンプの抽出/ウォータマークの発行

多くのシナリオでレコードのタイムスタンプがレコード自身に(明示的にあるいは暗黙的に)埋め込まれます。更に、ユーザは定期的に、あるいは不規則な方法のどちらかで、ウォーターマークを発行したいと思うかもしれません。例えば、現在のイベント時間のウォーターマークを含むKafkaストリーム内の特別なレコードに基づいて。このような場合には、Flink KafkaコンシューマはAssignerWithPeriodicWatermarks あるいは AssignerWithPunctuatedWatermarks の仕様を許容します。

ここで説明されるような独自のタイムスタンプのエクストラクタ/ウォータマークのエミッタを指定するか、あるいは事前定義されたものから1つを使うことができます。そうした後で、それを以下の方法でコンシューマに渡すことができます:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env
	.addSource(myConsumer)
	.print();
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
    .addSource(myConsumer)
    .print()

内部的には、アサイナーのインスタンスがKafkaパーティションごとに実行されます。そのようなアサイナーが指定された場合、Kafkaから読み込まれた各レコードについて、タイムスタンプをレコードに割り当てるためにextractTimestamp(T element, long previousElementTimestamp) が呼び出され、新しいウォーターマークが発行されるべきか、そしてどのタイムスタンプと一緒に発行されるべきかを決めるために、Watermark getCurrentWatermark() (定期的) あるいは Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) (時々中断して) が呼ばれます。

注意: もしウォーターマーク アサイナがウォーターマークを前に進めるためにKafkaから読み込まれたレコードに依存する場合(これはよくあることです)、全てのトピックとパーティションはレコードの連続するストリームを持つ必要があります。そうでなければ、アプリケーション全体のウォーターマークは前に進むことができず、時間ウィンドウあるいはタイマーを持つ関数のような全ての時間ベースの操作は進むことができません。1つの動いていないKafkaパーティションがこの挙動を引き起こします:Flinkの改善はこれが起きることを防ぐことを計画しています (FLINK-5479 を見てください: FlinkKafkaConsumer内のパーティションごとのウォーターマークは不活性のパーティションと見なさなければなりません)。その間の可能な代替策はハートビート メッセージを不活性のパーティションのウォーターマークを進める全ての消費されたパーティションに送信することです。

Kafka プロデューサ

Flinkの Kafka プロデューサはFlinkKafkaProducer011と呼ばれます (あるいは Kafka 0.10.0.xバージョンについては010など)。1つ以上のKafkaトピックへレコードのストリームを書き込むことができます。

例:

DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);
val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)

上の例は1つのKafkaの目的トピックにストリームを書き込むためにFlink Kafkaプロデューサを作成する基本的な方法を説明します。もっと進んだ使い方については、以下を提供することができる他のコンストラクタの変異種があります:

  • 独自のプロパティの提供: プロデューサは内部的なKafkaProducerのための独自のプロパティ設定を提供することができます。Kafkaプロデューサを設定する方法についての詳細はApache Kafka ドキュメントを参照してください。
  • 独自のパーティショナー: レコードを特定のパーティションに割り当てるために、コンストラクタにFlinkKafkaPartitionerの実装を提供することができます。このパーティショナーはレコードが送信されなければならない目的のトピックの正確なパーティションを決定するために、ストリーム内の各レコードについて呼び出されるでしょう。詳細はKafka プロデューサのパーティションのスキーマを見てください。
  • 更に進んだシリアライズ化スキーマ: コンシューマと似て、プロデューサもKeyedSerializationSchemaと呼ばれる進んだシリアライザ化スキーマを使うことができます。これによりキーと値をそれぞれシリアライズ化することができます。1つのプロデューサのインスタンスがデータを複数のトピックに送信できるように、目的のトピックを上書きすることもできます。

Kafka Producer Partitioning Scheme

デフォルトでは、もし独自のパーティションがFlinkのKafkaプロデューサを指定しない場合は、プロデューサは各Flink Kafkaプロデューサの並行サブタスクを1つのKafkaパーティションにマップするFlinkFixedPartitioner を使うでしょう (つまりシンクのサブタスクによって受け取られる全てのレコードは同じKafkaパーティションになるでしょう)。

独自のパーティションはFlinkKafkaPartitioner クラスを拡張することで実装することができます。全てのKafkaバージョンのコンストラクタはプロデューサをインスタンス化する時に独自のパーティショナーを提供することができます。パーティショナーの実装ははFlinkのノードに渡って転送されるだろうため、シリアライズ可能でなければならないことに注意してください。また、パーティショナーはプロデューサのチェックポイントされた状態の一部ではないため、パーティショナー内のどのような状態もジョブの失敗時には失われるだろうことを忘れないでください。

It is also possible to completely avoid using and kind of partitioner, and simply let Kafka partition the written records by their attached key (as determined for each record using the provided serialization schema). これを行うには、プロデューサをインスタンス化する時に独自のnullパーティショナーを渡してください。独自のパーティショナーとしてnullを渡すことが重要です; 上で説明されたように、もし独自のパーティショナーが指定されない場合、FlinkFixedPartitionerが代わりに使われます。

Kafka プロデューサと耐障害性

Kafka 0.8

0.9より前のKafkaは少なくとも1回あるいは確実に1回のセマンティクスを保証するための仕組みを提供しませんでした。

Kafka 0.9 と 0.10

Flinkのチェックポイントを有効にすることで、FlinkKafkaProducer09FlinkKafkaProducer010 は少なくとも1回の配送保証を提供することができます。

Flinkのチェックポイントを有効にすることに加えて、セッター メソッド setLogFailuresOnly(boolean)setFlushOnCheckpoint(boolean)を適切に設定する必要もあります。

  • setLogFailuresOnly(boolean): デフォルトで、これはfalseに設定されます。これを有効にするとプロデューサは障害を捕らえて再び投げる代わりに、それらの記録だけをするでしょう。これは、もし目的のKafkaトピックに一度も書き込まれなかったとしても、原則的にレコードが成功したと見なします。これは少なくとも1回のためには無効にされるべきです。
  • setFlushOnCheckpoint(boolean): デフォルトで、これはtrueに設定されます。これを有効にすると、Flinkのチェックポイントはチェックポイントを成功する前にKafkaによって通知されるチェックポイントの時間のその場でのレコードを待つでしょう。これは全てのレコードがチェックポイントの前にKafkaに書き込まれることを保証します。これは少なくとも1回のために有効にされるべきです。

結論として、setLogFailureOnlyfalse に設定し setFlushOnCheckpointtrueに設定することでバージョン 0.9と0.10について、Kakfaのプロデューサはデフォルトで少なくとも1回の保証を持ちます。

注意: デフォルトでは、再試行の数は “0” に設定されます。このことは、setLogFailuresOnlyfalseに設定された場合、プロデューサはリーダーの変更を伴ってエラー時にすぐに失敗することを意味します。デフォルトで値が “0” に設定されることにより、再試行によって目的のトピック内でメッセージが複製されることを防ぎます。ブローカーの変更がたびたび起こるほとんどのプロダクション環境については、再試行の数を大きな値にすることをお勧めします。

注意: 現在のところKafkaのためのトランザクションなプロデューサは無いため、FlinkはKafkaトピックへの確実に1回の配送の保証をすることができません。

注意: Kafkaの設定に依存して、たとえKafkaが書き込みを通知した後でもデータの喪失を経験するかもしれません。特に、以下のKafkaの設定を忘れないでください:
  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*
上のオプションのためのデフォルトの値は容易にデータの紛失に繋がります。詳細な説明についてはKafkaドキュメントを参照してください。

Kafka 0.11

Flinkのチェックポイントを有効にすることで、FlinkKafkaProducer011 は確実に1回の配送保証を提供することができます。

Flinkのチェックポイントの有効化に加えて、適切なsemantic パラメータを FlinkKafkaProducer011に渡すことで選択される操作の異なる3つのモードを選択することもできます:

  • Semantic.NONE: Flinkは何も保証しないでしょう。生成されるレコードは紛失されるかあるいは重複されるかもしれません。
  • Semantic.AT_LEAST_ONCE (デフォルトの設定): FlinkKafkaProducer010でのsetFlushOnCheckpoint(true) に似ています。これはレコードが紛失されない(しかしそれらは重複するかもしれない)ことを保証します。
  • Semantic.EXACTLY_ONCE: 確実に1回のセマンティックを提供するためにKafkaのトランザクションを使用します。トランザクションを使ってKafkaに書き込む時はいつでも、Kafkaからレコードを消費する全てのアプリケーションについて望ましい isolation.level (read_committed あるいは read_uncommitted - 後者はデフォルト値です) を設定することを忘れないでください。
注意: Kafkaの設定に依存して、たとえKafkaが書き込みを通知した後でもデータの喪失を経験するかもしれません。特に、Kafka設定の以下のプロパティを忘れないでください:
  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*
上のオプションのためのデフォルトの値は容易にデータの紛失に繋がります。詳細な説明についてはKafkaドキュメントを参照してください。
警告

Semantic.EXACTLY_ONCE モードは、上述のチェックポイントから回復した後で、チェックポイントを取る前に開始されたトランザクションをコミットする機能を頼りにします。Flinkのアプリケーションキャッシュと完了した再起動の期間がKafkaのトランザクションのタイムアウトより大きい場合は、データが紛失されるでしょう(Kafkaはタイムアウト時間を超過したトランザクションを自動的に中断するでしょう)。これを覚えておき、期待するダウンタイムに適切なトランザクションタイムアウトを設定してください。

Kafka のブローカーはデフォルトでtransaction.max.timeout.ms を15分に設定します。このプロパティはその値より大きなトランザクションタイムアウトをプロデューサについて設定することができないでしょう。FlinkKafkaProducer011 はデフォルトでプロデューサの設定内の transaction.timeout.ms プロパティを1時間に設定します。従って、transaction.max.timeout.msSemantic.EXACTLY_ONCEモードを使う前に増やす必要があります。

KafkaConsumerread_committed モードでは、完了していない(中止でもなく、完了でもない)全てのトランザクションは指定されたKafkaトピックの以前の未完了のトランザクションからの全ての読み込みをブロックするでしょう。つまり以下のイベントのシーケンスの後:

  1. ユーザがtransaction1 を開始し、それを使ってなんらかのレコードを書き込む
  2. ユーザが transaction2を開始し、それを使って更になんらかのレコードを書き込む
  3. ユーザがtransaction2をコミットする

transaction2 からのレコードがすでにコミットされていた場合でも、transaction1 がコミットされるか中止されるまで、それらはコンシューマに見えないでしょう。これには2つの実装があります:

  • まず第一に、通常のFlinkアプリケーションの動作の間、完了したチェックポイント間の平均時間と同様に、ユーザはKafkaトピックへ生成されたレコードの可視化に遅延があるかもしれません。
  • 次に、Flinkアプリケーションの障害時には、このアプリケーションが書き込むトピックは、アプリケーションが再起動あるいは設定されたトランザクションタイムアウトの時間が過ぎるまで、リーダーはブロックされるでしょう。このことは、同じKafkaトピックへ書き込んでいる複数のエーゲント/アプリケーションがある場合にのみ適用されます。

注意: Semantic.EXACTLY_ONCE モードは各FlinkKafkaProducer011インスタンス毎にKafkaProducerの固定サイズのプールを使用します。これらのプロデューサのそれぞれは1つのチェックポイント毎に使われます。同時に起こるチェックポイントの数がプールサイズを超えた場合、FlinkKafkaProducer011 は例外を投げ、アプリケーション全体を失敗するでしょう。そのため、最大のプールサイズと最大数の同時接続のチェックポイントの数を設定してください。

注意: Semantic.EXACTLY_ONCE はコンシューマのKafkaトピックからの読み込みを必要以上にブロックするかもしれない長引いている全てのトランザクションを残さないように可能な全ての手を打ちます。しかし、最初のチェックポイントの前のFlinkアプリケーションの障害時には、そのようなアプリケーションを再起動した後で、システム内には以前のプールサイズについての情報が何もありません。従って、最初のチェックポイントが完了する前にFlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTORより大きな要素によってFlinkのアプリケーションをスケールダウンすることは安全ではありません。

Apache Kafka 0.10+ から、Kafkaのメッセージは、イベントが起こった時間(Apache Flink の “event time”を見てください)あるいはメッセージがKafka ブローカーに書き込まれた時間を示す、timestampsを伝えることができます。

FlinkKafkaConsumer010 は、もしFlink内のタイムスタンプの特徴がTimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)) に設定されている場合、タイムスタンプを付けてレコードを発行するでしょう。

Kafkaのコンシューマはウォーターマークを発行しません。ウォーターマークを発行するには、assignTimestampsAndWatermarksメソッドを使って上の“Kafka コンシューマとタイムスタンプの抽出/ウォーターマークの発行” で説明されたものと同じ仕組みが適用可能です。

Kafkaからのタイムスタンプを使う時にタイムスタンプのエクストラクタを定義する必要はありません。extractTimestamp()メソッドのpreviousElementTimestamp引数はKafkaメッセージによって運ばれるタイムスタンプを含みます。

Kafkaコンシューマのためのタイムスタンプ エクストラクタはこのように見えるでしょう:

public long extractTimestamp(Long element, long previousElementTimestamp) {
    return previousElementTimestamp;
}

FlinkKafkaProducer010 は、setWriteTimestampToKafka(true)が設定された場合、タイムスタンプのレコードのみを発行します。

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);

Kafka コネクタのメトリックス

Flinkの Kafka コネクタは、コネクタの挙動を解析するためにFlinkのメトリクス システムを使って幾つかのメトリクスを提供します。プロデューサはFlinkのメトリック システムを使ってサポートされる全てのバージョンについて、Kafkaの内部メトリクスを出力します。コンシューマはKafkaバージョン 0.9から全てのメトリクスを出力します。Kafkaのドキュメントはドキュメントの中で全ての出力されるメトリクスをリスト化します。

これらのメトリクスに加えて、全てのコンシューマは各トピックのパーティションについて現在のオフセットコミットされたオフセット を出力します。現在のオフセット はパーティション内の現在のオフセットを参照します。これは取り出しおよび発行に成功した最後の要素のオフセットを参照します。コミットされたオフセット は最後にコミットされたオフセットです。

FlinkでのKafka のコンシューマはオフセットを Zookeeper (Kafka 0.8) あるいは Kafka ブローカー (Kafka 0.9+) に返します。チェックポイントが無効の場合、オフセットは定期的にコミットされます。一旦ストリーミング トポロジ内の全てのオペレータがそれらの状態のチェックポイントを作成したと確信すると、チェックポイントを使ってコミットが起こります。これはユーザにZookeeperあるいはブローカーにコミットされたオフセットについての少なくとも1回のセマンティクスを提供します。Flinkにチェックポイントされたオフセットについては、システムは確実に1回の保証を提供します。

ZKあるいはブローカーにコミットされたオフセットはKafkaコンシューマの読み込みの進捗を追跡するために使うこともできます。各パーティション内でのコミットされたオフセットと最も最近のオフセットの違いはconsumer lagと呼ばれます。Flinkのトポロジが新しいデータが追加されるよりもトピックからの消費が遅い場合は、遅延が増え、コンシューマは遅延するでしょう。大規模なプロダクション配備については、レイテンシが増加することを防ぐためにメトリックを監視することをお勧めします。

Kerberos 認証の有効化 (バージョン 0.9+以上のみ)

Flink はKerberosのために設定されたKafkaインストレーションへの認証のために、Kafkaコネクタを使ってファーストクラスのサポートを提供します。KafkaがKerberos認証を有効にするように、以下のようにしてflink-conf.yaml内でFlinkを単純に設定します:

  1. 以下を設定することでKerberos証明書を設定します -
    • security.kerberos.login.use-ticket-cache: デフォルトでは、これはtrueで、Flinkは kinit.によって管理されるチケットキャッシュ内でKerberos証明書を使おうとするでしょう。YARN上に配備されたFlinkジョブ内のKafkaコネクタを使う場合は、チケットキャッシュを使ったKerberos認証は動作しないだろうことに注意してください。チケットキャッシュを使った認証はMesos配備についてサポートされないため、これはMesosを使った場合にも同じです。
    • security.kerberos.login.keytabsecurity.kerberos.login.principal: 代わりにKerberos keytabを使うには、これらのプロパティの両方の値を設定します。
  2. KafkaClientsecurity.kerberos.login.contextsに追加します: これはFlinkにKafka認証に使われるKafkaログイン コンテキストに設定されたKerberos証明書を提供するように伝えます。

一度KerberosベースのFlinkセキュリティが有効になると、内部的なKafkaクライアントに渡される指定されたプロパティ設定内で以下の2つの設定を単純に含めることで、Flink Kafkaコンシューマあるいはプロデューサのどちらかを使ってKafkaを認証することができます。

  • security.protocolSASL_PLAINTEXT (デフォルト NONE)に設定します: Kafkaブローカーと通信するために使われるプロトコル。スタンドアローンのFlink配備を使う場合、SASL_SSLを使うこともできます; SSLのためにKafkaクライアントをどうやって設定するかはここを見てください
  • sasl.kerberos.service.namekafka (デフォルト kafka) に設定します: これの値はKafkaブローカー設定に使われるsasl.kerberos.service.name に一致しなければなりません。クライアントとサーバ設定の間のサービス名の不一致は認証を失敗させるでしょう。

KerberosセキュリティのためのFlinkの設定についての詳しい情報はここを見てください。Flinkがどのように内部的にKerberosベースのセキュリティをセットアップするかについての更なる詳細はここで 見つけることもできます。

上に戻る

TOP
inserted by FC2 system