基本的なAPIの概念

Flink プログラムは分散されたコレクション上の変換を実装する一般的なプログラムです(例えば、フィルタリング、マッピング、状態の更新、ジョイニング、グルーピング、ウィンドウの定義、集約)。コレクションは最初にソースから生成されます(例えば、ファイル、kafkaのトピック、あるいはローカル、メモリ内のコレクションからの読み込み)。結果はsinkを使って返されます。これは例えば(分散された)ファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。

データソースの種類、つまりソースが有限か無限か、に依存して、データソースAPIがバッチのために使われ、データストリームAPIがストリーミングのために使われている場所で、プログラムあるいはストリーミングプログラムを書きたいかも知れません。このガイドは両方のAPIに共通な基本的な概念を紹介しますが、それぞれのAPIを使ったプログラムの書き方の完全な情報はストリーミングガイドバッチガイド を見てください。

注意: APIがどのように使われるかの実際の例を見せる場合は、StreamingExecutionEnvironmentDataStream APIを使うつもりです。概念はDataSet APIと完全に同じで、ExecutionEnvironment および DataSetを単に置き換えます。

データセットとデータストリーム

Flink はプログラム内でデータを表現するための特別なクラスDataSetDataStream を持ちます。重複を含めることができる不変のコレクションとしてそれらを考えることができます。DataSetの場合データは有限であり、一方でDataStreamについては、要素の数は無制限でありえます。

これらのコレクションは幾つかの主要な方法において、通常のJavaコレクションと異なります。まず、それらは不変です。つまり、一度それらが生成されると要素を追加あるいは削除することができません。単純に要素の内部を調べることもできません。

コレクションは最初にFlinkプログラム内でソースを追加することで生成され、変換後に map, filterなどのようなAPIを使ってこれらから新しいコレクションが引き出されます。

Flinkのプログラムはデータのコレクションを変換する通常のプログラムのように見えます。各プログラムは同じ基本的な部分からできています:

  1. 実行環境の取得、
  2. 初期データのロード/生成、
  3. このデータの変形を指定、
  4. 計算結果を出力する場所を指定、
  5. プログラムの実行を引き起こします。

ここでそれぞれのステップの概要を示すつもりです。詳細はそれぞれの章を参照してください。JavaデータセットAPIの全ての主要なクラスはorg.apache.flink.api.javaパッケージ内で見つかり、一方でJavaデータストリームAPIはorg.apache.flink.streaming.apiで見つかることに注意してください。

StreamExecutionEnvironment は全てのFlinkプログラムの基本です。StreamExecutionEnvironment上でこれらの静的メソッドを使って取得することができます:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

getExecutionEnvironment()はコンテキストに応じて正しく動作するので、一般的にこれを使うだけでよいです: IDE内でプログラムを実行するか通常のJavaプログラムとして実行する場合は、ローカルマシーン上でプログラムを実行するローカル環境を作成するでしょう。プログラムからJARファイルを作成し、コマンドラインから起動する場合は、Flinkクラスタマネージャーはメインメソッドを実行し、getExecutionEnvironment()はクラスタ上でプログラムを実行するための実行環境を返すでしょう。

データソースを指定するために、実行環境は様々なメソッドを使ってファイルから読み込む幾つかのメソッドを持ちます: CSVファイルとして行ごとを読み込むだけ、あるいは完全に独自のデータ入力フォーマットを使って読むことができます。連続する行のテキストファイルを単に読むには、以下を使うことができます:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

これは新しく派生したデータストリームを生成するために変換を適用することができるデータストリームが得られるでしょう。

変換関数を持つデータストリーム上でメソッドを呼び出すことで変換を適用することができます。例えば、map変換はこのようになります:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

これは元のコレクション内の各文字列を数値に変換することで新しいデータストリームを生成するでしょう。

最終的な結果を含むデータストリームを一旦取得すると、sinkを生成することでシステム外に書き出すことができます。以下はsinkを生成するメソッドのいくつかの例です:

writeAsText(String path)

print()

ここでそれぞれのステップの概要を示すつもりです。詳細はそれぞれの章を参照してください。ScalaデータセットAPIの全ての主要なクラスは org.apache.flink.api.scalaパッケージ内で見つかり、一方でScalaデータストリームAPIのクラスは org.apache.flink.streaming.api.scalaで見つかることに注意してください。

