Sparkの設定

Sparkはシステムを設定するための3つの場所を提供します:

Sparkのプロパティ

Spark プロパティはほとんどのアプリケーションの設定を制御し、各アプリケーションに対して別々に設定されます。これらのプロパティは SparkContextに渡して直接SparkConf に設定することができます。SparkConfを使って共通プロパティの幾つかを設定(例えば、マスターURLおよびアプリケーション名)することができ、set()メソッドを使って任意のキー値ペアを設定することができます。例えば、以下のように2つのスレッドを使ってアプリケーションを初期化することができます:

local[2]をつけて実行、2つのスレッド - "最小限"の並列処理、することにより、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立つことに注意してください。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

ローカルモードで1つ以上のスレッドを持つことができ、実際にSpark Streamingのような場合にはリソース不足を避けるために1つ以上のスレッドを要求するかも知れないということに注意してください。

なんらかの期間を指定するプロパティは時間の単位で設定されなければなりません。以下の書式が受け付けられます:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

バイトサイズを指定するプロパティはサイズの単位を使って設定されなければなりません。以下の書式が受け付けられます:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

単位が無い数値は一般的にバイトとして解釈されますが、2,3のものはKiBあるいはMiBとして解釈されます。個々の設定プロパティのドキュメントを見てください。可能な箇所では単位の指定が望ましいです。

Sparkプロパティの動的なロード

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。例えば、もし同じアプリケーションを異なるマスターあるいは異なるメモリ量で実行したいなど。Sparkは空のconfを単純に作成することができます:

val sc = new SparkContext(new SparkConf())

そうすると、実行時に設定値を提供することができます:

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark シェルおよび spark-submit ツールは動的に設定をロードする2つの方法を提供します。最初のものは上で示されたように、--masterのようなコマンドラインのオプションです。spark-submit--conf/-c フラグを使ってどのようなSpark プロパティも受け付けることができますが、Sparkアプリケーションを起動する時に役割があるプロパティのために特別なフラグを使うようにしてください。./bin/spark-submit --helpを実行すると、これらのオプションの完全なリストが表示されるでしょう。

bin/spark-submitは設定オプションを conf/spark-defaults.confからも読み込むでしょう。各行はホワイトスペースで区切られたキーと値です。例えば:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

フラグで指定された、あるいはプロパティファイルの全ての値はアプリケーションに渡され、SparkConfを通じてマージされるでしょう。SparkConfで直接設定されたプロパティは優先度が高く、その次はspark-submit あるいは spark-shellに渡されたもので、その次はspark-defaults.conf ファイル内のオプションです。以前のSparkのバージョンから2,3の設定キーの名前が変更されました; そのような場合、古いキーの名前はまだ受け付けられますが、新しいキーのどのインスタンスよりも優先度が低くなります。

Sparkのプロパティは主に2つの種類に分けることができます; 1つは “spark.driver.memory”, “spark.executor.instances” のような配備に関係します。この種類のプロパティはプログラム的に実行時にSparkConfを使って設定する時に影響を受けないでしょう。あるいはその挙動はどのクラスタマネージャと配備モードを選択したかに依存するため、設定ファイルまたは spark-submit コマンドラインのオプションを使って設定することが望まれます; もう1つは “spark.task.maxFailures” のような主にSparkのruntimeの制御に関係し、この種類のプロパティはどちらかの方法で設定することができます。

Sparkプロパティのビュー

http://<driver>:4040 のアプリケーションUIは、"Environment"タブの中でSparkプロパティをリスト表示します。ここはプロパティが正しく設定されたか確認するのに便利な場所です。spark-defaults.conf, SparkConf, あるいはコマンドラインで明示的に指定された値だけが表示されるだろうことに注意してください。他の全ての設定プロパティについては、デフォルトの値が使われると見なすことができます。

利用可能なプロパティ

内部設定を制御するほとんどのプロパティは意味のあるデフォルト値を持ちます。最も一般的なオプションの幾つかは以下のように設定されます。

アプリケーションのプロパティ

プロパティ名デフォルト意味これ以降のバージョンから
spark.app.name (none) アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。 0.9.0
spark.driver.cores 1 ドライバープロセスのために使われるコアの数。クラスターモードのみです。 1.3.0
spark.driver.maxResultSize 1g 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズのバイト数の制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。 1.2.0
spark.driver.memory 1g ドライバープロセス、つまりSparkContextが初期化される時に使用されるメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
1.1.1
spark.driver.memoryOverhead 最小384で、driverMemory * 0.10 クラスタモード時のドライバプロセスごとに割り当てられる非ヒープメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところ YARN、Kubernetes、Kubernetes でサポートされます。注意: オフヒープメモリを含む非ヒープ(spark.memory.offHeap.enabled=true の時)、他のドライバプロセスで使われるメモリ(例えば、PySparkドライバと対になるpythonプロセス)、同じコンテナ内で実行中の他の非ドライバプロセスによって使われるメモリ。実行中のドライバのコンテナの最大メモリサイズは、spark.driver.memoryOverheadspark.driver.memory の合計によって決定されます。 2.3.0
spark.driver.resource.{resourceName}.amount 0 ドライバで使用する特定のリソース型の量。これを使う場合、ドライバが起動時のリソースを見つけられるように、spark.driver.resource.{resourceName}.discoveryScript も指定する必要があります。 3.0.0
spark.driver.resource.{resourceName}.discoveryScript None ドライバが特定のリソース型を見つけられるように実行するスクリプト。これは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、名前とアドレスの配列を持ちます。クライアントがサブミットしたドライバのために、ディスカバリ スクリプトは同じホスト上の他のドライバと比較して異なるリソースアドレスをこのドライバに割り当てる必要があります。 3.0.0
spark.driver.resource.{resourceName}.vendor None ドライバで使用するリソースのベンダー。このオプションは現在のところKubernetes上でのみサポートされ、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインです。(例えば、Kubernetes 上の GPU については、この設定は nvidia.com または amd.com に設定されます) 3.0.0
spark.resources.discoveryPlugin org.apache.spark.resource.ResourceDiscoveryScriptPlugin アプリケーションへ読み込まれる org.apache.spark.api.resource.ResourceDiscoveryPlugin を実装するクラス名のカンマ区切りのリスト。これは上級ユーザが独自の実装を持つリソースディスカバリクラスを置き換えるためのものです。Spark は、指定された各クラスを、それらの1つがそのリソースの情報を返すまで試行します。It tries the discovery script last if none of the plugins return information for that resource. 3.0.0
spark.executor.memory 1g executorプロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。 0.7.0
spark.executor.pyspark.memory 設定無し executorごとにPySparkに割り当てられるメモリ量。別に指定されていなければMiB単位。設定された場合、executorのためのPySparkメモリはこの量に制限されるでしょう。設定されない場合、SparkはPythonのメモリ使用を制限せず、他の非JVMプロセスと共有されるオーバーヘッドのメモリ空間を超えないようにするのはアプリケーションの責任です。PySparkがYARNあるいはKubernetes内で実行される場合、子のメモリはexecutorのリソースリクエストに追加されます。
注意: この機能はPythonの `resource` モジュールに依存します; 従って、その挙動と制限が継承されます。例えば、Windowsはリソースの制限をサポートせず、MacOS 上では実際のリソースは制限されません。
2.4.0
spark.executor.memoryOverhead 最小384で、executorMemory * 0.10。 クラスタモード時の executor プロセスごとに割り当てられる追加のメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところYARNとKubernetesでサポートされます。
Note: Additional memory includes PySpark executor memory (when spark.executor.pyspark.memory is not configured) and memory used by other non-executor processes running in the same container. The maximum memory size of container to running executor is determined by the sum of spark.executor.memoryOverhead, spark.executor.memory, spark.memory.offHeap.size and spark.executor.pyspark.memory.
2.3.0
spark.executor.resource.{resourceName}.amount 0 executor プロセスごとに使用する特定のリソース型の量。これを使う場合、executor が起動時のリソースを見つけられるように、spark.executor.resource.{resourceName}.discoveryScript も指定する必要があります。 3.0.0
spark.executor.resource.{resourceName}.discoveryScript None executor が特定のリソース型を見つけられるように実行するスクリプト。これは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、名前とアドレスの配列を持ちます。 3.0.0
spark.executor.resource.{resourceName}.vendor None executor で使用するリソースのベンダー。このオプションは現在のところKubernetes上でのみサポートされ、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインです。(例えば、Kubernetes 上の GPU については、この設定は nvidia.com または amd.com に設定されます) 3.0.0
spark.extraListeners (none) SparkListenerを実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。 1.3.0
spark.local.dir /tmp Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。
注意: これは、クラスタマネージャーによって設定される、SPARK_LOCAL_DIRS (Standalone)、MESOS_SANDBOX (Mesos) あるいは LOCAL_DIRS (YARN) 環境変数によって上書きされます。
0.5.0
spark.logConf false SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。 0.9.0
spark.master (none) 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。 0.9.0
spark.submit.deployMode (none) Sparkドライバプログラムの配備モード、"client"あるいは"cluster"、これはドライバープログラムをローカル("client")あるいはクラスタ内のノードの1つの上で遠隔で("cluster")起動することを意味します。 1.5.0
spark.log.callerContext (none) Yarn/HDFS上で動作している時に Yarn RM log/HDFS audit ログに書き込まれるアプリケーションの情報。その長さはhadoop.caller.context.max.sizeのHadoop設定に依存します。簡潔でなければならず、一般的に50文字まで持つことができます。 2.2.0
spark.driver.supervise false trueであれば、非0の終了ステータスで失敗した場合にドライバは自動的に再起動します。Spark スタンドアローン モードあるいは Mesos クラスタ配備モードでのみ効果があります。 1.3.0
spark.driver.log.dfsDir (none) spark.driver.log.persistToDfs.enabled が true であれば、Spark ドライバのログが同期されるベースディレクトリ。このベースディレクトリ内で、各アプリケーションはアプリケーション固有のファイルへドライバログを記録します。ユーザは、これを HDFS ディレクトリのような統一された場所に設定して、後で使用する前にドライバログファイルを保持できるようにすることができます。このディレクリは、全ての Spark ユーザがファイルを読み書きでき、Spark履歴サーバユーザがファイルを削除できる必要があります。さらに、spark.history.fs.driverlog.cleaner.enabled が true で、spark.history.fs.driverlog.cleaner.maxAge で設定される最大の古いさよりも古い場合に、個のディレクトリの古いログは Spark 履歴サーバ によって消去されます。 3.0.0
spark.driver.log.persistToDfs.enabled false true の場合、クライアントモードで実行中の spark アプリケーションはドライバーログをspark.driver.log.dfsDir で設定される永続化ストレージに書き込みます。もし spark.driver.log.dfsDir が設定されていない場合は、ドライバーログは永続化されません。さらに、spark.history.fs.driverlog.cleaner.enabled を true に設定することで、Spark 履歴サーバ でクリーナーを有効にします。 3.0.0
spark.driver.log.layout %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n spark.driver.log.dfsDir に同期されるドライバーログのレイアウト。設定されていない場合、log4j.properties で定義される最初のアペンダーのレイアウトが使われます。それも設定されていない場合、ドライバーログはデフォルトのレイアウトを使います。 3.0.0
spark.driver.log.allowErasureCoding false Whether to allow driver logs to use erasure coding. On HDFS, erasure coded files will not update as quickly as regular replicated files, so they make take longer to reflect changes written by the application. Note that even if this is true, Spark will still not force the file to use erasure coding, it will simply use file system defaults. 3.0.0

