Flink データセット API プログラミング ガイド

Flinkのデータセット プログラムはデータセット上の変換を実装する通常のプログラムです(例えば、フィルタリング、マッピング、ジョイニング、グルーピング)。データセットは最初に特定のソースから生成されます(例えば、ファイルからの読み込み、あるいはローカルのコレクションから)。結果はsinkを使って返されます。これは例えば(分散された)ファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。

Flink APIの基本的な概念の紹介については基本的な概念を見てください。

独自のFlinkデータセットプログラムを作成するには、Flinkプログラムの内部構造から始め、次第に独自の 変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと上級の特徴についてのリファレンスとして振る舞います。

プログラムの例

以下のプログラムはWordCountの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。しなければならないことは、現在のFlinkのライブラリをプロジェクトに入れ (Flinkとのリンクの章を見てください)、importを指定することだけです。これで準備が整いました!

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

上に戻る

データセットの変換

データ変換は1つ以上のデータセットを新しいデータセットに変換します。プログラムは複数の変換を洗練された集合体に組み合わせることができます。

この章は利用可能な変換の短い概要を説明します。変換ドキュメント には例付きの全ての変換の完全な説明があります。


変換 解説
Map

一つの要素を取り、一つの要素を生成します。

data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
FlatMap

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。

data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});
MapPartition

1回の関数の呼び出しで並行するパーティションを変換する。関数はパーティションをIterable ストリームとして取得し、任意の数の結果の値を生成することができます。各パーティション内の要素の数は並行度の度合と以前のオペレーションに依存します。

data.mapPartition(new MapPartitionFunction<String, Long>() {
  public void mapPartition(Iterable<String> values, Collector<Long> out) {
    long c = 0;
    for (String s : values) {
      c++;
    }
    out.collect(c);
  }
});
フィルター

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。
重要: システムは、関数は述部が適用される要素を修正することは無いと仮定します。この仮定を破ると間違った結果に繋がるかも知れません。

data.filter(new FilterFunction<Integer>() {
  public boolean filter(Integer value) { return value > 1000; }
});
Reduce

再帰的に二つの要素を1つに結合することで、要素のグループを1つの要素に結合します。Reduceは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduce(new ReduceFunction<Integer> {
  public Integer reduce(Integer a, Integer b) { return a + b; }
});
ReduceGroup

要素のグループを1つ以上の要素に結合します。ReduceGroupは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
    int prefixSum = 0;
    for (Integer i : values) {
      prefixSum += i;
      out.collect(prefixSum);
    }
  }
});

reduce がグループ化されたデータセットに適用された場合、CombineHintを二つ目のパラメータに与えることで、ランタイムがreduceの結合フェーズを実行する方法を指定することができます。ハッシュベースの戦略がほとんどの場合において高速で、異なるキーの数が入力の要素の数に比べて小さい場合がそうです。(例えば 1/10)。

Aggregate

値のグループを1つの値に集約します。Aggregation 関数は組み込みのreduce関数と考えることができます。Aggregateは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);

最小、最大、および集約のための略記構文も使うことができます。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
Distinct

データセットの明確に別個な要素を返します。入力データセットから、要素の全てのフィールドに関して、あるいはフィールドの部分集合に関して重複したエントリを削除します。

data.distinct();
Join キーが等しい二つのデータセットの要素の全てのペアを生成し結合します。任意で、JoinFunctionを使って要素のペアを1つの要素に変換、あるいは FlatJoinFunction を使って要素のペアを任意の多く(noneを含む)の要素へ変換します。joinキーを定義する方法を学ぶにはkeys sectionを見てください。
result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)
Join Hintsを使ってランタイムがjoinを実行する方法を指定することができます。ヒントはパーティションあるいはブロードキャスティングを経由してjoinが起こるかどうか、およびそれがソート ベースあるいはハッシュ ベースのアルゴリズムを使うかどうかを記述します。可能なヒントのリストと例については、変換ガイドを参照してください。もしヒントが指定されない場合は、システムは入力サイズの推測を行い、それらの推測に応じて最良の戦略を取るようにするでしょう。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcasted data
result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
               .where(0).equalTo(1);
join変換はequi-joinsのためにのみ動作することに注意してください。他のjoinタイプは OuterJoin あるいは CoGroup を使って表現される必要があります。
OuterJoin 二つのデータセット上で、left、right あるいは full outer join を行います。Outer joins は通常(inner)のjoinに似ていて、キーが等しい要素の全てのペアを生成します。更に、"outer"側 (left, right, あるいはfullの場合は両方)はもし他方に一致するキーが見つからない場合でも残されます。要素のペアから1つの要素に変えるために、要素の揃いのペア(あるいは1つの要素と、他の入力のためのnull)がJoinFunctionに渡されます。あるいは、要素のペアから任意の多数(無しも含む)の要素に変えるためにFlatJoinFunctionに渡されます。joinのキーを定義する方法を学ぶには、keys section を見てください。
input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
      .where(0)              // key of the first input (tuple field 0)
      .equalTo(1)            // key of the second input (tuple field 1)
      .with(new JoinFunction<String, String, String>() {
          public String join(String v1, String v2) {
             // NOTE:
             // - v2 might be null for leftOuterJoin
             // - v1 might be null for rightOuterJoin
             // - v1 OR v2 might be null for fullOuterJoin
          }
      });
CoGroup

reduce オペレーションの二次元の変数。1つ以上のフィールドの各入力をグループ化し、グループをjoinします。変換関数はグループの各ペアごとに呼ばれます。coGroupキーを定義する方法を学ぶにはkeys sectionを見てください。

data1.coGroup(data2)
     .where(0)
     .equalTo(1)
     .with(new CoGroupFunction<String, String, String>() {
         public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
           out.collect(...);
         }
      });
Cross

要素の全てのペアを作成して、二つの入力のデカルト積(クロス積)を構築します。任意で要素のペアを1つの要素に変換するためにCrossFunctionを使います。

DataSet<Integer> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<Tuple2<Integer, String>> result = data1.cross(data2);

Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters!crossWithTiny()crossWithHuge()を使ってシステムにデータセットのサイズについてのヒントを与えることをお勧めします。

Union

二つのデータセットの和集合を生成します。

DataSet<String> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<String> result = data1.union(data2);
リバランス

