<

ドキュメント

Kafka 2.7 Documentation

以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.

1. 開始

1.1はじめに

イベントストリーミングとは何ですか?

イベントストリーミングは、人体の中枢神経システムのデジタル版です。これは、ビジネスがますますソフトウェア定義および自動化され、ソフトウェアの利用者がますますソフトウェアになる '常にオン' の世界のための技術的基盤です。

技術的に言えば、イベントストリーミングは、データベース、センサー、モバイルデバイス、クラウドサービス、ソフトウェアアプリケーションなどのイベントソースから、イベントのストリーム形式でリアルタイムのデータをキャプチャする; これらのイベントストリームを後で取得できるように格納する; イベントストリームの操作、処理、リアルタイムでの遡及的に対応する; 必要に応じて、イベントストリームを様々な宛先の技術にルーティングする、方法です。従って、イベントストリームにより、データの継続的な流れと解釈が保証され、適切な情報が適切な場所に適切なタイミングで送信されます。

イベントストリーミングは何に利用できますか?

イベントストリームは、様々な業界や組織の幅広い用途に適用されます。以下は多くの例です:

  • 証券取引所、銀行、保険などの支払いや金融取引をリアルタイムで処理するため。
  • 自動車、トラック、船隊、出荷を、物流や自動車業界などで、リアルタイムで追跡および監視する。
  • 工場や風力発電などの IoT デバイスや他の機器から、センサーデータを継続的にキャプチャして分析する。
  • 小売り、ホテル、旅行業界、モバイルアプリケーションなど、顧客とのやり取りや注文を収集し、即座に対応するため。
  • 入院中の患者を監視し、状態の変化を予測して、緊急時のタイムリーな治療を確保する。
  • 企業の様々な部門で作成されたデータを接続、保存、利用できるようにするため。
  • データプラットフォーム、イベント駆動アーキテクチャ、マイクロサービスの基盤として機能します。

Apache Kafka® は、イベントストリーミングプラットフォームです。どういう意味ですか?

Kafka は3つの主要な機能を組み合わせているため、単一の実戦テスト済みの解決策を使って、イベントストリーミングのエンドツーエンドのユースケースを実装できます。

  1. 他のシステムからのデータの継続的なインポート/エクスポートを含むイベントストリームを出版 (書き込み)および購読(読み込み)するため。
  2. イベントのストリームを必要なだけ永続的に確実に保存するため。
  3. イベントが発生した時、あるいは遡及的に処理するため。

そして、これら全ての機能は、分散、高スケーラブル、弾力的、耐障害性、安全な方法で提供されます。Kafka は、ベアメタルハードウェア、仮想マシン、コンテナに、オンプレミスおよびクラウドで展開できます。Kafka 環境を自己管理するか、様々なベンダーが提供するフルマネージドサービスを使うかを選択できます。

Kafka はどのように機能しますか?

Kafka は、高性能の TCP ネットワークプロトコルを介して通信するサーバクライアントで構成される分散システムです。オンプレミスおよびクラウド環境のベアメタルハードウェア、仮想マシン、コンテナに展開することができます。

サーバ: Kafka は、複数のデータセンタまたはクラウドリージョンに跨ることができる1つ以上のサーバのクラスタとして実行されます。これらのサーバの一部は、ブローカと呼ばれるストレージレイヤーを形成します。他のサーバは、Kafka コネクトを実行して、データをイベントストリームとして継続的にインポートおよびエクスポートして、Kafka をリレーショナルデータベースや他の Kafka クラスタなどの既存のシステムと統合します。ミッションクリティカルなユースケースを実装できるように、Kafka クラスタは非常にスケーラブルで耐障害性があります: サーバのいずれかに障害が発生した場合、他のサーバが作業を引き継ぎ、データを失うことなく継続的な運用を保証します。

クライアント: ネットワークの問題あるいはマシンの障害の場合でも、イベントストリームを変更して、大規模に、耐障害性のある方法で、読み取り、書き込み、処理する分散アプリケーションとマイクロサービスに書き込むことができます。Kafka はそのようなクライアントを同梱しています。これらのクライアントは、Kafka コミュニティによって提供される数十のクライアントによって拡張されます: クライアントは、Go、Python、C/C++、他の多くのプログラミング言語と REST API のための、より高いレベルの Kafka ストリームライブラリを含む Java および Scala で利用可能です。

主な概念と用語

イベントは、世界またはあなたのビジネスで、"何かが起こった" という事実を記録します。ドキュメントでは、レコードまたはメッセージとも呼ばれます。Kafka に対してデータを読み取りまたは書き込む場合、イベントの形式でこれを行います。概念的には、イベントにはキー、値、タイムスタンプ、オプションのメタデータヘッダがあります。以下はイベントの例です:

  • イベントキー: "Alice"
  • イベント値: "Made a payment of $200 to Bob"
  • イベントタイムスタンプ: "Jun. 25, 2020 at 2:06 p.m."

プロデューサは、Kafka にイベントを出版(書き込み)するクライアントアプリケーションで、コンシューマはこれらのイベントを購読(読み取りおよび処理)するクライアントアプリケーションです。Kafka では、プロデューサとコンシューマは完全に切り離され、お互いに無関係です。これは Kafka で知られている高いスケーラビリティを実現するための重要な設計要素です。例えば、プロデューサはコンシューマを待つ必要はありません。Kafka はイベントを確実に1回処理する機能など、様々な保証を提供します。

イベントはトピックに整理され、永続的に格納されます。非常に単純化されたトピックは、ファイルシステム内のフォルダに似ており、イベントはフォルダ内のファイルです。トピック名の例としては、"payments" があります。Kafka のトピックは常に複数のプロデューサと複数のサブスクライバです: トピックには、イベントを書き込む 0個以上のプロデューサと、これらのイベントを購読する 0個以上の多くのコンシューマがあります。トピック内のイベントは必要に応じて何度でも読み込まれます — 従来のメッセージングシステムと異なり、イベントは使用後に削除されません。代わりに、トピックごとの構成設定を通じて、Kafka がイベントを保持する期間を定義します。その後、古いイベントは破棄されます。Kafkaのパフォーマンスはデータのサイズに対して事実上一定です。つまり多くのデータを保持する事は問題になりません。

トピックは分割されています。つまり、トピックは、様々な Kafka ブローカーにある幾つかの "buckets" に分散しています。このようにデータを分散配置すると、クライアントアプリケーションが同時に多くのブローカとの間でデータを読み書きできるため、スケーラビリティにとって非常に重要です。新しいイベントがトピックに発行されると、実際にはトピックのパーティションの1つに追加されます。同じイベントキー(例えば、顧客あるいは車両 ID)を持つイベントは同じパーティションに書き込まれ、Kafka は特定のトピックパーティションの全てのコンシューマが常にそのパーティションのイベントを書き込まれた時と同じ順番で読み取ることを保証します

図: このトピックの例には4つのパーティション P1–P4 があります。2つの異なるプロデューサクライアントが、ネットワークを介してトピックパーティションにイベントを書き込むことにより、お互いに独立して新しいイベントをトピックに発行しています。同じキーを持つイベント(図では色で示されます)は、同じパーティションに書き込まれます。必要に応じて、両方のプロデューサが同じパーティションに書き込むことができることに注意してください。

データの耐障害性と高可用性の実現のために、地理的リージョンやデータセンタ間でも、全てのトピックをレプリケートすることができます。そのため、万が一に備えて、ブローカーのメンテナンスなどのために常にデータのコピーを持つ複数のブローカーが存在します。一般的な本番設定では、リプリケーション係数は3です。つまり、データのコピーは常に3つあります。このレプリケーションはトピックパーティションのレベルで行われます。

入門にはこの入門書で十分です。興味があれば、ドキュメントの設計セクションでは、Kafka の様々な概念について詳しく説明しています。

Kafka API

管理と管理タスクのためのコマンドラインツールに加えて、Kafka には Java および Scala 用の5つのコア API があります:

  • トピック、ブローカーおよび他の Kafka オブジェクトの管理と調査のための Admin API
  • イベントのストリームを1つ以上の Kafka トピックに出版(書き込み)するための プロデューサ API
  • 1つ以上のトピックを購読(読み取り)し、それらに対して生成されたイベントのストリームを処理するための コンシューマ API
  • ストリーム処理アプリケーションとマイクロサービスを実装するための Kafka ストリーム API。変換、集約や結合のようなステートフル操作、ウィンドウ、イベント時間に基づいた処理などを含む、イベントストリームを処理するための高レベル関数を提供します。1つ以上のトピックを生成するために、1つ以上のトピックから入力が読み取られ、入力ストリームを出力ストリームに効果的に変換します。
  • Kafka コネクト API は、外部システムおよびアプリケーションとの間でイベントのストリームを消費(読み取り)あるいは生成(書き込み)する再利用可能なデータのインポート/エクスポートコネクタを構築および実行して、Kafka と統合することができます。例えば、PostgreSQL のようなリレーショナルデータベースへのコネクタは、一連のテーブルに対する全ての変更をキャプチャする場合があります。ただし、実際には、Kafka コミュニティは既に数百のすぐに使うことができるコネクタを提供しているため、通常は独自のコネクタを実装する必要はありません。

この後どうすればいいか

1.2ユースケース

以下はApache Kafka®の人気のある使い方の2,3の説明です。これらの領域で活動中の多くの概要については、このブログの投稿を見てください。

メッセージング

Kafka は伝統的なメッセージブローカーの代替として良く動作します。メッセージブローカーは様々な理由で使われます (データプロデューサからの処理の切り離し、未処理メッセージのバッファ、など)。ほとんどのメッセージングシステムと比較してKafkaはより良いスループット、組み込みのパーティショニング、リプリケーション、および大規模メッセージ処理アプリケーションのための良い解決法となる耐障害性を持ちます。

私たちの経験では、メッセージの使い方はしばしば比較的低スループットですが、低end-to-endレンテンシを必要としてしばしばKafkaが提供する強い耐久性の保証に依存するかもしれません。

この領域において、KafkaはActiveMQ あるいは RabbitMQのような伝統的なメッセージングシステムと互換性があります。

Webサイト アクティビティ追跡

Kafkaの元のユースケースはリアルタイム出版-購読フィードのセットとしてユーザのアクティビティトラッキングパイプラインを再構築できるようにすることです。これは、サイトアクティビティ(ページビュー、検索、あるいはユーザが取るかも知れない他のアクション)がアクティビティタイプごとに1つのトピックを持つ中核トピックへ発行されることを意味します。これらのフィードは、リアルタイム処理、リアルタイム監視、および オフライン処理やレポートのためにHadoopあるいはウェアハウス システムへのロードを含む様々な領域の使い方のために購読することができます。

多くのアクティビティメッセージが各ユーザページビューに対して保証されるので、アクティビティトラッキングはしばしば大きなものになります。

マトリックス

Kafka はしばしば操作の監視データのために使われます。これはオペレーションデータの中央集権フィードを生成するために、分散型アプリケーションからの統計の集約を必要とします。

ログの集約

多くの人がKafkaをログ集約の解決法として使います。ログ集約は一般的に物理的なログファイルをサーバから離れて集め、それらを中核の場所(ファイルサーバあるいはHDFSおそらく)に処理のために配置します。Kafka はファイルの詳細を抽象化し、メッセージのストリームとしてログあるいはイベントデータのよりきれいな抽象を行います。これは低レンテンシの処理と、複数のデータソースと分散データの消費のより簡単なサポートを許容します。ScribeあるいはFlumeのようなログ中心のシステムに比べて、Kafkaは同等の良いパフォーマンス、リプリケーションによる強力な耐久性の保証、およびend-to-endのより低いレイテンシを提供します。

ストリーム処理

Kafkaの多くのユーザは多数のステージから成る処理パイプライン内のデータを処理します。生の入力データはKafkaのトピックから消費され、集約され、質を高められ、あるいは更なる消費または処理のフォローアップのために新しいトピックに変換されます。例えば、ニュースの文章のお勧めのためのパイプラインの処理は、RSSフィードから文章の内容をクロールし、それを "articles" トピックに発行するかもしれません; その先の処理はこの内容を正規化あるいは重複を取り除ききれいになった文章の内容を新しいトピックに出版するかもしれません; 最終的な処理のステージでは、この内容をユーザにお勧めするかもしれません。そのような処理パイプラインは各トピックに基づいたリアルタイムデータフローのグラフを生成します。上で述べたようなデータ処理を行うためにApache Kafkaでは0.10.0.0からKafka Streamsと呼ばれる軽量だが強力なストリーム処理ライブラリが利用可能です。Kafkaストリームは別として、代替となるオープンソースストリーム処理ツールは Apache StormApache Samzaを含みます。

イベント ソーシング

イベント ソーシング は状態の変化がレコードの時間順のシーケンスとして記録されるアプリケーション設計の形式です。とても大きな格納ログデータのためのKafkaのサポートは、この形式で構築されたアプリケーションのための洗練されたバックエンドになります。

コミットログ

Kafkaは分散型システムのための外部コミットログの一種として提供することができます。ログはノード間のデータのリプリケートを助け、障害ノードがデータを復旧するための再同期の仕組みとして振る舞います。Kafkaのログ コンパクション 機能はこの使い方をサポートします。この使い方において、KafkaはApache BookKeeper プロジェクトに似ています。

1.3クイックスタート

ステップ 1: Kafka の取得

最新の Kafka リリースをダウンロードし、それを解凍してください:

$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0

ステップ 2: Kafka 環境を開始

注意: ローカル環境に Java 8+ がインストールされている必要があります。

全てのサービスを正しい順番で開始するために、以下のコマンドを実行します:

# ZooKeeper サービスを開始する
# 注意: まもなく、Apache Kafka は Zookeeper を必要としなくなります。
$ bin/zookeeper-server-start.sh config/zookeeper.properties

別の端末セッションを開き、以下を実行します:

# Kafka ブローカーを開始する
$ bin/kafka-server-start.sh config/server.properties

全てのサービスが正常に起動すると、基本的な Kafka 環境が実行され、使えるようになります。

ステップ 3: イベントを格納するトピックを作成

Kafka は、多くのマシン間でイベント (このドキュメントでは、レコードあるいはメッセージとも呼ばれる)を読み込み、書き込み、格納、および処理を可能にする、分散イベントストリーミングプラットフォームです。

イベントの例としては、支払いトランザクション、携帯電話からの地理位置情報の更新、発送注文、IoT デバイスまたは医療機器からのセンサー測定などがあります。これらのイベントはトピックに整理され、格納されます。非常に単純化されたトピックは、ファイルシステム内のフォルダに似ており、イベントはフォルダ内のファイルです。

