Spark SQL, データフレーム および データセット ガイド

概要

Spark SQL は構造化されたデータの処理のためのSparkモジュールです。基本のRDD API とは異なり、Spark SQLによって提供されるインタフェースは、Sparkにデータと実行されている計算の両方についての構造のもっと多くの情報を提供します。内部的には、Spark SQLはこの特別な上方を特別な最適化を実施するために使用します。SQLおよびデータセットAPIを含むSpark SQLとやり取りをする幾つかの方法があります。計算を表現するためにどのAPI/言語を使っているかに関係なく、結果を計算する時に同じ実行エンジンが使われます。この単一化は、開発者が指定された変換を表現するもっとも自然な方法に基づく異なるAPI間を容易に行き来できることを意味します。

このページの全ての例はSpark配布物に含まれるサンプルデータを使用し、spark-shell, pyspark シェルあるいは sparkR シェルの中で実行することができます。

SQL

Spark SQLの一つの使い方は、SQLクエリを実行することです。Spark SQLは既存のHiveインストレーションからデータを読み込むために使うこともできます。この起動をどう設定するかの詳細については、 Hive テーブル の章を参照してください。他のプログラミング言語の中でSQLを実行する場合、結果はデータセット/データフレームとして返されるでしょう。command-line あるいは JDBC/ODBCを使ってSQLインタフェースとやり取りすることもできます。

データセットとデータフレーム

データセットはデータの分散型コレクションです。データセットはSparkSQLの最適化実行エンジンを使ってRDDの利点(強力なタイプ、強力なラムダ関数を使用する能力)を提供するSpark 1.6で追加された新しいインタフェースです。データセットはJVMオブジェクトから構築することができ、関数的な変形(map, flatMap, filterなど)を使って操作することができます。データセットAPIは Scala および Javaで利用可能です。PythonはデータセットAPIのためのサポートを持ちません。しかし、Pythonの動的な性質により、データセットAPIの多くの利点が既に利用可能です(たとえば自然に名前row.columnNameによって行のフィールドにアクセスすることができます)。Rの場合も似ています。

データフレームは、名前付きの列に整理されたデータセットです。リレーショナルデータベースでのテーブル、あるいはR/Pythonでのデータフレームと概念的に等価ですが、裏ではもっと最適化されています。データフレームは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRDD。データフレームAPIはScala, Java, Python および R で利用可能です。Scalaと Javaでは、データフレームは Rowのデータセットによって表現されます。Scala APIでは、DataFrame は単純にDataset[Row]のタイプのエイリアスです。一方で、Java APIでは、ユーザはDataFrameを表現するためにDataset<Row>を使う必要があります。

このドキュメントの至る所で、データフレームとしてRowのScala/Javaデータセットをしばしば参照するつもりです。

開始

開始点: SparkSession

Sparkの全ての機能へのエントリーポイントはSQLSession クラスです。基本的な SparkSessionを生成するには、単にSparkSession.builder()を使います:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL Example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。

Sparkの全ての機能へのエントリーポイントはSQLSession クラスです。基本的な SparkSessionを生成するには、単にSparkSession.builder()を使います:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL Example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

Sparkの全ての機能へのエントリーポイントはSQLSession クラスです。基本的な SparkSessionを生成するには、単にSparkSession.builderを使います:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql.py" で見つかります。

Sparkの全ての機能へのエントリーポイントはSQLSession クラスです。基本のSparkSessionを初期化するには、単にsparkR.session()を呼び出します:

sparkR.session(appName = "MyApp", sparkConfig = list(spark.executor.memory = "1g"))
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

初めて起動する時には、sparkR.session() がグローバルなSparkSession シングルトン インスタンスを初期化し、続く起動で常にこのインスタンスへのリファレンスを返すことに注意してください。この方法は、ユーザは1度だけSparkSessionを初期化する必要があり、read.df のSparkRの関数はこのグローバルインスタンスに暗黙的にアクセスすることができ、ユーザはSparkSession インスタンスをあちこちに渡す必要がありません。

Spark 2.0でのSparkSessionはHiveQLを使ってクエリを書き、Hive UDFにアクセスし、Hiveテーブルからデータを読み込むことができる能力を含むHiveの機能のための組み込みのサポートを提供します。 これらの機能を使うために、既存のHiveのセットアップを持つ必要はありません。

データフレームの生成

SparkSessionを使ってアプリケーションは既存のRDD、Hiveテーブル、あるいはSpark data sourcesからデータフレームを作成することができます。

例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。

SparkSessionを使ってアプリケーションは既存のRDD、Hiveテーブル、あるいはSpark data sourcesからデータフレームを作成することができます。

例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

SparkSessionを使ってアプリケーションは既存のRDD、Hiveテーブル、あるいはSpark data sourcesからデータフレームを作成することができます。

例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()

SparkSessionを使って、アプリケーションはローカルのRのdata.frame、Hiveテーブル、あるいはSpark データソースからデータフレームを生成することができます。

例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。

df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
head(df)

# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

無タイプのデータセット操作 (別名、DataFrame操作)

データフレームは ScalaJavaPythonおよびR での構造化データ操作のためのドメイン固有の言語を提供します。

上で述べたように、Spark 2.0では、データフレームはScalaおよびJava APIでのまさに行のデータセットです。これらのオペレーションは、強く型付けされたScala/Javaのデータセットが付属している "型有りの変換"に対して、"型無しの変換"として参照されます。

以下は、データセットを使った構造化データ処理の幾つかの基本的な例を含んでいます:

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。

データセットで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データセットは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

データセットで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データセットは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

Pythonでは属性(df.age) あるいは、インデックス(df['age'])のどちらかを使ってデータフレームのカラムにアクセスすることができます。データを探索するには前者が便利ですが、後者の形式を使うことをとてもお勧めします。これは将来が保証されており、データフレームクラス上の属性もカラム名を使って破壊しないでしょう。

# spark is an existing SparkSession