データの歪を除去するために一様にデータセットの並行パーティションをリバランスします。Mapのような変換だけがリバランス変換に従うかも知れません。

DataSet<String> in = // [...]
DataSet<String> result = in.rebalance()
                           .map(new Mapper());
ハッシュ パーティション

指定されたキーにデータセットをハッシュパーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByHash(0)
                            .mapPartition(new PartitionMapper());
レンジ パーティション

指定されたキーにデータセットをレンジ パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByRange(0)
                            .mapPartition(new PartitionMapper());
カスタム パーティション

手動でデータ上のパーティションを指定します。
注意: このメソッドは1つのフィールドキーでのみ動作します。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
ソート パーティション

指定されたフィールド上で指定された順番でデータセットの全てのパーティションをローカルでソートする。フィールドは、タプル位置あるいはフィールド表現として指定することができます。複数のフィールドでのソートは sortPartition() 呼び出しを繋げることで行われます。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
                            .mapPartition(new PartitionMapper());
First-n

データセットの最初のn(任意)個の要素を返す。First-n は通常のデータセット、グループ化されたデータセット、あるいはグループ化されソートされたデータセット上に適用することができます。グループ化のキーはキー選択機能あるいはフィールド位置キーとして指定することができます。

DataSet<Tuple2<String,Integer>> in = // [...]
// regular data set
DataSet<Tuple2<String,Integer>> result1 = in.first(3);
// grouped data set
DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)
                                            .first(3);
// grouped-sorted data set
DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)
                                            .sortGroup(1, Order.ASCENDING)
                                            .first(3);

以下の変換はタプルのデータセット上で利用可能です:

変換 解説
プロジェクト

タプルからフィールドのサブセットを選択する

DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
MinBy / MaxBy

1つ以上のフィールドの値が最小(最大)であるタプルのグループからタプルを選択する。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小(最大)フィールド値を持つ場合は、これらのタプルの任意のタプルが返されます。MinBy (MaxBy) はデータセット全体あるいはグループ化されたデータセットに適用されるかも知れません。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// a DataSet with a single tuple with minimum values for the Integer and String fields.
DataSet<Tuple3<Integer, Double, String>> out = in.minBy(0, 2);
// a DataSet with one tuple for each group with the minimum value for the Double field.
DataSet<Tuple3<Integer, Double, String>> out2 = in.groupBy(2)
                                                  .minBy(1);


変換 解説
Map

一つの要素を取り、一つの要素を生成します。

data.map { x => x.toInt }
FlatMap

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。

data.flatMap { str => str.split(" ") }
MapPartition

1回の関数の呼び出しで並行するパーティションを変換する。関数は'Iterator'としてパーティションを取り、任意の数の結果値を生成することができます。各パーティション内の要素の数は並行度の度合と以前のオペレーションに依存します。

data.mapPartition { in => in map { (_, 1) } }
フィルター

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。
重要: システムは、関数は述部が適用される要素を修正することは無いと仮定します。この仮定を破ると間違った結果に繋がるかも知れません。

data.filter { _ > 1000 }
Reduce

再帰的に二つの要素を1つに結合することで、要素のグループを1つの要素に結合します。Reduceは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduce { _ + _ }
ReduceGroup

要素のグループを1つ以上の要素に結合します。ReduceGroupは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduceGroup { elements => elements.sum }
Aggregate

値のグループを1つの値に集約します。Aggregation 関数は組み込みのreduce関数と考えることができます。Aggregateは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2);

最小、最大、および集約のための略記構文も使うことができます。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
Distinct

データセットの明確に別個な要素を返します。入力データセットから、要素の全てのフィールドに関して、あるいはフィールドの部分集合に関して重複したエントリを削除します。

data.distinct()
Join キーが等しい二つのデータセットの要素の全てのペアを生成し結合します。任意で、JoinFunctionを使って要素のペアを1つの要素に変換、あるいは FlatJoinFunction を使って要素のペアを任意の多く(noneを含む)の要素へ変換します。joinキーを定義する方法を学ぶにはkeys sectionを見てください。
// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)
Join Hintsを使ってランタイムがjoinを実行する方法を指定することができます。ヒントはパーティションあるいはブロードキャスティングを経由してjoinが起こるかどうか、およびそれがソート ベースあるいはハッシュ ベースのアルゴリズムを使うかどうかを記述します。可能なヒントのリストと例については、変換ガイドを参照してください。もしヒントが指定されない場合は、システムは入力サイズの推測を行い、それらの推測に応じて最良の戦略を取るようにするでしょう。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcasted data
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                   .where(0).equalTo(1)
join変換はequi-joinsのためにのみ動作することに注意してください。他のjoinタイプは OuterJoin あるいは CoGroup を使って表現される必要があります。
OuterJoin 二つのデータセット上で、left、right あるいは full outer join を行います。Outer joins は通常(inner)のjoinに似ていて、キーが等しい要素の全てのペアを生成します。更に、"outer"側 (left, right, あるいはfullの場合は両方)はもし他方に一致するキーが見つからない場合でも残されます。要素のペアから1つの要素に変えるために、要素の揃いのペア(あるいは1つの要素と、他の入力のための'null'値)がJoinFunctionに渡されます。あるいは、要素のペアから任意の多数(無しも含む)の要素に変えるためにFlatJoinFunctionに渡されます。joinキーを定義する方法を学ぶにはkeys sectionを見てください。
val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
   (left, right) =>
     val a = if (left == null) "none" else left._1
     (a, right)
  }
CoGroup

reduce オペレーションの二次元の変数。1つ以上のフィールドの各入力をグループ化し、グループをjoinします。変換関数はグループの各ペアごとに呼ばれます。coGroupキーを定義する方法を学ぶにはkeys sectionを見てください。

data1.coGroup(data2).where(0).equalTo(1)
Cross

要素の全てのペアを作成して、二つの入力のデカルト積(クロス積)を構築します。任意で要素のペアを1つの要素に変換するためにCrossFunctionを使います。

val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)

Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters!crossWithTiny()crossWithHuge()を使ってシステムにデータセットのサイズについてのヒントを与えることをお勧めします。

Union

二つのデータセットの和集合を生成します。

data.union(data2)
リバランス

データの歪を除去するために一様にデータセットの並行パーティションをリバランスします。Mapのような変換だけがリバランス変換に従うかも知れません。

