<

ドキュメント

Kafka 2.7 Documentation

以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.

5. 実装

5.1ネットワーク層

ネットワーク層はかなり単純なNIOサーバであり、あまり詳細には説明はしないつもりです。sendfileの実装はMessageSet インタフェースにwriteToメソッドを指定することで行われます。これにより、ファイルが後方にあるメッセージは、実行中のバッファされた書き込みの代わりにもっと効率的なtransferTo 実装を使うようにすることができます。スレッドモデルは、1つのアクセプタ スレッドと、固定数の接続をそれぞれ処理するN 個の処理スレッドです。この設計はあちこちで 徹底的にテストされていて、実装し易く高速であることが知られています。他の言語でクライアントの更なる実装ができるように、プロトコルは極めて単純にされています。

5.2メッセージ

メッセージは可変長のヘッダ、可変長の不透明なキーのバイト配列、および可変長の不透明な値のバイト配列からなります。ヘッダのフォーマットは以下の章で説明されます。キーと値を不透明なままにしておくことは良い選択です: シリアライズ化ライブラリにおいて現在かなりの進歩があり、特定の選択肢は全ての用途に適しているとは言えません。Kafkaを使っている特定のアプリケーションは言うまでもなくその使用法の一部として独特のシリアライズ タイプを委任されるでしょう。RecordBatch インターフェースはNIOChannelのバルク読み込みと書き込みのために特化したメソッドを使った単純なメッセージ上のイテレータです。

5.3メッセージ形式

メッセージ (または レコード) は常にバッチ内で書き込まれます。メッセージのバッチの技術的な用語はレコードバッチで、レコードバッチは1つ以上のレコードを含みます。縮退した場合では、1つのレコードを含むレコードバッチがあります。レコードバッチとレコードは独自のヘッダを持ちます。それぞれの形式が以下で説明されます。

5.3.1レコードバッチ

以下はレコードバッチのディスク上の形式です。

		baseOffset: int64
		batchLength: int32
		partitionLeaderEpoch: int32
		magic: int8 (current magic value is 2)
		crc: int32
		attributes: int16
			bit 0~2:
				0: no compression
				1: gzip
				2: snappy
				3: lz4
				4: zstd
			bit 3: timestampType
			bit 4: isTransactional (0 means not transactional)
			bit 5: isControlBatch (0 means not a control batch)
			bit 6~15: unused
		lastOffsetDelta: int32
		firstTimestamp: int64
		maxTimestamp: int64
		producerId: int64
		producerEpoch: int16
		baseSequence: int32
		records: [Record]

圧縮が有効な場合、圧縮されたレコードデータはレコードの数に続いて直接シリアライズ化されます。

CRCは属性からバッチの終わりまでのデータをカバーします (つまり、CRCに続く全てのバイト)。それはマジックバイトの後に位置します。それはクライアントはバッチの長さとマジックバイトの間のバイトをどうやって解釈するかを決める前にマジックバイトをパースする必要があることを意味します。このフィールドがブローカーによって受け取られる各バッチによって割り当てられる時に、CRCを再計算する必要を避けるために、CRCの計算内にパーティションリーダーのepochフィールドは含まれません。計算にはCRC-32C (Castagnoli) 多項式が使われます。

圧縮時: 古いメッセージ形式と異なり、magic v2 以上では、ログが掃除される時に元のバッチからの最初と最後のオフセット/シーケンス番号が保持されます。これはログがリロードされる時にプロデューサの状態を回復することができるようにするために必要です。例えば、最後のシーケンス番号を維持しないとした場合、パーティションリーダーが失敗するとプロデューサは OutOfSequence エラーを見るかもしれません。基本のシーケンス番号は重複チェックのために維持されなければなりません (ブローカーは、入ってくるバッチの最初と最後のシーケンス番号がプロデューサからの最後のものと一致することを検証することで、入ってくるプロデューサのリクエストを重複に関してチェックします)。結果として、バッチ内の全てのレコードが掃除されたが、プロデューサの最後のシーケンス番号を保持するためにバッチがまだ維持されている場合、ログ内に空のバッチを持つことができます。ここで1つ風変りな事は、圧縮の間 firstTimestamp フィールドは維持されないということです。つまり、バッチ内の最初のレコードが圧縮されれば変更されるでしょう。

