<

ドキュメント

Kafka 2.7 Documentation

以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.

8. Kafka接続

8.1概要

KafkaコネクトはApache Kafkaと他のシステム間のスケーラブルで信頼できるストリーミングデータのためのツールです。データの大きなコレクションをKafkaへ、あるいは から、移動するコネクタを素早く定義することを簡単にします。Kafkaコネクタはデータベース全体を取り込み、あるいは全てのアプリケーションサーバからKafkaトピックへ、低レンテンシのストリームの処理が可能なデータを作りながら、メトリクスを集めることができます。抽出ジョブはデータをKafkaトピックから2次のストレージとクエリシステムへ、あるいはオフラインの解析のためにバッチシステムへ配送することができます。

Kafkaコネクトの機能は以下を含みます:

  • Kafkaコネクタのための共通のフレームワーク - Kafkaコネクトはコネクタの開発、配備および管理を単純化し、Kafkaを使って他のデータシステムの統合を標準化します。
  • 分散およびスタンドアローン モード - 大規模、組織全体をサポートする集中的に管理されたサービスのためにスケールアップ、あるいは開発、テスト、小規模な運用配備のためにスケールダウン
  • REST インタフェース - 使いやすいREST APIを使って、Kafkaコネクト クラスタへのコネクタをサブミットおよび管理する
  • 自動オフセット管理 - コネクタからのほんの少しの情報を使って、Kafkaコネクタはオフセットのコミットプロセスを自動的に管理するので、コネクタの開発者はこのエラーが起きがちなコネクタの開発の部分について心配する必要がありません。
  • デフォルトで分散およびスケーラブル - Kafka コネクタは既存のグループ管理のプロトコルを土台にします。Kafkaコネクタ クラスタをスケールアップするために、より多くのワーカーを追加することができます。
  • ストリーミング/バッチの統合 - Kafkaの既存の機能を利用することで、Kafkaコネクトはストリーミングとバッチデータシステムの間を埋める理想的な解決法です。

8.2ユーザガイド

クリックスタートはKafkaコネクトのスタンドアローン バージョンを実行する方法の短い例を提供します。この章はKafkaコネクトをもっと詳細に設定、実行および管理する方法を説明します。

Kafka コネクトの実行

Kafkaコネクトは現在のところ2つの実行のモードをサポートします: スタンドアローン (1つのプロセス) および 分散。

スタンドアローン モードでは、全ての仕事が1つのプロセスの中で実施されます。この設定はセットアップと開始するのがより簡単で、ただ1つのワーカーだけが意味がある状況で有用かもしれません (例えば、ログファイルの収集)。しかしそれは耐障害性のようなKafkaコネクトの幾つかの機能からの恩恵を受けません。T以下のコマンドを使ってスタンドアローンプロセスを開始することができます:

    > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
    

最初のパラメータはワーカーのための設定です。これはKafkaコネクトパラメータ、シリアライズ化形式、およびオフセットをどれだけの頻度でコミットするかのような設定を含みます。提供される例は config/server.propertiesで提供されるデフォルトの設定で実行中のローカルクラスタと良く連携する筈です。異なる設定あるいはプロダクションの配備で使うには調整する必要があるでしょう。全てのワーカー(スタンドアローンおよび分散の両方)は2,3の設定を必要とします:

  • bootstrap.servers - Kafkaへの起動接続に使われるKafkaサーバのリスト
  • key.converter - Kafkaコネクトの形式と、Kafkaへ書き込まれるシリアライズ化された形式の間で変換するために使われるConverterクラス。これはKafkaに書き込まれる、あるいはKafkaから読み込まれるキーの形式を制御します。これはコネクタから独立しているため、どのようなコネクタもどのようなシリアライズ化形式とも連携することができます。JSONとAvroを含む一般的な形式の例。
  • value.converter - Kafkaコネクトの形式と、Kafkaへ書き込まれるシリアライズ化された形式の間で変換するために使われるConverterクラス。これはKafkaに書き込まれる、あるいはKafkaから読み込まれる値の形式を制御します。これはコネクタから独立しているため、どのようなコネクタもどのようなシリアライズ化形式とも連携することができます。JSONとAvroを含む一般的な形式の例。

スタンドアローンモードに固有の重要な設定オプションは以下の通りです:

  • offset.storage.file.filename - オフセットデータを格納するファイル

ここで設定されたパラメータはKafkaコネクトによって使われるプロデューサおよびコンシューマが設定、オフセットおよび状態トピックスにアクセスすることを目的としています。Kafka ソースタスクで使われるプロデューサと、Kafkaシンクタスクで使われるコンシューマの構成については、同じパラメータを使うことができますが、それぞれ producer.consumer. をプリフィックスとして付ける必要があります。ワーカー構成からプリフィックス無しで継承される唯一の Kafka クライアントパラメータは、bootstrap.servers です。同じクラスタが全ての目的で使われるため、ほとんどの場合これで十分です。注目に値する例外はセキュアなクラスタで、これは接続を許可するために特別なパラメータを必要とします。これらのパラメータは、管理アクセス用、Kafka ソース用、Kafka シンク用に1回、ワーカー構成で最大3回設定する必要があります。

2.3.0 以降は、Kafka ソースあるいは Kafka シンクに対してそれぞれプリフィックス producer.override. および consumer.override. を使って、クライアント構成オーバーライドをコネクタごとに個別に設定できます。これらのオーバーライドは、コネクタの残りの構成プロパティに含まれています。

残りのパラメータはコネクタの設定ファイルです。欲しいだけ含めることができますが、全ては同じプロセス内(の異なるスレッド上)で実行されるでしょう。

分散モードは作業の自動的なバランシングを処理し、動的にスケールアップ(あるいはダウン)することができ、アクティブなタスクと設定およびオフセットコミットデータの両方に耐障害性を提供します。実行はスタンドアローンモードにとても似ています:

    > bin/connect-distributed.sh config/connect-distributed.properties
    

違いは、開始されるクラスと、Kafka コネクトプロセスがどこに設定を格納し、どのように作業を割り当て、オフセットとタスクの状態をどこに格納するのかを決定する方法を変更する構成パラメータです。分散モードでは、Kafkaコネクトはオフセット、設定およびタスクの状態をKafkaトピックの中に格納します。望ましい数のパーティションとリプリケーションファクターを達成するために、手動でオフセット、設定およびステータスのためのトピックを作成することをお勧めします。Kafkaコネクトが開始した時にトピックがまだ生成されていない場合、トピックはデフォルトの数のパーティションとレプリケーション ファクターで自動的に生成されるでしょう。これは使用目的に合致しないかもしれません。

特に上で述べられた一般的な設定に加えて以下の設定パラメータは、クラスタを開始する前に設定することが重要です:

  • group.id (デフォルト connect-cluster) - クラスタのためのユニークな名前。コネクト クラスタグループを生成する時に使用される; これはコンシューマグループのIDと衝突してはいけないことに注意してください。
  • config.storage.topic (デフォルト connect-configs) - コネクタとタスクの設定を格納するために使われるトピック。これは1つのパーティション、高度にレプリケート、圧縮されたトピックでなければなりません。自動生成されたトピックが複数のパーティションを持つか、圧縮よりも削除のための自動的な設定をされているかもしれないため、正しい設定がされるように手動でトピックを生成する必要があるかもしれません。
  • offset.storage.topic (デフォルト connect-offsets) - オフセットを格納するために使われるトピック; このトピックは多くのパーティションを持ち、リプリケートされ、圧縮のための設定をされていなければなりません。
  • status.storage.topic (デフォルト connect-status) - 状態を格納するために使われるトピック; このトピックは複数のパーティションを持つかもしれません。レプリケートされ、圧縮のための設定をされていなければなりません。

分散モードでは、コネクタの設定はコマンドライン上で渡されないことに注意してください。コネクタを生成、修正および破棄するために、以下で説明するREST APIを代わりに使ってください。

コネクタの設定

コネクタの設定は単純なキー-値マッピングです。スタンドアローンモードについては、これらはプロパティファイルの中で定義され、コマンドリアン上でコネクタ プロセスに渡されます。分散モードでは、それらはコネクタを作成(あるいは修正)するリクエストのためのJSONペイロード内に含まれるでしょう。

ほとんどの設定はコネクタに依存しますので、ここでは概略を説明されないでしょう。しかし、2,3の共通のオプションがあります:

  • name - コネクタのためのユニークな名前。同じ名前をもう一度登録しようとすると失敗するでしょう。
  • connector.class - コネクタのためのJavaクラス
  • tasks.max - このコネクタのために生成されなければならないタスクの最大数。もしこのレベルの並行度を実現できないなら、コネクタはより少ないタスクを生成するかもしれません。
  • key.converter - (任意) ワーカーによって設定されるデフォルトのキー コンバータを上書きします。
  • value.converter - (任意) ワーカーによって設定されるデフォルトの値コンバータを上書きします。

connector.class設定はいくつかの形式をサポートします: このコネクタのためのクラスの完全な名前あるいはエイリアス。もしコネクタが org.apache.kafka.connect.file.FileStreamSinkConnector なら、設定を少し短くするために、この完全な名前を指定するかあるいはFileStreamSinkConnectorを使うことができます。

シンク コネクタも入力を制御するための2,3の追加のオプションを持ちます。各シンク コネクタは以下のうちの1つを設定する必要があります:

  • topics - このコネクタに関して入力として使うトピックのカンマ区切りのリスト
  • topics.regex - このコネクタに関して入力として使うトピックのJavaの正規表現

他のオプションについては、コネクタのためのドキュメントを参考にする必要があります。

変換

コネクタは軽量の1度に1つのメッセージ修正を行うために変換と一緒に設定することができます。それれはデータメッセージングとイベントルーティングをするのに便利かもしれません。

変換のチェーンはコネクタの設定の中で指定することができます。

  • transforms - どの変換が適用されるかの順番を指定する、変換のためのエイリアスのリスト。
  • transforms.$alias.type - 変換のための完全修飾クラス名。
  • transforms.$alias.$transformationSpecificConfig 変換のためのプロパティの設定

例えば、組み込みのファイルソースのコネクタを取り上げ、静的なフィールドを追加するための変換を使います。

例の至る所でスキーマレスのJSONデータ形式を使うつもりです。スキーマレス形式を使うために、connect-standalone.properties内の以下の2行をtrueからfalseに変更しました。

        key.converter.schemas.enable
        value.converter.schemas.enable
    

ファイル ソース コネクタは文字列として各行を読みます。各行をMap内にラップし、イベントの元を識別するために2つ目のフィールドを追加しましょう。これを行うには、2つの変換を使います:

  • Map内に入力行を配置するためにHoistField
  • 静的フィールドを追加するためにInsertFieldこの例では、レコードがファイル コネクタから来ると示します。

変換を追加した後で、connect-file-source.properties ファイルは以下のように見えます:

        name=local-file-source
        connector.class=FileStreamSource
        tasks.max=1
        file=test.txt
        topic=connect-test
        transforms=MakeMap, InsertSource
        transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
        transforms.MakeMap.field=line
        transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
        transforms.InsertSource.static.field=data_source
        transforms.InsertSource.static.value=test-file-source
    

transformsから始まる全ての行は変換のために追加されました。2つの作成した変換を見つけることができます: "InsertSource" と "MakeMap" は変換を与えると選択したエイリアスです。変換の型は以下で分かるように組み込みの変換のリストに基づいています。各変換の型は追加の設定を持ちます: HoistField は "field" と呼ばれる設定を必要とします。これはファイルからの元の文字列を含むmap内のフィールド名です。InsertFieldにより追加した変換フィールド名と値を指定することができます。

変換無しにサンプルのファイル上でそのファイル ソース コネクタを実行し、それらを kafka-console-consumer.sh を使って読み込んだ時、結果は以下の通りでした:

        "foo"
        "bar"
        "hello world"
   

そして、新しいファイル コネクタを作成し、今度は設定ファイルに変換を追加します。今度は結果は以下の通りになるでしょう:

        {"line":"foo","data_source":"test-file-source"}
        {"line":"bar","data_source":"test-file-source"}
        {"line":"hello world","data_source":"test-file-source"}
    

読んだ行が今ではJSON mapの一部で、指定した静的な値を持つ追加のフィールドがあることが分かります。これが変換を使って行うことができることの1つの例です。

含まれる変換

幾つかのより広く適用可能なデータとルーティングの変換がKafkaコネクトに含まれています:

  • InsertField - 静的なデータあるいはレコードのメタデータのどちらかを使ってフィールドを追加する
  • ReplaceField - フィールドをフィルターあるいはリネームする
  • MaskField - 型の有効な null 値(0、空文字など)、またはカスタム置換(空では無い文字列mたは数値のみ)でフィールドを置き換えます
  • ValueToKey - レコードキーをレコードの値のフィールドの部分集合から形成された新しいキーで置き換えます
  • HoistField - イベント全体を1つのフィールドとしてStructあるいはMap内にラップする
  • ExtractField - StructおよびMapから特定のフィールドを抽出し、このフィールドだけを結果に含める
  • SetSchemaMetadata - スキーマ名あるいはバージョンを修正する
  • TimestampRouter - 元のトピックおよびタイムスタンプに基づいてレコードのトピックを修正する。タイムスタンプに基づいて異なるテーブルあるいはインデックスに書き込む必要があるシンクを使う時に便利です
  • RegexRouter - 元のトピック、置換文字および正規表現に基づいてレコードのトピックを修正する
  • Filter - 以降の全ての処理からメッセージを削除します。これは、述語とともに使われ、特定のメッセージを選択的にフィルタします。

各変換を設定する方法の詳細は以下でリスト化されます:

org.apache.kafka.connect.transforms.InsertField
レコードのメタデータあるいは設定された静的な値から属性を使ってフィールド(s)を挿入します。

レコードキー(org.apache.kafka.connect.transforms.InsertField$Key) あるいは値 (org.apache.kafka.connect.transforms.InsertField$Value) のために設計された具体的な変換型を使います。

  • offset.field

    Kafkaオフセットのためのフィールド名 - シンク コネクタへのみ適用可能。
    これを必須フィールドにするには ! を、任意(デフォルト)にするには ? を後ろに付けます。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:medium
  • partition.field

    Kafkaパーティションのためのフィールド名。これを必須フィールドにするには ! を、任意(デフォルト)にするには ? を後ろに付けます。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:medium
  • static.field

    静的なデータフィールドのためのフィールド名。これを必須フィールドにするには ! を、任意(デフォルト)にするには ? を後ろに付けます。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:medium
  • static.value

    フィールド名が設定された場合、静的なフィールド値。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:medium
  • timestamp.field

    レコードのタイムスタンプのためのフィールド名。これを必須フィールドにするには ! を、任意(デフォルト)にするには ? を後ろに付けます。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:medium
  • topic.field

    Kafkaトピックのためのフィールド名。これを必須フィールドにするには ! を、任意(デフォルト)にするには ? を後ろに付けます。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:medium
org.apache.kafka.connect.transforms.ReplaceField
フィールドをフィルターあるいはリネームする