val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
ハッシュ パーティション

指定されたキーにデータセットをハッシュパーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
レンジ パーティション

指定されたキーにデータセットをレンジ パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
カスタム パーティション

手動でデータ上のパーティションを指定します。
注意: このメソッドは1つのフィールドキーでのみ動作します。

val in: DataSet[(Int, String)] = // [...]
val result = in
  .partitionCustom(partitioner: Partitioner[K], key)
ソート パーティション

指定されたフィールド上で指定された順番でデータセットの全てのパーティションをローカルでソートする。フィールドは、タプル位置あるいはフィールド表現として指定することができます。複数のフィールドでのソートは sortPartition() 呼び出しを繋げることで行われます。

val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
First-n

データセットの最初のn(任意)個の要素を返す。First-n は通常のデータセット、グループ化されたデータセット、あるいはグループ化されソートされたデータセット上に適用することができます。グループ化のキーはキー選択機能、タプル位置あるいはcase class フィールドとして指定することができます。

val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

以下の変換はタプルのデータセット上で利用可能です:

変換 解説
MinBy / MaxBy

1つ以上のフィールドの値が最小(最大)であるタプルのグループからタプルを選択する。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小(最大)フィールド値を持つ場合は、これらのタプルの任意のタプルが返されます。MinBy (MaxBy) はデータセット全体あるいはグループ化されたデータセットに適用されるかも知れません。

val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                                             .minBy(1)

匿名パターンマッチングを使ったタプル、case class および コレクションからの抽出は、以下のようになります:

val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

そのままのAPIではサポートされません。この機能を使うには、Scala API 拡張を使う必要があります。

変換の並行度setParallelism(int)によって定義することができ、一方でname(String) はデバッグに便利な独自の名前を変換に割り当てます。同じことが Data SourcesData Sinksについても可能です。

withParameters(Configuration) はConfigurationオブジェクトを渡します。そしてユーザ定義の関数内でopen()メソッドからアクセスすることができます。

上に戻る

データソース

データソースはファイル、あるいはJavaコレクションから、初期データセットを生成します。データセットを生成する一般的な気候はInputFormatの後ろで抽象化されます。Flinkは一般的なファイル形式からデータセットを生成するいくつかの組み込みのフォーマットが付属しています。それらの多くはExecutionEnvironment上にショートカットメソッドを持ちます。

ファイルベース:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues は変更可能な文字列です。

  • readCsvFile(path) / CsvInputFormat - カンマ(あるいは他の文字)でフィールドが区切られたファイルをパースします。タプルあるいはPOJOのデータセットを返します。基本java型とフィールド型としてそれらのValueに対応するものをサポートします。

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - 改行(あるいはその他の文字シーケンス)によって区切られたString あるいは Integerのようなプリミティブ データ型をパースします。

  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat - 改行(あるいはその他の文字シーケンス)によって区切られたString あるいは Integerのようなプリミティブ データ型をパースします。

  • readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat - JobConfを生成し指定されたパスから指定されたFileInputFormatとKeyクラスとValueクラスを使ってファイルを読み込み、そらをTuple2<Key, Value>として返します。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - JobConfを生成し、指定されたパスから指定された型のSequenceFileInputFormatと Key クラスと Value クラスを使ってファイルを読み込み、それらを Tuple2<Key, Value>として返します。

コレクション ベース:

  • fromCollection(Collection) - JavaのJava.util.Collectionからデータセットを生成します。コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator, Class) - イテレータからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(T ...) - 渡されたオブジェクトのシーケンスからデータセットを生成します。全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator, Class) - 並行してイテレータからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

一般:

  • readFile(inputFormat, path) / FileInputFormat - ファイルの入力フォーマットを受け付けます。

  • createInput(inputFormat) / InputFormat - 一般的な入力フォーマットを受け付けます。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
	                       .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
	                       .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");


// read a file from the specified path of type TextInputFormat
DataSet<Tuple2<LongWritable, Text>> tuples =
 env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      // create and configure input format
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .finish(),
      // specify type information for DataSet
      new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
    );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.

CSVパースの設定

Flink はCSVパースのための多くの設定オプションを持ちます:

  • types(Class ... types) パースするフィールドの型を指定します。パースされたフィールドの型を設定することは必須です。 タイプ クラス Boolean.class の場合、“True” (大文字小文字を区別しません)、“False” (大文字小文字を区別しません)、“1” および “0” は真偽値として扱われます。

  • lineDelimiter(String del) 個々のレコードのデリミタを指定する。デフォルトの行デリミタは改行文字'\n'です。

  • fieldDelimiter(String del) レコードのフィールドを分割するデリミタを指定する。デフォルトのフィールドのデリミタはカンマ文字 ','です。

  • includeFields(boolean ... flag), includeFields(String mask) あるいは includeFields(long bitMask) はどのフィールドを入力ファイルから読み取るか(そしてどれを無視するか)を定義します。デフォルトでは、最初の n 個のフィールド (types() 呼び出しの中の型の数で定義) がパースされます。

  • parseQuotedStrings(char quoteChar) はクォートされた文字のパースを有効にします。もし文字列フィールドの最初の文字がクォート文字であれば、文字列はクォートされた文字列としてパースされます (前後の空白はトリムされません)。クォートされた文字列内のフィールド デリミタは無視されます。クォートされたフィールドの最後の文字がクォート文字では無いか、クォートされた文字列フィールドの開始あるいは終了では無い場所に(クォート文字が ‘'を使ってエスケープされずに)クォート文字が現れた場合は、クォートされた文字列のパースは失敗します。クォート文字のパースが有効にされ、フィールドの最初の文字がクォート文字では無い場合は、文字列はクォートされていない文字列としてパースされます。デフォルトでは、クォートされた文字列のパースは無効です。

  • ignoreComments(String commentPrefix) コメントの接頭語を指定します。指定されたコメント接頭語で始まる全ての行はパースされず無視されます。デフォルトでは、無視される行はありません。

  • ignoreInvalidLines() は寛大なパースを有効にします。つまり、正しくパースされない行は無視されます。デフォルトでは、寛大なパースは無効で、無効な行は例外を起こします。

  • ignoreFirstLine() はInputFormatが入力ファイルの最初の行を無視するように設定します。デフォルトでは行は無視されません。

入力パスディレクトリの再帰的な横断

ファイルベースの入力について、入力のパスがディレクトリの場合、入れ子のファイルはデフォルトで列挙されません。代わりに、基本ディレクトリ内のファイルだけが読み込まれ、一方で入れ子のファイルは無視されます。以下の例のようにrecursive.file.enumeration 設定パラメータを使って、入れ子のファイルの再帰的な列挙が可能です。

// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
			  .withParameters(parameters);

データソースはファイル、あるいはJavaコレクションから、初期データセットを生成します。データセットを生成する一般的な気候はInputFormatの後ろで抽象化されます。Flinkは一般的なファイル形式からデータセットを生成するいくつかの組み込みのフォーマットが付属しています。それらの多くはExecutionEnvironment上にショートカットメソッドを持ちます。

ファイルベース:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues は変更可能な文字列です。

  • readCsvFile(path) / CsvInputFormat - カンマ(あるいは他の文字)でフィールドが区切られたファイルをパースします。タプル、caseクラスオブジェクト、あるいはPOJOを返します。基本java型とフィールド型としてそれらのValueに対応するものをサポートします。

  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - 改行(あるいはその他の文字シーケンス)によって区切られたString あるいは Integerのようなプリミティブ データ型をパースします。

  • readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat - JobConfを生成し指定されたパスから指定されたFileInputFormatとKeyクラスとValueクラスを使ってファイルを読み込み、そらをTuple2<Key, Value>として返します。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - JobConfを生成し、指定されたパスから指定された型のSequenceFileInputFormatと Key クラスと Value クラスを使ってファイルを読み込み、それらを Tuple2<Key, Value>として返します。

コレクション ベース:

  • fromCollection(Seq) - Seqからデータセットを生成します。コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator) - Iteratorからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(elements: _*) - 指定されたオブジェクトの系列からデータセットを生成します。全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator) - 並行してイテレータからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

