FileSystem
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

FileSystem #

このコネクタはBATCHSTREAMINGに統合されたソースとシンクを提供し、Flink FileSystem abstractionによってサポートされるファイルシステムに対して、(パーティション化された)読み込みまたは書き込みを提供します。このファイルシステムコネクタは、BATCHSTREAMINGの両方に同じ保証を提供し、STREAMING 実行に確実に1回のセマンティクスを提供するように設計されています。

コネクタは、format(例えば、Avro、CSV、Parquet)を持つ、任意の(分散)ファイルシステム(例えば、POSIX、S3、HDFS)からの一連のファイルの読み書きを提供し、ストリームまたはレコードを生成します。

ファイルソース #

The File Source is based on the Source API, a unified data source that reads files - both in batch and in streaming mode. これは、SplitEnumeratorSourceReader の2つの部分に分かれています。

  • SplitEnumerator は、読み取るファイルを検出して識別し、それらを SourceReader に割り当てる責任があります。
  • SourceReader は処理する必要があるファイルを要求し、ファイルシステムからファイルを読み取ります。

ファイルソースとformat を組み合わせる必要があります。これにより、CSVの解析、AVROの復号化、Parquetカラム型ファイルを読み取れます。

制限付きストリームと制限なしストリーム #

制限付きFile Sourceは、(SplitEnumerator - フィルタリングされた隠しファイルを含む再帰ディレクトリリストを介して)全てのファイルをリストし、それらすべてを読み取ります。

定期的なファイル検出用に列挙子を設定した場合に、制限なしFile Sourceが作成されます。 この場合、SplitEnumerator は、制限付きの場合と同様に列挙しますが、一定の時間が過ぎると、列挙を繰り返します。 列挙が繰り返される場合、SplitEnumerator は以前に検出されたファイルを除外し、新しいファイルのみをSourceReaderに送信します。

使い方 #

以下のAPI呼び出しのいずれかを使って、ファイルソースの構築を開始できます:

// reads the contents of a file from a file stream. 
FileSource.forRecordStreamFormat(StreamFormat,Path...);
        
// reads batches of records from a file at a time
FileSource.forBulkFileFormat(BulkFormat,Path...);
# reads the contents of a file from a file stream.
FileSource.for_record_stream_format(stream_format, *path)

# reads batches of records from a file at a time
FileSource.for_bulk_file_format(bulk_format, *path)

これにより、ファイルソースの全てのプロパティを設定できるFileSource.FileSourceBuilderが作成されます。

制限付き/バッチの場合、ファイルソースは指定されたパス(s)の下の全てのファイルを処理します。 連続/ストリーミングの場合、ソースは定期的に新しいファイルのパスをチェックし、それらの読み込みを開始します。

(上記のいずれかの方法で作成されたFileSource.FileSourceBuilderを介して)ファイルソースの作成を開始すると、ソースはデフォルトで制限付き/バッチモードになります。the source is in bounded/batch mode by default. AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)を呼び出して、ソースを連続ストリーミングモードに設定できます。

final FileSource<String> source =
        FileSource.forRecordStreamFormat(...)
        .monitorContinuously(Duration.ofMillis(5))  
        .build();
source = FileSource.for_record_stream_format(...) \
    .monitor_continously(Duration.of_millis(5)) \
    .build()

フォーマットの種類 #

各ファイルの読み取りは、ファイル形式で定義されたファイルリーダーを通じて行われます。 これらはファイルの内容の解析ロジックを定義します。ソースがサポートするクラスは複数あります。 インタフェースは、実装の単純さと柔軟性/効率のトレードオフです。

  • StreamFormat は、ファイルストリームからのファイルの内容を読み取ります。これは最も簡単な形式であり、すぐに利用できる多くの機能(チェックポイントロジックなど)を提供しますが、適用できる最適化(オブジェクトの再利用、バッチなど)に制限があります。

  • BulkFormat はファイルから一度にレコードの固まりを読み取ります。 これは実装するのに最も"低レベル"な形式であり、実装を最適化するために最大の柔軟性を提供します。

TextLine Format #

StreamFormat リーダーは、ファイルのテキスト行をフォーマットします。 リーダーは Java の組み込み InputStreamReader を使って、サポートされる様々な文字セットエンコーディングを使って倍とストリームをでコードします。 このフォーマットは、チェックポイントからの最適化されたリカバリをサポートしません。リカバリ時に、最後のチェックポイントの前に処理された行数を再読み込みして破棄します。これは、ストリーム入力と文字セットデコーダの状態の内部バッファリングを使う文字セットデコードを通じて、ファイル内の行のオフセットを追跡できないという事実によるものです。

