データセットの変換

このドキュメントはデータセット上で利用可能な変換の深い部分まで説明します。Flink Java APIの一般的な説明については、プログラミング ガイドを参照してください。

denseインデックスを持つデータセット内の要素のジッピングについては、Zip 要素 ガイドを参照してください。

Map

Map変換はユーザ定義のmap関数をデータセットの各要素に適用します。それはone-to-oneマッピングを実装します。つまり、確実に1つの要素が関数によって返されなければなりません。

以下のコードはIntegerのペアのデータセットがIntegerのデータセットに変換されます:

// MapFunction that adds two integer values
public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
  @Override
  public Integer map(Tuple2<Integer, Integer> in) {
    return in.f0 + in.f1;
  }
}

// [...]
DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
DataSet<Integer> intSums = intPairs.map(new IntAdder());
val intPairs: DataSet[(Int, Int)] = // [...]
val intSums = intPairs.map { pair => pair._1 + pair._2 }
 intSums = intPairs.map(lambda x: sum(x))

FlatMap

FlatMap変換はユーザ定義のflat-map関数をデータセットの各要素に適用します。map関数のこの変種は各入力要素についての任意の多くの結果要素(noneを含む)を返すことができます。

以下のコードはテキスト行のデータセットを単語のデータセットに変換します:

// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
  @Override
  public void flatMap(String value, Collector<String> out) {
    for (String token : value.split("\\W")) {
      out.collect(token);
    }
  }
}

// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());
val textLines: DataSet[String] = // [...]
val words = textLines.flatMap { _.split(" ") }
 words = lines.flat_map(lambda x,c: [line.split() for line in x])

MapPartition

MapPartition は1つの関数呼び出しの中の並行パーティションを変換します。map-partition 関数は Iterable としてパーティションを取り、任意の数の結果値を生成することができます。各パーティション内の要素の数は並行度の度合と以前のオペレーションに依存します。

以下のコードはテキスト行のデータセットをパーティションごとのカウントのデータセットに変換します:

public class PartitionCounter implements MapPartitionFunction<String, Long> {

  public void mapPartition(Iterable<String> values, Collector<Long> out) {
    long c = 0;
    for (String s : values) {
      c++;
    }
    out.collect(c);
  }
}

// [...]
DataSet<String> textLines = // [...]
DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
val textLines: DataSet[String] = // [...]
// Some is required because the return value must be a Collection.
// There is an implicit conversion from Option to a Collection.
val counts = texLines.mapPartition { in => Some(in.size) }
 counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)])

フィルター

Filter 変換はデータセットの各要素にユーザ定義のfilter関数を適用し、関数がtrueを返すそれらの要素だけを保持します。

以下のコードはデータセットから0より小さい全てのIntegerを取り除きます:

// FilterFunction that filters out all Integers smaller than zero.
public class NaturalNumberFilter implements FilterFunction<Integer> {
  @Override
  public boolean filter(Integer number) {
    return number >= 0;
  }
}

// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
val intNumbers: DataSet[Int] = // [...]
val naturalNumbers = intNumbers.filter { _ > 0 }
 naturalNumbers = intNumbers.filter(lambda x: x > 0)

重要: システムはその関数が述語が適用される要素を修正しないと仮定します。この仮定を破ると間違った結果に繋がるかも知れません。

タプル データセットの写像

プロジェクトの変換はタプルのデータセットのタプルフィールドを削除あるいは移動します。project(int...) メソッドは、インデックスによって保持されうべきタプルフィールドを選択し、出力のタプルの中でそれらの順番を定義します。

Projection はユーザ関数の定義を必要としません。

以下のコードはデータセット上のProject変換を適用するための異なる方法を示します:

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);

型ヒントを使ったProjection

Javaコンパイラはproject オペレータの返りの型を推測できないことに注意してください。これは以下のようなproject オペレータの結果に他のオペレータを呼び出す場合に問題を起こすかも知れません:

DataSet<Tuple5<String,String,String,String,String>> ds = ....
DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);

この問題は以下のようにproject オペレータの返りの型をほのめかすことで克服することができるかも知れません:

DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
Not supported.
out = in.project(2,0);

グループ化されたデータセットのTransformation

reduceオペレーションはグループ化されたデータセット上で操作することができます。グルーピングするために使われるキーを指定するには多くの方法があります:

  • キー表現
  • キー セレクタ関数
  • 1つ上のフィールドの場所のキー(タプル データセットのみ)
  • Case クラス フィールド (Case クラスのみ)

グルーピング キーがどうやって指定されるかを見るために、reduceの例を見てみてください。

グループ化されたデータセットのreduce

グループ化されたデータセットに適用されるReduce変換は、ユーザ定義のreduce関数を使って各グループを1つの要素に減らします。入力要素の各グループについては、各グループごとに1つの要素だけが残るまで、reduce関数が連続して要素のペアを1つの要素に組み合わせます。

ReduceFunctionについては、返されたオブジェクトの固定されたフィールドが入力値に一致しなければならないことに注意してください。これは、reduceは暗黙的に組み合わせ可能でありcombineオペレータから出力されたオブジェクトはreduceオペレータに渡された時にキーによってグループ化されるからです。

キー表現によってグループ化されたデータセット上でのreduce

キー表現はデータセットの各要素の1つ以上のフィールドを指定します。各キー表現はpublicなフィールドあるいはgetterメソッドの名前のいずれかです。オブジェクトをドリルダウンするためにdotを使うことができます。キー表現 “*” は全てのフィールドを選択します。以下のコードはキー表現を使ってPOJOデータセットをグループ化およびreduce関数を使ってreduceする方法を示します。