StreamExecutionEnvironment は全てのFlinkプログラムの基本です。StreamExecutionEnvironment上でこれらの静的メソッドを使って取得することができます:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

getExecutionEnvironment()はコンテキストに応じて正しく動作するので、一般的にこれを使うだけでよいです: IDE内でプログラムを実行するか通常のJavaプログラムとして実行する場合は、ローカルマシーン上でプログラムを実行するローカル環境を作成するでしょう。プログラムからJARファイルを作成し、コマンドラインから起動する場合は、Flinkクラスタマネージャーはメインメソッドを実行し、getExecutionEnvironment()はクラスタ上でプログラムを実行するための実行環境を返すでしょう。

データソースを指定するために、実行環境は様々なメソッドを使ってファイルから読み込む幾つかのメソッドを持ちます: CSVファイルとして行ごとを読み込むだけ、あるいは完全に独自のデータ入力フォーマットを使って読むことができます。連続する行のテキストファイルを単に読むには、以下を使うことができます:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

これは新しく派生したデータストリームを生成するために変換を適用することができるデータストリームが得られるでしょう。

変換関数を持つデータセット上でメソッドを呼び出すことで変換を適用することができます。例えば、map変換はこのようになります:

val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }

これは元のコレクション内の各文字列を数値に変換することで新しいデータストリームを生成するでしょう。

最終的な結果を含むデータストリームを一旦取得すると、sinkを生成することでシステム外に書き出すことができます。以下はsinkを生成するメソッドのいくつかの例です:

writeAsText(path: String)

print()

一旦終了したプログラムを指定した場合、 StreamExecutionEnvironment上でexecute()を呼び出すことでプログラムの実行を引き起こす必要があります。ExecutionEnvironment の型に依存して、ローカルマシーン上で実行が引き起こされるか、あるいはクラスタ上での実行のためにプログラムをサブミットするかをするでしょう。

execute() メソッドはJobExecutionResultを返し、これは実行時間とアキュムレイターの結果を含みます。

ストリーミングデータソースとシンクについての情報と、データストリーム上でサポートされる変換についての更に詳しい情報は、ストリーミング ガイド を見てください。

バッチデータソースとシンクについての情報と、データセット上でサポートされる変換についての更に詳しい情報は、バッチ ガイド を調べてください。

上に戻る

ゆるい評価

全てのFlinkプログラムは怠惰に実行されます: プログラムのメインメソッドが実行される時にデータのロードと変換は直接起こりません。むしろ、各操作は生成され、プログラムの計画に追加されます。操作は、実行環境でexecute()の呼び出しによって明示的に引き起こされた時に実際に実行されます。プログラムがローカルで実行されるかクラスタ上で実行されるかは、実行環境の種類に依ります。

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.

上に戻る

キーの指定

幾つかの変換 (join, coGroup, keyBy, groupBy) は要素のコレクション上でキーが定義されることを必要とします。他の変換 (Reduce, GroupReduce, Aggregate, Windows) は変換が適用される前にデータをキーでグループ化することができます。

データセットは以下のようにグループ化されます

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

一方でキーは下記を使ってデータストリーム上で指定することができます

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

Flinkのデータモデルはキー-値ペアに基づいていません。従って、データセットの型を物理的にキーと値にまとめる必要はありません。キーは“virtual”です: それらはグループ化のオペレータを導くために実際のデータ上で関数として定義されています。

注意: 以下の議論では、DataStream API とkeyByを使う予定です。データセットAPIについては、DataSet を単に groupByに置き換える必要があります。

タプルのためのキーの定義

もっとも単純な場合は、タプルの1つ以上のフィールド上にタプルをグループ化します:

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)

タプルは最初のフィールド上(Integer型の一つ)にグループ化されます。

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

ここで、タプルを最初と二つ目のフィールドから成る合成キー上にタプルをグループ化します。

入れ子のタプルでの注意: 以下のような入れ子のタプルを持つデータストリームを持つ場合:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

keyBy(0)を指定すると、システムは完全なTuple2 をキー(IntegerとFloatがキーとなります)として使うでしょう。入れ子になったTuple2に “navigate” を入れたい場合は、以下で説明されるフィールド表現キーを使う必要があります。

フィールド表現を使ったキーの定義

入れ子のフィールドを参照し、グルーピング、ソーティング、ジョイニングあるいはcoグルーピングのキーを設定するために、文字ベースのフィールドの表現を使うことができます。

