Spark SQL, データフレーム および データセット ガイド
- 概要
- 開始
- データソース
- パフォーマンス チューニング
- 分散SQLエンジン
- 移行ガイド
- リファレンス
概要
Spark SQL は構造化されたデータの処理のためのSparkモジュールです。基本のRDD API とは異なり、Spark SQLによって提供されるインタフェースは、Sparkにデータと実行されている計算の両方についての構造のもっと多くの情報を提供します。内部的には、Spark SQLはこの特別な上方を特別な最適化を実施するために使用します。SQLおよびデータセットAPIを含むSpark SQLとやり取りをする幾つかの方法があります。計算を表現するためにどのAPI/言語を使っているかに関係なく、結果を計算する時に同じ実行エンジンが使われます。この単一化は、開発者が指定された変換を表現するもっとも自然な方法に基づく異なるAPI間を容易に行き来できることを意味します。
このページの全ての例はSpark配布物に含まれるサンプルデータを使用し、spark-shell
, pyspark
シェルあるいは sparkR
シェルの中で実行することができます。
SQL
Spark SQLの一つの使い方は、SQLクエリを実行することです。Spark SQLは既存のHiveインストレーションからデータを読み込むために使うこともできます。この起動をどう設定するかの詳細については、 Hive テーブル の章を参照してください。他のプログラミング言語の中でSQLを実行する場合、結果はデータセット/データフレームとして返されるでしょう。command-line あるいは JDBC/ODBCを使ってSQLインタフェースとやり取りすることもできます。
データセットとデータフレーム
データセットはデータの分散型コレクションです。データセットはSparkSQLの最適化実行エンジンを使ってRDDの利点(強力なタイプ、強力なラムダ関数を使用する能力)を提供するSpark 1.6で追加された新しいインタフェースです。データセットはJVMオブジェクトから構築することができ、関数的な変形(map
, flatMap
, filter
など)を使って操作することができます。データセットAPIは Scala および Javaで利用可能です。PythonはデータセットAPIのためのサポートを持ちません。しかし、Pythonの動的な性質により、データセットAPIの多くの利点が既に利用可能です(たとえば自然に名前row.columnName
によって行のフィールドにアクセスすることができます)。Rの場合も似ています。
データフレームは、名前付きの列に整理されたデータセットです。リレーショナルデータベースでのテーブル、あるいはR/Pythonでのデータフレームと概念的に等価ですが、裏ではもっと最適化されています。データフレームは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRDD。データフレームAPIはScala, Java, Python および R で利用可能です。Scalaと Javaでは、データフレームは Row
のデータセットによって表現されます。Scala APIでは、DataFrame
は単純にDataset[Row]
のタイプのエイリアスです。一方で、Java APIでは、ユーザはDataFrame
を表現するためにDataset<Row>
を使う必要があります。
このドキュメントの至る所で、データフレームとしてRow
のScala/Javaデータセットをしばしば参照するつもりです。
開始
開始点: SparkSession
Sparkの全ての機能へのエントリーポイントはSQLSession
クラスです。基本的な SparkSession
を生成するには、単にSparkSession.builder()
を使います:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
Sparkの全ての機能へのエントリーポイントはSQLSession
クラスです。基本的な SparkSession
を生成するには、単にSparkSession.builder()
を使います:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Sparkの全ての機能へのエントリーポイントはSQLSession
クラスです。基本的な SparkSession
を生成するには、単にSparkSession.builder
を使います:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Sparkの全ての機能へのエントリーポイントはSQLSession
クラスです。基本のSparkSession
を初期化するには、単にsparkR.session()
を呼び出します:
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
初めて起動する時には、sparkR.session()
がグローバルなSparkSession
シングルトン インスタンスを初期化し、続く起動で常にこのインスタンスへのリファレンスを返すことに注意してください。この方法は、ユーザは1度だけSparkSession
を初期化する必要があり、read.df
のSparkRの関数はこのグローバルインスタンスに暗黙的にアクセスすることができ、ユーザはSparkSession
インスタンスをあちこちに渡す必要がありません。
Spark 2.0でのSparkSession
はHiveQLを使ってクエリを書き、Hive UDFにアクセスし、Hiveテーブルからデータを読み込むことができる能力を含むHiveの機能のための組み込みのサポートを提供します。 これらの機能を使うために、既存のHiveのセットアップを持つ必要はありません。
データフレームの生成
SparkSession
を使ってアプリケーションは既存のRDD
、Hiveテーブル、あるいはSpark data sourcesからデータフレームを作成することができます。
例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
SparkSession
を使ってアプリケーションは既存のRDD
、Hiveテーブル、あるいはSpark data sourcesからデータフレームを作成することができます。
例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
SparkSession
を使ってアプリケーションは既存のRDD
、Hiveテーブル、あるいはSpark data sourcesからデータフレームを作成することができます。
例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
SparkSession
を使って、アプリケーションはローカルのRのdata.frame、Hiveテーブル、あるいはSpark データソースからデータフレームを生成することができます。
例として、以下ではJSONファイルの内容に基づいて、データフレームを生成します。
df <- read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age| name|
## +----+-------+
## |null|Michael|
## | 30| Andy|
## | 19| Justin|
## +----+-------+
無タイプのデータセット操作 (別名、DataFrame操作)
データフレームは Scala、Java、PythonおよびR での構造化データ操作のためのドメイン固有の言語を提供します。
上で述べたように、Spark 2.0では、データフレームはScalaおよびJava APIでのまさに行のデータセットです。これらのオペレーションは、強く型付けされたScala/Javaのデータセットが付属している "型有りの変換"に対して、"型無しの変換"として参照されます。
以下は、データセットを使った構造化データ処理の幾つかの基本的な例を含んでいます:
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
データセットで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。
単純なカラムの参照および表現に加えて、データセットは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
データセットで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。
単純なカラムの参照および表現に加えて、データセットは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。
Pythonでは属性(df.age
) あるいは、インデックス(df['age']
)のどちらかを使ってデータフレームのカラムにアクセスすることができます。データを探索するには前者が便利ですが、後者の形式を使うことをとてもお勧めします。これは将来が保証されており、データフレームクラス上の属性もカラム名を使って破壊しないでしょう。
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。
単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。
# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")
# Show the content of the DataFrame
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Select only the "name" column
head(select(df, "name"))
## name
## 1 Michael
## 2 Andy
## 3 Justin
# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
## name (age + 1.0)
## 1 Michael NA
## 2 Andy 31
## 3 Justin 20
# Select people older than 21
head(where(df, df$age > 21))
## age name
## 1 30 Andy
# Count people by age
head(count(groupBy(df, "age")))
## age count
## 1 19 1
## 2 NA 1
## 3 30 1
DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。
単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。
プログラム的にSQLクエリを実行
SparkSession
のsql
はアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を DataFrame
として返します。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
SparkSession
のsql
はアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を Dataset<Row>
として返します。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
SparkSession
のsql
はアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を DataFrame
として返します。
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
sql
関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrame
として返されます。
df <- sql("SELECT * FROM table")
グローバル テンポラリ ビュー
Spark SQLでのテンポラリビューはセッションスコープで、それを生成したセッションが終了した場合に消えるでしょう。Sparkアプリケーションが終了するまで保持し全てのセッションで共有するテンポラリビューを持ちたい場合は、グローバルテンプレートビューを生成することができます。グローバルテンプレート ビューはシステムが保持するデータベースglobal_temp
に結び付けられ、それを参照するために完全名を使う必要があります。例えば、SELECT * FROM global_temp.view1
。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
SELECT * FROM global_temp.temp_view
データセットの生成
データセットはRDDに似ていますが、Javaのシリアライズ化あるいはKryoを使う代わりに、ネットワークを越えて処理あるいは転送するためにオブジェクトをシリアライズ化するために特別なEncoderを使用します。エンコーダーと標準シリアライズ化のどちらもオブジェクトをバイトに変えることに責任を持ちますが、エンコーダーは動的に生成されるコードであり、Sparkがバイトをオブジェクトへデシリアライズ化無しにフィルタリング、ソーティングおよびハッシュ化のような多くのオペレーションを実施することができる形式を使用します。
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
RDDを使った内部操作
Spark SQL は既存のRDDをデータセットに変換する二つの異なるメソッドを提供します。1つ目のメソッドはオブジェクトの特定のタイプを含むRDDのスキーマを推測するためにリフレクションを使用します。このリフレクションを基礎としたやり方は、Sparkアプリケーションを書いている時にスキーマを既に知っている場合には、結果としてより簡潔でよく動きます。
データセットを生成する2つ目のメソッドは、スキーマを構築し既存のRDDに適用することができるプログラム的なインタフェースです。このメソッドは多少冗長ですが、カラムとそれらのタイプが実行時まで分からない場合にデータセットを構築することができます。
リフレクションを使ったスキーマの推測
Spark SQLのためのScalaインタフェースは、データフレームへのcaseクラスを含むRDDの自動的な変換をサポートします。caseクラスはテーブルのスキーマを定義します。caseクラスへの引数の名前はリフレクションを使って読み込まれ、カラムの名前となります。case クラスは入れ子にすることもでき、Seq
あるいはArray
のような複雑なタイプを含むことができます。このRDDは明示的にデータフレームに変換され、テーブルとして登録されます。テーブルは後に続くSQL分の中で使うことができます。
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
Spark SQLはJavaBeansのRDDからデータフレームへの自動的な変換をサポートします。 リフレクションを使って取得されたBeanInfo
はテーブルのスキーマを定義します。現在のところ、Spark SQLはMap
フィールドを含むJavaBeansをサポートしません。しかし、入れ子になったJavaBeansと、List
あるいは Array
フィールドはサポートされます。Serializableを実装し全てのフィールドについてgetterおよびsetterを持つクラスを生成することで、JavaBeanを生成することができます。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Spark SQL はデータタイプを推測して、RowオブジェクトのRDDをデータフレームに変換することができます。行はkwargsとしてkey/valueペアをRowクラスに渡すことで構築することができます。このリストのキーは表のカラム名を定義し、JSONファイル上で実施される推測に似たような、データセット全体を標本化することでタイプを推測します。
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Justin
プログラム的なスキーマの指定
事前にケースクラスを定義できない場合(例えば、レコードの構造が文字列にエンコードされている。あるいはテキストのデータセットがパースされ、フィールドが異なるユーザには異なって抽出されるかもしれない)、DataFrame
は3つのステップでプログラム的に生成することができます。
- 元のRDDから
行
のRDDを生成する; - ステップ1で生成したRDD内の
行
の構造に一致するStructType
によって表現されるスキーマを生成する。 - スキーマを
SparkSession
によって提供されるcreateDataFrame
メソッドを使って行
のRDDに適用する。
例えば:
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
事前にJavaBeanクラスを定義できない場合(例えば、レコードの構造が文字列にエンコードされている。あるいはテキストのデータセットがパースされ、フィールドが異なるユーザには異なって抽出されるかもしれない)、Dataset<Row>
は3つのステップでプログラム的に生成することができます。
- 元のRDDから
行
のRDDを生成する; - ステップ1で生成したRDD内の
行
の構造に一致するStructType
によって表現されるスキーマを生成する。 - スキーマを
SparkSession
によって提供されるcreateDataFrame
メソッドを使って行
のRDDに適用する。
例えば:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
事前にkwargsの辞書を定義できない場合(例えば、レコードの構造が文字列にエンコードされている。あるいはテキストのデータセットがパースされ、フィールドが異なるユーザには異なって抽出されるかもしれない)、DataFrame
は3つのステップでプログラム的に生成することができます。
- 元のRDDから組あるいはリストのRDDを生成する;
- ステップ1で生成したRDD内の組あるいはリストの構造に一致する
StructType
によって表現されるスキーマを生成する。 - スキーマを
SparkSession
によって提供されるcreateDataFrame
メソッドを使ってRDDに適用します。
例えば:
# Import data types
from pyspark.sql.types import *
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
集約
組み込みのデータフレーム関数 はcount()
, countDistinct()
, avg()
, max()
, min()
などのような一般的な集約を提供します。これらの関数はデータフレームのための設計されていますが、Spark SQLはScala と Java で強く型付けされたデータセットと連携するためにそれらの幾つかの型セーフのバージョンも持ちます。更に、ユーザは定義済みの集約関数に制限されず、独自のそれらを作成することができます。
型無しユーザ定義集約関数
独自の型無しの集約関数を実装するには、ユーザはUserDefinedAggregateFunction抽象クラスを拡張する必要があります。例えば、ユーザ定義のaverageは以下のようになります:
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object MyAverage extends UserDefinedAggregateFunction {
// Data types of input arguments of this aggregate function
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// Data types of values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// Register the function to access it
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public static class MyAverage extends UserDefinedAggregateFunction {
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
// Data types of input arguments of this aggregate function
public StructType inputSchema() {
return inputSchema;
}
// Data types of values in the aggregation buffer
public StructType bufferSchema() {
return bufferSchema;
}
// The data type of the returned value
public DataType dataType() {
return DataTypes.DoubleType;
}
// Whether this function always returns the same output on the identical input
public boolean deterministic() {
return true;
}
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0L);
buffer.update(1, 0L);
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
// Calculates the final result
public Double evaluate(Row buffer) {
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
}
// Register the function to access it
spark.udf().register("myAverage", new MyAverage());
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
型セーフ ユーザ定義集約関数
強く型付けされたデータセットのためのユーザ定義の集約はAggregator 抽象クラス周りを解決します。例えば、型セーフのユーザ定義averageは以下のようになります:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
public static class Employee implements Serializable {
private String name;
private long salary;
// Constructors, getters, setters...
}
public static class Average implements Serializable {
private long sum;
private long count;
// Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Employee, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
public Average reduce(Average buffer, Employee employee) {
long newSum = buffer.getSum() + employee.getSalary();
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// Specifies the Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// Specifies the Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
データソース
Spark SQL はDataFrameインタフェースを使って様々なデータソース上での操作をサポートします。データフレームはrelational transformationを使って操作することができ、一時viewを生成するために使うことができます。データフレームを一時的なビューとして登録することによりデータにSQLクエリを実行することができます。このセクションではSpark データソースを使ってデータをロードおよび保存する一般的なメソッドを説明し、その後ビルトインのデータソースのために利用可能な特定のオプションについて詳しく調べます。
一般的なロード/保存 機能
もっとも簡単な形式では、全てのオペレータのためにデフォルトのデータソース (spark.sql.sources.default
で他のものが指定されていない場合はparquet
) が使われるでしょう。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
手動でのオプションの指定
データソースに渡したいどのような特別のオプションと一緒にデータソースを手動で指定することもできます。データソースは完全修飾名(例えば、org.apache.spark.sql.parquet
)で指定されますが、ビルトインのソースの場合はショート名 (json
, parquet
, jdbc
, orc
, libsvm
, csv
, text
)を使うこともできます。どのようなデータソースからロードされたデータフレームもこの構文を使って他のタイプに変換することができます。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
ファイル上のSQLを直接実行
ファイルをデータフレームにロードし、それに質問するためにread APIを使う代わりに、SQLを使ってファイルに直接質問することもできます。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
セーブモード
セーブ操作は任意にSaveMode
を取ることができます。これは既存のデータがあった場合にどう扱うかを指定します。これらのセーブモードがどのようなロックも使わないこと、およびアトミックでは無いことを理解しておくことは重要です。更に、Overwrite
を実施する場合、新しいデータを書き込む前にデータは削除されるでしょう。
Scala/Java | Any Language | 意味 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" (default) |
データフレームをデータソースに保存する場合、もしデータが既に存在する場合は例外が投げられるでしょう。 |
SaveMode.Append |
"append" |
データフレームをデータソースに保存する場合、もし データ/テーブル が既に存在する場合は、データフレームの内容は既存のデータに追記されるでしょう。 |
SaveMode.Overwrite |
"overwrite" |
overwrite モードは、データフレームをデータソースに保存する場合に、もし データ/テーブル が存在する場合は既存のデータがデータフレームの内容によって上書きされるだろうことを意味します。 |
SaveMode.Ignore |
"ignore" |
ignore モードは、データフレームをデータソースに保存する場合に、もし データ が存在する場合はセーブ操作によってデータフレームの内容が保存されず、既存のデータが変更されないだろうことを意味します。これはSQLでの CREATE TABLE IF NOT EXISTS に似ています。
|
永続テーブルへの保存
DataFrames
はsaveAsTable
コマンドを使ってHiveのメタストアの中に永続テーブルとして保存することもできます。既存のHive配備はこの機能を使う必要が無いことに注意してください。Sparkは(Derbyを使って)デフォルトのローカルのHiveメタストアを作るでしょう。createOrReplaceTempView
と異なり、saveAsTable
はデータフレームの内容を具体化し、Hiveメタストア内のデータへのポインタを生成するでしょう。同じmetastoreに接続を続ける限り、永続的なテーブルはSparkプログラムが再起動した後もまだ存在しているでしょう。永続的なテーブルのためのデータフレームはSparkSession
上でテーブル名を使って table
メソッドを呼ぶことで生成することができます。
例えば、テキスト、parquet、jsonなどのファイルベースのデータソースについては、path
オプション、例えばdf.write.option("path", "/some/path").saveAsTable("t")
を使って独自のテーブルパスを指定することができます。テーブルが削除された場合、独自のテーブルパスは削除されず、テーブルのデータはまだあるでしょう。独自のテーブルパスが指定されない場合は、Sparkはデータをウェアハウスのディレクトリの下のデフォルトのテーブルパスに書き込むでしょう。テーブルが削除される時、デフォルトのテーブルパスも削除されるでしょう。
Spark 2.1 から、永続データソーステーブルはHive metastoreに格納されるパーティション毎のmetadataを持ちます。これにより幾つかの恩恵があります:
- metastoreはクエリのための必要なパーティションだけを返すことができるため、テーブルへの最初のクエリで全てのパーティションを見つけることはもう必要ではありません。
ALTER TABLE PARTITION ... SET LOCATION
のようなHive DDLが今ではDatasource APIを使って作成されたテーブルに利用可能であることを意味します。
(path
オプションを持つ)外部データソーステーブルを作る時に、パーティション情報はデフォルトでは集められません。metastor内でパーティション情報を同期するために、MSCK REPAIR TABLE
を起動することができます。
Bucketing, Sorting and Partitioning
ファイルベースのデータソースについては、出力をバケット化およびソート、あるいは分割することができます。バケット化とソートは永続性のあるテーブルのみに適用可能です:
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
CREATE TABLE users_bucketed_by_name(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;
while partitioning can be used with both save
and saveAsTable
when using the Dataset APIs.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
CREATE TABLE users_by_favorite_color(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);
1つのテーブルについて分割とバケット化の両方を使うことが可能です:
peopleDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed")
peopleDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed");
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed"))
CREATE TABLE users_bucketed_and_partitioned(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
partitionBy
は Partition Discovery の章で説明されるようにディレクトリの構造を作成します。従って、カーディナリティが高いカラムへは適用に制限があります。それとは対照的に、bucketBy
は固定数のバケットに渡ってデータを分散し、ユニークな値に制限が無い場合に使うことができます。
Parquet ファイル
Parquet は多くのほかのデータ処理システムでサポートされているコラム状のフォーマットです。Spark SQL は自動的に元のデータのスキーマを保持するParquetファイルの読み書きの両方のサポートを提供します。Parquetファイルを書く場合、互換性の理由から全てのカラムは自動的にnullが可能なように変換されます。
プログラム的なデータのロード
上の例から以下のようにデータを使用します:
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");
// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
peopleDF = spark.read.json("examples/src/main/resources/people.json")
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
df <- read.df("examples/src/main/resources/people.json", "json")
# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path "examples/src/main/resources/people.parquet"
)
SELECT * FROM parquetTable
パーティションの発見
テーブルのパーティションはHiveのようなシステムで使われる一般的な最適化の方法です。パーティションされたテーブルの中でデータは各パーティションディレクトリのパスでエンコードされたパーティションカラム値を使って、通常異なるディレクトリに保持されます。Parquetデータソースはこれで自動的にパーティションの情報を発見および推測することができます。例えば、全ての以前に使用したパーティションデータを以下のディレクトリ構造を使って、パーションカラムとしてgender
および country
の2つの追加のカラムを持つパーティションされたテーブルに格納することができます。
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
path/to/table
を SparkSession.read.parquet
または SparkSession.read.load
のどちらかに渡すことで、Spark SQL は自動的にパスからパーティション情報を抽出することができるでしょう。これで、返されるデータフレームのスキーマは以下のようになります:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。そのような場合のために、自動的なタイプの推測はspark.sql.sources.partitionColumnTypeInference.enabled
で設定することができます。デフォルトは true
です。タイプの推測が無効な場合、パーティションカラムとして文字列タイプが使われるでしょう。
Spark 1.6.0から、パーティションの発見はデフォルトで指定されたパスの下のパーティションだけを見つけます。上の例では、もしユーザが path/to/table/gender=male
を SparkSession.read.parquet
または SparkSession.read.load
のどちらかに渡す場合に、gender
はパーティションのカラムとして見なされないでしょう。パーティションの検索を始めるベースパスを指定する必要がある場合は、データソースのオプション内で basePath
を設定することができます。例えば、path/to/table/gender=male
がデータのパスである場合、ユーザはbasePath
を path/to/table/
に設定します。 gender
はパーティションカラムになるでしょう。
スキーマのマージ
ProtocolBuffer, Avro および Thrift のように、Parquet もスキーマの評価をサポートします。ユーザは単純なスキーマから開始し、必要に応じて次第にもっとカラムをスキーマに追加することができます。この場合、ユーザは異なるがお互いにスキーマの互換性がある複数のParquetファイルにするかも知れません。Parquetデータソースは現在では自動的にこのケースを検知し、全てのこれらのファイルのスキーマをマージすることができます。
スキーマのマージは比較的高価な操作であり、多くの場合必要ではないため、1.5.0以降からデフォルトではoffにしています。以下のようにして有効にすることができます
- (以下の例のように)Parquet ファイルを読む時にデータソースオプション
mergeSchema
をtrue
に設定、あるいは - グローバルSQLオプション
spark.sql.parquet.mergeSchema
をtrue
に設定。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
from pyspark.sql import Row
# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
.map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")
# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- double: long (nullable = true)
# |-- single: long (nullable = true)
# |-- triple: long (nullable = true)
# |-- key: integer (nullable = true)
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
## |-- single: double (nullable = true)
## |-- double: double (nullable = true)
## |-- triple: double (nullable = true)
## |-- key: integer (nullable = true)
Hive メタストア Parquet テーブル交換
Hive metastore Parquetテーブルへの読み書きの時に、Spark SQLはより良いパフォーマンスのためにHive SerDeの代わりに独自のParquetサポートを使おうとするでしょう。この挙動は spark.sql.hive.convertMetastoreParquet
設定によって制御され、デフォルトで作動しています。
Hive/Parquet Schema 調整
テーブルスキーマ処理の観点から、HiveとParquetの間には2つの主要な違いがあります。
- Hive は大文字小文字を区別しませんが、Parquetは区別します。
- Hiveは全てのカラムがnull可能ですが、Parquetには重要な意味があります。
この理由により、Hive metastore Parquet テーブルをSpark SQL Parquet テーブルに変換する場合に、Hive metastore スキーマと Parquet スキーマを調停する必要があります。調停ルールは以下の通りです:
-
両方のスキーマで同じ名前を持つフィールドは、null可能かどうかに関係なく同じデータタイプでなければなりません。調停フィールドはParquet側のデータタイプを持たなければなりません。つまりnull可能かどうかが考慮されます。
-
調停スキーマはHive metastoreスキーマで定義されるそれらのフィールドを正確に含まなければなりません。
- Parquetスキーマにのみ現れる全てのフィールドは調停スキーマの中で落とされます。
- Hive metastoreスキーマにのみ現れる全てのフィールドは調停スキーマの中でnull可能なフィールドとして追加されます。
メタデータのリフレッシュ
Spark SQL はパフォーマンスの向上のためにParquet metadetaとしてキャッシュされます。Hive metastore Parquet テーブルの変換が有効な場合、それらの変換されたテーブルのmetadataもキャッシュされます。もしそれらのテーブルがHiveまたは他の外部のツールで更新された場合、metadataの一貫性のために手動でそれらを更新する必要があります。
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
REFRESH TABLE my_table;
設定
Parquetの設定はSparkSession
の setConf
メソッドを使うか、SQLを使ってSET key=value
を実行することで行うことができます。
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 他の幾つかのParquet生成システム、特にImpala, Hive および Spark SQLの古いバージョンは、Parquetスキーマを書き出す時にバイナリデータと文字列の区別をしません。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として扱うように指示します。 |
spark.sql.parquet.int96AsTimestamp |
true | 幾つかのParquet生成システム、特にImparaおよびHiveは、タイムスタンプをINT96に格納します。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにINT96データをタイムスタンプとして解釈するように指示します。 |
spark.sql.parquet.cacheMetadata |
true | Parquetスキーマメタデータのキャッシュの機構をオンにします。静的なデータのクエリを高速化することができます。 |
spark.sql.parquet.compression.codec |
snappy | Parquetファイルを書き込む時に圧縮符号化を使うように設定します。利用可能な値には、uncompressed, snappy, gzip, lzo が含まれます。 |
spark.sql.parquet.filterPushdown |
true | trueに設定された場合は、Parquet filter push-down 最適化を有効化します。 |
spark.sql.hive.convertMetastoreParquet |
true | falseに設定した場合は、Spark SQLはparquetテーブルのためにビルトインサポートの代わりにHive SerDeを使用するでしょう。 |
spark.sql.parquet.mergeSchema |
false |
trueの場合、Parquetデータソースは全てのデータファイルから集められたスキーマをマージします。そうでなければ、スキーマは、サマリファイル、あるいはサマリファイルが利用できない場合はランダムデータファイルから取り出されます。 |
spark.sql.optimizer.metadataOnly |
true |
trueの場合、パーティション カラムを生成するために、テーブル走査の代わりにテーブルのメタデータを使うような、メタデータのみのクエリ最適化を有効にします。全ての走査されたカラムがパーティションカラムで、クエリがdistinctセマンティクスを満たす集約オペレータを持つ場合に適用します。 |
JSON データセット
Spark SQL は自動的にJSONデータセットのスキーマを推測しDataset[Row]
としてロードすることができます。この変換は、Dataset[String]
あるいはJSONファイルのどちらかでSparkSession.read.json()
を使って行うことができます。
json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。
通常の複数行のJSONファイルについては、multiLine
オプションをtrue
に設定します。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Spark SQL は自動的にJSONデータセットのスキーマを推測しDataset<Row>
としてロードすることができます。この変換は、Dataset<String>
あるいはJSONファイルのどちらかでSparkSession.read().json()
を使って行うことができます。
json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。
通常の複数行のJSONファイルについては、multiLine
オプションをtrue
に設定します。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。この変換は、JSONファイル上のSparkSession.read.json
を使って行うことができます。
json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。
通常の複数行のJSONファイルについては、multiLine
パラメータをTrue
に設定します。
# spark is from the previous example.
sc = spark.sparkContext
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)
# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# | address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。read.json()
を使うと、データを各ファイルの各行がJSONオブジェクトであるJSONファイルのディレクトリからデータをロードします。
json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。
通常の複数行のJSONファイルについては、名前付きパラメータmultiLine
をTRUE
に設定します。
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path <- "examples/src/main/resources/people.json"
# Create a DataFrame from the file(s) pointed to by path
people <- read.json(path)
# The inferred schema can be visualized using the printSchema() method.
printSchema(people)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Register this DataFrame as a table.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql methods.
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
path "examples/src/main/resources/people.json"
)
SELECT * FROM jsonTable
Hive テーブル
Spark SQLはApache Hiveに格納されたデータの読み書きもサポートします。しかし、Hiveは多くの依存性を持つため、これらの依存物はデフォルトのSparkの配布物の中に含まれません。もしHiveの依存物をクラスパス上で見つけることができた場合、Sparkはそれらを自動的にロードするでしょう。Hiveに格納されているデータにアクセスするために全てのワーカーノードはHiveシリアライズおよびデシリアライズライブラリ(SerDes)へのアクセスが必要になるため、これらのHiveの依存物は全てのワーカーノード上にも存在しなければなりません。
Hiveの設定は、hive-site.xml
, core-site.xml
(セキュリティ設定) および hdfs-site.xml
(HDFS 設定) ファイルを conf/
に置くことで行われます。
Hiveを動かす場合、永続Hiveメタストアへの接続性、Hive serdesのサポート、および Hiveのユーザ定義関数を含めて、Hiveをサポートする SparkSession
をインスタンス化しなければなりません。既存のHiveデプロイメントを持たないユーザは、Hiveサポートを有効にすることができます。hive-site.xml
によって設定されていない場合、コンテキストが自動的に現在のディレクトリに metastore_db
を作成し、spark.sql.warehouse.dir
によって設定されるディレクトリを生成します。このディレクトリはsparkアプリケーションが開始される現在のディレクトリ内をデフォルトのspark-warehouse
にします。Spark 2.0.0からhive-site.xml
内のhive.metastore.warehouse.dir
プロパティが非推奨であることに注意してください。代わりにwarehouse内のデータベースのデフォルトの場所を指定するためにspark.sql.warehouse.dir
を使います。sparkを開始するユーザへ書き込み権限を与える必要があるかも知れません。
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
Hiveを動かす場合、HiveサポートのSparkSession
をインスタンス化しなければなりません。これはメタソース内のテーブルを見つけ、HiveSQLを使ってクエリを書き込むためのサポートを追加します。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
Hiveテーブルのためのストレージ形式の指定
Hiveテーブルを作成する時、このテーブルがどのようにデータをファイルシステムから/へ読み込む/書き込むかを定義する必要があります。つまり“入力フォーマット” と “出力フォーマット”。このテーブルがどのようにデータを行にデシリアライズ、あるいは行をデータにシリアライズするかを定義する必要もあります。つまり “serde”。以下のオプションはストレージのフォーマット(“serde”, “input format”, “output format”)を指定するために使うことができます。例えば、CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
。デフォルトでは、平文のテキストとしてテーブルファイルを読むでしょう。テーブル作成時にHiveストレージ ハンドラはサポートされておらず、Hive側でストレージハンドラを使ってテーブルを作成することができ、それを読むためにSpark SQLを使うことに注意してください。
プロパティ名 | 意味 |
---|---|
fileFormat |
fileFormat は "serde", "input format" および "output format" を含むストレージフォーマットの仕様のパッケージの種類です。現在のところ、6つのfileFormatsがサポートされます: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' および 'avro'。 |
inputFormat, outputFormat |
これらの2つのオプションは文字列リテラルとして`InputFormat` および `OutputFormat` クラスに対応する名前を指定します。例えば、`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。これらの2つのオプションはペアで現れなければならず、すでに `fileFormat` オプションを指定していた場合でもそれらを指定することはできません。 |
serde |
このオプションはserdeクラスを指定します。`fileFormat` オプションが指定された場合、指定された`fileFormat` がすでにserdeの情報を含む場合はこのオプションを指定しないでください。現在のところ、"sequencefile", "textfile" および "rcfile" はserde情報を含まず、これらの3つのfileFormats を使ってこのオプションを使うことができます。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
これらのオプションは "textfile" fileFormatを使ってのみ使うことができます。それらは行に区切られたファイルを読み込む方法を定義します。 |
OPTIONS
を使って定義される他の全てのオプションはHive serdeプロパティと見なされるでしょう。
Hiveメタソースの異なるバージョンの相互影響
SparkのHiveサポートの最も重要な部分の一つにHiveメタストアとの対話があります。これによりSpark SQLはHiveのテーブルのメタデータにアクセスすることができます。Spark 1.4.0から、Spark SQLの単独バイナリビルドが以下で説明する設定を使って異なるバージョンのHiveメタストアにクエリするために使うことができるようになりました。Hiveのバージョンの非依存性はmetastoreについて語られるもので、内部的にはSparkSQLはHive 1.2.1に対してコンパイルされ、それらのクラスを内部的な実行に使用することに注意してください。(serdes, UDFs, UDAFsなど)
以下のオプションがメタデータを扱うのに使われるHiveのバージョンを設定するために使うことができます。
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hiveメタストアのバージョン使用可能なオプションは 0.12.0 から 1.2.1 です。
|
spark.sql.hive.metastore.jars |
ビルトイン |
HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。このプロパティは以下の3つのオプションのどれか一つです:
-Phive が有効な場合は、SparkアセンブリにバンドルされているHive 1.2.1を使います。このオプションが選択された場合、spark.sql.hive.metastore.version は 1.2.1 あるいは未定義のどちらかでなければなりません。
|
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。 |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、 |
JDBC から他のデータベースへ
Spark SQLはJDBCを使ってほかのデータベースからデータを読み込むことができるデータソースも含みます。この機能はJdbcRDDを使う上で好まれるべきでしょう。なぜなら結果はデータフレームとして返され、それらはSpark SQLの中で簡単に処理することができるか他のデータソースと繋げることができるからです。 JDBCデータソースはユーザにClassTagの提供を要求しないため、JavaあるいはPythonから簡単に使うことができます。(これは他のアプリケーションがSparkSQLを使ってクエリを実行することができるSpark SQL JDBCサーバと異なることに注意してください)。
開始するためには、sparkのクラスパス上に特定のデータベースのためのJDBCドライバを含む必要があるでしょう。例えば、Sparkシェルからpostgresに接続するには、以下のコマンドを実行するかもしれません:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
リモートデータベースからのテーブルは、データソースAPIを使ってデータフレームあるいはSpark SQLのテンポラリ ビューとしてロードすることができます。ユーザはデータソースオプションの中でJDBC接続プロパティを指定することができます。データソースに記録するために、user
と password
は通常接続プロパティとして提供されます。接続プロパティに加えて、Sparkは以下の大文字小文字を区別しないオプションもサポートします:
プロパティ名 | 意味 |
---|---|
url |
接続するための JDBC URLソース固有の接続プロパティはURL内で指定されるかも知れません。例えば, jdbc:postgresql://localhost/test?user=fred&password=secret
|
dbtable |
読み込まれる必要があるJDBCテーブル。SQLクエリのFROM 句で有効なものを全て使用できることに注意してください。例えば、完全なテーブルの代わりに、丸括弧内のサブクエリも使うことができます。
|
driver |
このURLに接続するために使われるJDBCドライバのクラス名。 |
partitionColumn, lowerBound, upperBound |
これらのオプションは、いずれかが指定される場合は全て指定されなければなりません。更に、numPartitions が指定されなければなりません。複数のワーカーから並行して読み込む時は、それらはどうやってテーブルを分割するかを説明します。partitionColumn は問題のテーブルからの数字のカラムでなければなりません。lowerBound と upperBound はパーティションのストライドを決めるために使われるだけで、テーブル内の行をフィルタするためのものでは無いことに注意してください。つまりテーブル内の全ての行が分割され返されるでしょう。このオプションは読み込みにのみ適用されます。
|
numPartitions |
テーブルの読み書きの並行のために使うことができるパーティションの最大数。これは同時のJDBC接続の最大数も決定します。書き込みのパーティションの数がこの制限を超える場合、書き込む前に coalesce(numPartitions) を呼ぶことで書き込みの数をこの制限まで減らすことができます。
|
fetchsize |
JDBCの fetchサイズ。これは一回でどれだけの数の行をfetchするかを決定します。これはデフォルトが少ないフェッチサイズのJDBCドライバ上でパフォーマンスを良くします (例えば、Oracleは10行)。このオプションは読み込みにのみ適用されます。 |
batchsize |
JDBCの バッチサイズ。これは一回でどれだけの数の行を挿入するかを決定します。これはJDBCドライバ上でのパフォーマンスを良くするかもしれません。このオプションは書き込みにのみ適用されます。デフォルトは1000 です。
|
isolationLevel |
トランザクションの隔離レベル。これは現在の接続に適用されます。NONE , READ_COMMITTED , READ_UNCOMMITTED , REPEATABLE_READ あるいは SERIALIZABLE のうちの一つが可能で、JDBCの接続オブジェクトによって定義される標準的なトランザクション隔離レベルに対応します。デフォルトはREAD_UNCOMMITTED です。このオプションは書き込みにのみ適用されます。java.sql.Connection のドキュメントを参照してください。
|
truncate |
これはJDBC writerに関係するオプションです。SaveMode.Overwrite が有効な場合、このオプションはSparkに既存のテーブルをdropして再createする代わりにtruncateさせます。これはもっと効率的で、テーブルのメタデータ(例えば、indices)が削除されることを防ぎます。しかし、新しいデータが異なるスキーマを持つなど、いくつかの場合に動作しないでしょう。デフォルトはfalse です。このオプションは書き込みにのみ適用されます。
|
createTableOptions |
これはJDBC writerに関係するオプションです。指定された場合、このオプションはテーブルを作成する時にデータベース固有のテーブルとパーティションオプションを設定することができます (例えば CREATE TABLE t (name string) ENGINE=InnoDB. )。このオプションは書き込みにのみ適用されます。
|
createTableColumnTypes |
テーブルを作成する時のデフォルトの代わりに使うデータベースカラムのデータ型。データ型の情報は CREATE TABLE カラム構文 (例えば: "name CHAR(64), comments VARCHAR(1024)") と同じ形式で指定されなければなりません。指定された型は有効なspark sql データ型でなければなりません。このオプションは書き込みにのみ適用されます。
|
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
トラブルシューティング
- JDBCドライバクラスはクライアントセッションおよび全てのexecutorの根本的なクラスローダから見えなければなりません。これはJavaのDriverManagerクラスが、接続を開こうとする時に根本的なクラスローダから見えない全てのドライバを無視することになるセキュリティチェックを行うからです。これをする一つの簡単な方法が、全てのワーカーノード上のcompute_classpath.shをドライバーJARを含むように修正することです。
- H2のような幾つかのデータベースは全ての名前を大文字に変換します。Spark SQLでそれらの名前を参照するには大文字を使う必要があるでしょう。
パフォーマンス チューニング
ちょっとした次善策として、メモリにデータをキャッシュ、あるいは幾つかの実験的なオプションを調整することでパフォーマンスを改善することができます。
メモリへのデータのキャッシュ
Spark SQL は spark.catalog.cacheTable("tableName")
あるいはdataFrame.cache()
を呼ぶことでインメモリのコラム形式のフォーマットを使ってテーブルをキャッシュすることができます。そして、Spark SQLは必要なカラムだけをスキャンし、メモリの使用量とGCの圧力を最小化するために圧縮を自動的に調整するでしょう。メモリからテーブルを削除するために spark.catalog.uncacheTable("tableName")
を呼ぶことができます。
メモリ内キャッシングの設定はSparkSession
のsetConf
メソッドあるいは SQLを使ってSET key=value
コマンドを実行することで行うことができます。
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。 |
他の設定オプション
以下のオプションもクエリ実行のパフォーマンスを調整するために使用することができます。これらのオプションはもっと多くの最適化が自動的に行われるため、将来のリリースでは非推奨になるかもしれません。
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.sql.files.maxPartitionBytes |
134217728 (128 MB) | ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。 |
spark.sql.files.openCostInBytes |
4194304 (4 MB) | ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうれうば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう(最初にスケジュールされます)。 |
spark.sql.broadcastTimeout |
300 |
ブロードキャストjoinでのブロードキャスト待ち時間のタイムアウト秒数 |
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) |
joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan が実行された場合、現在のところ統計はHiveメタストアテーブルのみがサポートされることに注意してください。
|
spark.sql.shuffle.partitions |
200 | joinあるいは集約のためにデータをシャッフルする時に使用するパーティションの数を設定します。 |
分散SQLエンジン
Spark SQLはJDBC/ODBCあるいはコマンドラインインタフェースを使って分散型クエリエンジンとして振舞うこともできます。このモードでは、エンドユーザあるいはアプリケーションがコードを書くこと無しにSQLクエリを直接実行するためにSpark SQLと直接やり取りをすることができます。
Thrift JDBC/ODBC サーバの実行
ここで実装されているThrift JDBC/ODBC サーバは、Hive 1.2.1のHiveServer2
に対応します。SparkあるいはHive 1.2.1に同梱されるbeelineスクリプトを使ってJDBCサーバをテストすることができます。
JDBC/ODBCサーバを開始するには、Sparkディレクトリで以下を実行します:
./sbin/start-thriftserver.sh
このスクリプトはHiveプロパティを指定する--hiveconf
を加えて全ての bin/spark-submit
コマンドラインを受け付けます。全ての利用可能なオプションのリストのために./sbin/start-thriftserver.sh --help
を実行するかも知れません。デフォルトでは、サーバはlocalhsot:10000をlistenします。以下のように環境変数を使ってこの挙動を上書きできます。例えば:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...
あるいはシステムプロパティを使って上書き:
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...
これで、Thrift JDBC/ODBC サーバをテストするために beelineを使うことができます:
./bin/beeline
以下のようにしてbeeline内でJDBC/ODBCに接続します:
beeline> !connect jdbc:hive2://localhost:10000
Beeline はユーザ名とパスワードを尋ねるでしょう。セキュアで無いモードでは、単にマシーン上のユーザ名を入力し空のパスワードを入力します。セキュアモードのためには、beeline ドキュメント にある手順に従ってください。
Hiveの設定は、hive-site.xml
をconf/
内の core-site.xml
および hdfs-site.xml
ファイルに置き換えることで行われます。
Hiveに付属するbeeline スクリプトを使うかも知れません。
Thrift JDBC サーバはHTTPトランスポート上のThrift RPC メッセージの送信もサポートします。以下の設定を使ってシステムプロパティあるいはconf/
内のhive-site.xml
ファイルの中でHTTPモードを有効にします:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
テストするには、beelineを使って以下のようにしてhttpモードのJDBC/ODBCサーバに接続します:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
Spark SQL CLIの実行
Spark SQL CLI はローカルモードでHiveメタストアサービスをを実行するのに便利なツールで、コマンドラインからの入力のクエリを実行します。Spark SQL CLIはThrift JDBCサーバとやりとりできないことに注意してください。
Spark SQL CLIを開始するには、Spark ディレクトリで以下を実行してください:
./bin/spark-sql
Hiveの設定は、hive-site.xml
をconf/
内の core-site.xml
および hdfs-site.xml
ファイルに置き換えることで行われます。全ての利用可能なオプションの完全なリストのために、./bin/spark-sql --help
を実行するかもしれません。
移行ガイド
Spark SQL 2.1から2.2へのアップグレード
- Spark 2.1.1 は新しい設定キーを導入しました:
spark.sql.hive.caseSensitiveInferenceMode
.NEVER_INFER
のデフォルトの設定を持っていて、2.1.0 と同一の挙動を維持します。しかし、Spark 2.2.0 は背後にあるファイルシステムが大文字小文字が混じったカラム名を持つHive メタストアのテーブルの読み込みとの互換性を復元するために、このデフォルトの値の設定をINFER_AND_SAVE
に変更しました。INFER_AND_SAVE
設定値を使って、Sparkの最初のアクセスは、推測されたスキーマにまだ保存されていないHiveメタストア テーブル上のスキーマの推測を実施します。スキーマの推測は何千ものパーティションを持つテーブルについてはとても時間を消費する操作になるかも知れないことに注意してください。大文字小文字が混じったカラム名の互換性が重要では無い場合は、スキーマ推測の初期のオーバーヘッドを避けるためにspark.sql.hive.caseSensitiveInferenceMode
をNEVER_INFER
に設定することができます。新しいデフォルトのINFER_AND_SAVE
設定を使って、スキーマ推測の結果が将来使うためにメタストア キーとして保存されることに注意してください。従って、初期のスキーマ推測はテーブルの最初のアクセス時のみに起こります。
Spark SQL 2.0から2.1へのアップグレード
- Datasource テーブルは今ではHive metastore にパーティション metaデータを格納します。このことは
ALTER TABLE PARTITION ... SET LOCATION
のようなHive DDLが今ではDatasource APIを使って作成されたテーブルに利用可能であることを意味します。- 従来のデータソース テーブルは
MSCK REPAIR TABLE
コマンドを使ってこの形式に移行することができます。従来のテーブルの移行はHive DDLサポートを利用するためにお勧めで、プランのパフォーマンスが改善されます。 - テーブルが移行されたかどうかを決定するために、テーブル上で
DESCRIBE FORMATTED
を発行する時にPartitionProvider: Catalog
属性を調べてください。
- 従来のデータソース テーブルは
- データソース テーブルのための
INSERT OVERWRITE TABLE ... PARTITION ...
の挙動に変更します。- 以前のSparkのバージョンでは、
INSERT OVERWRITE
はパーティションの使用を与えられた場合でもデータソーステーブル全体を上書きしました。今では仕様に合致するパーティションだけが上書きされます。 - これはまだHiveテーブルの挙動と異なることに注意してください。新しく挿入されたデータと重なるパーティションのみ上書きします。
- 以前のSparkのバージョンでは、
Spark SQL 1.6から2.0へのアップグレード
-
今では、
SparkSession
には古いSQLContext
およびHiveContext
を置き換える新しいSparkのエントリーポイントがあります。古いSQLContextとHiveContextは後方互換性のために残されていることに注意してください。新しいcatalog
インタフェースはSparkSession
からアクセス可能です -listTables
,createExternalTable
,dropTempView
,cacheTable
のようなデータベースおよびテーブルアクセスの既存のAPIは削除されます。 -
データベースAPIとデータフレームAPIは統合されます。Scalaでは、
DataFrame
はDataset[Row]
のタイプエイリアスになりますが、Java APIユーザはDataFrame
をDataset<Row>
に置き換える必要があります。型有り変換 (例えばmap
,filter
とgroupByKey
) と 型無し変換 (例えばselect
とgroupBy
) の両方がデータセットクラスで利用可能です。PythonとRのコンパイル時の型セーフティは言語的な機能では無いため、データセットの概念はこれらの言語のAPIに適用されません。代わりに、DataFrame
は主要なプログラミング抽象を維持します。これはこれらの言語での1ノードのデータフレームの概念に類似しています。 - データセットとデータフレームAPIの
unionAll
は非推奨になり、union
に置き換えられました。 - データセット とデータフレーム APIの
explode
は非推奨になりました。代わりのものとして、select
あるいはflatMap
付きのfunctions.explode()
を使ってください。 -
データセットおよびデータフレームAPI
registerTempTable
はcreateOrReplaceTempView
と置き換えられるために非推奨になりました。 - Hiveテーブルのための
CREATE TABLE ... LOCATION
に代わりました。- Spark 2.0から、
CREATE TABLE ... LOCATION
はCREATE EXTERNAL TABLE ... LOCATION
に等価になり、ユーザが提供した場所での既存のデータを偶然にdropすることを避けます。このことは、ユーザが定義した場所のSpark SQL内のHiveテーブルは常にHive外部テーブルであることを意味します。外部テーブルのdropはデータを削除しないでしょう。ユーザはHiveが管理するテーブルの場所を指定することができません。これはHiveの挙動とは異なることに注意してください。 - 結果として、これらのテーブル上の
DROP TABLE
文はデータを削除しないでしょう。
- Spark 2.0から、
Spark SQL 1.5から1.6へのアップグレード
- Spark 1.6から、デフォルトでThriftサーバが複数セッションモードで実行します。つまり、各JDBC/ODBC 接続はそれら独自のSQL設定および一次的な関数の登録のコピーを持つことを意味します。キャッシュされたテーブルはまだ共有されています。古い1つのセッションモードでThriftサーバを実行したい場合、オプション
spark.sql.hive.thriftServer.singleSession
をtrue
に設定してください。このオプションをspark-defaults.conf
に追加するか、それを--conf
を使ってstart-thriftserver.sh
に渡すかのどちらかかも知れません。
./sbin/start-thriftserver.sh \
--conf spark.sql.hive.thriftServer.singleSession=true \
...
-
1.6.1から、SparkRの withColumn メソッドは新しいカラムの追加、あるいはデータフレームの同じ名前の既存のカラムの置き換えをサポートします。
-
Spark 1.6から、TimestampType への LongType キャストはマイクロ秒ではなく秒を期待します。この変更は、数値型からTimestampTypeへの矛盾のない型のキャストのためにHive1.2の挙動に一致させるために行われました。詳細は SPARK-11724 を見てください。
Spark SQL 1.4から1.5へのアップグレード
- 今は、表現の評価のためのコード生成と一緒に、手動メモリ管理(Tungsten)を使った実行の最適化がデフォルトで有効です。これらの機能は
spark.sql.tungsten.enabled
をfalse
に設定することで両方とも無効にすることができます。 - Parquet スキーマのマージはデフォルトでもう有効にされません。
spark.sql.parquet.mergeSchema
をtrue
に設定することで再有効化することができます。 - カラムを限定するあるいは入れ子の値にアクセスするためにdots(
.
)を使ったPythonでの文字列のカラムへの変換が今はサポートされます。例えば、df['table.column.nestedField']
。しかし、このことは、もしカラム名がドットを含む場合、今はバックチックを使ってエスケープしなければならないことを意味します(例えば、`column.with.dots`.nested)。 - メモリ内カラムストレージパーティションの切り詰めはデフォルトで有効です。
spark.sql.inMemoryColumnarStorage.partitionPruning
をfalse
に設定することで無効にすることができます。 - 精度の制限無しの数値カラムはもうサポートされません。代わりにSparkSQLを最大の精度38に強制してください。
BigDecimal
オブジェクトからスキーマを推測する場合、今は精度 (38,18) が使われます。DDLで精度が指定されない場合、デフォルトはDecimal(10, 0)
のままです。 - 今はタイムスタンプは1nsではなく1msで格納されます。
sql
の方言では、今は浮動小数点数が10進としてパースされます。HiveQL のパースは変更されないままです。- SQL/DataFrame関数の正式名は今は小文字です(例えば、sum と SUM)。
- JSONデータソースは他のアプリケーションによって生成された新しいファイル(つまり、SparkSQLによってデータセットに挿入されていないファイル)を自動的にロードしないでしょう。JSON永続化ファイル(つまり、テーブルのメタデータはHive Metastorに格納されます)に関しては、ユーザはそれらの新しいファイルをテーブルに含めるために
REFRESH TABLE
SQL コマンド、あるいはHiveContext
のrefreshTable
メソッドを使うことができます。JSONデータセットを表す DataFrame については、ユーザはDataFrameを再生成する必要があり、新しいDataFrameは新しいファイルに含まれるでしょう。 - pySparkの DataFrame.withColumn メソッドは新しいカラムの追加、あるいは同じ名前の既存のカラムの置き換えをサポートします。
Spark SQL 1.3から1.4へのアップグレード
DataFrame データの reader/writer インタフェース
ユーザのフィードバックにより、(SQLContext.read
)の中のデータを読み込み、(DataFrame.write
)へ書き込むための新しくもっと柔軟なAPIを作成しました。古いAPIは非推奨にされました (例えば、SQLContext.parquetFile
, SQLContext.jsonFile
)。
SQLContext.read
( Scala, Java, Python ) および DataFrame.write
( Scala, Java, Python ) についての詳細な情報はAPIドキュメントを見てください。
グルーピング カラムを保持するDataFrame.groupBy
ユーザフィードバックに基づいて、DataFrame
の結果のグルーピング カラムを保持するためにDataFrame.groupBy().agg()
のデフォルトの挙動を変更しました。1.3での挙動を維持するためには、spark.sql.retainGroupColumns
を false
に設定してください。
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))
// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))
// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));
// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));
// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");
import pyspark.sql.functions as func
# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))
# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))
# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
DataFrame.withColumn上での挙動の変更
1.4より前は、DataFrame.withColumn() はカラムの追加のみをサポートします。同じ名前の既存のカラムがあったとしても、結果のデータフレーム内の指定された名前を持つ新しいカラムとして、そのカラムが常に追加されるでしょう。1.4から、DataFrame.withColumn() は既存のすべてのカラムの名前と異なるカラム名の追加、あるいは同じ名前の既存のカラムの置きかえをサポートします。
この変更はScala APIだけのもので、PySparkあるいはSparkRのものでは無いことに注意してください。
Spark SQL 1.0-1.2 から 1.3 へのアップグレード
Spark 1.3. では、Spark SQLから "Alpha"のラベルが取り除かれ、この一環として利用可能なAPIの整理が行われました。Spark1.3以降は、Spark SQLは1.Xシリーズの他のリリースとバイナリ互換性を提供するでしょう。この互換性の保証には、明示的に安定していないと印を付けられている(つまり、DeveloperAPI あるいは Experimental)APIは含まれません。
SchemaRDDからデータフレームへの変更
Spark SQL 1.3にアップグレードした時にユーザが気づく最も大きな変更は、SchemaRDD
が DataFrame
に名前が変更されることです。これは、データフレームがもはやRDDから直接継承されないが、RDD自身の実装でRDDが提供するほとんどの機能を代わりに提供するため、重要です。データフレームは.rdd
メソッドを呼ぶことでRDDに変換することも可能です。
Scalaでは、いくつかのユースケースのためのソースの互換性を提供するために、SchemaRDD
から DataFrame
へのタイプのエイリアスがあります。それでもDataFrame
を代わりに使うためにコードを更新することをお勧めします。Java および Python ユーザはコードを更新する必要は無いでしょう。
JavaとScala APIの統一
Spark 1.3 以降には、Scala APIを模倣した別個のJava互換クラス (JavaSQLContext
および JavaSchemaRDD
) があります。Spark 1.3 ではJavaAPIおよびScala APIは統合されました。どちらかの言語のユーザは SQLContext
および DataFrame
を使用しなければなりません。一般的にこれらのクラスは両方の言語で使用可能なタイプ(つまり、言語特有のコレクションの代わりに Array
)を使用しようとします。一般的なタイプが無い場合(例えば、クロージャーあるいはMapを渡す)、代わりに関数の上書きが使われます。
更に、Java固有のタイプのAPIが削除されました。ScalaおよびJavaのユーザはプログラム的にスキーマを説明するためにorg.apache.spark.sql.types
にあるクラスを使わなければなりません。
dslパッケージの明示的な交換と削除の分離 (Scalaのみ)
import sqlContext._
から始まるSpark 1.3より前の多くのコード例、sqlContextからの全ての関数はスコープに入れられました。Spark 1.3では、RDD
から DataFrame
への変換のための暗黙的な変換は、SQLContext
の中に隔離しました。今はユーザは import sqlContext.implicits._
を書かなければなりません。
Additionally, the implicit conversions now only augment RDDs that are composed of Product
s (i.e.,
case classes or tuples) with a method toDF
, instead of applying automatically.
When using function inside of the DSL (now replaced with the DataFrame
API) users used to import org.apache.spark.sql.catalyst.dsl
. 代わりに公開データフレーム関数APIが使われるべきです: import org.apache.spark.sql.functions._
.
DataTypeのための org.apache.spark.sql 内のタイプエイリアスの削除 (Scala のみ)
Spark 1.3 は、DataType
のための基本sqlパッケージにあったタイプのエイリアスを削除しました。ユーザはorg.apache.spark.sql.types
にあるクラスを代わりにインポートしなければなりません。
UDF 登録はsqlContext.udf
に移動しました (Java & Scala)
UDFを登録するために使われる関数、データフレーム DSLあるいはSQLの両方で使われる、は、SQLContext
内のudfオブジェクトに移動されました。
sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);
Python のUDF登録は変更されません。
Python DataType はもうシングルトンではありません
Pythonでデータタイプを使う場合は、シングルトンを参照する代わりにそれら(つまり StringType()
) を構築する必要があるでしょう。
Apache Hiveとの互換性
Spark SQL はHiveメタソース、SerDesおよびiUDFと互換性があるように設計されています。現在のところ、Hive SerDes と UDFs はHive 1.2.1に基づいていて、Spark SQLは異なるバージョンのHive Metastoreに接続されることができます (0.12.0 から 2.1.1)[Interacting with Different Versions of Hive Metastore] (#interacting-with-different-versions-of-hive-metastore))も見てください。
既存のHiveウェアハウスへのデプロイ
Spark SQL Thrift JDBC サーバは"追加設定無しで"既存のHiveインストレーションと互換性があるように設計されています。既存のHiveメタストアを修正したり、あるいはデータの配置またはテーブルのパーティションを変更する必要はありません。
サポートされるHive機能
Spark SQLは以下のようなHiveの機能の大部分をサポートします:
- Hive クエリステートメント。以下を含みます:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
- 全てのHive オペレータ。以下を含みます:
- 関係オペレーション (
=
,⇔
,==
,<>
,<
,>
,>=
,<=
など) - Arithmetic operators (
+
,-
,*
,/
,%
など) - 論理オペレータ (
AND
,&&
,OR
,||
など) - 複雑なタイプのコンストラクタ
- 数学関数 (
sign
,ln
,cos
など) - 文字関数 (
instr
,length
,printf
など)
- 関係オペレーション (
- ユーザ定義関数 (UDF)
- ユーザ定義の集約関数 (UDAF)
- ユーザ定義のシリアライズフォーマット (SerDes)
- Window functions
- Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
- Unions
- サブクエリ
SELECT col FROM ( SELECT a + b AS col from t1) t2
- 標本化
- Explain
- 動的なパーティションの挿入を含む分割されたテーブル
- View
- 全てのHive DDL関数。以下を含みます:
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
- ほとんどのHiveデータタイプ。以下を含みます:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
サポートされないHive機能
以下はまだサポートされないHive機能のリストです。それらの機能のほとんどはHiveデプロイメントでほとんど使われません。
主要なHive機能
- バケットを使ったテーブル: バケットはHiveテーブルパーティション内のハッシュパーティショニングです。Spark SQLはまだバケットをサポートしません。
難解なHive機能
UNION
type- Unique join
- カラムの統計の収集: Spark SQL は今のところはカラムの統計を集めるために抱き合わせの走査をしませんが、hiveメタストアの sizeInBytes フィールドの取り込みのみをサポートします。
Hive 入力/出力 フォーマット
- CLIのためのファイルフォーマット: CLIに表示のための結果を返すために、Spark SQLは TextOutputFormat だけをサポートします。
- Hadoop 書庫
Hive 最適化
扱いにくいHive最適化はまだSparkに含まれていません。(インデックスのような)それらのいくつかはSpark SQLのインメモリ計算モデルに重要ではありません。他はSpark SQLの将来のリリースに組み込まれます。
- ブロックレベルのビットマップのインデックスと仮想カラム(インデックスの構築のために使用される)
- joinおよびgroupbyのためのreducerの数の自動決定: Spark SQLでは現在のところ"
SET spark.sql.shuffle.partitions=[num_tasks];
"を使って並行post-shuffleの次数を制御する必要があります。 - メタデータのみのクエリ: メタデータのみを使って回答可能なクエリの場合、Spark SQLは結果を計算するためにタスクを起動します。
- Skew データフラグ: Spark SQLはHive内のskewデータフラグに従いません。
STREAMTABLE
joinのヒント: Spark SQL はSTREAMTABLE
ヒントに従いません。- クエリの結果のための複数の小さなファイルのマージ: 結果出力が複数の小さなファイルを含む場合、HiveはHDFSメタデータのオーバーフローを防ぐために、任意に小さなファイルを少数の亜フィルにマージすることができます。Spark SQL はこれをサポートしません。
リファレンス
データの種類
Spark SQL と DataFrames は以下のデータタイプをサポートします:
- 数値タイプ
ByteType
: 1バイトの符号あり整数値を表します。数値の範囲は-128
から127
です。ShortType
: 2バイトの符号あり整数値を表します。数値の範囲は-32768
から32767
です。IntegerType
: 4バイトの符号あり整数値を表します。数値の範囲は-2147483648
から2147483647
です。LongType
: 8バイトの符号あり整数値を表します。数値の範囲は-9223372036854775808
から9223372036854775807
です。FloatType
: 4バイトの単精度浮動少数を表します。DoubleType
: 8バイトの複精度浮動少数を表します。DecimalType
: 任意の精度の符号あり10進数を表します。内部的にはjava.math.BigDecimal
によって支援されます。BigDecimal
は、任意の精度のスケールしない整数値、および32ビットの数値スケールから成ります。
- 文字列タイプ
StringType
: 文字列値を表します。
- バイナリタイプ
BinaryType
: バイトシーケンス値を表します。
- 真偽タイプ
BooleanType
: 真偽値を表します。
- 日付タイプ
TimestampType
: 年、月、日、時、分、秒のフィールドの値を構成する値を表します。DateType
: 年、月、日のフィールドの値を構成する値を表します。
- 複雑なタイプ
ArrayType(elementType, containsNull)
:elementType
のタイプの要素の順列を構成する値を表します。containsNull
はArrayType
値の中の要素がnull
値を持つことができるかどうかを指示するために使われます。MapType(keyType, valueType, valueContainsNull)
: キーバリューペアのセットを構成する値を表します。キーのデータタイプはkeyType
によって説明され、値のデータタイプはvalueType
によって説明されます。MapType
値については、キーがnull
値を持つことは許されません。valueContainsNull
はMapType
値の値がnull
値を持つかどうかを指示するために使われます。StructType(fields)
:StructField
(fields
) の順列によって表現される構造の値を表します。StructField(name, dataType, nullable)
:StructType
のフィールドを表します。フィールドの名前はname
によって指示されます。フィールドのデータタイプはdataType
によって指示されます。nullable
はこのフィールドの値がnull
値を持つことができるかどうかを指示するために使われます。
Spark SQLの全てのデータタイプは org.apache.spark.sql.types
パッケージの中にあります。以下のようにしてアクセスすることができます
import org.apache.spark.sql.types._
データタイプ | Scalaでの値のタイプ | データタイプにアクセスあるいは生成するためのAPI |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
DoubleType | Double | DoubleType |
DecimalType | java.math.BigDecimal | DecimalType |
StringType | 文字列 | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | 真偽値 | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
DateType | java.sql.Date | DateType |
ArrayType | scala.collection.Seq |
ArrayType(elementType, [containsNull]) 注意: containsNull のデフォルト値は trueです。 |
MapType | scala.collection.Map |
MapType(keyType, valueType, [valueContainsNull]) 注意: valueContainsNull のデフォルト値は trueです。 |
StructType | org.apache.spark.sql.Row |
StructType(fields) 注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。 |
StructField | このフィールドのデータタイプのScalaでの値のタイプ(例えば、データタイプ IntegerTypeのStructFieldのためのInt) |
StructField(name, dataType, [nullable]) 注意: nullableのデフォルトの値は trueです。 |
Spark SQLの全てのデータタイプは org.apache.spark.sql.types
のパッケージ内にあります。データタイプにアクセスあるいは生成するには、org.apache.spark.sql.types.DataTypes
で提供されるファクトリーメソッドを使ってください。
データタイプ | Javaでの値タイプ | データタイプにアクセスあるいは生成するためのAPI |
---|---|---|
ByteType | byte あるいは Byte | DataTypes.ByteType |
ShortType | short あるいは Short | DataTypes.ShortType |
IntegerType | int あるいは Integer | DataTypes.IntegerType |
LongType | long あるいは Long | DataTypes.LongType |
FloatType | float あるいは Float | DataTypes.FloatType |
DoubleType | double あるいは Double | DataTypes.DoubleType |
DecimalType | java.math.BigDecimal |
DataTypes.createDecimalType() DataTypes.createDecimalType(precision, scale). |
StringType | 文字列 | DataTypes.StringType |
BinaryType | byte[] | DataTypes.BinaryType |
BooleanType | boolean あるいは Boolean | DataTypes.BooleanType |
TimestampType | java.sql.Timestamp | DataTypes.TimestampType |
DateType | java.sql.Date | DataTypes.DateType |
ArrayType | java.util.List |
DataTypes.createArrayType(elementType) 注意: containsNullの値はtrueでしょう。 DataTypes.createArrayType(elementType, containsNull). |
MapType | java.util.Map |
DataTypes.createMapType(keyType, valueType) 注意: valueContainsNullの値はtrueでしょう。 DataTypes.createMapType(keyType, valueType, valueContainsNull) |
StructType | org.apache.spark.sql.Row |
DataTypes.createStructType(fields) 注意: fields はListあるいはStructFieldsの配列です。また、同じ名前の2つのフィールドは許されません。 |
StructField | このフィールドのデータタイプのJavaでの値のタイプ(例えば、データタイプ IntegerTypeのStructFieldのためのint) | DataTypes.createStructField(name, dataType, nullable) |
Spark SQLの全てのデータタイプは pyspark.sql.types
パッケージ内にあります。以下のようにしてアクセスすることができます
from pyspark.sql.types import *
データタイプ | Pythonでの値タイプ | データタイプにアクセスあるいは生成するためのAPI |
---|---|---|
ByteType |
int あるいは long 注意: 実行時に数値は1バイトの符号あり数値に変換されるでしょう。数値は-128から127の範囲にあるようにしてください。 |
ByteType() |
ShortType |
int あるいは long 注意: 実行時に数値は2バイトの符号あり数値に変換されるでしょう。数値は-32768から32767の範囲にあるようにしてください。 |
ShortType() |
IntegerType | int あるいは long | IntegerType() |
LongType |
long 注意: 実行時に数値は8バイトの符号あり数値に変換されるでしょう。数値は-9223372036854775808 から 9223372036854775807の範囲にあるようにしてください。そうでなければ、データをdecimal.Decimalに変換し、DecimalTypeを使ってください。 |
LongType() |
FloatType |
float 注意: 実行時に数値は4バイトの符号あり単精度浮動小数点に変換されるでしょう。 |
FloatType() |
DoubleType | float | DoubleType() |
DecimalType | decimal.Decimal | DecimalType() |
StringType | 文字列 | StringType() |
BinaryType | bytearray | BinaryType() |
BooleanType | bool | BooleanType() |
TimestampType | datetime.datetime | TimestampType() |
DateType | datetime.date | DateType() |
ArrayType | リスト、組、あるいは配列 |
ArrayType(elementType, [containsNull]) 注意: containsNull のデフォルト値は Trueです。 |
MapType | dict |
MapType(keyType, valueType, [valueContainsNull]) 注意: valueContainsNull のデフォルト値はTrueです。 |
StructType | リストあるいは組 |
StructType(fields) 注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。 |
StructField | このフィールドのデータタイプのPythonでの値のタイプ(例えば、データタイプIntegerTypeのStructFieldのためのInt) |
StructField(name, dataType, [nullable]) 注意: nullableのデフォルト値は Trueです。 |
データタイプ | Rでの値のタイプ | データタイプにアクセスあるいは生成するためのAPI |
---|---|---|
ByteType |
integer 注意: 実行時に数値は1バイトの符号あり数値に変換されるでしょう。数値は-128から127の範囲にあるようにしてください。 |
"byte" |
ShortType |
integer 注意: 実行時に数値は2バイトの符号あり数値に変換されるでしょう。数値は-32768から32767の範囲にあるようにしてください。 |
"short" |
IntegerType | integer | "integer" |
LongType |
integer 注意: 実行時に数値は8バイトの符号あり数値に変換されるでしょう。数値は-9223372036854775808 から 9223372036854775807の範囲にあるようにしてください。そうでなければ、データをdecimal.Decimalに変換し、DecimalTypeを使ってください。 |
"long" |
FloatType |
numeric 注意: 実行時に数値は4バイトの符号あり単精度浮動小数点に変換されるでしょう。 |
"float" |
DoubleType | numeric | "double" |
DecimalType | サポートされません。 | サポートされません。 |
StringType | character | "string" |
BinaryType | raw | "binary" |
BooleanType | logical | "bool" |
TimestampType | POSIXct | "timestamp" |
DateType | Date | "date" |
ArrayType | vector あるいは list |
list(type="array", elementType=elementType, containsNull=[containsNull]) 注意: containsNullのデフォルト値は TRUEです。 |
MapType | environment |
list(type="map", keyType=keyType, valueType=valueType, valueContainsNull=[valueContainsNull]) 注意: valueContainsNullのデフォルト値は TRUEです。 |
StructType | 名前付きリスト |
list(type="struct", fields=fields) 注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。 |
StructField | このフィールドのデータタイプのRでの値のタイプ(例えば、データタイプIntegerTypeのStructFieldのためのinteger) |
list(name=name, type=dataType, nullable=[nullable]) 注意: nullable のデフォルト値は TRUEです。 |
NaN Semantics
標準の浮動小数点の記号に正確の一致しない float
あるいは double
タイプを扱う時に、not-a-number (NAN)の特別な処理があります。具体的には:
- NaN = NaN は true を返します。
- 集約の場合、全てのNaNの値は一緒にグループ化されます。
- NaN はjoinキー内での通常の値として扱われます。
- 昇順の場合、NaN の値はどの他の数値よりも大きく、最後にきます。