Spark SQL, データフレーム および データセット ガイド

概要

Spark SQL は構造化されたデータの処理のためのSparkモジュールです。基本のRDD API とは異なり、Spark SQLによって提供されるインタフェースは、Sparkにデータと実行されている計算の両方についての構造のもっと多くの情報を提供します。内部的には、Spark SQLはこの特別な上方を特別な最適化を実施するために使用します。SQL、データフレームAPIおよびデータセットAPIを含むSpark SQLとやり取りをする幾つかの方法があります。hWhen computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between the various APIs based on which provides the most natural way to express a given transformation.

このページの全ての例はSpark配布物に含まれるサンプルデータをしようし、spark-shell, pyspark シェルあるいは sparkR シェルの中で実行することができます。

SQL

Spark SQLの一つの使い方は、基本的なSQL構文またはHiveQLのどちらかを使って書かれたSQLクエリを実行することです。Spark SQLは既存のHiveインストレーションからデータを読み込むために使うこともできます。この起動をどう設定するかの詳細については、 Hive テーブル の章を参照してください。他のプログラミング言語の中でSQLを実行する場合、結果はDataFrameとして返されるでしょう。command-line あるいは JDBC/ODBCを使ってSQLインタフェースとやり取りすることもできます。

データフレーム

データフレームは、データの分散コレクションが名前付きの列に整理されたものです。リレーショナルデータベースでのテーブル、あるいはR/Python出のデータフレームと概念的に等価ですが、裏ではもっと最適化されています。データフレームはソース</a>の大きな配列から構築することが可能ですです: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRDD

データフレームAPIはScala, Java, Python および R で利用可能です。

データセット

A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. データセットはJVMオブジェクトから構築することができ、関数的な変形を使って操作することができます(map, flatMap, filterなど)。

統一化されたデータセットAPIは、Scala および Javaの両方の中で使うことができます。PythonはまだデータセットAPIをサポートしていませんが、その動的な特性により、既に多くの恩恵が既に利用可能です (つまり、行のフィールドを簡単に名前row.columnNameによってアクセスすることができます)。完全なpythonのサポートは将来のリリースで追加されるでしょう。

開始

開始点: SQLContext

Spark SQLの全ての機能へのエントリーポイントはSQLContext クラス、あるいはそれの継承クラスの一つです。基本のSQLContextを生成するのに必要なのは、SparkContexxtだけです。

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

Spark SQLの全ての機能へのエントリーポイントはSQLContext クラス、あるいはそれの継承クラスの一つです。基本のSQLContextを生成するのに必要なのは、SparkContexxtだけです。

JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

Sparkの全ての関係する機能へのエントリーポイントはSQLContext クラス、あるいはそれの継承クラスの一つです。基本のSQLContextを生成するのに必要なのは、SparkContexxtだけです。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Sparkの全ての関係する機能へのエントリーポイントはSQLContextクラス、あるいはそれの継承クラスの一つです。基本のSQLContextを生成するのに必要なのは、SparkContexxtだけです。

sqlContext <- sparkRSQL.init(sc)

基本の SQLContext に加えて、基本のSQLContextによって提供される機能の上位集合を提供するHiveContextを生成することもできます。もっと複雑なHiveQLパーサを使ったクエリを書く機能、Hive UDFへのアクセス、およびHiveテーブルからのデータの読み込み機能を含む追加の機能HiveContextを使うために、既存のHiveセットアップをする必要ありません。SQLContextで利用可能な全てのデータソースも利用可能です。デフォルトのSparkビルドでは全てのHiveの依存を含むことを避けるために、HiveContextだけは別個にパッケージされています。アプリケーションにとってこの依存が問題なければ、HiveContextの使用はSparkの1.3リリースにお勧めです。将来のリリースは、HiveContextを使ってSQLContext を特徴等価にまで育てることに焦点を当てています。

クエリをパースするために使われるSQLの特定の変数はspark.sql.dialect オプションを使って選択することができます。このパラメータは、SQLContextsetConfメソッドあるいはSQL内でSET key=valueコマンドを使って変更することができます。SQLContextに関しては、この方言はSparkSQLによって提供されるシンプルなSQLパーサが使用する"sql"で利用可能です。HiveContextの中では、デフォルトは"hiveql"ですが、"sql"も利用可能です。HiveQLパーサはもっと複雑なため、ほとんどのユーザケースではこれがお勧めです。

データフレームの生成

SQLContextを使ってアプリケーションは既存の RDD、Hive テーブル、またはデータソースから データフレーム/c1>を生成することができます。

例として、以下ではJSONファイルの内容に基づいて データフレームを生成します。

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()
sqlContext <- SQLContext(sc)

