Hadoop
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Hadoop形式 #

プロジェクトの設定 #

Hadoopサポートは、flink-hadoop-compatibilityに含まれます。

hadoopを使うには、次の依存関係をpom.xmlに追加します。

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-hadoop-compatibility_2.12</artifactId>
	<version>1.19-SNAPSHOT</version>
</dependency>

Flinkアプリケーションをローカルで(例えばIDEから)実行したい場合、次のようにhadoop-client依存関係も追加する必要があります:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
</dependency>

Hadoop InputFormats の使用 #

FlinkでHadoopInputFormatsを使うには、まずHadoopInputsユーティリティクラスのreadHadoopFileまたはcreateHadoopInputを使ってフォーマットをラップする必要があります。 前者はFileInputFormatから派生した入力形式のために使われますが、後者は汎用の入力形式のために使われる必要があります。 結果のInputFormatExecutionEnvironment#createInputを使ってデータソースを作成するために使えます。

結果のDataStreamには2つのタプルが含まれており、最初フィールドはキーで、2番目のフィールドはHadoop InputFormatから取得した値です。

以下の例はHadoopのTextInputFormatを使う方法を示しています。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();

DataStream<Tuple2<Text, Text>> input = env.createInput(HadoopInputs.readHadoopFile(
  textInputFormat, Text.class, Text.class, textPath));

// Do something with the data.
[...]
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textInputFormat = new KeyValueTextInputFormat
val input: DataStream[(Text, Text)] =
  env.createInput(HadoopInputs.readHadoopFile(
    textInputFormat, classOf[Text], classOf[Text], textPath))

// Do something with the data.
[...]

Back to top

inserted by FC2 system