Flink データセット API プログラミング ガイド

Flinkのデータセット プログラムはデータセット上の変換を実装する通常のプログラムです(例えば、フィルタリング、マッピング、ジョイニング、グルーピング)。データセットは最初に特定のソースから生成されます(例えば、ファイルからの読み込み、あるいはローカルのコレクションから)。結果はsinkを使って返されます。これは例えば(分散された)ファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。

Flink APIの基本的な概念の紹介については基本的な概念を見てください。

独自のFlinkデータセットプログラムを作成するには、Flinkプログラムの内部構造から始め、次第に独自の 変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと上級の特徴についてのリファレンスとして振る舞います。

プログラムの例

以下のプログラムはWordCountの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。しなければならないことは、現在のFlinkのライブラリをプロジェクトに入れ (Flinkとのリンクの章を見てください)、importを指定することだけです。これで準備が整いました!

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

上に戻る

データセットの変換

データ変換は1つ以上のデータセットを新しいデータセットに変換します。プログラムは複数の変換を洗練された集合体に組み合わせることができます。

この章は利用可能な変換の短い概要を説明します。変換ドキュメント には例付きの全ての変換の完全な説明があります。


変換 解説
Map

一つの要素を取り、一つの要素を生成します。

data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
FlatMap

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。

data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});
MapPartition

1回の関数の呼び出しで並行するパーティションを変換する。関数はパーティションをIterable ストリームとして取得し、任意の数の結果の値を生成することができます。各パーティション内の要素の数は並行度の度合と以前のオペレーションに依存します。

data.mapPartition(new MapPartitionFunction<String, Long>() {
  public void mapPartition(Iterable<String> values, Collector<Long> out) {
    long c = 0;
    for (String s : values) {
      c++;
    }
    out.collect(c);
  }
});
フィルター

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。
重要: システムは、関数は述部が適用される要素を修正することは無いと仮定します。この仮定を破ると間違った結果に繋がるかも知れません。

data.filter(new FilterFunction<Integer>() {
  public boolean filter(Integer value) { return value > 1000; }
});
Reduce

再帰的に二つの要素を1つに結合することで、要素のグループを1つの要素に結合します。Reduceは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduce(new ReduceFunction<Integer> {
  public Integer reduce(Integer a, Integer b) { return a + b; }
});
ReduceGroup

要素のグループを1つ以上の要素に結合します。ReduceGroupは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
    int prefixSum = 0;
    for (Integer i : values) {
      prefixSum += i;
      out.collect(prefixSum);
    }
  }
});

reduce がグループ化されたデータセットに適用された場合、CombineHintを二つ目のパラメータに与えることで、ランタイムがreduceの結合フェーズを実行する方法を指定することができます。ハッシュベースの戦略がほとんどの場合において高速で、異なるキーの数が入力の要素の数に比べて小さい場合がそうです。(例えば 1/10)。

Aggregate

値のグループを1つの値に集約します。Aggregation 関数は組み込みのreduce関数と考えることができます。Aggregateは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);

最小、最大、および集約のための略記構文も使うことができます。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
Distinct

データセットの明確に別個な要素を返します。入力データセットから、要素の全てのフィールドに関して、あるいはフィールドの部分集合に関して重複したエントリを削除します。

data.distinct();
Join キーが等しい二つのデータセットの要素の全てのペアを生成し結合します。任意で、JoinFunctionを使って要素のペアを1つの要素に変換、あるいは FlatJoinFunction を使って要素のペアを任意の多く(noneを含む)の要素へ変換します。joinキーを定義する方法を学ぶにはkeys sectionを見てください。
result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)
Join Hintsを使ってランタイムがjoinを実行する方法を指定することができます。ヒントはパーティションあるいはブロードキャスティングを経由してjoinが起こるかどうか、およびそれがソート ベースあるいはハッシュ ベースのアルゴリズムを使うかどうかを記述します。可能なヒントのリストと例については、変換ガイドを参照してください。もしヒントが指定されない場合は、システムは入力サイズの推測を行い、それらの推測に応じて最良の戦略を取るようにするでしょう。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcasted data
result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
               .where(0).equalTo(1);
join変換はequi-joinsのためにのみ動作することに注意してください。他のjoinタイプは OuterJoin あるいは CoGroup を使って表現される必要があります。
OuterJoin 二つのデータセット上で、left、right あるいは full outer join を行います。Outer joins は通常(inner)のjoinに似ていて、キーが等しい要素の全てのペアを生成します。更に、"outer"側 (left, right, あるいはfullの場合は両方)はもし他方に一致するキーが見つからない場合でも残されます。要素のペアから1つの要素に変えるために、要素の揃いのペア(あるいは1つの要素と、他の入力のためのnull)がJoinFunctionに渡されます。あるいは、要素のペアから任意の多数(無しも含む)の要素に変えるためにFlatJoinFunctionに渡されます。joinのキーを定義する方法を学ぶには、keys section を見てください。
input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
      .where(0)              // key of the first input (tuple field 0)
      .equalTo(1)            // key of the second input (tuple field 1)
      .with(new JoinFunction<String, String, String>() {
          public String join(String v1, String v2) {
             // NOTE:
             // - v2 might be null for leftOuterJoin
             // - v1 might be null for rightOuterJoin
             // - v1 OR v2 might be null for fullOuterJoin
          }
      });
CoGroup

reduce オペレーションの二次元の変数。1つ以上のフィールドの各入力をグループ化し、グループをjoinします。変換関数はグループの各ペアごとに呼ばれます。coGroupキーを定義する方法を学ぶにはkeys sectionを見てください。

data1.coGroup(data2)
     .where(0)
     .equalTo(1)
     .with(new CoGroupFunction<String, String, String>() {
         public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
           out.collect(...);
         }
      });
Cross

要素の全てのペアを作成して、二つの入力のデカルト積(クロス積)を構築します。任意で要素のペアを1つの要素に変換するためにCrossFunctionを使います。

DataSet<Integer> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<Tuple2<Integer, String>> result = data1.cross(data2);

Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters!crossWithTiny()crossWithHuge()を使ってシステムにデータセットのサイズについてのヒントを与えることをお勧めします。

和集合

二つのデータセットの和集合を生成します。

DataSet<String> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<String> result = data1.union(data2);
リバランス

データの歪を除去するために一様にデータセットの並行パーティションをリバランスします。Mapのような変換だけがリバランス変換に従うかも知れません。

DataSet<String> in = // [...]
DataSet<String> result = in.rebalance()
                           .map(new Mapper());
ハッシュ パーティション

指定されたキーにデータセットをハッシュパーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByHash(0)
                            .mapPartition(new PartitionMapper());
レンジ パーティション

指定されたキーにデータセットをレンジ パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByRange(0)
                            .mapPartition(new PartitionMapper());
カスタム パーティション