df <- jsonFile(sqlContext, "examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
showDF(df)

データフレームの操作

データフレームは ScalaJavaPythonおよびR での構造化データ操作のためのドメイン固有の言語を提供します。

以下は、データフレームを使った構造化データ処理の幾つかの基本的な例を含んでいます:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

Pythonでは属性(df.age) あるいは、インデックス(df['age'])のどちらかを使ってデータフレームのカラムにアクセスすることができます。While the former is convenient for interactive data exploration, users are highly encouraged to use the latter form, which is future proof and won’t break with column names that are also attributes on the DataFrame class.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

sqlContext <- sparkRSQL.init(sc)

# Create the DataFrame
df <- jsonFile(sqlContext, "examples/src/main/resources/people.json")

# Show the content of the DataFrame
showDF(df)
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
showDF(select(df, "name"))
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
showDF(select(df, df$name, df$age + 1))
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
showDF(where(df, df$age > 21))
## age name
## 30  Andy

# Count people by age
showDF(count(groupBy(df, "age")))
## age  count
## null 1
## 19   1
## 30   1

DataFrameで実施できる操作の種類の完全なリストは API ドキュメントを参照してください。

単純なカラムの参照および表現に加えて、データフレームは文字列操作、日付計算、一般的な数学操作などを含む関数の豊富なライブラリも持っています。完全なリストはDataFrame 関数リファレンスの中で利用可能です。

プログラム的にSQLクエリを実行

SQLContextsqlはアプリケーションがSQLクエリをプログラム的に実行することを可能にし、結果を DataFrameとして返します。

val sqlContext = ... // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
SQLContext sqlContext = ... // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")
sqlContext <- sparkRSQL.init(sc)
df <- sql(sqlContext, "SELECT * FROM table")

データセットの生成

データセットはRDDに似ていますが、Javaのシリアライズ化あるいはKryoを使う代わりに、ネットワークを越えて処理あるいは転送するためにオブジェクトをシリアライズ化するために特別なEncoderを使用します。While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.

// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

RDDを使った内部操作

Spark SQL は既存のRDDをデータフレームに変換する二つの異なるメソッドを提供します。1つ目のメソッドはオブジェクトの特定のタイプを含むRDDのスキーマを推測するためにリフレクションを使用します。このリフレクションを基礎としたやり方は、Sparkアプリケーションを書いている時にスキーマを既にしっている場合には、結果としてより簡潔でよく動きます。

2つ目のデータフレームを生成するメソッドは、スキーマを構築し既存のRDDに適用することができるプログラム的なインタフェースです。このメソッドは多少冗長ですが、カラムとそれらのタイプが実行時まで分からない場合にデータフレームを構築することができます。

リフレクションを使ったスキーマの推測

Spark SQLのためのScalaインタフェースは、データフレームへのcaseクラスを含むRDDの自動的な変換をサポートします。caseクラスはテーブルのスキーマを定義します。caseクラスへの引数の名前はリフレクションを使って読み込まれ、カラムの名前となります。case cクラスは入れ子にすることもでき、SequenceあるいはArrayのような複雑なタイプを含むことができます。このRDDは明示的にデータフレームに変換され、テーブルとして登録されます。テーブルは後に続くSQL分の中で使うことができます。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

Spark SQLはJavaBeansのRDDからデータフレームへの自動的な変換をサポートします。 リフレクションを使って取得されたBeanInfoはテーブルのスキーマを定義します。現在のところ、Spark SQLは入れ子になったJavaBeans、あるいはListまたは配列のような複雑なタイプを含むJavaBeansをサポートしません。Serializableを実装し全てのフィールドについてgetterおよびsetterを持つクラスを生成することで、JavaBeanを生成することができます。

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

createDataFrameを呼び、JavaBeanのためのクラスオブジェクトを提供することで、既存のRDDにスキーマを適用することができます。

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

Spark SQL はデータタイプを推測して、RowオブジェクトのRDDをデータフレームに変換することができます。行はkwargsとしてkey/valueペアをRowクラスに渡すことで構築することができます。このリストのキーはテーブルのカラムを定義し、最初の行を調べることでタイプが推測されます。現在のところ最初の行のみを調べるため、RDDの最初の行にデータのミスが無いことが重要です。将来のバージョンでは、JSONファイルで実施される推測のようなもっと多くのデータを調査することでスキーマをもっと完全に推測することを計画しています。

# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)

プログラム的なスキーマの指定

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. 元のRDDからのRDDを生成する;
  2. ステップ1で生成したRDD内のRowの構造に一致するStructTypeによって表現されるスキーマを生成する。
  3. スキーマをSQLContextによって提供されるcreateDataFrameメソッドを使ってRowに適用します。

例えば:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)

When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. 元のRDDからのRDDを生成する;
  2. ステップ1で生成したRDD内のRowの構造に一致するStructTypeによって表現されるスキーマを生成する。
  3. スキーマをSQLContextによって提供されるcreateDataFrameメソッドを使ってRowに適用します。

例えば:

import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return RowFactory.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. 元のRDDから組あるいはリストのRDDを生成する;
  2. ステップ1で生成したRDD内の組あるいはリストの構造に一致するStructTypeによって表現されるスキーマを生成する。
  3. スキーマを<c2>SQLContext</c2>によって提供されるcreateDataFrameメソッドを使ってRDDに適用します。

例えば:

# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print(name)

データソース

Spark SQL はDataFrame インタフェースを使って様々なデータソース上での操作をサポートします。データフレームは通常のRDDとして操作、および一時テーブルとして登録することもできます。データフレームをテーブルとして登録することによりデータにSQLクエリを実行することができます。このセクションではSpark データソースを使ってデータをロードおよび保存する一般的なメソッドを説明し、その後ビルトインのデータソースのために利用可能な特定のオプションについて詳しく調べます。

一般的なロード/保存 機能

In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df <- loadDF(sqlContext, "people.parquet")
saveDF(select(df, "name", "age"), "namesAndAges.parquet")

手動でのオプションの指定

データソースに渡したいどのような特別のオプションと一緒にデータソースを手動で指定することもできます。データソースは完全修飾名(例えば、org.apache.spark.sql.parquet)で指定されますが、ビルトインのソースの場合はショート名(json, parquet, jdbc)を使うことができます。どのタイプのデータフレームもこの構文を使って他のタイプに変換することができます。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
df = sqlContext.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
df <- loadDF(sqlContext, "people.json", "json")
saveDF(select(df, "name", "age"), "namesAndAges.parquet", "parquet")

ファイル上のSQLを直接実行

ファイルをデータフレームにロードし、それに質問するためにread APIを使う代わりに、SQLを使ってファイルに直接質問することもできます。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
DataFrame df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df <- sql(sqlContext, "SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

セーブモード

セーブ操作は任意にSaveModeを取ることができます。これは既存のデータがあった場合にどう扱うかを指定します。これらのセーブモードがどのようなロックも使わないこと、およびアトミックでは無いことを理解しておくことは重要です。更に、Overwriteを実施する場合、新しいデータを書き込む前にデータは削除されるでしょう。

Scala/JavaAny Language意味
SaveMode.ErrorIfExists (default) "error" (default) データフレームをデータソースに保存する場合、もしデータが既に存在する場合は例外が投げられるでしょう。
SaveMode.Append "append" データフレームをデータソースに保存する場合、もし データ/テーブル が既に存在する場合は、データフレームの内容は既存のデータに追記されるでしょう。
SaveMode.Overwrite "overwrite" overwrite モードは、データフレームをデータソースに保存する場合に、もし データ/テーブル が存在する場合は既存のデータがデータフレームの内容によって上書きされるだろうことを意味します。
SaveMode.Ignore "ignore" ignore モードは、データフレームをデータソースに保存する場合に、もし データ が存在する場合はセーブ操作によってデータフレームの内容が保存されず、既存のデータが変更されないだろうことを意味します。これはSQLでの CREATE TABLE IF NOT EXISTS に似ています。

永続テーブルへの保存

HiveContextを使っている場合は、DataFramessaveAsTableコマンドを使って永続的なテーブルとしてセーブすることもできます。registerTempTable コマンドと違い、saveAsTable はデータフレームの内容を具体化し、HiveMetastore内のデータへのポインタを生成するでしょう。同じmetastoreに接続を続ける限り、永続的なテーブルはSparkプログラムが再起動した後もまだ存在しているでしょう。永続的なテーブルのためのデータフレームはSQLContext上でテーブル名を使って table メソッドを呼ぶことで生成することができます。

デフォルトでは、saveAsTable が"managed table"を生成するでしょう。つまり、データの場所がmetastoreによって制御されるでしょう。managed tables はテーブルが削除される場合に自動的にそれらのデータも削除するでしょう。

Parquet Files

Parquet は多くのほかのデータ処理システムでサポートされているコラム状のフォーマットです。Spark SQL は自動的に元のデータのスキーマを保持するParquetファイルの読み書きの両方のサポートを提供します。When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

プログラム的なデータのロード

上の例から以下のようにデータを使用します:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// sqlContext from the previous example is used in this example.

DataFrame schemaPeople = ... // The DataFrame from the previous example.

// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write().parquet("people.parquet");

// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();
# sqlContext from the previous example is used in this example.

schemaPeople # The DataFrame from the previous example.

# DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write.parquet("people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = sqlContext.read.parquet("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)
# sqlContext from the previous example is used in this example.

schemaPeople # The DataFrame from the previous example.

# DataFrames can be saved as Parquet files, maintaining the schema information.
saveAsParquetFile(schemaPeople, "people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- parquetFile(sqlContext, "people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
registerTempTable(parquetFile, "parquetFile");
teenagers <- sql(sqlContext, "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames <- map(teenagers, function(p) { paste("Name:", p$name)})
for (teenName in collect(teenNames)) {
  cat(teenName, "\n")
}
CREATE TEMPORARY TABLE parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

パーティションの発見

テーブルのパーティションはHiveのようなシステムで使われる一般的な最適化の方法です。パーティションされたテーブルの中でデータは各パーティションディレクトリのパスでエンコードされたパーティションカラム値を使って、通常異なるディレクトリに保持されます。Parquetデータソースはこれで自動的にパーティションの情報を発見および推測することができます。例えば、全ての以前に使用したパーティションデータを以下のディレクトリ構造を使って、パーションカラムとしてgender および country の2つの追加のカラムを持つパーティションされたテーブルに格納することができます。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

path/to/tableSQLContext.read.parquet または SQLContext.read.loadのどちらかに渡すことで、Spark SQL は自動的にパスからパーティション情報を抽出することができるでしょう。これで、返されるデータフレームのスキーマは以下のようになります:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。そのような場合のために、自動的なタイプの推測はspark.sql.sources.partitionColumnTypeInference.enabledで設定することができます。デフォルトは trueです。タイプの推測が無効な場合、パーティションカラムとして文字列タイプが使われるでしょう。

Spark 1.6.0から、パーティションの発見はデフォルトで指定されたパスの下のパーティションだけを見つけます。上の例では、もしユーザが path/to/table/gender=maleSQLContext.read.parquet または SQLContext.read.loadのどちらかに渡す場合に、gender はパーティションのカラムとして見なされないでしょう。ユーザがIf users need to specify the base path that partition discovery should start with, they can set basePath in the data source options. 例えば、path/to/table/gender=maleがデータのパスである場合、ユーザはbasePathpath/to/table/に設定します。 gender はパーティションカラムになるでしょう。

スキーマのマージ

ProtocolBuffer, Avro および Thrift のように、Parquet もスキーマの評価をサポートします。ユーザは単純なスキーマから開始し、必要に応じて次第にもっとカラムをスキーマに追加することができます。この場合、ユーザは異なるがお互いにスキーマの互換性がある複数のParquetファイルにするかも知れません。Parquetデータソースは現在では自動的にこのケースを検知し、全てのこれらのファイルのスキーマをマージすることができます。

スキーマのマージは比較的高価な操作であり、多くの場合必要ではないため、1.5.0以降からデフォルトではoffにしています。以下のようにして有効にするかも知れません

  1. (以下の例のように)Parquet ファイルを読む時にデータソースオプションmergeSchematrue に設定、あるいは
  2. グローバルSQLオプションspark.sql.parquet.mergeSchematrueに設定。
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
# sqlContext from the previous example is used in this example.

# Create a simple DataFrame, stored into a partition directory
df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))
                                   .map(lambda i: Row(single=i, triple=i * 3)))
df2.write.parquet("data/test_table/key=2")

# Read the partitioned table
df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key : int (nullable = true)
# sqlContext from the previous example is used in this example.

# Create a simple DataFrame, stored into a partition directory
saveDF(df1, "data/test_table/key=1", "parquet", "overwrite")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
saveDF(df2, "data/test_table/key=2", "parquet", "overwrite")

# Read the partitioned table
df3 <- loadDF(sqlContext, "data/test_table", "parquet", mergeSchema="true")
printSchema(df3)

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key : int (nullable = true)

Hive metastore Parquet table conversion

Hive metastore Parquetテーブルへの読み書きの時に、Spark SQLはより良いパフォーマンスのためにHive SerDeの代わりに独自のParquetサポートを使おうとするでしょう。この挙動は spark.sql.hive.convertMetastoreParquet設定によって制御され、デフォルトで作動しています。

Hive/Parquet Schema Reconciliation