// some ordinary POJO
public class WC {
  public String word;
  public int count;
  // [...]
}

// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
  @Override
  public WC reduce(WC in1, WC in2) {
    return new WC(in1.word, in1.count + in2.count);
  }
}

// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
                         // DataSet grouping on field "word"
                         .groupBy("word")
                         // apply ReduceFunction on grouped DataSet
                         .reduce(new WordCounter());
// some ordinary POJO
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
  // [...]
}

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}
Not supported.

KeySelector 関数によってグループ化されたデータセットのreduce

key-selector 関数はデータセットの各様からキーの値を抽出します。抽出されたキーの値はデータセットをグループ化するために使われます。以下のコードはkey-selector関数を使ってPOJOデータセットをグループ化およびreduce関数を使ってreduceする方法を示します。

// some ordinary POJO
public class WC {
  public String word;
  public int count;
  // [...]
}

// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
  @Override
  public WC reduce(WC in1, WC in2) {
    return new WC(in1.word, in1.count + in2.count);
  }
}

// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
                         // DataSet grouping on field "word"
                         .groupBy(new SelectWord())
                         // apply ReduceFunction on grouped DataSet
                         .reduce(new WordCounter());

public class SelectWord implements KeySelector<WC, String> {
  @Override
  public String getKey(Word w) {
    return w.word;
  }
}
// some ordinary POJO
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
  // [...]
}

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy { _.word } reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}
class WordCounter(ReduceFunction):
    def reduce(self, in1, in2):
        return (in1[0], in1[1] + in2[1])

words = // [...]
wordCounts = words \
    .group_by(lambda x: x[0]) \
    .reduce(WordCounter())

フィールド位置キーを使ってグループ化されたデータセットをreduce (タプル データセットのみ)

フィールド位置キーはグルーピングキーとして使われた1つ以上のタプルデータセットのフィールドを指定します。以下のコードはフィールド位置キーをどうやって使い、reduce関数を適用するかを示します。

DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
                                         // group DataSet on first and second field of Tuple
                                         .groupBy(0, 1)
                                         // apply ReduceFunction on grouped DataSet
                                         .reduce(new MyTupleReducer());
val tuples = DataSet[(String, Int, Double)] = // [...]
// group on the first and second Tuple field
val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
 reducedTuples = tuples.group_by(0, 1).reduce( ... )

Caseクラスフィールドを使ってグループ化されたデータセットをreduce

Caseクラスを使う場合は、フィールドの名前を使ってグルーピングキーを指定することもできます。

Not supported.
case class MyClass(val a: String, b: Int, c: Double)
val tuples = DataSet[MyClass] = // [...]
// group on the first and second field
val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
Not supported.

グループ化されたデータセットのGroupReduce

グループ化されたデータセットに適用されるGroupReduce変換は各グループについてユーザ定義の group-reduce 関数を呼びます。これと Reduce の違いはユーザ定義関数が一度にグループ全体を取得することです。関数はグループの全ての要素に対してIterableを伴って起動され、任意の数の結果要素を返すことができます。

Field Position Keys (Tuple DataSets のみ)によってグループ化されたデータセット上の GroupReduce

以下のコードは、Integerによってグループ化されたデータセットからどのように複製された文字列を削除することができるかを示します。

public class DistinctReduce
         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

  @Override
  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {

    Set<String> uniqStrings = new HashSet<String>();
    Integer key = null;

    // add all strings of the group to the set
    for (Tuple2<Integer, String> t : in) {
      key = t.f0;
      uniqStrings.add(t.f1);
    }

    // emit all unique strings.
    for (String s : uniqStrings) {
      out.collect(new Tuple2<Integer, String>(key, s));
    }
  }
}

// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
                           .groupBy(0)            // group DataSet by the first tuple field
                           .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).reduceGroup {
      (in, out: Collector[(Int, String)]) =>
        in.toSet foreach (out.collect)
    }
 class DistinctReduce(GroupReduceFunction):
   def reduce(self, iterator, collector):
     dic = dict()
     for value in iterator:
       dic[value[1]] = 1
     for key in dic.keys():
       collector.collect(key)

 output = data.group_by(0).reduce_group(DistinctReduce())

Key Expression、KeySelector 関数、あるいは Case Class Fields によってグループ化されたデータセット上のGroupReduce

Reduce 変換での key 表現key-selector 関数、およびcase class fieldsと類似した動作をします。

ソートされたグループのGroupReduce

group-reduce 関数はIterableを使ってグループの要素にアクセスします。任意で、Iterableは特定の順序でグループの要素を分配することができます。多くの場合、これはユーザ定義のgroup-reduce関数の複雑さを減らすことに役立ち、その効率性を改善することができます。

以下のコードはIntegerでグループ化され文字列でソートされたデータセット内の重複した文字列を削除する別の例を示します。

// GroupReduceFunction that removes consecutive identical elements
public class DistinctReduce
         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

  @Override
  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
    Integer key = null;
    String comp = null;

    for (Tuple2<Integer, String> t : in) {
      key = t.f0;
      String next = t.f1;

      // check if strings are different
      if (com == null || !next.equals(comp)) {
        out.collect(new Tuple2<Integer, String>(key, next));
        comp = next;
      }
    }
  }
}

// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Double> output = input
                         .groupBy(0)                         // group DataSet by first field
                         .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
                         .reduceGroup(new DistinctReduce());
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
      (in, out: Collector[(Int, String)]) =>
        var prev: (Int, String) = null
        for (t <- in) {
          if (prev == null || prev != t)
            out.collect(t)
        }
    }
 class DistinctReduce(GroupReduceFunction):
   def reduce(self, iterator, collector):
     dic = dict()
     for value in iterator:
       dic[value[1]] = 1
     for key in dic.keys():
       collector.collect(key)

 output = data.group_by(0).sort_group(1, Order.ASCENDING).reduce_group(DistinctReduce())