従って、最初のイベントを書き込む前にトピックを作成する必要があります。別の端末セッションを開き、以下を実行します:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Kafka の全てのコマンドラインツールには、追加のオプションがあります: 引数無しで kafka-topics.sh コマンドを実行すると、使用方法の情報が表示されます。例えば、新しいトピックのパーティション数などの詳細も表示できます。

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

ステップ 4: トピックへのイベントの書き込み

Kafka クライアントは、ネットワークを介して Kafka ブローカーとイベントの書き込み(あるいは読み込み)のための通信を行います。受け取ったブローカーは、必要な限り、永久に、永続的で耐障害性のある方法でイベントを格納します。

コンソールプロデューサクライアントを実行して、幾つかのイベントをトピックに書き込みます。デフォルトでは、入力した各行は、トピックに個別のイベントが書き込まれます。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

いつでも Ctrl-C を使ってプロデューサを停止することができます。

ステップ 5: イベントの読み込み

別の端末セッションを開き、作成したイベントを読み込むためにコンソールコンシューマクライアントを実行します:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

いつでも Ctrl-C を使ってコンシューマを停止することができます。

自由に実験してください: 例えば、プロデューサ端末(前のステップ)に切り替えて、追加のイベントを書き込み、イベントがコンシューマ端末にすぐに表示されることを確認します。

イベントは Kafka に永続的に格納されるため、何回でも、好きなだけの数のコンシューマが読み取ることができます。これを確認するには、さらに別の端末セッションを開いて、前のコマンドを再実行します。

ステップ 6: Kafka コネクトを使ってイベントのストリームとしてデータをインポート/エクスポート

おそらく、リレーショナルデータベースや従来のメッセージングシステムなどの既存のシステムに、すでにこれらのシステムを使っているアプリケーションと一緒に大量のデータがあるはずです。Kafka コネクタにより、外部のシステムから Kafka へ、またはその逆で、継続にデータを取り込むことができます。従って、既存のシステムを Kafka に統合することは非常に簡単です。この処理をさらに簡単にするために、すぐに利用できるそのような数百のコネクタがあります。

Kafka コネクトセクションで、Kafka にデータを継続的にインポート/エクスポートする方法の詳細をご覧ください。

ステップ 7: Kafka ストリームを使ったイベントの処理

データが Kafka にイベントとして格納されると、Java/Scala 用の Kafka ストリームクライアントライブラリを使ってデータを処理することができます。これにより、ミッションクリティカルなリアルタイムアプリケーションとマイクロサービスを実装することができます。ここで、入力および/または出力データは Kafka トピックに格納されます。Kafkaストリームは、これらのアプリケーションがスケーラブルが高く、柔軟で、耐障害性があり、分散されるなど、クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。ライブラリは、確実に1回の処理、ステートフル操作と集約、ウィンドウ、結合、イベント時間に基づいた処理などをサポートします。

最初の一噛みとして、人気のある WordCount アルゴリズムを実装する方法を以下に示します:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));

Kafka ストリームのデモアプリ開発チュートリアルは、そのようなストリーミングアプリケーションを最初から最後までコーディングして実行する方法を示します。

ステップ 8: Kafka 環境の終了

クイックスタートの最後に達したので、自由に Kafka 環境を破棄するか、または引き続き遊んでください。

  1. まだ行っていない場合は、Ctrl-C を使ってプロデューサとコンシューマクライアントを停止します。
  2. Ctrl-C を使って Kafka ブローカーを停止します。
  3. 最後に、Ctrl-C を使って ZooKeeper サーバを停止します。

途中で作成したイベントなど、ローカルの Kafka 環境のデータも削除する場合は、以下のコマンドを実行します:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

おめでとう!

Apache Kafka クイックスタートが正常に完了しました。

詳細については、次の手順をお勧めします:

  • Kafka が高レベルでどのように動作するか、その主な概念、他のテクノロジーとの比較について学ぶには、短い始めにを読んでください。Kafka をさらに詳細に理解するには、ドキュメントにアクセスしてください。
  • ユースケースを閲覧して、世界中のコミュニティの他のユーザが Kafka からどのように価値を得ているかを学んでください。
  • 地元の Kakfa ミートアップグループに参加し、Kakfa コミュニティのメインカンファレンスであるKafka サミットのトークを見てください。

1.4エコシステム

メインの配布物の他にKafkaを使って統合する有り余るほどのツールがあります。エコシステムのページはこれらの多くを列挙し、ストリーム処理システム、Hadoop統合、監視および開発ツールを含みます。

1.5以前のバージョンからのアップグレード

Upgrading to 2.7.0 from any version 0.8.x through 2.6.x

2.1.x より前からアップグレードしている場合は、コンシューマオフセットを格納するために使われるスキーマの変更について以下の注記を見てください。inter.broker.protocol.version を最新のバージョンに変更すると、2.1より前のバージョンにダウングレードすることができません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x 以上からアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例えば、2.6, 2.5 など)
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.7 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 2.7 に変更し、それらを1つずつ再起動します。もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。
2.7.0 での主要な変更
  • 2.7.0 リリースは、KIP-595 で指定されたコアの Raft 実装を含みます。ロジックのほとんどを含む個々の "raft" モジュールがあります。コントローラと統合するまでユーザが Raft 実装のパフォーマンスをテストするために使えるスタンドアローンのサーバがあります。詳細は raft モジュール内の README.md を見てください。
  • KIP-651 はキーとトラストストアにPEMファイルを使うためのサポートを追加します。
  • KIP-612 はブローカー全体とリスナーごとの接続レートを適用するためのサポートを追加します。2.7.0 リリースには、KIP-612 の最初の部分が含まれ、動的構成は 2.8.0 リリースで提供さrます。
  • KIP-599 によってクラスタが損傷することを防ぐために、トピックとパーティションの作成またはトピックの削除を調節する機能
  • Kafka で新機能が利用可能になると、以下の2つの主要な問題が発生します:
    1. Kafka クライアントはどのようにしてブローカー機能を認識しますか?
    2. ブローカーはどの機能を有効にするかをどのように決定しますか?
    KIP-584 provides a flexible and operationally easy solution for client discovery, feature gating and rolling upgrades using a single restart.
  • ConsoleConsumer を使ってレコードオフセットとヘッダを出力する機能が、KIP-431 を介して可能になりました。
  • KIP-554 の追加は、Kafka からの Zookeeper の削除という目標に向けて進んでいます。KIP-554 の追加は、SCRAM証明書の管理のためにZooKeeperに直接接続する必要が無くなることを意味します。
  • 既存のリスナーの再構成不可能な構成を変更すると、InvalidRequestException が発生します。対称的に、以前の(意図しない)動作により、更新された構成が永続化されますが、ブローカーが再起動されるまで有効になりません。詳細については、KAFKA-10479 を見てください。既存のリスナーでサポートされる再構成可能な構成については、DynamicBrokerConfig.DynamicSecurityConfigsSocketServer.ListenerReconfigurableConfigs を参照してください。
  • Kafka Streams は、KStreams DSL でのスライディングウィンドウ集約のサポートを追加します。
  • 状態ストアの逆反復により、KIP-617 を使ったより効率的な最新の更新検索が可能になります。
  • Kafka ストリームのエンドツーエンドのレイテンシメトリクスの詳細については、KIP-613 を見てください。
  • Kafka ストリームは、KIP-607 でデフォルトの RocksDB プロパティを報告するメトリクスを追加しました。
  • KIP-616 からの Scala の暗黙的な Serdes サポートの改善

バージョン 0.8.x から 2.5.x を介して 2.6.0 へのアップグレード

2.1.x より前からアップグレードしている場合は、コンシューマオフセットを格納するために使われるスキーマの変更について以下の注記を見てください。inter.broker.protocol.version を最新のバージョンに変更すると、2.1より前のバージョンにダウングレードすることができません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x 以上からアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例えば、2.5, 2.4 など)
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.6 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 2.6 に変更し、それらを1つずつ再起動します。もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。
2.6.0 での主要な変更
  • Kafka ストリームは、確実に1回の保証を使って、アプリケーションのスケーラビリティを向上させる新しい処理モードを追加します(ブローカー 2.5以降が必要です)(参照 KIP-447)
  • Java 11 以降では、TLSv1.3 がデフォルトで有効になっています。クライアントとサーバは、どちらもサポートしている場合は TLSv1.3 をネゴシエートし、そうでない場合は TLSv1.2 にフォールバックします。詳細は、KIP-573 を見てください。
  • client.dns.lookup 設定のデフォルト値が、default から use_all_dns_ips に変更されました。ホスト名が複数の IP アドレスに解決される場合、クライアントとブローカーは、接続が正常に確立されるため、各 IP に順番に接続しようとします。詳細については、KIP-602 を見てください。
  • NotLeaderForPartitionException は非推奨になり、NotLeaderOrFollowerException に置き換えられました。ブローカーがレプリカでは無い場合、フェッチ要求とリーダーまたはフォロワーのみを対象としたその他の要求は、REPLICA_NOT_AVAILABLE(9) ではなく NOT_LEADER_OR_FOLLOWER(6) を返し、再割り当て中の一時的なエラーが全てのクライアントによって再試行可能な例外として処理されるようにします。

バージョン 0.8.x から 2.4.x を介して 2.5.0 へのアップグレード

2.1.x より前からアップグレードしている場合は、コンシューマオフセットを格納するために使われるスキーマの変更について以下の注記を見てください。inter.broker.protocol.version を最新のバージョンに変更すると、2.1より前のバージョンにダウングレードすることができません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x 以上からアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., 2.4, 2.3, etc.)
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.5 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。全て(またはほとんど)のコンシューマが 0.11.0 以降にアップグレードされたら、各ブローカーで log.message.format.version を 2.5 に変更し、それらを1つずつ再起動します。もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。
  6. KIP-455 の完了後、再割り当てツール kafka-reassign-partitions.sh に幾つかの注目すべき変更があります。このツールでは、アクティブな再割り当てスロットルを変更する時に --additional フラグを指定する必要があります。--cancel コマンドを使って、再割り当てのキャンセルが可能になりました。最期に、--zookeeper による再割り当ては非推奨になり、--bootstrap-server が優先されます。詳細は KIP を見てください。
2.5.0 での主要な変更
  • RebalanceProtocol#COOPERATIVE を使う場合、Consumer#poll は、コンシューマがまだ所有しているパーティションのリバランスの最中に、データをまだ返すことができます; さらに、Consumer#commitSync は、致命的な CommitFailedException と区別し、ユーザに実行中のリバランスを完了してからまだ所有しているパーティションのオフセットのコミットを再試行できるように、致命的ではない RebalanceInProgressException を投げます。
  • 一般的なネットワーク環境での回復性を向上させるために、zookeeper.session.timeout.ms のデフォルト値が6秒から18秒に、replica.lag.time.max.ms が10秒から30秒に増加されました。
  • 複数のストリームを一度に集約するための新しい DSL 演算子 cogroup() が追加されました。
  • 入力イベントストリームを KTable に変換するための新しい KStream.toTable() API が追加されました。
  • 入力トピックから null キーまたは null 値を表す新しい Serde 型 Void が追加されました。
  • UsePreviousTimeOnInvalidTimestamp が非推奨になり、UsePartitionTimeOnInvalidTimeStamp に置き換えられました。
  • 保留オフセットフェンシングメカニズムとより強力なトランザクションコミット整合性チェックを追加することにより、確実に一回のセマンティクスが改善されました。これにより、スケーラブルな確実に一回のアプリケーションの実装が簡素化されます。また、examples フォルダの下に、新しい確実に一回のセマンティクスコードの例を追加しました。詳細については、KIP-447 を見てください。
  • Added a new public api KafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed information on the key being queried. It provides information about the partition number where the key resides in addition to hosts containing the active and standby partitions for the key.
  • Provided support to query stale stores (for high availability) and the stores belonging to a specific partition by deprecating KafkaStreams.store(String, QueryableStoreType) and replacing it with KafkaStreams.store(StoreQueryParameters).
  • Added a new public api to access lag information for stores local to an instance with KafkaStreams.allLocalStorePartitionLags().
  • Scala 2.11 is no longer supported. See KIP-531 for details.
  • All Scala classes from the package kafka.security.auth have been deprecated. See KIP-504 for details of the new Java authorizer API added in 2.4.0. Note that kafka.security.auth.Authorizer and kafka.security.auth.SimpleAclAuthorizer were deprecated in 2.4.0.
  • TLSv1 and TLSv1.1 have been disabled by default since these have known security vulnerabilities. Only TLSv1.2 is now enabled by default. You can continue to use TLSv1 and TLSv1.1 by explicitly enabling these in the configuration options ssl.protocol and ssl.enabled.protocols.
  • ZooKeeper has been upgraded to 3.5.7, and a ZooKeeper upgrade from 3.4.X to 3.5.7 can fail if there are no snapshot files in the 3.4 data directory. This usually happens in test upgrades where ZooKeeper 3.5.7 is trying to load an existing 3.4 data dir in which no snapshot file has been created. For more details about the issue please refer to ZOOKEEPER-3056. A fix is given in ZOOKEEPER-3056, which is to set snapshot.trust.empty=true config in zookeeper.properties before the upgrade.
  • ZooKeeper version 3.5.7 supports TLS-encrypted connectivity to ZooKeeper both with or without client certificates, and additional Kafka configurations are available to take advantage of this. See KIP-515 for details.

Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x or 2.3.x to 2.4.0

2.1.x より前からアップグレードしている場合は、コンシューマオフセットを格納するために使われるスキーマの変更について以下の注記を見てください。inter.broker.protocol.version を最新のバージョンに変更すると、2.1より前のバージョンにダウングレードすることができません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x 以上からアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3).
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.4 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.4 on each broker and restart them one by one. もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。

アップグレードの追加の注意:

  1. ZooKeeper has been upgraded to 3.5.6. ZooKeeper upgrade from 3.4.X to 3.5.6 can fail if there are no snapshot files in 3.4 data directory. This usually happens in test upgrades where ZooKeeper 3.5.6 is trying to load an existing 3.4 data dir in which no snapshot file has been created. For more details about the issue please refer to ZOOKEEPER-3056. A fix is given in ZOOKEEPER-3056, which is to set snapshot.trust.empty=true config in zookeeper.properties before the upgrade. But we have observed data loss in standalone cluster upgrades when using snapshot.trust.empty=true config. For more details about the issue please refer to ZOOKEEPER-3644. So we recommend the safe workaround of copying empty snapshot file to the 3.4 data directory, if there are no snapshot files in 3.4 data directory. For more details about the workaround please refer to ZooKeeper Upgrade FAQ.
  2. An embedded Jetty based AdminServer added in ZooKeeper 3.5. AdminServer is enabled by default in ZooKeeper and is started on port 8080. AdminServer is disabled by default in the ZooKeeper config (zookeeper.properties) provided by the Apache Kafka distribution. Make sure to update your local zookeeper.properties file with admin.enableServer=false if you wish to disable the AdminServer. Please refer AdminServer config to configure the AdminServer.