テーブルスキーマ処理の観点から、HiveとParquetの間には2つの主要な違いがあります。

  1. Hive は大文字小文字を区別しませんが、Parquetは区別します。
  2. Hiveは全てのカラムがnull可能ですが、Parquetには重要な意味があります。

この理由により、Hive metastore Parquet テーブルをSpark SQL Parquet テーブルに変換する場合に、Hive metastore スキーマと Parquet スキーマを調停する必要があります。調停ルールは以下の通りです:

  1. 両方のスキーマで同じ名前を持つフィールドは、null可能かどうかに関係なく同じデータタイプでなければなりません。調停フィールドはParquet側のデータタイプを持たなければなりません。つまりnull可能かどうかが考慮されます。

  2. 調停スキーマはHive metastoreスキーマで定義されるそれらのフィールドを正確に含まなければなりません。

    • Parquetスキーマにのみ現れる全てのフィールドは調停スキーマの中で落とされます。
    • Hive metastoreスキーマにのみ現れる全てのフィールドは調停スキーマの中でnull可能なフィールドとして追加されます。

メタデータのリフレッシュ

Spark SQL はパフォーマンスの向上のためにParquet metadetaとしてキャッシュされます。Hive metastore Parquet テーブルの変換が有効な場合、それらの変換されたテーブルのmetadataもキャッシュされます。もしそれらのテーブルがHiveまたは他の外部のツールで更新された場合、metadataの一貫性のために手動でそれらを更新する必要があります。

// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
# sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
REFRESH TABLE my_table;

設定

Parquetの設定はSQLContextsetConf メソッドを使うか、SQLを使ってSET key=value を実行することで行うことができます。

プロパティ名デフォルト意味
spark.sql.parquet.binaryAsString false 他の幾つかのParquet生成システム、特にImpala, Hive および Spark SQLの古いバージョンは、Parquetスキーマを書き出す時にバイナリデータと文字列の区別しません。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として扱うように指示します。
spark.sql.parquet.int96AsTimestamp true 幾つかのParquet生成システム、特にImparaおよびHiveは、タイムスタンプをINT96に格納します。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにINT96データをタイムスタンプとして解釈するように指示します。
spark.sql.parquet.cacheMetadata true Parquetスキーマメタデータのキャッシュの機構をオンにします。静的なデータのクエリを高速化することができます。
spark.sql.parquet.compression.codec gzip Parquetファイルを書き込む時に圧縮符号化を使うように設定します。利用可能な値には、uncompressed, snappy, gzip, lzo が含まれます。
spark.sql.parquet.filterPushdown true trueに設定された場合は、Parquet filter push-down 最適化を有効化します。
spark.sql.hive.convertMetastoreParquet true falseに設定した場合は、Spark SQLはparquetテーブルのためにビルトインサポートの代わりにHive SerDeを使用するでしょう。
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.
ParquetOutputCommitter

Parquetによって使用される出力comitterクラス。指定されたクラスはorg.apache.hadoopのサブクラスである必要があります。
mapreduce.OutputCommitter
. 一般的には、それはorg.apache.parquet.hadoop.ParquetOutputCommitterの子クラスにもなります。

注意:

  • spark.speculation がオンの場合、このオプションは自動的に無視されます。
  • このオプションはSparkのSQLConfではなく、Hadoopの設定で設定されなければなりません。
  • このオプションはspark.sql.sources
    outputCommitterClass
    を上書きます。

Spark SQL は組み込みのorg.apache.spark.sqlから来ます。
parquet.DirectParquetOutputCommitter
, which can be more efficient then the default Parquet output committer when writing data to S3.

spark.sql.parquet.mergeSchema false

trueの場合、Parquetデータソースは全てのデータファイルから集められたスキーマをマージします。そうでなければ、スキーマは、サマリファイル、あるいはサマリファイルが利用できない場合はランダムデータファイルから取り出されます。

JSON データセット

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。この変換は、文字列のRDDあるいはJSONファイルのどちらかでSQLContext.read.json()を使って行うことができます。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。この変換は、文字列のRDDあるいはJSONファイルのどちらかでSQLContext.read().json()を使って行うことができます。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。この変換は、JSONファイル上のSQLContext.read.json()を使って行うことができます。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

# sc is an existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
people = sqlContext.read.json("examples/src/main/resources/people.json")

# The inferred schema can be visualized using the printSchema() method.
people.printSchema()
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)

# Register this DataFrame as a table.
people.registerTempTable("people")

# SQL statements can be run by using the sql methods provided by `sqlContext`.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string.
anotherPeopleRDD = sc.parallelize([
  '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。jsonFile を使うと、データを各ファイルの各行がJSONオブジェクトであるJSONファイルのディレクトリからデータをロードします。

json ファイルとして提供されるファイルは一般的なJSONファイルではないことに注意してください。各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。 結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

# sc is an existing SparkContext.
sqlContext <- sparkRSQL.init(sc)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path <- "examples/src/main/resources/people.json"
# Create a DataFrame from the file(s) pointed to by path
people <- jsonFile(sqlContext, path)

# The inferred schema can be visualized using the printSchema() method.
printSchema(people)
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)

# Register this DataFrame as a table.
registerTempTable(people, "people")

# SQL statements can be run by using the sql methods provided by `sqlContext`.
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
CREATE TEMPORARY TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable

Hive テーブル

Spark SQLはApache Hiveに格納されたデータの読み書きもサポートします。しかし、Hiveは多くの依存性があるため、デフォルトのSparkアセンブリには含まれません。Hiveサポートは Sparkのビルドに-Phive および -Phive-thriftserver フラグを追加することで有効になります。このコマンドはHiveを含む新しいアセンブリjarをビルドします。Hiveに格納されているデータにアクセスするために全てのワーカーノードはHiveシリアライズおよびデシリアライズライブラリ(SerDes)へのアクセスが必要になるため、このHiveアセンブリjarは全てのワーカーノード上にも存在しなければなりません。

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), hdfs-site.xml (for HDFS configuration) file in conf/. Please note when running the query on a YARN cluster (cluster mode), the datanucleus jars under the lib_managed/jars directory and hive-site.xml under conf/ directory need to be available on the driver and all executors launched by the YARN cluster. The convenient way to do this is adding them through the --jars option and --file option of the spark-submit command.