5.3.1.1制御バッチ

制御バッチは制御レコードと呼ばれる1つのレコードを含みます。制御レコードはアプリケーションに渡されるべきではありません。代わりに、それらは破棄されたトランザクションメッセージを除外するために、コンシューマによって使われます。

制御レコードのキーは以下のスキーマに準拠します:

       version: int16 (現在のバージョンは0です)
       type: int16 (0は破棄マーカーを示し、1はコミットを示します)

制御レコードの値についてのスキーマはtypeに依存します。値はクライアントに対して不透明です。

5.3.2レコード

レコード レベルのヘッダーはKafka 0.11.0 で導入されました。ヘッダーを持つレコードのディスク上での形式を以下で説明します。

		length: varint
		attributes: int8
			bit 0~7: unused
		timestampDelta: varint
		offsetDelta: varint
		keyLength: varint
		key: byte[]
		valueLen: varint
		value: byte[]
		Headers => [Header]

5.3.2.1レコード ヘッダ

		headerKeyLength: varint
		headerKey: String
		headerValueLength: varint
		Value: byte[]

Protobufとして同じ変換エンコーディングを使います。後者についての詳細はここで見つけることができます。レコード内のヘッダーの数も変形としてエンコードされます。

5.3.3古いメッセージ形式

Kafka 0.11 より前では、メッセージは変換され、メッセージ セットの中に格納されました。メッセージ セットの中で、各メッセージは独自のメタデータを持ちます。メッセージセットは配列として表されますが、プロトコルの他の配列要素のようにint32の配列サイズが前にあることはありません。

メッセージ セット:

    MessageSet (Version: 0) => [offset message_size message]
        offset => INT64
        message_size => INT32
        message => crc magic_byte attributes key value
            crc => INT32
            magic_byte => INT8
            attributes => INT8
                bit 0~2:
                    0: no compression
                    1: gzip
                    2: snappy
                bit 3~7: unused
            key => BYTES
            value => BYTES

    MessageSet (Version: 1) => [offset message_size message]
        offset => INT64
        message_size => INT32
        message => crc magic_byte attributes timestamp key value
            crc => INT32
            magic_byte => INT8
            attributes => INT8
                bit 0~2:
                    0: no compression
                    1: gzip
                    2: snappy
                    3: lz4
                bit 3: timestampType
                    0: create time
                    1: log append time
                bit 4~7: unused
            timestamp => INT64
            key => BYTES
            value => BYTES

Kafka 0.10より前のバージョンでは、サポートされるメッセージの形式のバージョン (これはマジック値によって示されます)は0でした。メッセージ形式のバージョン 1 はバージョン 0.10 でタイムスタンプのサポートと共に導入されました。

  • バージョン2以上と似て、属性の最下位ビットは圧縮タイプを表します。
  • バージョン1では、プロデューサは常にタイムスタンプ型のビットを0にしなければなりませんでした。トピックがログの追記時間を使うように設定されている場合、(ブローカーのレベル設定 log.message.timestamp.type = LogAppendTime あるいはトピックのレベル設定 message.timestamp.type = LogAppendTime)、ブローカーはタイムスタンプの型とメッセージセット中のタイムスタンプを上書きするでしょう。
  • 属性の最上位ビットは0に設定されなければなりません。

メッセージ形式のバージョン0および1では、Kafkaは圧縮を有効にするために再帰的なメッセージをサポートします。この場合、メッセージの属性は圧縮型の1つを示すように設定されなければならず、値のフィールドはその型で圧縮されたメッセージセットを含むでしょう。ネストされたメッセージを "内部メッセージ"、ラップされたメッセージを "外部メッセージ" と呼びます。キーは外部メッセージについてはnullでなければならず、そのオフセットは最後の内部メッセージのオフセットでしょう。

再帰バージョン 0のメッセージを受信する時、ブローカーはそれらを解凍し、それぞれの内部メッセージは個々にオフセットを割り当てられます。バージョン1では、サーバ側での再圧縮を避けるために、ラップ メッセージだけがオフセットを割り当てられるでしょう。内部メッセージは相対的なオフセットを持つでしょう。絶対的なオフセットは外部メッセージからのオフセットを使って計算することができます。これは最後の内部メッセージに割り当てられたオフセットに対応します。

