<

ドキュメント

Kafka 2.7 Documentation

以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.

4. 設計

4.1モティベーション

わたしたちはKafkaを大企業が持つかもしれない全てのリアルタイム データフィードを処理するための統一的なプラットフォームとして振舞うことができるように設計しました。これを行うためにユースケースのかなり大規模なセットを通して考える必要がありました。

リアルタイムのログの集約のような高ボリュームのイベントストリームをサポートするために高スループットを持つ必要があるでしょう。

オフラインシステムからの定期的なデータのロードをサポートできるように大規模なデータのバックログを使ってグレースフルに扱う必要があるでしょう。

それはシステムがもっと伝統的なメッセージングのユースケースを扱うために低レンテンシの配送を扱う必要があることも意味します。

新しく配送されるフィードを作成するためにそれらのフィードのパーティション、分散、リアルタイム処理をサポートする必要がありました。これがパーティションとコンシューマのモデルをする気にさせました。

結果的にストリームが提供のために他のデータシステムにフィードされる場合に、システムはマシーンの障害の時に耐障害性を保証することができる必要があるだろうということが分かりました。

これらの使用をサポートすることで、従来のメッセージングシステムよりデータベースログに近い独自の要素を持つデザインとなりました。以下の章で設計の幾つかの要素の要点を述べるつもりです。

4.2一貫性

ファイルシステムを恐れないでください!

Kafkaはメッセージを格納およびキャッシュするためにファイルシステムに著しく依存します。"ディスクは遅い" という人々が永続的な構造は競争力のあるパフォーマンスを提供することができるということを懐疑的に思わせる一般的な認識があります。実際には、ディスクはどのように使うかによって人々が思うよりかなり遅くなったりかなり速くなったりします; 適切に設計されたディスク構造はしばしばネットワークと同じくらい速くなります。

ディスクのパフォーマンスについての重要な事実は、ハードディスクのスループットはこの10年間でディスクのシークのレイテンシーとは異なっているということです。結果として、six 7200rpm SATA RAID-5 アレイのJBOD 設定への線形書き込みのパフォーマンスは約 600MB/sec ですが、ランダム書き込みのパフォーマンスは約 100k/sec だけです。- 違いは 6000倍以上です。これらの線形読み込みと書き込みは全ての利用パターンのほとんどと予想可能で、オペレーティングシステムによってすごく最適化されます。現代的なオペレーティングシステムは、大きなブロックの倍数のデータを前もって取得し、小さな論理書き込みを大きな物理書き込みにグループ化する、先行先読みと遅延書き込みの技術を提供します。この問題についての更なる議論はこのACM キューの文章で見つけることができます; ある場合においてはシーケンシャル ディスク アクセスはランダムなメモリアクセスよりも速くなりえます!

このパフォーマンスの相違を保証するために、現代的なオペレーティングシステムはディスクキャッシュについてのメインメモリの使用にますます積極的になりました。現代的なOSはメモリを取り戻す時に少しのパフォーマンスの罰則を持って喜んで全ての フリーメモリをディスクのキャッシュに転換するでしょう。全てのディスクの読み込みと書き込みはこの統合されたキャッシュを使い果たすでしょう。この機能はダイレクトI/Oを使わない限りは簡単には無効にすることができません。つまりもしプロセスがデータのプロセス内のキャッシュに保持される場合、このデータはおそらく効果的に全てを2度格納しながらOSページキャッシュにデュプリケートされるでしょう。

更に、私たちはJVMの上に構築しています。Javaのメモリの使い方に時間を費やした人は以下の2つの事を知っています:

  1. オブジェクトのメモリオーバーヘッドは非常に高く、しばしば格納されたデータを2倍(あるいはそれ以上)にします。
  2. Javaのガベージコレクションはヒープ内のデータが増加するに連れて、増加的に扱いづらく遅くなります。

ファイルシステムを使い、ページキャッシュに依存するこれらの要因の結果、メモリ内キャッシュあるいはその他の構造を維持するよりも優れています — 全ての空きメモリへの自動アクセスにより、利用可能なキャッシュが少なくとも2倍になり、個々のオブジェクトでは無くコンパクトなバイト構造を格納することでほぼ2倍になります。そうすることで32GB上のマシーンでGCのペナルティ無しに 28-30GB までのキャッシュ内に納まります。さらにこのキャッシュはサービスが再起動した場合でさえも電源が入ったままですが、プロセス内のキャッシュはメモリ内で再構築する必要があります(10GBのキャッシュは10分掛かるかもしれません)あるいは完全に電源が落ちたキャッシュから開始する必要があるでしょう(これは酷い初期パフォーマンスを意味することになりえます)。キャッシュとファイルシステム間のコヒーレントを維持するための全てのロジックが今はOS内にあるため、これはコードを大幅に簡素化します。これは一回限りのプロセス内の試行よりもより効率的でより正しく行う傾向があります。ディスクの使用方法が線形読み込みに向いている場合、先読みは各ディスクの読み込み時に有用なデータを効果的にあらかじめ入力しておきます。

これは非常に単純な設計を示唆しています: 可能な限りメモリ内に保持し、パニックで全てをファイルシステムにフラッシュするのではなく、スペースを使い果たすとそれを反転します。全てのデータは必ずしもディスクへのフラッシュ無しにファイルシステム上の永続ログにすぐに書き込まれます。実際にはこれは単にカーネルのページキャッシュに転送されることを意味します。

このページキャッシュ中心の設計の方式はVarnishの設計の文章で説明されます (多大なる傲慢さとともに)。

Constant Time Suffices

メッセージング システム内で使われる永続データ構造は、しばしばメッセージに関するメタデータを維持するための関連するBTreeあるいは他の一般的な目的のランダムアクセスデータ構造のコンシューマあたりのキューです。BTreeは利用可能な最も融通のきくデータ構造で、メッセージシステム内でトランザクション的あるいは非トランザクション的なセマンティクスの広範囲に及ぶ違いをサポートすることができます。しかしかなりの高コストが付随します: Btree オペレーションは O(log N) です。通常 O(log N) は本質的に一定の時間と等しいですが、これはディスク操作については正しくありません。ディスクのシークはポップごとに10 msで行われ、各ディスクは一度に1つのシークしかできないため、並行処理が制限されます。従って少量のディスクのシークでさえ高いオーバーヘッドに繋がります。ストレージシステムはとても高速なキャッシュされた操作ととても遅い物理ディスク操作とを混合するため、木構造の観測されるパフォーマンスはデータが固定のキャッシュを使って増加するに従ってしばしば超線形的になります--つまり、データが2倍になると2倍より遅くなります。

直観的に永続的なキューは1つの読み込み上に構築され、ログの解像度で一般的なようにファイルに追加されるかもしれません。この構造は全てのオペレーションが O(1) であり、読み込みが書き込みあるいはお互いをブロックしないという利点があります。パフォーマンス完全にデータサイズから切り離されるためこれは明らかなパフォーマンスの利点があります - 1つのサーバは今では多くの安い、低レンテンシのスピードの 1+TB SATA ドライバを利用することができます。それらはシークのパフォーマンスが低いですが、これらのドライバは大きな読み込みと書き込みについて許容できるパフォーマンスであり、1/3の値段で、3x の容量です。

パフォーマンスの犠牲無しに仮想的に無制限のディスク空間へのアクセスはメッセージング システムで通常見られないような幾つかの特徴を提供することができます。例えば、Kafkaではメッセージが消費されるとすぐに削除しようとする代わりに、比較的長い間メッセージを維持することができます(例えば1週間) 。これにより後で説明するようにコンシューマのための大きな柔軟性に繋がります。

4.3効率

私たちは多くの努力を効率化に注ぎました。主要なユースケースの1つがwebの動きのデータの処理で、これはとても嵩張るものです: 各ページのビューは何十もの書き込みを生成するかもしれません。更に、発行された各メッセージは少なくとも1つ(しばしば多くの)コンシューマによって読み込まれ、従ってできるだけ手軽に消費しようとします。