Note: A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.

結合可能な GroupReduceFunctions

reduce 関数とは違って、group-reduce 関数は暗黙的に結合可能ではありません。group-reduce 関数を結合可能にするには、GroupCombineFunction インタフェースを実装しなければなりません。

重要: GroupCombineFunctionインタフェースの一般的な入力および出力型は、以下の例で示されるようにGroupReduceFunctionの一般的な入力型と等しくなければなりません:

// Combinable GroupReduceFunction that computes a sum.
public class MyCombinableGroupReducer implements
  GroupReduceFunction<Tuple2<String, Integer>, String>,
  GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
{
  @Override
  public void reduce(Iterable<Tuple2<String, Integer>> in,
                     Collector<String> out) {

    String key = null;
    int sum = 0;

    for (Tuple2<String, Integer> curr : in) {
      key = curr.f0;
      sum += curr.f1;
    }
    // concat key and sum and emit
    out.collect(key + "-" + sum);
  }

  @Override
  public void combine(Iterable<Tuple2<String, Integer>> in,
                      Collector<Tuple2<String, Integer>> out) {
    String key = null;
    int sum = 0;

    for (Tuple2<String, Integer> curr : in) {
      key = curr.f0;
      sum += curr.f1;
    }
    // emit tuple with key and sum
    out.collect(new Tuple2<>(key, sum));
  }
}
// Combinable GroupReduceFunction that computes two sums.
class MyCombinableGroupReducer
  extends GroupReduceFunction[(String, Int), String]
  with GroupCombineFunction[(String, Int), (String, Int)]
{
  override def reduce(
    in: java.lang.Iterable[(String, Int)],
    out: Collector[String]): Unit =
  {
    val r: (String, Int) =
      in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
    // concat key and sum and emit
    out.collect (r._1 + "-" + r._2)
  }

  override def combine(
    in: java.lang.Iterable[(String, Int)],
    out: Collector[(String, Int)]): Unit =
  {
    val r: (String, Int) =
      in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
    // emit tuple with key and sum
    out.collect(r)
  }
}
 class GroupReduce(GroupReduceFunction):
   def reduce(self, iterator, collector):
     key, int_sum = iterator.next()
     for value in iterator:
       int_sum += value[1]
     collector.collect(key + "-" + int_sum))

   def combine(self, iterator, collector):
     key, int_sum = iterator.next()
     for value in iterator:
       int_sum += value[1]
     collector.collect((key, int_sum))

data.reduce_group(GroupReduce(), combinable=True)

グループ化されたデータセットのGroupCombine

GroupCombine 変換は結合可能なGroupReduceFunctionの結合ステップの一般化された形式です。それは入力型I を任意の出力型Oに結合できるという意味で一般化されます。対照的に、GroupReduceの結合ステップは入力型I から出力型 Iへの結合のみを許可します。これはGroupReduceFunctionのreduceステップが入力型 Iを期待するからです。

幾つかのアプリケーションでは、追加の変換を実施する前にデータセットを中間形式に結合するのが望ましいです (例えば、データサイズを削減するため)。これはCombineGroup変換を使ってほんの少しのコストで行うことができます。

注意: グループ化されたデータセット上のGroupCombineは、一度に全てのデータを処理しないかもしれないが複数のステップ内で処理する貪欲な戦略を使ってメモリ内で行われます。GroupReduce変換のようにデータの交換無しに個々のパーティション上でも行われます。これは部分的な結果につながるかも知れません。

以下の例はWordCount実装に取って代わるCombineGroup変換の使い方を実演します。

DataSet<String> input = [..] // The words received as input

DataSet<Tuple2<String, Integer>> combinedWords = input
  .groupBy(0); // group identical words
  .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {

    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
        String key = null;
        int count = 0;

        for (String word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

DataSet<Tuple2<String, Integer>> output = combinedWords
  .groupBy(0);                             // group by words again
  .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange

    public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
        String key = null;
        int count = 0;

        for (Tuple2<String, Integer> word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});
val input: DataSet[String] = [..] // The words received as input

val combinedWords: DataSet[(String, Int)] = input
  .groupBy(0)
  .combineGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var count = 0

        for (word <- words) {
            key = word
            count += 1
        }
        out.collect((key, count))
}

val output: DataSet[(String, Int)] = combinedWords
  .groupBy(0)
  .reduceGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var sum = 0

        for ((word, sum) <- words) {
            key = word
            sum += count
        }
        out.collect((key, sum))
}
Not supported.

上の代替のWordCount実装は、GroupCombineがGroupReduce変換を行う前にどうやって結合するかを実演します。上の例は単なる概念の証明です。GroupReduceの前に通常は追加のMap変換を必要とするだろうデータセットの型を、結合ステップがどう変更するかに注意してください。

グループ化されたタプルのデータセットのAggregate

よく使われる一般的な集約操作の幾つかです。集約変換は以下の組み込みの集約関数を提供します:

  • Sum,
  • Min, and
  • Max.

集約変換はタプルデータセット上にのみ適用することができ、グルーピングのためにフィールド位置キーのみをサポートします。

以下のコードは集約変換をフィールド位置キーによってグループ化されたデータセット上に適用する方法を示します:

DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
                                   .groupBy(1)        // group DataSet on second field
                                   .aggregate(SUM, 0) // compute sum of the first field
                                   .and(MIN, 2);      // compute minimum of the third field
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
from flink.functions.Aggregation import Sum, Min