crc フィールドはその後に続くメッセージバイト(マジック バイトからその値へ)の CRC32 (CRC-32C では無い) を含みます。

5.4ログ

2つのパーティションを持つ"my_topic"という名前のトピックについてのログは、トピックについてのメッセージを含むデータファイルがある2つのディレクトリから成ります (すなわち、my_topic_0my_topic_1)。ログファイルの形式は "log entries" の系列です; 各ログエントリはNメッセージのバイトが続くメッセージの長さを格納する4バイトの整数N です。各メッセージはパーティション上のトピックにこれまで送信された全てのメッセージのストリーム内でのこのメッセージの開始位置のバイトを与える 64-ビットの整数のoffsetによって一意に識別されます。各メッセージのディスク上での形式は以下で示されます。各ログファイルはそれが含まれる最初のメッセージのオフセットを使って名前を付けられます。つまり、最初に作成されるファイルは 00000000000.kafka でしょう。そしてそれぞれ追加のファイルは設定で指定される最大のログファイルサイズが S の場合に以前のファイルからおよそ S バイトの整数の名前を持つでしょう。

レコードのための正確なバイナリ形式はバージョン管理され、標準インタフェースとして維持されるので、レコードバッチは必要な時に再コピーあるいは変換無しにプロデューサ、ブローカー およびクライアント間で転送することができます。前の章はレコードのディスク上の形式についての詳細を含んでいました。

メッセージidとしてのメッセージのオフセットの使用は一般的ではありません。私たちの元の考えは、プロデューサによって生成されたGUIDを使い、各ブローカー上でのオフセットのためにGUIDからのマッピングを維持するものでした。しかし、コンシューマは各サーバについてIDを維持する必要があるため、GUIDのグローバルでの一意性は何ももたらしません。更に、ランダムなidからオフセットへのマッピングの維持の複雑さは、本質的に完全な一貫性を持つランダムアクセスのデータ構造を必要とする、ディスクと同期される必要がある重いインデックス構造を必要とします。したがって、構造の検索を簡単にするために、メッセージを一意に識別するためのパーティションidとノードidを連結することができる単純なパーティションごとのアトミックなカウンタを使うことを決めました。しかしいったんカウンタを決定すると、オフセットを使った直接ジャンプは当然のように思えました — 結局パーティションでユニークな整数が単調に増加しています。オフセットはコンシューマAPIからは隠されているため、この決定は最終的に実装の詳細であり、もっと効率的なやり方を伴いました。

書き込み

このログは、常に最後のファイルに移動する連続的な追加を許可します。このファイルは設定可能なサイズ(例えば1GB)に達すると、新しいファイルにロールオーバーされます。ログは2つの設定パラメータを取ります: M はOSにファイルをディスクにフラッシュを強制する前に書き込むメッセージの数を指定し、S はフラッシュが強制された後の秒数を指定します。これはシステムがクラッシュした時に、最大 M 個のメッセージ、あるいは S 秒のデータが失われることの耐久性の保証をします。

読み込み

読み込みはメッセージの64-ビットの論理オフセットとS-バイトの最大チャンクサイズを指定することで行われます。これはS-バイトのバッファ内に含まれるメッセージ上のイテレータを返すでしょう。S はどの1つのメッセージよりも大きくなるように意図されていますが、以上に大きなメッセージの場合には、メッセージの読み込みが成功するまで毎回バッファサイズを2倍にし、読み込みを複数回試行することができます。サーバがあるサイズより大きいメッセージを拒否し、完全なメッセージを取得するために読み込まなければならない最大サイズについてクライアントに制限をつけるために、最大メッセージサイズとバッファサイズを指定することができます。読み込みバッファが部分的なメッセージで終わりがちですが、これはサイズの区切りによって容易に検出されます。

オフセットから読み込む実際のプロセスでは、まずデータが格納されたログ セグメント ファイルの場所を特定し、グローバル オフセット値からファイル固有のオフセットを計算し、そのファイルオフセットから読み込みます。検索は各ファイルについて保持されるメモリ内の範囲に対して単純な二分木検索の派生物として行われます。

ログは、クライアントが"right now"として購読を開始できるように最も最近に書かれたメッセージを取得できる機能を提供します。コンシューマがデータを SLAの指定日数内で消費するのに失敗した場合にも便利です。この場合、クライアントが存在しないオフセットを消費しようとすると、OutOfRangeException が与えられ、ユースケースに応じて自身を再設定するか、失敗するかもしれません。

以下はコンシューマに送信される結果のフォーマットです。

    MessageSetSend (fetch result)

    total length     : 4 bytes
    error code       : 2 bytes
    message 1        : x bytes
    ...
    message n        : x bytes
    MultiMessageSetSend (multiFetch result)

    total length       : 4 bytes
    error code         : 2 bytes
    messageSetSend 1
    ...
    messageSetSend n

削除

データは一度に1つのログセグメントを削除されます。ログマネージャは、削除の対象となるセグメントを識別するために2つのメトリックスを適用します: 時間とサイズ。時間ベースのポリシーの場合、レコードタイムスタンプが考慮され、セグメントファイル内の最大のタイムスタンプ(レコードの順番は関係ありません)がセグメント全体の保持期間を定義します。サイズベースの保持はデフォルトで無効です。サイズベースを有効にすると、ログマネージャはパーティション全体のサイズが構成された制限内に戻るまで、最も古いセグメントファイルを削除し続けます。もし両方のポリシーが同時に有効にされると、どちらかのポリシーによって削除の対象となるセグメントが削除されます。セグメント リストを変更する削除を許可しながら、読み取りのロックを回避するために、一貫性のあるビューを提供する copy-on-write 形式のセグメント リストの実装を使用して、削除の進行中にログ セグメントの不変な静的スナップショット ビューでバイナリ検索を続行できるようにします。

保証

ログはディスクにフラッシュを強制する前に書き込まれるメッセージの最大数を制御する設定パラメータM を提供します。起動時に最新のログセグメント内の全てのメッセージを繰り返し処理し、各メッセージエントリが有効であることを検証するログ回復プロセスが実行されます。メッセージエントリは、そのサイズとオフセットの合計がファイルの長さよりも小さく、メッセージのペイロードのCRC32がメッセージに格納されたCRCに合致する場合に有効です。衝突が検出された場合、ログは最後に有効だったオフセットに切り詰められます。

2種類の衝突が処理されなければならないことに注意してください: クラッシュにより紛失した書き込まれていないブロックの切り詰め、ファイルに無意味なブロックが追加されたことによる衝突。これは、一般的にOSはファイルのinodeと実際のブロックデータの間で書き込みの順番の保証をしないため、もしinodeが新しいサイズで更新されたがデータを含むブロックを書き込む前にクラッシュした場合に、更に書き込まれたデータの喪失に加えてファイルは意味の無いデータを取得するかもしれません。CRCはこの人目の届かないケースを検出し、ログを衝突から防ぎます(しかし、書き込まれていないメッセージはもちろん紛失されます)。

5.5分散

コンシューマのオフセットの追跡

Kafkaコンシューマは各パーティションで消費した最大のオフセットを追跡し、再起動時にこれらのオフセットから再開できるようにオフセットをコミットする機能を持ちます。Kafkaはグループコーディネータと呼ばれる(そのグループの)指定ブローカー内に指定コンシューマグループの全てのオフセットを保存するオプションがあります。つまり、そのコンシューマグループ内の全てのコンシューマインスタンスは、グループコーディネータ(ブローカー)にオフセットとコミットを送信する必要があります。コンシューマグループはそれらのグループ名に基づいてコーディネータに割り当てられます。コンシューマは、FindCoordinatorRequest をKafkaブローカーに発行して、コーディネータの詳細を含む FindCoordinatorResponse を読み込むことで、コーディネータを検索できます。コンシューマはコーディネータ ブローカーからオフセットをコミットまたはフェッチすることができます。コーディネータが移動した場合、コンシューマはコーディネータを再び見つける必要があります。オフセット コミットはコンシューマ インスタンスによって自動的あるいは手動で行うことができます。