多数の同じようなシステムの構築と実行の経験から、効率化が効率的なマルチ テナント オペレーションのキーであることも知りました。アプリケーションによる使い方の小さな上昇により、もしダウンストリームのインフラストラクチャ サービスが容易にボトルネックになる場合、そのような小さな変更はしばしば問題になるでしょう。非常に高速であることで、負荷が掛かった状態でインフラストラクチャの前にアプリケーションが転倒することを保証します。使い方のパターンの変更が日常的に起こるため、集中化されたクラスタ上での数十あるいは数百のアプリケーションをサポートする集中化されたサービスを実行しようとする時にこれは特に重要です。

前の章でディスクの効率化について議論しました。一旦貧弱なディスクアクセスパターンが取り除かれると、このタイプのシステムには二つの一般的な非効率の原因があります: あまりに多くの小さな I/O オペレーションと、過度のバイトのコピーです。

小さなI/Oの問題はクライアントとサーバ間、およびサーバ独自の永続化オペレーションの中で起こります。

これを避けるために、生まれつきメッセージをグループ化する"メッセージ セット" 抽象化の周りにプロトコルが構築されます。これによりグループメッセージへのネットワークリクエストを1つにすることができ、一度に1つのメッセージを送るのではなくネットワーク周辺のオーバーヘッドを償却します。サーバは次々にログにメッセージのチャンクを追加し、コンシューマは一度に大きな線形のチャンクを取得します。

この単純な最適化は一桁違う大幅なスピードアップを生成します。バッチは大きなネットワークパケット、大きな連続するディスク操作、連続するメモリブロックなどに繋がります。それら全てによりKafkaは爆発的なランダムなメッセージ書き込みのストリームをコンシューマへ流れる線形の書き込みへ変えることができます。

もう一つの非効率はバイトのコピーです。低いメッセージレートでは、これは問題ではありませんが、高負荷では影響が大きなものになります。これを避けるためにプロデューサ、ブローカー、コンシューマによって共有される標準化されたバイナリメッセージフォーマットを採用します (つまりデータチャンクはそれらの間で修正されずに転送されます)。

ブローカーによって維持されるメッセージログはそれ自身は単なるファイルのディレクトリで、それぞれはプロデューサとコンシューマによって使われるのと同じ形式でディスクに書き込まれたメッセージセットの系列があります。この共通のフォーマットを維持することによりほとんどの重要なオペレーションを最適化することができます: 永続的なログのチャックのネットワーク転送。最新のunixオペレーティングシステムはページキャッシュからソケットへの転送のための高度に最適化されたコードパスを提供します; Linuxではこれは sendfile system callを使って行われます。

sendfileの効果を知るためには、ファイルからソケットへのデータの転送のための共通のデータパスを理解することが重要です:

  1. オペレーティングシステムはデータをディスクからカーネル空間のページキャッシュに読み込む
  2. アプリケーションはデータをカーネル空間からユーザ空間のバッファに読み込む
  3. アプリケーションはデータをソケットバッファのカーネル空間へ書き込む
  4. オペレーティングシステムはデータをソケットバッファからネットワーク上に送信されるNICバッファにコピーする

これは明らかに非効率です。4つのコピーと2つのシステムコールがあります。sendfileを使い、OSにデータをページキャッシュからネットワークに直接送信できるようにすることで、この再コピーを避けることができます。つまり、この最適化されたパスで、最終的なNICバッファへのコピーだけが必要です。

一般的なユースケースがトピック上の複数のコンシューマであることを期待します。上のゼロ-コピー最適化を使って、データは読み込まれるたびにメモリに格納しユーザ空間からコピーして取り出す代わりに、確実に1回ページキャッシュにコピーされ、それぞれの消費の度に再利用されます。これによりメッセージはネットワーク接続の制限に近いレートで消費されることができます。

このpagecacheとsendfileの組み合わせは、コンシューマがほとんど遅れずについていくKafkaクラスタ上では、データを完全にキャッシュから提供するため少しもディスク上の読み込み動作を見ることは無いだろうことを意味します。

更なるJavaでのsendfileとゼロ-コピーのサポートについては、この文章を見てください。

End-to-end バッチ圧縮

ボトルネックが実際にはCPUあるいはディスクでは無いがネットワークの帯域である場合があります。これは広域ネットワークを超えたデータセンター間でメッセージを送信するのに必要なデータパイプラインにとって特に正しいです。もちろんKafkaからの助け無しにユーザは一度に1つのメッセージを常に圧縮することができますが、同じ型のメッセージ間の繰り返しによる相当な冗長性のため、圧縮率が非常に悪くなります (例えば、JSON内のフィールド名あるいはwebログ内のユーザエージェントあるいは共通の文字列の値)。効率的な圧縮には、各メッセージを個別に圧縮するよりも複数のメッセージを一緒に圧縮することが必要です。

Kafka は効率的なバッチ形式を使ってこれをサポートします。メッセージのバッチは圧縮されてまとめられ、この形式でサーバに送信することができます。このメッセージのバッチは圧縮された形式で書き込まれ、ログ内で圧縮されたままで、コンシューマによってのみ解凍されるでしょう。

Kafka は GZIP, Snappy, LZ4 および ZStandard 圧縮プロトコルをサポートします。圧縮についての詳細は ここで見つけることができます。

4.4プロデューサ

ロードバランシング

プロデューサは仲介するルーティングの層無しにパーティションにとってのリーダーであるブローカーにデータを直接に送信します。プロデューサがこれをするのを助けるために、全てのKafkaのノードはプロデューサが適切にリクエストを指示できるようにどのサーバが生きていてトピックのパーティションのリーダーがどこにあるかを指定した時間でリクエストに答えることができます。

クライアントはどのパーティションにメッセージを発行するかを制御します。これはある種のランダムなロードバランシングを実装することでランダムに行うか、何らかのセマンティックなパーティション関数を使って行うことができます。ユーザがパーティションにキーを指定できるようにし、これを使ってパーティションにハッシュすることにより、セマンティック パーティションのインタフェースを公開します (必要に応じてパーティション関数を上書きするオプションもあります)。例えば、もし選択されたキーがユーザidであれば、指定されたユーザについての全てのデータは同じパーティションに送信されるでしょう。これにより次にはコンシューマがそれらの消費についてローカリティの仮説を立てることができます。この形式のパーティション化はコンシューマ内のローカリテイ-センシティブな処理ができるように明示的に設計されます。

非同期送信

バッチ処理は効率化の大きな推進要因の1つであり、Kafkaプロデューサのバッチを有効化することでメモリ内でデータを集約し、1つのリクエスト内でより大きなバッチを送出しようとするでしょう。バッチ処理は一定数以下のメッセージを集約し、いくらかの固定のレイテンシの制限(例えば 64k あるいは 10ms)より長く待たないように構成することができます。これによりより多くのバイトの集約を送信することができ、サーバ上ではI/Oオペレーションがより少なくなります。このバッファリングは設定可能で、より良いスループットのために少量の追加のレイテンシをトレードオフする仕組みを提供します。

設定で詳細に説明され、プロデューサについてのapi はドキュメント内のいたるところで見つけることができます。

4.5コンシューマ

Kafkaのコンシューマは、消費したいパーティションに繋がるブローカーへの"fetch" リクエストを発行することで動作します。コンシューマは各リクエストによってログ内のオフセットを指定し、その位置から始まるログのチャックを受け取ります。従ってコンシューマはこの位置について重要な制御を行い、必要であればデータを再消費するために巻き戻すことができます。

Push vs. pull

私たちが考慮した最初の問題は、コンシューマがデータをブローカーからpullすべきか、ブローカーがデータをコンシューマにpushすべきかでした。この点で、Kafkaはほとんどのメッセージングシステムで共有される、より伝統的な設計に従います。データはプロデューサからブローカーにpushされ、コンシューマによってブローカーからpullされます。Scribe および Apache Flumeのような幾つかのログ中心のシステムでは、データがダウンストリームにpushされる、とても異なるpushベースのパスに従います。両方のやり方には長所と短所があります。しかし、pushベースのシステムはデータが転送される時のレートをブローカーが制御するので、コンシューマの多様性を扱うのが難しいです。目的は一般的にコンシューマが最大限に可能なレートで消費できることです; 残念ながら、pushシステムでは消費のレートが生成のレートを下回る時(本質的にはサービス拒否アタックです)、このことはコンシューマが圧倒される傾向があることを意味します。pullベースのシステムはコンシューマが単純に後れを取り可能であれば追いつくという良い特性があります。これはコンシューマが圧倒されていることを示すなんらかのバックオフ プロトコルを使って緩和することができますが、コンシューマを完全に使用(しかし、使い過ぎない)するために転送のレートを取得することは思うよりは手の込んだものです。前のこの形式でのシステムの構築の試行により、より伝統的なpullモデルでの連携に導かれました。