# Create the DataFrame
df = spark.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
head(df)
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
head(select(df, "name"))
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
head(where(df, df$age > 21))
## age name
## 30  Andy

# Count people by age
head(count(groupBy(df, "age")))
## age  count
## null 1
## 19   1
## 30   1
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

プログラム的にSQLクエリを実行

SparkSessionsqlはアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を DataFrameとして返します。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。

SparkSessionsqlはアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を Dataset<Row>として返します。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

SparkSessionsqlはアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を DataFrameとして返します。

# spark is an existing SparkSession
df = spark.sql("SELECT * FROM table")

sql関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrameとして返されます。

df <- sql("SELECT * FROM table")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

データセットの生成

データセットはRDDに似ていますが、Javaのシリアライズ化あるいはKryoを使う代わりに、ネットワークを越えて処理あるいは転送するためにオブジェクトをシリアライズ化するために特別なEncoderを使用します。エンコーダーと標準シリアライズ化のどちらもオブジェクトをバイトに変えることに責任を持ちますが、エンコーダーは動的に生成されるコードであり、Sparkがバイトをオブジェクトへデシリアライズ化無しにフィルタリング、ソーティングおよびハッシュ化のような多くのオペレーションを実施することができる形式を使用します。

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

RDDを使った内部操作

Spark SQL は既存のRDDをデータセットに変換する二つの異なるメソッドを提供します。1つ目のメソッドはオブジェクトの特定のタイプを含むRDDのスキーマを推測するためにリフレクションを使用します。このリフレクションを基礎としたやり方は、Sparkアプリケーションを書いている時にスキーマを既に知っている場合には、結果としてより簡潔でよく動きます。

データセットを生成する2つ目のメソッドは、スキーマを構築し既存のRDDに適用することができるプログラム的なインタフェースです。このメソッドは多少冗長ですが、カラムとそれらのタイプが実行時まで分からない場合にデータセットを構築することができます。

リフレクションを使ったスキーマの推測

Spark SQLのためのScalaインタフェースは、データフレームへのcaseクラスを含むRDDの自動的な変換をサポートします。caseクラスはテーブルのスキーマを定義します。caseクラスへの引数の名前はリフレクションを使って読み込まれ、カラムの名前となります。case クラスは入れ子にすることもでき、SeqあるいはArrayのような複雑なタイプを含むことができます。このRDDは明示的にデータフレームに変換され、テーブルとして登録されます。テーブルは後に続くSQL分の中で使うことができます。

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。

Spark SQLはJavaBeansのRDDからデータフレームへの自動的な変換をサポートします。 リフレクションを使って取得されたBeanInfoはテーブルのスキーマを定義します。現在のところ、Spark SQLはMapフィールドを含むJavaBeansをサポートしません。しかし、入れ子になったJavaBeansと、List あるいは Arrayフィールドはサポートされます。Serializableを実装し全てのフィールドについてgetterおよびsetterを持つクラスを生成することで、JavaBeanを生成することができます。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(new Function<String, Person>() {
    @Override
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.<String>getAs("name");
  }
}, stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

Spark SQL はデータタイプを推測して、RowオブジェクトのRDDをデータフレームに変換することができます。行はkwargsとしてkey/valueペアをRowクラスに渡すことで構築することができます。このリストのキーは表のカラム名を定義し、JSONファイル上で実施される推測に似たような、データベース全体を標本化することでタイプを推測します。

# spark is an existing SparkSession.
from pyspark.sql import Row
sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)

プログラム的なスキーマの指定

事前にケースクラスを定義できない場合(例えば、レコードの構造が文字列にエンコードされている。あるいはテキストのデータセットがパースされ、フィールドが異なるユーザには異なって抽出されるかもしれない)、DataFrame は3つのステップでプログラム的に生成することができます。

  1. 元のRDDからのRDDを生成する;
  2. ステップ1で生成したRDD内のの構造に一致するStructTypeによって表現されるスキーマを生成する。
  3. スキーマをSparkSessionによって提供されるcreateDataFrameメソッドを使ってのRDDに適用する。

例えば:

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。

事前にJavaBeanクラスを定義できない場合(例えば、レコードの構造が文字列にエンコードされている。あるいはテキストのデータセットがパースされ、フィールドが異なるユーザには異なって抽出されるかもしれない)、Dataset<Row> は3つのステップでプログラム的に生成することができます。

  1. 元のRDDからのRDDを生成する;
  2. ステップ1で生成したRDD内のの構造に一致するStructTypeによって表現されるスキーマを生成する。
  3. スキーマをSparkSessionによって提供されるcreateDataFrameメソッドを使ってのRDDに適用する。

例えば:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
  @Override
  public Row call(String record) throws Exception {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
  }
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" で見つかります。

事前にkwargsの辞書を定義できない場合(例えば、レコードの構造が文字列にエンコードされている。あるいはテキストのデータセットがパースされ、フィールドが異なるユーザには異なって抽出されるかもしれない)、DataFrame は3つのステップでプログラム的に生成することができます。

  1. 元のRDDから組あるいはリストのRDDを生成する;
  2. ステップ1で生成したRDD内の組あるいはリストの構造に一致するStructTypeによって表現されるスキーマを生成する。
  3. スキーマをSparkSessionによって提供されるcreateDataFrameメソッドを使ってRDDに適用します。

例えば:

# Import SparkSession and data types
from pyspark.sql.types import *

# spark is an existing SparkSession.
sc = spark.sparkContext

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print(name)

データソース

Spark SQL はDataFrameインタフェースを使って様々なデータソース上での操作をサポートします。データフレームはrelational transformationを使って操作することができ、一時viewを生成するために使うことができます。データフレームを一時的なビューとして登録することによりデータにSQLクエリを実行することができます。このセクションではSpark データソースを使ってデータをロードおよび保存する一般的なメソッドを説明し、その後ビルトインのデータソースのために利用可能な特定のオプションについて詳しく調べます。

一般的なロード/保存 機能

In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

手動でのオプションの指定

