This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Avro形式 #
FlinkにはApache Avroのサポートが組み込まれています。これにより、Flinkを使ってAvroスキーマに基づいてAvroデータを簡単に読み書きできます。 Flinkのシリアライズ化フレームワークは、Avroスキーマから生成されたクラスを処理できます。Avro形式を使うには、ビルド自動化ツール(MavenやSBTなど)を使うプロジェクトに次の依存関係が必要です。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
In order to use the Avro format in PyFlink jobs, the following dependencies are required:
PyFlink JAR |
---|
Only available for stable releases. |
Avroファイルからデータを読み込むには、AvroInputFormat
を指定する必要があります。
例:
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataStream<User> usersDS = env.createInput(users);
User
はAvroによって生成されたPOJOであることに注意してください。FlinkはこれらのPOJOの文字ベースキーの選択をすることもできます。例えば:
usersDS.keyBy("name");
FlinkでのGenericData.Record
型の使用は可能ですが、お勧めできません。レコードには完全なスキーマが含まれているため、データ量が非常に多くなり、従って使うと遅くなる可能性があります。
FlinkのPOJOフィールド選択は、Avroから生成されたPOJOでも動作します。しかし、それはフィールドの型が生成されたクラスへ正しく書かれている場合のみ可能です。フィールドの型がObject
の場合、そのフィールドをjoinまたはgroupキーとして使うことはできません。
Avroで{"name": "type_double_test", "type": "double"},
のようにフィールドを指定すると正常に機能しますが、フィールドを1つだけもつUNION型({"name": "type_double_test", "type": ["double"]},
)として指定すると、Object
型のフィールドを生成します。Nullが可能な型({"name": "type_double_test", "type": ["null", "double"]},
)の指定が可能であることに注意してください!
Pythonジョブの場合、Avroファイルから読み取るためにAvroスキーマを定義する必要があり、その要素は標準のPythonオブジェクトになります。例えば:
schema = AvroSchema.parse_string("""
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}
""")
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.create_input(AvroInputFormat(AVRO_FILE_PATH, schema))
def json_dumps(record):
import json
return json.dumps(record)
ds.map(json_dumps).print()