Sparkの設定

Sparkはシステムを設定するための3つの場所を提供します:

Sparkのプロパティ

Spark プロパティはほとんどのアプリケーションの設定を制御し、各アプリケーションに対して別々に設定されます。これらのプロパティは SparkContextに渡して直接SparkConf に設定することができます。SparkConfを使って共通プロパティの幾つかを設定(例えば、マスターURLおよびアプリケーション名)することができ、set()メソッドを使って任意のキー値ペアを設定することができます。例えば、以下のように2つのスレッドを使ってアプリケーションを初期化することができます:

local[2]をつけて実行、2つのスレッド - "最小限"の並列処理、することにより、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立つことに注意してください。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

ローカルモードで1つ以上のスレッドを持つことができ、実際にSpark Streamingのような場合にはリソース不足を避けるために1つ以上のスレッドを要求するかも知れないということに注意してください。

なんらかの期間を指定するプロパティは時間の単位で設定されなければなりません。以下の書式が受け付けられます:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

バイトサイズを指定するプロパティはサイズの単位を使って設定されなければなりません。以下の書式が受け付けられます:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

単位が無い数値は一般的にバイトとして解釈されますが、2,3のものはKiBあるいはMiBとして解釈されます。個々の設定プロパティのドキュメントを見てください。可能な箇所では単位の指定が望ましいです。

Sparkプロパティの動的なロード

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。例えば、もし同じアプリケーションを異なるマスターあるいは異なるメモリ量で実行したいなど。Sparkは空のconfを単純に作成することができます:

val sc = new SparkContext(new SparkConf())

そうすると、実行時に設定値を提供することができます:

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark シェルおよび spark-submit ツールは動的に設定をロードする2つの方法を提供します。最初のものは上で示されたように、--masterのようなコマンドラインのオプションです。spark-submit--conf/-c フラグを使ってどのようなSpark プロパティも受け付けることができますが、Sparkアプリケーションを起動する時に役割があるプロパティのために特別なフラグを使うようにしてください。./bin/spark-submit --helpを実行すると、これらのオプションの完全なリストが表示されるでしょう。

bin/spark-submitは設定オプションを conf/spark-defaults.confからも読み込むでしょう。各行はホワイトスペースで区切られたキーと値です。例えば:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

フラグで指定された、あるいはプロパティファイルの全ての値はアプリケーションに渡され、SparkConfを通じてマージされるでしょう。SparkConfで直接設定されたプロパティは優先度が高く、その次はspark-submit あるいは spark-shellに渡されたもので、その次はspark-defaults.conf ファイル内のオプションです。以前のSparkのバージョンから2,3の設定キーの名前が変更されました; そのような場合、古いキーの名前はまだ受け付けられますが、新しいキーのどのインスタンスよりも優先度が低くなります。

Sparkのプロパティは主に2つの種類に分けることができます; 1つは “spark.driver.memory”, “spark.executor.instances” のような配備に関係します。この種類のプロパティはプログラム的に実行時にSparkConfを使って設定する時に影響を受けないでしょう。あるいはその挙動はどのクラスタマネージャと配備モードを選択したかに依存するため、設定ファイルまたは spark-submit コマンドラインのオプションを使って設定することが望まれます; もう1つは “spark.task.maxFailures” のような主にSparkのruntimeの制御に関係し、この種類のプロパティはどちらかの方法で設定することができます。

Sparkプロパティのビュー

http://<driver>:4040 のアプリケーションUIは、"Environment"タブの中でSparkプロパティをリスト表示します。ここはプロパティが正しく設定されたか確認するのに便利な場所です。spark-defaults.conf, SparkConf, あるいはコマンドラインで明示的に指定された値だけが表示されるだろうことに注意してください。他の全ての設定プロパティについては、デフォルトの値が使われると見なすことができます。

利用可能なプロパティ

内部設定を制御するほとんどのプロパティは意味のあるデフォルト値を持ちます。最も一般的なオプションの幾つかは以下のように設定されます。

アプリケーションのプロパティ

プロパティ名デフォルト意味これ以降のバージョンから
spark.app.name (none) アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。 0.9.0
spark.driver.cores 1 ドライバープロセスのために使われるコアの数。クラスターモードのみです。 1.3.0
spark.driver.maxResultSize 1g 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズのバイト数の制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。 1.2.0
spark.driver.memory 1g ドライバープロセス、つまりSparkContextが初期化される時に使用されるメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
1.1.1
spark.driver.memoryOverhead 最小384で、driverMemory * 0.10 クラスタモード時のドライバプロセスごとに割り当てられる非ヒープメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところ YARN、Kubernetes、Kubernetes でサポートされます。注意: オフヒープメモリを含む非ヒープ(spark.memory.offHeap.enabled=true の時)、他のドライバプロセスで使われるメモリ(例えば、PySparkドライバと対になるpythonプロセス)、同じコンテナ内で実行中の他の非ドライバプロセスによって使われるメモリ。実行中のドライバのコンテナの最大メモリサイズは、spark.driver.memoryOverheadspark.driver.memory の合計によって決定されます。 2.3.0
spark.driver.resource.{resourceName}.amount 0 ドライバで使用する特定のリソース型の量。これを使う場合、ドライバが起動時のリソースを見つけられるように、spark.driver.resource.{resourceName}.discoveryScript も指定する必要があります。 3.0.0
spark.driver.resource.{resourceName}.discoveryScript None ドライバが特定のリソース型を見つけられるように実行するスクリプト。これは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、名前とアドレスの配列を持ちます。クライアントがサブミットしたドライバのために、ディスカバリ スクリプトは同じホスト上の他のドライバと比較して異なるリソースアドレスをこのドライバに割り当てる必要があります。 3.0.0
spark.driver.resource.{resourceName}.vendor None ドライバで使用するリソースのベンダー。このオプションは現在のところKubernetes上でのみサポートされ、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインです。(例えば、Kubernetes 上の GPU については、この設定は nvidia.com または amd.com に設定されます) 3.0.0
spark.resources.discoveryPlugin org.apache.spark.resource.ResourceDiscoveryScriptPlugin アプリケーションへ読み込まれる org.apache.spark.api.resource.ResourceDiscoveryPlugin を実装するクラス名のカンマ区切りのリスト。これは上級ユーザが独自の実装を持つリソースディスカバリクラスを置き換えるためのものです。Spark は、指定された各クラスを、それらの1つがそのリソースの情報を返すまで試行します。It tries the discovery script last if none of the plugins return information for that resource. 3.0.0
spark.executor.memory 1g executorプロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。 0.7.0
spark.executor.pyspark.memory 設定無し executorごとにPySparkに割り当てられるメモリ量。別に指定されていなければMiB単位。設定された場合、executorのためのPySparkメモリはこの量に制限されるでしょう。設定されない場合、SparkはPythonのメモリ使用を制限せず、他の非JVMプロセスと共有されるオーバーヘッドのメモリ空間を超えないようにするのはアプリケーションの責任です。PySparkがYARNあるいはKubernetes内で実行される場合、子のメモリはexecutorのリソースリクエストに追加されます。
注意: この機能はPythonの `resource` モジュールに依存します; 従って、その挙動と制限が継承されます。例えば、Windowsはリソースの制限をサポートせず、MacOS 上では実際のリソースは制限されません。
2.4.0
spark.executor.memoryOverhead 最小384で、executorMemory * 0.10。 executor プロセスごとに割り当てられる追加のメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところYARNとKubernetesでサポートされます。
Note: Additional memory includes PySpark executor memory (when spark.executor.pyspark.memory is not configured) and memory used by other non-executor processes running in the same container. 実行中の executor のコンテナの最大メモリサイズは、spark.executor.memoryOverheadspark.executor.memoryspark.memory.offHeap.sizespark.executor.pyspark.memory の合計によって決定されます。
2.3.0
spark.executor.resource.{resourceName}.amount 0 executor プロセスごとに使用する特定のリソース型の量。これを使う場合、executor が起動時のリソースを見つけられるように、spark.executor.resource.{resourceName}.discoveryScript も指定する必要があります。 3.0.0
spark.executor.resource.{resourceName}.discoveryScript None executor が特定のリソース型を見つけられるように実行するスクリプト。これは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、名前とアドレスの配列を持ちます。 3.0.0
spark.executor.resource.{resourceName}.vendor None executor で使用するリソースのベンダー。このオプションは現在のところKubernetes上でのみサポートされ、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインです。(例えば、Kubernetes 上の GPU については、この設定は nvidia.com または amd.com に設定されます) 3.0.0
spark.extraListeners (none) SparkListenerを実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。 1.3.0
spark.local.dir /tmp Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。
注意: これは、クラスタマネージャーによって設定される、SPARK_LOCAL_DIRS (Standalone)、MESOS_SANDBOX (Mesos) あるいは LOCAL_DIRS (YARN) 環境変数によって上書きされます。
0.5.0
spark.logConf false SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。 0.9.0
spark.master (none) 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。 0.9.0
spark.submit.deployMode (none) Sparkドライバプログラムの配備モード、"client"あるいは"cluster"、これはドライバープログラムをローカル("client")あるいはクラスタ内のノードの1つの上で遠隔で("cluster")起動することを意味します。 1.5.0
spark.log.callerContext (none) Yarn/HDFS上で動作している時に Yarn RM log/HDFS audit ログに書き込まれるアプリケーションの情報。その長さはhadoop.caller.context.max.sizeのHadoop設定に依存します。簡潔でなければならず、一般的に50文字まで持つことができます。 2.2.0
spark.driver.supervise false trueであれば、非0の終了ステータスで失敗した場合にドライバは自動的に再起動します。Spark スタンドアローン モードあるいは Mesos クラスタ配備モードでのみ効果があります。 1.3.0
spark.driver.log.dfsDir (none) spark.driver.log.persistToDfs.enabled が true であれば、Spark ドライバのログが同期されるベースディレクトリ。このベースディレクトリ内で、各アプリケーションはアプリケーション固有のファイルへドライバログを記録します。ユーザは、これを HDFS ディレクトリのような統一された場所に設定して、後で使用する前にドライバログファイルを保持できるようにすることができます。このディレクリは、全ての Spark ユーザがファイルを読み書きでき、Spark履歴サーバユーザがファイルを削除できる必要があります。さらに、spark.history.fs.driverlog.cleaner.enabled が true で、spark.history.fs.driverlog.cleaner.maxAge で設定される最大の古いさよりも古い場合に、個のディレクトリの古いログは Spark 履歴サーバ によって消去されます。 3.0.0
spark.driver.log.persistToDfs.enabled false true の場合、クライアントモードで実行中の spark アプリケーションはドライバーログをspark.driver.log.dfsDir で設定される永続化ストレージに書き込みます。もし spark.driver.log.dfsDir が設定されていない場合は、ドライバーログは永続化されません。さらに、spark.history.fs.driverlog.cleaner.enabled を true に設定することで、Spark 履歴サーバ でクリーナーを有効にします。 3.0.0
spark.driver.log.layout %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n spark.driver.log.dfsDir に同期されるドライバーログのレイアウト。設定されていない場合、log4j.properties で定義される最初のアペンダーのレイアウトが使われます。それも設定されていない場合、ドライバーログはデフォルトのレイアウトを使います。 3.0.0
spark.driver.log.allowErasureCoding false ドライバーのログが erasure コーディングを使うことができるかどうか。On HDFS, erasure coded files will not update as quickly as regular replicated files, so they make take longer to reflect changes written by the application. Note that even if this is true, Spark will still not force the file to use erasure coding, it will simply use file system defaults. 3.0.0

これとは別に、以下のプロパティも利用可能で、ある状況では有用かも知れません。

ランタイム環境

