FlinkはApache Hadoop MapReduce インタフェースと互換性があり、従って Hadoop MapReduceのために実装されたコードを再利用することができます。
以下のことができます:
Writable
データ型 を使うInputFormat
をデータソースとして使うOutputFormat
を DataSinkとして使う。Mapper
をFlatMapFunctionとして使う。Reducer
を GroupReduceFunctionとして使う。このドキュメントは既存のHadoop MapReduceをFlinkと一緒にどうやって使うかを示します。Hadoopがサポートするファイルシステムからの読み込みについては 他のシステムへの接続 を参照してください。
Haddop のinput/output 形式は、Flinkのジョブを書くときに常に必要になるflink-java
と flink-scala
のMaven モジュールの一部です。このコードはmapred
と mapreduce
APIのための追加のサブパッケージ内のorg.apache.flink.api.java.hadoop
と org.apache.flink.api.scala.hadoop
にあります。
Hadoop Mappers と Reducers のサポートはflink-hadoop-compatibility
Maven モジュール内にあります。このコードは org.apache.flink.hadoopcompatibility
パッケージ内にあります。
MapperとReducerを再利用したい場合は、以下の依存をpom.xml
に追加します。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
Flink はそのままで全てのHadoopの Writable
と WritableComparable
データ型をサポートします。Hadoopデータ型を使いたいだけの場合は、Hadoop 互換性の依存を含める必要はありません。詳細はプログラミング ガイドを見てください。
Hadoop 入力形式はExecutionEnvironment
のreadHadoopFile
あるいは createHadoopInput
メソッドのうちの一つを使うことによってデータソースを生成するために使うことができます。形式はFileInputFormat
から派生した入力形式のために使われますが、後者は一般的な目的の入力形式のために使われるべきです。
結果のDataSet
は2つのタプルを含み、最初のフィールドはキーで二つ目のフィールドはHadoopから派生した値です。
以下の例は HadoopのTextInputFormat
を使う方法を示します。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input =
env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(LongWritable, Text)] =
env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
// Do something with the data.
[...]
Flink はHadoopのOutputFormats
のための互換性のあるラッパーを提供します。org.apache.hadoop.mapred.OutputFormat
を実装、あるいはorg.apache.hadoop.mapreduce.OutputFormat
を継承したどのようなクラスもサポートされます。OutputFormat ラッパーは入力データがキーと値の二つのタプルを含むデータセットであることを期待します。これらはHadoop OutputFormatによって処理されます。
以下の例はHadoopの TextOutputFormat
を使う方法を示します。
// Obtain the result we want to emit
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
// create the Flink wrapper.
new HadoopOutputFormat<Text, IntWritable>(
// set the Hadoop OutputFormat and specify the job.
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF);
// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]
val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
new TextOutputFormat[Text, IntWritable],
new JobConf)
hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
hadoopResult.output(hadoopOF)
HadoopのMappers は意味論的にFlinkのFlatMapFunctionsと等価で、Hadoopの Reducers は Flinkの GroupReduceFunctionsに等価です。Flinkは Hadoopの MapReduceの Mapper
と Reducer
インタフェースの実装のためのラッパーを提供します。つまり、HadoopのMappersとReducersをFlinkプログラムの中で使うことができます。今のところは、Mapper と Reduce インタフェースのmapred API (org.apache.hadoop.mapred
)のみがサポートされます。
ラッパーは入力としてDataSet<Tuple2<KEYIN,VALUEIN>>
を取り、出力としてDataSet<Tuple2<KEYOUT,VALUEOUT>>
を生成します。この時、KEYIN
とKEYOUT
はHadoop関数によって処理されるHadoopキー-バリューペアのキーであり、VALUEIN
とVALUEOUT
は値です。reducerに関しては、Flinkはコンバイナ有り(HadoopReduceCombineFunction
) とコンバイナ無し(HadoopReduceFunction
) のGroupReduceFunctionのラッパーを提供します。ラッパーはHadoopのMapperあるいはReducerを設定するためにオプションのJobConf
オブジェクトを受け取ります。
Flinkの関数ラッパーは、
org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
,org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction
, andorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction
.また、通常のFlinkの FlatMapFunctions あるいは GroupReduceFunctionsとして使うことができます。
以下の例はHadoopのMapper
と Reducer
関数を使う方法を示します。
// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
注意してください: Reducer ラッパーはFlinkの groupBy() オペレーションによって定義されたグループ上で動作します。それは、JobConf
の中でどのような独自のパーティション、ソート、あるいはグルーピング コンパレータを設定したとしても考慮しません。
以下の例は、Hadoopデータ型、入力および出力フォーマット、そしてMapperおよびReducer実装を使った、完全なWordCount実装を示します。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
);
TextInputFormat.addInputPath(job, new Path(inputPath));
// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
new HadoopOutputFormat<Text, IntWritable>(
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);
// Execute Program
env.execute("Hadoop WordCount");