input = # [...]
output = input.group_by(1).aggregate(Sum, 0).and_agg(Min, 2)

複数の集約をデータセットに適用するには、最初の集約の後に.and() 関数を使う必要があります。このことは.aggregate(SUM, 0).and(MIN, 2)はフィールド0と元のデータセットのフィールド2の最小の合計を生成することを意味します。対照的に、.aggregate(SUM, 0).aggregate(MIN, 2) は集約上に集約を適用するでしょう。指定された例では、フィールド1によってグループ化されたフィールド2の合計を計算した後で、フィールド2の最小を生成します。

注意: 集約関数のセットは将来的に拡張されるでしょう。

グループ化されたタプルのデータセット上のMinBy / MaxBy

MinBy (MaxBy) 変換はタプルの各グループについて1つのタプルを選択します。選択されたタプルは1つ以上の指定されたフィールドが最小 (最大)の値のタプルです。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小 (最大)のフィールド値を持つ場合、それらのタプルの任意のタプルが返されます。

以下のコードは、DataSet<Tuple3<Integer, String, Double>>からの同じString値を持つ各タプルグループについてのInteger および Double フィールドについて最小の値を持つタプルを選択する方法を示します:

DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
                                   .groupBy(1)   // group DataSet on second field
                                   .minBy(0, 2); // select tuple with minimum values for first and third field.
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input
                                   .groupBy(1)  // group DataSet on second field
                                   .minBy(0, 2) // select tuple with minimum values for first and third field.
Not supported.

完全なデータセットのReduce

Reduce変換はユーザ定義のreduce関数をデータセットの各要素に適用します。1つの要素だけが残るまで、reduce関数が連続して要素のペアを1つの要素に組み合わせます。

以下のコードはIntegerのデータセットの全ての要素を合計する方法を示します:

// ReduceFunction that sums Integers
public class IntSummer implements ReduceFunction<Integer> {
  @Override
  public Integer reduce(Integer num1, Integer num2) {
    return num1 + num2;
  }
}

// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
val intNumbers = env.fromElements(1,2,3)
val sum = intNumbers.reduce (_ + _)
 intNumbers = env.from_elements(1,2,3)
 sum = intNumbers.reduce(lambda x,y: x + y)

Reduce変換を使ってデータセット全体をreduceすることは、最終のReduce操作が並行して行うことができないことを意味します。However, a reduce function is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.

完全なデータセットのGroupReduce

GroupReduce変換はユーザ定義のgroup-reduce関数をデータセットの全ての要素に適用します。group-reduce はデータセットの全ての要素上で繰り返し、任意の数の結果要素を返します。

以下の例はデータセット全体のGroupReduce変換をどうやって適用するかを示します:

DataSet<Integer> input = // [...]
// apply a (preferably combinable) GroupReduceFunction to a DataSet
DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
val input: DataSet[Int] = // [...]
val output = input.reduceGroup(new MyGroupReducer())
 output = data.reduce_group(MyGroupReducer())

注意: group-reduce関数が結合可能でなければ、データセット全体のGroupReduce変換は行うことができません。したがって、これはコンピュータにとても向いている操作になりえます。稀有豪可能なgroup-reduce関数を実装する方法を学ぶには、上の“結合可能なGroupReduceFunctions” の段落を見てください。

完全なデータセットのGroupCombine

データセット全体のGroupCombineはグループ化されたデータセット上のGroupCombineと似た動作をします。データは全てのノード上にパーティション化され、その後に貪欲的な形式で結合されます (つまり、メモリに収まっているデータのみが一度に結合されます)。

完全なタプルデータセットのAggregate

よく使われる一般的な集約操作の幾つかです。集約変換は以下の組み込みの集約関数を提供します:

  • Sum,
  • Min, and
  • Max.

集約変換はタプルのデータセットにのみ適用することができます。

以下のコードはデータセット全体のAggregation変換をどうやって適用するかを示します:

DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input
                                     .aggregate(SUM, 0)    // compute sum of the first field
                                     .and(MIN, 1);    // compute minimum of the second field
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.aggregate(SUM, 0).and(MIN, 2)
from flink.functions.Aggregation import Sum, Min

input = # [...]
output = input.aggregate(Sum, 0).and_agg(Min, 2)

注意: サポートされる集約関数のセットの拡張がロードマップにあります。

完全なタプルデータセットのMinBy / MaxBy

MinBy (MaxBy) 変換はタプルのデータセットから1つのタプルを選択します。選択されたタプルは1つ以上の指定されたフィールドが最小 (最大)の値のタプルです。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小 (最大)のフィールド値を持つ場合、それらのタプルの任意のタプルが返されます。

以下のコードは、DataSet<Tuple3<Integer, String, Double>>から Integer および Double フィールドについて最大値を持つタプルを選択する方法を示します:

DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
                                   .maxBy(0, 2); // select tuple with maximum values for first and third field.
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input                          
                                   .maxBy(0, 2) // select tuple with maximum values for first and third field.
Not supported.

Distinct

Distinct変換は元のデータセットの明確に異なる要素のデータセットを計算します。以下のコードはデータセットから全ての重複する要素を削除します:

DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input.distinct();
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()
Not supported.

以下のようにしてデータセット内の要素の識別が決定されるかを変更することも可能です:

  • 1つ以上のフィールド位置のキー (タプルデータセットのみ)、
  • key-selector 関数、あるいは
  • キー表現。

フィールド位置のキーを使った識別

DataSet<Tuple2<Integer, Double, String>> input = // [...]
DataSet<Tuple2<Integer, Double, String>> output = input.distinct(0,2);
val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)
Not supported.

KeySelector 関数を使った識別