データソースに渡したいどのような特別のオプションと一緒にデータソースを手動で指定することもできます。データソースは完全修飾名(例えば、org.apache.spark.sql.parquet)で指定されますが、ビルトインのソースの場合はショート名(json, parquet, jdbc)を使うことができます。どのようなデータソースからロードされたデータフレームもこの構文を使って他のタイプに変換することができます。

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

ファイル上のSQLを直接実行

ファイルをデータフレームにロードし、それに質問するためにread APIを使う代わりに、SQLを使ってファイルに直接質問することもできます。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

セーブモード

セーブ操作は任意にSaveModeを取ることができます。これは既存のデータがあった場合にどう扱うかを指定します。これらのセーブモードがどのようなロックも使わないこと、およびアトミックでは無いことを理解しておくことは重要です。更に、Overwriteを実施する場合、新しいデータを書き込む前にデータは削除されるでしょう。

Scala/JavaAny Language意味
SaveMode.ErrorIfExists (default) "error" (default) データフレームをデータソースに保存する場合、もしデータが既に存在する場合は例外が投げられるでしょう。
SaveMode.Append "append" データフレームをデータソースに保存する場合、もし データ/テーブル が既に存在する場合は、データフレームの内容は既存のデータに追記されるでしょう。
SaveMode.Overwrite "overwrite" overwrite モードは、データフレームをデータソースに保存する場合に、もし データ/テーブル が存在する場合は既存のデータがデータフレームの内容によって上書きされるだろうことを意味します。
SaveMode.Ignore "ignore" ignore モードは、データフレームをデータソースに保存する場合に、もし データ が存在する場合はセーブ操作によってデータフレームの内容が保存されず、既存のデータが変更されないだろうことを意味します。これはSQLでの CREATE TABLE IF NOT EXISTS に似ています。

永続テーブルへの保存

DataFramessaveAsTableコマンドを使ってHiveのメタストアの中に永続テーブルとして保存することもできます。既存のHive配備はこの機能を使う必要は無いことに注意してください。Sparkは(Derbyを使って)デフォルトのローカルのHiveメタストアを作るでしょう。createOrReplaceTempView と異なり、saveAsTable はデータフレームの内容を具体化し、Hiveメタストア内のデータへのポインタを生成するでしょう。同じmetastoreに接続を続ける限り、永続的なテーブルはSparkプログラムが再起動した後もまだ存在しているでしょう。永続的なテーブルのためのデータフレームはSparkSession上でテーブル名を使って table メソッドを呼ぶことで生成することができます。

デフォルトでは、saveAsTable が"managed table"を生成するでしょう。つまり、データの場所がmetastoreによって制御されるでしょう。managed tables はテーブルが削除される場合に自動的にそれらのデータも削除するでしょう。

Parquet ファイル

Parquet は多くのほかのデータ処理システムでサポートされているコラム状のフォーマットです。Spark SQL は自動的に元のデータのスキーマを保持するParquetファイルの読み書きの両方のサポートを提供します。Parquetファイルを書く場合、互換性の理由から全てのカラムは自動的にnullが可能なように変換されます。

プログラム的なデータのロード

上の例から以下のようにデータを使用します:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
// import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
# spark from the previous example is used in this example.

schemaPeople # The DataFrame from the previous example.

# DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write.parquet("people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile");
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)
df <- read.df("examples/src/main/resources/people.json", "json")

# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin

# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
  cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

パーティションの発見

テーブルのパーティションはHiveのようなシステムで使われる一般的な最適化の方法です。パーティションされたテーブルの中でデータは各パーティションディレクトリのパスでエンコードされたパーティションカラム値を使って、通常異なるディレクトリに保持されます。Parquetデータソースはこれで自動的にパーティションの情報を発見および推測することができます。例えば、全ての以前に使用したパーティションデータを以下のディレクトリ構造を使って、パーションカラムとしてgender および country の2つの追加のカラムを持つパーティションされたテーブルに格納することができます。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

path/to/tableSparkSession.read.parquet または SparkSession.read.loadのどちらかに渡すことで、Spark SQL は自動的にパスからパーティション情報を抽出することができるでしょう。これで、返されるデータフレームのスキーマは以下のようになります:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。そのような場合のために、自動的なタイプの推測はspark.sql.sources.partitionColumnTypeInference.enabledで設定することができます。デフォルトは trueです。タイプの推測が無効な場合、パーティションカラムとして文字列タイプが使われるでしょう。

Spark 1.6.0から、パーティションの発見はデフォルトで指定されたパスの下のパーティションだけを見つけます。上の例では、もしユーザが path/to/table/gender=maleSparkSession.read.parquet または SparkSession.read.loadのどちらかに渡す場合に、gender はパーティションのカラムとして見なされないでしょう。パーティションの検索を始めるベースパスを指定する必要がある場合は、データソースのオプション内で basePathを設定することができます。例えば、path/to/table/gender=maleがデータのパスである場合、ユーザはbasePathpath/to/table/に設定します。 gender はパーティションカラムになるでしょう。

スキーマのマージ

ProtocolBuffer, Avro および Thrift のように、Parquet もスキーマの評価をサポートします。ユーザは単純なスキーマから開始し、必要に応じて次第にもっとカラムをスキーマに追加することができます。この場合、ユーザは異なるがお互いにスキーマの互換性がある複数のParquetファイルにするかも知れません。Parquetデータソースは現在では自動的にこのケースを検知し、全てのこれらのファイルのスキーマをマージすることができます。

スキーマのマージは比較的高価な操作であり、多くの場合必要ではないため、1.5.0以降からデフォルトではoffにしています。以下のようにして有効にすることができます

  1. (以下の例のように)Parquet ファイルを読む時にデータソースオプションmergeSchematrue に設定、あるいは
  2. グローバルSQLオプションspark.sql.parquet.mergeSchematrueに設定。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key : int (nullable = true)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...

}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key : int (nullable = true)
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
# spark from the previous example is used in this example.