Hiveを使うには、HiveContextを構築しなければなりません。これはSQLContextを継承し、HiveQLを使ってメタデータからテーブルを検索しクエリを書き込むサポートを追加します。既存のHiveデプロイメントを持たないユーザは、HiveContextを生成することができます。When not configured by the hive-site.xml, the context automatically creates metastore_db in the current directory and creates warehouse directory indicated by HiveConf, which defaults to /user/hive/warehouse. Note that you may need to grant write privilege on /user/hive/warehouse to the user who starts the spark application.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Hiveを使うには、HiveContextを構築しなければなりません。これはSQLContextを継承し、HiveQLを使ってメタデータからテーブルを検索しクエリを書き込むサポートを追加します。HiveContextsqlメソッドとは別に、HiveQLでクエリを表現できるhqlメソッドも提供されます。

// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc);

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

Hiveを使うには、HiveContextを構築しなければなりません。これはSQLContextを継承し、HiveQLを使ってメタデータからテーブルを検索しクエリを書き込むサポートを追加します。

# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

Hiveを使うには、HiveContextを構築しなければなりません。これはSQLContextを継承し、HiveQLを使ってメタデータからテーブルを検索しクエリを書き込むサポートを追加します。

# sc is an existing SparkContext.
sqlContext <- sparkRHive.init(sc)

sql(sqlContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(sqlContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql(sqlContext, "FROM src SELECT key, value"))

Hiveメタソースの異なるバージョンの相互影響

SparkのHiveサポートの最も重要な部分の一つにHiveメタストアとの対話があります。これによりSpark SQLはHiveのテーブルのメタデータにアクセスすることができます。Spark 1.4.0から、Spark SQLの単独バイナリビルドが以下で説明する設定を使って異なるバージョンのHiveメタストアにクエリするために使うことができるようになりました。Hiveのバージョンの非依存性はmetastoreについて語られるもので、内部的にはSparkSQLはHive 1.2.1に対してコンパイルされ、それらのクラスを内部的な実行に使用することに注意してください。(serdes, UDFs, UDAFsなど)

以下のオプションがメタデータを扱うのに使われるHiveのバージョンを設定するために使うことができます。

プロパティ名デフォルト意味
spark.sql.hive.metastore.version 1.2.1 Hiveメタストアのバージョン使用可能なオプションは 0.12.0 から 1.2.1です。
spark.sql.hive.metastore.jars ビルトイン HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。このプロパティは以下の3つのオプションのどれか一つです:
  1. ビルトイン
  2. -Phiveが有効な場合は、SparkアセンブリjarにバンドルされているHive 1.2.1を使います。このオプションが選択された場合、spark.sql.hive.metastore.version1.2.1 あるいは未定義のどちらかでなければなりません。
  3. maven
  4. Mavenリポジトリからダウンロードされた指定のバージョンのHive jarを使用します。この設定は一般的にプロダクション デプロイメントのためにはお勧めされません。
  5. JVMのための標準フォーマットのクラスパス。このクラスパスはHadoopの正しいバージョンを含む全てのHiveおよびその依存物を含まなければなりません。これらのjarはドライバー上にのみ存在する必要がありますが、もしyarnクラスタモードで動かしている場合は、それらがアプリケーションと一緒にパッケージされるようにしなければなりません。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。

spark.sql.hive.metastore.barrierPrefixes (empty)

Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、org.apache.spark.*)。

JDBC から他のデータベースへ

Spark SQLはJDBCを使ってほかのデータベースからデータを読み込むことができるデータソースも含みます。この機能はJdbcRDDを使う上で好まれるべきでしょう。なぜなら結果はデータフレームとして返され、それらはSpark SQLの中で簡単に処理することができるか他のデータソースと繋げることができるからです。 JDBCデータソースはユーザにClassTagの提供を要求しないため、JavaあるいはPythonから簡単に使うことができます。(これは他のアプリケーションがSparkSQLを使ってクエリを実行することができるSpark SQL JDBCサーバと異なることに注意してください)。

開始するためには、sparkのクラスパス上に特定のデータベースのためのJDBCドライバを含む必要があるでしょう。例えば、Sparkシェルからpostgresに接続するには、以下のコマンドを実行するかもしれません:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

リモートデータベースからのテーブルは、データソースAPIを使ってデータフレームあるいはSpark SQLの一時テーブルとしてロードすることがdけいます。以下のオプションがサポートされます:

プロパティ名意味
url 接続するための JDBC URL
dbtable 読み込まれる必要があるJDBCテーブル。SQLクエリのFROM句で有効なものを全て使用できることに注意してください。例えば、完全なテーブルの代わりに、丸括弧内のサブクエリも使うことができます。
driver このURLに接続するために必要なJDBCドライバのクラス名。このクラスはJDBCコマンドを実行する前にドライバーが自身をJDBCサブシステムに登録するために、マスターおよびワーカー嬢にロードされるでしょう。
partitionColumn, lowerBound, upperBound, numPartitions これらのオプションは、いずれかが指定される場合は全て指定されなければなりません。複数のワーカーから並行して読み込む時は、それらはどうやってテーブルを分割するかを説明します。partitionColumn は問題となっているテーブルからの数値のカラムでなければなりません。lowerBound および upperBound はテーブル内の行をフィルタするために使われるのではなく、パーティションの影響範囲を決めるためだけに使われることに注意してください。つまりテーブル内の全ての行が分割され返されるでしょう。
fetchSize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();
df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load()
df <- loadDF(sqlContext, source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
CREATE TEMPORARY TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename"
)

トラブルシューティング

  • JDBCドライバクラスはクライアントセッションおよび全てのexecutorの根本的なクラスローダから見えなければなりません。これはJavaのDriverManagerクラスが、接続を開こうとする時に根本的なクラスローダから見えない全てのドライバを無視することになるセキュリティチェックを行うからです。これをする一つの簡単な方法が、全てのワーカーノード上のcompute_classpath.shをドライバーJARを含むように修正することです。
  • H2のような幾つかのデータベースは全ての名前を大文字に変換します。Spark SQLでそれらの名前を参照するには大文字を使う必要があるでしょう。

