This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
CSV
CSV形式 #
CSV形式を使うには、プロジェクトにFlink CSV依存関係を追加する必要があります:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
PyFlinkユーザの場合、ジョブの中で直接使えます。
FlinkはCsvReaderFormat
を使ったCSVファイルの読み込みをサポートします。リーダーはJacksonライブラリを利用し、CSVスキーマと解析オプションに対応する設定を渡すことができます。
CsvReaderFormat
は、次のように初期化して使えます:
CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source =
FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
この場合、CSV解析のスキーマは、Jackson
ライブラリを使って、SomePojo
クラスのフィールドに基づいて自動的に算出されます。
注意: CSVファイルの列のフィールドと正確に一致するフィール順番を使って、クラス定義に@JsonPropertyOrder({field1, field2, ...})
アノテーションを追加する必要がある場合があります。
上級の設定 #
CSV スキーマまたは解析オプションをより詳細に制御する必要がある場合は、CsvReaderFormat
のより低レベルのforSchema
静的ファクトリーメソッドを使います:
CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory,
Function<CsvMapper, CsvSchema> schemaGenerator,
TypeInformation<T> typeInformation)
以下は、独自のカラムのセパラータを使ってPOJOを読み込む例です:
//Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city","lat","lng","country","iso2",
"adminName","capital","population"})
public static class CityPojo {
public String city;
public BigDecimal lat;
public BigDecimal lng;
public String country;
public String iso2;
public String adminName;
public String capital;
public long population;
}
Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
CsvReaderFormat<CityPojo> csvFormat =
CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
FileSource<CityPojo> source =
FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
対応するCSVファイル:
Berlin|52.5167|13.3833|Germany|DE|Berlin|primary|3644826
San Francisco|37.7562|-122.443|United States|US|California||3592294
Beijing|39.905|116.3914|China|CN|Beijing|primary|19433000
きめ細かいJackson
設定を使って、より複雑なデータ型を読み込むこともできます:
public static class ComplexPojo {
private long id;
private int[] array;
}
CsvReaderFormat<ComplexPojo> csvFormat =
CsvReaderFormat.forSchema(
CsvSchema.builder()
.addColumn(
new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
.addColumn(
new CsvSchema.Column(4, "array", CsvSchema.ColumnType.ARRAY)
.withArrayElementSeparator("#"))
.build(),
TypeInformation.of(ComplexPojo.class));
PyFlinkユーザの場合、手動でカラムを追加することでcsvスキーマを定義でき、csvソースの出力型は各列がフィールドにマップされた行になります。
schema = CsvSchema.builder() \
.add_number_column('id', number_type=DataTypes.BIGINT()) \
.add_array_column('array', separator='#', element_type=DataTypes.INT()) \
.set_column_separator(',') \
.build()
source = FileSource.for_record_stream_format(
CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
対応するCSVファイル:
0,1#2#3
1,
2,1
TextLineInputFormat
と同様に、CsvReaderFormat
は継続モードとバッチモードで使えます(例として、TextLineInputFormatを参照)。
PyFlinkユーザの場合、CsvBulkWriters
を使ってBulkWriterFactory
を作成し、レコードをCSV形式でファイルに書き込むことができます。
schema = CsvSchema.builder() \
.add_number_column('id', number_type=DataTypes.BIGINT()) \
.add_array_column('array', separator='#', element_type=DataTypes.INT()) \
.set_column_separator(',') \
.build()
sink = FileSink.for_bulk_format(
OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
ds.sink_to(sink)