pullベースのシステムのもう一つの利点は、それはコンシューマに送信されるデータの集約的なバッチに役に立つということです。pushベースのシステムは、ダウンストリームのコンシューマがすぐに処理できるかどうかを知らないまま、すぐにリクエストを送信するか、もっとデータを集めそれからそれを送信するかのどちらかを選択しなければなりません。低レンテンシに調整された場合、転送が最後にはどうしてもバッファされるとしても、これは一度に1つのメッセージを送信することになるでしょう。これは無駄です。pullベースの設計はコンシューマが常にログの現在の位置より後(あるいは何らかの設定可能な最大サイズまで)の全ての利用可能なメッセージをpullすることでこれを正します。そして不必要なレイテンシを導入することなく最適なバッチを手にすることができます。

pullベースのシステムの欠陥は、もしブローカーがデータを持たない場合にコンシューマが短いループで事実上データの到着を忙しく待ってpullすることになるかも知れないということです。これを避けるために、コンシューマのリクエストがデータが到着するまで "長いpoll" の中で待つことを阻止することができるpullリクエスト内のパラメータを持ちます (そして、任意に大きな転送サイズを保証するために指定されたバイト数が利用可能になるまで待ちます)。

end-to-endのpullだけの他の有り得そうな設計を想像することができるでしょう。プロデューサは局所的にローカルログに書き込み、ブローカーはそれらをpullするコンシューマを使ってpullするでしょう。"store-and-forward" のようなプロデューサがしばしば提案されます。これは興味深いですが、無数のプロデューサがある私たちの目的の使い方にはあまりふさわしくないように思えました。永続的なデータシステムを大規模に実行した経験から、多くのアプリケーションにまたがるシステムに数千のディスクを含めても、実際には信頼性が向上せず、操作が悪夢になると分かりました。そして実際に、プロデューサの一貫性の必要性無しに大規模なSLAを使ってパイプラインを実行することができることを知りました。

コンシューマの位置

何が消費されたかを追跡し続けることは、意外にもメッセージシステムの主要なパフォーマンスの要点の1つです。

ほとんどのメッセージングシステムはどのメッセージがブローカーで消費されたかについてのメタデータを保持します。つまり、メッセージはコンシューマへ提出され、ブローカーはローカルで即座にその事実を記録あるいはコンシューマからの通知を待つかもしれません。 これはかなり直観的な選択で、実際1つのマシーン上のサーバではこの状態が他のどこへ行くかが明確ではありません。多くのメッセージングシステムでの格納に使われるデータ構造はスケールに乏しいため、これは実用主義でもあります ーブローカーは何が消費されたかを知るため、データサイズが小さいままでそれを即座に削除することができます。

恐らく明らかでないことは、何が消費されたかについてブローカーとコンシューマが同意に至るまでが些細な問題では無いということです。もしブローカーがネットワーク越しにメッセージが提出されるたびにすぐにメッセージをconsumed として記録すると、もしコンシューマがメッセージの処理に失敗すると(つまり、クラッシュあるいはリクエストのタイムアウトなどなんでも)、メッセージは失われるでしょう。この問題を解決するために、多くのメッセージングシステムはメッセージが送信された時にconsumedではなく単にsent としてマークされることを意味する通知機能を追加します; ブローカーはメッセージを consumedとして記録するためにコンシューマからの特定の通知を待ちます。この戦略はメッセージの喪失の問題を解決しますが新しい問題を起こします。まず最初に、もしコンシューマがメッセージを処理したが通知を送信できる前に失敗した場合、そのメッセージは2度消費されるでしょう。2つ目の問題はパフォーマンス周りです。ブローカーはそれぞれ1つのメッセージについて複数の状態を保持しなければなりません(最初に2度送信されないようにロックし、そして削除できるように恒久的に消費されたとしてマークします)。送信されたが通知されないメッセージをどうするかのような、微妙な問題が扱われなければなりません。

Kafka はこれをそれぞれに扱います。トピックは全体として並べられたパーティションのセットに分割され、それぞれは確実に1回のコンシューマによって各購買コンシューマグループ内で指定された時に消費されます。このことは各パーティション内のコンシューマの位置は次に消費するメッセージのオフセットである単なる1つの整数であることを意味します。これにより何が消費されたかについての状態は各パーティションごとに1つの数値でとても小さくなります。この状態は定期的にチェックポイントすることができます。これによりメッセージの通知に相当するものはとても安上がりなものになります。

この決定には副次的な恩恵があります。コンシューマは故意に古いオフセットへrewindし、データを再消費することができます。これはキューの一般的な規約に違反しますが、多くのコンシューマにとって不可欠な機能ということが分かります。例えば、もしコンシューマのコードにバグがあり、いくつかのメッセージが消費された後で発見された場合、バグが修正された後でコンシューマはこれらのメッセージを再消費することができます。

オフライン データのロード

スケーラブルな永続性は、Hadoopあるいはリレーショナル データ ウェアハウスのようなオフラインのシステムに、データを定期的にバルクロードするバッチのように定期的に消費するだけのコンシューマの可能性を考慮します。

Hadoopの場合、ロードの完全な並行化を可能にしてそれぞれのノード/トピック/パーティションの組み合わせについての個々のマップタスクのロードを分割することでデータのロードを並行化します。Hadoopはタスクの管理を提供し、失敗したタスクはデータの重複の危険性無しに再起動することができます - それらは単に元の位置から再起動します。

静的なメンバーシップ

静的なメンバーシップは、ストリームアプリケーション、コンシューマグループおよびグループ リバランス プロトコル上に構築された他のアプリケーションの可用性を向上させることを目的としています。リバランス プロトコルはエンティティidをグループメンバーに割り当てるためにグループ コーディネータに依存します。これらの生成されたidは一時的なもので、メンバーが再起動及び再参加すると変更されます。コンシューマ ベースのアプリケーションの場合、この "動的なメンバーシップ" により、コードの展開、設定の更新および定期的な再起動などの管理作業中に、タスクの大部分が異なるインスタンスに再割り当てされます。大規模な状態のアプリケーションの場合、シャッフルされたタスクは、処理する前にローカルの状態を回復し、アプリケーションを部分的または完全に使用できなくするために長い時間が必要です。この観察によって動機づけられたKafkaのグループ管理プロトコルにより、グループメンバーは永続的なエンティティidを提供することができます。グループのメンバーシップはこれらのIDに基づいて変更されないため、リバランスは引き起こされません。

静的メンバーシップを使いたい場合、

  • ブローカークラスタとクライアントアプリの両方を2.3以上にアップグレードし、アップグレードされたブローカーが2.3以降のinter.broker.protocol.versionも使っていることを確認します。
  • 設定 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG を1つのグループのもとの各カスタマ インスタンスについて一意の値に設定します。
  • Kafkaストリーム アプリケーションの場合、インスタンスで使用されるスレッドの数に関係なく、KafkaStreamのインスタンスごとに一意の ConsumerConfig#GROUP_INSTANCE_ID_CONFIG を設定するだけで十分です。
ブローカーが2.3より古いバージョンだが、クライアント側でConsumerConfig#GROUP_INSTANCE_ID_CONFIGを設定するか音を選択した場合、アプリケーションはブローカーのバージョンを検知し、UnsupportedException を投げます。誤って異なるインスタンスに重複idを設定した場合、ブローカー側の防護の仕組みがorg.apache.kafka.common.errors.FencedInstanceIdExceptionを引き起こすことにより、重複クライアントを直ちにシャットダウンするように通知します。詳細はKIP-345を見てください。

4.6メッセージ配送セマンティクス

