CSV
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

CSV形式 #

CSV形式を使うには、プロジェクトにFlink CSV依存関係を追加する必要があります:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-csv</artifactId>
	<version>1.19-SNAPSHOT</version>
</dependency>

PyFlinkユーザの場合、ジョブの中で直接使えます。

FlinkはCsvReaderFormatを使ったCSVファイルの読み込みをサポートします。リーダーはJacksonライブラリを利用し、CSVスキーマと解析オプションに対応する設定を渡すことができます。

CsvReaderFormatは、次のように初期化して使えます:

CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source = 
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

この場合、CSV解析のスキーマは、Jackson ライブラリを使って、SomePojo クラスのフィールドに基づいて自動的に算出されます。

注意: CSVファイルの列のフィールドと正確に一致するフィール順番を使って、クラス定義に@JsonPropertyOrder({field1, field2, ...})アノテーションを追加する必要がある場合があります。

上級の設定 #

CSV スキーマまたは解析オプションをより詳細に制御する必要がある場合は、CsvReaderFormatのより低レベルのforSchema静的ファクトリーメソッドを使います:

CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                             TypeInformation<T> typeInformation) 

以下は、独自のカラムのセパラータを使ってPOJOを読み込む例です:

//Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city","lat","lng","country","iso2",
                    "adminName","capital","population"})
    public static class CityPojo {
    public String city;
    public BigDecimal lat;
    public BigDecimal lng;
    public String country;
    public String iso2;
    public String adminName;
    public String capital;
    public long population;
}

Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
        mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');

CsvReaderFormat<CityPojo> csvFormat =
        CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));

FileSource<CityPojo> source =
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

対応するCSVファイル:

Berlin|52.5167|13.3833|Germany|DE|Berlin|primary|3644826
San Francisco|37.7562|-122.443|United States|US|California||3592294
Beijing|39.905|116.3914|China|CN|Beijing|primary|19433000

きめ細かいJackson設定を使って、より複雑なデータ型を読み込むこともできます:

public static class ComplexPojo {
    private long id;
    private int[] array;
}

CsvReaderFormat<ComplexPojo> csvFormat =
        CsvReaderFormat.forSchema(
                CsvSchema.builder()
                        .addColumn(
                                new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
                        .addColumn(
                                new CsvSchema.Column(4, "array", CsvSchema.ColumnType.ARRAY)
                                        .withArrayElementSeparator("#"))
                        .build(),
                TypeInformation.of(ComplexPojo.class));

PyFlinkユーザの場合、手動でカラムを追加することでcsvスキーマを定義でき、csvソースの出力型は各列がフィールドにマップされた行になります。

schema = CsvSchema.builder() \
    .add_number_column('id', number_type=DataTypes.BIGINT()) \
    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
    .set_column_separator(',') \
    .build()

source = FileSource.for_record_stream_format(
    CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()

# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')

対応するCSVファイル:

0,1#2#3
1,
2,1

TextLineInputFormatと同様に、CsvReaderFormatは継続モードとバッチモードで使えます(例として、TextLineInputFormatを参照)。

PyFlinkユーザの場合、CsvBulkWritersを使ってBulkWriterFactoryを作成し、レコードをCSV形式でファイルに書き込むことができます。

schema = CsvSchema.builder() \
    .add_number_column('id', number_type=DataTypes.BIGINT()) \
    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
    .set_column_separator(',') \
    .build()

sink = FileSink.for_bulk_format(
    OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()

ds.sink_to(sink)
inserted by FC2 system