<

ドキュメント

Kafka 2.7 Documentation

以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.

6. 操作

Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.

6.1基本的なKafkaの操作

この章ではあなたがKafkaクラスタで実施するだろう最も一般的な操作を再検討するでしょう。この章で検討される全てのツールはKafkaの配布物のbin/ディレクトリの下で利用可能で、各ツールは引数無しで実行した場合に全ての可能なコマンドラインオプションの詳細を出力するでしょう。

トピックの追加と削除

トピックを手動で追加するか、あるいは存在しないトピックに初めてデータが発行された時に自動的に生成されるようにするかの選択肢があります。トピックが自動生成された場合、自動生成されたトピックに使われるデフォルトのトピックの設定を調整したいかもしれません。

トピックはトピックツールを使って追加および修正されます:

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
        --partitions 20 --replication-factor 3 --config x=y
リプリケーションのファクターはどれだけ多くのサーバが書き込まれる各メッセージをリプリケートするかを制御します。リプリケーションファクターが3の場合、データへのアクセスが失われるまでに2つまでのサーバが失敗することができます。データの消費無しにマシーンを透過的に解雇できるように、リプリケーションファクターを2または3にすることをお勧めします。

パーティションのカウントはどれだけ多くのログをトピックが共有するかを制御します。パーティションのカウントの効果には幾つかあります。最初に、各パーティションは完全に1つのサーバ上に納まらなければなりません。つまり、20のパーティションを持つ場合(そして、読み込みと書き込みをする)、全体のデータセットは20以下のサーバで処理されるでしょう(レプリカを数えません)。結果的に、パーティション数はコンシューマの最大並行度に影響を与えます。これは概念の章で詳細に議論されます。

共有された各パーティションログはKafkaログディレクトリの下の独自のフォルダに配置されます。そのようなフォルダの名前はダッシュ (-) とパーティションidが追加されたトピック名から成ります。一般的なフォルダ名は255を超える文字超にはできないため、トピック名の長さには制限があるでしょう。パーティションの数はいつでも100,000を超えないでしょう。従って、トピック名は249文字よりも長くすることはできません。これによりフォルダ名にはダッシュと5桁の可能性のあるパーティションidにちょうど余裕があります。

コマンドライン上で追加された設定は、データが維持しなければならない期間のようなサーバのデフォルト設定を上書きします。トピック毎の設定の完全なセットはここで説明されます。

トピックの修正

同じトピックツールを使っているトピックの設定あるいはパーティショニングを変更する事ができます。

パーティションを追加するには

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
        --partitions 40
パーティションのユースケースの1つは意味論的にデータをパーティション化することであり、パーティションの追加は既存データのパーティションを変更しないため、もしコンシューマがそのパーティションに依存する場合はコンシューマを乱すかもしれないことに注意してください。つまり、もしデータがhash(key) % number_of_partitionsでパーティション化されている場合、このパーティション化はパーティションの追加によってシャッフルされるかもしれませんが、Kafkaはどうであれ自動的にはデータを再分配しようとしないでしょう。

設定を追加するには:

  > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
設定を削除するには:
  > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
そして、結果的にトピックを削除するには:
  > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

Kafka は今のところトピックについてのパーティションの数の削減をサポートしていません。

トピックのリプリケーション ファクターについての説明はここで見つけることができます。

Graceful シャットダウン

Kafkaクラスタは自動的にブローkーアのシャットダウンあるいは障害を検知し、そのマシーン上のパーティションの新しいリーダーを選出するでしょう。これは、サーバが失敗するか、メンテナンスあるいは設定の変更のために意図的に落とすかのどちらかで、起こるでしょう。後者の場合、Kafkaはサーバを停止するために単にkillするよりもより優雅な機構をサポートします。サーバがgracefullyに停止した場合、利用可能な2つの最適化があります:
  1. 再起動をした時にログの回復をする必要を避けるためにディスクに全てのログを同期するでしょう (つまり、ログのtailの中で全てのメッセージについてチェックサムの検証をする)。ログの回復には時間がかかるため、これは内部的な再起動を高速化します。
  2. シャットダウンする前に、そのサーバがリーダーになっている全てのパーティションを他のレプリカに移行するでしょう。リーダーシップの転送を高速化し、各パーティションが利用できない時間を数ミリ秒に最小化するでしょう。
サーバがハードkillで停止されない場合はログの同期が自動的に起こりますが、制御されたリーダーシップの移設には特別な設定の利用が必要です:
      controlled.shutdown.enable=true
制御されたシャットダウンはブローカー上でホストされる全てのパーティションがレプリカを持つ場合のみ成功するでしょう (つまり、レプリケーションのファクターが1以上で少なくともそれらのレプリカの1つが活動中である)。最後のレプリカをシャットダウンするとトピックパーティションが使えなくなるので、これは一般的に望ましいものです。

リーダーシップのバランシング

ブローカーが停止あるいはクラッシュした時はいつでも、ブローカーのパーティションが他のレプリカに転送されます。ブローカが再起動した場合、ブローカはそのすべてのパーティションのフォロワーになるだけで、クライアントの読み取りと書き込みには使われません。

この不均衡を避けるために、Kafkaは優先のレプリカの概念を持ちます。パーティションのためのレプリカのリストが1,5,9の場合、ノード1はレプリカのリストの中で先にあるためノード1がノード5あるいは9のどちらよりもリーダーとして優先されます。デフォルトでは、Kafka クラスタは回復されたレプリカにリーダーシップを回復しようとします。この動作は以下のように設定されています:

      auto.leader.rebalance.enable=true
これを false に設定することもできますが、その後以下のコマンドを実行して、回復されたレプリカにリーダーシップを手動で回復する必要があります:
  > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

ラックを超えたレプリカのバランシング

ラックの認識機能は異なるラックを超えた同じパーティションのレプリカを分散します。これはKafkaがラック障害を補うためにブローカーの障害に対して提供する保証を延長し、ラック上の全てのブローカーが一度に障害になった場合のデータの喪失のリスクを制限します。その機能はEC2での可用性領域のような他のブローカーのグルーピングへ適用することもできます。

ブローカーの設定にプロパティを追加することで、ブローカーが特定のラックに所属するように指定することもできます:
  broker.rack=my-rack-id
トピックが生成, 修正 あるいはレプリカが再分配された時に、レプリカができる限り多くのラックにまたがるようにしながら、ラックの制約が尊重されるでしょう。

レプリカをブローカーに割り当てるために使われるアルゴリズムは、ブローカーがどのようにラックを超えて分散されるかに関係なく、ブローカーあたりのリーダーの数が一定であることを確実にします。これはバランスのスループットを保証します。

しかし、もしラックが異なる数のブローカーに割り当てられる場合、レプリカの割り当ては均等ではないでしょう。より多くのストレージを使用しより多くのレプリケーションにリソースを割り当てることを意味するため、より少ないブローカーを持つラックはより多くのレプリカを取得するでしょう。従って、ラック毎に同じ数のブローカーを設定することは意味があります。

クラスタ間のデータのミラーリング

1つのクラスタ内のノードの間で起こるレプリケーションとの混乱を避けるために、Kafkaクラスタ のデータのレプリケートのプロセスを "ミラーリング"とします。KafkaはKafkaクラスタ間でデータをミラーリングするためのツールを同梱します。ツールはソースクラスタから消費し、目的のクラスタへ生成します。この種類のミラーリングについての一般的なユースケースは、他のデータセンター内のレプリカを提供することです。このシナリオは次の章で詳細に議論されるでしょう。

スループットを増やし耐障害性のために多くのそのようなミラーリング プロセスを実行することができます (もし1つのプロセスが終了すると、他のプロセスは追加の負荷を引き継ぎます)。

データはソースクラスタ内のトピックから読み込まれ、目的のクラスタ内の同じ名前のトピックに書き込まれるでしょう。実施はミラーメイカーはKafkaコンシューマおよび一緒にフックされるプロデューサよりも少し多くの事をします。

ソースおよび目的のクラスタは完全にエンティティに依存しません: それらは異なる数のパーティションを持ち、オフセットは同じでは無いでしょう。この理由で、(カスタマの位置が異なるだろうため) ミラー クラスタは対障害性の仕組みとして実際には意図されていません; そのためには通常のクラスタ内のレプリケーションの使用をお勧めします。しかし、ミラーメイカープロセスはパーティションについてのメッセージキーを維持し使用します。つまり順番はキー毎ベースで保持されます。

入力クラスターから1つの(my-topicという名前の)トピックをミラーする方法を示す例です:

  > bin/kafka-mirror-maker.sh
        --consumer.config consumer.properties
        --producer.config producer.properties --whitelist my-topic
--whitelistオプションを使ってトピックのリストを指定することに注意してください。このオプションはJava-style regular expressionsを使う全ての正規表現を許可します。つまり--whitelist 'A|B'を使ってABという名前の2つのトピックをミラーすることができます。あるいは--whitelist '*'を使って全てのトピックをミラーすることができます。シェルがファイルパスとして正規表現を展開しないように、全ての正規表現をクォートするようにしてください。便宜上、トピックのリストを指定するために '|' の代わりに ',' の仕様を許可します。ミラーリングと設定 auto.create.topics.enable=true と組み合わせると、新しいトピックが追加された時でもソースのクラスタ内の全てのデータを自動的に生成しレプリケートするレプリカクラスタを持つことができます。

コンシューマの位置の調査

コンシューマの位置を知るのに便利な時があります。コンシューマグループ内の全てのコンシューマの位置と、それらログの最後からどれだけ遅れているかを知るツールがあります。このツールをmy-topicという名前のトピックを消費しているmy-group という名前のコンシューマグループ上で実行するには、以下のようになるでしょう:
  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
  my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
  my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
  my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2

コンシューマグループの管理

ConsumerGroupCommand ツールを使って、コンシューマグループをリスト化、表現、削除することができます。コンシューマ グループは手動で削除されるか、グループについての最後にコミットされたオフセットが期限切れになる時に自動的に削除されます。手動の削除はグループが何もアクティブなメンバーを持たない場合のみ動作します。例えば、全てのトピックに渡って全てのコンシューマグループをリスト化するには:
  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

  test-consumer-group
以前に述べたように、オフセットを見るには、以下のようにコンシューマグループを"describe"します:
  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
  topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4
コンシューマグループについてのより詳細な情報を提供するために使うことができる多くの追加の "describe" オプションがあります:
  • --members: このオプションはコンシューマグループ内の全てのアクティブなメンバーのリストを提供します。
          > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
    
          CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
          consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
  • --members --verbose: 上の "--members" オプションで報告される情報の上に、このオプションは各メンバに割り当てられたパーティションの情報も提供します。
          > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
    
          CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
          consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
  • --offsets: これはデフォルトの記述オプションで、"--describe" オプションと同じ出力を提供します。
  • --state: このオプションは有用なグループレベルの情報を提供します。
          > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
    
          COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
          localhost:9092 (0)        range                     Stable               4
