Parquet
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Parquet 形式 #

FlinkはParquetファイルの読み取り、 Flink RowData の生成、Avroレコードの生成をサポートします。 この形式を使うには、プロジェクトにflink-parquet依存関係を追加する必要があります:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-parquet</artifactId>
	<version>1.19-SNAPSHOT</version>
</dependency>

Avroレコードを読み取るには、parquet-avro依存関係を追加する必要があります:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.2</version>
    <optional>true</optional>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>it.unimi.dsi</groupId>
            <artifactId>fastutil</artifactId>
        </exclusion>
    </exclusions>
</dependency>

In order to use the Parquet format in PyFlink jobs, the following dependencies are required:

PyFlink JAR
Only available for stable releases.
See Python dependency management for more details on how to use JARs in PyFlink.

この形式は、バッチ実行モードとストリーミング実行モードの両方で使える新しいソースと互換性があります。 したがって、この形式は次の2種類のデータに使えます:

  • 制限付きデータ: 全てのファイルをリストし、それらすべてを読み取ります。
  • 無宣言データ: ディレクトリ内で阿多rしいファイルが出現するかどうかを監視します。
ファイルソースを開始すると、デフォルトで制限付きデータ用に設定されます。 無制限のデータ用にファイルソースを設定するには、さらにAbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)を呼び出す必要があります。

Vectorizedリーダー

// Parquet rows are decoded in batches
FileSource.forBulkFileFormat(BulkFormat,Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forBulkFileFormat(BulkFormat,Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();
# Parquet rows are decoded in batches
FileSource.for_bulk_file_format(BulkFormat, Path...)

# Monitor the Paths to read data as unbounded data
FileSource.for_bulk_file_format(BulkFormat, Path...) \
          .monitor_continuously(Duration.of_millis(5)) \
          .build()

Avro Parquetリーダー

// Parquet rows are decoded in batches
FileSource.forRecordStreamFormat(StreamFormat,Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forRecordStreamFormat(StreamFormat,Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();
# Parquet rows are decoded in batches
FileSource.for_record_stream_format(StreamFormat, Path...)

# Monitor the Paths to read data as unbounded data
FileSource.for_record_stream_format(StreamFormat, Path...) \
          .monitor_continuously(Duration.of_millis(5)) \
          .build()
以下の例は、全て制限付きデータ用に設定されます。 無制限のデータ用にファイルソースを設定するには、さらにAbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)を呼び出す必要があります。

この例では、ParquetレコードをFlink RowDatasとして含むDataStreamを作成します。このスキーマは、指定されたフィールド(“f7”, “f4” and “f99”)のみを読み取るように投影されます。 Flinkは500レコードずつバッチでレコードを読み取ります。最初のブール値パラメータは、タイムスタンプ列がUTCとして解釈することを指定します。 2つ目のブール値は、投影されたParquetフィールド名が大文字と小文字を区別することをアプリケーションに指示します。 レコードにはイベントのタイムスタンプが含まれていないため、ウォーターマーク戦略は定義されていません。

final LogicalType[] fieldTypes =
        new LogicalType[] {
                new DoubleType(), new IntType(), new VarCharType()};
final RowType rowType = RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                rowType,
                InternalTypeInfo.of(rowType),
                500,
                false,
                true);
final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format,  /* Flink Path */)
                .build();
final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
row_type = DataTypes.ROW([
    DataTypes.FIELD('f7', DataTypes.DOUBLE()),
    DataTypes.FIELD('f4', DataTypes.INT()),
    DataTypes.FIELD('f99', DataTypes.VARCHAR()),
])
source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
    row_type=row_type,
    hadoop_config=Configuration(),
    batch_size=500,
    is_utc_timestamp=False,
    is_case_sensitive=True,
), PARQUET_FILE_PATH).build()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")

Avroレコード #

Flinkは、Parquetファイルの意味取りによる3種類のAvroレコードの生成をサポートします(PyFlinkでは汎用レコードのみがサポートされます):

汎用レコード #

AvroスキーマはJSONを使って定義されます。Avro specificationでAvroスキーマと型について詳細な情報が得られます。 この例はofficial Avro tutorialで説明されるAvroスキーマの例を使います:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favoriteNumber",  "type": ["int", "null"]},
     {"name": "favoriteColor", "type": ["string", "null"]}
 ]
}

このスキーマは、name、favoriteNumber、favoriteColorの3つのフィールドを持つユーザを表すレコードを定義します:Avroスキーマの定義方法の詳細については、record specificationをご覧ください。

次の例では、ParquetレコードをAvro汎用レコードとして含む、DetaStreamを生成します。 JSON文字列に基づいてAvroスキーマを解析します。スキーマを解析する方法は他にもたくさんあります。例えば、java.io.File または java.io.InputStream。詳細はAvro Schemaを参照してください。 その後、Avro汎用レコードのAvroParquetReadersを介してAvroParquetRecordFormatを作成します。