SimpleStreamFormat Abstract Class #

これは、分割不可能な形式用のStreamFormatの簡易バージョンです。 配列またはファイルの独自の読み取りは、SimpleStreamFormatを実装することで実行できます:

private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
    private static final long serialVersionUID = 1L;

    @Override
    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
            throws IOException {
        return new ArrayReader(stream);
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

final FileSource<byte[]> source =
                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();

SimpleStreamFormat の例は CsvReaderFormat です。次のように初期化できます:

CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source = 
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

この場合、CSV解析のスキーマは、Jackson ライブラリを使って、SomePojo クラスのフィールドに基づいて自動的に算出されます。(注意: CSVファイルの列のフィールドの順番と正確に一致するフィールド順序を使って、クラス定義に@JsonPropertyOrder({field1, field2, ...})アノテーションを追加する必要がある場合があります。

CSV スキーマまたは解析オプションをより詳細に制御する必要がある場合は、CsvReaderFormatのより低レベルのforSchema静的ファクトリーメソッドを使います:

CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                             TypeInformation<T> typeInformation) 

Bulk フォーマット #

BulkFormat は、一度にレコードのバッチを読み取り、でコードします。bulk フォーマットの例は、ORCまたparquetのようなフォーマットです。 外側の BulkFormat クラスは、主にリーダーの設定フォルダおよびファクトリとして機能します。reader. 実際の読み取りは、BulkFormat#createReader(Configuration, FileSourceSplit) メソッドで作成されう BulkFormat.Readerにて行われます。bulk リーダーがチェックポイント付きストリーミング実行中にチェックポイントに基づいて作成された場合、BulkFormat#restoreReader(Configuration, FileSourceSplit) メソッドでリーダーが策作成されます。

SimpleStreamFormatは、StreamFormatAdapter でラップすることで、BulkFormatに変えることができます。

BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));

Customizing File Enumeration #

/**
 * A FileEnumerator implementation for hive source, which generates splits based on
 * HiveTablePartition.
 */
public class HiveSourceFileEnumerator implements FileEnumerator {
    
    // reference constructor
    public HiveSourceFileEnumerator(...) {
        ...
    }

    /***
     * Generates all file splits for the relevant files under the given paths. The {@code
     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
     * exploit parallelism properly.
     */
    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        // createInputSplits:splitting files into fragmented collections
        return new ArrayList<>(createInputSplits(...));
    }

    ...

    /***
     * A factory to create HiveSourceFileEnumerator.
     */
    public static class Provider implements FileEnumerator.Provider {

        ...
        @Override
        public FileEnumerator create() {
            return new HiveSourceFileEnumerator(...);
        }
    }
}
// use the customizing file enumeration
new HiveSource<>(
        ...,
        new HiveSourceFileEnumerator.Provider(
        partitions != null ?partitions : Collections.emptyList(),
        new JobConfWrapper(jobConf)),
       ...);

現在の制約 #

ウォーターマークは、ファイルのバックログが大きい場合にあまりうまく動作しません。これは、ファイル内でウォーターマークがどんどん進んでいくため、次のファイルにはウォーターマークよりも後のデータが含まれる可能性があるためです。

無制限のファイルソースの場合、enumerator は現在処理済みの全てのファイルのパスを記憶していますが、これは場合によってはかなり大きくなる可能性がある状態です。 将来的には、既に処理されたファイルを追跡する圧縮形式を追加する予定です(例えば、変更タイムスタンプを境界よりも下に保ちます)。

舞台裏 #

新しいデータソースAPI設計を通じてFileソースがどのように動作するかに興味がある場合は、この部分を参考として読んでください。新しいデータソースAPIについては、documentation on data sources と より詳しい説明については、FLIP-27を参照してください。

ファイル Sink #

ファイル真紅は、受信データをバケットに書き込みます。受信ストリームは制限が無いとした場合、各バケット内のデータは有限サイズの部分ファイルに編成されます。バケット化の挙動は、1時間んごとに新しいバケットの書き込みを開始するデフォルトの時間ベースのバケット化で完全に設定可能です。これは、結果として得られる各バケットには、ストリームから1時間間隔で受信したレコードを含むファイルが含まれることを意味します。