パフォーマンス チューニング

ちょっとした次善策として、メモリにデータをキャッシュ、あるいは幾つかの実験的なオプションを調整することでパフォーマンスを改善することができます。

メモリへのデータのキャッシュ

Spark SQL は sqlContext.cacheTable("tableName") あるいはdataFrame.cache()を呼ぶことでインメモリのコラム形式のフォーマットを使ってテーブルをキャッシュすることができます。そして、Spark SQLは必要なカラムだけをスキャンし、メモリの使用量とGCの圧力を最小化するために圧縮を自動的に調整するでしょう。メモリからテーブルを削除するために sqlContext.uncacheTable("tableName") を呼ぶことができます。

メモリ内キャッシングの設定はSQLContextsetConfメソッドあるいは SQLを使ってSET key=valueコマンドを実行することで行うことができます。

プロパティ名デフォルト意味
spark.sql.inMemoryColumnarStorage.compressed true trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。
spark.sql.inMemoryColumnarStorage.batchSize 10000 カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。

他の設定オプション

以下のオプションもクエリ実行のパフォーマンスを調整するために使用することができます。これらのオプションはもっと多くの最適化が自動的に行われるため、将来のリリースでは非推奨になるかもしれません。

プロパティ名デフォルト意味
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan が実行された場合、現在のところ統計はHiveメタストアテーブルのみがサポートされることに注意してください。
spark.sql.tungsten.enabled true trueの場合、明示的にメモリを管理および動的に表現の評価のためにバイトコードを生成する、最適化されたTungsten physical実行バックエンドを使用します。
spark.sql.shuffle.partitions 200 joinあるいは集約のためにデータをシャッフルする時に使用するパーティションの数を設定します。

分散SQLエンジン

Spark SQLはJDBC/ODBCあるいはコマンドラインインタフェースを使って分散型クエリエンジンとして振舞うこともできます。このモードでは、エンドユーザあるいはアプリケーションがコードを書くこと無しにSQLクエリを直接実行するためにSpark SQLと直接やり取りをすることができます。

Thrift JDBC/ODBC サーバの実行

ここで実装されているThrift JDBC/ODBC サーバは、Hive 1.2.1のHiveServer2に対応します。SparkあるいはHive 1.2.1に同梱されるbeelineスクリプトを使ってJDBCサーバをテストすることができます。

JDBC/ODBCサーバを開始するには、Sparkディレクトリで以下を実行します:

./sbin/start-thriftserver.sh

このスクリプトはHiveプロパティを指定する--hiveconf を加えて全ての bin/spark-submit コマンドラインを受け付けます。全ての利用可能なオプションのリストのために./sbin/start-thriftserver.sh --help を実行するかも知れません。デフォルトでは、サーバはlocalhsot:10000をlistenします。以下のように環境変数を使ってこの挙動を上書き、例えば:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

あるいはシステムプロパティを使って上書き:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

これで、Thrift JDBC/ODBC サーバをテストするために beelineを使うことができます:

./bin/beeline

以下のようにしてbeeline内でJDBC/ODBCに接続します:

beeline> !connect jdbc:hive2://localhost:10000

Beeline はユーザ名とパスワードを尋ねるでしょう。セキュアで無いモードでは、単にマシーン上のユーザ名を入力し空のパスワードを入力します。セキュアモードのためには、beeline ドキュメント にある手順に従ってください。

Hiveの設定は、hive-site.xmlconf/内の core-site.xml および hdfs-site.xml ファイルに置き換えることで行われます。

Hiveに付属するbeeline スクリプトを使うかも知れません。

Thrift JDBC サーバはHTTPトランスポート上のThrift RPC メッセージの送信もサポートします。以下の設定を使ってシステムプロパティあるいはconf/内のhive-site.xml ファイルの中でHTTPモードを有効にします:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

テストするには、beelineを使って以下のようにしてhttpモードのJDBC/ODBCサーバに接続します:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Spark SQL CLIの実行

Spark SQL CLI はローカルモードでHiveメタストアサービスをを実行するのに便利なツールで、コマンドラインからの入力のクエリを実行します。Spark SQL CLIはThrift JDBCサーバとやりとりできないことに注意してください。

Spark SQL CLIを開始するには、Spark ディレクトリで以下を実行してください:

./bin/spark-sql

Hiveの設定は、hive-site.xmlconf/内の core-site.xml および hdfs-site.xml ファイルに置き換えることで行われます。全ての利用可能なオプションの完全なリストのために、./bin/spark-sql --help を実行するかもしれません。

移行ガイド

Spark SQL 1.5から1.6へのアップグレード

  • Spark 1.6から、デフォルトでThriftサーバが複数セッションモードで実行します。つまり、各JDBC/ODBC 接続はそれら独自のSQL設定および一次的な関数の登録のコピーを持つことを意味します。キャッシュされたテーブルはまだ共有されています。古い1つのセッションモードでThriftサーバを実行したい場合、オプションspark.sql.hive.thriftServer.singleSessiontrueに設定してください。このオプションをspark-defaults.confに追加するか、それを--confを使ってstart-thriftserver.sh に渡すかのどちらかかも知れません。
./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

Spark SQL 1.4から1.5へのアップグレード

Spark SQL 1.3から1.4へのアップグレード

DataFrame データの reader/writer インタフェース

ユーザのフィードバックにより、(SQLContext.read)の中のデータを読み込み、(DataFrame.write)へ書き込むための新しくもっと柔軟なAPIを生成しました。古いAPIは非推奨にされました (例えば、SQLContext.parquetFile, SQLContext.jsonFile)。

SQLContext.read ( Scala, Java, Python ) および DataFrame.write ( Scala, Java, Python ) についての詳細な情報はAPIドキュメントを見てください。

グルーピング カラムを保持するDataFrame.groupBy