一般:

  • readFile(inputFormat, path) / FileInputFormat - ファイルの入力フォーマットを受け付けます。

  • createInput(inputFormat) / InputFormat - 一般的な入力フォーマットを受け付けます。

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000);

// read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
 classOf[Text], "hdfs://nnHost:nnPort/path/to/file")

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")

CSVパースの設定

Flink はCSVパースのための多くの設定オプションを持ちます:

  • lineDelimiter: String は個々のレコードのデリミタを指定します。デフォルトの行デリミタは改行文字'\n'です。

  • fieldDelimiter: String はレコードの個々のフィールドを分割するデリミタを指定します。デフォルトのフィールドのデリミタはカンマ文字 ','です。

  • includeFields: Array[Int] はどのフィールドを入力ファイルから読み込むか(そして無視するか)を定義します。デフォルトでは、最初の n 個のフィールド(types() 呼び出し内での型の数によって定義されます)がパースされます。

  • pojoFields: Array[String] はCSVフィールドにマップされるPOJOのフィールドを指定します。CSVフィールドのためのパーサは自動的にPOJOフィールドの型と順番に基づいて初期化されます。

  • parseQuotedStrings: Character はクォートされた文字列のパースを有効にします。もし文字列フィールドの最初の文字がクォート文字であれば、文字列はクォートされた文字列としてパースされます (前後の空白はトリムされません)。クォートされた文字列内のフィールド デリミタは無視されます。もしクォートされた文字列フィールドの最後の文字がクォート文字では無い場合は、クォートされた文字列のパースが失敗します。クォート文字のパースが有効にされ、フィールドの最初の文字がクォート文字では無い場合は、文字列はクォートされていない文字列としてパースされます。デフォルトでは、クォートされた文字列のパースは無効です。

  • ignoreComments: String はコメントのプリフィックスを指定します。指定されたコメント接頭語で始まる全ての行はパースされず無視されます。デフォルトでは、無視される行はありません。

  • lenient: Boolean は寛大なパースを有効にします。つまり、正しくパースされない行は無視されます。デフォルトでは、寛大なパースは無効で、無効な行は例外を起こします。

  • ignoreFirstLine(): Boolean はInputFormatが入力ファイルの最初の行を無視するように設定します。デフォルトでは行は無視されません。

入力パスディレクトリの再帰的な横断

ファイルベースの入力について、入力のパスがディレクトリの場合、入れ子のファイルはデフォルトで列挙されません。代わりに、基本ディレクトリ内のファイルだけが読み込まれ、一方で入れ子のファイルは無視されます。以下の例のようにrecursive.file.enumeration 設定パラメータを使って、入れ子のファイルの再帰的な列挙が可能です。

// enable recursive enumeration of nested input files
val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

圧縮ファイルの読み込み

もし入力ファイルが適切なファイル拡張子でマークされている場合、Flinkは現在のところ透過的な解凍をサポートします。特に、このことは入力フォーマットの設定がこれ以上必要無く、どのよな FileInputFormat も独自の入力フォーマットを含む圧縮をサポートすることを意味します。圧縮されたファイルは並行して読めないかも知れず、従ってジョブのスケーラビリティに影響があるということに注意してください。

以下の表は現在サポートされている圧縮メソッドをリスト化します。


圧縮メソッド ファイル拡張子 並行可能
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no

上に戻る

データのsink

データ シンクはデータセットを消費し、それらを格納あるいは返すために使う事ができます。データ シンクのオペレーションはOutputFormatを使って説明されます。Flinkはデータセット上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。

  • writeAsText() / TextOuputFormat - Writes elements line-wise as Strings. 文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。
  • writeAsFormattedText() / TextOutputFormat - Write elements line-wise as Strings. 文字列はそれぞれの要素のユーザ定義のformat() メソッドを呼ぶことで取得されます。
  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。
  • print() / printToErr() / print(String msg) / printToErr(String msg) - 標準出力/標準エラー ストリーム上の各要素のtoString()の値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。これはprintの異なる呼び出し間で区別するのに役立ちます。並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。
  • write() / FileOutputFormat - 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。
  • output()/ OutputFormat - (データべースに結果を格納するような)ファイルベースでは無いデータ シンクについての、ほとんどの一般的な出力メソッド。

データセットは複数のオペレータの入力かも知れません。プログラムはデータセットを書き込みあるいは出力することができ、同時にそれらに追加の変換を実行します。

