チュートリアル: Kafkaストリームアプリケーションを書く
このガイドの中で、Kafkaストリームを使ってストリームアプリケーションを書くために独自のプロジェクトをスクラッチからセットアップに取り掛かります。まだ読んでいなければ、Kafkaストリームで書かれたストリームアプリケーションを実行する方法について最初にクイックスタートを読むことを強くお勧めします。
Mavenプロジェクトのセットアップ
以下のコマンドを使ってストリームのプロジェクト構造を作成するためにKafkaストリーム Maven Archetypeを使うつもりです:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=2.7.0 \
-DgroupId=streams.examples \
-DartifactId=streams.examples \
-Dversion=0.1 \
-Dpackage=myapps
groupId
, artifactId
と package
パラメータについて異なる値を好きなように使うことができます。上のパラメータの値が使われたと仮定した場合、このコマンドは以下のようなプロジェクトの構造を作成するでしょう:
> tree streams.examples
streams-quickstart
|-- pom.xml
|-- src
|-- main
|-- java
| |-- myapps
| |-- LineSplit.java
| |-- Pipe.java
| |-- WordCount.java
|-- resources
|-- log4j.properties
プロジェクトに含まれる pom.xml
ファイルは既に定義されたストリーム依存を持ちます。生成された pom.xml
はJava8を対象にしており、より高いJavaバージョンでは動作しないことに注意してください。
src/main/java
の下にストリームライブラリを使って書かれた幾つかのプログラムの例が既にあります。そのようなプログラムをスクラッチから書き始めるため、今はそれらの例を削除することができます:
> cd streams-quickstart
> rm src/main/java/myapps/*.java
最初のストリームアプリケーションを書く: Pipe
コーディングの時間です!自由に好みのIDEを開き、このMavenプロジェクトをインポートするか、単純にテキストエディタを開き、src/main/java/myapps
の下にjavaファイルを作成します。それにPipe.java
という名前をつけましょう:
package myapps;
public class Pipe {
public static void main(String[] args) throws Exception {
}
}
このpipeプログラムを書くために main
関数の中に書き入れるつもりです。IDEは通常import文を自動的に追加することができるため、いつも行うようにはそれらをリストしないことに注意してください。しかし、もしテキストエディタを使っている場合は、importを手動で追加する必要があります。この章の最後で、import文を持つ完全なコードの断片を示すつもりです。
ストリーム アプリケーションを書く最初のステップは、StreamsConfig
で定義されたように異なるストリーム実行設定を指定するためにjava.util.Properties
のマップを作成することです。設定する必要がある2つの重要な設定値: StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
、これはKafkaクラスタへの初期接続を確立するために使われるホスト/ポートのペアのリストを指定します。StreamsConfig.APPLICATION_ID_CONFIG
、これは同じKafkaクラスタに話しかけている他のアプリケーションとストリームアプリケーションとを識別するためのユニークな識別子を与えます。
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
さらに、例えばレコードのキーと値のペアのデフォルトのシリアライズ化およびデシリアライズ化ライブラリなど、同じマップの中の他の設定をカスタマイズすることができます:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Kafkaストリームの設定の完全なリストについては、この 表を参照してください。
次に、ストリームアプリケーションの計算ロジックを定義します。Kafkaストリームでは、この計算ロジックは接続されたプロセッサのノードのトポロジ
として定義されます。そのようなトポロジを構築するためにトポロジ ビルダーを使うことができます。
final StreamsBuilder builder = new StreamsBuilder();
しして、このトポロジビルダーを使ってstreams-plaintext-input
という名前のKafkaトピックからソースストリームを作成します:
KStream<String, String> source = builder.stream("streams-plaintext-input");
これで、ソースKafkaトピックstreams-plaintext-input
からレコードを絶え間なく生成するKStream
を取得します。レコードはString
型有りのキー-値ペアとして組織化されます。このストリームを使ってできる最も簡単な事は、それを例えば streams-pipe-output
という名前の他のKafkaトピックに書き込むことです:
source.to("streams-pipe-output");
上の2つの行を以下のように1つの行に連結することもできることに注意してください:
builder.stream("streams-plaintext-input").to("streams-pipe-output");
以下を行うことでこのビルダーからどのようなトポロジ
が生成されたかを調べることができます:
final Topology topology = builder.build();
そして、その記述を標準出力に出力します:
System.out.println(topology.describe());
もしここで止めて、プログラムをコンパイルおよび実行すると、それは以下の情報を出力するでしょう:
> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.Pipe
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
Global Stores:
none
上で示されたように、それは構築されたトポロジが2つのプロセッサノード、ソースノードKSTREAM-SOURCE-0000000000
およびシンクノード KSTREAM-SINK-0000000001
を持つことを明らかにします。KSTREAM-SOURCE-0000000000
は Kafka トピック streams-plaintext-input
から連続してレコードを読み込み、それらをダウンストリーム ノード KSTREAM-SINK-0000000001
にパイプします; KSTREAM-SINK-0000000001
は受信した各レコードを他のKafkaトピック streams-pipe-output
に順番に書き込むでしょう (-->
と <--
の矢印はこのノードのダウンストリームとアップストリーム プロセッサ ノードを指示します。つまりトポロジ グラフ内の "children" と "parents" です)。それはまたこの単純なトポロジがそれに関係するグローバルな状態ストアを持たないことを明らかにします (以下の章で状態ストアについて話すつもりです)。
コード内でトポロジを構築している間に、任意の時点で上記のようにトポロジをいつでも記述できることに注意してください。そのため、満足するまでトポロジで定義された計算ロジックを対話的に "試して味見をする" ことができます。あるKafkaトピックから別のトピックにデータをエンドレス ストリーミング方式でパイプするだけの簡単なトポロジが既に完了していると仮定すると、上記で構築した2つのコンポ―ネントでストリームクライアントを構築することができます: java.util.Properties
インスタンスと Topology
オブジェクト内で定義された設定マップ。
final KafkaStreams streams = new KafkaStreams(topology, props);
start()
関数を呼び出すことで、このクライアントの実行を引き起こすことができます。クライアントでclose()
が呼ばれるまで、実行は止まりません。例えば、カウントダウン ラッチ付きのシャットダウン フックを追加して、ユーザの割り込みをキャプチャし、このプログラムの終了時にクライアントを閉じることができます:
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
これまでの完全なコードはこのようになります:
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
すでにKafkaブローカーがlocalhost:9092
で起動していて、トピック streams-plaintext-input
と streams-pipe-output
が ブローカーで作成された場合、IDEまたはコマンドライン上でMavenを使ってこのコードを実行することができます:
> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.Pipe
ストリームアプリケーションを実行し、その計算結果を観察する方法の詳細な手順については、ストリームアプリケーションを触る の章を読んでください。この章の残りの部分では、これについては説明しません。
2つ目のストリームアプリケーションの作成: Line Split
2つの主要なコンポーネントを使ってストリームクライアントを作成する方法を学びました: StreamsConfig
と Topology
。次に、現在のトポロジを拡張して、実際の処理ロジックを追加してみましょう。最初に既存のPipe.java
クラスをコピーして、別のプログラムを作成します:
> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
そして、元のプログラムと区別するために、クラス名とアプリケーションid設定を変更します:
public class LineSplit {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
// ...
}
}
それぞれのソース ストリームのレコードはString
に型付けされた key-value ペアなので、文字列値をテキスト行として扱い、それを FlatMapValues
オペレータを使って単語に分割します:
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split("\\W+"));
}
});
オペレータは入力としてsource
ストリームを取り、ソース ストリームの各レコードを順番に処理し、値文字列を分割することで、words
という名前の新しいストリームを生成し、各単語を出力words
ストリームへの新しいレコードとして生成します。これは以前に受信したレコードまたは処理結果を追跡する必要が無い、ステートレス オペレーションです。JDK 8 を使っている場合は、lambda表現を使うことができ、上のコードを以下のように簡略化することができます:
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
最後に、単語ストリームを他のKafkaトピック、例えば streams-linesplit-output
に書き戻すことができます。繰り返しますが、これら2つのステップは、以下のように連結できます (ラムダ表現が使われると仮定):
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");
この拡張トポロジをSystem.out.println(topology.describe())
として説明すると、以下を取得します:
> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.LineSplit
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
Global Stores:
none
上記のように、新しいプロセッサ ノードKSTREAM-FLATMAPVALUES-0000000001
は元のソースノードとシンクノードの間のトポロジに挿入されます。ソースノードを親として、シンクノードは子として取られます。つまり、ソースノードによってフェッチされた各レコードは、最初に新しく追加された KSTREAM-FLATMAPVALUES-0000000001
ノードに移動して処理され、1つ以上の新しいレコードが結果として生成されます。それらはシンクノードまで移動を続け、Kafkaに書き戻されます。このプロセッサは、どのストアにも関連付けられていないため、"ステートレス" であることに注意してください (つまり (stores: [])
)。
完全なコードはこのようになります (lambda表現が使われると仮定):
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class LineSplit {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// ... same as Pipe.java above
}
}
3番目のストリームアプリケーションを書く: Wordcount
次にソーステキストストリームから分割された単語の出現回数をカウントすることにより、トポロジに幾らかの "ステートフル" 計算を追加するために、さらに一歩進んでみましょう。同様の手順に従って、LineSplit.java
クラスに基づいて別のプログラムを作成しましょう:
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
// ...
}
}
単語をカウントするために、最初にflatMapValues
オペレータを修正して、それら全てを小文字として扱うことができます (lambda表現が使われると仮定します):
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
});
In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower cased word, with a groupBy
operator. この演算子は新しいグループ化されたストリームを生成します。このグループ化されたストリームはcount
演算子によって集約され、グループ化されたキーごとに実行カウントを生成します:
KTable<String, Long> counts =
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return value;
}
})
// Materialize the result into a KeyValueStore named "counts-store".
// The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store.
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
count
演算子には、実行カウントをcounts-store
という名前の状態ストアに格納することを指定するMaterialized
パラメータがあることに注意してください。この Counts
ストアはリアルタイムでクエリすることができます。詳細については開発者マニュアルで説明されています。
counts
KTableの変更ログストリームを他のKafkaストリーム、例えば streams-wordcount-output
に書き戻すことができます。結果は変更ログストリームのため、出力トピック streams-wordcount-output
はログ圧縮が有効に設定されていなければなりません。今回は、値の型はもうString
ではなくLong
であるため、デフォルトのシリアライズ化クラスはKafkaにもう書き込むことができません。Long
型のための上書きされたシリアライズ化メソッドを提供する必要があります。そうでなければ、ランタイム例外が投げられます:
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
トピックstreams-wordcount-output
からの変更ログストリームを読み込むには、値のデシリアライズ化を org.apache.kafka.common.serialization.LongDeserializer
として設定する必要があることに注意してください。Details of this can be found in the Play with a Streams Application section. JDK 8 の lambda表現が使えると仮定すると、上のコードは以下のように簡略化ができます:
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
再び、この拡張トポロジをSystem.out.println(topology.describe())
として説明すると、以下を取得します:
> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores:
none
上記のように、トポロジは2つの切断された子トポロジを含みます。最初の子トポロジのsinkノードKSTREAM-SINK-0000000004
は再パーティショントピック Counts-repartition
に書き込みます。これは2つ目の子トポロジのソースノード KSTREAM-SOURCE-0000000006
によって読み込まれます。再パーティション トピックは集約キーによってソースストリームを "シャッフル" するために使われ、この場合は値文字列です。さらに、最初の子トポロジ内で、ステートレスな KSTREAM-FILTER-0000000005
ノードは、グループ化 KSTREAM-KEY-SELECT-0000000002
ノードと集約キーが空の中間レコードを通すシンクノードの間に挿入されます。
2つ目の子トポロジ内で、集約ノード KSTREAM-AGGREGATE-0000000003
はCounts
(名前はユーザによって count
演算子内で指定されます)という名前の状態ストアと関連付けられます。次のストリームソースノードから各レコードを受信すると、集約プロセッサはまずキーの現在のカウントを取得するために関連するCounts
ストアにクエリを実行し、1つ増やしてから、新しいカウントをストアに書き戻します。キーの更新された各カウントはKTABLE-TOSTREAM-0000000007
ノードへのパイプされたダウンストリームで、さらにシンクノードKSTREAM-SINK-0000000008
にパイプされる前にこの更新ストリームをKafkaに書き戻すためにレコードストリームとして解釈します。
完全なコードはこのようになります (lambda表現が使われると仮定):
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// ... same as Pipe.java above
}
}