Apache Kafka コネクタ

このコネクタはApache Kafkaによって提供されるイベントストリームへのアクセスを提供します。

FlinkはデータをKafkaトピックから/への読み込みおよび書き込みのための特別なKafkaコネクタを提供します。Flink Kafka コンシューマは確実に1回の処理のセマンティクスを提供するためにFlinkのチェックポイント機構を統合します。これを行うために、FlinkはKafkaのコンシューマグループのオフセットトラッキングに完全に依存しているわけではありませんが、内部的にもこれらのオフセットを追跡およびチェックポイントを行います。

ユースケースと環境のために、パッケージ(maven artifact id)とクラス名を選択してください。ほとんどのユーザについては、FlinkKafkaConsumer08 (flink-connector-kafkaの一部) が適切でしょう。

Maven 依存 以下からサポートされています コンシューマと
プロデューサ クラス名
Kafka のバージョン 備考
flink-connector-kafka-0.8_2.10 1.0.0 FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x KafkaのSimpleConsumer API を内部的に使います。オフセットはFlinkによって ZK にコミットされます。
flink-connector-kafka-0.9_2.10 1.0.0 FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x 新しいコンシューマ API Kafka を使います。
flink-connector-kafka-0.10_2.10 1.2.0 FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x このコネクタは生成と消費の両方のために、タイムスタンプを持つKafkaメッセージをサポートします。

そして、mavenプロジェクトにコネクタをインポートします:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます

Apache Kafkaのインストール

  • コードをダウンロードしサーバを起動する(アプリケーションを開始する前に毎回ZookeeperとKafkaサーバを起動する必要があります)ために、Kafkaのクイックスタート の説明に従います。
  • Kafka および Zookeeper サーバがリモートマシーン上で動いている場合、config/server.propertiesファイル内のadvertised.host.name 設定がマシーンのIPアドレスに設定されている必要があります。

Kafka コンシューマ

FlinkのKafka コンシューマは FlinkKafkaConsumer08 (あるいはKafka 0.9.0.xバージョンについては 09 など) と呼ばれます。1つ以上のKafkaトピックへのアクセスを提供します。

コンストラクタは以下の引数を受け付けます:

  1. トピック名 / トピック名のリスト
  2. Kafkaからのデータをデシリアライズするための DeserializationSchema / KeyedDeserializationSchema
  3. Kafkaコンシューマに関するプロパティ。以下のプロパティが必要です:
    • “bootstrap.servers” (Kafkaブローカーのカンマ区切りのリスト)
    • “zookeeper.connect” (Zookeeper サーバのカンマ区切りのリスト) (Kafka 0.8 でのみ必要です)
    • “group.id” コンシューマグループのid

例:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
	.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
    .print

現在のFlinkKafkaConsumerの実装はトピックおよびパーティションのリストのクエリのために(コンストラクタを呼ぶ時に)クライアントから接続を確立するでしょう。

これが動作するためには、コンシューマはFlinkクラスタにジョブを発行するマシーンからコンシューマにアクセスできる必要があります。クライアント側のKafkaコンシューマで何か問題がある場合は、クライアントログが失敗したリクエストなどについての情報を含んでいるかもしれません。

DeserializationSchema

Flink Kafka コンシューマはKafka内のバイナリデータを Java/Scala オブジェクトに変換する方法を知る必要があります。DeserializationSchema によりユーザはそのような方法を指定することができます。Kafkaから値を渡す時に、T deserialize(byte[] message) メソッドは各Kafkaメッセージごとに呼ばれます。

通常はAbstractDeserializationSchemaから始めるのが役に立ちます。これは 生成された Java/Scala 型の Flink の型システムに説明する面倒をみます。vanilla DeserializationSchema を実装するユーザはgetProducedType(...) メソッド自身を実装する必要があります。

Kafkaメッセージのキーおよび値の両方にアクセスするために、KeyedDeserializationSchema は以下のデシリアライズ メソッド` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` を持ちます。

利便性のために、Flinkは以下のスキーマを提供します:

  1. TypeInformationSerializationSchema (および TypeInformationKeyValueSerializationSchema) はFlinkのTypeInformationに基づいたスキーマを生成します。これはデータがFlinkによって書き込みおよび読み込みの両方をされる場合に有用です。This schema is a performant Flink-specific alternative to other generic serialization approaches.

  2. JsonDeserializationSchema (および JSONKeyValueDeserializationSchema) はシリアライズ化された JSON を ObjectNode オブジェクトに変換します。これは objectNode.get(“field”).as(Int/String/…)() を使ってアクセスすることができます。KeyValue objectNode は 全てのフィールドを含む “key” および “value” フィールドと、このメッセージについてのオフセット/パーティション/トピックを公開する任意の “metadata” フィールドを含みます。