フィールド表現によりTuple および POJO 型のような(入れ子になった)合成型でのフィールドの選択を簡単にすることができます。

以下の例では、二つのフィールド“word” と “count” を持つWC POJO があるとします。フィールドwordでグループ化するには、単にその名前をkeyBy() 関数に渡せば良いです。

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

フィールド表現構文:

  • フィールド名を使ってPOJOフィールドを選択します。例えば、"user" はPOJO型の “user” フィールドを参照します。

  • それらのフィールド名あるいは0オフセットのフィールドインデックスを使ってタプルのフィールドを選択します。例えば、"f0""5" はそれぞれJavaのタプル型の1番目および6番目のフィールドを参照します。

  • POJOおよびタプル内で入れ子になったフィールドを選択することができます。例えば、"user.zip" はPOJO型の“user”フィールド内に格納されているPOJOの “zip” フィールドを参照します。"f1.user.zip" あるいは "user.f3.1.zip"のような任意のPOJOとタプルの入れ子および混合がサポートされます。

  • "*" ワイルドカード表現を使って全ての方を選択することができます。これはタプルあるいはPOJO型では無い型についても動作します。

フィールド表現の例:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

これらは上のコードの例のための有効なフィールド表現です:

  • "count": WC クラス内のcountフィールド。

  • "complex": POJO型ComplexNestedClassの複合フィールドの全てのフィールドを再帰的に選択します。

  • "complex.word.f2": 入れ子になったTuple3の最後のフィールドを選択します。

  • "complex.hadoopCitizen": Hadoop IntWritable 型を選択します。

以下の例では、二つのフィールド“word” と “count” を持つWC POJO があるとします。フィールドwordでグループ化するには、単にその名前をkeyBy() 関数に渡せば良いです。

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

フィールド表現構文:

  • フィールド名を使ってPOJOフィールドを選択します。例えば、"user" はPOJO型の “user” フィールドを参照します。

  • 1オフセットのフィールド名あるいは0オフセットのフィールドインデックスによってタプルのフィールドを選択します。例えば、"_1""5" はそれぞれScalaのタプル型の1番目および6番目のフィールドを参照します。

  • POJOおよびタプル内で入れ子になったフィールドを選択することができます。例えば、"user.zip" はPOJO型の“user”フィールド内に格納されているPOJOの “zip” フィールドを参照します。"_2.user.zip" あるいは "user._4.1.zip"のような任意のPOJOとタプルの入れ子および混合がサポートされます。

  • "_" ワイルドカード表現を使って全ての方を選択することができます。これはタプルあるいはPOJO型では無い型についても動作します。

フィールド表現の例:

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

これらは上のコードの例のための有効なフィールド表現です:

  • "count": WC クラス内のcountフィールド。

  • "complex": POJO型ComplexNestedClassの複合フィールドの全てのフィールドを再帰的に選択します。

  • "complex.word._3": 入れ子になったTuple3の最後のフィールドを選択します。

  • "complex.hadoopCitizen": Hadoop IntWritable 型を選択します。

キー セレクター関数を使ってキーを定義します

キーを定義する別の方法が “key selector” 関数です。key selector 関数は入力として1つの要素を取り、要素のためのキーを返します。キーは任意の型で任意の計算によって導かれます。

以下の例は、単にオブジェクトのフィールドを返す key selector 関数の例を示します。

// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words
  .keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
   });
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

上に戻る

変換関数の指定

ほとんどの変換はユーザ定義の関数を必要とします。この章ではそれらがどのように指定することができるかの別のやり方をリスト化します

インタフェースの実装

ほとんどの基本的な方法は提供されたインタフェースの1つを実装することです:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});
data.map(new MyMapFunction());

匿名クラス

関数を匿名クラスとして渡すことができます:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Java 8 Lambdas

FlinkはJava API内でJava 8 Lambdas もサポートします。Java 8 ガイドを見てください。

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

Rich 関数

ユーザ定義関数を必要とする全ての変換は、代わりに引数としてrich 関数を取ることができます。例えば、以下の代わりに

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});

以下のように書くことができます

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});

そして、いつものように関数をmap 変換に渡します。

data.map(new MyMapFunction());

Rich 関数は匿名クラスとして定義することもできます:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Lambda 関数

既に前の例で見たように、全てのオペレータはオペレーションを表現するためにLambda関数を受け付けます:

val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }

Rich 関数

引数としてlambda関数を取る全ての変換は、代わりに引数として rich 関数を取ることができます。例えば、以下の代わりに

