Hive テーブル

Spark SQLはApache Hiveに格納されたデータの読み書きもサポートします。しかし、Hiveは多くの依存性を持つため、これらの依存物はデフォルトのSparkの配布物の中に含まれません。もしHiveの依存物をクラスパス上で見つけることができた場合、Sparkはそれらを自動的にロードするでしょう。Hiveに格納されているデータにアクセスするために全てのワーカーノードはHiveシリアライズおよびデシリアライズライブラリ(SerDes)へのアクセスが必要になるため、これらのHiveの依存物は全てのワーカーノード上にも存在しなければなりません。

Hiveの設定は、hive-site.xml, core-site.xml (セキュリティ設定) および hdfs-site.xml (HDFS 設定) ファイルを conf/に置くことで行われます。

Hiveを動かす場合、永続Hiveメタストアへの接続性、Hive serdesのサポート、および Hiveのユーザ定義関数を含めて、Hiveをサポートする SparkSession をインスタンス化しなければなりません。既存のHiveデプロイメントを持たないユーザは、Hiveサポートを有効にすることができます。hive-site.xmlによって設定されていない場合、コンテキストが自動的に現在のディレクトリに metastore_db を作成し、spark.sql.warehouse.dirによって設定されるディレクトリを生成します。このディレクトリはsparkアプリケーションが開始される現在のディレクトリ内をデフォルトのspark-warehouseにします。Spark 2.0.0からhive-site.xml 内のhive.metastore.warehouse.dir プロパティが非推奨であることに注意してください。代わりにwarehouse内のデータベースのデフォルトの場所を指定するためにspark.sql.warehouse.dirを使います。sparkを開始するユーザへ書き込み権限を与える必要があるかも知れません。

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" で見つかります。
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" で見つかります。
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/hive.py" で見つかります。

Hiveを動かす場合、HiveサポートのSparkSessionをインスタンス化しなければなりません。これはメタソース内のテーブルを見つけ、HiveSQLを使ってクエリを書き込むためのサポートを追加します。

# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。

Hiveテーブルのためのストレージ形式の指定

Hiveテーブルを作成する時、このテーブルがどのようにデータをファイルシステムから/へ読み込む/書き込むかを定義する必要があります。つまり“入力フォーマット” と “出力フォーマット”。このテーブルがどのようにデータを行にデシリアライズ、あるいは行をデータにシリアライズするかを定義する必要もあります。つまり “serde”。以下のオプションはストレージのフォーマット(“serde”, “input format”, “output format”)を指定するために使うことができます。例えば、CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。デフォルトでは、平文のテキストとしてテーブルファイルを読むでしょう。テーブル作成時にHiveストレージ ハンドラはサポートされておらず、Hive側でストレージハンドラを使ってテーブルを作成することができ、それを読むためにSpark SQLを使うことに注意してください。

プロパティ名意味
fileFormat fileFormat は "serde", "input format" および "output format" を含むストレージフォーマットの仕様のパッケージの種類です。現在のところ、6つのfileFormatsがサポートされます: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' および 'avro'。
inputFormat, outputFormat これらの2つのオプションは文字列リテラルとして InputFormat および OutputFormat クラスに対応する名前を指定します。例えば、org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。これらの2つのオプションはペアで現れなければならず、すでに fileFormat オプションを指定していた場合でもそれらを指定することはできません。
serde このオプションはserdeクラスを指定します。fileFormat オプションが指定された場合、指定された fileFormat がすでにserdeの情報を含む場合はこのオプションを指定しないでください。現在のところ、"sequencefile", "textfile" および "rcfile" はserde情報を含まず、これらの3つのfileFormats を使ってこのオプションを使うことができます。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim これらのオプションは "textfile" fileFormatを使ってのみ使うことができます。それらは行に区切られたファイルを読み込む方法を定義します。

OPTIONS を使って定義される他の全てのオプションはHive serdeプロパティと見なされるでしょう。

Hiveメタソースの異なるバージョンの相互影響

SparkのHiveサポートの最も重要な部分の一つにHiveメタストアとの対話があります。これによりSpark SQLはHiveのテーブルのメタデータにアクセスすることができます。Spark 1.4.0から、Spark SQLの単独バイナリビルドが以下で説明する設定を使って異なるバージョンのHiveメタストアにクエリするために使うことができるようになりました。Hiveのバージョンの非依存性はmetastoreについて語られるもので、内部的にはSparkSQLは組み込みの Hive に対してコンパイルされ、それらのクラスを内部的な実行に使用することに注意してください。(serdes, UDFs, UDAFsなど)

以下のオプションがメタデータを扱うのに使われるHiveのバージョンを設定するために使うことができます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.hive.metastore.version 2.3.9 Hiveメタストアのバージョン利用可能なオプションは、0.12.0から2.3.93.0.0から3.1.2です。 1.4.0
spark.sql.hive.metastore.jars ビルトイン HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。このプロパティは以下の4つのオプションのいずれかです:
  1. ビルトイン
  2. -Phiveが有効な場合は、SparkアセンブリにバンドルされているHive 2.3.9を使います。このオプションが選択された場合は、spark.sql.hive.metastore.version2.3.9 あるいは未定義のどちらかでなければなりません。
  3. maven
  4. Mavenリポジトリからダウンロードされた指定のバージョンのHive jarを使用します。この設定は一般的にプロダクション デプロイメントのためにはお勧めされません。
  5. path
  6. カンマ区切り形式のspark.sql.hive.metastore.jars.pathによって設定されるHive jarを使います。ローカルまたはリモートのパスをサポートします。提供されたjarはspark.sql.hive.metastore.versionと同じバージョンでなければなりません。
  7. JVMのための標準フォーマットのクラスパス。このクラスパスはHadoopの正しいバージョンを含む全てのHiveおよびその依存物を含まなければなりません。提供されたjarはspark.sql.hive.metastore.versionと同じバージョンでなければなりません。これらのjarはドライバー上にのみ存在する必要がありますが、もしyarnクラスタモードで動かしている場合は、それらがアプリケーションと一緒にパッケージされるようにしなければなりません。
1.4.0
spark.sql.hive.metastore.jars.path (empty) HiveMetastoreClientをインスタンス化するために使われるjarのカンマ区切りのパス。この設定は、spark.sql.hive.metastore.jarspathに設定されている場合のみ有用です。
パスは以下の形式のいずれかです:
  1. file://path/to/jar/foo.jar
  2. hdfs://nameservice/path/to/jar/foo.jar
  3. /path/to/jar/(URIスキーマの無いパスは、設定fs.defaultFSのURIスキーマに従います)
  4. [http/https/ftp]://path/to/jar/foo.jar
1,2,3はワイルドカードをサポートしていることに注意してください。例えば:
  1. file://path/to/jar/*,file://path2/to/jar/*/*.jar
  2. hdfs://nameservice/path/to/jar/*,hdfs://nameservice2/path/to/jar/*/*.jar
3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。

1.4.0
spark.sql.hive.metastore.barrierPrefixes (empty)

Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、org.apache.spark.*)。

1.4.0
TOP
inserted by FC2 system