private static class AbsSelector implements KeySelector<Integer, Integer> {
private static final long serialVersionUID = 1L;
	@Override
	public Integer getKey(Integer t) {
    	return Math.abs(t);
	}
}
DataSet<Integer> input = // [...]
DataSet<Integer> output = input.distinct(new AbsSelector());
val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}
Not supported.

キー表現を使った識別

// some ordinary POJO
public class CustomType {
  public String aName;
  public int aNumber;
  // [...]
}

DataSet<CustomType> input = // [...]
DataSet<CustomType> output = input.distinct("aName", "aNumber");
// some ordinary POJO
case class CustomType(aName : String, aNumber : Int) { }

val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")
Not supported.

ワイルドカード文字を使って全てのフィールドを使うように指示することも可能です:

DataSet<CustomType> input = // [...]
DataSet<CustomType> output = input.distinct("*");
// some ordinary POJO
val input: DataSet[CustomType] = // [...]
val output = input.distinct("_")
Not supported.

Join

Join変換は2つのデータセットを1つのデータセットにjoinします。両方のデータセットの要素は、以下を使って指定することができる1つ以上のキーにjoinされます。

  • キー表現
  • キー セレクタ関数
  • 1つ以上のフィールド位置キー (タプルデータセットのみ)。
  • Case Class フィールド

以下で示されるようにJoin変換を行う2,3の異なる方法があります。

デフォルトのJoin (Tuple2へのJoin)

デフォルトのJoin変換は二つのフィールドを持つ新しいタプルデータセットを生成します。各タプルは、最初のタプルフィールド内の最初の入力データセットのjoinされた要素と、2つ目のフィールド内の2つ目の入力データセットの合致する要素を保持します。

以下のコードはフィールド位置キーを使ったデフォルトのJoin変換を示します:

public static class User { public String name; public int zip; }
public static class Store { public Manager mgr; public int zip; }
DataSet<User> input1 = // [...]
DataSet<Store> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<User, Store>>
            result = input1.join(input2)
                           .where("zip")       // key of the first input (users)
                           .equalTo("zip");    // key of the second input (stores)
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Double, Int)] = // [...]
val result = input1.join(input2).where(0).equalTo(1)
 result = input1.join(input2).where(0).equal_to(1)

Join関数を使ったJoin

Join変換はjoinするタプルを処理するためにユーザ定義のjoin関数を呼ぶこともできます。join関数は最初の入力データセットの1つの要素と2つ目の入力データセットの1つの要素を受け取り、正確に1つの要素を返します。

以下のコードはキー選択関数を使って独自のjavaオブジェクトとタプルデータセットを持つデータセットのjoinを実施し、ユーザ定義のjoin関数を使う方法を示します:

// some POJO
public class Rating {
  public String name;
  public String category;
  public int points;
}

// Join function that joins a custom POJO with a Tuple
public class PointWeighter
         implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {

  @Override
  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
    // multiply the points and rating and construct a new output tuple
    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
  }
}

DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>>
            weightedRatings =
            ratings.join(weights)

                   // key of the first input
                   .where("category")

                   // key of the second input
                   .equalTo("f0")

                   // applying the JoinFunction on joining pairs
                   .with(new PointWeighter());
case class Rating(name: String, category: String, points: Int)

val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]

val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight) => (rating.name, rating.points * weight._2)
}
 class PointWeighter(JoinFunction):
   def join(self, rating, weight):
     return (rating[0], rating[1] * weight[1])
       if value1[3]:

 weightedRatings =
   ratings.join(weights).where(0).equal_to(0). \
   with(new PointWeighter());

Flat-Join関数を使ったJoin

MapとFlatMapに似て、FlatJoinはJoinとして同じ方法で振る舞いますが、1つの要素を返す代わりに、0, 1 以上の要素を返す(集める)ことができます。

public class PointWeighter
         implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
  @Override
  public void join(Rating rating, Tuple2<String, Double> weight,
	  Collector<Tuple2<String, Double>> out) {
	if (weight.f1 > 0.1) {
		out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
	}
  }
}

DataSet<Tuple2<String, Double>>
            weightedRatings =
            ratings.join(weights) // [...]
case class Rating(name: String, category: String, points: Int)

val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]

val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight, out: Collector[(String, Double)]) =>
    if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
}

Not supported.

射影を使ったJoin (Java/Python Only)

Join変換はここで示すように射影を使って結果のタプルを構築することができます:

DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>
            result =
            input1.join(input2)
                  // key definition on first DataSet using a field position key
                  .where(0)
                  // key definition of second DataSet using a field position key
                  .equalTo(0)
                  // select and reorder fields of matching tuples
                  .projectFirst(0,2).projectSecond(1).projectFirst(1);

projectFirst(int...) and projectSecond(int...) select the fields of the first and second joined input that should be assembled into an output Tuple. インデックスの順番は出力タプル内でのフィールドの順番を定義します。join射影は非タプルデータセットにも動作します。この場合、projectFirst() あるいは projectSecond() はjoinされた要素を出力タプルに追加するための引数無しで呼ばれなければなりません。

Not supported.
 result = input1.join(input2).where(0).equal_to(0) \
  .project_first(0,2).project_second(1).project_first(1);

project_first(int...) and project_second(int...) select the fields of the first and second joined input that should be assembled into an output Tuple. インデックスの順番は出力タプル内でのフィールドの順番を定義します。join射影は非タプルデータセットにも動作します。この場合、project_first() あるいは project_second() はjoinされた要素を出力タプルに追加するための引数無しで呼ばれなければなりません。

データセットのサイズヒントを持つJoin