バケットディレクトリ内のデータは部分ファイルに分割されます。各バケットは、そのバケットがデータを受信したsinkのサブタスクごとに少なくとも1つの部分ファイルが含まれます。追加の部分ファイルは設定可能なローリングポリシーに従って作成されます。Row-encoded形式 (File Formatsを参照)の場合、デフォルトポリシーは、サイズ、ファイルを開くことができる最大期間を指定するタイムアウト、ファイルが閉じられるまでの最大非アクティブタイムアウトに基づいて部分ファイルをロールします。Bulk-encoded形式の場合、全てのチェックポイントでロールされ、ユーザはサイズや時間に基づいて追加の条件を指定できます。

重要: STREAMINGモードでFileSinkを使う場合は、チェックポイントを有効にする必要があります。部分ファイルはチェックポイントが成功した場合のみ完了できます。チェックポイントが無効の場合、部分ファイルは永久にin-progressまたはpending状態のままになり、下流のシステムで安全に読み取ることができなくなります。

フォーマットの種類 #

FileSinkは、行単位のエンコード形式とApache Parquetのようなバルクエンコーディング形式をサポートします。 これら2つの変種は以下の静的メソッドで作成されるそれぞれのビルダーが付属しています:

  • Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
  • Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

行またはバルクエンコードされたsinkを作成する場合は、バケットが保存されるベースパスとデータのエンコードロジックを指定する必要があります。

全ての設定オプションと様々なデータ形式の実装に関する詳細なドキュメントについては、 FileSink のJavaDocを参照してください。

Row-encoded 形式 #

行エンコード形式では、進行中の部分ファイルのOutputStreamへ個々の行をシリアライズするために使われるEncoderを指定するひつようがあります。

バケット割り当て機能に加えて、RowFormatBuilderにより、ユーザは以下を指定できます:

  • Custom RollingPolicy : DefaultRollingPolicyを上書きするローリングポリシー
  • bucketCheckInterval (default = 1 min) : ローリングポリシーに基づいた時間をチェックする間隔

従って文字列要素を書き込むための基本的な使用法は以下のようになります:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

DataStream<String> input = ...;

final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
	.build();

input.sinkTo(sink);
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

import java.time.Duration

val input: DataStream[String] = ...

val sink: FileSink[String] = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build()

input.sinkTo(sink)
data_stream = ...

sink = FileSink \
    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy(
        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000)) \
    .build()

data_stream.sink_to(sink)

この例では、レコードをデフォルトの1時間の時間バケットに割り当てる単純なシンクを作成します。また、次の3つの条件のいずれかに基づいて、進行中の部分ファイルをロールするローリングポリシーも指定します:

  • 15分以上のデータを含んでいる
  • 過去5分間新しいレコードを受信していない
  • (最後のレコードを書き込み後)ファイルサイズが1GBに達した

Bulk-encoded 形式 #

Bulk-encodedされたsinksは行エンコードされたsinkと同様に作成されますが、Encoderを指定する代わりに、 BulkWriter.Factory を指定する必要があります。 BulkWriterロジックは、新しい要素を追加およびフラッシュする方法と、さらにエンコードを目的としてレコードのバッチを完成される方法を定義します。

Flinkには5つの組み込みのBulkWriterファクトリが付属しています:

  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory
重要 Bulk形式には、CheckpointRollingPolicyを拡張するローリングポリシーのみを含めることができます。 後者はチェックポイントでロールします。追加で、サイズまたは処理時間に基づいてロールできるポリシーがあります。
Parquet 形式 #

Flink には、AvroデータのためのParquetライター ファクトリーを作成するための組み込みの便利なメソッドが含まれています。これらメソッドとその関連ドキュメントは、AvroParquetWriters クラスにあります。

他のParquet互換データ形式に書き込むには、ParquetBuilderインタフェースの独自の実装を使ってParquetWriterFactoryを作成する必要があります。

アプリケーション内でParquet バルク エンコーダを使うには、以下の依存関係を追加する必要があります:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.19-SNAPSHOT</version>
</dependency>

In order to use the Parquet format in PyFlink jobs, the following dependencies are required:

PyFlink JAR
Only available for stable releases.
See Python dependency management for more details on how to use JARs in PyFlink.

AvroデータをParquet形式に書き込むFileSinkは、以下のように作成できます:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.avro.Schema;


