ドキュメント
Kafka 2.7 Documentation
以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.6. 操作
Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.6.1基本的なKafkaの操作
この章ではあなたがKafkaクラスタで実施するだろう最も一般的な操作を再検討するでしょう。この章で検討される全てのツールはKafkaの配布物のbin/
ディレクトリの下で利用可能で、各ツールは引数無しで実行した場合に全ての可能なコマンドラインオプションの詳細を出力するでしょう。
トピックの追加と削除
トピックを手動で追加するか、あるいは存在しないトピックに初めてデータが発行された時に自動的に生成されるようにするかの選択肢があります。トピックが自動生成された場合、自動生成されたトピックに使われるデフォルトのトピックの設定を調整したいかもしれません。トピックはトピックツールを使って追加および修正されます:
> bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y
リプリケーションのファクターはどれだけ多くのサーバが書き込まれる各メッセージをリプリケートするかを制御します。リプリケーションファクターが3の場合、データへのアクセスが失われるまでに2つまでのサーバが失敗することができます。データの消費無しにマシーンを透過的に解雇できるように、リプリケーションファクターを2または3にすることをお勧めします。
パーティションのカウントはどれだけ多くのログをトピックが共有するかを制御します。パーティションのカウントの効果には幾つかあります。最初に、各パーティションは完全に1つのサーバ上に納まらなければなりません。つまり、20のパーティションを持つ場合(そして、読み込みと書き込みをする)、全体のデータセットは20以下のサーバで処理されるでしょう(レプリカを数えません)。結果的に、パーティション数はコンシューマの最大並行度に影響を与えます。これは概念の章で詳細に議論されます。
共有された各パーティションログはKafkaログディレクトリの下の独自のフォルダに配置されます。そのようなフォルダの名前はダッシュ (-) とパーティションidが追加されたトピック名から成ります。一般的なフォルダ名は255を超える文字超にはできないため、トピック名の長さには制限があるでしょう。パーティションの数はいつでも100,000を超えないでしょう。従って、トピック名は249文字よりも長くすることはできません。これによりフォルダ名にはダッシュと5桁の可能性のあるパーティションidにちょうど余裕があります。
コマンドライン上で追加された設定は、データが維持しなければならない期間のようなサーバのデフォルト設定を上書きします。トピック毎の設定の完全なセットはここで説明されます。
トピックの修正
同じトピックツールを使っているトピックの設定あるいはパーティショニングを変更する事ができます。パーティションを追加するには
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
--partitions 40
パーティションのユースケースの1つは意味論的にデータをパーティション化することであり、パーティションの追加は既存データのパーティションを変更しないため、もしコンシューマがそのパーティションに依存する場合はコンシューマを乱すかもしれないことに注意してください。つまり、もしデータがhash(key) % number_of_partitions
でパーティション化されている場合、このパーティション化はパーティションの追加によってシャッフルされるかもしれませんが、Kafkaはどうであれ自動的にはデータを再分配しようとしないでしょう。
設定を追加するには:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
設定を削除するには:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
そして、結果的にトピックを削除するには:
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
Kafka は今のところトピックについてのパーティションの数の削減をサポートしていません。
トピックのリプリケーション ファクターについての説明はここで見つけることができます。
Graceful シャットダウン
Kafkaクラスタは自動的にブローkーアのシャットダウンあるいは障害を検知し、そのマシーン上のパーティションの新しいリーダーを選出するでしょう。これは、サーバが失敗するか、メンテナンスあるいは設定の変更のために意図的に落とすかのどちらかで、起こるでしょう。後者の場合、Kafkaはサーバを停止するために単にkillするよりもより優雅な機構をサポートします。サーバがgracefullyに停止した場合、利用可能な2つの最適化があります:- 再起動をした時にログの回復をする必要を避けるためにディスクに全てのログを同期するでしょう (つまり、ログのtailの中で全てのメッセージについてチェックサムの検証をする)。ログの回復には時間がかかるため、これは内部的な再起動を高速化します。
- シャットダウンする前に、そのサーバがリーダーになっている全てのパーティションを他のレプリカに移行するでしょう。リーダーシップの転送を高速化し、各パーティションが利用できない時間を数ミリ秒に最小化するでしょう。
controlled.shutdown.enable=true
制御されたシャットダウンはブローカー上でホストされる全てのパーティションがレプリカを持つ場合のみ成功するでしょう (つまり、レプリケーションのファクターが1以上で少なくともそれらのレプリカの1つが活動中である)。最後のレプリカをシャットダウンするとトピックパーティションが使えなくなるので、これは一般的に望ましいものです。
リーダーシップのバランシング
ブローカーが停止あるいはクラッシュした時はいつでも、ブローカーのパーティションが他のレプリカに転送されます。ブローカが再起動した場合、ブローカはそのすべてのパーティションのフォロワーになるだけで、クライアントの読み取りと書き込みには使われません。この不均衡を避けるために、Kafkaは優先のレプリカの概念を持ちます。パーティションのためのレプリカのリストが1,5,9の場合、ノード1はレプリカのリストの中で先にあるためノード1がノード5あるいは9のどちらよりもリーダーとして優先されます。デフォルトでは、Kafka クラスタは回復されたレプリカにリーダーシップを回復しようとします。この動作は以下のように設定されています:
auto.leader.rebalance.enable=true
これを false に設定することもできますが、その後以下のコマンドを実行して、回復されたレプリカにリーダーシップを手動で回復する必要があります:
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
ラックを超えたレプリカのバランシング
ラックの認識機能は異なるラックを超えた同じパーティションのレプリカを分散します。これはKafkaがラック障害を補うためにブローカーの障害に対して提供する保証を延長し、ラック上の全てのブローカーが一度に障害になった場合のデータの喪失のリスクを制限します。その機能はEC2での可用性領域のような他のブローカーのグルーピングへ適用することもできます。 ブローカーの設定にプロパティを追加することで、ブローカーが特定のラックに所属するように指定することもできます: broker.rack=my-rack-id
トピックが生成, 修正 あるいはレプリカが再分配された時に、レプリカができる限り多くのラックにまたがるようにしながら、ラックの制約が尊重されるでしょう。
レプリカをブローカーに割り当てるために使われるアルゴリズムは、ブローカーがどのようにラックを超えて分散されるかに関係なく、ブローカーあたりのリーダーの数が一定であることを確実にします。これはバランスのスループットを保証します。
しかし、もしラックが異なる数のブローカーに割り当てられる場合、レプリカの割り当ては均等ではないでしょう。より多くのストレージを使用しより多くのレプリケーションにリソースを割り当てることを意味するため、より少ないブローカーを持つラックはより多くのレプリカを取得するでしょう。従って、ラック毎に同じ数のブローカーを設定することは意味があります。
クラスタ間のデータのミラーリング
1つのクラスタ内のノードの間で起こるレプリケーションとの混乱を避けるために、Kafkaクラスタ 間のデータのレプリケートのプロセスを "ミラーリング"とします。KafkaはKafkaクラスタ間でデータをミラーリングするためのツールを同梱します。ツールはソースクラスタから消費し、目的のクラスタへ生成します。この種類のミラーリングについての一般的なユースケースは、他のデータセンター内のレプリカを提供することです。このシナリオは次の章で詳細に議論されるでしょう。スループットを増やし耐障害性のために多くのそのようなミラーリング プロセスを実行することができます (もし1つのプロセスが終了すると、他のプロセスは追加の負荷を引き継ぎます)。
データはソースクラスタ内のトピックから読み込まれ、目的のクラスタ内の同じ名前のトピックに書き込まれるでしょう。実施はミラーメイカーはKafkaコンシューマおよび一緒にフックされるプロデューサよりも少し多くの事をします。
ソースおよび目的のクラスタは完全にエンティティに依存しません: それらは異なる数のパーティションを持ち、オフセットは同じでは無いでしょう。この理由で、(カスタマの位置が異なるだろうため) ミラー クラスタは対障害性の仕組みとして実際には意図されていません; そのためには通常のクラスタ内のレプリケーションの使用をお勧めします。しかし、ミラーメイカープロセスはパーティションについてのメッセージキーを維持し使用します。つまり順番はキー毎ベースで保持されます。
入力クラスターから1つの(my-topicという名前の)トピックをミラーする方法を示す例です:
> bin/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties --whitelist my-topic
--whitelist
オプションを使ってトピックのリストを指定することに注意してください。このオプションはJava-style regular expressionsを使う全ての正規表現を許可します。つまり--whitelist 'A|B'
を使ってA と Bという名前の2つのトピックをミラーすることができます。あるいは--whitelist '*'
を使って全てのトピックをミラーすることができます。シェルがファイルパスとして正規表現を展開しないように、全ての正規表現をクォートするようにしてください。便宜上、トピックのリストを指定するために '|' の代わりに ',' の仕様を許可します。ミラーリングと設定 auto.create.topics.enable=true
と組み合わせると、新しいトピックが追加された時でもソースのクラスタ内の全てのデータを自動的に生成しレプリケートするレプリカクラスタを持つことができます。
コンシューマの位置の調査
コンシューマの位置を知るのに便利な時があります。コンシューマグループ内の全てのコンシューマの位置と、それらログの最後からどれだけ遅れているかを知るツールがあります。このツールをmy-topicという名前のトピックを消費しているmy-group という名前のコンシューマグループ上で実行するには、以下のようになるでしょう: > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2
コンシューマグループの管理
ConsumerGroupCommand ツールを使って、コンシューマグループをリスト化、表現、削除することができます。コンシューマ グループは手動で削除されるか、グループについての最後にコミットされたオフセットが期限切れになる時に自動的に削除されます。手動の削除はグループが何もアクティブなメンバーを持たない場合のみ動作します。例えば、全てのトピックに渡って全てのコンシューマグループをリスト化するには: > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
test-consumer-group
以前に述べたように、オフセットを見るには、以下のようにコンシューマグループを"describe"します:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
コンシューマグループについてのより詳細な情報を提供するために使うことができる多くの追加の "describe" オプションがあります:
- --members: このオプションはコンシューマグループ内の全てのアクティブなメンバーのリストを提供します。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
- --members --verbose: 上の "--members" オプションで報告される情報の上に、このオプションは各メンバに割り当てられたパーティションの情報も提供します。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
- --offsets: これはデフォルトの記述オプションで、"--describe" オプションと同じ出力を提供します。
- --state: このオプションは有用なグループレベルの情報を提供します。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
リクエストされたコンシューマグループ ('my-group', 'my-other-group') の削除が正常に完了しました。
コンシューマ グループのオフセットを再設定するには、"--reset-offsets" オプションを使うことができます。このオプションは一度に1つのコンシューマグループをサポートします。以下のスコープを定義する必要があります: --all-topics あるいは --topic. '--from-file' scenario を使わない場合は、1つのスコープが選択されなければなりません。また、最初にコンシューマのインスタンスがアクティブではないことを確認してください。詳細はKIP-122を見てください。
3つの実行オプションを持ちます:
- (デフォルト) どのオフセットを再設定するかを表示する。
- --execute : --reset-offsets プロセスを実行する。
- --export : 結果をCSV形式にエクスポートする。
--reset-offsets は次のシナリオから選択することもできます (少なくとも1つのシナリオを選択する必要があります):
- --to-datetime <String: datetime> : オフセットをdatetimeからのオフセットに再設定する。Format: 'YYYY-MM-DDTHH:mm:SS.sss'
- --to-earliest : オフセットを最も早いオフセットに再設定する。
- --to-latest : オフセットを最新のオフセットに再設定する。
- --shift-by <Long: number-of-offsets> : 現在のオフセットを 'n' だけ移すオフセットに再設定する。'n' は正または負を指定できます。
- --from-file : オフセットをCSVファイル内で定義された値に再設定する。
- --to-current : オフセットを現在のオフセットに再設定する。
- --by-duration <String: duration> : オフセットを現在のタイムスタンプから持続期間のオフセットに再設定する。Format: 'PnDTnHnMnS'
- --to-offset : オフセットを特定のオフセットに再設定する。
例えば、コンシューマグループのオフセットを最新のオフセットに再設定するには:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
古い高レベルのコンシューマを使っていて、ZooKeeperにグループのメタデータが格納している場合(つまりoffsets.storage=zookeeper
)、--bootstrap-server
の代わりに--zookeeper
を渡します:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
クラスタの拡張
Kafkaクラスタへサーバを追加することは簡単で、単にそれらにユニークなブローカーidを割り当て、新しいサーバ上でKafkaを開始します。しかし、これらの新しいサーバは自動的にデータのパーティションを割り当てられないでしょう。つまり、パーティションがそれらに移動されない場合、新しいトピックが作成されるまでそれらは何も仕事をしないでしょう。そのようにして通常はクラスタにマシーンを追加する時に何らかの既存のデータをこれらのマシーンに移設したいでしょう。データの移設のプロセスは手動で起動されますが、完全に自動化されます。背後では、Kafkaが新しいサーバを移設しているパーティションのフォロワーとして追加し、パーティション内の既存のデータを完全にレプリケートすることができます。新しいサーバがこのパーティションの内容を完全にレプリケートし同期のレプリカが加わった時に、既存のレプリカの1つがそれらのパーティションデータを削除するでしょう。
ブローカーを超えてパーティションを移動するためにパーティション再割り当てツールを使うことができます。理想的なパーティションの分散は、全てのブローカーに渡って均等なデータの負荷とパーティションのサイズを保証するでしょう。パーティションの再割り当てツールは、自動的にKafkaクラスタ内のデータ分散を学習して負荷の均等な分散を行うためにパーティションを移動する機能はありません。そのように、管理者はどのトピックあるいはパーティションが周りに移動されなければならないかを見積もる必要があります。
パーティションの再割り当てツールは3つの相互排他モードで実行することができます:
- --generate: このモードでは、トピックのリストとブローカーのリストを指定し、ツールは指定されたトピックの全てのパーティションを新しいブローカーへ移動するための再割り当ての候補を生成します。このオプションは、トピックと目的のブローカーのリストが指定されたパーティションの再割り当て計画を生成する簡易な方法を提供するだけです。
- --execute: このモードでは、ツールはユーザが提供した再割り当て計画に基づいたパーティションの再割り当て計画を開始します。(--reassignment-json-file オプションを使います)。これは、管理者によって手作りされた独自の再割り当て計画か、あるいは--generateオプションを使って提供されたもののどちらかかもしれません
- --verify: このモードでは、ツールは最後の --execute の間リスト化された全てのパーティションについての再割り当ての状態を検証します。状態は、完全に完了した、失敗した、あるいは実行中のいずれかです
新しいマシーンへのデータの自動的な移設
パーティション再割り当てツールは現在のブローカーのセットから新しく追加されたブローカーへ幾つかのトピックを移動するのに使うことができます。トピック全体を新しいブローカーのセットへ移動するのが簡単なため、これは既存のクラスタを拡張し、1つのパーティションを同時に移動するのに有用です。これを使う時には、ユーザは新しいブローカーのセットに移動されるべきトピックのリストと目的の新しいブローカーのリストを提供する必要があります。ツールは結果的に指定されたトピックのリストについての全てのパーティションを新しいブローカーのセットに渡って分散します。この移動の間、トピックのリプリケーションのファクターは一定に保たれます。入力のトピックのリストについての全てのパーティションのレプリカは、古いブローカーのセットから新しく追加されたブローカーへ効果的に移動されます。例えば、以下の例はトピック foo1, foo2 の全てのパーティションを新しいブローカーのセット 5,6 へ移動するでしょう。この移動の最後に、トピック foo1 と foo2 の全てのパーティションはブローカー 5,6 にのみ存在します。
ツールは入力のトピックのリストをjsonファイルとして受け付けるため、最初に移動したいトピックを識別し以下のようにjsonファイルを作成する必要があります:
> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
jsonファイルの準備ができると、候補の割り当てを生成するためにパーティション再割り当てツールを使います:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
現在のパーティションのレプリカの割り当て
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
指定されたパーティションの再割り当ての設定
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
ツールはトピック foo1, foo2 からブローカー 5,6 へ全てのパーティションを移動する候補の割り当てを生成します。しかし、この時点では、パーティションの移動は開始されておらず、現在の割り当てと提案された新しい割り当てを知らせるだけなことに注意してください。ロールバックしたい場合は現在の割り当てが保存されなければなりません。新しい割り当ては、以下のように --execute オプションを使ってツールの入力になるように、jsonファイルに保存されなければなりません (例えば、expand-cluster-reassignment.json):
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
現在のパーティションのレプリカの割り当て
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
ロールバックの間に --reassignment-json-file オプションとして使うために、これを保存します
パーティションの再割り当ての開始の成功
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
最後に、パーティションの再割り当ての状態を調べるために、ツールと一緒に--verify オプションを使うことができます。--verify オプションと一緒に、同じ expand-cluster-reassignment.json (--execute オプションと一緒に使われた) が使われなければならないことに注意してください:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
パーティションの再割り当ての状態:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
独自のパーティションの割り当てと移設
パーティション再割り当てツールはパーティションのレプリカを選択的に特定のブローカーのセットに移動するために使うこともできます。このように使った場合、--generateステップを効率的にスキップし--executeステップに直接移動しながら、ユーザが再割り当て計画を知っていて、ツールが候補の再割り当てを必要としないことを仮定します。例えば、以下の例はトピック foo1 の パーティション 0 を ブローカー 5,6 へ、トピック foo2 のパーティション 1 を ブローカー 2,3 へ移動します:
最初のステップはjsonファイルに独自の再割り当て計画を手作りすることです:
> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
そして --execute オプションにjsonファイルを使って、プロセスを再割り当てし始めます:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
現在のパーティションのレプリカの割り当て
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
ロールバックの間に --reassignment-json-file オプションとして使うために、これを保存します
パーティションの再割り当ての開始の成功
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
パーティションの再割り当ての状態を調べるために、ツールに --verify オプションを使うことができます。同じ custom-reassignment.json (--execute オプションで使われる) を --verify オプションで使う必要があることに注意してください:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
パーティションの再割り当ての状態:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
ブローカーの縮退
パーティション再割り当てツールはブローカーを退役するために再割り当て計画を自動的に生成する機能をまだ持っていません。そのため、管理者はブローカー上でホストされる全てのパーティションのためのレプリカを退役するために移動するための再割り当て計画をブローカーの残りに対して提供する必要があります。再割り当ては、全てのレプリカが廃止されたブローカーから他の1つだけのブローカーに移動されないようにする必要があるため、比較的面倒な作業かもしれません。このプロセスを容易くするために、退役するブローカーのためのツールのサポートを将来追加する予定です。リプリケーション要素の増加
既存のパーティションのリプリケーション ファクターを増加することは容易です。独自の再割り当てjsonファイル内の余分なレプリカを単に指定し、指定されたパーティションのレプリケーション ファクタを増やすために --execute オプションと一緒に使います。例えば、以下の例はトピックfooのパーティション0のリプリケーション ファクターを1から3に増やします。リプリケーション ファクターを増やす前は、ブローカー5上にはパーティションのレプリカだけが存在します。レプリケーション ファクターの増加の一部として、ブローカー6と7にレプリカを追加するでしょう。
最初のステップはjsonファイルに独自の再割り当て計画を手作りすることです:
> cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
そして --execute オプションにjsonファイルを使って、プロセスを再割り当てし始めます:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
現在のパーティションのレプリカの割り当て
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
ロールバックの間に --reassignment-json-file オプションとして使うために、これを保存します
パーティションの再割り当ての開始の成功
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
パーティションの再割り当ての状態を調べるために、ツールに --verify オプションを使うことができます。--verify オプションと一緒に、同じ increase-replication-factor.json (--execute オプションと一緒に使われた) が使われなければならないことに注意してください:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
パーティションの再割り当ての状態:
Reassignment of partition [foo,0] completed successfully
kafka-topicツールを使ってレプリケーション ファクターの増加を検証することもできます:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
データの移設の間の帯域の使い方の制限
マシーンからマシーンへレプリカを移動するために使われる帯域に上限を設定して、Kafkaはレプリケーションのトラフィックに抑圧を適用します。これは、クラスタをリバランス、新しいブローカーをブートストラップ、ブローカーを削除する時に、それらのデータに集中的な操作がユーザに与える影響を制限するため、便利です。 絞りを保証するために使うことができる2つのインタフェースがあります。最も単純で最も安全なのはkafka-reassign-partitions.shを起動する時に絞りを適用することですが、kafka-config.shは直接絞りの値を表示および変更するために使うこともできます。 つまり、例えば前のコマンドを使ってリバランスを実行した場合、50MB/s未満でパーティションを移動するでしょう。$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000このスクリプトを実行すると、絞りが行われるのが分かるでしょう。
絞りの限界は 50000000 B/s に設定されていました
パーティションの再割り当てが無事に開始されました
リバランス中にスロットルを変更したい場合、スループットを上げてより速く完了するようにしたい場合は、同じreassignment-json-fileを渡すexecuteコマンドを再実行することでこれを行うことができます:
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000 既存の実行中の再割り当てがあります 絞りの制限は 700000000 B/s に設定されていました
リバランスがいったん完了すると、管理者は --verify オプションを使ってリバランスの状態を調べることができます。リバランスが完了すると、絞りは --verifyコマンドによって削除されるでしょう。--verify オプションと共にコマンドを実行することでリバランスが完了すると、管理者がしかるべき方法で絞りを削除することが重要です。それに失敗すると通常のレプリケーションのトラフィックが絞られるかもしれません。
--verify オプションが実行され、再割り当てが完了すると、スクリプトは絞りが削除されたことを確認するでしょう:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file bigger-cluster.json
パーティションの再割り当ての状態:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.
管理者はkafka-configs.shを使って割り当てられた設定を検証することもできます。絞りのプロセスを管理するために使われる絞りの設定の2つのペアがあります。最初のペアは絞り値自体を参照します。これは動的なプロパティを使ってブローカーレベルで設定されます:
leader.replication.throttled.rate
follower.replication.throttled.rate
次に、絞られたレプリカの列挙されたセットの構成ペアがあります:
leader.replication.throttled.replicas
follower.replication.throttled.replicas
これらはトピックごとに設定されます。
4つ全ての設定値はkafka-reassign-partitions.shによって自動的に割り当てられます (以下で議論されます)。
絞りの制限設定を見るには:
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
これはレプリケーションプロトコルのリーダーおよびフォロワーの両方に適用される絞りを示します。デフォルトでは両方の側は同じ絞られたスループットの値を割り当てられます。
絞られたレプリカのリストを見るには:
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
ここで、リーダーの絞りがブローカー102上のパーティション1 および ブローカー101上のパーティション0に適用されることを見ます。同様にフォロワーの絞りはぶろかー101上のパーティション1およびブローカー102上のパーティション0に適用されます。
デフォルトでは、kafka-reassign-partitions.sh はリーダーの絞りをリバランス前に存在する全てのレプリカに適用します。それらのうちの1つはリーダーかもしれません。それはフォロワーの絞りを全ての移動の宛先に適用します。つまり、ブローカー102, 103 に再割り当てされるブローカー101,102上のレプリカを持つパーティションがある場合、そのパーティションのリーダーの絞りは101,102に適用され、フォロワーの絞りは103だけに適用されるでしょう。
必要であれば、絞りの設定を手動で変更するためにkafka-configs.shの--alterスイッチを使うこともできます。-
絞られたレプリケーションの安全な使い方
絞られたレプリケーションを使う時は幾らかの注意が必要です。実際には:
(1) 絞りの削除:
一旦再割り当てが完了すると、しかるべき方法で絞りを削除する必要があります (kafka-reassign-partitions.sh --verify を実行することで)。(2) 進捗の保証:
入ってくる書き込みレートに比較して絞りがあまりに低く設定された場合、レプリケーションが進捗しないことがありえます。これは以下の時に起こります:
max(BytesInPerSec) > throttle
BytesInPerSec はプロデューサの各ブローカーへの書き込みスループットを監視するメトリックです。
管理者は、以下のメトリックを使って、リバランスの間にレプリケーションが進捗しているかどうかを監視することができます:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
レプリケーションの間に遅延は定常的に減らなければなりません。メトリックが減らない場合、管理者は上で説明したように絞りのスループットを増やさなければなりません。
クォータの設定
クォータは、ここで説明されるように、(user, client-id)、ユーザあるいはクライアントidレベルで設定されるデフォルトを上書きします。デフォルトでは、クライアントは無制限のクォータを受け取ります。各 (user, client-id)、ユーザあるいはクライアントidグループ について、独自のクォータを設定することができます。(user=user1, client-id=clientA) についての独自のクォータを設定する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
エンティティ: user-principal 'user1', client-id 'clientA' について更新された設定。
user=user1 についての独自のクォータを設定する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
エンティティ: user-principal 'user1' について更新された設定。
client-id=clientA について独自のクォータを設定する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
エンティティ: client-id 'clientA' について更新された設定。
--entity-nameの代わりに--entity-defaultオプションを指定することで、各 (user, client-id)、ユーザあるいはクライアントidグループについてのデフォルトのクォータを設定することができます。
user=userAについてのデフォルトのclient-id クォータを設定する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
エンティティ: user-principal 'user1'、デフォルト client-id について更新された設定。
ユーザについてのデフォルトのクォータを設定する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
エンティティ: デフォルト user-principal についての設定を更新する
client-idについてのデフォルトのクォータを設定する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
エンティティ: デフォルト client-id についての設定を更新する
指定された (user, client-id) についてのクォータを説明する方法:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
指定された user についてのクォータを説明する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
指定された client-id についてのクォータを説明する:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
エンティティ名が指定されない場合、指定された型の全てのエンティティが説明されます。例えば、全てのユーザを説明するには:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
同様に、(user, client) については:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
これらの設定をブローカーに設定することで、全てのクライアントidに適用されるデフォルトのクォータを設定することができます。これらのプロパティはクォータが上書きするか、デフォルトがZooKeeper内で設定されていない場合のみ適用されます。デフォルトでは、各クライアントidは無制限のクォータを受け取ります。以下はプロデューサとコンシューマ毎のデフォルトのクォータを 10MB/sec に設定します。
quota.producer.default=10485760
quota.consumer.default=10485760
これらのプロパティは非推奨で、将来のリリースでは削除されるかもしれないことに注意してください。kafka-configs.shを使って設定されるデフォルトは、これらのプロパティよりも優先されます。
6.2データセンタ
幾つかの配備は複数のデータセンタをまたぐデータパイプラインを管理する必要があるでしょう。これのお勧めの方法は、各データセンター内でローカルクラスタ内でのみやり取りをしクラスタ間でミラーリングをするアプリケーション インスタンスを各データセンタ内でローカルKafkaクラスタを配備しすることです (これを行う方法についてはミラー メイカー ツールのドキュメントを見てください)。この配備のパターンによりデータセンタは依存しないエンティティとして振舞うことができ、中央集権的にデータセンター内を管理および調整することができます。これにより、データセンター間が利用不可であっても各ファシリティはスタンドアローンで操作することができます: これが起きた時、リンクが追いつく時点まで回復するまでミラーリングは遅れます。
全てのデータのグローバルビューを必要とするアプリケーションのために、全てのデータセンター内のローカルクラスタからミラーされた集約データを持つクラスタを提供するためにミラーリングを使うことができます。これらの集約クラスタは完全なデータセットを必要とするアプリケーションによって読み込まれるために使われます。
これは配備のパターンにだけ可能なわけではありません。WANを超えてリモートのKafkaクラスタから読み込みあるいは書き込みすることが可能ですが、これはクラスタを取得するために必要な何らかのレイテンシを追加するでしょう。
Kafkaは本質的にプロデューサおよびコンシューマの両方の中にデータを揃えます。つまり高レイテンシの接続上であっても高スループットを達成することができます。しかしこれを可能にするには、socket.send.buffer.bytes
と socket.receive.buffer.bytes
の設定を使って、プロデューサ、コンシューマおよびブローカーのTCPソケットバッファサイズを増やす必要があるかもしれません。これを設定する適切な方法はここで提供されます。
高レンテンシのリンク上で複数のデータセンタに掛かる単独のKafkaクラスタを実行することは、一般的に望ましいものではありません。これはKafkaの書き込みおよびZookeeperの書き込みの両方に高レプリケーション レイテンシを招き、もし場所間のネットワークが利用できない場合にKafkaあるいはZooKeeperのどちらも全ての場所で利用できないままになるでしょう。
6.3Kafka 設定
重量なクライアント設定
最も重要なプロデューサの設定は以下の通りです:- acks
- 圧縮
- バッチ サイズ
全ての設定は設定の章の中で提供されます。
プロダクションのサーバ設定
以下はプロダクションのサーバ設定の例です: # ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
# ログの設定
num.partitions=8
default.replication.factor=3
log.dir=[ディレクトリのリスト。Kafka は独自の専用ディスク(s)あるいはSSD(s)を持つべきです]
# 他の設定
broker.id=[整数。0から開始し、新しいブローカー毎に1追加。]
listeners=[リスナーのリスト]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[同時実行リクエストの数]
私たちのクライアントの構成はユースケースごとにかなり異なります。
6.4Java バージョン
Java 8 と Java 11 がサポートされます。Java 11 は、TLS が有効な場合にパフォーマンスが大幅に向上するため、Java 11 を強くお勧めします (他にも多くのパフォーマンス向上が含まれます: G1GC、 CRC32C、Compact Strings、Thread-Local Handshakes など)。自由に利用可能な古いバージョンはセキュリティ脆弱性が暴露されたため、セキュリティの観点から、最新のリリースされたパッチバージョンを使うことをお勧めします。OpenJDK ベースの Java実装 (Oracle JDK を含む)で Kakfa を実行するための一般的な引数は以下の通りです: -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent
参考までに、上記の Java 引数を使う LinkedIn の最も忙しいクラスタの1つ(ピーク時)の統計は以下の通りです:
- 60 ブローカ
- 50k パーティション (リプリケーション要素 2)
- 800k メッセージ/秒 in
- 300 MB/sec inbound, 1 GB/sec+ outbound
6.5ハードウェアとOS
24GBのメモリを持つデュアル クアッド-コア Intel Xeon マシーンを使っています。アクティブ リーダーとライターをバッファするために十分なメモリが必要です。30秒間のバッファができるようにしたいと仮定することで、必要メモリの簡単な計算の推測と、必要なメモリをwrite_throughtput * 3 として計算できます。
ディスクのスループットは重要です。8x7200 rpm SATA ドライブがあります。一般的に、ディスクのスループットはパフォーマンスのボトルネックで、ディスクが多いほうが良いです。フラッシュの挙動をどのように設定するかに依存して、より高価なディスクから恩恵を受けるか受けないかもしれません (しばしばフラッシュを強制する場合は、高い RPM SAS ドライブがより良いかもしれません)。
OS
Kafkaはどのようなunixシステム上でも良く動作する必要があり、LinuxおよびSolaris上でテストされています。Windows上での実行の2,3の問題を見つけました。Windowsは現在のところ良くサポートされたプラットフォームではありませんが、それを変えることができれば嬉しいです。
多くのOSレベルの調整を必要とすることはないでしょうが、3つの潜在的な重要なOSレベルの設定があります:
- ファイル ディスクリプタの制限: Kafkaはセグメントの記録と接続を開くためにファイルディスクリプタを使います。ブローカーが多くのパーティションをホストする場合は、ブローカーが作成する接続の数に加えて、全てのログセグメントを追跡するためにブローカーは少なくとも (number_of_partitions)*(partition_size/segment_size) を必要とすることを考慮してください。開始時点としてブローカーの処理のために少なくとも 100000 のファイルディスクリプタを許可することをお勧めします。注意: mmap() 関数は、ファイル記述子 fildes に関連付けられたファイルへの追加の参照を追加します。これはファイル記述子に対する後続の close() では削除されません。この参照はファイルへのマッピングがもう無くなった場合に削除されます。
- 最大ソケット バッファ サイズ: ここで説明されたようにデータセンタ間で高パフォーマンスのデータ転送を有効にするために増やされるかもしれません。
- プロセスが持つメモリマップ領域への最大数 (aka vm.max_map_count)。Linux カーネルのドキュメントを見て下さい。ブローカーが持つことができるパーティションの最大数を考慮する時に、このOSレベルのプロパティに注意する必要があります。デフォルトでは、多くのLinuxシステムでは、vm.max_map_count の値は 65535 前後です。パーティションごとに割り当てられた各ログのセグメントは、インデックス/タイムインデックス ファイルnオペアを必要とし、これらの各ファイル は1つのマップ領域を消費します。つまり、各ログ セグメントは2つのマップ領域を使います。従って、各パーティションは1つのログセグメントをホストする限り、最低2つのマップ領域を必要とします。つまり、ブローカーに 50000 個のパーティションを作成すると、100000 個のマップ領域が割り当てられ、デフォルトのvm.max_map_countのシステムでは OutOfMemoryError (Map failed) が発生してブローカーをクラッシュする可能性があります。パーティションごとのログセグメントの数は、セグメントサイズ、負荷の強度、リテンション ポリシーによって異なり、一般的には1つ以上になる傾向があることに注意してください。
ディスクとファイルシステム
良いスループットを得るために複数のドライブを使い、良いレンテンシを保証するためにKafkaデータに使われるのと同じドライブをアプリケーションログあるいは他のOSファイルシステムの動作と共有しないことをお勧めします。これらのドライブを1つのボリュームにRAIDするか、独自のディレクトリとして各ドライブをフォーマットおよびmountすることができます。Kafkaはリプリケーションを持つため、RAIDによって提供される冗長性はアプリケーションレベルでも提供することができます。この選択には幾つかのトレードオフがあります。複数のデータディレクトリを設定する場合、パティションはデータディレクトリにラウンドロビンで割り当てられるでしょう。各パーティションは完全にデータディレクトリのうちの1つの中でしょう。もしデータがパーティション間で良くバランスされない場合、これはディスク間で負荷の不均衡につながるかもしれません。
RAIDは負荷を低レベルでバランスするため、RAIDは潜在的にディスク間の負荷のバランスを良くします(しかし常にそうではないようにみえます)。RAIDの主な不利な点は、通常書き込みのスループットに大きな余分に掛かる負荷があり、利用可能なディスクスペースを減らします。
RAIDの他の潜在的な恩恵はディスク障害への耐性です。しかし私たちの経験ではRAIDアレイの再構築はとてもI/Oが強烈で事実上サーバを使えなくします。つまりこれは現実的な可用性の改善を提供しません。
アプリケーション 対 OSフラッシュ管理
Kafkaは常にすぐに全てのデータをファイルシステムに書き込み、いつデータがフラッシュを使ってOSキャッシュからディスクに強制的に追い出すかを制御できるフラッシュポリシーを設定する機能をサポートします。このフラッシュポリシーは一定時間の後、あるいはある数のメッセージが書き込まれた後で強制的にデータをディスクに追い出すことを制御することができます。この設定には幾つかの選択があります。Kafkaはデータがフラッシュされたことを知るために結果的にfsyncを呼びます。クラッシュから回復する時に、fsyncしたかどうかを知らないログセグメントのためにKafkaはCRCをチェックすることで各メッセージの完全性を調べ、開始時に実行される回復プロセスの一部として付属するオフセットインデックスファイルも再構築するでしょう。
故障したノードは常にそのレプリカから回復されるだろうため、Kafkaでの耐久性はディスクへの同期を必要としないことに注意してください。
完全なアプリケーションの同期を無効にするデフォルトのフラッシュの設定を使うことをお勧めします。このことはOSによって行われるバックグラウンドのフラッシュとKafkaの独自のバックグランドのフラッシュを頼りにすることを意味します。これはほとんどの使い方にとって最善を提供します: 調整ツマミが無く、スループットとレイテンシが大きく、完全な回復保証。レプリケーションによって提供される保証がローカルディスクへの同期よりも強いと一般的に思いますが、偏執症な人はまだ両方を持つことを好むかも知れず、アプリケーションレベルのfsyncポリシーはまだサポートされます。
アプリケーションレベルのフラッシュ設定を使う不利な点は、ディスクの使用パターンで非効率なことです(それにより書き込みを再び並べるのにOSに行動の自由がない)。バックグランドのフラッシュはページレベルのブロックであるのに対し、ほとんどのLinuxファイルシステムでのfsyncはファイルへの書き込みをブロックするため、レイテンシを導入するかもしれません。
一般的にファイルシステムの低レベルの調整をする必要はありませんが、以下の幾つかの章ではそれが有用な場合に備えてこれの幾つか調べます。
Linux OSのフラッシュの挙動の理解
Linuxではファイルシステムへのデータの書き込みは(アプリケーションレベルのfsyncあるいはOS独自のフラッシュポリシーによって)ディスクに書き込まれなければならなくなるまで、ページキャッシュ内に維持されます。データのフラッシュはpdflush(あるいは以前の2.6.32カーネルでは"flusher threads")と呼ばれる一組のバックグランド スレッドによって行われます。Pdflush はどれだけ多くのdirtyデータがキャッシュ内に維持され、ディスクに書き戻されなければならなくなるまでどれだけ長く維持されるかを制御する設定可能なポリシーを持ちます。このポリシーはここで説明されます。書き込まれているデータの速度に Pdflush が追い付かない場合、最終的にデータの蓄積を遅くするために書き込みプロセスの書き込みの遅延を起こします。
以下を行うことでOSメモリの使用の現在の状態を見ることができます
> cat /proc/meminfoこれらの値の意味は上のリンクで説明されます。
ディスクに書き込まれるデータの格納について、pagecacheの使用はプロセス内のキャッシュよりいくつかの利点があります:
- I/O スケジューラは連続する小さな書き込みをスループットを改善するより大きな物理書き込みに詰め込むでしょう。
- I/O スケジューラはスループットを改善するディスクの移動の最小化のために書き込みを再順序化しようとするでしょう。
- それは自動的にマシーン上のフリーなメモリの全てを使います
ファイルシステムの選択
Kafkaはディスク上の通常ファイルを使い、それ自体では特定のファイルシステムへの厳しい依存はありません。しかし最もよく使われる2つのファイルシステムは、EXT4 と XFS です。歴史的にはEXT4の使用が増えていますが、XFSファイルシステムの最近の改良により、安定性が損なわれることなくKafkaの作業のパフォーマンス特性が向上しています。.
様々なファイルシステムの作成とmountオプションを使って、意味のあるメッセージの負荷を持つクラスタ上で比較テストが行われました。監視されるKafkaの主要なメトリックは "Request Local Time" で、追加操作が行われた時間を示します。XFS はより良いローカル時間 (最善のEXT4設定について 160ms 対 250ms+) と、より低い平均待ち時間の結果になりました。XFSのパフォーマンスもディスクパフォーマンスにおいて変わりにくいことを示しました。
一般的なファイルシステムの注意
Linuxシステム上で、データディレクトリに使われる全てのファイルシステムについて、以下のオプションがmount時に使われることをお勧めします:- noatime: このオプションは、ファイルが読み込まれる時にファイルのatime(最後のアクセス時間)属性の更新を無効にします。これは特にコンシューマが起動する場合のファイルシステムの書き込み数を取り除くことができます。Kafkaはatime属性を全く頼りにしないため、これを無効にしても安全です。
XFSの注意
XFS ファイルシステムは適切な多くの自動調整を持つため、ファイルシステムの作成時あるいはmount時のどちらかでデフォルトの設定に変更をする必要はありません。考慮する価値のある唯一の調整パラメータは以下の通りです:- largeio: これはstat呼び出しによって報告される優先I/Oサイズに影響します。これは大きなディスクの書き込みでよりパフォーマンスが高くなりますが、実際にはパフォーマンスには最小限あるいは全く効果がありません。
- nobarrier: バッテリーで支援されるキャッシュを持つ背後にあるデバイスのためで、このオプションは定期的な書き込みフラッシュを無効にすることでほんの少しのパフォーマンスを提供することができます。しかし、もし背後にあるデバイスが良く振舞った場合、それはファイルシステムにフラッシュが必要無いと報告します。このオプションは影響が無いでしょう。
EXT4の注意
EXT4はKafkaデータディレクトリとしてファイルシステムの実用的な選択ですが、それで最善のパフォーマンスを得るには幾つかのmountオプションの調整を必要とするでしょう。更に、これらのオプションは一般的に障害時のシナリオで安全では無く、より多くのデータロスと衝突に終わるでしょう。1つのブローカーの障害については、ディスクは拭い取られレプリカがクラスタから再構築されるため、心配することはほとんどありません。停電のような複数の障害のシナリオにおいて、これは簡単には回復できない背後にあるファイルシステム(従ってデータ)の衝突を意味するかもしれません。以下のオプションが調節可能です:- data=writeback: Ext4のデフォルトは data=ordered で、いくつかの書き込みにおいて順番に強きを置きます。Kafkaは全てのフラッシュされていないログにとても偏執的なデータ回復を行うため、この順番は必要としません。この設定は順番の制約を削除し、レイテンシを極めて減らすように見えます。
- ジャーナリングの無効: ジャーナリングはトレードオフです: サービスのクラッシュの後で再起動を高速化しますが、書き込みパフォーマンスへの変化をもたらす多くの追加のロックを導入します。再起動時間を気にせず、書き込みレイテンシのスパイクの主要な原因を減らしたい人は、ジャーナリングを完全に止めることができます。
- commit=num_secs: これはext4がメタデータジャーナルにコミットする頻度を調整します。これを少ない値に設定すると、クラッシュ中のフラッシュされていないデータの紛失を減らします。これを大きな値に設定するとスループットを改善するでしょう。
- nobh: この設定は data=writeback モードを使う時に追加の順番の保証を制御します。書き込みの順番に依存しないため、Kafkaではこれは安全に違いなく、スループットとレイテンシを改善します。
- delalloc: Delayed allocation はファイルシステムが物理的な書き込みが発生するまでブロックの割り当てを避けることを意味します。これによりext4は小さなページの代わりに大きな範囲を割り当てることができ、データが連続して書き込まれることを保証します。この機能はスループットにとって素晴らしいです。ファイルシステム内でほんの少しのレイテンシの変化を追加するなんらかのロッキングを必要とするようです。
6.6監視
Kafkaはサーバ内のメトリクスレポートのためにYammer Metricsを使います。Java クライアントはKafkaメトリクス、クライアントアプリケーションにpullされる推移的依存を最小化する組み込みのメトリクスレジストリを使います。両方ともメトリクスをJMXを使って公開し、監視システムにホックアップするためにプラグ可能なstatsレポーターを使ってstatsをレポートするように設定することができます。
全てのKafkaのレート メトリックにはサフィクス-total
が付いた対応する累積カウント メトリックがあります。例えば、records-consumed-rate
はrecords-consumed-total
という名前の対応するメトリックを持ちます。
利用可能なメトリクスを見る最も簡単な方法はjconsoleを立ち上げ、それを実行中のkafkaクライアントあるいはサーバに向けることです; これにより全てのメトリクスをJMXを使ってブラウズすることができるでしょう。
JMXを使ったリモート監視のためのセキュリティの考慮
Apache KafkaはデフォルトでリモートのJMXを無効にします。リモート JMXをプログラム的に有効にするために、CLIまたは標準Javaシステムのプロパティを使って開始されたプロセスの環境変数JMX_PORT
を設定して、リモート監視を有効にすることができます。プロダクション シナリオでリモートJMXを有効にする場合、未認証のユーザがブローカーあるいはアプリケーション、およびこれらが実行されているプラットフォームを監視または制御できないように、セキュリティを有効にする必要があります。KafkaではデフォルトでJMXの認証が無効になっていて、CLIを使って開始されたプロセスの環境変数 KAFKA_JMX_OPTS
を設定するか、適切なJavaシステムプロパティを設定することで、プロダクションの配備のためにセキュリティ設定を上書きしなければならないことに注意してください。JMXの保護の詳細についてはJMX技術を使った監視と管理を見てください。
以下のメトリクスをグラフおよびアラートします:
説明 | Mbean 名 | 通常値 |
---|---|---|
メッセージのレート | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | |
クライアントからのバイトのレート | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | |
他のブローカーからのバイトのレート | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec | |
リクエストのレート | kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower} | |
エラー レート | kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) | リクエスト型、エラーコード毎にカウントされた応答の中のエラーの数。応答が複数のエラーを含む場合は、全てがカウントされます。error=NONE indicates successful responses. |
リクエスト サイズのバイト | kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+) | 各リクエストの型についてのリクエストのサイズ。 |
一時的なメモリサイズのバイト | kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch} | メッセージ形式の変換と解凍のために使われる一時メモリ。 |
メッセージの変換時間 | kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch} | メッセージ形式の変換に費やされる時間のミリ秒。 |
メッセージの変換レート | kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) | メッセージ形式の変換に必要なレコードの数。 |
Request Queue Size | kafka.network:type=RequestChannel,name=RequestQueueSize | Size of the request queue. |
クライアントへのバイトのレート | kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec | |
他のブローカーへのバイトのレート | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec | |
圧縮されたトピックに指定されたキーが無いことによる、メッセージの検証の失敗レート | kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec | |
無効なマジック番号によるメッセージ検証失敗率 | kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec | |
不正な crc チェックサムによるメッセージ検証失敗率 | kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec | |
バッチ内の非連続なオフセットまたはシーケンス番号によるメッセージ検証失敗率 | kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec | |
ログのフラッシュレートと時間 | kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs | |
# of under replicated partitions (the number of non-reassigning replicas - the number of ISR > 0) | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 0 |
# of under minIsr partitions (|ISR| < min.insync.replicas) | kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount | 0 |
# of at minIsr partitions (|ISR| = min.insync.replicas) | kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount | 0 |
# of offline log directories | kafka.log:type=LogManager,name=OfflineLogDirectoryCount | 0 |
ブローカー上でコントローラがアクティブかどうか | kafka.controller:type=KafkaController,name=ActiveControllerCount | クラスタ内の唯一のブローカーが1つを持つ筈です |
リーダーの選出レート | kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs | ブローカーの障害の時には非0 |
消去されていないリーダーの選出レート | kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec | 0 |
保留中のトピックの削除 | kafka.controller:type=KafkaController,name=TopicsToDeleteCount | |
保留中のレプリカの削除 | kafka.controller:type=KafkaController,name=ReplicasToDeleteCount | |
対象外の保留中のトピックの削除 | kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount | |
対象外の保留中のレプリカの削除 | kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount | |
パーティションの数 | kafka.server:type=ReplicaManager,name=PartitionCount | ほとんどはブローカーを横断して偶数 |
リーダーのレプリカの数 | kafka.server:type=ReplicaManager,name=LeaderCount | ほとんどはブローカーを横断して偶数 |
ISR のシュリンク レート | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec | もしブローカーがダウンする場合、パーティションのISRはシュリンクするでしょう。ブローカーが再び上がる時、一旦レプリカが完全に追いつくとISRは一度拡大するでしょう。それと違う場合、ISRシュリンクレートと拡張レートの両方の期待値は0です。 |
ISR 拡大レート | kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | 上を見てください |
フォロワーとリーダーレプリカの間でのメッセージの最大の遅れ | kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica | 遅れは生成リクエストの最大バッチサイズに比例するべきです。 |
フォロワーレプリカごとのメッセージの遅れ | kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) | 遅れは生成リクエストの最大バッチサイズに比例するべきです。 |
Requests waiting in the producer purgatory | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce | ack=-1 が使われる場合非0 |
Requests waiting in the fetch purgatory | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch | サイズはコンシューマ内のfetch.wait.max.msに依存します |
リクエストの総時間 | kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} | キュー、ローカル、リモートおよび応答の送信時間に分割されます |
リクエストがリクエストキューの中で待つ時間 | kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
リクエストがリーダーで処理される時間 | kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
リクエストがフォロワーを待つ時間 | kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower} | ack=-1の時はプロデューサのリクエストについては非0 |
リクエストが応答キューの中で待つ時間 | kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
応答を送信する時間 | kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
コンシューマがプロデューサに遅れるメッセージの数。ブローカーでは無く、コンシューマによって発表されます。 | kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max | |
ネットワーク プロセッサが仕事をしていない平均断片時間 | kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent | 0と1の間。理想的には > 0.3 |
クライアントが再認証を行わず、再認証以外の目的で有効期限が切れた接続を使用したために、プロセッサで切断された接続の数。 | kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count | 再認証が有効な場合は利用的には0です。この(リスナー、プロセッサ)組み合わせに接続している2.2.0以前のクライアントが存在しないことを意味します。 |
クライアントが再認証を行わず、再認証以外の目的で有効期限が切れた接続を使用したために、全てのプロセッサにわたって切断された接続の合計数。 | kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount | 再認証が有効な場合は利用的には0です。このブローカーに接続している2.2.0以前のクライアントが存在しないことを意味します。 |
リクエスト ハンドラのスレッドが仕事をしていない平均断片時間 | kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent | 0と1の間。理想的には > 0.3 |
ユーザあるいはクライアントidごと(user, client-id)の帯域割り当てメトリクス | kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+) | 2つの属性。throttle-time はクライアントが絞られていた総時間をミリ秒で示します。Ideally = 0. byte-rate はクライアントのデータの生成/消費レートを bytes/sec で示します。(user, client-id) 割り当て量については、ユーザおよびクライアントidの両方が指定されます。もしclient-id毎の割り当て量がクライアントに適用される場合は、ユーザは指定されません。もしuser毎の割り当てが適用される場合、クライアントidは指定されません。 |
ユーザあるいはクライアントidごと (user, client-id)のリクエストの割り当て量のメトリクス | kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) | 2つの属性。throttle-time はクライアントが絞られていた総時間をミリ秒で示します。Ideally = 0. request-time はクライアントグループからリクエストを処理するためにブローカー ネットワークとI/Oスレッド内で費やされた時間のパーセンテージを示します。(user, client-id) 割り当て量については、ユーザおよびクライアントidの両方が指定されます。もしclient-id毎の割り当て量がクライアントに適用される場合は、ユーザは指定されません。もしuser毎の割り当てが適用される場合、クライアントidは指定されません。 |
絞りを免れたリクエスト | kafka.server:type=Request | exempt-throttle-time は絞りを免れたリクエストを処理するためにブローカーネットワークとI/Oスレッドで費やされた時間のパーセンテージを示します。 |
ZooKeeperのクライアント リクエストのレイテンシ | kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs | ブローカーからのZooKeeperのリクエストについてのレイテンシのミリ秒。 |
ZooKeeperの接続ステータス | kafka.server:type=SessionExpireListener,name=SessionState | ブローカーのZooKeeperセッションの接続ステータスで、Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired のうちの1つが可能です。 |
グループメタデータをロードする最大時間 | kafka.server:type=group-coordinator-metrics,name=partition-load-time-max | 過去30秒間に読み込まれたコンシューマオフセットパーティションからオフセットとグループメタデータを読み込むのにかかった最大時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む) |
グループメタデータをロードする平均時間 | kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg | 過去30秒間に読み込まれたコンシューマオフセットパーティションからオフセットとグループメタデータを読み込むのにかかった平均時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む) |
トランザクションメタデータをロードする最大時間 | kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max | 過去30秒間に読み込まれたコンシューマオフセットパーティションからトランザクションメタデータを読み込むのにかかった最大時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む) |
トランザクションメタデータをロードする平均時間 | kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg | 過去30秒間に読み込まれたコンシューマオフセットパーティションからトランザクションメタデータを読み込むのにかかった平均時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む) |
コンシューマグループオフセット数 | kafka.server:type=GroupMetadataManager,name=NumOffsets | コンシューマグループのコミットされたオフセットの総数 |
コンシューマグループ数 | kafka.server:type=GroupMetadataManager,name=NumGroups | コンシューマグループの総数 |
状態ごとのコンシューマグループ数 | kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead] | 各状態のコンシューマグループ数: PreparingRebalance, CompletingRebalance, Empty, Stable, Dead |
パーティションの再割り当て数 | kafka.server:type=ReplicaManager,name=ReassigningPartitions | ブローカー上のリーダーパーティションの再割り当て数。 |
再割り当てトラフィックの発信バイトレート | kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec | |
再割り当てトラフィックの受信バイトレート | kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec |
プロデューサ/コンシューマ/コネクタ/ストリームのための一般的な監視メトリクス
以下のメトリクスがプロデューサ/コンシューマ/コネクタ/ストリーム インスタンス上で利用可能です。特定のメトリクスについては、以下の章を見てください。メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
connection-close-rate | ウィンドウ内での秒間あたりの閉じられた接続 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
connection-close-total | ウィンドウ内で閉じられた総接続。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
connection-creation-rate | ウィンドウ内で秒間あたりに確立された新しい接続 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
connection-creation-total | ウィンドウ内で確立された新しい総接続。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
network-io-rate | 全ての接続の秒間あたりのネットワーク オペレーション(読み込みあるいは書き込み)の数の平均。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
network-io-total | 全ての接続の秒間あたりのネットワーク オペレーション(読み込みあるいは書き込み)の総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
外に行くバイトのレート | 全てのサーバへの送信される外に行くバイトの秒間あたりの平均数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
outgoing-byte-total | 全てのサーバに送信された外に行くバイトの総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
リクエスト レート | 秒間あたり送信されるリクエストの平均数 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
request-total | 送信されたリクエストの総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
リクエストのサイズの平均 | ウィンドウ内の全てのリクエストの平均サイズ。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
リクエストサイズの最大 | ウィンドウ内に送信されるリクエストの最大サイズ。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
やってくるバイトのレート | 全てのソケットの読み込みのバイト/秒。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
incoming-byte-total | 全てのソケットの読み込みの総バイト。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
応答レート | 秒間あたり受け取った応答。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
response-total | 受信した応答の総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
select-rate | I/O層が新しいI/Oを調べる秒間あたりの回数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
select-total | I/O層が新しいI/Oを調べる総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-wait-time-ns-avg | I/Oスレッドが読み込みあるいは書き込みの準備ができたソケットを待つために費やした平均時間のナノ秒数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-wait-ratio | I/Oスレッドが待つのに費やした断片時間。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-time-ns-avg | select呼び出しごとのI/Oの平均時間のナノ秒数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-ratio | I/OスレッドがI/Oを行うために費やした断片時間。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
connection-count | 現在のアクティブな接続の数 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
successful-authentication-rate | SASLあるいはSSLを使って認証が成功した秒間あたりの接続数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
successful-authentication-total | SASLあるいはSSLを使って認証が成功した接続の総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
failed-authentication-rate | 認証が失敗した秒間あたりの接続数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
failed-authentication-total | 認証が失敗した接続の総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
successful-reauthentication-rate | SASLあるいはSSLを使って再認証が成功した秒間あたりの接続数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
successful-reauthentication-total | SASLあるいはSSLを使って再認証が成功した接続の総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
reauthentication-latency-max | 再認証による最大レイテンシのミリ秒。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
reauthentication-latency-avg | 再認証による平均レイテンシのミリ秒。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
failed-reauthentication-rate | 再認証に失敗した秒間あたりの接続数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
failed-reauthentication-total | 再認証に失敗した接続の総数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
successful-authentication-no-reauth-total | 再認証をサポートしない2.2.0より前のSASLクライアントを使って認証が成功した接続の総数。May only be non-zero | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
プロデューサ/コンシューマ/コネクタ/ストリームのためのブローカーごとの一般的なメトリクス
以下のメトリクスがプロデューサ/コンシューマ/コネクタ/ストリーム インスタンス上で利用可能です。特定のメトリクスについては、以下の章を見てください。メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
外に行くバイトのレート | ノードについての外に送信されるバイトの秒間あたりの平均数。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
outgoing-byte-total | ノードに送信された外に行くバイトの総数。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
リクエスト レート | ノードについての秒間あたり送信されるリクエストの平均数 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
request-total | ノードに送信されたリクエストの総数。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
リクエストのサイズの平均 | ノードについてのウィンドウ内の全てのリクエストの平均サイズ。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
リクエストサイズの最大 | ノードについてのウィンドウ内に送信されるリクエストの最大サイズ。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
やってくるバイトのレート | ノードについての秒間あたり受信されるバイトの平均数。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
incoming-byte-total | ノードについて受信されたバイトの総数。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
request-latency-avg | ノードについての平均リクエストレイテンシのms | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
request-latency-max | ノードについての最大リクエストレイテンシのms | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
応答レート | ノードについて秒間あたり受け取った応答。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
response-total | ノードについて受信した応答の総数。 | kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
プロデューサの監視
以下のメトリクスがプロデューサのインスタンス上で利用可能です。メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
waiting-threads | バッファメモリがレコードをキューに入れるのをブロックして待っているユーザスレッドの数。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
buffer-total-bytes | クライアントが利用することができるバッファメモリの最大量 (現在それが使われているかどうか)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
buffer-available-bytes | 使用されていないバッファメモリの総量 (割り当てられていないかフリーのリストにあるかどちらか)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
bufferpool-wait-time | appenderが空間の割り当てを待つ時間の割合。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
プロデューサの送信のメトリクス
kafka.producer:type=producer-metrics,client-id="{client-id}" | ||
属性名 | 説明 | |
---|---|---|
batch-size-avg | リクエスト毎のパーティション毎に送信されるバイトの平均数。 | |
batch-size-max | リクエスト毎のパーティション毎に送信されるバイトの最大数。 | |
batch-split-rate | 秒間あたりの平均バッチ分割数 | |
batch-split-total | バッチの総分割数 | |
compression-rate-avg | The average compression rate of record batches, defined as the average ratio of the compressed batch size over the uncompressed size. | |
metadata-age | 使用されている現在のプロデューサのメタデータの経過時間の秒。 | |
produce-throttle-time-avg | ブローカーによって絞られたリクエストの平均ms | |
produce-throttle-time-max | ブローカーによって絞られたリクエストの最大ms | |
record-error-rate | エラーになったレコード送信数の秒間あたりの平均 | |
record-error-total | エラーになったレコード送信数の総数 | |
record-queue-time-avg | レコードバッチがsendバッファ内で費やした平均時間。 | |
record-queue-time-max | レコードバッチがsendバッファ内で費やした最大時間のms | |
record-retry-rate | 再送信されたレコード数の秒間あたりの平均 | |
record-retry-total | 再送信されたレコードの総数 | |
record-send-rate | 秒間あたり送信されたレコードの平均数。 | |
record-send-total | 送信されたレコードの総数。 | |
record-size-avg | 平均レコードサイズ | |
record-size-max | 最大レコードサイズ | |
records-per-request-avg | リクエスト毎のレコードの平均数。 | |
request-latency-avg | 平均リクエストレイテンシのms | |
request-latency-max | 最大リクエストレイテンシのms | |
requests-in-flight | 応答を待っている送信中のリクエストの現在の数。 | |
kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}" | ||
属性名 | 説明 | |
byte-rate | トピックについての送信されるバイトの秒間あたりの平均数。 | |
byte-total | トピックについての送信された総バイト数 | |
compression-rate | The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size over the uncompressed size. | |
record-error-rate | トピックについてのエラーになったレコード送信数の秒間あたりの平均 | |
record-error-total | トピックについてのエラーになったレコード送信数の総数 | |
record-retry-rate | トピックについての再送信されたレコード数の秒間あたりの平均 | |
record-retry-total | トピックについての再送信されたレコードの総数 | |
record-send-rate | トピックについての送信されるレコードの秒間あたりの平均数。 | |
record-send-total | トピックについての送信された総レコード数 |
コンシューマの監視
以下のメトリクスはコンシューマのインスタンス上で利用可能です。メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
time-between-poll-avg | poll() の呼び出し間の平均遅延。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
time-between-poll-max | poll() の呼び出し間の最大遅延。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
last-poll-seconds-ago | 最後の poll() 呼び出しからの秒数。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
poll-idle-ratio-avg | ユーザコードがレコードを処理するのを待つのではなく、コンシューマの poll() がアイドル状態である時間の平均割合。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
コンシューマグループのメトリクス
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
commit-latency-avg | コミット リクエストにかかる平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
commit-latency-max | コミット リクエストにかかる最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
commit-rate | 秒間あたりのコミット呼び出しの数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
commit-total | コミット呼び出しの総数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
assigned-partitions | 現在のところこのコンシューマに割り当てられたパーティションの数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
heartbeat-response-time-max | ハートビート リクエストへの応答を受信するのに掛かった最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
heartbeat-rate | 秒間あたりのハートビートの平均数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
heartbeat-total | ハートビートの総数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
join-time-avg | グループのrejoinにかかる平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
join-time-max | グループのrejoinにかかる最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
join-rate | 秒間あたりのグループjoinの数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
join-total | グループjoinの総数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-time-avg | グループのsyncにかかる平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-time-max | グループのsyncにかかる最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-rate | 秒間あたりのグループ同期の数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-total | グループsyncの総数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-latency-avg | グループのリバランスにかかる平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-latency-max | グループのリバランスにかかる最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-latency-total | これまでにグループのリバランスにかかった合計時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-total | 参加したグループのリバランスの総数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-rate-per-hour | 1時間当たりのグループリバランスの総数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
failed-rebalance-total | 失敗したリバランス合計数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
failed-rebalance-rate-per-hour | 1時間当たりの失敗したグループリバランスイベントの数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
last-rebalance-seconds-ago | 最後のリバランスイベントからの秒数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
last-heartbeat-seconds-ago | 最後のコントローラーのハートビートからの秒数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-revoked-latency-avg | パーティションで取り残されたリバランスリスナーコールバックにかかった平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-revoked-latency-max | パーティションで取り残されたリバランスリスナーコールバックにかかった最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-assigned-latency-avg | パーティションに割り当てられたリバランスリスナーコールバックにかかった平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-assigned-latency-max | パーティションに割り当てられたリバランスリスナーコールバックにかかった最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-lost-latency-avg | パーティションに割り当てられたリバランスリスナーコールバックにかかった平均時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-lost-latency-max | パーティションで失われたリバランスリスナーコールバックにかかった最大時間 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
コンシューマのフェッチ メトリクス
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}" | ||
属性名 | 説明 | |
---|---|---|
bytes-consumed-rate | 秒間あたりに消費されるバイトの平均数 | |
bytes-consumed-total | 消費されるバイトの総数 | |
fetch-latency-avg | フェッチリクエストにかかる平均時間 | |
fetch-latency-max | フェッチリクエストに掛かる最大時間。 | |
fetch-rate | 秒間あたりのフェッチリクエストの数。 | |
fetch-size-avg | リクエスト毎にフェッチされる平均バイト数 | |
fetch-size-max | 秒間あたりにフェッチされる最大バイト数 | |
fetch-throttle-time-avg | 平均スロットル時間のms | |
fetch-throttle-time-max | 最大スロットル時間のms | |
fetch-total | フェッチリクエストの総数 | |
records-consumed-rate | 秒間あたりに消費される平均レコード数 | |
records-consumed-total | 消費されるレコードの総数 | |
records-lag-max | このウィンドウ内の任意のパーティションについてレコード数という点での最大の遅延 | |
records-lead-min | このウィンドウ内の任意のパーティションについてレコード数という点での最小のリード数 | |
records-per-request-avg | 各リクエスト内の平均レコード数 | |
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}" | ||
属性名 | 説明 | |
bytes-consumed-rate | トピックについての秒間あたりに消費されるバイトの平均数。 | |
bytes-consumed-total | トピックについての消費された総バイト数 | |
fetch-size-avg | トピックについてのリクエスト毎にフェッチされる平均バイト数 | |
fetch-size-max | トピックについてのリクエスト毎にフェッチされる最大バイト数 | |
records-consumed-rate | トピックについての秒間あたりに消費されるレコードの平均数。 | |
records-consumed-total | トピックについての消費された総レコード数 | |
records-per-request-avg | トピックについての各リクエスト内の平均レコード数 | |
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}" | ||
属性名 | 説明 | |
preferred-read-replica | パーティションの現在のリードレプリカ、あるいはリーダから読み取る場合は -1 | |
records-lag | パーティションの最新の遅延 | |
records-lag-avg | パーティションの平均遅延 | |
records-lag-max | パーティションの最大遅延 | |
records-lead | パーティションの最新のリード | |
records-lead-avg | パーティションの平均のリード | |
records-lead-min | パーティションの最小のリード |
接続の監視
コネクトのワーカープロセスは全てのプロデューサとコンシューマのメトリクスと、コネクトに固有のメトリクスを含みます。ワーカープロセス自身は多くのメトリクスを持ちますが、各コネクタとタスクは追加のメトリクスを持ちます。[2020-12-04 10:03:05,630] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668) [2020-12-04 10:03:05,632] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)kafka.connect:type=connect-worker-metrics | ||
属性名 | 説明 | |
---|---|---|
connector-count | このワーカー内で実行されるコネクタの数。 | |
connector-startup-attempts-total | このワーカーが試行したコネクタの開始の総数。 | |
connector-startup-failure-percentage | このワーカーのコネクタの開始が失敗した平均パーセンテージ。 | |
connector-startup-failure-total | コネクタの開始が失敗した総数。 | |
connector-startup-success-percentage | このワーカーのコネクタの開始が成功した平均パーセンテージ。 | |
connector-startup-success-total | コネクタの開始が成功した総数。 | |
task-count | このワーカー内で実行されるタスクの数。 | |
task-startup-attempts-total | このワーカーが試行したタスクの開始の総数。 | |
task-startup-failure-percentage | このワーカーのタスクの開始が失敗した平均パーセンテージ。 | |
task-startup-failure-total | タスクの開始が失敗した総数。 | |
task-startup-success-percentage | このワーカーのタスクの開始が成功した平均パーセンテージ。 | |
task-startup-success-total | タスクの開始が成功した総数。 | |
kafka.connect:type=connect-worker-metrics,connector="{connector}" | ||
属性名 | 説明 | |
connector-destroyed-task-count | ワーカー上のコネクタの破棄されたタスクの数。 | |
connector-failed-task-count | ワーカー上のコネクタの失敗したタスクの数。 | |
connector-paused-task-count | ワーカー上のコネクタの一時停止されたタスクの数。 | |
connector-running-task-count | ワーカー上のコネクタの実行中のタスクの数。 | |
connector-total-task-count | ワーカー上のコネクタのタスクの数。 | |
connector-unassigned-task-count | ワーカー上のコネクタの割り当てられていないタスクの数。 | |
kafka.connect:type=connect-worker-rebalance-metrics | ||
属性名 | 説明 | |
completed-rebalances-total | このワーカーによって完了したリバランスの総数。 | |
connect-protocol | このクラスタで使われるコネクトプロトコル | |
epoch | このワーカーのepochあるいは世代数。 | |
leader-name | グループリーダーの名前。 | |
rebalance-avg-time-ms | このワーカーによってリバランスするのにかかった平均時間のミリ秒数 | |
rebalance-max-time-ms | このワーカーによってリバランスするのに費やされた最大時間のミリ秒数。 | |
rebalancing | このワーカーが現在リバランスしているかどうか。 | |
time-since-last-rebalance-ms | このワーカーが最も最近のリバランスを完了してからのミリ秒数。 | |
kafka.connect:type=connector-metrics,connector="{connector}" | ||
属性名 | 説明 | |
connector-class | コネクタ クラスの名前 | |
connector-type | コネクタの型'source' あるいは 'sink' の1つ。 | |
connector-version | コネクタによって報告されたコネクタクラスのバージョン。 | |
状態 | コネクタの状態。'unassigned', 'running', 'paused', 'failed' または 'destroyed' のうちの1つ。 | |
kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}" | ||
属性名 | 説明 | |
batch-size-avg | コネクタによって処理されたバッチの平均サイズ。 | |
batch-size-max | コネクタによって処理されたバッチの最大サイズ。 | |
offset-commit-avg-time-ms | このタスクによってオフセットをコミットするのに掛かった平均時間のミリ秒数。 | |
offset-commit-failure-percentage | このタスクのオフセットのコミットの試行が失敗した平均パーセンテージ。 | |
offset-commit-max-time-ms | このタスクによってオフセットをコミットするのにかかった最大時間のミリ秒数。 | |
offset-commit-success-percentage | このタスクのオフセットのコミットの試行が成功した平均パーセンテージ。 | |
pause-ratio | このタスクが休止状態で費やした断片時間。 | |
running-ratio | このタスクが実行中の状態で費やした断片時間。 | |
状態 | コネクタのタスクの状態。'unassigned', 'running', 'paused', 'failed' または 'destroyed' のうちの1つ。 | |
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}" | ||
属性名 | 説明 | |
offset-commit-completion-rate | 完了が成功したオフセット コミットの完了の秒間あたりの平均数。 | |
offset-commit-completion-total | 完了が成功したオフセット コミットの完了の総数。 | |
offset-commit-seq-no | オフセット コミットのための現在のシーケンス番号。 | |
offset-commit-skip-rate | あまりに遅くスキップ/無視されたオフセット コミットの完了の秒間あたりの平均数。 | |
offset-commit-skip-total | あまりに遅くスキップ/無視されたオフセット コミットの完了の総数。 | |
partition-count | このワーカー内で名前付きのシンクコネクタへ所属するこのタスクに割り当てられるトピックパーティションの数。 | |
put-batch-avg-time-ms | このタスクがシンクのレコードのバッチを格納するのに掛かる平均時間。 | |
put-batch-max-time-ms | このタスクがシンクのレコードのバッチを格納するのに掛かる最大時間。 | |
sink-record-active-count | Kafkaから読み込まれたが、まだシンクタスクによって完全には コミット/フラッシュ/通知 されていないレコードの数。 | |
sink-record-active-count-avg | Kafkaから読み込まれたが、まだシンクタスクによって完全には コミット/フラッシュ/通知 されていないレコードの平均数。 | |
sink-record-active-count-max | Kafkaから読み込まれたが、まだシンクタスクによって完全には コミット/フラッシュ/通知 されていないレコードの最大数。 | |
sink-record-lag-max | シンクのタスクがどのトピックのパーティションに対してもコンシューマの位置よりも後ろにあるレコードの数という点での最大の遅延。 | |
sink-record-read-rate | このワーカー内の名前付きのシンク コネクタに所属するこのタスクについてKafkaから読み込まれるレコードの数の秒あたりの平均数。これは変換前が適用されます。 | |
sink-record-read-total | タスクが最後に再起動されてから、このワーカー内の名前付きのシンク コネクタに所属するこのタスクによってKafkaから読み込まれるレコードの総数。 | |
sink-record-send-rate | 変換から出力され、このワーカー内の名前付きのシンク コネクタに所属するこのタスクに送信/配置されたレコードの秒間あたりの平均数。これは変換後が適用され、変換によって除外された全てのレコードが除外されます。 | |
sink-record-send-total | 変換から出力され、タスクが最後に再起動されてから、このワーカー内の名前付きシンク コネクタに所属するこのタスクに送信/配置されたレコードの総数。 | |
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}" | ||
属性名 | 説明 | |
poll-batch-avg-time-ms | このタスクがソースのレコードのバッチをポーリングするのに掛かる平均ミリ秒。 | |
poll-batch-max-time-ms | このタスクがソースのレコードのバッチをポーリングするのに掛かる最大ミリ秒。 | |
source-record-active-count | このタスクによって生成されたがまだKafkaへ完全には書き込まれていないレコードの数。 | |
source-record-active-count-avg | このタスクによって生成されたがまだKafkaへ完全には書き込まれていないレコードの平均数。 | |
source-record-active-count-max | このタスクによって生成されたがまだKafkaへ完全には書き込まれていないレコードの最大数。 | |
source-record-poll-rate | このワーカー内の名前付きのソース コネクタに所属するこのタスクによって(変換前に)生成/ポーリングされるレコードの数の秒あたりの平均数。 | |
source-record-poll-total | このワーカー内の名前付きのソース コネクタに所属するこのタスクによって(変換前に)生成/ポーリングされるレコードの数の総数。 | |
source-record-write-rate | 変換から出力され、このワーカー内の名前付きのシンク コネクタに所属するこのタスクについてKafkaに書き込まれたレコードの秒間あたりの平均数。これは変換後が適用され、変換によって除外された全てのレコードが除外されます。 | |
source-record-write-total | 変換から出力され、タスクが最後に再起動されてから、このワーカー内の名前付きソース コネクタに所属するこのタスクについてKafkaに書き込まれたレコードの数。 | |
kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}" | ||
属性名 | 説明 | |
deadletterqueue-produce-failures | デッド レター キューへの書き込みに失敗した数。 | |
deadletterqueue-produce-requests | デッド レター キューへの書き込み試行回数。 | |
last-error-timestamp | このタスクが最後にエラーが発生した時のエポック タイムスタンプ。 | |
total-errors-logged | 記録されたエラーの数。 | |
total-record-errors | このタスクのレコード処理エラーの数。 | |
total-record-failures | このタスクのレコード処理失敗の数。 | |
total-records-skipped | エラーによってスキップされたレコードの数。 | |
total-retries | 再試行されたオペレーションの数。 |
ストリームの監視
Kafka ストリームインスタンスは、全てのプロデューサとコンシューマのメトリクスと、ストリームに固有の追加のメトリクスを含みます。デフォルトで、Kafkaストリームは2つの記録レベルを持つメトリクスを持ちます:debug
と info
。
メトリクスは4層構造を持つことに注意してください。最上位には、開始された Kafka ストリームクライアントごとにクライアントレベルのメトリックスがあります。各クライアントには、独自のメトリックスを持つストリームスレッドがあります。各ストリームスレッドには、独自のメトリックスを持つタスクがあります。各タスクは独自のマトリックスメトリックスを持つプロセッサーノードを持ちます。各タスクも独自のマトリックスを持つ多くの状態ストアとレコードキャッシュを持ちます。
どのメトリックスを収集したいかを指定するために以下の設定オプションを使います:metrics.recording.level="info"
クライアントメトリックス
以下の全てのメトリックスの記録レベルは、info
です:
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
バージョン | Kafka ストリームクライアントのバージョン。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
commit-id | Kafka ストリームクライアントのバージョン管理コミット ID。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
application-id | Kafka ストリームクライアントのアプリケーション ID。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
topology-description | Kafka ストリームクライアントで実行されるトポロジの説明。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
state | Kafka ストリームクライアントの状態。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
スレッドのメトリクス
以下の全てのメトリックスの記録レベルは、info
です:
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
commit-latency-avg | このスレッドの全ての実行中のタスクに渡る、コミットのための平均実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
commit-latency-max | このスレッドの全ての実行中のタスクに渡る、コミットのための最大実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
poll-latency-avg | コンシューマポーリングの平均実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
poll-latency-max | コンシューマポーリングの最大実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
process-latency-avg | 処理の平均実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
process-latency-max | 処理の最大実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-latency-avg | 中断の平均実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-latency-max | 中断の最大実行時間のミリ秒。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
commit-rate | 秒間あたりのコミットの平均数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
commit-total | コミット呼び出しの総数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
poll-rate | 秒間あたりのコンシューマ呼び出しの平均数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
poll-total | コンシューマポーリングの呼び出しの総数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
process-rate | 秒間あたりに処理されたレコードの平均数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
process-total | 処理されたレコードの総数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-rate | 秒間あたりの中断呼び出しの平均数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-total | 中断呼び出しの総数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
task-created-rate | 秒間あたりに作成されたタスクの平均数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
task-created-total | 作成されたタスクの総数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
task-closed-rate | 秒間あたりに閉じられるタスクの平均数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
task-closed-total | 閉じられたタスクの総数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
タスクのメトリクス
以下のメトリックスは、記録レべルがinfo
の dropped-records-rate and dropped-records-total を除いて、全て記録レベルが debug
です。:
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
process-latency-avg | 処理のための平均実行時間のナノ秒。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
process-latency-max | 処理のための最大実行時間のナノ秒。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
process-rate | このタスクの全てのソースプロセッサノード全体で1秒あたりに処理されたレコードの平均数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
process-total | このタスクの全てのソースプロセッサノードで処理されたレコードの総数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
commit-latency-avg | コミットの平均実行時間のナノ秒。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
commit-latency-max | コミットの最大実行時間のナノ秒。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
commit-rate | 秒間あたりのコミット呼び出しの平均数 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
commit-total | コミット呼び出しの総数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
record-lateness-avg | レコードの観測された遅延の平均 (ストリーム時間 - レコードのタイムスタンプ)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
record-lateness-max | レコードの観測された遅延の最大 (ストリーム時間 - レコードのタイムスタンプ)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
enforced-processing-rate | 秒間あたりの強制処理の平均数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
enforced-processing-total | 強制処理の総数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
dropped-records-rate | このタスク内でドロップされたレコード数の平均 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
dropped-records-total | このタスク内でドロップされたレコードの総数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
プロセッサーノードのメトリクス
以下のメトリックスは、特定のタイプのノードでのみ利用可能です。つまり、プロセスレートとプロセス合計はソースプロセッサノードでのみ利用可能で、suppression-emit-rate と suppression-emit-total は要請操作ノードでのみ利用可能です。全てのメトリックスの記録レベルはdebug
です:
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
process-rate | 秒間あたりにソースプロセッサノードによって処理されたレコードの平均数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
process-total | 秒間あたりにソースプロセッサノードによって処理されたレコードの総数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
suppression-emit-rate | 抑制操作ノードからダウンストリームに送信されたレコードの割合。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
suppression-emit-total | 抑制操作ノードからダウンストリームに送信されたレコードの総数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
応対ストアのメトリクス
以下の全てのメトリクスはdebug
の記録レベルを持ちます。store-scope
の値はユーザの独自の状態ストアのStoreSupplier#metricsScope()
で指定されることに注意してください; 組み込みの状態ストアについては、現在のところ以下のものを持ちます:
in-memory-state
in-memory-lru-state
in-memory-window-state
in-memory-suppression
(抑制バッファ用)rocksdb-state
(RocksDBで支援される key-value ストア)rocksdb-window-state
(RocksDBで支援される window ストア)rocksdb-session-state
(RocksDBで支援される session ストア)
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
put-latency-avg | 平均配置実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-latency-max | 最大配置実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-if-absent-latency-avg | 存在しない時に配置する平均実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-if-absent-latency-max | 存在しない時に配置する最大実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
get-latency-avg | 平均取得実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
get-latency-max | 最大取得実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
delete-latency-avg | 平均削除実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
delete-latency-max | 最大削除実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-all-latency-avg | put-all 実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-all-latency-max | 最大 put-all 実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
all-latency-avg | 全てのオペレーションの平均実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
all-latency-max | 全てのオペレーションの最大実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
range-latency-avg | 平均range実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
range-latency-max | 最大range実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
flush-latency-avg | 平均flush実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
flush-latency-max | 最大flush実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
restore-latency-avg | 平均回復時刻時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
restore-latency-max | 最大回復実行時間のns。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-rate | このストアについての平均putレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-if-absent-rate | このストアについての平均put-if-absentレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
get-rate | このストアについての平均getレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
delete-rate | このストアについての平均deleteレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-all-rate | このストアについての平均put-allレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
all-rate | このストアについての全てのオペレーションの平均レート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
range-rate | このストアについての平均rangeレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
flush-rate | このストアについての平均flushレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
restore-rate | このストアについての平均restoreレート。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
suppression-buffer-size-avg | サンプリング ウィンドウでバッファされたデータの平均総サイズのバイト。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
suppression-buffer-size-max | サンプリング ウィンドウでバッファされたデータの最大総サイズのバイト。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
suppression-buffer-count-avg | サンプリング ウィンドウでバッファされたレコードの平均数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
suppression-buffer-count-max | サンプリング ウィンドウでバッファされたレコードの最大数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
RocksDB メトリックス
以下の全てのメトリクスはdebug
の記録レベルを持ちます。メトリックスは、RocksDB 状態ストアから毎分収集されます。時間及びセッションウィンドウでの集計の場合のように、状態ストアが複数の RocksDB インスタンスで構成される場合、各メトリックスは状態ストアの RocksDB インスタンスの集計を報告します。組み込みの RocksDB 状態ストアの store-scope
は、現在のところ以下の通りです:
rocksdb-state
(RocksDBで支援される key-value ストア)rocksdb-window-state
(RocksDBで支援される window ストア)rocksdb-session-state
(RocksDBで支援される session ストア)
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
bytes-written-rate | RocksDB 状態ストアに秒間あたりに書き込まれた平均バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
bytes-written-total | RocksDB 状態ストアに秒間あたりに書き込まれた総バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
bytes-read-rate | RocksDB 状態ストアから秒間あたり読み込まれた平均バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
bytes-read-total | RocksDB 状態ストアに秒間あたり読み込まれた総バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable-bytes-flushed-rate | memtable から ディスクに秒間あたりフラッシュされた平均バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable-bytes-flushed-total | memtable から ディスクに秒間あたりフラッシュされた総バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable-hit-ratio | memtable への全てのルックアップに対する memtable ヒットの比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
block-cache-data-hit-ratio | ブロックキャッシュへのデータブロックの全てのルックアップに対する、データブロックのブロックキャッシュヒットの比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
block-cache-index-hit-ratio | ブロックキャッシュへのインデックスブロックの全てのルックアップに対する、インデックスブロックのブロックキャッシュヒットの比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
block-cache-filter-hit-ratio | ブロックキャッシュへのフィルタブロックの全てのルックアップに対する、フィルタブロックのブロックキャッシュヒットの比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
write-stall-duration-avg | 書き込みストールの平均持続時間のミリ秒。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
write-stall-duration-total | 書き込みストールの総持続時間のミリ秒。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
bytes-read-compaction-rate | 圧縮中の秒間あたりの読み込み平均バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
bytes-written-compaction-rate | 圧縮中の秒間あたりの書き込み平均バイト数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
number-open-files | 現在開かれているファイル数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
number-file-errors-total | ファイルエラーの総発生数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
レコード キャッシュのメトリクス
以下の全てのメトリクスはdebug
の記録レベルを持ちます:
メトリック/属性 名 | 説明 | Mbean 名 |
---|---|---|
hit-ratio-avg | キャッシュ読み取り要求の総数に対するキャッシュ読み取りヒットの比率として定義される平均キャッシュヒット率。 | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
hit-ratio-min | 最小キャッシュ ヒット レシオ。 | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
hit-ratio-max | 最大キャッシュ ヒット レシオ。 | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
その他
GC時間と他のstats、そしてCPU 利用率、I/O サービス 時間などのような様々なサーバのstatsを監視することをお勧めします。クライアント側ではメッセージ/バイト レート(グローバルとトピック毎)、リクエスト レート/サイズ/時間、そしてコンシューマー側では全てのパーティションに渡るメッセージ内の最大遅延と最小取得リクエストレートを監視することをお勧めします。コンシューマが遅れずについていくには、最大の遅延が閾値より少ない必要があり、最小の取得レートが0より大きい必要があります。6.7 ZooKeeper
安定バージョン
現在の安定ブランチは、3.5 です。Kafka は、3.5 シリーズの最新のリリースを含むように定期的に更新されます。ZooKeeperを操作可能にする
操作上は、健全なZooKeeperのインストレーションのために以下を行います:- 物理/ハードウェア/ネットワークレイアウトの冗長性: それらをすべて同じラックに入れないようにし、ハードウェアをまとも(気が狂わない程度)にし、冗長な電源とネットワークパスなどを維持するようにします。一般的なZooKeeperの集合は5または7のサーバを持ちます。これはそれぞれ2および3のサーバのダウンに耐性があります。もし小さな配備を持つ場合は、3つのサーバの使用が許容されますが、この場合1つのサーバのダウンのみに耐性があるだろうことを忘れないでください。
- I/O 細分化: 書き込み型のトラフィックを多くする場合は、ほとんど間違いなく専用のディスクグループにトランザクションログが欲しいでしょう。トランザクションログへの書き込みは非同期(しかしパフォーマンスのためにバッチ化される)で、結果的に同時に起こる書き込みがパフォーマンスに極めて影響するかもしれません。ZooKeeperのスナップショットは同時に起こる書き込みのソースのようなものかもしれません。理想的にはトランザクションログとは別にディスクグループに書き込まれるべきです。スナップショットは非同期でディスクに書き込まれます。つまり、オペレーティングシステムとメッセージログファイルと一緒に共有することは一般的に大丈夫です。dataLogDirパラメータを持つ別個のディスクグループを持つようにサーバを設定することができます。
- アプリケーションの分離: 同じbox上にインストールしたい他のアプリケーションのパターンを本当に理解しない限り、ZooKeeperを分離して実行することは良い考えかもしれません。(しかしこれはハードウェアの機能を使ったバランス行為かもしれません)。
- 仮想化には注意してください: クラスタ レイアウト、読み込み/書き込み パターンおよびSLAに応じて動作しますが、非常に時間に依存する可能性があるため、仮想化層によって導入された僅かなオーバーヘッドによってZookeeperが追加および破棄されるかもしれません。 time sensitive
- ZooKeeperの設定: javaですので '十分な' ヒープ空間を与えるようにしてください (通常それらを3-5Gで実行しますが、それはほとんど持っているデータセットのサイズです)。残念ながらそれの良い計算式を持っていませんが、ZooKeeperの状態をより考慮することはスナップショットが大きくなるかもしれないことを意味し、大きなスナップショットは回復時間に影響します。実際、もしスナップショットがあまりに大きく(2,3ギガバイト)なると、サーバが回復し集合に加わるのに十分な時間を与えるためにinitLimit パラメータを増やす必要があるかもしれません。
- 監視: JMX と 4文字の単語 (4lw) のコマンドはとても便利で、それらは時にはオーバーラップします(そしてそれらの場合私たちは4文字のコマンドを好みます。それらはより予想可能、あるいはそれらはLI監視インフラストラクチャーと良く連携することは確かです)
- クラスタを建て過ぎないでください: 大きなクラスタ、特に書き込みが激しい使用パターンは多くのクラスタ内の通信(書き込みの定員とその後のクラスタメンバーの更新)を意味しますが、中途半端に建てないでください(リスクがクラスタを圧倒します)。サーバを増やすと、読み取りの容量が増えます。