プロデューサとコンシューマの働きについて少し理解できたので、Kafkaがプロデューサとコンシューマ間で提供するセマンティックな保証について議論しましょう。提供することができる可能なメッセージ配送の保証は明らかに複数あります:

  • 最大1回—メッセージは紛失するかもしれませんが、決して再配送はされません。
  • 少なくとも1回—メッセージは決して紛失されませんが、再配送されるかもしれません。
  • 確実に1回—これはみんなが本当に欲しいもので、各メッセージは1回だけ配送されます。
これは2つの問題に分類されることに注目する価値があります: メッセージを発行する際の耐久性の保証と、メッセージを消費する時の保証です。

多くのシステムは "確実に1回"の配送セマンティクスを提供するように要求します。しかし但し書きを読むことが重要です。それらのほとんどの要求には語弊があります (つまり、それらはコンシューマあるいはプロデューサが失敗する時、複数のコンシューマ処理がある時、あるいはディスクに書き込まれるデータが紛失されるかも知れない時について説明しません)。

Kafkaのセマンティクスは素直なものです。メッセージを発行する時、ログにメッセージが”コミットされた”ことの記録を持ちます。発行されたメッセージがコミットされると、このメッセージが書き込まれたパーティションをリプリケートする1つのブローカーが "alive" である限り紛失されないでしょう。コミットされたメッセージの定義、aliveパーティション、私たちが処理しようとしている種類の障害の説明については、次の章で詳細に説明されるでしょう。今のところは、完全、紛失無しのブローカーを仮定し、プロデューサとコンシューマについての保証を理解しましょう。Fもしプロデューサがメッセージを発行しようとしてネットワークのエラーに遭遇した場合、もしこのエラーがメッセージが発行された前後で発生すると、確実ではありません。これは自動生成されたキーを持つデータベーステーブルへの挿入のセマンティクスに似ています。

0.11.0.0 より前は、もしプロデューサがメッセージがコミットされたことを示す応答の受信に失敗した場合、メッセージを再送するというちょっとした選択がありました。元のリクエストが実際には成功していた場合再送の間にメッセージがログに再び書き込まれるかも知れないため、これは少なくとも1回の配送セマンティクスを提供します。0.11.0.0 からはKafkaのプロデューサは再送がログ内の重複エントリに結果的にならないように保証する等冪配送オプションもサポートします。これを行うために、ブローカーは各プロデューサにIDを割り当て、各メッセージと共にプロデューサによって送信されるシーケンス番号を使ってメッセージを1つにします。また 0.11.0.0から、プロデューサはトランザクションのようなセマンティクスを使って複数のトピックパーティションにメッセージを送信する機能をサポートします: つまり、全てのメッセージは完全に書き込まれるか、何も書き込まれないか、のどちらかです。これの主要な利用法はKafkaトピック間の確実に1回の処理です (以下で説明されます)。

全ての利用法でそのような強い保証が必要なわけではありません。レンテンシーを気にするユーザについては、プロデューサが望ましいレベルの持続性を指定することができます。プロデューサがメッセージがコミットされる時に待ちたいことを指定する場合、これは10ミリ秒程度掛かるかもしれません。しかし、プロデューサは、完全に同期して送信をしたい、あるいはリーダー(必ずしもフォロワーではない)がメッセージを受け取るまでのみ待ちたいということも指定することができます。

ではコンシューマの視点からのセマンティクスを説明しましょう。全てのレプリカは同じオフセットを使って確実に同じログを持ちます。コンシューマはログ内のその位置を制御します。もしコンシューマが停止しなければ、メモリ内にこの位置を格納するだけです。しかしもしコンシューマが故障し、このトピックパーティションを他のプロセスに交代したい場合は、新しい処理は処理を開始する適切な位置を選択する必要があります。例えば、コンシューマが何らかのメッセージを読むとした場合 -- メッセージを処理しその位置を更新するには幾つかのオプションがあります。

  1. メッセージを読み、ログ内に位置を保存し、最後にメッセージを処理することができます。この場合、位置を保存した後だがそのメッセージの処理の出力を保存する前にコンシューマの処理が停止する可能性があります。この場合、処理を引き継いだプロセスは、その位置より前の2,3のメッセージが処理されなかったとしても、保存された位置から開始するでしょう。この場合コンシューマの不足したメッセージは処理されないかも知れないので、これは"最大1回"のセマンティクスに対応します。
  2. メッセージを読み込み、メッセージを処理し、最後にその位置を保存することができます。この場合、メッセージの処理の後だがその位置を保存する前にコンシューマのプロセスが停止する可能性があります。この場合、新しいプロセスが引き継いだ時に、受信する最初の2,3のメッセージがすでに処理されているでしょう。コンシューマの不足の場合"少なくとも1回"のセマンティクスに対応します。多くの場合においてメッセージはプライマリキーを持ち、そのため更新は等冪です (同じメッセージを2度受信すると、それ自身の他のコピーを使って単純にレコードを上書きします)。

それで、確実に1回のセマンティクスとは何でしょうか (言い換えると、あなたが本当に欲しいもの)?Kafkaトピックからの消費と他のトピックへの生成(Kafka Streams アプリケーション)時に、上で述べたような 0.11.0.0 での新しいトランザクションのプロデューサ機能を利用することができます。コンシューマの位置はトピック内でメッセージとして格納されるため、処理されたデータを受信する出力トピックとして同じトランザクション内のKafkaへのオフセットを書くことができます。もしトランザクションが中断された場合、コンシューマの位置はそれの古い値に戻され、出力トピック上の生成されたデータは"隔離レベル"に応じて他のコンシューマに見えなくなるでしょう。デフォルトの"read_uncommitted" 隔離レベルでは、全てのメッセージはたとえそれらが中断されたトランザクションであってもコンシューマに見えますが、"read_committed"では、コンシューマはコミットされた(そしてトランザクションの一部ではない全てのメッセージ)トランザクションからのメッセージのみを返すでしょう。

外部システムに書き込む時の制限は、コンシューマの位置を実際に出力として保存されるものと調整する必要があるという事です。これを行う伝統的な方法は、コンシューマの位置の格納とコンシューマの出力の格納との間の2つのフェーズのコミットを導入することでしょう。しかし、これはコンシューマに出力と同じように同じ場所にオフセットを格納させることで、より簡単に一般的に処理することができます。コンシューマが書き込みたい多くの出力システムでは2フェーズのコミットをサポートしないため、これはより良いでしょう。この例として、データとオフセットの両方が更新されるか、あるいはどちらも更新されないことを保証するために、読み込んだデータのオフセットと共にHDFSのデータを取り込むKafka Connectを考えてみましょう。これらのより強力なセマンティクスを必要とし、メッセージに重複排除を可能とする主キーが無い他の多くのデータシステムについても、同様のパターンに従います。

つまりKafkaはKafka Streamsの中で確実に1回の配送を効果的にサポートし、Kafkaトピック間でデータを転送および処理する時に確実に1回の配送を提供するためにトランザクションのプロデューサ/コンシューマを一般的に使うことができます。他の宛先のシステムのための確実に1回の配送は一般的にそのようなシステムとの協力を必要としますが、Kafkaはこれを実装することを便利にするオフセットを提供します (Kafka Connectも見てください)。それ以外の場合、Kafkaはデフォルトで少なくとも1回の配送を保証し、プロデューサ上での再試行を無効にしてメッセージのバッチを処理する前にコンシューマ内のオフセットをコミットすることでユーザは最大で1回の配送を実装することができます。

4.7リプリケーション

Kafkaは各トピックのパーティションについて設定可能な数のサーバに渡ってログをリプリケートします (このレプリケーションのファクタをトピック毎に基づいて設定することができます)。これによりクラスタ内のサーバに障害が発生した場合にこれらのレプリカに自動的にフェールオーバーすることができ、障害時にメッセージが利用可能になります。

他のメッセージングシステムはレプリケーション関係の機能をいくつか提供しますが、私たちの(完全にバイアスされた)意見では、これは悪用され、あまり使われておらず、大きな欠点があります: レプリカは非アクティブで、スループットに大きな影響があり、扱いにくい手動の設定が必要です。Kafkaはデフォルトでレプリケーションと一緒に使われることになっています - 実際私たちはリプリケーションされていないトピックをリプリケーション要素が1のリプリケートされたトピックとして実装します。