Schema schema = ...;
DataStream<GenericRecord> input = ...;

final FileSink<GenericRecord> sink = FileSink
	.forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
	.build();

input.sinkTo(sink);
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
    .forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
    .build()

input.sinkTo(sink)
schema = AvroSchema.parse_string(JSON_SCHEMA)
# The element could be vanilla Python data structure matching the schema,
# which is annotated with default Types.PICKLED_BYTE_ARRAY()
data_stream = ...

avro_type_info = GenericRecordAvroTypeInfo(schema)
sink = FileSink \
    .for_bulk_format(OUTPUT_BASE_PATH, AvroParquetWriters.for_generic_record(schema)) \
    .build()

# A map to indicate its Avro type info is necessary for serialization
data_stream.map(lambda e: e, output_type=avro_type_info).sink_to(sink)

同様に、ProtobufデータをParquet形式に書き込むFileSinkは、以下のように作成できます:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// ProtoRecord is a generated protobuf Message class.
DataStream<ProtoRecord> input = ...;

final FileSink<ProtoRecord> sink = FileSink
	.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
	.build();

input.sinkTo(sink);
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters

// ProtoRecord is a generated protobuf Message class.
val input: DataStream[ProtoRecord] = ...

val sink: FileSink[ProtoRecord] = FileSink
    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
    .build()

input.sinkTo(sink)

PyFlinkユーザの場合、RowをParquetファイルに書き込むBulkWriterFactoryを作成するためにParquetBulkWritersを使えます。

row_type = DataTypes.ROW([
    DataTypes.FIELD('string', DataTypes.STRING()),
    DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
])

sink = FileSink.for_bulk_format(
    OUTPUT_DIR, ParquetBulkWriters.for_row_type(
        row_type,
        hadoop_config=Configuration(),
        utc_timestamp=True,
    )
).build()

ds.sink_to(sink)
Avro形式 #

Flinkは、Avroファイルにデータを書き込むための組み込みサポートも提供します。Avroライターファクトリを作成するための便利なメソッドのリストと、それらの関連ドキュメントはAvroWritersクラスにあります。

プリケーションでAvroライターをつかうには、次の依存関係を追加する必要があります:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.19-SNAPSHOT</version>
</dependency>

In order to use the Avro format in PyFlink jobs, the following dependencies are required:

PyFlink JAR
Only available for stable releases.
See Python dependency management for more details on how to use JARs in PyFlink.

データをAvroファイルに書き込むFileSinkは、以下のように作成できます:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.avro.Schema;


Schema schema = ...;
DataStream<GenericRecord> input = ...;

final FileSink<GenericRecord> sink = FileSink
	.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
	.build();

input.sinkTo(sink);
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()

input.sinkTo(sink)
schema = AvroSchema.parse_string(JSON_SCHEMA)
# The element could be vanilla Python data structure matching the schema,
# which is annotated with default Types.PICKLED_BYTE_ARRAY()
data_stream = ...

avro_type_info = GenericRecordAvroTypeInfo(schema)
sink = FileSink \
    .for_bulk_format(OUTPUT_BASE_PATH, AvroBulkWriters.for_generic_record(schema)) \
    .build()

# A map to indicate its Avro type info is necessary for serialization
data_stream.map(lambda e: e, output_type=avro_type_info).sink_to(sink)

カスタマイズされたAvroライター(例えば、圧縮を有効)を作成するには、ユーザはAvroBuilderインタフェースの独自の実装を持つAvroWriterFactoryを作成する必要があります:

AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
	Schema schema = ReflectData.get().getSchema(Address.class);
	DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);

	DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
	dataFileWriter.setCodec(CodecFactory.snappyCodec());
	dataFileWriter.create(schema, out);
	return dataFileWriter;
});

DataStream<Address> stream = ...
stream.sinkTo(FileSink.forBulkFormat(
	outputBasePath,
	factory).build());
val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
        val schema = ReflectData.get.getSchema(classOf[Address])
        val datumWriter = new ReflectDatumWriter[Address](schema)

        val dataFileWriter = new DataFileWriter[Address](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
    }
})

val stream: DataStream[Address] = ...
stream.sinkTo(FileSink.forBulkFormat(
    outputBasePath,
    factory).build());
ORC形式 #

データをORC形式でバルクエンコードできるように、FlinkはVectorizerの具体的な実装を取り込むOrcBulkWriterFactoryを提供しています。