手動でデータ上のパーティションを指定します。
注意: このメソッドは1つのフィールドキーでのみ動作します。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
ソート パーティション

指定されたフィールド上で指定された順番でデータセットの全てのパーティションをローカルでソートする。フィールドは、タプル位置あるいはフィールド表現として指定することができます。複数のフィールドでのソートは sortPartition() 呼び出しを繋げることで行われます。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
                            .mapPartition(new PartitionMapper());
First-n

データセットの最初のn(任意)個の要素を返す。First-n は通常のデータセット、グループ化されたデータセット、あるいはグループ化されソートされたデータセット上に適用することができます。グループ化のキーはキー選択機能あるいはフィールド位置キーとして指定することができます。

DataSet<Tuple2<String,Integer>> in = // [...]
// regular data set
DataSet<Tuple2<String,Integer>> result1 = in.first(3);
// grouped data set
DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)
                                            .first(3);
// grouped-sorted data set
DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)
                                            .sortGroup(1, Order.ASCENDING)
                                            .first(3);

以下の変換はタプルのデータセット上で利用可能です:

変換 解説
プロジェクト

タプルからフィールドのサブセットを選択する

DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
MinBy / MaxBy

1つ以上のフィールドの値が最小(最大)であるタプルのグループからタプルを選択する。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小(最大)フィールド値を持つ場合は、これらのタプルの任意のタプルが返されます。MinBy (MaxBy) はデータセット全体あるいはグループ化されたデータセットに適用されるかも知れません。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// a DataSet with a single tuple with minimum values for the Integer and String fields.
DataSet<Tuple3<Integer, Double, String>> out = in.minBy(0, 2);
// a DataSet with one tuple for each group with the minimum value for the Double field.
DataSet<Tuple3<Integer, Double, String>> out2 = in.groupBy(2)
                                                  .minBy(1);


変換 解説
Map

一つの要素を取り、一つの要素を生成します。

data.map { x => x.toInt }
FlatMap

一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。

data.flatMap { str => str.split(" ") }
MapPartition

1回の関数の呼び出しで並行するパーティションを変換する。関数は'Iterator'としてパーティションを取り、任意の数の結果値を生成することができます。各パーティション内の要素の数は並行度の度合と以前のオペレーションに依存します。

data.mapPartition { in => in map { (_, 1) } }
フィルター

各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。
重要: システムは、関数は述部が適用される要素を修正することは無いと仮定します。この仮定を破ると間違った結果に繋がるかも知れません。

data.filter { _ > 1000 }
Reduce

再帰的に二つの要素を1つに結合することで、要素のグループを1つの要素に結合します。Reduceは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduce { _ + _ }
ReduceGroup

要素のグループを1つ以上の要素に結合します。ReduceGroupは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

data.reduceGroup { elements => elements.sum }
Aggregate

値のグループを1つの値に集約します。Aggregation 関数は組み込みのreduce関数と考えることができます。Aggregateは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2);

最小、最大、および集約のための略記構文も使うことができます。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
Distinct

データセットの明確に別個な要素を返します。入力データセットから、要素の全てのフィールドに関して、あるいはフィールドの部分集合に関して重複したエントリを削除します。

data.distinct()
Join キーが等しい二つのデータセットの要素の全てのペアを生成し結合します。任意で、JoinFunctionを使って要素のペアを1つの要素に変換、あるいは FlatJoinFunction を使って要素のペアを任意の多く(noneを含む)の要素へ変換します。joinキーを定義する方法を学ぶにはkeys sectionを見てください。
// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)
Join Hintsを使ってランタイムがjoinを実行する方法を指定することができます。ヒントはパーティションあるいはブロードキャスティングを経由してjoinが起こるかどうか、およびそれがソート ベースあるいはハッシュ ベースのアルゴリズムを使うかどうかを記述します。可能なヒントのリストと例については、変換ガイドを参照してください。もしヒントが指定されない場合は、システムは入力サイズの推測を行い、それらの推測に応じて最良の戦略を取るようにするでしょう。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcasted data
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                   .where(0).equalTo(1)
join変換はequi-joinsのためにのみ動作することに注意してください。他のjoinタイプは OuterJoin あるいは CoGroup を使って表現される必要があります。
OuterJoin 二つのデータセット上で、left、right あるいは full outer join を行います。Outer joins は通常(inner)のjoinに似ていて、キーが等しい要素の全てのペアを生成します。更に、"outer"側 (left, right, あるいはfullの場合は両方)はもし他方に一致するキーが見つからない場合でも残されます。要素のペアから1つの要素に変えるために、要素の揃いのペア(あるいは1つの要素と、他の入力のための'null'値)がJoinFunctionに渡されます。あるいは、要素のペアから任意の多数(無しも含む)の要素に変えるためにFlatJoinFunctionに渡されます。joinキーを定義する方法を学ぶにはkeys sectionを見てください。
val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
   (left, right) =>
     val a = if (left == null) "none" else left._1
     (a, right)
  }
CoGroup

reduce オペレーションの二次元の変数。1つ以上のフィールドの各入力をグループ化し、グループをjoinします。変換関数はグループの各ペアごとに呼ばれます。coGroupキーを定義する方法を学ぶにはkeys sectionを見てください。

data1.coGroup(data2).where(0).equalTo(1)
Cross

要素の全てのペアを作成して、二つの入力のデカルト積(クロス積)を構築します。任意で要素のペアを1つの要素に変換するためにCrossFunctionを使います。

val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)

Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters!crossWithTiny()crossWithHuge()を使ってシステムにデータセットのサイズについてのヒントを与えることをお勧めします。

和集合

二つのデータセットの和集合を生成します。

data.union(data2)
リバランス

データの歪を除去するために一様にデータセットの並行パーティションをリバランスします。Mapのような変換だけがリバランス変換に従うかも知れません。

val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
ハッシュ パーティション

指定されたキーにデータセットをハッシュパーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
レンジ パーティション

指定されたキーにデータセットをレンジ パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
カスタム パーティション

手動でデータ上のパーティションを指定します。
注意: このメソッドは1つのフィールドキーでのみ動作します。

val in: DataSet[(Int, String)] = // [...]
val result = in
  .partitionCustom(partitioner: Partitioner[K], key)
ソート パーティション

指定されたフィールド上で指定された順番でデータセットの全てのパーティションをローカルでソートする。フィールドは、タプル位置あるいはフィールド表現として指定することができます。複数のフィールドでのソートは sortPartition() 呼び出しを繋げることで行われます。

val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
First-n

データセットの最初のn(任意)個の要素を返す。First-n は通常のデータセット、グループ化されたデータセット、あるいはグループ化されソートされたデータセット上に適用することができます。グループ化のキーはキー選択機能、タプル位置あるいはcase class フィールドとして指定することができます。

val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

以下の変換はタプルのデータセット上で利用可能です:

変換 解説
MinBy / MaxBy

