This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Hadoop
Hadoop形式 #
プロジェクトの設定 #
Hadoopサポートは、flink-hadoop-compatibility
に含まれます。
hadoopを使うには、次の依存関係をpom.xml
に追加します。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
Flinkアプリケーションをローカルで(例えばIDEから)実行したい場合、次のようにhadoop-client
依存関係も追加する必要があります:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.2</version>
<scope>provided</scope>
</dependency>
Hadoop InputFormats の使用 #
FlinkでHadoopInputFormats
を使うには、まずHadoopInputs
ユーティリティクラスのreadHadoopFile
またはcreateHadoopInput
を使ってフォーマットをラップする必要があります。
前者はFileInputFormat
から派生した入力形式のために使われますが、後者は汎用の入力形式のために使われる必要があります。
結果のInputFormat
はExecutionEnvironment#createInput
を使ってデータソースを作成するために使えます。
結果のDataStream
には2つのタプルが含まれており、最初フィールドはキーで、2番目のフィールドはHadoop InputFormatから取得した値です。
以下の例はHadoopのTextInputFormat
を使う方法を示しています。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();
DataStream<Tuple2<Text, Text>> input = env.createInput(HadoopInputs.readHadoopFile(
textInputFormat, Text.class, Text.class, textPath));
// Do something with the data.
[...]
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textInputFormat = new KeyValueTextInputFormat
val input: DataStream[(Text, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
textInputFormat, classOf[Text], classOf[Text], textPath))
// Do something with the data.
[...]