1つ以上のコンシューマグループを手動で削除するには、"--delete" オプションを使うことができます:
  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

  リクエストされたコンシューマグループ ('my-group', 'my-other-group') の削除が正常に完了しました。

コンシューマ グループのオフセットを再設定するには、"--reset-offsets" オプションを使うことができます。このオプションは一度に1つのコンシューマグループをサポートします。以下のスコープを定義する必要があります: --all-topics あるいは --topic. '--from-file' scenario を使わない場合は、1つのスコープが選択されなければなりません。また、最初にコンシューマのインスタンスがアクティブではないことを確認してください。詳細はKIP-122を見てください。

3つの実行オプションを持ちます:

  • (デフォルト) どのオフセットを再設定するかを表示する。
  • --execute : --reset-offsets プロセスを実行する。
  • --export : 結果をCSV形式にエクスポートする。

--reset-offsets は次のシナリオから選択することもできます (少なくとも1つのシナリオを選択する必要があります):

  • --to-datetime <String: datetime> : オフセットをdatetimeからのオフセットに再設定する。Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : オフセットを最も早いオフセットに再設定する。
  • --to-latest : オフセットを最新のオフセットに再設定する。
  • --shift-by <Long: number-of-offsets> : 現在のオフセットを 'n' だけ移すオフセットに再設定する。'n' は正または負を指定できます。
  • --from-file : オフセットをCSVファイル内で定義された値に再設定する。
  • --to-current : オフセットを現在のオフセットに再設定する。
  • --by-duration <String: duration> : オフセットを現在のタイムスタンプから持続期間のオフセットに再設定する。Format: 'PnDTnHnMnS'
  • --to-offset : オフセットを特定のオフセットに再設定する。
範囲外のオフセットは利用可能なオフセットの終わりに調整されることに注意してください。例えば、オフセットが10で終わり、オフセットの移動リクエストが15の場合、10のオフセットが実際には選択されまう。

例えば、コンシューマグループのオフセットを最新のオフセットに再設定するには:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

  TOPIC                          PARTITION  NEW-OFFSET
  topic1                         0          0

古い高レベルのコンシューマを使っていて、ZooKeeperにグループのメタデータが格納している場合(つまりoffsets.storage=zookeeper)、--bootstrap-serverの代わりに--zookeeperを渡します:

  > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

クラスタの拡張

Kafkaクラスタへサーバを追加することは簡単で、単にそれらにユニークなブローカーidを割り当て、新しいサーバ上でKafkaを開始します。しかし、これらの新しいサーバは自動的にデータのパーティションを割り当てられないでしょう。つまり、パーティションがそれらに移動されない場合、新しいトピックが作成されるまでそれらは何も仕事をしないでしょう。そのようにして通常はクラスタにマシーンを追加する時に何らかの既存のデータをこれらのマシーンに移設したいでしょう。

データの移設のプロセスは手動で起動されますが、完全に自動化されます。背後では、Kafkaが新しいサーバを移設しているパーティションのフォロワーとして追加し、パーティション内の既存のデータを完全にレプリケートすることができます。新しいサーバがこのパーティションの内容を完全にレプリケートし同期のレプリカが加わった時に、既存のレプリカの1つがそれらのパーティションデータを削除するでしょう。

ブローカーを超えてパーティションを移動するためにパーティション再割り当てツールを使うことができます。理想的なパーティションの分散は、全てのブローカーに渡って均等なデータの負荷とパーティションのサイズを保証するでしょう。パーティションの再割り当てツールは、自動的にKafkaクラスタ内のデータ分散を学習して負荷の均等な分散を行うためにパーティションを移動する機能はありません。そのように、管理者はどのトピックあるいはパーティションが周りに移動されなければならないかを見積もる必要があります。

パーティションの再割り当てツールは3つの相互排他モードで実行することができます:

  • --generate: このモードでは、トピックのリストとブローカーのリストを指定し、ツールは指定されたトピックの全てのパーティションを新しいブローカーへ移動するための再割り当ての候補を生成します。このオプションは、トピックと目的のブローカーのリストが指定されたパーティションの再割り当て計画を生成する簡易な方法を提供するだけです。
  • --execute: このモードでは、ツールはユーザが提供した再割り当て計画に基づいたパーティションの再割り当て計画を開始します。(--reassignment-json-file オプションを使います)。これは、管理者によって手作りされた独自の再割り当て計画か、あるいは--generateオプションを使って提供されたもののどちらかかもしれません
  • --verify: このモードでは、ツールは最後の --execute の間リスト化された全てのパーティションについての再割り当ての状態を検証します。状態は、完全に完了した、失敗した、あるいは実行中のいずれかです
新しいマシーンへのデータの自動的な移設
パーティション再割り当てツールは現在のブローカーのセットから新しく追加されたブローカーへ幾つかのトピックを移動するのに使うことができます。トピック全体を新しいブローカーのセットへ移動するのが簡単なため、これは既存のクラスタを拡張し、1つのパーティションを同時に移動するのに有用です。これを使う時には、ユーザは新しいブローカーのセットに移動されるべきトピックのリストと目的の新しいブローカーのリストを提供する必要があります。ツールは結果的に指定されたトピックのリストについての全てのパーティションを新しいブローカーのセットに渡って分散します。この移動の間、トピックのリプリケーションのファクターは一定に保たれます。入力のトピックのリストについての全てのパーティションのレプリカは、古いブローカーのセットから新しく追加されたブローカーへ効果的に移動されます。

例えば、以下の例はトピック foo1, foo2 の全てのパーティションを新しいブローカーのセット 5,6 へ移動するでしょう。この移動の最後に、トピック foo1 と foo2 の全てのパーティションはブローカー 5,6 にのみ存在します。

ツールは入力のトピックのリストをjsonファイルとして受け付けるため、最初に移動したいトピックを識別し以下のようにjsonファイルを作成する必要があります:

  > cat topics-to-move.json
  {"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
  }
jsonファイルの準備ができると、候補の割り当てを生成するためにパーティション再割り当てツールを使います:
  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
  現在のパーティションのレプリカの割り当て

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                {"topic":"foo1","partition":0,"replicas":[3,4]},
                {"topic":"foo2","partition":2,"replicas":[1,2]},
                {"topic":"foo2","partition":0,"replicas":[3,4]},
                {"topic":"foo1","partition":1,"replicas":[2,3]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }

  指定されたパーティションの再割り当ての設定

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

ツールはトピック foo1, foo2 からブローカー 5,6 へ全てのパーティションを移動する候補の割り当てを生成します。しかし、この時点では、パーティションの移動は開始されておらず、現在の割り当てと提案された新しい割り当てを知らせるだけなことに注意してください。ロールバックしたい場合は現在の割り当てが保存されなければなりません。新しい割り当ては、以下のように --execute オプションを使ってツールの入力になるように、jsonファイルに保存されなければなりません (例えば、expand-cluster-reassignment.json):

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
  現在のパーティションのレプリカの割り当て

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                {"topic":"foo1","partition":0,"replicas":[3,4]},
                {"topic":"foo2","partition":2,"replicas":[1,2]},
                {"topic":"foo2","partition":0,"replicas":[3,4]},
                {"topic":"foo1","partition":1,"replicas":[2,3]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }

  ロールバックの間に --reassignment-json-file オプションとして使うために、これを保存します
  パーティションの再割り当ての開始の成功
  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

最後に、パーティションの再割り当ての状態を調べるために、ツールと一緒に--verify オプションを使うことができます。--verify オプションと一緒に、同じ expand-cluster-reassignment.json (--execute オプションと一緒に使われた) が使われなければならないことに注意してください:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
  パーティションの再割り当ての状態:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo1,1] is in progress
  Reassignment of partition [foo1,2] is in progress
  Reassignment of partition [foo2,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully
  Reassignment of partition [foo2,2] completed successfully
独自のパーティションの割り当てと移設
パーティション再割り当てツールはパーティションのレプリカを選択的に特定のブローカーのセットに移動するために使うこともできます。このように使った場合、--generateステップを効率的にスキップし--executeステップに直接移動しながら、ユーザが再割り当て計画を知っていて、ツールが候補の再割り当てを必要としないことを仮定します。

例えば、以下の例はトピック foo1 の パーティション 0 を ブローカー 5,6 へ、トピック foo2 のパーティション 1 を ブローカー 2,3 へ移動します:

最初のステップはjsonファイルに独自の再割り当て計画を手作りすることです:

  > cat custom-reassignment.json
  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
そして --execute オプションにjsonファイルを使って、プロセスを再割り当てし始めます:
  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
  現在のパーティションのレプリカの割り当て

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
                {"topic":"foo2","partition":1,"replicas":[3,4]}]
  }

  ロールバックの間に --reassignment-json-file オプションとして使うために、これを保存します
  パーティションの再割り当ての開始の成功
  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }

パーティションの再割り当ての状態を調べるために、ツールに --verify オプションを使うことができます。同じ custom-reassignment.json (--execute オプションで使われる) を --verify オプションで使う必要があることに注意してください:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
  パーティションの再割り当ての状態:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully

ブローカーの縮退

パーティション再割り当てツールはブローカーを退役するために再割り当て計画を自動的に生成する機能をまだ持っていません。そのため、管理者はブローカー上でホストされる全てのパーティションのためのレプリカを退役するために移動するための再割り当て計画をブローカーの残りに対して提供する必要があります。再割り当ては、全てのレプリカが廃止されたブローカーから他の1つだけのブローカーに移動されないようにする必要があるため、比較的面倒な作業かもしれません。このプロセスを容易くするために、退役するブローカーのためのツールのサポートを将来追加する予定です。

リプリケーション要素の増加

既存のパーティションのリプリケーション ファクターを増加することは容易です。独自の再割り当てjsonファイル内の余分なレプリカを単に指定し、指定されたパーティションのレプリケーション ファクタを増やすために --execute オプションと一緒に使います。

例えば、以下の例はトピックfooのパーティション0のリプリケーション ファクターを1から3に増やします。リプリケーション ファクターを増やす前は、ブローカー5上にはパーティションのレプリカだけが存在します。レプリケーション ファクターの増加の一部として、ブローカー6と7にレプリカを追加するでしょう。

最初のステップはjsonファイルに独自の再割り当て計画を手作りすることです:

  > cat increase-replication-factor.json
  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
そして --execute オプションにjsonファイルを使って、プロセスを再割り当てし始めます:
  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  現在のパーティションのレプリカの割り当て

  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

  ロールバックの間に --reassignment-json-file オプションとして使うために、これを保存します
  パーティションの再割り当ての開始の成功
  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

パーティションの再割り当ての状態を調べるために、ツールに --verify オプションを使うことができます。--verify オプションと一緒に、同じ increase-replication-factor.json (--execute オプションと一緒に使われた) が使われなければならないことに注意してください:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
  パーティションの再割り当ての状態:
  Reassignment of partition [foo,0] completed successfully
kafka-topicツールを使ってレプリケーション ファクターの増加を検証することもできます:
  > bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
  Topic:foo	PartitionCount:1	ReplicationFactor:3	Configs:
    Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7