# Create a simple DataFrame, stored into a partition directory
df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = spark.createDataFrame(sc.parallelize(range(6, 11))
                                   .map(lambda i: Row(single=i, triple=i * 3)))
df2.write.parquet("data/test_table/key=2")

# Read the partitioned table
df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key : int (nullable = true)
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))

# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")

# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: double (nullable = true)
# |-- double: double (nullable = true)
# |-- triple: double (nullable = true)
# |-- key : int (nullable = true)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

Hive メタストア Parquet テーブル交換

Hive metastore Parquetテーブルへの読み書きの時に、Spark SQLはより良いパフォーマンスのためにHive SerDeの代わりに独自のParquetサポートを使おうとするでしょう。この挙動は spark.sql.hive.convertMetastoreParquet設定によって制御され、デフォルトで作動しています。

Hive/Parquet Schema 調整

テーブルスキーマ処理の観点から、HiveとParquetの間には2つの主要な違いがあります。

  1. Hive は大文字小文字を区別しませんが、Parquetは区別します。
  2. Hiveは全てのカラムがnull可能ですが、Parquetには重要な意味があります。

この理由により、Hive metastore Parquet テーブルをSpark SQL Parquet テーブルに変換する場合に、Hive metastore スキーマと Parquet スキーマを調停する必要があります。調停ルールは以下の通りです:

  1. 両方のスキーマで同じ名前を持つフィールドは、null可能かどうかに関係なく同じデータタイプでなければなりません。調停フィールドはParquet側のデータタイプを持たなければなりません。つまりnull可能かどうかが考慮されます。

  2. 調停スキーマはHive metastoreスキーマで定義されるそれらのフィールドを正確に含まなければなりません。

    • Parquetスキーマにのみ現れる全てのフィールドは調停スキーマの中で落とされます。
    • Hive metastoreスキーマにのみ現れる全てのフィールドは調停スキーマの中でnull可能なフィールドとして追加されます。

メタデータのリフレッシュ

Spark SQL はパフォーマンスの向上のためにParquet metadetaとしてキャッシュされます。Hive metastore Parquet テーブルの変換が有効な場合、それらの変換されたテーブルのmetadataもキャッシュされます。もしそれらのテーブルがHiveまたは他の外部のツールで更新された場合、metadataの一貫性のために手動でそれらを更新する必要があります。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
# spark is an existing HiveContext
spark.refreshTable("my_table")
REFRESH TABLE my_table;

設定

Parquetの設定はSparkSessionsetConf メソッドを使うか、SQLを使ってSET key=value を実行することで行うことができます。

プロパティ名デフォルト意味
spark.sql.parquet.binaryAsString false 他の幾つかのParquet生成システム、特にImpala, Hive および Spark SQLの古いバージョンは、Parquetスキーマを書き出す時にバイナリデータと文字列の区別をしません。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として扱うように指示します。
spark.sql.parquet.int96AsTimestamp true 幾つかのParquet生成システム、特にImparaおよびHiveは、タイムスタンプをINT96に格納します。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにINT96データをタイムスタンプとして解釈するように指示します。
spark.sql.parquet.cacheMetadata true Parquetスキーマメタデータのキャッシュの機構をオンにします。静的なデータのクエリを高速化することができます。
spark.sql.parquet.compression.codec gzip Parquetファイルを書き込む時に圧縮符号化を使うように設定します。利用可能な値には、uncompressed, snappy, gzip, lzo が含まれます。
spark.sql.parquet.filterPushdown true trueに設定された場合は、Parquet filter push-down 最適化を有効化します。
spark.sql.hive.convertMetastoreParquet true falseに設定した場合は、Spark SQLはparquetテーブルのためにビルトインサポートの代わりにHive SerDeを使用するでしょう。
spark.sql.parquet.mergeSchema false

trueの場合、Parquetデータソースは全てのデータファイルから集められたスキーマをマージします。そうでなければ、スキーマは、サマリファイル、あるいはサマリファイルが利用できない場合はランダムデータファイルから取り出されます。

JSON データセット

