Apache Kafka

チュートリアル: Kafkaストリームアプリケーションを書く

このガイドの中で、Kafkaストリームを使ってストリームアプリケーションを書くために独自のプロジェクトをスクラッチからセットアップに取り掛かります。まだ読んでいなければ、Kafkaストリームで書かれたストリームアプリケーションを実行する方法について最初にクイックスタートを読むことを強くお勧めします。

Mavenプロジェクトのセットアップ

以下のコマンドを使ってストリームのプロジェクト構造を作成するためにKafkaストリーム Maven Archetypeを使うつもりです:

        mvn archetype:generate \
            -DarchetypeGroupId=org.apache.kafka \
            -DarchetypeArtifactId=streams-quickstart-java \
            -DarchetypeVersion=2.7.0 \
            -DgroupId=streams.examples \
            -DartifactId=streams.examples \
            -Dversion=0.1 \
            -Dpackage=myapps

groupId, artifactIdpackage パラメータについて異なる値を好きなように使うことができます。上のパラメータの値が使われたと仮定した場合、このコマンドは以下のようなプロジェクトの構造を作成するでしょう:

        > tree streams.examples
        streams-quickstart
        |-- pom.xml
        |-- src
            |-- main
                |-- java
                |   |-- myapps
                |       |-- LineSplit.java
                |       |-- Pipe.java
                |       |-- WordCount.java
                |-- resources
                    |-- log4j.properties

プロジェクトに含まれる pom.xml ファイルは既に定義されたストリーム依存を持ちます。生成された pom.xml はJava8を対象にしており、より高いJavaバージョンでは動作しないことに注意してください。

