User-Defined Functions
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ユーザ定義関数 #

ほとんどのオペレーションはユーザ定義関数を必要とします。このセクションでは、それらを指定する様々な方法をリストします。Flinkアプリケーションについて洞察を得るために使えるAccumulatorsについても説明します。

インタフェースの実装 #

ほとんどの基本的な方法は提供されたインタフェースの1つを実装することです:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());

匿名クラス #

関数を匿名クラスとして渡すことができます:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Java 8 Lambdas #

FlinkはJava API内でJava 8 Lambdas もサポートします。

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

Rich 関数 #

ユーザ定義関数を必要とする全ての変換は、代わりに引数としてrich関数を受け取ります。例えば、以下の代わりに

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}

以下のように書くことができます

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}

そして、通常通りに関数をmap変換に渡します:

data.map(new MyMapFunction());

Rich 関数は匿名クラスとして定義することもできます:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Lambda 関数 #

前の例で既に説明したように、全てのオペレーションはオペレーションを表現するためのlambda関数を受け取ります:

val data: DataStream[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataStream[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }

Rich 関数 #

lambda関数が引数として受け取る全ての変換は、代わりにrich関数を受け取ります。例えば、以下の代わりに

data.map { x => x.toInt }

以下のように書くことができます

class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String): Int = in.toInt
}

and pass the function to a map transformation:

data.map(new MyMapFunction())

Rich 関数は匿名クラスとして定義することもできます:

data.map (new RichMapFunction[String, Int] {
  def map(in: String): Int = in.toInt
})

Back to top

アキュムレータ & カウンタ #

Accumulatorsはadd operationfinal accumulated resultを備えた単純な構造で、ジョブの終了後に利用可能になります。

最も単純なaccumulatorはcounterです: Accumulator.add(V value)メソッドを使ってインクリメントできます。ジョブの最後にFlinkは全ての部分結果を合計(マージ)し、結果をクライアントに送信します。Accumulatorsはデバッグ中や、データについてすぐ詳しく知りたい場合に役立ちます。

Flinkには現在以下の組み込みaccumulatorsがあります。それぞれは Accumulator インタフェースを実装します。

  • IntCounter LongCounter DoubleCounter : カウンタを使う例については以下を参照してください。
  • Histogram : A histogram implementation for a discrete number of bins. 内部的には、これは整数から整数への単なるマップです。これを使って、値の分布、例えば単語数プログラムの行当たりの単語数の分布、を計算できます。

accumulatorsの使用方法:

まず、使いたいユーザ定義の変換関数でaccumulatorオブジェクト(ここではカウンター)を作成する必要があります。

private IntCounter numLines = new IntCounter();

次に、accumulatorオブジェクトを登録する必要があります。通常はrichopen()メソッドで登録します。ここで名前も定義します。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

open()close()メソッドなど、オペレータ関数のどこでもaccumulatorを使えるようになりました。

this.numLines.add(1);

全体的な結果は実行環境のexecute()メソッドから返されるJobExecutionResultオブジェクトに保存されます(現在、これは実行がジョブの完了を待つ場合にのみ機能します)。

myJobExecutionResult.getAccumulatorResult("num-lines");

全てのアキュムレータはジョブごとに1つの名前空間を共有します。従って、ジョブの異なるオペレータ関数で同じaccumulatorを実行できます。Flinkは同じ名前を持つ全てのaccumulatorを内部的にマージします。

accumulatorとイテレーションでの注意: 現在のところaccumulatorの結果は全てのジョブが終了した後でのみ利用可能です。前回のイテレーションの結果も次のイテレーションで利用できるようにする予定です。 next iteration. Aggregators を使ってイテレーションごとの統計を計算し、そのような統計に基づいてイテレーションの中止を決定できます。

独自のaccumulators:

独自のaccumulatorを実装するには、Accumulatorインタフェースの実装を書くだけです。あなたの独自のaccumulatorがFlinkに同梱されるべきだと思う場合は、遠慮なくプルリクエストを作成してください。 with Flink.

Accumulator または SimpleAccumulator のどちらかを実装するかを選択できます。

Accumulator<V,R>は最も柔軟です: 追加する値の型Vと、最終結果の結果型Rを定義します。例えば、ヒストグラムの場合、Vは数値で、Rはヒストグラムです。SimpleAccumulatorは両方の型が同じ場合に使います。例えば、カウンター。

Back to top

inserted by FC2 system