Spark SQL は自動的にJSONデータセットのスキーマを推測しDataset[Row]としてロードすることができます。この変換は、文字列のRDDあるいはJSONファイルのどちらかでSparkSession.read.json()を使って行うことができます。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string
val otherPeopleRDD = spark.sparkContext.makeRDD(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。

Spark SQL は自動的にJSONデータセットのスキーマを推測しDataset<Row>としてロードすることができます。この変換は、文字列のRDDあるいはJSONファイルのどちらかでSparkSession.read().json()を使って行うことができます。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。この変換は、JSONファイル上のSparkSession.read.jsonを使って行うことができます。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

# spark is an existing SparkSession.

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
people = spark.read.json("examples/src/main/resources/people.json")

# The inferred schema can be visualized using the printSchema() method.
people.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame.
people.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by `spark`.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string.
anotherPeopleRDD = sc.parallelize([
  '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
anotherPeople = spark.jsonRDD(anotherPeopleRDD)

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。read.json() を使うと、データを各ファイルの各行がJSONオブジェクトであるJSONファイルのディレクトリからデータをロードします。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path <- "examples/src/main/resources/people.json"
# Create a DataFrame from the file(s) pointed to by path
people <- read.json(path)

# The inferred schema can be visualized using the printSchema() method.
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Register this DataFrame as a table.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql methods.
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable

Hive テーブル

Spark SQLはApache Hiveに格納されたデータの読み書きもサポートします。しかし、Hiveは多くの依存性を持つため、これらの依存物はデフォルトのSparkの配布物の中に含まれません。もしHiveの依存物をクラスパス上で見つけることができた場合、Sparkはそれらを自動的にロードするでしょう。Hiveに格納されているデータにアクセスするために全てのワーカーノードはHiveシリアライズおよびデシリアライズライブラリ(SerDes)へのアクセスが必要になるため、これらのHiveの依存物は全てのワーカーノード上にも存在しなければなりません。

Hiveの設定は、hive-site.xml, core-site.xml (セキュリティ設定) および hdfs-site.xml (HDFS 設定) ファイルを conf/に置くことで行われます。

Hiveを動かす場合、永続Hiveメタストアへの接続性、Hive serdesのサポート、および Hiveのユーザ定義関数を含めて、Hiveをサポートする SparkSession をインスタンス化しなければなりません。既存のHiveデプロイメントを持たないユーザは、Hiveサポートを有効にすることができます。hive-site.xmlによって設定されていない場合、コンテキストが自動的に現在のディレクトリに metastore_db を作成し、spark.sql.warehouse.dirによって設定されるディレクトリを生成します。このディレクトリはsparkアプリケーションが開始される現在のディレクトリ内をデフォルトのspark-warehouseにします。Spark 2.0.0からhive-site.xml 内のhive.metastore.warehouse.dir プロパティが非推奨であることに注意してください。代わりにwarehouse内のデータベースのデフォルトの場所を指定するためにspark.sql.warehouse.dirを使います。sparkを開始するユーザへ書き込み権限を与える必要があるかも知れません。

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DaraFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a HiveContext.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" で見つかります。

Hiveを動かす場合、永続Hiveメタストアへの接続性、Hive serdesのサポート、および Hiveのユーザ定義関数を含めて、Hiveをサポートする SparkSession をインスタンス化しなければなりません。既存のHiveデプロイメントを持たないユーザは、Hiveサポートを有効にすることができます。hive-site.xmlによって設定されていない場合、コンテキストが自動的に現在のディレクトリに metastore_db を作成し、spark.sql.warehouse.dirによって設定されるディレクトリを生成します。このディレクトリはsparkアプリケーションが開始される現在のディレクトリ内をデフォルトのspark-warehouseにします。Spark 2.0.0からhive-site.xml 内のhive.metastore.warehouse.dir プロパティが非推奨であることに注意してください。代わりにwarehouse内のデータベースのデフォルトの場所を指定するためにspark.sql.warehouse.dirを使います。sparkを開始するユーザへ書き込み権限を与える必要があるかも知れません。

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse";
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DaraFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Key: " + row.get(0) + ", Value: " + row.get(1);
  }
}, Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a HiveContext.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" で見つかります。

Hiveを動かす場合、永続Hiveメタストアへの接続性、Hive serdesのサポート、および Hiveのユーザ定義関数を含めて、Hiveをサポートする SparkSession をインスタンス化しなければなりません。既存のHiveデプロイメントを持たないユーザは、Hiveサポートを有効にすることができます。hive-site.xmlによって設定されていない場合、コンテキストが自動的に現在のディレクトリに metastore_db を作成し、spark.sql.warehouse.dirによって設定されるディレクトリを生成します。このディレクトリはsparkアプリケーションが開始される現在のディレクトリ内をデフォルトのspark-warehouseにします。Spark 2.0.0からhive-site.xml 内のhive.metastore.warehouse.dir プロパティが非推奨であることに注意してください。代わりにwarehouse内のデータベースのデフォルトの場所を指定するためにspark.sql.warehouse.dirを使います。sparkを開始するユーザへ書き込み権限を与える必要があるかも知れません。

# spark is an existing SparkSession

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = spark.sql("FROM src SELECT key, value").collect()

Hiveを動かす場合、HiveサポートのSparkSessionをインスタンス化しなければなりません。これはメタソース内のテーブルを見つけ、HiveSQLを使ってクエリを書き込むためのサポートを追加します。

# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

Hiveメタソースの異なるバージョンの相互影響

SparkのHiveサポートの最も重要な部分の一つにHiveメタストアとの対話があります。これによりSpark SQLはHiveのテーブルのメタデータにアクセスすることができます。Spark 1.4.0から、Spark SQLの単独バイナリビルドが以下で説明する設定を使って異なるバージョンのHiveメタストアにクエリするために使うことができるようになりました。Hiveのバージョンの非依存性はmetastoreについて語られるもので、内部的にはSparkSQLはHive 1.2.1に対してコンパイルされ、それらのクラスを内部的な実行に使用することに注意してください。(serdes, UDFs, UDAFsなど)

以下のオプションがメタデータを扱うのに使われるHiveのバージョンを設定するために使うことができます。

プロパティ名デフォルト意味
spark.sql.hive.metastore.version 1.2.1 Hiveメタストアのバージョン使用可能なオプションは 0.12.0 から 1.2.1です。
spark.sql.hive.metastore.jars ビルトイン HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。このプロパティは以下の3つのオプションのどれか一つです:
  1. ビルトイン
  2. -Phiveが有効な場合は、SparkアセンブリにバンドルされているHive 1.2.1を使います。このオプションが選択された場合、spark.sql.hive.metastore.version1.2.1 あるいは未定義のどちらかでなければなりません。
  3. maven
  4. Mavenリポジトリからダウンロードされた指定のバージョンのHive jarを使用します。この設定は一般的にプロダクション デプロイメントのためにはお勧めされません。
  5. JVMのための標準フォーマットのクラスパス。このクラスパスはHadoopの正しいバージョンを含む全てのHiveおよびその依存物を含まなければなりません。これらのjarはドライバー上にのみ存在する必要がありますが、もしyarnクラスタモードで動かしている場合は、それらがアプリケーションと一緒にパッケージされるようにしなければなりません。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。

spark.sql.hive.metastore.barrierPrefixes (empty)

Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、org.apache.spark.*)。

JDBC から他のデータベースへ

Spark SQLはJDBCを使ってほかのデータベースからデータを読み込むことができるデータソースも含みます。この機能はJdbcRDDを使う上で好まれるべきでしょう。なぜなら結果はデータフレームとして返され、それらはSpark SQLの中で簡単に処理することができるか他のデータソースと繋げることができるからです。 JDBCデータソースはユーザにClassTagの提供を要求しないため、JavaあるいはPythonから簡単に使うことができます。(これは他のアプリケーションがSparkSQLを使ってクエリを実行することができるSpark SQL JDBCサーバと異なることに注意してください)。

