Avro
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Avro形式 #

FlinkにはApache Avroのサポートが組み込まれています。これにより、Flinkを使ってAvroスキーマに基づいてAvroデータを簡単に読み書きできます。 Flinkのシリアライズ化フレームワークは、Avroスキーマから生成されたクラスを処理できます。Avro形式を使うには、ビルド自動化ツール(MavenやSBTなど)を使うプロジェクトに次の依存関係が必要です。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.19-SNAPSHOT</version>
</dependency>

In order to use the Avro format in PyFlink jobs, the following dependencies are required:

PyFlink JAR
Only available for stable releases.
See Python dependency management for more details on how to use JARs in PyFlink.

Avroファイルからデータを読み込むには、AvroInputFormatを指定する必要があります。

:

AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataStream<User> usersDS = env.createInput(users);

UserはAvroによって生成されたPOJOであることに注意してください。FlinkはこれらのPOJOの文字ベースキーの選択をすることもできます。例えば:

usersDS.keyBy("name");

FlinkでのGenericData.Record型の使用は可能ですが、お勧めできません。レコードには完全なスキーマが含まれているため、データ量が非常に多くなり、従って使うと遅くなる可能性があります。

FlinkのPOJOフィールド選択は、Avroから生成されたPOJOでも動作します。しかし、それはフィールドの型が生成されたクラスへ正しく書かれている場合のみ可能です。フィールドの型がObjectの場合、そのフィールドをjoinまたはgroupキーとして使うことはできません。 Avroで{"name": "type_double_test", "type": "double"},のようにフィールドを指定すると正常に機能しますが、フィールドを1つだけもつUNION型({"name": "type_double_test", "type": ["double"]},)として指定すると、Object型のフィールドを生成します。Nullが可能な型({"name": "type_double_test", "type": ["null", "double"]},)の指定が可能であることに注意してください!

Pythonジョブの場合、Avroファイルから読み取るためにAvroスキーマを定義する必要があり、その要素は標準のPythonオブジェクトになります。例えば:

schema = AvroSchema.parse_string("""
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber",  "type": ["int", "null"]},
        {"name": "favoriteColor", "type": ["string", "null"]}
    ]
}
""")

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.create_input(AvroInputFormat(AVRO_FILE_PATH, schema))

def json_dumps(record):
    import json
    return json.dumps(record)

ds.map(json_dumps).print()
inserted by FC2 system