標準的なデータ シンク メソッド:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

独自の出力フォーマットを使う:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

ローカルでソートされた出力

データシンクの出力はタプルのフィールドの場所 あるいは フィールド表現を使って特定のフィールド上で指定された順番でローカルでソートすることができます。これは各出力フォーマットについて動作します。

以下の例はこの機能をどうやって使うかを示します:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

グローバルでソートされた出力はまだサポートされません。

データ シンクはデータセットを消費し、それらを格納あるいは返すために使う事ができます。データ シンクのオペレーションはOutputFormatを使って説明されます。Flinkはデータセット上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. 文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。
  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。
  • print() / printToErr() - 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。
  • write() / FileOutputFormat - 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。
  • output()/ OutputFormat - (データべースに結果を格納するような)ファイルベースでは無いデータ シンクについての、ほとんどの一般的な出力メソッド。

データセットは複数のオペレータの入力かも知れません。プログラムはデータセットを書き込みあるいは出力することができ、同時にそれらに追加の変換を実行します。

標準的なデータ シンク メソッド:

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

ローカルでソートされた出力

データシンクの出力はタプルのフィールドの場所 あるいは フィールド表現を使って特定のフィールド上で指定された順番でローカルでソートすることができます。これは各出力フォーマットについて動作します。

以下の例はこの機能をどうやって使うかを示します:

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print;

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print;

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...);

グローバルでソートされた出力はまだサポートされません。

上に戻る

繰り返し操作

イテレーションはFlinkプログラム内でのループを実装します。イテレーション オペレータはプログラムの部分をカプセル化し、1つの繰り返しの結果(部分的な解)をフィードバックしながら繰り返し実行します。Flinkには二つの種類の繰り返しがあります: BulkIteration および DeltaIteration

この章は両方のぺれーたをどうやって使うかについての素早い例を提供します。もっと詳細な紹介についてはイテレーションの紹介 ページを調べてください。

バルク イテレーション

BulkIteration を生成するには、イテレーションが開始されなければならないデータセットのiterate(int)メソッドを呼び出します。これはIterativeDataSetを返し、通常のオペレータを使って変換することができます。iterate呼び出しの1つの引数は、繰り返しの最大数を指定します。

繰り返しの終了を指定するために、IterativeDataSet 上にどの変換を次の繰り返しにフィードバックすべきかを指定するためのcloseWith(DataSet)を呼び出します。追加でcloseWith(DataSet, DataSet)を使って追加の条件を指定することができます。これは2つ目のデータセットを評価し、このデータセットが空であれば繰り返しを終了します。終了の条件が指定されない場合は、指定された最大数の繰り返しの後で繰り返しが終了します。

以下の例はPiの数字を繰り返し推測します。ランダムな点の数を数えるのが目的です。これは単位円に陥ります。各繰り返しの中で、ランダムな点が取り上げられます。この点が単位円の中にある場合、カウントを増加します。そして、Pi は繰り返しの数を4倍したもので結果のカウントを割ったものとして推測されます。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create initial IterativeDataSet
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();

env.execute("Iterative Pi Example");

K-Means exampleを調べることもできます。これはBulkIterationをラベル付けされていない点のセットを群にするために使います。

デルタ イテレーション

デルタ イテレーションは、あるアルゴリズムが各繰り返しの中で結果のデータの各点を変更しないという事実を利用します。

各繰り返しの中でフィードバックされる(ワークセットと呼ばれる)部分的な解に加えて、デルタ イテレーションは繰り返し間での状態を維持します(ソルーション セットと呼ばれます)。これはデルタを使って更新することができます。繰り返しの計算の結果は、最後の繰り返しの後での状態です。デルタ イテレーションの基本的な原理の概要については、繰り返しの紹介 を参照してください。

DeltaIteration の定義は BulkIteration の定義に似ています。For delta iterations, two data sets form the input to each iteration (workset and solution set), and two data sets are produced as the result (new workset, solution set delta) in each iteration.

DeltaIteration を生成するには、iterateDelta(DataSet, int, int) (あるいはそれぞれ iterateDelta(DataSet, int, int[])) を呼び出します。このメソッドは初期の結果セット上で呼び出されます。引数は、初期デルタ セット、繰り返しの最大数、キーの場所 です。返された DeltaIteration オブジェクトにより、iteration.getWorkset()iteration.getSolutionSet()を使ってワークセットと結果セットを表すデータセットへアクセスすることができます。

以下はデルタ イテレーションの構文についての例です。

// read the initial data sets
DataSet<Tuple2<Long, Double>> initialSolutionSet = // [...]

DataSet<Tuple2<Long, Double>> initialDeltaSet = // [...]

int maxIterations = 100;
int keyPosition = 0;

DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialSolutionSet
    .iterateDelta(initialDeltaSet, maxIterations, keyPosition);

DataSet<Tuple2<Long, Double>> candidateUpdates = iteration.getWorkset()
    .groupBy(1)
    .reduceGroup(new ComputeCandidateChanges());

DataSet<Tuple2<Long, Double>> deltas = candidateUpdates
    .join(iteration.getSolutionSet())
    .where(0)
    .equalTo(0)
    .with(new CompareChangesToCurrent());

DataSet<Tuple2<Long, Double>> nextWorkset = deltas
    .filter(new FilterByThreshold());

iteration.closeWith(deltas, nextWorkset)
	.writeAsCsv(outputPath);

バルク イテレーション

BulkIteration を生成するには、イテレーションが開始されなければならないデータセットのiterate(int)メソッドを呼び出し、ステップ関数も指定します。ステップ関数は現在の繰り返しについての入力データセットを取得し、新しいデータセットを返さなければなりません。繰り返しの呼び出しのパラメータは、停止するまでの繰り返しの最大数です。

二つのデータセットを返すステップ関数を受け付ける iterateWithTermination(int) もあります: 繰り返しステップの結果と終了条件です。一度 終了条件のデータセットが空になると繰り返しが終了します。

以下の例はPiの数字を繰り返し推測します。ランダムな点の数を数えるのが目的です。これは単位円に陥ります。各繰り返しの中で、ランダムな点が取り上げられます。この点が単位円の中にある場合、カウントを増加します。そして、Pi は繰り返しの数を4倍したもので結果のカウントを割ったものとして推測されます。