ユーザフィードバックに基づいて、DataFrameの結果のグルーピング カラムを保持するためにDataFrame.groupBy().agg() のデフォルトの挙動を変更しました。1.3での挙動を維持するためには、spark.sql.retainGroupColumnsfalseに設定してください。

// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");
import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg("department"), func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

Spark SQL 1.0-1.2 から 1.3 へのアップグレード

Spark 1.3. では、Spark SQLから "Alpha"のラベルが取り除かれ、この一環として利用可能なAPIの整理が行われました。Spark1.3以降は、Spark SQLは1.Xシリーズの他のリリースとバイナリ互換性を提供するでしょう。この互換性の保証には、明示的に安定していないと印を付けられている(つまり、DeveloperAPI あるいは Experimental)APIは含まれません。

SchemaRDDからデータフレームへの変更

Spark SQL 1.3にアップグレードした時にユーザが気づく最も大きな変更は、SchemaRDDDataFrame に名前が変更されることです。これは、データフレームがもはやRDDから直接継承されないが、RDD自身の実装でRDDが提供するほとんどの機能を代わりに提供するため、重要です。データフレームは.rdd メソッドを呼ぶことでRDDに変換することも可能です。

Scalaでは、いくつかのユースケースのためのソースの互換性を提供するために、SchemaRDD から DataFrame へのタイプのエイリアスがあります。それでもDataFrame を代わりに使うためにコードを更新することをお勧めします。Java および Python ユーザはコードを更新する必要は無いでしょう。

JavaとScala APIの統一

Spark 1.3 以降には、Scala APIを模倣した別個のJava互換クラス (JavaSQLContext および JavaSchemaRDD) があります。Spark 1.3 ではJavaAPIおよびScala APIは統合されました。どちらかの言語のユーザは SQLContext および DataFrameを使用しなければなりません。一般的にこれらのクラスは両方の言語で使用可能なタイプ(つまり、言語特有のコレクションの代わりに Array)を使用しようとします。一般的なタイプが無い場合(例えば、クロージャーあるいはMapを渡す)、代わりに関数の上書きが使われます。

更に、Java固有のタイプのAPIが削除されました。ScalaおよびJavaのユーザはプログラム的にスキーマを説明するためにorg.apache.spark.sql.types にあるクラスを使わなければなりません。

dslパッケージの明示的な交換と削除の分離 (Scalaのみ)

import sqlContext._ から始まるSpark 1.3より前の多くのコード例、sqlContextからの全ての関数はスコープに入れられました。Spark 1.3では、RDDから DataFrameへの変換のための暗黙的な変換は、SQLContextの中に隔離しました。今はユーザは import sqlContext.implicits._を書かなければなりません。

Additionally, the implicit conversions now only augment RDDs that are composed of Products (i.e., case classes or tuples) with a method toDF, instead of applying automatically.

When using function inside of the DSL (now replaced with the DataFrame API) users used to import org.apache.spark.sql.catalyst.dsl. 代わりに公開データフレーム関数APIが使われるべきです: import org.apache.spark.sql.functions._.

DataTypeのための org.apache.spark.sql 内のタイプエイリアスの削除 (Scala のみ)

Spark 1.3 は、DataTypeのための基本sqlパッケージにあったタイプのエイリアスを削除しました。ユーザはorg.apache.spark.sql.typesにあるクラスを代わりにインポートしなければなりません。

UDF 登録はsqlContext.udfに移動しました (Java & Scala)

UDFを登録するために使われる関数、データフレーム DSLあるいはSQLの両方で使われる、は、SQLContext内のudfオブジェクトに移動されました。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python のUDF登録は変更されません。

Python DataType はもうシングルトンではありません

Pythonでデータタイプを使う場合は、シングルトンを参照する代わりにそれら(つまり StringType()) を構築する必要があるでしょう。

Sharkユーザの移設ガイド

スケジューリング

JDBCクライアントセッションのためのFair Scheduler プールを設定するために、ユーザは spark.sql.thriftserver.scheduler.pool 変数を設定することができます。

SET spark.sql.thriftserver.scheduler.pool=accounting;

Reducer number

Sparkでは、デフォルトのreducer数は1で、mapred.reduce.tasksプロパティによって制御されます。Spark SQLはspark.sql.shuffle.partitionsのためにこのプロパティを非推奨にします、デフォルトの値は200です。ユーザはこのプロパティをSETを使ってカスタマイズするかも知れません:

SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

デフォルトの値を上書きするためにhive-site.xml内にこのプロパティも設定するかもしれません。

今のところ、mapred.reduce.tasks プロパティはまだ認識され、自動的に spark.sql.shuffle.partitionsに変換されます。

キャッシング

shark.cache テーブルプロパティはもう存在しません。 名前が_cachedで終わるテーブルはもう自動的にキャッシュされません。ユーザがテーブルキャッシュを制御できるように、代わりにCACHE TABLE および UNCACHE TABLEを提供します。

CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

注意: CACHE TABLE tbl は今はデフォルトが eagerlazyではありません。もう手動でのキャッシュの具体化の起動は必要ありません。

Spark SQLはSpark 1.2.0からユーザがテーブルのキャッシングをlazyにするかしないかを制御することができる構文を新しく導入しました。

CACHE [LAZY] TABLE [AS SELECT] ...

幾つかのキャッシングに関する機能がもうサポートされません:

Apache Hiveとの互換性

