Sparkの設定
- Sparkのプロパティ
- 環境変数
- ログの設定
- 設定ディレクトリの上書き
- Hadoopクラスタ設定の継承
- 独自の Hadoop/Hive 設定
- カスタム リソース スケジューリングと設定の概要
- ステージレベルのスケジューリングの概要
- プッシュベースのシャッフルの概要
Sparkはシステムを設定するための3つの場所を提供します:
- Spark プロパティ はほとんどのアプリケーションのパラメータを制御し、SparkConfオブジェクトあるいはJavaシステムプロパティを使って設定することができます。
- 環境変数は各ノード上で
conf/spark-env.sh
スクリプトを使ってIPアドレスのようなマシンごとの設定を設定するために使うことができます。 - ログは
log4j.properties
を使って設定することができます。
Sparkのプロパティ
Spark プロパティはほとんどのアプリケーションの設定を制御し、各アプリケーションに対して別々に設定されます。これらのプロパティは SparkContext
に渡して直接SparkConf に設定することができます。SparkConf
を使って共通プロパティの幾つかを設定(例えば、マスターURLおよびアプリケーション名)することができ、set()
メソッドを使って任意のキー値ペアを設定することができます。例えば、以下のように2つのスレッドを使ってアプリケーションを初期化することができます:
local[2]をつけて実行、2つのスレッド - "最小限"の並列処理、することにより、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立つことに注意してください。
ローカルモードで1つ以上のスレッドを持つことができ、実際にSpark Streamingのような場合にはリソース不足を避けるために1つ以上のスレッドを要求するかも知れないということに注意してください。
なんらかの期間を指定するプロパティは時間の単位で設定されなければなりません。以下の書式が受け付けられます:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
バイトサイズを指定するプロパティはサイズの単位を使って設定されなければなりません。以下の書式が受け付けられます:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
単位が無い数値は一般的にバイトとして解釈されますが、2,3のものはKiBあるいはMiBとして解釈されます。個々の設定プロパティのドキュメントを見てください。可能な箇所では単位の指定が望ましいです。
Sparkプロパティの動的なロード
時には、ある設定をSparkConf
にハードコーディングしたくないかも知れません。例えば、もし同じアプリケーションを異なるマスターあるいは異なるメモリ量で実行したいなど。Sparkは空のconfを単純に作成することができます:
そうすると、実行時に設定値を提供することができます:
Spark シェルおよび spark-submit
ツールは動的に設定をロードする2つの方法を提供します。最初のものは上で示されたように、--master
のようなコマンドラインのオプションです。spark-submit
は--conf/-c
フラグを使ってどのようなSpark プロパティも受け付けることができますが、Sparkアプリケーションを起動する時に役割があるプロパティのために特別なフラグを使うようにしてください。./bin/spark-submit --help
を実行すると、これらのオプションの完全なリストが表示されるでしょう。
bin/spark-submit
は設定オプションを conf/spark-defaults.conf
からも読み込むでしょう。各行はホワイトスペースで区切られたキーと値です。例えば:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
フラグで指定された、あるいはプロパティファイルの全ての値はアプリケーションに渡され、SparkConfを通じてマージされるでしょう。SparkConfで直接設定されたプロパティは優先度が高く、その次はspark-submit
あるいは spark-shell
に渡されたもので、その次はspark-defaults.conf
ファイル内のオプションです。以前のSparkのバージョンから2,3の設定キーの名前が変更されました; そのような場合、古いキーの名前はまだ受け付けられますが、新しいキーのどのインスタンスよりも優先度が低くなります。
Sparkのプロパティは主に2つの種類に分けることができます; 1つは “spark.driver.memory”, “spark.executor.instances” のような配備に関係します。この種類のプロパティはプログラム的に実行時にSparkConf
を使って設定する時に影響を受けないでしょう。あるいはその挙動はどのクラスタマネージャと配備モードを選択したかに依存するため、設定ファイルまたは spark-submit
コマンドラインのオプションを使って設定することが望まれます; もう1つは “spark.task.maxFailures” のような主にSparkのruntimeの制御に関係し、この種類のプロパティはどちらかの方法で設定することができます。
Sparkプロパティのビュー
http://<driver>:4040
のアプリケーションUIは、"Environment"タブの中でSparkプロパティをリスト表示します。ここはプロパティが正しく設定されたか確認するのに便利な場所です。spark-defaults.conf
, SparkConf
, あるいはコマンドラインで明示的に指定された値だけが表示されるだろうことに注意してください。他の全ての設定プロパティについては、デフォルトの値が使われると見なすことができます。
利用可能なプロパティ
内部設定を制御するほとんどのプロパティは意味のあるデフォルト値を持ちます。最も一般的なオプションの幾つかは以下のように設定されます。
アプリケーションのプロパティ
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.app.name |
(none) | アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。 | 0.9.0 |
spark.driver.cores |
1 | ドライバープロセスのために使われるコアの数。クラスターモードのみです。 | 1.3.0 |
spark.driver.maxResultSize |
1g | 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズのバイト数の制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。 | 1.2.0 |
spark.driver.memory |
1g |
ドライバープロセス、つまりSparkContextが初期化される時に使用されるメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m , 2g )。注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接 SparkConf を使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
1.1.1 |
spark.driver.memoryOverhead |
最小384で、driverMemory * 0.10 |
クラスタモード時のドライバプロセスごとに割り当てられる非ヒープメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところ YARN、Kubernetes、Kubernetes でサポートされます。注意: オフヒープメモリを含む非ヒープ(spark.memory.offHeap.enabled=true の時)、他のドライバプロセスで使われるメモリ(例えば、PySparkドライバと対になるpythonプロセス)、同じコンテナ内で実行中の他の非ドライバプロセスによって使われるメモリ。実行中のドライバのコンテナの最大メモリサイズは、spark.driver.memoryOverhead と spark.driver.memory の合計によって決定されます。
|
2.3.0 |
spark.driver.resource.{resourceName}.amount |
0 |
ドライバで使用する特定のリソース型の量。これを使う場合、ドライバが起動時のリソースを見つけられるように、spark.driver.resource.{resourceName}.discoveryScript も指定する必要があります。
|
3.0.0 |
spark.driver.resource.{resourceName}.discoveryScript |
None | ドライバが特定のリソース型を見つけられるように実行するスクリプト。これは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、名前とアドレスの配列を持ちます。クライアントがサブミットしたドライバのために、ディスカバリ スクリプトは同じホスト上の他のドライバと比較して異なるリソースアドレスをこのドライバに割り当てる必要があります。 | 3.0.0 |
spark.driver.resource.{resourceName}.vendor |
None | ドライバで使用するリソースのベンダー。このオプションは現在のところKubernetes上でのみサポートされ、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインです。(例えば、Kubernetes 上の GPU については、この設定は nvidia.com または amd.com に設定されます) | 3.0.0 |
spark.resources.discoveryPlugin |
org.apache.spark.resource.ResourceDiscoveryScriptPlugin | アプリケーションへ読み込まれる org.apache.spark.api.resource.ResourceDiscoveryPlugin を実装するクラス名のカンマ区切りのリスト。これは上級ユーザが独自の実装を持つリソースディスカバリクラスを置き換えるためのものです。Spark は、指定された各クラスを、それらの1つがそのリソースの情報を返すまで試行します。It tries the discovery script last if none of the plugins return information for that resource. | 3.0.0 |
spark.executor.memory |
1g |
executorプロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m , 2g )。
|
0.7.0 |
spark.executor.pyspark.memory |
設定無し |
executorごとにPySparkに割り当てられるメモリ量。別に指定されていなければMiB単位。設定された場合、executorのためのPySparkメモリはこの量に制限されるでしょう。設定されない場合、SparkはPythonのメモリ使用を制限せず、他の非JVMプロセスと共有されるオーバーヘッドのメモリ空間を超えないようにするのはアプリケーションの責任です。PySparkがYARNあるいはKubernetes内で実行される場合、子のメモリはexecutorのリソースリクエストに追加されます。 注意: この機能はPythonの `resource` モジュールに依存します; 従って、その挙動と制限が継承されます。例えば、Windowsはリソースの制限をサポートせず、MacOS 上では実際のリソースは制限されません。 |
2.4.0 |
spark.executor.memoryOverhead |
最小384で、executorMemory * 0.10。 |
executor プロセスごとに割り当てられる追加のメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところYARNとKubernetesでサポートされます。 Note: Additional memory includes PySpark executor memory (when spark.executor.pyspark.memory is not configured) and memory used by other non-executor processes running in the same container. 実行中の executor のコンテナの最大メモリサイズは、spark.executor.memoryOverhead と spark.executor.memory と spark.memory.offHeap.size と spark.executor.pyspark.memory の合計によって決定されます。
|
2.3.0 |
spark.executor.resource.{resourceName}.amount |
0 |
executor プロセスごとに使用する特定のリソース型の量。これを使う場合、executor が起動時のリソースを見つけられるように、spark.executor.resource.{resourceName}.discoveryScript も指定する必要があります。
|
3.0.0 |
spark.executor.resource.{resourceName}.discoveryScript |
None | executor が特定のリソース型を見つけられるように実行するスクリプト。これは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、名前とアドレスの配列を持ちます。 | 3.0.0 |
spark.executor.resource.{resourceName}.vendor |
None | executor で使用するリソースのベンダー。このオプションは現在のところKubernetes上でのみサポートされ、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインです。(例えば、Kubernetes 上の GPU については、この設定は nvidia.com または amd.com に設定されます) | 3.0.0 |
spark.extraListeners |
(none) |
SparkListener を実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。
|
1.3.0 |
spark.local.dir |
/tmp |
Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。 注意: これは、クラスタマネージャーによって設定される、SPARK_LOCAL_DIRS (Standalone)、MESOS_SANDBOX (Mesos) あるいは LOCAL_DIRS (YARN) 環境変数によって上書きされます。 |
0.5.0 |
spark.logConf |
false | SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。 | 0.9.0 |
spark.master |
(none) | 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。 | 0.9.0 |
spark.submit.deployMode |
(none) | Sparkドライバプログラムの配備モード、"client"あるいは"cluster"、これはドライバープログラムをローカル("client")あるいはクラスタ内のノードの1つの上で遠隔で("cluster")起動することを意味します。 | 1.5.0 |
spark.log.callerContext |
(none) |
Yarn/HDFS上で動作している時に Yarn RM log/HDFS audit ログに書き込まれるアプリケーションの情報。その長さはhadoop.caller.context.max.size のHadoop設定に依存します。簡潔でなければならず、一般的に50文字まで持つことができます。
|
2.2.0 |
spark.driver.supervise |
false | trueであれば、非0の終了ステータスで失敗した場合にドライバは自動的に再起動します。Spark スタンドアローン モードあるいは Mesos クラスタ配備モードでのみ効果があります。 | 1.3.0 |
spark.driver.log.dfsDir |
(none) |
spark.driver.log.persistToDfs.enabled が true であれば、Spark ドライバのログが同期されるベースディレクトリ。このベースディレクトリ内で、各アプリケーションはアプリケーション固有のファイルへドライバログを記録します。ユーザは、これを HDFS ディレクトリのような統一された場所に設定して、後で使用する前にドライバログファイルを保持できるようにすることができます。このディレクリは、全ての Spark ユーザがファイルを読み書きでき、Spark履歴サーバユーザがファイルを削除できる必要があります。さらに、spark.history.fs.driverlog.cleaner.enabled が true で、spark.history.fs.driverlog.cleaner.maxAge で設定される最大の古いさよりも古い場合に、個のディレクトリの古いログは Spark 履歴サーバ によって消去されます。
|
3.0.0 |
spark.driver.log.persistToDfs.enabled |
false |
true の場合、クライアントモードで実行中の spark アプリケーションはドライバーログをspark.driver.log.dfsDir で設定される永続化ストレージに書き込みます。もし spark.driver.log.dfsDir が設定されていない場合は、ドライバーログは永続化されません。さらに、spark.history.fs.driverlog.cleaner.enabled を true に設定することで、Spark 履歴サーバ でクリーナーを有効にします。
|
3.0.0 |
spark.driver.log.layout |
%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n |
spark.driver.log.dfsDir に同期されるドライバーログのレイアウト。設定されていない場合、log4j.properties で定義される最初のアペンダーのレイアウトが使われます。それも設定されていない場合、ドライバーログはデフォルトのレイアウトを使います。
|
3.0.0 |
spark.driver.log.allowErasureCoding |
false | ドライバーのログが erasure コーディングを使うことができるかどうか。On HDFS, erasure coded files will not update as quickly as regular replicated files, so they make take longer to reflect changes written by the application. Note that even if this is true, Spark will still not force the file to use erasure coding, it will simply use file system defaults. | 3.0.0 |
これとは別に、以下のプロパティも利用可能で、ある状況では有用かも知れません。
ランタイム環境
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.driver.extraClassPath |
(none) |
ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。 注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接 SparkConf を使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
1.0.0 |
spark.driver.defaultJavaOptions |
(none) |
spark.driver.extraJavaOptions に追加されるデフォルトの JVM オプションの文字列。これは管理者によって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memory を、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接 SparkConf を使って設定すべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
3.0.0 |
spark.driver.extraJavaOptions |
(none) |
ドライバに渡す特別なJVMオプションの文字列。これはユーザによって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memory を、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接 SparkConf を使って設定するべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。spark.driver.defaultJavaOptions がこの設定に追加されます。
|
1.0.0 |
spark.driver.extraLibraryPath |
(none) |
ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。 注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接 SparkConf を使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
1.0.0 |
spark.driver.userClassPathFirst |
false | (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。 | 1.3.0 |
spark.executor.extraClassPath |
(none) | executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。 | 1.0.0 |
spark.executor.defaultJavaOptions |
(none) |
spark.executor.extraJavaOptions に追加されるデフォルトの JVM オプションの文字列。これは管理者によって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc
|
3.0.0 |
spark.executor.extraJavaOptions |
(none) |
executorに渡すための特別なJVMオプションの文字列これはユーザによって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions がこの設定に追加されます。
|
1.0.0 |
spark.executor.extraLibraryPath |
(none) | executor JVMを起動する場合に使う特別なライブラリパスを設定する。 | 1.0.0 |
spark.executor.logs.rolling.maxRetainedFiles |
(none) | システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。 | 1.1.0 |
spark.executor.logs.rolling.enableCompression |
false | executor ログの圧縮を有効にします。有効な場合、ロールされたexecutorログは圧縮されるでしょう。デフォルトは無効です。 | 2.0.2 |
spark.executor.logs.rolling.maxSize |
(none) |
executorのログがロールオーバーされる最大のファイルサイズをバイトで設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。
|
1.4.0 |
spark.executor.logs.rolling.strategy |
(none) |
executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するために>spark.executor.logs.rolling.maxSize を使用します。
|
1.1.0 |
spark.executor.logs.rolling.time.interval |
daily |
executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily , hourly , minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFiles を見てください。
|
1.1.0 |
spark.executor.userClassPathFirst |
false |
(実験的なもの) spark.driver.userClassPathFirst と同じ機能ですが、executorインスタンスに適用されます。
|
1.3.0 |
spark.executorEnv.[EnvironmentVariableName] |
(none) |
EnvironmentVariableName によって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。
|
0.9.0 |
spark.redaction.regex |
(?i)secret|password|token | ドライバおよびexecutor環境の中で、Spark設定プロパティと環境変数のどちらが保護必要情報を持つかを決定する正規表現。この正規表現がプロパティのキーあるいは値に一致した場合、その値は環境のUIおよびYARNとイベントログのような様々なログから作成されます。 | 2.1.2 |
spark.python.profile |
false |
Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles() によって表示されるか、ドライバが終了する前に表示されるでしょう。sc.dump_profiles(path) によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。
|
1.2.0 |
spark.python.profile.dump |
(none) |
ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはpstats.Stats() によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。
|
1.2.0 |
spark.python.worker.memory |
512m |
集約の間にpythonワーカープロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m , 2g )。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。
|
1.1.0 |
spark.python.worker.reuse |
true | Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。 | 1.2.0 |
spark.files |
各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。globも可能です。 | 1.0.0 | |
spark.submit.pyFiles |
PythonアプリのためのPYTHONPATH上に配置されるカンマ区切りの .zip, .egg あるいは .py ファイル。globも可能です。 | 1.0.1 | |
spark.jars |
ドライバーとexecutorのクラスパス上でインクルードするjarのカンマ区切りのリスト。globも可能です。 | 0.9.0 | |
spark.jars.packages |
ドライバーとexecutorのクラスパス上でインクルードされる Maven coordinateのjarのカンマ区切りのリストcoordinateは groupId:artifactId:version でなければなりません。spark.jars.ivySettings が指定された場合、artifacts はファイル内の設定に応じて解決されるでしょう。そうでなければ artifacts はローカルのmavenリポジトリ、次にmavenセントラル、最後にコマンドラインオプション--repositories によって指定される追加のリモートリポジトリから探されるでしょう。詳細は、上級の依存管理を見てください。
|
1.5.0 | |
spark.jars.excludes |
spark.jars.packages で与えらえた依存を解決する間に依存性の衝突を避けるために除外する、groupId:artifactId のカンマ区切りのリスト。
|
1.5.0 | |
spark.jars.ivy |
Ivyユーザディレクトリを指定するパス。spark.jars.packages からローカルのIvyキャッシュおよびパッケージファイルのために使われます。これはデフォルトが ~/.ivy2 の Ivyプロパティ ivy.default.ivy.user.dir を上書くでしょう。
|
1.3.0 | |
spark.jars.ivySettings |
mavenセントラルのような組み込みのデフォルトの代わりに、spark.jars.packages を使って指定されるjarの解決を独自に行うためのIvy設定ファイルへのパス。コマンドラインオプション--repositories あるいはspark.jars.repositories によって指定される追加のリポジトリも含まれるでしょう。例えば Artifactory のような社内のartifactサーバを使って、ファイアウォールの背後からSparkがartifactを解決できるようにするのに便利です。ファイルフォーマットの設定の詳細は設定ファイルで見つけることができます。file:// スキーマのパスだけがサポートされます。スキーマが無いパスは、file:// スキーマと見なされます。
When running in YARN cluster mode, this file will also be localized to the remote driver for dependency
resolution within SparkContext#addJar
|
2.2.0 | |
spark.jars.repositories |
--packages あるいは spark.jars.packages を使って与えられるmaven coordinateのために検索される、カンマ区切りの追加のリモートリポジトリ。
|
2.3.0 | |
spark.archives |
各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。.jar, .tar.gz, .tgz and .zip are supported. 解凍するファイル名の後に# を追加することで、解凍先のディレクトリ名を指定できます。例えば、file.zip#directory 。この設定は実験的なものです。
|
3.1.0 | |
spark.pyspark.driver.python |
ドライバ内でPySparkが使うPythonバイナリ実行ファイル。(デフォルトは spark.pyspark.python です)
|
2.1.0 | |
spark.pyspark.python |
ドライバと実行ファイルの両方の中でPySparkが使うPythonバイナリ実行ファイル。 | 2.1.0 |
シャッフルの挙動
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.reducer.maxSizeInFlight |
48m | 各reduceタスクから同時に取り出すmap出力の最大サイズ。別に指定されていなければMiB単位。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。 | 1.4.0 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | この設定はリモートリクエストの数を指定された値までブロックの検索を制限します。クラスタ内のホストの数が増えた場合は、1つ以上のノードからやってくる接続の数が大量になり、ワーカーが負荷のために落ちるかも知れません。取得リクエストの数を制限できるようにすることで、このシナリオは緩和されるかも知れません。 | 2.0.0 |
spark.reducer.maxBlocksInFlightPerAddress |
Int.MaxValue | この設定は指定されたホスト ポートからのreduceタスク毎に取得されるリモートブロックの数を制限します。大量のブロックが1つの取得あるいは並行して指定されたアドレスからリクエストされる場合、これは提供するexecutorあるいはノードマネージャーをクラッシュするかもしれません。これは外部シャッフルが有効な時にノードマネージャーの負荷を減らすのに特に有用です。それを低い値に設定することでこの問題を緩和することができます。 | 2.2.1 |
spark.shuffle.compress |
true |
map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codec を使うでしょう。
|
0.6.0 |
spark.shuffle.file.buffer |
32k | 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。別に指定されていなければMiB単位。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。 | 1.4.0 |
spark.shuffle.io.maxRetries |
3 | (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。 | 1.2.0 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。 | 1.2.1 |
spark.shuffle.io.preferDirectBufs |
true | (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 | 1.2.0 |
spark.shuffle.io.retryWait |
5s |
(Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWait として計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。
|
1.2.1 |
spark.shuffle.io.backLog |
-1 |
シャッフルサービスの受け入れキューの長さ。大規模なアプリケーションの場合は、この値を増やす必要があるかもしれません。そうすると、サービスが短期間に多数の接続に対応できない場合に到着する接続を取りこぼさなくなります。これは、アプリケーションの外部にある可能性があるシャッフルサービス自身が実行されている全ての場所で設定される必要があります (以下の spark.shuffle.service.enabled オプションを見てください)。1未満に設定すると、Netty の io.netty.util.NetUtil#SOMAXCONN で定義される OS のデフォルトにフォールバックします。
|
1.1.1 |
spark.shuffle.io.connectionTimeout |
spark.network.timeout の値 |
Timeout for the established connections between shuffle servers and clients to be marked as idled and closed if there are still outstanding fetch requests but no traffic no the channel for at least `connectionTimeout`. | 1.2.0 |
spark.shuffle.service.enabled |
false | 外部のシャッフルサービスを有効にします。This service preserves the shuffle files written by executors e.g. so that executors can be safely removed, or so that shuffle fetches can continue in the event of executor failure. 外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentationを見てください。 | 1.2.0 |
spark.shuffle.service.port |
7337 | 外部のシャッフルサービスが実行されるだろうポート。 | 1.2.0 |
spark.shuffle.service.index.cache.size |
100m | 特定のメモリのフットプリントに制限されたキャッシュエントリのバイト。指定されない場合はバイト単位。 | 2.3.0 |
spark.shuffle.maxChunksBeingTransferred |
Long.MAX_VALUE |
シャッフルサービス上で同時に転送されることができるチャンクの最大数。新しくやってくる接続は最大の数に達した時に閉じられるだろうことに注意してください。クライアントはシャッフル再試行の設定に応じて再試行するでしょう(spark.shuffle.io.maxRetries と spark.shuffle.io.retryWait を見てください)。もしこれらの制限に達するとタスクは取得失敗で失敗するでしょう。
|
2.3.0 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。 | 1.1.1 |
spark.shuffle.spill.compress |
true |
シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codec を使うでしょう。
|
0.9.0 |
spark.shuffle.accurateBlockThreshold |
100 * 1024 * 1024 | HighlyCompressedMapStatus内のシャッフルブロックのサイズである上のバイトの閾値は正確に記録されます。 これはシャッフルブロックを取り出す時にシャッフルブロックのサイズを低めに見積もることを避けることでOOMを避けるのに役立ちます。 | 2.2.1 |
spark.shuffle.registration.timeout |
5000 | 外部シャッフルサービスへの登録のためのミリ秒のタイムアウト。 | 2.3.0 |
spark.shuffle.registration.maxAttempts |
3 | 外部シャッフルサービスに登録を失敗した時、maxAttempts回まで再試行するでしょう。 | 2.3.0 |
spark.files.io.connectionTimeout |
spark.network.timeout の値 |
Timeout for the established connections for fetching files in Spark RPC environments to be marked as idled and closed if there are still outstanding files being downloaded but no traffic no the channel for at least `connectionTimeout`. | 1.6.0 |
spark.shuffle.checksum.enabled |
true | シャッフルデータのチェックサムを計算するかどうか。有効な場合、Sparkはマップ出力ファイル内の各パーティションデータのチェックサム値を計算し、ディスク上のチェックサムファイルにその値を格納します。When there's shuffle data corruption detected, Spark will try to diagnose the cause (e.g., network issue, disk issue, etc.) of the corruption by using the checksum file. | 3.2.0 |
spark.shuffle.checksum.algorithm |
ADLER32 | シャッフルのチェックサムを計算するために使われるアルゴリズム。現在のところ、JDKの組み込みのアルゴリズムのみをサポートします。例えば、ADLER32、CRC32。 | 3.2.0 |
Spark UI
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.eventLog.logBlockUpdates.enabled |
false |
spark.eventLog.enabled がtrueの時に、各ブロックの更新毎にイベントを記録するかどうか。*Warning*: これはイベントログのサイズをかなり増やすでしょう。
|
2.3.0 |
spark.eventLog.longForm.enabled |
false | trueであれば、イベントログに長い形式の呼び出しを使います。そうでなければ短い形式を使います。 | 2.4.0 |
spark.eventLog.compress |
false |
もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。
|
1.0.0 |
spark.eventLog.compression.codec |
zstd |
ログイベントを圧縮するためのコーデック。デフォルトでは、Sparkは4つのコーディックを提供します: lz4 , lzf , snappy および zstd 。コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec , org.apache.spark.io.LZFCompressionCodec , org.apache.spark.io.SnappyCompressionCodec および org.apache.spark.io.ZStdCompressionCodec 。
|
3.0.0 |
spark.eventLog.erasureCoding.enabled |
false | Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of filesystem defaults. On HDFS, erasure coded files will not update as quickly as regular replicated files, so the application updates will take longer to appear in the History Server. Note that even if this is true, Spark will still not force the file to use erasure coding, it will simply use filesystem defaults. | 3.0.0 |
spark.eventLog.dir |
file:///tmp/spark-events |
spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。
|
1.0.0 |
spark.eventLog.enabled |
false | アプリケーションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。 | 1.0.0 |
spark.eventLog.overwrite |
false | 既存のファイルを上書きするかどうか。 | 1.0.0 |
spark.eventLog.buffer.kb |
100k | 出力ストリームに書き込む時に使用するバッファサイズ。別に指定されていなければKiB単位。 | 1.0.0 |
spark.eventLog.rolling.enabled |
false | ローリングオーバーイベントログファイルが有効かどうか。true に設定されると、各イベントログファイルを設定されたサイズに切り詰めます。 | 3.0.0 |
spark.eventLog.rolling.maxFileSize |
128m |
spark.eventLog.rolling.enabled=true の場合、イベントログファイルがロールオーバーされる最大サイズを指定します。
|
3.0.0 |
spark.ui.dagGraph.retainedRootRDDs |
Int.MaxValue | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのDAGグラフノードを記憶するか。 | 2.1.0 |
spark.ui.enabled |
true | Sparkアプリケーションのためにweb UIを実行するかどうか。 | 1.1.1 |
spark.ui.killEnabled |
true | ジョブとステージをweb UIからkillすることができます。 | 1.0.0 |
spark.ui.liveUpdate.period |
100ms | ライブ エンティティを更新する頻度。-1 はアプリケーションの再生時に "更新しない" 事を意味します。つまり最後の書き込みだけが起こるでしょう。稼働中のアプリケーションについては、着信タスクイベントを迅速に処理する場合に無くてもやり過ごせる幾つかの操作を回避します。 | 2.3.0 |
spark.ui.liveUpdate.minFlushPeriod |
1s | 古いUIデータがフラッシュされるまでの最小経過時間。着信タスクイベントが頻繁に発生しない場合にUIの陳腐化を回避します。 | 2.4.2 |
spark.ui.port |
4040 | メモリおよび作業データを表示する、アプリケーションのダッシュボードのポート。 | 0.7.0 |
spark.ui.retainedJobs |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 | 1.2.0 |
spark.ui.retainedStages |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 | 0.9.0 |
spark.ui.retainedTasks |
100000 | ガベージコレクティングの前にSparkUIおよびステータスAPIが1つのステージにどれだけのタスクを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 | 2.0.1 |
spark.ui.reverseProxy |
false | ワーカーとアプリケーションUIのためにリバースプロキシとしてSparkマスターの実行を有効にします。このモードでは、Sparkマスターはホストへの直接アクセスを必要とせずにワーカーとアプリケーションをリバースプロキシするでしょう。注意して使ってください。ワーカーとアプリケーションUIは直接アクセスできないため、sparkのマスター/プロキシ public URLを使ってアクセスできるだけでしょう。この設定はクラスタ内の全てのワーカーとアプリケーションUIに影響し、全てのワーカー、ドライバーおよびマスターで設定されなければなりません。 | 2.1.0 |
spark.ui.reverseProxyUrl |
Spark UIが他のフロントエンドリバースプロキシを経由して提供されなければならない場合、そのリバースプロキシを経由してSparkマスターUIにアクセスするためのURLです。認証のためにプロキシを実行する場合に便利です。例えば、OAuthプロキシ。The URL may contain a path prefix, like http://mydomain.com/path/to/spark/ , allowing you to serve the UI for multiple Spark clusters and other web applications through the same virtual host and port. 通常、これはスキーマ(http/https)、ホスト、ポートを含む、絶対URLでなければなりません。"/" で始まる相対URLを指定できます。In this case, all URLs generated by the Spark UI and Spark REST APIs will be server-relative links -- this will still work, as the entire Spark UI is served through the same host and port.”The setting affects link generation in the Spark UI, but the front-end reverse proxy is responsible for
spark.ui.reverseProxy がオンの場合のみ効果があります。Sparkマスターweb UIが直接到達可能な場合は、この設定は不要です。 |
2.1.0 | |
spark.ui.proxyRedirectUri |
Spark がプロキシの背後で実行されている場合のリダイレクト先。これにより、Spark は Spark UI 自体のアドレスではなく、プロキシサーバを指すようにリダイレクト応答を修正します。これは、アプリケーションのプレフィックスパスを含まない、サーバのアドレスのみにする必要があります; プレフィックスは(X-Forwarded-Context リクエストヘッダを追加することで)プロキシサーバ自体、あるいは Spark アプリケーションの設定内のプロキシベースを設定することで、設定される必要があります。
|
3.0.0 | |
spark.ui.showConsoleProgress |
false |
コンソール内に進捗バーを表示します。進捗バーは500msより長く実行しているステージの進捗を示します。複数のステージが同時に実行される場合は、複数の進捗バーが同じ行に表示されるでしょう。 注意: シェル環境では、spark.ui.showConsoleProgress のデフォルト値は true です。 |
1.2.1 |
spark.ui.custom.executor.log.url |
(none) | Spark UI でクラスタマネージャーのアプリケーションログの URL を使う代わりに、外部のログサービスをサポートするための独自の spark executor ログの URL を指定します。Spark はクラスタマネージャーで変更できるパターンを介して、幾つかのパス変数をサポートします。クラスタマネージャーでどのパターンがサポートされるかを調べるには、ドキュメントがある場合はドキュメントを調べてください。 この設定はイベントログ内の元の url も置き換えることに注意してください。これは履歴サーバ上のアプリケーションにアクセスする場合にも有効です。新しいログの url は永続的である必要があります。そうしないと excecutor のログの url が無効になるかもしれません。 現在のところ、YARN モードのみがこの設定をサポートします。 | 3.0.0 |
spark.worker.ui.retainedExecutors |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。 | 1.5.0 |
spark.worker.ui.retainedDrivers |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。 | 1.5.0 |
spark.sql.ui.retainedExecutions |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。 | 1.5.0 |
spark.streaming.ui.retainedBatches |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。 | 1.0.0 |
spark.ui.retainedDeadExecutors |
100 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのdeadのexecutorを記憶するか。 | 2.0.0 |
spark.ui.filters |
None |
Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。spark.<class name of filter>.param.<param name>=<value> の形式の構成エントリを設定することで、フィルターパラメータも設定内で指定することができます例えば: spark.ui.filters=com.test.filter1 spark.com.test.filter1.param.name1=foo spark.com.test.filter1.param.name2=bar
|
1.0.0 |
spark.ui.requestHeaderSize |
8k | HTTPリクエストヘッダの最大許容サイズ。指定されない場合はバイト単位。この設定はSpark履歴サーバにも適用されます。 | 2.2.3 |
spark.ui.timeline.executors.maximum |
250 | イベントのタイムラインで表示されるexecutorの最大数。 | 3.2.0 |
spark.ui.timeline.jobs.maximum |
500 | イベントタイムラインで表示されるジョブの最大数。 | 3.2.0 |
spark.ui.timeline.stages.maximum |
500 | イベントのタイムラインで表示されるstageの最大数。 | 3.2.0 |
spark.ui.timeline.tasks.maximum |
1000 | イベントのタイムラインで表示されるtaskの最大数。 | 1.4.0 |
圧縮および直列化
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.broadcast.compress |
true |
ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codec を使うでしょう。
|
0.6.0 |
spark.checkpoint.compress |
false |
RDD チェックポイントを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codec を使うでしょう。
|
2.2.0 |
spark.io.compression.codec |
lz4 |
RDDパーティション、イベントログ、ブロードキャスト変数 およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは4つのコーディックを提供します: lz4 , lzf , snappy および zstd 。コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec , org.apache.spark.io.LZFCompressionCodec , org.apache.spark.io.SnappyCompressionCodec および org.apache.spark.io.ZStdCompressionCodec 。
|
0.8.0 |
spark.io.compression.lz4.blockSize |
32k | LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。特記の無い限り、デフォルトの単位はバイト。 | 1.4.0 |
spark.io.compression.snappy.blockSize |
32k | Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。特記の無い限り、デフォルトの単位はバイト。 | 1.4.0 |
spark.io.compression.zstd.level |
1 | Zstd圧縮コーディックの圧縮レベル。圧縮レベルを増やすとCPUとメモリの消費を犠牲にして結果がより良い圧縮となるでしょう。 | 2.3.0 |
spark.io.compression.zstd.bufferSize |
32k | Zstd圧縮コーディックが使用される場合、Zstd圧縮で使用されるブロックサイズのバイト数。このサイズを小さくするとZstdが使われる時のシャッフルメモリ率が低くなりますが、過度のJNI呼び出しのオーバーヘッドにより圧縮コストが増えるかもしれません。 | 2.3.0 |
spark.kryo.classesToRegister |
(none) | Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。 | 1.2.0 |
spark.kryo.referenceTracking |
true | Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。 | 0.8.0 |
spark.kryo.registrationRequired |
false | Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。 | 1.1.0 |
spark.kryo.registrator |
(none) |
Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにカンマ区切りのクラスのリストを渡してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。 KryoRegistrator を拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。
|
0.5.0 |
spark.kryo.unsafe |
false | 安全では無いKryoベースのシリアライザを使うかどうか。安全では無いIOベースを使うことで実質的に速くすることができます。 | 2.1.0 |
spark.kryoserializer.buffer.max |
64m | kryoのシリアライズバッファの最大許容サイズ。別に指定されていなければMiB単位。これはシリアライズ化しようとしているどのオブジェクトよりも大きくなければならず、2048mより小さくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。 | 1.4.0 |
spark.kryoserializer.buffer |
64k |
Kryoのシリアライズ化バッファの初期サイズ。別に指定されていなければKiB単位。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。
|
1.4.0 |
spark.rdd.compress |
false |
シリアライズされたパーティションを圧縮するかどうか(例えば、JavaとScalaでのStorageLevel.MEMORY_ONLY_SER 、あるいは PythonでのStorageLevel.MEMORY_ONLY )。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。圧縮はspark.io.compression.codec を使うでしょう。
|
0.6.0 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer |
ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializer のどのようなサブクラスにもなることができます。
|
0.5.0 |
spark.serializer.objectStreamReset |
100 | org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。 | 1.0.0 |
メモリ管理
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.memory.fraction |
0.6 | 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。この設定の目的は内部メタデータ、ユーザデータ構造、およびまばらな場合の不正確なサイズの見積もりのためにメモリを取って置くことで、非常に大きなレコードです。これはデフォルトの値にしておくことをお勧めします。この値を増加した場合の正しいJVMガベージコレクションの調整についての重要な情報を含む詳細は、この説明を見てください。 | 1.6.0 |
spark.memory.storageFraction |
0.5 |
立ち退きに耐性のあるストレージメモリの量。spark.memory.fraction によって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。
|
1.6.0 |
spark.memory.offHeap.enabled |
false |
trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。
|
1.6.0 |
spark.memory.offHeap.size |
0 |
オフヒープ割り当てのために使うことができる絶対メモリ量。特記が無い限りバイト単位。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=true の場合は、これは正の値に設定されなければなりません。
|
1.6.0 |
spark.storage.replication.proactive |
false | RDDブロックのために積極的なブロックリプリケーションを有効にする。executorの障害により失われたキャッシュされたRDDのブロックレプリカは、既存の利用可能なレプリカがある限り補充されます。これはブロックのレプリケーション レベルを初期の数にしようとします。 | 2.2.0 |
spark.cleaner.periodicGC.interval |
30min |
ガベージコレクションどれだけの頻度で起動するかを制御します。 このコンテキストクリーナーは弱い参照がガベージコレクトされた時のみ掃除されます。ドライバ上でほとんどメモリの圧迫が無いような大きなドライバJVMを使った長く実行中のアプリケーション内では、これは極めて稀か全く起きないかもしれません。全く掃除しないことはしばらくした後でexecutorがディスク空間外で実行することに繋がるかもしれません。 |
1.6.0 |
spark.cleaner.referenceTracking |
true | コンテキストの掃除を有効または無効にします。 | 1.0.0 |
spark.cleaner.referenceTracking.blocking |
true |
クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します (シャッフル以外、これはspark.cleaner.referenceTracking.blocking.shuffle Sparkプロパティによって制御されます)。
|
1.0.0 |
spark.cleaner.referenceTracking.blocking.shuffle |
false | シャッフル クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します。 | 1.1.1 |
spark.cleaner.referenceTracking.cleanCheckpoints |
false | 参照がスコープ外の場合にチェックポイントファイルを掃除するかどうかを制御します。 | 1.4.0 |
Executionの挙動
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.broadcast.blockSize |
4m |
TorrentBroadcastFactory のためのブロックの各断片のサイズ。別に指定されていなければKiB単位。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManager はパフォーマンスの打撃を受けるかも知れません。
|
0.5.0 |
spark.broadcast.checksum |
true | ブロードキャストのためにチェックサムを有効にするかどうか。有効にされた場合、ブロードキャストはチェックサムを含むでしょう。これは少し多くのデータの計算と送信という代償を払って、間違ったブロックの検知を手助けすることができます。ネットワークがブロードキャスト中にデータの間違いが起きない保証をするための他の機構を持つ場合、無効にすることができます。 | 2.1.1 |
spark.executor.cores |
YARNモードの場合は1、スタンドアローンモードとMesos coarse-grainedモードの場合はワーカー上の全ての利用可能なコア。 | 各executor上で使用されるコアの数。スタンドアローンおよびMesosの coarse-grained モードでの詳細については、this descriptionを見てください。 | 1.0.0 |
spark.default.parallelism |
reduceByKey および join のような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelize のような操作については、クラスタマネージャーに依存します:
|
ユーザによって設定されなかった場合は、 join , reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。
|
0.5.0 |
spark.executor.heartbeatInterval |
10s | ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。spark.executor.heartbeatInterval は spark.network.timeout よりかなり小さくなければなりません。 | 1.1.0 |
spark.files.fetchTimeout |
60s | ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。 | 1.0.0 |
spark.files.useFetchCache |
true | true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。 | 1.2.2 |
spark.files.overwrite |
false | ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。 | 1.0.0 |
spark.files.maxPartitionBytes |
134217728 (128 MiB) | ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。 | 2.1.0 |
spark.files.openCostInBytes |
4194304 (4 MiB) | ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうすれば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう。 | 2.1.0 |
spark.hadoop.cloneConf |
false |
trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定 スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。
|
1.0.3 |
spark.hadoop.validateOutputSpecs |
true | trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。 | 1.0.1 |
spark.storage.memoryMapThreshold |
2m | ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。特記の無い限り、デフォルトの単位はバイト。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。 | 0.9.2 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version |
1 | ファイル出力コミッタのアルゴリズムのバージョン。有効なアルゴリズムのバージョン番号: 1 あるいは 2。2 は MAPREDUCE-7282 のような正確さの問題を引き起こすかもしれないことに注意してください。 | 2.2.0 |
Executor のメトリクス
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.eventLog.logStageExecutorMetrics |
false |
(各 executor について) executor メトリクスのステージごとのピークをイベントログに書き込むかどうか。 注意: メトリクスは executor の heatbeat でポーリング(収集)され、送信されます。これは常に行われます; この設定は、集約されたメトリクスのピークがイベントログに書き込まれるかどうかを決定するためだけのものです。 |
3.0.0 | spark.executor.processTreeMetrics.enabled |
false |
executor のメトリクスを収集する時に、(/proc ファイルシステムから)プロセスのツリーメトリクスを収集するかどうか。 注意: プロセスツリーメトリクスは /proc ファイルシステムが存在する場合のみ収集されます。 |
3.0.0 |
spark.executor.metrics.pollingInterval |
0 |
executor メトリクスを収集する頻度 (ミリ秒) 0 の場合、ポーリングは executor heatbeat で実行されます (したがって、 spark.executor.heartbeatInterval で指定された heartbeat 間隔です)。正の場合、ポーリングはこの間隔で行われます。
|
3.0.0 |
ネットワーク
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.rpc.message.maxSize |
128 | "control plane"通信内で許される最大のメッセージ(MiB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。RPCメッセージに関するメッセージが表示されるでしょう。 | 2.0.0 |
spark.blockManager.port |
(random) | 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。 | 1.1.0 |
spark.driver.blockManager.port |
(spark.blockManager.port の値) | ブロックマネージャーがlistenするドライバー固有のポート。executorとして同じ設定を使うことができない場合。 | 2.1.0 |
spark.driver.bindAddress |
(spark.driver.host の値) |
listenソケットをバインドするホスト名またはIPアドレス。この設定は SPARK_LOCAL_IP 環境変数(以下を見てください)を上書きします。 executorあるいは外部のシステムに知らせるためにローカルのものから異なるアドレスにすることもできます。これは例えばブリッジネットワークのコンテナを動かしている場合に便利です。これが正しく動作するためには、ドライバー(RPC, ブロックマネージャー および UI)によって使われる異なるポートがコンテナのホストからフォワードされる必要があります。 |
2.1.0 |
spark.driver.host |
(ローカル ホスト名) | ドライバーのホスト名とIPアドレスexecutorとスタンドアローンマスターが通信するために使われます。 | 0.7.0 |
spark.driver.port |
(random) | ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。 | 0.7.0 |
spark.rpc.io.backLog |
64 | RPC サーバの受け入れキューの長さ。大規模なアプリケーションの場合は、この値を増やす必要があるかもしれません。そうすると、短期間に多数の接続が来た時に到着する接続を取りこぼさなくなります。 | 3.0.0 |
spark.network.timeout |
120s |
全てのネットワークの相互交流のタイムアウト。この設定は、spark.storage.blockManagerHeartbeatTimeoutMs 、spark.shuffle.io.connectionTimeout 、spark.rpc.askTimeout 、spark.rpc.lookupTimeout が設定されていない場合に、それらの代わりに使われるでしょう。
|
1.3.0 |
spark.network.io.preferDirectBufs |
true | 有効にされた場合、オフヒープバッファの割り当ては共有アロケータによって選ばれます。オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 | 3.0.0 |
spark.port.maxRetries |
16 | ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。 | 1.1.1 |
spark.rpc.numRetries |
3 | RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。 | 1.4.0 |
spark.rpc.retry.wait |
3s | RPCのask操作が再試行するまで待つ期間。 | 1.4.0 |
spark.rpc.askTimeout |
spark.network.timeout |
RPCのask操作がタイムアウトするまで待つ期間。 | 1.4.0 |
spark.rpc.lookupTimeout |
120s | RPCリモートエンドポイントがタイムアウトするまで待つ時間。 | 1.4.0 |
spark.network.maxRemoteBlockSizeFetchToMem |
200m | ブロックのサイズがこの閾値のバイトよりも上の場合、リモートのブロックはディスクに取りに行くでしょう。これは巨大なリクエストがあまりに大きなメモリを要求することを避けるためのものです。この設定はシャッフルの取得およびブロックマネージャーのリモートブロックの取得の両方に影響するだろうことに注意してください。外部シャッフルサービスを有効にしたユーザについては、この機能は外部シャッフルサービスが少なくとも 2.3.0 である場合のみ動作するかもしれません。 | 3.0.0 |
spark.rpc.io.connectionTimeout |
spark.network.timeout の値 |
Timeout for the established connections between RPC peers to be marked as idled and closed if there are outstanding RPC requests but no traffic on the channel for at least `connectionTimeout`. | 1.2.0 |
スケジューリング
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.cores.max |
(not set) |
<c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores 、Mesos上では無制限(利用可能な全てのコア)になります。
|
0.6.0 |
spark.locality.wait |
3s |
データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.node などを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。
|
0.5.0 |
spark.locality.wait.node |
spark.locality.wait | ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。 | 0.8.0 |
spark.locality.wait.process |
spark.locality.wait | プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。 | 0.8.0 |
spark.locality.wait.rack |
spark.locality.wait | rack局地性を待つための局地性をカスタマイズします。 | 0.8.0 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | スケジュールが始まる前にリソースが登録するための最大待ち時間。 | 1.1.1 |
spark.scheduler.minRegisteredResourcesRatio |
KUBERNETES modeでは 0.8; YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 |
スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードとKubernetesモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTime によって設定されます。
|
1.1.1 |
spark.scheduler.mode |
FIFO |
同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。
|
0.8.0 |
spark.scheduler.revive.interval |
1s | スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。 | 0.8.1 |
spark.scheduler.listenerbus.eventqueue.capacity |
10000 | イベントキューのデフォルトの容量。Spark は最初に `spark.scheduler.listenerbus.eventqueue.queueName.capacity` で指定された容量を使ってイベントキューを初期化しようとします。設定されていない場合、Spark はこの設定で指定されたデフォルトの容量を使います。容量は 0 より大きい必要があることに注意してください。listenerのイベントが零れる場合は、値を増やすことを考えてください (例えば 20000)。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 | 2.3.0 |
spark.scheduler.listenerbus.eventqueue.shared.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark listener バスでの共有イベントキューの容量。これは listener バス に登録する外部 listener(s) のイベントを保持します。共有キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.appStatus.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
sppStatus イベントキューの容量。これは内部アプリケーションステータス listener のイベントを保持します。appStatus キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark listener バスでの executorManagement イベントキューの容量。これは内部 executor management listener のイベントを保持します。executorManagement キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.eventLog.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark listener バスでの eventLog キューの容量。これは eventLogs にイベントを書き込む Event logging listener のイベントを保持します。eventLog キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.streams.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark listener バスでの streams キューの容量。これは内部 streaming listener のイベントを保持します。streams キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 | 3.0.0 |
spark.scheduler.resource.profileMergeConflicts |
false | If set to "true", Spark will merge ResourceProfiles when different profiles are specified in RDDs that get combined into a single stage. マージされた場合、Sparkはそれぞれのリソースの最大のものを選択し、新しいResourceProfileを生成します。The default of false results in Spark throwing an exception if multiple different ResourceProfiles are found in RDDs going into the same stage. | 3.1.0 |
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout |
120s | The timeout in seconds to wait to acquire a new executor and schedule a task before aborting a TaskSet which is unschedulable because all executors are excluded due to task failures. | 2.4.1 |
spark.excludeOnFailure.enabled |
false | "true"に設定すると、あまりに多くのタスクの失敗によって除外されたexecutor上でSparkがタスクをスケジュールすることを防ぎます。executorとノードを除外するために使われるアルゴリズムは、他の"spark.excludeOnFailure"設定オプションによって更に制御することができます。 | 2.1.0 |
spark.excludeOnFailure.timeout |
1h | (実験的) ノードあるいはexecutorが新しいタスクを実行しようとして除外リストから無条件に削除される前に、どれくらい長くノードあるいはexecutorがアプリケーション全体で除外されるか。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor |
1 | (実験的) 指定されたタスクについて、1つのexecutorがそのタスクが除外される前にそのexecutor上でどれだけ再試行できるか。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerNode |
2 | (実験的) 指定されたタスクについて、1つのノードがそのタスクが除外される前にそのノード上でどれだけ再試行できるか。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor |
2 | (実験的) executorが1つのステージについて除外される前に、1つのexecutor上のそのステージ内でどれだけの異なるタスクを失敗しなければならないか。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode |
2 | (実験的) ノード全体が指定されたステージについて失敗だとマークされる前に、どれだけの異なるexecutorがそのステージについて除外されたとマークされるか。 | 2.1.0 |
spark.excludeOnFailure.application.maxFailedTasksPerExecutor |
2 |
(実験的) executorがアプリケーション全体で除外される前に、連続するタスクのセットの中でどれだけの異なるタスクが1つのeecutor上で失敗しなければならないか。除外されたexecutorはspark.excludeOnFailure.timeout で指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、executorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。
|
2.2.0 |
spark.excludeOnFailure.application.maxFailedExecutorsPerNode |
2 |
(実験的) ノードがアプリケーション全体で除外される前に、どれだけの異なるexecutorがアプリケーション全体で除外されなければならないか。除外されたノードはspark.excludeOnFailure.timeout で指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、ノード上のexecutorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。
|
2.2.0 |
spark.excludeOnFailure.killExcludedExecutors |
false | (実験的) "true" に設定すると、spark.killExcludedExecutors.application.* によって制御されるため取得時の障害あるいはアプリケーション全体で除外される時にSparkが自動的にexecutorをkillすることができます。ノード全体が除外に追加された場合、ノード上の全てのexecutorはkillされるだろうことに注意してください。 | 2.2.0 |
spark.excludeOnFailure.application.fetchFailure.enabled |
false | (実験的) もし "true" に設定すると、取得の失敗が起きた場合にSparkはexecutorをすぐに除外するでしょう。もし外部シャッフルサービスが有効な場合、ノード全体が除外されるでしょう。 | 2.3.0 |
spark.speculation |
false | "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。 | 0.6.0 |
spark.speculation.interval |
100ms | どれだけの頻度でSparkが投機するためにタスクをチェックするか。 | 0.6.0 |
spark.speculation.multiplier |
1.5 | 平均より遅いタスクが何回投機と見なされるか。 | 0.6.0 |
spark.speculation.quantile |
0.75 | 指定のステージで投機が有効になる前にどれだけのタスクの割合が終了していなければならないか。 | 0.6.0 |
spark.speculation.minTaskRuntime |
100ms | Minimum amount of time a task runs before being considered for speculation. This can be used to avoid launching speculative copies of tasks that are very short. | 3.2.0 |
spark.speculation.task.duration.threshold |
None | スケジューラがタスクを投機的に実行しようとするまでのタスク期間。指定した場合、現在のステージに含まれるタスクが1つの executor のスロット数以下であり、タスクが閾値よりも時間が掛かる場合に、タスクが投機的に実行されます。この設定は非常に少ないタスクでステージを推測するのに役立ちます。executor のスロットが十分に大きい場合、通常の投機設定も適用されるかもしれません。例えば、閾値に達していないが十分に実行が成功している場合は、タスクが再起動されるかもしれません。The number of slots is computed based on the conf values of spark.executor.cores and spark.task.cpus minimum 1. 特記の無い限り、デフォルトの単位はバイト。 | 3.0.0 |
spark.task.cpus |
1 | 各タスクごとに割り当てるコアの数。 | 0.5.0 |
spark.task.resource.{resourceName}.amount |
1 |
各タスクに割り当てる特定のリソースタイプの量。これは倍になるかもしれないことに注意してください。これが指定された場合、executor の設定 spark.executor.resource.{resourceName}.amount および対応するディスカバリ設定も提供して、executor がそのリソースタイプで作成されるようにする必要があります。In addition to whole amounts, a fractional amount (for example, 0.25, which means 1/4th of a resource) may be specified. Fractional amounts must be less than or equal to 0.5, or in other words, the minimum amount of resource sharing is 2 tasks per resource. Additionally, fractional amounts are floored in order to assign resource slots (e.g. a 0.2222 configuration, or 1/0.2222 slots will become 4 tasks/resource, not 5).
|
3.0.0 |
spark.task.maxFailures |
4 | ジョブを諦める前のタスクの失敗の数。異なるタスクに渡って広がっている失敗の総数はジョブを失敗させないでしょう; 特定のタスクがこの試行の数を失敗しなければなりません。1以上でなければなりません。許可された再試行の数 = この値 - 1. | 0.8.0 |
spark.task.reaper.enabled |
false |
killed / interrupted タスクの監視を有効にする。trueに設定した場合、killされた全てのタスクはタスクが実際に実行を完了するまでexecutorによって監視されるでしょう。この監視の正確な挙動を制御する方法についての詳細は、他のspark.task.reaper.* 設定を見てください。false (デフォルト)に設定すると、タスクのkill はそのような監視が欠けている古いコードを使うでしょう。
|
2.0.3 |
spark.task.reaper.pollingInterval |
10s |
spark.task.reaper.enabled = true の時、この設定はexecutorがkillされたタスクの状態をpollする頻度を制御します。pollされた時にkillされたタスクがまだ実行中の場合は、警告が記録され、デフォルトではタスクのthread-dumpが記録されるでしょう(この thread dump は spark.task.reaper.threadDump 設定によって無効にすることができます。これは以下で説明されます)。
|
2.0.3 |
spark.task.reaper.threadDump |
true |
spark.task.reaper.enabled = true の場合、この設定はkillされたタスクの定期的なpollの際にタスクの thread dumps が記録されるかどうかを制御します。thread dumpの収集を無効にするには、これをfalseに設定します。
|
2.0.3 |
spark.task.reaper.killTimeout |
-1 |
spark.task.reaper.enabled = true の場合、この設定は、killされたタスクが実行を停止していない場合にexecutor JVMがそれ自身をkillするだろうタイムアウトを指定します。 デフォルト値 -1 は、この機構を無効にし、executorが自己破壊をすることを避けます。この設定の目的は、キャンセル不可能なタスクの暴走がexecutorを不安定にすることを避けるための安全策として振舞うことです。
|
2.0.3 |
spark.stage.maxConsecutiveAttempts |
4 | ステージが中止されるまでに許される連続するステージの試行の数。 | 2.2.0 |
バリア実行モード
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.barrier.sync.timeout |
365d |
barrier タスクからの各 barrier() 呼び出しのタイムアウト。coordinator が設定された時間内に barrier タスクから全ての同期メッセージを受信しなかった場合は、全てのタスクを失敗させるために SparkException を投げます。デフォルト値は 31536000(3600 * 24 * 365) に設定されているため、barrier() 呼び出しは1年間待つ必要があります。
|
2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.interval |
15s | 最大同時タスクチェックの失敗から次のチェックまで待つ秒数。最大同時チェックにより、クラスタは送信されたジョブの barrier ステージで必要とされるより多くの同時タスクを起動できることが保証されます。クラスタが起動したばかりで十分な executor が登録されていない場合、チェックは失敗する可能性があるため、しばらく待ってからもう一度チェックを実行します。チェックがジョブに対して設定された最大失敗回数より多く失敗した場合、現在のジョブの送信が失敗します。この設定は1つ以上の barrier ステージを含むジョブにのみ適用されることに注意してください。barrier 以外のジョブではチェックを実行しません。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures |
40 | ジョブの送信が失敗するまでに許可される最大同時タスクチェックの失敗の数。最大同時チェックにより、クラスタは送信されたジョブの barrier ステージで必要とされるより多くの同時タスクを起動できることが保証されます。クラスタが起動したばかりで十分な executor が登録されていない場合、チェックは失敗する可能性があるため、しばらく待ってからもう一度チェックを実行します。チェックがジョブに対して設定された最大失敗回数より多く失敗した場合、現在のジョブの送信が失敗します。この設定は1つ以上の barrier ステージを含むジョブにのみ適用されることに注意してください。barrier 以外のジョブではチェックを実行しません。 | 2.4.0 |
動的割り当て
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.dynamicAllocation.enabled |
false |
動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。詳細は ここの説明を見てください。 これは spark.shuffle.service.enabled あるいは spark.dynamicAllocation.shuffleTracking.enabled の設定が必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors , spark.dynamicAllocation.maxExecutors , spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
|
1.2.0 |
spark.dynamicAllocation.executorIdleTimeout |
60s | 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 | 1.2.0 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 | 1.4.0 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
動的割り当てが有効な場合、実行するexecutorの初期の数。 もし `--num-executors` (あるいは `spark.executor.instances`) が設定され、この値よりも大きい場合は、executorの初期の数として使われるでしょう。 |
1.3.0 |
spark.dynamicAllocation.maxExecutors |
infinity | 動的割り当てが有効な場合のexecutorの数の上限。 | 1.2.0 |
spark.dynamicAllocation.minExecutors |
0 | 動的割り当てが有効な場合のexecutorの数の下限。 | 1.2.0 |
spark.dynamicAllocation.executorAllocationRatio |
1 |
デフォルトで、動的な割り当ては処理するタスクの数に応じて並行度を最大にするのに十分なexecutorを要求するでしょう。これはジョブのレイテンシを最小化しますが、いくつかのexecutorは何も仕事をしないかもしれないため、この設定を持つ小さなタスクはexecutorの割り当てのオーバーヘッドにより多くのリソースを浪費するかもしれません。この設定によりexecutorの数を減らすために使われる割合を設定することができます。完全な並行度に関して。最大の並行度を与えるために、デフォルトは1.0です。0.5 はexecutorの目標値を2で割るでしょう。dynamicAllocationによって計算されるexecutorの目標値は spark.dynamicAllocation.minExecutors と spark.dynamicAllocation.maxExecutors の設定によって上書きすることもできます。
|
2.4.0 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。 | 1.2.0 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
spark.dynamicAllocation.schedulerBacklogTimeout と同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は ここの説明を見てください。
|
1.2.0 |
spark.dynamicAllocation.shuffleTracking.enabled |
false |
実験的。executor のシャッフルファイルトラッキングを有効にします。これにより、外部シャッフルサービスを必要とせずに動的な割り当てが可能になります。このオプションはアクティブなジョブのシャッフルデータを格納している executor を保とうとします。 | 3.0.0 |
spark.dynamicAllocation.shuffleTracking.timeout |
infinity |
シャッフルトラッキングが有効な場合、シャッフルデータを保持している executor のタイムアウトを制御します。The default value means that Spark will rely on the shuffles being garbage collected to be able to release executors. 何らかの理由でガベージコレクションがシャッフルを十分に早くクリーンアップしない場合、このオプションを使って、シャッフルデータを格納している executor をシャッフルデータを格納している時であってもタイムアウトするタイミングを制御することができます。 | 3.0.0 |
スレッドの設定
ジョブとクラスタの設定に応じて、使用可能なリソースを使ってパフォーマンスを向上させるために、Spark の幾つかの場所にスレッドを設定することができます。Spark 3.0 より前では、これらのスレッド設定はドライバ、executor 、ワーカー、マスターなど、Spark の全てのロールに適用されます。Spark 3.0 から、ドライバと executor から始めて、より細かい粒度でスレッドを設定することができます。以下の表の例では、RPC モジュールを取り上げます。シャッフルなどの他のモジュールの場合、RPC モジュール専用の spark.{driver|executor}.rpc.netty.dispatcher.numThreads
を除いて、プロパティ名の “rpc” を “shuffle” に置き換えるだけです。
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.{driver|executor}.rpc.io.serverThreads |
Fall back on spark.rpc.io.serverThreads
|
サーバスレッドプールで使われるスレッド数 | 1.6.0 |
spark.{driver|executor}.rpc.io.clientThreads |
Fall back on spark.rpc.io.clientThreads
|
クライアントスレッドプールで使われるスレッドの数 | 1.6.0 |
spark.{driver|executor}.rpc.netty.dispatcher.numThreads |
Fall back on spark.rpc.netty.dispatcher.numThreads
|
RPCメッセージディスパッチャースレッドプールで使われるスレッドの数 | 3.0.0 |
スレッド関連の設定キーの数のデフォルト値は、ドライバまたは executor に要求されたコアの数の最小値、またはその値が無い場合は、JVM に利用可能なコアの数 (ハードコードされた上限は 8)。
セキュリティ
異なるSparkサブシステムを安全にする方法について利用可能なオプションについては、Security ページを参照してください。
Spark SQL
Runtime SQL 設定
ランタイム SQL 設定は、セッションごとの変更可能な Spark SQL 設定です。設定ファイルと、--conf/-c
が前に付いたコマンドラインオプション、または SparkSession
を作成するために使われる SparkConf
を設定することで、初期値を設定することができます。また、SET コマンドで設定とクエリを実行し、RESET コマンドあるいは実行時の SparkSession.conf
のセッターとゲッターで初期値に戻すことができます。
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes |
(spark.sql.adaptive.shuffle.targetPostShuffleInputSize の値) |
適応最適化中のシャッフルパーティションの推奨サイズのバイト数 (spark.sql.adaptive.enabled が true の場合)。Sparkが小さなシャッフルパーティションを結合するか、スキューされたパーティションを分割する時に有効になります。 |
3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold |
(none) | joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。デフォルトの値はspark.sql.autoBroadcastJoinThresholdと同じです。これは適応フレームワークでのみ使われることに注意してください。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.enabled |
true | これが true で、'spark.sql.adaptive.enabled' が true の場合、Spark はターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes' で指定される)に従って連続するシャッフルパーティションを合体させ、小さなタスクが多すぎないようにします。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(none) | 合体する前のシャッフルパーティションの初期数。設定されない場合は、spark.sql.shuffle.partitionsと同じです。この設定は、'spark.sql.adaptive.enabled' と 'spark.sql.adaptive.coalescePartitions.enabled' の両方が true の場合にのみ効果があります。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 合体後のシャッフルパーティションの最小サイズ。This is useful when the adaptively calculated target size is too small during partition coalescing. |
3.2.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | When true, Spark does not respect the target size specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes' (default 64MB) when coalescing contiguous shuffle partitions, but adaptively calculate the target size according to the default parallelism of the Spark cluster. The calculated size is usually smaller than the configured target size. This is to maximize the parallelism and avoid performance regression when enabling adaptive query execution. It's recommended to set this config to false and respect the configured target size. |
3.2.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(none) | The custom cost evaluator class to be used for adaptive execution. If not being set, Spark will use its own SimpleCostEvaluator by default. |
3.2.0 |
spark.sql.adaptive.enabled |
true | true の場合、アダプティブクエリ実行を有効にします。これにより、正確な実行時統計に基づいて、クエリ実行の途中でクエリプランが再最適化されます。 |
1.6.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | When true and 'spark.sql.adaptive.enabled' is true, Spark tries to use local shuffle reader to read the shuffle data when the shuffle partitioning is not needed, for example, after converting sort-merge join to broadcast-hash join. |
3.0.0 |
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0b | ローカルハッシュマップをビルドできるパーティションあたりの最大サイズをバイト単位で設定します。If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and all the partition size are not larger than this config, join selection prefer to use shuffled hash join instead of sort merge join regardless of the value of spark.sql.join.preferSortMergeJoin. |
3.2.0 |
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | When true and 'spark.sql.adaptive.enabled' is true, Spark will optimize the skewed shuffle partitions in RebalancePartitions and split them to smaller ones according to the target size (specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes'), to avoid data skew. |
3.2.0 |
spark.sql.adaptive.optimizer.excludedRules |
(none) | Configures a list of rules to be disabled in the adaptive optimizer, in which the rules are specified by their rule names and separated by comma. The optimizer will log the rules that have indeed been excluded. |
3.1.0 |
spark.sql.adaptive.skewJoin.enabled |
true | When true and 'spark.sql.adaptive.enabled' is true, Spark dynamically handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and replicating if needed) skewed partitions. |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5 | A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes' |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' multiplying the median partition size. 理想的には、この設定は'spark.sql.adaptive.advisoryPartitionSizeInBytes'より大きく設定されなければなりません。 |
3.0.0 |
spark.sql.ansi.enabled |
false | When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. For example, Spark will throw an exception at runtime instead of returning null results when the inputs to a SQL operator/function are invalid.For full details of this dialect, you can find them in the section "ANSI Compliance" of Spark's documentation. Some ANSI dialect features may be not from the ANSI SQL standard directly, but their behaviors align with ANSI SQL's style |
3.0.0 |
spark.sql.autoBroadcastJoinThreshold |
10MB | joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。Note that currently statistics are only supported for Hive Metastore tables where the command |
1.1.0 |
spark.sql.avro.compression.codec |
snappy | AVROファイルの書き込みで使われる圧縮コーディック。サポートされるコーディック: uncompressed, deflate, snappy, bzip2, xz および zstandard。デフォルトのコーディックは snappy です。 |
2.4.0 |
spark.sql.avro.deflate.level |
-1 | AVROファイルの書き込みで使われるdeflateコーディック圧縮レベル。有効な値は-1または1から10の範囲でなければなりません。デフォルト値は-1で、現在の実装での6レベルに対応します。 |
2.4.0 |
spark.sql.avro.filterPushdown.enabled |
true | When true, enable filter pushdown to Avro datasource. |
3.1.0 |
spark.sql.broadcastTimeout |
300 | ブロードキャストjoinでのブロードキャスト待ち時間のタイムアウト秒数。 |
1.3.0 |
spark.sql.bucketing.coalesceBucketsInJoin.enabled |
false | When true, if two bucketed tables with the different number of buckets are joined, the side with a bigger number of buckets will be coalesced to have the same number of buckets as the other side. Bigger number of buckets is divisible by the smaller number of buckets. Bucket coalescing is applied to sort-merge joins and shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling in join, but it also reduces parallelism and could possibly cause OOM for shuffled hash join. |
3.1.0 |
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio |
4 | The ratio of the number of two buckets being coalesced should be less than or equal to this value for bucket coalescing to be applied. This configuration only has an effect when 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' is set to true. |
3.1.0 |
spark.sql.catalog.spark_catalog |
(none) | A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'. |
3.0.0 |
spark.sql.cbo.enabled |
false | Enables CBO for estimation of plan statistics when set true. |
2.2.0 |
spark.sql.cbo.joinReorder.dp.star.filter |
false | Applies star-join filter heuristics to cost based join enumeration. |
2.2.0 |
spark.sql.cbo.joinReorder.dp.threshold |
12 | The maximum number of joined nodes allowed in the dynamic programming algorithm. |
2.2.0 |
spark.sql.cbo.joinReorder.enabled |
false | Enables join reorder in CBO. |
2.2.0 |
spark.sql.cbo.planStats.enabled |
false | When true, the logical plan will fetch row counts and column statistics from catalog. |
3.0.0 |
spark.sql.cbo.starSchemaDetection |
false | When true, it enables join reordering based on star schema detection. |
2.2.0 |
spark.sql.cli.print.header |
false | When set to true, spark-sql CLI prints the names of the columns in query output. |
3.2.0 |
spark.sql.columnNameOfCorruptRecord |
_corrupt_record | The name of internal column for storing raw/un-parsed JSON and CSV records that fail to parse. |
1.2.0 |
spark.sql.csv.filterPushdown.enabled |
true | When true, enable filter pushdown to CSV datasource. |
3.0.0 |
spark.sql.datetime.java8API.enabled |
false | If the configuration property is set to true, java.time.Instant and java.time.LocalDate classes of Java 8 API are used as external types for Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp and java.sql.Date are used for the same purpose. |
3.0.0 |
spark.sql.debug.maxToStringFields |
25 | Maximum number of fields of sequence-like entries can be converted to strings in debug output. Any elements beyond the limit will be dropped and replaced by a "... N more fields" placeholder. |
3.0.0 |
spark.sql.defaultCatalog |
spark_catalog | デフォルトのカタログの名前。This will be the current catalog if users have not explicitly set the current catalog yet. |
3.0.0 |
spark.sql.execution.arrow.enabled |
false | (Spark 3.0から非推奨です。'spark.sql.execution.arrow.pyspark.enabled'を設定してください。) |
2.3.0 |
spark.sql.execution.arrow.fallback.enabled |
true | (Spark 3.0から非推奨です。'spark.sql.execution.arrow.pyspark.fallback.enabled'を設定してください。) |
2.4.0 |
spark.sql.execution.arrow.maxRecordsPerBatch |
10000 | When using Apache Arrow, limit the maximum number of records that can be written to a single ArrowRecordBatch in memory. ゼロまたは負の値に設定された場合、制限はありません。 |
2.3.0 |
spark.sql.execution.arrow.pyspark.enabled |
(spark.sql.execution.arrow.enabled の値) |
When true, make use of Apache Arrow for columnar data transfers in PySpark. This optimization applies to: 1. pyspark.sql.DataFrame.toPandas 2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame The following data types are unsupported: ArrayType of TimestampType, and nested StructType. |
3.0.0 |
spark.sql.execution.arrow.pyspark.fallback.enabled |
(spark.sql.execution.arrow.fallback.enabled の値) |
When true, optimizations enabled by 'spark.sql.execution.arrow.pyspark.enabled' will fallback automatically to non-optimized implementations if an error occurs. |
3.0.0 |
spark.sql.execution.arrow.pyspark.selfDestruct.enabled |
false | (Experimental) When true, make use of Apache Arrow's self-destruct and split-blocks options for columnar data transfers in PySpark, when converting from Arrow to Pandas. これは、いくらかのCPU時間を使ってメモリの使用を減らします。This optimization applies to: pyspark.sql.DataFrame.toPandas when 'spark.sql.execution.arrow.pyspark.enabled' is set. |
3.2.0 |
spark.sql.execution.arrow.sparkr.enabled |
false | When true, make use of Apache Arrow for columnar data transfers in SparkR. This optimization applies to: 1. createDataFrame when its input is an R DataFrame 2. collect 3. dapply 4. gapply The following data types are unsupported: FloatType, BinaryType, ArrayType, StructType and MapType. |
3.0.0 |
spark.sql.execution.pandas.udf.buffer.size |
(spark.buffer.size の値) |
|
3.0.0 |
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled |
true | When true, the traceback from Python UDFs is simplified. It hides the Python worker, (de)serialization, etc from PySpark in tracebacks, and only shows the exception messages from UDFs. これはCPython 3.7+でしか動作しないことに注意してください。 |
3.1.0 |
spark.sql.execution.topKSortFallbackThreshold |
2147483632 | In SQL queries with a SORT followed by a LIMIT like 'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort in memory, otherwise do a global sort which spills to disk if necessary. |
2.4.0 |
spark.sql.files.ignoreCorruptFiles |
false | Whether to ignore corrupt files. true に設定された場合、Spark ジョブは破損したファイルが検出された時に引き続き実行され、読み取られた内容は引き続き返されます。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 |
2.1.1 |
spark.sql.files.ignoreMissingFiles |
false | 不足しているファイルを無視するかどうか。true に設定された場合、Spark ジョブは不足したファイルが検出された時に引き続き実行され、読み取られた内容は引き続き返されます。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 |
2.3.0 |
spark.sql.files.maxPartitionBytes |
128MB | ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 |
2.0.0 |
spark.sql.files.maxRecordsPerFile |
0 | 1つのファイルに書き出されるレコードの最大数。この値がゼロまたは負の値の場合、制限はありません。 |
2.2.0 |
spark.sql.files.minPartitionNum |
(none) | 推奨される(保証されていない)分割ファイルパーティションの最小数。設定されない場合、デフォルト値は |
3.1.0 |
spark.sql.function.concatBinaryAsString |
false | このオプションがfalseに設定され、全ての入力がバイナリの場合、 |
2.3.0 |
spark.sql.function.eltOutputAsString |
false | このオプションがfalseに設定され、全ての入力がバイナリの場合、 |
2.3.0 |
spark.sql.groupByAliases |
true | trueの場合、selectリスト内のエイリアスをグループ句で使うことができます。falseの場合、解析例外が投げられます。 |
2.2.0 |
spark.sql.groupByOrdinal |
true | trueの場合、グループ句の序数はselectリスト内の位置として扱われます。falseの場合、序数は無視されます。 |
2.0.0 |
spark.sql.hive.convertInsertingPartitionedTable |
true | When set to true, and |
3.0.0 |
spark.sql.hive.convertMetastoreCtas |
true | trueに設定された場合、SparkはCTASのHive serdeの代わりに組み込みのデータソースライターを使おうとします。このフラグは、 |
3.0.0 |
spark.sql.hive.convertMetastoreOrc |
true | trueに設定された場合、組み込みのORCリーダーとライターがHive serdeの代わりにHiveQL構文を使って作成されたORCテーブルを処理するために使われます。 |
2.0.0 |
spark.sql.hive.convertMetastoreParquet |
true | trueに設定された場合、組み込みのParquetリーダーとライターがHive serdeの代わりにHiveQL構文を使って作成されたparquetテーブルを処理するために使われます。 |
1.1.1 |
spark.sql.hive.convertMetastoreParquet.mergeSchema |
false | When true, also tries to merge possibly different but compatible Parquet schemas in different Parquet data files. この設定は"spark.sql.hive.convertMetastoreParquet" がtrueの場合のみ効果があります。 |
1.3.1 |
spark.sql.hive.filesourcePartitionFileCacheSize |
262144000 | ゼロでは無い場合、メモリ内のパーティションファイルメタデータのキャッシュを有効にします。全てのテーブルはファイルメタデータについて指定されたバイト数まで使うことができるキャッシュを共有します。この設定は、hiveファイルソースパーティションの管理が有効な場合にのみ効果があります。 |
2.1.1 |
spark.sql.hive.manageFilesourcePartitions |
true | trueの場合、ファイルソーステーブルについてのメタストアパーティション管理も有効にします。これには、データソースと変換されたHiveテーブルの両方が含まれます。When partition management is enabled, datasource tables store partition in the Hive metastore, and use the metastore to prune partitions during query planning when spark.sql.hive.metastorePartitionPruning is set to true. |
2.1.1 |
spark.sql.hive.metastorePartitionPruning |
true | trueの場合、一部の述語はHiveメタストアにプッシュダウンされるため、一致しないパーティションを早期に排除できます。 |
1.5.0 |
spark.sql.hive.thriftServer.async |
true | trueに設定された場合、Hive Thrift サーバはSQLクエリを非同期に実行します。 |
1.5.0 |
spark.sql.hive.verifyPartitionPath |
false | trueの場合、HDFSに格納されているデータを読み込む時にテーブルのルートディレクトリの下にある全てのパーティションパスを確認します。この設定は将来のリリースで非推奨になり、spark.files.ignoreMissingFilesに置き換えられます。 |
1.4.0 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。 |
1.1.1 |
spark.sql.inMemoryColumnarStorage.compressed |
true | trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。 |
1.0.1 |
spark.sql.inMemoryColumnarStorage.enableVectorizedReader |
true | カラム型キャッシュ用のベクトル化されたリーダーを有効にします。 |
2.3.1 |
spark.sql.json.filterPushdown.enabled |
true | trueの場合、JSONデータソースへのフィルタプッシュダウンを有効にします。 |
3.1.0 |
spark.sql.jsonGenerator.ignoreNullFields |
true | JSONデータソースおよびto_jsonなどのJSON関数でJSONオブジェクトを生成する時にnullフィールドを無視するかどうか。falseの場合、JSONオブジェクトのnullフィールドに対してnullを生成します。 |
3.0.0 |
spark.sql.leafNodeDefaultParallelism |
(none) | ファイルスキャンノード、ローカルデータスキャンノード、範囲ノードなどのデータを生成するSpark SQLリーフノードのデフォルトの並行処理。この設定のデフォルト値は'SparkContext#defaultParallelism'です。 |
3.2.0 |
spark.sql.mapKeyDedupPolicy |
EXCEPTION | 組み込み関数でマップキーを重複排除するポリシー: CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat、TransformKeys。EXCEPTION の場合、重複したマップキーが検出されるとクエリは失敗します。LAST_WIN の場合、最後に挿入されたマップキーが優先されます。 |
3.0.0 |
spark.sql.maven.additionalRemoteRepositories |
https://maven-central.storage-download.googleapis.com/maven2/ | オプションの追加のリモートMavenミラーリポジトリのカンマ区切りの文字列設定。これはデフォルトのMaven Centralリポジトリに到達できない場合に、IsolatedClientLoaderでHive jarをダウンロードするためにのみ使われます。 |
3.0.0 |
spark.sql.maxMetadataStringLength |
100 | メタデータ文字列に対して出力する最大文字数。例えば、 |
3.1.0 |
spark.sql.maxPlanStringLength |
2147483632 | プラン文字列に対して出力する最大文字数。プランが長い場合、それ以降の出力は切り捨てられます。デフォルト設定は常に完全なプランが生成されます。プラン文字列が大量のメモリを使用している場合、またはドライバやUIプロセスでOutOfMemoryエラーが発生している場合は、これを8kなどの低い値に設定します。 |
3.0.0 |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
true | trueの場合、結合キーとして使われるときにパーティション列の術語を生成します。 |
3.0.0 |
spark.sql.optimizer.enableCsvExpressionOptimization |
true | SQLオプティマイザでCSV式を最適化するかどうか。from_csvから不要な列を削除することも含まれます。 |
3.2.0 |
spark.sql.optimizer.enableJsonExpressionOptimization |
true | JSONオプティマイザでCSV式を最適化するかどうか。これには、from_jsonからの不要な列の削除、from_json + to_json, to_json + named_struct(from_json.col1, from_json.col2, ....) の簡略化が含まれます。 |
3.1.0 |
spark.sql.optimizer.excludedRules |
(none) | Configures a list of rules to be disabled in the optimizer, in which the rules are specified by their rule names and separated by comma. It is not guaranteed that all the rules in this configuration will eventually be excluded, as some rules are necessary for correctness. The optimizer will log the rules that have indeed been excluded. |
2.4.0 |
spark.sql.orc.columnarReaderBatchSize |
4096 | The number of rows to include in a orc vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data. |
2.4.0 |
spark.sql.orc.compression.codec |
snappy | Sets the compression codec used when writing ORC files. If either |
2.3.0 |
spark.sql.orc.enableNestedColumnVectorizedReader |
false | Enables vectorized orc decoding for nested column. |
3.2.0 |
spark.sql.orc.enableVectorizedReader |
true | Enables vectorized orc decoding. |
2.3.0 |
spark.sql.orc.filterPushdown |
true | When true, enable filter pushdown for ORC files. |
1.4.0 |
spark.sql.orc.mergeSchema |
false | When true, the Orc data source merges schemas collected from all data files, otherwise the schema is picked from a random data file. |
3.0.0 |
spark.sql.orderByOrdinal |
true | When true, the ordinal numbers are treated as the position in the select list. When false, the ordinal numbers in order/sort by clause are ignored. |
2.0.0 |
spark.sql.parquet.binaryAsString |
false | 他の幾つかのParquet生成システム、特にImpalaとSpark SQLの古いバージョンは、Parquetスキーマを書き出す時にバイナリデータと文字列の区別しません。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として扱うように指示します。 |
1.1.1 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | The number of rows to include in a parquet vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data. |
2.4.0 |
spark.sql.parquet.compression.codec |
snappy | Parquetファイルを書き込む時に圧縮符号化を使うように設定します。 |
1.1.1 |
spark.sql.parquet.enableVectorizedReader |
true | Enables vectorized parquet decoding. |
2.0.0 |
spark.sql.parquet.filterPushdown |
true | trueに設定された場合は、Parquet filter push-down 最適化を有効化します。 |
1.2.0 |
spark.sql.parquet.int96AsTimestamp |
true | 幾つかのParquet生成システム、特にImparaは、タイムスタンプをINT96に格納します。ナノ秒項目の精度の喪失を避ける必要があるため、SparkもタイムスタンプをINT96で保持するでしょう。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにINT96データをタイムスタンプとして解釈するように指示します。 |
1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive & Spark. |
2.3.0 |
spark.sql.parquet.mergeSchema |
false | trueの場合、Parquetデータソースは全てのデータファイルから集められたスキーマをマージします。そうでなければ、スキーマは、サマリファイル、あるいはサマリファイルが利用できない場合はランダムデータファイルから取り出されます。 |
1.5.0 |
spark.sql.parquet.outputTimestampType |
INT96 | Sets which Parquet timestamp type to use when Spark writes data to Parquet files. INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value. |
2.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | If true, enables Parquet's native record-level filtering using the pushed down filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled and the vectorized reader is not used. You can ensure the vectorized reader is not used by setting 'spark.sql.parquet.enableVectorizedReader' to false. |
2.3.0 |
spark.sql.parquet.respectSummaryFiles |
false | When true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn't be enabled before knowing what it means exactly. |
1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | もしtrueであれば、データはSpark1.4以前の方法で書き込まれるでしょう。例えば、小数値はApache Parquetの固定長のバイト配列形式で書き込まれるでしょう。これはApache HiveやApache Impalaのよゆな他のシステムが使います。falseであれば、Parquetの新しい形式が使われるでしょう。例えば、小数はintに基づいた形式で書き込まれるでしょう。Parquetの出力がこの新しい形式をサポートしないシステムと一緒に使うことを意図している場合は、trueに設定してください。 |
1.6.0 |
spark.sql.parser.quotedRegexColumnNames |
false | When true, quoted Identifiers (using backticks) in SELECT statement are interpreted as regular expressions. |
2.3.0 |
spark.sql.pivotMaxValues |
10000 | When doing a pivot without specifying values for the pivot column this is the maximum number of (distinct) values that will be collected without error. |
1.6.0 |
spark.sql.pyspark.jvmStacktrace.enabled |
false | When true, it shows the JVM stacktrace in the user-facing PySpark exception together with Python stacktrace. By default, it is disabled and hides JVM stacktrace and shows a Python-friendly exception only. |
3.0.0 |
spark.sql.redaction.options.regex |
(?i)url | Regex to decide which keys in a Spark SQL command's options map contain sensitive information. The values of options whose names that match this regex will be redacted in the explain output. This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex. |
2.2.2 |
spark.sql.redaction.string.regex |
(value of spark.redaction.string.regex ) |
Regex to decide which parts of strings produced by Spark contain sensitive information. When this regex matches a string part, that string part is replaced by a dummy value. This is currently used to redact the output of SQL explain commands. When this conf is not set, the value from |
2.3.0 |
spark.sql.repl.eagerEval.enabled |
false | Enables eager evaluation or not. When true, the top K rows of Dataset will be displayed if and only if the REPL supports the eager evaluation. Currently, the eager evaluation is supported in PySpark and SparkR. In PySpark, for the notebooks like Jupyter, the HTML table (generated by repr_html) will be returned. For plain Python REPL, the returned outputs are formatted like dataframe.show(). In SparkR, the returned outputs are showed similar to R data.frame would. |
2.4.0 |
spark.sql.repl.eagerEval.maxNumRows |
20 | The max number of rows that are returned by eager evaluation. This only takes effect when spark.sql.repl.eagerEval.enabled is set to true. The valid range of this config is from 0 to (Int.MaxValue - 1), so the invalid config like negative and greater than (Int.MaxValue - 1) will be normalized to 0 and (Int.MaxValue - 1). |
2.4.0 |
spark.sql.repl.eagerEval.truncate |
20 | The max number of characters for each cell that is returned by eager evaluation. This only takes effect when spark.sql.repl.eagerEval.enabled is set to true. |
2.4.0 |
spark.sql.session.timeZone |
(value of local timezone) | The ID of session local timezone in the format of either region-based zone IDs or zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'. Zone offsets must be in the format '(+|-)HH', '(+|-)HH:mm' or '(+|-)HH:mm:ss', e.g '-08', '+01:00' or '-13:33:33'. また、'UTC'と'Z'は'+00:00'のエイリアスとしてサポートされています。Other short names are not recommended to use because they can be ambiguous. |
2.2.0 |
spark.sql.shuffle.partitions |
200 | The default number of partitions to use when shuffling data for joins or aggregations. Note: For structured streaming, this configuration cannot be changed between query restarts from the same checkpoint location. |
1.1.0 |
spark.sql.sources.bucketing.autoBucketedScan.enabled |
true | When true, decide whether to do bucketed scan on input tables based on query plan automatically. Do not use bucketed scan if 1. query does not have operators to utilize bucketing (e.g. join, group-by, etc), or 2. there's an exchange operator between these operators and table scan. Note when 'spark.sql.sources.bucketing.enabled' is set to false, this configuration does not take any effect. |
3.1.0 |
spark.sql.sources.bucketing.enabled |
true | When false, we will treat bucketed table as normal table |
2.0.0 |
spark.sql.sources.bucketing.maxBuckets |
100000 | The maximum number of buckets allowed. |
2.4.0 |
spark.sql.sources.default |
parquet | The default data source to use in input/output. |
1.3.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | The maximum number of paths allowed for listing files at driver side. If the number of detected paths exceeds this value during partition discovery, it tries to list the files with another Spark distributed job. この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 |
1.5.0 |
spark.sql.sources.partitionColumnTypeInference.enabled |
true | When true, automatically infer the data types for partitioned columns. |
1.5.0 |
spark.sql.sources.partitionOverwriteMode |
STATIC | When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode. This can also be set as an output option for a data source using key partitionOverwriteMode (which takes precedence over this setting), e.g. dataframe.write.option("partitionOverwriteMode", "dynamic").save(path). |
2.3.0 |
spark.sql.statistics.fallBackToHdfs |
false | When true, it will fall back to HDFS if the table statistics are not available from table metadata. This is useful in determining if a table is small enough to use broadcast joins. This flag is effective only for non-partitioned Hive tables. For non-partitioned data source tables, it will be automatically recalculated if table statistics are not available. For partitioned data source and partitioned Hive tables, It is 'spark.sql.defaultSizeInBytes' if table statistics are not available. |
2.0.0 |
spark.sql.statistics.histogram.enabled |
false | Generates histograms when computing column statistics if enabled. Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histogram. Note that collecting histograms takes extra cost. For example, collecting column statistics usually takes only one table scan, but generating equi-height histogram will cause an extra table scan. |
2.3.0 |
spark.sql.statistics.size.autoUpdate.enabled |
false | Enables automatic update for table size once table's data is changed. Note that if the total number of files of the table is very large, this can be expensive and slow down data change commands. |
2.3.0 |
spark.sql.storeAssignmentPolicy |
ANSI | When inserting a value into a column with different data type, Spark will perform type coercion. 現在のところ、型強制ルールの3つのポリシーがサポートされます: ANSI、legacy、strict。ANSI ポリシーでは、Spark は ANSI SQL に従って型強制を実行します。実際には、PostgreSQLとほとんど同じ挙動です。It disallows certain unreasonable type conversions such as converting |
3.0.0 |
spark.sql.streaming.checkpointLocation |
(none) | The default location for storing checkpoint data for streaming queries. |
2.0.0 |
spark.sql.streaming.continuous.epochBacklogQueueSize |
10000 | The max number of entries to be stored in queue to wait for late epochs. If this parameter is exceeded by the size of the queue, stream will stop with an error. |
3.0.0 |
spark.sql.streaming.disabledV2Writers |
A comma-separated list of fully qualified data source register class names for which StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks. |
2.3.1 | |
spark.sql.streaming.fileSource.cleaner.numThreads |
1 | Number of threads used in the file source completed file cleaner. |
3.0.0 |
spark.sql.streaming.forceDeleteTempCheckpointLocation |
false | When true, enable temporary checkpoint locations force delete. |
3.0.0 |
spark.sql.streaming.metricsEnabled |
false | Whether Dropwizard/Codahale metrics will be reported for active streaming queries. |
2.0.2 |
spark.sql.streaming.multipleWatermarkPolicy |
min | Policy to calculate the global watermark value when there are multiple watermark operators in a streaming query. The default value is 'min' which chooses the minimum watermark reported across multiple operators. Other alternative value is 'max' which chooses the maximum across multiple operators. Note: This configuration cannot be changed between query restarts from the same checkpoint location. |
2.4.0 |
spark.sql.streaming.noDataMicroBatches.enabled |
true | Whether streaming micro-batch engine will execute batches without data for eager state management for stateful streaming queries. |
2.4.1 |
spark.sql.streaming.numRecentProgressUpdates |
100 | The number of progress updates to retain for a streaming query |
2.1.1 |
spark.sql.streaming.stateStore.stateSchemaCheck |
true | When true, Spark will validate the state schema against schema on existing state and fail query if it's incompatible. |
3.1.0 |
spark.sql.streaming.stopActiveRunOnRestart |
true | Running multiple runs of the same streaming query concurrently is not supported. If we find a concurrent active run for a streaming query (in the same or different SparkSessions on the same cluster) and this flag is true, we will stop the old streaming query run to start the new one. |
3.0.0 |
spark.sql.streaming.stopTimeout |
0 | How long to wait in milliseconds for the streaming execution thread to stop when calling the streaming query's stop() method. 0 or negative values wait indefinitely. |
3.0.0 |
spark.sql.thriftServer.interruptOnCancel |
false | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. |
3.2.0 |
spark.sql.thriftServer.queryTimeout |
0ms | Set a query duration timeout in seconds in Thrift Server. If the timeout is set to a positive value, a running query will be cancelled automatically when the timeout is exceeded, otherwise the query continues to run till completion. If timeout values are set for each statement via |
3.1.0 |
spark.sql.thriftserver.scheduler.pool |
(none) | Set a Fair Scheduler pool for a JDBC client session. |
1.1.1 |
spark.sql.thriftserver.ui.retainedSessions |
200 | The number of SQL client sessions kept in the JDBC/ODBC web UI history. |
1.4.0 |
spark.sql.thriftserver.ui.retainedStatements |
200 | The number of SQL statements kept in the JDBC/ODBC web UI history. |
1.4.0 |
spark.sql.ui.explainMode |
formatted | Configures the query explain mode used in the Spark SQL UI. The value can be 'simple', 'extended', 'codegen', 'cost', or 'formatted'. The default value is 'formatted'. |
3.1.0 |
spark.sql.variable.substitute |
true | This enables substitution using syntax like |
2.0.0 |
Static SQL 設定
Static SQL configurations are cross-session, immutable Spark SQL configurations. They can be set with final values by the config file
and command-line options with --conf/-c
prefixed, or by setting SparkConf
that are used to create SparkSession
.
External users can query the static sql config values via SparkSession.conf
or via set command, e.g. SET spark.sql.extensions;
, but cannot set/unset them.
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.sql.cache.serializer |
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer | The name of a class that implements org.apache.spark.sql.columnar.CachedBatchSerializer. It will be used to translate SQL data into a format that can more efficiently be cached. The underlying API is subject to change so use with caution. Multiple classes cannot be specified. The class must have a no-arg constructor. |
3.1.0 |
spark.sql.event.truncate.length |
2147483647 | Threshold of SQL length beyond which it will be truncated before adding to event. Defaults to no truncation. If set to 0, callsite will be logged instead. |
3.0.0 |
spark.sql.extensions |
(none) | A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used. |
2.2.0 |
spark.sql.hive.metastore.barrierPrefixes |
Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、 |
1.4.0 | |
spark.sql.hive.metastore.jars |
ビルトイン | HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。This property can be one of four options: 1. "builtin" Use Hive 2.3.9, which is bundled with the Spark assembly when |
1.4.0 |
spark.sql.hive.metastore.jars.path |
HiveMetastoreClientをインスタンス化するために使われるjarのカンマ区切りのパス。この設定は、 |
3.1.0 | |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。 |
1.4.0 |
spark.sql.hive.metastore.version |
2.3.9 | Hiveメタストアのバージョン利用可能なオプションは、 |
1.4.0 |
spark.sql.hive.thriftServer.singleSession |
false | When set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. |
1.6.0 |
spark.sql.hive.version |
2.3.9 | The compiled, a.k.a, builtin Hive version of the Spark distribution bundled with. Note that, this a read-only conf and only used to report the built-in hive version. If you want a different metastore client for Spark to call, please refer to spark.sql.hive.metastore.version. |
1.1.1 |
spark.sql.metadataCacheTTLSeconds |
-1000ms | Time-to-live (TTL) value for the metadata caches: partition file metadata cache and session catalog cache. This configuration only has an effect when this value having a positive value (> 0). It also requires setting 'spark.sql.catalogImplementation' to |
3.1.0 |
spark.sql.queryExecutionListeners |
(none) | List of class names implementing QueryExecutionListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument. |
2.3.0 |
spark.sql.streaming.streamingQueryListeners |
(none) | List of class names implementing StreamingQueryListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument. |
2.4.0 |
spark.sql.streaming.ui.enabled |
true | Whether to run the Structured Streaming Web UI for the Spark application when the Spark Web UI is enabled. |
3.0.0 |
spark.sql.streaming.ui.retainedProgressUpdates |
100 | The number of progress updates to retain for a streaming query for Structured Streaming UI. |
3.0.0 |
spark.sql.streaming.ui.retainedQueries |
100 | The number of inactive queries to retain for Structured Streaming UI. |
3.0.0 |
spark.sql.ui.retainedExecutions |
1000 | Number of executions to retain in the Spark UI. |
1.5.0 |
spark.sql.warehouse.dir |
(value of $PWD/spark-warehouse ) |
The default location for managed databases and tables. |
2.0.0 |
Spark ストリーミング
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.streaming.backpressure.enabled |
false |
Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRate およびspark.streaming.kafka.maxRatePerPartition が設定されている場合に、この値で上限を制限されます (以下を見てください)。
|
1.5.0 |
spark.streaming.backpressure.initialRate |
not set | これはバックプレッシャー機構が有効な場合に最初のバッチのために各レシーバーがデータを受信する初期の最大受信レートです。 | 2.0.0 |
spark.streaming.blockInterval |
200ms | Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。See the performance tuning section in the Spark Streaming programming guide for more details. | 0.8.0 |
spark.streaming.receiver.maxRate |
not set | 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。See the deployment guide in the Spark Streaming programming guide for mode details. | 1.0.2 |
spark.streaming.receiver.writeAheadLog.enable |
false | レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。See the deployment guide in the Spark Streaming programming guide for more details. | 1.2.1 |
spark.streaming.unpersist |
true | Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。 | 0.9.0 |
spark.streaming.stopGracefullyOnShutdown |
false |
true の場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。
|
1.4.0 |
spark.streaming.kafka.maxRatePerPartition |
not set | 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。 | 1.3.0 |
spark.streaming.kafka.minRatePerPartition |
1 | 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最小レート(秒間あたりのレコード数)。 | 2.4.0 |
spark.streaming.ui.retainedBatches |
1000 | ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。 | 1.0.0 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | ドライバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。ドライバー上のメタデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 | 1.6.0 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | レシーバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。レシーバー上のデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 | 1.6.0 |
SparkR
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.r.numRBackendThreads |
2 | SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。 | 1.4.0 |
spark.r.command |
Rscript | ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。 | 1.5.3 |
spark.r.driver.command |
spark.r.command | ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。 | 1.5.3 |
spark.r.shell.command |
R |
ドライバーのためのクライアントモードのsparkRシェルの実行のための実行ファイル。クラスターモードでは無視されます。環境変数 SPARKR_DRIVER_R と同じですが、それより優先されます。spark.r.shell.command はsparkRシェルのために使われますが、spark.r.driver.command は R スクリプトを実行するために使われます。
|
2.1.0 |
spark.r.backendConnectionTimeout |
6000 | Rプロセスによって設定されるRBackendへの接続上の接続タイムアウトの秒数。 | 2.1.0 |
spark.r.heartBeatInterval |
100 | 接続タイムアウトを防ぐためにSparkRバックエンドからRプロセスに送信されるハートビートの間隔。 | 2.1.0 |
GraphX
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.graphx.pregel.checkpointInterval |
-1 | Pregelでのグラフとメッセージのためのチェックポイント間隔。多くの繰り返しの連鎖による stackOverflowError を避けるために使われます。デフォルトではチェックポイントは無効です。 | 2.2.0 |
配備
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.deploy.recoveryMode |
NONE | クラスターモードでサブミットされたSparkジョブが失敗し再起動する場合に、回復の設定をするリカバリモード。スタンダードあるいはMesosで動いている場合にクラスタモードでのみ適用可能です。 | 0.8.1 |
spark.deploy.zookeeper.url |
None | `spark.deploy.recoveryMode` が ZOOKEEPER に設定されている場合は、この設定は接続するためのzookeeperのURLに設定するために使われます。 | 0.8.1 |
spark.deploy.zookeeper.dir |
None | `spark.deploy.recoveryMode` がZOOKEEPERに設定されている場合は、この設定はリカバリー状態を保持するためのzookeeperディレクトリを設定するために使われます。 | 0.8.1 |
クラスタマネージャー
Sparkの各クラスタマネージャーは追加の設定オプションを持ちます。各ノードのためのページ上で設定を見つけることができます。
YARN
Mesos
Kubernetes
スタンドアローンモード
環境変数
あるSpark設定は環境変数によって設定することができ、それらはSparkがインストールされたディレクトリ内のconf/spark-env.sh
スクリプトによって使われます(Windows上ではconf/spark-env.cmd
)。スタンドアローンおよびMesosモードでは、このファイルはホスト名のようなマシーン固有の情報を渡すことができるでしょう。ローカルのアプリケーションの実行あるいはスクリプトのサブミットの場合、それは開始場所にもなります。
conf/spark-env.sh
はSparkがインストールされた場合はデフォルトでは存在しないことに注意してください。しかし、それを作成するためにconf/spark-env.sh.template
をコピーすることができます。コピーを実行可能にすることを忘れないでください。
以下の変数はspark-env.sh
の中で設定することができます:
環境変数 | 意味 |
---|---|
JAVA_HOME |
Javaがインストールされた場所(デフォルトのPATH 上に無い場合)。 |
PYSPARK_PYTHON |
Python binary executable to use for PySpark in both driver and workers (default is python3 if available, otherwise python ). 設定されている場合はプロパティspark.pyspark.python が優先されます。 |
PYSPARK_DRIVER_PYTHON |
PySparkのためにドライバの中でのみ使うPythonの実行可能バイナリ(デフォルトはPYSPARK_PYTHON です)。設定されている場合はプロパティspark.pyspark.driver.python が優先されます。 |
SPARKR_DRIVER_R |
SparkRシェルのために使われるRバイナリ実行ファイル(デフォルトは R )。設定されている場合はプロパティspark.r.shell.command が優先されます。 |
SPARK_LOCAL_IP |
バインドするマシーンのIPアドレス。 |
SPARK_PUBLIC_DNS |
Sparkプログラムが他のマシーンに知らせるホスト名。 |
上に加えて、各マシーンで使うコアの数や最大メモリのような、Sparkスタンドアローン クラスタ スクリプトを設定するためのオプションもあります。
spark-env.sh
はシェルスクリプトなので、それらのいくつかはプログラム的に設定することができます - 例えば、特定のネットワークインタフェースのIPを調べることでSPARK_LOCAL_IP
を計算することができるかもしれません。
注意: cluster
モードでYARN上のSparkを実行する場合は、 conf/spark-defaults.conf
ファイル内のspark.yarn.appMasterEnv.[EnvironmentVariableName]
を使って環境変数が設定される必要があります。spark-env.sh
に設定されている環境変数は、cluster
モードのYARN マスタープロセス内には反映されないでしょう。詳細についてはYARNに関係する Spark プロパティ を見てください。
ログの設定
Sparkはログのためにlog4jを使います。conf
ディレクトリに log4j.properties
ファイルを追加することで設定することができます。開始する一つの方法は、そこに存在する既存のlog4j.properties.template
をコピーすることです。
By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): mdc.taskName
, which shows something
like task 1.0 in stage 0.0
. You can add %X{mdc.taskName}
to your patternLayout in
order to print it in the logs.
Moreover, you can use spark.sparkContext.setLocalProperty(s"mdc.$name", "value")
to add user specific data into MDC.
The key in MDC will be the string of “mdc.$name”.
設定ディレクトリの上書き
デフォルトの"SPARK_HOME/conf"以外に異なる設定ディレクトリを指定するために、SPARK_CONF_DIRを設定することができます。Sparkはこのディレクトリから設定ファイル(spark-defaults.conf, spark-env.sh, log4j.properties など)を使用するでしょう。
Hadoopクラスタ設定の継承
Sparkを使ってHDFSから読み込みをするつもりであれば、Sparkのクラスパス上に含まれるべき二つのHadoop設定ファイルがあります。
hdfs-site.xml
は、HDFSクライアントのデフォルトの挙動を提供します。core-site.xml
は、デフォルトのファイル名を設定します。
これらの設定ファイルの場所はHadoopおよびHDPバージョンによって変わりますが、一般的な場所は/etc/hadoop/conf
の中です。幾つかのツールはその場で設定を生成しますが、それらのコピーをダウンロードするための仕組みを提供します。
それらのファイルがSparkに見えるようにするためには、$SPARK_HOME/conf/spark-env.sh
内のHADOOP_CONF_DIR
を設定ファイルを含む場所に設定します。
独自の Hadoop/Hive 設定
もしSparkアプリケーションがHadoop, Hive あるいは両方とやり取りをする場合、Sparkのクラスパス内におそらくHadoop/Hiveの設定ファイルがあります。
複数の実行中のアプリケーションは異なるHadoop/Hiveクライアント側の設定を必要とするかもしれません。各アプリケーションごとにSparkのクラスパス内の hdfs-site.xml
, core-site.xml
, yarn-site.xml
, hive-site.xml
をコピーし修正することができます。YARN上で実行中のSparkクラスタ内ではこれらの設定ファイルはクラスタ全体で設定され、アプリケーションによって安全に変更することができません。
The better choice is to use spark hadoop properties in the form of spark.hadoop.*
, and use
spark hive properties in the form of spark.hive.*
.
For example, adding configuration “spark.hadoop.abc.def=xyz” represents adding hadoop property “abc.def=xyz”,
and adding configuration “spark.hive.abc=xyz” represents adding hive property “hive.abc=xyz”.
They can be considered as same as normal spark properties which can be set in $SPARK_HOME/conf/spark-defaults.conf
時には、ある設定をSparkConf
にハードコーディングしたくないかも知れません。For instance, Spark allows you to simply create an empty conf and set spark/spark hadoop/spark hive properties.
また、実行時に設定を修正あるいは追加することができます:
カスタム リソース スケジューリングと設定の概要
GPUs and other accelerators have been widely used for accelerating special workloads, e.g., deep learning and signal processing. Spark now supports requesting and scheduling generic resources, such as GPUs, with a few caveats. The current implementation requires that the resource have addresses that can be allocated by the scheduler. It requires your cluster manager to support and be properly configured with the resources.
There are configurations available to request resources for the driver: spark.driver.resource.{resourceName}.amount
, request resources for the executor(s): spark.executor.resource.{resourceName}.amount
and specify the requirements for each task: spark.task.resource.{resourceName}.amount
. The spark.driver.resource.{resourceName}.discoveryScript
config is required on YARN, Kubernetes and a client side Driver on Spark Standalone. spark.executor.resource.{resourceName}.discoveryScript
config is required for YARN and Kubernetes. Kubernetes also requires spark.driver.resource.{resourceName}.vendor
and/or spark.executor.resource.{resourceName}.vendor
. See the config descriptions above for more information on each.
Spark will use the configurations specified to first request containers with the corresponding resources from the cluster manager. Once it gets the container, Spark launches an Executor in that container which will discover what resources the container has and the addresses associated with each resource. The Executor will register with the Driver and report back the resources available to that Executor. The Spark scheduler can then schedule tasks to each Executor and assign specific resource addresses based on the resource requirements the user specified. The user can see the resources assigned to a task using the TaskContext.get().resources
api. On the driver, the user can see the resources assigned with the SparkContext resources
call. It’s then up to the user to use the assignedaddresses to do the processing they want or pass those into the ML/AI framework they are using.
See your cluster manager specific page for requirements and details on each of - YARN, Kubernetes and Standalone Mode. It is currently not available with Mesos or local mode. And please also note that local-cluster mode with multiple workers is not supported(see Standalone documentation).
ステージレベルのスケジューリングの概要
The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at the start of the application and them be idle while the ETL stage is being run. This is only available for the RDD API in Scala, Java, and Python. It is available on YARN and Kubernetes when dynamic allocation is enabled. See the YARN page or Kubernetes page for more implementation details.
See the RDD.withResources
and ResourceProfileBuilder
API’s for using this feature. The current implementation acquires new executors for each ResourceProfile
created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more then 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config spark.scheduler.resource.profileMergeConflicts
to control that behavior. The current merge strategy Spark implements when spark.scheduler.resource.profileMergeConflicts
is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.
プッシュベースのシャッフルの概要
Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch for some scenarios, like partition coalesce when merged output is available.
Push-based shuffle improves performance for long running jobs/queries which involves large disk I/O during shuffle. Currently it is not well suited for jobs/queries which runs quickly dealing with lesser amount of shuffle data. This will be further improved in the future releases.
Currently push-based shuffle is only supported for Spark on YARN with external shuffle service.
外部のシャッフルサービス(サーバ)側の設定オプション
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.shuffle.push.server.mergedShuffleFileManagerImpl |
org.apache.spark.network.shuffle.
|
Class name of the implementation of MergedShuffleFileManager that manages push-based shuffle. This acts as a server side config to disable or enable push-based shuffle. By default, push-based shuffle is disabled at the server side. To enable push-based shuffle on the server side, set this config to |
3.2.0 |
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile |
2m |
The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during push-based shuffle. A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete merged shuffle file in a single disk I/O increases the memory requirements for both the clients and the external shuffle services. Instead, the external shuffle service serves the merged file in Setting this too high would increase the memory requirements on both the clients and the external shuffle service. Setting this too low would increase the overall number of RPC requests to external shuffle service unnecessarily. |
3.2.0 |
spark.shuffle.push.server.mergedIndexCacheSize |
100m |
The maximum size of cache in memory which could be used in push-based shuffle for storing merged index files. This cache is in addition to the one configured via spark.shuffle.service.index.cache.size .
|
3.2.0 |
クライアント側の設定オプション
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.shuffle.push.enabled |
false |
Set to true to enable push-based shuffle on the client side and works in conjunction with the server side flag spark.shuffle.push.server.mergedShuffleFileManagerImpl .
|
3.2.0 |
spark.shuffle.push.finalize.timeout |
10s |
The amount of time driver waits in seconds, after all mappers have finished for a given shuffle map stage, before it sends merge finalize requests to remote external shuffle services. This gives the external shuffle services extra time to merge blocks. Setting this too long could potentially lead to performance regression. | 3.2.0 |
spark.shuffle.push.maxRetainedMergerLocations |
500 |
Maximum number of merger locations cached for push-based shuffle. Currently, merger locations are hosts of external shuffle services responsible for handling pushed blocks, merging them and serving merged blocks for later shuffle fetch. | 3.2.0 |
spark.shuffle.push.mergersMinThresholdRatio |
0.05 |
Ratio used to compute the minimum number of shuffle merger locations required for a stage based on the number of partitions for the reducer stage. For example, a reduce stage which has 100 partitions and uses the default value 0.05 requires at least 5 unique merger locations to enable push-based shuffle. | 3.2.0 |
spark.shuffle.push.mergersMinStaticThreshold |
5 |
The static threshold for number of shuffle push merger locations should be available in order to enable push-based shuffle for a stage. Note this config works in conjunction with spark.shuffle.push.mergersMinThresholdRatio . Maximum of spark.shuffle.push.mergersMinStaticThreshold and spark.shuffle.push.mergersMinThresholdRatio ratio number of mergers needed to enable push-based shuffle for a stage. For example: with 1000 partitions for the child stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need at least 50 mergers to enable push-based shuffle for that stage.
|
3.2.0 |
spark.shuffle.push.maxBlockSizeToPush |
1m |
The max size of an individual block to push to the remote external shuffle services. Blocks larger than this threshold are not pushed to be merged remotely. These shuffle blocks will be fetched in the original manner. Setting this too high would result in more blocks to be pushed to remote external shuffle services but those are already efficiently fetched with the existing mechanisms resulting in additional overhead of pushing the large blocks to remote external shuffle services. It is recommended to set Setting this too low would result in lesser number of blocks getting merged and directly fetched from mapper external shuffle service results in higher small random reads affecting overall disk I/O performance. |
3.2.0 |
spark.shuffle.push.maxBlockBatchSize |
3m |
The max size of a batch of shuffle blocks to be grouped into a single push request. Default is set to 3m in order to keep it slightly higher than spark.storage.memoryMapThreshold default which is 2m as it is very likely that each batch of block gets memory mapped which incurs higher overhead.
|
3.2.0 |