val env = ExecutionEnvironment.getExecutionEnvironment()

// Create initial DataSet
val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example");

K-Means exampleを調べることもできます。これはBulkIterationをラベル付けされていない点のセットを群にするために使います。

デルタ イテレーション

デルタ イテレーションは、あるアルゴリズムが各繰り返しの中で結果のデータの各点を変更しないという事実を利用します。

各繰り返しの中でフィードバックされる(ワークセットと呼ばれる)部分的な解に加えて、デルタ イテレーションは繰り返し間での状態を維持します(ソルーション セットと呼ばれます)。これはデルタを使って更新することができます。繰り返しの計算の結果は、最後の繰り返しの後での状態です。デルタ イテレーションの基本的な原理の概要については、繰り返しの紹介 を参照してください。

DeltaIteration の定義は BulkIteration の定義に似ています。For delta iterations, two data sets form the input to each iteration (workset and solution set), and two data sets are produced as the result (new workset, solution set delta) in each iteration.

DeltaIteration を生成するには、初期ソリューションセット上でiterateDelta(initialWorkset, maxIterations, key)を呼びます。ステップ関数は二つのパラメータを取ります: (solutionSet, workset)。そして二つの値を返さなければなりません: (solutionSetDelta, newWorkset)。

以下はデルタ イテレーションの構文についての例です。

// read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]

val initialWorkset: DataSet[(Long, Double)] = // [...]

val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

上に戻る

関数内のデータオブジェクトへの操作

FlinkのランタイムはJavaオブジェクトの形式のユーザ定義関数を使ってデータを交換します。関数はランタイムからメソッドパラメータとして入力オブジェクトを受け取り、結果として出力オブジェクトを返します。これらのオブジェクトはユーザ関数とランタイムコードによってアクセスされるため、ユーザコードがこれらのオブジェクトにどのようにアクセス、つまり読み込みおよび修正をするかのルールについて理解し従うことがとても重要です。

User functions receive objects from Flink’s runtime either as regular method parameters (like a MapFunction) or through an Iterable parameter (like a GroupReduceFunction). ランタイムがinput objectsとしてユーザ関数に渡すオブジェクトを参照します。User functions can emit objects to the Flink runtime either as a method return value (like a MapFunction) or through a Collector (like a FlatMapFunction). ユーザ関数によってランタイムに出力オブジェクトとして出力されるオブジェクトを参照します。

FlinkのデータセットAPIは、Flinkのランタイムが入力オブジェクトをどのように生成あるいは再利用するかが異なる二つのモデルを特徴とします。この挙動はユーザ関数が入力および出力オブジェクトとやり取りできる方法についての保証および制約に影響します。以下の章はこれらのルールを定義し、安全なユーザ関数コードを書くためのコーディング ガイドラインを与えます。

オブジェクト再利用不可 (デフォルト)

デフォルトで、Flinkはオブジェクト再利用不可モードで作動します。このモードは、関数が関数呼び出しの中で常に新しい入力オブジェクトを受け取ることを確実にします。オブジェクト再利用無効モードは使用についてより良い保証と安全にします。しかし、ある程度の処理のオーバーヘッドがあり、より高頻度のJavaガベージコレクションを引き起こすかも知れません。以下の表は、ユーザ関数がどのようにオブジェクト再利用不可モードで入力および出力オブジェクトにアクセスできるかを説明します。

オペレーション 保証と制約
入力オブジェクトの読み込み メソッドの呼び出しの中で、入力オブジェクトの値が変わらないことが保証されます。これはIterableによって提供されるオブジェクトを含みます。例えば、ListあるいはMapの中でIterableによって提供される入力オブジェクトを集めても安全です。オブジェクトはメソッド呼び出しから離れた後で変更されるかも知れないことに注意してください。関数呼び出しをまたいでオブジェクトを記憶することは安全ではありません
入力オブジェクトの修正 入力オブジェクトを修正することができます。
入力オブジェクトの発行 入力オブジェクトを発行することができます。入力オブジェクトの値は発行の後で変更されるかも知れません。入力オブジェクトが発行された後でそれを読むことは 安全ではありません
出力オブジェクトの読み込み Collectorに渡されたオブジェクト、あるいはメソッドの結果として返されたオブジェクトは、値が変わっているかも知れません。出力オブジェクトを読むことは安全ではありません
出力オブジェクトの修正 オブジェクトが発行された後で修正し、それを再び発行することができます。

オブジェクト再利用不可(デフォルト)モードについてのコーディングガイドライン:

  • メソッドコールをまたいで入力オブジェクトを記録あるいは読み込まないでください。
  • 発行した後でオブジェクトを読み込まないでください。

オブジェクト再利用可

オブジェクト再利用可モードでは、Flinkのランタイムはオブジェクトのインスタンスの数を最小にします。これによりパフォーマンスを改善し、Javaガベージコレクションのプレッシャーを削減することができます。オブジェクト再利用可モードはExecutionConfig.enableObjectReuse()を呼び出すことで有効になります。以下の表は、ユーザ関数がどのようにユーザ関数がオブジェクト再利用可モードで入力オブジェクトと出力オブジェクトにアクセスできるかを説明します。

オペレーション 保証と制約
通常のメソッドパラメータとして受け取った入力オブジェクトの読み込み 通常のメソッド引数として受け取った入力オブジェクトは関数呼び出しの間に修正されません。オブジェクトはメソッド呼び出しから離れた後で変更することができます。関数呼び出しをまたいでオブジェクトを記憶することは安全ではありません
Iterableパラメータから受け取った入力オブジェクトの読み込み Iterableから受け取った入力オブジェクトはnext()メソッドが呼び出されるまでのみ有効です。IterableあるいはIteratorは同じオブジェクトインスタンスを複数回提供することができます。Iterableから受け取った入力オブジェクトを、例えばListあるいはMapに配置することで、記憶することは 安全ではありません
入力オブジェクトの修正 MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction および InputFormat.next(reuse) の入力オブジェクトを除いて、入力オブジェクトを修正するべきではありません
入力オブジェクトの発行 MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction および InputFormat.next(reuse) の入力オブジェクトを除いて、入力オブジェクトを発行するべきではありません
出力オブジェクトの読み込み Collectorに渡されたオブジェクト、あるいはメソッドの結果として返されたオブジェクトは、値が変わっているかも知れません。出力オブジェクトを読むことは安全ではありません
出力オブジェクトの修正 出力オブジェクトを修正し、再び発行することができます。

