Flink データストリーム API プログラミング ガイド

Flinkでのデータストリームプログラムはデータストリーム上の変換を実装する一般的なプログラムです(例えば、フィルタリング、状態の更新、ウィンドウの定義、集約)。データストリームは最初に様々なソースから生成されます (例えば、メッセージキュー、ソケットストリーム、ファイル)。結果はsinkを使って返されます。これは例えばファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。

Flink APIの基本的な概念の紹介については基本的な概念を見てください。

独自のFlinkデータストリームプログラムを作成するには、Flinkプログラムの内部構造から始め、次第に独自の 変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと上級の特徴についてのリファレンスとして振る舞います。

プログラムの例

以下のプログラムは、webソケットから5秒のウィンドウ内でやってくる単語を数える、ストリーミング ウィンドウ カウントアプリケーションの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

例のプログラムを実行するためには、最初にターミナルからnetcatを使って入力ストリームを開始します:

nc -lk 9999

単にいくつかの単語をタイプし、新しい単語のためにリターンを打ちます。これらはワードカウントプログラムへの入力になるでしょう。1以上のカウントを見たい場合は、同じ単語を5秒以内に繰り返しタイプします(そんなに速く打てない場合は、ウィンドウサイズを5秒から増やします☺)。

上に戻る

データストリームの変換

データ変換は、1つ以上のデータストリームを新しいデータストリームに変換します。プログラムは複数の変換を洗練された位相に組み合わせることができます。

この章は全ての利用可能な変換について説明します。


変換 解説
Map
DataStream → DataStream

一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
FlatMap
DataStream → DataStream

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
Filter
DataStream → DataStream

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 内部的には、これはハッシュパーティショニングを使って実装されています。キーを指定する方法についてはkeysを見てください。この変換はKeyedDataStreamを返します。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

キー付けされたデータストリーム上での "rolling" reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。

部分的な合計のストリームを生成するreduce関数:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
Fold
KeyedStream → DataStream

初期値を持つキー付けされたデータストリーム上での "rolling" fold。最後に組み合わされた値を使って現在の要素を組み合わせ、新しい値を発行します。

数列(1,2,3,4,5)に適用された場合、fold関数は数列"start-1", "start-1-2", "start-1-2-3", ... を発行します。

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

Aggregations
KeyedStream → DataStream

キー付けされたデータストリーム上での Rolling aggregation。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
Window
KeyedStream → WindowedStream

ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。

警告: これは多くの場合において非並行 変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。

注意: windowAll変換を使っている場合、代わりにAllWindowFunctionを使う必要があります。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
Window Reduce
WindowedStream → DataStream

ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
Window Fold
WindowedStream → DataStream

ウィンドウに実用的なfold関数を適用し、foldされた値を返します。数列 (1,2,3,4,5) に適用された場合、例の関数は数列を文字列 "start-1-2-3-4-5"に畳み込みます:

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
ウィンドウ上の集約
WindowedStream → DataStream

ウィンドウの内容を集約します。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
Union
DataStream* → DataStream

全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。

dataStream.union(otherStream1, otherStream2, ...);
ウィンドウ Join
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
ウィンドウ CoGroup
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
Connect
DataStream,DataStream → ConnectedStreams

型を維持したまま二つのデータストリームを"connects"する。二つのストリーム間の共有された状態を考慮して接続する

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap
ConnectedStreams → DataStream

接続されたデータストリーム上のmapとflatMapに似ている

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
Split
DataStream → SplitStream

ストリームを幾つかの条件に応じて2つ以上のストリームに分割する

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select
SplitStream → DataStream

分割されたストリームから1つ以上のストリームを選択する。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Iterate
DataStream → IterativeStream → DataStream

1つのオペレータの出力を幾つかの前段のオペレータへリダイレクトすることで、フロー内の"feedback"ループを生成する。これは特に連続的にモデルを更新するアルゴリズムを定義するのに役立ちます。以下のコードはストリームで始まり、連続的にbodyの繰り返しを適用します。0より大きい要素はフィードバック経路に送り返され、要素の残りは下流に転送されます。完全な説明は繰り返し を見てください。

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});

タイムスタンプの抽出
DataStream → DataStream

イベントタイム セマンティクスを使うウィンドウと連携するために、レコードからタイムスタンプを抽出する。イベントタイムを見てください。

stream.assignTimestamps (new TimeStampExtractor() {...});


変換 解説
Map
DataStream → DataStream

一つの要素を取り、一つの要素を生成します。入力ストリームの値を2倍にするmap関数:

dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。文を単語に分割するflatmap関数

dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。ゼロの値を取り除くフィルタ:

dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 内部的には、これはハッシュパーティショニングを使って実装されています。キーを指定する方法についてはkeysを見てください。この変換はKeyedDataStreamを返します。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