1つ以上のフィールドの値が最小(最大)であるタプルのグループからタプルを選択する。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小(最大)フィールド値を持つ場合は、これらのタプルの任意のタプルが返されます。MinBy (MaxBy) はデータセット全体あるいはグループ化されたデータセットに適用されるかも知れません。

val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                                             .minBy(1)

匿名パターンマッチングを使ったタプル、case class および コレクションからの抽出は、以下のようになります:

val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

そのままのAPIではサポートされません。この機能を使うには、Scala API 拡張を使う必要があります。

変換の並行度setParallelism(int)によって定義することができ、一方でname(String) はデバッグに便利な独自の名前を変換に割り当てます。同じことが Data SourcesData Sinksについても可能です。

withParameters(Configuration) はConfigurationオブジェクトを渡します。そしてユーザ定義の関数内でopen()メソッドからアクセスすることができます。

上に戻る

データソース

データソースはファイル、あるいはJavaコレクションから、初期データセットを生成します。データセットを生成する一般的な気候はInputFormatの後ろで抽象化されます。Flinkは一般的なファイル形式からデータセットを生成するいくつかの組み込みのフォーマットが付属しています。それらの多くはExecutionEnvironment上にショートカットメソッドを持ちます。

ファイルベース:

  • readTextFile(path) / TextInputFormat - ファイルを行ごとに読み込み、文字列として返します。

  • readTextFileWithValue(path) / TextValueInputFormat - ファイルを行ごとに読み込み、StringValueとして返します。StringValues は変更可能な文字列です。

  • readCsvFile(path) / CsvInputFormat - カンマ(あるいは他の文字)でフィールドが区切られたファイルをパースします。タプルあるいはPOJOのデータセットを返します。基本java型とフィールド型としてそれらのValueに対応するものをサポートします。

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - 改行(あるいはその他の文字シーケンス)によって区切られたString あるいは Integerのようなプリミティブ データ型をパースします。

  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat - 改行(あるいはその他の文字シーケンス)によって区切られたString あるいは Integerのようなプリミティブ データ型をパースします。

  • readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat - JobConfを生成し指定されたパスから指定されたFileInputFormatとKeyクラスとValueクラスを使ってファイルを読み込み、それらをTuple2<Key, Value>として返します。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - JobConfを生成し、指定されたパスから指定された型のSequenceFileInputFormatと Key クラスと Value クラスを使ってファイルを読み込み、それらを Tuple2<Key, Value>として返します。

コレクション ベース:

  • fromCollection(Collection) - JavaのJava.util.Collectionからデータセットを生成します。コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator, Class) - イテレータからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(T ...) - 渡されたオブジェクトのシーケンスからデータセットを生成します。全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator, Class) - 並行してイテレータからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

一般:

  • readFile(inputFormat, path) / FileInputFormat - ファイルの入力フォーマットを受け付けます。

  • createInput(inputFormat) / InputFormat - 一般的な入力フォーマットを受け付けます。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
	                       .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
	                       .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");


// read a file from the specified path of type TextInputFormat
DataSet<Tuple2<LongWritable, Text>> tuples =
 env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      // create and configure input format
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .finish(),
      // specify type information for DataSet
      new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
    );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.

CSVパースの設定

Flink はCSVパースのための多くの設定オプションを持ちます:

  • types(Class ... types) パースするフィールドの型を指定します。パースされたフィールドの型を設定することは必須です。 タイプ クラス Boolean.class の場合、“True” (大文字小文字を区別しません)、“False” (大文字小文字を区別しません)、“1” および “0” は真偽値として扱われます。

  • lineDelimiter(String del) 個々のレコードのデリミタを指定する。デフォルトの行デリミタは改行文字'\n'です。

  • fieldDelimiter(String del) レコードのフィールドを分割するデリミタを指定する。デフォルトのフィールドのデリミタはカンマ文字 ','です。

  • includeFields(boolean ... flag), includeFields(String mask) あるいは includeFields(long bitMask) はどのフィールドを入力ファイルから読み取るか(そしてどれを無視するか)を定義します。デフォルトでは、最初の n 個のフィールド (types() 呼び出しの中の型の数で定義) がパースされます。

  • parseQuotedStrings(char quoteChar) はクォートされた文字のパースを有効にします。もし文字列フィールドの最初の文字がクォート文字であれば、文字列はクォートされた文字列としてパースされます (前後の空白はトリムされません)。クォートされた文字列内のフィールド デリミタは無視されます。クォートされたフィールドの最後の文字がクォート文字では無いか、クォートされた文字列フィールドの開始あるいは終了では無い場所に(クォート文字が ‘'を使ってエスケープされずに)クォート文字が現れた場合は、クォートされた文字列のパースは失敗します。クォート文字のパースが有効にされ、フィールドの最初の文字がクォート文字では無い場合は、文字列はクォートされていない文字列としてパースされます。デフォルトでは、クォートされた文字列のパースは無効です。

  • ignoreComments(String commentPrefix) コメントの接頭語を指定します。指定されたコメント接頭語で始まる全ての行はパースされず無視されます。デフォルトでは、無視される行はありません。

  • ignoreInvalidLines() は寛大なパースを有効にします。つまり、正しくパースされない行は無視されます。デフォルトでは、寛大なパースは無効で、無効な行は例外を起こします。

  • ignoreFirstLine() はInputFormatが入力ファイルの最初の行を無視するように設定します。デフォルトでは行は無視されません。

入力パスディレクトリの再帰的な横断

ファイルベースの入力について、入力のパスがディレクトリの場合、入れ子のファイルはデフォルトで列挙されません。代わりに、基本ディレクトリ内のファイルだけが読み込まれ、一方で入れ子のファイルは無視されます。以下の例のようにrecursive.file.enumeration 設定パラメータを使って、入れ子のファイルの再帰的な列挙が可能です。

// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
			  .withParameters(parameters);

データソースはファイル、あるいはJavaコレクションから、初期データセットを生成します。データセットを生成する一般的な気候はInputFormatの後ろで抽象化されます。Flinkは一般的なファイル形式からデータセットを生成するいくつかの組み込みのフォーマットが付属しています。それらの多くはExecutionEnvironment上にショートカットメソッドを持ちます。

ファイルベース:

  • readTextFile(path) / TextInputFormat - ファイルを行ごとに読み込み、文字列として返します。

  • readTextFileWithValue(path) / TextValueInputFormat - ファイルを行ごとに読み込み、StringValueとして返します。StringValues は変更可能な文字列です。

  • readCsvFile(path) / CsvInputFormat - カンマ(あるいは他の文字)でフィールドが区切られたファイルをパースします。タプル、caseクラスオブジェクト、あるいはPOJOを返します。基本java型とフィールド型としてそれらのValueに対応するものをサポートします。

  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - 改行(あるいはその他の文字シーケンス)によって区切られたString あるいは Integerのようなプリミティブ データ型をパースします。

  • readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat - JobConfを生成し指定されたパスから指定されたFileInputFormatとKeyクラスとValueクラスを使ってファイルを読み込み、それらをTuple2<Key, Value>として返します。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - JobConfを生成し、指定されたパスから指定された型のSequenceFileInputFormatと Key クラスと Value クラスを使ってファイルを読み込み、それらを Tuple2<Key, Value>として返します。

コレクション ベース:

  • fromCollection(Seq) - Seqからデータセットを生成します。コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator) - Iteratorからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(elements: _*) - 指定されたオブジェクトの系列からデータセットを生成します。全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator) - 並行してイテレータからデータセットを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