オブジェクト再利用可についてのコーディングガイドライン:

  • Iterableから受け取った入力オブジェクトを記憶しないでください。
  • メソッドコールをまたいで入力オブジェクトを記録あるいは読み込まないでください。
  • MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction および InputFormat.next(reuse)の入力オブジェクトを除いて、入力オブジェクトを修正あるいは発行しないでください。
  • オブジェクトのインスタンスを削減するために、繰り返し修正されるが読み込まれない専用の出力オブジェクトをいつでも発行することができます。

上に戻る

デバッギング

分散クラスタ内で大きなデータセットにデータ解析プログラムを実行する前に、実装アルゴリズムが期待した通りに動作することを確認することは良い考えです。 従って、データ解析プログラムの実装は通常は結果の調査、デバッグ、そして改善です。

Flink はIDE内からのローカルデバッグのサポート、テストデータの挿入、および結果データの収集によって、データ解析プログラムの開発処理を飛躍的に簡単にする2,3の良い機能を提供します。この章ではFlinkプログラムの開発をどうやって簡単にするかのいくつかのヒントを与えます。

ローカルの実行環境

LocalEnvironment は生成されたものと同じJVMプロセス内でFlinkシステムを開始します。IDEから LocalEnvironment を開始する場合、コード内にブレークポイントを設定し、簡単にプログラムをデバッグすることができます。

LocalEnvironment は以下のように生成され利用されます:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

DataSet<String> lines = env.readTextFile(pathToTextFile);
// build your program

env.execute();
val env = ExecutionEnvironment.createLocalEnvironment()

val lines = env.readTextFile(pathToTextFile)
// build your program

env.execute();

コレクションのデータソースとsink

入力ファイルを作成し出力ファイルを読み込むことで行う場合は、解析プログラムのための入力を提供し出力を調査することが厄介です。Flink はテストを簡単にするJavaコレクションで支援される特別なデータソースとシンクを備えています。プログラムがいったんテストされると、ソースとシンクはHDFSのよな外部のデータソースからの読み込み/への書き込みによって簡単に置き換えることができます。

データソースのコレクションは以下のように使うことができます:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

データシンクのコレクションは以下のように指定されます:

DataSet<Tuple2<String, Integer>> myResult = ...

List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
myResult.output(new LocalCollectionOutputFormat(outData));

注意: 現在のところ、データシンクのコレクションはデバッグツールのようなローカルの実行に制限されます。

val env = ExecutionEnvironment.createLocalEnvironment()

// Create a DataSet from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataSet from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataSet from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 現在のところ、データソースのコレクションはSerializableを実装するデータタイプとイテレータを必要とします。更に、データソースのコレクションは並行 ( 並行度 = 1)で実行することができません。

上に戻る

セマンティックなアノテーション

Flinkに関数の挙動についてのヒントを与えるためにセマンティックなアノテーションを使うことができます。They tell the system which fields of a function’s input the function reads and evaluates and which fields it unmodified forwards from its input to its output. Semantic annotations are a powerful means to speed up execution, because they allow the system to reason about reusing sort orders or partitions across multiple operations. Using semantic annotations may eventually save the program from unnecessary data shuffling or unnecessary sorts and significantly improve the performance of a program.

Note: The use of semantic annotations is optional. However, it is absolutely crucial to be conservative when providing semantic annotations! Incorrect semantic annotations will cause Flink to make incorrect assumptions about your program and might eventually lead to incorrect results. If the behavior of an operator is not clearly predictable, no annotation should be provided. Please read the documentation carefully.

The following semantic annotations are currently supported.

Forwarded Fields Annotation

Forwarded fields information declares input fields which are unmodified forwarded by a function to the same position or to another position in the output. This information is used by the optimizer to infer whether a data property such as sorting or partitioning is preserved by a function. For functions that operate on groups of input elements such as GroupReduce, GroupCombine, CoGroup, and MapPartition, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function’s input group.

Field forward information is specified using field expressions. Fields that are forwarded to the same position in the output can be specified by their position. The specified position must be valid for the input and output data type and have the same type. For example the String "f2" declares that the third field of a Java input tuple is always equal to the third field in the output tuple.

Fields which are unmodified forwarded to another position in the output are declared by specifying the source field in the input and the target field in the output as field expressions. The String "f0->f2" denotes that the first field of the Java input tuple is unchanged copied to the third field of the Java output tuple. The wildcard expression * can be used to refer to a whole input or output type, i.e., "f0->*" denotes that the output of a function is always equal to the first field of its Java input tuple.

Multiple forwarded fields can be declared in a single String by separating them with semicolons as "f0; f2->f1; f3->f2" or in separate Strings "f0", "f2->f1", "f3->f2". When specifying forwarded fields it is not required that all forwarded fields are declared, but all declarations must be correct.

Forwarded field information can be declared by attaching Java annotations on function class definitions or by passing them as operator arguments after invoking a function on a DataSet as shown below.

Function Class Annotations
  • @ForwardedFields for single input functions such as Map and Reduce.
  • @ForwardedFieldsFirst for the first input of a functions with two inputs such as Join and CoGroup.
  • @ForwardedFieldsSecond for the second input of a functions with two inputs such as Join and CoGroup.
Operator Arguments
  • data.map(myMapFnc).withForwardedFields() for single input function such as Map and Reduce.
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst() for the first input of a function with two inputs such as Join and CoGroup.
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond() for the second input of a function with two inputs such as Join and CoGroup.

Please note that it is not possible to overwrite field forward information which was specified as a class annotation by operator arguments.

The following example shows how to declare forwarded field information using a function class annotation:

@ForwardedFields("f0->f2")
public class MyMap implements
              MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
  @Override
  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
  }
}
@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
   def map(value: (Int, Int)): (String, Int, Int) = {
    return ("foo", value._2 / 2, value._1)
  }
}

Non-Forwarded Fields

Non-forwarded fields information declares all fields which are not preserved on the same position in a function’s output. The values of all other fields are considered to be preserved at the same position in the output. Hence, non-forwarded fields information is inverse to forwarded fields information. Non-forwarded field information for group-wise operators such as GroupReduce, GroupCombine, CoGroup, and MapPartition must fulfill the same requirements as for forwarded field information.

