Apache Avro データソース ガイド
- 配備
- ロードと保存機能
- to_avro() と from_avro()
- データソース オプション
- 設定
- Databricks spark-avro との互換性
- Avro -> Spark SQL 変換のためにサポートされる型
- Spark SQL -> Avro 変換のためにサポートされる型
Spark 2.4 リリースから、Spark SQL はApache Avroデータを読み込みおよび書き込みするための組み込みのサポートを提供します。
配備
spark-avro
モジュールは外部にあり、spark-submit
あるいは spark-shell
にデフォルトで含まれません。
Sparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit
が使われます。spark-avro_2.12
とその依存物は--packages
を使って直接 spark-submit
に追加されるかも知れません。以下のように。
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.2.1 ...
spark-shell
での実験的なものとして、org.apache.spark:spark-avro_2.12
とその依存物に直接追加するために--packages
を使うこともできます。
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1 ...
外部的な依存を持つアプリケーションをサブミットすることについての詳細は、アプリケーションのサブミット ガイド を見てください。
ロードと保存機能
spark-avro
モジュールは外部にあるため、DataFrameReader
あるいは DataFrameWriter
に.avro
APIはありません。
Avro形式にデータをロード/保存するには、avro
としてデータソース オプションformat
を指定する必要があります (あるいは org.apache.spark.sql.avro
)。
to_avro() と from_avro()
Avro パッケージはカラムをAvro形式のバイナリとしてエンコードするための関数to_avro
と、Avroバイナリデータをカラムにデコードするfrom_avro()
を提供します。両方の関数は1つのカラムを他のカラムに変換し、入力/出力 SQLデータ型はcomplex型あるいはプリミティブ型が可能です。
カラムとしてAvroレコードを使うことは、Kafkaのようなストリーミングソースからの読み込みあるいは書き込み時に便利です。各Kafka キー-値 レコードは、Kafkaへの取り込みタイムスタンプ、Kafkaでのオフセットなどのような幾らかのメタデータが増えるでしょう。
- データを含む “value” フィールドがAvroにある場合、データを抽出、その質を高め、それを片付け、Kafkaへのダウンストリームに再びプッシュ、あるいはそれをファイルに書き込むために、
from_avro()
を使うことができます。 to_avro()
は構造をAvroレコードに変換するために使うことができます。このメソッドは特にKafkaにデータを書き込む時に複数のカラムを1つのカラムに再エンコードしたい時に便利です。
データソース オプション
Avro のデータソースオプションは以下のようにして設定することができます:
DataFrameReader
またはDataFrameWriter
の.option
メソッド。from_avro
巻数のoptions
。
プロパティ名 | デフォルト | 意味 | スコープ | これ以降のバージョンから |
---|---|---|---|---|
avroSchema |
None | ユーザによって提供されるJSON形式の任意のAvroスキーマ。
|
読み取り、書き込み、関数 from_avro |
2.4.0 |
recordName |
topLevelRecord | 書き込みの結果のトップレベルのレコード名。これはAvro specで必要とされます。 | write | 2.4.0 |
recordNamespace |
"" | 書き込みの結果のレコードの名前空間。 | write | 2.4.0 |
ignoreExtension |
true | オプションは読み込みでの.avro 拡張子無しのファイルの無視を生業します。もしオプションが有効な場合、( .avro 拡張子の有りおよび無しの)全てのファイルがロードされます。このオプションは非推奨で、将来のリリースでは削除されるでしょう。ファイル名をフィルタするには、一般的なデータソースオプション pathGlobFilter を使ってください。 |
read | 2.4.0 |
圧縮 |
snappy | compression オプションにより、書き込みで使われる圧縮コーディックを指定することができます。現在のところサポートされるコーディックは uncompressed , snappy , deflate , bzip2 , xz およびzstandard です。もしオプションが設定されない場合、構成 spark.sql.avro.compression.codec 設定が考慮されます。 |
write | 2.4.0 |
モード |
FAILFAST | mode オプションにより、関数 from_avro のパースモードを指定することができます。現在サポートされるモードは:
|
function from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 設定の値) |
datetimeRebaseMode オプションにより、date 、timestamp-micros 、timestamp-millis (ジュリアン暦から先発グレゴリオ暦までの論理型)の値のリベースモードを指定できます。現在サポートされるモードは:
|
read and function from_avro |
3.2.0 |
positionalFieldMatching |
false | これを`avroSchema`オプションと組み合わせて使うと、提供されたAvroスキーマのフィールドをSQLスキーマのフィールドと一致させるための動作を調整できます。デフォルトでは、照合はフィールド名を使って実行され、位置は無視されます。オプションが"true"に設定されている場合、一致はフィールドの位置に基づいて行われます。 | 読み込みと書き込み | 3.2.0 |
設定
Avroの設定はSparkSessionの setConf
メソッドを使うか、SQLを使ってSET key=value
を実行することで行うことができます。
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true |
true に設定された場合、データソース プロバイダ com.databricks.spark.avro は下位互換性のために組み込みの外部Avroデータソース モジュールにマップされます。注意: SQL設定はSpark3.2で非推奨になり、将来削除されるかもしれません。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | AVROファイルの書き込みで使われる圧縮コーディック。サポートされるコーディック: uncompressed, deflate, snappy, bzip2 および xz。デフォルトのコーディックは snappy です。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | AVROファイルの書き込みで使われるdeflateコーディック圧縮レベル。有効な値は-1または1から10の範囲でなければなりません。デフォルト値は-1で、現在の実装での6レベルに対応します。 | 2.4.0 |
spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION |
The rebasing mode for the values of the date , timestamp-micros , timestamp-millis logical types from the Julian to Proleptic Gregorian calendar:
|
3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION |
The rebasing mode for the values of the date , timestamp-micros , timestamp-millis logical types from the Proleptic Gregorian to Julian calendar:
|
3.0.0 |
Databricks spark-avro との互換性
このAvroデータソースモジュールはもともとDatabricksのオープンソースレポジトリ spark-avroから来ていて互換性があります。
デフォルトではSQL設定spark.sql.legacy.replaceDatabricksSparkAvro.enabled
を有効にすると、データソースプロバイダ com.databricks.spark.avro
がこの組み込みのAvroモジュールにマップされます。カタログメタデータストア内のcom.databricks.spark.avro
としてのProvider
プロパティで作成されたSparkテーブルについては、もしこの組み込みのAvroモジュールを使っている場合はこれらのテーブルをロードするためにマッピングが不可欠です。
Databricksのspark-avroでは、暗黙的なクラス AvroDataFrameWriter
と AvroDataFrameReader
がショートカット関数 .avro()
のために作成されたことに注意してください。この組み込みの外部モジュールの中で、両方の暗黙的なクラスは削除されました。DataFrameWriter
で .format("avro")
を使うか、代わりにDataFrameReader
を使ってください。これらはきちんとしていて十分良い筈です。
もしspark-avro
jar ファイルの独自のビルドを使うことを好む場合、単純に設定spark.sql.legacy.replaceDatabricksSparkAvro.enabled
を無効にし、アプリケーションを配備する時に--jars
オプションを使うことができます。詳細はアプリケーションのサブミット ガイド内の上級の依存管理の章を読んでください。
Avro -> Spark SQL 変換のためにサポートされる型
現在のところSparkはAvroのレコードの全てのプリミティブ型 と complex型の読み込みをサポートします。
Avroの型 | Spark SQLの型 |
---|---|
boolean | BooleanType |
dint | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
文字列 | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | 以下を見てください |
上でリスト化された型に加えて、union
型の読み込みをサポートします。以下の3つの型は基本的な union
型と見なされます:
union(int, long)
はLongTypeにマップされるでしょう。union(float, double)
はDoubleTypeにマップされるでしょう。union(something, null)
。ここでsomethingはサポートされているAvro型です。これは、nullableをtrueに設定して、somethingと同じSpark SQL型にマップされます。他の全てのunion型はcomplexと見なされます。それらはunionのメンバーに従ってフィールド名が member0, member1 などであるStructTypeにマップされるでしょう。これはAvroとParquet間での変換時の挙動と一致します。
次のAvroの論理型の読み込みもサポートします:
Avroの論理型 | Avroの型 | Spark SQLの型 |
---|---|---|
date | dint | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
現時点では、Avroファイル内にある docs, aliases および他のプロパティを無視します。
Spark SQL -> Avro 変換のためにサポートされる型
Sparkは全てのSpark SQL型のAvroへの書き込みをサポートします。ほとんどの型については、Spark型からAvro型へのマッピングは単純です (例えば、IntegerTypeはintに変換されます); しかし、以下に挙げる幾つかの特別な場合があります:
Spark SQLの型 | Avroの型 | Avroの論理型 |
---|---|---|
ByteType | dint | |
ShortType | dint | |
BinaryType | bytes | |
DateType | dint | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
Spark SQL型が他のAvro型に変換できるように、オプションavroSchema
を使ってAvroスキーマの出力全体を指定することもできます。以下の変換はデフォルトでは適用されず、ユーザ定義のAvroスキーマを必要とします:
Spark SQLの型 | Avroの型 | Avroの論理型 |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |