クイックススタート
bin/
の代わりに、スクリプトの拡張子を.bat
に変更します。
ステップ 1: コードのダウンロード
{{fullDotVersion}} リリースをダウンロードし、それをun-tarします。> tar -xzf kafka_2.11-{{fullDotVersion}}.tgz > cd kafka_2.11-{{fullDotVersion}}
ステップ 2: サーバの開始
Kafka は ZooKeeperを使います。そのためまだZooKeeperが無い場合はそれを最初に開始する必要があります。quick-and-dirty single-node ZooKeeper インスタンスを取得するためにKafkaにパッケージ化されている便利なスクリプトを使うことができます。
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
これでKafkaサーバを開始します:
> bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
ステップ 3: トピックの作成
1つのパーティションと1つのレプリカだけを持つ"test"という名前のトピックを作成しましょう:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
これで list トピックコマンドを実行するとトピックを見ることができます:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
別のやり方として、手動でトピックを作成する代わりに、既存に無いトピックが発行された場合にブローカーがトピックを自動生成するように設定することもできます。
ステップ 4: メッセージを送信
Kafka はファイルあるいは標準入力から入力を受け取りそれをメッセージとしてKafkaクラスタに送信するコマンドライン クライアントが付属しています。デフォルトで各行は別個のメッセージとして送信されるでしょう。
プロデューサを実行し、その後サーバに送信するために2,3のメッセージをコンソールに入力します。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
ステップ 5: コンシューマの開始
Kafka はメッセージを標準出力に出力するコマンドラインコンシューマも持ちます。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
上のコマンドのそれぞれを異なるターミナル内で実行する場合、プロデューサターミナル内にメッセージを入力することができ、それらがコンシューマターミナル内に現れるのを見るでしょう。
コマンドラインツールの全ては追加のオプションを持ちます; 引数無しのコマンドの実行は使い方のもっと詳しい説明を表示するでしょう。
ステップ 6: 複数のブローカークラスタのセットアップ
これまでのところ1つのブローカーを実行してきましたが、それは楽しくありません。Kafkaにとっては1つのブローカーはサイズが1のクラスタなので、ある程度のブローカーインスタンスを開始する以外の何も違いはありません。しかし、それを感じるために、クラスタを3つのノードに拡張してみましょう (まだ全てがローカルのマシーン上にあります)。
まず、各ブローカーのための設定を作ります (Windows上では代わりにcopy
コマンドを使います):
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
今度はこれらの新しいファイルを編集し以下のプロパティを設定します:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
broker.id
プロパティは、クラスタ内の各ノードのユニークで恒久的な名前です。同じマシーン上でこれら全てを実行していて、同じポート上に登録しようとする全てのブローカーを維持し、他のデータのそれぞれを上書きたいため、ポートとログディレクトリのみを上書く必要があります。
すでにZookeeperがあり1つのノードを開始しているため、二つの新しいノードを開始する必要があります:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
今度は3つのリプリケーション ファクターを持つ新しいトピックを作成します:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
これで良いですが、今はクラスタがあるのでどうやってどのブローカーが何をしているのかを知ることができますか?それを見るには、"describe topics" コマンドを実行します:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
ここからが出力の解説です。最初の行は全てのパーティションの概要を与え、各追加の行は1つのパーティションの情報を与えます。このトピックについて1つのパーティションのみを持つため、1つの行しかありません。
- "header"は指定されたパーティションについて全ての読み書きに責任があるノードです。各ノードはランダムに選択されたパーティションの一部分についてのリーダーでしょう。
- "replicas"はこのパーティションに関して、それらがリーダーかどうか、あるいは現在活動状態かどうかさえも関係なく、ログをリプリケートするノードのリストです。
- "isr"は"in-sync"リプリカのセットです。これは現在活動中でリーダーに追いついてきているレプリカリストの部分集合です。
例では、ノード1がトピックの唯一のパーティションのためのリーダーであることに注意してください。
どこにあるかを見るために、生成した元のトピック上で同じコマンドを実行することができます:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
何も驚くことはありません — 元のトピックはレプリカを持たず、server 0、それを生成した時にはクラスタ内での唯一のサーバ、にあります。
新しいトピックに2,3のメッセージを発行してみましょう:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
今度はこれらのメッセージを消費してみましょう:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
今度は耐障害性を試してみましょう。ブローカー1はリーダーとして振る舞っていました。そういうわけでそれをkillしましょう:
> ps aux | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... > kill -9 7564Windowsでの使い方:
> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid ProcessId 6016 > taskkill /pid 6016 /f
リーダーシップはスレーブの一つに切り替わり、ノード1はin-syncレプリカセットの中にはもういません:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
しかし、もともと書き込んだリーダーがダウンしたにも関わらず、メッセージはまだ消費可能です:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
ステップ 7: データをインポート/エクスポートするためにKafka Connectを使用
コンソールからデータを書き込み、コンソールに書き込み返すのは始めるのに具合が良いですが、おそらく他のソースからのデータを使うかKafkaから他のシステムへデータをエクスポートしたいでしょう。多くのシステムについて、データをインポートあるいはエクスポートするために独自の統合コードを書く代わりにKafka Connectを使うことができます。
Kafka Connect はKafkaにデータをインポートあるいはエクスポートするKafkaに含まれているツールです。connectorsを実行する拡張ツールで、外部システムと相互作用するための独自のロジックを実装します。このクイックスタートの中で、ファイルからKafkaトピックへデータをインポートしKafkaトピックからファイルへデータをエクスポートする単純なコネクタを使ってKafka Connectを実行する方法を見ます。
まず、テストするためのいくつの種のデータを作成することで始めます:
> echo -e "foo\nbar" > test.txtあるいは Windows では:
> echo foo> test.txt > echo bar>> test.txt
次に、スタンドアローン モードで実行中の二つのコネクタを開始します。これは1つのローカルの専用のプロセス内で実行することを意味します。パラメータとして3つの設定ファイルを与えます。一番最初は常にKafkaコネクト処理のための設定で、Kafkaブローカーの接続先およびデータのシリアライズ形式のような一般的な設定を含みます。残りの設定ファイルはそれぞれ生成するコネクタを指定します。これらのファイルはユニークなコネクタ名、インスタンス化するコネクタクラス、およびコネクタによって必要とされるその他の設定を含みます。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
これらの例の設定ファイルは、Kafkaに含まれ、前に開始したデフォルトのローカルクラスタの設定を使用し、二つのコネクタを生成します: 1つ目のコネクタは入力ファイルから行を読み込むソースコネクタで、Kafkaトピックにそれぞれを生成します。二つ目のコネクタはKafkaトピックからメッセージを読み込み、出力ファイル中に行としてそれぞれを生成するシンクコネクタです。
スタートアップの間に、コネクタがインスタンス化されたことを示す多くのログメッセージを見るでしょう。一度Kafkaコネクトの処理が開始されると、ソースのコネクタはtest.txt
からの行の読み込みを開始し、それらをトピックconnect-test
へ生成しなければなりません。シンクのコネクタはトピックconnect-test
からメッセージの読み込みを開始し、それらをファイルtest.sink.txt
に書き込まなければなりません。出力ファイルの内容を調べることでパイプライン全体を通して配送されるデータを検証することができます。
> more test.sink.txt foo bar
データはKafkaトピックconnect-test
内に格納されることに注意してください。つまり、トピック内のデータを見るためにコンソールのコンシューマを実行することもできます (あるいはそれを処理するために独自のコンシューマコードを使います):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} ...
コネクタはデータを処理しつづけます。つまり、データをファイルに追加しパイプラインを使って移動するのを見ることができます:
> echo Another line>> test.txt
コンシューマ出力のコンソール内とシンクファイル内に行が現れるのが見えるはずです。
ステップ 8: データを処理するためにKafkaストリームを使う
Kafkaストリームはミッション クリティカルなリアルタイムアプリケーションとマイクロサービスのためのクライアントライブラリです。この時、入力および/あるいは出力データはKafkaクラスタに格納されます。Kafkaストリームは、これらのアプリケーションがスケーラブルが高く、柔軟で、耐障害性があり、分散されるなど、クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。これは ライブラリ内でコード化されたストリームアプリケーションを実行する方法を示すでしょう。