データセット内の要素のジッピング

あるアルゴリズム内で、ユニークな識別子をデータセット要素に割り当てる必要があるかもしれません。このドキュメントはそのような目的のためにどのようにDataSetUtilsを使うことができるかを示します。

Denseインデックスを使ったZip

zipWithIndex は連続するラベルを要素に割り当て、入力としてデータセットを受け取り(unique id, initial value) 2-タプルの新しいデータセットを返します。この処理は二つのパスを必要とします。まず数え上げ、それから要素をラベル付けします。カウントの動機のためにパイプライン化することはできません。新しいzipWithUniqueId はパイプラインの形式で動作し、ユニークなラベル付けで十分な場合に好まれます。例えば、以下のコード:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");

DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);

result.writeAsCsv(resultPath, "\n", ",");
env.execute();
import org.apache.flink.api.scala._

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")

val result: DataSet[(Long, String)] = input.zipWithIndex

result.writeAsCsv(resultPath, "\n", ",")
env.execute()
from flink.plan.Environment import get_environment

env = get_environment()
env.set_parallelism(2)
input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")

result = input.zipWithIndex()

result.write_text(result_path)
env.execute()

タプルを生成することができます: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)

上に戻る

ユニークな識別子を使ったZip

多くの場合において、連続するラベルを付ける必要は無いかもしれません。zipWithUniqueId はパイプラインの形式で動作し、ラベルの割り当て処理を高速化します。このメソッドは入力としてデータセットを受け取り、(unique id, initial value) 2-タプルの新しいデータセットを返します。例えば、以下のコード:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");

DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);

result.writeAsCsv(resultPath, "\n", ",");
env.execute();
import org.apache.flink.api.scala._

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")

val result: DataSet[(Long, String)] = input.zipWithUniqueId

result.writeAsCsv(resultPath, "\n", ",")
env.execute()

タプルを生成することができます: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)

上に戻る

TOP
inserted by FC2 system