レコードキー(org.apache.kafka.connect.transforms.ReplaceField$Key) あるいは値 (org.apache.kafka.connect.transforms.ReplaceField$Value)のために設計された具体的な変換型を使います

  • exclude

    除外するフィールド。This takes precedence over the fields to include.

    Type:list
    デフォルト:""
    有効な値:
    重要:medium
  • include

    含むフィールド。指定された場合、これらのフィールドだけが使われるでしょう。

    Type:list
    デフォルト:""
    有効な値:
    重要:medium
  • renames

    フィールドのリネーム マッピング。

    Type:list
    デフォルト:""
    有効な値:カンマで区切られたペアのリスト。例えば foo:bar,abc:xyz
    重要:medium
  • blacklist

    廃止されました。Use exclude instead.

    Type:list
    デフォルト:null
    有効な値:
    重要:low
  • whitelist

    廃止されました。Use include instead.

    Type:list
    デフォルト:null
    有効な値:
    重要:low
org.apache.kafka.connect.transforms.MaskField
指定されたフィールドをフィールド型にとって有効なnull値にマスクする (つまり、0, false, 空文字 など)。

数値フィールドと文字列フィールドの場合、正しい型に変換されるオプションの置換値を指定することができます。

レコードキー (org.apache.kafka.connect.transforms.MaskField$Key) あるいは値 (org.apache.kafka.connect.transforms.MaskField$Value) のために設計された具体的な変換型を使います。

  • fields

    マスクするフィールドの名前。

    Type:list
    デフォルト:
    有効な値:空では無いリスト
    重要:high
  • replacement

    全ての 'フィールド' 値に適用されるカスタム値の置換(数値あるいは空では無い文字列値のみ)。

    Type:文字列
    デフォルト:null
    有効な値:空では無い文字列
    重要:low
org.apache.kafka.connect.transforms.ValueToKey
レコードキーをレコードの値のフィールドの部分集合から形成された新しいキーで置き換える。

  • fields

    レコードキーとして抽出するためのレコード値上のフィールド名。

    Type:list
    デフォルト:
    有効な値:空では無いリスト
    重要:high
org.apache.kafka.connect.transforms.HoistField
スキーマが存在する時に、Struct内の指定したフィールド名を使ってデータをラップ、あるいはスキーマレスのデータの場合はMapします。

レコードキー (org.apache.kafka.connect.transforms.HoistField$Key) あるいは値 (org.apache.kafka.connect.transforms.HoistField$Value)のために設計された具体的な変換型を使います。

  • フィールド

    結果のStructあるいはMap内で生成されろうだろう1つのフィールドのフィールド名。

    Type:文字列
    デフォルト:
    有効な値:
    重要:medium
org.apache.kafka.connect.transforms.ExtractField
スキーマが存在する時にStructから指定されたフィールドを抽出、あるいはスキーマレスのデータの場合はMapします。null値は修正無しに渡されます。

レコードキー (org.apache.kafka.connect.transforms.ExtractField$Key) あるいは値 (org.apache.kafka.connect.transforms.ExtractField$Value) のために設計された具体的な変換型を使います。

  • フィールド

    抽出するフィールド名。

    Type:文字列
    デフォルト:
    有効な値:
    重要:medium
org.apache.kafka.connect.transforms.SetSchemaMetadata
スキーマ名、バージョン あるいは両方をレコードのキー (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) あるいは値 (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) スキーマに設定します。

  • schema.name

    設定するスキーマ名。

    Type:文字列
    デフォルト:null
    有効な値:
    重要:high
  • schema.version

    設定するスキーマのバージョン。

    Type:dint
    デフォルト:null
    有効な値:
    重要:high
org.apache.kafka.connect.transforms.TimestampRouter
レコードのトピック フィールドを元のトピックの値とレコードのタイムスタンプの関数として更新します。

トピックフィールドはしばしば宛先のシステム(例えばデータベーステーブルあるいは検索インデックス名)の中の等価なエンティティ名を決定するために使われるため、これは主にシンク コネクタに便利です。

  • timestamp.format

    java.text.SimpleDateFormatと互換性のあるタイムスタンプのためのフォーマット文字列。

    Type:文字列
    デフォルト:yyyyMMdd
    有効な値:
    重要:high
  • topic.format

    トピックとタイムスタンプのそれぞれのプレースホルダーとして ${topic}${timestamp} を含むことができるフォーマット文字列。

    Type:文字列
    デフォルト:${topic}-${timestamp}
    有効な値:
    重要:high
org.apache.kafka.connect.transforms.RegexRouter
設定された正規表現と置き換え文字列を使ってレコードのトピックを更新します。

背後では、正規表現は java.util.regex.Patternにコンパイルされます。パターンが入力トピックに一致する場合、新しいトピックを取得するためにjava.util.regex.Matcher#replaceFirst() が置き換え文字列と一緒に使われます。

  • regex

    一致に使われる正規表現。

    Type:文字列
    デフォルト:
    有効な値:有効な正規表現
    重要:high
  • replacement

    置換文字列

    Type:文字列
    デフォルト:
    有効な値:
    重要:high
org.apache.kafka.connect.transforms.Flatten
設定可能なデリミタ文字を使って各レベルのフィールド名を結合することで、各フィールドについて名前を生成することで、入れ子になったデータ構造を平らにします。スキーマが存在する時はStructに、スキーマレスのデータの場合はMapを適用します。デフォルトのデリミタは '.' です。

レコードキー (org.apache.kafka.connect.transforms.Flatten$Key) あるいは値 (org.apache.kafka.connect.transforms.Flatten$Value) のために設計された具体的な変換型を使います。

  • デリミタ

    出力レコードのためのフィールド名を生成する時に、入力レコードからのフィールド名間に挿入するためのデリミタ

    Type:文字列
    デフォルト:.
    有効な値:
    重要:medium
org.apache.kafka.connect.transforms.Cast
特定の型へフィールドまたは、キーあるいは値をキャストします。例えば、整数フィールドをもっと小さな幅に。単純なプリミティブな型だけがサポートされます -- 整数、浮動小数点数、真偽値 および文字列。

レコードキー (org.apache.kafka.connect.transforms.Cast$Key) あるいは値 (org.apache.kafka.connect.transforms.Cast$Value) のために設計された具体的な変換型を使います。

  • spec

    List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of Maps or Structs. A single type to cast the entire value. 有効な型は、int8, int16, int32, int64, float32, float64, boolean および string です。

    Type:list
    デフォルト:
    有効な値:カンマで区切られたペアのリスト。例えば foo:bar,abc:xyz
    重要:high
org.apache.kafka.connect.transforms.TimestampConverter
Unix epoch, 文字列 およびコネクト Date/Timestamp型のような異なる形式間でタイムスタンプを変換します。個々のフィールドあるいは値全体へ適用します。

レコードキー (org.apache.kafka.connect.transforms.TimestampConverter$Key) あるいは値 (org.apache.kafka.connect.transforms.TimestampConverter$Value) のために設計された具体的な変換型を使います。

  • target.type

    望ましいタイムスタンプの表現: string, unix, Date, Time あるいは Timestamp

    Type:文字列
    デフォルト:
    有効な値:
    重要:high
  • フィールド

    タイムスタンプを含むフィールド、あるいは値全体がタイムスタンプの場合は空

    Type:文字列
    デフォルト:""
    有効な値:
    重要:high
  • 形式

    タイムスタンプのための SimpleDateFormat互換の形式。type=string の場合は出力を生成するために使われます。あるいは入力が文字列の場合は入力をパースするために使われます。

    Type:文字列
    デフォルト:""
    有効な値:
    重要:medium
org.apache.kafka.connect.transforms.Filter
全てのレコードを削除し、チェーン内の後続の変換からそれらをフィルタリングします。これは特定の述語に一致する(あるいは一致しない)レコードをフィルタで除外するために、条件付きで使うことを目的としています。

述語

変換は述語を使って構成することができるため、変換はある条件を満たすメッセージのみに適用されます。特に、Filter変換と組み合わされた場合、述語は特定のメッセージを選択的にフィルタで除外することができます。