リプリケーションの単位はトピックのパーティションです。障害が無い状況では、Kafka内の各パーティションは1つのリーダと0以上のフォロワーを持ちます。リーダーを含むレプリカの総数はレプリケーション要素を構築します。全ての読み込みおよび書き込みはパーティションのリーダーに行きます。一般的に、ブローカーよりも多くのパーティションがあり、リーダーは結果的にブローカー間に分散されます。フォロワーのログはリーダーのログと同じです — 全て同じ順番で同じオフセットとメッセージを持ちます (しかしもちろんいつでもリーダーはログの最後にまだリプリケートされていない少しのメッセージを持つかもしれません)。

フォロワーはリーダーからのメッセージを通常のKafkaコンシューマがするように消費し、それらを自身のログに適用します。フォロワーをリーダーからpullすることは、フォロワーがログに適用するログエントリを自然に一括してバッチ処理することをできる素晴らしい機能を持ちます。

ほとんどの分散システムと同様に、自動的な障害の処理にはノードが "alive" ということは何なのかを正確に定義する必要があります。Kafkaのノードの生存には2つの条件があります

  1. ノードはZooKeeperを使ってセッションを維持できなければなりません (ZooKeeperのハートビート機構)
  2. フォロワーであれば、リーダーに起きた書き込みをレプリケートしなければならず、"あまりに遅れて"はなりません
"alive" あるいは "failed"の曖昧さを避けるために、この2つの条件を満たすノードを "in sync" と呼びます。リーダーは"同期" ノードのセットを追跡します。もしフォロワーが死亡、立ち往生、あるいは脱落すると、リーダーはそれを同期レプリカのリストから削除するでしょう。立ち往生、レプリカの遅れの決定は replica.lag.time.max.ms 設定によって制御されます。

分散システムの用語では、ノードが突然動作を停止しその後回復(おそらくそれらが死んだことを知ること無しに)する障害の "fail/recover" モデルのみを扱おうとします。Kafkaはノードが任意あるいは悪意のある応答(おそらくバグあるいは不正な操作による)による所謂 "Byzantine" 障害は扱いません。

パーティションについての全ての同期レプリカがメッセージをログに適用した時に、メッセージがコミットされたと見なすと、より明確に定義することができます。今までコミットされたメッセージだけがコンシューマに配布されなければなりません。このことはリーダーが失敗した時に紛失されるかも知れないメッセージを一時的に見ることについて、コンシューマが心配する必要がないことを意味します。逆にプロデューサは、レイテンシと持続性の間のトレードオフについての選択に応じて、メッセージがコミットされるのを待つかそうでないかのどちらかの選択肢を持ちます。この特徴はプロデューサが使用するack設定によって制御されます。メッセージが完全な同期レプリカセットに書き込まれたことの通知をプロデューサが要求する時にチェックされる、in-syncレプリカの "最小数" の設定をトピックが持つことに注意してください。あまり厳しくない通知がプロデューサによって要求される場合、in-syncレプリカがその最小数よりも少なくてもメッセージをコミットし消費することができます (例えば、リーダーと同じだけ少ないかもしれません)。

Kafkaが提供する保証は、常に同期レプリカのうちの少なくとも1つが有効である限り、コミットされたメッセージが喪失されないだろうことです。

Kafkaは短時間のフェイルオーバー期間の後でノード障害時に利用可能でいますが、ネットワークの分割の場合には利用可能では無いかもしれません。

レプリケートされたログ: Quorums, ISRs および State Machines (Oh my!)

Kakfaパーティションの核心はレプリケートされたログです。レプリケートされたログは分散データシステムでの最も基本的なプリミティブの1つですが、それを実装するには多くのやり方があります。リプリケートされたログは他のシステムによって state-machine styleの他の分散システムを実装するためのプリミティブとして使うことができます。

リプリケートされたログは一連の値の順序について合意するプロセスをモデル化します (一般的に、ログエントリの番号は 0, 1, 2, ...)。これを実装するには多くの方法がありますが、最も簡単で速い方法はそれが提供される値の順番を選択するリーダーを使うことです。リーダーが有効である限りは、全てのフォロワーはリーダーが選択した値と順番をコピーする必要があります。

もちろん、もしリーダーが失敗しなかった場合は、フォロワーを必要としなかったでしょう!リーダーが死亡した場合、フォロワーの中から新しいリーダーを選択する必要があります。しかしフォロワー自身は遅延するか、クラッシュするかもしれません。つまり最新のフォロワーを選択しなければなりません:ログのリプリケーション アルゴリズムが提供しなければならない基本的な保証は、もしメッセージがコミットされたとクライアントに知らせて、リーダーが失敗した場合、選択した新しいリーダーもそのメッセージを受け取らなければならないということです。これはトレードオフをもたらします: もしメッセージがコミットされたことを申告するより前に、より多くのフォロワーがメッセージを応答するのをリーダーが待つ場合、一時的に選択可能なリーダーがより多くいるでしょう。

必要とされる通知の数と、重複が保証されるようにリーダーを選出するために比較されなければならないログの数をせんたくする場合、これは定足数と呼ばれます。

このトレードオフの一般的なやり方は、コミットの決定とリーダー選択の両方について多数決を使うことです。これはKafkaが行うものとは違いますが、トレードオフを理解するためにとにかく調べてみましょう。2f+1 のレプリカがあるとします。リーダーによってコミットが宣言される前に f+1 のレプリカが受信しなければならない場合、そして少なくとも f+1 のレプリカから最も完了したログを持ち、f未満の障害のフォロワーを選択することで新しいリーダーが選出された場合、リーダーは全てのコミットされたメッセージを持つことが保証されます。これはf+1 のレプリカの中で、全てのコミットされたメッセージを含む少なくとも1つのレプリカがある筈だからです。そのレプリカのログは最も完了して、下が立って新しいリーダーとして選択されるでしょう。各アルゴリズムが処理しなければならない(たとえばログをより完全にする、リーダーの障害時のログの一貫性の保証、あるいはレプリカセット内のサーバセットの変更を正確に定義する)詳細はたくさんありますが、今はそれらを無視します。

この多数決のやり方はとても良い特性を持ちます: レイテンシは最も速いサーバのみに依存します。つまり、もしレプリケーションの要素が3つであれば、レイテンシは遅いサーバでは無く最も速いフォロワーによって決定されます。

ZooKeeperの Zab, Raft および Viewstamped Replication を含む数多くのこの類のアルゴリズムがあります。Kafkaの実際の実装で意識した最も類似した学術的な発表はMicrosoftの PacificA です。

多数決のマイナス面は、選出可能なリーダーが居ないままにするのに多くの失敗が必要ではないということです。1つの障害を耐えるには3つのデータのコピーが必要で、2つの障害を耐えるには5つのデータのコピーが必要です。経験上、実際のシステムにおいて1つの障害に耐えるのに十分な冗長性を持つだけでは十分ではありませんが、5倍のディスク容量と5分の1のスループットで5回の書き込みを実行することは大量のデータ問題に対してはあまり実用的ではありません。これはおそらく定足数アルゴリズムがZooKeeperのような共有クラスタ構成でよく一般的に見かけられる理由ですが、一次データストレージではあまり一般的ではありません。例えばHDFSでは名前ノードの高可用性機能が多数決ベースのジャーナル上に構築されますが、この効果なやり方はデータ自身には使われません。

Kafkaは定員数のセットを選択するために少し異なるやり方をします。多数決の代わりに、Kafkaは動的にリーダについていく同期レプリカ(ISR)のセットを整備します。このセットのメンバーだけがリーダーとして選出の資格があります。Kafkaのパーティションへの書き込みは、全ての 同期レプリカが書き込みを受け取るまで、コミットされたと見なされません。このISRセットは変更時に常にZooKeeperに存続します。これにより、ISR内の全てのレプリカは選出されたリーダーになる資格があります。多くのパーティションがありリーダーシップのバランスが重要な場合は、Kafkaの利用モデルにとってこれは重要な要素です。このISRモデルとf+1 レプリカを使って、Kafkaのトピックはコミットされたメッセージの喪失無しに f 個の障害に耐えることができます。