データの移設の間の帯域の使い方の制限

マシーンからマシーンへレプリカを移動するために使われる帯域に上限を設定して、Kafkaはレプリケーションのトラフィックに抑圧を適用します。これは、クラスタをリバランス、新しいブローカーをブートストラップ、ブローカーを削除する時に、それらのデータに集中的な操作がユーザに与える影響を制限するため、便利です。

絞りを保証するために使うことができる2つのインタフェースがあります。最も単純で最も安全なのはkafka-reassign-partitions.shを起動する時に絞りを適用することですが、kafka-config.shは直接絞りの値を表示および変更するために使うこともできます。

つまり、例えば前のコマンドを使ってリバランスを実行した場合、50MB/s未満でパーティションを移動するでしょう。
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000
このスクリプトを実行すると、絞りが行われるのが分かるでしょう。
  絞りの限界は 50000000 B/s に設定されていました
  パーティションの再割り当てが無事に開始されました

リバランス中にスロットルを変更したい場合、スループットを上げてより速く完了するようにしたい場合は、同じreassignment-json-fileを渡すexecuteコマンドを再実行することでこれを行うことができます:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
  既存の実行中の再割り当てがあります
  絞りの制限は 700000000 B/s に設定されていました

リバランスがいったん完了すると、管理者は --verify オプションを使ってリバランスの状態を調べることができます。リバランスが完了すると、絞りは --verifyコマンドによって削除されるでしょう。--verify オプションと共にコマンドを実行することでリバランスが完了すると、管理者がしかるべき方法で絞りを削除することが重要です。それに失敗すると通常のレプリケーションのトラフィックが絞られるかもしれません。

--verify オプションが実行され、再割り当てが完了すると、スクリプトは絞りが削除されたことを確認するでしょう:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
  パーティションの再割り当ての状態:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.

管理者はkafka-configs.shを使って割り当てられた設定を検証することもできます。絞りのプロセスを管理するために使われる絞りの設定の2つのペアがあります。最初のペアは絞り値自体を参照します。これは動的なプロパティを使ってブローカーレベルで設定されます:

    leader.replication.throttled.rate
    follower.replication.throttled.rate

次に、絞られたレプリカの列挙されたセットの構成ペアがあります:

    leader.replication.throttled.replicas
    follower.replication.throttled.replicas

これらはトピックごとに設定されます。

4つ全ての設定値はkafka-reassign-partitions.shによって自動的に割り当てられます (以下で議論されます)。

絞りの制限設定を見るには:

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

これはレプリケーションプロトコルのリーダーおよびフォロワーの両方に適用される絞りを示します。デフォルトでは両方の側は同じ絞られたスループットの値を割り当てられます。

絞られたレプリカのリストを見るには:

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

ここで、リーダーの絞りがブローカー102上のパーティション1 および ブローカー101上のパーティション0に適用されることを見ます。同様にフォロワーの絞りはぶろかー101上のパーティション1およびブローカー102上のパーティション0に適用されます。

デフォルトでは、kafka-reassign-partitions.sh はリーダーの絞りをリバランス前に存在する全てのレプリカに適用します。それらのうちの1つはリーダーかもしれません。それはフォロワーの絞りを全ての移動の宛先に適用します。つまり、ブローカー102, 103 に再割り当てされるブローカー101,102上のレプリカを持つパーティションがある場合、そのパーティションのリーダーの絞りは101,102に適用され、フォロワーの絞りは103だけに適用されるでしょう。

必要であれば、絞りの設定を手動で変更するためにkafka-configs.shの--alterスイッチを使うこともできます。-

絞られたレプリケーションの安全な使い方

絞られたレプリケーションを使う時は幾らかの注意が必要です。実際には:

(1) 絞りの削除:

一旦再割り当てが完了すると、しかるべき方法で絞りを削除する必要があります (kafka-reassign-partitions.sh --verify を実行することで)。

(2) 進捗の保証:

入ってくる書き込みレートに比較して絞りがあまりに低く設定された場合、レプリケーションが進捗しないことがありえます。これは以下の時に起こります:

max(BytesInPerSec) > throttle

BytesInPerSec はプロデューサの各ブローカーへの書き込みスループットを監視するメトリックです。

管理者は、以下のメトリックを使って、リバランスの間にレプリケーションが進捗しているかどうかを監視することができます:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

レプリケーションの間に遅延は定常的に減らなければなりません。メトリックが減らない場合、管理者は上で説明したように絞りのスループットを増やさなければなりません。

クォータの設定

クォータは、ここで説明されるように、(user, client-id)、ユーザあるいはクライアントidレベルで設定されるデフォルトを上書きします。デフォルトでは、クライアントは無制限のクォータを受け取ります。各 (user, client-id)、ユーザあるいはクライアントidグループ について、独自のクォータを設定することができます。

(user=user1, client-id=clientA) についての独自のクォータを設定する:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  エンティティ: user-principal 'user1', client-id 'clientA' について更新された設定。
user=user1 についての独自のクォータを設定する:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
  エンティティ: user-principal 'user1' について更新された設定。
client-id=clientA について独自のクォータを設定する:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
  エンティティ: client-id 'clientA' について更新された設定。
--entity-nameの代わりに--entity-defaultオプションを指定することで、各 (user, client-id)、ユーザあるいはクライアントidグループについてのデフォルトのクォータを設定することができます。

user=userAについてのデフォルトのclient-id クォータを設定する:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
  エンティティ: user-principal 'user1'、デフォルト client-id について更新された設定。
ユーザについてのデフォルトのクォータを設定する:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
  エンティティ: デフォルト user-principal についての設定を更新する
client-idについてのデフォルトのクォータを設定する:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
  エンティティ: デフォルト client-id についての設定を更新する
指定された (user, client-id) についてのクォータを説明する方法:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
指定された user についてのクォータを説明する:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
指定された client-id についてのクォータを説明する:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
エンティティ名が指定されない場合、指定された型の全てのエンティティが説明されます。例えば、全てのユーザを説明するには:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
同様に、(user, client) については:
  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
  Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

これらの設定をブローカーに設定することで、全てのクライアントidに適用されるデフォルトのクォータを設定することができます。これらのプロパティはクォータが上書きするか、デフォルトがZooKeeper内で設定されていない場合のみ適用されます。デフォルトでは、各クライアントidは無制限のクォータを受け取ります。以下はプロデューサとコンシューマ毎のデフォルトのクォータを 10MB/sec に設定します。

    quota.producer.default=10485760
    quota.consumer.default=10485760
これらのプロパティは非推奨で、将来のリリースでは削除されるかもしれないことに注意してください。kafka-configs.shを使って設定されるデフォルトは、これらのプロパティよりも優先されます。

6.2データセンタ

幾つかの配備は複数のデータセンタをまたぐデータパイプラインを管理する必要があるでしょう。これのお勧めの方法は、各データセンター内でローカルクラスタ内でのみやり取りをしクラスタ間でミラーリングをするアプリケーション インスタンスを各データセンタ内でローカルKafkaクラスタを配備しすることです (これを行う方法についてはミラー メイカー ツールのドキュメントを見てください)。

この配備のパターンによりデータセンタは依存しないエンティティとして振舞うことができ、中央集権的にデータセンター内を管理および調整することができます。これにより、データセンター間が利用不可であっても各ファシリティはスタンドアローンで操作することができます: これが起きた時、リンクが追いつく時点まで回復するまでミラーリングは遅れます。

全てのデータのグローバルビューを必要とするアプリケーションのために、全てのデータセンター内のローカルクラスタからミラーされた集約データを持つクラスタを提供するためにミラーリングを使うことができます。これらの集約クラスタは完全なデータセットを必要とするアプリケーションによって読み込まれるために使われます。

これは配備のパターンにだけ可能なわけではありません。WANを超えてリモートのKafkaクラスタから読み込みあるいは書き込みすることが可能ですが、これはクラスタを取得するために必要な何らかのレイテンシを追加するでしょう。

Kafkaは本質的にプロデューサおよびコンシューマの両方の中にデータを揃えます。つまり高レイテンシの接続上であっても高スループットを達成することができます。しかしこれを可能にするには、socket.send.buffer.bytessocket.receive.buffer.bytesの設定を使って、プロデューサ、コンシューマおよびブローカーのTCPソケットバッファサイズを増やす必要があるかもしれません。これを設定する適切な方法はここで提供されます。

高レンテンシのリンク上で複数のデータセンタに掛かる単独のKafkaクラスタを実行することは、一般的に望ましいものではありません。これはKafkaの書き込みおよびZookeeperの書き込みの両方に高レプリケーション レイテンシを招き、もし場所間のネットワークが利用できない場合にKafkaあるいはZooKeeperのどちらも全ての場所で利用できないままになるでしょう。

6.3Kafka 設定

重量なクライアント設定

最も重要なプロデューサの設定は以下の通りです:
  • acks
  • 圧縮
  • バッチ サイズ
最も重要なコンシューマの設定はfetchサイズです。

全ての設定は設定の章の中で提供されます。

プロダクションのサーバ設定

以下はプロダクションのサーバ設定の例です:
  # ZooKeeper
  zookeeper.connect=[list of ZooKeeper servers]

  # ログの設定
  num.partitions=8
  default.replication.factor=3
  log.dir=[ディレクトリのリスト。Kafka は独自の専用ディスク(s)あるいはSSD(s)を持つべきです]

  # 他の設定
  broker.id=[整数。0から開始し、新しいブローカー毎に1追加。]
  listeners=[リスナーのリスト]
  auto.create.topics.enable=false
  min.insync.replicas=2
  queued.max.requests=[同時実行リクエストの数]
私たちのクライアントの構成はユースケースごとにかなり異なります。

6.4Java バージョン

Java 8 と Java 11 がサポートされます。Java 11 は、TLS が有効な場合にパフォーマンスが大幅に向上するため、Java 11 を強くお勧めします (他にも多くのパフォーマンス向上が含まれます: G1GC、 CRC32C、Compact Strings、Thread-Local Handshakes など)。自由に利用可能な古いバージョンはセキュリティ脆弱性が暴露されたため、セキュリティの観点から、最新のリリースされたパッチバージョンを使うことをお勧めします。OpenJDK ベースの Java実装 (Oracle JDK を含む)で Kakfa を実行するための一般的な引数は以下の通りです:
  -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
  -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent
参考までに、上記の Java 引数を使う LinkedIn の最も忙しいクラスタの1つ(ピーク時)の統計は以下の通りです:
  • 60 ブローカ
  • 50k パーティション (リプリケーション要素 2)
  • 800k メッセージ/秒 in
  • 300 MB/sec inbound, 1 GB/sec+ outbound
そのクラスタ内のブローカーは、約21msの90% GC休止、秒間あたりの young GC は1未満です。

6.5ハードウェアとOS

24GBのメモリを持つデュアル クアッド-コア Intel Xeon マシーンを使っています。