述語はコネクタ構成で指定されます。

  • predicates - 一部の変換に適用される述語のエイリアスのセット。
  • transforms.$alias.type - 変換のための完全修飾クラス名。
  • predicates.$alias.$predicateSpecificConfig - 述語のための構成プロパティ。

全ての変換には、暗黙的な構成プロパティ predicatenegate があります。predicular 述語は、変換の predicate 構成を述語のエイリアスに設定することで、変換に関連付けられます。述語の値は、negate 構成プロパティを使うことで反転することができます。

例えば、メッセージを多くの異なるトピックに生成するソースコネクタがあり、以下の事を実行するとします:

  • 'foo' トピック内のメッセージを完全に除外する
  • フィールド名 'other_field' の ExtractField 変換を、トピック 'bar' を除く全てのトピックのレコードに適用します

これを行うには、最初にトピック 'foo' を宛先とするレコードを除外する必要があります。フィルタ変換は、以降の処理からレコードを削除し、TopicNameMatches 述語を使って、特定の正規表現に一致するトピックのレコードにのみ変換を適用できます。TopicNameMatches の唯一の構成プロパティは、pattern で、これはトピック名と照合するための Java 正規表現です。構成は以下のようになります:

        transforms=Filter
        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
        transforms.Filter.predicate=IsFoo

        predicates=IsFoo
        predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
        predicates.IsFoo.pattern=foo
    

次に、レコードのトピック名が 'bar' ではない場合のみ、ExtractField を適用する必要があります。TopicNameMatches を直接使うことはできません。これは、一致しないトピック名ではなく、一致するトピック名に変換が適用されるためです。変換の暗黙的な negate 構成プロパティにより、述語が一致する一連のレコードを反転することができます。この構成を前の例に追加すると、以下のようになります:

        transforms=Filter,Extract
        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
        transforms.Filter.predicate=IsFoo

        transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
        transforms.Extract.field=other_field
        transforms.Extract.predicate=IsBar
        transforms.Extract.negate=true

        predicates=IsFoo,IsBar
        predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
        predicates.IsFoo.pattern=foo

        predicates.IsBar.type=org.apache.kafka.connect.predicates.TopicNameMatches
        predicates.IsBar.pattern=bar
    

Kafka コネクトには、以下の述語が含まれます:

  • TopicNameMatches - 特定の Java 正規表現に一致する名前を持つトピックのレコードに一致します。
  • HasHeaderKey - 指定されたキーのヘッダを持つレコードに一致します。
  • RecordIsTombstone - トゥームストーンレコード、つまり null 値を持つレコードに一致します。

各述語の構成方法の詳細を以下に示します:

org.apache.kafka.connect.transforms.predicates.HasHeaderKey
構成された名前の少なくとも1つのヘッダを持つレコードに当てはまる述語。

  • 名前

    ヘッダ名。

    Type:文字列
    デフォルト:
    有効な値:空では無い文字列
    重要:medium
org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
トゥームストーン(つまり null 値である)レコードに当てはまる述語。

org.apache.kafka.connect.transforms.predicates.TopicNameMatches
構成された正規表現に一致するトピック名を持つレコードに当てはまる述語。

  • パターン

    レコードのトピックの名前と照合するための Java 正規表現。

    Type:文字列
    デフォルト:
    有効な値:空では無い文字列、有効な正規表現
    重要:medium

REST API

Kafkaコネクトはサービスとして動作することを目的としているため、コネクタを管理するためにREST APIも提供します。REST APIサーバはlisteners 設定オプションを使って設定することができます。このフィールドは以下の形式でlistenerのリストを含まなければなりません: protocol://host:port,protocol2://host2:port2. 現在のところサポートされるプロトコルは httphttpsです。例えば:

        listeners=http://localhost:8080,https://localhost:8443
    

デフォルトでは、listenersが指定されない場合、RESTサーバがHTTPプロトコルを使ってポート8083で動きます。HTTPSを使う場合は、設定にSSL設定が含まれなければなりません。デフォルトでは、ssl.* 設定を使うでしょう。Kafkaブローカーへの接続ではなくREST APIのための異なる設定を使う必要がある場合、フィールドは listeners.httpsを前に付けることができます。プリフィックスを使う場合は、前に置かれたオプションのみが使われ、プリフィックスを持たないssl.*オプションは無視されるでしょう。以下のフィールドはREST APIのためのHTTPSを設定するために使うことができます:

  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.keystore.type
  • ssl.key.password
  • ssl.truststore.location
  • ssl.truststore.password
  • ssl.truststore.type
  • ssl.enabled.protocols
  • ssl.provider
  • ssl.protocol
  • ssl.cipher.suites
  • ssl.keymanager.algorithm
  • ssl.secure.random.implementation
  • ssl.trustmanager.algorithm
  • ssl.endpoint.identification.algorithm
  • ssl.client.auth

REST APIはユーザによってKakfaコネクトを監視/管理するためだけに使われるわけではありません。Kafkaコネクトのクラスタ間の通信にも使われます。フォロワーのノードのREST API上で受信されたリクエストは、リーダーノードのREST APIに転送されるでしょう。到達可能なホストがlistenするURIと異なる場合、設定オプション rest.advertised.host.name, rest.advertised.port および rest.advertised.listener を使ってフォロワーノードがリーダーと接続するために使用するURIを変更することができます。HTTPとHTTPSの両方のlistenerを使う場合、どのlistenerがクラスタを横断する通信のために使われるかを定義するためにrest.advertised.listenerオプションも使うことができます。ノード間の通信のためにHTTPSを使う場合、HTTPSクライアントを設定するために同じssl.* あるいは listeners.https オプションが使われるでしょう。

以下は現在ところサポートされるREST APIのエンドポイントです:

  • GET /connectors - アクティブなコネクタのリストを返します
  • POST /connectors - 新しいコネクタを返します; リクエストのボディは文字列name フィールドと、コネクタの設定パラメータを持つ config フィールドを含むJSONオブジェクトでなければなりません。
  • GET /connectors/{name} - 特定のコネクタについての情報を取得します
  • GET /connectors/{name}/config - 特定のコネクタのための設定パラメータを取得します
  • PUT /connectors/{name}/config - 特定のコネクタのための設定パラメータを更新します
  • GET /connectors/{name}/status - 実行中、失敗、中止など、どのワーカーに割り当てられているか、失敗した場合はエラー情報、全てのタスクの状態を含む、現在のコネクタの情報を取得する
  • GET /connectors/{name}/tasks - 現在のところコネクタのために動作しているタスクのリストを取得します
  • GET /connectors/{name}/tasks/{taskid}/status - 実行中、失敗、中止など、どのワーカーに割り当てられているか、失敗した場合はエラー情報を含む、タスクの現在の状態を取得する
  • PUT /connectors/{name}/pause - コネクタとそのタスクを休止します。これはコネクタが再開されるまでメッセージの処理を停止します
  • PUT /connectors/{name}/resume - 休止されたコネクタを再開します (あるいはコネクタが休止されていない場合は何もしません)
  • POST /connectors/{name}/restart - コネクタを再開します (一般的にそれが失敗したからです)
  • POST /connectors/{name}/tasks/{taskId}/restart - 個々のタスクを再開します (一般的にそれが失敗したからです)
  • DELETE /connectors/{name} - 全てのタスクを停止しその設定を削除して、コネクタを削除します
  • GET /connectors/{name}/topics - コネクタが作成されてから、あるいはアクティブなトピックのセットをリセットするリクエストが発行されてから、特定のコネクタが使っているトピックのセットを取得します
  • PUT /connectors/{name}/topics/reset - コネクタのアクティブなトピックのセットを空にするリクエストを送信します