一般:

  • readFile(inputFormat, path) / FileInputFormat - ファイルの入力フォーマットを受け付けます。

  • createInput(inputFormat) / InputFormat - 一般的な入力フォーマットを受け付けます。

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000);

// read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
 classOf[Text], "hdfs://nnHost:nnPort/path/to/file")

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")

CSVパースの設定

Flink はCSVパースのための多くの設定オプションを持ちます:

  • lineDelimiter: String は個々のレコードのデリミタを指定します。デフォルトの行デリミタは改行文字'\n'です。

  • fieldDelimiter: String はレコードの個々のフィールドを分割するデリミタを指定します。デフォルトのフィールドのデリミタはカンマ文字 ','です。

  • includeFields: Array[Int] はどのフィールドを入力ファイルから読み込むか(そして無視するか)を定義します。デフォルトでは、最初の n 個のフィールド(types() 呼び出し内での型の数によって定義されます)がパースされます。

  • pojoFields: Array[String] はCSVフィールドにマップされるPOJOのフィールドを指定します。CSVフィールドのためのパーサは自動的にPOJOフィールドの型と順番に基づいて初期化されます。

  • parseQuotedStrings: Character はクォートされた文字列のパースを有効にします。もし文字列フィールドの最初の文字がクォート文字であれば、文字列はクォートされた文字列としてパースされます (前後の空白はトリムされません)。クォートされた文字列内のフィールド デリミタは無視されます。もしクォートされた文字列フィールドの最後の文字がクォート文字では無い場合は、クォートされた文字列のパースが失敗します。クォート文字のパースが有効にされ、フィールドの最初の文字がクォート文字では無い場合は、文字列はクォートされていない文字列としてパースされます。デフォルトでは、クォートされた文字列のパースは無効です。

  • ignoreComments: String はコメントのプリフィックスを指定します。指定されたコメント接頭語で始まる全ての行はパースされず無視されます。デフォルトでは、無視される行はありません。

  • lenient: Boolean は寛大なパースを有効にします。つまり、正しくパースされない行は無視されます。デフォルトでは、寛大なパースは無効で、無効な行は例外を起こします。

  • ignoreFirstLine(): Boolean はInputFormatが入力ファイルの最初の行を無視するように設定します。デフォルトでは行は無視されません。

入力パスディレクトリの再帰的な横断

ファイルベースの入力について、入力のパスがディレクトリの場合、入れ子のファイルはデフォルトで列挙されません。代わりに、基本ディレクトリ内のファイルだけが読み込まれ、一方で入れ子のファイルは無視されます。以下の例のようにrecursive.file.enumeration 設定パラメータを使って、入れ子のファイルの再帰的な列挙が可能です。

// enable recursive enumeration of nested input files
val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

圧縮ファイルの読み込み

もし入力ファイルが適切なファイル拡張子でマークされている場合、Flinkは現在のところ透過的な解凍をサポートします。特に、このことは入力フォーマットの設定がこれ以上必要無く、どのよな FileInputFormat も独自の入力フォーマットを含む圧縮をサポートすることを意味します。圧縮されたファイルは並行して読めないかも知れず、従ってジョブのスケーラビリティに影響があるということに注意してください。

以下の表は現在サポートされている圧縮メソッドをリスト化します。


圧縮メソッド ファイル拡張子 並行可能
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no

上に戻る

データのsink

データ シンクはデータセットを消費し、それらを格納あるいは返すために使う事ができます。データ シンクのオペレーションはOutputFormatを使って説明されます。Flinkはデータセット上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。

  • writeAsText() / TextOuputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。
  • writeAsFormattedText() / TextOutputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のユーザ定義のformat() メソッドを呼ぶことで取得されます。
  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。
  • print() / printToErr() / print(String msg) / printToErr(String msg) - 標準出力/標準エラー ストリーム上の各要素のtoString()の値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。これはprintの異なる呼び出し間で区別するのに役立ちます。並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。
  • write() / FileOutputFormat - 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。
  • output()/ OutputFormat - (データべースに結果を格納するような)ファイルベースでは無いデータ シンクについての、ほとんどの一般的な出力メソッド。

データセットは複数のオペレータの入力かも知れません。プログラムはデータセットを書き込みあるいは出力することができ、同時にそれらに追加の変換を実行します。

標準的なデータ シンク メソッド:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

独自の出力フォーマットを使う:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

ローカルでソートされた出力

データシンクの出力はタプルのフィールドの場所 あるいは フィールド表現を使って特定のフィールド上で指定された順番でローカルでソートすることができます。これは各出力フォーマットについて動作します。

以下の例はこの機能をどうやって使うかを示します:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

グローバルでソートされた出力はまだサポートされません。

データ シンクはデータセットを消費し、それらを格納あるいは返すために使う事ができます。データ シンクのオペレーションはOutputFormatを使って説明されます。Flinkはデータセット上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。

  • writeAsText() / TextOutputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。
  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。
  • print() / printToErr() - 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。
  • write() / FileOutputFormat - 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。
  • output()/ OutputFormat - (データべースに結果を格納するような)ファイルベースでは無いデータ シンクについての、ほとんどの一般的な出力メソッド。

データセットは複数のオペレータの入力かも知れません。プログラムはデータセットを書き込みあるいは出力することができ、同時にそれらに追加の変換を実行します。

標準的なデータ シンク メソッド:

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

ローカルでソートされた出力

データシンクの出力はタプルのフィールドの場所 あるいは フィールド表現を使って特定のフィールド上で指定された順番でローカルでソートすることができます。これは各出力フォーマットについて動作します。

以下の例はこの機能をどうやって使うかを示します:

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print;

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print;

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...);

グローバルでソートされた出力はまだサポートされません。

上に戻る

繰り返し操作

イテレーションはFlinkプログラム内でのループを実装します。イテレーション オペレータはプログラムの部分をカプセル化し、1つの繰り返しの結果(部分的な解)をフィードバックしながら繰り返し実行します。Flinkには二つの種類の繰り返しがあります: BulkIteration および DeltaIteration

この章は両方のぺれーたをどうやって使うかについての素早い例を提供します。もっと詳細な紹介についてはイテレーションの紹介 ページを調べてください。

バルク イテレーション