Notable changes in 2.4.0
  • A new Admin API has been added for partition reassignments. Due to changing the way Kafka propagates reassignment information, it is possible to lose reassignment state in failure edge cases while upgrading to the new version. It is not recommended to start reassignments while upgrading.
  • ZooKeeper has been upgraded from 3.4.14 to 3.5.6. TLS and dynamic reconfiguration are supported by the new version.
  • The bin/kafka-preferred-replica-election.sh command line tool has been deprecated. It has been replaced by bin/kafka-leader-election.sh.
  • The methods electPreferredLeaders in the Java AdminClient class have been deprecated in favor of the methods electLeaders.
  • Scala code leveraging the NewTopic(String, int, short) constructor with literal values will need to explicitly call toShort on the second literal.
  • The argument in the constructor GroupAuthorizationException(String) is now used to specify an exception message. Previously it referred to the group that failed authorization. This was done for consistency with other exception types and to avoid potential misuse. The constructor TopicAuthorizationException(String) which was previously used for a single unauthorized topic was changed similarly.
  • The internal PartitionAssignor interface has been deprecated and replaced with a new ConsumerPartitionAssignor in the public API. Some methods/signatures are slightly different between the two interfaces. Users implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.
  • The DefaultPartitioner now uses a sticky partitioning strategy. This means that records for specific topic with null keys and no assigned partition will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to produce, but it may result in uneven distribution of records across partitions in edge cases. Generally users will not be impacted, but this difference may be noticeable in tests and other situations producing records for a very short amount of time.
  • The blocking KafkaConsumer#committed methods have been extended to allow a list of partitions as input parameters rather than a single partition. It enables fewer request/response iterations between clients and brokers fetching for the committed offsets for the consumer group. The old overloaded functions are deprecated and we would recommend users to make their code changes to leverage the new methods (details can be found in KIP-520).
  • We've introduced a new INVALID_RECORD error in the produce response to distinguish from the CORRUPT_MESSAGE error. To be more concrete, previously when a batch of records were sent as part of a single request to the broker and one or more of the records failed the validation due to various causes (mismatch magic bytes, crc checksum errors, null key for log compacted topics, etc), the whole batch would be rejected with the same and misleading CORRUPT_MESSAGE, and the caller of the producer client would see the corresponding exception from either the future object of RecordMetadata returned from the send call as well as in the Callback#onCompletion(RecordMetadata metadata, Exception exception) Now with the new error code and improved error messages of the exception, producer callers would be better informed about the root cause why their sent records were failed.
  • We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The ConsumerCoordinator will choose the latest RebalanceProtocol that is commonly supported by all of the consumer's supported assignors. You can use the new built-in CooperativeStickyAssignor or plug in your own custom cooperative assignor. To do so you must implement the ConsumerPartitionAssignor interface and include RebalanceProtocol.COOPERATIVE in the list returned by ConsumerPartitionAssignor#supportedProtocols. Your custom assignor can then leverage the ownedPartitions field in each consumer's Subscription to give partitions back to their previous owners whenever possible. Note that when a partition is to be reassigned to another consumer, it must be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger a followup rebalance to allow the revoked partition to safely be assigned to its new owner. See the ConsumerPartitionAssignor RebalanceProtocol javadocs for more information.
    To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same ConsumerPartitionAssignor that supports the cooperative protocol. This can be done with two rolling bounces, using the CooperativeStickyAssignor for the example: during the first one, add "cooperative-sticky" to the list of supported assignors for each member (without removing the previous assignor -- note that if previously using the default, you must include that explicitly as well). You then bounce and/or upgrade it. Once the entire group is on 2.4+ and all members have the "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the cooperative protocol. For further details on the cooperative rebalancing protocol and upgrade path, see KIP-429.
  • There are some behavioral changes to the ConsumerRebalanceListener, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown all the way up to the Consumer.poll() call. The onPartitionsLost method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions (such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing onPartitionsRevoked API to align with previous behavior. Note however that onPartitionsLost will not be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
    The semantics of the ConsumerRebalanceListener's callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to onPartitionsLost, onPartitionsRevoked will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The onPartitionsAssigned callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on the new callback semantics, see the ConsumerRebalanceListener javadocs.
  • The Scala trait kafka.security.auth.Authorizer has been deprecated and replaced with a new Java API org.apache.kafka.server.authorizer.Authorizer. The authorizer implementation class kafka.security.auth.SimpleAclAuthorizer has also been deprecated and replaced with a new implementation kafka.security.authorizer.AclAuthorizer. AclAuthorizer uses features supported by the new API to improve authorization logging and is compatible with SimpleAclAuthorizer. For more details, see KIP-504.

Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0

2.1.x より前からアップグレードしている場合は、コンシューマオフセットを格納するために使われるスキーマの変更について以下の注記を見てください。inter.broker.protocol.version を最新のバージョンに変更すると、2.1より前のバージョンにダウングレードすることができません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x, 1.0.x, 1.1.x, 2.0.x あるいは 2.1.xからアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2).
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.3 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 2.3 に変更し、それらを1つずつ再起動します。もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。
2.3.0 での主要な変更
  • 増分協調リバランシングに基づいたKafkaコネクトのための新しいリバランシング プロトコルを導入しました。新しいプロトコルはコネクト ワーカー間のリバランシング段階で全てのタスクを停止することを必要としません。その代わりに、ワーカー間で交換される必要があるタスクだけが停止され、それらはその後のリバランスで開始されます。新しいコネクト プロトコルは 2.3.0 からデフォルトで有効です。それがどう動作するか、eagerリバランシングの古い挙動を有効にする方法についての詳細は、増分協調リバランシングの設計を調べてください。
  • コンシューマのユーザに対して静的なメンバーシップを導入しています。この機能は通常のアップリケーションのアップグレードあるいはローリング バウンスの間の不必要なリバランスを減らします。使い方の詳細については、静的なメンバーシップの設計を調べてください。
  • Kafka ストリーム DSL は使用するストア型を切り替えます。この変更は主にユーザにとって透過的ですが、コードの変更を必要とするかもしれない幾つかの目立たないケースがあります。詳細はKafka ストリーム アップグレードの章を見てください。
  • Kafka Streams 2.3.0 requires 0.11 message format or higher and does not work with older message format.

0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x , 1.0.x, 1.1.x, 2.0.x あるいは 2.1.x から 2.2.0 へのアップグレード

2.1.x より前からアップグレードしている場合は、コンシューマオフセットを格納するために使われるスキーマの変更について以下の注記を見てください。inter.broker.protocol.version を最新のバージョンに変更すると、2.1より前のバージョンにダウングレードすることができません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x, 1.0.x, 1.1.x あるいは 2.0.xからアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0).
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.2 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 2.2 に変更し、それらを1つずつ再起動します。もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。
Notable changes in 2.2.1
  • Kafka Streams 2.2.1 requires 0.11 message format or higher and does not work with older message format.
2.2.0 での主要な変更
  • デフォルトのコンシューマグループは、空の文字列 ("") から null に変更されました。新しいデフォルトのグループidを使うコンシューマはトピックを購読し、そしてオフセットを取得あるいはコミットすることができません。コンシューマグループidとしての空の文字列は非推奨ですが、将来のメジャー リリースまでサポートされます。空の文字列のグループidに依存する古いクライアントは、今では明示的にそれをコンシューマの設定として提供しなければなりません。詳細な情報はKIP-289を見てください。
  • bin/kafka-topics.sh コマンドライン ツールは今ではzookeeperの代わりに --bootstrap-server を使って直接ブローカーに接続することができます。古い --zookeeper オプションは今のところはまだ利用可能です。詳細な情報は KIP-377を読んでください。
  • Kafka ストリームは MacOS 10.13 以上を必要とするRocksDBの新しいバージョンに依存します。

0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x あるいは 2.0.0 から 2.1.0 へのアップグレード

2.1.x はコンシューマ オフセットを格納するために使われる内部的なスキーマへの変更を含むことに注意してください。アップグレードが完了すると、以前のバージョンにダウングレードすることはできません。詳細は以下のローリング アップグレードを見てください。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x, 1.0.x, 1.1.x あるいは 2.0.xからアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルのバージョンを上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0).
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。そうすると、ブローカは最新バージョンを実行し、クラスタの挙動とパフォーマンスが期待通りであることを確認することができます。何か問題がある場合は、この時点でダウングレードすることがまだ可能です。
  3. クラスタの挙動とパフォーマンスを確認すると、inter.broker.protocol.version を編集し、それを 2.1 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。ブローカーが最新のプロトコルを使い始めると、もうクラスタを古いバージョンにダウングレードすることができません。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 2.1 に変更し、それらを1つずつ再起動します。もうメンテナンスされていない古いScalaのクライアントは 0.11で導入された新しいメッセージフォーマットをサポートしないため、転換コストを避ける(あるいは 確実に一回のセマンティクスを利用する)には、新しいJavaクライアントが使われなければならないことに注意してください。

アップグレードの追加の注意:

  1. このバージョンではオフセットの期限切れのセマンティクスが少し変更されました。新しいセマンティクスによると、グループが対応するトピックに購読されていてまだアクティブ(アクティブなコンシューマを持つ)場合、グループ内のパーティションのオフセットは削除されません。もしグループが空になると、デフォルトの維持期間(あるいはブローカーによって設定されたもの)が過ぎた後で、全てのオフセットが削除されます (グループが再度アクティブにならない限り)。Kafkaグループ管理を使用しないスタンドアローン(単純)なコンシューマに関連付けられたオフセットは、最後のコミットからデフォルトの維持期間(あるいはブローカーによって設定されたもの)が経過した後で削除されます。
  2. group.idが提供されない場合、コンソール コンシューマのenable.auto.commitプロパティのデフォルトは今ではfalseに設定されます。これは、自動生成されたグループが他のコンシューマによって使用される可能性が低いため、コンシューマのコーディネータのキャッシュの汚染 を回避するためです。
  3. KIP-91delivery.timeout.msを導入したため、プロデューサのretries 設定のデフォルト値がInteger.MAX_VALUEに変更されました。これはレコードの送信と肯定の応答を受信するまでの合計時間の上限を設定します。デフォルトでは、配送タイムアウトは2分に設定されています。
  4. デフォルトでは、プロデューサを設定する時に、MirrorMakerはdelivery.timeout.msInteger.MAX_VALUE に上書きします。より早く失敗するためにretriesの値を上書きした場合、代わりにdelivery.timeout.msを上書きする必要があります。
  5. ListGroup API は、推奨される代替手段として、ユーザがリスト化できるグループへのDescribe Group アクセスを今では期待します。古い Describe Cluster アクセスは下位互換性のためにまだサポートされますが、このAPIに使用することはお勧めしません。
  6. KIP-336 は ExtendedSerializer と ExtendedDeserializer インタフェースを廃止し、Serializer と Deserializer の使用を伝播します。ExtendedSerializer と ExtendedDeserializer は、Java 7互換の形式でシリアライザとデシリアライザのためのレコード ヘッダを提供するためにKIP-82 で導入されました。Java 7 のサポートが廃止されたため、これらのインタフェースを統合しました。
2.1.0 での主要な変更
  • Jetty は 9.4.12 にアップグレードされました。TLS_RSA_* cipher は前方秘匿性をサポートしていないため、デフォルトで除外されています。詳細は https://github.com/eclipse/jetty.project/issues/2807 を参照してください。
  • トピックごとの構成の上書きを使うことでunclean.leader.election.enable 設定が動的に更新される場合、コントローラによって明確ではないリーダーの選択が自動的に有効になります。
  • AdminClient はメソッド AdminClient#metrics() を追加しました。AdminClientを使用する全てのアプリケーションは、AdminClientからキャプチャーされたメトリックスを表示することで、より多くの情報と洞察を取得することができます。詳しくは KIP-324 を見てください。
  • Kafka はKIP-110から Zstandard 圧縮をサポートします。使用するには、クライアントと同様にブローカーもアップグレードする必要があります。2.1.0 より前のコンシューマは Zstandard 圧縮を使用するトピックから読み取ることができないため、全てのダウンストリーム コンシューマがアップグレードされるまでトピックに対して使用しないでください。詳細は KIP を見てください。

0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x あるいは 1.1.x から 2.0.0 へのアップグレード

Kafka 2.0.0 はwireプロトコルの変更を導入します。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中のダウンタイムが無いことを保証します。しかし、アップグレードの前に2.0.0 での注目すべき変更を見直してください。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x, 1.0.x あるいは 1.1.x からアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルの形式を上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1).
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 2.0 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 2.0 に変更し、それらを1つずつ再起動します。古いScalaのコンシューマは 0.11で導入された新しいメッセージフォーマットをサポートしないため、down-conversionのパフォーマンスコストを避ける(あるいは (確実に一回のセマンティクスを利用する)には、新しいJavaコンシューマが使われなければならないことに注意してください。

アップグレードの追加の注意:

  1. ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それらをバックアップします。それらはデフォルトで新しいプロトコルを使って開始するでしょう。
  2. プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。メッセージのフォーマットのバージョンと同じです。
  3. Kafkaストリームコード内でJava8のメソッドのリファレンスを使っている場合は、メソッドの曖昧さを解決するためにコードを更新する必要があるかもしれません。jarファイルのホット スワップだけではうまく行かないかもしれません。
  4. クラスタ内の全てのブローカーが更新されるまで、ACLはプリフィックスされたリソース (KIP-290で追加されました)に追加されるべきではありません。

    注意: クラスタが完全にアップグレードされた後であっても、クラスタに追加されたプリフィックス付きのACLは、クラスタが再びダウングレードされた場合に無視されます。

2.0.0 での主要な変更
  • KIP-186 デフォルトのオフセット維持期間を1日から7日に増やします。これにより頻繁にコミットするアプリケーション内のオフセットを"失われ"にくくします。それはオフセットのアクティブなセットも増やし、従ってブローカー上のメモリの使用率を増やすことができます。コンソールのコンシューマは現在のところデフォルトでオフセットのコミットを有効にしており、この変更は今では1ではなく7日間保持して多数のオフセットのソースに成り得ることに注意してください。ブローカー設定のoffsets.retention.minutes を 1440 に設定することで既存の挙動を維持することができます。
  • Java 7のサポートは無くなり、今ではJava 8が必要とされる最小のバージョンです。
  • ssl.endpoint.identification.algorithmのデフォルトの値は httpsに変更されました。これはホスト名の検証を行います (そうでなければ中間者攻撃が可能です)。以前の挙動を復活されるには ssl.endpoint.identification.algorithm に空の文字を設定してください。
  • KAFKA-5674max.connections.per.ip minimumの下限間隔を0に拡張し、従って入ってくる接続のIPベースのフィルタリングができます。
  • KIP-272 は API バージョンタグをメトリックkafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}に追加しました。このメトリックは今では kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...} になります。これは自動的に集約しないJMX監視ツールに影響があるでしょう。特定のリクエスト型についての総数を取得するには、ツールが異なるバージョンを横断して集約できるように更新される必要があります。
  • KIP-225 トピックおよびパーティションのためのタグを使うためにメトリック "records.lag" を変更しました。名前形式 "{topic}-{partition}.records-lag" を持つ元のバージョンが削除されました。
  • 0.11.0.0から非推奨になった Scala コンシューマが削除されました。Java コンシューマは今ではgracefullyにシャットダウンします。1.1.0 (より古い)Scalaコンシューマは、ブローカーが2.0.0にアップグレードされたとしても動作し続けるでしょう。
  • 0.10.0.0から非推奨になった Scalaプロデューサは削除されました。Java プロデューサは0.9.0.0からお勧めのオプションになっています。Javaプロデューサのデフォルトのパーティショナーの挙動はScalaプロデューサのデフォルトのパーティショナーと異なることに注意してください。移設するユーザは以前の挙動を維持する独自のパーティショナーを設定することを考慮するべきです。1.1.0 (より古い)Scalaプロデューサは、ブローカーが2.0.0にアップグレードされたとしても動作し続けるでしょう。
  • MirrorMaker と ConsoleConsumer はScalaコンシューマをもうサポートしません。それらは常にJavaコンシューマを使います。
  • ConsoleProducer はScalaプロデューサをもうサポートしません。それは常にJavaプロデューサを使います。
  • Scalaクライアントに依存する多くの非推奨のツールが削除されました: ReplayLogProducer, SimpleConsumerPerformance, SimpleConsumerShell, ExportZkOffsets, ImportZkOffsets, UpdateOffsetsInZK, VerifyConsumerRebalance。
  • 非推奨の kafka.tools.ProducerPerformance が削除されました。org.apache.kafka.tools.ProducerPerformance を使ってください。
  • 古いバージョンからのローリング バウンス アップグレードが可能な新しいKafkaストリーム設定パラメータupgrade.from が追加されました。
  • KIP-284 はデフォルトの値から Long.MAX_VALUE に設定することでKafkaストリームのパーティショントピックの維持期間を変更しました。
  • 状態ストアをプロセッサ トポロジに登録するためのKafkaストリームの ProcessorStateManager APIを更新しました。詳細はストリームの更新ガイドを読んでください。
  • 早期リリースでは、コネクタのワーカーの設定は internal.key.converterinternal.value.converter プロパティを必要としました。2.0では、もう必要とされず、デフォルトはJSONコンバータになります。これらのプロパティをコネクト スタンドアローンおよび分散されたワーカー設定から削除することができます:
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter.schemas.enable=false
  • KIP-266はブロックするかもしれないKafkaConsumerAPIのために使われるデフォルトのタイムアウトを指定するために新しいコンシューマ設定 default.api.timeout.msを追加します。そのKIPはdefault.api.timeout.msによって設定されるデフォルトのタイムアウトを使用する代わりに、それらのそれぞれについて使う特定のタイムアウトの指定をサポートするためにそのようなブロッキングAPIに負荷も掛けます。特に、動的なパーティションの割り当てのためにブロックしない新しい poll(Duration) API が追加されました。古い poll(long) は非推奨になり、将来のバージョンで削除されるでしょう。Durationを取るpartitionsFor, listTopics, offsetsForTimes, beginningOffsets, endOffsets および close のような他のKafkaConsumerメソッドにもオーバーロードが追加されました。
  • KIP-266 の一部として、request.timeout.msのデフォルトの値が30秒に変更されました。以前の値はリバランスに掛かるだろう最大時間を考慮するために5分より少し高い値でした。今では特別な場合としてリバランスジのJoinGroupリクエストを扱い、リクエストのタイムアウトとしてmax.poll.interval.ms から派生した値を使います。他の全てのリクエストの型はrequest.timeout.msで定義されたタイムアウトを使います。
  • 内部メソッドkafka.admin.AdminClient.deleteRecordsBefore が削除されました。org.apache.kafka.clients.admin.AdminClient.deleteRecordsに移動することをお勧めします。
  • AclCommand ツール --producerの便利なオプションは指定されたトピック上でよりKIP-277のより細かいACLを使います。
  • KIP-176 は全てのコンシューマベースのツールについて --new-consumer オプションを削除します。このオプションは--bootstrap-server が定義された場合に新しいコンシューマが自動的に使われるため冗長です。
  • KIP-290 は例えば'foo'から始まるトピックのようなプリフィックス付きのリソース上でのACLを定義する機能を追加します。
  • KIP-283 はKafkaブローカーでのメッセージのダウン変換の処理を改善します。これは一般的にメモリに集中的な操作です。KIPは、一度にパーティションデータのチャンクをダウンコンバートすることで、メモリ消費量の上限に役立つように、メモリ集中が少なくなる仕組みを追加します。この改善により、ブローカーが無効なオフセットを使って特大のメッセージバッチを応答の最後に送信できるように FetchResponse プロトコルの挙動の変更があります。そのような特大のメッセージはKafkaConsumerによって行われるように、コンシューマクライアントによって無視されるべきです。

    KIP-283はダウンコンバートが有効かどうかを制御するために新しいトピックとブローカーの設定、それぞれmessage.downconversion.enablelog.message.downconversion.enable も追加します。無向な場合、ブローカーはダウンコンバージョンを行わず、代わりにクライアントにUNSUPPORTED_VERSIONエラーを送信します。

  • 動的なブローカー設定オプションはブローカーが開始される前に kafka-configs.sh を使ってZooKeeperに格納することができます。このオプションは全てのパスワードがZooKeeperに暗号化して格納されるように、server.propertiesの中で平文のパスワードで格納されることを避けるために使うことができます。
  • 接続の試行が失敗した場合、ZooKeeperのホストは今では再解決されます。しかしもしZooKeeperのホスト名が複数のアドレスに解決され、それらの幾つかが到達可能では無い場合、接続タイムアウトzookeeper.connection.timeout.msを増やす必要があるかもしれません。
新しいプロトコルのバージョン
  • KIP-279: OffsetsForLeaderEpochResponse v1 はパーティションレベル leader_epoch フィールドを導入します。
  • KIP-219: クォータ違反で絞られた非クラスタ アクション リクエストと応答のプロトコルバージョンを上げます。
  • KIP-290: ACL create, descrbe および delete リクエストと応答のプロトコルバージョンを上げます。
1.1 Kafkaストリーム アプリケーションのアップグレード
  • ストリームアプリケーションの 1.1 から 2.0 のアップグレードはブローカーのアップグレードを必要としません。Kafka ストリーム 2.0 アプリケーションは 2.0, 1.1, 1.0, 0.11.0, 0.10.2 および 0.10.1 ブローカーへ接続することができます (ですが、0.10.0 ブローカーに接続することができません)。
  • 2.0では、1.0より前で非推奨になったpublic API を削除したことに注意してください; それらの非推奨のAPIのユーザの利用は適宜コードの変更が必要です。詳細は2.0.0でのストリーム APIの変更を見てください。

0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x あるいは 1.0.x から 1.1.x へのアップグレード

