Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.
This section gives a description of the basic transformations, the effective physical partitioning after applying those as well as insights into Flink’s operator chaining.
変換 | 解説 |
---|---|
Map DataStream → DataStream |
一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:
|
FlatMap DataStream → DataStream |
一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数
|
Filter DataStream → DataStream |
各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:
|
KeyBy DataStream → KeyedStream |
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 内部的には、これはハッシュパーティショニングを使って実装されています。キーを指定する方法についてはkeysを見てください。この変換は KeyedStream を返します。
注意 もし以下であれば、タイプはキーになりえません:
|
Reduce KeyedStream → DataStream |
キー付けされたデータストリーム上での "rolling" reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。
|
Fold KeyedStream → DataStream |
初期値を持つキー付けされたデータストリーム上での "rolling" fold。最後に組み合わされた値を使って現在の要素を組み合わせ、新しい値を発行します。
数列(1,2,3,4,5)に適用された場合、fold関数は数列"start-1", "start-1-2", "start-1-2-3", ... を発行します。
|
Aggregations KeyedStream → DataStream |
キー付けされたデータストリーム上での Rolling aggregation。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。
|
Window KeyedStream → WindowedStream |
ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。
|
WindowAll DataStream → AllWindowedStream |
ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。 警告: これは多くの場合において非並行 変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。
|
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream |
全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。 注意: windowAll変換を使っている場合、代わりにAllWindowFunctionを使う必要があります。
|
Window Reduce WindowedStream → DataStream |
ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。
|
Window Fold WindowedStream → DataStream |
ウィンドウに実用的なfold関数を適用し、foldされた値を返します。数列 (1,2,3,4,5) に適用された場合、例の関数は数列を文字列 "start-1-2-3-4-5"に畳み込みます:
|
ウィンドウ上の集約 WindowedStream → DataStream |
ウィンドウの内容を集約します。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。
|
Union DataStream* → DataStream |
全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。
|
ウィンドウ Join DataStream,DataStream → DataStream |
指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。
|
ウィンドウ CoGroup DataStream,DataStream → DataStream |
指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。
|
Connect DataStream,DataStream → ConnectedStreams |
型を維持したまま二つのデータストリームを"connects"する。二つのストリーム間の共有された状態を考慮して接続する
|
CoMap, CoFlatMap ConnectedStreams → DataStream |
接続されたデータストリーム上のmapとflatMapに似ている
|
Split DataStream → SplitStream |
ストリームを幾つかの条件に応じて2つ以上のストリームに分割する
|
Select SplitStream → DataStream |
分割されたストリームから1つ以上のストリームを選択する。
|
Iterate DataStream → IterativeStream → DataStream |
1つのオペレータの出力を幾つかの前段のオペレータへリダイレクトすることで、フロー内の"feedback"ループを生成する。これは特に連続的にモデルを更新するアルゴリズムを定義するのに役立ちます。以下のコードはストリームで始まり、連続的にbodyの繰り返しを適用します。0より大きい要素はフィードバック経路に送り返され、要素の残りは下流に転送されます。完全な説明は繰り返し を見てください。
|
タイムスタンプの抽出 DataStream → DataStream |
イベントタイム セマンティクスを使うウィンドウと連携するために、レコードからタイムスタンプを抽出する。イベントタイムを見てください。
|
変換 | 解説 |
---|---|
Map DataStream → DataStream |
一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:
|
FlatMap DataStream → DataStream |
一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数
|
Filter DataStream → DataStream |
各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:
|
KeyBy DataStream → KeyedStream |
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 内部的には、これはハッシュパーティショニングを使って実装されています。キーを指定する方法についてはkeysを見てください。この変換は KeyedStream を返します。
|
Reduce KeyedStream → DataStream |
キー付けされたデータストリーム上での "rolling" reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。
|
Fold KeyedStream → DataStream |
初期値を持つキー付けされたデータストリーム上での "rolling" fold。最後に組み合わされた値を使って現在の要素を組み合わせ、新しい値を発行します。
数列(1,2,3,4,5)に適用された場合、fold関数は数列"start-1", "start-1-2", "start-1-2-3", ... を発行します。
|
Aggregations KeyedStream → DataStream |
キー付けされたデータストリーム上での Rolling aggregation。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。
|
Window KeyedStream → WindowedStream |
ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。ウィンドウの説明についてはwindows を見てください。
|
WindowAll DataStream → AllWindowedStream |
ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。 警告: これは多くの場合において非並行 変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。
|
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream |
全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。 注意: windowAll変換を使っている場合、代わりにAllWindowFunctionを使う必要があります。
|
Window Reduce WindowedStream → DataStream |
ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。
|
Window Fold WindowedStream → DataStream |
ウィンドウに実用的なfold関数を適用し、foldされた値を返します。数列 (1,2,3,4,5) に適用された場合、例の関数は数列を文字列 "start-1-2-3-4-5"に畳み込みます:
|
ウィンドウ上の集約 WindowedStream → DataStream |
ウィンドウの内容を集約します。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。
|
Union DataStream* → DataStream |
全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。
|
ウィンドウ Join DataStream,DataStream → DataStream |
指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。
|
ウィンドウ CoGroup DataStream,DataStream → DataStream |
指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。
|
Connect DataStream,DataStream → ConnectedStreams |
二つのストリーム間の共有された状態を考慮して、それらの型を維持している二つのデータストリームを"connect"する。
|
CoMap, CoFlatMap ConnectedStreams → DataStream |
接続されたデータストリーム上のmapとflatMapに似ている
|
Split DataStream → SplitStream |
ストリームを幾つかの条件に応じて2つ以上のストリームに分割する
|
Select SplitStream → DataStream |
分割されたストリームから1つ以上のストリームを選択する。
|
Iterate DataStream → IterativeStream → DataStream |
1つのオペレータの出力を幾つかの前段のオペレータへリダイレクトすることで、フロー内の"feedback"ループを生成する。これは特に連続的にモデルを更新するアルゴリズムを定義するのに役立ちます。以下のコードはストリームで始まり、連続的にbodyの繰り返しを適用します。0より大きい要素はフィードバック経路に送り返され、要素の残りは下流に転送されます。完全な説明は繰り返し を見てください。
|
タイムスタンプの抽出 DataStream → DataStream |
イベントタイム セマンティクスを使うウィンドウと連携するために、レコードからタイムスタンプを抽出する。イベントタイムを見てください。
|
匿名パターンマッチングを使ったタプル、case class および コレクションからの抽出は、以下のようになります:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
そのままのAPIではサポートされません。この機能を使うには、Scala API 拡張を使う必要があります。
以下の変換はタプルのデータストリーム上で利用可能です:
変換 | 解説 |
---|---|
Project DataStream → DataStream |
タプルからフィールドのサブセットを選択する
|
Flinkは変換の後の正確なストリームパーティション上で(必要であれば)以下の関数を使って低レベルの制御も行えます:
変換 | 解説 |
---|---|
独自のパーティション DataStream → DataStream |
各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。
|
Random partitioning DataStream → DataStream |
要素を一様な分散に応じてランダムにパーティションする。
|
リバランシング (ラウンドロビン パーティション) DataStream → DataStream |
パーティションごとに等しい負荷を生成して、要素をラウンドロビンでパーティションする。データの歪があるところでパフォーマンスの最適化に役立ちます。
|
再スケーリング DataStream → DataStream |
ラウンドロビンで要素をダウンストリームのオペレーションの部分集合に分割する。This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. これはタスクマネージャのスロットの数のような他の設定に依存して、データをネットワーク上に転送する代わりにローカルのデータ転送を必要とします。 アップストリームのオペレーションが要素を送信するダウンストリームのオペレーションのサブセットは、アップストリームとダウンストリームのオペレーションの両方の並行度に依存します。例えば、もしアップストリームのオペレーションが並行度2で、ダウンストリームの変更度が6の場合、1つのアップストリームのオペレーションは要素を3つのダウンストリームのオペレーションに分配し、一方で他のアップストリームのオペレーションは他の3つのダウンストリームのオペレーションに分配するでしょう。反対に、もしダウンストリームのオペレーションが並行度2でアップストリームのオペレーションが並行度6の場合、3つのアップストリームのオペレーションは1つのダウンストリームのオペレーションに分配し、一方で他の3つのアップストリームのオペレーションは他のダウンストリームのオペレーションに分配するでしょう。 In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations. 上の例の接続パターンの可視化の図を見てください:
|
Broadcasting DataStream → DataStream |
要素を各パーティションにブロードキャストする。
|
変換 | 解説 |
---|---|
独自のパーティション DataStream → DataStream |
各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。
|
Random partitioning DataStream → DataStream |
要素を一様な分散に応じてランダムにパーティションする。
|
リバランシング (ラウンドロビン パーティション) DataStream → DataStream |
パーティションごとに等しい負荷を生成して、要素をラウンドロビンでパーティションする。データの歪があるところでパフォーマンスの最適化に役立ちます。
|
再スケーリング DataStream → DataStream |
ラウンドロビンで要素をダウンストリームのオペレーションの部分集合に分割する。This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. これはタスクマネージャのスロットの数のような他の設定に依存して、データをネットワーク上に転送する代わりにローカルのデータ転送を必要とします。 アップストリームのオペレーションが要素を送信するダウンストリームのオペレーションのサブセットは、アップストリームとダウンストリームのオペレーションの両方の並行度に依存します。例えば、もしアップストリームのオペレーションが並行度2で、ダウンストリームの変更度が4の場合、1つのアップストリームのオペレーションは要素を2つのダウンストリームのオペレーションに分配し、一方で他のアップストリームのオペレーションは他の2つのダウンストリームのオペレーションに分配するでしょう。反対に、もしダウンストリームのオペレーションが並行度2でアップストリームのオペレーションが並行度4の場合、2つのアップストリームのオペレーションは1つのダウンストリームのオペレーションに分配し、一方で他の2つのアップストリームのオペレーションは他のダウンストリームのオペレーションに分配するでしょう。 In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations. 上の例の接続パターンの可視化の図を見てください:
|
Broadcasting DataStream → DataStream |
要素を各パーティションにブロードキャストする。
|
2つの連続する変換の連鎖は、より良いパフォーマンスのために同じスレッド内に配置することを意味します。Flinkは可能な場合(例えば、2つの連続するマップ変換)はデフォルトでオペレータを繋げます。望ましい場合はAPIは連鎖にfine-grained制御を与えます:
ジョブ全体で連鎖を無効にしたい場合は、StreamExecutionEnvironment.disableOperatorChaining()
を使ってください。fine grained 制御をより良くするために、以下の関数が利用可能です。これらの関数は以前の変換を参照するため、DataStream変換の直後でのみ利用することができることに注意してください。例えば、someStream.map(...).startNewChain()
を使うことができますが、someStream.startNewChain()
を使うことはできません。
リソースグループはFlinkでのスロットです。スロットを見てください。望ましい場合はオペレータを個々のスロット内に分離することができます。
変換 | 解説 |
---|---|
新しいチェインを開始 |
このオペレータから始まる新しいチェインを始めます。2つのマッパーはチェインされ、フィルタは最初のマッパーにチェインされないでしょう。
|
チェインを無効 |
マップのオペレータをチェインしないでください
|
スロットの共有グループを設定 |
オペレーションのスロット共有グループを設定する。SFlinkは同じスロット共有グループを持つオペレータを同じスロットに配置しますが、スロット共有グループを持たないオペレータを他のスロットのままにするでしょう。これはスロットを隔離するために使うことができます。もし全ての入力オペレータが同じスロット共有グループ内にいる場合、スロット共有グループは入力オペレータから継承されます。デフォルトのスロット共有グループの名前は"default"で、slotSharingGroup("default")を呼び出すことで、オペレータは明示的にこのグループに配置することができます。
|
変換 | 解説 |
---|---|
新しいチェインを開始 |
このオペレータから始まる新しいチェインを始めます。2つのマッパーはチェインされ、フィルタは最初のマッパーにチェインされないでしょう。
|
チェインを無効 |
マップのオペレータをチェインしないでください
|
スロットの共有グループを設定 |
オペレーションのスロット共有グループを設定する。SFlinkは同じスロット共有グループを持つオペレータを同じスロットに配置しますが、スロット共有グループを持たないオペレータを他のスロットのままにするでしょう。これはスロットを隔離するために使うことができます。もし全ての入力オペレータが同じスロット共有グループ内にいる場合、スロット共有グループは入力オペレータから継承されます。デフォルトのスロット共有グループの名前は"default"で、slotSharingGroup("default")を呼び出すことで、オペレータは明示的にこのグループに配置することができます。
|