開始するためには、sparkのクラスパス上に特定のデータベースのためのJDBCドライバを含む必要があるでしょう。例えば、Sparkシェルからpostgresに接続するには、以下のコマンドを実行するかもしれません:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

リモートデータベースからのテーブルは、データソースAPIを使ってデータフレームあるいはSpark SQLの一時テーブルとしてロードすることがdけいます。以下のオプションがサポートされます:

プロパティ名意味
url 接続するための JDBC URL
dbtable 読み込まれる必要があるJDBCテーブル。SQLクエリのFROM句で有効なものを全て使用できることに注意してください。例えば、完全なテーブルの代わりに、丸括弧内のサブクエリも使うことができます。
driver このURLに接続するために使われるJDBCドライバのクラス名。
partitionColumn, lowerBound, upperBound, numPartitions これらのオプションは、いずれかが指定される場合は全て指定されなければなりません。複数のワーカーから並行して読み込む時は、それらはどうやってテーブルを分割するかを説明します。partitionColumn は問題となっているテーブルからの数値のカラムでなければなりません。lowerBound および upperBound はテーブル内の行をフィルタするために使われるのではなく、パーティションの影響範囲を決めるためだけに使われることに注意してください。つまりテーブル内の全ての行が分割され返されるでしょう。
fetchSize JDBCの fetchサイズ。これは一回でどれだけの数の行をfetchするかを決定します。This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
Map<String, String> options = new HashMap<>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

Dataset<Row> jdbcDF = spark.read().format("jdbc"). options(options).load();
df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load()
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename"
)

トラブルシューティング

パフォーマンス チューニング

ちょっとした次善策として、メモリにデータをキャッシュ、あるいは幾つかの実験的なオプションを調整することでパフォーマンスを改善することができます。

メモリへのデータのキャッシュ

Spark SQL は spark.cacheTable("tableName") あるいはdataFrame.cache()を呼ぶことでインメモリのコラム形式のフォーマットを使ってテーブルをキャッシュすることができます。そして、Spark SQLは必要なカラムだけをスキャンし、メモリの使用量とGCの圧力を最小化するために圧縮を自動的に調整するでしょう。メモリからテーブルを削除するために spark.uncacheTable("tableName") を呼ぶことができます。

メモリ内キャッシングの設定はSparkSessionsetConfメソッドあるいは SQLを使ってSET key=valueコマンドを実行することで行うことができます。

プロパティ名デフォルト意味
spark.sql.inMemoryColumnarStorage.compressed true trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。
spark.sql.inMemoryColumnarStorage.batchSize 10000 カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。

他の設定オプション

以下のオプションもクエリ実行のパフォーマンスを調整するために使用することができます。これらのオプションはもっと多くの最適化が自動的に行われるため、将来のリリースでは非推奨になるかもしれません。

プロパティ名デフォルト意味
spark.sql.files.maxPartitionBytes 134217728 (128 MB) ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。
spark.sql.files.openCostInBytes 4194304 (4 MB) ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうれうば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう(最初にスケジュールされます)。
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan が実行された場合、現在のところ統計はHiveメタストアテーブルのみがサポートされることに注意してください。
spark.sql.shuffle.partitions 200 joinあるいは集約のためにデータをシャッフルする時に使用するパーティションの数を設定します。

分散SQLエンジン

Spark SQLはJDBC/ODBCあるいはコマンドラインインタフェースを使って分散型クエリエンジンとして振舞うこともできます。このモードでは、エンドユーザあるいはアプリケーションがコードを書くこと無しにSQLクエリを直接実行するためにSpark SQLと直接やり取りをすることができます。

Thrift JDBC/ODBC サーバの実行

ここで実装されているThrift JDBC/ODBC サーバは、Hive 1.2.1のHiveServer2に対応します。SparkあるいはHive 1.2.1に同梱されるbeelineスクリプトを使ってJDBCサーバをテストすることができます。

JDBC/ODBCサーバを開始するには、Sparkディレクトリで以下を実行します:

./sbin/start-thriftserver.sh

このスクリプトはHiveプロパティを指定する--hiveconf を加えて全ての bin/spark-submit コマンドラインを受け付けます。全ての利用可能なオプションのリストのために./sbin/start-thriftserver.sh --help を実行するかも知れません。デフォルトでは、サーバはlocalhsot:10000をlistenします。以下のように環境変数を使ってこの挙動を上書きできます。例えば:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

あるいはシステムプロパティを使って上書き:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

これで、Thrift JDBC/ODBC サーバをテストするために beelineを使うことができます:

./bin/beeline

以下のようにしてbeeline内でJDBC/ODBCに接続します:

beeline> !connect jdbc:hive2://localhost:10000

Beeline はユーザ名とパスワードを尋ねるでしょう。セキュアで無いモードでは、単にマシーン上のユーザ名を入力し空のパスワードを入力します。セキュアモードのためには、beeline ドキュメント にある手順に従ってください。

Hiveの設定は、hive-site.xmlconf/内の core-site.xml および hdfs-site.xml ファイルに置き換えることで行われます。

Hiveに付属するbeeline スクリプトを使うかも知れません。

Thrift JDBC サーバはHTTPトランスポート上のThrift RPC メッセージの送信もサポートします。以下の設定を使ってシステムプロパティあるいはconf/内のhive-site.xml ファイルの中でHTTPモードを有効にします:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

テストするには、beelineを使って以下のようにしてhttpモードのJDBC/ODBCサーバに接続します:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Spark SQL CLIの実行

Spark SQL CLI はローカルモードでHiveメタストアサービスをを実行するのに便利なツールで、コマンドラインからの入力のクエリを実行します。Spark SQL CLIはThrift JDBCサーバとやりとりできないことに注意してください。

Spark SQL CLIを開始するには、Spark ディレクトリで以下を実行してください:

./bin/spark-sql

Hiveの設定は、hive-site.xmlconf/内の core-site.xml および hdfs-site.xml ファイルに置き換えることで行われます。全ての利用可能なオプションの完全なリストのために、./bin/spark-sql --help を実行するかもしれません。

