あるアルゴリズム内で、ユニークな識別子をデータセット要素に割り当てる必要があるかもしれません。このドキュメントはそのような目的のためにどのようにDataSetUtilsを使うことができるかを示します。
zipWithIndex
は連続するラベルを要素に割り当て、入力としてデータセットを受け取り(unique id, initial value)
2-タプルの新しいデータセットを返します。この処理は二つのパスを必要とします。まず数え上げ、それから要素をラベル付けします。カウントの動機のためにパイプライン化することはできません。新しいzipWithUniqueId
はパイプラインの形式で動作し、ユニークなラベル付けで十分な場合に好まれます。例えば、以下のコード:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
result.writeAsCsv(resultPath, "\n", ",");
env.execute();
import org.apache.flink.api.scala._
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
val result: DataSet[(Long, String)] = input.zipWithIndex
result.writeAsCsv(resultPath, "\n", ",")
env.execute()
from flink.plan.Environment import get_environment
env = get_environment()
env.set_parallelism(2)
input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
result = input.zipWithIndex()
result.write_text(result_path)
env.execute()
タプルを生成することができます: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
多くの場合において、連続するラベルを付ける必要は無いかもしれません。zipWithUniqueId
はパイプラインの形式で動作し、ラベルの割り当て処理を高速化します。このメソッドは入力としてデータセットを受け取り、(unique id, initial value)
2-タプルの新しいデータセットを返します。例えば、以下のコード:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
result.writeAsCsv(resultPath, "\n", ",");
env.execute();
import org.apache.flink.api.scala._
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
val result: DataSet[(Long, String)] = input.zipWithUniqueId
result.writeAsCsv(resultPath, "\n", ",")
env.execute()
タプルを生成することができます: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)