これとは別に、以下のプロパティも利用可能で、ある状況では有用かも知れません。

ランタイム環境

プロパティ名デフォルト意味これ以降のバージョンから
spark.driver.extraClassPath (none) ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
1.0.0
spark.driver.defaultJavaOptions (none) spark.driver.extraJavaOptions に追加されるデフォルトの JVM オプションの文字列。これは管理者によって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memoryを、クライアントモードでは--driver-memoryコマンドラインオプションを使って設定することができます。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
3.0.0
spark.driver.extraJavaOptions (none) ドライバに渡す特別なJVMオプションの文字列。これはユーザによって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memoryを、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。spark.driver.defaultJavaOptions がこの設定に追加されます。
1.0.0
spark.driver.extraLibraryPath (none) ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
1.0.0
spark.driver.userClassPathFirst false (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。 1.3.0
spark.executor.extraClassPath (none) executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。 1.0.0
spark.executor.defaultJavaOptions (none) spark.executor.extraJavaOptions に追加されるデフォルトの JVM オプションの文字列。これは管理者によって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc 3.0.0
spark.executor.extraJavaOptions (none) executorに渡すための特別なJVMオプションの文字列これはユーザによって設定されることを目的としています。例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions がこの設定に追加されます。 1.0.0
spark.executor.extraLibraryPath (none) executor JVMを起動する場合に使う特別なライブラリパスを設定する。 1.0.0
spark.executor.logs.rolling.maxRetainedFiles (none) システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。 1.1.0
spark.executor.logs.rolling.enableCompression false executor ログの圧縮を有効にします。有効な場合、ロールされたexecutorログは圧縮されるでしょう。デフォルトは無効です。 2.0.2
spark.executor.logs.rolling.maxSize (none) executorのログがロールオーバーされる最大のファイルサイズをバイトで設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。 1.4.0
spark.executor.logs.rolling.strategy (none) executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するために>spark.executor.logs.rolling.maxSize を使用します。 1.1.0
spark.executor.logs.rolling.time.interval daily executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily, hourly, minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFilesを見てください。 1.1.0
spark.executor.userClassPathFirst false (実験的なもの) spark.driver.userClassPathFirstと同じ機能ですが、executorインスタンスに適用されます。 1.3.0
spark.executorEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。 0.9.0
spark.redaction.regex (?i)secret|password|token ドライバおよびexecutor環境の中で、Spark設定プロパティと環境変数のどちらが保護必要情報を持つかを決定する正規表現。この正規表現がプロパティのキーあるいは値に一致した場合、その値は環境のUIおよびYARNとイベントログのような様々なログから作成されます。 2.1.2
spark.python.profile false Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles()によって表示されるか、ドライバが終了する前に表示されるでしょう。sc.dump_profiles(path)によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。 1.2.0
spark.python.profile.dump (none) ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはpstats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。 1.2.0
spark.python.worker.memory 512m 集約の間にpythonワーカープロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。 1.1.0
spark.python.worker.reuse true Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。 1.2.0
spark.files 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。globも可能です。 1.0.0
spark.submit.pyFiles PythonアプリのためのPYTHONPATH上に配置されるカンマ区切りの .zip, .egg あるいは .py ファイル。globも可能です。 1.0.1
spark.jars ドライバーとexecutorのクラスパス上でインクルードするjarのカンマ区切りのリスト。globも可能です。 0.9.0
spark.jars.packages ドライバーとexecutorのクラスパス上でインクルードされる Maven coordinateのjarのカンマ区切りのリストcoordinateは groupId:artifactId:version でなければなりません。spark.jars.ivySettingsが指定された場合、artifacts はファイル内の設定に応じて解決されるでしょう。そうでなければ artifacts はローカルのmavenリポジトリ、次にmavenセントラル、最後にコマンドラインオプション--repositoriesによって指定される追加のリモートリポジトリから探されるでしょう。詳細は、上級の依存管理を見てください。 1.5.0
spark.jars.excludes spark.jars.packagesで与えらえた依存を解決する間に依存性の衝突を避けるために除外する、groupId:artifactId のカンマ区切りのリスト。 1.5.0
spark.jars.ivy Ivyユーザディレクトリを指定するパス。spark.jars.packagesからローカルのIvyキャッシュおよびパッケージファイルのために使われます。これはデフォルトが ~/.ivy2 の Ivyプロパティ ivy.default.ivy.user.dir を上書くでしょう。 1.3.0
spark.jars.ivySettings mavenセントラルのような組み込みのデフォルトの代わりに、spark.jars.packages を使って指定されるjarの解決を独自に行うためのIvy設定ファイルへのパス。コマンドラインオプション--repositoriesあるいはspark.jars.repositories によって指定される追加のリポジトリも含まれるでしょう。例えば Artifactory のような社内のartifactサーバを使って、ファイアウォールの背後からSparkがartifactを解決できるようにするのに便利です。ファイルフォーマットの設定の詳細は設定ファイルで見つけることができます 2.2.0
spark.jars.repositories --packages あるいは spark.jars.packagesを使って与えられるmaven coordinateのために検索される、カンマ区切りの追加のリモートリポジトリ。 2.3.0
spark.pyspark.driver.python ドライバ内でPySparkが使うPythonバイナリ実行ファイル。(デフォルトは spark.pyspark.pythonです) 2.1.0
spark.pyspark.python ドライバと実行ファイルの両方の中でPySparkが使うPythonバイナリ実行ファイル。 2.1.0

シャッフルの挙動

プロパティ名デフォルト意味これ以降のバージョンから
spark.reducer.maxSizeInFlight 48m 各reduceタスクから同時に取り出すmap出力の最大サイズ。別に指定されていなければMiB単位。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。 1.4.0
spark.reducer.maxReqsInFlight Int.MaxValue この設定はリモートリクエストの数を指定された値までブロックの検索を制限します。クラスタ内のホストの数が増えた場合は、1つ以上のノードからやってくる接続の数が大量になり、ワーカーが負荷のために落ちるかも知れません。取得リクエストの数を制限できるようにすることで、このシナリオは緩和されるかも知れません。 2.0.0
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue この設定は指定されたホスト ポートからのreduceタスク毎に取得されるリモートブロックの数を制限します。大量のブロックが1つの取得あるいは並行して指定されたアドレスからリクエストされる場合、これは提供するexecutorあるいはノードマネージャーをクラッシュするかもしれません。これは外部シャッフルが有効な時にノードマネージャーの負荷を減らすのに特に有用です。それを低い値に設定することでこの問題を緩和することができます。 2.2.1
spark.shuffle.compress true map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 0.6.0
spark.shuffle.file.buffer 32k 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。別に指定されていなければMiB単位。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。 1.4.0
spark.shuffle.io.maxRetries 3 (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。 1.2.0
spark.shuffle.io.numConnectionsPerPeer 1 (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。 1.2.1
spark.shuffle.io.preferDirectBufs true (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 1.2.0
spark.shuffle.io.retryWait 5s (Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWaitとして計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。 1.2.1
spark.shuffle.io.backLog -1 シャッフルサービスの受け入れキューの長さ。大規模なアプリケーションの場合は、この値を増やす必要があるかもしれません。そうすると、サービスが短期間に多数の接続に対応できない場合に到着する接続を取りこぼさなくなります。これは、アプリケーションの外部にある可能性があるシャッフルサービス自身が実行されている全ての場所で設定される必要があります (以下の spark.shuffle.service.enabled オプションを見てください)。1未満に設定すると、Netty の io.netty.util.NetUtil#SOMAXCONN で定義される OS のデフォルトにフォールバックします。 1.1.1
spark.shuffle.service.enabled false 外部のシャッフルサービスを有効にします。このサービスはexecutorが安全に削除されるようにexecutorによって書き込まれるシャッフルファイルを保持します。もしspark.dynamicAllocation.enabled が"true"であれば、これは有効でなければなりません。外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentation を見てください。 1.2.0
spark.shuffle.service.port 7337 外部のシャッフルサービスが実行されるだろうポート。 1.2.0
spark.shuffle.service.index.cache.size 100m 特定のメモリのフットプリントに制限されたキャッシュエントリのバイト。指定されない場合はバイト単位。 2.3.0
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE シャッフルサービス上で同時に転送されることができるチャンクの最大数。新しくやってくる接続は最大の数に達した時に閉じられるだろうことに注意してください。クライアントはシャッフル再試行の設定に応じて再試行するでしょう(spark.shuffle.io.maxRetriesspark.shuffle.io.retryWaitを見てください)。もしこれらの制限に達するとタスクは取得失敗で失敗するでしょう。 2.3.0
spark.shuffle.sort.bypassMergeThreshold 200 (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。 1.1.1
spark.shuffle.spill.compress true シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codecを使うでしょう。 0.9.0
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 HighlyCompressedMapStatus内のシャッフルブロックのサイズである上のバイトの閾値は正確に記録されます。 これはシャッフルブロックを取り出す時にシャッフルブロックのサイズを低めに見積もることを避けることでOOMを避けるのに役立ちます。 2.2.1
spark.shuffle.registration.timeout 5000 外部シャッフルサービスへの登録のためのミリ秒のタイムアウト。 2.3.0
spark.shuffle.registration.maxAttempts 3 外部シャッフルサービスに登録を失敗した時、maxAttempts回まで再試行するでしょう。 2.3.0

Spark UI

プロパティ名デフォルト意味これ以降のバージョンから
spark.eventLog.logBlockUpdates.enabled false spark.eventLog.enabled がtrueの時に、各ブロックの更新毎にイベントを記録するかどうか。*Warning*: これはイベントログのサイズをかなり増やすでしょう。 2.3.0
spark.eventLog.longForm.enabled false trueであれば、イベントログに長い形式の呼び出しを使います。そうでなければ短い形式を使います。 2.4.0
spark.eventLog.compress false もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。 1.0.0
spark.eventLog.compression.codec ログイベントを圧縮するためのコーデック。指定されない場合は、spark.io.compression.codec が使われます。 3.0.0
spark.eventLog.erasureCoding.enabled false Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of filesystem defaults. On HDFS, erasure coded files will not update as quickly as regular replicated files, so the application updates will take longer to appear in the History Server. Note that even if this is true, Spark will still not force the file to use erasure coding, it will simply use filesystem defaults. 3.0.0
spark.eventLog.dir file:///tmp/spark-events spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。 1.0.0
spark.eventLog.enabled false アプリケーションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。 1.0.0
spark.eventLog.overwrite false 既存のファイルを上書きするかどうか。 1.0.0
spark.eventLog.buffer.kb 100k 出力ストリームに書き込む時に使用するバッファサイズ。別に指定されていなければKiB単位。 1.0.0
spark.eventLog.rolling.enabled false ローリングオーバーイベントログファイルが有効かどうか。true に設定されると、各イベントログファイルを設定されたサイズに切り詰めます。 3.0.0
spark.eventLog.rolling.maxFileSize 128m When spark.eventLog.rolling.enabled=true, specifies the max size of event log file before it's rolled over. 3.0.0
spark.ui.dagGraph.retainedRootRDDs Int.MaxValue ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのDAGグラフノードを記憶するか。 2.1.0
spark.ui.enabled true Sparkアプリケーションのためにweb UIを実行するかどうか。 1.1.1
spark.ui.killEnabled true ジョブとステージをweb UIからkillすることができます。 1.0.0
spark.ui.liveUpdate.period 100ms ライブ エンティティを更新する頻度。-1 はアプリケーションの再生時に "更新しない" 事を意味します。つまり最後の書き込みだけが起こるでしょう。稼働中のアプリケーションについては、着信タスクイベントを迅速に処理する場合に無くてもやり過ごせる幾つかの操作を回避します。 2.3.0
spark.ui.liveUpdate.minFlushPeriod 1s 古いUIデータがフラッシュされるまでの最小経過時間。着信タスクイベントが頻繁に発生しない場合にUIの陳腐化を回避します。 2.4.2
spark.ui.port 4040 メモリおよび作業データを表示する、アプリケーションのダッシュボードのポート。 0.7.0
spark.ui.retainedJobs 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 1.2.0
spark.ui.retainedStages 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 0.9.0
spark.ui.retainedTasks 100000 ガベージコレクティングの前にSparkUIおよびステータスAPIが1つのステージにどれだけのタスクを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。 2.0.1
spark.ui.reverseProxy false ワーカーとアプリケーションUIのためにリバースプロキシとしてSparkマスターの実行を有効にします。このモードでは、Sparkマスターはホストへの直接アクセスを必要とせずにワーカーとアプリケーションをリバースプロキシするでしょう。注意して使ってください。ワーカーとアプリケーションUIは直接アクセスできないため、sparkのマスター/プロキシ public URLを使ってアクセスできるだけでしょう。この設定はクラスタ内の全てのワーカーとアプリケーションUIに影響し、全てのワーカー、ドライバーおよびマスターで設定されなければなりません。 2.1.0
spark.ui.reverseProxyUrl これはプロキシを実行しているURLです。このURLはSparkマスターの前で実行しているプロキシのものです。認証のためにプロキシを実行している場合に便利です。例えば、QAuthプロキシ。プロキシに到達するスキーマ (http/https) およびポートを含む完全なURLにしてください。 2.1.0
spark.ui.proxyRedirectUri Spark がプロキシの背後で実行されている場合のリダイレクト先。これにより、Spark は Spark UI 自体のアドレスではなく、プロキシサーバを指すようにリダイレクト応答を修正します。これは、アプリケーションのプレフィックスパスを含まない、サーバのアドレスのみにする必要があります; プレフィックスは(X-Forwarded-Context リクエストヘッダを追加することで)プロキシサーバ自体、あるいは Spark アプリケーションの設定内のプロキシベースを設定することで、設定される必要があります。 3.0.0
spark.ui.showConsoleProgress false コンソール内に進捗バーを表示します。進捗バーは500msより長く実行しているステージの進捗を示します。複数のステージが同時に実行される場合は、複数の進捗バーが同じ行に表示されるでしょう。
注意: シェル環境では、spark.ui.showConsoleProgress のデフォルト値は true です。
1.2.1
spark.ui.custom.executor.log.url (none) Spark UI でクラスタマネージャーのアプリケーションログの URL を使う代わりに、外部のログサービスをサポートするための独自の spark executor ログの URL を指定します。Spark はクラスタマネージャーで変更できるパターンを介して、幾つかのパス変数をサポートします。クラスタマネージャーでどのパターンがサポートされるかを調べるには、ドキュメントがある場合はドキュメントを調べてください。

この設定はイベントログ内の元の url も置き換えることに注意してください。これは履歴サーバ上のアプリケーションにアクセスする場合にも有効です。新しいログの url は永続的である必要があります。そうしないと excecutor のログの url が無効になるかもしれません。

現在のところ、YARN モードのみがこの設定をサポートします。

3.0.0
spark.worker.ui.retainedExecutors 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。 1.5.0
spark.worker.ui.retainedDrivers 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。 1.5.0
spark.sql.ui.retainedExecutions 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。 1.5.0
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。 1.0.0
spark.ui.retainedDeadExecutors 100 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのdeadのexecutorを記憶するか。 2.0.0
spark.ui.filters None Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。
spark.<class name of filter>.param.<param name>=<value>の形式の構成エントリを設定することで、フィルターパラメータも設定内で指定することができます
例えば:
spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
1.0.0
spark.ui.requestHeaderSize 8k HTTPリクエストヘッダの最大許容サイズ。指定されない場合はバイト単位。この設定はSpark履歴サーバにも適用されます。 2.2.3

圧縮および直列化

プロパティ名デフォルト意味これ以降のバージョンから
spark.broadcast.compress true ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 0.6.0
spark.checkpoint.compress false RDD チェックポイントを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 2.2.0
spark.io.compression.codec lz4 RDDパーティション、イベントログ、ブロードキャスト変数 およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは4つのコーディックを提供します: lz4, lzf, snappyおよび zstd。コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec および org.apache.spark.io.ZStdCompressionCodec 0.8.0
spark.io.compression.lz4.blockSize 32k LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。特記の無い限り、デフォルトの単位はバイト。 1.4.0
spark.io.compression.snappy.blockSize 32k Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。特記の無い限り、デフォルトの単位はバイト。 1.4.0
spark.io.compression.zstd.level 1 Zstd圧縮コーディックの圧縮レベル。圧縮レベルを増やすとCPUとメモリの消費を犠牲にして結果がより良い圧縮となるでしょう。 2.3.0
spark.io.compression.zstd.bufferSize 32k Zstd圧縮コーディックが使用される場合、Zstd圧縮で使用されるブロックサイズのバイト数。このサイズを小さくするとZstdが使われる時のシャッフルメモリ率が低くなりますが、過度のJNI呼び出しのオーバーヘッドにより圧縮コストが増えるかもしれません。 2.3.0
spark.kryo.classesToRegister (none) Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。 1.2.0
spark.kryo.referenceTracking true Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。 0.8.0
spark.kryo.registrationRequired false Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。 1.1.0
spark.kryo.registrator (none) Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにカンマ区切りのクラスのリストを渡してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。 KryoRegistratorを拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。 0.5.0
spark.kryo.unsafe false 安全では無いKryoベースのシリアライザを使うかどうか。安全では無いIOベースを使うことで実質的に速くすることができます。 2.1.0
spark.kryoserializer.buffer.max 64m kryoのシリアライズバッファの最大許容サイズ。別に指定されていなければMiB単位。これはシリアライズ化しようとしているどのオブジェクトよりも大きくなければならず、2048mより小さくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。 1.4.0
spark.kryoserializer.buffer 64k Kryoのシリアライズ化バッファの初期サイズ。別に指定されていなければKiB単位。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。 1.4.0
spark.rdd.compress false シリアライズされたパーティションを圧縮するかどうか(例えば、JavaとScalaでのStorageLevel.MEMORY_ONLY_SER、あるいは PythonでのStorageLevel.MEMORY_ONLY)。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。圧縮はspark.io.compression.codecを使うでしょう。 0.6.0
spark.serializer org.apache.spark.serializer.
JavaSerializer
ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializerのどのようなサブクラスにもなることができます。 0.5.0
spark.serializer.objectStreamReset 100 org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。 1.0.0

メモリ管理

プロパティ名デフォルト意味これ以降のバージョンから
spark.memory.fraction 0.6 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。この設定の目的は内部メタデータ、ユーザデータ構造、およびまばらな場合の不正確なサイズの見積もりのためにメモリを取って置くことで、非常に大きなレコードです。これはデフォルトの値にしておくことをお勧めします。この値を増加した場合の正しいJVMガベージコレクションの調整についての重要な情報を含む詳細は、この説明を見てください。 1.6.0
spark.memory.storageFraction 0.5 立ち退きに耐性のあるストレージメモリの量。spark.memory.fractionによって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。 1.6.0
spark.memory.offHeap.enabled false trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。 1.6.0
spark.memory.offHeap.size 0 オフヒープ割り当てのために使うことができる絶対メモリ量。特記が無い限りバイト単位。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=trueの場合は、これは正の値に設定されなければなりません。 1.6.0
spark.storage.replication.proactive false RDDブロックのために積極的なブロックリプリケーションを有効にする。executorの障害により失われたキャッシュされたRDDのブロックレプリカは、既存の利用可能なレプリカがある限り補充されます。これはブロックのレプリケーション レベルを初期の数にしようとします。 2.2.0
spark.cleaner.periodicGC.interval 30min ガベージコレクションどれだけの頻度で起動するかを制御します。

このコンテキストクリーナーは弱い参照がガベージコレクトされた時のみ掃除されます。ドライバ上でほとんどメモリの圧迫が無いような大きなドライバJVMを使った長く実行中のアプリケーション内では、これは極めて稀か全く起きないかもしれません。全く掃除しないことはしばらくした後でexecutorがディスク空間外で実行することに繋がるかもしれません。
1.6.0
spark.cleaner.referenceTracking true コンテキストの掃除を有効または無効にします。 1.0.0
spark.cleaner.referenceTracking.blocking true クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します (シャッフル以外、これはspark.cleaner.referenceTracking.blocking.shuffle Sparkプロパティによって制御されます)。 1.0.0
spark.cleaner.referenceTracking.blocking.shuffle false シャッフル クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します。 1.1.1
spark.cleaner.referenceTracking.cleanCheckpoints false 参照がスコープ外の場合にチェックポイントファイルを掃除するかどうかを制御します。 1.4.0

Executionの挙動

プロパティ名デフォルト意味これ以降のバージョンから
spark.broadcast.blockSize 4m TorrentBroadcastFactoryのためのブロックの各断片のサイズ。別に指定されていなければKiB単位。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManagerはパフォーマンスの打撃を受けるかも知れません。 0.5.0
spark.broadcast.checksum true ブロードキャストのためにチェックサムを有効にするかどうか。有効にされた場合、ブロードキャストはチェックサムを含むでしょう。これは少し多くのデータの計算と送信という代償を払って、間違ったブロックの検知を手助けすることができます。ネットワークがブロードキャスト中にデータの間違いが起きない保証をするための他の機構を持つ場合、無効にすることができます。 2.1.1
spark.executor.cores YARNモードの場合は1、スタンドアローンモードとMesos coarse-grainedモードの場合はワーカー上の全ての利用可能なコア。 各executor上で使用されるコアの数。スタンドアローンおよびMesosの coarse-grained モードでの詳細については、this descriptionを見てください。 1.0.0
spark.default.parallelism reduceByKey および joinのような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelizeのような操作については、クラスタマネージャーに依存します:
  • ローカルモード: ローカルマシーン上のコアの数
  • Mesos fine grained mode: 8
  • その他: 全てのexecutorノードのコアの総数あるいは2のどちらか大きいほう
ユーザによって設定されなかった場合は、 join, reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。 0.5.0
spark.executor.heartbeatInterval 10s ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。spark.executor.heartbeatInterval は spark.network.timeout よりかなり小さくなければなりません。 1.1.0
spark.files.fetchTimeout 60s ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。 1.0.0
spark.files.useFetchCache true true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。 1.2.2
spark.files.overwrite false ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。 1.0.0
spark.files.maxPartitionBytes 134217728 (128 MiB) ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。 2.1.0
spark.files.openCostInBytes 4194304 (4 MiB) ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうすれば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう。 2.1.0
spark.hadoop.cloneConf false trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。 1.0.3
spark.hadoop.validateOutputSpecs true trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。 1.0.1
spark.storage.memoryMapThreshold 2m ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。特記の無い限り、デフォルトの単位はバイト。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。 0.9.2
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 ファイル出力コミッタのアルゴリズムのバージョン。有効なアルゴリズムのバージョン番号: 1 あるいは 2。バージョン 2 はよりパフォーマンスが良いかも知れませんが、ある状況では MAPREDUCE-4815により、バージョン 1が障害をより良く扱うかも知れません。 2.2.0

Executor のメトリクス

プロパティ名デフォルト意味これ以降のバージョンから
spark.eventLog.logStageExecutorMetrics false (各 executor について) executor メトリクスのステージごとのピークをイベントログに書き込むかどうか。
注意: メトリクスは executor の heatbeat でポーリング(収集)され、送信されます。これは常に行われます; この設定は、集約されたメトリクスのピークがイベントログに書き込まれるかどうかを決定するためだけのものです。
3.0.0
spark.executor.processTreeMetrics.enabled false executor のメトリクスを収集する時に、(/proc ファイルシステムから)プロセスのツリーメトリクスを収集するかどうか。
注意: プロセスツリーメトリクスは /proc ファイルシステムが存在する場合のみ収集されます。
3.0.0
spark.executor.metrics.pollingInterval 0 executor メトリクスを収集する頻度 (ミリ秒)
0 の場合、ポーリングは executor heatbeat で実行されます (したがって、spark.executor.heartbeatInterval で指定された heartbeat 間隔です)。正の場合、ポーリングはこの間隔で行われます。
3.0.0

ネットワーク

プロパティ名デフォルト意味これ以降のバージョンから
spark.rpc.message.maxSize 128 "control plane"通信内で許される最大のメッセージ(MiB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。RPCメッセージに関するメッセージが表示されるでしょう。 2.0.0
spark.blockManager.port (random) 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。 1.1.0
spark.driver.blockManager.port (spark.blockManager.port の値) ブロックマネージャーがlistenするドライバー固有のポート。executorとして同じ設定を使うことができない場合。 2.1.0
spark.driver.bindAddress (spark.driver.host の値) listenソケットをバインドするホスト名またはIPアドレス。この設定は SPARK_LOCAL_IP 環境変数(以下を見てください)を上書きします。
executorあるいは外部のシステムに知らせるためにローカルのものから異なるアドレスにすることもできます。これは例えばブリッジネットワークのコンテナを動かしている場合に便利です。これが正しく動作するためには、ドライバー(RPC, ブロックマネージャー および UI)によって使われる異なるポートがコンテナのホストからフォワードされる必要があります。
2.1.0
spark.driver.host (ローカル ホスト名) ドライバーのホスト名とIPアドレスexecutorとスタンドアローンマスターが通信するために使われます。 0.7.0
spark.driver.port (random) ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。 0.7.0
spark.rpc.io.backLog 64 RPC サーバの受け入れキューの長さ。大規模なアプリケーションの場合は、この値を増やす必要があるかもしれません。そうすると、短期間に多数の接続が来た時に到着する接続を取りこぼさなくなります。 3.0.0
spark.network.timeout 120s 全てのネットワークの相互交流のタイムアウト。この設定は、spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout あるいは spark.rpc.lookupTimeout が設定されていない場合に、それらの代わりに使われるでしょう。 1.3.0
spark.network.io.preferDirectBufs true 有効にされた場合、オフヒープバッファの割り当ては共有アロケータによって選ばれます。オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 3.0.0
spark.port.maxRetries 16 ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。 1.1.1
spark.rpc.numRetries 3 RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。 1.4.0
spark.rpc.retry.wait 3s RPCのask操作が再試行するまで待つ期間。 1.4.0
spark.rpc.askTimeout spark.network.timeout RPCのask操作がタイムアウトするまで待つ期間。 1.4.0
spark.rpc.lookupTimeout 120s RPCリモートエンドポイントがタイムアウトするまで待つ時間。 1.4.0
spark.core.connection.ack.wait.timeout spark.network.timeout 接続がタイムアウトして諦めるまでにどれだけ長くackを待つか。GCのような長い休止で不本意なタイムアウトを避けるために、大きな値を設定することができます。 1.1.1
spark.network.maxRemoteBlockSizeFetchToMem 200m ブロックのサイズがこの閾値のバイトよりも上の場合、リモートのブロックはディスクに取りに行くでしょう。これは巨大なリクエストがあまりに大きなメモリを要求することを避けるためのものです。この設定はシャッフルの取得およびブロックマネージャーのリモートブロックの取得の両方に影響するだろうことに注意してください。外部シャッフルサービスを有効にしたユーザについては、この機能は外部シャッフルサービスが少なくとも 2.3.0 である場合のみ動作するかもしれません。 3.0.0

スケジューリング

プロパティ名デフォルト意味これ以降のバージョンから
spark.cores.max (not set) <c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores、Mesos上では無制限(利用可能な全てのコア)になります。 0.6.0
spark.locality.wait 3s データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.nodeなどを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。 0.5.0
spark.locality.wait.node spark.locality.wait ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。 0.8.0
spark.locality.wait.process spark.locality.wait プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。 0.8.0
spark.locality.wait.rack spark.locality.wait rack局地性を待つための局地性をカスタマイズします。 0.8.0
spark.scheduler.maxRegisteredResourcesWaitingTime 30s スケジュールが始まる前にリソースが登録するための最大待ち時間。 1.1.1
spark.scheduler.minRegisteredResourcesRatio KUBERNETES modeでは 0.8; YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードとKubernetesモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTimeによって設定されます。 1.1.1
spark.scheduler.mode FIFO 同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。 0.8.0
spark.scheduler.revive.interval 1s スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。 0.8.1
spark.scheduler.listenerbus.eventqueue.capacity 10000 イベントキューのデフォルトの容量。Spark は最初に `spark.scheduler.listenerbus.eventqueue.queueName.capacity` で指定された容量を使ってイベントキューを初期化しようとします。設定されていない場合、Spark はこの設定で指定されたデフォルトの容量を使います。容量は 0 より大きい必要があることに注意してください。listenerのイベントが零れる場合は、値を増やすことを考えてください (例えば 20000)。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 2.3.0
spark.scheduler.listenerbus.eventqueue.shared.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの共有イベントキューの容量。これは listener バス に登録する外部 listener(s) のイベントを保持します。共有キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.appStatus.capacity spark.scheduler.listenerbus.eventqueue.capacity sppStatus イベントキューの容量。これは内部アプリケーションステータス listener のイベントを保持します。appStatus キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの executorManagement イベントキューの容量。これは内部 executor management listener のイベントを保持します。executorManagement キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.eventLog.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの eventLog キューの容量。これは eventLogs にイベントを書き込む Event logging listener のイベントを保持します。eventLog キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.listenerbus.eventqueue.streams.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark listener バスでの streams キューの容量。これは内部 streaming listener のイベントを保持します。streams キューに対応する listener イベントを取りこぼす場合は、値を増やすことを検討してください。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。 3.0.0
spark.scheduler.blacklist.unschedulableTaskSetTimeout 120s 完全にブラックリスト化されたためにスケジュールできないTaskSetを中止する前に、新しいexecutorの獲得とタスクのスケジュールを待つタイムアウトの秒数。 2.4.1
spark.blacklist.enabled false "true"に設定すると、あまりに多くのタスクの失敗によってブラックリストに入れられたexecutor上でSparkがタスクをスケジュールすることを防ぎます。ブラックリストのアルゴリズムは他の"spark.blacklist"設定オプションによって更に制御することができます。 2.1.0
spark.blacklist.timeout 1h (実験的) ノードあるいはexecutorが新しいタスクを実行しようとしてブラックリストから無条件に削除される前に、どれくらい長くノードあるいはexecutorがアプリケーション全体でブラックリスト化されるか。 2.1.0
spark.blacklist.task.maxTaskAttemptsPerExecutor 1 (実験的) 指定されたタスクについて、1つのexecutorがそのタスクがブラックリストされる前にそのexecutor上でどれだけ再試行できるか。 2.1.0
spark.blacklist.task.maxTaskAttemptsPerNode 2 (実験的) 指定されたタスクについて、1つのノードがそのタスクがブラックリストされる前にそのノード上でどれだけ再試行できるか。 2.1.0
spark.blacklist.stage.maxFailedTasksPerExecutor 2 (実験的) executorが1つのステージについてブラックリストされる前に、1つのexecutor上のそのステージ内でどれだけの異なるタスクを失敗しなければならないか。 2.1.0
spark.blacklist.stage.maxFailedExecutorsPerNode 2 (実験的) ノード全体が指定されたステージについて失敗だとマークされる前に、どれだけの異なるexecutorがそのステージについてブラックリストだとマークされるか。 2.1.0
spark.blacklist.application.maxFailedTasksPerExecutor 2 (実験的) executorがアプリケーション全体でブラックリストされる前に、連続するタスクのセットの中でどれだけの異なるタスクが1つのeecutor上で失敗しなければならないか。ブラックリスト化されたexecutorはspark.blacklist.timeoutで指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、executorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。 2.2.0
spark.blacklist.application.maxFailedExecutorsPerNode 2 (実験的) ノードがアプリケーション全体でブラックリストされる前に、どれだけの異なるexecutorがアプリケーション全体でブラックリストされなければならないか。ブラックリスト化されたノードはspark.blacklist.timeoutで指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、ノード上のexecutorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。 2.2.0
spark.blacklist.killBlacklistedExecutors false (実験的) "true" に設定すると、spark.blacklist.application.* によって制御されるため取得時の障害あるいはアプリケーション全体でブラックリストに入れられた時にSparkが自動的にexecutorをkillすることができます。ノード全体がブラックリストに追加された場合、ノード上の全てのexecutorはkillされるだろうことに注意してください。 2.2.0
spark.blacklist.application.fetchFailure.enabled false (実験的) もし "true" に設定すると、取得の失敗が起きた場合にSparkはexecutorをすぐにブラックリストに入れるでしょう。もし外部シャッフルサービスが有効な場合、ノード全体がブラックリストに入れられるでしょう。 2.3.0
spark.speculation false "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。 0.6.0
spark.speculation.interval 100ms どれだけの頻度でSparkが投機するためにタスクをチェックするか。 0.6.0
spark.speculation.multiplier 1.5 平均より遅いタスクが何回投機と見なされるか。 0.6.0
spark.speculation.quantile 0.75 指定のステージで投機が有効になる前にどれだけのタスクの割合が終了していなければならないか。 0.6.0
spark.speculation.task.duration.threshold None スケジューラがタスクを投機的に実行しようとするまでのタスク期間。指定した場合、現在のステージに含まれるタスクが1つの executor のスロット数以下であり、タスクが閾値よりも時間が掛かる場合に、タスクが投機的に実行されます。この設定は非常に少ないタスクでステージを推測するのに役立ちます。executor のスロットが十分に大きい場合、通常の投機設定も適用されるかもしれません。例えば、閾値に達していないが十分に実行が成功している場合は、タスクが再起動されるかもしれません。The number of slots is computed based on the conf values of spark.executor.cores and spark.task.cpus minimum 1. 特記の無い限り、デフォルトの単位はバイト。 3.0.0
spark.task.cpus 1 各タスクごとに割り当てるコアの数。 0.5.0
spark.task.resource.{resourceName}.amount 1 各タスクに割り当てる特定のリソースタイプの量。これは倍になるかもしれないことに注意してください。これが指定された場合、executor の設定 spark.executor.resource.{resourceName}.amount および対応するディスカバリ設定も提供して、executor がそのリソースタイプで作成されるようにする必要があります。In addition to whole amounts, a fractional amount (for example, 0.25, which means 1/4th of a resource) may be specified. Fractional amounts must be less than or equal to 0.5, or in other words, the minimum amount of resource sharing is 2 tasks per resource. Additionally, fractional amounts are floored in order to assign resource slots (e.g. a 0.2222 configuration, or 1/0.2222 slots will become 4 tasks/resource, not 5). 3.0.0
spark.task.maxFailures 4 ジョブを諦める前のタスクの失敗の数。異なるタスクに渡って広がっている失敗の総数はジョブを失敗させないでしょう; 特定のタスクがこの試行の数を失敗しなければなりません。1以上でなければなりません。許可された再試行の数 = この値 - 1. 0.8.0
spark.task.reaper.enabled false killed / interrupted タスクの監視を有効にする。trueに設定した場合、killされた全てのタスクはタスクが実際に実行を完了するまでexecutorによって監視されるでしょう。この監視の正確な挙動を制御する方法についての詳細は、他のspark.task.reaper.*設定を見てください。false (デフォルト)に設定すると、タスクのkill はそのような監視が欠けている古いコードを使うでしょう。 2.0.3
spark.task.reaper.pollingInterval 10s spark.task.reaper.enabled = trueの時、この設定はexecutorがkillされたタスクの状態をpollする頻度を制御します。pollされた時にkillされたタスクがまだ実行中の場合は、警告が記録され、デフォルトではタスクのthread-dumpが記録されるでしょう(この thread dump は spark.task.reaper.threadDump 設定によって無効にすることができます。これは以下で説明されます)。 2.0.3
spark.task.reaper.threadDump true spark.task.reaper.enabled = trueの場合、この設定はkillされたタスクの定期的なpollの際にタスクの thread dumps が記録されるかどうかを制御します。thread dumpの収集を無効にするには、これをfalseに設定します。 2.0.3
spark.task.reaper.killTimeout -1 spark.task.reaper.enabled = trueの場合、この設定は、killされたタスクが実行を停止していない場合にexecutor JVMがそれ自身をkillするだろうタイムアウトを指定します。 デフォルト値 -1 は、この機構を無効にし、executorが自己破壊をすることを避けます。この設定の目的は、キャンセル不可能なタスクの暴走がexecutorを不安定にすることを避けるための安全策として振舞うことです。 2.0.3
spark.stage.maxConsecutiveAttempts 4 ステージが中止されるまでに許される連続するステージの試行の数。 2.2.0

バリア実行モード

プロパティ名デフォルト意味これ以降のバージョンから
spark.barrier.sync.timeout 365d barrier タスクからの各 barrier() 呼び出しのタイムアウト。coordinator が設定された時間内に barrier タスクから全ての同期メッセージを受信しなかった場合は、全てのタスクを失敗させるために SparkException を投げます。デフォルト値は 31536000(3600 * 24 * 365) に設定されているため、barrier() 呼び出しは1年間待つ必要があります。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.interval 15s 最大同時タスクチェックの失敗から次のチェックまで待つ秒数。最大同時チェックにより、クラスタは送信されたジョブの barrier ステージで必要とされるより多くの同時タスクを起動できることが保証されます。クラスタが起動したばかりで十分な executor が登録されていない場合、チェックは失敗する可能性があるため、しばらく待ってからもう一度チェックを実行します。チェックがジョブに対して設定された最大失敗回数より多く失敗した場合、現在のジョブの送信が失敗します。この設定は1つ以上の barrier ステージを含むジョブにのみ適用されることに注意してください。barrier 以外のジョブではチェックを実行しません。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures 40 ジョブの送信が失敗するまでに許可される最大同時タスクチェックの失敗の数。最大同時チェックにより、クラスタは送信されたジョブの barrier ステージで必要とされるより多くの同時タスクを起動できることが保証されます。クラスタが起動したばかりで十分な executor が登録されていない場合、チェックは失敗する可能性があるため、しばらく待ってからもう一度チェックを実行します。チェックがジョブに対して設定された最大失敗回数より多く失敗した場合、現在のジョブの送信が失敗します。この設定は1つ以上の barrier ステージを含むジョブにのみ適用されることに注意してください。barrier 以外のジョブではチェックを実行しません。 2.4.0

動的割り当て

プロパティ名デフォルト意味これ以降のバージョンから
spark.dynamicAllocation.enabled false 動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。詳細は ここの説明を見てください。

これは spark.shuffle.service.enabled あるいは spark.dynamicAllocation.shuffleTracking.enabled の設定が必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
1.2.0
spark.dynamicAllocation.executorIdleTimeout 60s 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 1.2.0
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 1.4.0
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 動的割り当てが有効な場合、実行するexecutorの初期の数。

もし `--num-executors` (あるいは `spark.executor.instances`) が設定され、この値よりも大きい場合は、executorの初期の数として使われるでしょう。
1.3.0
spark.dynamicAllocation.maxExecutors infinity 動的割り当てが有効な場合のexecutorの数の上限。 1.2.0
spark.dynamicAllocation.minExecutors 0 動的割り当てが有効な場合のexecutorの数の下限。 1.2.0
spark.dynamicAllocation.executorAllocationRatio 1 デフォルトで、動的な割り当ては処理するタスクの数に応じて並行度を最大にするのに十分なexecutorを要求するでしょう。これはジョブのレイテンシを最小化しますが、いくつかのexecutorは何も仕事をしないかもしれないため、この設定を持つ小さなタスクはexecutorの割り当てのオーバーヘッドにより多くのリソースを浪費するかもしれません。この設定によりexecutorの数を減らすために使われる割合を設定することができます。完全な並行度に関して。最大の並行度を与えるために、デフォルトは1.0です。0.5 はexecutorの目標値を2で割るでしょう。dynamicAllocationによって計算されるexecutorの目標値は spark.dynamicAllocation.minExecutorsspark.dynamicAllocation.maxExecutors の設定によって上書きすることもできます。 2.4.0
spark.dynamicAllocation.schedulerBacklogTimeout 1s もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。 1.2.0
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeoutと同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は ここの説明を見てください。 1.2.0
spark.dynamicAllocation.shuffleTracking.enabled false 実験的。executor のシャッフルファイルトラッキングを有効にします。これにより、外部シャッフルサービスを必要とせずに動的な割り当てが可能になります。このオプションはアクティブなジョブのシャッフルデータを格納している executor を保とうとします。 3.0.0
spark.dynamicAllocation.shuffleTracking.timeout infinity シャッフルトラッキングが有効な場合、シャッフルデータを保持している executor のタイムアウトを制御します。The default value means that Spark will rely on the shuffles being garbage collected to be able to release executors. 何らかの理由でガベージコレクションがシャッフルを十分に早くクリーンアップしない場合、このオプションを使って、シャッフルデータを格納している executor をシャッフルデータを格納している時であってもタイムアウトするタイミングを制御することができます。 3.0.0

スレッドの設定

ジョブとクラスタの設定に応じて、使用可能なリソースを使ってパフォーマンスを向上させるために、Spark の幾つかの場所にスレッドを設定することができます。Spark 3.0 より前では、これらのスレッド設定はドライバ、executor 、ワーカー、マスターなど、Spark の全てのロールに適用されます。Spark 3.0 から、ドライバと executor から始めて、より細かい粒度でスレッドを設定することができます。以下の表の例では、RPC モジュールを取り上げます。シャッフルなどの他のモジュールの場合、RPC モジュール専用の spark.{driver|executor}.rpc.netty.dispatcher.numThreads を除いて、プロパティ名の “rpc” を “shuffle” に置き換えるだけです。

プロパティ名デフォルト意味これ以降のバージョンから
spark.{driver|executor}.rpc.io.serverThreads Fall back on spark.rpc.io.serverThreads サーバスレッドプールで使われるスレッド数 1.6.0
spark.{driver|executor}.rpc.io.clientThreads Fall back on spark.rpc.io.clientThreads クライアントスレッドプールで使われるスレッドの数 1.6.0
spark.{driver|executor}.rpc.netty.dispatcher.numThreads Fall back on spark.rpc.netty.dispatcher.numThreads RPCメッセージディスパッチャースレッドプールで使われるスレッドの数 3.0.0

スレッド関連の設定キーの数のデフォルト値は、ドライバまたは executor に要求されたコアの数の最小値、またはその値が無い場合は、JVM に利用可能なコアの数 (ハードコードされた上限は 8)。

セキュリティ

異なるSparkサブシステムを安全にする方法について利用可能なオプションについては、Security ページを参照してください。

Spark SQL

Runtime SQL 設定

ランタイム SQL 設定は、セッションごとの変更可能な Spark SQL 設定です。設定ファイルと、--conf/-c が前に付いたコマンドラインオプション、または SparkSession を作成するために使われる SparkConf を設定することで、初期値を設定することができます。また、SET コマンドで設定とクエリを実行し、RESET コマンドあるいは実行時の SparkSession.conf のセッターとゲッターで初期値に戻すことができます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.adaptive.advisoryPartitionSizeInBytes (spark.sql.adaptive.shuffle.targetPostShuffleInputSize の値)

適応最適化中のシャッフルパーティションの推奨サイズのバイト数 (spark.sql.adaptive.enabled が true の場合)。It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.

3.0.0
spark.sql.adaptive.coalescePartitions.enabled true

これが true で、'spark.sql.adaptive.enabled' が true の場合、Spark はターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes' で指定される)に従って連続するシャッフルパーティションを合体させ、小さなタスクが多すぎないようにします。

3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (none)

合体する前のシャッフルパーティションの初期数。デフォルトでは、spark.sql.shuffle.partitions と同じです。この設定は、'spark.sql.adaptive.enabled' と 'spark.sql.adaptive.coalescePartitions.enabled' の両方が true の場合にのみ効果があります。

3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionNum (none)

合体後のシャッフルパーティションの最小数。設定しない場合、デフォルト値は Spark クラスタのデフォルトの並行度になります。この設定は、'spark.sql.adaptive.enabled' と 'spark.sql.adaptive.coalescePartitions.enabled' の両方が true の場合にのみ効果があります。

3.0.0
spark.sql.adaptive.enabled false

true の場合、アダプティブクエリ実行を有効にします。これにより、正確な実行時統計に基づいて、クエリ実行の途中でクエリプランが再最適化されます。

1.6.0
spark.sql.adaptive.localShuffleReader.enabled true

When true and 'spark.sql.adaptive.enabled' is true, Spark tries to use local shuffle reader to read the shuffle data when the shuffle partitioning is not needed, for example, after converting sort-merge join to broadcast-hash join.

3.0.0
spark.sql.adaptive.skewJoin.enabled true

When true and 'spark.sql.adaptive.enabled' is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions.

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5

A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB

A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' multiplying the median partition size. Ideally this config should be set larger than 'spark.sql.adaptive.advisoryPartitionSizeInBytes'.

3.0.0
spark.sql.ansi.enabled false

true の場合、Spark はANSI SQL仕様に準拠しようとします:整数/10進数フィールドの操作でオーバーフローが発生した場合、Spark は実行時例外を投げます。2. Spark は ANSI SQL の予約済みキーワードを SQL パーサの識別子として使うことに注意してください。Spark will forbid using the reserved keywords of ANSI SQL as identifiers in the SQL parser.

3.0.0
spark.sql.autoBroadcastJoinThreshold 10MB

joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run, and file-based data source tables where the statistics are computed directly on the files of data.

1.1.0
spark.sql.avro.compression.codec snappy

AVROファイルの書き込みで使われる圧縮コーディック。サポートされるコーディック: uncompressed, deflate, snappy, bzip2 および xz。デフォルトのコーディックは snappy です。

2.4.0
spark.sql.avro.deflate.level -1

AVROファイルの書き込みで使われるdeflateコーディック圧縮レベル。有効な値は-1または1から10の範囲でなければなりません。デフォルト値は-1で、現在の実装での6レベルに対応します。

2.4.0
spark.sql.broadcastTimeout 300

ブロードキャストjoinでのブロードキャスト待ち時間のタイムアウト秒数。

1.3.0
spark.sql.catalog.spark_catalog (none)

A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.

3.0.0
spark.sql.cbo.enabled false

Enables CBO for estimation of plan statistics when set true.

2.2.0
spark.sql.cbo.joinReorder.dp.star.filter false

Applies star-join filter heuristics to cost based join enumeration.

2.2.0
spark.sql.cbo.joinReorder.dp.threshold 12

The maximum number of joined nodes allowed in the dynamic programming algorithm.

2.2.0
spark.sql.cbo.joinReorder.enabled false

Enables join reorder in CBO.

2.2.0
spark.sql.cbo.planStats.enabled false

When true, the logical plan will fetch row counts and column statistics from catalog.

3.0.0
spark.sql.cbo.starSchemaDetection false

When true, it enables join reordering based on star schema detection.

2.2.0
spark.sql.columnNameOfCorruptRecord _corrupt_record

The name of internal column for storing raw/un-parsed JSON and CSV records that fail to parse.

1.2.0
spark.sql.csv.filterPushdown.enabled true

When true, enable filter pushdown to CSV datasource.

3.0.0
spark.sql.datetime.java8API.enabled false

If the configuration property is set to true, java.time.Instant and java.time.LocalDate classes of Java 8 API are used as external types for Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp and java.sql.Date are used for the same purpose.

3.0.0
spark.sql.debug.maxToStringFields 25

Maximum number of fields of sequence-like entries can be converted to strings in debug output. Any elements beyond the limit will be dropped and replaced by a "... N more fields" placeholder.

3.0.0
spark.sql.defaultCatalog spark_catalog

Name of the default catalog. This will be the current catalog if users have not explicitly set the current catalog yet.

3.0.0
spark.sql.execution.arrow.enabled false

(Deprecated since Spark 3.0, please set 'spark.sql.execution.arrow.pyspark.enabled'.)

2.3.0
spark.sql.execution.arrow.fallback.enabled true

(Deprecated since Spark 3.0, please set 'spark.sql.execution.arrow.pyspark.fallback.enabled'.)

2.4.0
spark.sql.execution.arrow.maxRecordsPerBatch 10000

When using Apache Arrow, limit the maximum number of records that can be written to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.

2.3.0
spark.sql.execution.arrow.pyspark.enabled (value of spark.sql.execution.arrow.enabled)

When true, make use of Apache Arrow for columnar data transfers in PySpark. This optimization applies to: 1. pyspark.sql.DataFrame.toPandas 2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame The following data types are unsupported: BinaryType, MapType, ArrayType of TimestampType, and nested StructType.

3.0.0
spark.sql.execution.arrow.pyspark.fallback.enabled (value of spark.sql.execution.arrow.fallback.enabled)

When true, optimizations enabled by 'spark.sql.execution.arrow.pyspark.enabled' will fallback automatically to non-optimized implementations if an error occurs.

3.0.0
spark.sql.execution.arrow.sparkr.enabled false

When true, make use of Apache Arrow for columnar data transfers in SparkR. This optimization applies to: 1. createDataFrame when its input is an R DataFrame 2. collect 3. dapply 4. gapply The following data types are unsupported: FloatType, BinaryType, ArrayType, StructType and MapType.

3.0.0
spark.sql.execution.pandas.udf.buffer.size (value of spark.buffer.size)

Same as spark.buffer.size but only applies to Pandas UDF executions. If it is not set, the fallback is spark.buffer.size. Note that Pandas execution requires more than 4 bytes. Lowering this value could make small Pandas UDF batch iterated and pipelined; however, it might degrade performance. See SPARK-27870.

3.0.0
spark.sql.files.ignoreCorruptFiles false

Whether to ignore corrupt files. If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

2.1.1
spark.sql.files.ignoreMissingFiles false

Whether to ignore missing files. If true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

2.3.0
spark.sql.files.maxPartitionBytes 128MB

ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

2.0.0
spark.sql.files.maxRecordsPerFile 0

Maximum number of records to write out to a single file. If this value is zero or negative, there is no limit.

2.2.0
spark.sql.function.concatBinaryAsString false

When this option is set to false and all inputs are binary, functions.concat returns an output as binary. そうでなければ、文字列として返します。

2.3.0
spark.sql.function.eltOutputAsString false

When this option is set to false and all inputs are binary, elt returns an output as binary. そうでなければ、文字列として返します。

2.3.0
spark.sql.groupByAliases true

When true, aliases in a select list can be used in group by clauses. When false, an analysis exception is thrown in the case.

2.2.0
spark.sql.groupByOrdinal true

When true, the ordinal numbers in group by clauses are treated as the position in the select list. When false, the ordinal numbers are ignored.

2.0.0
spark.sql.hive.convertInsertingPartitionedTable true

When set to true, and spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is true, the built-in ORC/Parquet writer is usedto process inserting into partitioned ORC/Parquet tables created by using the HiveSQL syntax.

3.0.0
spark.sql.hive.convertMetastoreCtas true

When set to true, Spark will try to use built-in data source writer instead of Hive serde in CTAS. This flag is effective only if spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is enabled respectively for Parquet and ORC formats

3.0.0
spark.sql.hive.convertMetastoreOrc true

When set to true, the built-in ORC reader and writer are used to process ORC tables created by using the HiveQL syntax, instead of Hive serde.

2.0.0
spark.sql.hive.convertMetastoreParquet true

When set to true, the built-in Parquet reader and writer are used to process parquet tables created by using the HiveQL syntax, instead of Hive serde.

1.1.1
spark.sql.hive.convertMetastoreParquet.mergeSchema false

When true, also tries to merge possibly different but compatible Parquet schemas in different Parquet data files. This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.

1.3.1
spark.sql.hive.filesourcePartitionFileCacheSize 262144000

When nonzero, enable caching of partition file metadata in memory. All tables share a cache that can use up to specified num bytes for file metadata. This conf only has an effect when hive filesource partition management is enabled.

2.1.1
spark.sql.hive.manageFilesourcePartitions true

When true, enable metastore partition management for file source tables as well. This includes both datasource and converted Hive tables. When partition management is enabled, datasource tables store partition in the Hive metastore, and use the metastore to prune partitions during query planning.

2.1.1
spark.sql.hive.metastorePartitionPruning true

When true, some predicates will be pushed down into the Hive metastore so that unmatching partitions can be eliminated earlier. This only affects Hive tables not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC for more information).

1.5.0
spark.sql.hive.thriftServer.async true

When set to true, Hive Thrift server executes SQL queries in an asynchronous way.

1.5.0
spark.sql.hive.verifyPartitionPath false

When true, check all the partition paths under the table's root directory when reading data stored in HDFS. This configuration will be deprecated in the future releases and replaced by spark.files.ignoreMissingFiles.

1.4.0
spark.sql.hive.version (value of spark.sql.hive.metastore.version)

deprecated, please use spark.sql.hive.metastore.version to get the Hive version in Spark.

1.1.1
spark.sql.inMemoryColumnarStorage.batchSize 10000

カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。

1.1.1
spark.sql.inMemoryColumnarStorage.compressed true

trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。

1.0.1
spark.sql.inMemoryColumnarStorage.enableVectorizedReader true

Enables vectorized reader for columnar caching.

2.3.1
spark.sql.jsonGenerator.ignoreNullFields true

Whether to ignore null fields when generating JSON objects in JSON data source and JSON functions such as to_json. If false, it generates null for null fields in JSON objects.

3.0.0
spark.sql.legacy.allowHashOnMapType false

When set to true, hash expressions can be applied on elements of MapType. Otherwise, an analysis exception will be thrown.

3.0.0
spark.sql.mapKeyDedupPolicy EXCEPTION

The policy to deduplicate map keys in builtin function: CreateMap, MapFromArrays, MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted at last takes precedence.

3.0.0
spark.sql.maven.additionalRemoteRepositories https://maven-central.storage-download.googleapis.com/maven2/

A comma-delimited string config of the optional additional remote Maven mirror repositories. This is only used for downloading Hive jars in IsolatedClientLoader if the default Maven Central repo is unreachable.

3.0.0
spark.sql.maxPlanStringLength 2147483632

Maximum number of characters to output for a plan string. If the plan is longer, further output will be truncated. The default setting always generates a full plan. Set this to a lower value such as 8k if plan strings are taking up too much memory or are causing OutOfMemory errors in the driver or UI processes.

3.0.0
spark.sql.optimizer.dynamicPartitionPruning.enabled true

When true, we will generate predicate for partition column when it's used as join key

3.0.0
spark.sql.optimizer.excludedRules (none)

Configures a list of rules to be disabled in the optimizer, in which the rules are specified by their rule names and separated by comma. It is not guaranteed that all the rules in this configuration will eventually be excluded, as some rules are necessary for correctness. The optimizer will log the rules that have indeed been excluded.

2.4.0
spark.sql.orc.columnarReaderBatchSize 4096

The number of rows to include in a orc vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data.

2.4.0
spark.sql.orc.compression.codec snappy

Sets the compression codec used when writing ORC files. If either compression or orc.compress is specified in the table-specific options/properties, the precedence would be compression, orc.compress, spark.sql.orc.compression.codec.Acceptable values include: none, uncompressed, snappy, zlib, lzo.

2.3.0
spark.sql.orc.enableVectorizedReader true

Enables vectorized orc decoding.

2.3.0
spark.sql.orc.filterPushdown true

When true, enable filter pushdown for ORC files.

1.4.0
spark.sql.orc.mergeSchema false

When true, the Orc data source merges schemas collected from all data files, otherwise the schema is picked from a random data file.

3.0.0
spark.sql.orderByOrdinal true

When true, the ordinal numbers are treated as the position in the select list. When false, the ordinal numbers in order/sort by clause are ignored.

2.0.0
spark.sql.parquet.binaryAsString false

他の幾つかのParquet生成システム、特にImpalaとSpark SQLの古いバージョンは、Parquetスキーマを書き出す時にバイナリデータと文字列の区別しません。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として扱うように指示します。

1.1.1
spark.sql.parquet.columnarReaderBatchSize 4096

The number of rows to include in a parquet vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data.

2.4.0
spark.sql.parquet.compression.codec snappy

Parquetファイルを書き込む時に圧縮符号化を使うように設定します。compression または parquet.compression のどちらかがテーブル固有のオプション/プロパティ内で指定された場合、先行詞は compression, parquet.compression, spark.sql.parquet.compression.codec でしょう。利用可能な値には、none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd が含まれます。

1.1.1
spark.sql.parquet.enableVectorizedReader true

Enables vectorized parquet decoding.

2.0.0
spark.sql.parquet.filterPushdown true

trueに設定された場合は、Parquet filter push-down 最適化を有効化します。

1.2.0
spark.sql.parquet.int96AsTimestamp true

幾つかのParquet生成システム、特にImparaは、タイムスタンプをINT96に格納します。ナノ秒項目の精度の喪失を避ける必要があるため、SparkもタイムスタンプをINT96で保持するでしょう。このフラグはこれらのシステムとの互換性を提供するために、Spark SQLにINT96データをタイムスタンプとして解釈するように指示します。

1.3.0
spark.sql.parquet.int96TimestampConversion false

This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive & Spark.

2.3.0
spark.sql.parquet.mergeSchema false

trueの場合、Parquetデータソースは全てのデータファイルから集められたスキーマをマージします。そうでなければ、スキーマは、サマリファイル、あるいはサマリファイルが利用できない場合はランダムデータファイルから取り出されます。

1.5.0
spark.sql.parquet.outputTimestampType INT96

Sets which Parquet timestamp type to use when Spark writes data to Parquet files. INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value.

2.3.0
spark.sql.parquet.recordLevelFilter.enabled false

If true, enables Parquet's native record-level filtering using the pushed down filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled and the vectorized reader is not used. You can ensure the vectorized reader is not used by setting 'spark.sql.parquet.enableVectorizedReader' to false.

2.3.0
spark.sql.parquet.respectSummaryFiles false

When true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn't be enabled before knowing what it means exactly.

1.5.0
spark.sql.parquet.writeLegacyFormat false

もしtrueであれば、データはSpark1.4以前の方法で書き込まれるでしょう。例えば、小数値はApache Parquetの固定長のバイト配列形式で書き込まれるでしょう。これはApache HiveやApache Impalaのよゆな他のシステムが使います。falseであれば、Parquetの新しい形式が使われるでしょう。例えば、小数はintに基づいた形式で書き込まれるでしょう。Parquetの出力がこの新しい形式をサポートしないシステムと一緒に使うことを意図している場合は、trueに設定してください。

1.6.0
spark.sql.parser.quotedRegexColumnNames false

When true, quoted Identifiers (using backticks) in SELECT statement are interpreted as regular expressions.

2.3.0
spark.sql.pivotMaxValues 10000

When doing a pivot without specifying values for the pivot column this is the maximum number of (distinct) values that will be collected without error.

1.6.0
spark.sql.pyspark.jvmStacktrace.enabled false

When true, it shows the JVM stacktrace in the user-facing PySpark exception together with Python stacktrace. By default, it is disabled and hides JVM stacktrace and shows a Python-friendly exception only.

3.0.0
spark.sql.redaction.options.regex (?i)url

Regex to decide which keys in a Spark SQL command's options map contain sensitive information. The values of options whose names that match this regex will be redacted in the explain output. This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex.

2.2.2
spark.sql.redaction.string.regex (value of spark.redaction.string.regex)

Regex to decide which parts of strings produced by Spark contain sensitive information. When this regex matches a string part, that string part is replaced by a dummy value. This is currently used to redact the output of SQL explain commands. When this conf is not set, the value from spark.redaction.string.regex is used.

2.3.0
spark.sql.repl.eagerEval.enabled false

Enables eager evaluation or not. When true, the top K rows of Dataset will be displayed if and only if the REPL supports the eager evaluation. Currently, the eager evaluation is supported in PySpark and SparkR. In PySpark, for the notebooks like Jupyter, the HTML table (generated by repr_html) will be returned. For plain Python REPL, the returned outputs are formatted like dataframe.show(). In SparkR, the returned outputs are showed similar to R data.frame would.

2.4.0
spark.sql.repl.eagerEval.maxNumRows 20

The max number of rows that are returned by eager evaluation. This only takes effect when spark.sql.repl.eagerEval.enabled is set to true. The valid range of this config is from 0 to (Int.MaxValue - 1), so the invalid config like negative and greater than (Int.MaxValue - 1) will be normalized to 0 and (Int.MaxValue - 1).

2.4.0
spark.sql.repl.eagerEval.truncate 20

The max number of characters for each cell that is returned by eager evaluation. This only takes effect when spark.sql.repl.eagerEval.enabled is set to true.

2.4.0
spark.sql.session.timeZone (value of local timezone)

The ID of session local timezone in the format of either region-based zone IDs or zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'. Zone offsets must be in the format '(+|-)HH:mm', for example '-08:00' or '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'. Other short names are not recommended to use because they can be ambiguous.

2.2.0
spark.sql.shuffle.partitions 200

The default number of partitions to use when shuffling data for joins or aggregations. Note: For structured streaming, this configuration cannot be changed between query restarts from the same checkpoint location.

1.1.0
spark.sql.sources.bucketing.enabled true

When false, we will treat bucketed table as normal table

2.0.0
spark.sql.sources.bucketing.maxBuckets 100000

The maximum number of buckets allowed.

2.4.0
spark.sql.sources.default parquet

The default data source to use in input/output.

1.3.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32

The maximum number of paths allowed for listing files at driver side. If the number of detected paths exceeds this value during partition discovery, it tries to list the files with another Spark distributed job. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

1.5.0
spark.sql.sources.partitionColumnTypeInference.enabled true

When true, automatically infer the data types for partitioned columns.

1.5.0
spark.sql.sources.partitionOverwriteMode STATIC

When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode. This can also be set as an output option for a data source using key partitionOverwriteMode (which takes precedence over this setting), e.g. dataframe.write.option("partitionOverwriteMode", "dynamic").save(path).

2.3.0
spark.sql.statistics.fallBackToHdfs false

When true, it will fall back to HDFS if the table statistics are not available from table metadata. This is useful in determining if a table is small enough to use broadcast joins. This flag is effective only for non-partitioned Hive tables. For non-partitioned data source tables, it will be automatically recalculated if table statistics are not available. For partitioned data source and partitioned Hive tables, It is 'spark.sql.defaultSizeInBytes' if table statistics are not available.

2.0.0
spark.sql.statistics.histogram.enabled false

Generates histograms when computing column statistics if enabled. Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histogram. Note that collecting histograms takes extra cost. For example, collecting column statistics usually takes only one table scan, but generating equi-height histogram will cause an extra table scan.

2.3.0
spark.sql.statistics.size.autoUpdate.enabled false

Enables automatic update for table size once table's data is changed. Note that if the total number of files of the table is very large, this can be expensive and slow down data change commands.

2.3.0
spark.sql.storeAssignmentPolicy ANSI

When inserting a value into a column with different data type, Spark will perform type coercion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy policy, Spark allows the type coercion as long as it is a valid Cast, which is very loose. e.g. converting string to int or double to boolean is allowed. It is also the only behavior in Spark 2.x and it is compatible with Hive. With strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. converting double to int or decimal to double is not allowed.

3.0.0
spark.sql.streaming.checkpointLocation (none)

The default location for storing checkpoint data for streaming queries.

2.0.0
spark.sql.streaming.continuous.epochBacklogQueueSize 10000

The max number of entries to be stored in queue to wait for late epochs. If this parameter is exceeded by the size of the queue, stream will stop with an error.

3.0.0
spark.sql.streaming.disabledV2Writers

A comma-separated list of fully qualified data source register class names for which StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.

2.3.1
spark.sql.streaming.fileSource.cleaner.numThreads 1

Number of threads used in the file source completed file cleaner.

3.0.0
spark.sql.streaming.forceDeleteTempCheckpointLocation false

When true, enable temporary checkpoint locations force delete.

3.0.0
spark.sql.streaming.metricsEnabled false

Whether Dropwizard/Codahale metrics will be reported for active streaming queries.

2.0.2
spark.sql.streaming.multipleWatermarkPolicy min

Policy to calculate the global watermark value when there are multiple watermark operators in a streaming query. The default value is 'min' which chooses the minimum watermark reported across multiple operators. Other alternative value is 'max' which chooses the maximum across multiple operators. Note: This configuration cannot be changed between query restarts from the same checkpoint location.

2.4.0
spark.sql.streaming.noDataMicroBatches.enabled true

Whether streaming micro-batch engine will execute batches without data for eager state management for stateful streaming queries.

2.4.1
spark.sql.streaming.numRecentProgressUpdates 100

The number of progress updates to retain for a streaming query

2.1.1
spark.sql.streaming.stopActiveRunOnRestart true

Running multiple runs of the same streaming query concurrently is not supported. If we find a concurrent active run for a streaming query (in the same or different SparkSessions on the same cluster) and this flag is true, we will stop the old streaming query run to start the new one.

3.0.0
spark.sql.streaming.stopTimeout 0

How long to wait in milliseconds for the streaming execution thread to stop when calling the streaming query's stop() method. 0 or negative values wait indefinitely.

3.0.0
spark.sql.thriftserver.scheduler.pool (none)

Set a Fair Scheduler pool for a JDBC client session.

1.1.1
spark.sql.thriftserver.ui.retainedSessions 200

The number of SQL client sessions kept in the JDBC/ODBC web UI history.

1.4.0
spark.sql.thriftserver.ui.retainedStatements 200

The number of SQL statements kept in the JDBC/ODBC web UI history.

1.4.0
spark.sql.variable.substitute true

This enables substitution using syntax like ${var}, ${system:var}, and ${env:var}.

2.0.0

Static SQL 設定

Static SQL configurations are cross-session, immutable Spark SQL configurations. They can be set with final values by the config file and command-line options with --conf/-c prefixed, or by setting SparkConf that are used to create SparkSession. External users can query the static sql config values via SparkSession.conf or via set command, e.g. SET spark.sql.extensions;, but cannot set/unset them.

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.event.truncate.length 2147483647

Threshold of SQL length beyond which it will be truncated before adding to event. Defaults to no truncation. If set to 0, callsite will be logged instead.

3.0.0
spark.sql.extensions (none)

A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used.

2.2.0
spark.sql.hive.metastore.barrierPrefixes

Spark SQLと通信をする各バージョンのHiveのために明示的にロードされなければならないクラスのプリフィックスのカンマ区切りのリスト。例えば、一般的なprefixで定義されたHive UDFは共有されるでしょう(例えば、org.apache.spark.*)。

1.4.0
spark.sql.hive.metastore.jars ビルトイン

HiveMetastoreClientをインスタンス化するために使われるべきjarの場所。This property can be one of three options: " 1. "builtin" Use Hive 2.3.7, which is bundled with the Spark assembly when -Phive is enabled. このオプションが選択された場合は、spark.sql.hive.metastore.version2.3.7 あるいは未定義のどちらかでなければなりません。2. "maven" Use Hive jars of specified version downloaded from Maven repositories. 3. HiveおよびHadoopの両方の標準的なフォーマットのクラスパス。

1.4.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc

Spark SQLとHiveの特定のバージョンの間で共有されるクラスローダを使ってロードされるべきカンマ区切りのクラスプリフィックスのリスト。共有されるべきクラスの例はJDBCドライバで、メタストアと対話するために必要とされます。共有される必要がある他のクラスは、既に共有されているクラスとやり取りするためのものです。例えば、log4jによって使われる独自のアペンダーです。

1.4.0
spark.sql.hive.metastore.version 2.3.7

HiveメタストアのバージョンAvailable options are 0.12.0 through 2.3.7 and 3.0.0 through 3.1.2.

1.4.0
spark.sql.hive.thriftServer.singleSession false

When set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.

1.6.0
spark.sql.legacy.sessionInitWithConfigDefaults false

Flag to revert to legacy behavior where a cloned SparkSession receives SparkConf defaults, dropping any overrides in its parent SparkSession.

3.0.0
spark.sql.queryExecutionListeners (none)

List of class names implementing QueryExecutionListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.

2.3.0
spark.sql.streaming.streamingQueryListeners (none)

List of class names implementing StreamingQueryListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.

2.4.0
spark.sql.streaming.ui.enabled true

Whether to run the Structured Streaming Web UI for the Spark application when the Spark Web UI is enabled.

3.0.0
spark.sql.streaming.ui.retainedProgressUpdates 100

The number of progress updates to retain for a streaming query for Structured Streaming UI.

3.0.0
spark.sql.streaming.ui.retainedQueries 100

The number of inactive queries to retain for Structured Streaming UI.

3.0.0
spark.sql.ui.retainedExecutions 1000

Number of executions to retain in the Spark UI.

1.5.0
spark.sql.warehouse.dir (value of $PWD/spark-warehouse)

The default location for managed databases and tables.

2.0.0

Spark ストリーミング

プロパティ名デフォルト意味これ以降のバージョンから
spark.streaming.backpressure.enabled false Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRateおよびspark.streaming.kafka.maxRatePerPartitionが設定されている場合に、この値で上限を制限されます (以下を見てください)。 1.5.0
spark.streaming.backpressure.initialRate not set これはバックプレッシャー機構が有効な場合に最初のバッチのために各レシーバーがデータを受信する初期の最大受信レートです。 2.0.0
spark.streaming.blockInterval 200ms Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。See the performance tuning section in the Spark Streaming programming guide for more details. 0.8.0
spark.streaming.receiver.maxRate not set 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。See the deployment guide in the Spark Streaming programming guide for mode details. 1.0.2
spark.streaming.receiver.writeAheadLog.enable false レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。See the deployment guide in the Spark Streaming programming guide for more details. 1.2.1
spark.streaming.unpersist true Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。 0.9.0
spark.streaming.stopGracefullyOnShutdown false trueの場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。 1.4.0
spark.streaming.kafka.maxRatePerPartition not set 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。 1.3.0
spark.streaming.kafka.minRatePerPartition 1 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最小レート(秒間あたりのレコード数)。 2.4.0
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。 1.0.0
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false ドライバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。ドライバー上のメタデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 1.6.0
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false レシーバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。レシーバー上のデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 1.6.0

SparkR

プロパティ名デフォルト意味これ以降のバージョンから
spark.r.numRBackendThreads 2 SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。 1.4.0
spark.r.command Rscript ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。 1.5.3
spark.r.driver.command spark.r.command ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。 1.5.3
spark.r.shell.command R ドライバーのためのクライアントモードのsparkRシェルの実行のための実行ファイル。クラスターモードでは無視されます。環境変数 SPARKR_DRIVER_R と同じですが、それより優先されます。spark.r.shell.command はsparkRシェルのために使われますが、spark.r.driver.command は R スクリプトを実行するために使われます。 2.1.0
spark.r.backendConnectionTimeout 6000 Rプロセスによって設定されるRBackendへの接続上の接続タイムアウトの秒数。 2.1.0
spark.r.heartBeatInterval 100 接続タイムアウトを防ぐためにSparkRバックエンドからRプロセスに送信されるハートビートの間隔。 2.1.0

GraphX

プロパティ名デフォルト意味これ以降のバージョンから
spark.graphx.pregel.checkpointInterval -1 Pregelでのグラフとメッセージのためのチェックポイント間隔。多くの繰り返しの連鎖による stackOverflowError を避けるために使われます。デフォルトではチェックポイントは無効です。 2.2.0

配備

プロパティ名デフォルト意味これ以降のバージョンから
spark.deploy.recoveryMode NONE クラスターモードでサブミットされたSparkジョブが失敗し再起動する場合に、回復の設定をするリカバリモード。スタンダードあるいはMesosで動いている場合にクラスタモードでのみ適用可能です。 0.8.1
spark.deploy.zookeeper.url None `spark.deploy.recoveryMode` が ZOOKEEPER に設定されている場合は、この設定は接続するためのzookeeperのURLに設定するために使われます。 0.8.1
spark.deploy.zookeeper.dir None `spark.deploy.recoveryMode` がZOOKEEPERに設定されている場合は、この設定はリカバリー状態を保持するためのzookeeperディレクトリを設定するために使われます。 0.8.1

クラスタマネージャー

Sparkの各クラスタマネージャーは追加の設定オプションを持ちます。各ノードのためのページ上で設定を見つけることができます。

YARN

Mesos

Kubernetes

スタンドアローンモード

環境変数

あるSpark設定は環境変数によって設定することができ、それらはSparkがインストールされたディレクトリ内のconf/spark-env.shスクリプトによって使われます(Windows上ではconf/spark-env.cmd)。スタンドアローンおよびMesosモードでは、このファイルはホスト名のようなマシーン固有の情報を渡すことができるでしょう。ローカルのアプリケーションの実行あるいはスクリプトのサブミットの場合、それは開始場所にもなります。

conf/spark-env.sh はSparkがインストールされた場合はデフォルトでは存在しないことに注意してください。しかし、それを作成するためにconf/spark-env.sh.template をコピーすることができます。コピーを実行可能にすることを忘れないでください。

以下の変数はspark-env.shの中で設定することができます:

環境変数意味
JAVA_HOME Javaがインストールされた場所(デフォルトのPATH上に無い場合)。
PYSPARK_PYTHON PySparkのためにドライバおよびワーカーの両方の中で使うPythonの実行可能バイナリ(デフォルトは、利用可能であればpython2.7。そうでなければpythonです)。設定されている場合はプロパティspark.pyspark.python が優先されます。
PYSPARK_DRIVER_PYTHON PySparkのためにドライバの中でのみ使うPythonの実行可能バイナリ(デフォルトはPYSPARK_PYTHONです)。設定されている場合はプロパティspark.pyspark.driver.python が優先されます。
SPARKR_DRIVER_R SparkRシェルのために使われるRバイナリ実行ファイル(デフォルトは R)。設定されている場合はプロパティspark.r.shell.command が優先されます。
SPARK_LOCAL_IP バインドするマシーンのIPアドレス。
SPARK_PUBLIC_DNS Sparkプログラムが他のマシーンに知らせるホスト名。

上に加えて、各マシーンで使うコアの数や最大メモリのような、Sparkスタンドアローン クラスタ スクリプトを設定するためのオプションもあります。

spark-env.shはシェルスクリプトなので、それらのいくつかはプログラム的に設定することができます - 例えば、特定のネットワークインタフェースのIPを調べることでSPARK_LOCAL_IPを計算することができるかもしれません。

注意: cluster モードでYARN上のSparkを実行する場合は、 conf/spark-defaults.conf ファイル内のspark.yarn.appMasterEnv.[EnvironmentVariableName] を使って環境変数が設定される必要があります。spark-env.shに設定されている環境変数は、clusterモードのYARN マスタープロセス内には反映されないでしょう。詳細についてはYARNに関係する Spark プロパティ を見てください。

ログの設定

Sparkはログのためにlog4jを使います。confディレクトリに log4j.properties ファイルを追加することで設定することができます。開始する一つの方法は、そこに存在する既存のlog4j.properties.template をコピーすることです。

設定ディレクトリの上書き

デフォルトの"SPARK_HOME/conf"以外に異なる設定ディレクトリを指定するために、SPARK_CONF_DIRを設定することができます。Sparkはこのディレクトリから設定ファイル(spark-defaults.conf, spark-env.sh, log4j.properties など)を使用するでしょう。

Hadoopクラスタ設定の継承

Sparkを使ってHDFSから読み込みをするつもりであれば、Sparkのクラスパス上に含まれるべき二つのHadoop設定ファイルがあります。

これらの設定ファイルの場所はHadoopおよびHDPバージョンによって変わりますが、一般的な場所は/etc/hadoop/confの中です。幾つかのツールはその場で設定を生成しますが、それらのコピーをダウンロードするための仕組みを提供します。

それらのファイルがSparkに見えるようにするためには、$SPARK_HOME/conf/spark-env.sh内のHADOOP_CONF_DIRを設定ファイルを含む場所に設定します。

独自の Hadoop/Hive 設定

もしSparkアプリケーションがHadoop, Hive あるいは両方とやり取りをする場合、Sparkのクラスパス内におそらくHadoop/Hiveの設定ファイルがあります。

複数の実行中のアプリケーションは異なるHadoop/Hiveクライアント側の設定を必要とするかもしれません。各アプリケーションごとにSparkのクラスパス内の hdfs-site.xml, core-site.xml, yarn-site.xml, hive-site.xml をコピーし修正することができます。YARN上で実行中のSparkクラスタ内ではこれらの設定ファイルはクラスタ全体で設定され、アプリケーションによって安全に変更することができません。

The better choice is to use spark hadoop properties in the form of spark.hadoop.*, and use spark hive properties in the form of spark.hive.*. For example, adding configuration “spark.hadoop.abc.def=xyz” represents adding hadoop property “abc.def=xyz”, and adding configuration “spark.hive.abc=xyz” represents adding hive property “hive.abc=xyz”. They can be considered as same as normal spark properties which can be set in $SPARK_HOME/conf/spark-defaults.conf

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。For instance, Spark allows you to simply create an empty conf and set spark/spark hadoop/spark hive properties.

val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)

また、実行時に設定を修正あるいは追加することができます:

./bin/spark-submit \ 
  --name "My app" \ 
  --master local[4] \  
  --conf spark.eventLog.enabled=false \ 
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ 
  --conf spark.hadoop.abc.def=xyz \
  --conf spark.hive.abc=xyz
  myApp.jar

カスタム リソース スケジューリングと設定の概要

GPUs and other accelerators have been widely used for accelerating special workloads, e.g., deep learning and signal processing. Spark now supports requesting and scheduling generic resources, such as GPUs, with a few caveats. The current implementation requires that the resource have addresses that can be allocated by the scheduler. It requires your cluster manager to support and be properly configured with the resources.

There are configurations available to request resources for the driver: spark.driver.resource.{resourceName}.amount, request resources for the executor(s): spark.executor.resource.{resourceName}.amount and specify the requirements for each task: spark.task.resource.{resourceName}.amount. The spark.driver.resource.{resourceName}.discoveryScript config is required on YARN, Kubernetes and a client side Driver on Spark Standalone. spark.executor.resource.{resourceName}.discoveryScript config is required for YARN and Kubernetes. Kubernetes also requires spark.driver.resource.{resourceName}.vendor and/or spark.executor.resource.{resourceName}.vendor. See the config descriptions above for more information on each.

Spark will use the configurations specified to first request containers with the corresponding resources from the cluster manager. Once it gets the container, Spark launches an Executor in that container which will discover what resources the container has and the addresses associated with each resource. The Executor will register with the Driver and report back the resources available to that Executor. The Spark scheduler can then schedule tasks to each Executor and assign specific resource addresses based on the resource requirements the user specified. The user can see the resources assigned to a task using the TaskContext.get().resources api. On the driver, the user can see the resources assigned with the SparkContext resources call. It’s then up to the user to use the assignedaddresses to do the processing they want or pass those into the ML/AI framework they are using.

See your cluster manager specific page for requirements and details on each of - YARN, Kubernetes and Standalone Mode. It is currently not available with Mesos or local mode. And please also note that local-cluster mode with multiple workers is not supported(see Standalone documentation).

TOP
inserted by FC2 system