一般的なロード/保存 機能
もっとも簡単な形式では、全てのオペレータのためにデフォルトのデータソース (spark.sql.sources.default
で他のものが指定されていない場合はparquet
) が使われるでしょう。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
手動でのオプションの指定
データソースに渡したいどのような特別のオプションと一緒にデータソースを手動で指定することもできます。データソースは完全修飾名(例えば、org.apache.spark.sql.parquet
)で指定されますが、ビルトインのソースの場合はショート名 (json
, parquet
, jdbc
, orc
, libsvm
, csv
, text
)を使うこともできます。どのようなデータソースからロードされたデータフレームもこの構文を使って他のタイプに変換することができます。
組み込みのソースの使用可能なオプションについては、API ドキュメントを参照してください。例えば、org.apache.spark.sql.DataFrameReader
および org.apache.spark.sql.DataFrameWriter
。ここに記載されているオプションは Scala Spark API 以外(例えば PySpark)にも適用できるはずです。その他の形式については、特定の形式の API ドキュメントを参照してください。
JSONファイルをロードするには、以下を使うことができます:
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
CSVファイルをロードするには、以下を使うことができます:
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
書き込み操作には特別なオプションを使うこともできます。例えば、ORCデータソースのためにbloomフィルタと辞書エンコーディングを制御することができます。以下の ORC の例は、bloom フィルタを作成し、favorite_color
に対してのみ辞書エンコーディングを使います。For Parquet, there exists parquet.bloom.filter.enabled
and parquet.enable.dictionary
, too. To find more detailed information about the extra ORC/Parquet options, visit the official Apache ORC / Parquet websites.
ORC data source:
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
Parquet data source:
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
ファイル上のSQLを直接実行
ファイルをデータフレームにロードし、それに質問するためにread APIを使う代わりに、SQLを使ってファイルに直接質問することもできます。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
セーブモード
セーブ操作は任意にSaveMode
を取ることができます。これは既存のデータがあった場合にどう扱うかを指定します。これらのセーブモードがどのようなロックも使わないこと、およびアトミックでは無いことを理解しておくことは重要です。更に、Overwrite
を実施する場合、新しいデータを書き込む前にデータは削除されるでしょう。
Scala/Java | Any Language | 意味 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" or "errorifexists" (default) |
データフレームをデータソースに保存する場合、もしデータが既に存在する場合は例外が投げられるでしょう。 |
SaveMode.Append |
"append" |
データフレームをデータソースに保存する場合、もし データ/テーブル が既に存在する場合は、データフレームの内容は既存のデータに追記されるでしょう。 |
SaveMode.Overwrite |
"overwrite" |
overwrite モードは、データフレームをデータソースに保存する場合に、もし データ/テーブル が存在する場合は既存のデータがデータフレームの内容によって上書きされるだろうことを意味します。 |
SaveMode.Ignore |
"ignore" |
ignore モードは、データフレームをデータソースに保存する場合に、もし データ が存在する場合はセーブ操作によってデータフレームの内容が保存されず、既存のデータが変更されないだろうことを意味します。これはSQLでの CREATE TABLE IF NOT EXISTS に似ています。
|
永続テーブルへの保存
DataFrames
はsaveAsTable
コマンドを使ってHiveのメタストアの中に永続テーブルとして保存することもできます。既存のHive配備はこの機能を使う必要が無いことに注意してください。Sparkは(Derbyを使って)デフォルトのローカルのHiveメタストアを作るでしょう。createOrReplaceTempView
と異なり、saveAsTable
はデータフレームの内容を具体化し、Hiveメタストア内のデータへのポインタを生成するでしょう。同じmetastoreに接続を続ける限り、永続的なテーブルはSparkプログラムが再起動した後もまだ存在しているでしょう。永続的なテーブルのためのデータフレームはSparkSession
上でテーブル名を使って table
メソッドを呼ぶことで生成することができます。
例えば、テキスト、parquet、jsonなどのファイルベースのデータソースについては、path
オプション、例えばdf.write.option("path", "/some/path").saveAsTable("t")
を使って独自のテーブルパスを指定することができます。テーブルが削除された場合、独自のテーブルパスは削除されず、テーブルのデータはまだあるでしょう。独自のテーブルパスが指定されない場合は、Sparkはデータをウェアハウスのディレクトリの下のデフォルトのテーブルパスに書き込むでしょう。テーブルが削除される時、デフォルトのテーブルパスも削除されるでしょう。
Spark 2.1 から、永続データソーステーブルはHive metastoreに格納されるパーティション毎のmetadataを持ちます。これにより幾つかの恩恵があります:
- metastoreはクエリのための必要なパーティションだけを返すことができるため、テーブルへの最初のクエリで全てのパーティションを見つけることはもう必要ではありません。
ALTER TABLE PARTITION ... SET LOCATION
のようなHive DDLが今ではDatasource APIを使って作成されたテーブルに利用可能であることを意味します。
(path
オプションを持つ)外部データソーステーブルを作る時に、パーティション情報はデフォルトでは集められません。metastor内でパーティション情報を同期するために、MSCK REPAIR TABLE
を起動することができます。
バケット化、ソートおよびパーティショニング
ファイルベースのデータソースについては、出力をバケット化およびソート、あるいは分割することができます。バケット化とソートは永続性のあるテーブルのみに適用可能です:
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
一方、データセットAPIを使う場合は、save
とsaveAsTable
の両方でパーティショニングを使うことができます。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
1つのテーブルについて分割とバケット化の両方を使うことが可能です:
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
partitionBy
は Partition Discovery の章で説明されるようにディレクトリの構造を作成します。従って、カーディナリティが高いカラムへは適用に制限があります。それとは対照的に、bucketBy
は固定数のバケットに渡ってデータを分散し、ユニークな値に制限が無い場合に使うことができます。