扱いたいほとんどの利用法において、このトレードオフを無理のないものと考えます。実際には、f個の障害を耐えるために、多数決とISRのやり方の両方はメッセージをコミットする前に通知をするために同じ数のレプリカを待つでしょう (例えば、1つの障害を切り抜けるには、多数決の定足数は3つのレプリカと1つの通知を必要とし、ISRのやり方は2つのレプリカと1つの通知を必要とします)。最も遅いサーバ無しにコミットする機能は、多数決のやり方で利点があります。ただし、メッセージ コミットでブロックするかどうかをクライアントが選択できるようにすることで改善されると考えています。必要なレプリケーション係数が低いため、スループットとディスク領域の追加をする価値があります。

もう一つの重要な設計の違いは、Kafkaはクラッシュしたノードが全てのデータをそのまま回復することを必要としないことです。この領域のレプリケーションアルゴリズムが、潜在的な整合性違反無しに障害回復シナリオで失われない"安定したストレージ"の存在に依存することは、珍しくありません。この仮定には2つの重要な問題があります。まず、ディスクのエラーは永続的なデータシステムの現実の操作において見た中で最も一般的な問題であり、それらはしばしばデータを損なうということです。次に、これは問題では無いとしても、fsyncは2から3桁のパフォーマンスの低下を起こすかもしれないため、一貫性の保証のために各書き込みにfsyncの使用を必要としたくありません。レプリカがISRを再結合することができる私たちのプロトコルは、再結合の前にたとえクラッシュ時のフラッシュしていないデータを紛失したとしても完全に再び再syncしなければならないことを保証します。

不透明なリーダー選出: 全て死んだ場合にどうなるか?

同期中である少なくとも1つのレプリカでのデータの喪失についてのKafkaの保証は予想されていることに注意してください。もしパーティションをレプリケートしている全てのノードが死亡した場合、この保証はもう保持されません。

しかし、全てのレプリカが死亡する時に、実際のシステムは何かをする必要があります。これが起きてしまうような不運な場合何が起きるかを考慮することは重要です。実装されているかもしれない2つの挙動があります:

  1. ISR内のレプリカを復活するのを待ち、このレプリカをリーダーとして選択する(できれば、それはまだ全てのデータを持っています)。
  2. 復活する最初のレプリカ(ISR内である必要はありません)をリーダーとして選択する。

これは可用性と一貫性の単純なトレードオフです。もしISRの中のレプリカを待つのであれば、それらのレプリカがダウンする間利用不可能なままになるでしょう。もしそのようなレプリカが破壊されるかそれらのデータが紛失された場合、恒久的にダウンします。一方で、もし非同期のレプリカが復活しそれをリーダーにすることができれば、全てのコミットされたメッセージを持っている保証がないとしてもそのログが事実となるでしょう。バージョン 0.11.0.0からデフォルトでKafkaは最初の戦略を選択し、一貫性のあるレプリカを待つことを好みます。使用可能時間が一貫性より好ましい場合のユースケースをサポートするために、この挙動は unclean.leader.election.enable プロパティの設定を使って変更することができます。

このジレンマはKafkaに限ったものではありません。どのような定員ベースのスキーマに存在します。例えば、多数決のスキーマにおいて、もしサーバの大多数が恒久的な障害を受けた場合は、データの100%を失うか、新しい事実として既存のサーバ上に残っているものを取り出すことで一貫性を乱すかを選択する必要があります。

可用性と持続性の保証

Kafkaに書き込む場合、プロデューサは 0,1 あるいは all (-1) のレプリカによって通知されるメッセージを待つかどうかを選択することができます。"全てのレプリカから通知" は割り当てられたレプリカ全部のセットがメッセージを受け取る保証をするものでは無いことに注意してください。デフォルトでは、acks=allの時に、全ての現在の同期レプリカがメッセージを受け取るとすぐに通知が起きます。例えば、もしトピックが2つのレプリカで構成され、1つが失敗した場合 (つまり、1つの同期レプリカだけが残る)、acks=all を指定した書き込みだけが成功するでしょう。しかし、もし残っているレプリカも失敗した場合は、これらの書き込みは喪失されるかもしれません。これはパーティションの最大の可用性を保証しますが、この挙動は可用性より持続性を望むユーザには望ましくないかもしれません。従って、メッセージの可用性より持続性を好むユーザが使うことができる2つのトピックレベルの設定を提供します。
  1. 汚れたリーダーの選出を無効化 - もし全てのレプリカが利用不可になった場合、ほとんどの最新のリーダー再び利用可能になるまでパーティションは利用不可のままになるでしょう。これはメッセージの喪失のリスクより利用不可を事実上好みます。説明は前の章の汚れたリーダー選出を見てください。
  2. 最小のISRのサイズを指定します - 1つのレプリカへ書き込まれるメッセージの喪失を避けるために、ISRのサイズがこの最小より大きい場合のみパーティションは書き込みを受け付けるでしょう。その後利用不可になります。プロデューサが acks=all を使う場合にのみこの設定は有効で、メッセージが少なくともこれだけの同期レプリカによってメッセージが通知されるだろうことを保証します。この設定は一貫性と可用性のトレードオフを提供します。メッセージが喪失される可能性を減らすためにより多くのレプリカにメッセージが書き込まれることが保証されるため、最小ISRサイズを大きくすると、一貫性が良くなります。しかし、同期レプリカの数が最小の閾値より少ない場合、パーティションが書き込みに利用できないため、可用性が下がります。

レプリカの管理

レプリケートされたログについての上の議論は、実際には1つのログのみ、つまり1つのトピックのパーティションを対象とします。しかし、Kafkaクラスタは数十万のこれらのパーティションを管理するでしょう。少ない数のノード上で高ボリュームのトピックのために全てのパーティションをクラスタリングすることを避けるためにラウンドロビン形式のクラスタ内でパーティションをバランスしてみます。同様にパーティションに比例した占有のために各ノードがリーダーとなるように、リーダーシップをバランスしてみます。

リーダーシップ選出プロセスを最適化することも重要です。これは非可用性の重要なウィンドウだからです。リーダー選出のネィティブな実装は、ノードが失敗した時にノードがホストしている全てのパーティションについてパーティション毎に選択を実行することになります。代わりにブローカーの1つを "controller" として選出します。このコントローラはブローカーレベルで障害を検知し、障害が起きたブローカー内の影響を受けた全てのパーティションのリーダーを変更する責任があります。その結果、必要なリーダーシップ変更の通知をまとめてバッチ処理することができるため、多数のパーティションについて選出のプロセスをはるかに安価で高速にすることができます。コントローラが失敗すると、生き残ったブローカーの1つが新しいコントローラになるでしょう。

4.8ログの圧縮

Logの圧縮は、Kafkaが常に1つのトピックパーティションのデータのログ内で各メッセージキーの値について少なくとも最後に知られている値を維持するでしょう。それは、アプリケーションのクラッシュあるいはシステム障害、またはアプリケーションがオペレーション的なメンテナンスの間に再起動した後でキャッシュをリロードするような、ユースケースやシナリオに取り組みます。これらの利用法についてもっと詳しく分け入り、圧縮がどのように動くかを説明します。

これまで、ある期間の後あるいはなんらかの事前決定されたサイズにログが達した時に廃棄される古いログデータの単純なデータの維持についてのみ説明してきました。これは各レコードが独立しているログのような一時的なイベントデータについては良く動作します。しかしデータストリームの重要なクラスはキー付け、ミュータブルなデータ(例えば、データベースのテーブルへの変更)への変更のログです。

そのようなストリームについての具体的な例を議論しましょう。ユーザのメールアドレスを含むトピックがあるとします; ユーザがメールアドレスを更新するたびにプライマリーキーとしてユーザidを使ってこのトピックにメッセージを送信します。id 123のユーザについて以下のメッセージを長い期間の後に送信するとします。各メッセージはメールアドレスについての変更に対応します (他のidについてのメッセージは省略されます):

       123 => bill@microsoft.com
                .
                .
                .
        123 => bill@gatesfoundation.org
                .
                .
                .
        123 => bill@gmail.com
ログ圧縮により、各プライマリキー(例えば bill@gmail.com) についての最後の更新だけでも維持をする保証ができるように、もっと細かい保持メカニズムを与えます。こうすることでログが最近変更されたキーについてのみではなく各キーについて最終的な値の完全なスナップショットを含むことを保証します。このことは全ての変更の完全なログを維持することなくダウンストリームのコンシューマがこのトピックから独自の状態を回復することができることを意味します。