src/main/javaの下にストリームライブラリを使って書かれた幾つかのプログラムの例が既にあります。そのようなプログラムをスクラッチから書き始めるため、今はそれらの例を削除することができます:

        > cd streams-quickstart
        > rm src/main/java/myapps/*.java

最初のストリームアプリケーションを書く: Pipe

コーディングの時間です!自由に好みのIDEを開き、このMavenプロジェクトをインポートするか、単純にテキストエディタを開き、src/main/java/myappsの下にjavaファイルを作成します。それにPipe.javaという名前をつけましょう:
        package myapps;

        public class Pipe {

            public static void main(String[] args) throws Exception {

            }
        }

このpipeプログラムを書くために main 関数の中に書き入れるつもりです。IDEは通常import文を自動的に追加することができるため、いつも行うようにはそれらをリストしないことに注意してください。しかし、もしテキストエディタを使っている場合は、importを手動で追加する必要があります。この章の最後で、import文を持つ完全なコードの断片を示すつもりです。

ストリーム アプリケーションを書く最初のステップは、StreamsConfigで定義されたように異なるストリーム実行設定を指定するためにjava.util.Properties のマップを作成することです。設定する必要がある2つの重要な設定値: StreamsConfig.BOOTSTRAP_SERVERS_CONFIG、これはKafkaクラスタへの初期接続を確立するために使われるホスト/ポートのペアのリストを指定します。StreamsConfig.APPLICATION_ID_CONFIG、これは同じKafkaクラスタに話しかけている他のアプリケーションとストリームアプリケーションとを識別するためのユニークな識別子を与えます。

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092

さらに、例えばレコードのキーと値のペアのデフォルトのシリアライズ化およびデシリアライズ化ライブラリなど、同じマップの中の他の設定をカスタマイズすることができます:

        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

Kafkaストリームの設定の完全なリストについては、この を参照してください。

次に、ストリームアプリケーションの計算ロジックを定義します。Kafkaストリームでは、この計算ロジックは接続されたプロセッサのノードのトポロジ として定義されます。そのようなトポロジを構築するためにトポロジ ビルダーを使うことができます。

        final StreamsBuilder builder = new StreamsBuilder();

しして、このトポロジビルダーを使ってstreams-plaintext-inputという名前のKafkaトピックからソースストリームを作成します:

        KStream<String, String> source = builder.stream("streams-plaintext-input");

これで、ソースKafkaトピックstreams-plaintext-inputからレコードを絶え間なく生成するKStream を取得します。レコードはString 型有りのキー-値ペアとして組織化されます。このストリームを使ってできる最も簡単な事は、それを例えば streams-pipe-outputという名前の他のKafkaトピックに書き込むことです:

        source.to("streams-pipe-output");

上の2つの行を以下のように1つの行に連結することもできることに注意してください:

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

以下を行うことでこのビルダーからどのようなトポロジが生成されたかを調べることができます:

        final Topology topology = builder.build();

そして、その記述を標準出力に出力します:

        System.out.println(topology.describe());

もしここで止めて、プログラムをコンパイルおよび実行すると、それは以下の情報を出力するでしょう:

        > mvn clean package
        > mvn exec:java -Dexec.mainClass=myapps.Pipe
        Sub-topologies:
          Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
            Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
        Global Stores:
          none

上で示されたように、それは構築されたトポロジが2つのプロセッサノード、ソースノードKSTREAM-SOURCE-0000000000 およびシンクノード KSTREAM-SINK-0000000001を持つことを明らかにします。KSTREAM-SOURCE-0000000000 は Kafka トピック streams-plaintext-input から連続してレコードを読み込み、それらをダウンストリーム ノード KSTREAM-SINK-0000000001にパイプします; KSTREAM-SINK-0000000001 は受信した各レコードを他のKafkaトピック streams-pipe-output に順番に書き込むでしょう (--><-- の矢印はこのノードのダウンストリームとアップストリーム プロセッサ ノードを指示します。つまりトポロジ グラフ内の "children" と "parents" です)。それはまたこの単純なトポロジがそれに関係するグローバルな状態ストアを持たないことを明らかにします (以下の章で状態ストアについて話すつもりです)。

コード内でトポロジを構築している間に、任意の時点で上記のようにトポロジをいつでも記述できることに注意してください。そのため、満足するまでトポロジで定義された計算ロジックを対話的に "試して味見をする" ことができます。あるKafkaトピックから別のトピックにデータをエンドレス ストリーミング方式でパイプするだけの簡単なトポロジが既に完了していると仮定すると、上記で構築した2つのコンポ―ネントでストリームクライアントを構築することができます: java.util.Properties インスタンスと Topology オブジェクト内で定義された設定マップ。

        final KafkaStreams streams = new KafkaStreams(topology, props);

start() 関数を呼び出すことで、このクライアントの実行を引き起こすことができます。クライアントでclose()が呼ばれるまで、実行は止まりません。例えば、カウントダウン ラッチ付きのシャットダウン フックを追加して、ユーザの割り込みをキャプチャし、このプログラムの終了時にクライアントを閉じることができます:

        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

これまでの完全なコードはこのようになります:

        package myapps;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.Topology;

        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class Pipe {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                final StreamsBuilder builder = new StreamsBuilder();

                builder.stream("streams-plaintext-input").to("streams-pipe-output");

                final Topology topology = builder.build();

                final KafkaStreams streams = new KafkaStreams(topology, props);
                final CountDownLatch latch = new CountDownLatch(1);

                // attach shutdown handler to catch control-c
                Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                    @Override
                    public void run() {
                        streams.close();
                        latch.countDown();
                    }
                });

                try {
                    streams.start();
                    latch.await();
                } catch (Throwable e) {
                    System.exit(1);
                }
                System.exit(0);
            }
        }

すでにKafkaブローカーがlocalhost:9092で起動していて、トピック streams-plaintext-inputstreams-pipe-output が ブローカーで作成された場合、IDEまたはコマンドライン上でMavenを使ってこのコードを実行することができます:

        > mvn clean package
        > mvn exec:java -Dexec.mainClass=myapps.Pipe

ストリームアプリケーションを実行し、その計算結果を観察する方法の詳細な手順については、ストリームアプリケーションを触る の章を読んでください。この章の残りの部分では、これについては説明しません。

2つ目のストリームアプリケーションの作成: Line Split

2つの主要なコンポーネントを使ってストリームクライアントを作成する方法を学びました: StreamsConfigTopology。次に、現在のトポロジを拡張して、実際の処理ロジックを追加してみましょう。最初に既存のPipe.java クラスをコピーして、別のプログラムを作成します:

        > cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java

そして、元のプログラムと区別するために、クラス名とアプリケーションid設定を変更します:

        public class LineSplit {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
                // ...
            }
        }

それぞれのソース ストリームのレコードはString に型付けされた key-value ペアなので、文字列値をテキスト行として扱い、それを FlatMapValues オペレータを使って単語に分割します:

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split("\\W+"));
                    }
                });

オペレータは入力としてsource ストリームを取り、ソース ストリームの各レコードを順番に処理し、値文字列を分割することで、words という名前の新しいストリームを生成し、各単語を出力words ストリームへの新しいレコードとして生成します。これは以前に受信したレコードまたは処理結果を追跡する必要が無い、ステートレス オペレーションです。JDK 8 を使っている場合は、lambda表現を使うことができ、上のコードを以下のように簡略化することができます:

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));

最後に、単語ストリームを他のKafkaトピック、例えば streams-linesplit-output に書き戻すことができます。繰り返しますが、これら2つのステップは、以下のように連結できます (ラムダ表現が使われると仮定):

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

この拡張トポロジをSystem.out.println(topology.describe())として説明すると、以下を取得します:

        > mvn clean package
        > mvn exec:java -Dexec.mainClass=myapps.LineSplit
        Sub-topologies:
          Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
            Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
          Global Stores:
            none

上記のように、新しいプロセッサ ノードKSTREAM-FLATMAPVALUES-0000000001は元のソースノードとシンクノードの間のトポロジに挿入されます。ソースノードを親として、シンクノードは子として取られます。つまり、ソースノードによってフェッチされた各レコードは、最初に新しく追加された KSTREAM-FLATMAPVALUES-0000000001 ノードに移動して処理され、1つ以上の新しいレコードが結果として生成されます。それらはシンクノードまで移動を続け、Kafkaに書き戻されます。このプロセッサは、どのストアにも関連付けられていないため、"ステートレス" であることに注意してください (つまり (stores: []))。

完全なコードはこのようになります (lambda表現が使われると仮定):

        package myapps;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.Topology;
        import org.apache.kafka.streams.kstream.KStream;

        import java.util.Arrays;
        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class LineSplit {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                final StreamsBuilder builder = new StreamsBuilder();

                KStream<String, String> source = builder.stream("streams-plaintext-input");
                source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                      .to("streams-linesplit-output");

                final Topology topology = builder.build();
                final KafkaStreams streams = new KafkaStreams(topology, props);
                final CountDownLatch latch = new CountDownLatch(1);

                // ... same as Pipe.java above
            }
        }

3番目のストリームアプリケーションを書く: Wordcount

次にソーステキストストリームから分割された単語の出現回数をカウントすることにより、トポロジに幾らかの "ステートフル" 計算を追加するために、さらに一歩進んでみましょう。同様の手順に従って、LineSplit.java クラスに基づいて別のプログラムを作成しましょう:

        public class WordCount {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
                // ...
            }
        }

単語をカウントするために、最初にflatMapValues オペレータを修正して、それら全てを小文字として扱うことができます (lambda表現が使われると仮定します):

        source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                    }
                });

In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower cased word, with a groupBy operator. この演算子は新しいグループ化されたストリームを生成します。このグループ化されたストリームはcount演算子によって集約され、グループ化されたキーごとに実行カウントを生成します:

        KTable<String, Long> counts =
        source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                    }
                })
              .groupBy(new KeyValueMapper<String, String, String>() {
                   @Override
                   public String apply(String key, String value) {
                       return value;
                   }
                })
              // Materialize the result into a KeyValueStore named "counts-store".
              // The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store.
              .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

count演算子には、実行カウントをcounts-storeという名前の状態ストアに格納することを指定するMaterializedパラメータがあることに注意してください。この Countsストアはリアルタイムでクエリすることができます。詳細については開発者マニュアルで説明されています。

counts KTableの変更ログストリームを他のKafkaストリーム、例えば streams-wordcount-outputに書き戻すことができます。結果は変更ログストリームのため、出力トピック streams-wordcount-output はログ圧縮が有効に設定されていなければなりません。今回は、値の型はもうStringではなくLongであるため、デフォルトのシリアライズ化クラスはKafkaにもう書き込むことができません。Long型のための上書きされたシリアライズ化メソッドを提供する必要があります。そうでなければ、ランタイム例外が投げられます:

        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

トピックstreams-wordcount-outputからの変更ログストリームを読み込むには、値のデシリアライズ化を org.apache.kafka.common.serialization.LongDeserializerとして設定する必要があることに注意してください。Details of this can be found in the Play with a Streams Application section. JDK 8 の lambda表現が使えると仮定すると、上のコードは以下のように簡略化ができます:

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, value) -> value)
              .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
              .toStream()
              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

再び、この拡張トポロジをSystem.out.println(topology.describe())として説明すると、以下を取得します:

        > mvn clean package
        > mvn exec:java -Dexec.mainClass=myapps.WordCount
        Sub-topologies:
          Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
            Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
            Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
            Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
          Sub-topology: 1
            Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
            Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
            Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
            Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
        Global Stores:
          none

上記のように、トポロジは2つの切断された子トポロジを含みます。最初の子トポロジのsinkノードKSTREAM-SINK-0000000004は再パーティショントピック Counts-repartitionに書き込みます。これは2つ目の子トポロジのソースノード KSTREAM-SOURCE-0000000006 によって読み込まれます。再パーティション トピックは集約キーによってソースストリームを "シャッフル" するために使われ、この場合は値文字列です。さらに、最初の子トポロジ内で、ステートレスな KSTREAM-FILTER-0000000005 ノードは、グループ化 KSTREAM-KEY-SELECT-0000000002ノードと集約キーが空の中間レコードを通すシンクノードの間に挿入されます。

2つ目の子トポロジ内で、集約ノード KSTREAM-AGGREGATE-0000000003Counts (名前はユーザによって count演算子内で指定されます)という名前の状態ストアと関連付けられます。次のストリームソースノードから各レコードを受信すると、集約プロセッサはまずキーの現在のカウントを取得するために関連するCountsストアにクエリを実行し、1つ増やしてから、新しいカウントをストアに書き戻します。キーの更新された各カウントはKTABLE-TOSTREAM-0000000007ノードへのパイプされたダウンストリームで、さらにシンクノードKSTREAM-SINK-0000000008にパイプされる前にこの更新ストリームをKafkaに書き戻すためにレコードストリームとして解釈します。

完全なコードはこのようになります (lambda表現が使われると仮定):

        package myapps;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.Topology;
        import org.apache.kafka.streams.kstream.KStream;

        import java.util.Arrays;
        import java.util.Locale;
        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class WordCount {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                final StreamsBuilder builder = new StreamsBuilder();

                KStream<String, String> source = builder.stream("streams-plaintext-input");
                source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                      .groupBy((key, value) -> value)
                      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                      .toStream()
                      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

                final Topology topology = builder.build();
                final KafkaStreams streams = new KafkaStreams(topology, props);
                final CountDownLatch latch = new CountDownLatch(1);

                // ... same as Pipe.java above
            }
        }
inserted by FC2 system