Upgrade Guide and API Changes
Upgrading from any older version to 2.7.0 is possible. If you are upgrading from 2.3 or below, you need to do two rolling bounces. During the first rolling bounce phase, set the config upgrade.from="older version" (possible values are "0.10.0" - "2.3"). During the second phase, remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. If you skip or delay the second rolling bounce, you will keep using the old eager rebalancing protocol. You can safely switch over to cooperative at any time once the entire group is on 2.4+, by removing the config value and bouncing. For more details, please refer to KIP-429:
- Prepare your application instances for a rolling bounce, and make sure that the config upgrade.from is set to the version you are upgrading from.
- Bounce each instance of your application once.
- Prepare your newly deployed 2.7.0 application instances for a second round of rolling bounces. Make sure to remove the value for the config upgrade.from.
- Bounce each instance of your application once more to complete the upgrade.
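The following is a minimal sketch of the configurations used in the two rolling bounces above, assuming the application is configured through a plain java.util.Properties object. The key upgrade.from is the real config name (StreamsConfig.UPGRADE_FROM_CONFIG). The class UpgradeFromBounces, its methods, and the example application.id and bootstrap.servers values are hypothetical and only illustrate the pattern.

```java
import java.util.Properties;

// Hypothetical helper illustrating the two rolling-bounce configs for an
// upgrade from Kafka Streams 2.3 or older; only "upgrade.from" is a real key.
final class UpgradeFromBounces {

    static final String UPGRADE_FROM = "upgrade.from";

    // First bounce: deploy the 2.7.0 jars with upgrade.from set to the old version.
    static Properties firstBounce(Properties base, String oldVersion) {
        Properties props = new Properties();
        props.putAll(base);                       // copy; do not mutate the caller's config
        props.setProperty(UPGRADE_FROM, oldVersion);
        return props;
    }

    // Second bounce: same config without upgrade.from, so the group can
    // move to the cooperative rebalancing protocol.
    static Properties secondBounce(Properties firstBounceProps) {
        Properties props = new Properties();
        props.putAll(firstBounceProps);
        props.remove(UPGRADE_FROM);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-app");            // hypothetical value
        base.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical value
        Properties phase1 = firstBounce(base, "2.3");
        Properties phase2 = secondBounce(phase1);
        System.out.println("first bounce upgrade.from = " + phase1.getProperty(UPGRADE_FROM));
        System.out.println("second bounce upgrade.from = " + phase2.getProperty(UPGRADE_FROM));
    }
}
```

Both helpers copy the properties they receive. This keeps the base configuration untouched, so it can be reused for each phase.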
Alternatively, an offline upgrade is also possible. Upgrading from any version as old as 0.10.0.x to 2.7.0 in offline mode requires the following steps:
- stop all old (e.g., 0.10.0.x) application instances
- update your code and swap the old code and jar file with the new code and new jar file
- restart all new (2.7.0) application instances
Starting in Kafka Streams 2.6.x, a new processing mode "exactly_once_beta" (configurable via the parameter processing.guarantee) is available. To use this new feature, your brokers must be on version 2.5.x or newer. A switch from "exactly_once" to "exactly_once_beta" (or the other way around) is only possible if the application is on version 2.6.x. To upgrade your application from an older version and enable this feature, first upgrade your application to version 2.6.x while staying on "exactly_once". Then do a second round of rolling bounces to switch to "exactly_once_beta". For a downgrade, do the reverse. First switch the config from "exactly_once_beta" to "exactly_once" to disable the feature in your 2.6.x application. Afterward, you can downgrade your application to a pre-2.6.x version.
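The upgrade and downgrade orderings described above can be written down as a sequence of rolling-bounce rounds. This sketch is a hypothetical model: the class EosBetaRollout and its Round type are not Kafka APIs. Only the processing.guarantee values "exactly_once" and "exactly_once_beta" come from this guide. It assumes one config change per round of rolling bounces.

```java
import java.util.List;

// Hypothetical model of the rolling-bounce rounds for exactly_once_beta.
final class EosBetaRollout {

    // One round of rolling bounces: the deployed application version and
    // the processing.guarantee value it runs with.
    static final class Round {
        final String appVersion;
        final String processingGuarantee;

        Round(String appVersion, String processingGuarantee) {
            this.appVersion = appVersion;
            this.processingGuarantee = processingGuarantee;
        }

        @Override
        public String toString() {
            return appVersion + " with processing.guarantee=" + processingGuarantee;
        }
    }

    // Upgrade: move to 2.6.x while staying on exactly_once, then switch modes.
    static List<Round> upgrade() {
        return List.of(
            new Round("2.6.x", "exactly_once"),
            new Round("2.6.x", "exactly_once_beta"));
    }

    // Downgrade: switch back to exactly_once on 2.6.x first, then deploy the older version.
    static List<Round> downgrade(String targetVersion) {
        return List.of(
            new Round("2.6.x", "exactly_once"),
            new Round(targetVersion, "exactly_once"));
    }

    public static void main(String[] args) {
        upgrade().forEach(System.out::println);
        downgrade("2.5.1").forEach(System.out::println);
    }
}
```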
To run a Kafka Streams application version 2.2.1, 2.3.0, or higher, a broker version 0.11.0 or higher is required, and the on-disk message format must be 0.11 or higher. Brokers must be on version 0.10.1 or higher to run a Kafka Streams application version 0.10.1 to 2.2.0. Additionally, the on-disk message format must be 0.10 or higher to run a Kafka Streams application version 1.0 to 2.2.0. For Kafka Streams 0.10.0, broker version 0.10.0 or higher is required.
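The compatibility rules above can be encoded as a small lookup, for example in a pre-deployment check. StreamsBrokerCompatibility is a hypothetical helper and is not part of Kafka. It only handles plain numeric versions such as "2.7.0" or "0.10.0.1", not suffixes like "-SNAPSHOT". It returns null where this guide states no message-format requirement.

```java
// Hypothetical helper encoding the broker / message-format rules of this guide.
final class StreamsBrokerCompatibility {

    // Numeric comparison of dotted versions; missing parts count as 0,
    // so "2.2" equals "2.2.0" and "0.10.1" is greater than "0.10.0.1".
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int n = Math.max(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // Minimum broker version for a given Kafka Streams version.
    static String minBrokerVersion(String streamsVersion) {
        if (compare(streamsVersion, "2.2.1") >= 0) return "0.11.0";
        if (compare(streamsVersion, "0.10.1") >= 0) return "0.10.1";
        return "0.10.0";
    }

    // Minimum on-disk message format, or null if the guide states none.
    static String minMessageFormat(String streamsVersion) {
        if (compare(streamsVersion, "2.2.1") >= 0) return "0.11";
        if (compare(streamsVersion, "1.0") >= 0) return "0.10";
        return null;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"0.10.0.1", "1.1.0", "2.2.0", "2.7.0"}) {
            System.out.println(v + ": broker >= " + minBrokerVersion(v)
                + ", message format >= " + minMessageFormat(v));
        }
    }
}
```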
Since the 2.6.0 release, Kafka Streams depends on a RocksDB version that requires macOS 10.14 or higher.
Another important thing to keep in mind concerns the deprecated KStreamBuilder class. When a KTable is created from a source topic via KStreamBuilder.table(), its materialized state store reuses the source topic as its changelog topic for restoring, and disables logging to avoid appending new updates to the source topic. In the StreamsBuilder class introduced in 1.0, this behavior was changed accidentally: it still reuses the source topic as the changelog topic for restoring, but it also creates a separate changelog topic to which the update records from the source topic are appended. In the 2.0 release, we fixed this issue. Users can now choose whether or not to reuse the source topic based on the StreamsConfig#TOPOLOGY_OPTIMIZATION config. If you are upgrading from the old KStreamBuilder class and hence need to change your code to use the new StreamsBuilder, set this config value to StreamsConfig#OPTIMIZE to continue reusing the source topic. If you are upgrading from 1.0 or 1.1, where you already use StreamsBuilder and have already created a separate changelog topic, set this config value to StreamsConfig#NO_OPTIMIZATION when upgrading to 2.7.0, so that the state store is restored from that changelog topic. More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION can be found in KIP-295.
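The following sketch shows how the choice above maps to the "topology.optimization" parameter. It assumes the string values "all" and "none", which are the values behind StreamsConfig.OPTIMIZE and StreamsConfig.NO_OPTIMIZATION. The class OptimizationOnUpgrade and its boolean flag are hypothetical. As described in the 2.3.0 notes of this guide, the same Properties must also be passed to StreamsBuilder.build(Properties) for the setting to take effect.

```java
import java.util.Properties;

// Hypothetical helper choosing topology.optimization during an upgrade.
final class OptimizationOnUpgrade {

    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    static final String OPTIMIZE = "all";         // value of StreamsConfig.OPTIMIZE
    static final String NO_OPTIMIZATION = "none"; // value of StreamsConfig.NO_OPTIMIZATION

    // KStreamBuilder users keep reusing the source topic as changelog (OPTIMIZE);
    // 1.0/1.1 StreamsBuilder users keep their separate changelog (NO_OPTIMIZATION).
    static Properties forUpgrade(boolean oldCodeUsedKStreamBuilder) {
        Properties props = new Properties();
        props.setProperty(TOPOLOGY_OPTIMIZATION,
            oldCodeUsedKStreamBuilder ? OPTIMIZE : NO_OPTIMIZATION);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("from KStreamBuilder: " + forUpgrade(true));
        System.out.println("from 1.0/1.1 StreamsBuilder: " + forUpgrade(false));
    }
}
```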
Streams API changes in 2.7.0
In KeyQueryMetadata, we deprecated getActiveHost(), getStandbyHosts(), and getPartition(), and replaced them with activeHost(), standbyHosts(), and partition(), respectively. KeyQueryMetadata was introduced in the Kafka Streams 2.5 release with getter methods prefixed with get. This change aligns the method names with the Kafka convention of not using the get prefix for getter methods. The old methods are deprecated, but their behavior is not affected. (Cf. KIP-648.)
The StreamsConfig variable for the configuration parameter "topology.optimization" is renamed from TOPOLOGY_OPTIMIZATION to TOPOLOGY_OPTIMIZATION_CONFIG. The old variable is deprecated. Note that the parameter name itself is not affected. (Cf. KIP-629.)
The configuration parameter retries is deprecated in favor of the new parameter task.timeout.ms. The Kafka Streams runtime ignores retries if it is set. However, it still forwards the parameter to its internal clients.
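The following is a minimal sketch of moving a configuration off the deprecated retries parameter. Only the names "retries", "task.timeout.ms", and the "producer." client prefix are real config conventions. The class RetriesMigration and the decision to keep the old value for the embedded producer are hypothetical. Whether your application still wants retries passed to any internal client depends on your setup.

```java
import java.util.Properties;

// Hypothetical migration helper; only the config names are real.
final class RetriesMigration {

    // Returns a copy of `old` without the deprecated top-level "retries"
    // and with an explicit "task.timeout.ms". If "retries" was set, its value
    // is kept for the embedded producer only, via the "producer." prefix
    // (an assumption about what the application wants).
    static Properties migrate(Properties old, long taskTimeoutMs) {
        Properties props = new Properties();
        props.putAll(old);
        Object retries = props.remove("retries");
        if (retries != null) {
            props.put("producer.retries", retries);
        }
        props.setProperty("task.timeout.ms", Long.toString(taskTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty("retries", "10");
        System.out.println(migrate(old, 300_000L));
    }
}
```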
We added SlidingWindows as an option for windowedBy() windowed aggregations, as described in KIP-450. Sliding windows are fixed-time and data-aligned windows that allow for flexible and efficient windowed aggregations.
The end-to-end latency metrics introduced in 2.6 have been expanded to include store-level metrics. The new store-level metrics are recorded at the TRACE level, a new metrics recording level. Enabling TRACE-level metrics automatically turns on all higher levels, i.e., INFO and DEBUG. See KIP-613 for more information.
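The following simplified model shows how the recording levels nest. The config key "metrics.recording.level" is real, and TRACE is accepted as a value from this release on. The enum and the isRecorded check are a hypothetical stand-in for Kafka's internal logic, not its actual API.

```java
import java.util.Properties;

// Hypothetical, simplified model of metric recording levels.
final class MetricsLevels {

    // Ordered from least to most verbose.
    enum Level { INFO, DEBUG, TRACE }

    // A metric is recorded when the configured level is at least as verbose
    // as the metric's own level, so TRACE also records INFO and DEBUG metrics.
    static boolean isRecorded(Level metricLevel, Level configuredLevel) {
        return metricLevel.compareTo(configuredLevel) <= 0;
    }

    // Config that turns on the new store-level end-to-end latency metrics.
    static Properties traceConfig() {
        Properties props = new Properties();
        props.setProperty("metrics.recording.level", Level.TRACE.name());
        return props;
    }

    public static void main(String[] args) {
        for (Level metric : Level.values()) {
            System.out.println(metric + " recorded at DEBUG: " + isRecorded(metric, Level.DEBUG));
        }
    }
}
```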
Streams API changes in 2.6.0
We added a new processing mode that improves application scalability using exactly-once guarantees (via KIP-447). You can enable this new feature by setting the configuration parameter processing.guarantee to the new value "exactly_once_beta". Note that you need brokers with version 2.5 or newer to use this feature.
For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances that aren't yet caught up with that task's state. To migrate a task from one instance to another (e.g., when scaling out), Streams instead assigns a warmup replica to the target instance. The warmup replica begins restoring the state while the active task stays available on an instance that already had the task. The instances warming up tasks communicate their progress to the group, so that, once they are ready, Streams can move active tasks to their new owners in the background. Check out KIP-441 for full details, including several new configs that control this new feature.
New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s) and end/terminal node(s) of a task. See KIP-613 for more information.
As of 2.6.0, Kafka Streams deprecates KStream.through() in favor of the new KStream.repartition() operator (as per KIP-221). KStream.repartition() is similar to KStream.through(), but Kafka Streams manages the topic for you. If you need to write into and read back from a topic that you manage, you can fall back to using KStream.to() in combination with StreamsBuilder#stream(). Please refer to the developer guide for more details about KStream.repartition().
The usability of StateStores within the Processor API is improved. As per KIP-401, ProcessorSupplier and TransformerSupplier now extend ConnectedStoreProvider. This lets users provide StateStores alongside Processor/Transformer logic, so that the stores are automatically added and connected to the processor.
We added a --force option to StreamsResetter that force-removes left-over members on the broker side when a long session timeout was configured, as per KIP-571.
We added the Suppressed.withLoggingDisabled() and Suppressed.withLoggingEnabled(config) methods, which allow disabling or configuring the changelog topic, as per KIP-446.
Streams API changes in 2.5.0
We added a new cogroup() operator (via KIP-150) that allows aggregating multiple streams in a single operation. Cogrouped streams can also be windowed before they are aggregated. Please refer to the developer guide for more details.
We added a new KStream.toTable()
API to translate an input event stream into a changelog stream as per
KIP-523.
We added a new Serde type Void in KIP-527 to represent null keys or null values from input topics.
We deprecated UsePreviousTimeOnInvalidTimestamp and replaced it with UsePartitionTimeOnInvalidTimestamp, as per KIP-530.
We deprecated KafkaStreams.store(String, QueryableStoreType) and replaced it with KafkaStreams.store(StoreQueryParameters). The new method allows querying a store with a variety of parameters, including querying a specific task (KIP-562) and querying stale stores (KIP-535).
Streams API changes in 2.4.0
As of 2.4.0 Kafka Streams offers a KTable-KTable foreign-key join (as per KIP-213). This joiner allows for records to be joined between two KTables with different keys. Both INNER and LEFT foreign-key joins are supported.
In the 2.4 release, you can now name all operators in a Kafka Streams DSL topology via KIP-307. Giving your operators meaningful names makes it easier to understand the topology description (Topology#describe()#toString()) and the full context of what your Kafka Streams application is doing. There are new overloads on most KStream and KTable methods that accept a Named object. Typically, you provide a name for the DSL operation by using Named.as("my operator name"). Naming of repartition topics for aggregation operations still uses Grouped, and join operations use either Joined or the new StreamJoined object.
Before the 2.4.0 version of Kafka Streams, users of the DSL could not name the state stores involved in a stream-stream join. If users changed their topology and added an operator before the join, the internal names of the state stores would shift, which required an application reset when redeploying. In the 2.4.0 release, Kafka Streams adds the StreamJoined class. It lets users name the join processor, the repartition topic(s) (if a repartition is required), and the state stores involved in the join. Naming the state stores also names the changelog topics that back them. Note that naming the stores does not make them queryable via Interactive Queries.

StreamJoined also lets you configure the type of state store used in the join. You can choose to use in-memory stores or custom state stores for a stream-stream join. Note that the provided stores will not be available for querying via Interactive Queries. With the addition of StreamJoined, stream-stream join operations that use Joined have been deprecated. Please switch over to the new overloaded stream-stream join methods. You can get more details from KIP-479.
With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks to be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance, only the tasks that are to be migrated to another consumer for overall load balance need to be closed and revoked. This slightly changes the semantics of the StateListener, because it will no longer necessarily transition to REBALANCING at the beginning of a rebalance. As a result, IQ is now available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, Streams continues to actively restore the state stores and/or process standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance followed by a second short rebalance that ensures all tasks are safely distributed. For details, please see KIP-429.
The 2.4.0 release contains newly added and reworked metrics.
KIP-444
adds new client level (i.e., KafkaStreams
instance level) metrics to the existing
thread-level, task-level, and processor-/state-store-level metrics.
For a full list of available client level metrics, see the
KafkaStreams monitoring
section in the operations guide.
Furthermore, RocksDB metrics are exposed via
KIP-471.
For a full list of available RocksDB metrics, see the
RocksDB monitoring
section in the operations guide.
Kafka Streams test-utils
got improved via
KIP-470
to simplify the process of using TopologyTestDriver
to test your application code.
We deprecated ConsumerRecordFactory and TopologyTestDriver#pipeInput(), and replaced them with TestInputTopic. We also deprecated OutputVerifier and TopologyTestDriver#readOutput(), and replaced them with TestOutputTopic. We also introduced a new class, TestRecord, that simplifies assertion code. For full details, see the Testing section in the developer guide.
In 2.4.0, we deprecated WindowStore#put(K key, V value), which should never be used. Use the existing WindowStore#put(K key, V value, long windowStartTimestamp) instead (KIP-474).
Furthermore, the PartitionGrouper interface and its corresponding configuration parameter partition.grouper were deprecated (KIP-528) and will be removed in the next major release (KAFKA-7785). This feature will therefore no longer be supported in the future, and you need to update your code accordingly. If you use a custom PartitionGrouper and stop using it, the created tasks might change. In that case, you will need to reset your application to upgrade it.
Streams API changes in 2.3.0
Version 2.3.0 adds the Suppress operator to the kafka-streams-scala KTable API.
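As of 2.3.0, Streams offers in-memory versions of the window store (KIP-428) and the session store (KIP-445), in addition to the persistent ones based on RocksDB. The new public interfaces inMemoryWindowStore() and inMemorySessionStore() are added to Stores and provide the built-in in-memory window and session stores.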
As of 2.3.0, we've updated how to turn on optimizations. To enable optimizations, you now need to do two things. First, add properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); to your configuration, as you have done before. Second, when constructing your KafkaStreams instance, pass your configuration properties when building your topology by using the overloaded StreamsBuilder.build(Properties) method. For example: KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties).
In 2.3.0, we added default implementations of close() and configure() to Serializer, Deserializer, and Serde, so that they can be implemented by a lambda expression. For more details, please read KIP-331.
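To improve operator semantics, new store types are added that can store an additional timestamp per key-value pair or window. Some DSL operators (for example, KTables) use these new stores. As a result, you can now retrieve the timestamp of the last update via Interactive Queries if you specify TimestampedKeyValueStoreType or TimestampedWindowStoreType as your QueryableStoreType. While this change is mainly transparent, there are some rare corner cases that may require code changes. Caution: if you receive an untyped store and use a cast, you might need to update your code to cast to the correct type. Otherwise, you might get an exception similar to java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE when getting a value from the store. Additionally, TopologyTestDriver#getStateStore() only returns non-built-in stores and throws an exception if a built-in store is accessed. For more details, please read KIP-258.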
To improve type safety, a new operator, KStream#flatTransformValues, is added. For more details, please read KIP-313.
Kafka Streams used to set the configuration parameter max.poll.interval.ms to Integer.MAX_VALUE. This default value is removed, and Kafka Streams now uses the consumer default value. For more details, please read KIP-442.
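The default configuration for repartition topics was changed. The segment size for index files (segment.index.bytes) is no longer 50MB and uses the cluster default instead. Similarly, the configuration segment.ms is no longer 10 minutes and uses the cluster default. Lastly, the retention period (retention.ms) is changed from Long.MAX_VALUE to -1 (infinite). For more details, please read KIP-443.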
To avoid memory leaks, RocksDBConfigSetter has a new close() method that is called on shutdown. Users should implement this method to release any memory used by RocksDB config objects, by closing those objects. For more details, please read KIP-453.
The RocksDB dependency was updated to version 5.18.3. The new version allows specifying more RocksDB configurations, including WriteBufferManager, which helps to limit RocksDB off-heap memory usage. For more details, please read KAFKA-8215.
Notable changes in Kafka Streams 2.2.1
As of Kafka Streams 2.2.1 a message format 0.11 or higher is required; this implies that brokers must be on version 0.11.0 or higher.
Streams API changes in 2.2.0
In 2.2.0, we've slightly simplified the KafkaStreams#state transition diagram during the startup phase. In older versions, the state transitions from CREATED to RUNNING, then to REBALANCING to get the first stream task assignment, and then back to RUNNING. Starting in 2.2.0, it transitions from CREATED directly to REBALANCING and then to RUNNING. If you have registered a StateListener that captures state transition events, you may need to adjust your listener implementation for this simplification (in practice, your listener logic is very unlikely to be affected at all).
In WindowedSerdes, we've added a new static constructor that returns a TimeWindowSerde with a configurable window size. This helps users construct time window serdes to read directly from a time-windowed store's changelog. More details can be found in KIP-393.
In 2.2.0, we changed a few public interfaces, including KafkaStreams, to extend AutoCloseable, so that they can be used in a try-with-resources statement. For a full list of the affected public interfaces, please read KIP-376.
Streams API changes in 2.1.0
We updated the TopologyDescription API to allow for better runtime checking. Users are encouraged to use #topicSet() and #topicPattern() on TopologyDescription.Source nodes instead of #topics(), which has been deprecated. Similarly, use #topic() and #topicNameExtractor() to get descriptions of TopologyDescription.Sink nodes. For more details, see KIP-321.
We've added a new class, Grouped, and deprecated Serialized. Grouped makes it possible to name the repartition topics created when performing aggregation operations. Users can name the potential repartition topic with the Grouped#as() method, which takes a String that is used as part of the repartition topic name. The resulting repartition topic name still follows the pattern ${application-id}-<name>-repartition. The Grouped class is now favored over Serialized in KStream#groupByKey(), KStream#groupBy(), and KTable#groupBy(). Note that Kafka Streams does not create a repartition topic for every aggregation operation. A repartition topic is only created if the data must be repartitioned first (for example, after a key-changing operation).
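For monitoring or ACL setup, it can help to derive the expected repartition topic name in advance. The following is a minimal sketch of the naming pattern stated above. The class RepartitionTopicNames, the application id "my-app", and the name "count-by-user" are hypothetical. The commented DSL call only indicates where such a name would come from.

```java
// Hypothetical helper that reproduces the repartition topic naming pattern
// <application.id>-<name>-repartition for a name given via Grouped#as().
final class RepartitionTopicNames {

    // e.g. stream.groupBy(selector, Grouped.as("count-by-user")) in an
    // application with application.id "my-app" (illustrative values).
    static String repartitionTopic(String applicationId, String groupedName) {
        return applicationId + "-" + groupedName + "-repartition";
    }

    public static void main(String[] args) {
        System.out.println(repartitionTopic("my-app", "count-by-user"));
    }
}
```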
Additionally, we've updated the Joined class with a new method, Joined#withName, that lets users name any repartition topics required for a Stream/Stream or Stream/Table join. For more details on repartition topic naming, see KIP-372.
As a result, we've updated the Kafka Streams Scala API and removed the Serialized class in favor of adding Grouped. If you just rely on the implicit Serialized, you only need to recompile. If you pass in Serialized explicitly, sorry, you'll have to make code changes.
We've added a new config named max.task.idle.ms that lets users specify how to handle out-of-order data within a task that may be processing multiple topic-partitions (see the Out-of-Order Handling section for more details). The default value is 0, which favors minimized latency over synchronization between multiple input streams from topic-partitions. Some topic-partitions may have no data available to process, so the task cannot determine their stream time. If users want to wait longer in that case, they can override this config with a larger value.
We've added the missing SessionBytesStoreSupplier#retentionPeriod()
to be consistent with the WindowBytesStoreSupplier
which allows users to get the specified retention period for session-windowed stores.
We've also added the missing StoreBuilder#withCachingDisabled()
to allow users to turn off caching for their customized stores.
We added a new serde for UUIDs (Serdes.UUIDSerde) that you can use via Serdes.UUID() (cf. KIP-206).
We updated a list of methods that take long arguments as either a timestamp (fixed point) or a duration (time period), and replaced them with Instant and Duration parameters for improved semantics. Some old methods based on long are deprecated, and users are encouraged to update their code. In particular, aggregation windows (hopping/tumbling/unlimited time windows and session windows) as well as join windows now take Duration arguments to specify window size, hop, and gap parameters. Also, window sizes and retention times are now specified as Duration type in the Stores class. The Window class has new methods #startTime() and #endTime() that return the window start/end timestamp as Instant. For interactive queries, there are new #fetch(...) overloads taking Instant arguments. Additionally, punctuations are now registered via ProcessorContext#schedule(Duration interval, ...). For more details, see KIP-358.
We deprecated KafkaStreams#close(...) and replaced it with KafkaStreams#close(Duration), which accepts a single timeout argument. Note: the new #close(Duration) method has improved (but slightly different) semantics. For more details, see KIP-358.
The newly exposed AdminClient metrics are now available when calling the KafkaStreams#metrics() method. For more details on exposing AdminClient metrics, see KIP-324.
We deprecated the notion of segments in window stores, as segments are intended to be an implementation detail. Thus, the method Windows#segments() and the variable Windows#segments were deprecated. If you implement custom windows, you should update your code accordingly. Similarly, WindowBytesStoreSupplier#segments() was deprecated and replaced with WindowBytesStoreSupplier#segmentInterval(). If you implement a custom window store, you need to update your code accordingly. Finally, the Stores#persistentWindowStore(...) overloads were deprecated and replaced with a new overload that no longer allows specifying the number of segments. For more details, see KIP-319 (note: KIP-328 and KIP-358 'overlap' with KIP-319).
We've added an overloaded StreamsBuilder#build method that accepts an instance of java.util.Properties, so that the StreamsConfig#TOPOLOGY_OPTIMIZATION config added in Kafka Streams 2.0 can be used. Before 2.1, when a topology was built with the DSL, Kafka Streams wrote the physical plan as the user made calls on the DSL. Now, if you pass a java.util.Properties instance to StreamsBuilder#build, Kafka Streams can optimize the physical plan of the topology, provided the StreamsConfig#TOPOLOGY_OPTIMIZATION config is set to StreamsConfig#OPTIMIZE. With StreamsConfig#OPTIMIZE, the KTable source topic is reused as the changelog topic, and in addition the topology may be optimized to merge redundant repartition topics into one repartition topic. The original no-parameter version of StreamsBuilder#build is still available for those who do not want to optimize their topology. Note that enabling topology optimization may require you to do an application reset when redeploying the application. For more details, see KIP-312.
We are introducing static membership for Kafka Streams users. This feature reduces unnecessary rebalances during normal application upgrades or rolling bounces. For more details on how to use it, please check the static membership design. Note that Kafka Streams uses the same ConsumerConfig#GROUP_INSTANCE_ID_CONFIG. You only need to make sure it is uniquely defined across the different stream instances of one application.
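The following sketch shows one way to give each instance its own group.instance.id (the value of ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) and to check uniqueness before deployment. The class StaticMembershipConfig, its methods, and the instance names are hypothetical. It assumes the id is derived from something stable per instance, such as a host name, so that a restarted instance comes back with the same id.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper for static membership configuration.
final class StaticMembershipConfig {

    static final String GROUP_INSTANCE_ID = "group.instance.id"; // ConsumerConfig.GROUP_INSTANCE_ID_CONFIG

    // Copies the shared application config and sets this instance's static id.
    static Properties forInstance(Properties base, String instanceName) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(GROUP_INSTANCE_ID, instanceName);
        return props;
    }

    // True only if every instance has an id and no two instances share one.
    static boolean idsAreUnique(List<Properties> instances) {
        Set<String> seen = new HashSet<>();
        for (Properties p : instances) {
            String id = p.getProperty(GROUP_INSTANCE_ID);
            if (id == null || !seen.add(id)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-app"); // hypothetical value
        List<Properties> instances = List.of(
            forInstance(base, "my-app-0"), forInstance(base, "my-app-1"));
        System.out.println("unique ids: " + idsAreUnique(instances));
    }
}
```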
Streams API changes in 2.0.0
In 2.0.0, we have added a few new APIs to the ReadOnlyWindowStore interface (for details, please read the Streams API changes below). If you have customized window store implementations that extend the ReadOnlyWindowStore interface, you need to make code changes. In addition, if you are using Java 8 method references in your Kafka Streams code, you might need to update your code to resolve method ambiguities. Hot-swapping only the jar file might not work in this case. See below for a complete list of 2.0.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
We moved Consumed
interface from org.apache.kafka.streams
to org.apache.kafka.streams.kstream
as it was mistakenly placed in the previous release. If your
code has already used it there is a simple one-liner change needed in
your import statement.
In 2.0.0, we have also removed some public APIs that were deprecated prior to 1.0.x. See below for a detailed list of removed APIs.
We have removed the skippedDueToDeserializationError-rate and skippedDueToDeserializationError-total metrics. Deserialization errors, and all other causes of record skipping, are now accounted for in the pre-existing metrics skipped-records-rate and skipped-records-total. When a record is skipped, the event is now logged at WARN level. If these warnings become burdensome, we recommend explicitly filtering out unprocessable records instead of depending on record skipping semantics. For more details, see KIP-274.
As of right now, the potential causes of skipped records are:
- null keys in table sources
- null keys in table-table inner/left/outer/right joins
- null keys or values in stream-table joins
- null keys or values in stream-stream joins
- null keys or values in aggregations on grouped streams
- null keys or values in reductions on grouped streams
- null keys in aggregations on windowed streams
- null keys in reductions on windowed streams
- null keys in aggregations on session-windowed streams
- Errors producing results, when the configured default.production.exception.handler decides to CONTINUE (the default is to FAIL and throw an exception).
- Errors deserializing records, when the configured default.deserialization.exception.handler decides to CONTINUE (the default is to FAIL and throw an exception). This was the case previously captured in the skippedDueToDeserializationError metrics.
- Fetched records having a negative timestamp.
We've also fixed the metrics names for time and session windowed store operations in 2.0. As a result, our current built-in stores have their store types in the metric names: in-memory-state, in-memory-lru-state, rocksdb-state, rocksdb-window-state, and rocksdb-session-state. For example, a RocksDB time windowed store's put operation metrics would now be kafka.streams:type=stream-rocksdb-window-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),rocksdb-window-state-id=([-.\w]+). Users need to update their metrics collecting and reporting systems for their time and session windowed stores accordingly. For more details, please read the State Store Metrics section.
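A metrics collector that filters MBeans by name can reuse the pattern above almost verbatim. The following is a minimal sketch using java.util.regex. Compared with the pattern above, the dot in "kafka.streams" is escaped so that it matches only a literal dot. The class WindowStoreMetricNames and the sample client, task, and store ids are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that extracts the store name from a RocksDB
// time-windowed store metrics MBean name (naming scheme as of 2.0).
final class WindowStoreMetricNames {

    static final Pattern ROCKSDB_WINDOW_STORE = Pattern.compile(
        "kafka\\.streams:type=stream-rocksdb-window-state-metrics,"
            + "client-id=([-.\\w]+),task-id=([-.\\w]+),rocksdb-window-state-id=([-.\\w]+)");

    // Returns the rocksdb-window-state-id, or null if the name does not match.
    static String storeName(String mbeanName) {
        Matcher m = ROCKSDB_WINDOW_STORE.matcher(mbeanName);
        return m.matches() ? m.group(3) : null;
    }

    public static void main(String[] args) {
        String name = "kafka.streams:type=stream-rocksdb-window-state-metrics,"
            + "client-id=app-1-StreamThread-1,task-id=0_1,rocksdb-window-state-id=my-store";
        System.out.println(storeName(name));
    }
}
```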
We have added support for methods in ReadOnlyWindowStore
which allows for querying a single window's key-value pair.
For users who have customized window store implementations on
the above interface, they'd need to update their code to implement the
newly added method as well.
For more details, see KIP-261.
We have added public WindowedSerdes
to allow users to read from / write to a topic storing windowed table changelogs directly.
In addition, in StreamsConfig
we have also added default.windowed.key.serde.inner
and default.windowed.value.serde.inner
to let users specify inner serdes if the default serde classes are windowed serdes.
For more details, see KIP-265.
We've added message header support in the Processor API
in Kafka 2.0.0. In particular, we have added a new API ProcessorContext#headers()
which returns a Headers
object that keeps track of
the headers of the source topic's message that is being processed.
Through this object, users can manipulate
the headers map that is being propagated throughout the
processor topology as well. For more details please feel free to read
the Developer Guide section.
We have deprecated constructors of KafkaStreams
that take a StreamsConfig
as parameter.
Please use the other corresponding constructors that accept java.util.Properties
instead.
For more details, see KIP-245.
Kafka 2.0.0 allows manipulating the timestamps of output records using the Processor API (KIP-251).
To enable this new feature, ProcessorContext#forward(...)
was modified.
The two existing overloads #forward(Object key, Object value, String childName)
and #forward(Object key, Object value, int childIndex)
were deprecated and a new overload #forward(Object key, Object value, To to)
was added.
The new class To
allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
Forwarding based on child index is not supported in the new API any longer.
We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level Topology#addSink
and higher-level KStream#to
APIs, we have added variants that
take a TopicNameExtractor
instance instead of a specific String
typed topic name, such that for each received record from the upstream
processor, the library will dynamically determine which Kafka topic to
write to
based on the record's key and value, as well as record context.
Note that all the Kafka topics that may possibly be used are still
considered as user topics and hence required to be pre-created. In
addition to that, we have modified the
StreamPartitioner
interface to add the topic name
parameter since the topic name now may not be known beforehand; users
who have customized implementations of this interface would need to
update their code while upgrading their application
to use Kafka Streams 2.0.0.
KIP-284 changed the retention time for repartition topics by setting its default value to Long.MAX_VALUE. Instead of relying on data retention, Kafka Streams now uses the new purge data API to delete consumed data from those topics and keep used storage small.
We have modified the ProcessorStateManager#register(...) signature and removed the deprecated loggingEnabled boolean parameter, because logging is now specified in the StoreBuilder. Users who used this function to register their state stores into the processor topology simply need to update their code and remove this parameter from the call.
Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make them easier to call when interoperating with Scala code. For example, it includes higher-order functions as parameters for transformations, which avoids the need for anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11. It also provides automatic conversion between Java and Scala collection types, a way to implicitly provide SerDes that reduces boilerplate in your application and makes it more type-safe, and more! For more information, see the Kafka Streams DSL for Scala documentation and KIP-270.
We have removed these deprecated APIs:
- KafkaStreams#toString no longer returns the topology and runtime metadata. To get topology metadata, users can call Topology#describe(), and to get thread runtime metadata, users can call KafkaStreams#localThreadsMetadata (they are deprecated since 1.0.0). For detailed guidance on how to update your code, please read here.
- TopologyBuilder and KStreamBuilder are removed and replaced by Topology and StreamsBuilder, respectively (they are deprecated since 1.0.0). For detailed guidance on how to update your code, please read here.
- StateStoreSupplier is removed and replaced with StoreBuilder (deprecated since 1.0.0). The corresponding Stores#create and the KStream, KTable, KGroupedStream overloaded functions that use it have also been removed. For detailed guidance on how to update your code, please read here.
- KStream, KTable, KGroupedStream overloaded functions that require serdes and other specifications explicitly are removed and replaced with simpler overloaded functions that use Consumed, Produced, Serialized, Materialized, Joined (they are deprecated since 1.0.0). For detailed guidance on how to update your code, please read here.
- Processor#punctuate, Transformer#punctuate, ValueTransformer#punctuate, and ProcessorContext#schedule(long) are removed and replaced by ProcessorContext#schedule(long, PunctuationType, Punctuator) (they are deprecated in 1.0.0).
- The second boolean-typed parameter "loggingEnabled" in ProcessorContext#register has been removed. Users can now use StoreBuilder#withLoggingEnabled, withLoggingDisabled to specify the behavior when they create the state store.
- KTable#writeAs, print, foreach, to, through are removed. Users can call KTable#toStream()#writeAs instead for the same purpose (they are deprecated since 0.11.0.0). For a detailed list of removed APIs, please read here.
- StreamsConfig#KEY_SERDE_CLASS_CONFIG, VALUE_SERDE_CLASS_CONFIG, TIMESTAMP_EXTRACTOR_CLASS_CONFIG are removed and replaced with StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, DEFAULT_VALUE_SERDE_CLASS_CONFIG, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, respectively (they are deprecated since 0.11.0.0).
- StreamsConfig#ZOOKEEPER_CONNECT_CONFIG is removed, since Streams no longer needs a ZooKeeper dependency (it is deprecated since 0.10.2.0).
Streams API changes in 1.1.0
We have added support for methods in ReadOnlyWindowStore that allow querying WindowStores without the necessity of providing keys.
Users who have customized window store implementations of the above interface need to update their code to implement the newly added methods as well.
For more details, see KIP-205.
There is a new artifact kafka-streams-test-utils providing the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes.
You can include the new artifact as a regular dependency of your unit tests and use the test driver to test the business logic of your Kafka Streams application.
For more details, see KIP-247.
The introduction of KIP-220
enables you to provide configuration parameters for the embedded
admin client created by Kafka Streams, similar to the embedded producer
and consumer clients.
You can provide the configs via StreamsConfig by adding them with the prefix "admin." as defined by StreamsConfig#adminClientPrefix(String). The prefix distinguishes them from configurations of other clients that share the same config names.
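The Java snippet below is a minimal sketch of how the prefix keeps client configs apart. It builds the properties with java.util.Properties only, so it runs without Kafka on the classpath. The adminClientPrefix helper is a hypothetical local stand-in. It assumes StreamsConfig#adminClientPrefix(String) simply prepends "admin.". In a real application you would call the StreamsConfig method directly. The application.id, bootstrap.servers, and retries values are illustrative.

```java
import java.util.Properties;

class AdminPrefixExample {
    // Hypothetical stand-in for StreamsConfig#adminClientPrefix(String):
    // assumed to prepend "admin." to the given client config name.
    static String adminClientPrefix(String config) {
        return "admin." + config;
    }

    static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");   // illustrative
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
        // "retries" is a config name shared by several clients; the prefix
        // scopes this value to the embedded admin client only.
        props.setProperty(adminClientPrefix("retries"), "5");
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Because the unprefixed name is never set, the other embedded clients keep their own defaults for retries.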
New methods in KTable:
- transformValues methods have been added to KTable. Similar to those on KStream, these methods allow for richer, stateful value transformation similar to the Processor API.
New method in GlobalKTable:
- A method has been provided that returns the store name associated with the GlobalKTable, or null if the store name is non-queryable.
New methods in KafkaStreams
:
- added an overload for the constructor that allows overriding the Time object used for tracking system wall-clock time; this is useful for unit testing your application code.
New methods in KafkaClientSupplier
:
- added getAdminClient(config) that allows overriding the AdminClient used for administrative requests such as internal topic creation.
New error handling for exceptions during production:
- added interface ProductionExceptionHandler that allows implementors to decide whether Streams should FAIL or CONTINUE when certain exceptions occur while trying to produce
- provided an implementation, DefaultProductionExceptionHandler, that always fails, preserving the existing behavior by default
- changing which implementation is used can be done by setting default.production.exception.handler to the fully qualified name of a class implementing this interface
Changes in StreamsResetter
:
- added options to specify the offsets to which input topics are reset, according to KIP-171
Streams API changes in 1.0.0
With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
This change includes the five main classes KafkaStreams, KStreamBuilder, KStream, KTable, and TopologyBuilder (and some others).
All changes are fully backward compatible, as the old API is only deprecated but not removed.
We recommend moving to the new API as soon as you can.
We summarize all API changes in the next paragraphs.
The two main classes to specify a topology via the DSL (KStreamBuilder) or the Processor API (TopologyBuilder) were deprecated and replaced by StreamsBuilder and Topology. Both new classes are located in package org.apache.kafka.streams.
Note that StreamsBuilder does not extend Topology, i.e., the class hierarchy is different now.
The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
However, some internal methods that were public in KStreamBuilder and TopologyBuilder but not part of the actual API are no longer present in the new classes.
Furthermore, some overloads were simplified compared to the original classes.
See KIP-120 and KIP-182 for full details.
Changing how a topology is specified also affects the KafkaStreams constructors, which now only accept a Topology.
Using the DSL builder class StreamsBuilder, one can get the constructed Topology via StreamsBuilder#build().
Additionally, a new class org.apache.kafka.streams.TopologyDescription (and some more dependent classes) was added.
These can be used to get a detailed description of the specified topology and can be obtained by calling Topology#describe().
An example using this new API is shown in the quickstart section.
New methods in KStream
:
- With the introduction of KIP-202, a new method merge() has been created in KStream, as StreamsBuilder#merge() has been removed. The method signature was also changed. Instead of providing multiple KStreams to the method at once, only a single KStream is accepted.
New methods in KafkaStreams
:
- retrieve the current runtime information about the local threads via localThreadsMetadata()
- observe the restoration of all state stores via setGlobalStateRestoreListener(), in which users can provide their customized implementation of the org.apache.kafka.streams.processor.StateRestoreListener interface
Deprecated / modified methods in KafkaStreams
:
- toString() and toString(final String indent) were previously used to return static and runtime information. They have been deprecated in favor of the new classes/methods localThreadsMetadata() / ThreadMetadata (returning runtime information) and TopologyDescription / Topology#describe() (returning static information).
- With the introduction of KIP-182 you should no longer pass a Serde to KStream#print operations. If you can't rely on toString to print your keys and values, you should instead provide a custom KeyValueMapper via the Printed#withKeyValueMapper call.
- setStateListener() can now only be called before the application starts running, i.e. before KafkaStreams.start() is called.
Deprecated methods in KGroupedStream:
- Windowed aggregations have been deprecated from KGroupedStream and moved to WindowedKStream. You can now perform a windowed aggregation by, for example, using KGroupedStream#windowedBy(Windows)#reduce(Reducer).
Modified methods in Processor
:
- The Processor API was extended to allow users to schedule punctuate functions based either on data-driven stream time or on wall-clock time. As a result, the original ProcessorContext#schedule is deprecated. It is replaced by a new overloaded function that accepts a user-customizable Punctuator callback interface, which triggers its punctuate API method periodically based on the PunctuationType. The PunctuationType determines what notion of time is used for the punctuation scheduling: either stream time or wall-clock time. By default, stream time is configured to represent event time via TimestampExtractor. In addition, the punctuate function inside Processor is also deprecated.
Before this, users could only schedule based on stream time (i.e. PunctuationType.STREAM_TIME), and hence the punctuate function was data-driven only. Stream time is determined (and advanced forward) by the timestamps derived from the input data. If no data arrives at the processor, stream time does not advance and hence punctuation is not triggered. On the other hand, when wall-clock time (i.e. PunctuationType.WALL_CLOCK_TIME) is used, punctuate is triggered purely based on wall-clock time. For example, suppose a Punctuator is scheduled every 10 seconds based on PunctuationType.WALL_CLOCK_TIME. If 60 records are processed within 20 seconds, punctuate would be called 2 times (one time every 10 seconds). If these 60 records were processed within 5 seconds, then punctuate would not be called at all. Users can schedule multiple Punctuator callbacks with different PunctuationTypes within the same processor by simply calling ProcessorContext#schedule multiple times inside the processor's init() method.
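Here is a stdlib-only Java model of how often punctuate would run, which makes the difference between the two punctuation types concrete. It is a simplification built on stated assumptions, not Kafka's actual scheduler. Punctuations are aligned to multiples of the interval, and missed intervals collapse into one call. Real code registers callbacks through ProcessorContext#schedule(long, PunctuationType, Punctuator). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class PunctuationModel {
    // Simplified model (an assumption, not Kafka's scheduler): a STREAM_TIME
    // punctuation fires when stream time (the maximum record timestamp seen
    // so far) reaches the next multiple of the interval. If stream time jumps
    // over several multiples at once, it fires only once.
    static int streamTimePunctuations(List<Long> recordTimestampsMs, long intervalMs) {
        long streamTime = Long.MIN_VALUE;
        long nextPunctuation = intervalMs;
        int count = 0;
        for (long ts : recordTimestampsMs) {
            streamTime = Math.max(streamTime, ts); // stream time never moves backwards
            if (streamTime >= nextPunctuation) {
                count++;
                nextPunctuation = (streamTime / intervalMs + 1) * intervalMs;
            }
        }
        return count; // no records means no progress and no punctuation
    }

    // WALL_CLOCK_TIME: modeled as one call per full interval of elapsed
    // real time, independent of how many records were processed.
    static long wallClockPunctuations(long elapsedMs, long intervalMs) {
        return elapsedMs / intervalMs;
    }

    // Records with consecutive timestamps 1s, 2s, ..., n seconds.
    static List<Long> consecutiveSeconds(int n) {
        List<Long> timestamps = new ArrayList<>();
        for (long s = 1; s <= n; s++) {
            timestamps.add(s * 1000);
        }
        return timestamps;
    }

    public static void main(String[] args) {
        System.out.println(streamTimePunctuations(consecutiveSeconds(60), 10_000)); // 6
        System.out.println(wallClockPunctuations(20_000, 10_000));                 // 2
        System.out.println(wallClockPunctuations(5_000, 10_000));                  // 0
    }
}
```

Under this model, stream-time punctuation depends only on record timestamps. The 60-record stream therefore yields 6 calls however long processing takes. Wall-clock punctuation depends only on elapsed time.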
If you monitor task-level or processor-node / state-store-level Streams metrics, please note that the metrics sensor names and hierarchy were changed. The task IDs, store names, and processor names are no longer part of the sensor metric names. Instead, they are added as tags of the sensors to achieve a consistent metrics hierarchy. As a result, you may need to make corresponding changes to your metrics reporting and monitoring tools when upgrading to 1.0.0. Detailed metrics sensors can be found in the Streams Monitoring section.
The introduction of KIP-161 enables you to provide a default exception handler for deserialization errors when reading data from Kafka, rather than throwing the exception all the way out of your Streams application.
You can provide the configs via the StreamsConfig as StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG.
The specified handler must implement the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface.
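Below is a minimal configuration sketch. It assumes the string key behind StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG is default.deserialization.exception.handler. It uses the LogAndContinueExceptionHandler that ships with Kafka Streams. Only java.util.Properties is used, so the snippet compiles on its own. In application code you would reference the StreamsConfig constant instead of the literal. The class and method names are hypothetical.

```java
import java.util.Properties;

class DeserializationHandlerConfig {
    // Assumed literal value of
    // StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG.
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Ships with Kafka Streams: logs the record that failed to deserialize
    // and continues processing instead of stopping the application.
    static final String LOG_AND_CONTINUE =
        "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Returns a copy of the given config with the handler set; the input is left untouched.
    static Properties withDeserializationHandler(Properties base, String handlerClassName) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(HANDLER_KEY, handlerClassName);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-streams-app"); // illustrative
        Properties props = withDeserializationHandler(base, LOG_AND_CONTINUE);
        System.out.println(props.getProperty(HANDLER_KEY));
    }
}
```

A custom handler works the same way. You pass the fully qualified name of your own DeserializationExceptionHandler implementation instead.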
The introduction of KIP-173 enables you to provide topic configuration parameters for any topics created by Kafka Streams.
This includes repartition and changelog topics.
You can provide the configs via the StreamsConfig by adding the configs with the prefix as defined by StreamsConfig#topicPrefix(String).
Any properties in the StreamsConfig with the prefix will be applied when creating internal topics.
Any configs that aren't topic configs will be ignored.
If you already use StateStoreSupplier or Materialized to provide configs for changelogs, then they will take precedence over those supplied in the config.
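The prefix rules above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not the Streams implementation. It assumes the prefix produced by StreamsConfig#topicPrefix(String) is "topic.". KNOWN_TOPIC_CONFIGS is a deliberately tiny, illustrative stand-in for the full set of topic-level configs. The second helper mirrors the precedence rule: store-level settings (as supplied through Materialized) override the global ones.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;

class InternalTopicConfigs {
    // Assumed prefix added by StreamsConfig#topicPrefix(String).
    static final String TOPIC_PREFIX = "topic.";

    // Hypothetical, intentionally incomplete subset of topic-level configs.
    static final Set<String> KNOWN_TOPIC_CONFIGS =
        Set.of("cleanup.policy", "retention.ms", "segment.bytes");

    // Collects prefixed entries, strips the prefix, and ignores anything
    // that is not a (known) topic config.
    static Map<String, String> internalTopicConfigs(Properties streamsProps) {
        Map<String, String> result = new TreeMap<>();
        for (String key : streamsProps.stringPropertyNames()) {
            if (!key.startsWith(TOPIC_PREFIX)) {
                continue; // not meant for internal topics
            }
            String topicKey = key.substring(TOPIC_PREFIX.length());
            if (KNOWN_TOPIC_CONFIGS.contains(topicKey)) {
                result.put(topicKey, streamsProps.getProperty(key));
            }
        }
        return result;
    }

    // Store-level changelog configs win over the globally prefixed ones.
    static Map<String, String> withStoreOverrides(Map<String, String> fromStreamsConfig,
                                                  Map<String, String> fromStore) {
        Map<String, String> merged = new TreeMap<>(fromStreamsConfig);
        merged.putAll(fromStore);
        return merged;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app"); // ignored: no prefix
        props.setProperty("topic.segment.bytes", "1048576");   // applied
        props.setProperty("topic.not.a.topic.config", "x");    // ignored: unknown
        System.out.println(internalTopicConfigs(props));
    }
}
```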
Streams API changes in 0.11.0.0
Updates in StreamsConfig
:
- new configuration parameter processing.guarantee is added
- configuration parameter key.serde was deprecated and replaced by default.key.serde
- configuration parameter value.serde was deprecated and replaced by default.value.serde
- configuration parameter timestamp.extractor was deprecated and replaced by default.timestamp.extractor
- method keySerde() was deprecated and replaced by defaultKeySerde()
- method valueSerde() was deprecated and replaced by defaultValueSerde()
- new method defaultTimestampExtractor() was added
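A small helper can rewrite the deprecated parameter names listed above when you migrate an existing Properties file. StreamsConfigMigration and migrate are hypothetical names. When both a deprecated name and its new-style replacement are set, this sketch keeps the new-style value. That policy is a design choice of the sketch, not documented Streams behavior.

```java
import java.util.Map;
import java.util.Properties;

class StreamsConfigMigration {
    // Deprecated 0.11.0.0 parameter names mapped to their replacements.
    static final Map<String, String> RENAMED = Map.of(
        "key.serde", "default.key.serde",
        "value.serde", "default.value.serde",
        "timestamp.extractor", "default.timestamp.extractor");

    // Returns a copy with deprecated names rewritten; other keys pass through.
    static Properties migrate(Properties in) {
        Properties out = new Properties();
        for (String key : in.stringPropertyNames()) {
            String newKey = RENAMED.getOrDefault(key, key);
            if (!newKey.equals(key) && in.getProperty(newKey) != null) {
                continue; // an explicitly set new-style name wins
            }
            out.setProperty(newKey, in.getProperty(key));
        }
        return out;
    }

    public static void main(String[] args) {
        Properties legacy = new Properties();
        legacy.setProperty("key.serde", "org.example.MySerde"); // hypothetical class
        System.out.println(migrate(legacy));
    }
}
```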
New methods in TopologyBuilder
:
- added overloads for addSource() that allow defining a TimestampExtractor per source node
- added overloads for addGlobalStore() that allow defining a TimestampExtractor per source node associated with the global store
New methods in KStreamBuilder
:
- added overloads for stream() that allow defining a TimestampExtractor per input stream
- added overloads for table() that allow defining a TimestampExtractor per input table
- added overloads for globalKTable() that allow defining a TimestampExtractor per global table
Deprecated methods in KTable
:
- void foreach(final ForeachAction<? super K, ? super V> action)
- void print()
- void print(final String streamName)
- void print(final Serde<K> keySerde, final Serde<V> valSerde)
- void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)
- void writeAsText(final String filePath)
- void writeAsText(final String filePath, final String streamName)
- void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)
- void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)
The above methods have been deprecated in favor of using the Interactive Queries API. If you want to query the current content of the state store backing the KTable, use the following approach:
- Make a call to KafkaStreams.store(final String storeName, final QueryableStoreType<T> queryableStoreType)
- Then make a call to ReadOnlyKeyValueStore.all() to iterate over the keys of a KTable.
If you want to view the changelog stream of the KTable, then you could call KTable.toStream().print(Printed.toSysOut()).
Metrics using exactly-once semantics:
If "exactly_once" processing is enabled via the processing.guarantee parameter, internally Streams switches from a producer-per-thread to a producer-per-task runtime model.
To distinguish the different producers in this case, the producer's client.id additionally encodes the task ID.
Using "exactly_once_beta" still uses a producer per thread, so its client.id does not change compared with "at_least_once".
Because the producer's client.id is used to report JMX metrics, it might be required to update tools that receive those metrics.
Producer's client.id naming schema:
- at-least-once (default): [client.Id]-StreamThread-[sequence-number]
- exactly-once: [client.Id]-StreamThread-[sequence-number]-[taskId]
- exactly-once-beta: [client.Id]-StreamThread-[sequence-number]
[client.Id] is either set via the Streams configuration parameter client.id or defaults to [application.id]-[processId] ([processId] is a random UUID).
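The naming schema above can be expressed as a small Java helper, for example to predict the JMX client IDs a monitoring tool should expect. ProducerClientIds and its methods are hypothetical. The task ID string "0_1" used below is only illustrative. The IDs an actual release emits may carry further suffixes, so check them against your own JMX output.

```java
import java.util.UUID;

class ProducerClientIds {
    enum Guarantee { AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA }

    // [client.Id]: the configured client.id, else [application.id]-[processId].
    static String baseClientId(String configuredClientId, String applicationId, UUID processId) {
        if (configuredClientId != null && !configuredClientId.isEmpty()) {
            return configuredClientId;
        }
        return applicationId + "-" + processId;
    }

    static String producerClientId(String baseClientId, int threadSequence,
                                   Guarantee guarantee, String taskId) {
        String perThread = baseClientId + "-StreamThread-" + threadSequence;
        if (guarantee == Guarantee.EXACTLY_ONCE) {
            // producer-per-task: the task ID tells the producers apart
            return perThread + "-" + taskId;
        }
        // at-least-once and exactly-once-beta: one producer per thread
        return perThread;
    }

    public static void main(String[] args) {
        String base = baseClientId(null, "my-app", UUID.randomUUID());
        for (Guarantee g : Guarantee.values()) {
            System.out.println(g + ": " + producerClientId(base, 1, g, "0_1"));
        }
    }
}
```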
Notable changes in 0.10.2.1
Parameter updates in StreamsConfig
:
- The default config values of the embedded producer's retries and the consumer's max.poll.interval.ms have been changed to improve the resiliency of a Kafka Streams application
Streams API changes in 0.10.2.0
New methods in KafkaStreams
:
- set a listener to react to application state changes via setStateListener(StateListener listener)
- retrieve the current application state via state()
- retrieve the global metrics registry via metrics()
- apply a timeout when closing an application via close(long timeout, TimeUnit timeUnit)
- specify a custom indent when retrieving Kafka Streams information via toString(String indent)
Parameter updates in StreamsConfig
:
- parameter zookeeper.connect was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol (cf. KIP-4, Section "Topic Admin Schema")
- added many new parameters for metrics, security, and client configurations
Changes in StreamsMetrics
interface:
- removed methods: addLatencySensor()
- added methods: addLatencyAndThroughputSensor(), addThroughputSensor(), recordThroughput(), addSensor(), removeSensor()
New methods in TopologyBuilder
:
- added overloads for addSource() that allow defining an auto.offset.reset policy per source node
- added methods addGlobalStore() to add global StateStores
New methods in KStreamBuilder
:
- added overloads for stream() and table() that allow defining an auto.offset.reset policy per input stream/table
- added method globalKTable() to create a GlobalKTable
New joins for KStream
:
- added overloads for join() to join with KTable
- added overloads for join() and leftJoin() to join with GlobalKTable
- note: join semantics in 0.10.2 were improved, and thus you might see different results compared to 0.10.0.x and 0.10.1.x (cf. Kafka Streams Join Semantics in the Apache Kafka wiki)
Aligned null-key handling for KTable joins:
- like all other KTable operations, KTable-KTable joins no longer throw an exception on null-key records, but drop those records silently
New window type Session Windows:
- added class SessionWindows to specify session windows
- added overloads for KGroupedStream methods count(), reduce(), and aggregate() to allow session window aggregations
Changes to TimestampExtractor
:
- method extract() now has a second parameter
- new default timestamp extractor class FailOnInvalidTimestamp (it gives the same behavior as the old (and removed) default extractor ConsumerRecordTimestampExtractor)
- new alternative timestamp extractor classes LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp
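The three invalid-timestamp strategies can be contrasted with a self-contained Java model. The Extractor interface here is a hypothetical local stand-in for the two-argument extract() shape. It is not the real org.apache.kafka.streams.processor.TimestampExtractor, which receives a ConsumerRecord. IllegalStateException substitutes for the exception the real FailOnInvalidTimestamp throws. The behaviors sketch what the class names describe: fail, skip the record, or reuse the previous timestamp.

```java
class InvalidTimestampPolicies {
    // Hypothetical stand-in for the two-argument extract() shape; a negative
    // timestamp is treated as invalid.
    interface Extractor {
        long extract(long recordTimestamp, long previousTimestamp);
    }

    // Modeled after FailOnInvalidTimestamp: reject invalid timestamps.
    static final Extractor FAIL = (ts, prev) -> {
        if (ts < 0) {
            throw new IllegalStateException("invalid timestamp " + ts);
        }
        return ts;
    };

    // Modeled after LogAndSkipOnInvalidTimestamp: log and hand back the
    // invalid value, which signals the caller to drop the record.
    static final Extractor LOG_AND_SKIP = (ts, prev) -> {
        if (ts < 0) {
            System.err.println("skipping record with invalid timestamp " + ts);
        }
        return ts;
    };

    // Modeled after UsePreviousTimeOnInvalidTimestamp: fall back to the
    // previous timestamp, failing if that is invalid too.
    static final Extractor USE_PREVIOUS = (ts, prev) -> {
        if (ts >= 0) {
            return ts;
        }
        if (prev < 0) {
            throw new IllegalStateException("no valid previous timestamp");
        }
        return prev;
    };

    public static void main(String[] args) {
        System.out.println(USE_PREVIOUS.extract(-1L, 42L)); // falls back to 42
        System.out.println(LOG_AND_SKIP.extract(-1L, 42L)); // negative: record dropped
    }
}
```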
Relaxed type constraints of many DSL interfaces, classes, and methods (cf. KIP-100).
Streams API changes in 0.10.1.0
Stream grouping and aggregation split into two methods:
- old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey()
- new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count()
- Example: stream.countByKey() changes to stream.groupByKey().count()
Auto Repartitioning:
- a call to through() after a key-changing operator and before an aggregation/join is no longer required
- Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count()
TopologyBuilder:
- methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups()
DSL: new parameter to specify state store names:
- The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter "operator/window name" is now the storeName)
- KStreamBuilder#table(String topic) changes to #table(String topic, String storeName)
- KTable#through(String topic) changes to #through(String topic, String storeName)
- KGroupedStream #aggregate(), #reduce(), and #count() require additional parameter "String storeName"
- Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName")
Windowing:
- Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names)
- JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000)