プロパティ名デフォルト意味これ以降のバージョンから
spark.driver.extraClassPath (none) ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
1.0.0
spark.driver.defaultJavaOptions (none) spark.driver.extraJavaOptions に追加されるデフォルトの JVM オプションの文字列。これは管理者によって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memoryを、クライアントモードでは--driver-memoryコマンドラインオプションを使って設定することができます。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
3.0.0
spark.driver.extraJavaOptions (none) ドライバに渡す特別なJVMオプションの文字列。これはユーザによって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memoryを、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。spark.driver.defaultJavaOptions がこの設定に追加されます。
1.0.0
spark.driver.extraLibraryPath (none) ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
1.0.0
spark.driver.userClassPathFirst false (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。 1.3.0
spark.executor.extraClassPath (none) executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。 1.0.0
spark.executor.defaultJavaOptions (none) spark.executor.extraJavaOptions に追加されるデフォルトの JVM オプションの文字列。これは管理者によって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc 3.0.0
spark.executor.extraJavaOptions (none) executorに渡すための特別なJVMオプションの文字列これはユーザによって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions がこの設定に追加されます。 1.0.0
spark.executor.extraLibraryPath (none) executor JVMを起動する場合に使う特別なライブラリパスを設定する。 1.0.0
spark.executor.logs.rolling.maxRetainedFiles (none) システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。 1.1.0
spark.executor.logs.rolling.enableCompression false executor ログの圧縮を有効にします。有効な場合、ロールされたexecutorログは圧縮されるでしょう。デフォルトは無効です。 2.0.2
spark.executor.logs.rolling.maxSize (none) executorのログがロールオーバーされる最大のファイルサイズをバイトで設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。 1.4.0
spark.executor.logs.rolling.strategy (none) executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するために>spark.executor.logs.rolling.maxSize を使用します。 1.1.0
spark.executor.logs.rolling.time.interval daily executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily, hourly, minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFilesを見てください。 1.1.0
spark.executor.userClassPathFirst false (実験的なもの) spark.driver.userClassPathFirstと同じ機能ですが、executorインスタンスに適用されます。 1.3.0
spark.executorEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。 0.9.0
spark.redaction.regex (?i)secret|password|token ドライバおよびexecutor環境の中で、Spark設定プロパティと環境変数のどちらが保護必要情報を持つかを決定する正規表現。この正規表現がプロパティのキーあるいは値に一致した場合、その値は環境のUIおよびYARNとイベントログのような様々なログから作成されます。 2.1.2
spark.python.profile false Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles()によって表示されるか、ドライバが終了する前に表示されるでしょう。sc.dump_profiles(path)によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。 1.2.0
spark.python.profile.dump (none) ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはpstats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。 1.2.0
spark.python.worker.memory 512m 集約の間にpythonワーカープロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。 1.1.0
spark.python.worker.reuse true Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。 1.2.0
spark.files 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。globも可能です。 1.0.0
spark.submit.pyFiles PythonアプリのためのPYTHONPATH上に配置されるカンマ区切りの .zip, .egg あるいは .py ファイル。globも可能です。 1.0.1
spark.jars ドライバーとexecutorのクラスパス上でインクルードするjarのカンマ区切りのリスト。globも可能です。 0.9.0
spark.jars.packages ドライバーとexecutorのクラスパス上でインクルードされる Maven coordinateのjarのカンマ区切りのリストcoordinateは groupId:artifactId:version でなければなりません。spark.jars.ivySettingsが指定された場合、artifacts はファイル内の設定に応じて解決されるでしょう。そうでなければ artifacts はローカルのmavenリポジトリ、次にmavenセントラル、最後にコマンドラインオプション--repositoriesによって指定される追加のリモートリポジトリから探されるでしょう。詳細は、上級の依存管理を見てください。 1.5.0
spark.jars.excludes spark.jars.packagesで与えらえた依存を解決する間に依存性の衝突を避けるために除外する、groupId:artifactId のカンマ区切りのリスト。 1.5.0
spark.jars.ivy Ivyユーザディレクトリを指定するパス。spark.jars.packagesからローカルのIvyキャッシュおよびパッケージファイルのために使われます。これはデフォルトが ~/.ivy2 の Ivyプロパティ ivy.default.ivy.user.dir を上書くでしょう。 1.3.0
spark.jars.ivySettings mavenセントラルのような組み込みのデフォルトの代わりに、spark.jars.packages を使って指定されるjarの解決を独自に行うためのIvy設定ファイルへのパス。コマンドラインオプション--repositoriesあるいはspark.jars.repositories によって指定される追加のリポジトリも含まれるでしょう。例えば Artifactory のような社内のartifactサーバを使って、ファイアウォールの背後からSparkがartifactを解決できるようにするのに便利です。ファイルフォーマットの設定の詳細は設定ファイルで見つけることができます。file:// スキーマのパスだけがサポートされます。スキーマが無いパスは、file:// スキーマと見なされます。

When running in YARN cluster mode, this file will also be localized to the remote driver for dependency resolution within SparkContext#addJar

2.2.0
spark.jars.repositories --packages あるいは spark.jars.packagesを使って与えられるmaven coordinateのために検索される、カンマ区切りの追加のリモートリポジトリ。 2.3.0
spark.archives 各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。.jar, .tar.gz, .tgz and .zip are supported. 解凍するファイル名の後に#を追加することで、解凍先のディレクトリ名を指定できます。例えば、file.zip#directory。この設定は実験的なものです。 3.1.0
spark.pyspark.driver.python ドライバ内でPySparkが使うPythonバイナリ実行ファイル。(デフォルトは spark.pyspark.pythonです) 2.1.0
spark.pyspark.python ドライバと実行ファイルの両方の中でPySparkが使うPythonバイナリ実行ファイル。 2.1.0

シャッフルの挙動

プロパティ名デフォルト意味これ以降のバージョンから
spark.reducer.maxSizeInFlight 48m 各reduceタスクから同時に取り出すmap出力の最大サイズ。別に指定されていなければMiB単位。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。 1.4.0
spark.reducer.maxReqsInFlight Int.MaxValue この設定はリモートリクエストの数を指定された値までブロックの検索を制限します。クラスタ内のホストの数が増えた場合は、1つ以上のノードからやってくる接続の数が大量になり、ワーカーが負荷のために落ちるかも知れません。取得リクエストの数を制限できるようにすることで、このシナリオは緩和されるかも知れません。 2.0.0
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue この設定は指定されたホスト ポートからのreduceタスク毎に取得されるリモートブロックの数を制限します。大量のブロックが1つの取得あるいは並行して指定されたアドレスからリクエストされる場合、これは提供するexecutorあるいはノードマネージャーをクラッシュするかもしれません。これは外部シャッフルが有効な時にノードマネージャーの負荷を減らすのに特に有用です。それを低い値に設定することでこの問題を緩和することができます。 2.2.1
spark.shuffle.compress true map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 0.6.0
spark.shuffle.file.buffer 32k 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。別に指定されていなければMiB単位。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。 1.4.0
spark.shuffle.io.maxRetries 3 (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。 1.2.0
spark.shuffle.io.numConnectionsPerPeer 1 (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。 1.2.1
spark.shuffle.io.preferDirectBufs true (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 1.2.0
spark.shuffle.io.retryWait 5s (Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWaitとして計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。 1.2.1
spark.shuffle.io.backLog -1 シャッフルサービスの受け入れキューの長さ。大規模なアプリケーションの場合は、この値を増やす必要があるかもしれません。そうすると、サービスが短期間に多数の接続に対応できない場合に到着する接続を取りこぼさなくなります。これは、アプリケーションの外部にある可能性があるシャッフルサービス自身が実行されている全ての場所で設定される必要があります (以下の spark.shuffle.service.enabled オプションを見てください)。1未満に設定すると、Netty の io.netty.util.NetUtil#SOMAXCONN で定義される OS のデフォルトにフォールバックします。 1.1.1
spark.shuffle.io.connectionTimeout spark.network.timeoutの値 Timeout for the established connections between shuffle servers and clients to be marked as idled and closed if there are still outstanding fetch requests but no traffic no the channel for at least `connectionTimeout`. 1.2.0
spark.shuffle.service.enabled false 外部のシャッフルサービスを有効にします。This service preserves the shuffle files written by executors e.g. so that executors can be safely removed, or so that shuffle fetches can continue in the event of executor failure. 外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentationを見てください。 1.2.0
spark.shuffle.service.port 7337 外部のシャッフルサービスが実行されるだろうポート。 1.2.0
spark.shuffle.service.index.cache.size 100m 特定のメモリのフットプリントに制限されたキャッシュエントリのバイト。指定されない場合はバイト単位。 2.3.0
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE シャッフルサービス上で同時に転送されることができるチャンクの最大数。新しくやってくる接続は最大の数に達した時に閉じられるだろうことに注意してください。クライアントはシャッフル再試行の設定に応じて再試行するでしょう(spark.shuffle.io.maxRetriesspark.shuffle.io.retryWaitを見てください)。もしこれらの制限に達するとタスクは取得失敗で失敗するでしょう。 2.3.0
spark.shuffle.sort.bypassMergeThreshold 200 (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。 1.1.1
spark.shuffle.spill.compress true シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codecを使うでしょう。 0.9.0
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 HighlyCompressedMapStatus内のシャッフルブロックのサイズである上のバイトの閾値は正確に記録されます。 これはシャッフルブロックを取り出す時にシャッフルブロックのサイズを低めに見積もることを避けることでOOMを避けるのに役立ちます。 2.2.1
spark.shuffle.registration.timeout 5000 外部シャッフルサービスへの登録のためのミリ秒のタイムアウト。 2.3.0
spark.shuffle.registration.maxAttempts 3 外部シャッフルサービスに登録を失敗した時、maxAttempts回まで再試行するでしょう。 2.3.0
spark.files.io.connectionTimeout spark.network.timeoutの値 Timeout for the established connections for fetching files in Spark RPC environments to be marked as idled and closed if there are still outstanding files being downloaded but no traffic no the channel for at least `connectionTimeout`. 1.6.0
spark.shuffle.checksum.enabled true シャッフルデータのチェックサムを計算するかどうか。有効な場合、Sparkはマップ出力ファイル内の各パーティションデータのチェックサム値を計算し、ディスク上のチェックサムファイルにその値を格納します。When there's shuffle data corruption detected, Spark will try to diagnose the cause (e.g., network issue, disk issue, etc.) of the corruption by using the checksum file. 3.2.0
spark.shuffle.checksum.algorithm ADLER32 シャッフルのチェックサムを計算するために使われるアルゴリズム。現在のところ、JDKの組み込みのアルゴリズムのみをサポートします。例えば、ADLER32、CRC32。 3.2.0

Spark UI

プロパティ名デフォルト意味これ以降のバージョンから
spark.eventLog.logBlockUpdates.enabled false spark.eventLog.enabled がtrueの時に、各ブロックの更新毎にイベントを記録するかどうか。*Warning*: これはイベントログのサイズをかなり増やすでしょう。 2.3.0
spark.eventLog.longForm.enabled false trueであれば、イベントログに長い形式の呼び出しを使います。そうでなければ短い形式を使います。 2.4.0
spark.eventLog.compress false もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。 1.0.0
spark.eventLog.compression.codec zstd ログイベントを圧縮するためのコーデック。デフォルトでは、Sparkは4つのコーディックを提供します: lz4, lzf, snappyおよび zstd。コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec および org.apache.spark.io.ZStdCompressionCodec 3.0.0
spark.eventLog.erasureCoding.enabled false Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of filesystem defaults. On HDFS, erasure coded files will not update as quickly as regular replicated files, so the application updates will take longer to appear in the History Server. Note that even if this is true, Spark will still not force the file to use erasure coding, it will simply use filesystem defaults. 3.0.0
spark.eventLog.dir file:///tmp/spark-events spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。 1.0.0
spark.eventLog.enabled false アプリケーションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。 1.0.0
spark.eventLog.overwrite false 既存のファイルを上書きするかどうか。 1.0.0
spark.eventLog.buffer.kb 100k 出力ストリームに書き込む時に使用するバッファサイズ。別に指定されていなければKiB単位。 1.0.0
spark.eventLog.rolling.enabled false ローリングオーバーイベントログファイルが有効かどうか。true に設定されると、各イベントログファイルを設定されたサイズに切り詰めます。 3.0.0
spark.eventLog.rolling.maxFileSize 128m spark.eventLog.rolling.enabled=trueの場合、イベントログファイルがロールオーバーされる最大サイズを指定します。 3.0.0
spark.ui.dagGraph.retainedRootRDDs Int.MaxValue ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのDAGグラフノードを記憶するか。 2.1.0
spark.ui.enabled true Sparkアプリケーションのためにweb UIを実行するかどうか。 1.1.1
spark.ui.killEnabled true ジョブとステージをweb UIからkillすることができます。 1.0.0
spark.ui.liveUpdate.period 100ms ライブ エンティティを更新する頻度。-1 はアプリケーションの再生時に "更新しない" 事を意味します。つまり最後の書き込みだけが起こるでしょう。稼働中のアプリケーションについては、着信タスクイベントを迅速に処理する場合に無くてもやり過ごせる幾つかの操作を回避します。 2.3.0
spark.ui.liveUpdate.minFlushPeriod 1s 古いUIデータがフラッシュされるまでの最小経過時間。着信タスクイベントが頻繁に発生しない場合にUIの陳腐化を回避します。 2.4.2
spark.ui.port 4040 メモリおよび作業データを表示する、アプリケーションのダッシュボードのポート。 0.7.0
spark.ui.retainedJobs 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 1.2.0
spark.ui.retainedStages 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 0.9.0
spark.ui.retainedTasks 100000 ガベージコレクティングの前にSparkUIおよびステータスAPIが1つのステージにどれだけのタスクを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 2.0.1
spark.ui.reverseProxy false ワーカーとアプリケーションUIのためにリバースプロキシとしてSparkマスターの実行を有効にします。このモードでは、Sparkマスターはホストへの直接アクセスを必要とせずにワーカーとアプリケーションをリバースプロキシするでしょう。注意して使ってください。ワーカーとアプリケーションUIは直接アクセスできないため、sparkのマスター/プロキシ public URLを使ってアクセスできるだけでしょう。この設定はクラスタ内の全てのワーカーとアプリケーションUIに影響し、全てのワーカー、ドライバーおよびマスターで設定されなければなりません。 2.1.0
spark.ui.reverseProxyUrl Spark UIが他のフロントエンドリバースプロキシを経由して提供されなければならない場合、そのリバースプロキシを経由してSparkマスターUIにアクセスするためのURLです。認証のためにプロキシを実行する場合に便利です。例えば、OAuthプロキシ。The URL may contain a path prefix, like http://mydomain.com/path/to/spark/, allowing you to serve the UI for multiple Spark clusters and other web applications through the same virtual host and port. 通常、これはスキーマ(http/https)、ホスト、ポートを含む、絶対URLでなければなりません。"/" で始まる相対URLを指定できます。In this case, all URLs generated by the Spark UI and Spark REST APIs will be server-relative links -- this will still work, as the entire Spark UI is served through the same host and port.”
The setting affects link generation in the Spark UI, but the front-end reverse proxy is responsible for
  • stripping a path prefix before forwarding the request,
  • rewriting redirects which point directly to the Spark master,
  • redirecting access from http://mydomain.com/path/to/spark to http://mydomain.com/path/to/spark/ (trailing slash after path prefix); otherwise relative links on the master page do not work correctly.
この設定はクラスタで実行中の全てのワーカーとアプリケーションUIに影響し、全てのワーカー、ドライバー、マスターで同じ設定をしなければなりません。spark.ui.reverseProxy がオンの場合のみ効果があります。Sparkマスターweb UIが直接到達可能な場合は、この設定は不要です。
2.1.0
spark.ui.proxyRedirectUri Spark がプロキシの背後で実行されている場合のリダイレクト先。これにより、Spark は Spark UI 自体のアドレスではなく、プロキシサーバを指すようにリダイレクト応答を修正します。これは、アプリケーションのプレフィックスパスを含まない、サーバのアドレスのみにする必要があります; プレフィックスは(X-Forwarded-Context リクエストヘッダを追加することで)プロキシサーバ自体、あるいは Spark アプリケーションの設定内のプロキシベースを設定することで、設定される必要があります。 3.0.0
spark.ui.showConsoleProgress false コンソール内に進捗バーを表示します。進捗バーは500msより長く実行しているステージの進捗を示します。複数のステージが同時に実行される場合は、複数の進捗バーが同じ行に表示されるでしょう。
注意: シェル環境では、spark.ui.showConsoleProgress のデフォルト値は true です。
1.2.1
spark.ui.custom.executor.log.url (none) Spark UI でクラスタマネージャーのアプリケーションログの URL を使う代わりに、外部のログサービスをサポートするための独自の spark executor ログの URL を指定します。Spark はクラスタマネージャーで変更できるパターンを介して、幾つかのパス変数をサポートします。クラスタマネージャーでどのパターンがサポートされるかを調べるには、ドキュメントがある場合はドキュメントを調べてください。

この設定はイベントログ内の元の url も置き換えることに注意してください。これは履歴サーバ上のアプリケーションにアクセスする場合にも有効です。新しいログの url は永続的である必要があります。そうしないと excecutor のログの url が無効になるかもしれません。

現在のところ、YARN モードのみがこの設定をサポートします。

3.0.0
spark.worker.ui.retainedExecutors 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。 1.5.0
spark.worker.ui.retainedDrivers 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。 1.5.0
spark.sql.ui.retainedExecutions 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。 1.5.0
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。 1.0.0
spark.ui.retainedDeadExecutors 100 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのdeadのexecutorを記憶するか。 2.0.0
spark.ui.filters None Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。
spark.<class name of filter>.param.<param name>=<value>の形式の構成エントリを設定することで、フィルターパラメータも設定内で指定することができます
例えば:
spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
1.0.0
spark.ui.requestHeaderSize 8k HTTPリクエストヘッダの最大許容サイズ。指定されない場合はバイト単位。この設定はSpark履歴サーバにも適用されます。 2.2.3
spark.ui.timeline.executors.maximum 250 イベントのタイムラインで表示されるexecutorの最大数。 3.2.0
spark.ui.timeline.jobs.maximum 500 イベントタイムラインで表示されるジョブの最大数。 3.2.0
spark.ui.timeline.stages.maximum 500 イベントのタイムラインで表示されるstageの最大数。 3.2.0
spark.ui.timeline.tasks.maximum 1000 イベントのタイムラインで表示されるtaskの最大数。 1.4.0

圧縮および直列化

プロパティ名デフォルト意味これ以降のバージョンから
spark.broadcast.compress true ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 0.6.0
spark.checkpoint.compress false RDD チェックポイントを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 2.2.0
spark.io.compression.codec lz4 RDDパーティション、イベントログ、ブロードキャスト変数 およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは4つのコーディックを提供します: lz4, lzf, snappyおよび zstd。コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec および org.apache.spark.io.ZStdCompressionCodec 0.8.0
spark.io.compression.lz4.blockSize 32k LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。特記の無い限り、デフォルトの単位はバイト。 1.4.0
spark.io.compression.snappy.blockSize 32k Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。特記の無い限り、デフォルトの単位はバイト。 1.4.0
spark.io.compression.zstd.level 1 Zstd圧縮コーディックの圧縮レベル。圧縮レベルを増やすとCPUとメモリの消費を犠牲にして結果がより良い圧縮となるでしょう。 2.3.0
spark.io.compression.zstd.bufferSize 32k Zstd圧縮コーディックが使用される場合、Zstd圧縮で使用されるブロックサイズのバイト数。このサイズを小さくするとZstdが使われる時のシャッフルメモリ率が低くなりますが、過度のJNI呼び出しのオーバーヘッドにより圧縮コストが増えるかもしれません。 2.3.0
spark.kryo.classesToRegister (none) Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。 1.2.0
spark.kryo.referenceTracking true Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。 0.8.0
spark.kryo.registrationRequired false Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。 1.1.0
spark.kryo.registrator (none) Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにカンマ区切りのクラスのリストを渡してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。 KryoRegistratorを拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。 0.5.0
spark.kryo.unsafe false 安全では無いKryoベースのシリアライザを使うかどうか。安全では無いIOベースを使うことで実質的に速くすることができます。 2.1.0
spark.kryoserializer.buffer.max 64m kryoのシリアライズバッファの最大許容サイズ。別に指定されていなければMiB単位。これはシリアライズ化しようとしているどのオブジェクトよりも大きくなければならず、2048mより小さくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。 1.4.0
spark.kryoserializer.buffer 64k Kryoのシリアライズ化バッファの初期サイズ。別に指定されていなければKiB単位。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。 1.4.0
spark.rdd.compress false シリアライズされたパーティションを圧縮するかどうか(例えば、JavaとScalaでのStorageLevel.MEMORY_ONLY_SER、あるいは PythonでのStorageLevel.MEMORY_ONLY)。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。圧縮はspark.io.compression.codecを使うでしょう。 0.6.0
spark.serializer org.apache.spark.serializer.
JavaSerializer
ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializerのどのようなサブクラスにもなることができます。 0.5.0
spark.serializer.objectStreamReset 100 org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。 1.0.0

メモリ管理

プロパティ名デフォルト意味これ以降のバージョンから
spark.memory.fraction 0.6 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。この設定の目的は内部メタデータ、ユーザデータ構造、およびまばらな場合の不正確なサイズの見積もりのためにメモリを取って置くことで、非常に大きなレコードです。これはデフォルトの値にしておくことをお勧めします。この値を増加した場合の正しいJVMガベージコレクションの調整についての重要な情報を含む詳細は、この説明を見てください。 1.6.0
spark.memory.storageFraction 0.5 立ち退きに耐性のあるストレージメモリの量。spark.memory.fractionによって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。 1.6.0
spark.memory.offHeap.enabled false trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。 1.6.0
spark.memory.offHeap.size 0 オフヒープ割り当てのために使うことができる絶対メモリ量。特記が無い限りバイト単位。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=trueの場合は、これは正の値に設定されなければなりません。 1.6.0
spark.storage.replication.proactive false RDDブロックのために積極的なブロックリプリケーションを有効にする。executorの障害により失われたキャッシュされたRDDのブロックレプリカは、既存の利用可能なレプリカがある限り補充されます。これはブロックのレプリケーション レベルを初期の数にしようとします。 2.2.0
spark.cleaner.periodicGC.interval 30min ガベージコレクションどれだけの頻度で起動するかを制御します。

このコンテキストクリーナーは弱い参照がガベージコレクトされた時のみ掃除されます。ドライバ上でほとんどメモリの圧迫が無いような大きなドライバJVMを使った長く実行中のアプリケーション内では、これは極めて稀か全く起きないかもしれません。全く掃除しないことはしばらくした後でexecutorがディスク空間外で実行することに繋がるかもしれません。
1.6.0
spark.cleaner.referenceTracking true コンテキストの掃除を有効または無効にします。 1.0.0
spark.cleaner.referenceTracking.blocking true クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します (シャッフル以外、これはspark.cleaner.referenceTracking.blocking.shuffle Sparkプロパティによって制御されます)。 1.0.0
spark.cleaner.referenceTracking.blocking.shuffle false シャッフル クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します。 1.1.1
spark.cleaner.referenceTracking.cleanCheckpoints false 参照がスコープ外の場合にチェックポイントファイルを掃除するかどうかを制御します。 1.4.0

Executionの挙動

プロパティ名デフォルト意味これ以降のバージョンから
spark.broadcast.blockSize 4m TorrentBroadcastFactoryのためのブロックの各断片のサイズ。別に指定されていなければKiB単位。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManagerはパフォーマンスの打撃を受けるかも知れません。 0.5.0
spark.broadcast.checksum true ブロードキャストのためにチェックサムを有効にするかどうか。有効にされた場合、ブロードキャストはチェックサムを含むでしょう。これは少し多くのデータの計算と送信という代償を払って、間違ったブロックの検知を手助けすることができます。ネットワークがブロードキャスト中にデータの間違いが起きない保証をするための他の機構を持つ場合、無効にすることができます。 2.1.1
spark.executor.cores YARNモードの場合は1、スタンドアローンモードとMesos coarse-grainedモードの場合はワーカー上の全ての利用可能なコア。 各executor上で使用されるコアの数。スタンドアローンおよびMesosの coarse-grained モードでの詳細については、this descriptionを見てください。 1.0.0
spark.default.parallelism reduceByKey および joinのような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelizeのような操作については、クラスタマネージャーに依存します:
  • ローカルモード: ローカルマシーン上のコアの数
  • Mesos fine grained mode: 8
  • その他: 全てのexecutorノードのコアの総数あるいは2のどちらか大きいほう
ユーザによって設定されなかった場合は、 join, reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。 0.5.0
spark.executor.heartbeatInterval 10s ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。spark.executor.heartbeatInterval は spark.network.timeout よりかなり小さくなければなりません。 1.1.0
spark.files.fetchTimeout 60s ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。 1.0.0
spark.files.useFetchCache true true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。 1.2.2
spark.files.overwrite false ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。 1.0.0
spark.files.maxPartitionBytes 134217728 (128 MiB) ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。 2.1.0
spark.files.openCostInBytes 4194304 (4 MiB) ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうすれば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう。 2.1.0
spark.hadoop.cloneConf false trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。 1.0.3
spark.hadoop.validateOutputSpecs true trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。 1.0.1
spark.storage.memoryMapThreshold 2m ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。特記の無い限り、デフォルトの単位はバイト。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。 0.9.2
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 ファイル出力コミッタのアルゴリズムのバージョン。有効なアルゴリズムのバージョン番号: 1 あるいは 2。2 は MAPREDUCE-7282 のような正確さの問題を引き起こすかもしれないことに注意してください。 2.2.0

Executor のメトリクス

プロパティ名デフォルト意味これ以降のバージョンから
spark.eventLog.logStageExecutorMetrics false (各 executor について) executor メトリクスのステージごとのピークをイベントログに書き込むかどうか。
注意: メトリクスは executor の heatbeat でポーリング(収集)され、送信されます。これは常に行われます; この設定は、集約されたメトリクスのピークがイベントログに書き込まれるかどうかを決定するためだけのものです。
3.0.0
spark.executor.processTreeMetrics.enabled false executor のメトリクスを収集する時に、(/proc ファイルシステムから)プロセスのツリーメトリクスを収集するかどうか。
注意: プロセスツリーメトリクスは /proc ファイルシステムが存在する場合のみ収集されます。
3.0.0
spark.executor.metrics.pollingInterval 0 executor メトリクスを収集する頻度 (ミリ秒)
0 の場合、ポーリングは executor heatbeat で実行されます (したがって、spark.executor.heartbeatInterval で指定された heartbeat 間隔です)。正の場合、ポーリングはこの間隔で行われます。
3.0.0

ネットワーク

プロパティ名デフォルト意味これ以降のバージョンから
spark.rpc.message.maxSize 128 "control plane"通信内で許される最大のメッセージ(MiB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。RPCメッセージに関するメッセージが表示されるでしょう。 2.0.0
spark.blockManager.port (random) 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。 1.1.0
spark.driver.blockManager.port (spark.blockManager.port の値) ブロックマネージャーがlistenするドライバー固有のポート。executorとして同じ設定を使うことができない場合。 2.1.0
spark.driver.bindAddress (spark.driver.host の値) listenソケットをバインドするホスト名またはIPアドレス。この設定は SPARK_LOCAL_IP 環境変数(以下を見てください)を上書きします。
executorあるいは外部のシステムに知らせるためにローカルのものから異なるアドレスにすることもできます。これは例えばブリッジネットワークのコンテナを動かしている場合に便利です。これが正しく動作するためには、ドライバー(RPC, ブロックマネージャー および UI)によって使われる異なるポートがコンテナのホストからフォワードされる必要があります。
2.1.0
spark.driver.host (ローカル ホスト名) ドライバーのホスト名とIPアドレスexecutorとスタンドアローンマスターが通信するために使われます。 0.7.0
spark.driver.port (random) ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。 0.7.0
spark.rpc.io.backLog 64 RPC サーバの受け入れキューの長さ。大規模なアプリケーションの場合は、この値を増やす必要があるかもしれません。そうすると、短期間に多数の接続が来た時に到着する接続を取りこぼさなくなります。 3.0.0
spark.network.timeout 120s 全てのネットワークの相互交流のタイムアウト。この設定は、spark.storage.blockManagerHeartbeatTimeoutMsspark.shuffle.io.connectionTimeoutspark.rpc.askTimeoutspark.rpc.lookupTimeout が設定されていない場合に、それらの代わりに使われるでしょう。 1.3.0
spark.network.io.preferDirectBufs true 有効にされた場合、オフヒープバッファの割り当ては共有アロケータによって選ばれます。オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 3.0.0
spark.port.maxRetries 16 ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。 1.1.1
spark.rpc.numRetries 3 RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。 1.4.0
spark.rpc.retry.wait 3s RPCのask操作が再試行するまで待つ期間。 1.4.0
spark.rpc.askTimeout spark.network.timeout RPCのask操作がタイムアウトするまで待つ期間。 1.4.0
spark.rpc.lookupTimeout 120s RPCリモートエンドポイントがタイムアウトするまで待つ時間。 1.4.0
spark.network.maxRemoteBlockSizeFetchToMem 200m ブロックのサイズがこの閾値のバイトよりも上の場合、リモートのブロックはディスクに取りに行くでしょう。これは巨大なリクエストがあまりに大きなメモリを要求することを避けるためのものです。この設定はシャッフルの取得およびブロックマネージャーのリモートブロックの取得の両方に影響するだろうことに注意してください。外部シャッフルサービスを有効にしたユーザについては、この機能は外部シャッフルサービスが少なくとも 2.3.0 である場合のみ動作するかもしれません。 3.0.0
spark.rpc.io.connectionTimeout spark.network.timeoutの値 Timeout for the established connections between RPC peers to be marked as idled and closed if there are outstanding RPC requests but no traffic on the channel for at least `connectionTimeout`. 1.2.0

スケジューリング

プロパティ名デフォルト意味これ以降のバージョンから
spark.cores.max (not set) <c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores、Mesos上では無制限(利用可能な全てのコア)になります。 0.6.0
spark.locality.wait 3s データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.nodeなどを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。 0.5.0
spark.locality.wait.node spark.locality.wait ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。 0.8.0
spark.locality.wait.process spark.locality.wait プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。 0.8.0
spark.locality.wait.rack spark.locality.wait rack局地性を待つための局地性をカスタマイズします。 0.8.0
spark.scheduler.maxRegisteredResourcesWaitingTime 30s スケジュールが始まる前にリソースが登録するための最大待ち時間。 1.1.1
spark.scheduler.minRegisteredResourcesRatio KUBERNETES modeでは 0.8; YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードとKubernetesモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTimeによって設定されます。 1.1.1
spark.scheduler.mode FIFO 同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。 0.8.0
spark.scheduler.revive.interval 1s スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。 0.8.1
spark.scheduler.listenerbus.eventqueue.capacity 10000 イベントキューのデフォルトの容量。Spark は最初に `spark.scheduler.listenerbus.eventqueue.queueName.capacity` で指定された容量を使ってイベントキューを初期化しようとします。設定されていない場合、Spark はこの設定で指定されたデフォルトの容量を使います。容量は 0 より大きい必要があることに注意してください。listenerのイベントが零れる場合は、値を増やすことを考えてください (例えば 20000)。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 2.3.0
spark.scheduler.listenerbus.eventqueue.shared.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの共有イベントキューの容量。これは listener バス に登録する外部 listener(s) のイベントを保持します。共有キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.appStatus.capacity spark.scheduler.listenerbus.eventqueue.capacity sppStatus イベントキューの容量。これは内部アプリケーションステータス listener のイベントを保持します。appStatus キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの executorManagement イベントキューの容量。これは内部 executor management listener のイベントを保持します。executorManagement キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.eventLog.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの eventLog キューの容量。これは eventLogs にイベントを書き込む Event logging listener のイベントを保持します。eventLog キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.streams.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの streams キューの容量。これは内部 streaming listener のイベントを保持します。streams キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.resource.profileMergeConflicts false If set to "true", Spark will merge ResourceProfiles when different profiles are specified in RDDs that get combined into a single stage. マージされた場合、Sparkはそれぞれのリソースの最大のものを選択し、新しいResourceProfileを生成します。The default of false results in Spark throwing an exception if multiple different ResourceProfiles are found in RDDs going into the same stage. 3.1.0
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout 120s The timeout in seconds to wait to acquire a new executor and schedule a task before aborting a TaskSet which is unschedulable because all executors are excluded due to task failures. 2.4.1
spark.excludeOnFailure.enabled false "true"に設定すると、あまりに多くのタスクの失敗によって除外されたexecutor上でSparkがタスクをスケジュールすることを防ぎます。executorとノードを除外するために使われるアルゴリズムは、他の"spark.excludeOnFailure"設定オプションによって更に制御することができます。 2.1.0
spark.excludeOnFailure.timeout 1h (実験的) ノードあるいはexecutorが新しいタスクを実行しようとして除外リストから無条件に削除される前に、どれくらい長くノードあるいはexecutorがアプリケーション全体で除外されるか。 2.1.0
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor 1 (実験的) 指定されたタスクについて、1つのexecutorがそのタスクが除外される前にそのexecutor上でどれだけ再試行できるか。 2.1.0
spark.excludeOnFailure.task.maxTaskAttemptsPerNode 2 (実験的) 指定されたタスクについて、1つのノードがそのタスクが除外される前にそのノード上でどれだけ再試行できるか。 2.1.0
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor 2 (実験的) executorが1つのステージについて除外される前に、1つのexecutor上のそのステージ内でどれだけの異なるタスクを失敗しなければならないか。 2.1.0
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode 2 (実験的) ノード全体が指定されたステージについて失敗だとマークされる前に、どれだけの異なるexecutorがそのステージについて除外されたとマークされるか。 2.1.0
spark.excludeOnFailure.application.maxFailedTasksPerExecutor 2 (実験的) executorがアプリケーション全体で除外される前に、連続するタスクのセットの中でどれだけの異なるタスクが1つのeecutor上で失敗しなければならないか。除外されたexecutorはspark.excludeOnFailure.timeoutで指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、executorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。 2.2.0
spark.excludeOnFailure.application.maxFailedExecutorsPerNode 2 (実験的) ノードがアプリケーション全体で除外される前に、どれだけの異なるexecutorがアプリケーション全体で除外されなければならないか。除外されたノードはspark.excludeOnFailure.timeoutで指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、ノード上のexecutorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。 2.2.0
spark.excludeOnFailure.killExcludedExecutors false (実験的) "true" に設定すると、spark.killExcludedExecutors.application.* によって制御されるため取得時の障害あるいはアプリケーション全体で除外される時にSparkが自動的にexecutorをkillすることができます。ノード全体が除外に追加された場合、ノード上の全てのexecutorはkillされるだろうことに注意してください。 2.2.0
spark.excludeOnFailure.application.fetchFailure.enabled false (実験的) もし "true" に設定すると、取得の失敗が起きた場合にSparkはexecutorをすぐに除外するでしょう。もし外部シャッフルサービスが有効な場合、ノード全体が除外されるでしょう。 2.3.0
spark.speculation false "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。 0.6.0
spark.speculation.interval 100ms どれだけの頻度でSparkが投機するためにタスクをチェックするか。 0.6.0
spark.speculation.multiplier 1.5 平均より遅いタスクが何回投機と見なされるか。 0.6.0
spark.speculation.quantile 0.75 指定のステージで投機が有効になる前にどれだけのタスクの割合が終了していなければならないか。 0.6.0
spark.speculation.minTaskRuntime 100ms Minimum amount of time a task runs before being considered for speculation. This can be used to avoid launching speculative copies of tasks that are very short. 3.2.0
spark.speculation.task.duration.threshold None スケジューラがタスクを投機的に実行しようとするまでのタスク期間。指定した場合、現在のステージに含まれるタスクが1つの executor のスロット数以下であり、タスクが閾値よりも時間が掛かる場合に、タスクが投機的に実行されます。この設定は非常に少ないタスクでステージを推測するのに役立ちます。executor のスロットが十分に大きい場合、通常の投機設定も適用されるかもしれません。例えば、閾値に達していないが十分に実行が成功している場合は、タスクが再起動されるかもしれません。The number of slots is computed based on the conf values of spark.executor.cores and spark.task.cpus minimum 1. 特記の無い限り、デフォルトの単位はバイト。 3.0.0
spark.task.cpus 1 各タスクごとに割り当てるコアの数。 0.5.0
spark.task.resource.{resourceName}.amount 1 各タスクに割り当てる特定のリソースタイプの量。これは倍になるかもしれないことに注意してください。これが指定された場合、executor の設定 spark.executor.resource.{resourceName}.amount および対応するディスカバリ設定も提供して、executor がそのリソースタイプで作成されるようにする必要があります。In addition to whole amounts, a fractional amount (for example, 0.25, which means 1/4th of a resource) may be specified. Fractional amounts must be less than or equal to 0.5, or in other words, the minimum amount of resource sharing is 2 tasks per resource. Additionally, fractional amounts are floored in order to assign resource slots (e.g. a 0.2222 configuration, or 1/0.2222 slots will become 4 tasks/resource, not 5). 3.0.0
spark.task.maxFailures 4 ジョブを諦める前のタスクの失敗の数。異なるタスクに渡って広がっている失敗の総数はジョブを失敗させないでしょう; 特定のタスクがこの試行の数を失敗しなければなりません。1以上でなければなりません。許可された再試行の数 = この値 - 1. 0.8.0
spark.task.reaper.enabled false killed / interrupted タスクの監視を有効にする。trueに設定した場合、killされた全てのタスクはタスクが実際に実行を完了するまでexecutorによって監視されるでしょう。この監視の正確な挙動を制御する方法についての詳細は、他のspark.task.reaper.*設定を見てください。false (デフォルト)に設定すると、タスクのkill はそのような監視が欠けている古いコードを使うでしょう。 2.0.3
spark.task.reaper.pollingInterval 10s spark.task.reaper.enabled = trueの時、この設定はexecutorがkillされたタスクの状態をpollする頻度を制御します。pollされた時にkillされたタスクがまだ実行中の場合は、警告が記録され、デフォルトではタスクのthread-dumpが記録されるでしょう(この thread dump は spark.task.reaper.threadDump 設定によって無効にすることができます。これは以下で説明されます)。 2.0.3
spark.task.reaper.threadDump true spark.task.reaper.enabled = trueの場合、この設定はkillされたタスクの定期的なpollの際にタスクの thread dumps が記録されるかどうかを制御します。thread dumpの収集を無効にするには、これをfalseに設定します。 2.0.3
spark.task.reaper.killTimeout -1 spark.task.reaper.enabled = trueの場合、この設定は、killされたタスクが実行を停止していない場合にexecutor JVMがそれ自身をkillするだろうタイムアウトを指定します。 デフォルト値 -1 は、この機構を無効にし、executorが自己破壊をすることを避けます。この設定の目的は、キャンセル不可能なタスクの暴走がexecutorを不安定にすることを避けるための安全策として振舞うことです。 2.0.3
spark.stage.maxConsecutiveAttempts 4 ステージが中止されるまでに許される連続するステージの試行の数。 2.2.0

バリア実行モード

プロパティ名デフォルト意味これ以降のバージョンから
spark.barrier.sync.timeout 365d barrier タスクからの各 barrier() 呼び出しのタイムアウト。coordinator が設定された時間内に barrier タスクから全ての同期メッセージを受信しなかった場合は、全てのタスクを失敗させるために SparkException を投げます。デフォルト値は 31536000(3600 * 24 * 365) に設定されているため、barrier() 呼び出しは1年間待つ必要があります。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.interval 15s 最大同時タスクチェックの失敗から次のチェックまで待つ秒数。最大同時チェックにより、クラスタは送信されたジョブの barrier ステージで必要とされるより多くの同時タスクを起動できることが保証されます。クラスタが起動したばかりで十分な executor が登録されていない場合、チェックは失敗する可能性があるため、しばらく待ってからもう一度チェックを実行します。チェックがジョブに対して設定された最大失敗回数より多く失敗した場合、現在のジョブの送信が失敗します。この設定は1つ以上の barrier ステージを含むジョブにのみ適用されることに注意してください。barrier 以外のジョブではチェックを実行しません。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures 40 ジョブの送信が失敗するまでに許可される最大同時タスクチェックの失敗の数。最大同時チェックにより、クラスタは送信されたジョブの barrier ステージで必要とされるより多くの同時タスクを起動できることが保証されます。クラスタが起動したばかりで十分な executor が登録されていない場合、チェックは失敗する可能性があるため、しばらく待ってからもう一度チェックを実行します。チェックがジョブに対して設定された最大失敗回数より多く失敗した場合、現在のジョブの送信が失敗します。この設定は1つ以上の barrier ステージを含むジョブにのみ適用されることに注意してください。barrier 以外のジョブではチェックを実行しません。 2.4.0

動的割り当て

プロパティ名デフォルト意味これ以降のバージョンから
spark.dynamicAllocation.enabled false 動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。詳細は ここの説明を見てください。

これは spark.shuffle.service.enabled あるいは spark.dynamicAllocation.shuffleTracking.enabled の設定が必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
1.2.0
spark.dynamicAllocation.executorIdleTimeout 60s 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 1.2.0
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 1.4.0
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 動的割り当てが有効な場合、実行するexecutorの初期の数。

もし `--num-executors` (あるいは `spark.executor.instances`) が設定され、この値よりも大きい場合は、executorの初期の数として使われるでしょう。
1.3.0
spark.dynamicAllocation.maxExecutors infinity 動的割り当てが有効な場合のexecutorの数の上限。 1.2.0
spark.dynamicAllocation.minExecutors 0 動的割り当てが有効な場合のexecutorの数の下限。 1.2.0
spark.dynamicAllocation.executorAllocationRatio 1 デフォルトで、動的な割り当ては処理するタスクの数に応じて並行度を最大にするのに十分なexecutorを要求するでしょう。これはジョブのレイテンシを最小化しますが、いくつかのexecutorは何も仕事をしないかもしれないため、この設定を持つ小さなタスクはexecutorの割り当てのオーバーヘッドにより多くのリソースを浪費するかもしれません。この設定によりexecutorの数を減らすために使われる割合を設定することができます。完全な並行度に関して。最大の並行度を与えるために、デフォルトは1.0です。0.5 はexecutorの目標値を2で割るでしょう。dynamicAllocationによって計算されるexecutorの目標値は spark.dynamicAllocation.minExecutorsspark.dynamicAllocation.maxExecutors の設定によって上書きすることもできます。 2.4.0
spark.dynamicAllocation.schedulerBacklogTimeout 1s もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。 1.2.0
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeoutと同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は ここの説明を見てください。 1.2.0
spark.dynamicAllocation.shuffleTracking.enabled false 実験的。executor のシャッフルファイルトラッキングを有効にします。これにより、外部シャッフルサービスを必要とせずに動的な割り当てが可能になります。このオプションはアクティブなジョブのシャッフルデータを格納している executor を保とうとします。 3.0.0
spark.dynamicAllocation.shuffleTracking.timeout infinity シャッフルトラッキングが有効な場合、シャッフルデータを保持している executor のタイムアウトを制御します。The default value means that Spark will rely on the shuffles being garbage collected to be able to release executors. 何らかの理由でガベージコレクションがシャッフルを十分に早くクリーンアップしない場合、このオプションを使って、シャッフルデータを格納している executor をシャッフルデータを格納している時であってもタイムアウトするタイミングを制御することができます。 3.0.0

スレッドの設定

ジョブとクラスタの設定に応じて、使用可能なリソースを使ってパフォーマンスを向上させるために、Spark の幾つかの場所にスレッドを設定することができます。Spark 3.0 より前では、これらのスレッド設定はドライバ、executor 、ワーカー、マスターなど、Spark の全てのロールに適用されます。Spark 3.0 から、ドライバと executor から始めて、より細かい粒度でスレッドを設定することができます。以下の表の例では、RPC モジュールを取り上げます。シャッフルなどの他のモジュールの場合、RPC モジュール専用の spark.{driver|executor}.rpc.netty.dispatcher.numThreads を除いて、プロパティ名の “rpc” を “shuffle” に置き換えるだけです。

プロパティ名デフォルト意味これ以降のバージョンから
spark.{driver|executor}.rpc.io.serverThreads Fall back on spark.rpc.io.serverThreads サーバスレッドプールで使われるスレッド数 1.6.0
spark.{driver|executor}.rpc.io.clientThreads Fall back on spark.rpc.io.clientThreads クライアントスレッドプールで使われるスレッドの数 1.6.0
spark.{driver|executor}.rpc.netty.dispatcher.numThreads Fall back on spark.rpc.netty.dispatcher.numThreads RPCメッセージディスパッチャースレッドプールで使われるスレッドの数 3.0.0

スレッド関連の設定キーの数のデフォルト値は、ドライバまたは executor に要求されたコアの数の最小値、またはその値が無い場合は、JVM に利用可能なコアの数 (ハードコードされた上限は 8)。

セキュリティ

異なるSparkサブシステムを安全にする方法について利用可能なオプションについては、Security ページを参照してください。

Spark SQL

Runtime SQL 設定

ランタイム SQL 設定は、セッションごとの変更可能な Spark SQL 設定です。設定ファイルと、--conf/-c が前に付いたコマンドラインオプション、または SparkSession を作成するために使われる SparkConf を設定することで、初期値を設定することができます。また、SET コマンドで設定とクエリを実行し、RESET コマンドあるいは実行時の SparkSession.conf のセッターとゲッターで初期値に戻すことができます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.adaptive.advisoryPartitionSizeInBytes (spark.sql.adaptive.shuffle.targetPostShuffleInputSize の値)

適応最適化中のシャッフルパーティションの推奨サイズのバイト数 (spark.sql.adaptive.enabled が true の場合)。Sparkが小さなシャッフルパーティションを結合するか、スキューされたパーティションを分割する時に有効になります。

3.0.0
spark.sql.adaptive.autoBroadcastJoinThreshold (none)

joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。デフォルトの値はspark.sql.autoBroadcastJoinThresholdと同じです。これは適応フレームワークでのみ使われることに注意してください。

3.2.0
spark.sql.adaptive.coalescePartitions.enabled true

これが true で、'spark.sql.adaptive.enabled' が true の場合、Spark はターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes' で指定される)に従って連続するシャッフルパーティションを合体させ、小さなタスクが多すぎないようにします。

3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (none)

合体する前のシャッフルパーティションの初期数。設定されない場合は、spark.sql.shuffle.partitionsと同じです。この設定は、'spark.sql.adaptive.enabled' と 'spark.sql.adaptive.coalescePartitions.enabled' の両方が true の場合にのみ効果があります。

3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB

合体後のシャッフルパーティションの最小サイズ。This is useful when the adaptively calculated target size is too small during partition coalescing.

3.2.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true

When true, Spark does not respect the target size specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes' (default 64MB) when coalescing contiguous shuffle partitions, but adaptively calculate the target size according to the default parallelism of the Spark cluster. The calculated size is usually smaller than the configured target size. This is to maximize the parallelism and avoid performance regression when enabling adaptive query execution. It's recommended to set this config to false and respect the configured target size.

3.2.0
spark.sql.adaptive.customCostEvaluatorClass (none)

The custom cost evaluator class to be used for adaptive execution. If not being set, Spark will use its own SimpleCostEvaluator by default.

3.2.0
spark.sql.adaptive.enabled true

true の場合、アダプティブクエリ実行を有効にします。これにより、正確な実行時統計に基づいて、クエリ実行の途中でクエリプランが再最適化されます。

1.6.0
spark.sql.adaptive.localShuffleReader.enabled true

When true and 'spark.sql.adaptive.enabled' is true, Spark tries to use local shuffle reader to read the shuffle data when the shuffle partitioning is not needed, for example, after converting sort-merge join to broadcast-hash join.

3.0.0
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0b

ローカルハッシュマップをビルドできるパーティションあたりの最大サイズをバイト単位で設定します。If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and all the partition size are not larger than this config, join selection prefer to use shuffled hash join instead of sort merge join regardless of the value of spark.sql.join.preferSortMergeJoin.

3.2.0
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true

When true and 'spark.sql.adaptive.enabled' is true, Spark will optimize the skewed shuffle partitions in RebalancePartitions and split them to smaller ones according to the target size (specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes'), to avoid data skew.

3.2.0
spark.sql.adaptive.optimizer.excludedRules (none)

Configures a list of rules to be disabled in the adaptive optimizer, in which the rules are specified by their rule names and separated by comma. The optimizer will log the rules that have indeed been excluded.

3.1.0
spark.sql.adaptive.skewJoin.enabled true

When true and 'spark.sql.adaptive.enabled' is true, Spark dynamically handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and replicating if needed) skewed partitions.

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5

A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB

A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' multiplying the median partition size. 理想的には、この設定は'spark.sql.adaptive.advisoryPartitionSizeInBytes'より大きく設定されなければなりません。

3.0.0
spark.sql.ansi.enabled false

When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. For example, Spark will throw an exception at runtime instead of returning null results when the inputs to a SQL operator/function are invalid.For full details of this dialect, you can find them in the section "ANSI Compliance" of Spark's documentation. Some ANSI dialect features may be not from the ANSI SQL standard directly, but their behaviors align with ANSI SQL's style

3.0.0
spark.sql.autoBroadcastJoinThreshold 10MB

joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run, and file-based data source tables where the statistics are computed directly on the files of data.

1.1.0
spark.sql.avro.compression.codec snappy

AVROファイルの書き込みで使われる圧縮コーディック。サポートされるコーディック: uncompressed, deflate, snappy, bzip2, xz および zstandard。デフォルトのコーディックは snappy です。

2.4.0
spark.sql.avro.deflate.level -1

AVROファイルの書き込みで使われるdeflateコーディック圧縮レベル。有効な値は-1または1から10の範囲でなければなりません。デフォルト値は-1で、現在の実装での6レベルに対応します。

2.4.0
spark.sql.avro.filterPushdown.enabled true

When true, enable filter pushdown to Avro datasource.

3.1.0
spark.sql.broadcastTimeout 300

ブロードキャストjoinでのブロードキャスト待ち時間のタイムアウト秒数。

1.3.0
spark.sql.bucketing.coalesceBucketsInJoin.enabled false

When true, if two bucketed tables with the different number of buckets are joined, the side with a bigger number of buckets will be coalesced to have the same number of buckets as the other side. Bigger number of buckets is divisible by the smaller number of buckets. Bucket coalescing is applied to sort-merge joins and shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling in join, but it also reduces parallelism and could possibly cause OOM for shuffled hash join.

3.1.0
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio 4

The ratio of the number of two buckets being coalesced should be less than or equal to this value for bucket coalescing to be applied. This configuration only has an effect when 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' is set to true.

3.1.0
spark.sql.catalog.spark_catalog (none)

A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.

3.0.0
spark.sql.cbo.enabled false

Enables CBO for estimation of plan statistics when set true.

2.2.0
spark.sql.cbo.joinReorder.dp.star.filter false

Applies star-join filter heuristics to cost based join enumeration.

2.2.0
spark.sql.cbo.joinReorder.dp.threshold 12

The maximum number of joined nodes allowed in the dynamic programming algorithm.

2.2.0
spark.sql.cbo.joinReorder.enabled false

Enables join reorder in CBO.

2.2.0
spark.sql.cbo.planStats.enabled false

When true, the logical plan will fetch row counts and column statistics from catalog.

3.0.0
spark.sql.cbo.starSchemaDetection false

When true, it enables join reordering based on star schema detection.

2.2.0
spark.sql.cli.print.header false

When set to true, spark-sql CLI prints the names of the columns in query output.

3.2.0
spark.sql.columnNameOfCorruptRecord _corrupt_record

The name of internal column for storing raw/un-parsed JSON and CSV records that fail to parse.

1.2.0
spark.sql.csv.filterPushdown.enabled true

When true, enable filter pushdown to CSV datasource.

3.0.0
spark.sql.datetime.java8API.enabled false

If the configuration property is set to true, java.time.Instant and java.time.LocalDate classes of Java 8 API are used as external types for Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp and java.sql.Date are used for the same purpose.

3.0.0
spark.sql.debug.maxToStringFields 25

Maximum number of fields of sequence-like entries can be converted to strings in debug output. Any elements beyond the limit will be dropped and replaced by a "... N more fields" placeholder.

3.0.0
spark.sql.defaultCatalog spark_catalog

デフォルトのカタログの名前。This will be the current catalog if users have not explicitly set the current catalog yet.

3.0.0
spark.sql.execution.arrow.enabled false

(Spark 3.0から非推奨です。'spark.sql.execution.arrow.pyspark.enabled'を設定してください。)

2.3.0
spark.sql.execution.arrow.fallback.enabled true

(Spark 3.0から非推奨です。'spark.sql.execution.arrow.pyspark.fallback.enabled'を設定してください。)

2.4.0
spark.sql.execution.arrow.maxRecordsPerBatch 10000

When using Apache Arrow, limit the maximum number of records that can be written to a single ArrowRecordBatch in memory. ゼロまたは負の値に設定された場合、制限はありません。

2.3.0
spark.sql.execution.arrow.pyspark.enabled (spark.sql.execution.arrow.enabledの値)

When true, make use of Apache Arrow for columnar data transfers in PySpark. This optimization applies to: 1. pyspark.sql.DataFrame.toPandas 2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame The following data types are unsupported: ArrayType of TimestampType, and nested StructType.

3.0.0
spark.sql.execution.arrow.pyspark.fallback.enabled (spark.sql.execution.arrow.fallback.enabledの値)

When true, optimizations enabled by 'spark.sql.execution.arrow.pyspark.enabled' will fallback automatically to non-optimized implementations if an error occurs.

3.0.0
spark.sql.execution.arrow.pyspark.selfDestruct.enabled false

(Experimental) When true, make use of Apache Arrow's self-destruct and split-blocks options for columnar data transfers in PySpark, when converting from Arrow to Pandas. これは、いくらかのCPU時間を使ってメモリの使用を減らします。This optimization applies to: pyspark.sql.DataFrame.toPandas when 'spark.sql.execution.arrow.pyspark.enabled' is set.

3.2.0
spark.sql.execution.arrow.sparkr.enabled false

When true, make use of Apache Arrow for columnar data transfers in SparkR. This optimization applies to: 1. createDataFrame when its input is an R DataFrame 2. collect 3. dapply 4. gapply The following data types are unsupported: FloatType, BinaryType, ArrayType, StructType and MapType.

3.0.0
spark.sql.execution.pandas.udf.buffer.size (spark.buffer.sizeの値)

spark.buffer.sizeと同じですが、Pandas UDF 実行のみに適用されます。If it is not set, the fallback is spark.buffer.size. Note that Pandas execution requires more than 4 bytes. Lowering this value could make small Pandas UDF batch iterated and pipelined; however, it might degrade performance. SPARK-27870 を見てください。

3.0.0
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled true

When true, the traceback from Python UDFs is simplified. It hides the Python worker, (de)serialization, etc from PySpark in tracebacks, and only shows the exception messages from UDFs. これはCPython 3.7+でしか動作しないことに注意してください。

3.1.0
spark.sql.execution.topKSortFallbackThreshold 2147483632

In SQL queries with a SORT followed by a LIMIT like 'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort in memory, otherwise do a global sort which spills to disk if necessary.

2.4.0
spark.sql.files.ignoreCorruptFiles false

Whether to ignore corrupt files. true に設定された場合、Spark ジョブは破損したファイルが検出された時に引き続き実行され、読み取られた内容は引き続き返されます。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。

2.1.1
spark.sql.files.ignoreMissingFiles false

不足しているファイルを無視するかどうか。true に設定された場合、Spark ジョブは不足したファイルが検出された時に引き続き実行され、読み取られた内容は引き続き返されます。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。

2.3.0
spark.sql.files.maxPartitionBytes 128MB

ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。

2.0.0
spark.sql.files.maxRecordsPerFile 0

1つのファイルに書き出されるレコードの最大数。この値がゼロまたは負の値の場合、制限はありません。

2.2.0
spark.sql.files.minPartitionNum (none)

推奨される(保証されていない)分割ファイルパーティションの最小数。設定されない場合、デフォルト値はspark.default.parallelismです。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。

3.1.0
spark.sql.function.concatBinaryAsString false

このオプションがfalseに設定され、全ての入力がバイナリの場合、functions.concatはバイナリの出力を返します。そうでなければ、文字列として返します。

2.3.0
spark.sql.function.eltOutputAsString false

このオプションがfalseに設定され、全ての入力がバイナリの場合、eltはバイナリの出力を返します。そうでなければ、文字列として返します。

2.3.0
spark.sql.groupByAliases true

trueの場合、selectリスト内のエイリアスをグループ句で使うことができます。falseの場合、解析例外が投げられます。

2.2.0
spark.sql.groupByOrdinal true

trueの場合、グループ句の序数はselectリスト内の位置として扱われます。falseの場合、序数は無視されます。

2.0.0
spark.sql.hive.convertInsertingPartitionedTable true

When set to true, and spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is true, the built-in ORC/Parquet writer is usedto process inserting into partitioned ORC/Parquet tables created by using the HiveSQL syntax.

3.0.0
spark.sql.hive.convertMetastoreCtas true

trueに設定された場合、SparkはCTASのHive serdeの代わりに組み込みのデータソースライターを使おうとします。このフラグは、spark.sql.hive.convertMetastoreParquet または spark.sql.hive.convertMetastoreOrc がそれぞれParquetとORC形式として有効にされた場合のみ効果があります。

3.0.0
spark.sql.hive.convertMetastoreOrc true

trueに設定された場合、組み込みのORCリーダーとライターがHive serdeの代わりにHiveQL構文を使って作成されたORCテーブルを処理するために使われます。

2.0.0
spark.sql.hive.convertMetastoreParquet true

trueに設定された場合、組み込みのParquetリーダーとライターがHive serdeの代わりにHiveQL構文を使って作成されたparquetテーブルを処理するために使われます。

1.1.1
spark.sql.hive.convertMetastoreParquet.mergeSchema false

When true, also tries to merge possibly different but compatible Parquet schemas in different Parquet data files. この設定は"spark.sql.hive.convertMetastoreParquet" がtrueの場合のみ効果があります。

1.3.1
spark.sql.hive.filesourcePartitionFileCacheSize 262144000

ゼロでは無い場合、メモリ内のパーティションファイルメタデータのキャッシュを有効にします。全てのテーブルはファイルメタデータについて指定されたバイト数まで使うことができるキャッシュを共有します。この設定は、hiveファイルソースパーティションの管理が有効な場合にのみ効果があります。

2.1.1
spark.sql.hive.manageFilesourcePartitions true

trueの場合、ファイルソーステーブルについてのメタストアパーティション管理も有効にします。これには、データソースと変換されたHiveテーブルの両方が含まれます。When partition management is enabled, datasource tables store partition in the Hive metastore, and use the metastore to prune partitions during query planning when spark.sql.hive.metastorePartitionPruning is set to true.

2.1.1
spark.sql.hive.metastorePartitionPruning true

trueの場合、一部の述語はHiveメタストアにプッシュダウンされるため、一致しないパーティションを早期に排除できます。

1.5.0
spark.sql.hive.thriftServer.async true

trueに設定された場合、Hive Thrift サーバはSQLクエリを非同期に実行します。

1.5.0
spark.sql.hive.verifyPartitionPath false

trueの場合、HDFSに格納されているデータを読み込む時にテーブルのルートディレクトリの下にある全てのパーティションパスを確認します。この設定は将来のリリースで非推奨になり、spark.files.ignoreMissingFilesに置き換えられます。

1.4.0
spark.sql.inMemoryColumnarStorage.batchSize 10000

カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。

1.1.1
spark.sql.inMemoryColumnarStorage.compressed true

trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。

1.0.1
spark.sql.inMemoryColumnarStorage.enableVectorizedReader true

カラム型キャッシュ用のベクトル化されたリーダーを有効にします。

2.3.1
spark.sql.json.filterPushdown.enabled true

trueの場合、JSONデータソースへのフィルタプッシュダウンを有効にします。

3.1.0
spark.sql.jsonGenerator.ignoreNullFields true

JSONデータソースおよびto_jsonなどのJSON関数でJSONオブジェクトを生成する時にnullフィールドを無視するかどうか。falseの場合、JSONオブジェクトのnullフィールドに対してnullを生成します。

3.0.0
spark.sql.leafNodeDefaultParallelism (none)

ファイルスキャンノード、ローカルデータスキャンノード、範囲ノードなどのデータを生成するSpark SQLリーフノードのデフォルトの並行処理。この設定のデフォルト値は'SparkContext#defaultParallelism'です。

3.2.0
spark.sql.mapKeyDedupPolicy EXCEPTION

組み込み関数でマップキーを重複排除するポリシー: CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat、TransformKeys。EXCEPTION の場合、重複したマップキーが検出されるとクエリは失敗します。LAST_WIN の場合、最後に挿入されたマップキーが優先されます。

3.0.0
spark.sql.maven.additionalRemoteRepositories https://maven-central.storage-download.googleapis.com/maven2/

オプションの追加のリモートMavenミラーリポジトリのカンマ区切りの文字列設定。これはデフォルトのMaven Centralリポジトリに到達できない場合に、IsolatedClientLoaderでHive jarをダウンロードするためにのみ使われます。

3.0.0
spark.sql.maxMetadataStringLength 100

メタデータ文字列に対して出力する最大文字数。例えば、DataSourceScanExec内のファイルの場所。文字列の長さがこれを超えると、全ての値が省略されます。

3.1.0
spark.sql.maxPlanStringLength 2147483632

プラン文字列に対して出力する最大文字数。プランが長い場合、それ以降の出力は切り捨てられます。デフォルト設定は常に完全なプランが生成されます。プラン文字列が大量のメモリを使用している場合、またはドライバやUIプロセスでOutOfMemoryエラーが発生している場合は、これを8kなどの低い値に設定します。

3.0.0
spark.sql.optimizer.dynamicPartitionPruning.enabled true

trueの場合、結合キーとして使われるときにパーティション列の術語を生成します。

3.0.0
spark.sql.optimizer.enableCsvExpressionOptimization true

SQLオプティマイザでCSV式を最適化するかどうか。from_csvから不要な列を削除することも含まれます。

3.2.0
spark.sql.optimizer.enableJsonExpressionOptimization true

JSONオプティマイザでCSV式を最適化するかどうか。これには、from_jsonからの不要な列の削除、from_json + to_json, to_json + named_struct(from_json.col1, from_json.col2, ....) の簡略化が含まれます。

3.1.0
spark.sql.optimizer.excludedRules (none)

Configures a list of rules to be disabled in the optimizer, in which the rules are specified by their rule names and separated by comma. It is not guaranteed that all the rules in this configuration will eventually be excluded, as some rules are necessary for correctness. The optimizer will log the rules that have indeed been excluded.

2.4.0
spark.sql.orc.columnarReaderBatchSize 4096

The number of rows to include in a orc vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data.

2.4.0
spark.sql.orc.compression.codec snappy

Sets the compression codec used when writing ORC files. If either compression or orc.compress is specified in the table-specific options/properties, the precedence would be compression, orc.compress, spark.sql.orc.compression.codec.Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd, lz4.

2.3.0
spark.sql.orc.enableNestedColumnVectorizedReader false

Enables vectorized orc decoding for nested column.

3.2.0
spark.sql.orc.enableVectorizedReader true

Enables vectorized orc decoding.

2.3.0
spark.sql.orc.filterPushdown true

When true, enable filter pushdown for ORC files.

1.4.0
spark.sql.orc.mergeSchema false

When true, the Orc data source merges schemas collected from all data files, otherwise the schema is picked from a random data file.

3.0.0
spark.sql.orderByOrdinal true

When true, the ordinal numbers are treated as the position in the select list. When false, the ordinal numbers in order/sort by clause are ignored.

2.0.0
spark.sql.parquet.binaryAsString false

他の幾つかのParquet生成システム、特にImpalaとSpark SQLの古いバージョンは、Parquetスキーマを書き出す時にバイナリデータと文字列の区別しません。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として扱うように指示します。

1.1.1
spark.sql.parquet.columnarReaderBatchSize 4096

The number of rows to include in a parquet vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data.

2.4.0
spark.sql.parquet.compression.codec snappy

Parquetファイルを書き込む時に圧縮符号化を使うように設定します。compression または parquet.compression のどちらかがテーブル固有のオプション/プロパティ内で指定された場合、先行詞は compression, parquet.compression, spark.sql.parquet.compression.codec でしょう。利用可能な値には、none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd が含まれます。

1.1.1
spark.sql.parquet.enableVectorizedReader true

Enables vectorized parquet decoding.

2.0.0
spark.sql.parquet.filterPushdown true

trueに設定された場合は、Parquet filter push-down 最適化を有効化します。

1.2.0
spark.sql.parquet.int96AsTimestamp true

幾つかのParquet生成システム、特にImparaは、タイムスタンプをINT96に格納します。ナノ秒項目の精度の喪失を避ける必要があるため、SparkもタイムスタンプをINT96で保持するでしょう。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにINT96データをタイムスタンプとして解釈するように指示します。

1.3.0
spark.sql.parquet.int96TimestampConversion false

This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive & Spark.

2.3.0
spark.sql.parquet.mergeSchema false

trueの場合、Parquetデータソースは全てのデータファイルから集められたスキーマをマージします。そうでなければ、スキーマは、サマリファイル、あるいはサマリファイルが利用できない場合はランダムデータファイルから取り出されます。

1.5.0
spark.sql.parquet.outputTimestampType INT96

Sets which Parquet timestamp type to use when Spark writes data to Parquet files. INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value.

2.3.0
spark.sql.parquet.recordLevelFilter.enabled false

If true, enables Parquet's native record-level filtering using the pushed down filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled and the vectorized reader is not used. You can ensure the vectorized reader is not used by setting 'spark.sql.parquet.enableVectorizedReader' to false.

2.3.0
spark.sql.parquet.respectSummaryFiles false

When true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn't be enabled before knowing what it means exactly.

1.5.0
spark.sql.parquet.writeLegacyFormat false

もしtrueであれば、データはSpark1.4以前の方法で書き込まれるでしょう。例えば、小数値はApache Parquetの固定長のバイト配列形式で書き込まれるでしょう。これはApache HiveやApache Impalaのよゆな他のシステムが使います。falseであれば、Parquetの新しい形式が使われるでしょう。例えば、小数はintに基づいた形式で書き込まれるでしょう。Parquetの出力がこの新しい形式をサポートしないシステムと一緒に使うことを意図している場合は、trueに設定してください。

1.6.0
spark.sql.parser.quotedRegexColumnNames false

When true, quoted Identifiers (using backticks) in SELECT statement are interpreted as regular expressions.

2.3.0
spark.sql.pivotMaxValues 10000

When doing a pivot without specifying values for the pivot column this is the maximum number of (distinct) values that will be collected without error.

1.6.0
spark.sql.pyspark.jvmStacktrace.enabled false

When true, it shows the JVM stacktrace in the user-facing PySpark exception together with Python stacktrace. By default, it is disabled and hides JVM stacktrace and shows a Python-friendly exception only.

3.0.0
spark.sql.redaction.options.regex (?i)url

Regex to decide which keys in a Spark SQL command's options map contain sensitive information. The values of options whose names that match this regex will be redacted in the explain output. This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex.

2.2.2
spark.sql.redaction.string.regex (value of spark.redaction.string.regex)

Regex to decide which parts of strings produced by Spark contain sensitive information. When this regex matches a string part, that string part is replaced by a dummy value. This is currently used to redact the output of SQL explain commands. When this conf is not set, the value from spark.redaction.string.regex is used.

2.3.0
spark.sql.repl.eagerEval.enabled false

Enables eager evaluation or not. When true, the top K rows of Dataset will be displayed if and only if the REPL supports the eager evaluation. Currently, the eager evaluation is supported in PySpark and SparkR. In PySpark, for the notebooks like Jupyter, the HTML table (generated by repr_html) will be returned. For plain Python REPL, the returned outputs are formatted like dataframe.show(). In SparkR, the returned outputs are showed similar to R data.frame would.

2.4.0
spark.sql.repl.eagerEval.maxNumRows 20

The max number of rows that are returned by eager evaluation. This only takes effect when spark.sql.repl.eagerEval.enabled is set to true. The valid range of this config is from 0 to (Int.MaxValue - 1), so the invalid config like negative and greater than (Int.MaxValue - 1) will be normalized to 0 and (Int.MaxValue - 1).

2.4.0
spark.sql.repl.eagerEval.truncate 20

The max number of characters for each cell that is returned by eager evaluation. This only takes effect when spark.sql.repl.eagerEval.enabled is set to true.

2.4.0
spark.sql.session.timeZone (value of local timezone)

The ID of session local timezone in the format of either region-based zone IDs or zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'. Zone offsets must be in the format '(+|-)HH', '(+|-)HH:mm' or '(+|-)HH:mm:ss', e.g '-08', '+01:00' or '-13:33:33'. また、'UTC'と'Z'は'+00:00'のエイリアスとしてサポートされています。Other short names are not recommended to use because they can be ambiguous.

2.2.0
spark.sql.shuffle.partitions 200

The default number of partitions to use when shuffling data for joins or aggregations. Note: For structured streaming, this configuration cannot be changed between query restarts from the same checkpoint location.

1.1.0
spark.sql.sources.bucketing.autoBucketedScan.enabled true

When true, decide whether to do bucketed scan on input tables based on query plan automatically. Do not use bucketed scan if 1. query does not have operators to utilize bucketing (e.g. join, group-by, etc), or 2. there's an exchange operator between these operators and table scan. Note when 'spark.sql.sources.bucketing.enabled' is set to false, this configuration does not take any effect.

3.1.0
spark.sql.sources.bucketing.enabled true

When false, we will treat bucketed table as normal table

2.0.0
spark.sql.sources.bucketing.maxBuckets 100000

The maximum number of buckets allowed.

2.4.0
spark.sql.sources.default parquet

The default data source to use in input/output.

1.3.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32

The maximum number of paths allowed for listing files at driver side. If the number of detected paths exceeds this value during partition discovery, it tries to list the files with another Spark distributed job. この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。

1.5.0
spark.sql.sources.partitionColumnTypeInference.enabled true

When true, automatically infer the data types for partitioned columns.

1.5.0
spark.sql.sources.partitionOverwriteMode STATIC

When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode. This can also be set as an output option for a data source using key partitionOverwriteMode (which takes precedence over this setting), e.g. dataframe.write.option("partitionOverwriteMode", "dynamic").save(path).

2.3.0
spark.sql.statistics.fallBackToHdfs false

When true, it will fall back to HDFS if the table statistics are not available from table metadata. This is useful in determining if a table is small enough to use broadcast joins. This flag is effective only for non-partitioned Hive tables. For non-partitioned data source tables, it will be automatically recalculated if table statistics are not available. For partitioned data source and partitioned Hive tables, It is 'spark.sql.defaultSizeInBytes' if table statistics are not available.

2.0.0
spark.sql.statistics.histogram.enabled false

Generates histograms when computing column statistics if enabled. Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histogram. Note that collecting histograms takes extra cost. For example, collecting column statistics usually takes only one table scan, but generating equi-height histogram will cause an extra table scan.

2.3.0
spark.sql.statistics.size.autoUpdate.enabled false

Enables automatic update for table size once table's data is changed. Note that if the total number of files of the table is very large, this can be expensive and slow down data change commands.

2.3.0
spark.sql.storeAssignmentPolicy ANSI

When inserting a value into a column with different data type, Spark will perform type coercion. 現在のところ、型強制ルールの3つのポリシーがサポートされます: ANSI、legacy、strict。ANSI ポリシーでは、Spark は ANSI SQL に従って型強制を実行します。実際には、PostgreSQLとほとんど同じ挙動です。It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy policy, Spark allows the type coercion as long as it is a valid Cast, which is very loose. e.g. converting string to int or double to boolean is allowed. これも、Spark 2.x での唯一の動作で、Hive と互換性があります。With strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. converting double to int or decimal to double is not allowed.

3.0.0
spark.sql.streaming.checkpointLocation (none)

The default location for storing checkpoint data for streaming queries.

2.0.0
spark.sql.streaming.continuous.epochBacklogQueueSize 10000

The max number of entries to be stored in queue to wait for late epochs. If this parameter is exceeded by the size of the queue, stream will stop with an error.

3.0.0
spark.sql.streaming.disabledV2Writers

A comma-separated list of fully qualified data source register class names for which StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.

2.3.1
spark.sql.streaming.fileSource.cleaner.numThreads 1

Number of threads used in the file source completed file cleaner.

3.0.0
spark.sql.streaming.forceDeleteTempCheckpointLocation false

When true, enable temporary checkpoint locations force delete.

3.0.0
spark.sql.streaming.metricsEnabled false

Whether Dropwizard/Codahale metrics will be reported for active streaming queries.

2.0.2
spark.sql.streaming.multipleWatermarkPolicy min

Policy to calculate the global watermark value when there are multiple watermark operators in a streaming query. The default value is 'min' which chooses the minimum watermark reported across multiple operators. Other alternative value is 'max' which chooses the maximum across multiple operators. Note: This configuration cannot be changed between query restarts from the same checkpoint location.

2.4.0
spark.sql.streaming.noDataMicroBatches.enabled true

Whether streaming micro-batch engine will execute batches without data for eager state management for stateful streaming queries.

2.4.1
spark.sql.streaming.numRecentProgressUpdates 100

The number of progress updates to retain for a streaming query

2.1.1
spark.sql.streaming.stateStore.stateSchemaCheck true

When true, Spark will validate the state schema against schema on existing state and fail query if it's incompatible.

3.1.0
spark.sql.streaming.stopActiveRunOnRestart true

Running multiple runs of the same streaming query concurrently is not supported. If we find a concurrent active run for a streaming query (in the same or different SparkSessions on the same cluster) and this flag is true, we will stop the old streaming query run to start the new one.

3.0.0
spark.sql.streaming.stopTimeout 0

How long to wait in milliseconds for the streaming execution thread to stop when calling the streaming query's stop() method. 0 or negative values wait indefinitely.

3.0.0
spark.sql.thriftServer.interruptOnCancel false

When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.

3.2.0
spark.sql.thriftServer.queryTimeout 0ms

Set a query duration timeout in seconds in Thrift Server. If the timeout is set to a positive value, a running query will be cancelled automatically when the timeout is exceeded, otherwise the query continues to run till completion. If timeout values are set for each statement via java.sql.Statement.setQueryTimeout and they are smaller than this configuration value, they take precedence. If you set this timeout and prefer to cancel the queries right away without waiting task to finish, consider enabling spark.sql.thriftServer.interruptOnCancel together.

3.1.0
spark.sql.thriftserver.scheduler.pool (none)

Set a Fair Scheduler pool for a JDBC client session.

1.1.1
spark.sql.thriftserver.ui.retainedSessions 200

The number of SQL client sessions kept in the JDBC/ODBC web UI history.

1.4.0
spark.sql.thriftserver.ui.retainedStatements 200

The number of SQL statements kept in the JDBC/ODBC web UI history.

1.4.0
spark.sql.ui.explainMode formatted

Configures the query explain mode used in the Spark SQL UI. The value can be 'simple', 'extended', 'codegen', 'cost', or 'formatted'. The default value is 'formatted'.

3.1.0
spark.sql.variable.substitute true

This enables substitution using syntax like ${var}, ${system:var}, and ${env:var}.

2.0.0

Static SQL 設定

Static SQL configurations are cross-session, immutable Spark SQL configurations. They can be set with final values by the config file and command-line options with --conf/-c prefixed, or by setting SparkConf that are used to create SparkSession. External users can query the static sql config values via SparkSession.conf or via set command, e.g. SET spark.sql.extensions;, but cannot set/unset them.

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.cache.serializer org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer

The name of a class that implements org.apache.spark.sql.columnar.CachedBatchSerializer. It will be used to translate SQL data into a format that can more efficiently be cached. The underlying API is subject to change so use with caution. Multiple classes cannot be specified. The class must have a no-arg constructor.

3.1.0
spark.sql.event.truncate.length 2147483647

Threshold of SQL length beyond which it will be truncated before adding to event. Defaults to no truncation. If set to 0, callsite will be logged instead.

3.0.0
spark.sql.extensions (none)

A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used.

2.2.0
spark.sql.hive.metastore.barrierPrefixes

Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、org.apache.spark.*)。

1.4.0
spark.sql.hive.metastore.jars ビルトイン

HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。This property can be one of four options: 1. "builtin" Use Hive 2.3.9, which is bundled with the Spark assembly when -Phive is enabled. このオプションが選択された場合は、spark.sql.hive.metastore.version2.3.9 あるいは未定義のどちらかでなければなりません。2. "maven" Use Hive jars of specified version downloaded from Maven repositories. 3. "path" Use Hive jars configured by spark.sql.hive.metastore.jars.path in comma separated format. Support both local or remote paths.The provided jars should be the same version as spark.sql.hive.metastore.version. 4. HiveおよびHadoopの両方の標準的なフォーマットのクラスパス。The provided jars should be the same version as spark.sql.hive.metastore.version.

1.4.0
spark.sql.hive.metastore.jars.path

HiveMetastoreClientをインスタンス化するために使われるjarのカンマ区切りのパス。この設定は、spark.sql.hive.metastore.jarspathに設定されている場合のみ有用です。The paths can be any of the following format: 1. file://path/to/jar/foo.jar 2. hdfs://nameservice/path/to/jar/foo.jar 3. /path/to/jar/ (path without URI scheme follow conf fs.defaultFS's URI schema) 4. [http/https/ftp]://path/to/jar/foo.jar Note that 1, 2, and 3 support wildcard. For example: 1. file://path/to/jar/,file://path2/to/jar//.jar 2. hdfs://nameservice/path/to/jar/,hdfs://nameservice2/path/to/jar//.jar

3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc

Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。

1.4.0
spark.sql.hive.metastore.version 2.3.9

Hiveメタストアのバージョン利用可能なオプションは、0.12.0から2.3.93.0.0から3.1.2です。

1.4.0
spark.sql.hive.thriftServer.singleSession false

When set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.

1.6.0
spark.sql.hive.version 2.3.9

The compiled, a.k.a, builtin Hive version of the Spark distribution bundled with. Note that, this a read-only conf and only used to report the built-in hive version. If you want a different metastore client for Spark to call, please refer to spark.sql.hive.metastore.version.

1.1.1
spark.sql.metadataCacheTTLSeconds -1000ms

Time-to-live (TTL) value for the metadata caches: partition file metadata cache and session catalog cache. This configuration only has an effect when this value having a positive value (> 0). It also requires setting 'spark.sql.catalogImplementation' to hive, setting 'spark.sql.hive.filesourcePartitionFileCacheSize' > 0 and setting 'spark.sql.hive.manageFilesourcePartitions' to true to be applied to the partition file metadata cache.

3.1.0
spark.sql.queryExecutionListeners (none)

List of class names implementing QueryExecutionListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.

2.3.0
spark.sql.streaming.streamingQueryListeners (none)

List of class names implementing StreamingQueryListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.

2.4.0
spark.sql.streaming.ui.enabled true

Whether to run the Structured Streaming Web UI for the Spark application when the Spark Web UI is enabled.

3.0.0
spark.sql.streaming.ui.retainedProgressUpdates 100

The number of progress updates to retain for a streaming query for Structured Streaming UI.

3.0.0
spark.sql.streaming.ui.retainedQueries 100

The number of inactive queries to retain for Structured Streaming UI.

3.0.0
spark.sql.ui.retainedExecutions 1000

Number of executions to retain in the Spark UI.

1.5.0
spark.sql.warehouse.dir (value of $PWD/spark-warehouse)

The default location for managed databases and tables.

2.0.0

Spark ストリーミング

プロパティ名デフォルト意味これ以降のバージョンから
spark.streaming.backpressure.enabled false Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRateおよびspark.streaming.kafka.maxRatePerPartitionが設定されている場合に、この値で上限を制限されます (以下を見てください)。 1.5.0
spark.streaming.backpressure.initialRate not set これはバックプレッシャー機構が有効な場合に最初のバッチのために各レシーバーがデータを受信する初期の最大受信レートです。 2.0.0
spark.streaming.blockInterval 200ms Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。See the performance tuning section in the Spark Streaming programming guide for more details. 0.8.0
spark.streaming.receiver.maxRate not set 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。See the deployment guide in the Spark Streaming programming guide for mode details. 1.0.2
spark.streaming.receiver.writeAheadLog.enable false レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。See the deployment guide in the Spark Streaming programming guide for more details. 1.2.1
spark.streaming.unpersist true Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。 0.9.0
spark.streaming.stopGracefullyOnShutdown false trueの場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。 1.4.0
spark.streaming.kafka.maxRatePerPartition not set 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。 1.3.0
spark.streaming.kafka.minRatePerPartition 1 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最小レート(秒間あたりのレコード数)。 2.4.0
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。 1.0.0
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false ドライバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。ドライバー上のメタデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 1.6.0
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false レシーバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。レシーバー上のデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 1.6.0

SparkR

プロパティ名デフォルト意味これ以降のバージョンから
spark.r.numRBackendThreads 2 SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。 1.4.0
spark.r.command Rscript ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。 1.5.3
spark.r.driver.command spark.r.command ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。 1.5.3
spark.r.shell.command R ドライバーのためのクライアントモードのsparkRシェルの実行のための実行ファイル。クラスターモードでは無視されます。環境変数 SPARKR_DRIVER_R と同じですが、それより優先されます。spark.r.shell.command はsparkRシェルのために使われますが、spark.r.driver.command は R スクリプトを実行するために使われます。 2.1.0
spark.r.backendConnectionTimeout 6000 Rプロセスによって設定されるRBackendへの接続上の接続タイムアウトの秒数。 2.1.0
spark.r.heartBeatInterval 100 接続タイムアウトを防ぐためにSparkRバックエンドからRプロセスに送信されるハートビートの間隔。 2.1.0

GraphX

プロパティ名デフォルト意味これ以降のバージョンから
spark.graphx.pregel.checkpointInterval -1 Pregelでのグラフとメッセージのためのチェックポイント間隔。多くの繰り返しの連鎖による stackOverflowError を避けるために使われます。デフォルトではチェックポイントは無効です。 2.2.0

配備

プロパティ名デフォルト意味これ以降のバージョンから
spark.deploy.recoveryMode NONE クラスターモードでサブミットされたSparkジョブが失敗し再起動する場合に、回復の設定をするリカバリモード。スタンダードあるいはMesosで動いている場合にクラスタモードでのみ適用可能です。 0.8.1
spark.deploy.zookeeper.url None `spark.deploy.recoveryMode` が ZOOKEEPER に設定されている場合は、この設定は接続するためのzookeeperのURLに設定するために使われます。 0.8.1
spark.deploy.zookeeper.dir None `spark.deploy.recoveryMode` がZOOKEEPERに設定されている場合は、この設定はリカバリー状態を保持するためのzookeeperディレクトリを設定するために使われます。 0.8.1

クラスタマネージャー

Sparkの各クラスタマネージャーは追加の設定オプションを持ちます。各ノードのためのページ上で設定を見つけることができます。

YARN

Mesos

Kubernetes

スタンドアローンモード

環境変数

あるSpark設定は環境変数によって設定することができ、それらはSparkがインストールされたディレクトリ内のconf/spark-env.shスクリプトによって使われます(Windows上ではconf/spark-env.cmd)。スタンドアローンおよびMesosモードでは、このファイルはホスト名のようなマシーン固有の情報を渡すことができるでしょう。ローカルのアプリケーションの実行あるいはスクリプトのサブミットの場合、それは開始場所にもなります。

conf/spark-env.sh はSparkがインストールされた場合はデフォルトでは存在しないことに注意してください。しかし、それを作成するためにconf/spark-env.sh.template をコピーすることができます。コピーを実行可能にすることを忘れないでください。

以下の変数はspark-env.shの中で設定することができます:

環境変数意味
JAVA_HOME Javaがインストールされた場所(デフォルトのPATH上に無い場合)。
PYSPARK_PYTHON Python binary executable to use for PySpark in both driver and workers (default is python3 if available, otherwise python). 設定されている場合はプロパティspark.pyspark.python が優先されます。
PYSPARK_DRIVER_PYTHON PySparkのためにドライバの中でのみ使うPythonの実行可能バイナリ(デフォルトはPYSPARK_PYTHONです)。設定されている場合はプロパティspark.pyspark.driver.python が優先されます。
SPARKR_DRIVER_R SparkRシェルのために使われるRバイナリ実行ファイル(デフォルトは R)。設定されている場合はプロパティspark.r.shell.command が優先されます。
SPARK_LOCAL_IP バインドするマシーンのIPアドレス。
SPARK_PUBLIC_DNS Sparkプログラムが他のマシーンに知らせるホスト名。

上に加えて、各マシーンで使うコアの数や最大メモリのような、Sparkスタンドアローン クラスタ スクリプトを設定するためのオプションもあります。

spark-env.shはシェルスクリプトなので、それらのいくつかはプログラム的に設定することができます - 例えば、特定のネットワークインタフェースのIPを調べることでSPARK_LOCAL_IPを計算することができるかもしれません。

注意: cluster モードでYARN上のSparkを実行する場合は、 conf/spark-defaults.conf ファイル内のspark.yarn.appMasterEnv.[EnvironmentVariableName] を使って環境変数が設定される必要があります。spark-env.shに設定されている環境変数は、clusterモードのYARN マスタープロセス内には反映されないでしょう。詳細についてはYARNに関係する Spark プロパティ を見てください。

ログの設定

Sparkはログのためにlog4jを使います。confディレクトリに log4j.properties ファイルを追加することで設定することができます。開始する一つの方法は、そこに存在する既存のlog4j.properties.template をコピーすることです。

By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): mdc.taskName, which shows something like task 1.0 in stage 0.0. You can add %X{mdc.taskName} to your patternLayout in order to print it in the logs. Moreover, you can use spark.sparkContext.setLocalProperty(s"mdc.$name", "value") to add user specific data into MDC. The key in MDC will be the string of “mdc.$name”.

設定ディレクトリの上書き

デフォルトの"SPARK_HOME/conf"以外に異なる設定ディレクトリを指定するために、SPARK_CONF_DIRを設定することができます。Sparkはこのディレクトリから設定ファイル(spark-defaults.conf, spark-env.sh, log4j.properties など)を使用するでしょう。

Hadoopクラスタ設定の継承

Sparkを使ってHDFSから読み込みをするつもりであれば、Sparkのクラスパス上に含まれるべき二つのHadoop設定ファイルがあります。

これらの設定ファイルの場所はHadoopおよびHDPバージョンによって変わりますが、一般的な場所は/etc/hadoop/confの中です。幾つかのツールはその場で設定を生成しますが、それらのコピーをダウンロードするための仕組みを提供します。

それらのファイルがSparkに見えるようにするためには、$SPARK_HOME/conf/spark-env.sh内のHADOOP_CONF_DIRを設定ファイルを含む場所に設定します。

独自の Hadoop/Hive 設定

もしSparkアプリケーションがHadoop, Hive あるいは両方とやり取りをする場合、Sparkのクラスパス内におそらくHadoop/Hiveの設定ファイルがあります。

複数の実行中のアプリケーションは異なるHadoop/Hiveクライアント側の設定を必要とするかもしれません。各アプリケーションごとにSparkのクラスパス内の hdfs-site.xml, core-site.xml, yarn-site.xml, hive-site.xml をコピーし修正することができます。YARN上で実行中のSparkクラスタ内ではこれらの設定ファイルはクラスタ全体で設定され、アプリケーションによって安全に変更することができません。

The better choice is to use spark hadoop properties in the form of spark.hadoop.*, and use spark hive properties in the form of spark.hive.*. For example, adding configuration “spark.hadoop.abc.def=xyz” represents adding hadoop property “abc.def=xyz”, and adding configuration “spark.hive.abc=xyz” represents adding hive property “hive.abc=xyz”. They can be considered as same as normal spark properties which can be set in $SPARK_HOME/conf/spark-defaults.conf

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。For instance, Spark allows you to simply create an empty conf and set spark/spark hadoop/spark hive properties.

val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)

また、実行時に設定を修正あるいは追加することができます:

./bin/spark-submit \ 
  --name "My app" \ 
  --master local[4] \  
  --conf spark.eventLog.enabled=false \ 
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ 
  --conf spark.hadoop.abc.def=xyz \
  --conf spark.hive.abc=xyz
  myApp.jar

カスタム リソース スケジューリングと設定の概要

GPUs and other accelerators have been widely used for accelerating special workloads, e.g., deep learning and signal processing. Spark now supports requesting and scheduling generic resources, such as GPUs, with a few caveats. The current implementation requires that the resource have addresses that can be allocated by the scheduler. It requires your cluster manager to support and be properly configured with the resources.

There are configurations available to request resources for the driver: spark.driver.resource.{resourceName}.amount, request resources for the executor(s): spark.executor.resource.{resourceName}.amount and specify the requirements for each task: spark.task.resource.{resourceName}.amount. The spark.driver.resource.{resourceName}.discoveryScript config is required on YARN, Kubernetes and a client side Driver on Spark Standalone. spark.executor.resource.{resourceName}.discoveryScript config is required for YARN and Kubernetes. Kubernetes also requires spark.driver.resource.{resourceName}.vendor and/or spark.executor.resource.{resourceName}.vendor. See the config descriptions above for more information on each.

Spark will use the configurations specified to first request containers with the corresponding resources from the cluster manager. Once it gets the container, Spark launches an Executor in that container which will discover what resources the container has and the addresses associated with each resource. The Executor will register with the Driver and report back the resources available to that Executor. The Spark scheduler can then schedule tasks to each Executor and assign specific resource addresses based on the resource requirements the user specified. The user can see the resources assigned to a task using the TaskContext.get().resources api. On the driver, the user can see the resources assigned with the SparkContext resources call. It’s then up to the user to use the assignedaddresses to do the processing they want or pass those into the ML/AI framework they are using.

See your cluster manager specific page for requirements and details on each of - YARN, Kubernetes and Standalone Mode. It is currently not available with Mesos or local mode. And please also note that local-cluster mode with multiple workers is not supported(see Standalone documentation).

ステージレベルのスケジューリングの概要

The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at the start of the application and them be idle while the ETL stage is being run. This is only available for the RDD API in Scala, Java, and Python. It is available on YARN and Kubernetes when dynamic allocation is enabled. See the YARN page or Kubernetes page for more implementation details.

See the RDD.withResources and ResourceProfileBuilder API’s for using this feature. The current implementation acquires new executors for each ResourceProfile created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more then 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config spark.scheduler.resource.profileMergeConflicts to control that behavior. The current merge strategy Spark implements when spark.scheduler.resource.profileMergeConflicts is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.

プッシュベースのシャッフルの概要

Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch for some scenarios, like partition coalesce when merged output is available.

Push-based shuffle improves performance for long running jobs/queries which involves large disk I/O during shuffle. Currently it is not well suited for jobs/queries which runs quickly dealing with lesser amount of shuffle data. This will be further improved in the future releases.

Currently push-based shuffle is only supported for Spark on YARN with external shuffle service.

外部のシャッフルサービス(サーバ)側の設定オプション

プロパティ名デフォルト意味これ以降のバージョンから
spark.shuffle.push.server.mergedShuffleFileManagerImpl org.apache.spark.network.shuffle.
NoOpMergedShuffleFileManager
Class name of the implementation of MergedShuffleFileManager that manages push-based shuffle. This acts as a server side config to disable or enable push-based shuffle. By default, push-based shuffle is disabled at the server side.

To enable push-based shuffle on the server side, set this config to org.apache.spark.network.shuffle.RemoteBlockPushResolver

3.2.0
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile 2m

The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during push-based shuffle. A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete merged shuffle file in a single disk I/O increases the memory requirements for both the clients and the external shuffle services. Instead, the external shuffle service serves the merged file in MB-sized chunks.
This configuration controls how big a chunk can get. A corresponding index file for each merged shuffle file will be generated indicating chunk boundaries.

Setting this too high would increase the memory requirements on both the clients and the external shuffle service.

Setting this too low would increase the overall number of RPC requests to external shuffle service unnecessarily.

3.2.0
spark.shuffle.push.server.mergedIndexCacheSize 100m The maximum size of cache in memory which could be used in push-based shuffle for storing merged index files. This cache is in addition to the one configured via spark.shuffle.service.index.cache.size. 3.2.0

クライアント側の設定オプション

プロパティ名デフォルト意味これ以降のバージョンから
spark.shuffle.push.enabled false Set to true to enable push-based shuffle on the client side and works in conjunction with the server side flag spark.shuffle.push.server.mergedShuffleFileManagerImpl. 3.2.0
spark.shuffle.push.finalize.timeout 10s The amount of time driver waits in seconds, after all mappers have finished for a given shuffle map stage, before it sends merge finalize requests to remote external shuffle services. This gives the external shuffle services extra time to merge blocks. Setting this too long could potentially lead to performance regression. 3.2.0
spark.shuffle.push.maxRetainedMergerLocations 500 Maximum number of merger locations cached for push-based shuffle. Currently, merger locations are hosts of external shuffle services responsible for handling pushed blocks, merging them and serving merged blocks for later shuffle fetch. 3.2.0
spark.shuffle.push.mergersMinThresholdRatio 0.05 Ratio used to compute the minimum number of shuffle merger locations required for a stage based on the number of partitions for the reducer stage. For example, a reduce stage which has 100 partitions and uses the default value 0.05 requires at least 5 unique merger locations to enable push-based shuffle. 3.2.0
spark.shuffle.push.mergersMinStaticThreshold 5 The static threshold for number of shuffle push merger locations should be available in order to enable push-based shuffle for a stage. Note this config works in conjunction with spark.shuffle.push.mergersMinThresholdRatio. Maximum of spark.shuffle.push.mergersMinStaticThreshold and spark.shuffle.push.mergersMinThresholdRatio ratio number of mergers needed to enable push-based shuffle for a stage. For example: with 1000 partitions for the child stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need at least 50 mergers to enable push-based shuffle for that stage. 3.2.0
spark.shuffle.push.maxBlockSizeToPush 1m

The max size of an individual block to push to the remote external shuffle services. Blocks larger than this threshold are not pushed to be merged remotely. These shuffle blocks will be fetched in the original manner.

Setting this too high would result in more blocks to be pushed to remote external shuffle services but those are already efficiently fetched with the existing mechanisms resulting in additional overhead of pushing the large blocks to remote external shuffle services. It is recommended to set spark.shuffle.push.maxBlockSizeToPush lesser than spark.shuffle.push.maxBlockBatchSize config's value.

Setting this too low would result in lesser number of blocks getting merged and directly fetched from mapper external shuffle service results in higher small random reads affecting overall disk I/O performance.

3.2.0
spark.shuffle.push.maxBlockBatchSize 3m The max size of a batch of shuffle blocks to be grouped into a single push request. Default is set to 3m in order to keep it slightly higher than spark.storage.memoryMapThreshold default which is 2m as it is very likely that each batch of block gets memory mapped which incurs higher overhead. 3.2.0
TOP
inserted by FC2 system