一般的なロード/保存 機能

もっとも簡単な形式では、全てのオペレータのためにデフォルトのデータソース (spark.sql.sources.defaultで他のものが指定されていない場合はparquet ) が使われるでしょう。

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

手動でのオプションの指定

データソースに渡したいどのような特別のオプションと一緒にデータソースを手動で指定することもできます。データソースは完全修飾名(例えば、org.apache.spark.sql.parquet)で指定されますが、ビルトインのソースの場合はショート名 (json, parquet, jdbc, orc, libsvm, csv, text)を使うこともできます。どのようなデータソースからロードされたデータフレームもこの構文を使って他のタイプに変換することができます。

組み込みのソースの使用可能なオプションについては、API ドキュメントを参照してください。例えば、org.apache.spark.sql.DataFrameReader および org.apache.spark.sql.DataFrameWriter。ここに記載されているオプションは Scala Spark API 以外(例えば PySpark)にも適用できるはずです。その他の形式については、特定の形式の API ドキュメントを参照してください。

JSONファイルをロードするには、以下を使うことができます:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

CSVファイルをロードするには、以下を使うことができます:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> peopleDFCsv = spark.read().format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.load("examples/src/main/resources/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

書き込み操作には特別なオプションを使うこともできます。例えば、ORCデータソースのためにbloomフィルタと辞書エンコーディングを制御することができます。以下の ORC の例は、bloom フィルタを作成し、favorite_color に対してのみ辞書エンコーディングを使います。For Parquet, there exists parquet.bloom.filter.enabled and parquet.enable.dictionary, too. To find more detailed information about the extra ORC/Parquet options, visit the official Apache ORC / Parquet websites.

ORC data source:

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
usersDF.write().format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING ORC
OPTIONS (
  orc.bloom.filter.columns 'favorite_color',
  orc.dictionary.key.threshold '1.0',
  orc.column.encoding.direct 'name'
)

Parquet data source:

usersDF.write.format("parquet")
  .option("parquet.bloom.filter.enabled#favorite_color", "true")
  .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
  .option("parquet.enable.dictionary", "true")
  .option("parquet.page.write-checksum.enabled", "false")
  .save("users_with_options.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
usersDF.write().format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet"))
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  parquet.page.write-checksum.enabled true
)

ファイル上のSQLを直接実行

ファイルをデータフレームにロードし、それに質問するためにread APIを使う代わりに、SQLを使ってファイルに直接質問することもできます。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

セーブモード

セーブ操作は任意にSaveModeを取ることができます。これは既存のデータがあった場合にどう扱うかを指定します。これらのセーブモードがどのようなロックも使わないこと、およびアトミックでは無いことを理解しておくことは重要です。更に、Overwriteを実施する場合、新しいデータを書き込む前にデータは削除されるでしょう。

Scala/JavaAny Language意味
SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) データフレームをデータソースに保存する場合、もしデータが既に存在する場合は例外が投げられるでしょう。
SaveMode.Append "append" データフレームをデータソースに保存する場合、もし データ/テーブル が既に存在する場合は、データフレームの内容は既存のデータに追記されるでしょう。
SaveMode.Overwrite "overwrite" overwrite モードは、データフレームをデータソースに保存する場合に、もし データ/テーブル が存在する場合は既存のデータがデータフレームの内容によって上書きされるだろうことを意味します。
SaveMode.Ignore "ignore" ignore モードは、データフレームをデータソースに保存する場合に、もし データ が存在する場合はセーブ操作によってデータフレームの内容が保存されず、既存のデータが変更されないだろうことを意味します。これはSQLでの CREATE TABLE IF NOT EXISTS に似ています。

永続テーブルへの保存

DataFramessaveAsTableコマンドを使ってHiveのメタストアの中に永続テーブルとして保存することもできます。既存のHive配備はこの機能を使う必要が無いことに注意してください。Sparkは(Derbyを使って)デフォルトのローカルのHiveメタストアを作るでしょう。createOrReplaceTempView と異なり、saveAsTable はデータフレームの内容を具体化し、Hiveメタストア内のデータへのポインタを生成するでしょう。同じmetastoreに接続を続ける限り、永続的なテーブルはSparkプログラムが再起動した後もまだ存在しているでしょう。永続的なテーブルのためのデータフレームはSparkSession上でテーブル名を使って table メソッドを呼ぶことで生成することができます。

例えば、テキスト、parquet、jsonなどのファイルベースのデータソースについては、path オプション、例えばdf.write.option("path", "/some/path").saveAsTable("t")を使って独自のテーブルパスを指定することができます。テーブルが削除された場合、独自のテーブルパスは削除されず、テーブルのデータはまだあるでしょう。独自のテーブルパスが指定されない場合は、Sparkはデータをウェアハウスのディレクトリの下のデフォルトのテーブルパスに書き込むでしょう。テーブルが削除される時、デフォルトのテーブルパスも削除されるでしょう。

Spark 2.1 から、永続データソーステーブルはHive metastoreに格納されるパーティション毎のmetadataを持ちます。これにより幾つかの恩恵があります:

(path オプションを持つ)外部データソーステーブルを作る時に、パーティション情報はデフォルトでは集められません。metastor内でパーティション情報を同期するために、MSCK REPAIR TABLEを起動することができます。

バケット化、ソートおよびパーティショニング

ファイルベースのデータソースについては、出力をバケット化およびソート、あるいは分割することができます。バケット化とソートは永続性のあるテーブルのみに適用可能です:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
CREATE TABLE users_bucketed_by_name(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;

一方、データセットAPIを使う場合は、savesaveAsTableの両方でパーティショニングを使うことができます。

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
CREATE TABLE users_by_favorite_color(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);

1つのテーブルについて分割とバケット化の両方を使うことが可能です:

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
usersDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed"))
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
CREATE TABLE users_bucketed_and_partitioned(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;

partitionByPartition Discovery の章で説明されるようにディレクトリの構造を作成します。従って、カーディナリティが高いカラムへは適用に制限があります。それとは対照的に、bucketBy は固定数のバケットに渡ってデータを分散し、ユニークな値に制限が無い場合に使うことができます。

TOP
inserted by FC2 system