BulkIteration を生成するには、イテレーションが開始されなければならないデータセットのiterate(int)メソッドを呼び出します。これはIterativeDataSetを返し、通常のオペレータを使って変換することができます。iterate呼び出しの1つの引数は、繰り返しの最大数を指定します。

繰り返しの終了を指定するために、IterativeDataSet 上にどの変換を次の繰り返しにフィードバックすべきかを指定するためのcloseWith(DataSet)を呼び出します。追加でcloseWith(DataSet, DataSet)を使って追加の条件を指定することができます。これは2つ目のデータセットを評価し、このデータセットが空であれば繰り返しを終了します。終了の条件が指定されない場合は、指定された最大数の繰り返しの後で繰り返しが終了します。

以下の例はPiの数字を繰り返し推測します。ランダムな点の数を数えるのが目的です。これは単位円に陥ります。各繰り返しの中で、ランダムな点が取り上げられます。この点が単位円の中にある場合、カウントを増加します。そして、Pi は繰り返しの数を4倍したもので結果のカウントを割ったものとして推測されます。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create initial IterativeDataSet
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();

env.execute("Iterative Pi Example");

K-Means exampleを調べることもできます。これはBulkIterationをラベル付けされていない点のセットを群にするために使います。

デルタ イテレーション

デルタ イテレーションは、あるアルゴリズムが各繰り返しの中で結果のデータの各点を変更しないという事実を利用します。

各繰り返しの中でフィードバックされる(ワークセットと呼ばれる)部分的な解に加えて、デルタ イテレーションは繰り返し間での状態を維持します(ソルーション セットと呼ばれます)。これはデルタを使って更新することができます。繰り返しの計算の結果は、最後の繰り返しの後での状態です。デルタ イテレーションの基本的な原理の概要については、繰り返しの紹介 を参照してください。

DeltaIteration の定義は BulkIteration の定義に似ています。For delta iterations, two data sets form the input to each iteration (workset and solution set), and two data sets are produced as the result (new workset, solution set delta) in each iteration.

DeltaIteration を生成するには、iterateDelta(DataSet, int, int) (あるいはそれぞれ iterateDelta(DataSet, int, int[])) を呼び出します。このメソッドは初期の結果セット上で呼び出されます。引数は、初期デルタ セット、繰り返しの最大数、キーの場所 です。返された DeltaIteration オブジェクトにより、iteration.getWorkset()iteration.getSolutionSet()を使ってワークセットと結果セットを表すデータセットへアクセスすることができます。

以下はデルタ イテレーションの構文についての例です。

// read the initial data sets
DataSet<Tuple2<Long, Double>> initialSolutionSet = // [...]

DataSet<Tuple2<Long, Double>> initialDeltaSet = // [...]

int maxIterations = 100;
int keyPosition = 0;

DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialSolutionSet
    .iterateDelta(initialDeltaSet, maxIterations, keyPosition);

DataSet<Tuple2<Long, Double>> candidateUpdates = iteration.getWorkset()
    .groupBy(1)
    .reduceGroup(new ComputeCandidateChanges());

DataSet<Tuple2<Long, Double>> deltas = candidateUpdates
    .join(iteration.getSolutionSet())
    .where(0)
    .equalTo(0)
    .with(new CompareChangesToCurrent());

DataSet<Tuple2<Long, Double>> nextWorkset = deltas
    .filter(new FilterByThreshold());

iteration.closeWith(deltas, nextWorkset)
	.writeAsCsv(outputPath);

バルク イテレーション

BulkIteration を生成するには、イテレーションが開始されなければならないデータセットのiterate(int)メソッドを呼び出し、ステップ関数も指定します。ステップ関数は現在の繰り返しについての入力データセットを取得し、新しいデータセットを返さなければなりません。繰り返しの呼び出しのパラメータは、停止するまでの繰り返しの最大数です。

二つのデータセットを返すステップ関数を受け付ける iterateWithTermination(int) もあります: 繰り返しステップの結果と終了条件です。一度 終了条件のデータセットが空になると繰り返しが終了します。

以下の例はPiの数字を繰り返し推測します。ランダムな点の数を数えるのが目的です。これは単位円に陥ります。各繰り返しの中で、ランダムな点が取り上げられます。この点が単位円の中にある場合、カウントを増加します。そして、Pi は繰り返しの数を4倍したもので結果のカウントを割ったものとして推測されます。

val env = ExecutionEnvironment.getExecutionEnvironment()

// Create initial DataSet
val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example");

K-Means exampleを調べることもできます。これはBulkIterationをラベル付けされていない点のセットを群にするために使います。

デルタ イテレーション

デルタ イテレーションは、あるアルゴリズムが各繰り返しの中で結果のデータの各点を変更しないという事実を利用します。

各繰り返しの中でフィードバックされる(ワークセットと呼ばれる)部分的な解に加えて、デルタ イテレーションは繰り返し間での状態を維持します(ソルーション セットと呼ばれます)。これはデルタを使って更新することができます。繰り返しの計算の結果は、最後の繰り返しの後での状態です。デルタ イテレーションの基本的な原理の概要については、繰り返しの紹介 を参照してください。

DeltaIteration の定義は BulkIteration の定義に似ています。For delta iterations, two data sets form the input to each iteration (workset and solution set), and two data sets are produced as the result (new workset, solution set delta) in each iteration.

DeltaIteration を生成するには、初期ソリューションセット上でiterateDelta(initialWorkset, maxIterations, key)を呼びます。ステップ関数は二つのパラメータを取ります: (solutionSet, workset)。そして二つの値を返さなければなりません: (solutionSetDelta, newWorkset)。

以下はデルタ イテレーションの構文についての例です。

// read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]

val initialWorkset: DataSet[(Long, Double)] = // [...]

val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

上に戻る

関数内のデータオブジェクトへの操作

FlinkのランタイムはJavaオブジェクトの形式のユーザ定義関数を使ってデータを交換します。関数はランタイムからメソッドパラメータとして入力オブジェクトを受け取り、結果として出力オブジェクトを返します。これらのオブジェクトはユーザ関数とランタイムコードによってアクセスされるため、ユーザコードがこれらのオブジェクトにどのようにアクセス、つまり読み込みおよび修正をするかのルールについて理解し従うことがとても重要です。

ユーザ関数はFlinkのランタイムから(MapFunctionのように)通常のメソッドパラメータとして、あるいは(GroupReduceFunctionのように)Iterable パラメータとしてのどちらかでオブジェクトを受け取ります。ランタイムがinput objectsとしてユーザ関数に渡すオブジェクトを参照します。ユーザ定義関数はFlinkのランタイムへ(MapFunctionのように)メソッドの返り値として、あるいは(FlatMapFunctionのように)Collector を使ってのどちらかでオブジェクトを発行します。ユーザ関数によってランタイムに出力オブジェクトとして出力されるオブジェクトを参照します。

