Kafkaストリーム デモ アプリケーションの実行
このチュートリアルでは、あなたは新しく始めて既存のKafkaあるいはZooKeeperデータが無いと仮定します。しかし、もし既にKafkaとZooKeeperを始めている場合は、最初の2つのステップを遠慮なくスキップしてください。
Kafkaストリームはミッション クリティカルなリアルタイムアプリケーションとマイクロサービスのためのクライアントライブラリです。この時、入力および/あるいは出力データはKafkaクラスタに格納されます。Kafkaストリームは、これらのアプリケーションがスケーラブルが高く、柔軟で、耐障害性があり、分散されるなど、クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。
このクイックスタートの例はこのライブラリ内でコードされたストリーミングアプリケーションを実行する方法を説明するでしょう。これは WordCountDemo
の例のコードの要旨です (読みやすいようにJava 8 lamda表現を使うように変換されています)。
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
"streams-plaintext-input",
Consumed.with(stringSerde, stringSerde)
);
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count();
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
WordCount アルゴリズムを実装しています。これは入力テキストから単語の出現ヒストグラムを計算します。しかし、あなたがこれまで見てきたような有限データ上で操作するWordCountの例と異なり、WordCoundデモアプリケーションは 無限、制限のないストリームのデータ上での操作をするように設計されているため、わずかに異なる挙動をします。有限の変異種に似て、それは単語の数を追跡および更新するステートフルなアルゴリズムです。しかし、無限の入力データの可能性があると仮定しなければならないため、"全ての"入力データが処理されたかを知ることができないため、データを処理している間は現在の状態と結果を定期的に出力するでしょう。
最初のステップとして、Kafkaを開始し(まだ開始していない場合)、Kafkaトピックへの入力データを準備します。これは後でKafkaストリームアプリケーションによって処理されるでしょう。
ステップ 1: コードのダウンロード
Download the 2.7.0 release and un-tar it. Note that there are multiple downloadable Scala versions and we choose to use the recommended version (2.13) here:> tar -xzf kafka_2.13-2.7.0.tgz
> cd kafka_2.13-2.7.0
ステップ 2: Kafkaサーバの開始
Kafka は ZooKeeperを使います。そのためまだZooKeeperが無い場合はそれを最初に開始する必要があります。quick-and-dirty single-node ZooKeeper インスタンスを取得するためにKafkaにパッケージ化されている便利なスクリプトを使うことができます。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
これでKafkaサーバを開始します:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
ステップ 3: 入力トピックを準備し、Kafkaプロデューサを開始
次に、streams-plaintext-input という名前の入力トピックと、streams-wordcount-outputという名前の出力トピックを作成します:> bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Created topic "streams-plaintext-input".
注意: 出力ストリームは変更ログストリームのため、コンパクションが可能な出力トピックを作成します (参照 以下のアプリケーション出力の説明 )。
> bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Created topic "streams-wordcount-output".
作成されたトピックは同じkafka-topics ツールを受かって記述することができます:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact,segment.bytes=1073741824
Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
ステップ 4: Wordcount アプリケーションの開始
以下のコマンドは WordCount デモ アプリケーションを開始します:> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
デモ アプリケーションは入力トピック streams-plaintext-input から読み込み、各読み込みメッセージ上で WordCount アルゴリズムの計算を行い、現在の結果を絶え間なく出力トピック streams-wordcount-output に書き込みます。従って、結果としてKafkaに書き戻されるログ エントリ以外のSTDOUT出力はありません。
これで、何らかの入力データをこのトピックに書き込むために、別のターミナル内でコンソール プロデューサを開始することができます:> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
そして、別のターミナル内のコンソール コンシューマを使って出力トピックから読み込むことでWordCountデモアプリケーションの出力を調べることができます:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
ステップ 5: データの処理
では、1行のテキストを入力して <RETURN> を打つことで、コンソール プロデューサを使っていくつかのメッセージを入力トピックstreams-plaintext-input に書き込んでみましょう。これは新しいメッセージを入力トピックに送信するでしょう。メッセージのキーはnullでメッセージの値はちょうど打ち込んだテキストの行の符号化された文字列です (実際には、アプリケーションの入力データは、このクイックスタートで行うような手動で入力されるものではなく、一般的にKafkaへの連続したストリーミングです):> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
このメッセージはWordcountアプリケーションによって処理され、以下のような出力データは streams-wordcount-outputトピックに書き込まれ、コンソールコンシューマによって出力されるでしょう:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
ここで、最初のカラムはjava.lang.String
形式のKafkaメッセージ キーで、カウントされた単語を表します。2つ目のカラムは java.lang.Long
形式のメッセージの値で、単語の最新のカウントを表します。
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
コンソール コンシューマが実行中の他のターミナル内で、WordCountアプリケーションが新しい出力データを書き込んだことを見つけるでしょう:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
ここで、最後に出力された行 kafka 2 と streams 2 は、カウントが1 から 2にインクリメントされたキー kafka と streams への更新を示します。入力トピックに入力メッセージを書き込む度に、streams-wordcount-output トピックに新しいメッセージが追加されることを見つけ、WordCountアプリケーションによって計算された最新の単語のカウントが表示されます。このクイックスタートを終える前に、コンソールプロデューサ内で入力トピック streams-plaintext-input に最後の入力テキスト行 "join kafka summit" を入力し、<RETURN> を打ちます:
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
join kafka summit
streams-wordcount-output トピックは続いて対応する更新された単語のカウントを示すでしょう (最後の3行を見てください):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
御覧のように、Wrodcountアプリケーションの出力は実際には更新の連続するストリームです。各出力レコード(別の言い方をすると、上の元の出力内の各行)は、1つの単語の更新されたカウント、つまり"kafka"のようなレコードキーです。同じキーの複数のレコードについては、それぞれの後者のレコードが前者のレコードを更新します。
以下の2つの図は、背後で本質的に何が起こっているかを説明します。最初のカラムはcount
についての単語の出現をカウントしているKTable<String, Long>
の現在の状態の進み方を表します。2つ目のカラムは、KTableの状態更新の結果と出力Kafkaトピック streams-wordcount-outputへ送信されるレコードの変化を表します。
まず、テキスト行 "all streams lead to kafka" が処理されます。新しい単語が新しいテーブルエントリ (緑色の背景で強調表示される)になると、KTable
が構築され、対応する変更レコードが下流のKStream
に送信されます。
2つ目のテキスト行 "hello kafka streams" が処理される時に、初めてKTable
内の既存のエントリが更新されることを見つけます (ここでは: 単語"kafka"と"streams"について)。そして再び変更レコードが出力トピックに送信されます。
等々 (3つ目の行がどう処理されるかの説明をスキップします)。これはなぜ出力トピックが上で説明した内容を持つかを説明します。それは変更の完全なレコードを持つからです。
この具体的な例の範囲を超えてKafkaストリームがここで行っていることは、テーブルと変更ログストリームの二重性を活用することです (ここで: table = KTable, 変更ログストリーム = ダウンストリーム KStream): テーブルの各変更をスートリームに公開し、もし変更ログ全体を最初から最後まで消費する場合は、テーブルの内容を再構成することができます)。
ステップ 6: アプリケーションの取り壊し
これで、Ctrl-Cを使って、コンソール コンシューマ、コンソール プロデューサ、WordCountアプリケーション、KafkaブローカーおよびZooKeeperサーバを順番に停止することができます。