これが有用な2,3の利用法を見ることから始め、それがどのように使われるかを見てみましょう。

  1. データベースの変更の購読。しばしば複数のデータシステムにデータセットを持つことが必要になります。そしてしばしばそれらのシステムのうちの1つはなんらかのデータベースです (RDBMSあるいはおそらく新しい価値のあるキー-値ストア)。例えば、データベース、キャッシュ、検索クラスタおよびHadoopクラスタを持つかもしれません。データベースへの各変更はキャッシュ、検索クラスタ、最終的にHadoopに反映される必要があるでしょう。リアルタイムの更新のみを処理する場合は、最新のログだけが必要です。しかし、キャッシュを再読み込み、あるいは障害を起こした検索ノードを回復したい場合は、完全なデータセットが必要かもしれません。
  2. イベント ソース。これは、クエリ処理をアプリケーション設計と同じ場所に配置し、変更のログをアプリケーションのプライマリ ストアとして使う、アプリケーション設計のススタイルです。
  3. 高可用性のためのジャーナリング。ローカル計算を行うプロセスは、ローカル状態に加えた変更を記録することで、障害に耐性を持つようにすることができます。これにより、別のプロセスがこれらの変更をリロードし、失敗した場合に続行することができます。これの具体的な例は、カウントの処理、集約、ストリームクエリシステムでの別の"group by"のような処理です。リアルタイム ストリーム処理フレームのSamzaはまさにこの目的でこの機能を使います
これらの各ケースでは、主に変更のリアルタイム フィードを処理する必要がありますが、時にはマシンがクラッシュあるいはデータが再ロードあるいは再処理する必要がある場合には完全なロードをする必要があります。ログの圧縮によりこれらの両方のユースケースを同じ保証トピックから情報を得ることができます。ログの使い方のこの形式はこのブログ投稿の中で更に詳しく説明されます。

全体的な考えはとても単純です。もし無限のログを保有し、上の場合の各変更を記録した場合、最初に開始された時から各時間においてシステムの状態を獲得していたでしょう。この完全なログを使って、ログ内の最初のNレコードを再生することで時間内の任意の場所へ回復することができるでしょう。この仮説的な完全なログは、安定したデータセットであってもログが無限に成長するため1つのレコードを何度も更新するシステムにとってはあまり実用的ではありません。古い更新を破棄する単純なログ保持メカニズムはスペースを制限しますが、そのログは現在の状態を回復する方法ではなくなります— 古い更新が全くキャプチャされない可能性があるため、ログの最初から復元すると現在の状態が再生成されなくなります。

ログの圧縮は、荒い粒度の時間ベースの保存というより、細かい粒度のレコード単位の保存を行うための仕組みです。同じプライマリキーを持つ、より最近の更新を持つレコードを選択的に削除するという考えです。このやり方でログは各キーについて少なくとも最後の状態を持つことを保証されます。

この保存ポリシーはトピック毎に設定することができるため、1つのクラスタは保持がサイズあるいは時間によって強制される幾つかのトピックと、保持が圧縮によって強制される他のトピックを持つことができます。

この機能はLinkdInの最も古く最も成功したインフラストラクチャのうちの1つから着想を得ています -- Databusと呼ばれるデータベース変更ログキャッシングサービス。ほとんどのログ構造のストレージシステムと異なり、Kafkaは購読のために構築され、速い線形読み込みおよび書き込みのためにデータを管理します。Databusと異なり、Kafkaは信頼できる情報源のストアとして振舞うため、上流のデータソースが再生可能では無い場合でも役に立ちます。

ログの圧縮の基本

各メッセージについてのオフセットを持つKafkaのログの論理的な構造を表す高レベルな図です。

ログの先頭は伝統的なKafkaログと同じです。密度、連続するオフセットを持ち、全てのメッセージを保持します。ログの圧縮はログのtailの処理についての選択肢を追加します。上の図は圧縮されたtailのログを示します。ログの最後にあるメッセージは、それらが最初に書き込まれた時に割り当てられた元のオフセットを保持することに注意してください -- それは変更されません。また全てのオフセットはそのオフセットを含むメッセージが圧縮されていたとしても、ログ内での有効な位置のままです; この場合この位置はログに現れる次に高いオフセットと区別することができません。例えば上の図では、オフセット 36, 37 と 38 は全て等価な位置であり、これらのオフセットのいずれかで始まる読み込みは38で始まるメッセージ セットを返します。

圧縮は削除することもできます。キーと空の荷物を持つメッセージはログからの削除として扱われるでしょう。Such a record is sometimes referred to as a tombstone. このdeleteマーカーにより、そのキーを持つ以前のメッセージは(そのキーを持つ新しいメッセージと同じように)削除されますが、deleteマーカーは領域を解放するために一定時間が経過した後でログから自身が削除されるという点で特別なものです。deleteがもう維持されない時点は、上の図で"delete retention point"としてマークされています。

定期的にログのセグメントを再コピーすることで、バックグラウンドで圧縮が行われます。クリーニングは読み取りをブロックせず、プロデューサとコンシューマに影響を与えることを避けるために設定可能なI/Oスループットの量を超えずに使うように絞ることができます。ログ セグメントの圧縮の実際のプロセスは以下のようなものに見えます:

ログの圧縮は何を保証しますか

ログの圧縮は以下のものを提供します:
  1. ログの先頭に追いついているコンシューマは書き込まれた全てのメッセージを見るでしょう; これらのメッセージは連続するオフセットを持つでしょう。トピックのmin.compaction.lag.ms は圧縮される前に書き込まれるメッセージの後で経過しなければならない最小の長さを保証するために使われます。つまり、各メッセージが(非圧縮の)head内に存在するだろう長さの低い方の境界を提供します。トピックのmax.compaction.lag.ms は、メッセージが書き込まれる時間と、メッセージがコンパクションの対象になる時間の間の最大の遅延を保証するために使うことができます。
  2. メッセージの順番は常に維持されます。圧縮はメッセージの順番を並び変えることはせず、単に幾つかを削除するでしょう。
  3. メッセージについてのオフセットは変わりません。それはログ内の位置についての恒久的な識別子です。
  4. ログの最初から進んでいるコンシューマは書き込まれた順番で全てのレコードの少なくとも最終的な状態を見るでしょう。さらに、コンシューマがトピックのdelete.retention.ms設定(デフォルトは24時間)より少ない時間でログの先頭に到達した場合、削除されたレコードについての全てのdeleteマーカーが表示されます。別の言い方をすると: deleteマーカーの削除は読み込みと同時に起こるため、もし遅れがdelete.retention.msより大きい場合はコンシューマはdeleteマーカーを紛失するかもしれません。

ログの圧縮の詳細

ログの圧縮は、ログクリーナー、ログのセグメントファイルを再コピーするバックグラウンドのスレッドのプール、ログの最初に現れるキーのレコードの削除によって処理されます。各コンパクタースレッドは以下のように動作します:
  1. ログのtailのログの頭の一番高いレートを持つログを選択します
  2. ログの先頭の各キーについての最後のオフセットの簡潔な要約を作成します
  3. ログ内で最近現れたキーを削除しながら、最初から最後までログを再コピーします。新しい、きれいなセグメントはすぐにログに入れ替えられます。つまり必要とされる追加のディスク空間はちょうど1つの追加のログセグメントです (ログの完全なコピーではありません)。
  4. ログの頭の概要は本質的に単なる空間がぎっしりつまったハッシュテーブルです。それはエントリごとに正確に24バイトです。8GBのクリーナーバッファーの結果として、1つのクリナーの繰り返しでログのheadの約366GBをきりえします (1kのメッセージを仮定します)。

ログクリーナーの設定

ログクリーナーはデフォルトで有効です。これはクリーナースレッドのプールを開始するでしょう。特定のトピックのログの掃除を有効にするために、ログ固有のプロパティを追加します。
 log.cleanup.policy=compact