データを一括してエンコードする他の列形式と同様に、FlinkのOrcBulkWriterは入力要素をバッチで書き込みます。これを実現するために、ORCのVectorizedRowBatchを使います。

入力要素をVectorizedRowBatchに変換する必要があるため、ユーザは抽象Vectorizerクラスを拡張し、vectorize(T element, VectorizedRowBatch batch) メソッドを上書きする必要がありまあす。ご覧の通り、このメソッドはユーザが直接使うVectorizedRowBatchのインスタンスを提供するため、ユーザは入力elementColumnVectorsに変換するロジックを記述して、それを指定されたVectorizedRowBatchインスタンスに設定するだけで済みます。

アッ問えば、入力要素の型がPersonの場合、次のようになります:

class Person {
    private final String name;
    private final int age;
    ...
}

次に型Personの要素を変換しそれらをVectorizedRowBatchに設定する子実装は、次のようになります:

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class PersonVectorizer extends Vectorizer<Person> implements Serializable {	
	public PersonVectorizer(String schema) {
		super(schema);
	}
	@Override
	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
		int row = batch.size++;
		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
		ageColVector.vector[row] = element.getAge();
	}
}
import java.nio.charset.StandardCharsets
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    nameColVector.setVal(batch.size + 1, element.getName.getBytes(StandardCharsets.UTF_8))
    ageColVector.vector(batch.size + 1) = element.getAge
  }

}

アプリケーションでORCバルクエンコーダを使うには、ユーザは次の依存関係を追加する必要があります:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>1.19-SNAPSHOT</version>
</dependency>

次に、ORC形式でデータを書き込むFileSinkを次のように作成できます:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;

String schema = "struct<_col0:string,_col1:int>";
DataStream<Person> input = ...;

final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));

final FileSink<Person> sink = FileSink
	.forBulkFormat(outputBasePath, writerFactory)
	.build();

input.sinkTo(sink);
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory

val schema: String = "struct<_col0:string,_col1:int>"
val input: DataStream[Person] = ...
val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));

val sink: FileSink[Person] = FileSink
    .forBulkFormat(outputBasePath, writerFactory)
    .build()

input.sinkTo(sink)

OrcBulkWriterFactoryは、Hadoop ConfigurationPropertiesも取得できるため、独自のHadoop configurationとORCライターのプロパティを提供できます。

String schema = ...;
Configuration conf = ...;
Properties writerProperties = new Properties();

writerProperties.setProperty("orc.compress", "LZ4");
// Other ORC supported properties can also be set similarly.

final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
    new PersonVectorizer(schema), writerProperties, conf);
val schema: String = ...
val conf: Configuration = ...
val writerProperties: Properties = new Properties()

writerProperties.setProperty("orc.compress", "LZ4")
// Other ORC supported properties can also be set similarly.

val writerFactory = new OrcBulkWriterFactory(
    new PersonVectorizer(schema), writerProperties, conf)

ORCライターのプロパティの完全なリストは、こちらにあります。

ユーザメタデータをORCファイルに追加したいユーザは、オーバーライドするvectorize(...)メソッド内でaddUserMetadata(...)を呼び出すことで追加できます。

public class PersonVectorizer extends Vectorizer<Person> implements Serializable {	
	@Override
	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
		...
		String metadataKey = ...;
		ByteBuffer metadataValue = ...;
		this.addUserMetadata(metadataKey, metadataValue);
	}
}
class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
    ...
    val metadataKey: String = ...
    val metadataValue: ByteBuffer = ...
    addUserMetadata(metadataKey, metadataValue)
  }

}

PyFlinkユーザの場合、OrcBulkWritersを使ってBulkWriterFactoryを作成し、Orc形式でレコードをファイルに書き込むことができます。

In order to use the ORC format in PyFlink jobs, the following dependencies are required:

PyFlink JAR
Only available for stable releases.
See Python dependency management for more details on how to use JARs in PyFlink.

row_type = DataTypes.ROW([
    DataTypes.FIELD('name', DataTypes.STRING()),
    DataTypes.FIELD('age', DataTypes.INT()),
])

sink = FileSink.for_bulk_format(
    OUTPUT_DIR,
    OrcBulkWriters.for_row_type(
        row_type=row_type,
        writer_properties=Configuration(),
        hadoop_config=Configuration(),
    )
).build()

ds.sink_to(sink)
Hadoop SequenceFile 形式 #

