JDBC から他のデータベースへ
Spark SQLはJDBCを使ってほかのデータベースからデータを読み込むことができるデータソースも含みます。この機能はJdbcRDDを使う上で好まれるべきでしょう。なぜなら結果はデータフレームとして返され、それらはSpark SQLの中で簡単に処理することができるか他のデータソースと繋げることができるからです。 JDBCデータソースはユーザにClassTagの提供を要求しないため、JavaあるいはPythonから簡単に使うことができます。(これは他のアプリケーションがSparkSQLを使ってクエリを実行することができるSpark SQL JDBCサーバと異なることに注意してください)。
開始するためには、sparkのクラスパス上に特定のデータベースのためのJDBCドライバを含む必要があるでしょう。例えば、Sparkシェルからpostgresに接続するには、以下のコマンドを実行するかもしれません:
データソース オプション
Sparkは、JDBCで大文字と小文字を区別しない次のオプションをサポートします。JDBCのデータソースオプションは、次の方法で設定できます:
- the
.option
/.options
methods ofDataFrameReader
DataFrameWriter
- CREATE TABLE USING DATA_SOURCEの
OPTIONS
句
接続プロパティの場合、ユーザはデータソースオプションでJDBC接続プロパティを指定できます。データソースに記録するために、user
と password
は通常接続プロパティとして提供されます。
プロパティ名 | デフォルト | 意味 | スコープ |
---|---|---|---|
url |
(none) |
接続のためのjdbc:subprotocol:subname 形式のJDBC URL。ソース固有の接続プロパティはURL内で指定されるかも知れません。例えば、jdbc:postgresql://localhost/test?user=fred&password=secret
|
read/write |
dbtable |
(none) |
読み込みあるいは書き込みされる必要があるJDBCテーブル。readパスでそれを使う場合は、SQLクエリのFROM 句で有効なものを全て使用できることに注意してください。例えば、完全なテーブルの代わりに、丸括弧内のサブクエリも使うことができます。dbtable と query オプションを同時に指定することはできません。
|
read/write |
query |
(none) |
データをSparkに読み込むために使うクエリ。指定されたクエリは括弧で括られ、FROM 句内でサブクエリとして使われます。Sparkはサブクエリ句にエイリアスも割り当てるでしょう。例として、Sparkは以下の形式のクエリをJDBCソースに発行するでしょう。 SELECT <columns> FROM (<user_specified_query>) spark_gen_alias 以下はこのオプションを使う時の2つの制限です。
|
read/write |
driver |
(none) | このURLに接続するために使われるJDBCドライバのクラス名。 | read/write |
partitionColumn, lowerBound, upperBound |
(none) |
これらのオプションは、いずれかが指定される場合は全て指定されなければなりません。更に、numPartitions が指定されなければなりません。複数のワーカーから並行して読み込む時は、それらはどうやってテーブルを分割するかを説明します。partitionColumn は問題のテーブルからの数字, 日付あるいはタイムスタンプのカラムでなければなりません。lowerBound と upperBound はパーティションのストライドを決めるために使われるだけで、テーブル内の行をフィルタするためのものでは無いことに注意してください。つまりテーブル内の全ての行が分割され返されるでしょう。このオプションは読み込みにのみ適用されます。
|
read |
numPartitions |
(none) |
テーブルの読み書きの並行のために使うことができるパーティションの最大数。これは同時のJDBC接続の最大数も決定します。書き込みのパーティションの数がこの制限を超える場合、書き込む前に coalesce(numPartitions) を呼ぶことで書き込みの数をこの制限まで減らすことができます。
|
read/write |
queryTimeout |
0 |
ドライバーが指定された秒数だけStatementオブジェクトが実行を待つだろう秒数。0は制限が無いことを意味します。書き込みのパスの中では、このオプションはJDBCドライバがどのようにAPI setQueryTimeout を実装するかに依存します。例えば、h2 JDBC ドライバはJDBCバッチ全体の代わりに各クエリのタイムアウトをチェックします。
|
read/write |
fetchsize |
0 |
JDBCの fetchサイズ。これは一回でどれだけの数の行をfetchするかを決定します。これはデフォルトが少ないフェッチサイズのJDBCドライバ上でパフォーマンスを良くします (例えば、10行のOracle)。 | read |
batchsize |
1000 |
JDBCの バッチサイズ。これは一回でどれだけの数の行を挿入するかを決定します。これはJDBCドライバ上でのパフォーマンスを良くするかもしれません。このオプションは書き込みにのみ適用されます。 | write |
isolationLevel |
READ_UNCOMMITTED |
トランザクションの隔離レベル。これは現在の接続に適用されます。NONE , READ_COMMITTED , READ_UNCOMMITTED , REPEATABLE_READ あるいは SERIALIZABLE のうちの一つが可能で、JDBCの接続オブジェクトによって定義される標準的なトランザクション隔離レベルに対応します。デフォルトはREAD_UNCOMMITTED です。java.sql.Connection のドキュメントを参照してください。
|
write |
sessionInitStatement |
(none) |
各データベースセッションがリモートのDBに開かれた後で、データの読み込みを開始する前に、このオプションは独自のSQL文 (あるいは PL/SQL ブロック)を実行します。セッションの初期化コードを実装するためにこれを使ってください。例: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
|
read |
truncate |
false |
これはJDBC writerに関係するオプションです。SaveMode.Overwrite が有効な場合、このオプションはSparkに既存のテーブルをdropして再createする代わりにtruncateさせます。これはもっと効率的で、テーブルのメタデータ(例えば、indices)が削除されることを防ぎます。しかし、新しいデータが異なるスキーマを持つなど、いくつかの場合に動作しないでしょう。失敗した場合、ユーザはtruncate オプションをオフにして、DROP TABLE を再度利用する必要があります。また、DBMS間でTRUNCATE TABLE の動作が異なるため、これを使うことが常に安全であるとは限りません。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect、OracleDialectはこれをサポートしますが、PostgresDialectとデフォルトのJDBCDirectはサポートしません。不明でサポートされていないJDBCDirectの場合、ユーザオプションtruncate は無視されます。
| write |
cascadeTruncate |
the default cascading truncate behaviour of the JDBC database in question, specified in the isCascadeTruncate in each JDBCDialect |
これはJDBC writerに関係するオプションです。JDBCデータベースによって有効化およびサポートされる場合(今のところPostgreSQLとOracle)、このオプションはTRUNCATE TABLE t CASCADE の実行を許可します (PostgreSQLの場合は不注意で子テーブルをtruncateすることを避けるためにTRUNCATE TABLE ONLY t CASCADE が実行されます)。これは他のテーブルに影響するため、注意して使う必要があります。
|
write |
createTableOptions |
|
これはJDBC writerに関係するオプションです。指定された場合、このオプションはテーブルを作成する時にデータベース固有のテーブルとパーティションオプションを設定することができます (例えば CREATE TABLE t (name string) ENGINE=InnoDB. )。
|
write |
createTableColumnTypes |
(none) |
テーブルを作成する時のデフォルトの代わりに使うデータベースカラムのデータ型。データ型の情報は CREATE TABLE カラム構文 (例えば: "name CHAR(64), comments VARCHAR(1024)") と同じ形式で指定されなければなりません。指定された型は有効なspark sql データ型でなければなりません。
|
write |
customSchema |
(none) |
JDBCコネクタからデータを読み込むために使われる独自のスキーマ。例えば、"id DECIMAL(38, 0), name STRING" 。部分フィールドを指定することもでき、その他はデフォルトの型マッピングを使用します。例えば、"id DECIMAL(38, 0)" 。カラム名はJDBCテーブルの対応するカラム名と一致しなければなりません。ユーザはデフォルトを使う代わりにSparkSQLの対応するデータ型を指定することができます。
|
read |
pushDownPredicate |
true |
Predicate push-downをJDBCデータソースに入れることを有効または無効にするためのオプション。デフォルトの値はtrueで、その場合Sparkは出来る限り多くのフィルターをJDBCデータソースにpush downするでしょう。そうでなければ、もしfalseの場合、フィルターはJDBCデータソースにpush downされず、従って全てのフィルターはSparkによって処理されるでしょう。predicateフィルタリングがJDBCデータソースよりもSparkが速く行われる場合、Predicate push-down は通常はオフにされます。 | read |
pushDownAggregate |
false |
V2 JDBCデータソースで集約プッシュダウンを有効または無効にするオプション。デフォルト値はfalseです。この場合、Sparkは集計をJDBCデータソースにプッシュしません。それ以外の場合、trueに設定すると、集約はJDBCデータソースにプッシュダウンされます。集約がJDBCデータソースよりもSparkによって速く行われる場合、集約プッシュダウンは通常はオフにされます。全ての修家関数と関連するフィルターをプッシュダウンできる場合のみ、集計をプッシュダウンできることに注意してください。Sparkは、データソースが集計を完全に完了できないと想定し、データソース出力に対して最終的な集計を行います。 | read |
keytab |
(none) |
JDBCクライアントのkerberosキータブファイル(spark-submitの--files オプションまたは手動で全てのノードに事前にアップロードする必要があります)の場所。パス情報が見つかると、Sparkはキータブが手動で配布されたと見なします。それ以外の場合は、--files が想定されます。キータブ とプリンシパル の両方が定義されている場合、Sparkはkerberos認証を実行しようとします。
|
read/write |
principal |
(none) |
JDBCクライアントのためのkerberosプリンシパルを指定します。キータブ とプリンシパル の両方が定義されている場合、Sparkはkerberos認証を実行しようとします。
|
read/write |
refreshKrb5Config |
false |
このオプションは新しい接続を確立する前に、JDBCクライアントのkerberos設定を更新するかどうかを制御します。設定を更新する場合はtrueに設定し、そうでなければfalseに設定します。デフォルト値はfalseです。このオプションをtrueに設定し、複数の接続を確立しようとすると、競合状態が発生する可能性があることに注意してください。考えられる状況の1つは、以下のようになります。
|
read/write |
キータブを使ったkerberos認証は、JDBCドライバによって常にサポートされているとは限らないことに注意してください。キータブ
とプリンシパル
設定オプションを使う前に、以下の要件が満たされていることを確認してください:
- 含まれているJDBCドライバのバージョンは、キータブを使ったkerberos認証をサポートします。
- 使用するデータベースをサポートする組み込みの札族プロバイダがあります。
以下のデータベース用の組み込み接続プロバイダがあります:
- DB2
- MariaDB
- MS Sql
- Oracle
- PostgreSQL
要件が満たされない場合は、JdbcConnectionProvider
開発者用APIを使って独自の認証を処理することを検討してください。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")