オペレータ

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section gives a description of the basic transformations, the effective physical partitioning after applying those as well as insights into Flink’s operator chaining.

データストリームの変換


変換 解説
Map
DataStream → DataStream

一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
FlatMap
DataStream → DataStream

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
Filter
DataStream → DataStream

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 内部的には、これはハッシュパーティショニングを使って実装されています。キーを指定する方法についてはkeysを見てください。この変換は KeyedStream を返します。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

注意 もし以下であれば、タイプはキーになりえません:

  1. POJO型ではあるが、hashCode() メソッドを上書きせず、Object.hashCode()実装に依存しない。
  2. 任意の型の配列。

Reduce
KeyedStream → DataStream

キー付けされたデータストリーム上での "rolling" reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。

部分的な合計のストリームを生成するreduce関数:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
Fold
KeyedStream → DataStream

初期値を持つキー付けされたデータストリーム上での "rolling" fold。最後に組み合わされた値を使って現在の要素を組み合わせ、新しい値を発行します。

数列(1,2,3,4,5)に適用された場合、fold関数は数列"start-1", "start-1-2", "start-1-2-3", ... を発行します。

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

Aggregations
KeyedStream → DataStream

キー付けされたデータストリーム上での Rolling aggregation。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
Window
KeyedStream → WindowedStream

ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。

警告: これは多くの場合において非並行 変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。

注意: windowAll変換を使っている場合、代わりにAllWindowFunctionを使う必要があります。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
Window Reduce
WindowedStream → DataStream

ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
Window Fold
WindowedStream → DataStream

ウィンドウに実用的なfold関数を適用し、foldされた値を返します。数列 (1,2,3,4,5) に適用された場合、例の関数は数列を文字列 "start-1-2-3-4-5"に畳み込みます:

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
ウィンドウ上の集約
WindowedStream → DataStream

ウィンドウの内容を集約します。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
Union
DataStream* → DataStream

全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。

dataStream.union(otherStream1, otherStream2, ...);
ウィンドウ Join
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
ウィンドウ CoGroup
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
Connect
DataStream,DataStream → ConnectedStreams

型を維持したまま二つのデータストリームを"connects"する。二つのストリーム間の共有された状態を考慮して接続する

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap
ConnectedStreams → DataStream

接続されたデータストリーム上のmapとflatMapに似ている

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
Split
DataStream → SplitStream

ストリームを幾つかの条件に応じて2つ以上のストリームに分割する

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select
SplitStream → DataStream

分割されたストリームから1つ以上のストリームを選択する。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Iterate
DataStream → IterativeStream → DataStream

1つのオペレータの出力を幾つかの前段のオペレータへリダイレクトすることで、フロー内の"feedback"ループを生成する。これは特に連続的にモデルを更新するアルゴリズムを定義するのに役立ちます。以下のコードはストリームで始まり、連続的にbodyの繰り返しを適用します。0より大きい要素はフィードバック経路に送り返され、要素の残りは下流に転送されます。完全な説明は繰り返し を見てください。

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});

タイムスタンプの抽出
DataStream → DataStream

イベントタイム セマンティクスを使うウィンドウと連携するために、レコードからタイムスタンプを抽出する。イベントタイムを見てください。

stream.assignTimestamps (new TimeStampExtractor() {...});


変換 解説
Map
DataStream → DataStream

一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:

dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数

dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:

dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 内部的には、これはハッシュパーティショニングを使って実装されています。キーを指定する方法についてはkeysを見てください。この変換は KeyedStream を返します。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

キー付けされたデータストリーム上での "rolling" reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。

部分的な合計のストリームを生成するreduce関数:

keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream

初期値を持つキー付けされたデータストリーム上での "rolling" fold。最後に組み合わされた値を使って現在の要素を組み合わせ、新しい値を発行します。

数列(1,2,3,4,5)に適用された場合、fold関数は数列"start-1", "start-1-2", "start-1-2-3", ... を発行します。

val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations
KeyedStream → DataStream

キー付けされたデータストリーム上での Rolling aggregation。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream

ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。ウィンドウの説明についてはwindows を見てください。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。

警告: これは多くの場合において非並行 変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。

注意: windowAll変換を使っている場合、代わりにAllWindowFunctionを使う必要があります。

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream

ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。

windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream

ウィンドウに実用的なfold関数を適用し、foldされた値を返します。数列 (1,2,3,4,5) に適用された場合、例の関数は数列を文字列 "start-1-2-3-4-5"に畳み込みます:

val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })
ウィンドウ上の集約
WindowedStream → DataStream

ウィンドウの内容を集約します。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream

全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。

dataStream.union(otherStream1, otherStream2, ...)
ウィンドウ Join
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
ウィンドウ CoGroup
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
Connect
DataStream,DataStream → ConnectedStreams

二つのストリーム間の共有された状態を考慮して、それらの型を維持している二つのデータストリームを"connect"する。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream

接続されたデータストリーム上のmapとflatMapに似ている

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
Split
DataStream → SplitStream

ストリームを幾つかの条件に応じて2つ以上のストリームに分割する

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select
SplitStream → DataStream

分割されたストリームから1つ以上のストリームを選択する。

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

Iterate
DataStream → IterativeStream → DataStream

1つのオペレータの出力を幾つかの前段のオペレータへリダイレクトすることで、フロー内の"feedback"ループを生成する。これは特に連続的にモデルを更新するアルゴリズムを定義するのに役立ちます。以下のコードはストリームで始まり、連続的にbodyの繰り返しを適用します。0より大きい要素はフィードバック経路に送り返され、要素の残りは下流に転送されます。完全な説明は繰り返し を見てください。

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

タイムスタンプの抽出
DataStream → DataStream

イベントタイム セマンティクスを使うウィンドウと連携するために、レコードからタイムスタンプを抽出する。イベントタイムを見てください。

stream.assignTimestamps { timestampExtractor }

匿名パターンマッチングを使ったタプル、case class および コレクションからの抽出は、以下のようになります:

val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

そのままのAPIではサポートされません。この機能を使うには、Scala API 拡張を使う必要があります。

以下の変換はタプルのデータストリーム上で利用可能です:


変換 解説
Project
DataStream → DataStream

タプルからフィールドのサブセットを選択する

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

物理的なパーティショニング

Flinkは変換の後の正確なストリームパーティション上で(必要であれば)以下の関数を使って低レベルの制御も行えます:


変換 解説
独自のパーティション
DataStream → DataStream

各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

Random partitioning
DataStream → DataStream

要素を一様な分散に応じてランダムにパーティションする。

dataStream.shuffle();

リバランシング (ラウンドロビン パーティション)
DataStream → DataStream

パーティションごとに等しい負荷を生成して、要素をラウンドロビンでパーティションする。データの歪があるところでパフォーマンスの最適化に役立ちます。

dataStream.rebalance();

再スケーリング
DataStream → DataStream

ラウンドロビンで要素をダウンストリームのオペレーションの部分集合に分割する。This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. これはタスクマネージャのスロットの数のような他の設定に依存して、データをネットワーク上に転送する代わりにローカルのデータ転送を必要とします。

アップストリームのオペレーションが要素を送信するダウンストリームのオペレーションのサブセットは、アップストリームとダウンストリームのオペレーションの両方の並行度に依存します。例えば、もしアップストリームのオペレーションが並行度2で、ダウンストリームの変更度が6の場合、1つのアップストリームのオペレーションは要素を3つのダウンストリームのオペレーションに分配し、一方で他のアップストリームのオペレーションは他の3つのダウンストリームのオペレーションに分配するでしょう。反対に、もしダウンストリームのオペレーションが並行度2でアップストリームのオペレーションが並行度6の場合、3つのアップストリームのオペレーションは1つのダウンストリームのオペレーションに分配し、一方で他の3つのアップストリームのオペレーションは他のダウンストリームのオペレーションに分配するでしょう。

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

上の例の接続パターンの可視化の図を見てください:

データストリーム内のチェックポイントのバリア

dataStream.rescale();

Broadcasting
DataStream → DataStream

要素を各パーティションにブロードキャストする。

dataStream.broadcast();


変換 解説
独自のパーティション
DataStream → DataStream

各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

Random partitioning
DataStream → DataStream

要素を一様な分散に応じてランダムにパーティションする。

dataStream.shuffle()

リバランシング (ラウンドロビン パーティション)
DataStream → DataStream