FlinkのデータセットAPIは、Flinkのランタイムが入力オブジェクトをどのように生成あるいは再利用するかが異なる二つのモデルを特徴とします。この挙動はユーザ関数が入力および出力オブジェクトとやり取りできる方法についての保証および制約に影響します。以下の章はこれらのルールを定義し、安全なユーザ関数コードを書くためのコーディング ガイドラインを与えます。

オブジェクト再利用不可 (デフォルト)

デフォルトで、Flinkはオブジェクト再利用不可モードで作動します。このモードは、関数が関数呼び出しの中で常に新しい入力オブジェクトを受け取ることを確実にします。オブジェクト再利用無効モードは使用についてより良い保証と安全にします。しかし、ある程度の処理のオーバーヘッドがあり、より高頻度のJavaガベージコレクションを引き起こすかも知れません。以下の表は、ユーザ関数がどのようにオブジェクト再利用不可モードで入力および出力オブジェクトにアクセスできるかを説明します。

オペレーション 保証と制約
入力オブジェクトの読み込み メソッドの呼び出しの中で、入力オブジェクトの値が変わらないことが保証されます。これはIterableによって提供されるオブジェクトを含みます。例えば、ListあるいはMapの中でIterableによって提供される入力オブジェクトを集めても安全です。オブジェクトはメソッド呼び出しから離れた後で変更されるかも知れないことに注意してください。関数呼び出しをまたいでオブジェクトを記憶することは安全ではありません
入力オブジェクトの修正 入力オブジェクトを修正することができます。
入力オブジェクトの発行 入力オブジェクトを発行することができます。入力オブジェクトの値は発行の後で変更されるかも知れません。入力オブジェクトが発行された後でそれを読むことは 安全ではありません
出力オブジェクトの読み込み Collectorに渡されたオブジェクト、あるいはメソッドの結果として返されたオブジェクトは、値が変わっているかも知れません。出力オブジェクトを読むことは安全ではありません
出力オブジェクトの修正 オブジェクトが発行された後で修正し、それを再び発行することができます。

オブジェクト再利用不可(デフォルト)モードについてのコーディングガイドライン:

  • メソッドコールをまたいで入力オブジェクトを記録あるいは読み込まないでください。
  • 発行した後でオブジェクトを読み込まないでください。

オブジェクト再利用可

オブジェクト再利用可モードでは、Flinkのランタイムはオブジェクトのインスタンスの数を最小にします。これによりパフォーマンスを改善し、Javaガベージコレクションのプレッシャーを削減することができます。オブジェクト再利用可モードはExecutionConfig.enableObjectReuse()を呼び出すことで有効になります。以下の表は、ユーザ関数がどのようにユーザ関数がオブジェクト再利用可モードで入力オブジェクトと出力オブジェクトにアクセスできるかを説明します。

オペレーション 保証と制約
通常のメソッドパラメータとして受け取った入力オブジェクトの読み込み 通常のメソッド引数として受け取った入力オブジェクトは関数呼び出しの間に修正されません。オブジェクトはメソッド呼び出しから離れた後で変更することができます。関数呼び出しをまたいでオブジェクトを記憶することは安全ではありません
Iterableパラメータから受け取った入力オブジェクトの読み込み Iterableから受け取った入力オブジェクトはnext()メソッドが呼び出されるまでのみ有効です。IterableあるいはIteratorは同じオブジェクトインスタンスを複数回提供することができます。Iterableから受け取った入力オブジェクトを、例えばListあるいはMapに配置することで、記憶することは 安全ではありません
入力オブジェクトの修正 MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction および InputFormat.next(reuse) の入力オブジェクトを除いて、入力オブジェクトを修正するべきではありません
入力オブジェクトの発行 MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction および InputFormat.next(reuse) の入力オブジェクトを除いて、入力オブジェクトを発行するべきではありません
出力オブジェクトの読み込み Collectorに渡されたオブジェクト、あるいはメソッドの結果として返されたオブジェクトは、値が変わっているかも知れません。出力オブジェクトを読むことは安全ではありません
出力オブジェクトの修正 出力オブジェクトを修正し、再び発行することができます。

オブジェクト再利用可についてのコーディングガイドライン:

  • Iterableから受け取った入力オブジェクトを記憶しないでください。
  • メソッドコールをまたいで入力オブジェクトを記録あるいは読み込まないでください。
  • MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction および InputFormat.next(reuse)の入力オブジェクトを除いて、入力オブジェクトを修正あるいは発行しないでください。
  • オブジェクトのインスタンスを削減するために、繰り返し修正されるが読み込まれない専用の出力オブジェクトをいつでも発行することができます。

上に戻る

デバッギング

分散クラスタ内で大きなデータセットにデータ解析プログラムを実行する前に、実装アルゴリズムが期待した通りに動作することを確認することは良い考えです。 従って、データ解析プログラムの実装は通常は結果の調査、デバッグ、そして改善です。

Flink はIDE内からのローカルデバッグのサポート、テストデータの挿入、および結果データの収集によって、データ解析プログラムの開発処理を飛躍的に簡単にする2,3の良い機能を提供します。この章ではFlinkプログラムの開発をどうやって簡単にするかのいくつかのヒントを与えます。

ローカルの実行環境

LocalEnvironment は生成されたものと同じJVMプロセス内でFlinkシステムを開始します。IDEから LocalEnvironment を開始する場合、コード内にブレークポイントを設定し、簡単にプログラムをデバッグすることができます。

LocalEnvironment は以下のように生成され利用されます:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

DataSet<String> lines = env.readTextFile(pathToTextFile);
// build your program

env.execute();
val env = ExecutionEnvironment.createLocalEnvironment()

val lines = env.readTextFile(pathToTextFile)
// build your program

env.execute();

コレクションのデータソースとsink

入力ファイルを作成し出力ファイルを読み込むことで行う場合は、解析プログラムのための入力を提供し出力を調査することが厄介です。Flink はテストを簡単にするJavaコレクションで支援される特別なデータソースとシンクを備えています。プログラムがいったんテストされると、ソースとシンクはHDFSのよな外部のデータソースからの読み込み/への書き込みによって簡単に置き換えることができます。

データソースのコレクションは以下のように使うことができます:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

データシンクのコレクションは以下のように指定されます:

DataSet<Tuple2<String, Integer>> myResult = ...

List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
myResult.output(new LocalCollectionOutputFormat(outData));

注意: 現在のところ、データシンクのコレクションはデバッグツールのようなローカルの実行に制限されます。

val env = ExecutionEnvironment.createLocalEnvironment()

// Create a DataSet from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataSet from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataSet from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 現在のところ、データソースのコレクションはSerializableを実装するデータタイプとイテレータを必要とします。更に、データソースのコレクションは並行 ( 並行度 = 1)で実行することができません。

上に戻る

セマンティックなアノテーション