Kafka コネクトはコネクタのプラグインについての情報を取得するREST APIも提供します:

  • GET /connector-plugins- Kafkaコネクト クラスタ内にインストールされているコネクタ プラグインのリストを返します。APIはリクエストを処理するワーカー上のコネクタのみをチェックします。新しいコネクタのjarを追加した場合特にrollingアップグレードの間に一貫性の無い結果を見るかもしれないことを意味します
  • PUT /connector-plugins/{connector-type}/config/validate - 設定の定義に対して提供された設定値を検証します。このAPIは設定の検証ごとに実施し、示唆される値と検証中のエラーメッセージを返します。

以下は、トップレベル(ルート)エンドポイントでサポートされる REST リクエストです:

  • GET /- REST リクエストを処理するコネクトワーカーのバージョン(ソースコードの git commit ID を含む)や、接続されている Kafka クラスタ ID など、Kafka コネクタクラスタに関する基本情報を返します。

コネクトでのエラー報告

Kafka コネクトは、処理の様々な段階で発生したエラーを処理するためのエラー報告を提供します。デフォルトでは、転換または変換中にエラーが発生すると、コネクタが失敗します。各コネクタ構成では、これらのエラーをスキップし、必要に応じて各エラーと失敗した操作の詳細および問題のあるレコード(様々なレベルの詳細)を Connect アプリケーションログに書き込むことで、そのようなエラーを許容できます。これらの仕組みは、シンクコネクタが Kafka トピックから消費されたメッセージを処理している時にもエラーをキャプチャします。全てのエラーは構成可能な "デッドレターキュー" (DLQ) Kafka トピックに書き込むことができます。

コネクタのコンバータ、変換、またはシンクコネクタ自体内のエラーをログに報告するには、コネクタ構成で errors.log.enable=true を設定して、各エラーと問題のレコードのトピック、パーティション、オフセットの詳細をログに記録します。追加のデバッグ目的で、errors.log.include.messages=true を設定して、問題レコードのキー、値、ヘッダーもログに記録します (これにより機密情報が記録される場合があります)。

コネクタのコンバータ、変換、あるいはシンクコネクタ自体のエラーをデッドレターキューに報告するには、errors.deadletterqueue.topic.name を設定し、オプションで errors.deadletterqueue.context.headers.enable=true を設定します。

デフォルトでは、コネクタはエラーまたは例外が発生するとすぐに "fail fast" 動作を示します。これは、次の構成プロパティをデフォルトでコネクタ構成に追加することと同じです:

        # 失敗時の再試行を無効にします
        errors.retry.timeout=0

        # エラーとそれらのコンテキストをログに記録しません
        errors.log.enable=false

        # デッドレターキュートピックにエラーを記録しません
        errors.deadletterqueue.topic.name=

        # 最初のエラーで失敗します
        errors.tolerance=none
    

これらおよびその他の関連するコネクタ構成プロパティを変更して、異なる動作を提供できます。例えば、次の構成プロパティをコネクタ構成に追加して、複数の再試行によるエラー処理を設定し、アプリケーションログと my-connector-errors Kafka トピックに記録し、レポートすることで、コネクタタスクを失敗させるのではなく、全てのエラーを許容できます:

        # 連続して失敗するまで最大30秒待機し、最大10分間再試行します
        errors.retry.timeout=600000
        errors.retry.delay.max.ms=30000

        # アプリケーションログとともにエラーコンテキストをログに記録しますが、構成とメッセージは含みません
        errors.log.enable=true
        errors.log.include.messages=false

        # エラーコンテキストを Kafka トピックに生成します
        errors.deadletterqueue.topic.name=my-connector-errors

        # 全てのエラーを許容します。
        errors.tolerance=all
    

8.3コネクタ開発ガイド

このガイドは開発者がKafkaコネクタがKafkaと他のシステム間でデータを移動するために新しいコネクタを書く方法を説明します。2,3の主要な概念を簡単に復習し、それから簡単なコネクタを生成する方法を説明します。

中核となる概念とAPI

コネクタとタスク

データをKafkaと他のシステム間でコピーするには、ユーザはデータをpullあるいはpushするシステムのためのコネクタを作成します。コネクタは2つの持ち味で供給されます: SourceConnectors は他のシステムからデータをインポートします (たとえば JDBCSourceConnector はリレーショナル データベースをKafkaにインポートするでしょう)、そしてSinkConnectors はデータをエクスポートします (たとえば HDFSSinkConnector はKafka トピックの内容をHDFSファイルにエクスポートするでしょう)。

コネクタ はそれら自身のデータのコピーを行いません: それらの設定はコピーされるデータを記述します。コネクタ はジョブをワーカーに分散することができる タスク のセットへ分割する責任があります。これらのタスクも2つの対応する持ち味で提供されます: SourceTaskSinkTask

手持ちの割り当てを使って、各タスク データのサブセットをKafkaへ、あるいはKafkaからコピーしなければなりません。Kafka コネクトでは、これらの割り当てを一貫性のあるスキーマを持つレコードから成る入力と出力ストリームのセットとして常に構成できる必要があります。このマッピングは時には明白です: ログファイルのセット中の各ファイルは、ファイル内のバイトのオフセットとして格納された同じスキーマとオフセットを使ってレコードを形成する各パースされた行を持つストリームと見なすことができます。他の場合では、このモデルにマップするためにより努力が求められるかもしれません: JDBC コネクタは各テーブルをストリームにマップすることができますが、オフセットはより不明確です。1つの可能なマッピングは、逐次的に新しいデータを返すクエリを生成するためにタイムスタンプのカラムを使い、最後にクエリされたタイムスタンプはオフセットとして使うことができます。

ストリームとレコード

各ストリームはキー-値 レコードの系列でなければなりません。キーと値の両方は複雑な構造を持つかもしれません -- 多くのprimitiveな型が提供されますが、配列、オフジェクト および入れ子の構造も同様に表現されるかもしれません。実行時のデータ形式は特定のシリアライズ化形式を仮定しません; この変換はフレームワークによって内部的に処理されます。

キーと値に加えて、レコード(ソースによって生成され、シンクに配送されるものの両方)は関連するストリームIDとオフセットを持ちます。障害時に不必要な再処理とイベントの複製を避けながら、処理が最後のコミットされたオフセットから再開できるように、処理されたデータのオフセットを定期的にコミットするためにフレームワークによってそれらが使われます。

動的なコネクタ

全てのジョブが静的ではないため、コネクタの実装は再設定が必要になるかもしれない全ての変更のために外部システムを監視する責任もあります。例えば、JDBCSourceConnectorの例では、Connector は各Taskにテーブルのセットを割り当てるかもしれません。新しいテーブルが生成される時、設定を更新することで新しいテーブルをTasksの1つに割り当てることができるように、これを発見する必要があります。再設定を必要とする変更(あるいはTasksの数の変更)に気づいた時、それはフレームワークに通知し、フレームワークは全ての対応するTasksを更新します。

簡単なコネクタの開発

コネクタの開発は2つのインタフェースの実装のみを必要とします。ConnectorTaskfile パッケージ内にKafkaのためのソースコードと一緒に簡単な例が含まれます。このコネクタはスタンドアローン モードで使うことを想定されていて、ファイルの各行を読み込みそれをレコードとして発行するためにSourceConnector/SourceTaskと、各レコードをファイルに書き込むSinkConnector/SinkTaskの実装を持ちます。

この章の残りはコネクタを作成するための主要なステップを実演するためにいくつかのコードを渡り歩くつもりですが、簡潔さのために詳細が省略されているため開発者は完全な例のソースコードも参照すべきです。

コネクタの例