Kafka コンシューマのスタート位置の設定

Flink Kafkaコンシューマにより、Kafkaパーティションの開始位置がどのように決定されるかを設定することができます。

例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromGroupOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)
...

全てのバージョンの Flink Kafka コンシューマは開始位置についての上の明示的な設定メソッドを持ちます。

  • setStartFromGroupOffsets (デフォルトの挙動): Kafkaブローカー(あるいはKafka 0.8については Zookeeper)内のコンシューマグループ (コンシューマプロパティ内のgroup.id 設定) のコミットされたオフセットからパーティションを読み込みます。もしパーティションについてのオフセットが見つけることができない場合は、パーティション内のauto.offset.reset が使われるでしょう。
  • setStartFromEarliest() / setStartFromLatest(): 最も早い/最新のレコードから開始します。これらのモード下では、Kafka内でコミットされたオフセットは無視され、開始位置としては使われないでしょう。

これらの設定は、障害時に自動的にジョブが回復される場合やセーブポイントを使って手動で回復される場合には、開始位置に影響しないことに注意してください。回復時には、各Kafkaパーティションの開始位置はセーブポイントあるいはチェックポイント内に格納されたオフセットから決定されます (コンシューマのための耐障害性を有効にするためのチェックポイントについての情報は次の章を見てください)。

Kafka コンシューマと耐障害性

Flinkのチェックポイントを有効にしながら、FlinkのKafkaコンシューマはトピックからのレコードを消費し、定期的に全てのKafkaのオフセットを他のオペレーションの状態と一緒に矛盾の無い方法でチェックポイントするでしょう。ジョブの障害時の場合は、Flinkはストリーミングプログラムを最新のチェックポイントの状態へ回復し、チェックポイント内に格納されたオフセットから開始してKafkaからのレコードを再消費するでしょう。

従ってチェックポイントの取り出しの間隔は、障害時にプログラムが最大でどれだけ戻らなければならないかもしれないかを定義します。

Kafkaコンシューマの耐障害性を使うために、実行環境でトポロジのチェックポイントが有効にされなければなりません。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

もしトポロジを再起動するために十分な処理スロットが利用可能な場合は、Flinkはトポロジを再起動だけすることができることにも注意してください。つまり、もしTaskManagerの喪失のためにトポロジが失敗する場合は、後で利用可能な十分のスロットがそれでもまだなければなりません。YARN上のFlinkは喪失したYARNコンテナの自動的な再起動をサポートします。

チェックポイントが有効ではない場合は、Kafkaコンシューマは定期的にZooKeeperにオフセットをコミットするでしょう。

Kafka コンシューマとタイムスタンプの抽出/ウォータマークの発行

多くのシナリオでレコードのタイムスタンプがレコード自身に(明示的にあるいは暗黙的に)埋め込まれます。更に、ユーザは定期的に、あるいは不規則な方法のどちらかで、ウォーターマークを発行したいと思うかもしれません。例えば、現在のイベント時間のウォーターマークを含むKafkaストリーム内の特別なレコードに基づいて。このような場合には、Flink KafkaコンシューマはAssignerWithPeriodicWatermarks あるいは AssignerWithPunctuatedWatermarks の仕様を許容します。

ここで説明されるような独自のタイムスタンプのエクストラクタ/ウォータマークのエミッタを指定するか、あるいは事前定義されたものから1つを使うことができます。そうした後で、それを以下の方法でコンシューマに渡すことができます:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env
	.addSource(myConsumer)
	.print();
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
stream = env
    .addSource(myConsumer)
    .print

内部的には、アサイナーのインスタンスがKafkaパーティションごとに実行されます。そのようなアサイナーが指定された場合、Kafkaから読み込まれた各レコードについて、タイムスタンプをレコードに割り当てるためにextractTimestamp(T element, long previousElementTimestamp) が呼び出され、新しいウォーターマークが発行されるべきか、そしてどのタイムスタンプと一緒に発行されるべきかを決めるために、Watermark getCurrentWatermark() (定期的) あるいは Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) (時々中断して) が呼ばれます。

Kafka プロデューサ

Flinkの Kafka プロデューサは FlinkKafkaProducer08 (Kafka 0.9.0.xバージョンなどでは 09) と呼ばれます。1つ以上のKafkaトピックへレコードのストリームを書き込むことができます。

