Hadoop 互換性 Beta

FlinkはApache Hadoop MapReduce インタフェースと互換性があり、従って Hadoop MapReduceのために実装されたコードを再利用することができます。

以下のことができます:

このドキュメントは既存のHadoop MapReduceをFlinkと一緒にどうやって使うかを示します。Please refer to the Connecting to other systems guide for reading from Hadoop supported file systems.

プロジェクトの設定

Haddop のinput/output 形式は、Flinkのジョブを書くときに常に必要になるflink-javaflink-scala のMaven モジュールの一部です。このコードはmapredmapreduce APIのための追加のサブパッケージ内のorg.apache.flink.api.java.hadooporg.apache.flink.api.scala.hadoop にあります。

Hadoop Mappers と Reducers のサポートはflink-hadoop-compatibility Maven モジュール内にあります。このコードは org.apache.flink.hadoopcompatibility パッケージ内にあります。

MapperとReducerを再利用したい場合は、以下の依存をpom.xml に追加します。

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-hadoop-compatibility_2.10</artifactId>
	<version>1.3-SNAPSHOT</version>
</dependency>

Hadoop データ型の使用

Flink はそのままで全てのHadoopの WritableWritableComparable データ型をサポートします。Hadoopデータ型を使いたいだけの場合は、Hadoop 互換性の依存を含める必要はありません。詳細はプログラミング ガイドを見てください。

Hadoop InputFormats の使用

Hadoop 入力形式はExecutionEnvironmentreadHadoopFile あるいは createHadoopInput メソッドのうちの一つを使うことによってデータソースを生成するために使うことができます。形式はFileInputFormatから派生した入力形式のために使われますが、後者は一般的な目的の入力形式のために使われるべきです。

結果のDataSet は2つのタプルを含み、最初のフィールドはキーで二つ目のフィールドはHadoopから派生した値です。

以下の例は HadoopのTextInputFormatを使う方法を示します。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> input =
    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);

// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment

val input: DataSet[(LongWritable, Text)] =
  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)

// Do something with the data.
[...]

Hadoop OutputFormats の使用

Flink はHadoopのOutputFormatsのための互換性のあるラッパーを提供します。org.apache.hadoop.mapred.OutputFormatを実装、あるいはorg.apache.hadoop.mapreduce.OutputFormatを継承したどのようなクラスもサポートされます。OutputFormat ラッパーは入力データがキーと値の二つのタプルを含むデータセットであることを期待します。これらはHadoop OutputFormatによって処理されます。

以下の例はHadoopの TextOutputFormatを使う方法を示します。

// Obtain the result we want to emit
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  // create the Flink wrapper.
  new HadoopOutputFormat<Text, IntWritable>(
    // set the Hadoop OutputFormat and specify the job.
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF);
// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)

Hadoop Mappers および Reducersの使用

HadoopのMappers は意味論的にFlinkのFlatMapFunctionsと等価で、Hadoopの Reducers は Flinkの GroupReduceFunctionsに等価です。Flinkは Hadoopの MapReduceの MapperReducer インタフェースの実装のためのラッパーを提供します。つまり、HadoopのMappersとReducersをFlinkプログラムの中で使うことができます。今のところは、Mapper と Reduce インタフェースのmapred API (org.apache.hadoop.mapred)のみがサポートされます。

ラッパーは入力としてDataSet<Tuple2<KEYIN,VALUEIN>> を取り、出力としてDataSet<Tuple2<KEYOUT,VALUEOUT>> を生成します。この時、KEYINKEYOUT はHadoop関数によって処理されるHadoopキー-バリューペアのキーであり、VALUEINVALUEOUT は値です。For Reducers, Flink offers a wrapper for a GroupReduceFunction with (HadoopReduceCombineFunction) and without a Combiner (HadoopReduceFunction). ラッパーはHadoopのMapperあるいはReducerを設定するためにオプションのJobConfオブジェクトを受け取ります。

Flinkの関数ラッパーは、

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.

また、通常のFlinkの FlatMapFunctions あるいは GroupReduceFunctionsとして使うことができます。

以下の例はHadoopのMapperReducer 関数を使う方法を示します。

// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

注意してください: Reducer ラッパーはFlinkの groupBy() オペレーションによって定義されたグループ上で動作します。それは、JobConfの中でどのような独自のパーティション、ソート、あるいはグルーピング コンパレータを設定したとしても考慮しません。

完全なHadoop WordCountの例

以下の例は、Hadoopデータ型、入力および出力フォーマット、そしてMapperおよびReducer実装を使った、完全なWordCount実装を示します。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  new HadoopOutputFormat<Text, IntWritable>(
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);

// Execute Program
env.execute("Hadoop WordCount");
TOP
inserted by FC2 system