移行ガイド

Spark SQL 1.6から2.0へのアップグレード

Spark SQL 1.5から1.6へのアップグレード

./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

Spark SQL 1.4から1.5へのアップグレード

Spark SQL 1.3から1.4へのアップグレード

DataFrame データの reader/writer インタフェース

ユーザのフィードバックにより、(SQLContext.read)の中のデータを読み込み、(DataFrame.write)へ書き込むための新しくもっと柔軟なAPIを作成しました。古いAPIは非推奨にされました (例えば、SQLContext.parquetFile, SQLContext.jsonFile)。

SQLContext.read ( Scala, Java, Python ) および DataFrame.write ( Scala, Java, Python ) についての詳細な情報はAPIドキュメントを見てください。

グルーピング カラムを保持するDataFrame.groupBy

ユーザフィードバックに基づいて、DataFrameの結果のグルーピング カラムを保持するためにDataFrame.groupBy().agg() のデフォルトの挙動を変更しました。1.3での挙動を維持するためには、spark.sql.retainGroupColumnsfalseに設定してください。

// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");
import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

DataFrame.withColumn上での挙動の変更

1.4より前は、DataFrame.withColumn() はカラムの追加のみをサポートします。同じ名前の既存のカラムがあったとしても、結果のデータフレーム内の指定された名前を持つ新しいカラムとして、そのカラムが常に追加されるでしょう。1.4から、DataFrame.withColumn() は既存のすべてのカラムの名前と異なるカラム名の追加、あるいは同じ名前の既存のカラムの置きかえをサポートします。

この変更はScala APIだけのもので、PySparkあるいはSparkRのものでは無いことに注意してください。

Spark SQL 1.0-1.2 から 1.3 へのアップグレード

Spark 1.3. では、Spark SQLから "Alpha"のラベルが取り除かれ、この一環として利用可能なAPIの整理が行われました。Spark1.3以降は、Spark SQLは1.Xシリーズの他のリリースとバイナリ互換性を提供するでしょう。この互換性の保証には、明示的に安定していないと印を付けられている(つまり、DeveloperAPI あるいは Experimental)APIは含まれません。

SchemaRDDからデータフレームへの変更

Spark SQL 1.3にアップグレードした時にユーザが気づく最も大きな変更は、SchemaRDDDataFrame に名前が変更されることです。これは、データフレームがもはやRDDから直接継承されないが、RDD自身の実装でRDDが提供するほとんどの機能を代わりに提供するため、重要です。データフレームは.rdd メソッドを呼ぶことでRDDに変換することも可能です。

Scalaでは、いくつかのユースケースのためのソースの互換性を提供するために、SchemaRDD から DataFrame へのタイプのエイリアスがあります。それでもDataFrame を代わりに使うためにコードを更新することをお勧めします。Java および Python ユーザはコードを更新する必要は無いでしょう。

JavaとScala APIの統一

Spark 1.3 以降には、Scala APIを模倣した別個のJava互換クラス (JavaSQLContext および JavaSchemaRDD) があります。Spark 1.3 ではJavaAPIおよびScala APIは統合されました。どちらかの言語のユーザは SQLContext および DataFrameを使用しなければなりません。一般的にこれらのクラスは両方の言語で使用可能なタイプ(つまり、言語特有のコレクションの代わりに Array)を使用しようとします。一般的なタイプが無い場合(例えば、クロージャーあるいはMapを渡す)、代わりに関数の上書きが使われます。

更に、Java固有のタイプのAPIが削除されました。ScalaおよびJavaのユーザはプログラム的にスキーマを説明するためにorg.apache.spark.sql.types にあるクラスを使わなければなりません。

dslパッケージの明示的な交換と削除の分離 (Scalaのみ)

import sqlContext._ から始まるSpark 1.3より前の多くのコード例、sqlContextからの全ての関数はスコープに入れられました。Spark 1.3では、RDDから DataFrameへの変換のための暗黙的な変換は、SQLContextの中に隔離しました。今はユーザは import sqlContext.implicits._を書かなければなりません。

Additionally, the implicit conversions now only augment RDDs that are composed of Products (i.e., case classes or tuples) with a method toDF, instead of applying automatically.

When using function inside of the DSL (now replaced with the DataFrame API) users used to import org.apache.spark.sql.catalyst.dsl. 代わりに公開データフレーム関数APIが使われるべきです: import org.apache.spark.sql.functions._.

DataTypeのための org.apache.spark.sql 内のタイプエイリアスの削除 (Scala のみ)

Spark 1.3 は、DataTypeのための基本sqlパッケージにあったタイプのエイリアスを削除しました。ユーザはorg.apache.spark.sql.typesにあるクラスを代わりにインポートしなければなりません。

UDF 登録はsqlContext.udfに移動しました (Java & Scala)

UDFを登録するために使われる関数、データフレーム DSLあるいはSQLの両方で使われる、は、SQLContext内のudfオブジェクトに移動されました。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python のUDF登録は変更されません。

Python DataType はもうシングルトンではありません

Pythonでデータタイプを使う場合は、シングルトンを参照する代わりにそれら(つまり StringType()) を構築する必要があるでしょう。

Apache Hiveとの互換性

