Run Kafka Streams Demo Application
このチュートリアルでは、あなたは新しく始めて既存のKafkaあるいはZooKeeperデータが無いと仮定します。しかし、もし既にKafkaとZooKeeperを始めている場合は、最初の2つのステップを遠慮なくスキップしてください。
Kafkaストリームはミッション クリティカルなリアルタイムアプリケーションとマイクロサービスのためのクライアントライブラリです。この時、入力および/あるいは出力データはKafkaクラスタに格納されます。Kafkaストリームは、これらのアプリケーションがスケーラブルが高く、柔軟で、耐障害性があり、分散されるなど、クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。
このクイックスタートの例はこのライブラリ内でコードされたストリーミングアプリケーションを実行する方法を説明するでしょう。これは WordCountDemo
の例のコードの要旨です (読みやすいようにJava 8 lamda表現を使うように変換されています)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Serializers/deserializers (serde) for String and Long types final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream<String, String> textLines = builder.stream( "streams-plaintext-input" , Consumed.with(stringSerde, stringSerde); KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split( "\\W+" ))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to( "streams-wordcount-output" , Produced.with(Serdes.String(), Serdes.Long())); |
WordCount アルゴリズムを実装しています。これは入力テキストから単語の出現ヒストグラムを計算します。しかし、あなたがこれまで見てきたような有限データ上で操作するWordCountの例と異なり、WordCoundデモアプリケーションは 無限、制限のないストリームのデータ上での操作をするように設計されているため、わずかに異なる挙動をします。有限の変異種に似て、それは単語の数を追跡および更新するステートフルなアルゴリズムです。しかし、無限の入力データの可能性があると仮定しなければならないため、"全ての"入力データが処理されたかを知ることができないため、データを処理している間は現在の状態と結果を定期的に出力するでしょう。
最初のステップとして、Kafkaを開始し(まだ開始していない場合)、Kafkaトピックへの入力データを準備します。これは後でKafkaストリームアプリケーションによって処理されるでしょう。
ステップ 1: コードのダウンロード
2.0.0リリースをダウンロードし、それを解凍します。複数のダウンロード可能なScalaバージョンがあり、推奨されるバージョン (2.11) を使うことをここで選択することに注意してください:1 2 |
> tar -xzf kafka_2.11-2.0.0.tgz > cd kafka_2.11-2.0.0 |
ステップ 2: Kafkaサーバの開始
Kafka は ZooKeeperを使います。そのためまだZooKeeperが無い場合はそれを最初に開始する必要があります。quick-and-dirty single-node ZooKeeper インスタンスを取得するためにKafkaにパッケージ化されている便利なスクリプトを使うことができます。
1 2 3 |
> bin /zookeeper-server-start .sh config /zookeeper .properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config /zookeeper .properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... |
これでKafkaサーバを開始します:
1 2 3 4 |
> bin /kafka-server-start .sh config /server .properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ... |
ステップ 3: 入力トピックを準備し、Kafkaプロデューサを開始
次に、streams-plaintext-input という名前の入力トピックと、streams-wordcount-outputという名前の出力トピックを作成します:1 2 3 4 5 6 |
> bin /kafka-topics .sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input Created topic "streams-plaintext-input" . |
1 2 3 4 5 6 7 |
> bin /kafka-topics .sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-wordcount-output \ --config cleanup.policy=compact Created topic "streams-wordcount-output" . |
1 2 3 4 5 6 |
> bin /kafka-topics .sh --zookeeper localhost:2181 --describe Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0 |
ステップ 4: Wordcount アプリケーションの開始
以下のコマンドは WordCount デモ アプリケーションを開始します:1 |
> bin /kafka-run-class .sh org.apache.kafka.streams.examples.wordcount.WordCountDemo |
デモ アプリケーションは入力トピック streams-plaintext-input から読み込み、各読み込みメッセージ上で WordCount アルゴリズムの計算を行い、現在の結果を絶え間なく出力トピック streams-wordcount-output に書き込みます。Hence there won't be any STDOUT output except log entries as the results are written back into in Kafka.
これで、何らかの入力データをこのトピックに書き込むために、別のターミナル内でコンソール プロデューサを開始することができます:1 |
> bin /kafka-console-producer .sh --broker-list localhost:9092 --topic streams-plaintext-input |
1 2 3 4 5 6 7 8 |
> bin /kafka-console-consumer .sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key= true \ --property print.value= true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer |
ステップ 5: データの処理
では、1行のテキストを入力して <RETURN> を打つことで、コンソール プロデューサを使っていくつかのメッセージを入力トピックstreams-plaintext-input に書き込んでみましょう。これは新しいメッセージを入力トピックに送信するでしょう。メッセージのキーはnullでメッセージの値はちょうど打ち込んだテキストの行の符号化された文字列です (実際には、アプリケーションの入力データは、このクイックスタートで行うような手動で入力されるものではなく、一般的にKafkaへの連続したストリーミングです):1 2 |
> bin /kafka-console-producer .sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka |
このメッセージはWordcountアプリケーションによって処理され、以下のような出力データは streams-wordcount-outputトピックに書き込まれ、コンソールコンシューマによって出力されるでしょう:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
> bin /kafka-console-consumer .sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key= true \ --property print.value= true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 |
ここで、最初のカラムはjava.lang.String
形式のKafkaメッセージ キーで、カウントされた単語を表します。2つ目のカラムは java.lang.Long
形式のメッセージの値で、単語の最新のカウントを表します。
1 2 3 |
> bin /kafka-console-producer .sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka hello kafka streams |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
> bin /kafka-console-consumer .sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key= true \ --property print.value= true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 hello 1 kafka 2 streams 2 |
1 2 3 4 |
> bin /kafka-console-producer .sh --broker-list localhost:9092 --topic streams-wordcount-input all streams lead to kafka hello kafka streams join kafka summit |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
> bin /kafka-console-consumer .sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key= true \ --property print.value= true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 hello 1 kafka 2 streams 2 join 1 kafka 3 summit 1 |
以下の2つの図は、背後で本質的に何が起こっているかを説明します。最初のカラムはcount
についての単語の出現をカウントしているKTable<String, Long>
の現在の状態の進み方を表します。2つ目のカラムは、KTableの状態更新の結果と出力Kafkaトピック streams-wordcount-outputへ送信されるレコードの変化を表します。
まず、テキスト行 "all streams lead to kafka" が処理されます。新しい単語が新しいテーブルエントリ (緑色の背景で強調表示される)になると、KTable
が構築され、対応する変更レコードが下流のKStream
に送信されます。
2つ目のテキスト行 "hello kafka streams" が処理される時に、初めてKTable
内の既存のエントリが更新されることを見つけます (ここでは: 単語"kafka"と"streams"について)。そして再び変更レコードが出力トピックに送信されます。
等々 (3つ目の行がどう処理されるかの説明をスキップします)。これはなぜ出力トピックが上で説明した内容を持つかを説明します。それは変更の完全なレコードを持つからです。
Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
ステップ 6: アプリケーションの取り壊し
これで、Ctrl-Cを使って、コンソール コンシューマ、コンソール プロデューサ、WordCountアプリケーション、KafkaブローカーおよびZooKeeperサーバを順番に停止することができます。