例:

DataStream<String> stream = ...;

FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false);   // "false" by default
myProducer.setFlushOnCheckpoint(true);  // "false" by default

stream.addSink(myProducer);
DataStream<String> stream = ...;

FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
        stream,                     // input stream
        "my-topic",                 // target topic
        new SimpleStringSchema(),   // serialization schema
        properties);                // custom configuration for KafkaProducer (including broker list)

// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false);   // "false" by default
myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer08[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema

// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false)   // "false" by default
myProducer.setFlushOnCheckpoint(true)  // "false" by default

stream.addSink(myProducer)
val stream: DataStream[String] = ...

val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
        stream,                   // input stream
        "my-topic",               // target topic
        new SimpleStringSchema,   // serialization schema
        properties)               // custom configuration for KafkaProducer (including broker list)

// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false)   // "false" by default
myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default

上の例は1つのKafkaの目的トピックにストリームを書き込むためにFlink Kafkaプロデューサを作成する基本的な方法を説明します。もっと進んだ使い方については、以下を提供することができる他のコンストラクタの変異種があります:

  • 独自のプロパティの提供: プロデューサは内部的なKafkaProducerのための独自のプロパティ設定を提供することができます。Kafkaプロデューサを設定する方法についての詳細はApache Kafka ドキュメントを参照してください。
  • 独自のパーティショナー: レコードを特定のパーティションに割り当てるために、コンストラクタにKafkaPartitionerの実装を提供することができます。このパーティショナーはレコードが正確にどのパーティションに送られるかを決定するために、ストリーム内の各レコードごとに呼ばれるでしょう。
  • 更に進んだシリアライズ化スキーマ: コンシューマと似て、プロデューサもKeyedSerializationSchemaと呼ばれる進んだシリアライザ化スキーマを使うことができます。これによりキーと値をそれぞれシリアライズ化することができます。1つのプロデューサのインスタンスがデータを複数のトピックに送信できるように、目的のトピックを上書きすることもできます。

Kafka プロデューサと耐障害性

Flinkのチェックポイントを有効にして、Flink Kafkaプロデューサは少なくとも1回の配送の保証を提供することができます。

Flinkのチェックポイントに加えて、前の章の例で見られるように、セッター メソッドsetLogFailuresOnly(boolean)setFlushOnCheckpoint(boolean) を適切に設定する必要もあります:

  • setLogFailuresOnly(boolean): これを有効にすると、プロデューサは障害をキャッチおよび再び投げるだけの代わりに、記録するでしょう。これは、もし目的のKafkaトピックに一度も書き込まれなかったとしても、原則的にレコードが成功したと見なします。これは少なくとも1回のためには無効にされるべきです。
  • setFlushOnCheckpoint(boolean): with this enabled, Flink’s checkpoints will wait for any on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before succeeding the checkpoint. これは全てのレコードがチェックポイントの前にKafkaに書き込まれることを保証します。これは少なくとも1回のために有効にされるべきです。

注意: デフォルトでは、再試行の数は “0” に設定されます。このことは、setLogFailuresOnlyfalseに設定された場合、プロデューサはリーダーの変更を伴ってエラー時にすぐに失敗することを意味します。デフォルトで値が “0” に設定されることにより、再試行によって目的のトピック内でメッセージが複製されることを防ぎます。ブローカーの変更がたびたび起こるほとんどのプロダクション環境については、再試行の数を大きな値にすることをお勧めします。

注意: 現在のところKafkaのためのトランザクションなプロデューサは無いため、FlinkはKafkaトピックへの確実に1回の配送の保証をすることができません。

Apache Kafka 0.10+ から、Kafkaのメッセージは、イベントが起こった時間(Apache Flink の “event time”を見てください)あるいはメッセージがKafka ブローカーに書き込まれた時間を示す、timestampsを伝えることができます。

FlinkKafkaConsumer010 は、もしFlink内のタイムスタンプの特徴がTimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)) に設定されている場合、タイムスタンプを付けてレコードを発行するでしょう。

Kafkaのコンシューマはウォーターマークを発行しません。ウォーターマークを発行するには、assignTimestampsAndWatermarksメソッドを使って上の“Kafka コンシューマとタイムスタンプの抽出/ウォーターマークの発行” で説明されたものと同じ仕組みが適用可能です。

Kafkaからのタイムスタンプを使う時にタイムスタンプのエクストラクタを定義する必要はありません。extractTimestamp()メソッドのpreviousElementTimestamp引数はKafkaメッセージによって運ばれるタイムスタンプを含みます。

Kafkaコンシューマのためのタイムスタンプ エクストラクタはこのように見えるでしょう:

public long extractTimestamp(Long element, long previousElementTimestamp) {
    return previousElementTimestamp;
}

FlinkKafkaProducer010 は、setWriteTimestampToKafka(true)が設定された場合、タイムスタンプのレコードのみを発行します。

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);

Kafka コネクタのメトリックス

Flinkの Kafka コネクタは、コネクタの挙動を解析するためにFlinkのメトリクス システムを使って幾つかのメトリクスを提供します。プロデューサはFlinkのメトリック システムを使ってサポートされる全てのバージョンについて、Kafkaの内部メトリクスを出力します。コンシューマはKafkaバージョン 0.9から全てのメトリクスを出力します。Kafkaのドキュメントはドキュメントの中で全ての出力されるメトリクスをリスト化します。

これらのメトリクスに加えて、全てのコンシューマは各トピックのパーティションについて現在のオフセットコミットされたオフセット を出力します。現在のオフセット はパーティション内の現在のオフセットを参照します。これは取り出しおよび発行に成功した最後の要素のオフセットを参照します。コミットされたオフセット は最後にコミットされたオフセットです。

FlinkでのKafka のコンシューマはオフセットを Zookeeper (Kafka 0.8) あるいは Kafka ブローカー (Kafka 0.9+) に返します。チェックポイントが無効の場合、オフセットは定期的にコミットされます。With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. これはユーザにZookeeperあるいはブローカーにコミットされたオフセットについて少なくとも1回のセマンテックを提供します。Flinkにチェックポイントされたオフセットについては、システムは確実に1回の保証を提供します。

ZKあるいはブローカーにコミットされたオフセットはKafkaコンシューマの読み込みの進捗を追跡するために使うこともできます。各パーティション内でのコミットされたオフセットと最も最近のオフセットの違いはconsumer lagと呼ばれます。Flinkのトポロジが新しいデータが追加されるよりもトピックからの消費が遅い場合は、遅延が増え、コンシューマは遅延するでしょう。大規模なプロダクション配備については、レイテンシが増加することを防ぐためにメトリックを監視することをお勧めします。

Kerberos 認証の有効化 (バージョン 0.9+以上のみ)

Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation configured for Kerberos. KafkaがKerberos認証を有効にするように、以下のようにしてflink-conf.yaml内でFlinkを単純に設定します:

  1. 以下を設定することでKerberos証明書を設定します -
    • security.kerberos.login.use-ticket-cache: デフォルトでは、これはtrueで、Flinkは kinit.によって管理されるチケットキャッシュ内でKerberos証明書を使おうとするでしょう。YARN上に配備されたFlinkジョブ内のKafkaコネクタを使う場合は、チケットキャッシュを使ったKerberos認証は動作しないだろうことに注意してください。チケットキャッシュを使った認証はMesos配備についてサポートされないため、これはMesosを使った場合にも同じです。
    • security.kerberos.login.keytabsecurity.kerberos.login.principal: 代わりにKerberos keytabを使うには、これらのプロパティの両方の値を設定します。
  2. KafkaClientsecurity.kerberos.login.contextsに追加します: これはFlinkにKafka認証に使われるKafkaログイン コンテキストに設定されたKerberos証明書を提供するように伝えます。

一度KerberosベースのFlinkセキュリティが有効になると、内部的なKafkaクライアントに渡される指定されたプロパティ設定内で以下の2つの設定を単純に含めることで、Flink Kafkaコンシューマあるいはプロデューサのどちらかを使ってKafkaを認証することができます。

  • security.protocolSASL_PLAINTEXT (デフォルト NONE)に設定します: Kafkaブローカーと通信するために使われるプロトコル。スタンドアローンのFlink配備を使う場合、SASL_SSLを使うこともできます; SSLのためにKafkaクライアントをどうやって設定するかはここを見てください
  • sasl.kerberos.service.namekafka (デフォルト kafka) に設定します: これの値はKafkaブローカー設定に使われるsasl.kerberos.service.name に一致しなければなりません。クライアントとサーバ設定の間のサービス名の不一致は認証を失敗させるでしょう。

KerberosセキュリティのためのFlinkの設定についての詳しい情報はここを見てください。Flinkがどのように内部的にKerberosベースのセキュリティをセットアップするかについての更なる詳細はここで 見つけることもできます。

TOP
inserted by FC2 system