Kafka 1.1.0 はwireプロトコルの変更を導入します。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中のダウンタイムが無いことを保証します。しかし、アップグレードの前に1.1.0 での注目すべき変更を見直してください。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.x あるいは 1.0.x からアップグレードし、メッセージ形式を上書きしていない場合は、内部のブローカープロトコルの形式を上書きする必要だけがあります。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 あるいは 1.0)。
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 1.1 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 1.1 に変更し、それらを1つずつ再起動します。古いScalaのコンシューマは 0.11で導入された新しいメッセージフォーマットをサポートしないため、down-conversionのパフォーマンスコストを避ける(あるいは (確実に一回のセマンティクスを利用する)には、新しいJavaコンシューマが使われなければならないことに注意してください。

アップグレードの追加の注意:

  1. ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それらをバックアップします。それらはデフォルトで新しいプロトコルを使って開始するでしょう。
  2. プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。メッセージのフォーマットのバージョンと同じです。
  3. Kafkaストリームコード内でJava8のメソッドのリファレンスを使っている場合は、メソッドの曖昧さを解決するためにコードを更新する必要があるかもしれません。jarファイルのホット スワップだけではうまく行かないかもしれません。
1.1.1 での主要な変更
  • バージョン 0.10.0.x からのローリング バウンス アップグレードが可能な新しいKafkaストリーム設定パラメータ upgrade.fromが追加されました
  • この新しい設定についての詳細は Kafka ストリーム アップグレード ガイド を見てください。
1.1.0での主要な変更
  • Mavenでのkafkaのアーティファクトは log4j あるいは slf4j-log4j12 にもう依存しません。kafkaクライアントのアーティファクトと同様に、今ではユーザは適切な slf4j モジュール (slf4j-log4j12, logback など)を含めることでログのバックエンドを選択することができます。リリースのtarballはまだ log4j と slf4j-log4j12 を含みます。
  • KIP-225 トピックおよびパーティションのためのタグを使うためにメトリック "records.lag" を変更しました。名前形式 "{topic}-{partition}.records-lag" を持つ元のバージョンは非推奨になり、2.0.0で削除されるでしょう。
  • Kafkaストリームはブローカーの通信エラーに対してより丈夫になりました。fatal例外でKafkaストリーム クライアントを停止する代わりに、Kafkaストリームは自己回復しクラスタへ再接続しようとします。新しいAdminClientを使用すると、Kafkaストリームがどれだけ再試行するかをより良く制御し、(古いバージョンのようにハードコードされた再試行の代わりに)細かいタイムアウトを設定することができます。
  • Kafkaストリームのリバランス時間が更に削減され、Kafkaストリームのレスポンスが構造しました。
  • 今ではKafkaコネクトはシンクとソースコネクタの両方でメッセージヘッダをサポートし、単純なメッセージ変換を使って操作できるようになりました。コネクタは明示的にそれらを使うように変更されなければなりません。ヘッダがどのように(デ)シリアライズ化されるかを制御するために新しいHeaderConverterが導入され、値の文字列表現を使うためにデフォルトで新しい "SimpleHeaderConverter" が使われます。
  • もしprint-data-logがデコーダのような他のオプションのために明示的あるいは暗黙的に有効な場合、kafka.tools.DumpLogSegmentsは今では自動的にdeep-iterationオプションを設定します。
新しいプロトコルのバージョン
  • KIP-226で DescribeConfigs Request/Response v1 を導入しました。
  • KIP-227でフェッチ リクエスト/応答 v7 を導入しました。
1.0 Kafkaストリームアプリケーションのアップグレード
  • ストリームアプリケーションの 1.0 から 1.1 のアップグレードはブローカーのアップグレードを必要としません。Kafka ストリーム 1.1 アプリケーションは 1.0, 0.11.0, 0.10.2 および 0.10.1 ブローカーへ接続することができます (ですが、0.10.0 ブローカーに接続することはできません)。
  • 詳細は1.1.0でのストリームAPIの変更 を見てください。

0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x あるいは 0.11.0.x から 1.0.0 へのアップグレード

Kafka 1.0.0 はwireプロトコルの変更を導入します。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中のダウンタイムが無いことを保証します。しかし、アップグレードの前に1.0.0 での注目すべき変更を見直してください。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は使用している現在のメッセージフォーマットを参照します。以前に上書きされたメッセージ形式のバージョンを持っている場合は、それを現在の値に維持する必要があります。別のやり方として、もし 0.11.0.x より前のバージョンからアップグレードする場合は、CURRENT_KAFKA_VERSION に合致するように CURRENT_MESSAGE_FORMAT_VERSION が設定される必要があります。 0.11.0.xからアップグレードしメッセージ形式を上書きしていない場合は、メッセージ形式のバージョンと内部ブローカーのプロトコルのバージョンの両方を 0.11.0 に設定する必要があります。
    • inter.broker.protocol.version=0.11.0
    • log.message.format.version=0.11.0
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 1.0 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  5. 上で指示されたようにメッセージ形式のバージョンを上書きした場合、それを最新のバージョンにアップグレードするためにもう一度ローリングリスタートをする必要があります。一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 1.0 に変更し、それらを1つずつ再起動します。0.11.0 からアップグレードし log.ssage.format.version が 0.11.0 に設定されている場合、設定を更新し、ローリング再起動をスキップすることができます。古いScalaのコンシューマは 0.11で導入された新しいメッセージフォーマットをサポートしないため、down-conversionのパフォーマンスコストを避ける(あるいは (確実に一回のセマンティクスを利用する)には、新しいJavaコンシューマが使われなければならないことに注意してください。

アップグレードの追加の注意:

  1. ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それらをバックアップします。それらはデフォルトで新しいプロトコルを使って開始するでしょう。
  2. プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。メッセージのフォーマットのバージョンと同じです。
1.0.2 での主要な変更
  • バージョン 0.10.0.x からのローリング バウンス アップグレードが可能な新しいKafkaストリーム設定パラメータ upgrade.fromが追加されました
  • この新しい設定についての詳細は Kafka ストリーム アップグレード ガイド を見てください。
1.0.1 での主要な変更
  • 0.11.0.x のAdminClientのオプションクラス (例えば、 CreateTopicsOptions, DeleteTopicsOptions など)のバイナリ互換性が復活されました。(ソースではなく)バイナリの互換性は 1.0.0 で不注意で壊れていました。
1.0.0 での主要な変更
  • トピックの削除が今では安定したため、今はその機能がデフォルトで有効です。以前の挙動を保持したいユーザはブローカーの設定delete.topic.enablefalseに設定してください。トピックの削除はデータを削除しオペレーションが可逆ではないことに注意してください (つまり、"undelete" オペレーションはありません)
  • パーティションについてオフセットが見つからない場合はタイムスタンプの検索がサポートされます。そのパーティションは今はnullオフセット値を使って検索結果に含まれます。以前は、パーティションはマップ内に含まれませんでした。この変更は検索の挙動がタイムスタンプ検索をサポートしないトピックの場合と一貫性を持つように行われました。
  • inter.broker.protocol.version が 1.0以上の場合、ブローカーはオフラインのログディレクトリがあるとしてもライブ ログディレクトリ上でレプリカを提供するためにオンラインで有り続けるでしょう。ログディレクトリはハードウェアの障害による IOException のためにオフラインになるかもしれません。ユーザはオフラインログディレクトリがあるかどうかを調べるためにブローカーごとのメトリックofflineLogDirectoryCount を監視する必要があります。
  • 回復可能な例外である KafkaStorageException が追加されました。クライアントの FetchRequest あるいは ProducerRequest のバージョンが KafkaStorageException をサポートしない場合、KafkaStorageException は 応答の中で NotLeaderForPartitionException に変換されるでしょう。
  • -XX:+DisableExplicitGC はデフォルトのJVM設定の中で -XX:+ExplicitGCInvokesConcurrent に置き換えられました。これはある状況下で直接のバッファによるネイティブなメモリの割り当ての間に、メモリ不足例外を避けるのに役立ちます。
  • 上書きされた handleError メソッドの実装は、kafka.api package: FetchRequest, GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetRequest, ProducerRequest および TopicMetadataRequest の中の以下の非推奨のクラスから削除されました。これはブローカー上での使用だけを目的としていましたが、もう使われておらず、実装はメンテナンスされていませんでした。スタブ 実装はバイナリ互換性のために維持されています。
  • Java クライアントとツールは、今では cient-id としてどのような文字も受け付けます。
  • 非推奨のツール kafka-consumer-offset-checker.sh が削除されました。コンシューマ グループの詳細を取得するには、kafka-consumer-groups.sh を使ってください。
  • SimpleAclAuthorizer は今ではデフォルトでオーソライザへのアクセス拒否を記録します。
  • 認証の失敗は今ではAuthenticationException のサブクラスの1つとしてクライアントに報告されます。クライアント接続が認証を失敗した場合はリトライはされないでしょう。
  • 独自の SaslServer 実装は、クライアントに認証エラーを示すエラーメッセージを返すために SaslAuthenticationException を投げるかもしれません。実装者は認証されなかったクライアントへ漏らすべきでは無いセキュリティに重要な情報を例外メッセージの中に含めないように注意しなければなりません。
  • バージョンおよびコミットidを提供するためにJMXに登録されたapp-info mbean は非推奨になり、これらの属性を提供するメトリクスに置き換えられるでしょう。
  • Kafka のメトリクスは今では非数値の値を含むかもしれません。org.apache.kafka.common.Metric#value() は非推奨になり、各クライアントのメトリックの値を読み込むユーザを壊す可能性を最小限にするために0.0 を返すでしょう (MetricsReporter 実装経由またはmetrics()メソッドの呼び出し)。org.apache.kafka.common.Metric#metricValue() は数字あるいは非数字のメトリック値を扱うために使うことができます。
  • 各Kafkaのレートのメトリックはダウンストリームの処理を簡単にするために今ではサフィックス-totalを使って対応する累積カウントメトリックを持ちます。例えば、records-consumed-raterecords-consumed-totalという名前の対応するメトリックを持ちます。
  • Mx4j はシステム プロパティ kafka_mx4jenabletrue に設定された場合にのみ有効にされるでしょう。論理の反転バグにより、以前はデフォルトで有効で、もしkafka_mx4jenabletrue に設定された場合には無効でした。
  • クライアントのjar のパッケージorg.apache.kafka.common.security.auth は公開され、javadocsに追加されました。以前にこのパッケージに配置されていた内部クラスは他の場所に移動されました。
  • オーソライザーを使っており、ユーザがトピックに必要な権限を持たない場合、ブローカーはブローカー上のトピックの存在に関係ないリクエストに TOPIC_AUTHORIZATION_FAILED エラーを返すでしょう。もしユーザが必要なパーミッションを持ち、トピックが存在しない場合は、UNKNOWN_TOPIC_OR_PARTITION エラーコードが返されるでしょう。
  • 新しいコンシューマの設定プロパティを使うために、config/consumer.properties ファイルが更新されます。
新しいプロトコルのバージョン
  • KIP-112: LeaderAndIsrRequest v1 でパーティションレベル is_new フィールドが導入されます。
  • KIP-112: UpdateMetadataRequest v4 でパーティションレベル offline_replicas フィールドが導入されます。
  • KIP-112: MetadataResponse v5 でパーティションレベル offline_replicas フィールドが導入されます。
  • KIP-112: ProduceResponse v4 で KafkaStorageException のためのエラーコードが導入されます。
  • KIP-112: FetchResponse v6 で KafkaStorageException のためのエラーコードが導入されます。
  • KIP-152: SaslAuthenticate リクエスト が認証の失敗の報告を有効にするために追加されました。このリクエストはもし SaslHandshake リクエストのバージョンが 0より大きい場合に使われるでしょう。
0.11.0 Kafkaストリーム アプリケーションの更新
  • ストリームアプリケーションを0.11.0から1.0へ更新するにはブローカーの更新を必要としません。Kafkaストリーム 1.0 アプリケーションは 0.11.0, 0.10.2 および 0.10.1 ブローカーと接続することができます (しかし 0.10.0ブローカーへ接続することはできません)。しかし、Kafkaストリーム 1.0 は 0.10以上のメッセージ形式を必要とし、古いメッセージ形式とは動作しません。
  • メトリクスのセンサー構造が変更されたため、ストリームのメトリクスを監視している場合は、レポート内と監視コード内のメトリクス名にいくつか変更をする必要があるでしょう。
  • ProcessorContext#schedule(), Processor#punctuate() を含む少しのpublic APIがあり、KStreamBuilder, TopologyBuilder は新しいAPIによって非推奨になりました。対応するコードの変更を行うことをお勧めします。新しいAPIはとてもよく似ているため、アップグレードするには大したことはないでしょう。
  • 詳細はStreams API changes in 1.0.0 を見てください。
0.10.2のKafkaストリームアプリケーションをアップグレード
  • ストリームアプリケーションを0.10.2から1.0へ更新するにはブローカーの更新を必要としません。Kafka ストリーム 1.0 アプリケーションは 1.0, 0.11.0, 0.10.2 および 0.10.1 ブローカーへ接続することができます (ですが、0.10.0 ブローカーに接続することはできません)。
  • メトリクスのセンサー構造が変更されたため、ストリームのメトリクスを監視している場合は、レポート内と監視コード内のメトリクス名にいくつか変更をする必要があるでしょう。
  • ProcessorContext#schedule(), Processor#punctuate() を含む少しのpublic APIがあり、KStreamBuilder, TopologyBuilder は新しいAPIによって非推奨になりました。対応するコードの変更を行うことをお勧めします。新しいAPIはとてもよく似ているため、アップグレードするには大したことはないでしょう。
  • 設定内で 独自のkey.serde, value.serde および timestamp.extractor を指定する場合は、これらの設定が非推奨のため、それらの置き換えられた設定パラメータを使うことをお勧めします。
  • 詳細はStreams API changes in 0.11.0 を見てください。
0.10.1のKafkaストリームアプリケーションをアップグレード
  • ストリームアプリケーションを 1.10.1 から 1.0 へ更新するにはブローカーの更新を必要としません。Kafka ストリーム 1.0 アプリケーションは 1.0, 0.11.0, 0.10.2 および 0.10.1 ブローカーへ接続することができます (ですが、0.10.0 ブローカーに接続することはできません)。
  • コードを再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • メトリクスのセンサー構造が変更されたため、ストリームのメトリクスを監視している場合は、レポート内と監視コード内のメトリクス名にいくつか変更をする必要があるでしょう。
  • ProcessorContext#schedule(), Processor#punctuate() を含む少しのpublic APIがあり、KStreamBuilder, TopologyBuilder は新しいAPIによって非推奨になりました。対応するコードの変更を行うことをお勧めします。新しいAPIはとてもよく似ているため、アップグレードするには大したことはないでしょう。
  • 設定内で 独自のkey.serde, value.serde および timestamp.extractor を指定する場合は、これらの設定が非推奨のため、それらの置き換えられた設定パラメータを使うことをお勧めします。
  • TimestampExtractor インタフェースが変更されたため、独自の(つまりユーザ実装の)タイムスタンプ エクストラクタを使う場合は、このコードを更新する必要があるでしょう。
  • StreamsMetric インタフェースが変更されたため、独自のメトリクスを登録する場合、このコードを変更する必要があるでしょう。
  • 詳細は1.0.0でのストリームAPIの変更, 0.11.0でのストリームAPIの変更 および 0.10.2でのストリームAPIの変更を見てください。
0.10.0のKafkaストリームアプリケーションをアップグレード
  • Kafkaストリーム 1.0 アプリケーションは 0.1, 0.11.0, 0.10.2 あるいは 0.10.1 ブローカーへのみ接続することができるため、ストリームアプリケーションの 0.10.0 から 1.0 へのアップグレードはブローカーのアップグレードを必要とします。
  • 2,3のAPIの変更があります。後方互換性はありません(参照詳細は1.0.0でのストリームAPIの変更, 0.11.0でのストリームAPIの変更, 0.10.2でのストリームAPIの変更 および 0.10.1でのストリームAPIの変更 を見てください。このように、コードを更新し再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • 0.10.0.xから1.0.2への更新は、最初の更新フェーズについて設定upgrade.from="0.10.0"セットを伴う2回のローリング バウンスを必要とします (参照KIP-268)。別のやり方として、オフラインの更新も可能です。
    • ローリングバウンスのためのアプリケーションインスタンスを準備し、新しいバージョン 0.11.0.3 のために設定upgrade.from"0.10.0" に設定するようにしてください。
    • アプリケーションの各インスタンスを一度バウンスします
    • 2回目のローリングバウンスのために新しく配備された 1.0.2 アプリケーションインスタンスを準備します; 設定upgrade.modeのための値を削除するようにしてください
    • アップグレードを完了するためにもう一度各アプリケーションのインスタンスをバウンスします
  • 0.10.0.x から 1.0.0 あるいは 1.0.1 へのアップグレードはオフライン アップグレードを必要とします (ローリング バウンス アップグレードはサポートされません)
    • 全ての古い (0.10.0.x) アプリケーションのインスタンスを停止します
    • コードを更新し、古いコードとjarファイルを新しいコードと新しいjarファイルと交換します
    • 全ての新しい (1.0.0 あるいは 1.0.1) アプリケーションのインスタンスを再起動します

0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x あるいは 0.10.2.x から 0.11.0.0 へのアップグレード

Kafka 0.11.0.0 は新しいメッセージフォーマットのバージョンとwireプロトコルの変更を導入します。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中のダウンタイムが無いことを保証します。しかし、アップグレードの前に0.11.0.0 での注目すべき変更を見直してください。

バージョン 0.10.2から、Javaクライアント (プロデューサとコンシューマ)は古いブローカーと通信することができるようになりました。バージョン 0.11.0 のクライアントはバージョン 0.10.0 あるいはそれより新しいブローカーと話すことができます。しかし、ブローカーが 0.10.0 より古い場合は、クライアントをアップグレードする前にKafkaクラスタ内の全てのブローカーをアップグレードしなければなりません。バージョン 0.11.0 のブローカーは 0.8.x とそれより新しいクライアントをサポートします。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties を更新し、以下のプロパティを追加します。CURRENT_KAFKA_VERSION はアップグレード元のバージョンを参照します。CURRENT_MESSAGE_FORMAT_VERSION は現在使用している現在のメッセージフォーマットを参照します。以前にメッセージフォーマットを上書きしていない場合は、CURRENT_MESSAGE_FORMAT_VERSION は CURRENT_KAFKA_VERSION に一致しなければなりません。
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.versionを編集し 0.10.0.0 に設定します。しかしまだ log.message.format.versionを変更してはいけません。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  5. 一度全て(あるいはほとんど)のコンシューマが 0.11.0 以上にアップグレードされると、各ブローカー上で log.message.format.version を 0.11.0 に変更し、それらを1つずつ再起動します。古いScalaのコンシューマは新しいメッセージフォーマットをサポートしないため、down-conversionのパフォーマンスコストを避ける(あるいは (確実に一回のセマンティクスを利用する)には、新しいJavaコンシューマが使われなければならないことに注意してください。

アップグレードの追加の注意:

  1. ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それらをバックアップします。それらはデフォルトで新しいプロトコルを使って開始するでしょう。
  2. プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。メッセージのフォーマットのバージョンと同じです。
  3. グローバル設定log.message.format.versionを更新する前に、トピック管理ツール(bin/kafka-topics.sh) を使って個々のトピック上で 0.11.0 メッセージフォーマットを有効にすることも可能です。
  4. 0.10.0より前のバージョンからアップグレードしている場合は、0.11.0 に切り替える前に最初にメッセージフォーマットを 0.10.0 に更新する必要はありません。
0.10.2のKafkaストリームアプリケーションをアップグレード
  • ストリームアプリケーションの 0.10.2 から 0.11.0 へのアップグレードはブローカーのアップグレードを必要としません。Kafka ストリーム 0.11.0 アプリケーションは 0.11.0, 0.10.2 および 0.10.1 ブローカーへ接続することができます (ですが、0.10.0 ブローカーに接続することはできません)。
  • 設定内で 独自のkey.serde, value.serde および timestamp.extractor を指定する場合は、これらの設定が非推奨のため、それらの置き換えられた設定パラメータを使うことをお勧めします。
  • 詳細はStreams API changes in 0.11.0 を見てください。
0.10.1のKafkaストリームアプリケーションをアップグレード
  • ストリームアプリケーションの 0.10.1 から 0.11.0 へのアップグレードはブローカーのアップグレードを必要としません。Kafka ストリーム 0.11.0 アプリケーションは 0.11.0, 0.10.2 および 0.10.1 ブローカーへ接続することができます (ですが、0.10.0 ブローカーに接続することはできません)。
  • コードを再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • 設定内で 独自のkey.serde, value.serde および timestamp.extractor を指定する場合は、これらの設定が非推奨のため、それらの置き換えられた設定パラメータを使うことをお勧めします。
  • TimestampExtractor インタフェースが変更されたため、独自の(つまりユーザ実装の)タイムスタンプ エクストラクタを使う場合は、このコードを更新する必要があるでしょう。
  • StreamsMetric インタフェースが変更されたため、独自のメトリクスを登録する場合、このコードを変更する必要があるでしょう。
  • 詳細は0.11.0でのストリームAPIの変更, 0.10.2でのストリームAPIの変更 を見てください。
0.10.0のKafkaストリームアプリケーションをアップグレード
  • Kafkaストリーム 0.11.0 アプリケーションは 0.11.0, 0.10.2 あるいは 0.10.1 ブローカーへのみ接続することができるため、ストリームアプリケーションの 0.10.0 から 0.11.0 へのアップグレードはブローカーのアップグレードを必要とします。
  • 2,3のAPIの変更があります。後方互換性はありません(参照詳細は0.11.0 でのストリームAPIの変更, 0.10.2でのストリームAPIの変更 および 0.10.1 でのストリームAPIの変更を見てください)。このように、コードを更新し再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • 0.10.0.xから0.11.0.3への更新は、最初の更新フェーズについて設定upgrade.from="0.10.0"セットを伴う2回のローリング バウンスを必要とします (参照KIP-268)。別のやり方として、オフラインの更新も可能です。
    • ローリングバウンスのためのアプリケーションインスタンスを準備し、新しいバージョン 0.11.0.3 のために設定upgrade.from"0.10.0" に設定するようにしてください。
    • アプリケーションの各インスタンスを一度バウンスします
    • 2回目のローリングバウンスのために新しく配備された 0.11.0.3 アプリケーションインスタンスを準備します; 設定upgrade.modeのための値を削除するようにしてください
    • アップグレードを完了するためにもう一度各アプリケーションのインスタンスをバウンスします
  • 0.10.0.x から 0.11.0.0, 0.11.0.1 あるいは 0.11.0.2 へのアップグレードはオフライン アップグレードを必要とします (ローリング バウンス アップグレードはサポートされません)
    • 全ての古い (0.10.0.x) アプリケーションのインスタンスを停止します
    • コードを更新し、古いコードとjarファイルを新しいコードと新しいjarファイルと交換します
    • 全ての新しい (0.11.0.0 , 0.11.0.1 あるいは 0.11.0.2) アプリケーションのインスタンスを再起動します
0.11.0.3 での主要な変更
  • バージョン 0.10.0.x からのローリング バウンス アップグレードが可能な新しいKafkaストリーム設定パラメータ upgrade.fromが追加されました
  • この新しい設定についての詳細は Kafka ストリーム アップグレード ガイド を見てください。
0.11.0.0 での主要な変更
  • 不透明なリーダー選出は今はデフォルトで無効です。新しいデフォルトは可用性よりも耐久性を好みます。以前の挙動を保持したいユーザはブローカーの設定unclean.leader.election.enabletrueに設定してください。
  • プロデューサの設定block.on.buffer.full, metadata.fetch.timeout.ms および timeout.ms は削除されました。それらはKafka 0.9.0.0 で最初に非推奨になりました。
  • offsets.topic.replication.factor ブローカー設定は、今では自動トピック設定を強制されます。クラスタのサイズがこのリプリケーション要素の要請に合うまで、内部的な自動トピックの生成は GROUP_COORDINATOR_NOT_AVAILABLE エラーで失敗するでしょう。
  • snappyを使ってデータを圧縮する場合、プロデューサとブローカーは圧縮レートを改善するために1KBの代わりに圧縮のスキーマのデフォルトのブロックサイズ (2 x 32 KB) を使うでしょう。大きなブロックサイズで圧縮される場合、小さなブロックサイズで圧縮されたデータより50%大きくなるという報告があります。snappyの場合については、5000のパーティションを持つプロデューサは更に 315 MB の JVM ヒープを必要とするでしょう。
  • 同様に、gzipでデータを圧縮する場合は、プロデューサとブローカーはバッファサイズとして1 KBではなく 8 KBを使うでしょう。gzipのデフォルトは過度に低いです (512 バイト)。
  • ブローカーの設定 max.message.bytes は今はメッセージのバッチの総サイズに適用されます。以前は、設定は圧縮されたメッセージあるいは非圧縮のメッセージに即座に適用されていました。メッセージのバッチは1つのメッセージからのみ成るかもしれません。つまりほとんどの場合において個々のメッセージのサイズの制限はバッチのフォーマットのオーバーヘッドによってのみ削減されます。しかし、フォーマットの交換はわずかな意味があります (詳細は以下 を見てください)。以前はブローカーは(総およびパーティションレベルの取り出しサイズに関係なく)少なくとも1つのメッセージが各取り込みリクエストで返されることが保証されていましたが、同じ挙動が今では1つのメッセージバッチに適用されることにも注意してください。
  • GC ログのローテーションがデフォルトで有効です。詳細は KAFKA-3754 を見てください。
  • 非推奨の RecordMetadata, MetricName および Cluster クラスのコンストラクタが削除されました。
  • ユーザヘッダの読み込みと書き込みアクセスを提供する新しいヘッダインタフェースを使ってユーザヘッダのサポートが追加されました。
  • ProducerRecord と ConsumerRecord はHeaders headers()メソッド呼び出しを使って新しいヘッダを公開します。
  • ExtendedSerializer と ExtendedDeserializer インタフェースがヘッダのシリアライズ化とデシリアライズ化をサポートするために導入されました。設定されたシリアライザおよびデシリアライザが上のクラスではない場合、ヘッダは無視されるでしょう。
  • 新しい設定 group.initial.rebalance.delay.ms が導入されました。この設定は、GroupCoordinator が初期のコンシューマのリバランスで待つ時間をミリ秒で指定します。リバランスは新しいメンバーがグループに加わった時にgroup.initial.rebalance.delay.msの値だけ更に遅延し、max.poll.interval.msの最大まで遅延するでしょう。これのデフォルトの値は3秒です。開発及びテスト時には、テスト実行の時間を遅らせないために、これを0に設定することが望ましいかもしれません。
  • org.apache.kafka.common.Cluster#partitionsForTopic, partitionsForNode および availablePartitionsForTopic メソッドは、必要とされるトピックのメタデータが存在しない場合、null の代わりに空のリストを返すでしょう (これは悪い慣習と見なされます) 。
  • ストリーム API 設定 パラメータ timestamp.extractor, key.serde および value.serde は非推奨で、それぞれ default.timestamp.extractor, default.key.serde および default.value.serde に置き換えられました。
  • Java コンシューマの commitAsync API 内のオフセット コミットの失敗については、RetriableCommitFailedExceptionのインスタンスがコミットのコールバックに返される時にもう潜在的な原因を公開しません。詳細はKAFKA-5052 を見てください。
新しいプロトコルのバージョン
  • KIP-107: FetchRequest v5 はパーティションレベルの log_start_offset フィールドを導入します。
  • KIP-107: FetchResponse v5 はパーティションレベルの log_start_offset フィールドを導入します。
  • KIP-82: ProduceRequest v3 はkey フィールドと value フィールドを含むメッセージプロトコル内のheaderの配列を導入します。
  • KIP-82: FetchResponse v3 はkey フィールドと value フィールドを含むメッセージプロトコル内のheaderの配列を導入します。
確実に1回のセマンティクスについての注意

Kafka 0.11.0 はプロデューサ内の等冪とトランザクションの機能のためサポートを含みます。等冪の配送は1つのプロデューサの寿命の間にメッセージが確実に1回だけ特定のトピックのパーティションに配送されることを保証します。トランザクション的な配送により、プロデューサはデータを複数のパーティションに送信することができ、全てのメッセージの配送が成功するかあるいはどのメッセージも配送されません。これらの機能が合わさって、Kafkaでの"確実に1回のセマンティクス"を有効にします。これらの機能の詳細はユーザガイド内で利用可能ですが、以下にアップグレードされたクラスタ内でそれらを有効にする際の2,3の具体的な注意を追加します:EoSを有効にする必要はなく、使わない場合はブローカーの挙動には影響がないことに注意してください。

  1. 新しいJavaプロデューサとコンシューマのみが確実に1回のセマンティクスをサポートします。
  2. これらの機能は 0.11.0 メッセージ フォーマットに決定的に依存します。古いフォーマットでそれらを使おうとすると、サポートされないバージョンのエラーになるでしょう。
  3. トランザクションの状態は新しい内部的なトピック __transaction_state に格納されます。このトピックは最初にトランザクション的なリクエストAPIを使おうとするまで生成されません。コンシューマのオフセット トピックに似て、トピックの設定を制御する幾つかの設定があります。例えば、transaction.state.log.min.isr はこのトピックのための最小のISRを制御します。オプションの完全なリストについてはユーザガイド内の設定の章を見てください。
  4. 安全なクラスタのために、トランザクション的なAPIは bin/kafka-acls.shを使って有効にすることができる新しいACLを必要とします。tool.
  5. KafkaでのEoSは新しいAPIを導入し、いくつかの既存のものを修正します。完全な詳細はKIP-98 を見てください。
0.11.0での新しいメッセージフォーマットの注意

0.11.0 メッセージフォーマットはプロデューサのためにより良い配送のセマンティクスをサポートするために幾つかの大きな拡張(KIP-98を見てください)と、改善されたリプリケーションの耐障害性(KIP-101を見てください)を含みます。新しいフォーマットはこれらの改善を可能にするためにより多くの情報を含みますが、バッチ形式をもっと効率的にしました。バッチごとのメッセージの数が2以上である限り、全体のオーバーヘッドが低くなることを期待することができます。しかし、より小さなバッチについては小さなパフォーマンスの影響があるかもしれません。新しいメッセージフォーマットの初期のパフォーマンスの解析結果については、ここを見てください。KIP-98 の提案の中でメッセージフォーマットのより詳細について見つけることもできます。

新しいメッセージでの注目すべき違いの一つは、圧縮されていないメッセージでさえも1つのバッチとして一緒に格納されることです。これはブローカーの設定 max.message.bytes に関して少しの意味合いを持ちます。これは1つのバッチのサイズを制限します。最初に、もし古いクライアントが古いフォーマットを使っているトピックのパーティションにメッセージを生成し、メッセージが個々に max.message.bytesより小さい場合、上位変換の処理中にメッセージが1つのバッチにマージされた後でブローカーはまだ拒否するかもしれません。一般的に、個々のメッセージの集約サイズが max.message.bytes より大きい場合にこれが起きるでしょう。新しいフォーマットからダウン-コンバートされたメッセージを読み込む古いコンシューマにとって似たような効果があります: もし取得サイズが少なくともmax.message.bytesと同じ大きさに設定されていない場合、もし個々の圧縮されていないメッセージが設定された取得サイズよりも小さい場合にコンシューマは進めることができないかもしれません。Java クライアント 0.10.1.0 以上に関しては取得サイズを超えても少なくとも1回のメッセージを返すことが保証される更新された取得プロトコルを使うため、この挙動は影響ありません。これらの問題を避けるために、1) プロデューサのバッチサイズが max.message.bytesより大きく設定されていないこと、2) コンシューマの取得サイズが少なくともmax.message.bytesと同じ大きさに設定されていることを確実にする必要があります。

0.10.0 メッセージ フォーマットへのアップグレードのパフォーマンスへの影響のほとんどの議論は 0.11.0 のアップグレードに関係しています。そのような場合に"zero-copy" 転送がすでに可能ではないために、TLSを使って安全にされていないクラスタに主に影響します。ダウン-コンバージョンのコストを避けるために、コンシューマアプリケーションが最新の0.11.0クライアントにアップグレードされることを確実にする必要があります。古いコンシューマが0.11.0.0で非推奨になったために、新しいメッセージ形式をサポートしません。ダウン-コンバージョンのコスト無しで新しいメッセージ形式を使う新しいコンシューマを使うためにアップグレードする必要があります。0.11.0コンシューマは0.10.0以上のブローカーとの後方互換性をサポートするため、ブローカーの前に最初にクライアントをアップグレードすることができることに注意してください。

0.8.x, 0.9.x, 0.10.0.x あるいは 0.10.1.x to 0.10.2.0 からのアップグレード

0.10.2.0 にはwireプロトコルの変更があります。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中のダウンタイムが無いことを保証します。しかし、アップグレードの前に0.11.2.0 での注目すべき変更を見直してください。

バージョン 0.10.2から、Javaクライアント (プロデューサとコンシューマ)は古いブローカーと通信することができるようになりました。バージョン 0.10.2 のクライアントはバージョン 0.10.0 あるいはそれより新しいブローカーと話すことができます。しかし、ブローカーが 0.10.0 より古い場合は、クライアントをアップグレードする前にKafkaクラスタ内の全てのブローカーをアップグレードしなければなりません。バージョン 0.10.2 のブローカーは 0.8.x とそれより新しいクライアントをサポートします。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties ファイルを更新し、以下のプロパティを追加します:
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 0.10.2 に設定することでプロトコルのバージョンを上げることができます。
  4. 以前のメッセージフォーマットが 0.10.0 の場合、log.message.format.version を 0.10.2 に変更します (メッセージ形式が 0.10.0, 0.10.1 と 0.10.2 で同じなため、これは操作不要です)。以前のメッセージフォーマットのバージョンが 0.10.0 より低い場合、まだ log.message.format.version を変更しないでください - このパラメータは全てのコンシューマが 0.10.0.0 以降に変更された後で変更されるべきです。
  5. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  6. もし log.message.format.version がこの時点でまだ 0.10.0 より低い場合は、全てのコンシューマが 0.10.0 以降にアップグレードするまで待ち、各ブローカー上で log.message.format.version を 0.10.2 に変更し、それらを1つずつ再起動します。

注意: ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それら全てを開始します。それらはデフォルトで新しいプロトコルを使って開始するでしょう。

注意: プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。

0.10.1のKafkaストリームアプリケーションをアップグレード
  • ストリームアプリケーションの 0.10.1 から 0.10.2 へのアップグレードはブローカーのアップグレードを必要としません。Kafka ストリーム 0.10.2 アプリケーションは 0.10.2 と 0.10.1 のブローカーにアクセスすることができます (しかし 0.10.0 ブローカーへ接続することはできません)。
  • コードを再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • TimestampExtractor インタフェースが変更されたため、独自の(つまりユーザ実装の)タイムスタンプ エクストラクタを使う場合は、このコードを更新する必要があるでしょう。
  • StreamsMetric インタフェースが変更されたため、独自のメトリクスを登録する場合、このコードを変更する必要があるでしょう。
  • 詳細は 0.10.2でのストリーム APIの変更 を見てください
0.10.0のKafkaストリームアプリケーションをアップグレード
  • Kafkaストリーム 0.10.2 アプリケーションは 0.10.2 あるいは 0.10.1 ブローカーへのみ接続することができるため、ストリームアプリケーションの 0.10.0 から 0.10.2 へのアップグレードはブローカーのアップグレードを必要とします。
  • 2,3のAPIの変更があります。後方互換性はありません(参照詳細は0.10.2 でのストリーム APIの変更)。このように、コードを更新し再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • 0.10.0.xから0.10.2.2 への更新は、最初の更新フェーズについて設定upgrade.from="0.10.0"セットを伴う2回のローリング バウンスを必要とします (参照KIP-268)。別のやり方として、オフラインの更新も可能です。
    • ローリングバウンスのためのアプリケーションインスタンスを準備し、新しいバージョン 0.10.2.2 のために設定upgrade.from"0.10.0" に設定するようにしてください。
    • アプリケーションの各インスタンスを一度バウンスします
    • 2回目のローリングバウンスのために新しく配備された 0.10.2.2 アプリケーションインスタンスを準備します; 設定upgrade.modeのための値を削除するようにしてください
    • アップグレードを完了するためにもう一度各アプリケーションのインスタンスをバウンスします
  • 0.10.0.x から 0.10.2.0 あるいは 0.10.2.1 へのアップグレードはオフライン アップグレードを必要とします (ローリング バウンス アップグレードはサポートされません)
    • 全ての古い (0.10.0.x) アプリケーションのインスタンスを停止します
    • コードを更新し、古いコードとjarファイルを新しいコードと新しいjarファイルと交換します
    • 全ての新しい (0.10.2.0 , 0.10.2.1) アプリケーションのインスタンスを再起動します
0.10.2.2 での主要な変更
  • バージョン 0.10.0.x からのローリング バウンス アップグレードが可能な新しい設定パラメータ upgrade.fromが追加されました
0.10.2.1 での主要な変更
  • StreamConfigクラスの2つの設定のデフォルト値はKafkaストリームアプリケーションの resiliency を改善するために変更されました。内部的なKafkaストリームのプロデューサのretriesのデフォルト値は0から10に変更されました。内部的なKafkaストリームのコンシューマのmax.poll.interval.ms のデフォルト値は 300000 から Integer.MAX_VALUE に変更されました。
0.10.2.0 での主要な変更
  • Javaクライアント(プロデューサとコンシューマ)は古いブローカーと通信することができるようになりました。バージョン 0.10.2 のクライアントはバージョン 0.10.0 あるいはそれより新しいブローカーと話すことができます。古いブローカーが使われた時に、幾つかの機能は利用不可あるいは制限されることに注意してください。
  • Javaコンシューマの幾つかのメソッドは、呼び出し中のスレッドが中断された時に今ではInterruptExceptionを投げます。この変更の掘り下げた説明については、KafkaConsumer Javadoc を参照してください。
  • Java コンシューマは今ではgracefullyにシャットダウンします。デフォルトでは、コンシューマは延期しているリクエストを完了するために30秒まで待ちます。タイムアウトを持つ新しいclose APIが最大待ち時間を制御するためにKafkaConsumerに追加されました。
  • --whitelist オプションを使って新しいJavaコンシューマを持つMirrorMakerへ、カンマで分割された複数の正規表現を渡すことができます。これにより古いScalaコンシューマが使われた時にMirrorMakerと一貫性を持つ挙動をします。
  • ストリームアプリケーションの 0.10.1 から 0.10.2 へのアップグレードはブローカーのアップグレードを必要としません。Kafka ストリーム 0.10.2 アプリケーションは 0.10.2 と 0.10.1 のブローカーにアクセスすることができます (しかし 0.10.0 ブローカーへ接続することはできません)。
  • Stream APIからZookeeperの依存が削除されました。ストリームAPIは内部的なトピックを管理するために今ではZookeeperを直接修正せずにKafkaプロトコルを使います。これによりZookeeperに直接アクセスする権限が必要無くなり、もうストリームのアプリケーションの中で "StreamsConfig.ZOOKEEPER_CONFIG" を設定する必要がありません。Kafkaクラスタが安全な場合、ストリームアプリケーションは新しいトピックを作成するためのセキュリティ権限を持つ必要があります。
  • "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" および "request.timeout.ms" を含む幾つかの新しいフィールドがStreamsConfig クラスに追加されました。ユーザはそれらのデフォルトの値に注意を払い、必要に応じて設定する必要があります。詳細については、3.5 Kafka ストリームの設定を参照してください。
新しいプロトコルのバージョン
  • KIP-88: topics 配列が nullに設定された場合、OffsetFetchRequest v2 は全てのトピックについてのオフセットの扱いをサポートします。
  • KIP-88: OffsetFetchResponse v2 は トップレベルの error_code フィールドを導入します。
  • KIP-103: UpdateMetadataRequest v3 はend_points配列の要素への listener_name フィールドを導入します。
  • KIP-108: CreateTopicsRequest v1 はvalidate_onlyフィールドを導入します。
  • KIP-108: CreateTopicsResponse v1 はtopic_errors配列の要素へのerror_message フィールドを導入します。

0.8.x, 0.9.x あるいは 0.10.0.X から 0.10.1.0 へのアップグレード

0.10.1.0 にはwireプロトコルの変更があります。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中のダウンタイムが無いことを保証します。しかし、アップグレードする前に0.10.1.0 での潜在的な破壊的な変更に注意してください。
注意: 新しいプロトコルが導入されたため、クライアントをアップグレードする前にKafkaクラスタをアップグレードすることが重要です (つまり、0.10.1.xクライアントは 0.10.1.x以降のブローカーをサポートし、一方で 0.10.1.x ブローカーも古いクライアントをサポートします)。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties ファイルを更新し、以下のプロパティを追加します:
  2. 一度にブローカーをアップグレードします: ブローカーをシャットダウンし、コードを更新し、再起動します。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 0.10.1.0 に設定することでプロトコルのバージョンを上げることができます。
  4. 以前のメッセージフォーマットが 0.10.0 の場合、log.message.format.version を 0.10.1 に変更します (メッセージ形式が 0.10.0 と 0.10.1 で同じなため、これは操作不要です)。以前のメッセージフォーマットのバージョンが 0.10.0 より低い場合、まだ log.message.format.version を変更しないでください - このパラメータは全てのコンシューマが 0.10.0.0 以降に変更された後で変更されるべきです。
  5. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  6. もし log.message.format.version がこの時点でまだ 0.10.0 より低い場合は、全てのコンシューマが 0.10.0 以降にアップグレードするまで待ち、各ブローカー上で log.message.format.version を 0.10.1 に変更し、それらを1つずつ再起動します。

注意: ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それら全てを開始します。それらはデフォルトで新しいプロトコルを使って開始するでしょう。

注意: プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。

0.10.1.0での破壊的な変更の可能性
  • ログの保持期間がもうログセグメントの最終更新時間に基づかなくなりました。その代わりにログセグメント内のメッセージの一番大きいタイムスタンプに基づくでしょう。
  • ログのrolling時間がもうログセグメントの生成時間に依存しなくなりました。その代わりに今ではメッセージ内のタイムスタンプに基づきます。もっと具体的に。セグメント内の最初のメッセージのタイムスタンプが T の時、新しいメッセージが T + log.roll.ms より大きいか等しいタイムスタンプを持つ時にログはロールアウトされるでしょう。
  • 0.10.0 のオープン ファイル ハンドラは各セグメントごとの追加の時間インデックスファイルのために ~33%だけ増加するでしょう。
  • 時間インデックスとオフセットインデックスは同じインデックスサイズ設定を共有します。それぞれの時間インデックスのエントリはオフセットのインデックスエントリのサイズの1.5x だからです。潜在的なログのローリングを避けるために、ユーザは log.index.size.max.bytes を増やす必要があるかもしれません。
  • 大きなログセグメント (例えば、>15K)を持つブローカーでは、インデックスファイル数の増加のために、ブローカーの起動時のログのロードプロセスが長くなるかもしれません。私たちの実験によると、num.recovery.threads.per.data.dir を1に設定するとログのロード時間が減るかもしれません。
0.10.0のKafkaストリームアプリケーションをアップグレード
  • Kafkaストリーム 0.10.1 アプリケーションは 0.10.1 ブローカーへのみ接続することができるため、ストリームアプリケーションの 0.10.0 から 0.10.1 へのアップグレードはブローカーのアップグレードを必要としません。
  • 2,3のAPIの変更があります。後方互換性はありません(参照詳細は 0.10.1でのストリーム APIの変更 を見てくださいこのように、コードを更新し再コンパイルする必要があります。単にKafkaストリームライブラリの jar ファイルを入れ替えるだけでは動作せず、アプリケーションが壊れるでしょう。
  • 0.10.0.xから0.10.1.2 への更新は、最初の更新フェーズについて設定upgrade.from="0.10.0"セットを伴う2回のローリング バウンスを必要とします (参照KIP-268)。別のやり方として、オフラインの更新も可能です。
    • ローリングバウンスのためのアプリケーションインスタンスを準備し、新しいバージョン 0.10.1.2 のために設定upgrade.from"0.10.0" に設定するようにしてください。
    • アプリケーションの各インスタンスを一度バウンスします
    • 2回目のローリングバウンスのために新しく配備された 0.10.1.2 アプリケーションインスタンスを準備します; 設定upgrade.modeのための値を削除するようにしてください
    • アップグレードを完了するためにもう一度各アプリケーションのインスタンスをバウンスします
  • 0.10.0.x から 0.10.1.0 あるいは 0.10.1.1 へのアップグレードはオフライン アップグレードを必要とします (ローリング バウンス アップグレードはサポートされません)
    • 全ての古い (0.10.0.x) アプリケーションのインスタンスを停止します
    • コードを更新し、古いコードとjarファイルを新しいコードと新しいjarファイルと交換します
    • 全ての新しい (0.10.1.0 , 0.10.1.1) アプリケーションのインスタンスを再起動します
0.10.1.0 での主要な変更
  • 新しいJavaコンシューマはもうベータではありません。全ての新しい開発でそれをお勧めします。古いScalaコンシューマはまだサポートされますが、それらは次のリリースで非推奨になり、将来の大きなリリースで削除されるでしょう。
  • --new-consumer/--new.consumer スイッチはもうMirrorMakerのようなツールと新しいコンシューマを使うコンソール コンシューマを使う必要はありません; one simply needs to pass a Kafka broker to connect to instead of the ZooKeeper ensemble. 更に、古いコンシューマを持つコンソール コンシューマの使用は非推奨になり、将来の大きなリリースで削除されるでしょう。
  • Kafka クラスタは今ではクラスタidによって一意に識別することができます。それはブローカーが 0.10.1.0 にアップグレードされる時に自動的に生成されます。クラスタidは kafka.server:type=KafkaServer,name=ClusterId メトリックを使って利用可能で、メタデータの応答の一部です。シリアライザ、クライアント インタセプタ およびメトリック レポータは ClusterResourceListener インタフェースを実装することでクラスタidを受け取ることができます。
  • BrokerState "RunningAsController" (value 4) が削除されました。バグにより、ブローカーはそれから遷移する前にほんの少しだけこの状態になるでしょう。従って削除の影響は最小限に違いありません。指定されたブローカーがコントローラかどうかを検知するお勧めの方法は、kafka.controller:type=KafkaController,name=ActiveControllerCount メトリックを使うことです。
  • 新しいJavaコンシューマにより今ではユーザはパーティション上のタイムスタンプを使ってオフセットを検索することができます。
  • 今では新しいJavaコンシューマはバックグランドのスレッドからのハートビートをサポートします。コンシューマがグループから積極的に離れる前のpollの起動間の最大時間を制御する新しい設定max.poll.interval.msがあります (デフォルトでは5分です)。設定 request.timeout.ms の値は常に max.poll.interval.ms より大きくなければなりません。なぜなら、これはコンシューマがリバランスする間に JoinGroup リクエストがサーバ上でブロックすることができる最大時間だからです。そのためそのデフォルトの値をちょうど5分を超えるように変更しました。結果的に、session.timeout.ms のデフォルト値は10秒に調整され、max.poll.records のデフォルト値は 500 に変更されました。
  • Authorizer を使用し、ユーザがトピック上に Describe authorization を持たない場合、ブローカーはトピック名を取り零すため、ブローカーはもうTOPIC_AUTHORIZATION_FAILED エラーを返さないでしょう。代わりに、UNKNOWN_TOPIC_OR_PARTITION エラーコードが返されるでしょう。Kafkaクライアントは一般的に自動的に不明なトピックエラーを再試行するため、プロデューサとコンシューマを使う時にこれは予期しないタイムアウトあるいは遅延を起こすかもしれません。これが起きたかも知れないと疑う場合は、クライアントログを調査すべきです。
  • 取得応答はデフォルトでサイズの制限があります (コンシューマについては 50MB、リプリケーションについては 10MB)。既存のパーティションあたりの制限も適用されます (コンシューマとリプリケーションについて 1MB)。次の項目で説明されるようにこれらの制限のどちらも絶対的な最大ではないことに注意してください。
  • 応答/パーティションサイズの制限より大きいメッセージが見つかった場合は、コンシューマとレプリカは先に進むかもしれません。もっと具体的には、もし取得の最初の空では無いパーティション内の最初のメッセージがどちらかあるいは両方の制限より大きい場合、メッセージはまだ返されるでしょう。
  • 呼び出し元がパーティションの順番を指定できるように、上書きされたコンストラクタがkafka.api.FetchRequestkafka.javaapi.FetchRequest に追加されました (v3では順番が重要なため)。以前の既存のコンストラクタは非推奨で、枯渇問題を避けるためにリクエストが送信される前にパーティションがシャッフルされます。
新しいプロトコルのバージョン
  • ListOffsetRequest v1 はタイムスタンプに基づいた正確なオフセットの検索をサポートします。
  • MetadataResponse v2 は新しいフィールドを導入します: "cluster_id".
  • FetchRequest v3 は(既存のパーティションごとの制限に加えて)応答サイズの制限をサポートします。もし進める必要があり、リクエスト内のパーティションの順番が重要であれば、制限より大きいメッセージを返します。
  • JoinGroup v1 は新しいフィールドを導入します: "rebalance_timeout"。

0.8.x あるいは 0.9.x から 0.10.0.0 へのアップグレード

0.10.0.0 は 潜在的に破壊的な変更 (アップグレードする前に再調査してください)があり、アップグレードに伴うパフォーマンスの問題があるかもしれません。以下で推奨されるrollingアップグレード計画に従うことで、アップグレード中およびそれに伴うダウンタイムとパフォーマンスの問題が無いことを保証します。
注意: 新しいプロトコルが導入されたため、クライアントをアップグレードする前にKafkaクラスタをアップグレードすることが重要です。

バージョン 0.9.0.0 のクライアントの注意: 0.9.0.0 で導入されたバグにより、ZooKeeperに依存するクライアント (古いScalaのハイレベルのコンシューマとMirrorMakerが古いコンシューマと一緒に使われた場合)は 0.10.0.x のブローカーと連携しないでしょう。従って 0.9.0.0 クライアントはブローカーが 0.10.0.x にアップグレードされる前に 0.9.0.1 にアップグレードされる必要があります。このステップは、0.8.X あるいは 0.9.0.1 クライアントの場合には必要ありません。

ローリングアップグレードに関して:

  1. 全てのブローカーの server.properties ファイルを更新し、以下のプロパティを追加します:
  2. ブローカーのアップグレード。単純にブローカーをダウンし、コードを更新し、再起動することで一度に行うことができます。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 0.10.0.0 に設定することでプロトコルのバージョンを上げることができます。注意: まだ log.message.format.version に触れるべきではありません - このパラメータはいったん全てのコンシューマが 0.10.0.0 にアップグレードされてからのみ変更されるべきです。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。
  5. 一度全てのコンシューマが 0.10.0 にアップグレードされると、各ブローカー上で log.message.format.version を 0.11.0 に変更し、それらを1つずつ再起動します。

注意: ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それら全てを開始します。それらはデフォルトで新しいプロトコルを使って開始するでしょう。

注意: プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。

0.10.0.0 へのアップグレードに伴いパフォーマンスの影響の可能性があります。

0.10.0 でのメッセージフォーマットは新しいタイムスタンプのフィールドを含み、圧縮されたメッセージのための相対的なオフセットを使います。ディスク上のメッセージフォーマットは server.properties ファイル内の log.message.format.version を使って設定することができます。デフォルトのディスク上のメッセージフォーマットは 0.10.0 です。0.10.0.0より前のコンシューマクライアントがいる場合は、0.10.0より前のメッセージフォーマットのみを理解します。この場合、ブローカーは古いバージョン上のコンシューマへメッセージを送信する前に、メッセージを0.10.0のフォーマットから以前のフォーマットに変換することができます。しかし、ブローカーはこの場合 ゼロコピー転送を使うことができません。パフォーマンス問題についてのKafkaコミュニティからの報告ではCPUの利用率がアップグレードの後で以前の20%から100%になることを示しています。これはパフォーマンスを通常に戻すために全てのクライアントの即座のアップグレードを強制します。コンシューマが0.10.0.0にアップグレードされる前にそのようなメッセージの変換を避けるために、ブローカーを0.10.0.0にアップグレードする時にlog.message.format.versionを0.8.0あるいは0.9.0に設定することができます。このようにブローカーはデータを古いコンシューマに送信するためにまだzero-copyを使うことができます。コンシューマがアップグレードされると、ブローカー上でメッセージフォーマットを0.10.0に変更し、新しいタイムスタンプと改善された圧縮を含む新しいメッセージ形式を楽しむことができます。互換性を保証するために変換がサポートされ、まだ新しいクライアントに更新されていない2,3のアプリケーションをサポートするのに便利かもしれませんが、過剰なクラスタ上でさえ全てのコンシューマの通信量をサポートすることは現実性がありません。従って、ブローカーがアップグレードされたがクライアントの大半がまだの場合は、できるだけメッセージの変換を避けることが重要です。

0.10.0.0にアップグレードされたクライアントについては、パフォーマンスの影響はありません。

注意: メッセージフォーマットのバージョンを設定することで、既存の全てのメッセージはそのメッセージフォーマットのバージョン以下であると保証します。そうでなければ、0.10.0.0より前のコンシューマが壊れるかも知れません。特に、メッセージフォーマットが0.10.0に設定された後で、0.10.0.0より前のバージョンのコンシューマを壊すかも知れないので以前のフォーマットに戻す変更をするべきではありません。

注意: 各メッセージ内に導入された追加のタイムスタンプによって、小さなメッセージを送信しているプロデューサはオーバーヘッドの増加によりメッセージのスループットの減少を経験するかもしれません。さらに、リプリケーションは今度はメッセージごとに追加の8バイトを転送します。もしクラスタのネットワーク許容量すれすれで実行している場合は、ネットワークカードを圧倒し過負荷による障害とパフォーマンスの問題に遭遇するでしょう。

注意: もしプロデューサ上で圧縮を有効にすると、場合によってはプロデューサのスループットの減少とブローカー上の圧縮レートの低下に気づくかもしれません。圧縮されたメッセージを受け取る場合、0.10.0ブローカーはメッセージの圧縮を避けます。この事は一般的にレイテンシーを下げ、スループットを改善します。しかし、特定の場合において、これはプロデューサ上のバッチのサイズを削減するかも知れません。このことはスループットの改悪につながるかも知れません。これが起きた場合は、ユーザはより良いスループットのためにプロデューサのlinger.msとbatch.sizeを調整することができます。さらに、snappyを使ってメッセージを圧縮するために使われるプロデューサのバッファは、ブローカーによって使われるものよりも小さくなります。これはディスク上のメッセージの圧縮レートへの負の衝撃を持つかも知れません。将来のKafkaリリースではこれを設定可能にするつもりです。

0.10.0.0での破壊的な変更の可能性
  • Kafka 0.10.0.0から、KafkaでのメッセージフォーマットのバージョンはKafkaのバージョンとして表されています。例えば、メッセージフォーマット 0.9.0 はKafka 0.9.0 によってサポートされるもっとも高いメッセージのバージョンを参照します。
  • メッセージフォーマット 0.10.0 が導入され、デフォルトで使われています。メッセージ内にタイムスタンプのフィールドが含まれ、圧縮されたメッセージのために相対的なオフセットが使われます。
  • ProduceRequest/Response v2 が導入され、デフォルトでメッセージフォーマット 0.10.0 をサポートするために使われます。
  • FetchRequest/Response v2 が導入され、デフォルトでメッセージフォーマット 0.10.0 をサポートするために使われます。
  • MessageFormatter インタフェースがdef writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) から def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)に変更されました。
  • MessageReader インタフェースがdef readMessage(): KeyedMessage[Array[Byte], Array[Byte]] から def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]に変更されました。
  • MessageFormatter のパッケージはkafka.tools からkafka.commonに変更されました。
  • MessageReaderのパッケージがkafka.tools からkafka.commonに変更されました。
  • handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])は呼ばれないため、MirrorMakerMessageHandler はもうそれを公開しません。
  • 0.7 KafkaMigrationTool はもうKafkaと一緒にパッケージされません。0.7 から 0.10.0 に移設する場合は、最初に 0.8 に移設し、それから説明されるアップグレード手順に従って 0.8 から 0.10.0 にアップグレードしてください。
  • 新しいコンシューマはAPIをメソッドパラメータのための型の連続としてjava.util.Collectionを受け付けるように標準化しました。既存のコードは0.10.0クライアントライブラリと連携するように更新しなければならないかも知れません。
  • LZ4-compressed メッセージ処理は共同利用できる構成仕様(LZ4f v1.5.1)を使うように変更されました。古いクライアントと互換性を維持するために、この変更はメッセージフォーマット 0.10.0 以降にのみ適用されます。v0/v1 (メッセージフォーマット 0.9.0)を使って LZ4-compressed メッセージを生成/取得するクライアントは、0.9.0 の構成実装を使い続ける必要があります。プロトコル v2 以降を生成/取得するクライアントは、共同利用できる LZ4f 構成を使う必要があります。共同利用できる LZ4 ライブラリのリストは http://www.lz4.org/ で利用可能です。
