Apache Avro データソース ガイド

Spark 2.4 リリースから、Spark SQL はApache Avroデータを読み込みおよび書き込みするための組み込みのサポートを提供します。

配備

spark-avro モジュールは外部にあり、spark-submit あるいは spark-shell にデフォルトで含まれません。

Sparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit が使われます。spark-avro_2.12 とその依存物は--packagesを使って直接 spark-submit に追加されるかも知れません。以下のように。

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.2.1 ...

spark-shellでの実験的なものとして、org.apache.spark:spark-avro_2.12とその依存物に直接追加するために--packagesを使うこともできます。

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1 ...

外部的な依存を持つアプリケーションをサブミットすることについての詳細は、アプリケーションのサブミット ガイド を見てください。

ロードと保存機能

spark-avro モジュールは外部にあるため、DataFrameReader あるいは DataFrameWriter.avroAPIはありません。

Avro形式にデータをロード/保存するには、avroとしてデータソース オプションformatを指定する必要があります (あるいは org.apache.spark.sql.avro)。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() と from_avro()

Avro パッケージはカラムをAvro形式のバイナリとしてエンコードするための関数to_avroと、Avroバイナリデータをカラムにデコードするfrom_avro()を提供します。両方の関数は1つのカラムを他のカラムに変換し、入力/出力 SQLデータ型はcomplex型あるいはプリミティブ型が可能です。

カラムとしてAvroレコードを使うことは、Kafkaのようなストリーミングソースからの読み込みあるいは書き込み時に便利です。各Kafka キー-値 レコードは、Kafkaへの取り込みタイムスタンプ、Kafkaでのオフセットなどのような幾らかのメタデータが増えるでしょう。

import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)

データソース オプション

Avro のデータソースオプションは以下のようにして設定することができます:

プロパティ名デフォルト意味スコープこれ以降のバージョンから
avroSchema None ユーザによって提供されるJSON形式の任意のAvroスキーマ。
  • Avro を読み取る時、このオプションは、互換性はあるが実際の Avro スキーマーとは異なる、進化したスキーマを設定することができます。逆シリアル化スキーマは進化したスキーマと一致します。例えば、デフォルト値を持つ1つの追加の列を含む進化したスキーマを設定する場合、Spark の読み取り結果は新しい列も含みます。
  • Avro を書き込む時、予想される出力 Avro スキーマが Spark によって変換されたスキーマと一致しない場合、このオプションを設定することができます。例えば、1つの列の期待されるスキーマは、デフォルトの変換スキーマである "string" 型ではなく "enum" 型です。
読み取り、書き込み、関数 from_avro 2.4.0
recordName topLevelRecord 書き込みの結果のトップレベルのレコード名。これはAvro specで必要とされます。 write 2.4.0
recordNamespace "" 書き込みの結果のレコードの名前空間。 write 2.4.0
ignoreExtension true オプションは読み込みでの.avro 拡張子無しのファイルの無視を生業します。
もしオプションが有効な場合、(.avro拡張子の有りおよび無しの)全てのファイルがロードされます。
このオプションは非推奨で、将来のリリースでは削除されるでしょう。ファイル名をフィルタするには、一般的なデータソースオプション pathGlobFilter を使ってください。
read 2.4.0
圧縮 snappy compression オプションにより、書き込みで使われる圧縮コーディックを指定することができます。
現在のところサポートされるコーディックはuncompressed, snappy, deflate, bzip2, xzおよびzstandardです。もしオプションが設定されない場合、構成 spark.sql.avro.compression.codec 設定が考慮されます。
write 2.4.0
モード FAILFAST mode オプションにより、関数 from_avro のパースモードを指定することができます。
現在サポートされるモードは:
  • FAILFAST: 破損したレコードの処理で例外を投げます。
  • PERMISSIVE: 破損したレコードは null 結果として処理されます。従って、データスキーマは完全に null にできるように強制されます。これはユーザが提供したものとは異なるかもしれません。
function from_avro 2.4.0
datetimeRebaseMode (spark.sql.avro.datetimeRebaseModeInRead 設定の値) datetimeRebaseMode オプションにより、datetimestamp-microstimestamp-millis(ジュリアン暦から先発グレゴリオ暦までの論理型)の値のリベースモードを指定できます。
現在サポートされるモードは:
  • EXCEPTION: 2つのカレンダー間で曖昧な古代の日付/タイムスタンプの読み取りに失敗。
  • CORRECTED: リベースせずに日付/タイムスタンプをロード。
  • LEGACY: ジュリアン暦から先発グレゴリオ暦への古代の日付/タイムスタンプのリベースを実行。
read and function from_avro 3.2.0
positionalFieldMatching false これを`avroSchema`オプションと組み合わせて使うと、提供されたAvroスキーマのフィールドをSQLスキーマのフィールドと一致させるための動作を調整できます。デフォルトでは、照合はフィールド名を使って実行され、位置は無視されます。オプションが"true"に設定されている場合、一致はフィールドの位置に基づいて行われます。 読み込みと書き込み 3.2.0

設定