簡単な例としてSourceConnectorをカバーするつもりです。SinkConnector 実装はとても似ています。SourceConnector から継承したクラスを生成することで開始し、パースされた設定情報(読み込むファイル名とデータを送信するトピック)を格納する2,3のフィールドを追加します。

    public class FileStreamSourceConnector extends SourceConnector {
        private String filename;
        private String topic;
    

代理をするための最も簡単なメソッドはtaskClass()です。これはデータを実際に読み込むためにワーカープロセス内でインスタンス化されるべきクラスを定義します:

    @Override
    public Class<? extends Task> taskClass() {
        return FileStreamSourceTask.class;
    }
    

以下でFileStreamSourceTask クラスを定義するつもりです。次に、いくつかの標準的なライフサイクル メソッド start()stop() を追加します:

    @Override
    public void start(Map<String, String> props) {
        // 完全なバージョンはエラーハンドリングも含みます。
        filename = props.get(FILE_CONFIG);
        topic = props.get(TOPIC_CONFIG);
    }

    @Override
    public void stop() {
        // バックグランドの監視が必要では無いため、何もしません。
    }
    

最後に、実装の実際の核心は taskConfigs() にあります。この場合、1つのファイルを処理しているだけです。つまりmaxTasks 引数によってより多くのタスクを生成することが許されているかもしれませんが、1つのエントリだけを持つリストを返します:

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<>();
        // ただ1つの入力ストリームだけが意味を為します。
        Map<String, String> config = new HashMap<>();
        if (filename != null)
            config.put(FILE_CONFIG, filename);
        config.put(TOPIC_CONFIG, topic);
        configs.add(config);
        return configs;
    }
    

例では使われていませんが、SourceTask もソース システム内のオフセットをコミットするための2つAPIを提供します: commitcommitRecord。APIはメッセージのための通知の仕組みを持つソース システムのために提供されます。これらのメソッドを上書きすることにより、一度メッセージがKafkaに書き込まれると、ソースコネクタはソース システムにまとめてあるいは個々にメッセージを通知することができます。commit API はソースシステム内にpollによって返されたオフセットまでオフセットを格納します。このAPIの実装はコミットが完了するまでブロックしなければなりません。Kafkaに書き込まれた後で、commitRecord API は各SourceRecord についてソース システム内のオフセットを保存します。Kafkaコネクトは自動的にオフセットを記録するため、SourceTaskはそれらを実装する必要はありません。コネクタがソースシステム内のメッセージを通知する必要が無い場合、APIのうちの1つだけが一般的に必要とされます。

複数のタスクであっても、このメソッドの実装は通常とても単純です。単に入力タスクの数を決定する必要があります。これはデータを取り出してきたリモートのサービスへの連絡を必要とするかも知れず、従ってそれらを分け合います。作業をタスクに分割する幾つかのパターンはとても一般的なため、これらの問題を単純にするためにいくつかのユーティリティがConnectorUtilsで提供されます。

この単純な例は動的な入力を含まないことに注意してください。タスクの設定への更新がどうやって起動するかについては次の章の議論をみてください。

タスクの理恵 - ソース タスク

次に、対応する SourceTaskの実装を説明するつもりです。実装は短いですが、このガイド内で完全にカバーするにはあまりに長いです。実装のほとんどを説明するために疑似コード使うつもりですが、完全な例についてはソースコードを参照することができます。

コネクタの時と全く同じように、適切な基本Task クラスから継承しているクラスを作成する必要があります。それも幾つかの標準的なライフサイクルのメソッドを持ちます:

    public class FileStreamSourceTask extends SourceTask {
        String filename;
        InputStream stream;
        String topic;

        @Override
        public void start(Map<String, String> props) {
            filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
            stream = openOrThrowError(filename);
            topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
        }

        @Override
        public synchronized void stop() {
            stream.close();
        }
    

これらはわずかに単純化されたバージョンですが、これらのメソッドが比較的単純でなければならず、それらが実施すべき唯一の仕事はリソースの割り当てあるいは解放であることを示します。この実装について注意する2つのポイントがあります。最初に、start() メソッドはまだ以前のオフセットからの再開を処理しません。これは後の章で説明されるでしょう。次に、stop() メソッドは同期されます。SourceTasks は無期限にブロックすることができる専用スレッドが与えられているため、これが必要でしょう。つまり、それらはワーカー内の別のスレッドからの呼び出しを使って停止される必要があります。

次に、タスクの主要な機能を実装します。入力システムからイベントを取得し、List<SourceRecord>を返すpoll()メソッド:

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            ArrayList<SourceRecord> records = new ArrayList<>();
            while (streamValid(stream) && records.isEmpty()) {
                LineAndOffset line = readToNextLine(stream);
                if (line != null) {
                    Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                    Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                    records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
                } else {
                    Thread.sleep(1);
                }
            }
            return records;
        } catch (IOException e) {
            // Underlying stream was killed, probably as a result of calling stop. Allow to return
            // null, and driving thread will handle any shutdown if necessary.
        }
        return null;
    }
    

また幾つかの詳細を省略しましたが、重要なステップを見ることができます: poll() メソッドは繰り返し呼ばれ、各呼び出しについてファイルからレコードを読み込みを繰り返すでしょう。読み込む各行について、ファイルのオフセットも追跡します。情報の4つの構成要素を持つ出力SourceRecord を生成するためにこの情報を使います: ソース パーティション (1つだけ存在し、読み込まれる1つのファイル)、ソース オフセット (ファイル内のバイトのオフセット)、出力トピック名、そして出力値 (行。この値が常に文字列であることを示すスキーマを含みます)。SourceRecordコンストラクタの他の変種も特定の出力パーティション、キーおよびヘッダを含むことができます。

この実装は通常のJava InputStream インタフェースを使用し、データが利用可能ではない場合はsleepするかもしれないことに注意してください。Kafkaコネクトは専用のスレッドを使って各タスクを提供するため、これは許容されます。タスクの実装は基本的なpoll()インタフェースに準拠する必要がありますが、それらがどのように実装されるかについて多くの柔軟性を持ちます。この場合、NIOに基づいた実装は最も効果的でしょうが、この単純なやり方は動作し、実装が速く、Javaの古いバージョンと互換性があります。

シンク タスク

以前の章は単純な SourceTaskを実装する方法を説明しました。SourceConnectorSinkConnectorとは異なり、SourceTask はpullインタフェースを使用しSinkTask はpushインタフェースを使用するため、SourceTaskSinkTask はとても異なるインタフェースを持ちます。両方とも共通のライフサイクル メソッドを共有しますが、SinkTask インタフェースはとても異なります:

    public abstract class SinkTask implements Task {
        public void initialize(SinkTaskContext context) {
            this.context = context;
        }

        public abstract void put(Collection<SinkRecord> records);

        public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        }
    

SinkTask ドキュメントは完全な詳細を含みますが、このインタフェースはSourceTaskとよく似て単純です。put() メソッドは、SinkRecordsのセットを受け付け、必要な全ての変換を行い、宛先のシステム内にそれらを格納する実装のほとんどを含む必要があります。このメソッドは返す前にデータが完全に宛先のシステムに書き込まれることを確実にする必要はありません。実際、多くの場合で内部的なバッファリングは便利なため、レコードの全体のバッチは一度に送ることができ、イベントをダウンストリームのデータストアに挿入するオーバーヘッドを減らします。SinkRecordsSourceRecordsとして本質的に同じ情報を含みます: Kafkaトピック, パーティション, オフセット, イベント キーと値、および任意のヘッダ。

flush() メソッドはオフセットのコミット処理の間に使われます。これによりイベントが紛失されないようにタスクは障害から回復しセーフポイントから回復することができます。メソッドは未解決の全てのデータを宛先のシステムにpushする必要があり、書き込みが通知されるまでブロックします。offsets パラメータはしばしば無視することができますが、確実に1回の配送を提供するために宛先のストア内のオフセット情報を格納したい実装の場合は便利です。例えば、HDFSコネクタはこれを行い、flush()オペレーションが自動的にデータとHDFS内の最終的な場所へのオフセットをコミットするようにするためにatomicな移動オペレーションを使うことができます。

