Overview
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

オペレータ #

オペレータは、1つ以上のDataStreamを新しいDataStreamに変換します。プログラムは、複数の変換を組み合わせて、洗練されたデータフロートポロジを作成できます。

このセクションでは、基本的な変換、それを適用した後の効果的な物理パーティショニング、Flinkのオペレータチェーンについて説明します。

データストリームの変換 #

マップ #

DataStream → DataStream #

一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
dataStream.map { x => x * 2 }
data_stream = env.from_collection(collection=[1, 2, 3, 4, 5])
data_stream.map(lambda x: 2 * x, output_type=Types.INT())

FlatMap #

DataStream → DataStream #

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
dataStream.flatMap { str => str.split(" ") }
data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())

フィルター #

DataStream → DataStream #

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
dataStream.filter { _ != 0 }
data_stream = env.from_collection(collection=[0, 1, 2, 3, 4, 5])
data_stream.filter(lambda x: x != 0)

KeyBy #

DataStream → KeyedStream #

ストリームを独立したパーティションに論理的に分割します。同じキーを持つ全てのレコードは、同じパーティションに割り当てられます。内部的には、_keyBy()_はハッシュパーティショニングを使って実装されています。キーを指定するには様々な方法があります。

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);
dataStream.keyBy(_.someKey)
dataStream.keyBy(_._1)
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'b')])
data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) // Key by the result of KeySelector

A type cannot be a key if:

  1. これはPOJO型ですが、hashCode()メソッドを上書きせず、Object.hashCode()実装に依存します。
  2. 任意の型の配列。

Reduce #

KeyedStream → DataStream #

キー付けされたデータストリーム上での “rolling” reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。

部分的な合計のストリームを生成するreduce関数:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
keyedStream.reduce { _ + _ }
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))

Window #

KeyedStream → WindowedStream #

ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。 ウィンドウの完全な説明については、windowsを参照してください。

dataStream
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 
dataStream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))) 
data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Time.seconds(5)))

WindowAll #

DataStream → AllWindowedStream #

ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。ウィンドウの完全な説明については、windowsを参照してください。

多くの場合、非並列変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。
dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
data_stream.window_all(TumblingEventTimeWindows.of(Time.seconds(5)))

Window Apply #

WindowedStream → DataStream #

AllWindowedStream → DataStream #

全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。

windowAll変換を使っている場合は、代わりにAllWindowFunctionを使う必要があります。
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):

    def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[1]
        yield sum


class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):

    def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[1]
        yield sum


windowed_stream.apply(MyWindowFunction())

# applying an AllWindowFunction on non-keyed window stream
all_windowed_stream.apply(MyAllWindowFunction())

WindowReduce #

WindowedStream → DataStream #

ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
windowedStream.reduce { _ + _ }
class MyReduceFunction(ReduceFunction):

    def reduce(self, value1, value2):
        return value1[0], value1[1] + value2[1]


windowed_stream.reduce(MyReduceFunction())

和集合 #

DataStream* → DataStream #

全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。

dataStream.union(otherStream1, otherStream2, ...);
dataStream.union(otherStream1, otherStream2, ...)
data_stream.union(otherStream1, otherStream2, ...)

ウィンドウ結合 #

DataStream,DataStream → DataStream #

指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
この機能はPythonではまだサポートされません

インターバル結合 #

KeyedStream,KeyedStream → DataStream #

e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBoundとなるようい、2つのキー付きストリームの2つの要素e1とe2を、指定されwた時間間隔にわたる共通のキーで結合します。

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) 
    // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...})
この機能はPythonではまだサポートされません

Window CoGroup #

DataStream,DataStream → DataStream #

指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
この機能はPythonではまだサポートされません

Connect #

DataStream,DataStream → ConnectedStream #

型を維持したまま二つのデータストリームを"connects"する。二つのストリーム間の共有された状態を考慮して接続する

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
stream_1 = ...
stream_2 = ...
connected_streams = stream_1.connect(stream_2)