アクティブ リーダーとライターをバッファするために十分なメモリが必要です。30秒間のバッファができるようにしたいと仮定することで、必要メモリの簡単な計算の推測と、必要なメモリをwrite_throughtput * 3 として計算できます。

ディスクのスループットは重要です。8x7200 rpm SATA ドライブがあります。一般的に、ディスクのスループットはパフォーマンスのボトルネックで、ディスクが多いほうが良いです。フラッシュの挙動をどのように設定するかに依存して、より高価なディスクから恩恵を受けるか受けないかもしれません (しばしばフラッシュを強制する場合は、高い RPM SAS ドライブがより良いかもしれません)。

OS

Kafkaはどのようなunixシステム上でも良く動作する必要があり、LinuxおよびSolaris上でテストされています。

Windows上での実行の2,3の問題を見つけました。Windowsは現在のところ良くサポートされたプラットフォームではありませんが、それを変えることができれば嬉しいです。

多くのOSレベルの調整を必要とすることはないでしょうが、3つの潜在的な重要なOSレベルの設定があります:

  • ファイル ディスクリプタの制限: Kafkaはセグメントの記録と接続を開くためにファイルディスクリプタを使います。ブローカーが多くのパーティションをホストする場合は、ブローカーが作成する接続の数に加えて、全てのログセグメントを追跡するためにブローカーは少なくとも (number_of_partitions)*(partition_size/segment_size) を必要とすることを考慮してください。開始時点としてブローカーの処理のために少なくとも 100000 のファイルディスクリプタを許可することをお勧めします。注意: mmap() 関数は、ファイル記述子 fildes に関連付けられたファイルへの追加の参照を追加します。これはファイル記述子に対する後続の close() では削除されません。この参照はファイルへのマッピングがもう無くなった場合に削除されます。
  • 最大ソケット バッファ サイズ: ここで説明されたようにデータセンタ間で高パフォーマンスのデータ転送を有効にするために増やされるかもしれません。
  • プロセスが持つメモリマップ領域への最大数 (aka vm.max_map_count)。Linux カーネルのドキュメントを見て下さい。ブローカーが持つことができるパーティションの最大数を考慮する時に、このOSレベルのプロパティに注意する必要があります。デフォルトでは、多くのLinuxシステムでは、vm.max_map_count の値は 65535 前後です。パーティションごとに割り当てられた各ログのセグメントは、インデックス/タイムインデックス ファイルnオペアを必要とし、これらの各ファイル は1つのマップ領域を消費します。つまり、各ログ セグメントは2つのマップ領域を使います。従って、各パーティションは1つのログセグメントをホストする限り、最低2つのマップ領域を必要とします。つまり、ブローカーに 50000 個のパーティションを作成すると、100000 個のマップ領域が割り当てられ、デフォルトのvm.max_map_countのシステムでは OutOfMemoryError (Map failed) が発生してブローカーをクラッシュする可能性があります。パーティションごとのログセグメントの数は、セグメントサイズ、負荷の強度、リテンション ポリシーによって異なり、一般的には1つ以上になる傾向があることに注意してください。

ディスクとファイルシステム

良いスループットを得るために複数のドライブを使い、良いレンテンシを保証するためにKafkaデータに使われるのと同じドライブをアプリケーションログあるいは他のOSファイルシステムの動作と共有しないことをお勧めします。これらのドライブを1つのボリュームにRAIDするか、独自のディレクトリとして各ドライブをフォーマットおよびmountすることができます。Kafkaはリプリケーションを持つため、RAIDによって提供される冗長性はアプリケーションレベルでも提供することができます。この選択には幾つかのトレードオフがあります。

複数のデータディレクトリを設定する場合、パティションはデータディレクトリにラウンドロビンで割り当てられるでしょう。各パーティションは完全にデータディレクトリのうちの1つの中でしょう。もしデータがパーティション間で良くバランスされない場合、これはディスク間で負荷の不均衡につながるかもしれません。

RAIDは負荷を低レベルでバランスするため、RAIDは潜在的にディスク間の負荷のバランスを良くします(しかし常にそうではないようにみえます)。RAIDの主な不利な点は、通常書き込みのスループットに大きな余分に掛かる負荷があり、利用可能なディスクスペースを減らします。

RAIDの他の潜在的な恩恵はディスク障害への耐性です。しかし私たちの経験ではRAIDアレイの再構築はとてもI/Oが強烈で事実上サーバを使えなくします。つまりこれは現実的な可用性の改善を提供しません。

アプリケーション 対 OSフラッシュ管理

Kafkaは常にすぐに全てのデータをファイルシステムに書き込み、いつデータがフラッシュを使ってOSキャッシュからディスクに強制的に追い出すかを制御できるフラッシュポリシーを設定する機能をサポートします。このフラッシュポリシーは一定時間の後、あるいはある数のメッセージが書き込まれた後で強制的にデータをディスクに追い出すことを制御することができます。この設定には幾つかの選択があります。

Kafkaはデータがフラッシュされたことを知るために結果的にfsyncを呼びます。クラッシュから回復する時に、fsyncしたかどうかを知らないログセグメントのためにKafkaはCRCをチェックすることで各メッセージの完全性を調べ、開始時に実行される回復プロセスの一部として付属するオフセットインデックスファイルも再構築するでしょう。

故障したノードは常にそのレプリカから回復されるだろうため、Kafkaでの耐久性はディスクへの同期を必要としないことに注意してください。

完全なアプリケーションの同期を無効にするデフォルトのフラッシュの設定を使うことをお勧めします。このことはOSによって行われるバックグラウンドのフラッシュとKafkaの独自のバックグランドのフラッシュを頼りにすることを意味します。これはほとんどの使い方にとって最善を提供します: 調整ツマミが無く、スループットとレイテンシが大きく、完全な回復保証。レプリケーションによって提供される保証がローカルディスクへの同期よりも強いと一般的に思いますが、偏執症な人はまだ両方を持つことを好むかも知れず、アプリケーションレベルのfsyncポリシーはまだサポートされます。

アプリケーションレベルのフラッシュ設定を使う不利な点は、ディスクの使用パターンで非効率なことです(それにより書き込みを再び並べるのにOSに行動の自由がない)。バックグランドのフラッシュはページレベルのブロックであるのに対し、ほとんどのLinuxファイルシステムでのfsyncはファイルへの書き込みをブロックするため、レイテンシを導入するかもしれません。

一般的にファイルシステムの低レベルの調整をする必要はありませんが、以下の幾つかの章ではそれが有用な場合に備えてこれの幾つか調べます。

Linux OSのフラッシュの挙動の理解

Linuxではファイルシステムへのデータの書き込みは(アプリケーションレベルのfsyncあるいはOS独自のフラッシュポリシーによって)ディスクに書き込まれなければならなくなるまで、ページキャッシュ内に維持されます。データのフラッシュはpdflush(あるいは以前の2.6.32カーネルでは"flusher threads")と呼ばれる一組のバックグランド スレッドによって行われます。

Pdflush はどれだけ多くのdirtyデータがキャッシュ内に維持され、ディスクに書き戻されなければならなくなるまでどれだけ長く維持されるかを制御する設定可能なポリシーを持ちます。このポリシーはここで説明されます。書き込まれているデータの速度に Pdflush が追い付かない場合、最終的にデータの蓄積を遅くするために書き込みプロセスの書き込みの遅延を起こします。

以下を行うことでOSメモリの使用の現在の状態を見ることができます

 > cat /proc/meminfo 
これらの値の意味は上のリンクで説明されます。

ディスクに書き込まれるデータの格納について、pagecacheの使用はプロセス内のキャッシュよりいくつかの利点があります:

  • I/O スケジューラは連続する小さな書き込みをスループットを改善するより大きな物理書き込みに詰め込むでしょう。
  • I/O スケジューラはスループットを改善するディスクの移動の最小化のために書き込みを再順序化しようとするでしょう。
  • それは自動的にマシーン上のフリーなメモリの全てを使います

ファイルシステムの選択

Kafkaはディスク上の通常ファイルを使い、それ自体では特定のファイルシステムへの厳しい依存はありません。しかし最もよく使われる2つのファイルシステムは、EXT4 と XFS です。歴史的にはEXT4の使用が増えていますが、XFSファイルシステムの最近の改良により、安定性が損なわれることなくKafkaの作業のパフォーマンス特性が向上しています。.

様々なファイルシステムの作成とmountオプションを使って、意味のあるメッセージの負荷を持つクラスタ上で比較テストが行われました。監視されるKafkaの主要なメトリックは "Request Local Time" で、追加操作が行われた時間を示します。XFS はより良いローカル時間 (最善のEXT4設定について 160ms 対 250ms+) と、より低い平均待ち時間の結果になりました。XFSのパフォーマンスもディスクパフォーマンスにおいて変わりにくいことを示しました。

一般的なファイルシステムの注意
Linuxシステム上で、データディレクトリに使われる全てのファイルシステムについて、以下のオプションがmount時に使われることをお勧めします:
  • noatime: このオプションは、ファイルが読み込まれる時にファイルのatime(最後のアクセス時間)属性の更新を無効にします。これは特にコンシューマが起動する場合のファイルシステムの書き込み数を取り除くことができます。Kafkaはatime属性を全く頼りにしないため、これを無効にしても安全です。
XFSの注意
XFS ファイルシステムは適切な多くの自動調整を持つため、ファイルシステムの作成時あるいはmount時のどちらかでデフォルトの設定に変更をする必要はありません。考慮する価値のある唯一の調整パラメータは以下の通りです:
  • largeio: これはstat呼び出しによって報告される優先I/Oサイズに影響します。これは大きなディスクの書き込みでよりパフォーマンスが高くなりますが、実際にはパフォーマンスには最小限あるいは全く効果がありません。
  • nobarrier: バッテリーで支援されるキャッシュを持つ背後にあるデバイスのためで、このオプションは定期的な書き込みフラッシュを無効にすることでほんの少しのパフォーマンスを提供することができます。しかし、もし背後にあるデバイスが良く振舞った場合、それはファイルシステムにフラッシュが必要無いと報告します。このオプションは影響が無いでしょう。