オプティマイザが正しい実行戦略を選ぶように導くために、ここで示すようにjoinにデータセットのサイズをほのめかすことができます:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result1 =
            // hint that the second DataSet is very small
            input1.joinWithTiny(input2)
                  .where(0)
                  .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result2 =
            // hint that the second DataSet is very large
            input1.joinWithHuge(input2)
                  .where(0)
                  .equalTo(0);
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]

// hint that the second DataSet is very small
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)

// hint that the second DataSet is very large
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
 #hint that the second DataSet is very small
 result1 = input1.join_with_tiny(input2).where(0).equal_to(0)

 #hint that the second DataSet is very large
 result1 = input1.join_with_huge(input2).where(0).equal_to(0)

Join アルゴリズムのヒント

Flinkのランタイムは様々な方法でjoinを実行することができます。それぞれの可能な方法は、異なる状況で他に勝っています。システムは適切な方法を自動的に選ぼうとしますが、joinを実行する特定の方法を強制したい場合は、手動で戦略を選ぶことができます。

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");
val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]

// hint that the second DataSet is very small
val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
Not supported.

以下のヒントが利用可能です:

  • OPTIMIZER_CHOOSES: ヒントを全く与えないのと等価。選択をシステムに任せます。

  • BROADCAST_HASH_FIRST: 最初の入力をブロードキャストし、それからハッシュテーブルを構築します。これは2つ目の入力によって走査されます。最初の入力がとても小さい場合には良い戦略です。

  • BROADCAST_HASH_SECOND: 2つ目の入力をブロードキャストし、それからハッシュテーブルを構築します。これは最初の入力によって走査されます。2つ目の入力がとても小さい場合には良い戦略です。

  • REPARTITION_HASH_FIRST: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、最初の入力からハッシュテーブルを構築します。最初の入力が2つ目よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。注意: これはサイズの予測が行えず、既存のパーティションとソート順が再利用できない場合にシステムが使用するデフォルトの予備の戦略です。

  • REPARTITION_HASH_SECOND: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、2つ目の入力からハッシュテーブルを構築します。2つ目の入力が最初よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。

  • REPARTITION_SORT_MERGE: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、(まだソートされていない場合は)各入力をソートします。入力はソートされた入力のストリーム マージによってjoinされます。1つあるいは両方の入力が既にソートされている場合に良い戦略です。

OuterJoin

OuterJoin変換は2つのデータセットに left, right, あるいは full outer join を行います。Outer joins は通常(inner)のjoinに似ていて、キーが等しい要素の全てのペアを生成します。更に、“outer” 側 (left, right, あるいはfullの場合は両方)はもし他方に一致するキーが見つからない場合でも残されます。要素のペアから1つの要素に変えるために、要素の揃いのペア(あるいは1つの要素と、他の入力のためのnull)がJoinFunctionに渡されます。あるいは、要素のペアから任意の多数(無しも含む)の要素に変えるためにFlatJoinFunction に渡されます。

両方のデータセットの要素は、以下を使って指定することができる1つ以上のキーにjoinされます。

  • キー表現
  • キー セレクタ関数
  • 1つ以上のフィールド位置キー (タプルデータセットのみ)。
  • Case Class フィールド

OuterJoins はJavaおよびScalaのデータセットAPIにのみサポートされます。

Join関数を使ったOuterJoin

OuterJoin変換はjoinするタプルを処理するためにユーザ定義のjoin関数を呼びます。join関数は最初の入力データセットの1つの要素と2つ目の入力データセットの1つの要素を受け取り、正確に1つの要素を返します。outer join (left, right, full) の型に応じて、join関数の2つの入力要素のうちの1つがnullになりえます。

以下のコードはキー選択関数を使って独自のjavaオブジェクトとタプルデータセットを持つデータセットのleft outer joinを実施し、ユーザ定義のjoin関数を使う方法を示します:

// some POJO
public class Rating {
  public String name;
  public String category;
  public int points;
}

// Join function that joins a custom POJO with a Tuple
public class PointAssigner
         implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {

  @Override
  public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
    // Assigns the rating points to the movie.
    // NOTE: rating might be null
    return new Tuple2<String, Double>(movie.f0, rating == null ? -1 : rating.points;
  }
}

DataSet<Tuple2<String, String>> movies = // [...]
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Integer>>
            moviesWithPoints =
            movies.leftOuterJoin(ratings)

                   // key of the first input
                   .where("f0")

                   // key of the second input
                   .equalTo("name")

                   // applying the JoinFunction on joining pairs
                   .with(new PointAssigner());
case class Rating(name: String, category: String, points: Int)

val movies: DataSet[(String, String)] = // [...]
val ratings: DataSet[Ratings] = // [...]

val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
  (movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
}
Not supported.

Flat-Join関数を使ったOuterJoin

MapとFlatMapに似て、flat-join関数を使ったOuterJoinはJoin関数を使ったOuterJoinとして同じ方法で振る舞いますが、1つの要素を返す代わりに、0, 1 以上の要素を返す(集める)ことができます。

public class PointAssigner
         implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
  @Override
  public void join(Tuple2<String, String> movie, Rating rating
    Collector<Tuple2<String, Integer>> out) {
  if (rating == null ) {
    out.collect(new Tuple2<String, Integer>(movie.f0, -1));
  } else if (rating.points < 10) {
    out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
  } else {
    // do not emit
  }
}

DataSet<Tuple2<String, Integer>>
            moviesWithPoints =
            movies.leftOuterJoin(ratings) // [...]
Not supported.
Not supported.

Join アルゴリズムのヒント

Flinkのランタイムは様々な方法でouter joinを実行することができます。それぞれの可能な方法は、異なる状況で他に勝っています。システムは適切な方法を自動的に選ぼうとしますが、outer joinを実行する特定の方法を強制したい場合は、手動で戦略を選ぶことができます。

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result1 =
      input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");

