This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
ユーザ定義関数 #
ほとんどのオペレーションはユーザ定義関数を必要とします。このセクションでは、それらを指定する様々な方法をリストします。Flinkアプリケーションについて洞察を得るために使えるAccumulators
についても説明します。
インタフェースの実装 #
ほとんどの基本的な方法は提供されたインタフェースの1つを実装することです:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());
匿名クラス #
関数を匿名クラスとして渡すことができます:
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas #
FlinkはJava API内でJava 8 Lambdas もサポートします。
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich 関数 #
ユーザ定義関数を必要とする全ての変換は、代わりに引数としてrich関数を受け取ります。例えば、以下の代わりに
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
}
以下のように書くことができます
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
}
そして、通常通りに関数をmap
変換に渡します:
data.map(new MyMapFunction());
Rich 関数は匿名クラスとして定義することもできます:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
Lambda 関数 #
前の例で既に説明したように、全てのオペレーションはオペレーションを表現するためのlambda関数を受け取ります:
val data: DataStream[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataStream[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
Rich 関数 #
lambda関数が引数として受け取る全ての変換は、代わりにrich関数を受け取ります。例えば、以下の代わりに
data.map { x => x.toInt }
以下のように書くことができます
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String): Int = in.toInt
}
and pass the function to a map
transformation:
data.map(new MyMapFunction())
Rich 関数は匿名クラスとして定義することもできます:
data.map (new RichMapFunction[String, Int] {
def map(in: String): Int = in.toInt
})
アキュムレータ & カウンタ #
Accumulatorsはadd operationとfinal accumulated resultを備えた単純な構造で、ジョブの終了後に利用可能になります。
最も単純なaccumulatorはcounterです: Accumulator.add(V value)
メソッドを使ってインクリメントできます。ジョブの最後にFlinkは全ての部分結果を合計(マージ)し、結果をクライアントに送信します。Accumulatorsはデバッグ中や、データについてすぐ詳しく知りたい場合に役立ちます。
Flinkには現在以下の組み込みaccumulatorsがあります。それぞれは Accumulator インタフェースを実装します。
- IntCounter 、 LongCounter 、 DoubleCounter : カウンタを使う例については以下を参照してください。
- Histogram : A histogram implementation for a discrete number of bins. 内部的には、これは整数から整数への単なるマップです。これを使って、値の分布、例えば単語数プログラムの行当たりの単語数の分布、を計算できます。
accumulatorsの使用方法:
まず、使いたいユーザ定義の変換関数でaccumulatorオブジェクト(ここではカウンター)を作成する必要があります。
private IntCounter numLines = new IntCounter();
次に、accumulatorオブジェクトを登録する必要があります。通常はrichのopen()
メソッドで登録します。ここで名前も定義します。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
open()
やclose()
メソッドなど、オペレータ関数のどこでもaccumulatorを使えるようになりました。
this.numLines.add(1);
全体的な結果は実行環境のexecute()
メソッドから返されるJobExecutionResult
オブジェクトに保存されます(現在、これは実行がジョブの完了を待つ場合にのみ機能します)。
myJobExecutionResult.getAccumulatorResult("num-lines");
全てのアキュムレータはジョブごとに1つの名前空間を共有します。従って、ジョブの異なるオペレータ関数で同じaccumulatorを実行できます。Flinkは同じ名前を持つ全てのaccumulatorを内部的にマージします。
accumulatorとイテレーションでの注意: 現在のところaccumulatorの結果は全てのジョブが終了した後でのみ利用可能です。前回のイテレーションの結果も次のイテレーションで利用できるようにする予定です。 next iteration. Aggregators を使ってイテレーションごとの統計を計算し、そのような統計に基づいてイテレーションの中止を決定できます。
独自のaccumulators:
独自のaccumulatorを実装するには、Accumulatorインタフェースの実装を書くだけです。あなたの独自のaccumulatorがFlinkに同梱されるべきだと思う場合は、遠慮なくプルリクエストを作成してください。 with Flink.
Accumulator または SimpleAccumulator のどちらかを実装するかを選択できます。
Accumulator<V,R>
は最も柔軟です: 追加する値の型V
と、最終結果の結果型R
を定義します。例えば、ヒストグラムの場合、V
は数値で、R
はヒストグラムです。SimpleAccumulator
は両方の型が同じ場合に使います。例えば、カウンター。