Spark SQL はHiveメタソース、SerDesおよびiUDFと互換性があるように設計されています。現在のところ、Hive SerDes と UDFs はHive 1.2.1に基づいていて、Spark SQLは異なるバージョンのHive Metastoreに接続されることができます (0.12.0 から 1.2.1)[Interacting with Different Versions of Hive Metastore] (#interacting-with-different-versions-of-hive-metastore))も見てください。

既存のHiveウェアハウスへのデプロイ

Spark SQL Thrift JDBC サーバは"追加設定無しで"既存のHiveインストレーションと互換性があるように設計されています。既存のHiveメタストアを修正したり、あるいはデータの配置またはテーブルのパーティションを変更する必要はありません。

サポートされるHive機能

Spark SQLは以下のようなHiveの機能の大部分をサポートします:

サポートされないHive機能

以下はまだサポートされないHive機能のリストです。それらの機能のほとんどはHiveデプロイメントでほとんど使われません。

主要なHive機能

難解なHive機能

Hive 入力/出力 フォーマット

Hive 最適化

扱いにくいHive最適化はまだSparkに含まれていません。(インデックスのよな)それらのいくつかはSpark SQLのインメモリ計算モデルに重要ではありません。他はSpark SQLの将来のリリースに組み込まれます。

リファレンス

データの種類

Spark SQL と DataFrames は以下のデータタイプをサポートします:

Spark SQLの全てのデータタイプは org.apache.spark.sql.typesパッケージの中にあります。以下のようにしてアクセスすることができます

import  org.apache.spark.sql.types._
データタイプ Scalaでの値のタイプ データタイプにアクセスあるいは生成するためのAPI
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
注意: containsNull のデフォルト値は trueです。
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
注意: valueContainsNull のデフォルト値は trueです。
StructType org.apache.spark.sql.Row StructType(fields)
注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのScalaでの値のタイプ(例えば、データタイプ IntegerTypeのStructFieldのためのInt) StructField(name, dataType, nullable)

Spark SQLの全てのデータタイプは org.apache.spark.sql.typesのパッケージ内にあります。データタイプにアクセスあるいは生成するには、org.apache.spark.sql.types.DataTypesで提供されるファクトリーメソッドを使ってください。

データタイプ Javaでの値タイプ データタイプにアクセスあるいは生成するためのAPI
ByteType byte あるいは Byte DataTypes.ByteType
ShortType short あるいは Short DataTypes.ShortType
IntegerType int あるいは Integer DataTypes.IntegerType
LongType long あるいは Long DataTypes.LongType
FloatType float あるいは Float DataTypes.FloatType
DoubleType double あるいは Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType()
DataTypes.createDecimalType(precision, scale).
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
BooleanType boolean あるいは Boolean DataTypes.BooleanType
TimestampType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DataTypes.DateType
ArrayType java.util.List DataTypes.createArrayType(elementType)
注意: containsNullの値はtrueでしょう。
DataTypes.createArrayType(elementType, containsNull).
MapType java.util.Map DataTypes.createMapType(keyType, valueType)
注意: valueContainsNullの値はtrueでしょう。
DataTypes.createMapType(keyType, valueType, valueContainsNull)
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields)
注意: fields はListあるいはStructFieldsの配列です。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのJavaでの値のタイプ(例えば、データタイプ IntegerTypeのStructFieldのためのint) DataTypes.createStructField(name, dataType, nullable)

Spark SQLの全てのデータタイプは pyspark.sql.typesパッケージ内にあります。以下のようにしてアクセスすることができます

from pyspark.sql.types import *
データタイプ Pythonでの値タイプ データタイプにアクセスあるいは生成するためのAPI
ByteType int あるいは long
注意: 実行時に数値は1バイトの符号あり数値に変換されるでしょう。数値は-128から127の範囲にあるようにしてください。
ByteType()
ShortType int あるいは long
注意: 実行時に数値は2バイトの符号あり数値に変換されるでしょう。数値は-32768から32767の範囲にあるようにしてください。
ShortType()
IntegerType int あるいは long IntegerType()
LongType long
注意: 実行時に数値は8バイトの符号あり数値に変換されるでしょう。数値は-9223372036854775808 から 9223372036854775807の範囲にあるようにしてください。そうでなければ、データをdecimal.Decimalに変換し、DecimalTypeを使ってください。
LongType()
FloatType float
注意: 実行時に数値は4バイトの符号あり単精度浮動小数点に変換されるでしょう。
FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType リスト、組、あるいは配列 ArrayType(elementType, [containsNull])
注意: containsNull のデフォルト値は Trueです。
MapType dict MapType(keyType, valueType, [valueContainsNull])
注意: valueContainsNull のデフォルト値はTrueです。
StructType リストあるいは組 StructType(fields)
注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのPythonでの値のタイプ(例えば、データタイプIntegerTypeのStructFieldのためのInt) StructField(name, dataType, nullable)
データタイプ Rでの値のタイプ データタイプにアクセスあるいは生成するためのAPI
ByteType integer
注意: 実行時に数値は1バイトの符号あり数値に変換されるでしょう。数値は-128から127の範囲にあるようにしてください。
"byte"
ShortType integer
注意: 実行時に数値は2バイトの符号あり数値に変換されるでしょう。数値は-32768から32767の範囲にあるようにしてください。
"short"
IntegerType integer "integer"
LongType integer
注意: 実行時に数値は8バイトの符号あり数値に変換されるでしょう。数値は-9223372036854775808 から 9223372036854775807の範囲にあるようにしてください。そうでなければ、データをdecimal.Decimalに変換し、DecimalTypeを使ってください。
"long"
FloatType numeric
注意: 実行時に数値は4バイトの符号あり単精度浮動小数点に変換されるでしょう。
"float"
DoubleType numeric "double"
DecimalType サポートされません。 サポートされません。
StringType character "string"
BinaryType raw "binary"
BooleanType logical "bool"
TimestampType POSIXct "timestamp"
DateType Date "date"
ArrayType vector あるいは list list(type="array", elementType=elementType, containsNull=[containsNull])
注意: containsNull のデフォルト値は Trueです。
MapType environment list(type="map", keyType=keyType, valueType=valueType, valueContainsNull=[valueContainsNull])
注意: valueContainsNull のデフォルト値はTrueです。
StructType 名前付きリスト list(type="struct", fields=fields)
注意: fields はStructFieldsのSeqです。また、同じ名前の2つのフィールドは許されません。
StructField このフィールドのデータタイプのRでの値のタイプ(例えば、データタイプIntegerTypeのStructFieldのためのinteger) list(name=name, type=dataType, nullable=nullable)

NaN Semantics

標準の浮動小数点の記号に正確の一致しない float あるいは double タイプを扱う時に、not-a-number (NAN)の特別な処理があります。具体的には:

TOP
inserted by FC2 system