EXT4の注意
EXT4はKafkaデータディレクトリとしてファイルシステムの実用的な選択ですが、それで最善のパフォーマンスを得るには幾つかのmountオプションの調整を必要とするでしょう。更に、これらのオプションは一般的に障害時のシナリオで安全では無く、より多くのデータロスと衝突に終わるでしょう。1つのブローカーの障害については、ディスクは拭い取られレプリカがクラスタから再構築されるため、心配することはほとんどありません。停電のような複数の障害のシナリオにおいて、これは簡単には回復できない背後にあるファイルシステム(従ってデータ)の衝突を意味するかもしれません。以下のオプションが調節可能です:
  • data=writeback: Ext4のデフォルトは data=ordered で、いくつかの書き込みにおいて順番に強きを置きます。Kafkaは全てのフラッシュされていないログにとても偏執的なデータ回復を行うため、この順番は必要としません。この設定は順番の制約を削除し、レイテンシを極めて減らすように見えます。
  • ジャーナリングの無効: ジャーナリングはトレードオフです: サービスのクラッシュの後で再起動を高速化しますが、書き込みパフォーマンスへの変化をもたらす多くの追加のロックを導入します。再起動時間を気にせず、書き込みレイテンシのスパイクの主要な原因を減らしたい人は、ジャーナリングを完全に止めることができます。
  • commit=num_secs: これはext4がメタデータジャーナルにコミットする頻度を調整します。これを少ない値に設定すると、クラッシュ中のフラッシュされていないデータの紛失を減らします。これを大きな値に設定するとスループットを改善するでしょう。
  • nobh: この設定は data=writeback モードを使う時に追加の順番の保証を制御します。書き込みの順番に依存しないため、Kafkaではこれは安全に違いなく、スループットとレイテンシを改善します。
  • delalloc: Delayed allocation はファイルシステムが物理的な書き込みが発生するまでブロックの割り当てを避けることを意味します。これによりext4は小さなページの代わりに大きな範囲を割り当てることができ、データが連続して書き込まれることを保証します。この機能はスループットにとって素晴らしいです。ファイルシステム内でほんの少しのレイテンシの変化を追加するなんらかのロッキングを必要とするようです。

6.6監視

Kafkaはサーバ内のメトリクスレポートのためにYammer Metricsを使います。Java クライアントはKafkaメトリクス、クライアントアプリケーションにpullされる推移的依存を最小化する組み込みのメトリクスレジストリを使います。両方ともメトリクスをJMXを使って公開し、監視システムにホックアップするためにプラグ可能なstatsレポーターを使ってstatsをレポートするように設定することができます。

全てのKafkaのレート メトリックにはサフィクス-totalが付いた対応する累積カウント メトリックがあります。例えば、records-consumed-raterecords-consumed-totalという名前の対応するメトリックを持ちます。

利用可能なメトリクスを見る最も簡単な方法はjconsoleを立ち上げ、それを実行中のkafkaクライアントあるいはサーバに向けることです; これにより全てのメトリクスをJMXを使ってブラウズすることができるでしょう。

JMXを使ったリモート監視のためのセキュリティの考慮

Apache KafkaはデフォルトでリモートのJMXを無効にします。リモート JMXをプログラム的に有効にするために、CLIまたは標準Javaシステムのプロパティを使って開始されたプロセスの環境変数 JMX_PORT を設定して、リモート監視を有効にすることができます。プロダクション シナリオでリモートJMXを有効にする場合、未認証のユーザがブローカーあるいはアプリケーション、およびこれらが実行されているプラットフォームを監視または制御できないように、セキュリティを有効にする必要があります。KafkaではデフォルトでJMXの認証が無効になっていて、CLIを使って開始されたプロセスの環境変数 KAFKA_JMX_OPTS を設定するか、適切なJavaシステムプロパティを設定することで、プロダクションの配備のためにセキュリティ設定を上書きしなければならないことに注意してください。JMXの保護の詳細についてはJMX技術を使った監視と管理を見てください。

以下のメトリクスをグラフおよびアラートします:

説明 Mbean 名 通常値
メッセージのレート kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
クライアントからのバイトのレート kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
他のブローカーからのバイトのレート kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
リクエストのレート kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower}
エラー レート kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) リクエスト型、エラーコード毎にカウントされた応答の中のエラーの数。応答が複数のエラーを含む場合は、全てがカウントされます。error=NONE indicates successful responses.
リクエスト サイズのバイト kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+) 各リクエストの型についてのリクエストのサイズ。
一時的なメモリサイズのバイト kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch} メッセージ形式の変換と解凍のために使われる一時メモリ。
メッセージの変換時間 kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch} メッセージ形式の変換に費やされる時間のミリ秒。
メッセージの変換レート kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) メッセージ形式の変換に必要なレコードの数。
Request Queue Size kafka.network:type=RequestChannel,name=RequestQueueSize Size of the request queue.
クライアントへのバイトのレート kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
他のブローカーへのバイトのレート kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
圧縮されたトピックに指定されたキーが無いことによる、メッセージの検証の失敗レート kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec
無効なマジック番号によるメッセージ検証失敗率 kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec
不正な crc チェックサムによるメッセージ検証失敗率 kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec
バッチ内の非連続なオフセットまたはシーケンス番号によるメッセージ検証失敗率 kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec
ログのフラッシュレートと時間 kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
# of under replicated partitions (the number of non-reassigning replicas - the number of ISR > 0) kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 0
# of under minIsr partitions (|ISR| < min.insync.replicas) kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount 0
# of at minIsr partitions (|ISR| = min.insync.replicas) kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount 0
# of offline log directories kafka.log:type=LogManager,name=OfflineLogDirectoryCount 0
ブローカー上でコントローラがアクティブかどうか kafka.controller:type=KafkaController,name=ActiveControllerCount クラスタ内の唯一のブローカーが1つを持つ筈です
リーダーの選出レート kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs ブローカーの障害の時には非0
消去されていないリーダーの選出レート kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec 0
保留中のトピックの削除 kafka.controller:type=KafkaController,name=TopicsToDeleteCount
保留中のレプリカの削除 kafka.controller:type=KafkaController,name=ReplicasToDeleteCount
対象外の保留中のトピックの削除 kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount
対象外の保留中のレプリカの削除 kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount
パーティションの数 kafka.server:type=ReplicaManager,name=PartitionCount ほとんどはブローカーを横断して偶数
リーダーのレプリカの数 kafka.server:type=ReplicaManager,name=LeaderCount ほとんどはブローカーを横断して偶数
ISR のシュリンク レート kafka.server:type=ReplicaManager,name=IsrShrinksPerSec もしブローカーがダウンする場合、パーティションのISRはシュリンクするでしょう。ブローカーが再び上がる時、一旦レプリカが完全に追いつくとISRは一度拡大するでしょう。それと違う場合、ISRシュリンクレートと拡張レートの両方の期待値は0です。
ISR 拡大レート kafka.server:type=ReplicaManager,name=IsrExpandsPerSec 上を見てください
フォロワーとリーダーレプリカの間でのメッセージの最大の遅れ kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica 遅れは生成リクエストの最大バッチサイズに比例するべきです。
フォロワーレプリカごとのメッセージの遅れ kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) 遅れは生成リクエストの最大バッチサイズに比例するべきです。
Requests waiting in the producer purgatory kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce ack=-1 が使われる場合非0
Requests waiting in the fetch purgatory kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch サイズはコンシューマ内のfetch.wait.max.msに依存します
リクエストの総時間 kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} キュー、ローカル、リモートおよび応答の送信時間に分割されます
リクエストがリクエストキューの中で待つ時間 kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
リクエストがリーダーで処理される時間 kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower}
リクエストがフォロワーを待つ時間 kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower} ack=-1の時はプロデューサのリクエストについては非0
リクエストが応答キューの中で待つ時間 kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
応答を送信する時間 kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower}
コンシューマがプロデューサに遅れるメッセージの数。ブローカーでは無く、コンシューマによって発表されます。 kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max
ネットワーク プロセッサが仕事をしていない平均断片時間 kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent 0と1の間。理想的には > 0.3
クライアントが再認証を行わず、再認証以外の目的で有効期限が切れた接続を使用したために、プロセッサで切断された接続の数。 kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count 再認証が有効な場合は利用的には0です。この(リスナー、プロセッサ)組み合わせに接続している2.2.0以前のクライアントが存在しないことを意味します。
クライアントが再認証を行わず、再認証以外の目的で有効期限が切れた接続を使用したために、全てのプロセッサにわたって切断された接続の合計数。 kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount 再認証が有効な場合は利用的には0です。このブローカーに接続している2.2.0以前のクライアントが存在しないことを意味します。
リクエスト ハンドラのスレッドが仕事をしていない平均断片時間 kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent 0と1の間。理想的には > 0.3
ユーザあるいはクライアントidごと(user, client-id)の帯域割り当てメトリクス kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+) 2つの属性。throttle-time はクライアントが絞られていた総時間をミリ秒で示します。Ideally = 0. byte-rate はクライアントのデータの生成/消費レートを bytes/sec で示します。(user, client-id) 割り当て量については、ユーザおよびクライアントidの両方が指定されます。もしclient-id毎の割り当て量がクライアントに適用される場合は、ユーザは指定されません。もしuser毎の割り当てが適用される場合、クライアントidは指定されません。
ユーザあるいはクライアントidごと (user, client-id)のリクエストの割り当て量のメトリクス kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) 2つの属性。throttle-time はクライアントが絞られていた総時間をミリ秒で示します。Ideally = 0. request-time はクライアントグループからリクエストを処理するためにブローカー ネットワークとI/Oスレッド内で費やされた時間のパーセンテージを示します。(user, client-id) 割り当て量については、ユーザおよびクライアントidの両方が指定されます。もしclient-id毎の割り当て量がクライアントに適用される場合は、ユーザは指定されません。もしuser毎の割り当てが適用される場合、クライアントidは指定されません。
絞りを免れたリクエスト kafka.server:type=Request exempt-throttle-time は絞りを免れたリクエストを処理するためにブローカーネットワークとI/Oスレッドで費やされた時間のパーセンテージを示します。
ZooKeeperのクライアント リクエストのレイテンシ kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs ブローカーからのZooKeeperのリクエストについてのレイテンシのミリ秒。
ZooKeeperの接続ステータス kafka.server:type=SessionExpireListener,name=SessionState ブローカーのZooKeeperセッションの接続ステータスで、Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired のうちの1つが可能です。
グループメタデータをロードする最大時間 kafka.server:type=group-coordinator-metrics,name=partition-load-time-max 過去30秒間に読み込まれたコンシューマオフセットパーティションからオフセットとグループメタデータを読み込むのにかかった最大時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む)
グループメタデータをロードする平均時間 kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg 過去30秒間に読み込まれたコンシューマオフセットパーティションからオフセットとグループメタデータを読み込むのにかかった平均時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む)
トランザクションメタデータをロードする最大時間 kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max 過去30秒間に読み込まれたコンシューマオフセットパーティションからトランザクションメタデータを読み込むのにかかった最大時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む)
トランザクションメタデータをロードする平均時間 kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg 過去30秒間に読み込まれたコンシューマオフセットパーティションからトランザクションメタデータを読み込むのにかかった平均時間(ミリ秒単位)(読み込みタスクがスケジュールされるのを待つために費やす時間を含む)
コンシューマグループオフセット数 kafka.server:type=GroupMetadataManager,name=NumOffsets コンシューマグループのコミットされたオフセットの総数
コンシューマグループ数 kafka.server:type=GroupMetadataManager,name=NumGroups コンシューマグループの総数
状態ごとのコンシューマグループ数 kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead] 各状態のコンシューマグループ数: PreparingRebalance, CompletingRebalance, Empty, Stable, Dead
パーティションの再割り当て数 kafka.server:type=ReplicaManager,name=ReassigningPartitions ブローカー上のリーダーパーティションの再割り当て数。
再割り当てトラフィックの発信バイトレート kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec
再割り当てトラフィックの受信バイトレート kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec

プロデューサ/コンシューマ/コネクタ/ストリームのための一般的な監視メトリクス

以下のメトリクスがプロデューサ/コンシューマ/コネクタ/ストリーム インスタンス上で利用可能です。特定のメトリクスについては、以下の章を見てください。
メトリック/属性 名 説明 Mbean 名
connection-close-rate ウィンドウ内での秒間あたりの閉じられた接続 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-close-total ウィンドウ内で閉じられた総接続。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-creation-rate ウィンドウ内で秒間あたりに確立された新しい接続 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-creation-total ウィンドウ内で確立された新しい総接続。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
network-io-rate 全ての接続の秒間あたりのネットワーク オペレーション(読み込みあるいは書き込み)の数の平均。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
network-io-total 全ての接続の秒間あたりのネットワーク オペレーション(読み込みあるいは書き込み)の総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
外に行くバイトのレート 全てのサーバへの送信される外に行くバイトの秒間あたりの平均数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
outgoing-byte-total 全てのサーバに送信された外に行くバイトの総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
リクエスト レート 秒間あたり送信されるリクエストの平均数 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
request-total 送信されたリクエストの総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
リクエストのサイズの平均 ウィンドウ内の全てのリクエストの平均サイズ。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
リクエストサイズの最大 ウィンドウ内に送信されるリクエストの最大サイズ。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
やってくるバイトのレート 全てのソケットの読み込みのバイト/秒。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
incoming-byte-total 全てのソケットの読み込みの総バイト。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
応答レート 秒間あたり受け取った応答。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
response-total 受信した応答の総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
select-rate I/O層が新しいI/Oを調べる秒間あたりの回数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
select-total I/O層が新しいI/Oを調べる総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-wait-time-ns-avg I/Oスレッドが読み込みあるいは書き込みの準備ができたソケットを待つために費やした平均時間のナノ秒数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-wait-ratio I/Oスレッドが待つのに費やした断片時間。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-time-ns-avg select呼び出しごとのI/Oの平均時間のナノ秒数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-ratio I/OスレッドがI/Oを行うために費やした断片時間。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-count 現在のアクティブな接続の数 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-authentication-rate SASLあるいはSSLを使って認証が成功した秒間あたりの接続数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-authentication-total SASLあるいはSSLを使って認証が成功した接続の総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-authentication-rate 認証が失敗した秒間あたりの接続数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-authentication-total 認証が失敗した接続の総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-reauthentication-rate SASLあるいはSSLを使って再認証が成功した秒間あたりの接続数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-reauthentication-total SASLあるいはSSLを使って再認証が成功した接続の総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
reauthentication-latency-max 再認証による最大レイテンシのミリ秒。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
reauthentication-latency-avg 再認証による平均レイテンシのミリ秒。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-reauthentication-rate 再認証に失敗した秒間あたりの接続数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-reauthentication-total 再認証に失敗した接続の総数。 kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-authentication-no-reauth-total 再認証をサポートしない2.2.0より前のSASLクライアントを使って認証が成功した接続の総数。May only be non-zero kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)

プロデューサ/コンシューマ/コネクタ/ストリームのためのブローカーごとの一般的なメトリクス

以下のメトリクスがプロデューサ/コンシューマ/コネクタ/ストリーム インスタンス上で利用可能です。特定のメトリクスについては、以下の章を見てください。
メトリック/属性 名 説明 Mbean 名
外に行くバイトのレート ノードについての外に送信されるバイトの秒間あたりの平均数。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
outgoing-byte-total ノードに送信された外に行くバイトの総数。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
リクエスト レート ノードについての秒間あたり送信されるリクエストの平均数 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-total ノードに送信されたリクエストの総数。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
リクエストのサイズの平均 ノードについてのウィンドウ内の全てのリクエストの平均サイズ。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
リクエストサイズの最大 ノードについてのウィンドウ内に送信されるリクエストの最大サイズ。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
やってくるバイトのレート ノードについての秒間あたり受信されるバイトの平均数。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
incoming-byte-total ノードについて受信されたバイトの総数。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-latency-avg ノードについての平均リクエストレイテンシのms kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-latency-max ノードについての最大リクエストレイテンシのms kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
応答レート ノードについて秒間あたり受け取った応答。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
response-total ノードについて受信した応答の総数。 kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

プロデューサの監視

以下のメトリクスがプロデューサのインスタンス上で利用可能です。
メトリック/属性 名 説明 Mbean 名
waiting-threads バッファメモリがレコードをキューに入れるのをブロックして待っているユーザスレッドの数。 kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-total-bytes クライアントが利用することができるバッファメモリの最大量 (現在それが使われているかどうか)。 kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-available-bytes 使用されていないバッファメモリの総量 (割り当てられていないかフリーのリストにあるかどちらか)。 kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time appenderが空間の割り当てを待つ時間の割合。 kafka.producer:type=producer-metrics,client-id=([-.\w]+)
プロデューサの送信のメトリクス
kafka.producer:type=producer-metrics,client-id="{client-id}"
属性名 説明
batch-size-avgリクエスト毎のパーティション毎に送信されるバイトの平均数。
batch-size-maxリクエスト毎のパーティション毎に送信されるバイトの最大数。
batch-split-rate秒間あたりの平均バッチ分割数
batch-split-totalバッチの総分割数
compression-rate-avgThe average compression rate of record batches, defined as the average ratio of the compressed batch size over the uncompressed size.
metadata-age使用されている現在のプロデューサのメタデータの経過時間の秒。
produce-throttle-time-avgブローカーによって絞られたリクエストの平均ms
produce-throttle-time-maxブローカーによって絞られたリクエストの最大ms
record-error-rateエラーになったレコード送信数の秒間あたりの平均
record-error-totalエラーになったレコード送信数の総数
record-queue-time-avgレコードバッチがsendバッファ内で費やした平均時間。
record-queue-time-maxレコードバッチがsendバッファ内で費やした最大時間のms
record-retry-rate再送信されたレコード数の秒間あたりの平均
record-retry-total再送信されたレコードの総数
record-send-rate秒間あたり送信されたレコードの平均数。
record-send-total送信されたレコードの総数。
record-size-avg平均レコードサイズ
record-size-max最大レコードサイズ
records-per-request-avgリクエスト毎のレコードの平均数。
request-latency-avg平均リクエストレイテンシのms
request-latency-max最大リクエストレイテンシのms
requests-in-flight応答を待っている送信中のリクエストの現在の数。
kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
属性名 説明
byte-rateトピックについての送信されるバイトの秒間あたりの平均数。
byte-totalトピックについての送信された総バイト数
compression-rateThe average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size over the uncompressed size.
record-error-rateトピックについてのエラーになったレコード送信数の秒間あたりの平均
record-error-totalトピックについてのエラーになったレコード送信数の総数
record-retry-rateトピックについての再送信されたレコード数の秒間あたりの平均
record-retry-totalトピックについての再送信されたレコードの総数
record-send-rateトピックについての送信されるレコードの秒間あたりの平均数。
record-send-totalトピックについての送信された総レコード数

コンシューマの監視

以下のメトリクスはコンシューマのインスタンス上で利用可能です。
メトリック/属性 名 説明 Mbean 名
time-between-poll-avg poll() の呼び出し間の平均遅延。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
time-between-poll-max poll() の呼び出し間の最大遅延。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
last-poll-seconds-ago 最後の poll() 呼び出しからの秒数。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
poll-idle-ratio-avg ユーザコードがレコードを処理するのを待つのではなく、コンシューマの poll() がアイドル状態である時間の平均割合。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
コンシューマグループのメトリクス
メトリック/属性 名 説明 Mbean 名
commit-latency-avg コミット リクエストにかかる平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-latency-max コミット リクエストにかかる最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-rate 秒間あたりのコミット呼び出しの数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-total コミット呼び出しの総数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
assigned-partitions 現在のところこのコンシューマに割り当てられたパーティションの数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-response-time-max ハートビート リクエストへの応答を受信するのに掛かった最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-rate 秒間あたりのハートビートの平均数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-total ハートビートの総数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-avg グループのrejoinにかかる平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-max グループのrejoinにかかる最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-rate 秒間あたりのグループjoinの数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-total グループjoinの総数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-avg グループのsyncにかかる平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-max グループのsyncにかかる最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-rate 秒間あたりのグループ同期の数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-total グループsyncの総数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-avg グループのリバランスにかかる平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-max グループのリバランスにかかる最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-total これまでにグループのリバランスにかかった合計時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-total 参加したグループのリバランスの総数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-rate-per-hour 1時間当たりのグループリバランスの総数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
failed-rebalance-total 失敗したリバランス合計数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
failed-rebalance-rate-per-hour 1時間当たりの失敗したグループリバランスイベントの数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-rebalance-seconds-ago 最後のリバランスイベントからの秒数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-heartbeat-seconds-ago 最後のコントローラーのハートビートからの秒数 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-avg パーティションで取り残されたリバランスリスナーコールバックにかかった平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-max パーティションで取り残されたリバランスリスナーコールバックにかかった最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-avg パーティションに割り当てられたリバランスリスナーコールバックにかかった平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-max パーティションに割り当てられたリバランスリスナーコールバックにかかった最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-avg パーティションに割り当てられたリバランスリスナーコールバックにかかった平均時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-max パーティションで失われたリバランスリスナーコールバックにかかった最大時間 kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
コンシューマのフェッチ メトリクス
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
属性名 説明
bytes-consumed-rate秒間あたりに消費されるバイトの平均数
bytes-consumed-total消費されるバイトの総数
fetch-latency-avgフェッチリクエストにかかる平均時間
fetch-latency-maxフェッチリクエストに掛かる最大時間。
fetch-rate秒間あたりのフェッチリクエストの数。
fetch-size-avgリクエスト毎にフェッチされる平均バイト数
fetch-size-max秒間あたりにフェッチされる最大バイト数
fetch-throttle-time-avg平均スロットル時間のms
fetch-throttle-time-max最大スロットル時間のms
fetch-totalフェッチリクエストの総数
records-consumed-rate秒間あたりに消費される平均レコード数
records-consumed-total消費されるレコードの総数
records-lag-maxこのウィンドウ内の任意のパーティションについてレコード数という点での最大の遅延
records-lead-minこのウィンドウ内の任意のパーティションについてレコード数という点での最小のリード数
records-per-request-avg各リクエスト内の平均レコード数
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
属性名 説明
bytes-consumed-rateトピックについての秒間あたりに消費されるバイトの平均数。
bytes-consumed-totalトピックについての消費された総バイト数
fetch-size-avgトピックについてのリクエスト毎にフェッチされる平均バイト数
fetch-size-maxトピックについてのリクエスト毎にフェッチされる最大バイト数
records-consumed-rateトピックについての秒間あたりに消費されるレコードの平均数。
records-consumed-totalトピックについての消費された総レコード数
records-per-request-avgトピックについての各リクエスト内の平均レコード数
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
属性名 説明
preferred-read-replicaパーティションの現在のリードレプリカ、あるいはリーダから読み取る場合は -1
records-lagパーティションの最新の遅延
records-lag-avgパーティションの平均遅延
records-lag-maxパーティションの最大遅延
records-leadパーティションの最新のリード
records-lead-avgパーティションの平均のリード
records-lead-minパーティションの最小のリード