Flinkに関数の挙動についてのヒントを与えるためにセマンティックなアノテーションを使うことができます。それらは、システムに関数の入力のどのフィールドを関数が読み込み評価するか、どのフィールドを修正せずに入力から出力に転送するかを伝えます。Semantic annotations are a powerful means to speed up execution, because they allow the system to reason about reusing sort orders or partitions across multiple operations. セマンティックなアノテーションは結果的にプログラムから不要なデータのシャッフリングと不要なソートを除き、プログラムのパフォーマンスをとても改善します。

注意: セマンティックなアノテーションの使用は任意です。しかし、セマンティックなアノテーションを提供する時は確実に保守的になってください。間違ったセマンティックなアノテーションはFlinkがプログラムについて間違った仮定をさせ、結果的に間違った結果に繋がるかも知れません。オペレータの挙動が明らかに予想可能では無い場合は、アノテーションは提供されるべきではありません。ドキュメントを注意深く読んでください。

以下のセマンティックなアノテーションは現在のところサポートされます。

Forwarded Fields Annotation

Forwarded fields 情報は修正されていない入力フィールドを関数によって出力内の同じ場所あるいは他の場所に転送することを宣言します。この情報はオプティマイザによって、ソーティングやパーティショニングのようなデータのプロパティが関数によって保持されるかどうかを推論するために使われます。GroupReduce, GroupCombine, CoGroup および MapPartition のような入力要素のグループ上で操作する関数について、forwarded fields として定義された全てのフィールドは同じ入力要素から常に連帯して転送されなければなりません。group-wise関数によって発行された各要素のforwarded fields は関数の入力グループの異なる要素からもたらされるかも知れません。

Field forward 情報は フィールド表現を使って指定されます。出力内で同じ位置に転送されるフィールドはそれらの位置で指定することができます。指定される位置は入力および出力データ型に関して有効でなければならず、同じ型を持たなければなりません。例えば文字列"f2"はJava入力タプルの3つ目のフィールドが出力タプルの3つ目のフィールドと等しいことを宣言します。

出力内での他の位置に転送されるunmodifiedなフィールドは、入力内の元のフィールドと出力内の目的のフィールドをフィールド表現として指定することで宣言されます。文字列 "f0->f2" はJavaの入力タプルの最初のフィールドがJavaの出力タプルの3つ目のフィールドへ変更せずにコピーされることを示します。ワイルドカード表現 * は入力あるいは出力の全体の型を参照するために使うことができます。つまり、"f0->*" は関数の出力が常にJavano入力のタプルの最初のフィールドに等しいことを表します。

複数のforwarded フィールドは"f0; f2->f1; f3->f2"のようにセミコロンで分割した1つの文字列、あるいは分割文字列 "f0", "f2->f1", "f3->f2" で宣言することができます。forwarded フィールドを指定する時には、全てのforwardedフィールドは必要ではありませんが、全ての宣言が正しくなければなりません。

Forwarded フィールドの情報は関数クラス定義のJavaアノテーションのアタッチ、あるいは以下で示すようにデータセット上の関数を起動した後でそれらをオペレータの引数として渡すことによって宣言することができます。

関数クラス アノテーション
  • @ForwardedFields MapおよびReduceのような1つの入力関数。
  • @ForwardedFieldsFirst JoinおよびCoGroupのような2つの入力を持つ関数の最初の入力。
  • @ForwardedFieldsSecond JoinおよびCoGroupのような2つの入力を持つ関数の2つ目の入力。
オペレータの引数
  • data.map(myMapFnc).withForwardedFields() MapおよびReduceのような1つの入力関数。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst() JoinおよびCoGroupのような2つの入力を持つ関数の最初の入力。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond() JoinおよびCoGroupのような2つの入力を持つ関数の2つ目の入力。

オペレータ引数とよるクラスのアノテーションとして指定されたフィールド forward情報は上書くことができないことに注意してください。

以下の例は関数クラスアノテーションを使ってforwardedフィールド情報を宣言する方法を示します:

@ForwardedFields("f0->f2")
public class MyMap implements
              MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
  @Override
  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
  }
}
@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
   def map(value: (Int, Int)): (String, Int, Int) = {
    return ("foo", value._2 / 2, value._1)
  }
}

Non-Forwarded フィールド

Non-forwarded フィールド情報は関数の出力内での同じ位置で保持されない全てのフィールドを宣言します。他の全てのフィールドの値は出力内で同じ位置を保持すると見なされます。従って、non-forwarded フィールド情報はforwardedフィールド情報と逆です。GroupReduce, GroupCombine, CoGroup および MapPartitionのようなグループ単位のオペレータのためのNon-forwarded フィールド情報は、forwardedフィールド情報については同じ要求を満たす必要があります。

重要: non-forwarded フィールド情報の仕様は任意です。しかし、使われた場合、他の全てのフィールドは決まった位置に転送されると見なされるため、全ての!non-forwarded フィールドは指定されなければなりません。forwarded フィールドをnon-forwardedとして宣言したほうがよいでしょう。

Non-forwarded フィル度はフィールド表現のリストとして指定されます。リストはセミコロンで分割されたフィールド表現を持つ1つの文字列、あるいは複数の文字列のどちらかで与えることができます。例えば、"f1; f3""f1", "f3" の両方ともJavaタプルの2番目と4番目のフィールドが同じ位置に保持されず、他の全てのフィールドが同じ位置に保持されることを宣言します。Non-forwarded フィールド情報は全く同じ入力と出力の型を持つ関数についてのみ指定することができます。

Non-forwarded フィールド情報は以下のアノテーションを使って関数クラスアノテーションとして指定されます:

  • @NonForwardedFields MapおよびReduceのような1つの入力関数。
  • @NonForwardedFieldsFirst JoinおよびCoGroupのような2つの入力を持つ関数の最初の入力。
  • @NonForwardedFieldsSecond JoinおよびCoGroupのような2つの入力を持つ関数の2つ目の入力。

以下の例はnon-forwarded フィールド情報を宣言する方法を示します:

@NonForwardedFields("f1") // second field is not forwarded
public class MyMap implements
              MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
  }
}
@NonForwardedFields("_2") // second field is not forwarded
class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
  def map(value: (Int, Int)): (Int, Int) = {
    return (value._1, value._2 / 2)
  }
}

フィールドの読み込み

フィールドの読み込み情報は関数によってアクセスされ評価される全てのフィールドを宣言します。つまり、全てのフィールドは結果を計算するために関数によって使われます。例えば、条件文で評価される、あるいは計算のために使われるフィールドは、読み込みフィールド情報を指定する時にreadとしてマークされなければなりません。Fields which are only unmodified forwarded to the output without evaluating their values or fields which are not accessed at all are not considered to be read.

重要: 読み込みフィールド情報の仕様は任意です。しかし、使われた場合、全て!の読み込みフィールドは指定されなければなりません。非readフィールドをreadとして宣言したほうがよいでしょう。