アプリケーションでSequenceFileバルクエンコーダを使うには、次の依存関係を追加する必要があります:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sequence-file</artifactId>
    <version>1.19-SNAPSHOT</version>
</dependency>

単純なSequenceFileライターは次のように作成できます:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;


DataStream<Tuple2<LongWritable, Text>> input = ...;
Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
final FileSink<Tuple2<LongWritable, Text>> sink = FileSink
  .forBulkFormat(
    outputBasePath,
    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
	.build();

input.sinkTo(sink);
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;

val input: DataStream[(LongWritable, Text)] = ...
val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
val sink: FileSink[(LongWritable, Text)] = FileSink
  .forBulkFormat(
    outputBasePath,
    new SequenceFileWriterFactory(hadoopConf, LongWritable.class, Text.class))
	.build()

input.sinkTo(sink)

SequenceFileWriterFactoryは、圧縮設定を指定するために追加のコンストラクタパラメータをサポートします。

バケットの割り当て #

バケットロジックはデータがベース出力ディレクトリ内のサブディレクトリにどのように構造化されるかを定義します。

行形式とbulk形式の両方(File Formatsを参照)は、デフォルトの割り当てとしてDateTimeBucketAssignerを使います。 デフォルトでは、DateTimeBucketAssigner はシステムのデフォルトのタイムゾーンに基づいて以下の形式で1時間ごとのバケットを生成します: yyyy-MM-dd--HH。日付形式(つまりバケットサイズ)とタイムゾーンは手動で設定できます。

フォーマットビルダーで.withBucketAssigner(assigner)を呼び出すことで、独自のBucketAssignerを指定できます。

Flinkには2つの組み込みBucketAssignersが付属しています:

  • DateTimeBucketAssigner : デフォルトの時間ベースのアサイナー
  • BasePathBucketAssigner : 全ての部分ファイルをベースパス(単一のグローバルバケット)に格納するアサイナー
注意: PyFlinkは、DateTimeBucketAssignerBasePathBucketAssignerのみをサポートします。

ローリング ポリシー #

RollingPolicyは、特定の進行中の部分ファイルをいつ閉じて保留状態に移動し、その後終了状態に移動するかを定義します。 “finished"状態の部分ファイルは、すぐに表示できる状態であり、障害時に元に戻されない有効なデータが含まれていることが保証されています。 STREAMINGモードでは、ローリングポリシーとチェックポイント間隔(保留中のファイルは次のチェックポイントで完了します)を組み合わせて、部分ファイルが下流ストリームリーダーで利用可能になるまでの時間とこれらの部分ファイルのサイズと数を制御します。BATCHモードでは、部分ファイルはジョブの最後に表示されますが、ローリングポリシーによって最大サイズを制御できます。

Flinkには2つのRollingPoliciesが組み込まれています:

  • DefaultRollingPolicy
  • OnCheckpointRollingPolicy
注意: PyFlinkはDefaultRollingPolicyOnCheckpointRollingPolicyのみをサポートします。

部分ファイルのライフサイクル #

ダウンストリームのシステムでFileSinkの出力を使うために、生成される出力ファイルの命名とライフサイクルを理解する必要があります。

部分ファイルは以下の3つの状態のいずれかです:

  1. In-progress : 現在書き込み中の部分ファイルは進行中です
  2. Pending : コミットを待機している(ローリングポリシーにより)閉じられた進行中のファイル
  3. Finished : 成功したチェックポイント(STREAMING)または入力の終わり(BATCH)で保留中のファイルは"Finished"に移行します。

終了したファイルのみが、後で更新されないことが保証されるため、ダウンストリームで安全に読み込むことができます。

各ライタ サブタスクは各アクティブなバケットについて1つの進行中の部分ファイルを常にもちますが、保留中および完了したファイルが複数ある場合があります。

部分ファイルの例

これらのファイルのライフサイクルをより深く理解するために、2つのsinkサブタスクを持つ簡単な例を見てみましょう:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575

部分ファイルpart-81fc4980-a6af-41c8-9937-9939408a734b-0がロールすると(大きくなりすぎたとしましょう)、保留中になりますが、リネームされません。その後、sinkは新しい部分ファイルを開きます: part-81fc4980-a6af-41c8-9937-9939408a734b-1:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

part-81fc4980-a6af-41c8-9937-9939408a734b-0は現在完了待ちのため、次のチェックポイントが成功すると、完了します:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

バケットポリシーの指示に従って新しいバケットが作成され、これは現在進行中のファイルには影響しません:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
└── 2019-08-25--13
    └── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1

バケットポリシーはレコードごとに評価されるため、古いバケットはまだ新しいレコードを受信できます。

部分ファイルの設定 #

完了したファイルは、命名スキーマでのみ進行中のファイルと区別することができます。

デフォルトでは、ファイルの命名方法は次の通りです:

  • In-progress / Pending: part-<uid>-<partFileIndex>.inprogress.uid
  • Finished: part-<uid>-<partFileIndex> uidはサブタスクがインスタンス化される時にsinkのサブタスクに割り当てられるランダムIDです。このuidは耐障害性がないため、サブタスクが障害から回復した時に再生成されます。

Flinkを使うと、ユーザは部分ファイルのプレフィックスやサフィックスを指定できます。 これは、OutputFileConfigを使って行うことができます。 タオ手羽、プレフィックス"prefix"とサフィックス”.ext"の場合、sinkは以下のファイルを作成します:

└── 2019-08-25--12
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
    └── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

ユーザは次の方法でOutputFileConfigを指定できます:

OutputFileConfig config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build();
            
FileSink<Tuple2<Integer, Integer>> sink = FileSink
 .forRowFormat((new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build();
			
val config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build()
            
val sink = FileSink
 .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build()
			
config = OutputFileConfig \
    .builder() \
    .with_part_prefix("prefix") \
    .with_part_suffix(".ext") \
    .build()

sink = FileSink \
    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
    .with_bucket_assigner(BucketAssigner.base_path_bucket_assigner()) \
    .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
    .with_output_file_config(config) \
    .build()

コンパクション #

バージョン1.15から、FileSink保留中ファイルの圧縮をサポートしています。これにより、アプリケーションは特にチェックポイントを取得し続ける必要があるbulkエンコード形式を使うときに、チェックポイントの間隔を短くすることができます。

以下のようにして圧縮を有効にできます

FileSink<Integer> fileSink=
	FileSink.forRowFormat(new Path(path),new SimpleStringEncoder<Integer>())
	    .enableCompact(
	        FileCompactStrategy.Builder.newBuilder()
	            .setSizeThreshold(1024)
	            .enableCompactionOnCheckpoint(5)
	            .build(),
	        new RecordWiseFileCompactor<>(
	            new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
	    .build();
val fileSink: FileSink[Integer] =
  FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
    .enableCompact(
      FileCompactStrategy.Builder.newBuilder()
        .setSizeThreshold(1024)
        .enableCompactionOnCheckpoint(5)
        .build(),
      new RecordWiseFileCompactor(
        new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
    .build()
file_sink = FileSink \
    .for_row_format(PATH, Encoder.simple_string_encoder()) \
    .enable_compact(
        FileCompactStrategy.builder()
            .set_size_threshold(1024)
            .enable_compaction_on_checkpoint(5)
            .build(),
        FileCompactor.concat_file_compactor()) \
    .build()

有効にすると、ファイルが保留中になり、コミットされるまでの間で圧縮されます。保留中のファイルは、まずパスが.で始まる一時ファイルにコミットされます。次に、これらのファイルはユーザが指定したcompactorによる戦略にしたがって圧縮され、新しい圧縮された保留中のファイルが生成されます。 その後、これら保留中のファイルはコミッターに送信され、正式なファイルにコミットされます。その後、ソースファイルは削除されます。

圧縮を有効にする場合は、 FileCompactStrategy FileCompactor を指定する必要があります。

FileCompactStrategy は、いつ、どのファイルが圧縮されるかを指定します。現在のところ、2つの平行条件があります: 対象のファイルサイズと通過するチェックポイントの数。 キャッシュされたファイルの合計サイズがサイズの閾値を超えた場合、または最後の圧縮からのチェックポイントの数が指定された値に達すると、キャッシュされたファイルの圧縮がスケジュールされます。

FileCompactor は、Pathの指定されたリストを圧縮し結果ファイルを書き込む方法を指定します。ファイルの書き方により次の2種類に分類されます:

  • OutputStreamBasedFileCompactor : ユーザは圧縮された結果を出力ストリームに書き込めます。ユーザが入力ファイルからレコードを読みたくない、あるいは読み込めない場合に便利です。 例としては、ファイルのリストを直接連結する ConcatFileCompactor があります。
  • RecordWiseFileCompactor : compactorは入力ファイルからレコードを1つずつ読み取り、FileWriterと同様に結果ファイルに書き込めます。 例としては、ソースファイルからレコードを読み取り、それらをCompactingFileWriterを使って書き込む RecordWiseFileCompactor があります。ユーザはソースファイルからレコードを読み取る方法を指定する必要があります。

重要な注意 1 圧縮を有効にした後、圧縮を無効にする場合は、FileSinkを構築する時にdisableCompactを明示的に呼び出す必要があります。

重要な注意 2 圧縮が有効な場合、書き込まれたファイルが表示されるまでに長時間待つ必要があります。

注意: PyFlinkはConcatFileCompactorIdenticalFileCompactorのみをサポートします。

重要な考慮事項 #

概要 #

重要な注意 1: Hadoop < 2.7 を使う場合、各チェックポイントで部分ファイルをロールするOnCheckpointRollingPolicyを使ってください。その理由は、部分ファイルがチェックポイント間隔を"横断"する場合、障害からの回復時にFileSinkが進行中のファイルからコミットされてないデータを破棄するためにtruncate()メソッドを使う可能性があるからです。このメソッドは2.7より前のHaddopバージョンでサポートされておらず、Flinkは例外を投げるでしょう。 and Flink will throw an exception.

重要な注意 2: FlinkのsinksとUDFsは一般的に通常のジョブの終了(例えば有限な入力ストリーム)と障害によるジョブの終了を区別しないことを考慮すると、最後の進行中のファイルは"finished"状態に移行しません。

重要な注意 3: FlinkとFileSinkはコミットされたデータを上書きすることはありません。 これを考慮すると、後続の成功したチェックポイントによってコミットされた進行中のファイルを前提とする古いチェックポイント/セーブポイントから復元しようとすると、FileSinkは進行中のファイルを見つけられないため、再開を拒否し例外を投げます。

重要な注意 4: 現在のところ、FileSinkは以下の5杖のファイルシステムのみをサポートします: HDFS、S3、OSS、ABFS、Local。実行時にサポートされないファイルシステムを使うと、Flinkは例外を投げます。

BATCH固有 #

重要な注意 1: Writerはユーザ指定の並列処理で実行されますが、Committerは1つの並列処理で実行されます。

重要な注意 2: 保留中のファイルはコミットされます。つまり、全ての入力が処理された後でFinished状態へ移行します。

重要な注意 3: 高可用性がアクティブ化されている場合、Committersがコミット中にJobManagerの障害が発生した場合、重複します。この問題は将来のFlinkバージョンで修正される予定です(FLIP-147での進捗を参照)。

S3固有 #

重要な注意 1: S3の場合、FileSinkは、Prestoに基づいた実装ではなく、Hadoop-basedのファイルシステムのみをサポートします。ジョブがS3への書き込みにFileSinkを使っているが、チェックポイント作成にPresto-basedのものを使いたい場合は、sinkの対象パスのスキーマとして*“s3a://”* (Hadoopの場合)と、チェックポイントとして*“s3p://”(Prestoの場合)を明示的に使うことをお勧めします。sinkとチェックポイントの両方に“s3://"*を使うと、両方の実装がスキーマを"listen"するため、予期しない挙動に繋がる可能性があります

重要な注意 2: 効率的でありながら確実に1回のセマンティクスを保証するために、FileSinkはS3のMulti-part Upload(これからはMPU)機能を使います。この機能により、MPUの全ての部分が正常にアップロードされた時に元のファイルに結合できる独立したチャンク(つまり"multi-part”)で、ファイルをアップロードすることができます。 非アクティブな MPU の場合、S3はバケットライフサイクルルールをサポートします。ユーザはこれを使って開始後指定された日数以内に完了しないマルチパートのアップロードを中止することができます。これは、このルールを積極的に設定し、一部の部分ファイルが完全にアップロードされていないセーブポイントを取得した場合、ジョブが再開される前に関連するMPUがタイムアウトする可能性があることを意味します。これにより、保留中の部分ファイルがもう無いためにジョブがそのセーブポイントから復元できなくなり、Flinkはそれらを取得しようとして失敗するために、例外で失敗します。

OSS固有 #

重要な注意: 効率的でありながら確実に1回のセマンティクスを保証するために、FileSinkはOSSのMulti-part Upload機能も使います(S3と同様)。

Back to top

inserted by FC2 system