// parsing avro schema
final Schema schema =
        new Schema.Parser()
            .parse(
                    "{\"type\": \"record\", "
                        + "\"name\": \"User\", "
                        + "\"fields\": [\n"
                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                        + "    ]\n"
                        + "    }");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
        .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);
        
final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
# parsing avro schema
schema = AvroSchema.parse_string("""
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber",  "type": ["int", "null"]},
        {"name": "favoriteColor", "type": ["string", "null"]}
    ]
}
""")

source = FileSource.for_record_stream_format(
    AvroParquetReaders.for_generic_record(schema), # file paths
).build()

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10)

stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")

特定レコード #

以前に定義したスキーマに基づいて、Avroコード生成を使ってクラスを生成できます。 クラスが生成されたら、プログラム内でスキーマを直接使う必要はありません。 avro-tools.jarを使ってコードを手動で生成することも、Avro Mavenプラグインを使って設定されたソースディレクトリに存在する.avscファイルに対してコード生成を実行することもできます。詳細は、Avro Getting Startedをご覧ください。

以下の例では、サンプルスキーマ testdata.avsc を使います:

[
  {"namespace": "org.apache.flink.formats.parquet.generated",
    "type": "record",
    "name": "Address",
    "fields": [
      {"name": "num", "type": "int"},
      {"name": "street", "type": "string"},
      {"name": "city", "type": "string"},
      {"name": "state", "type": "string"},
      {"name": "zip", "type": "string"}
    ]
  }
]

Avro Mavenプラグインを使ってAddress Javaクラスを生成します:

@org.apache.avro.specific.AvroGenerated
public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
    // generated code...
}

Avro固有のレコード用にAvroParquetReadersを介してAvroParquetRecordFormatを作成し、その後Avro固有レコードとしてParquetレコードを含むDataStreamを作成します。

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
        .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);
        
final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

反映レコード #

事前定義されたAvroスキーマを必要とするAvro汎用レコードと特定レコード以外にも、Flinkは既存のPOJOクラスに基づいたParquetファイルからのDataStreamの生成もサポートします。 この場合、AvroはJavaリフレクションを使って、これらのPOJOクラス用のスキーマとプロトコルを生成します。 Java型はAvroスキーマにマッピングされます。詳細はAvro reflectドキュメントを参照してください。

この例では、単純なJava POJOクラス Datum を使います:

public class Datum implements Serializable {

    public String a;
    public int b;

    public Datum() {}

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        Datum datum = (Datum) o;
        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
    }

    @Override
    public int hashCode() {
        int result = a != null ? a.hashCode() : 0;
        result = 31 * result + b;
        return result;
    }
}

Avro反映レコード用にAvroParquetReadersを介してAvroParquetRecordFormatを作成し、その後Avro反映レコードとしてParquetレコードを含むDataStreamを作成します。

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
        .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);
        
final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Parquetファイルの前提条件 #

Avro反映レコードの読み取りをサポートするには、Parquetファイルに特定のメタ情報が含まれる必要があります。 Parquetデータの生成に使われるAvroスキーマには、リフレクションプロセスの具体的なJavaクラスを識別するためにプログラムによって使われるnamespaceが含まれている必要があります。

以下の例は、以前に使用したUserスキーマを示しています。ただし、今回は、リフレクションのUserクラスが見つかる場所(この場合はパッケージ)を指す名前空間が含まれています。

// avro schema with namespace
final String schema = 
                    "{\"type\": \"record\", "
                        + "\"name\": \"User\", "
                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
                        + "\"fields\": [\n"
                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                        + "    ]\n"
                        + "    }";

このスキーマで生成されるParquetファイルは、次のようなメタデータを含みます:

creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:          parquet.avro.schema =
{"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra:          writer.model.name = avro

file schema:    org.apache.flink.formats.parquet.avro.User
--------------------------------------------------------------------------------
name:           REQUIRED BINARY L:STRING R:0 D:0
favoriteNumber: OPTIONAL INT32 R:0 D:1
favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1

row group 1:    RC:3 TS:143 OFFSET:4
--------------------------------------------------------------------------------
name:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]

org.apache.flink.formats.parquet.avroパッケージで定義されたUserクラスは次のようになります:

public class User {
        private String name;
        private Integer favoriteNumber;
        private String favoriteColor;

        public User() {}

        public User(String name, Integer favoriteNumber, String favoriteColor) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
            this.favoriteColor = favoriteColor;
        }

        public String getName() {
            return name;
        }

        public Integer getFavoriteNumber() {
            return favoriteNumber;
        }

        public String getFavoriteColor() {
            return favoriteColor;
        }
    }

次のプログラムを作成して、parquetファイルからUser型のAvro反映レコードを読み取ることができます:

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
        AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
        .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
inserted by FC2 system