JDBC から他のデータベースへ

Spark SQLはJDBCを使ってほかのデータベースからデータを読み込むことができるデータソースも含みます。この機能はJdbcRDDを使う上で好まれるべきでしょう。なぜなら結果はデータフレームとして返され、それらはSpark SQLの中で簡単に処理することができるか他のデータソースと繋げることができるからです。 JDBCデータソースはユーザにClassTagの提供を要求しないため、JavaあるいはPythonから簡単に使うことができます。(これは他のアプリケーションがSparkSQLを使ってクエリを実行することができるSpark SQL JDBCサーバと異なることに注意してください)。

開始するためには、sparkのクラスパス上に特定のデータベースのためのJDBCドライバを含む必要があるでしょう。例えば、Sparkシェルからpostgresに接続するには、以下のコマンドを実行するかもしれません:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

リモートデータベースからのテーブルは、データソースAPIを使ってデータフレームあるいはSpark SQLのテンポラリ ビューとしてロードすることができます。ユーザはデータソースオプションの中でJDBC接続プロパティを指定することができます。データソースに記録するために、userpassword は通常接続プロパティとして提供されます。接続プロパティに加えて、Sparkは以下の大文字小文字を区別しないオプションもサポートします:

プロパティ名意味
url 接続するための JDBC URLソース固有の接続プロパティはURL内で指定されるかも知れません。例えば, jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 読み込みあるいは書き込みされる必要があるJDBCテーブル。readパスでそれを使う場合は、SQLクエリのFROM句で有効なものを全て使用できることに注意してください。例えば、完全なテーブルの代わりに、丸括弧内のサブクエリも使うことができます。dbtablequery オプションを同時に指定することはできません。
query データをSparkに読み込むために使うクエリ。指定されたクエリは括弧で括られ、FROM 句内でサブクエリとして使われます。Sparkはサブクエリ句にエイリアスも割り当てるでしょう。例として、Sparkは以下の形式のクエリをJDBCソースに発行するでしょう。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下はこのオプションを使う時の2つの制限です。
  1. dbtablequery オプションを同時に指定することはできません。
  2. querypartitionColumn オプションを同時に指定することはできません。partitionColumn オプションの指定が必要な場合、サブクエリは代わりに dbtable オプションを使って指定することができ、パーティションのカラムは dbtable の一部として提供されるサブクエリのエイリアスを使って資格を与えることができます。
    例:
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
driver このURLに接続するために使われるJDBCドライバのクラス名。
partitionColumn, lowerBound, upperBound これらのオプションは、いずれかが指定される場合は全て指定されなければなりません。更に、numPartitionsが指定されなければなりません。複数のワーカーから並行して読み込む時は、それらはどうやってテーブルを分割するかを説明します。partitionColumn は問題のテーブルからの数字, 日付あるいはタイムスタンプのカラムでなければなりません。lowerBoundupperBound はパーティションのストライドを決めるために使われるだけで、テーブル内の行をフィルタするためのものでは無いことに注意してください。つまりテーブル内の全ての行が分割され返されるでしょう。このオプションは読み込みにのみ適用されます。
numPartitions テーブルの読み書きの並行のために使うことができるパーティションの最大数。これは同時のJDBC接続の最大数も決定します。書き込みのパーティションの数がこの制限を超える場合、書き込む前に coalesce(numPartitions) を呼ぶことで書き込みの数をこの制限まで減らすことができます。
queryTimeout ドライバーが指定された秒数だけStatementオブジェクトが実行を待つだろう秒数。0は制限が無いことを意味します。書き込みのパスの中では、このオプションはJDBCドライバがどのようにAPI setQueryTimeoutを実装するかに依存します。例えば、h2 JDBC ドライバはJDBCバッチ全体の代わりに各クエリのタイムアウトをチェックします。デフォルトは 0 です。
fetchsize JDBCの fetchサイズ。これは一回でどれだけの数の行をfetchするかを決定します。これはデフォルトが少ないフェッチサイズのJDBCドライバ上でパフォーマンスを良くします (例えば、Oracleは10行)。このオプションは読み込みにのみ適用されます。
batchsize JDBCの バッチサイズ。これは一回でどれだけの数の行を挿入するかを決定します。これはJDBCドライバ上でのパフォーマンスを良くするかもしれません。このオプションは書き込みにのみ適用されます。デフォルトは1000です。
isolationLevel トランザクションの隔離レベル。これは現在の接続に適用されます。NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ あるいは SERIALIZABLE のうちの一つが可能で、JDBCの接続オブジェクトによって定義される標準的なトランザクション隔離レベルに対応します。デフォルトはREAD_UNCOMMITTEDです。このオプションは書き込みにのみ適用されます。java.sql.Connectionのドキュメントを参照してください。
sessionInitStatement 各データベースセッションがリモートのDBに開かれた後で、データの読み込みを開始する前に、このオプションは独自のSQL文 (あるいは PL/SQL ブロック)を実行します。セッションの初期化コードを実装するためにこれを使ってください。例: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate これはJDBC writerに関係するオプションです。SaveMode.Overwriteが有効な場合、このオプションはSparkに既存のテーブルをdropして再createする代わりにtruncateさせます。これはもっと効率的で、テーブルのメタデータ(例えば、indices)が削除されることを防ぎます。しかし、新しいデータが異なるスキーマを持つなど、いくつかの場合に動作しないでしょう。デフォルトはfalseです。このオプションは書き込みにのみ適用されます。
cascadeTruncate これはJDBC writerに関係するオプションです。JDBCデータベースによって有効化およびサポートされる場合(今のところPostgreSQLとOracle)、このオプションはTRUNCATE TABLE t CASCADEの実行を許可します (PostgreSQLの場合は不注意で子テーブルをtruncateすることを避けるためにTRUNCATE TABLE ONLY t CASCADE が実行されます)。これは他のテーブルに影響するため、注意して使う必要があります。このオプションは書き込みにのみ適用されます。各JDBCDialect内のisCascadeTruncateで指定される該当のJDBCデータベースのデフォルトのtruncateのカスケードの挙動をデフォルトとします。
createTableOptions これはJDBC writerに関係するオプションです。指定された場合、このオプションはテーブルを作成する時にデータベース固有のテーブルとパーティションオプションを設定することができます (例えば CREATE TABLE t (name string) ENGINE=InnoDB.)。このオプションは書き込みにのみ適用されます。
createTableColumnTypes テーブルを作成する時のデフォルトの代わりに使うデータベースカラムのデータ型。データ型の情報は CREATE TABLE カラム構文 (例えば: "name CHAR(64), comments VARCHAR(1024)") と同じ形式で指定されなければなりません。指定された型は有効なspark sql データ型でなければなりません。このオプションは書き込みにのみ適用されます。
customSchema JDBCコネクタからデータを読み込むために使われる独自のスキーマ。例えば、"id DECIMAL(38, 0), name STRING"。部分フィールドを指定することもでき、その他はデフォルトの型マッピングを使用します。例えば、"id DECIMAL(38, 0)"。カラム名はJDBCテーブルの対応するカラム名と一致しなければなりません。ユーザはデフォルトを使う代わりにSparkSQLの対応するデータ型を指定することができます。このオプションは読み込みにのみ適用されます。
pushDownPredicate Predicate push-downをJDBCデータソースに入れることを有効または無効にするためのオプション。デフォルトの値はtrueで、その場合Sparkは出来る限り多くのフィルターをJDBCデータソースにpush downするでしょう。そうでなければ、もしfalseの場合、フィルターはJDBCデータソースにpush downされず、従って全てのフィルターはSparkによって処理されるでしょう。predicateフィルタリングがJDBCデータソースよりもSparkが速く行われる場合、Predicate push-down は通常はオフにされます。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
TOP
inserted by FC2 system