ステップ 1: Kafka の取得

最新の Kafka リリースをダウンロードし、それを解凍してください:

$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0

ステップ 2: Kafka 環境を開始

注意: ローカル環境に Java 8+ がインストールされている必要があります。

全てのサービスを正しい順番で開始するために、以下のコマンドを実行します:

# ZooKeeper サービスを開始する
# 注意: まもなく、Apache Kafka は Zookeeper を必要としなくなります。
$ bin/zookeeper-server-start.sh config/zookeeper.properties

他の端末セッションを開き、以下を実行します:

# Kafka ブローカーを開始する
$ bin/kafka-server-start.sh config/server.properties

全てのサービスが正常に起動すると、基本的な Kafka 環境が実行され、使えるようになります。

ステップ 3: イベントを格納するトピックを作成

Kafka は、多くのマシン間でイベント (このドキュメントでは、レコードあるいはメッセージとも呼ばれる)を読み込み、書き込み、格納、および処理を可能にする、分散イベントストリーミングプラットフォームです。

イベントの例としては、支払いトランザクション、携帯電話からの地理位置情報の更新、発送注文、IoT デバイスまたは医療機器からのセンサー測定などがあります。これらのイベントはトピックに整理され、格納されます。非常に単純化されたトピックは、ファイルシステム内のフォルダに似ており、イベントはフォルダ内のファイルです。

従って、最初のイベントを書き込む前にトピックを作成する必要があります。他の端末セッションを開き、以下を実行します:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Kafka の全てのコマンドラインツールには、追加のオプションがあります: 引数無しで kafka-topics.sh コマンドを実行すると、使用方法の情報が表示されます。例えば、新しいトピックのパーティション数などの詳細も表示できます。

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

ステップ 4: トピックへのイベントの書き込み

Kafka クライアントは、ネットワークを介して Kafka ブローカーとイベントの書き込み(あるいは読み込み)のための通信を行います。受け取ったブローカーは、必要な限り、永久に、永続的で耐障害性のある方法でイベントを格納します。

コンソールプロデューサクライアントを実行して、幾つかのイベントをトピックに書き込みます。デフォルトでは、入力した各行は、トピックに個別のイベントが書き込まれます。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

いつでも Ctrl-C を使ってプロデューサを停止することができます。

ステップ 5: イベントの読み込み

別の端末セッションを開き、作成したイベントを読み込むためにコンソールコンシューマクライアントを実行します:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

いつでも Ctrl-C を使ってコンシューマを停止することができます。

自由に実験してください: 例えば、プロデューサ端末(前のステップ)に切り替えて、追加のイベントを書き込み、イベントがコンシューマ端末にすぐに表示されることを確認します。

イベントは Kafka に永続的に格納されるため、何回でも、好きなだけの数のコンシューマが読み取ることができます。これを確認するには、さらに別の端末セッションを開いて、前のコマンドを再実行します。

ステップ 6: Kafka コネクトを使ってイベントのストリームとしてデータをインポート/エクスポート

おそらく、リレーショナルデータベースや従来のメッセージングシステムなどの既存のシステムに、すでにこれらのシステムを使っているアプリケーションと一緒に大量のデータがあるはずです。Kafka コネクタにより、外部のシステムから Kafka へ、またはその逆で、継続にデータを取り込むことができます。従って、既存のシステムを Kafka に統合することは非常に簡単です。この処理をさらに簡単にするために、すぐに利用できるそのような数百のコネクタがあります。

Kafka コネクトセクションで、Kafka にデータを継続的にインポート/エクスポートする方法の詳細をご覧ください。

ステップ 7: Kafka ストリームを使ったイベントの処理

データが Kafka にイベントとして格納されると、Java/Scala 用の Kafka ストリームクライアントライブラリを使ってデータを処理することができます。これにより、ミッションクリティカルなリアルタイムアプリケーションとマイクロサービスを実装することができます。ここで、入力および/または出力データは Kafka トピックに格納されます。Kafkaストリームは、これらのアプリケーションがスケーラブルが高く、柔軟で、耐障害性があり、分散されるなど、クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。ライブラリは、確実に1回の処理、ステートフル操作と集約、ウィンドウ、結合、イベント時間に基づいた処理などをサポートします。

最初の一噛みとして、人気のある WordCount アルゴリズムを実装する方法を以下に示します:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));

Kafka ストリームのデモアプリ開発チュートリアルは、そのようなストリーミングアプリケーションを最初から最後までコーディングして実行する方法を示します。

ステップ 8: Kafka 環境の終了

クイックスタートの最後に達したので、自由に Kafka 環境を破棄するか、または引き続き遊んでください。

  1. まだ行っていない場合は、Ctrl-C を使ってプロデューサとコンシューマクライアントを停止します。
  2. Ctrl-C を使って Kafka ブローカーを停止します。
  3. 最後に、Ctrl-C を使って ZooKeeper サーバを停止します。

途中で作成したイベントなど、ローカルの Kafka 環境のデータも削除する場合は、以下のコマンドを実行します:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

おめでとう!

Apache Kafka クイックスタートが正常に完了しました。

詳細については、次の手順をお勧めします:

  • Kafka が高レベルでどのように動作するか、その主な概念、他のテクノロジーとの比較について学ぶには、も次回始めにを読んでください。Kafka をさらに詳細に理解する胃は、ドキュメントにアクセスしてください。
  • ユースケースを閲覧して、世界中のコミュニティの他のユーザが Kafka からどのように価値を得ているかを学んでください。
  • 地元の Kakfa ミートアップグループに参加し、Kakfa コミュニティのメインカンファレンスであるKafka サミットのトークを見てください。
TOP
inserted by FC2 system