CoMap, CoFlatMap #

ConnectedStream → DataStream #

接続されたデータストリーム上のmapとflatMapに似ている

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
class MyCoMapFunction(CoMapFunction):
    
    def map1(self, value):
        return value[0] + 1, value[1]
       
    def map2(self, value):
        return value[0], value[1] + 'flink'
        
class MyCoFlatMapFunction(CoFlatMapFunction):
    
    def flat_map1(self, value)
        for i in range(value[0]):
            yield i
    
    def flat_map2(self, value):
        yield value[0] + 1
        
connectedStreams.map(MyCoMapFunction())
connectedStreams.flat_map(MyCoFlatMapFunction())

Cache #

DataStream → CachedDataStream #

変換の中間結果をキャッシュします。現在、バッチ実行モードで実行されるジョブのみがサポートされています。キャッシュ中間結果は、結果を後のジョブで再利用できるように、中間結果が初めて計算される時に遅延生成されます。キャッシュが失われた場合は、元の変換を使って再計算されます。

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.

cachedDataStream.print(); // Consume cached result.
env.execute();
val dataStream : DataStream[Int] = //...
val cachedDataStream = dataStream.cache()
cachedDataStream.print() // Do anything with the cachedDataStream
...
env.execute() // Execute and create cache.

cachedDataStream.print() // Consume cached result.
env.execute()
data_stream = ... # DataStream
cached_data_stream = data_stream.cache()
cached_data_stream.print()
# ...
env.execute() # Execute and create cache.

cached_data_stream.print() # Consume cached result.
env.execute()

Physical Partitioning #

Flinkは変換の後の正確なストリームパーティション上で(必要であれば)以下の関数を使って低レベルの制御も行えます:

カスタム パーティション #

DataStream → DataStream #

各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
data_stream = env.from_collection(collection=[(2, 'a'), (2, 'a'), (3, 'b')])
data_stream.partition_custom(lambda key, num_partition: key % partition, lambda x: x[0])

Random Partitioning #

DataStream → DataStream #

要素を一様な分散に応じてランダムにパーティションする。

dataStream.shuffle();
dataStream.shuffle()
data_stream.shuffle()

Rescaling #

DataStream → DataStream #

ラウンドロビンで要素をダウンストリームのオペレーションの部分集合に分割する。これは、例えば、ソースの各並列インスタンスから複数音マッパーのサブセットにファンアウトして負荷を分散するパイプラインを作成したいが、rebalance()による完全なリバランスは望まない場合に便利です。これはタスクマネージャのスロットの数のような他の設定に依存して、データをネットワーク上に転送する代わりにローカルのデータ転送を必要とします。

アップストリームのオペレーションが要素を送信するダウンストリームのオペレーションのサブセットは、アップストリームとダウンストリームのオペレーションの両方の並行度に依存します。例えば、もしアップストリームのオペレーションが並行度2で、ダウンストリームの変更度が6の場合、1つのアップストリームのオペレーションは要素を3つのダウンストリームのオペレーションに分配し、一方で他のアップストリームのオペレーションは他の3つのダウンストリームのオペレーションに分配するでしょう。反対に、もしダウンストリームのオペレーションが並行度2でアップストリームのオペレーションが並行度6の場合、3つのアップストリームのオペレーションは1つのダウンストリームのオペレーションに分配し、一方で他の3つのアップストリームのオペレーションは他のダウンストリームのオペレーションに分配するでしょう。

異なる並列度がお互いの倍数ではない場合、1つ以上のダウンストリームオペレーションには、upstreamオペレーションとは異なる数の入力が含まれます。

上の例の接続パターンの可視化の図を見てください:

Checkpoint barriers in data streams
dataStream.rescale();
dataStream.rescale()
data_stream.rescale()

Broadcasting #

DataStream → DataStream #

要素を各パーティションにブロードキャストする。

dataStream.broadcast();
dataStream.broadcast()
data_stream.broadcast()

タスクチェーンとリソースグループ #

2つの連続する変換の連鎖は、より良いパフォーマンスのために同じスレッド内に配置することを意味します。Flinkは可能な場合(例えば、2つの連続するマップ変換)はデフォルトでオペレータを繋げます。望ましい場合はAPIは連鎖にfine-grained制御を与えます:

ジョブ全体でチェーンを無効にしたい場合、StreamExecutionEnvironment.disableOperatorChaining()を使ってください。fine grained 制御をより良くするために、以下の関数が利用可能です。これらの関数は以前の変換を参照するため、DataStream変換の直後でのみ利用することができることに注意してください。例えば、someStream.map(...).startNewChain()を使うことができますが、 someStream.startNewChain()を使うことはできません。

リソースグループは、Flinkでのスロットです。スロットを参照してください。望ましい場合はオペレータを個々のスロット内に分離することができます。

新しいチェーンの開始 #

このオペレータから始まる新しいチェインを始めます。 2つのマッパーはチェインされ、フィルタは最初のマッパーにチェインされないでしょう。

someStream.filter(...).map(...).startNewChain().map(...);
someStream.filter(...).map(...).startNewChain().map(...)
some_stream.filter(...).map(...).start_new_chain().map(...)

チェーンの無効化 #

マップオペレータを繋げないでください。

someStream.map(...).disableChaining();
someStream.map(...).disableChaining()
some_stream.map(...).disable_chaining()

スロット共有グループの設定 #

オペレーションのスロット共有グループを設定する。Flinkは同じスロット共有グループを持つオペレータを同じスロットに配置しますが、スロット共有グループを持たないオペレータを他のスロットのままにするでしょう。これはスロットを隔離するために使うことができます。もし全ての入力オペレータが同じスロット共有グループ内にいる場合、スロット共有グループは入力オペレータから継承されます。デフォルトのスロット共有グループの名前は"default"で、slotSharingGroup(“default”)を呼び出すことで、オペレータは明示的にこのグループに配置することができます。

someStream.filter(...).slotSharingGroup("name");
someStream.filter(...).slotSharingGroup("name")
some_stream.filter(...).slot_sharing_group("name")

名前と説明 #

flinkのオペレータとジョブ頂点には、名前と説明があります。 名前と説明は両方とも、オペレータまたはジョブ頂点が何を行っているかを説明するものですが、使用方法が異なります。

オペレータとジョブ頂点の名前は、web UI、スレッド名、ログ、メトリクスなどで使われます。 ジョブ頂点の名前はジョブ頂点内の名前に基づいて構築されます。 外部システムへの負荷を避けるために、名前はできるだけ簡潔にする必要があります。

説明は実行プランで使われ、web UIでジョブ頂点の詳細として表示されます。 ジョブ頂点の説明はその中のオペレータの記述に基づいて構築されます。 実行時のデバッグを容易にするために、説明にはオペレータに関する詳細情報を含めることができます。

someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
some_stream.filter(...).name("filter").set_description("x in (1, 2, 3, 4) and y > 1")

ジョブ頂点の説明の形式は、デフォルトではツリー形式の文字列です。 以前のバージョンのように説明をカスケード形式に設定したい場合、ユーザはpipeline.vertex-description-modeCASCADINGに設定できます。

Flinkによって生成されたオペレータには、デフォルトで演算子の種類とデフォルトでオペレータの種類とIDで構成される名前と証左いな説明が付けられます。 以前のバージョンのように説明をカスケード形式に設定したい場合、ユーザはtable.exec.simplify-operator-name-enabledfalseに設定できます。

パイプラインのトポロジが複雑な場合、ユーザはpipeline.vertex-name-include-index-prefixtrueに設定することで、頂点の名前でトポロジインデックスを追加できます。これにより、ログまたはメトリクスタグに従って、グラフ内の頂点を簡単に見つけることができます。

inserted by FC2 system