DataSet<Tuple2<SomeType, AnotherType> result2 =
      input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");
val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]

// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")

val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
Not supported.

以下のヒントが利用可能です。

  • OPTIMIZER_CHOOSES: ヒントを全く与えないのと等価。選択をシステムに任せます。

  • BROADCAST_HASH_FIRST: 最初の入力をブロードキャストし、それからハッシュテーブルを構築します。これは2つ目の入力によって走査されます。最初の入力がとても小さい場合には良い戦略です。

  • BROADCAST_HASH_SECOND: 2つ目の入力をブロードキャストし、それからハッシュテーブルを構築します。これは最初の入力によって走査されます。2つ目の入力がとても小さい場合には良い戦略です。

  • REPARTITION_HASH_FIRST: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、最初の入力からハッシュテーブルを構築します。最初の入力が2つ目よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。

  • REPARTITION_HASH_SECOND: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、2つ目の入力からハッシュテーブルを構築します。2つ目の入力が最初よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。

  • REPARTITION_SORT_MERGE: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、(まだソートされていない場合は)各入力をソートします。入力はソートされた入力のストリーム マージによってjoinされます。1つあるいは両方の入力が既にソートされている場合に良い戦略です。

注意: 各 outer join型によってまだ全ての実行戦略がサポートされているわけではありません。

  • LeftOuterJoin は以下をサポートします:
    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_SECOND
    • REPARTITION_HASH_SECOND
    • REPARTITION_SORT_MERGE
  • RightOuterJoin は以下をサポートします:
    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_FIRST
    • REPARTITION_HASH_FIRST
    • REPARTITION_SORT_MERGE
  • FullOuterJoin は以下をサポートします:
    • OPTIMIZER_CHOOSES
    • REPARTITION_SORT_MERGE

Cross

Cross変換は2つのデータセットを1つのデータセットに結合します。それは両方の入力データセットの要素の全てのペアの組み合わせを構築します。つまり、デカルト積を構築します。Cross変換は、要素の各ペア上でユーザ定義のcross関数を呼ぶか、Tuple2を出力するかをします。以下では両方のモードを示します。

注意: Crossは大規模計算クラスタにさえ挑戦する潜在的にとてもコンピュータに向いている操作です。

ユーザ定義関数を使ったCross

Cross 変換はユーザ定義のcross関数を呼ぶことができます。cross 関数は最初の入力の1つの要素と2つ目の入力の1つの要素を受け取り、正確に1つの結果要素を返します。

以下のコードはcross関数を使って2つのデータセットにCross変換を適用する方法を示します:

public class Coord {
  public int id;
  public int x;
  public int y;
}

// CrossFunction computes the Euclidean distance between two Coord objects.
public class EuclideanDistComputer
         implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {

  @Override
  public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
    // compute Euclidean distance of coordinates
    double dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2));
    return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
  }
}

DataSet<Coord> coords1 = // [...]
DataSet<Coord> coords2 = // [...]
DataSet<Tuple3<Integer, Integer, Double>>
            distances =
            coords1.cross(coords2)
                   // apply CrossFunction
                   .with(new EuclideanDistComputer());

投影を使ったcross

Cross 変換はここで示す投影を使って結果タプルを構築することもできます:

DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, Byte, Integer, Double>
            result =
            input1.cross(input2)
                  // select and reorder fields of matching tuples
                  .projectSecond(0).projectFirst(1,0).projectSecond(1);

Cross投影でのフィールド選択はJoin結果の投影と同じ方法で動作します。

case class Coord(id: Int, x: Int, y: Int)

val coords1: DataSet[Coord] = // [...]
val coords2: DataSet[Coord] = // [...]

val distances = coords1.cross(coords2) {
  (c1, c2) =>
    val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
    (c1.id, c2.id, dist)
}
 class Euclid(CrossFunction):
   def cross(self, c1, c2):
     return (c1[0], c2[0], sqrt(pow(c1[1] - c2.[1], 2) + pow(c1[2] - c2[2], 2)))

 distances = coords1.cross(coords2).using(Euclid())

投影を使ったcross

Cross 変換はここで示す投影を使って結果タプルを構築することもできます:

result = input1.cross(input2).projectFirst(1,0).projectSecond(0,1);

Cross投影でのフィールド選択はJoin結果の投影と同じ方法で動作します。

データセットサイズのヒントを使ったCross

オプティマイザが正しい実行戦略を選ぶように導くために、ここで示すようにcrossにデータセットのサイズをほのめかすことができます:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple4<Integer, String, Integer, String>>
            udfResult =
                  // hint that the second DataSet is very small
            input1.crossWithTiny(input2)
                  // apply any Cross function (or projection)
                  .with(new MyCrosser());

DataSet<Tuple3<Integer, Integer, String>>
            projectResult =
                  // hint that the second DataSet is very large
            input1.crossWithHuge(input2)
                  // apply a projection (or any Cross function)
                  .projectFirst(0,1).projectSecond(1);
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]

// hint that the second DataSet is very small
val result1 = input1.crossWithTiny(input2)

// hint that the second DataSet is very large
val result1 = input1.crossWithHuge(input2)
 #hint that the second DataSet is very small
 result1 = input1.cross_with_tiny(input2)

 #hint that the second DataSet is very large
 result1 = input1.cross_with_huge(input2)

CoGroup