接続の監視

コネクトのワーカープロセスは全てのプロデューサとコンシューマのメトリクスと、コネクトに固有のメトリクスを含みます。ワーカープロセス自身は多くのメトリクスを持ちますが、各コネクタとタスクは追加のメトリクスを持ちます。[2020-12-04 10:03:05,630] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668) [2020-12-04 10:03:05,632] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
kafka.connect:type=connect-worker-metrics
属性名 説明
connector-countこのワーカー内で実行されるコネクタの数。
connector-startup-attempts-totalこのワーカーが試行したコネクタの開始の総数。
connector-startup-failure-percentageこのワーカーのコネクタの開始が失敗した平均パーセンテージ。
connector-startup-failure-totalコネクタの開始が失敗した総数。
connector-startup-success-percentageこのワーカーのコネクタの開始が成功した平均パーセンテージ。
connector-startup-success-totalコネクタの開始が成功した総数。
task-countこのワーカー内で実行されるタスクの数。
task-startup-attempts-totalこのワーカーが試行したタスクの開始の総数。
task-startup-failure-percentageこのワーカーのタスクの開始が失敗した平均パーセンテージ。
task-startup-failure-totalタスクの開始が失敗した総数。
task-startup-success-percentageこのワーカーのタスクの開始が成功した平均パーセンテージ。
task-startup-success-totalタスクの開始が成功した総数。
kafka.connect:type=connect-worker-metrics,connector="{connector}"
属性名 説明
connector-destroyed-task-countワーカー上のコネクタの破棄されたタスクの数。
connector-failed-task-countワーカー上のコネクタの失敗したタスクの数。
connector-paused-task-countワーカー上のコネクタの一時停止されたタスクの数。
connector-running-task-countワーカー上のコネクタの実行中のタスクの数。
connector-total-task-countワーカー上のコネクタのタスクの数。
connector-unassigned-task-countワーカー上のコネクタの割り当てられていないタスクの数。
kafka.connect:type=connect-worker-rebalance-metrics
属性名 説明
completed-rebalances-totalこのワーカーによって完了したリバランスの総数。
connect-protocolこのクラスタで使われるコネクトプロトコル
epochこのワーカーのepochあるいは世代数。
leader-nameグループリーダーの名前。
rebalance-avg-time-msこのワーカーによってリバランスするのにかかった平均時間のミリ秒数
rebalance-max-time-msこのワーカーによってリバランスするのに費やされた最大時間のミリ秒数。
rebalancingこのワーカーが現在リバランスしているかどうか。
time-since-last-rebalance-msこのワーカーが最も最近のリバランスを完了してからのミリ秒数。
kafka.connect:type=connector-metrics,connector="{connector}"
属性名 説明
connector-classコネクタ クラスの名前
connector-typeコネクタの型'source' あるいは 'sink' の1つ。
connector-versionコネクタによって報告されたコネクタクラスのバージョン。
状態コネクタの状態。'unassigned', 'running', 'paused', 'failed' または 'destroyed' のうちの1つ。
kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
属性名 説明
batch-size-avgコネクタによって処理されたバッチの平均サイズ。
batch-size-maxコネクタによって処理されたバッチの最大サイズ。
offset-commit-avg-time-msこのタスクによってオフセットをコミットするのに掛かった平均時間のミリ秒数。
offset-commit-failure-percentageこのタスクのオフセットのコミットの試行が失敗した平均パーセンテージ。
offset-commit-max-time-msこのタスクによってオフセットをコミットするのにかかった最大時間のミリ秒数。
offset-commit-success-percentageこのタスクのオフセットのコミットの試行が成功した平均パーセンテージ。
pause-ratioこのタスクが休止状態で費やした断片時間。
running-ratioこのタスクが実行中の状態で費やした断片時間。
状態コネクタのタスクの状態。'unassigned', 'running', 'paused', 'failed' または 'destroyed' のうちの1つ。
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
属性名 説明
offset-commit-completion-rate完了が成功したオフセット コミットの完了の秒間あたりの平均数。
offset-commit-completion-total完了が成功したオフセット コミットの完了の総数。
offset-commit-seq-noオフセット コミットのための現在のシーケンス番号。
offset-commit-skip-rateあまりに遅くスキップ/無視されたオフセット コミットの完了の秒間あたりの平均数。
offset-commit-skip-totalあまりに遅くスキップ/無視されたオフセット コミットの完了の総数。
partition-countこのワーカー内で名前付きのシンクコネクタへ所属するこのタスクに割り当てられるトピックパーティションの数。
put-batch-avg-time-msこのタスクがシンクのレコードのバッチを格納するのに掛かる平均時間。
put-batch-max-time-msこのタスクがシンクのレコードのバッチを格納するのに掛かる最大時間。
sink-record-active-countKafkaから読み込まれたが、まだシンクタスクによって完全には コミット/フラッシュ/通知 されていないレコードの数。
sink-record-active-count-avgKafkaから読み込まれたが、まだシンクタスクによって完全には コミット/フラッシュ/通知 されていないレコードの平均数。
sink-record-active-count-maxKafkaから読み込まれたが、まだシンクタスクによって完全には コミット/フラッシュ/通知 されていないレコードの最大数。
sink-record-lag-maxシンクのタスクがどのトピックのパーティションに対してもコンシューマの位置よりも後ろにあるレコードの数という点での最大の遅延。
sink-record-read-rateこのワーカー内の名前付きのシンク コネクタに所属するこのタスクについてKafkaから読み込まれるレコードの数の秒あたりの平均数。これは変換前が適用されます。
sink-record-read-totalタスクが最後に再起動されてから、このワーカー内の名前付きのシンク コネクタに所属するこのタスクによってKafkaから読み込まれるレコードの総数。
sink-record-send-rate変換から出力され、このワーカー内の名前付きのシンク コネクタに所属するこのタスクに送信/配置されたレコードの秒間あたりの平均数。これは変換後が適用され、変換によって除外された全てのレコードが除外されます。
sink-record-send-total変換から出力され、タスクが最後に再起動されてから、このワーカー内の名前付きシンク コネクタに所属するこのタスクに送信/配置されたレコードの総数。
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
属性名 説明
poll-batch-avg-time-msこのタスクがソースのレコードのバッチをポーリングするのに掛かる平均ミリ秒。
poll-batch-max-time-msこのタスクがソースのレコードのバッチをポーリングするのに掛かる最大ミリ秒。
source-record-active-countこのタスクによって生成されたがまだKafkaへ完全には書き込まれていないレコードの数。
source-record-active-count-avgこのタスクによって生成されたがまだKafkaへ完全には書き込まれていないレコードの平均数。
source-record-active-count-maxこのタスクによって生成されたがまだKafkaへ完全には書き込まれていないレコードの最大数。
source-record-poll-rateこのワーカー内の名前付きのソース コネクタに所属するこのタスクによって(変換前に)生成/ポーリングされるレコードの数の秒あたりの平均数。
source-record-poll-totalこのワーカー内の名前付きのソース コネクタに所属するこのタスクによって(変換前に)生成/ポーリングされるレコードの数の総数。
source-record-write-rate変換から出力され、このワーカー内の名前付きのシンク コネクタに所属するこのタスクについてKafkaに書き込まれたレコードの秒間あたりの平均数。これは変換後が適用され、変換によって除外された全てのレコードが除外されます。
source-record-write-total変換から出力され、タスクが最後に再起動されてから、このワーカー内の名前付きソース コネクタに所属するこのタスクについてKafkaに書き込まれたレコードの数。
kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
属性名 説明
deadletterqueue-produce-failuresデッド レター キューへの書き込みに失敗した数。
deadletterqueue-produce-requestsデッド レター キューへの書き込み試行回数。
last-error-timestampこのタスクが最後にエラーが発生した時のエポック タイムスタンプ。
total-errors-logged記録されたエラーの数。
total-record-errorsこのタスクのレコード処理エラーの数。
total-record-failuresこのタスクのレコード処理失敗の数。
total-records-skippedエラーによってスキップされたレコードの数。
total-retries再試行されたオペレーションの数。

ストリームの監視

Kafka ストリームインスタンスは、全てのプロデューサとコンシューマのメトリクスと、ストリームに固有の追加のメトリクスを含みます。デフォルトで、Kafkaストリームは2つの記録レベルを持つメトリクスを持ちます: debuginfo

メトリクスは4層構造を持つことに注意してください。最上位には、開始された Kafka ストリームクライアントごとにクライアントレベルのメトリックスがあります。各クライアントには、独自のメトリックスを持つストリームスレッドがあります。各ストリームスレッドには、独自のメトリックスを持つタスクがあります。各タスクは独自のマトリックスメトリックスを持つプロセッサーノードを持ちます。各タスクも独自のマトリックスを持つ多くの状態ストアとレコードキャッシュを持ちます。