IMPORTANT: The specification of non-forwarded fields information is optional. However if used, ALL! non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.

Non-forwarded fields are specified as a list of field expressions. The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. For example both "f1; f3" and "f1", "f3" declare that the second and fourth field of a Java tuple are not preserved in place and all other fields are preserved in place. Non-forwarded field information can only be specified for functions which have identical input and output types.

Non-forwarded field information is specified as function class annotations using the following annotations:

  • @NonForwardedFields for single input functions such as Map and Reduce.
  • @NonForwardedFieldsFirst for the first input of a function with two inputs such as Join and CoGroup.
  • @NonForwardedFieldsSecond for the second input of a function with two inputs such as Join and CoGroup.

The following example shows how to declare non-forwarded field information:

@NonForwardedFields("f1") // second field is not forwarded
public class MyMap implements
              MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
  }
}
@NonForwardedFields("_2") // second field is not forwarded
class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
  def map(value: (Int, Int)): (Int, Int) = {
    return (value._1, value._2 / 2)
  }
}

Read Fields

Read fields information declares all fields that are accessed and evaluated by a function, i.e., all fields that are used by the function to compute its result. For example, fields which are evaluated in conditional statements or used for computations must be marked as read when specifying read fields information. Fields which are only unmodified forwarded to the output without evaluating their values or fields which are not accessed at all are not considered to be read.

IMPORTANT: The specification of read fields information is optional. However if used, ALL! read fields must be specified. It is safe to declare a non-read field as read.

Read fields are specified as a list of field expressions. The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. For example both "f1; f3" and "f1", "f3" declare that the second and fourth field of a Java tuple are read and evaluated by the function.

Read field information is specified as function class annotations using the following annotations:

  • @ReadFields for single input functions such as Map and Reduce.
  • @ReadFieldsFirst for the first input of a function with two inputs such as Join and CoGroup.
  • @ReadFieldsSecond for the second input of a function with two inputs such as Join and CoGroup.

The following example shows how to declare read field information:

@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function.
public class MyMap implements
              MapFunction<Tuple4<Integer, Integer, Integer, Integer>,
                          Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
    if(val.f0 == 42) {
      return new Tuple2<Integer, Integer>(val.f0, val.f1);
    } else {
      return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
    }
  }
}
@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
    if (value._1 == 42) {
      return (value._1, value._2)
    } else {
      return (value._4 + 10, value._2)
    }
  }
}

上に戻る

ブロードキャスト変数

Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. This is useful for auxiliary data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.

  • Broadcast: broadcast sets are registered by name via withBroadcastSet(DataSet, String), and
  • Access: accessible via getRuntimeContext().getBroadcastVariable(String) at the target operator.
// 1. The DataSet to be broadcasted
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcasted DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

Make sure that the names (broadcastSetName in the previous example) match when registering and accessing broadcasted data sets. For a complete example program, have a look at K-Means Algorithm.

// 1. The DataSet to be broadcasted
val toBroadcast = env.fromElements(1, 2, 3)

val data = env.fromElements("a", "b")

data.map(new RichMapFunction[String, String]() {
    var broadcastSet: Traversable[String] = null

    override def open(config: Configuration): Unit = {
      // 3. Access the broadcasted DataSet as a Collection
      broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }

    def map(in: String): String = {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

Make sure that the names (broadcastSetName in the previous example) match when registering and accessing broadcasted data sets. For a complete example program, have a look at KMeans Algorithm.

Note: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply make parameters part of the closure of a function, or use the withParameters(...) method to pass in a configuration.

上に戻る

分散キャッシュ

Flink offers a distributed cache, similar to Apache Hadoop, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.

The cache works as follows. A program registers a file or directory of a local or remote filesystem such as HDFS or S3 under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker’s local filesystem.

The distributed cache is used as follows:

Register the file or directory in the ExecutionEnvironment.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

Access the cached file or directory in a user function (here a MapFunction). The function must extend a RichFunction class because it needs access to the RuntimeContext.

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}

Register the file or directory in the ExecutionEnvironment.

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

Access the cached file in a user function (here a MapFunction). The function must extend a RichFunction class because it needs access to the RuntimeContext.

// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

上に戻る

関数へのパラメータの渡し方

Parameters can be passed to functions using either the constructor or the withParameters(Configuration) method. The parameters are serialized as part of the function object and shipped to all parallel task instances.

Check also the best practices guide on how to pass command line arguments to functions.

Via Constructor

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

  private final int limit;

  public MyFilter(int limit) {
    this.limit = limit;
  }

  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}
val toFilter = env.fromElements(1, 2, 3)

toFilter.filter(new MyFilter(2))

class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    value > limit
  }
}

Via withParameters(Configuration)

This method takes a Configuration object as an argument, which will be passed to the rich function’s open() method. The Configuration object is a Map from String keys to different value types.

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
    private int limit;

    @Override
    public void open(Configuration parameters) throws Exception {
      limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
      return value > limit;
    }
}).withParameters(config);
val toFilter = env.fromElements(1, 2, 3)

val c = new Configuration()
c.setInteger("limit", 2)

toFilter.filter(new RichFilterFunction[Int]() {
    var limit = 0

    override def open(config: Configuration): Unit = {
      limit = config.getInteger("limit", 0)
    }

    def filter(in: Int): Boolean = {
        in > limit
    }
}).withParameters(c)

Globally via the ExecutionConfig

Flink also allows to pass custom configuration values to the ExecutionConfig interface of the environment. Since the execution config is accessible in all (rich) user functions, the custom configuration will be available globally in all functions.

Setting a custom global configuration

Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

Please note that you can also pass a custom class extending the ExecutionConfig.GlobalJobParameters class as the global job parameters to the execution config. The interface allows to implement the Map<String, String> toMap() method which will in turn show the values from the configuration in the web frontend.

Accessing values from the global configuration

Objects in the global job parameters are accessible in many places in the system. All user functions implementing a RichFunction interface have access through the runtime context.

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
      Configuration globConf = (Configuration) globalParams;
      mykey = globConf.getString("mykey", null);
    }
    // ... more here ...

上に戻る

TOP
inserted by FC2 system