Kafka ストリーム
基幹業務のリアルタイムアプリケーションとマイクロサービスを書く最も簡単な方法
Kafkaストリームはアプリケーションとマイクロサービスを構築するためのクライアントライブラリです。この時、入力および/あるいは出力デーtはKafkaクラスタに格納されます。クライアント側でKafkaのサーバクラスタの技術の恩恵を受けながら標準的なJavaとScalaアプリケーションを書くことと配備することの平易化を兼ね備えます。
STREAMS APIのツアー
1Streamsへの導入
2Stream アプリケーションの作成
3データのPtの変換1
4データのPtの変換。11
なぜKafkaストリームを使うのが楽しいのか!
- 柔軟性があり、高スケーラブルで、耐障害性がある
- コンテナ、VM、bare metal、クラウドへの配備
- 小規模、中規模、および大規模なユースケースに等しく適している
- Kafkaセキュリティに完全に統合されている
- 標準的なJavaおよびScalaアプリケーションを書く
- 確実に1回の処理セマンティクス
- 処理クラスタの分割が必要ではない
- Mac, Linux, Windows で開発
Kafka ストリームの利用例
ニューヨークタイムズはApache Kafkaを使用し、Kafkaストリームを使って発行された内容を読者が利用できるようにする様々なアプリケーションとシステムにリアルタイムで格納および分配します。
ヨーロッパの先進的なオンラインファッションリテーラーのZalandoはKafkaをESB (Enterprise Service Bus)として使います。これはモノリシックからマイクロサービスの構造への移行に役立ちます。イベントストリームを処理するためにKafkaを使用することで、技術チームはほぼリアルタイムのbusiness intelligenceを行うことができます。
LINE は Apache Kafkaを サービス間の通信のための中心のデータハブとして使います。日に無数のメッセージが生成され、様々なビジネスロジック、脅威の検出、インデックスの検索およびデータの分析のために使われます。LINEはKafka Streamを利用してトピックを変換およびフィルタし、サブトピックが効率的に消費できるようにします。その一方で、洗練された最小限のコードベースにより、メンテナンスが容易です。
Pinterest は Apache Kafka と the Kafka Streams を広告インフラストラクチャのリアルタイムで予言的な予算システムに動力を与えるために大規模に使用します。Kafkaストリームを使って消費の予想は以前よりより正確になります。
Rabobankはオランダの最も大きな3つの銀行のうちの1つです。そのデジタルな神経質なシステムであるビジネス イベント バスは、Apache Kafka を使っています。ますます多くの金融プロセスとサービスで使われており、そのうちの1つが Rabo Alerts です。このサービスは会計上のイベント時にリアルタイムで顧客に警告し、Kafka ストリームを使って構築されています
Trivagoはグローバルなホテル検索プラットフォームです。旅行者がホテルを検索して比較する方法を作り直すことに注力していて、一方でwebサイトとアプリによって多くの旅行者へアクセスを提供することでホテルの広告主がビジネスを成長することを可能にします。2017年の時点で、190国以上の約1800万のホテルとその他の居住区へのアクセスとを提供します。社内で開発者が自由にデータにアクセスできるようにKafka, Kafkaコネクト およびKafkaストリームを使います。Kafka ストリームは解析パイプラインの一部分を強化し、手元に持つデータソースの調査と操作を行うための絶え間のないオプションを届けます。
こんにちはKafkaストリーム
以下のコードは柔軟で、高スケーラブル、耐障害性、ステートフル、そして大規模のプロダクションで実行することができるWordCountアプリケーションを実装します
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String textLine) {
return Arrays.asList(textLine.toLowerCase().split("\\W+"));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String word) {
return word;
}
})
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
import Serdes._
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
.groupBy((_, word) => word)
.count()(Materialized.as("counts-store"))
wordCounts.toStream.to("WordsWithCountsTopic")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(10, TimeUnit.SECONDS)
}
}