Apache Kafka クイックスタート
Kafka を始めることに興味がありますか?このクイックスタートの指示に従うか、以下のビデオを見てください。
最新の Kafka リリースをダウンロードし、それを解凍してください:
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0
Kafka を始めることに興味がありますか?このクイックスタートの指示に従うか、以下のビデオを見てください。
最新の Kafka リリースをダウンロードし、それを解凍してください:
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0
注意: ローカル環境に Java 8+ がインストールされている必要があります。
全てのサービスを正しい順番で開始するために、以下のコマンドを実行します:
# ZooKeeper サービスを開始する
# 注意: まもなく、Apache Kafka は Zookeeper を必要としなくなります。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
他の端末セッションを開き、以下を実行します:
# Kafka ブローカーを開始する
$ bin/kafka-server-start.sh config/server.properties
全てのサービスが正常に起動すると、基本的な Kafka 環境が実行され、使えるようになります。
Kafka は、多くのマシン間でイベント (このドキュメントでは、レコードあるいはメッセージとも呼ばれる)を読み込み、書き込み、格納、および処理を可能にする、分散イベントストリーミングプラットフォームです。
イベントの例としては、支払いトランザクション、携帯電話からの地理位置情報の更新、発送注文、IoT デバイスまたは医療機器からのセンサー測定などがあります。これらのイベントはトピックに整理され、格納されます。非常に単純化されたトピックは、ファイルシステム内のフォルダに似ており、イベントはフォルダ内のファイルです。
従って、最初のイベントを書き込む前にトピックを作成する必要があります。他の端末セッションを開き、以下を実行します:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Kafka の全てのコマンドラインツールには、追加のオプションがあります: 引数無しで kafka-topics.sh
コマンドを実行すると、使用方法の情報が表示されます。例えば、新しいトピックのパーティション数などの詳細も表示できます。
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Kafka クライアントは、ネットワークを介して Kafka ブローカーとイベントの書き込み(あるいは読み込み)のための通信を行います。受け取ったブローカーは、必要な限り、永久に、永続的で耐障害性のある方法でイベントを格納します。
コンソールプロデューサクライアントを実行して、幾つかのイベントをトピックに書き込みます。デフォルトでは、入力した各行は、トピックに個別のイベントが書き込まれます。
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
いつでも Ctrl-C
を使ってプロデューサを停止することができます。
別の端末セッションを開き、作成したイベントを読み込むためにコンソールコンシューマクライアントを実行します:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
いつでも Ctrl-C
を使ってコンシューマを停止することができます。
自由に実験してください: 例えば、プロデューサ端末(前のステップ)に切り替えて、追加のイベントを書き込み、イベントがコンシューマ端末にすぐに表示されることを確認します。
イベントは Kafka に永続的に格納されるため、何回でも、好きなだけの数のコンシューマが読み取ることができます。これを確認するには、さらに別の端末セッションを開いて、前のコマンドを再実行します。
おそらく、リレーショナルデータベースや従来のメッセージングシステムなどの既存のシステムに、すでにこれらのシステムを使っているアプリケーションと一緒に大量のデータがあるはずです。Kafka コネクタにより、外部のシステムから Kafka へ、またはその逆で、継続にデータを取り込むことができます。従って、既存のシステムを Kafka に統合することは非常に簡単です。この処理をさらに簡単にするために、すぐに利用できるそのような数百のコネクタがあります。
Kafka コネクトセクションで、Kafka にデータを継続的にインポート/エクスポートする方法の詳細をご覧ください。
データが Kafka にイベントとして格納されると、Java/Scala 用の Kafka ストリームクライアントライブラリを使ってデータを処理することができます。これにより、ミッションクリティカルなリアルタイムアプリケーションとマイクロサービスを実装することができます。ここで、入力および/または出力データは Kafka トピックに格納されます。Kafkaストリームは、これらのアプリケーションがスケーラブルが高く、柔軟で、耐障害性があり、分散されるなど、クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。ライブラリは、確実に1回の処理、ステートフル操作と集約、ウィンドウ、結合、イベント時間に基づいた処理などをサポートします。
最初の一噛みとして、人気のある WordCount
アルゴリズムを実装する方法を以下に示します:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));
Kafka ストリームのデモとアプリ開発チュートリアルは、そのようなストリーミングアプリケーションを最初から最後までコーディングして実行する方法を示します。
クイックスタートの最後に達したので、自由に Kafka 環境を破棄するか、または引き続き遊んでください。
Ctrl-C
を使ってプロデューサとコンシューマクライアントを停止します。
Ctrl-C
を使って Kafka ブローカーを停止します。
Ctrl-C
を使って ZooKeeper サーバを停止します。
途中で作成したイベントなど、ローカルの Kafka 環境のデータも削除する場合は、以下のコマンドを実行します:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
Apache Kafka クイックスタートが正常に完了しました。
詳細については、次の手順をお勧めします: