Flink プログラムは分散されたコレクション上の変換を実装する一般的なプログラムです(例えば、フィルタリング、マッピング、状態の更新、ジョイニング、グルーピング、ウィンドウの定義、集約)。コレクションは最初にソースから生成されます(例えば、ファイル、kafkaのトピック、あるいはローカル、メモリ内のコレクションからの読み込み)。結果はsinkを使って返されます。これは例えば(分散された)ファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。
データソースの種類、つまりソースが有限か無限か、に依存して、データソースAPIがバッチのために使われ、データストリームAPIがストリーミングのために使われている場所で、プログラムあるいはストリーミングプログラムを書きたいかも知れません。このガイドは両方のAPIに共通な基本的な概念を紹介しますが、それぞれのAPIを使ったプログラムの書き方の完全な情報はストリーミングガイド と バッチガイド を見てください。
注意: APIがどのように使われるかの実際の例を見せる場合は、StreamingExecutionEnvironment
と DataStream
APIを使うつもりです。概念はDataSet
APIと完全に同じで、ExecutionEnvironment
および DataSet
を単に置き換えます。
Flink はプログラム内でデータを表現するための特別なクラスDataSet
と DataStream
を持ちます。重複を含めることができる不変のコレクションとしてそれらを考えることができます。DataSet
の場合データは有限であり、一方でDataStream
については、要素の数は無制限でありえます。
これらのコレクションは幾つかの主要な方法において、通常のJavaコレクションと異なります。まず、それらは不変です。つまり、一度それらが生成されると要素を追加あるいは削除することができません。単純に要素の内部を調べることもできません。
コレクションは最初にFlinkプログラム内でソースを追加することで生成され、変換後に map
, filter
などのようなAPIを使ってこれらから新しいコレクションが引き出されます。
Flinkのプログラムはデータのコレクションを変換する通常のプログラムのように見えます。各プログラムは同じ基本的な部分からできています:
実行環境
の取得、ここでそれぞれのステップの概要を示すつもりです。詳細はそれぞれの章を参照してください。JavaデータセットAPIの全ての主要なクラスはorg.apache.flink.api.javaパッケージ内で見つかり、一方でJavaデータストリームAPIはorg.apache.flink.streaming.apiで見つかることに注意してください。
StreamExecutionEnvironment
は全てのFlinkプログラムの基本です。StreamExecutionEnvironment
上でこれらの静的メソッドを使って取得することができます:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
getExecutionEnvironment()
はコンテキストに応じて正しく動作するので、一般的にこれを使うだけでよいです: IDE内でプログラムを実行するか通常のJavaプログラムとして実行する場合は、ローカルマシーン上でプログラムを実行するローカル環境を作成するでしょう。プログラムからJARファイルを作成し、コマンドラインから起動する場合は、Flinkクラスタマネージャーはメインメソッドを実行し、getExecutionEnvironment()
はクラスタ上でプログラムを実行するための実行環境を返すでしょう。
データソースを指定するために、実行環境は様々なメソッドを使ってファイルから読み込む幾つかのメソッドを持ちます: CSVファイルとして行ごとを読み込むだけ、あるいは完全に独自のデータ入力フォーマットを使って読むことができます。連続する行のテキストファイルを単に読むには、以下を使うことができます:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
これは新しく派生したデータストリームを生成するために変換を適用することができるデータストリームが得られるでしょう。
変換関数を持つデータストリーム上でメソッドを呼び出すことで変換を適用することができます。例えば、map変換はこのようになります:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
これは元のコレクション内の各文字列を数値に変換することで新しいデータストリームを生成するでしょう。
最終的な結果を含むデータストリームを一旦取得すると、sinkを生成することでシステム外に書き出すことができます。以下はsinkを生成するメソッドのいくつかの例です:
writeAsText(String path)
print()
ここでそれぞれのステップの概要を示すつもりです。詳細はそれぞれの章を参照してください。ScalaデータセットAPIの全ての主要なクラスは org.apache.flink.api.scalaパッケージ内で見つかり、一方でScalaデータストリームAPIのクラスは org.apache.flink.streaming.api.scalaで見つかることに注意してください。
StreamExecutionEnvironment
は全てのFlinkプログラムの基本です。StreamExecutionEnvironment
上でこれらの静的メソッドを使って取得することができます:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
getExecutionEnvironment()
はコンテキストに応じて正しく動作するので、一般的にこれを使うだけでよいです: IDE内でプログラムを実行するか通常のJavaプログラムとして実行する場合は、ローカルマシーン上でプログラムを実行するローカル環境を作成するでしょう。プログラムからJARファイルを作成し、コマンドラインから起動する場合は、Flinkクラスタマネージャーはメインメソッドを実行し、getExecutionEnvironment()
はクラスタ上でプログラムを実行するための実行環境を返すでしょう。
データソースを指定するために、実行環境は様々なメソッドを使ってファイルから読み込む幾つかのメソッドを持ちます: CSVファイルとして行ごとを読み込むだけ、あるいは完全に独自のデータ入力フォーマットを使って読むことができます。連続する行のテキストファイルを単に読むには、以下を使うことができます:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
これは新しく派生したデータストリームを生成するために変換を適用することができるデータストリームが得られるでしょう。
変換関数を持つデータセット上でメソッドを呼び出すことで変換を適用することができます。例えば、map変換はこのようになります:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
これは元のコレクション内の各文字列を数値に変換することで新しいデータストリームを生成するでしょう。
最終的な結果を含むデータストリームを一旦取得すると、sinkを生成することでシステム外に書き出すことができます。以下はsinkを生成するメソッドのいくつかの例です:
writeAsText(path: String)
print()
一旦終了したプログラムを指定した場合、 StreamExecutionEnvironment
上でexecute()
を呼び出すことでプログラムの実行を引き起こす必要があります。ExecutionEnvironment
の型に依存して、ローカルマシーン上で実行が引き起こされるか、あるいはクラスタ上での実行のためにプログラムをサブミットするかをするでしょう。
execute()
メソッドはJobExecutionResult
を返し、これは実行時間とアキュムレイターの結果を含みます。
ストリーミングデータソースとシンクについての情報と、データストリーム上でサポートされる変換についての更に詳しい情報は、ストリーミング ガイド を見てください。
バッチデータソースとシンクについての情報と、データセット上でサポートされる変換についての更に詳しい情報は、バッチ ガイド を調べてください。
全てのFlinkプログラムは怠惰に実行されます: プログラムのメインメソッドが実行される時にデータのロードと変換は直接起こりません。むしろ、各操作は生成され、プログラムの計画に追加されます。操作は、実行環境でexecute()
の呼び出しによって明示的に引き起こされた時に実際に実行されます。プログラムがローカルで実行されるかクラスタ上で実行されるかは、実行環境の種類に依ります。
レイジー評価により、Flinkが1つの総合的な計画単位として実行する、洗練されたプログラムを構築することができます。
幾つかの変換 (join, coGroup, keyBy, groupBy) は要素のコレクション上でキーが定義されることを必要とします。他の変換 (Reduce, GroupReduce, Aggregate, Windows) は変換が適用される前にデータをキーでグループ化することができます。
データセットは以下のようにグループ化されます
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
一方でキーは下記を使ってデータストリーム上で指定することができます
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flinkのデータモデルはキー-値ペアに基づいていません。従って、データセットの型を物理的にキーと値にまとめる必要はありません。キーは“virtual”です: それらはグループ化のオペレータを導くために実際のデータ上で関数として定義されています。
注意: 以下の議論では、DataStream
API とkeyBy
を使う予定です。データセットAPIについては、DataSet
を単に groupBy
に置き換える必要があります。
もっとも単純な場合は、タプルの1つ以上のフィールド上にタプルをグループ化します:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
タプルは最初のフィールド上(Integer型の一つ)にグループ化されます。
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
ここで、タプルを最初と二つ目のフィールドから成る合成キー上にタプルをグループ化します。
入れ子のタプルでの注意: 以下のような入れ子のタプルを持つデータストリームを持つ場合:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
keyBy(0)
を指定すると、システムは完全なTuple2
をキー(IntegerとFloatがキーとなります)として使うでしょう。入れ子になったTuple2
に “navigate” を入れたい場合は、以下で説明されるフィールド表現キーを使う必要があります。
入れ子のフィールドを参照し、グルーピング、ソーティング、ジョイニングあるいはcoグルーピングのキーを設定するために、文字ベースのフィールドの表現を使うことができます。
フィールド表現によりTuple および POJO 型のような(入れ子になった)合成型でのフィールドの選択を簡単にすることができます。
以下の例では、二つのフィールド“word” と “count” を持つWC
POJO があるとします。フィールドword
でグループ化するには、単にその名前をkeyBy()
関数に渡せば良いです。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
フィールド表現構文:
フィールド名を使ってPOJOフィールドを選択します。例えば、"user"
はPOJO型の “user” フィールドを参照します。
それらのフィールド名あるいは0オフセットのフィールドインデックスを使ってタプルのフィールドを選択します。例えば、"f0"
と "5"
はそれぞれJavaのタプル型の1番目および6番目のフィールドを参照します。
POJOおよびタプル内で入れ子になったフィールドを選択することができます。例えば、"user.zip"
はPOJO型の“user”フィールド内に格納されているPOJOの “zip” フィールドを参照します。"f1.user.zip"
あるいは "user.f3.1.zip"
のような任意のPOJOとタプルの入れ子および混合がサポートされます。
"*"
ワイルドカード表現を使って全ての方を選択することができます。これはタプルあるいはPOJO型では無い型についても動作します。
フィールド表現の例:
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
これらは上のコードの例のための有効なフィールド表現です:
"count"
: WC
クラス内のcountフィールド。
"complex"
: POJO型ComplexNestedClass
の複合フィールドの全てのフィールドを再帰的に選択します。
"complex.word.f2"
: 入れ子になったTuple3
の最後のフィールドを選択します。
"complex.hadoopCitizen"
: Hadoop IntWritable
型を選択します。
以下の例では、二つのフィールド“word” と “count” を持つWC
POJO があるとします。フィールドword
でグループ化するには、単にその名前をkeyBy()
関数に渡せば良いです。
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
フィールド表現構文:
フィールド名を使ってPOJOフィールドを選択します。例えば、"user"
はPOJO型の “user” フィールドを参照します。
1オフセットのフィールド名あるいは0オフセットのフィールドインデックスによってタプルのフィールドを選択します。例えば、"_1"
と "5"
はそれぞれScalaのタプル型の1番目および6番目のフィールドを参照します。
POJOおよびタプル内で入れ子になったフィールドを選択することができます。例えば、"user.zip"
はPOJO型の“user”フィールド内に格納されているPOJOの “zip” フィールドを参照します。"_2.user.zip"
あるいは "user._4.1.zip"
のような任意のPOJOとタプルの入れ子および混合がサポートされます。
"_"
ワイルドカード表現を使って全ての方を選択することができます。これはタプルあるいはPOJO型では無い型についても動作します。
フィールド表現の例:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
これらは上のコードの例のための有効なフィールド表現です:
"count"
: WC
クラス内のcountフィールド。
"complex"
: POJO型ComplexNestedClass
の複合フィールドの全てのフィールドを再帰的に選択します。
"complex.word._3"
: 入れ子になったTuple3
の最後のフィールドを選択します。
"complex.hadoopCitizen"
: Hadoop IntWritable
型を選択します。
キーを定義する別の方法が “key selector” 関数です。key selector 関数は入力として1つの要素を取り、要素のためのキーを返します。キーは任意の型で任意の計算によって導かれます。
以下の例は、単にオブジェクトのフィールドを返す key selector 関数の例を示します。
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
ほとんどの変換はユーザ定義の関数を必要とします。この章ではそれらがどのように指定することができるかの別のやり方をリスト化します
ほとんどの基本的な方法は提供されたインタフェースの1つを実装することです:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
});
data.map(new MyMapFunction());
関数を匿名クラスとして渡すことができます:
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
FlinkはJava API内でJava 8 Lambdas もサポートします。Java 8 ガイドを見てください。
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
ユーザ定義関数を必要とする全ての変換は、代わりに引数としてrich 関数を取ることができます。例えば、以下の代わりに
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
});
以下のように書くことができます
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
});
そして、いつものように関数をmap
変換に渡します。
data.map(new MyMapFunction());
Rich 関数は匿名クラスとして定義することもできます:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
既に前の例で見たように、全てのオペレータはオペレーションを表現するためにLambda関数を受け付けます:
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
引数としてlambda関数を取る全ての変換は、代わりに引数として rich 関数を取ることができます。例えば、以下の代わりに
data.map { x => x.toInt }
以下のように書くことができます
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
そして、関数をmap
変換に渡します:
data.map(new MyMapFunction())
Rich 関数は匿名クラスとして定義することもできます:
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
ユーザ定義の関数 (map, reduce, etc) に加えて、Rich関数は4つのメソッドを提供します: open
, close
, getRuntimeContext
および setRuntimeContext
。これらは、関数をパラメータ化 (関数にパラメータを渡す)、ローカル状態の作成と終了、ブロードキャスト変数へのアクセス (Broadcast Variablesを見てください)に便利で、accumulators と counters (Accumulators と Countersを見てください)のようなランタイム情報へのアクセス、およびイテレーション(Iterationsを見てください)の情報にアクセスするのに便利です。
Flink はデータセットあるいはデータストリーム内にある要素の型についていくつかの制限を設けます。この理由は、システムが効率的な実行ストラテジを決定するために型を解析するからです。
6つの異なるデータ型のカテゴリがあります:
タプルは様々な型を持つ固定数のフィールドを含む複合型です。Java API はTuple1
から Tuple25
までのクラスを提供します。タプルの各フィールドは、入れ子のタプルによる更なるタプルを含む任意のFlink型かも知れません。タプルのフィールドはtuple.f4
のようなフィールド名を直接使ってアクセスするか、一般的なgetterメソッドtuple.getField(int position)
を使ってアクセスすることができます。フィールドのインデックスは0から始まります。これはScalaのタプルとは対照的ですが、Javaの一般的なインデックスとより矛盾しません。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
Scalaのcaseクラス (および caseクラスの特別な場合のScalaのタプル) は、様々な型を持つ固定数のフィールドを含む複合型です。タプルのフィールドは、最初のフィールドのための_1
のような1オフセットの名前によって扱われます。Case クラスのフィールドはそれらの名前によってアクセスされます。
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
Java と Scala クラスはFlinkによって、もしそれらが以下の要求を満たす場合は、特別なPOJOデータ型として扱われます。
クラスがpublicでなければなりません。
引数無しのpublicコンストラクタ(デフォルトのコンストラクタ)を持たなければなりません。
全てのフィールドはそれぞれpublic、あるいはgetterおよびsetter関数によってアクセス可能でなければなりません。foo
という名前のフィールドについて、getterおよびsetterメソッドは getFoo()
および setFoo()
という名前でなければなりません。
フィールドの型がFlinkによってサポートされていなければなりません。現時点では、Flinkは(Date
のような)任意のオブジェクトをシリアライズ化するために Avroを使用します。
Flink はPOJO型の構造を解析します。つまり、POJOのフィールドについて確認します。結果的に、POJO型は一般的な型ようりも扱い易いです。更に、Flinkは一般的な型よりPOJOをもっと効率的に処理することができます。
以下の例は2つのpublicフィールドを持つ単純なPOJOを示します。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
Flink はInteger
, String
および Double
のような全てのJavaおよびScalaのprimitive型をサポートします。
Flink はほとんどのJavaおよびScalaのクラスをサポートします (APIおよびカスタム)。ファイルポインタ、I/Oストリーム、あるいは他のネイティブなリソースのようなシリアライズできないフィールドを含むクラスに、制限が適用されます。Java Beansの慣習に従うクラスは一般的にうまく動作します。
POJO型として認識されないすべてのクラス (上記のPOJOの要求を見てください)は、一般的なクラス型としてFlinkによって扱われます。Flink はこれらのデータ型をブラックボックスとして扱い、それらの内容(つまり、効率的なソート)にアクセスすることができません。一般的な型はシリアライズ フレームワーク Kryo を使ってデシリアライズ/シリアライズされます。
Value 型は手動でシリアライズ化とデシリアライズ化を表現します。一般的な目的のシリアライズ化フレームワークを通るのでは無く、それらはメソッドread
とwrite
を持つorg.apache.flinktypes.Value
インタフェースを実装することで、それらのオペレーションに独自のコードを提供します。一般的な目的のシリアライズ化がとても非効率な場合は、Value型を使うことは意味があります。要素のsparseベクトルを配列として実装するデータ型がその例です。一般的な目的のシリアライズ化は単純に全ての配列の要素を書きますが、配列のほとんどがゼロであることを知ることで、非ゼロの要素についての特別な符号化を使うことができます。
org.apache.flinktypes.CopyableValue
インタフェースは似た方法の手動の内部クローンロジックをサポートします。
Flink は基本的なデータ型に対応するあらかじめ定義されたValue型を同梱しています。(ByteValue
, ShortValue
, IntValue
, LongValue
, FloatValue
, DoubleValue
, StringValue
, CharValue
, BooleanValue
). これらのValue型は基本的なデータ型の変更可能な変化として振る舞います: プログラマはオブジェクトを再利用しガベージコレクションを取り除きながら、それらの値を変更することができます。
org.apache.hadoop.Writable
インタフェースを実装する型を使うことができます。write()
と readFields()
メソッドの中で定義されたシリアライズ化ロジックがシリアライズ化のために使われるでしょう。
Scala の Either
, Option
および Try
を含む、特別な型を使う事ができます。Java API はEither
の独自の実装を持ちます。Scala の Either
と同様に、Left あるいは Rightの二つの取りうる型の値を表します。Either
は、エラーハンドリング、あるいは二つの異なる型のレコードを出力する必要があるオペレータで使うことができます。
注意: この章はJavaにのみ関連します。
Javaのコンパイラはコンパイルの後で一般型の情報のほとんどを捨てます。これはJavaでの型の抹消として知られています。実行時にオブジェクトのインスタンスは一般型をもう知らないことを意味します。例えば、DataStream<String>
と DataStream<Long>
はJVMにとって同じに見えます。
Flinkは実行のためにプログラムを準備している時(プログラムのメインメソッドが呼ばれる時)に、型の情報を必要とします。FlinkのJava APIは捨てられた型の情報を様々な方法で再構築しようとし、明示的にデータセットとオペレータの中に格納しようとします。DataStream.getType()
を使って型を取り出すことができます。メソッドはTypeInformation
を返し、これはFlinkの型の内部表現形式です。
型の推論には制限があり、いくつかの場合でプログラマの“協力”を必要とします。それらの例としては、ExecutionEnvironment.fromCollection()
のようなコレクションからデータセットを生成するメソッドがそうで、型を説明する引数を渡すことができます。しかし、MapFunction<I, O>
のような一般的な関数は特別な型情報を必要とするかも知れません。
返す型について明示的にAPIに入力フォーマットと関数を伝えるためにResultTypeQueryable インタフェースを実装することができます。関数が一緒に呼び出される入力の型は通常以前のオペレータの結果の型によって類推することができます。
アキュムレータは単純にadd オペレータ と 最終的に集約された結果で構築されます。これはジョブが終了した後で利用可能です。
最も単純なアキュムレータはcounterです: Accumulator.add(V value)
メソッドを使ってその数を増やすことができます。ジョブの最後でFlinkは全ての部分結果を合計(マージ)し、結果をクライアントに送信します。アキュムレータはデバッグ中あるいは素早くデータについてより詳しく調べたい場合に便利です。
Flink は現在のところ以下の組み込みのアキュムレータを持ちます。それぞれはAccumulatorインタフェースを実装します。
アキュムレータの使い方:
最初に使いたいユーザ定義変換関数の中にアキュムレータオブジェクト(ここではカウンタ)を作成する必要があります。
private IntCounter numLines = new IntCounter();
次に、アキュムレータをオブジェクトを一般的にはrich関数のopen()
メソッドの中に登録する必要があります。ここで名前も定義します。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
これでアキュムレータをopen()
および close()
メソッドを含む任意のオペレータ関数の中で使うことができます。
this.numLines.add(1);
全体の結果は実行環境のexecute()
メソッドから返されるJobExecutionResult
オブジェクトの中に格納されるでしょう (現在のところ、これは実行がジョブの終了を待つ場合のみ動作します)。
myJobExecutionResult.getAccumulatorResult("num-lines")
全てのアキュムレータはジョブごとに1つの名前空間を共有します。従って同じアキュムレータをジョブの異なるオペレータ関数の中で使うことができます。Flink は内部的に全てのアキュムレータを同じ名前でマージするでしょう。
アキュムレータとイテレーションでの注意: 現在のところアキュムレータの結果は全てのジョブが終了した後でのみ利用可能です。前回のイテレーションの結果を次のイテレーションで利用可能にすることも計画しています。You can use Aggregators to compute per-iteration statistics and base the termination of iterations on such statistics.
独自のアキュムレータ:
独自のアキュムレータを実装するには、単純にアキュムレータのインタフェースの実装を書く必要があります。あなたの独自のアキュムレータがFlinkと同梱されるべきだと思う場合は、遠慮なくプルリクエストを作成してください。
Accumulator あるいは SimpleAccumulatorのどちらかを実装することを選択できます。
Accumulator<V,R>
が最も柔軟です: それは、addする値について型V
を、最終的な結果について結果の型R
を定義します。例えば、ヒストグラムについて、V
は数値で、R
はヒストグラムです。SimpleAccumulator
は両方の型が同じである場合のためのものです。例えばカウンタ。