どのメトリックスを収集したいかを指定するために以下の設定オプションを使います:
metrics.recording.level="info"
クライアントメトリックス
以下の全てのメトリックスの記録レベルは、info です:
メトリック/属性 名 説明 Mbean 名
バージョン Kafka ストリームクライアントのバージョン。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
commit-id Kafka ストリームクライアントのバージョン管理コミット ID。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
application-id Kafka ストリームクライアントのアプリケーション ID。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
topology-description Kafka ストリームクライアントで実行されるトポロジの説明。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
state Kafka ストリームクライアントの状態。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
スレッドのメトリクス
以下の全てのメトリックスの記録レベルは、info です:
メトリック/属性 名 説明 Mbean 名
commit-latency-avg このスレッドの全ての実行中のタスクに渡る、コミットのための平均実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-latency-max このスレッドの全ての実行中のタスクに渡る、コミットのための最大実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-latency-avg コンシューマポーリングの平均実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-latency-max コンシューマポーリングの最大実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-latency-avg 処理の平均実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-latency-max 処理の最大実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-latency-avg 中断の平均実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-latency-max 中断の最大実行時間のミリ秒。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-rate 秒間あたりのコミットの平均数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-total コミット呼び出しの総数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-rate 秒間あたりのコンシューマ呼び出しの平均数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-total コンシューマポーリングの呼び出しの総数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-rate 秒間あたりに処理されたレコードの平均数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-total 処理されたレコードの総数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-rate 秒間あたりの中断呼び出しの平均数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-total 中断呼び出しの総数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-created-rate 秒間あたりに作成されたタスクの平均数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-created-total 作成されたタスクの総数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-closed-rate 秒間あたりに閉じられるタスクの平均数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-closed-total 閉じられたタスクの総数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
タスクのメトリクス
以下のメトリックスは、記録レべルが info の dropped-records-rate and dropped-records-total を除いて、全て記録レベルが debugです。:
メトリック/属性 名 説明 Mbean 名
process-latency-avg 処理のための平均実行時間のナノ秒。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-latency-max 処理のための最大実行時間のナノ秒。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-rate このタスクの全てのソースプロセッサノード全体で1秒あたりに処理されたレコードの平均数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-total このタスクの全てのソースプロセッサノードで処理されたレコードの総数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-latency-avg コミットの平均実行時間のナノ秒。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-latency-max コミットの最大実行時間のナノ秒。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-rate 秒間あたりのコミット呼び出しの平均数 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-total コミット呼び出しの総数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-avg レコードの観測された遅延の平均 (ストリーム時間 - レコードのタイムスタンプ)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-max レコードの観測された遅延の最大 (ストリーム時間 - レコードのタイムスタンプ)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
enforced-processing-rate 秒間あたりの強制処理の平均数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
enforced-processing-total 強制処理の総数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
dropped-records-rate このタスク内でドロップされたレコード数の平均 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
dropped-records-total このタスク内でドロップされたレコードの総数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
プロセッサーノードのメトリクス
以下のメトリックスは、特定のタイプのノードでのみ利用可能です。つまり、プロセスレートとプロセス合計はソースプロセッサノードでのみ利用可能で、suppression-emit-rate と suppression-emit-total は要請操作ノードでのみ利用可能です。全てのメトリックスの記録レベルは debug です:
メトリック/属性 名 説明 Mbean 名
process-rate 秒間あたりにソースプロセッサノードによって処理されたレコードの平均数。 kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
process-total 秒間あたりにソースプロセッサノードによって処理されたレコードの総数。 kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-rate 抑制操作ノードからダウンストリームに送信されたレコードの割合。 kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-total 抑制操作ノードからダウンストリームに送信されたレコードの総数。 kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
応対ストアのメトリクス
以下の全てのメトリクスは debug の記録レベルを持ちます。store-scope の値はユーザの独自の状態ストアのStoreSupplier#metricsScope()で指定されることに注意してください; 組み込みの状態ストアについては、現在のところ以下のものを持ちます:
  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression (抑制バッファ用)
  • rocksdb-state (RocksDBで支援される key-value ストア)
  • rocksdb-window-state (RocksDBで支援される window ストア)
  • rocksdb-session-state (RocksDBで支援される session ストア)
メトリックス suppression-buffer-size-avg、suppression-buffer-size-max、suppression-buffer-count-avg、suppression-buffer-count-max は、抑制バッファでのみ利用可能です。他の全てのメトリックスは抑制バッファでは利用できません。
メトリック/属性 名 説明 Mbean 名
put-latency-avg 平均配置実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-latency-max 最大配置実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-avg 存在しない時に配置する平均実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-max 存在しない時に配置する最大実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-avg 平均取得実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-max 最大取得実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-avg 平均削除実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-max 最大削除実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-avg put-all 実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-max 最大 put-all 実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-avg 全てのオペレーションの平均実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-max 全てのオペレーションの最大実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-avg 平均range実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-max 最大range実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-avg 平均flush実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-max 最大flush実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-avg 平均回復時刻時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-max 最大回復実行時間のns。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-rate このストアについての平均putレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-rate このストアについての平均put-if-absentレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-rate このストアについての平均getレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-rate このストアについての平均deleteレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-rate このストアについての平均put-allレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-rate このストアについての全てのオペレーションの平均レート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-rate このストアについての平均rangeレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-rate このストアについての平均flushレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-rate このストアについての平均restoreレート。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
suppression-buffer-size-avg サンプリング ウィンドウでバッファされたデータの平均総サイズのバイト。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-size-max サンプリング ウィンドウでバッファされたデータの最大総サイズのバイト。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-count-avg サンプリング ウィンドウでバッファされたレコードの平均数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-count-max サンプリング ウィンドウでバッファされたレコードの最大数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
RocksDB メトリックス
以下の全てのメトリクスは debug の記録レベルを持ちます。メトリックスは、RocksDB 状態ストアから毎分収集されます。時間及びセッションウィンドウでの集計の場合のように、状態ストアが複数の RocksDB インスタンスで構成される場合、各メトリックスは状態ストアの RocksDB インスタンスの集計を報告します。組み込みの RocksDB 状態ストアの store-scope は、現在のところ以下の通りです:
  • rocksdb-state (RocksDBで支援される key-value ストア)
  • rocksdb-window-state (RocksDBで支援される window ストア)
  • rocksdb-session-state (RocksDBで支援される session ストア)
メトリック/属性 名 説明 Mbean 名
bytes-written-rate RocksDB 状態ストアに秒間あたりに書き込まれた平均バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-total RocksDB 状態ストアに秒間あたりに書き込まれた総バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-rate RocksDB 状態ストアから秒間あたり読み込まれた平均バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-total RocksDB 状態ストアに秒間あたり読み込まれた総バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-rate memtable から ディスクに秒間あたりフラッシュされた平均バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-total memtable から ディスクに秒間あたりフラッシュされた総バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-hit-ratio memtable への全てのルックアップに対する memtable ヒットの比率。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-data-hit-ratio ブロックキャッシュへのデータブロックの全てのルックアップに対する、データブロックのブロックキャッシュヒットの比率。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-index-hit-ratio ブロックキャッシュへのインデックスブロックの全てのルックアップに対する、インデックスブロックのブロックキャッシュヒットの比率。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-filter-hit-ratio ブロックキャッシュへのフィルタブロックの全てのルックアップに対する、フィルタブロックのブロックキャッシュヒットの比率。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-avg 書き込みストールの平均持続時間のミリ秒。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-total 書き込みストールの総持続時間のミリ秒。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-compaction-rate 圧縮中の秒間あたりの読み込み平均バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-compaction-rate 圧縮中の秒間あたりの書き込み平均バイト数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-open-files 現在開かれているファイル数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-file-errors-total ファイルエラーの総発生数。 kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
レコード キャッシュのメトリクス
以下の全てのメトリクスは debug の記録レベルを持ちます:
メトリック/属性 名 説明 Mbean 名
hit-ratio-avg キャッシュ読み取り要求の総数に対するキャッシュ読み取りヒットの比率として定義される平均キャッシュヒット率。 kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
hit-ratio-min 最小キャッシュ ヒット レシオ。 kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
hit-ratio-max 最大キャッシュ ヒット レシオ。 kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)

その他

GC時間と他のstats、そしてCPU 利用率、I/O サービス 時間などのような様々なサーバのstatsを監視することをお勧めします。クライアント側ではメッセージ/バイト レート(グローバルとトピック毎)、リクエスト レート/サイズ/時間、そしてコンシューマー側では全てのパーティションに渡るメッセージ内の最大遅延と最小取得リクエストレートを監視することをお勧めします。コンシューマが遅れずについていくには、最大の遅延が閾値より少ない必要があり、最小の取得レートが0より大きい必要があります。

6.7 ZooKeeper

安定バージョン

現在の安定ブランチは、3.5 です。Kafka は、3.5 シリーズの最新のリリースを含むように定期的に更新されます。

ZooKeeperを操作可能にする

操作上は、健全なZooKeeperのインストレーションのために以下を行います:
  • 物理/ハードウェア/ネットワークレイアウトの冗長性: それらをすべて同じラックに入れないようにし、ハードウェアをまとも(気が狂わない程度)にし、冗長な電源とネットワークパスなどを維持するようにします。一般的なZooKeeperの集合は5または7のサーバを持ちます。これはそれぞれ2および3のサーバのダウンに耐性があります。もし小さな配備を持つ場合は、3つのサーバの使用が許容されますが、この場合1つのサーバのダウンのみに耐性があるだろうことを忘れないでください。
  • I/O 細分化: 書き込み型のトラフィックを多くする場合は、ほとんど間違いなく専用のディスクグループにトランザクションログが欲しいでしょう。トランザクションログへの書き込みは非同期(しかしパフォーマンスのためにバッチ化される)で、結果的に同時に起こる書き込みがパフォーマンスに極めて影響するかもしれません。ZooKeeperのスナップショットは同時に起こる書き込みのソースのようなものかもしれません。理想的にはトランザクションログとは別にディスクグループに書き込まれるべきです。スナップショットは非同期でディスクに書き込まれます。つまり、オペレーティングシステムとメッセージログファイルと一緒に共有することは一般的に大丈夫です。dataLogDirパラメータを持つ別個のディスクグループを持つようにサーバを設定することができます。
  • アプリケーションの分離: 同じbox上にインストールしたい他のアプリケーションのパターンを本当に理解しない限り、ZooKeeperを分離して実行することは良い考えかもしれません。(しかしこれはハードウェアの機能を使ったバランス行為かもしれません)。
  • 仮想化には注意してください: クラスタ レイアウト、読み込み/書き込み パターンおよびSLAに応じて動作しますが、非常に時間に依存する可能性があるため、仮想化層によって導入された僅かなオーバーヘッドによってZookeeperが追加および破棄されるかもしれません。 time sensitive
  • ZooKeeperの設定: javaですので '十分な' ヒープ空間を与えるようにしてください (通常それらを3-5Gで実行しますが、それはほとんど持っているデータセットのサイズです)。残念ながらそれの良い計算式を持っていませんが、ZooKeeperの状態をより考慮することはスナップショットが大きくなるかもしれないことを意味し、大きなスナップショットは回復時間に影響します。実際、もしスナップショットがあまりに大きく(2,3ギガバイト)なると、サーバが回復し集合に加わるのに十分な時間を与えるためにinitLimit パラメータを増やす必要があるかもしれません。
  • 監視: JMX と 4文字の単語 (4lw) のコマンドはとても便利で、それらは時にはオーバーラップします(そしてそれらの場合私たちは4文字のコマンドを好みます。それらはより予想可能、あるいはそれらはLI監視インフラストラクチャーと良く連携することは確かです)
  • クラスタを建て過ぎないでください: 大きなクラスタ、特に書き込みが激しい使用パターンは多くのクラスタ内の通信(書き込みの定員とその後のクラスタメンバーの更新)を意味しますが、中途半端に建てないでください(リスクがクラスタを圧倒します)。サーバを増やすと、読み取りの容量が増えます。
全体としては、私たちはZooKeeperシステムを仕事を処理できるだけ小さく(プラス標準的な成長可能性の計画)、できるだけ単純にしようとします。公式のリリースに比べて設定あるいはアプリケーションレイアウトに気まぐれなことをしないようにし、可能な限り自己内包し続けるようにします。これらの理由から、OSパッケージ化されたバージョンはOSの標準構造に配置する傾向があり、それは言うなれば'乱雑'に成り得るため、私たちはそれらをスキップしがちです。
TOP
inserted by FC2 system