data.map { x => x.toInt }

以下のように書くことができます

class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})

そして、関数をmap 変換に渡します:

data.map(new MyMapFunction())

Rich 関数は匿名クラスとして定義することもできます:

data.map (new RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})

ユーザ定義の関数 (map, reduce, etc) に加えて、Rich関数は4つのメソッドを提供します: open, close, getRuntimeContext および setRuntimeContext。これらは、関数をパラメータ化 (関数にパラメータを渡す)、ローカル状態の作成と終了、ブロードキャスト変数へのアクセス (Broadcast Variablesを見てください)に便利で、accumulators と counters (Accumulators と Countersを見てください)のようなランタイム情報へのアクセス、およびイテレーション(Iterationsを見てください)の情報にアクセスするのに便利です。

上に戻る

サポートされるデータ型

Flink はデータセットあるいはデータストリーム内にある要素の型についていくつかの制限を設けます。この理由は、システムが効率的な実行ストラテジを決定するために型を解析するからです。

6つの異なるデータ型のカテゴリがあります:

  1. Java タプルScala Case Classes
  2. Java POJOs
  3. Primitive 型
  4. Regular クラス
  5. Hadoop Writables
  6. 特別型

タプルと Case クラス

タプルは様々な型を持つ固定数のフィールドを含む複合型です。Java API はTuple1 から Tuple25までのクラスを提供します。タプルの各フィールドは、入れ子のタプルによる更なるタプルを含む任意のFlink型かも知れません。タプルのフィールドはtuple.f4のようなフィールド名を直接使ってアクセスするか、一般的なgetterメソッドtuple.getField(int position)を使ってアクセスすることができます。フィールドのインデックスは0から始まります。これはScalaのタプルとは対照的ですが、Javaの一般的なインデックスとより矛盾しません。

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")

Scalaのcaseクラス (および caseクラスの特別な場合のScalaのタプル) は、様々な型を持つ固定数のフィールドを含む複合型です。タプルのフィールドは、最初のフィールドのための_1 のような1オフセットの名前によって扱われます。Case クラスのフィールドはそれらの名前によってアクセスされます。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1

POJOs

Java と Scala クラスはFlinkによって、もしそれらが以下の要求を満たす場合は、特別なPOJOデータ型として扱われます。

  • クラスがpublicでなければなりません。

  • 引数無しのpublicコンストラクタ(デフォルトのコンストラクタ)を持たなければなりません。

  • 全てのフィールドはそれぞれpublic、あるいはgetterおよびsetter関数によってアクセス可能でなければなりません。foo という名前のフィールドについて、getterおよびsetterメソッドは getFoo() および setFoo()という名前でなければなりません。

  • フィールドの型がFlinkによってサポートされていなければなりません。現時点では、Flinkは(Dateのような)任意のオブジェクトをシリアライズ化するために Avroを使用します。

Flink はPOJO型の構造を解析します。つまり、POJOのフィールドについて確認します。結果的に、POJO型は一般的な型ようりも扱い易いです。更に、Flinkは一般的な型よりPOJOをもっと効率的に処理することができます。

以下の例は2つのpublicフィールドを持つ単純なPOJOを示します。

public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"
class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}

val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

Primitive 型

Flink はInteger, String および Double のような全てのJavaおよびScalaのprimitive型をサポートします。

一般的なクラスの型

Flink はほとんどのJavaおよびScalaのクラスをサポートします (APIおよびカスタム)。ファイルポインタ、I/Oストリーム、あるいは他のネイティブなリソースのようなシリアライズできないフィールドを含むクラスに、制限が適用されます。Java Beansの慣習に従うクラスは一般的にうまく動作します。

POJO型として認識されないすべてのクラス (上記のPOJOの要求を見てください)は、一般的なクラス型としてFlinkによって扱われます。Flink はこれらのデータ型をブラックボックスとして扱い、それらの内容(つまり、効率的なソート)にアクセスすることができません。一般的な型はシリアライズ フレームワーク Kryo を使ってデシリアライズ/シリアライズされます。