0.10.0.0 での主要な変更
  • Kafka 0.10.0.0 から、Kafka Streamsという名前の新しいクライアントライブラリがKafkaトピックの中で格納されるデータ上でストリーミング処理をするために利用可能です。この新しいクライアントライブラリは上で述べたメッセージフォーマットの変更のために0.10.x以上のバージョンのブローカーとのみ連携します。詳しい情報はストリーム ドキュメントを読んでください。
  • 設定パラメータreceive.buffer.bytes のデフォルト値は、新しいコンシューマのために今では 64K です。
  • 新しいコンシューマは内部トピックが正規表現の申し込み内で偶然含まれることを避けるために新しいパラメータ exclude.internal.topics を公開します。デフォルトで有効です。
  • 古いScalaプロデューサは非推奨になりました。ユーザはそれらのコードをできる限りKafka-client JARの中に含まれるJavaプロデューサに移設するべきです。
  • 新しいコンシューマAPIは安定版として印が付けられました。

0.8.0, 0.8.1.X あるいは 0.8.2.X から 0.9.0.0 へのアップグレード

0.9.0.0 は破壊的な変更の可能性 (アップグレードする前に精査してください)があり、以前のバージョンからのブローカー間のプロトコルの変更があります。このことはアップグレードされたブローカーとクライアントは古いバージョンと互換性が無いかも知れないことを意味します。クライアントをアップグレードする前にKafkaクラスタをアップグレードすることが重要です。MirrorMaker ダウンストリームを使っている場合は、クラスタも同様に最初にアップグレードされるべきです。