キー付けされたデータストリーム上での "rolling" reduce。最後に削減された値を使って現在の要素を組み合わせ、新しい値を発行します。

部分的な合計のストリームを生成するreduce関数:

keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream

初期値を持つキー付けされたデータストリーム上での "rolling" fold。最後に組み合わされた値を使って現在の要素を組み合わせ、新しい値を発行します。

数列(1,2,3,4,5)に適用された場合、fold関数は数列"start-1", "start-1-2", "start-1-2-3", ... を発行します。

val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations
KeyedStream → DataStream

キー付けされたデータストリーム上での Rolling aggregation。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream

ウィンドウは既にパーティションされたKeyedStream上で定義することができます。ウィンドウは各キー内のデータをいくつかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。ウィンドウの説明についてはwindows を見てください。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

ウィンドウは一般的なデータストリーム上で定義することができます。ウィンドウはストリームイベント全てを幾つかの特徴によってグループ化します(例えば、最後の5秒以内に到着したデータ)。完全なwindowの説明については、windows を見てください。

警告: これは多くの場合において非並行 変換です。全てのレコードはwindowAllオペレータのために1つのタスク内に集められます。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

全体として、ウィンドウに一般的な関数を適用します。以下は手動でウィンドウの要素を合計する関数です。

注意: windowAll変換を使っている場合、代わりにAllWindowFunctionを使う必要があります。

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream

ウィンドウに実用的なreduce関数を適用し、reduceされた値を返します。

windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream

ウィンドウに実用的なfold関数を適用し、foldされた値を返します。数列 (1,2,3,4,5) に適用された場合、例の関数は数列を文字列 "start-1-2-3-4-5"に畳み込みます:

val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })
ウィンドウ上の集約
WindowedStream → DataStream

ウィンドウの内容を集約します。minとminByの違いは、minは最小の値を返すのに対し、minByはこのフィールド内で最小の値を持つ要素を返します(maxとmaxByと同じ)。

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream

全てのストリームからの全ての要素を含んでいる新しいストリームを生成している1つ以上のストリームをunionする。注意: データストリームを自分自身とunionする場合、結果のストリーム内で各要素を2回取得するでしょう。

dataStream.union(otherStream1, otherStream2, ...)
ウィンドウ Join
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをjoinする。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
ウィンドウ CoGroup
DataStream,DataStream → DataStream

指定されたキーと共通のウィンドウ上で二つのデータストリームをcogoroupする。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
Connect
DataStream,DataStream → ConnectedStreams

二つのストリーム間の共有された状態を考慮して、それらの型を維持している二つのデータストリームを"connect"する。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream

接続されたデータストリーム上のmapとflatMapに似ている

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
Split
DataStream → SplitStream

ストリームを幾つかの条件に応じて2つ以上のストリームに分割する

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select
SplitStream → DataStream

分割されたストリームから1つ以上のストリームを選択する。

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

Iterate
DataStream → IterativeStream → DataStream

1つのオペレータの出力を幾つかの前段のオペレータへリダイレクトすることで、フロー内の"feedback"ループを生成する。これは特に連続的にモデルを更新するアルゴリズムを定義するのに役立ちます。以下のコードはストリームで始まり、連続的にbodyの繰り返しを適用します。0より大きい要素はフィードバック経路に送り返され、要素の残りは下流に転送されます。完全な説明は繰り返し を見てください。

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

タイムスタンプの抽出
DataStream → DataStream

イベントタイム セマンティクスを使うウィンドウと連携するために、レコードからタイムスタンプを抽出する。イベントタイムを見てください。

stream.assignTimestamps { timestampExtractor }

匿名パターンマッチングを使ったタプル、case class および コレクションからの抽出は、以下のようになります:

val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

そのままのAPIではサポートされません。この機能を使うには、Scala API 拡張を使う必要があります。

以下の変換はタプルのデータストリーム上で利用可能です:


変換 解説
Project
DataStream → DataStream

タプルからフィールドのサブセットを選択する

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

物理的なパーティショニング

Flink also gives low-level control (if desired) on the exact stream partitioning after a transformation, via the following functions.


変換 解説
Custom partitioning
DataStream → DataStream

各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

Random partitioning
DataStream → DataStream

要素を一様な分散に応じてランダムにパーティションする。

dataStream.shuffle();

Rebalancing (Round-robin partitioning)
DataStream → DataStream

パーティションごとに等しい負荷を生成して、要素をラウンドロビンでパーティションする。データの歪があるところでパフォーマンスの最適化に役立ちます。

dataStream.rebalance();

Rescaling
DataStream → DataStream

Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:

データストリーム内のチェックポイントのバリア

dataStream.rescale();