パーティションごとに等しい負荷を生成して、要素をラウンドロビンでパーティションする。データの歪があるところでパフォーマンスの最適化に役立ちます。

dataStream.rebalance()

再スケーリング
DataStream → DataStream

ラウンドロビンで要素をダウンストリームのオペレーションの部分集合に分割する。This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. これはタスクマネージャのスロットの数のような他の設定に依存して、データをネットワーク上に転送する代わりにローカルのデータ転送を必要とします。

アップストリームのオペレーションが要素を送信するダウンストリームのオペレーションのサブセットは、アップストリームとダウンストリームのオペレーションの両方の並行度に依存します。例えば、もしアップストリームのオペレーションが並行度2で、ダウンストリームの変更度が4の場合、1つのアップストリームのオペレーションは要素を2つのダウンストリームのオペレーションに分配し、一方で他のアップストリームのオペレーションは他の2つのダウンストリームのオペレーションに分配するでしょう。反対に、もしダウンストリームのオペレーションが並行度2でアップストリームのオペレーションが並行度4の場合、2つのアップストリームのオペレーションは1つのダウンストリームのオペレーションに分配し、一方で他の2つのアップストリームのオペレーションは他のダウンストリームのオペレーションに分配するでしょう。

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

上の例の接続パターンの可視化の図を見てください:
データストリーム内のチェックポイントのバリア

dataStream.rescale()

Broadcasting
DataStream → DataStream

要素を各パーティションにブロードキャストする。

dataStream.broadcast()

タスクの連鎖とリソースグループ

2つの連続する変換の連鎖は、より良いパフォーマンスのために同じスレッド内に配置することを意味します。Flinkは可能な場合(例えば、2つの連続するマップ変換)はデフォルトでオペレータを繋げます。望ましい場合はAPIは連鎖にfine-grained制御を与えます:

ジョブ全体で連鎖を無効にしたい場合は、StreamExecutionEnvironment.disableOperatorChaining() を使ってください。fine grained 制御をより良くするために、以下の関数が利用可能です。これらの関数は以前の変換を参照するため、DataStream変換の直後でのみ利用することができることに注意してください。例えば、someStream.map(...).startNewChain()を使うことができますが、someStream.startNewChain()を使うことはできません。

リソースグループはFlinkでのスロットです。スロットを見てください。望ましい場合はオペレータを個々のスロット内に分離することができます。


変換 解説
新しいチェインを開始

このオペレータから始まる新しいチェインを始めます。2つのマッパーはチェインされ、フィルタは最初のマッパーにチェインされないでしょう。

someStream.filter(...).map(...).startNewChain().map(...);

チェインを無効

マップのオペレータをチェインしないでください

someStream.map(...).disableChaining();

スロットの共有グループを設定

オペレーションのスロット共有グループを設定する。SFlinkは同じスロット共有グループを持つオペレータを同じスロットに配置しますが、スロット共有グループを持たないオペレータを他のスロットのままにするでしょう。これはスロットを隔離するために使うことができます。もし全ての入力オペレータが同じスロット共有グループ内にいる場合、スロット共有グループは入力オペレータから継承されます。デフォルトのスロット共有グループの名前は"default"で、slotSharingGroup("default")を呼び出すことで、オペレータは明示的にこのグループに配置することができます。

someStream.filter(...).slotSharingGroup("name");


変換 解説
新しいチェインを開始

このオペレータから始まる新しいチェインを始めます。2つのマッパーはチェインされ、フィルタは最初のマッパーにチェインされないでしょう。

someStream.filter(...).map(...).startNewChain().map(...)

チェインを無効

マップのオペレータをチェインしないでください

someStream.map(...).disableChaining()

スロットの共有グループを設定

オペレーションのスロット共有グループを設定する。SFlinkは同じスロット共有グループを持つオペレータを同じスロットに配置しますが、スロット共有グループを持たないオペレータを他のスロットのままにするでしょう。これはスロットを隔離するために使うことができます。もし全ての入力オペレータが同じスロット共有グループ内にいる場合、スロット共有グループは入力オペレータから継承されます。デフォルトのスロット共有グループの名前は"default"で、slotSharingGroup("default")を呼び出すことで、オペレータは明示的にこのグループに配置することができます。

someStream.filter(...).slotSharingGroup("name")

上に戻る

TOP
inserted by FC2 system