ローリングアップグレードに関して:

  1. 全てのブローカー上の server.properties ファイルを更新し、以下のプロパティを追加します : inter.broker.protocol.version=0.8.2.X
  2. ブローカーのアップグレード。単純にブローカーをダウンし、コードを更新し、再起動することで一度に行うことができます。
  3. クラスタ全体がアップグレードされると、inter.broker.protocol.version を編集し 0.9.0.0 に設定することでプロトコルのバージョンを上げることができます。
  4. 新しいプロトコルのバージョンが効果を現すようにブローカーを1つずつ再起動します。

注意: ダウンタイムを喜んで許容する場合は、単純に全てのブローカーをダウンし、コードを更新し、それら全てを開始します。それらはデフォルトで新しいプロトコルを使って開始するでしょう。

注意: プロトコルのバージョンアップと再起動はブローカーがアップグレードされた後でいつでも行うことができます。すぐにしなければならないことはありません。

0.9.0.0での破壊的な変更の可能性
  • Java 1.6 はもうサポートされません。
  • Scala 2.9 はもうサポートされません。
  • 1000以上のブローカー ID はデフォルトで自動的にブローカーIDに割り当てられるように予約されています。その閾値より大きな既存のブローカーIDを持つクラスタの場合、reserved.broker.max.id ブローカー設定プロパティを適宜増加するようにしてください。
  • 設定パラメータ replica.lag.max.messages は削除されました。パーティション リーダーはどのレプリカを同期するかを決める時に遅れたメッセージの数をもう考慮しません。
  • 設定パラメータ replica.lag.time.max.ms はレプリカからの最後の取得リクエストからの経過時間を参照するだけでなく、レプリカが最後に追いついてからの時間も参照します。リーダーからメッセージを取得しているが replica.lag.time.max.ms 内の最新のメッセージに追いついていないレプリカは同期していないと見なされるでしょう。
  • コンパクト化されたトピックはキー無しのメッセージをもう受け付けず、それが試みられた場合はプロデューサによって例外が投げられます。0.8.xでは、キーを持たないメッセージはコンパクションスレッドが結果的に文句を言い終了(そして全てのコンパクト化されたトピックのコンパクト化を停止)するでしょう。
  • MirrorMaker は複数のターゲットクラスタをもうサポートしません。結果として、1つの --consumer.config パラメータのみを受け付けるでしょう。複数のソースクラスタをミラーするためには、ソースクラスタあたり少なくとも1つのMirrorMakerを必要とするでしょう。それぞれは独自のコンシューマ設定を持ちます。
  • org.apache.kafka.clients.tools.*の下にパッケージ化されたツールはorg.apache.kafka.tools.*に移動されました。含まれていた全てのスクリプトはまだ通常通り動作し、これらのクラスを直接インポートしている独自のコードのみ影響を受けるでしょう。
  • デフォルトのKafka JVMパフォーマンス オプション (KAFKA_JVM_PERFORMANCE_OPTS) は kafka-run-class.sh の中で変更されました。
  • kafka-topics.sh スクリプト (kafka.admin.TopicCommand) は、今は障害時に非ゼロの終了コードで終了します。
  • kafka-topics.sh スクリプト (kafka.admin.TopicCommand) は今ではトピック名の '.' あるいは '_' 使用によってトピック名がメトリックの衝突を起こす危険がある時に警告を出力し、実際の衝突の場合にはエラーを出力するでしょう。
  • kafka-console-producer.sh スクリプト (kafka.tools.ConsoleProducer) は古いScalaプロデューサをデフォルトとする代わりにJavaプロデューサを使うでしょう。ユーザは古いプロデューサを使うためには 'old-producer' を指定する必要があります。
  • デフォルトで全てのコマンドラインツールは全てのログメッセージをstdoutの代わりにstderrに出力するでしょう。
