JDBC から他のデータベースへ

Spark SQLはJDBCを使ってほかのデータベースからデータを読み込むことができるデータソースも含みます。この機能はJdbcRDDを使う上で好まれるべきでしょう。なぜなら結果はデータフレームとして返され、それらはSpark SQLの中で簡単に処理することができるか他のデータソースと繋げることができるからです。 JDBCデータソースはユーザにClassTagの提供を要求しないため、JavaあるいはPythonから簡単に使うことができます。(これは他のアプリケーションがSparkSQLを使ってクエリを実行することができるSpark SQL JDBCサーバと異なることに注意してください)。

開始するためには、sparkのクラスパス上に特定のデータベースのためのJDBCドライバを含む必要があるでしょう。例えば、Sparkシェルからpostgresに接続するには、以下のコマンドを実行するかもしれません:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

データソース オプション

Sparkは、JDBCで大文字と小文字を区別しない次のオプションをサポートします。JDBCのデータソースオプションは、次の方法で設定できます:

接続プロパティの場合、ユーザはデータソースオプションでJDBC接続プロパティを指定できます。データソースに記録するために、userpassword は通常接続プロパティとして提供されます。

プロパティ名デフォルト意味スコープ
url (none) 接続のためのjdbc:subprotocol:subname形式のJDBC URL。ソース固有の接続プロパティはURL内で指定されるかも知れません。例えば、jdbc:postgresql://localhost/test?user=fred&password=secret read/write
dbtable (none) 読み込みあるいは書き込みされる必要があるJDBCテーブル。readパスでそれを使う場合は、SQLクエリのFROM句で有効なものを全て使用できることに注意してください。例えば、完全なテーブルの代わりに、丸括弧内のサブクエリも使うことができます。dbtablequery オプションを同時に指定することはできません。 read/write
query (none) データをSparkに読み込むために使うクエリ。指定されたクエリは括弧で括られ、FROM 句内でサブクエリとして使われます。Sparkはサブクエリ句にエイリアスも割り当てるでしょう。例として、Sparkは以下の形式のクエリをJDBCソースに発行するでしょう。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下はこのオプションを使う時の2つの制限です。
  1. dbtablequery オプションを同時に指定することはできません。
  2. querypartitionColumn オプションを同時に指定することはできません。partitionColumn オプションの指定が必要な場合、サブクエリは代わりに dbtable オプションを使って指定することができ、パーティションのカラムは dbtable の一部として提供されるサブクエリのエイリアスを使って資格を与えることができます。
    例:
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
read/write
driver (none) このURLに接続するために使われるJDBCドライバのクラス名。 read/write
partitionColumn, lowerBound, upperBound (none) これらのオプションは、いずれかが指定される場合は全て指定されなければなりません。更に、numPartitionsが指定されなければなりません。複数のワーカーから並行して読み込む時は、それらはどうやってテーブルを分割するかを説明します。partitionColumn は問題のテーブルからの数字, 日付あるいはタイムスタンプのカラムでなければなりません。lowerBoundupperBound はパーティションのストライドを決めるために使われるだけで、テーブル内の行をフィルタするためのものでは無いことに注意してください。つまりテーブル内の全ての行が分割され返されるでしょう。このオプションは読み込みにのみ適用されます。 read
numPartitions (none) テーブルの読み書きの並行のために使うことができるパーティションの最大数。これは同時のJDBC接続の最大数も決定します。書き込みのパーティションの数がこの制限を超える場合、書き込む前に coalesce(numPartitions) を呼ぶことで書き込みの数をこの制限まで減らすことができます。 read/write
queryTimeout 0 ドライバーが指定された秒数だけStatementオブジェクトが実行を待つだろう秒数。0は制限が無いことを意味します。書き込みのパスの中では、このオプションはJDBCドライバがどのようにAPI setQueryTimeoutを実装するかに依存します。例えば、h2 JDBC ドライバはJDBCバッチ全体の代わりに各クエリのタイムアウトをチェックします。 read/write
fetchsize 0 JDBCの fetchサイズ。これは一回でどれだけの数の行をfetchするかを決定します。これはデフォルトが少ないフェッチサイズのJDBCドライバ上でパフォーマンスを良くします (例えば、10行のOracle)。 read
batchsize 1000 JDBCの バッチサイズ。これは一回でどれだけの数の行を挿入するかを決定します。これはJDBCドライバ上でのパフォーマンスを良くするかもしれません。このオプションは書き込みにのみ適用されます。 write
isolationLevel READ_UNCOMMITTED トランザクションの隔離レベル。これは現在の接続に適用されます。NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ あるいは SERIALIZABLE のうちの一つが可能で、JDBCの接続オブジェクトによって定義される標準的なトランザクション隔離レベルに対応します。デフォルトはREAD_UNCOMMITTEDです。java.sql.Connectionのドキュメントを参照してください。 write
sessionInitStatement (none) 各データベースセッションがリモートのDBに開かれた後で、データの読み込みを開始する前に、このオプションは独自のSQL文 (あるいは PL/SQL ブロック)を実行します。セッションの初期化コードを実装するためにこれを使ってください。例: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") read
truncate false これはJDBC writerに関係するオプションです。SaveMode.Overwriteが有効な場合、このオプションはSparkに既存のテーブルをdropして再createする代わりにtruncateさせます。これはもっと効率的で、テーブルのメタデータ(例えば、indices)が削除されることを防ぎます。しかし、新しいデータが異なるスキーマを持つなど、いくつかの場合に動作しないでしょう。失敗した場合、ユーザはtruncateオプションをオフにして、DROP TABLEを再度利用する必要があります。また、DBMS間でTRUNCATE TABLEの動作が異なるため、これを使うことが常に安全であるとは限りません。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect、OracleDialectはこれをサポートしますが、PostgresDialectとデフォルトのJDBCDirectはサポートしません。不明でサポートされていないJDBCDirectの場合、ユーザオプションtruncateは無視されます。 write
cascadeTruncate the default cascading truncate behaviour of the JDBC database in question, specified in the isCascadeTruncate in each JDBCDialect これはJDBC writerに関係するオプションです。JDBCデータベースによって有効化およびサポートされる場合(今のところPostgreSQLとOracle)、このオプションはTRUNCATE TABLE t CASCADEの実行を許可します (PostgreSQLの場合は不注意で子テーブルをtruncateすることを避けるためにTRUNCATE TABLE ONLY t CASCADE が実行されます)。これは他のテーブルに影響するため、注意して使う必要があります。 write
createTableOptions これはJDBC writerに関係するオプションです。指定された場合、このオプションはテーブルを作成する時にデータベース固有のテーブルとパーティションオプションを設定することができます (例えば CREATE TABLE t (name string) ENGINE=InnoDB.)。 write
createTableColumnTypes (none) テーブルを作成する時のデフォルトの代わりに使うデータベースカラムのデータ型。データ型の情報は CREATE TABLE カラム構文 (例えば: "name CHAR(64), comments VARCHAR(1024)") と同じ形式で指定されなければなりません。指定された型は有効なspark sql データ型でなければなりません。 write
customSchema (none) JDBCコネクタからデータを読み込むために使われる独自のスキーマ。例えば、"id DECIMAL(38, 0), name STRING"。部分フィールドを指定することもでき、その他はデフォルトの型マッピングを使用します。例えば、"id DECIMAL(38, 0)"。カラム名はJDBCテーブルの対応するカラム名と一致しなければなりません。ユーザはデフォルトを使う代わりにSparkSQLの対応するデータ型を指定することができます。 read
pushDownPredicate true Predicate push-downをJDBCデータソースに入れることを有効または無効にするためのオプション。デフォルトの値はtrueで、その場合Sparkは出来る限り多くのフィルターをJDBCデータソースにpush downするでしょう。そうでなければ、もしfalseの場合、フィルターはJDBCデータソースにpush downされず、従って全てのフィルターはSparkによって処理されるでしょう。predicateフィルタリングがJDBCデータソースよりもSparkが速く行われる場合、Predicate push-down は通常はオフにされます。 read
pushDownAggregate false V2 JDBCデータソースで集約プッシュダウンを有効または無効にするオプション。デフォルト値はfalseです。この場合、Sparkは集計をJDBCデータソースにプッシュしません。それ以外の場合、trueに設定すると、集約はJDBCデータソースにプッシュダウンされます。集約がJDBCデータソースよりもSparkによって速く行われる場合、集約プッシュダウンは通常はオフにされます。全ての修家関数と関連するフィルターをプッシュダウンできる場合のみ、集計をプッシュダウンできることに注意してください。Sparkは、データソースが集計を完全に完了できないと想定し、データソース出力に対して最終的な集計を行います。 read
keytab (none) JDBCクライアントのkerberosキータブファイル(spark-submitの--filesオプションまたは手動で全てのノードに事前にアップロードする必要があります)の場所。パス情報が見つかると、Sparkはキータブが手動で配布されたと見なします。それ以外の場合は、--filesが想定されます。キータブプリンシパルの両方が定義されている場合、Sparkはkerberos認証を実行しようとします。 read/write
principal (none) JDBCクライアントのためのkerberosプリンシパルを指定します。キータブプリンシパルの両方が定義されている場合、Sparkはkerberos認証を実行しようとします。 read/write
refreshKrb5Config false このオプションは新しい接続を確立する前に、JDBCクライアントのkerberos設定を更新するかどうかを制御します。設定を更新する場合はtrueに設定し、そうでなければfalseに設定します。デフォルト値はfalseです。このオプションをtrueに設定し、複数の接続を確立しようとすると、競合状態が発生する可能性があることに注意してください。考えられる状況の1つは、以下のようになります。
  1. refreshKrb5Configフラグはセキュリティコンテキスト1で設定されます
  2. 対応するDBMSにはJDBC接続プロバイダが使われます
  3. krb5.confは変更されていますが、JVMはそれをリロードする必要があることをまだ認識していません
  4. Sparkはセキュリティコンテキスト1を正常に認証します
  5. JVMは、変更されたkrb5.confからセキュリティコンテキスト2をロードします
  6. Sparkは以前に保存されたセキュリティコンテキスト1を復元します
  7. 変更されたkrb5.confコンテンツが無くなりました
read/write

キータブを使ったkerberos認証は、JDBCドライバによって常にサポートされているとは限らないことに注意してください。
キータブプリンシパル設定オプションを使う前に、以下の要件が満たされていることを確認してください:

以下のデータベース用の組み込み接続プロバイダがあります:

要件が満たされない場合は、JdbcConnectionProvider開発者用APIを使って独自の認証を処理することを検討してください。

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
例の完全なコードは Spark のリポジトリの "examples/src/main/r/RSparkSQLExample.R" で見つかります。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
TOP
inserted by FC2 system