エラー記録レポータ

コネクタで error reporting が有効になっている場合、コネクタは ErrantRecordReporter を使って、シンクコネクタに送信された個々のレコードの問題を報告できます。以下の例は、コネクタの SinkTask サブクラスが ErrantRecordReporter を取得して使う方法を示しています。DLQ が有効になっていない場合、またはコネクタがこのレポータ機能を持たない古いコネクタランタイムにインストールされている場合、null レポータを安全に処理します:

        private ErrantRecordReporter reporter;

        @Override
        public void start(Map<String, String> props) {
            ...
            try {
                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
            } catch (NoSuchMethodException | NoClassDefFoundError e) {
                // Will occur in Connect runtimes earlier than 2.6
                reporter = null;
            }
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record: records) {
                try {
                    // attempt to process and send record to data sink
                    process(record);
                } catch(Exception e) {
                    if (reporter != null) {
                        // Send errant record to error reporter
                        reporter.report(record, e);
                    } else {
                        // There's no error reporter, so fail
                        throw new ConnectException("Failed on record", e);
                    }
                }
            }
        }
    
以前のオフセットからの回復

SourceTaskの実装は各レコードを使ってストリームID (入力ファイル名)とオフセット(ファイル内の位置)を含みます。フレームワークは障害時にタスクが回復し、再処理され、ひょっとすると重複するイベントの数を最小化できるように (あるいは、例えばスタンドアローンモード、あるいはジョブの再設定のために、Kafkaコネクトが穏やかに停止した場合は最も最近のオフセットから再開)、オフセットを定期的にコミットするためにこれを使います。このコミット プロセスはフレームワークよって完全に自動化されますが、コネクタのみが位置から回復するために入力ストリーム内の正しい位置への戻りを探す方法を知っています。

開始時に正しく回復するために、タスクはオフセットデータへアクセスするためのinitialize()メソッドに渡されたSourceContextを使うことができます。initialize()の中で(存在するのであれば)オフセットを読み込み、その位置を探すために少し多くのコードを追加するでしょう:

        stream = new FileInputStream(filename);
        Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
        if (offset != null) {
            Long lastRecordedOffset = (Long) offset.get("position");
            if (lastRecordedOffset != null)
                seekToOffset(stream, lastRecordedOffset);
        }
    

もちろん、入力ストリームのそれぞれについて多くのキーを読み込む必要があるかもしれません。OffsetStorageReader インタフェースにより、効率的に全てのオフセットを読み込むためにbulk読み込みを発行し、そして各入力ストリームを調べることで適切な位置に適用することができます。

動的な 入力/出力ストリーム

Kafkaコネクトは、各テーブルを個々にコピーするために多くのジョブを生成するよりデータベース全体をコピーするようなbulkデータのコピージョブを定義することを目的としています。この設計の1つの帰結は、コネクタのための入力あるいは出力ストリームのセットが時間が経てば変わるかもしれないということです。

ソース コネクタは変更のためにソースシステムを監視する必要があります。例えばデータベース内のテーブルの追加/削除。変更を捕らえた時に、それらは再構成が必要なConnectorContext オブジェクトを使ってフレームワークに知らせる必要があります。例えば、SourceConnectorの中で:

        if (inputsChanged())
            this.context.requestTaskReconfiguration();
    

フレームワークはタスクを再構成する前にそれらの進捗をgracefullyにコミットしながら、すぐに新しい設定情報を要求しタスクを更新するでしょう。SourceConnectorの中でこの監視は現在のところコネクタの実装に任されていることに注意してください。この監視に余分なスレッドが必要とされる場合、コネクタは自身でそれを割り当てる必要があります。

理想的には変化の監視のためのこのコードはConnectorに隔離されていて、タスクはそれらについて心配する必要は無いでしょう。しかし、最も一般的に入力ストリームのうちの1つが入力ストリーム内で破棄、例えばもしテーブルがデータベースから削除される場合された時に、変更はタスクにも影響するかもしれません。TaskConnectorの前に問題に遭遇した場合、これはConnector が変更のためにpollする必要がある場合には一般的です、 Task はその後のエラーを処理する必要があるでしょう。ありがたいことに、これは通常単純に適切な例外をキャッチおよび処理することで処理することができます。

SinkConnectorsは通常ストリームの追加のみを処理する必要があります。これはそれらの出力(例えば、新しいデータベース テーブル)への新しいエントリに翻訳するでしょう。フレームワークは、入力トピックのセットが正規表現の購読のために変更になる場合などの、Kafkaの入力への全ての変更を管理します。SinkTasks は新しい入力ストリームを期待すべきです。入力ストリームはデータベース内の新しいテーブルのようなダウンストリーム内の新しいリソースの生成を必要とするかもしれません。これらの問題で最もやりにくい状況は、始めて新しい入力ストリームを見て同時に新しいリソースを生成しようとする複数のSinkTasks間で衝突するかもしれません。一方で、SinkConnectorsは一般的にストリームの動的なセットを処理するための特別なコードを必要としないでしょう。

コネクトの設定の検証

Kafkaコネクトを使って、実行されるコネクタをサブミットする前にコネクタの設定を検証することができ、Kafkaコネクトはエラーについてフィードバックと推奨される値を提供することができます。これを利用するために、コネクタの開発者は設定定義をフレームワークに公開するためのconfig()の実装を提供する必要があります。

FileStreamSourceConnector内の以下のコードは設定を定義し、それをフレームワークに公開します。

        private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
            .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");

        public ConfigDef config() {
            return CONFIG_DEF;
        }
    

ConfigDef クラスは予想される設定のセットを指定するために使われます。各設定について、名前、型、デフォルトの値、ドキュメント、グループ情報、グループ内の順番、設定値の幅 およびUIでの表示に適した名前を指定することができます。それに加えて、Validator クラスを上書きすることで1つの設定の検証に使われる特別な検証ロジックを提供することができます。更に、設定間で設定間で依存があるかもしれません。例えば有効な値と設定の可視性が他の設定の値に応じて変更されるかもしれません。これを扱うために、ConfigDefを使って設定の依存を指定し、有効な値を取得し現在の設定値に与えられた設定の可視性を設定するために Recommender の実装を提供することができます。

また、Connector内のvalidate()メソッドは各設定について設定エラーと推奨値と一緒に許される設定のリストを返すデフォルトの検証実装を提供します。しかし、設定の検証のために推奨された値を使わないでしょう。カスタマイズされた設定検証のためのデフォルトの実装を上書きを提供するかもしれません。これは推奨値を使うかもしれません。

スキーマとの連携

FileStream コネクタは単純なため良い例ですが、つまらない構造化データも持ちます -- 各行は単なる文字列です。ほとんどすべての現実的なコネクタはもっと複雑なデータ形式を持つスキーマを必要とするでしょう。

もっと複雑なデータを作成するには、Kafkaコネクトdata APIと連携する必要があるでしょう。ほとんどの構造化レコードはprimitiveな型に加えて2つのクラスとやり取りをする必要があるでしょう: SchemaStruct

APIドキュメントは完全な参照を提供しますが、ここでは SchemaStructを生成する簡単な例です:

    Schema schema = SchemaBuilder.struct().name(NAME)
        .field("name", Schema.STRING_SCHEMA)
        .field("age", Schema.INT_SCHEMA)
        .field("admin", SchemaBuilder.bool().defaultValue(false).build())
        .build();

    Struct struct = new Struct(schema)
        .put("name", "Barbara Liskov")
        .put("age", 75);
    