0.9.0.1 での主要な変更
  • 新しいブローカーidの生成機能は broker.id.generation.enable を false に設定することで無効にすることができます。
  • 設定パラメータ log.cleaner.enable は今はデフォルトでtrueです。このことは cleanup.policy=compact のトピックが今ではデフォルトで圧縮されず、ヒープの128 MBが log.cleaner.dedupe.buffer.size を使ってクリナープロセスに割り当てられるだろうことを意味します。log.cleaner.dedupe.buffer.size と他の log.cleaner 設定値を圧縮されたトピックの使用に基づいて再調査したいかもしれません。
  • 新しいコンシューマのための 設定パラメータ fetch.min.bytes は今ではデフォルトで 1です。
0.9.0.0 での非推奨
  • kafka-topics.sh スクリプト (kafka.admin.TopicCommand) からのトピックの設定の変更は非推奨になりました。進めるには、この機能のために kafka-configs.sh スクリプト (kafka.admin.ConfigCommand) を使ってください。
  • kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) は非推奨になりました。進めるには、この機能のために kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) を使ってください。
  • kafka.tools.ProducerPerformance クラスは非推奨になりました。進めるには、この機能のために org.apache.kafka.tools.ProducerPerformance を使ってください (kafka-producer-perf-test.sh も新しいクラスを使うように変更されるでしょう)。
  • プロデューサ設定 block.on.buffer.full は非推奨になり、将来のリリースでは削除されるでしょう。現在のところ、デフォルト値はfalseに変更されました。KafkaProducer はもう BufferExhaustedException を投げないでしょうが、代わりにブロックするために max.block.ms 値を使うでしょう。それは後で TimeoutException を投げます。block.on.buffer.full プロパティが明示的に true に設定された場合、max.block.ms が Long.MAX_VALUE に設定され、metadata.fetch.timeout.ms は設定されないでしょう。

0.8.1 から 0.8.2 へのアップグレード

0.8.2 は 0.8.1 と完全に互換性があります。単純に1つのブローカーをダウンし、コードを更新し、再起動することで一度にアップグレードを行うことができます。

0.8.0 から 0.8.1 へのアップグレード

0.8.1 は 0.8 と完全に互換性があります。単純に1つのブローカーをダウンし、コードを更新し、再起動することで一度にアップグレードを行うことができます。

0.7からのアップグレード

リリース 0.7 は新しいリリースと互換性がありません。主要な変更はAPI, ZooKeeper データ構造およびプロトコルに行われ、リプリケーションを追加するために設定に行われました (これは 0.7で無くなりました)。0.7から最新のバージョンへのアップグレードは移行のために 特別なツール を必要とします。この移行はダウンタイム無しで行うことができます。
TOP
inserted by FC2 system