Read フィールドはフィールド表現のリストとして指定されます。リストはセミコロンで分割されたフィールド表現を持つ1つの文字列、あるいは複数の文字列のどちらかで与えることができます。例えば、"f1; f3""f1", "f3" の両方ともJavaタプルの2番目と4番目のフィールドがreadであり、関数によって評価されることを宣言します。

読み込みフィールド情報は以下のアノテーションを使って関数クラスアノテーションとして指定されます:

  • @ReadFields MapおよびReduceのような1つの入力関数。
  • @ReadFieldsFirst JoinおよびCoGroupのような2つの入力を持つ関数の最初の入力。
  • @ReadFieldsSecond JoinおよびCoGroupのような2つの入力を持つ関数の最初の入力。

以下の例はreadフィールド情報を宣言する方法を示します:

@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function.
public class MyMap implements
              MapFunction<Tuple4<Integer, Integer, Integer, Integer>,
                          Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
    if(val.f0 == 42) {
      return new Tuple2<Integer, Integer>(val.f0, val.f1);
    } else {
      return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
    }
  }
}
@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
    if (value._1 == 42) {
      return (value._1, value._2)
    } else {
      return (value._4 + 10, value._2)
    }
  }
}

上に戻る

ブロードキャスト変数

ブロードキャスト変数により、操作の通常の入力に加えて、データが操作の全ての並行インスタンスに利用可能にします。これは予備のデータセット、あるいはデータに依存するパラメータ化として役に立ちます。データセットはコレクションとしてオペレータでアクセス可能でしょう。

  • ブロードキャスト: ブロードキャストセットはwithBroadcastSet(DataSet, String)を使って名前で登録されます。
  • アクセス: 目的のオペレータにおいてgetRuntimeContext().getBroadcastVariable(String) を使ってアクセス可能です。
// 1. The DataSet to be broadcasted
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcasted DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

ブロードキャストされたデータセットを登録およびアクセスする場合には、名前 (前の例ではbroadcastSetName ) が一致するようにします。完全な例のプログラムについては、K-Means Algorithmを見てください。

// 1. The DataSet to be broadcasted
val toBroadcast = env.fromElements(1, 2, 3)

val data = env.fromElements("a", "b")

data.map(new RichMapFunction[String, String]() {
    var broadcastSet: Traversable[String] = null

    override def open(config: Configuration): Unit = {
      // 3. Access the broadcasted DataSet as a Collection
      broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }

    def map(in: String): String = {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

ブロードキャストされたデータセットを登録およびアクセスする場合には、名前 (前の例ではbroadcastSetName ) が一致するようにします。完全な例のプログラムについては、KMeansアルゴリズムを見てください。

注意: ブロードキャスト変数の内容は各ノード上のインメモリに保存されます。あまりに大きくするべきではありません。スカラ値のようにシンプルにするために、パラメータを関数のクロージャーの一部分に単純にするか、設定内でwithParameters(...)を渡すことができます。

上に戻る

分散キャッシュ

Flink はApache Hadoopのように、ファイルがユーザ関数の並行インスタンスにローカルにアクセス可能にするために分散キャッシュを提供します。この機能はディクショナリあるいは機械学習回帰モデルのような静的な外部データを含むファイルを共有するために使うことができます。

キャッシュは以下のように動作します。プログラムはExecutionEnvironment の中の特定の名前の下のローカルあるいはHDFSあるいはS3のようなリモートファイルシステムのファイルあるいはディレクトリを、キャッシュファイルとして登録します。プログラムが実行される時、Flinkは自動的にファイルあるいはディレクトリを全てのworkerののローカルファイルシステムにコピーします。ユーザ関数は特定の名前の下のファイルあるいはディレクトリを調べ、ワーカーのローカルファイルシステムからそれにアクセスすることができます。

分散キャッシュは以下のように使われます:

ExecutionEnvironmentの中のファイルあるいはディレクトリを登録する。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

ユーザ関数(ここでは MapFunction)内のキャッシュされたファイルあるいはディレクトリにアクセスする。関数はRuntimeContextにアクセスする必要があるため、RichFunctionクラスを継承する必要があります。

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}

ExecutionEnvironmentの中のファイルあるいはディレクトリを登録する。

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

ユーザ関数(ここでは MapFunction)内のキャッシュされたファイルにアクセスする。関数はRuntimeContextにアクセスする必要があるため、RichFunctionクラスを継承する必要があります。

// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

上に戻る

関数へのパラメータの渡し方

パラメータはコンストラクタあるいはwithParameters(Configuration)メソッドを使って関数に渡すことができます。パラメータは関数オブジェクトの一部としてシリアライズ化され、全ての並行のタスクインスタンスに送信されます。

コマンドライン引数を関数に渡す方法についてのベストプラクティスガイドもチェックしてください。

コンストラクタを使って

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

  private final int limit;

  public MyFilter(int limit) {
    this.limit = limit;
  }

  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}
val toFilter = env.fromElements(1, 2, 3)

toFilter.filter(new MyFilter(2))

class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    value > limit
  }
}

withParameters(Configuration)を使って

このメソッドは引数として設定オブジェクトを取ります。これはrich functionopen()メソッドへ渡されるでしょう。設定オブジェクトは文字列キーから異なる値の型へのMapです。

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
    private int limit;

    @Override
    public void open(Configuration parameters) throws Exception {
      limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
      return value > limit;
    }
}).withParameters(config);
val toFilter = env.fromElements(1, 2, 3)

val c = new Configuration()
c.setInteger("limit", 2)

toFilter.filter(new RichFilterFunction[Int]() {
    var limit = 0

    override def open(config: Configuration): Unit = {
      limit = config.getInteger("limit", 0)
    }

    def filter(in: Int): Boolean = {
        in > limit
    }
}).withParameters(c)

大域的にExecutionConfigを使って

Flinkは独自の設定値を環境のExecutionConfig インタフェースに渡すこともできます。実行設定は全ての (rich) ユーザ定義関数内でアクセス可能なため、独自の設定は全ての関数内でグローバルに利用可能でしょう。

独自のグローバル設定の設定

Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

ExecutionConfig.GlobalJobParameters クラスを継承する独自のクラスを、グローバルなジョブパラメータとして実行設定に渡すことも可能です。インタフェースにより、webフロントエンド内の設定からの値を次に示すだろうMap<String, String> toMap() メソッドを実装することができます。

グローバル設定からの値へのアクセス

グローバルジョブのパラメータ内のオブジェクトはシステム内の多くの場所でアクセス可能です。RichFunctionインタフェースを実装する全てのユーザ定義関数は実行時コンテキストを使ってアクセスします。

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
      Configuration globConf = (Configuration) globalParams;
      mykey = globConf.getString("mykey", null);
    }
    // ... more here ...

上に戻る

TOP
inserted by FC2 system