Spark SQL はHiveメタソース、SerDesおよびiUDFと互換性があるように設計されています。現在のところ、Hive SerDes と UDFs はHive 1.2.1に基づいていて、Spark SQLは異なるバージョンのHive Metastoreに接続されることができます (0.12.0 から 1.2.1)[Interacting with Different Versions of Hive Metastore] (#interacting-with-different-versions-of-hive-metastore))も見てください。

既存のHiveウェアハウスへのデプロイ

Spark SQL Thrift JDBC サーバは"追加設定無しで"既存のHiveインストレーションと互換性があるように設計されています。既存のHiveメタストアを修正したり、あるいはデータの配置またはテーブルのパーティションを変更する必要はありません。

サポートされるHive機能

Spark SQLは以下のようなHiveの機能の大部分をサポートします:

サポートされないHive機能

以下はまだサポートされないHive機能のリストです。それらの機能のほとんどはHiveデプロイメントでほとんど使われません。

主要なHive機能

難解なHive機能

Hive 入力/出力 フォーマット

Hive 最適化

扱いにくいHive最適化はまだSparkに含まれていません。(インデックスのような)それらのいくつかはSpark SQLのインメモリ計算モデルに重要ではありません。他はSpark SQLの将来のリリースに組み込まれます。

リファレンス

データの種類

Spark SQL と DataFrames は以下のデータタイプをサポートします:

Spark SQLの全てのデータタイプは org.apache.spark.sql.typesパッケージの中にあります。以下のようにしてアクセスすることができます

import org.apache.spark.sql.types._
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" で見つかります。
データタイプ Scalaでの値のタイプ データタイプにアクセスあるいは生成するためのAPI
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
注意: containsNull のデフォルト値は trueです。
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
注意: valueContainsNull のデフォルト値は trueです。
StructType org.apache.spark.sql.Row StructType(fields)
注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのScalaでの値のタイプ(例えば、データタイプ IntegerTypeのStructFieldのためのInt) StructField(name, dataType, nullable)

Spark SQLの全てのデータタイプは org.apache.spark.sql.typesのパッケージ内にあります。データタイプにアクセスあるいは生成するには、org.apache.spark.sql.types.DataTypesで提供されるファクトリーメソッドを使ってください。

データタイプ Javaでの値タイプ データタイプにアクセスあるいは生成するためのAPI
ByteType byte あるいは Byte DataTypes.ByteType
ShortType short あるいは Short DataTypes.ShortType
IntegerType int あるいは Integer DataTypes.IntegerType
LongType long あるいは Long DataTypes.LongType
FloatType float あるいは Float DataTypes.FloatType
DoubleType double あるいは Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType()
DataTypes.createDecimalType(precision, scale).
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
BooleanType boolean あるいは Boolean DataTypes.BooleanType
TimestampType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DataTypes.DateType
ArrayType java.util.List DataTypes.createArrayType(elementType)
注意: containsNullの値はtrueでしょう。
DataTypes.createArrayType(elementType, containsNull).
MapType java.util.Map DataTypes.createMapType(keyType, valueType)
注意: valueContainsNullの値はtrueでしょう。
DataTypes.createMapType(keyType, valueType, valueContainsNull)
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields)
注意: fields はListあるいはStructFieldsの配列です。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのJavaでの値のタイプ(例えば、データタイプ IntegerTypeのStructFieldのためのint) DataTypes.createStructField(name, dataType, nullable)

Spark SQLの全てのデータタイプは pyspark.sql.typesパッケージ内にあります。以下のようにしてアクセスすることができます

from pyspark.sql.types import *
データタイプ Pythonでの値タイプ データタイプにアクセスあるいは生成するためのAPI
ByteType int あるいは long
注意: 実行時に数値は1バイトの符号あり数値に変換されるでしょう。数値は-128から127の範囲にあるようにしてください。
ByteType()
ShortType int あるいは long
注意: 実行時に数値は2バイトの符号あり数値に変換されるでしょう。数値は-32768から32767の範囲にあるようにしてください。
ShortType()
IntegerType int あるいは long IntegerType()
LongType long
注意: 実行時に数値は8バイトの符号あり数値に変換されるでしょう。数値は-9223372036854775808 から 9223372036854775807の範囲にあるようにしてください。そうでなければ、データをdecimal.Decimalに変換し、DecimalTypeを使ってください。
LongType()
FloatType float
注意: 実行時に数値は4バイトの符号あり単精度浮動小数点に変換されるでしょう。
FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType リスト、組、あるいは配列 ArrayType(elementType, [containsNull])
注意: containsNull のデフォルト値は Trueです。
MapType dict MapType(keyType, valueType, [valueContainsNull])
注意: valueContainsNull のデフォルト値はTrueです。
StructType リストあるいは組 StructType(fields)
注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのPythonでの値のタイプ(例えば、データタイプIntegerTypeのStructFieldのためのInt) StructField(name, dataType, nullable)
データタイプ Rでの値のタイプ データタイプにアクセスあるいは生成するためのAPI
ByteType integer
注意: 実行時に数値は1バイトの符号あり数値に変換されるでしょう。数値は-128から127の範囲にあるようにしてください。
"byte"
ShortType integer
注意: 実行時に数値は2バイトの符号あり数値に変換されるでしょう。数値は-32768から32767の範囲にあるようにしてください。
"short"
IntegerType integer "integer"
LongType integer
注意: 実行時に数値は8バイトの符号あり数値に変換されるでしょう。数値は-9223372036854775808 から 9223372036854775807の範囲にあるようにしてください。そうでなければ、データをdecimal.Decimalに変換し、DecimalTypeを使ってください。
"long"
FloatType numeric
注意: 実行時に数値は4バイトの符号あり単精度浮動小数点に変換されるでしょう。
"float"
DoubleType numeric "double"
DecimalType サポートされません。 サポートされません。
StringType character "string"
BinaryType raw "binary"
BooleanType logical "bool"
TimestampType POSIXct "timestamp"
DateType Date "date"
ArrayType vector あるいは list list(type="array", elementType=elementType, containsNull=[containsNull])
注意: containsNull のデフォルト値は Trueです。
MapType environment list(type="map", keyType=keyType, valueType=valueType, valueContainsNull=[valueContainsNull])
注意: valueContainsNull のデフォルト値はTrueです。
StructType 名前付きリスト list(type="struct", fields=fields)
注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのRでの値のタイプ(例えば、データタイプIntegerTypeのStructFieldのためのinteger) list(name=name, type=dataType, nullable=nullable)

NaN Semantics

標準の浮動小数点の記号に正確の一致しない float あるいは double タイプを扱う時に、not-a-number (NAN)の特別な処理があります。具体的には:

TOP
inserted by FC2 system