グループ コーディネータが OffsetCommitRequest を受け取ると、それはリクエストを特別な 圧縮された __consumer_offsetsという名前のKafkaトピックに追加します。ブローカーは、オフセットトピックの全てのレプリカがオフセットを受け取った時のみ、成功のオフセットコミット応答をコンシューマに送信します。オフセットが設定可能なタイムアウト内でレプリケートに失敗した場合は、オフセットコミットは失敗しコンシューマはバックオフの後でコミットを再試行するかもしれません。このブローカーはパーティション毎に最も最近のオフセットだけを維持する必要があるため、このブローカーは定期的にオフセットを圧縮します。コーディネータはオフセットの取得を素早く提供するために、メモリ内のテーブルの中にオフセットのキャッシュも行います。

コーディネータがオフセットの取得リクエストを受け取る時に、単純にオフセットのキャッシュから最後にコミットされたオフセットのベクトルを返します。コーディネータが開始したばかりか、あるいは(オフセットトピックのパーティションのためのリーダーになることで)新しいコンシューマグループのセットとしてコーディネータになったばかりの場合は、オフセットのトピックパーティションをキャッシュにロードする必要があるかもしれません。この場合、オフセットの取得は CoordinatorLoadInProgressException で失敗し、コンシューマはバックオフの後で OffsetFetchRequest を試行するかもしれません。

ZooKeeperのディレクトリ

以下はコンシューマとブローカー間を対等にするために使われるZooKeeperの構造とアルゴリズムを示します。

表記法

パス内の要素が [xyz] で示される場合、それはxyzの値が固定ではなく、実際にはxyzの値を取りうるZooKeeperのznodeであることを意味します。例えば、/topics/[topic] は各トピック名のサブディレクトリを含む /topics という名前のディレクトリになります。[0...5] のような数値範囲も指定され、サブディレクトリの 0, 1, 2, 3, 4 を示します。矢印 -> は znode の内容を示すために使われます。例えば、/hello -> world は値 "world" を含む znode /hello を示します。

ブローカーノードのレジストリ

    /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)

これは存在する全てのブローカーノードのリストです。それぞれはコンシューマで識別するユニークな論理ブローカーidを提供します (設定の一部として指定されなければなりません)。始めに、ブローカーノードは /broders/ids の下に論理ブローカーidを持つznodeを作成することで自身を登録します。論理ブローカーidの目的はブローカーがコンシューマに影響を与えずに異なる物理マシーンに移動できるようにするということです。既に使用されている(例えば2つのサーバが同じブローカーidで設定されているため)ブローカーidを登録しようとするとエラーになります。

ブローカーは短命のznodeを使って自身をZooKeeperに登録するため、この登録は動的で、もしブローカーがシャットダウンあるいは死亡した場合には消えるでしょう (従って、コンシューマへの通知はもう利用できません)。

ブローカーのトピックのレジストリ

    /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)

各ブローカーは自身をそのトピックについてのパーティションの数を維持し格納するトピックの下に登録します。

クラスターid

クラスターidはKafkaクラスタに割り当てられるユニークで不変の識別子です。クラスタidは最大22文字を持ち、許される文字は正規表現 [a-zA-Z0-9_\-]+ で定義されます。これはパディング無しのURLセーフなBase64の変種によって使われる文字に対応します。概念的には、クラスタが初めて開始した時に自動生成されます。

実装上は、バージョン 0.10.1以降のブローカーが初めて正常に開始された時に生成されます。ブローカーは開始時に/cluster/id znode からクラスタidを取得しようとします。もしznodeが存在しない場合は、ブローカーは新しいクラスタidを生成し、このクラスタidを使ってznodeを生成します。

ブローカーのノードの登録

ブローカーのノードは基本的に依存しません。つまり、それらは何を持っているかについて情報を公開するだけです。ブローカーが参加した時に、それは自身をブローカーノードのレジストリディレクトリに登録し、ホスト名とポートについての情報を書き込みます。ブローカーも既存のトピックのリストとそれらの論理パーティションをブローカーのトピックのレジストリに書き込みます。新しいトピックはそれらがブローカー上に生成された時に動的に登録されます。

TOP
inserted by FC2 system