CoGroup 変換は2つのデータセットのグループの両方を処理します。両方のデータセットは定義されたキーでグループ化され、同じキーを共有する両方のデータセットのグループは共にユーザ定義のco-group関数に渡されます。1つのデータセットだけがグループを持つ特定のキーについては、co-group関数は1つのグループと空のグループを使って呼ばれます。co-group 関数は両方のグループの要素上で個々に繰り返すことができ、結果要素の任意の数を返すことができます。

Reduce、GroupReduceおよびJoinに似て、キーは異なるキー選択メソッドを使って定義することができます。

データセット上のCoGroup

例はフィールド位置キーでグループにする方法を示します (タプルデータセットのみ)。Pojo-型およびキー表現を使って同じことをすることができます。

// Some CoGroupFunction definition
class MyCoGrouper
         implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {

  @Override
  public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
                      Iterable<Tuple2<String, Double>> dVals,
                      Collector<Double> out) {

    Set<Integer> ints = new HashSet<Integer>();

    // add all Integer values in group to set
    for (Tuple2<String, Integer>> val : iVals) {
      ints.add(val.f1);
    }

    // multiply each Double value with each unique Integer values of group
    for (Tuple2<String, Double> val : dVals) {
      for (Integer i : ints) {
        out.collect(val.f1 * i);
      }
    }
  }
}

// [...]
DataSet<Tuple2<String, Integer>> iVals = // [...]
DataSet<Tuple2<String, Double>> dVals = // [...]
DataSet<Double> output = iVals.coGroup(dVals)
                         // group first DataSet on first tuple field
                         .where(0)
                         // group second DataSet on first tuple field
                         .equalTo(0)
                         // apply CoGroup function on each pair of groups
                         .with(new MyCoGrouper());
val iVals: DataSet[(String, Int)] = // [...]
val dVals: DataSet[(String, Double)] = // [...]

val output = iVals.coGroup(dVals).where(0).equalTo(0) {
  (iVals, dVals, out: Collector[Double]) =>
    val ints = iVals map { _._2 } toSet

    for (dVal <- dVals) {
      for (i <- ints) {
        out.collect(dVal._2 * i)
      }
    }
}
 class CoGroup(CoGroupFunction):
   def co_group(self, ivals, dvals, collector):
     ints = dict()
     # add all Integer values in group to set
     for value in ivals:
       ints[value[1]] = 1
     # multiply each Double value with each unique Integer values of group
     for value in dvals:
       for i in ints.keys():
         collector.collect(value[1] * i)


 output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup())

和集合

2つのデータセットの結合を生成します。データセットは同じ型でなければなりません。以下で示されるように、2つ以上のデータセットの結合を複数のunion呼び出しを使って実装することができます:

DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
val vals1: DataSet[(String, Int)] = // [...]
val vals2: DataSet[(String, Int)] = // [...]
val vals3: DataSet[(String, Int)] = // [...]

val unioned = vals1.union(vals2).union(vals3)
 unioned = vals1.union(vals2).union(vals3)

リバランス

データの歪を除去するために一様にデータセットの並行パーティションをリバランスします。

DataSet<String> in = // [...]
// rebalance DataSet and apply a Map transformation.
DataSet<Tuple2<String, String>> out = in.rebalance()
                                        .map(new Mapper());
val in: DataSet[String] = // [...]
// rebalance DataSet and apply a Map transformation.
val out = in.rebalance().map { ... }
Not supported.

ハッシュ パーティション

指定されたキーのデータセットをハッシュ-パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません (キーを指定する方法についてはReduce examplesを見てください)。

DataSet<Tuple2<String, Integer>> in = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
                                        .mapPartition(new PartitionMapper());
val in: DataSet[(String, Int)] = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByHash(0).mapPartition { ... }
Not supported.

レンジ パーティション

指定されたキーのデータセットをレンジ-パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません (キーを指定する方法についてはReduce examplesを見てください)。

DataSet<Tuple2<String, Integer>> in = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
                                        .mapPartition(new PartitionMapper());
val in: DataSet[(String, Int)] = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByRange(0).mapPartition { ... }
Not supported.

ソート パーティション

指定されたフィールド上で指定された順番でデータセットの全てのパーティションをローカルでソートする。フィールドはフィールド表現あるいはフィールド位置として指定することができます (キーを指定する方法はReduce examples を見てください)。パーティションは sortPartition() 呼び出しを繋げることで複数のフィールドでソートすることができます。

DataSet<Tuple2<String, Integer>> in = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
                                        .sortPartition(0, Order.DESCENDING)
                                        .mapPartition(new PartitionMapper());
val in: DataSet[(String, Int)] = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
val out = in.sortPartition(1, Order.ASCENDING)
            .sortPartition(0, Order.DESCENDING)
            .mapPartition { ... }
Not supported.

First-n

データセットの最初のn(任意)個の要素を返します。First-n は通常のデータセット、グループ化されたデータセットあるいはグループ化ソート化されたデータセット上に適用することができます。グループ化キーはキー選択関数あるいはフィールド位置キーとして指定することができます (キーを指定する方法はReduce examples を見てください)。

DataSet<Tuple2<String, Integer>> in = // [...]
// Return the first five (arbitrary) elements of the DataSet
DataSet<Tuple2<String, Integer>> out1 = in.first(5);

// Return the first two (arbitrary) elements of each String group
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
                                          .first(2);

// Return the first three elements of each String group ordered by the Integer field
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
                                          .sortGroup(1, Order.ASCENDING)
                                          .first(3);
val in: DataSet[(String, Int)] = // [...]
// Return the first five (arbitrary) elements of the DataSet
val out1 = in.first(5)

// Return the first two (arbitrary) elements of each String group
val out2 = in.groupBy(0).first(2)

// Return the first three elements of each String group ordered by the Integer field
val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
Not supported.
TOP
inserted by FC2 system