ソース コネクタを実装している場合、いつそしてどうやってスキーマを作成するかを決める必要があるでしょう。可能であれば、可能な限りそれらを再計算することを避けるべきです。例えば、もしコネクタが固定のスキーマを持つことを保証される場合、それを静的に作成し1つのシングルトンを再利用します。

しかし、多くのコネクタは動的なスキーマを持つでしょう。これの簡単な例の1つはデータベース コネクタです。たった一つのテーブルだけでも考えると、(テーブルからテーブルに変わるため)スキーマはコネクタ全体について事前定義されないでしょう。しかし、ユーザは ALTER TABLE コマンドを実行するかもしれないため、コネクタのライフタイムを超えて1つのテーブルについて固定されないかもしれません。コネクタはこれらの変更を検知し適切に反応しなければなりません。

シンク コネクタはデータを消費し従ってスキーマを生成する必要がないため、通常は単純です。しかし、受信したスキーマが期待する形式を持っているかを検証するために十分注意する必要があります。スキーマが一致しない場合 -- 通常はアップストリーム プロデューサを示すものが宛先のシステムへ正しく翻訳することができない無効なデータを生成します -- シンク コネクタはこのエラーをシステムに示すために例外を投げるべきです。

Kafka コネクタの管理

Kafka コネクタの REST layer はクラスタの管理を有効にするAPIのセットを提供します。これはコネクタの設定とそれらのタスクの状態を表示し、それらの現在の挙動を変更する(例えば設定の変更とタスクの再起動)ためのAPIを含みます。

コネクタが最初にクラスタに送信される時、新しいコネクタのタスクで構成される負荷を分散するために、コネクト ワーカー間でリバランスが引き起こされます。コネクタが取得するタスクの数を増加あるいは減少する時、あるいはコネクタの設定が変更される時に、またはコネクタ クラスタの意図的なアップグレードの一部としてあるいは失敗時にワーカーが追加あるいは削除される時にも、この同じリバランスの手順が使われます。

2.3.0 より前のバージョンでは、コネクト ワーカーは、各ワーカーがほぼ同じ量の作業を行えるようにする簡単な方法として、コネクタの完全なセットとクラスタ内のそれらのタスクをリバランスしていました。この挙動はconnect.protocol=eagerを設定する事でまだ有効にすることができます。

2.3.0 から、Kafkaのコネクタはデフォルトでコネクタワーカー全体でコネクタとタスクのバランスを段階的に調整する増分協調バランシングを行うプロトコルを使用し、新しい、削除される、あるいは1つのワーカーから他へ移動する必要があるタスクだけに影響します。他のタスクは、古いプロトコルのように、リバランスの間に停止および再起動はされません。

コネクト ワーカーが意図的にあるいは障害によってグループを離れると、コネクタはリバランスを引き起こす前に scheduled.rebalance.max.delay.msの間待機します。この遅延のデフォルトは5分 (300000ms)で、ワーカーの出発の負荷をすぐに再分配することなく、障害あるいはワーカーのアップグレードを許容します。このワーカーが設定された遅延の間に返る場合、以前に割り振られたタスクを完全に取得します。ただし、これはscheduled.rebalance.max.delay.ms で指定された時間が経過するまで、タスクが割り当てられないままになることを意味します。ワーカーがその時間制限の間に返らない場合、コネクタはこれらのタスクをコネクタ クラスタ内の残りのワーカー間で再割り当てします。

コネクタ クラスタを形成する全てのワーカーがconnect.protocol=compatibleを使って設定される場合、新しいコネクタ プロトコルが有効にされます。これは、このプロパティが無い場合のデフォルト値でもあります。従って、新しいコネクタ プロトコルへのアップグレードは、全てのワーカーが 2.3.0 にアップグレートされた場合に自動的に起こります。コネクタ クラスタのローリング アップグレードは、最後のワーカーがバージョン 2.3.0 に参加した時に、増分協調バランシングをアクティブにします。

REST APIを使って、コネクタとそのタスクの現在の状態、それらが割り当てられたワーカーのIDを含む、を表示することができます。例えば、GET /connectors/file-source/status リクエストは、file-sourceという名前のコネクタの状態を表示します:

    {
    "name": "file-source",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.1.208:8083"
    },
    "tasks": [
        {
        "id": 0,
        "state": "RUNNING",
        "worker_id": "192.168.1.209:8083"
        }
    ]
    }
    

コネクタとそれらのタスクはステータスの更新をクラスタ内の全てのワーカーが監視する共有トピック(status.storage.topicを使って設定されます)に公開します。ワーカーはこのトピックを非同期に消費するため、状態の変更が状態APIを使って見える前に一般的に(短い)遅延があります。コネクタあるいはそのタスクの1つについて、以下の状態があり得ます:

  • UNASSIGNED: コネクタ/タスクはまだワーカーに割り当てられていない。
  • RUNNING: コネクタ/タスクが実行中。
  • PAUSED: コネクタ/タスクが管理上停止されている。
  • FAILED: コネクタ/タスクが失敗した(通常は例外を上げることで行われます。これは状態出力の中で報告されます)。
  • DESTROYED: コネクタ/タスクは管理上削除され、コネクタ クラスタに表示されなくなります。

ほとんどの場合において、コネクタとタスクの状態は一致しますが、それらは変更が発生している時、あるいはタスクが失敗した場合に、短い時間の間異なるかもしれません。例えば、コネクタが最初に開始された時、コネクタとそのタスクが全てRUNNING状態に遷移する前に知覚可能な遅延があるかもしれません。コネクタは自動的に失敗したタスクを再起動しないため、タスクが失敗した時にも状態が分岐するでしょう。コネクタ/タスクを手動で再起動するために、上でリスト化された再起動APIを使うことができます。リバランスが発生している間にタスクを再起動しようとすると、コネクタは409(衝突)状態コードを返すだろうことに注意してください。リバランスが完了した後で再試行することができますが、リバランスは効率的にクラスタ内の全てのコネクタとタスクを再起動するため、必要無いかもしれません。

2.5.0 以降、Kafka コネクトは status.storage.topic を使って、各コネクタが使っているトピックに関連する情報も格納します。コネクトワーカーは、これらのコネクタごとのトピックステータス更新を使って、コネクタが使っているトピック名のセットを返すことにより、REST エンドポイント GET /connectors/{name}/topics へのリクエストに応答します。REST エンドポイント PUT /connectors/{name}/topics/reset へのリクエストにより、コネクタのアクティブトピックのセットがリセットされ、コネクタの最新のトピックパターンに基づいて、新しいセットが入力されます。コネクタを削除すると、コネクタのアクティブなトピックのセットも削除されます。トピックの追跡はデフォルトで有効になっていますが、topic.tracking.enable=false を設定することで無効にできます。実行時にコネクタのアクティブなトピックをリセットする要求を許可しない場合は、ワーカープロパティ topic.tracking.allow.reset=false を設定します。

一時的にコネクタのメッセージ処理を停止するために時には便利です。例えば、もしリモートシステムがメンテナンス中の場合、ログを例外のスパムで一杯にする代わりにソースコネクタが新しいデータのためにpollするのを停止することが望ましいでしょう。このユースケースのために、コネクトは休止/再開APIを提供します。ソース コネクタが休止している間、コネクタは追加のレコードのためのpollを停止するでしょう。シンク コネクタが休止している間、コネクタは新しいメッセージをpushすることを停止するでしょう。休止状態は永続的なもので、もしクラスタを再起動したとしても、コネクタはタスクが再開されるまでメッセージの処理を開始しないでしょう。休止される時に途中の処理が完了するまで時間が掛かるため、コネクタのタスクの全てがPAUSED状態に遷移する前に遅延があることに注意してください。更に、失敗したタスクはそれらが再起動されるまでPAUSED状態に遷移されないでしょう。

TOP
inserted by FC2 system