このドキュメントはデータセット上で利用可能な変換の深い部分まで説明します。Flink Java APIの一般的な説明については、プログラミング ガイドを参照してください。
denseインデックスを持つデータセット内の要素のジッピングについては、Zip 要素 ガイドを参照してください。
Map変換はユーザ定義のmap関数をデータセットの各要素に適用します。それはone-to-oneマッピングを実装します。つまり、確実に1つの要素が関数によって返されなければなりません。
以下のコードはIntegerのペアのデータセットがIntegerのデータセットに変換されます:
// MapFunction that adds two integer values
public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> in) {
return in.f0 + in.f1;
}
}
// [...]
DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
DataSet<Integer> intSums = intPairs.map(new IntAdder());
val intPairs: DataSet[(Int, Int)] = // [...]
val intSums = intPairs.map { pair => pair._1 + pair._2 }
intSums = intPairs.map(lambda x: sum(x))
FlatMap変換はユーザ定義のflat-map関数をデータセットの各要素に適用します。map関数のこの変種は各入力要素についての任意の多くの結果要素(noneを含む)を返すことができます。
以下のコードはテキスト行のデータセットを単語のデータセットに変換します:
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) {
for (String token : value.split("\\W")) {
out.collect(token);
}
}
}
// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());
val textLines: DataSet[String] = // [...]
val words = textLines.flatMap { _.split(" ") }
words = lines.flat_map(lambda x,c: [line.split() for line in x])
MapPartition は1つの関数呼び出しの中の並行パーティションを変換します。map-partition 関数は Iterable としてパーティションを取り、任意の数の結果値を生成することができます。各パーティション内の要素の数は並行度の度合と以前のオペレーションに依存します。
以下のコードはテキスト行のデータセットをパーティションごとのカウントのデータセットに変換します:
public class PartitionCounter implements MapPartitionFunction<String, Long> {
public void mapPartition(Iterable<String> values, Collector<Long> out) {
long c = 0;
for (String s : values) {
c++;
}
out.collect(c);
}
}
// [...]
DataSet<String> textLines = // [...]
DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
val textLines: DataSet[String] = // [...]
// Some is required because the return value must be a Collection.
// There is an implicit conversion from Option to a Collection.
val counts = texLines.mapPartition { in => Some(in.size) }
counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)])
Filter 変換はデータセットの各要素にユーザ定義のfilter関数を適用し、関数がtrue
を返すそれらの要素だけを保持します。
以下のコードはデータセットから0より小さい全てのIntegerを取り除きます:
// FilterFunction that filters out all Integers smaller than zero.
public class NaturalNumberFilter implements FilterFunction<Integer> {
@Override
public boolean filter(Integer number) {
return number >= 0;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
val intNumbers: DataSet[Int] = // [...]
val naturalNumbers = intNumbers.filter { _ > 0 }
naturalNumbers = intNumbers.filter(lambda x: x > 0)
重要: システムはその関数が述語が適用される要素を修正しないと仮定します。この仮定を破ると間違った結果に繋がるかも知れません。
プロジェクトの変換はタプルのデータセットのタプルフィールドを削除あるいは移動します。project(int...)
メソッドは、インデックスによって保持されうべきタプルフィールドを選択し、出力のタプルの中でそれらの順番を定義します。
Projection はユーザ関数の定義を必要としません。
以下のコードはデータセット上のProject変換を適用するための異なる方法を示します:
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
Javaコンパイラはproject
オペレータの返りの型を推測できないことに注意してください。これは以下のようなproject
オペレータの結果に他のオペレータを呼び出す場合に問題を起こすかも知れません:
DataSet<Tuple5<String,String,String,String,String>> ds = ....
DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
この問題は以下のようにproject
オペレータの返りの型をほのめかすことで克服することができるかも知れません:
DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
Not supported.
out = in.project(2,0);
reduceオペレーションはグループ化されたデータセット上で操作することができます。グルーピングするために使われるキーを指定するには多くの方法があります:
グルーピング キーがどうやって指定されるかを見るために、reduceの例を見てみてください。
グループ化されたデータセットに適用されるReduce変換は、ユーザ定義のreduce関数を使って各グループを1つの要素に減らします。入力要素の各グループについては、各グループごとに1つの要素だけが残るまで、reduce関数が連続して要素のペアを1つの要素に組み合わせます。
ReduceFunction
については、返されたオブジェクトの固定されたフィールドが入力値に一致しなければならないことに注意してください。これは、reduceは暗黙的に組み合わせ可能でありcombineオペレータから出力されたオブジェクトはreduceオペレータに渡された時にキーによってグループ化されるからです。
キー表現はデータセットの各要素の1つ以上のフィールドを指定します。各キー表現はpublicなフィールドあるいはgetterメソッドの名前のいずれかです。オブジェクトをドリルダウンするためにdotを使うことができます。キー表現 “*” は全てのフィールドを選択します。以下のコードはキー表現を使ってPOJOデータセットをグループ化およびreduce関数を使ってreduceする方法を示します。
// some ordinary POJO
public class WC {
public String word;
public int count;
// [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
@Override
public WC reduce(WC in1, WC in2) {
return new WC(in1.word, in1.count + in2.count);
}
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
// DataSet grouping on field "word"
.groupBy("word")
// apply ReduceFunction on grouped DataSet
.reduce(new WordCounter());
// some ordinary POJO
class WC(val word: String, val count: Int) {
def this() {
this(null, -1)
}
// [...]
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
Not supported.
key-selector 関数はデータセットの各様からキーの値を抽出します。抽出されたキーの値はデータセットをグループ化するために使われます。以下のコードはkey-selector関数を使ってPOJOデータセットをグループ化およびreduce関数を使ってreduceする方法を示します。
// some ordinary POJO
public class WC {
public String word;
public int count;
// [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
@Override
public WC reduce(WC in1, WC in2) {
return new WC(in1.word, in1.count + in2.count);
}
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
// DataSet grouping on field "word"
.groupBy(new SelectWord())
// apply ReduceFunction on grouped DataSet
.reduce(new WordCounter());
public class SelectWord implements KeySelector<WC, String> {
@Override
public String getKey(Word w) {
return w.word;
}
}
// some ordinary POJO
class WC(val word: String, val count: Int) {
def this() {
this(null, -1)
}
// [...]
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy { _.word } reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
class WordCounter(ReduceFunction):
def reduce(self, in1, in2):
return (in1[0], in1[1] + in2[1])
words = // [...]
wordCounts = words \
.group_by(lambda x: x[0]) \
.reduce(WordCounter())
フィールド位置キーはグルーピングキーとして使われた1つ以上のタプルデータセットのフィールドを指定します。以下のコードはフィールド位置キーをどうやって使い、reduce関数を適用するかを示します。
DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
// group DataSet on first and second field of Tuple
.groupBy(0, 1)
// apply ReduceFunction on grouped DataSet
.reduce(new MyTupleReducer());
val tuples = DataSet[(String, Int, Double)] = // [...]
// group on the first and second Tuple field
val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
reducedTuples = tuples.group_by(0, 1).reduce( ... )
Caseクラスを使う場合は、フィールドの名前を使ってグルーピングキーを指定することもできます。
Not supported.
case class MyClass(val a: String, b: Int, c: Double)
val tuples = DataSet[MyClass] = // [...]
// group on the first and second field
val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
Not supported.
グループ化されたデータセットに適用されるGroupReduce変換は各グループについてユーザ定義の group-reduce 関数を呼びます。これと Reduce の違いはユーザ定義関数が一度にグループ全体を取得することです。関数はグループの全ての要素に対してIterableを伴って起動され、任意の数の結果要素を返すことができます。
以下のコードは、Integerによってグループ化されたデータセットからどのように複製された文字列を削除することができるかを示します。
public class DistinctReduce
implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
Set<String> uniqStrings = new HashSet<String>();
Integer key = null;
// add all strings of the group to the set
for (Tuple2<Integer, String> t : in) {
key = t.f0;
uniqStrings.add(t.f1);
}
// emit all unique strings.
for (String s : uniqStrings) {
out.collect(new Tuple2<Integer, String>(key, s));
}
}
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
.groupBy(0) // group DataSet by the first tuple field
.reduceGroup(new DistinctReduce()); // apply GroupReduceFunction
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).reduceGroup {
(in, out: Collector[(Int, String)]) =>
in.toSet foreach (out.collect)
}
class DistinctReduce(GroupReduceFunction):
def reduce(self, iterator, collector):
dic = dict()
for value in iterator:
dic[value[1]] = 1
for key in dic.keys():
collector.collect(key)
output = data.group_by(0).reduce_group(DistinctReduce())
Reduce 変換での key 表現、key-selector 関数、およびcase class fieldsと類似した動作をします。
group-reduce 関数はIterableを使ってグループの要素にアクセスします。任意で、Iterableは特定の順序でグループの要素を分配することができます。多くの場合、これはユーザ定義のgroup-reduce関数の複雑さを減らすことに役立ち、その効率性を改善することができます。
以下のコードはIntegerでグループ化され文字列でソートされたデータセット内の重複した文字列を削除する別の例を示します。
// GroupReduceFunction that removes consecutive identical elements
public class DistinctReduce
implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
Integer key = null;
String comp = null;
for (Tuple2<Integer, String> t : in) {
key = t.f0;
String next = t.f1;
// check if strings are different
if (com == null || !next.equals(comp)) {
out.collect(new Tuple2<Integer, String>(key, next));
comp = next;
}
}
}
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Double> output = input
.groupBy(0) // group DataSet by first field
.sortGroup(1, Order.ASCENDING) // sort groups on second tuple field
.reduceGroup(new DistinctReduce());
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
(in, out: Collector[(Int, String)]) =>
var prev: (Int, String) = null
for (t <- in) {
if (prev == null || prev != t)
out.collect(t)
}
}
class DistinctReduce(GroupReduceFunction):
def reduce(self, iterator, collector):
dic = dict()
for value in iterator:
dic[value[1]] = 1
for key in dic.keys():
collector.collect(key)
output = data.group_by(0).sort_group(1, Order.ASCENDING).reduce_group(DistinctReduce())
Note: A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
reduce 関数とは違って、group-reduce 関数は暗黙的に結合可能ではありません。group-reduce 関数を結合可能にするには、GroupCombineFunction
インタフェースを実装しなければなりません。
重要: GroupCombineFunction
インタフェースの一般的な入力および出力型は、以下の例で示されるようにGroupReduceFunction
の一般的な入力型と等しくなければなりません:
// Combinable GroupReduceFunction that computes a sum.
public class MyCombinableGroupReducer implements
GroupReduceFunction<Tuple2<String, Integer>, String>,
GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
{
@Override
public void reduce(Iterable<Tuple2<String, Integer>> in,
Collector<String> out) {
String key = null;
int sum = 0;
for (Tuple2<String, Integer> curr : in) {
key = curr.f0;
sum += curr.f1;
}
// concat key and sum and emit
out.collect(key + "-" + sum);
}
@Override
public void combine(Iterable<Tuple2<String, Integer>> in,
Collector<Tuple2<String, Integer>> out) {
String key = null;
int sum = 0;
for (Tuple2<String, Integer> curr : in) {
key = curr.f0;
sum += curr.f1;
}
// emit tuple with key and sum
out.collect(new Tuple2<>(key, sum));
}
}
// Combinable GroupReduceFunction that computes two sums.
class MyCombinableGroupReducer
extends GroupReduceFunction[(String, Int), String]
with GroupCombineFunction[(String, Int), (String, Int)]
{
override def reduce(
in: java.lang.Iterable[(String, Int)],
out: Collector[String]): Unit =
{
val r: (String, Int) =
in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
// concat key and sum and emit
out.collect (r._1 + "-" + r._2)
}
override def combine(
in: java.lang.Iterable[(String, Int)],
out: Collector[(String, Int)]): Unit =
{
val r: (String, Int) =
in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
// emit tuple with key and sum
out.collect(r)
}
}
class GroupReduce(GroupReduceFunction):
def reduce(self, iterator, collector):
key, int_sum = iterator.next()
for value in iterator:
int_sum += value[1]
collector.collect(key + "-" + int_sum))
def combine(self, iterator, collector):
key, int_sum = iterator.next()
for value in iterator:
int_sum += value[1]
collector.collect((key, int_sum))
data.reduce_group(GroupReduce(), combinable=True)
GroupCombine 変換は結合可能なGroupReduceFunctionの結合ステップの一般化された形式です。それは入力型I
を任意の出力型O
に結合できるという意味で一般化されます。対照的に、GroupReduceの結合ステップは入力型I
から出力型 I
への結合のみを許可します。これはGroupReduceFunctionのreduceステップが入力型 I
を期待するからです。
幾つかのアプリケーションでは、追加の変換を実施する前にデータセットを中間形式に結合するのが望ましいです (例えば、データサイズを削減するため)。これはCombineGroup変換を使ってほんの少しのコストで行うことができます。
注意: グループ化されたデータセット上のGroupCombineは、一度に全てのデータを処理しないかもしれないが複数のステップ内で処理する貪欲な戦略を使ってメモリ内で行われます。GroupReduce変換のようにデータの交換無しに個々のパーティション上でも行われます。これは部分的な結果につながるかも知れません。
以下の例はWordCount実装に取って代わるCombineGroup変換の使い方を実演します。
DataSet<String> input = [..] // The words received as input
DataSet<Tuple2<String, Integer>> combinedWords = input
.groupBy(0); // group identical words
.combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {
public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
String key = null;
int count = 0;
for (String word : words) {
key = word;
count++;
}
// emit tuple with word and count
out.collect(new Tuple2(key, count));
}
});
DataSet<Tuple2<String, Integer>> output = combinedWords
.groupBy(0); // group by words again
.reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange
public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
String key = null;
int count = 0;
for (Tuple2<String, Integer> word : words) {
key = word;
count++;
}
// emit tuple with word and count
out.collect(new Tuple2(key, count));
}
});
val input: DataSet[String] = [..] // The words received as input
val combinedWords: DataSet[(String, Int)] = input
.groupBy(0)
.combineGroup {
(words, out: Collector[(String, Int)]) =>
var key: String = null
var count = 0
for (word <- words) {
key = word
count += 1
}
out.collect((key, count))
}
val output: DataSet[(String, Int)] = combinedWords
.groupBy(0)
.reduceGroup {
(words, out: Collector[(String, Int)]) =>
var key: String = null
var sum = 0
for ((word, sum) <- words) {
key = word
sum += count
}
out.collect((key, sum))
}
Not supported.
上の代替のWordCount実装は、GroupCombineがGroupReduce変換を行う前にどうやって結合するかを実演します。上の例は単なる概念の証明です。GroupReduceの前に通常は追加のMap変換を必要とするだろうデータセットの型を、結合ステップがどう変更するかに注意してください。
よく使われる一般的な集約操作の幾つかです。集約変換は以下の組み込みの集約関数を提供します:
集約変換はタプルデータセット上にのみ適用することができ、グルーピングのためにフィールド位置キーのみをサポートします。
以下のコードは集約変換をフィールド位置キーによってグループ化されたデータセット上に適用する方法を示します:
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
.groupBy(1) // group DataSet on second field
.aggregate(SUM, 0) // compute sum of the first field
.and(MIN, 2); // compute minimum of the third field
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
from flink.functions.Aggregation import Sum, Min
input = # [...]
output = input.group_by(1).aggregate(Sum, 0).and_agg(Min, 2)
複数の集約をデータセットに適用するには、最初の集約の後に.and()
関数を使う必要があります。このことは.aggregate(SUM, 0).and(MIN, 2)
はフィールド0と元のデータセットのフィールド2の最小の合計を生成することを意味します。対照的に、.aggregate(SUM, 0).aggregate(MIN, 2)
は集約上に集約を適用するでしょう。指定された例では、フィールド1によってグループ化されたフィールド2の合計を計算した後で、フィールド2の最小を生成します。
注意: 集約関数のセットは将来的に拡張されるでしょう。
MinBy (MaxBy) 変換はタプルの各グループについて1つのタプルを選択します。選択されたタプルは1つ以上の指定されたフィールドが最小 (最大)の値のタプルです。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小 (最大)のフィールド値を持つ場合、それらのタプルの任意のタプルが返されます。
以下のコードは、DataSet<Tuple3<Integer, String, Double>>
からの同じString
値を持つ各タプルグループについてのInteger
および Double
フィールドについて最小の値を持つタプルを選択する方法を示します:
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
.groupBy(1) // group DataSet on second field
.minBy(0, 2); // select tuple with minimum values for first and third field.
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input
.groupBy(1) // group DataSet on second field
.minBy(0, 2) // select tuple with minimum values for first and third field.
Not supported.
Reduce変換はユーザ定義のreduce関数をデータセットの各要素に適用します。1つの要素だけが残るまで、reduce関数が連続して要素のペアを1つの要素に組み合わせます。
以下のコードはIntegerのデータセットの全ての要素を合計する方法を示します:
// ReduceFunction that sums Integers
public class IntSummer implements ReduceFunction<Integer> {
@Override
public Integer reduce(Integer num1, Integer num2) {
return num1 + num2;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
val intNumbers = env.fromElements(1,2,3)
val sum = intNumbers.reduce (_ + _)
intNumbers = env.from_elements(1,2,3)
sum = intNumbers.reduce(lambda x,y: x + y)
Reduce変換を使ってデータセット全体をreduceすることは、最終のReduce操作が並行して行うことができないことを意味します。However, a reduce function is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
GroupReduce変換はユーザ定義のgroup-reduce関数をデータセットの全ての要素に適用します。group-reduce はデータセットの全ての要素上で繰り返し、任意の数の結果要素を返します。
以下の例はデータセット全体のGroupReduce変換をどうやって適用するかを示します:
DataSet<Integer> input = // [...]
// apply a (preferably combinable) GroupReduceFunction to a DataSet
DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
val input: DataSet[Int] = // [...]
val output = input.reduceGroup(new MyGroupReducer())
output = data.reduce_group(MyGroupReducer())
注意: group-reduce関数が結合可能でなければ、データセット全体のGroupReduce変換は行うことができません。したがって、これはコンピュータにとても向いている操作になりえます。稀有豪可能なgroup-reduce関数を実装する方法を学ぶには、上の“結合可能なGroupReduceFunctions” の段落を見てください。
データセット全体のGroupCombineはグループ化されたデータセット上のGroupCombineと似た動作をします。データは全てのノード上にパーティション化され、その後に貪欲的な形式で結合されます (つまり、メモリに収まっているデータのみが一度に結合されます)。
よく使われる一般的な集約操作の幾つかです。集約変換は以下の組み込みの集約関数を提供します:
集約変換はタプルのデータセットにのみ適用することができます。
以下のコードはデータセット全体のAggregation変換をどうやって適用するかを示します:
DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input
.aggregate(SUM, 0) // compute sum of the first field
.and(MIN, 1); // compute minimum of the second field
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.aggregate(SUM, 0).and(MIN, 2)
from flink.functions.Aggregation import Sum, Min
input = # [...]
output = input.aggregate(Sum, 0).and_agg(Min, 2)
注意: サポートされる集約関数のセットの拡張がロードマップにあります。
MinBy (MaxBy) 変換はタプルのデータセットから1つのタプルを選択します。選択されたタプルは1つ以上の指定されたフィールドが最小 (最大)の値のタプルです。比較に使われるフィールドは有効なキーフィールド、つまり比較可能、でなければなりません。複数のタプルが最小 (最大)のフィールド値を持つ場合、それらのタプルの任意のタプルが返されます。
以下のコードは、DataSet<Tuple3<Integer, String, Double>>
から Integer
および Double
フィールドについて最大値を持つタプルを選択する方法を示します:
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
.maxBy(0, 2); // select tuple with maximum values for first and third field.
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input
.maxBy(0, 2) // select tuple with maximum values for first and third field.
Not supported.
Distinct変換は元のデータセットの明確に異なる要素のデータセットを計算します。以下のコードはデータセットから全ての重複する要素を削除します:
DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input.distinct();
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()
Not supported.
以下のようにしてデータセット内の要素の識別が決定されるかを変更することも可能です:
DataSet<Tuple2<Integer, Double, String>> input = // [...]
DataSet<Tuple2<Integer, Double, String>> output = input.distinct(0,2);
val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)
Not supported.
private static class AbsSelector implements KeySelector<Integer, Integer> {
private static final long serialVersionUID = 1L;
@Override
public Integer getKey(Integer t) {
return Math.abs(t);
}
}
DataSet<Integer> input = // [...]
DataSet<Integer> output = input.distinct(new AbsSelector());
val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}
Not supported.
// some ordinary POJO
public class CustomType {
public String aName;
public int aNumber;
// [...]
}
DataSet<CustomType> input = // [...]
DataSet<CustomType> output = input.distinct("aName", "aNumber");
// some ordinary POJO
case class CustomType(aName : String, aNumber : Int) { }
val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")
Not supported.
ワイルドカード文字を使って全てのフィールドを使うように指示することも可能です:
DataSet<CustomType> input = // [...]
DataSet<CustomType> output = input.distinct("*");
// some ordinary POJO
val input: DataSet[CustomType] = // [...]
val output = input.distinct("_")
Not supported.
Join変換は2つのデータセットを1つのデータセットにjoinします。両方のデータセットの要素は、以下を使って指定することができる1つ以上のキーにjoinされます。
以下で示されるようにJoin変換を行う2,3の異なる方法があります。
デフォルトのJoin変換は二つのフィールドを持つ新しいタプルデータセットを生成します。各タプルは、最初のタプルフィールド内の最初の入力データセットのjoinされた要素と、2つ目のフィールド内の2つ目の入力データセットの合致する要素を保持します。
以下のコードはフィールド位置キーを使ったデフォルトのJoin変換を示します:
public static class User { public String name; public int zip; }
public static class Store { public Manager mgr; public int zip; }
DataSet<User> input1 = // [...]
DataSet<Store> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<User, Store>>
result = input1.join(input2)
.where("zip") // key of the first input (users)
.equalTo("zip"); // key of the second input (stores)
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Double, Int)] = // [...]
val result = input1.join(input2).where(0).equalTo(1)
result = input1.join(input2).where(0).equal_to(1)
Join変換はjoinするタプルを処理するためにユーザ定義のjoin関数を呼ぶこともできます。join関数は最初の入力データセットの1つの要素と2つ目の入力データセットの1つの要素を受け取り、正確に1つの要素を返します。
以下のコードはキー選択関数を使って独自のjavaオブジェクトとタプルデータセットを持つデータセットのjoinを実施し、ユーザ定義のjoin関数を使う方法を示します:
// some POJO
public class Rating {
public String name;
public String category;
public int points;
}
// Join function that joins a custom POJO with a Tuple
public class PointWeighter
implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
@Override
public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
// multiply the points and rating and construct a new output tuple
return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
}
}
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>>
weightedRatings =
ratings.join(weights)
// key of the first input
.where("category")
// key of the second input
.equalTo("f0")
// applying the JoinFunction on joining pairs
.with(new PointWeighter());
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
(rating, weight) => (rating.name, rating.points * weight._2)
}
class PointWeighter(JoinFunction):
def join(self, rating, weight):
return (rating[0], rating[1] * weight[1])
if value1[3]:
weightedRatings =
ratings.join(weights).where(0).equal_to(0). \
with(new PointWeighter());
MapとFlatMapに似て、FlatJoinはJoinとして同じ方法で振る舞いますが、1つの要素を返す代わりに、0, 1 以上の要素を返す(集める)ことができます。
public class PointWeighter
implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
@Override
public void join(Rating rating, Tuple2<String, Double> weight,
Collector<Tuple2<String, Double>> out) {
if (weight.f1 > 0.1) {
out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
}
}
}
DataSet<Tuple2<String, Double>>
weightedRatings =
ratings.join(weights) // [...]
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
(rating, weight, out: Collector[(String, Double)]) =>
if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
}
Not supported.
Join変換はここで示すように射影を使って結果のタプルを構築することができます:
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>
result =
input1.join(input2)
// key definition on first DataSet using a field position key
.where(0)
// key definition of second DataSet using a field position key
.equalTo(0)
// select and reorder fields of matching tuples
.projectFirst(0,2).projectSecond(1).projectFirst(1);
projectFirst(int...)
and projectSecond(int...)
select the fields of the first and second joined input that should be assembled into an output Tuple. インデックスの順番は出力タプル内でのフィールドの順番を定義します。join射影は非タプルデータセットにも動作します。この場合、projectFirst()
あるいは projectSecond()
はjoinされた要素を出力タプルに追加するための引数無しで呼ばれなければなりません。
Not supported.
result = input1.join(input2).where(0).equal_to(0) \
.project_first(0,2).project_second(1).project_first(1);
project_first(int...)
and project_second(int...)
select the fields of the first and second joined input that should be assembled into an output Tuple. インデックスの順番は出力タプル内でのフィールドの順番を定義します。join射影は非タプルデータセットにも動作します。この場合、project_first()
あるいは project_second()
はjoinされた要素を出力タプルに追加するための引数無しで呼ばれなければなりません。
オプティマイザが正しい実行戦略を選ぶように導くために、ここで示すようにjoinにデータセットのサイズをほのめかすことができます:
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result1 =
// hint that the second DataSet is very small
input1.joinWithTiny(input2)
.where(0)
.equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result2 =
// hint that the second DataSet is very large
input1.joinWithHuge(input2)
.where(0)
.equalTo(0);
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// hint that the second DataSet is very small
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
// hint that the second DataSet is very large
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
#hint that the second DataSet is very small
result1 = input1.join_with_tiny(input2).where(0).equal_to(0)
#hint that the second DataSet is very large
result1 = input1.join_with_huge(input2).where(0).equal_to(0)
Flinkのランタイムは様々な方法でjoinを実行することができます。それぞれの可能な方法は、異なる状況で他に勝っています。システムは適切な方法を自動的に選ぼうとしますが、joinを実行する特定の方法を強制したい場合は、手動で戦略を選ぶことができます。
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
Not supported.
以下のヒントが利用可能です:
OPTIMIZER_CHOOSES
: ヒントを全く与えないのと等価。選択をシステムに任せます。
BROADCAST_HASH_FIRST
: 最初の入力をブロードキャストし、それからハッシュテーブルを構築します。これは2つ目の入力によって走査されます。最初の入力がとても小さい場合には良い戦略です。
BROADCAST_HASH_SECOND
: 2つ目の入力をブロードキャストし、それからハッシュテーブルを構築します。これは最初の入力によって走査されます。2つ目の入力がとても小さい場合には良い戦略です。
REPARTITION_HASH_FIRST
: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、最初の入力からハッシュテーブルを構築します。最初の入力が2つ目よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。注意: これはサイズの予測が行えず、既存のパーティションとソート順が再利用できない場合にシステムが使用するデフォルトの予備の戦略です。
REPARTITION_HASH_SECOND
: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、2つ目の入力からハッシュテーブルを構築します。2つ目の入力が最初よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。
REPARTITION_SORT_MERGE
: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、(まだソートされていない場合は)各入力をソートします。入力はソートされた入力のストリーム マージによってjoinされます。1つあるいは両方の入力が既にソートされている場合に良い戦略です。
OuterJoin変換は2つのデータセットに left, right, あるいは full outer join を行います。Outer joins は通常(inner)のjoinに似ていて、キーが等しい要素の全てのペアを生成します。更に、“outer” 側 (left, right, あるいはfullの場合は両方)はもし他方に一致するキーが見つからない場合でも残されます。要素のペアから1つの要素に変えるために、要素の揃いのペア(あるいは1つの要素と、他の入力のためのnull
)がJoinFunction
に渡されます。あるいは、要素のペアから任意の多数(無しも含む)の要素に変えるためにFlatJoinFunction
に渡されます。
両方のデータセットの要素は、以下を使って指定することができる1つ以上のキーにjoinされます。
OuterJoins はJavaおよびScalaのデータセットAPIにのみサポートされます。
OuterJoin変換はjoinするタプルを処理するためにユーザ定義のjoin関数を呼びます。join関数は最初の入力データセットの1つの要素と2つ目の入力データセットの1つの要素を受け取り、正確に1つの要素を返します。outer join (left, right, full) の型に応じて、join関数の2つの入力要素のうちの1つがnull
になりえます。
以下のコードはキー選択関数を使って独自のjavaオブジェクトとタプルデータセットを持つデータセットのleft outer joinを実施し、ユーザ定義のjoin関数を使う方法を示します:
// some POJO
public class Rating {
public String name;
public String category;
public int points;
}
// Join function that joins a custom POJO with a Tuple
public class PointAssigner
implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
// Assigns the rating points to the movie.
// NOTE: rating might be null
return new Tuple2<String, Double>(movie.f0, rating == null ? -1 : rating.points;
}
}
DataSet<Tuple2<String, String>> movies = // [...]
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Integer>>
moviesWithPoints =
movies.leftOuterJoin(ratings)
// key of the first input
.where("f0")
// key of the second input
.equalTo("name")
// applying the JoinFunction on joining pairs
.with(new PointAssigner());
case class Rating(name: String, category: String, points: Int)
val movies: DataSet[(String, String)] = // [...]
val ratings: DataSet[Ratings] = // [...]
val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
(movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
}
Not supported.
MapとFlatMapに似て、flat-join関数を使ったOuterJoinはJoin関数を使ったOuterJoinとして同じ方法で振る舞いますが、1つの要素を返す代わりに、0, 1 以上の要素を返す(集める)ことができます。
public class PointAssigner
implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
@Override
public void join(Tuple2<String, String> movie, Rating rating
Collector<Tuple2<String, Integer>> out) {
if (rating == null ) {
out.collect(new Tuple2<String, Integer>(movie.f0, -1));
} else if (rating.points < 10) {
out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
} else {
// do not emit
}
}
DataSet<Tuple2<String, Integer>>
moviesWithPoints =
movies.leftOuterJoin(ratings) // [...]
Not supported.
Not supported.
Flinkのランタイムは様々な方法でouter joinを実行することができます。それぞれの可能な方法は、異なる状況で他に勝っています。システムは適切な方法を自動的に選ぼうとしますが、outer joinを実行する特定の方法を強制したい場合は、手動で戦略を選ぶことができます。
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result1 =
input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
DataSet<Tuple2<SomeType, AnotherType> result2 =
input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
Not supported.
以下のヒントが利用可能です。
OPTIMIZER_CHOOSES
: ヒントを全く与えないのと等価。選択をシステムに任せます。
BROADCAST_HASH_FIRST
: 最初の入力をブロードキャストし、それからハッシュテーブルを構築します。これは2つ目の入力によって走査されます。最初の入力がとても小さい場合には良い戦略です。
BROADCAST_HASH_SECOND
: 2つ目の入力をブロードキャストし、それからハッシュテーブルを構築します。これは最初の入力によって走査されます。2つ目の入力がとても小さい場合には良い戦略です。
REPARTITION_HASH_FIRST
: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、最初の入力からハッシュテーブルを構築します。最初の入力が2つ目よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。
REPARTITION_HASH_SECOND
: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、2つ目の入力からハッシュテーブルを構築します。2つ目の入力が最初よりも小さいが、両方の入力がまだ大きい場合に良い戦略です。
REPARTITION_SORT_MERGE
: システムは(入力がまだパーティション化されていない場合は)各入力をパーティション(シャッフル)し、(まだソートされていない場合は)各入力をソートします。入力はソートされた入力のストリーム マージによってjoinされます。1つあるいは両方の入力が既にソートされている場合に良い戦略です。
注意: 各 outer join型によってまだ全ての実行戦略がサポートされているわけではありません。
LeftOuterJoin
は以下をサポートします:
OPTIMIZER_CHOOSES
BROADCAST_HASH_SECOND
REPARTITION_HASH_SECOND
REPARTITION_SORT_MERGE
RightOuterJoin
は以下をサポートします:
OPTIMIZER_CHOOSES
BROADCAST_HASH_FIRST
REPARTITION_HASH_FIRST
REPARTITION_SORT_MERGE
FullOuterJoin
は以下をサポートします:
OPTIMIZER_CHOOSES
REPARTITION_SORT_MERGE
Cross変換は2つのデータセットを1つのデータセットに結合します。それは両方の入力データセットの要素の全てのペアの組み合わせを構築します。つまり、デカルト積を構築します。Cross変換は、要素の各ペア上でユーザ定義のcross関数を呼ぶか、Tuple2を出力するかをします。以下では両方のモードを示します。
注意: Crossは大規模計算クラスタにさえ挑戦する潜在的にとてもコンピュータに向いている操作です。
Cross 変換はユーザ定義のcross関数を呼ぶことができます。cross 関数は最初の入力の1つの要素と2つ目の入力の1つの要素を受け取り、正確に1つの結果要素を返します。
以下のコードはcross関数を使って2つのデータセットにCross変換を適用する方法を示します:
public class Coord {
public int id;
public int x;
public int y;
}
// CrossFunction computes the Euclidean distance between two Coord objects.
public class EuclideanDistComputer
implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
@Override
public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
// compute Euclidean distance of coordinates
double dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2));
return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
}
}
DataSet<Coord> coords1 = // [...]
DataSet<Coord> coords2 = // [...]
DataSet<Tuple3<Integer, Integer, Double>>
distances =
coords1.cross(coords2)
// apply CrossFunction
.with(new EuclideanDistComputer());
Cross 変換はここで示す投影を使って結果タプルを構築することもできます:
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, Byte, Integer, Double>
result =
input1.cross(input2)
// select and reorder fields of matching tuples
.projectSecond(0).projectFirst(1,0).projectSecond(1);
Cross投影でのフィールド選択はJoin結果の投影と同じ方法で動作します。
case class Coord(id: Int, x: Int, y: Int)
val coords1: DataSet[Coord] = // [...]
val coords2: DataSet[Coord] = // [...]
val distances = coords1.cross(coords2) {
(c1, c2) =>
val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
(c1.id, c2.id, dist)
}
class Euclid(CrossFunction):
def cross(self, c1, c2):
return (c1[0], c2[0], sqrt(pow(c1[1] - c2.[1], 2) + pow(c1[2] - c2[2], 2)))
distances = coords1.cross(coords2).using(Euclid())
Cross 変換はここで示す投影を使って結果タプルを構築することもできます:
result = input1.cross(input2).projectFirst(1,0).projectSecond(0,1);
Cross投影でのフィールド選択はJoin結果の投影と同じ方法で動作します。
オプティマイザが正しい実行戦略を選ぶように導くために、ここで示すようにcrossにデータセットのサイズをほのめかすことができます:
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple4<Integer, String, Integer, String>>
udfResult =
// hint that the second DataSet is very small
input1.crossWithTiny(input2)
// apply any Cross function (or projection)
.with(new MyCrosser());
DataSet<Tuple3<Integer, Integer, String>>
projectResult =
// hint that the second DataSet is very large
input1.crossWithHuge(input2)
// apply a projection (or any Cross function)
.projectFirst(0,1).projectSecond(1);
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// hint that the second DataSet is very small
val result1 = input1.crossWithTiny(input2)
// hint that the second DataSet is very large
val result1 = input1.crossWithHuge(input2)
#hint that the second DataSet is very small
result1 = input1.cross_with_tiny(input2)
#hint that the second DataSet is very large
result1 = input1.cross_with_huge(input2)
CoGroup 変換は2つのデータセットのグループの両方を処理します。両方のデータセットは定義されたキーでグループ化され、同じキーを共有する両方のデータセットのグループは共にユーザ定義のco-group関数に渡されます。1つのデータセットだけがグループを持つ特定のキーについては、co-group関数は1つのグループと空のグループを使って呼ばれます。co-group 関数は両方のグループの要素上で個々に繰り返すことができ、結果要素の任意の数を返すことができます。
Reduce、GroupReduceおよびJoinに似て、キーは異なるキー選択メソッドを使って定義することができます。
例はフィールド位置キーでグループにする方法を示します (タプルデータセットのみ)。Pojo-型およびキー表現を使って同じことをすることができます。
// Some CoGroupFunction definition
class MyCoGrouper
implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
Iterable<Tuple2<String, Double>> dVals,
Collector<Double> out) {
Set<Integer> ints = new HashSet<Integer>();
// add all Integer values in group to set
for (Tuple2<String, Integer>> val : iVals) {
ints.add(val.f1);
}
// multiply each Double value with each unique Integer values of group
for (Tuple2<String, Double> val : dVals) {
for (Integer i : ints) {
out.collect(val.f1 * i);
}
}
}
}
// [...]
DataSet<Tuple2<String, Integer>> iVals = // [...]
DataSet<Tuple2<String, Double>> dVals = // [...]
DataSet<Double> output = iVals.coGroup(dVals)
// group first DataSet on first tuple field
.where(0)
// group second DataSet on first tuple field
.equalTo(0)
// apply CoGroup function on each pair of groups
.with(new MyCoGrouper());
val iVals: DataSet[(String, Int)] = // [...]
val dVals: DataSet[(String, Double)] = // [...]
val output = iVals.coGroup(dVals).where(0).equalTo(0) {
(iVals, dVals, out: Collector[Double]) =>
val ints = iVals map { _._2 } toSet
for (dVal <- dVals) {
for (i <- ints) {
out.collect(dVal._2 * i)
}
}
}
class CoGroup(CoGroupFunction):
def co_group(self, ivals, dvals, collector):
ints = dict()
# add all Integer values in group to set
for value in ivals:
ints[value[1]] = 1
# multiply each Double value with each unique Integer values of group
for value in dvals:
for i in ints.keys():
collector.collect(value[1] * i)
output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup())
2つのデータセットの結合を生成します。データセットは同じ型でなければなりません。以下で示されるように、2つ以上のデータセットの結合を複数のunion呼び出しを使って実装することができます:
DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
val vals1: DataSet[(String, Int)] = // [...]
val vals2: DataSet[(String, Int)] = // [...]
val vals3: DataSet[(String, Int)] = // [...]
val unioned = vals1.union(vals2).union(vals3)
unioned = vals1.union(vals2).union(vals3)
データの歪を除去するために一様にデータセットの並行パーティションをリバランスします。
DataSet<String> in = // [...]
// rebalance DataSet and apply a Map transformation.
DataSet<Tuple2<String, String>> out = in.rebalance()
.map(new Mapper());
val in: DataSet[String] = // [...]
// rebalance DataSet and apply a Map transformation.
val out = in.rebalance().map { ... }
Not supported.
指定されたキーのデータセットをハッシュ-パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません (キーを指定する方法についてはReduce examplesを見てください)。
DataSet<Tuple2<String, Integer>> in = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
.mapPartition(new PartitionMapper());
val in: DataSet[(String, Int)] = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByHash(0).mapPartition { ... }
Not supported.
指定されたキーのデータセットをレンジ-パーティションします。キーはパーティションキー、表現キー、およびキー選択関数として指定されるかも知れません (キーを指定する方法についてはReduce examplesを見てください)。
DataSet<Tuple2<String, Integer>> in = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
.mapPartition(new PartitionMapper());
val in: DataSet[(String, Int)] = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByRange(0).mapPartition { ... }
Not supported.
指定されたフィールド上で指定された順番でデータセットの全てのパーティションをローカルでソートする。フィールドはフィールド表現あるいはフィールド位置として指定することができます (キーを指定する方法はReduce examples を見てください)。パーティションは sortPartition()
呼び出しを繋げることで複数のフィールドでソートすることができます。
DataSet<Tuple2<String, Integer>> in = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
.sortPartition(0, Order.DESCENDING)
.mapPartition(new PartitionMapper());
val in: DataSet[(String, Int)] = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
val out = in.sortPartition(1, Order.ASCENDING)
.sortPartition(0, Order.DESCENDING)
.mapPartition { ... }
Not supported.
データセットの最初のn(任意)個の要素を返します。First-n は通常のデータセット、グループ化されたデータセットあるいはグループ化ソート化されたデータセット上に適用することができます。グループ化キーはキー選択関数あるいはフィールド位置キーとして指定することができます (キーを指定する方法はReduce examples を見てください)。
DataSet<Tuple2<String, Integer>> in = // [...]
// Return the first five (arbitrary) elements of the DataSet
DataSet<Tuple2<String, Integer>> out1 = in.first(5);
// Return the first two (arbitrary) elements of each String group
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
.first(2);
// Return the first three elements of each String group ordered by the Integer field
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.first(3);
val in: DataSet[(String, Int)] = // [...]
// Return the first five (arbitrary) elements of the DataSet
val out1 = in.first(5)
// Return the first two (arbitrary) elements of each String group
val out2 = in.groupBy(0).first(2)
// Return the first three elements of each String group ordered by the Integer field
val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
Not supported.