Avroの設定はSparkSessionの setConf メソッドを使うか、SQLを使ってSET key=value を実行することで行うことができます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true true に設定された場合、データソース プロバイダ com.databricks.spark.avro は下位互換性のために組み込みの外部Avroデータソース モジュールにマップされます。
注意: SQL設定はSpark3.2で非推奨になり、将来削除されるかもしれません。
2.4.0
spark.sql.avro.compression.codec snappy AVROファイルの書き込みで使われる圧縮コーディック。サポートされるコーディック: uncompressed, deflate, snappy, bzip2 および xz。デフォルトのコーディックは snappy です。 2.4.0
spark.sql.avro.deflate.level -1 AVROファイルの書き込みで使われるdeflateコーディック圧縮レベル。有効な値は-1または1から10の範囲でなければなりません。デフォルト値は-1で、現在の実装での6レベルに対応します。 2.4.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION The rebasing mode for the values of the date, timestamp-micros, timestamp-millis logical types from the Julian to Proleptic Gregorian calendar:
  • EXCEPTION: Sparkは、2つのカレンダー間で曖昧な古代の日付/タイムスタンプを検出すると、読み込みに失敗します。
  • CORRECTED: Sparkはリベースを実行せず、日付/タイムスタンプをそのまま読み込みます。
  • LEGACY: Sparkは、Avroファイルから読み込む時に、日付/タイムスタンプをレガシーハイブリッド(ユリウス暦+グレゴリオ暦)カレンダーから先発グレゴリオ暦にリベースします。
この設定は、Avroファイルのライター情報(Spark、Hiveなど)が不明な場合のみ有効です。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION The rebasing mode for the values of the date, timestamp-micros, timestamp-millis logical types from the Proleptic Gregorian to Julian calendar:
  • EXCEPTION: Sparkは、2つのカレンダー間で曖昧な古代の日付/タイムスタンプを検出すると、書き込みに失敗します。
  • CORRECTED: Sparkはリベースを実行せず、日付/タイムスタンプをそのまま書き込みます。
  • LEGACY: Sparkは、Avroファイルを書き込む時に、日付/タイムスタンプを先発グレゴリオ暦からレガシーハイブリッド(ユリウス暦+グレゴリオ暦)カレンダーにリベースします。
3.0.0

Databricks spark-avro との互換性

このAvroデータソースモジュールはもともとDatabricksのオープンソースレポジトリ spark-avroから来ていて互換性があります。

デフォルトではSQL設定spark.sql.legacy.replaceDatabricksSparkAvro.enabled を有効にすると、データソースプロバイダ com.databricks.spark.avro がこの組み込みのAvroモジュールにマップされます。カタログメタデータストア内のcom.databricks.spark.avroとしてのProvider プロパティで作成されたSparkテーブルについては、もしこの組み込みのAvroモジュールを使っている場合はこれらのテーブルをロードするためにマッピングが不可欠です。

Databricksのspark-avroでは、暗黙的なクラス AvroDataFrameWriterAvroDataFrameReader がショートカット関数 .avro()のために作成されたことに注意してください。この組み込みの外部モジュールの中で、両方の暗黙的なクラスは削除されました。DataFrameWriter.format("avro")を使うか、代わりにDataFrameReader を使ってください。これらはきちんとしていて十分良い筈です。

もしspark-avro jar ファイルの独自のビルドを使うことを好む場合、単純に設定spark.sql.legacy.replaceDatabricksSparkAvro.enabledを無効にし、アプリケーションを配備する時に--jarsオプションを使うことができます。詳細はアプリケーションのサブミット ガイド内の上級の依存管理の章を読んでください。

Avro -> Spark SQL 変換のためにサポートされる型

現在のところSparkはAvroのレコードの全てのプリミティブ型complex型の読み込みをサポートします。

Avroの型Spark SQLの型
boolean BooleanType
dint IntegerType
long LongType
float FloatType
double DoubleType
文字列 StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union 以下を見てください

上でリスト化された型に加えて、union型の読み込みをサポートします。以下の3つの型は基本的な union 型と見なされます:

  1. union(int, long) はLongTypeにマップされるでしょう。
  2. union(float, double) はDoubleTypeにマップされるでしょう。
  3. union(something, null)。ここでsomethingはサポートされているAvro型です。これは、nullableをtrueに設定して、somethingと同じSpark SQL型にマップされます。他の全てのunion型はcomplexと見なされます。それらはunionのメンバーに従ってフィールド名が member0, member1 などであるStructTypeにマップされるでしょう。これはAvroとParquet間での変換時の挙動と一致します。

次のAvroの論理型の読み込みもサポートします:

Avroの論理型Avroの型Spark SQLの型
date dint DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

現時点では、Avroファイル内にある docs, aliases および他のプロパティを無視します。

Spark SQL -> Avro 変換のためにサポートされる型

Sparkは全てのSpark SQL型のAvroへの書き込みをサポートします。ほとんどの型については、Spark型からAvro型へのマッピングは単純です (例えば、IntegerTypeはintに変換されます); しかし、以下に挙げる幾つかの特別な場合があります:

Spark SQLの型Avroの型Avroの論理型
ByteType dint
ShortType dint
BinaryType bytes
DateType dint date
TimestampType long timestamp-micros
DecimalType fixed decimal

Spark SQL型が他のAvro型に変換できるように、オプションavroSchemaを使ってAvroスキーマの出力全体を指定することもできます。以下の変換はデフォルトでは適用されず、ユーザ定義のAvroスキーマを必要とします:

Spark SQLの型Avroの型Avroの論理型
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal
TOP
inserted by FC2 system