log.cleanup.policy プロパティはブローカーのserver.properties ファイルで定義されるブローカーの構成設定です; ここに記載されているように、設定の上書きが設定されていないクラスタ内の全てのトピックに影響します。ログクリーナーはログの非圧縮の"head"の最小量を維持するように設定することができます。これは圧縮時間のラグを設定することで有効になります。
  log.cleaner.min.compaction.lag.ms
これは最小のメッセージ経過時間より新しいメッセージが圧縮されなければならないことを避けるために使うことができます。設定されない場合は、最後の、つまり現在書き込まれているセグメントを除いて、全てのログのセグメントが圧縮の資格があります。有効なセグメントは、たとえメッセージの全てが最小の圧縮タイムのラグより古くても圧縮されないでしょう。ログ クリーナーは、ログの圧縮がされていない "head" がログ圧縮の対象になるまでの最大遅延を確保するように構成できます。
 log.cleaner.max.compaction.lag.ms
生産率の低いログが無制限の期間にわたって圧縮の対象とならないように、これを使うことができます。設定しない場合、min.cleanable.dirty.ratio を超えないログは圧縮されません。この圧縮期限は、ログ クリーナーのスレッドの可用性と実際の圧縮時間の影響を受けるため、厳密な保証ではないことに注意してください。uncleanable-partitions-count, max-clean-time-secs および max-compaction-delay-secs メトリクスを監視したいでしょう。

クリーナーの更なる設定はここで説明されます。

4.9クォータ

Kafka クラスタはクライアントにいって使われるブローカーのリソースを制御するために、リクエストに割り当てを強制することができます。クォータを共有するクライアントグループごとにKafkaのブローカーによって2種類のクライアント クォータを適用することができます:

  1. ネットワークの帯域の割り当てはバイトレートの閾値を定義します (0.9から)
  2. リクエストのレートの定員はネットワークとI/Oスレッドのパーセンテージとして、CPUの利用率の閾値を定義します (0.11から)

クォータはなぜ必要ですか

プロデューサとコンシューマはとても多くのデータ量を生成/消費、あるいはとても高いレートでリクエストを生成することが可能で、従ってブローカーのリソースを占有し、ネットワークの飽和を起こし、一般的に他のクライアントとブローカー自身のサービス妨害します。クォータを持つことはこれらの問題を防ぎ、少数の悪い挙動をするクライアントが良い挙動をするクライアントのユーザエクスペリエンスを下げるかもしれない大規模なマルチテナントのクラスタにおいてもっと重要です。実際、Kafkaをサービスとして実行する時に、合意に従ってAPIの制限を強制することさえできます。

クライアント グループ

Kafkaクライアントの身分証明はセキュアなクラスタ内での認証されたユーザを表すユーザ プリンシパルです。認証されていないクライアントをサポートするクラスタ内では、ユーザのプリンシパルは設定可能なPrincipalBuilderを使ってブローカーによって選択された未認証のユーザのグループです。クライアントidは、クライアントアプリケーションによって選択された意味がある名前を持つクライアントの論理的なグループです。タプル (user, client-id) は、ユーザ プリンシパルとクライアントidの両方を共有するクライアントの安全なロジカルグループを定義します。

定員は、ユーザあるいはクライアントidグループの、(user, client-id) に適用することができます。指定された接続について、接続に一致するもっとも特定のクォータが適用されます。割り当てグループの全ての接続はグループのために設定された割り当てを共有します。例えば、(user="test-user", client-id="test-client") が10MB/secの生成のクォータを持つ場合、これはクライアント-id "test-client" を持つ ユーザ "test-user" の全てのプロデューサ インスタンスに渡って共有されます。

割り当ての設定

割り当ての設定は (user, client-id)、ユーザとクライアントidのグループについて定義されるかもしれません。高い(あるいは低いでも)割合を必要とするどの割り合いレベルでもデフォルトの割合を上書きすることができます。仕組みはトピック毎のログの設定の上書きに似ています。ユーザと (user, client-id) の割り当てはZooKeeperの /config/users の下に書き込まれ、クライアントidの割り当ての上書きは /config/clientsの下に書き込まれます。これらの上書きは全てのブローカーから読み込まれ、すぐに有効になります。これにより、クラスタ全体のローリング再起動をする必要無しに割り当ての変更ができます。詳細はここを見てください。各グループについてのデフォルトの割り当ては同じ仕組みを使って動的に更新することもできます。

割り当て設定の優先順位は以下の通りです:

  1. /config/users/<user>/clients/<client-id>
  2. /config/users/<user>/clients/<default>
  3. /config/users/<user>
  4. /config/users/<default>/clients/<client-id>
  5. /config/users/<default>/clients/<default>
  6. /config/users/<default>
  7. /config/clients/<client-id>
  8. /config/clients/<default>
ブローカーのプロパティ(quota.producer.default, quota.consumer.default)はクライアントid グループについての帯域のネットワーク割り当てのデフォルトを設定するために使うこともできます。これらのプロパティは非推奨になっていて、後のリリースでは削除されるでしょう。クライアントidのデフォルトの割り当ては他の割り当ての上書きとデフォルトに似てZooKeeper内で設定することができます。

ネットワークの帯域の割り当て

ネットワークの帯域の割り当ては、割り当てを共有するクライアントの各グループについてのバイトレートの閾値として定義されます。デフォルトでは、各ユニークなクライアントグループはクラスタによって定義されたものとして、バイト/秒の一定の割り当てを受け取ります。この割り当てはブローカー単位を基準に定義されます。クライアントの各グループはクライアントが絞られる前にブローカーあたり最大 X バイト/秒を発行/取得することができます。

リクエスト レートの割り当て

リクエスト レートのクォータは、クォータウィンドウ内の各ブローカーのリクエストハンドラ I/Oスレッドとネットワークスレッド上でクライアントが使うことができる時間のパーセンテージとして定義されます。割り当てn% は1つのスレッドのn% を表します。つまり割り当ては ((num.io.threads + num.network.threads) * 100)% の総容量以上です。クライアントの各グループは絞られる前にクォータウィンドウ内の全てのI/Oおよびネットワークスレッドにわたってn% までの合計のパーセンテージを使うことができます。I/Oおよびネットワークスレッドに割り当てられるスレッドの数は一般的にブローカーホスト上で利用可能なコアの数に基づくため、リクエストレートのクォータはクォータを共有するクライアントの各グループによって使うことができるCPUの総パーセンテージを表します。

強制

デフォルトで、各ユニークなクライアントグループはクラスタで設定された固定の割合を受け取ります。この割り当てはブローカー単位を基準に定義されます。各クライアントは絞られるまでにブローカーあたりのこの割り当てまで利用することができます。ブローカーごとにクォータを定義することは、全てのブローカー間でクライアントのクォータの使用を共有する仕組みを必要とするため、クライアントごとに固定のクラスタの帯域幅を持つよりも優れていると判断しました。これはクォータの実装自体よりも難しくなる可能性があります!

ブローカーは割り当ての違反を検出した時に、どのように反応しますか?私たちの解決法では、ブローカーはまず違反しているクライアントをクォータの元に持っていくために必要な遅延量を計算し、遅れた応答をすぐに返します。フェッチ リクエストの場合、応答は何もデータを含まないでしょう。その後、ブローカーは遅延が終了するまで、クライアントからの要求を処理しないようにクライアントのチャネルをミュートします。遅延時間が0ではない応答を受信すると、Kafkaクライアントは遅延中にブローカーへ更にリクエストを送信することを控えるでしょう。従って、絞られたクライアントからの要求は両側から効果的にブロックされます。ブローカーからの応答の遅延を考慮しない古いクライアントの実装であっても、ブローカーがソケットチャンネルをミューとすることによって適用されるバックプレッシャーは依然として動作不良のクライアントのスロットルを処理することができます。絞られたチャンネルに更にリクエストを送信するクライアントは遅延が解消した後でのみ応答を受信するでしょう。

クォータ違反を迅速に検出して訂正するために、複数の小さなウィンドウ(例えば1秒の30個のウィンドウ)に渡ってバイトレートおよびスレッドの使用率が計測されます。一般的に、大きな測定ウィンドウ(例えば30秒の10個のウィンドウ)を持つことは、ユーザにとってはそれほど長くない長期の遅延が続くトラフィックの大きなバーストに繋がります。

TOP
inserted by FC2 system