Value 型は手動でシリアライズ化とデシリアライズ化を表現します。一般的な目的のシリアライズ化フレームワークを通るのでは無く、それらはメソッドreadwriteを持つorg.apache.flinktypes.Valueインタフェースを実装することで、それらのオペレーションに独自のコードを提供します。一般的な目的のシリアライズ化がとても非効率な場合は、Value型を使うことは意味があります。要素のsparseベクトルを配列として実装するデータ型がその例です。一般的な目的のシリアライズ化は単純に全ての配列の要素を書きますが、配列のほとんどがゼロであることを知ることで、非ゼロの要素についての特別な符号化を使うことができます。

org.apache.flinktypes.CopyableValue インタフェースは似た方法の手動の内部クローンロジックをサポートします。

Flink は基本的なデータ型に対応するあらかじめ定義されたValue型を同梱しています。(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue). これらのValue型は基本的なデータ型の変更可能な変化として振る舞います: プログラマはオブジェクトを再利用しガベージコレクションを取り除きながら、それらの値を変更することができます。

Hadoop Writables

org.apache.hadoop.Writable インタフェースを実装する型を使うことができます。write()readFields() メソッドの中で定義されたシリアライズ化ロジックがシリアライズ化のために使われるでしょう。

特別型

Scala の Either, Option および Tryを含む、特別な型を使う事ができます。Java API はEitherの独自の実装を持ちます。Scala の Eitherと同様に、Left あるいは Rightの二つの取りうる型の値を表します。Either は、エラーハンドリング、あるいは二つの異なる型のレコードを出力する必要があるオペレータで使うことができます。

型の抹消 & 型の推論

注意: この章はJavaにのみ関連します。

Javaのコンパイラはコンパイルの後で一般型の情報のほとんどを捨てます。これはJavaでの型の抹消として知られています。実行時にオブジェクトのインスタンスは一般型をもう知らないことを意味します。例えば、DataStream<String>DataStream<Long> はJVMにとって同じに見えます。

Flinkは実行のためにプログラムを準備している時(プログラムのメインメソッドが呼ばれる時)に、型の情報を必要とします。The Flink Java API tries to reconstruct the type information that was thrown away in various ways and store it explicitly in the data sets and operators. You can retrieve the type via DataStream.getType(). The method returns an instance of TypeInformation, which is Flink’s internal way of representing types.

The type inference has its limits and needs the “cooperation” of the programmer in some cases. Examples for that are methods that create data sets from collections, such as ExecutionEnvironment.fromCollection(), where you can pass an argument that describes the type. But also generic functions like MapFunction<I, O> may need extra type information.

The ResultTypeQueryable interface can be implemented by input formats and functions to tell the API explicitly about their return type. The input types that the functions are invoked with can usually be inferred by the result types of the previous operations.

上に戻る

Accumulators & Counters

Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job ended.

The most straightforward accumulator is a counter: You can increment it using the Accumulator.add(V value) method. At the end of the job Flink will sum up (merge) all partial results and send the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.

Flink currently has the following built-in accumulators. Each of them implements the Accumulator interface.

  • IntCounter, LongCounter and DoubleCounter: See below for an example using a counter.
  • Histogram: A histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. You can use this to compute distributions of values, e.g. the distribution of words-per-line for a word count program.

How to use accumulators:

First you have to create an accumulator object (here a counter) in the user-defined transformation function where you want to use it.

private IntCounter numLines = new IntCounter();

Second you have to register the accumulator object, typically in the open() method of the rich function. Here you also define the name.

getRuntimeContext().addAccumulator("num-lines", this.numLines);

You can now use the accumulator anywhere in the operator function, including in the open() and close() methods.

this.numLines.add(1);

The overall result will be stored in the JobExecutionResult object which is returned from the execute() method of the execution environment (currently this only works if the execution waits for the completion of the job).

myJobExecutionResult.getAccumulatorResult("num-lines")

All accumulators share a single namespace per job. Thus you can use the same accumulator in different operator functions of your job. Flink will internally merge all accumulators with the same name.

A note on accumulators and iterations: Currently the result of accumulators is only available after the overall job has ended. We plan to also make the result of the previous iteration available in the next iteration. You can use Aggregators to compute per-iteration statistics and base the termination of iterations on such statistics.

Custom accumulators:

To implement your own accumulator you simply have to write your implementation of the Accumulator interface. Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.

You have the choice to implement either Accumulator or SimpleAccumulator.

Accumulator<V,R> is most flexible: It defines a type V for the value to add, and a result type R for the final result. E.g. for a histogram, V is a number and R is a histogram. SimpleAccumulator is for the cases where both types are the same, e.g. for counters.

上に戻る

TOP
inserted by FC2 system