Broadcasting
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast();


変換 解説
Custom partitioning
DataStream → DataStream

各要素のための目的のタスクを選択するためにユーザ定義のパーティショナーを使用する。

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

Random partitioning
DataStream → DataStream

要素を一様な分散に応じてランダムにパーティションする。

dataStream.shuffle()

Rebalancing (Round-robin partitioning)
DataStream → DataStream

パーティションごとに等しい負荷を生成して、要素をラウンドロビンでパーティションする。データの歪があるところでパフォーマンスの最適化に役立ちます。

dataStream.rebalance()

Rescaling
DataStream → DataStream

Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operations.

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:
データストリーム内のチェックポイントのバリア

dataStream.rescale()

Broadcasting
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast()

タスクの連鎖とリソースグループ

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.


変換 解説
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining

Do not chain the map operator

someStream.map(...).disableChaining();

Set slot sharing group

Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").

someStream.filter(...).slotSharingGroup("name");


変換 解説
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...)

Disable chaining

Do not chain the map operator

someStream.map(...).disableChaining()

Set slot sharing group

Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").

someStream.filter(...).slotSharingGroup("name")

上に戻る

データソース


Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

ファイルベース:

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.

  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.

    IMPLEMENTATION:

    Under the hood, Flink splits the file reading process into two sub-tasks, namely directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the watchType), find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read muplitple splits, one-by-one.

    IMPORTANT NOTES:

    1. If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.

    2. If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

コレクション ベース:

  • fromCollection(Collection) - Creates a data stream from the Java Java.util.Collection. コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(T ...) - Creates a data stream from the given sequence of objects. 全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

Custom:

  • addSource - Attache a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer08<>(...)). See connectors for more details.


Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

ファイルベース:

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.

  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.

  • readFile(fileInputFormat, path, watchType, interval, pathFilter) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.

    IMPLEMENTATION:

    Under the hood, Flink splits the file reading process into two sub-tasks, namely directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the watchType), find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read muplitple splits, one-by-one.

    IMPORTANT NOTES:

    1. If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.

    2. If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

コレクション ベース:

  • fromCollection(Seq) - Creates a data stream from the Java Java.util.Collection. コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator) - Creates a data stream from an iterator. クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(elements: _*) - Creates a data stream from the given sequence of objects. 全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator) - Creates a data stream from an iterator, in parallel. クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer08<>(...)). See connectors for more details.

上に戻る

データのsink


Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。

  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。

  • print() / printToErr() - 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。This can help to distinguish between different calls to print. 並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。

  • writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. 独自のオブジェクトからバイトへの変換をサポートします。

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.


Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。

  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。

  • print() / printToErr() - 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。This can help to distinguish between different calls to print. 並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。

  • writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. 独自のオブジェクトからバイトへの変換をサポートします。

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Note that the write*() methods on DataStream are mainly intended for debugging purposes. They are not participating in Flink’s checkpointing, this means these functions usually have at-least-once semantics. The data flushing to the target system depends on the implementation of the OutputFormat. This means that not all elements send to the OutputFormat are immediately showing up in the target system. Also, in failure cases, those records might be lost.

For reliable, exactly-once delivery of a stream into a file system, use the flink-connector-filesystem. Also, custom implementations through the .addSink(...) method can participate in Flink’s checkpointing for exactly-once semantics.

上に戻る

イテレーション


Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream

IterativeStream<Integer> iteration = input.iterate();

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});


Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished by the elements that are forwarded downstream using filters.

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
})

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)

上に戻る

実行パラメータ

The StreamExecutionEnvironment contains the ExecutionConfig which allows to set job specific configuration values for the runtime.

Please refer to execution configuration for an explanation of most parameters. These parameters pertain specifically to the DataStream API:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval()

上に戻る

耐障害性

State & Checkpointing describes how to enable and configure Flink’s checkpointing mechanism.

レンテンシの制御

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.

使い方:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

上に戻る

デバッギング

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. 従って、データ解析プログラムの実装は通常は結果の調査、デバッグ、そして改善です。

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. この章ではFlinkプログラムの開発をどうやって簡単にするかのいくつかのヒントを与えます。

ローカルの実行環境

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. IDEから LocalEnvironment を開始する場合、コード内にブレークポイントを設定し、簡単にプログラムをデバッグすることができます。

LocalEnvironment は以下のように生成され利用されます:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program

env.execute()

コレクション データ ソース

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

データソースのコレクションは以下のように使うことができます:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 現在のところ、データソースのコレクションはSerializableを実装するデータタイプとイテレータを必要とします。更に、データソースのコレクションは並行 ( 並行度 = 1)で実行することができません。

イテレータ データ シンク

Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:

import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala

上に戻る

TOP
inserted by FC2 system