Sparkの設定

Sparkはシステムを設定するための3つの場所を提供します:

Sparkのプロパティ

Spark プロパティはほとんどのアプリケーションの設定を制御し、各アプリケーションに対して別々に設定されます。これらのプロパティは SparkContextに渡して直接SparkConf に設定することができます。SparkConfを使って共通プロパティの幾つかを設定(例えば、マスターURLおよびアプリケーション名)することができ、set()メソッドを使って任意のキー値ペアを設定することができます。例えば、以下のように2つのスレッドを使ってアプリケーションを初期化することができます:

local[2]をつけて実行、2つのスレッド - "最小限"の並列処理、することにより、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立つことに注意してください。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

ローカルモードで1つ以上のスレッドを持つことができ、実際にSpark Streamingのような場合にはリソース不足を避けるために1つ以上のスレッドを要求するかも知れないということに注意してください。

なんらかの期間を指定するプロパティは時間の単位で設定されなければなりません。以下の書式が受け付けられます:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

バイトサイズを指定するプロパティはサイズの単位を使って設定されなければなりません。以下の書式が受け付けられます:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

単位が無い数値は一般的にバイトとして解釈されますが、2,3のものはKiBあるいはMiBとして解釈されます。個々の設定プロパティのドキュメントを見てください。可能な箇所では単位の指定が望ましいです。

Sparkプロパティの動的なロード

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。例えば、もし同じアプリケーションを異なるマスターあるいは異なるメモリ量で実行したいなど。Sparkは空のconfを単純に作成することができます:

val sc = new SparkContext(new SparkConf())

そうすると、実行時に設定値を提供することができます:

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark シェルおよび spark-submit ツールは動的に設定をロードする2つの方法を提供します。最初のものは上で示されたように、--masterのようなコマンドラインのオプションです。spark-submit--conf フラグを使ってどのようなSpark プロパティも受け付けることができますが、Sparkアプリケーションを起動する時に役割があるプロパティのために特別なフラグを使うようにしてください。./bin/spark-submit --helpを実行すると、これらのオプションの完全なリストが表示されるでしょう。

bin/spark-submitは設定オプションを conf/spark-defaults.confからも読み込むでしょう。各行はホワイトスペースで区切られたキーと値です。例えば:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

フラグで指定された、あるいはプロパティファイルの全ての値はアプリケーションに渡され、SparkConfを通じてマージされるでしょう。SparkConfで直接設定されたプロパティは優先度が高く、その次はspark-submit あるいは spark-shellに渡されたもので、その次はspark-defaults.conf ファイル内のオプションです。以前のSparkのバージョンから2,3の設定キーの名前が変更されました; そのような場合、古いキーの名前はまだ受け付けられますが、新しいキーのどのインスタンスよりも優先度が低くなります。

Sparkのプロパティは主に2つの種類に分けることができます; 1つは “spark.driver.memory”, “spark.executor.instances” のような配備に関係します。この種類のプロパティはプログラム的に実行時にSparkConfを使って設定する時に影響を受けないでしょう。あるいはその挙動はどのクラスタマネージャと配備モードを選択したかに依存するため、設定ファイルまたは spark-submit コマンドラインのオプションを使って設定することが望まれます; もう1つは “spark.task.maxFailures” のような主にSparkのruntimeの制御に関係し、この種類のプロパティはどちらかの方法で設定することができます。

Sparkプロパティのビュー

http://<driver>:4040 のアプリケーションUIは、"Environment"タブの中でSparkプロパティをリスト表示します。ここはプロパティが正しく設定されたか確認するのに便利な場所です。spark-defaults.conf, SparkConf, あるいはコマンドラインで明示的に指定された値だけが表示されるだろうことに注意してください。他の全ての設定プロパティについては、デフォルトの値が使われると見なすことができます。

利用可能なプロパティ

内部設定を制御するほとんどのプロパティは意味のあるデフォルト値を持ちます。最も一般的なオプションの幾つかは以下のように設定されます。

アプリケーションのプロパティ

プロパティ名デフォルト意味
spark.app.name (none) アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。
spark.driver.cores 1 ドライバープロセスのために使われるコアの数。クラスターモードのみです。
spark.driver.maxResultSize 1g 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズのバイト数の制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。
spark.driver.memory 1g ドライバープロセス、つまりSparkContextが初期化される時に使用されるメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.memoryOverhead 最小384で、driverMemory * 0.10 クラスタモード時のドライバごとに割り当てられるオフヒープメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところYARNとKubernetesでサポートされます。
spark.executor.memory 1g executorプロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。
spark.executor.pyspark.memory 設定無し executorごとにPySparkに割り当てられるメモリ量。別に指定されていなければMiB単位。設定された場合、executorのためのPySparkメモリはこの量に制限されるでしょう。設定されない場合、SparkはPythonのメモリ使用を制限せず、他の非JVMプロセスと共有されるオーバーヘッドのメモリ空間を超えないようにするのはアプリケーションの責任です。PySparkがYARNあるいはKubernetes内で実行される場合、子のメモリはexecutorのリソースリクエストに追加されます。注意: Pythonのメモリの使用は、Windowsのようなリソースの制限をサポートしないプラットフォーム上で制限されないかもしれません。
spark.executor.memoryOverhead 最小384で、executorMemory * 0.10。 executorあたりに割り当てられるオフヒープメモリの量。別に指定されていなければMiB単位。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。このオプションは現在のところYARNとKubernetesでサポートされます。
spark.extraListeners (none) SparkListenerを実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。
spark.local.dir /tmp Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。注意: Spark 1.0 以降では、これはクラスタマネージャーによって、SPARK_LOCAL_DIRS (スタンドアローン)、MESOS_SANDBOX (Mesos) あるいは LOCAL_DIRS (YARN) 環境変数で上書きされます。
spark.logConf false SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。
spark.master (none) 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。
spark.submit.deployMode (none) Sparkドライバプログラムの配備モード、"client"あるいは"cluster"、これはドライバープログラムをローカル("client")あるいはクラスタ内のノードの1つの上で遠隔で("cluster")起動することを意味します。
spark.log.callerContext (none) Yarn/HDFS上で動作している時に Yarn RM log/HDFS audit ログに書き込まれるアプリケーションの情報。その長さはhadoop.caller.context.max.sizeのHadoop設定に依存します。簡潔でなければならず、一般的に50文字まで持つことができます。
spark.driver.supervise false trueであれば、非0の終了ステータスで失敗した場合にドライバは自動的に再起動します。Spark スタンドアローン モードあるいは Mesos クラスタ配備モードでのみ効果があります。

これとは別に、以下のプロパティも利用可能で、ある状況では有用かも知れません。

ランタイム環境

プロパティ名デフォルト意味
spark.driver.extraClassPath (none) ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.extraJavaOptions (none) ドライバに渡す特別なJVMオプションの文字列。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memoryを、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.extraLibraryPath (none) ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.userClassPathFirst false (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。
spark.executor.extraClassPath (none) executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。
spark.executor.extraJavaOptions (none) executorに渡すための特別なJVMオプションの文字列例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。シンボルが続く場合、手を加えられるでしょう: アプリケーションIDによって置き換えられ、executor IDによって置き換えられるでしょう。例えば、/tmp 内にappのexecutor IDの名前をとるファイルへの冗長なgcログを有効にするには、以下の 'value' を渡します: -verbose:gc -Xloggc:/tmp/-.gc
spark.executor.extraLibraryPath (none) executor JVMを起動する場合に使う特別なライブラリパスを設定する。
spark.executor.logs.rolling.maxRetainedFiles (none) システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。
spark.executor.logs.rolling.enableCompression false executor ログの圧縮を有効にします。有効な場合、ロールされたexecutorログは圧縮されるでしょう。デフォルトは無効です。
spark.executor.logs.rolling.maxSize (none) executorのログがロールオーバーされる最大のファイルサイズをバイトで設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。
spark.executor.logs.rolling.strategy (none) executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するために>spark.executor.logs.rolling.maxSize を使用します。
spark.executor.logs.rolling.time.interval daily executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily, hourly, minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFilesを見てください。
spark.executor.userClassPathFirst false (実験的なもの) spark.driver.userClassPathFirstと同じ機能ですが、executorインスタンスに適用されます。
spark.executorEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。
spark.redaction.regex (?i)secret|password ドライバおよびexecutor環境の中で、Spark設定プロパティと環境変数のどちらが保護必要情報を持つかを決定する正規表現。この正規表現がプロパティのキーあるいは値に一致した場合、その値は環境のUIおよびYARNとイベントログのような様々なログから作成されます。
spark.python.profile false Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles()によって表示されるか、ドライバが終了する前に表示されるでしょう。sc.dump_profiles(path)によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。
spark.python.profile.dump (none) ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはpstats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。
spark.python.worker.memory 512m 集約の間にpythonワーカープロセスごとに使用するメモリ量。サイズ単位のサフィックスを持つJVMメモリ文字列と同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。
spark.python.worker.reuse true Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。
spark.files 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。globも可能です。
spark.submit.pyFiles PythonアプリのためのPYTHONPATH上に配置されるカンマ区切りの .zip, .egg あるいは .py ファイル。globも可能です。
spark.jars ドライバーとexecutorのクラスパス上でインクルードするjarのカンマ区切りのリスト。globも可能です。
spark.jars.packages ドライバーとexecutorのクラスパス上でインクルードされる Maven coordinateのjarのカンマ区切りのリストcoordinateは groupId:artifactId:version でなければなりません。spark.jars.ivySettingsが指定された場合、artifacts はファイル内の設定に応じて解決されるでしょう。そうでなければ artifacts はローカルのmavenリポジトリ、次にmavenセントラル、最後にコマンドラインオプション--repositoriesによって指定される追加のリモートリポジトリから探されるでしょう。詳細は、上級の依存管理を見てください。
spark.jars.excludes spark.jars.packagesで与えらえた依存を解決する間に依存性の衝突を避けるために除外する、groupId:artifactId のカンマ区切りのリスト。
spark.jars.ivy Ivyユーザディレクトリを指定するパス。spark.jars.packagesからローカルのIvyキャッシュおよびパッケージファイルのために使われます。これはデフォルトが ~/.ivy2 の Ivyプロパティ ivy.default.ivy.user.dir を上書くでしょう。
spark.jars.ivySettings mavenセントラルのような組み込みのデフォルトの代わりに、spark.jars.packages を使って指定されるjarの解決を独自に行うためのIvy設定ファイルへのパス。コマンドラインオプション--repositoriesあるいはspark.jars.repositories によって指定される追加のリポジトリも含まれるでしょう。例えば Artifactory のような社内のartifactサーバを使って、ファイアウォールの背後からSparkがartifactを解決できるようにするのに便利です。ファイルフォーマットの設定の詳細は http://ant.apache.org/ivy/history/latest-milestone/settings.html で見つけることができます
spark.jars.repositories --packages あるいは spark.jars.packagesを使って与えられるmaven coordinateのために検索される、カンマ区切りの追加のリモートリポジトリ。
spark.pyspark.driver.python ドライバ内でPySparkが使うPythonバイナリ実行ファイル。(デフォルトは spark.pyspark.pythonです)
spark.pyspark.python ドライバと実行ファイルの両方の中でPySparkが使うPythonバイナリ実行ファイル。

シャッフルの挙動

プロパティ名デフォルト意味
spark.reducer.maxSizeInFlight 48m 各reduceタスクから同時に取り出すmap出力の最大サイズ。別に指定されていなければMiB単位。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。
spark.reducer.maxReqsInFlight Int.MaxValue この設定はリモートリクエストの数を指定された値までブロックの検索を制限します。クラスタ内のホストの数が増えた場合は、1つ以上のノードからやってくる接続の数が大量になり、ワーカーが負荷のために落ちるかも知れません。取得リクエストの数を制限できるようにすることで、このシナリオは緩和されるかも知れません。
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue この設定は指定されたホスト ポートからのreduceタスク毎に取得されるリモートブロックの数を制限します。大量のブロックが1つの取得あるいは並行して指定されたアドレスからリクエストされる場合、これは提供するexecutorあるいはノードマネージャーをクラッシュするかもしれません。これは外部シャッフルが有効な時にノードマネージャーの負荷を減らすのに特に有用です。それを低い値に設定することでこの問題を緩和することができます。
spark.maxRemoteBlockSizeFetchToMem Int.MaxValue - 512 ブロックのサイズがこの閾値のバイトよりも上の場合、リモートのブロックはディスクに取りに行くでしょう。これは巨大なリクエストがあまりに大きなメモリを要求することを避けるためのものです。デフォルトでは、どのようなリソースが利用可能であっても、直接メモリに取り出すことができないため、これは blocks > 2GB についてのみ有効にされます。しかし、もっと少ない値に下げることも同様にできます (例えば 200m) 小さいブロック上にあまりに大きなメモリを使うことを避けるために。この設定はシャッフルの取得およびブロックマネージャーのリモートブロックの取得の両方に影響するだろうことに注意してください。外部シャッフルサービスを有効にしたユーザについては、この機能は外部シャッフルサービスがSpark2.2より新しい場合にのみ使われるかもしれません。
spark.shuffle.compress true map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。
spark.shuffle.file.buffer 32k 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。別に指定されていなければMiB単位。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。
spark.shuffle.io.maxRetries 3 (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。
spark.shuffle.io.numConnectionsPerPeer 1 (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。
spark.shuffle.io.preferDirectBufs true (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。
spark.shuffle.io.retryWait 5s (Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWaitとして計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。
spark.shuffle.service.enabled false 外部のシャッフルサービスを有効にします。このサービスはexecutorが安全に削除されるようにexecutorによって書き込まれるシャッフルファイルを保持します。もしspark.dynamicAllocation.enabled が"true"であれば、これは有効でなければなりません。外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentation を見てください。
spark.shuffle.service.port 7337 外部のシャッフルサービスが実行されるだろうポート。
spark.shuffle.service.index.cache.size 100m 特定のメモリのフットプリントに制限されたキャッシュエントリのバイト。
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE シャッフルサービス上で同時に転送されることができるチャンクの最大数。新しくやってくる接続は最大の数に達した時に閉じられるだろうことに注意してください。クライアントはシャッフル再試行の設定に応じて再試行するでしょう(spark.shuffle.io.maxRetriesspark.shuffle.io.retryWaitを見てください)。もしこれらの制限に達するとタスクは取得失敗で失敗するでしょう。
spark.shuffle.sort.bypassMergeThreshold 200 (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。
spark.shuffle.spill.compress true シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codecを使うでしょう。
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 HighlyCompressedMapStatus内のシャッフルブロックのサイズである上のバイトの閾値は正確に記録されます。 これはシャッフルブロックを取り出す時にシャッフルブロックのサイズを低めに見積もることを避けることでOOMを避けるのに役立ちます。
spark.shuffle.registration.timeout 5000 外部シャッフルサービスへの登録のためのミリ秒のタイムアウト。
spark.shuffle.registration.maxAttempts 3 外部シャッフルサービスに登録を失敗した時、maxAttempts回まで再試行するでしょう。

Spark UI

プロパティ名デフォルト意味
spark.eventLog.logBlockUpdates.enabled false spark.eventLog.enabled がtrueの時に、各ブロックの更新毎にイベントを記録するかどうか。*Warning*: これはイベントログのサイズをかなり増やすでしょう。
spark.eventLog.longForm.enabled false trueであれば、イベントログに長い形式の呼び出しを使います。そうでなければ短い形式を使います。
spark.eventLog.compress false もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。圧縮は spark.io.compression.codec を使うでしょう。
spark.eventLog.dir file:///tmp/spark-events spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。
spark.eventLog.enabled false アプリケーションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。
spark.eventLog.overwrite false 既存のファイルを上書きするかどうか。
spark.eventLog.buffer.kb 100k 出力ストリームに書き込む時に使用するバッファサイズ。別に指定されていなければKiB単位。
spark.ui.dagGraph.retainedRootRDDs Int.MaxValue ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのDAGグラフノードを記憶するか。
spark.ui.enabled true Sparkアプリケーションのためにweb UIを実行するかどうか。
spark.ui.killEnabled true ジョブとステージをweb UIからkillすることができます。
spark.ui.liveUpdate.period 100ms ライブ エンティティを更新する頻度。-1 はアプリケーションの再生時に "更新しない" 事を意味します。つまり最後の書き込みだけが起こるでしょう。稼働中のアプリケーションについては、着信タスクイベントを迅速に処理する場合に無くてもやり過ごせる幾つかの操作を回避します。
spark.ui.liveUpdate.minFlushPeriod 1s 古いUIデータがフラッシュされるまでの最小経過時間。着信タスクイベントが頻繁に発生しない場合にUIの陳腐化を回避します。
spark.ui.port 4040 メモリおよび作業データを表示する、アプリケーションのダッシュボードのポート。
spark.ui.retainedJobs 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。
spark.ui.retainedStages 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。
spark.ui.retainedTasks 100000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのタスクを記憶するか。これは目的の最大値で、ある環境ではより少ない要素が維持されるかも知れません。
spark.ui.reverseProxy false ワーカーとアプリケーションUIのためにリバースプロキシとしてSparkマスターの実行を有効にします。このモードでは、Sparkマスターはホストへの直接アクセスを必要とせずにワーカーとアプリケーションをリバースプロキシするでしょう。注意して使ってください。ワーカーとアプリケーションUIは直接アクセスできないため、sparkのマスター/プロキシ public URLを使ってアクセスできるだけでしょう。この設定はクラスタ内の全てのワーカーとアプリケーションUIに影響し、全てのワーカー、ドライバーおよびマスターで設定されなければなりません。
spark.ui.reverseProxyUrl これはプロキシを実行しているURLです。このURLはSparkマスターの前で実行しているプロキシのものです。認証のためにプロキシを実行している場合に便利です。例えば、QAuthプロキシ。プロキシに到達するスキーマ (http/https) およびポートを含む完全なURLにしてください。
spark.ui.showConsoleProgress true コンソール内に進捗バーを表示します。進捗バーは500msより長く実行しているステージの進捗を示します。複数のステージが同時に実行される場合は、複数の進捗バーが同じ行に表示されるでしょう。
spark.worker.ui.retainedExecutors 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。
spark.worker.ui.retainedDrivers 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。
spark.sql.ui.retainedExecutions 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。
spark.ui.retainedDeadExecutors 100 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのdeadのexecutorを記憶するか。
spark.ui.filters None Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。
spark.<class name of filter>.param.<param name>=<value>の形式の構成エントリを設定することで、フィルターパラメータも設定内で指定することができます
例えば:
spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
spark.ui.requestHeaderSize 8k HTTPリクエストヘッダの最大許容サイズ。指定されない場合はバイト単位。この設定はSpark履歴サーバにも適用されます。

圧縮および直列化

プロパティ名デフォルト意味
spark.broadcast.compress true ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。
spark.io.compression.codec lz4 RDDパーティション、イベントログ、ブロードキャスト変数 およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは4つのコーディックを提供します: lz4, lzf, snappyおよび zstd。コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec および org.apache.spark.io.ZStdCompressionCodec
spark.io.compression.lz4.blockSize 32k LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズのバイト数。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。
spark.io.compression.snappy.blockSize 32k Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズのバイト数。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。
spark.io.compression.zstd.level 1 Zstd圧縮コーディックの圧縮レベル。圧縮レベルを増やすとCPUとメモリの消費を犠牲にして結果がより良い圧縮となるでしょう。
spark.io.compression.zstd.bufferSize 32k Zstd圧縮コーディックが使用される場合、Zstd圧縮で使用されるブロックサイズのバイト数。このサイズを小さくするとZstdが使われる時のシャッフルメモリ率が低くなりますが、過度のJNI呼び出しのオーバーヘッドにより圧縮コストが増えるかもしれません。
spark.kryo.classesToRegister (none) Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。
spark.kryo.referenceTracking true Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。
spark.kryo.registrationRequired false Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。
spark.kryo.registrator (none) Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにカンマ区切りのクラスのリストを渡してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。 KryoRegistratorを拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。
spark.kryo.unsafe false 安全では無いKryoベースのシリアライザを使うかどうか。安全では無いIOベースを使うことで実質的に速くすることができます。
spark.kryoserializer.buffer.max 64m kryoのシリアライズバッファの最大許容サイズ。別に指定されていなければMiB単位。これはシリアライズ化しようとしているどのオブジェクトよりも大きくなければならず、2048mより小さくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。
spark.kryoserializer.buffer 64k Kryoのシリアライズ化バッファの初期サイズ。別に指定されていなければKiB単位。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。
spark.rdd.compress false シリアライズされたパーティションを圧縮するかどうか(例えば、JavaとScalaでのStorageLevel.MEMORY_ONLY_SER、あるいは PythonでのStorageLevel.MEMORY_ONLY)。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。圧縮はspark.io.compression.codecを使うでしょう。
spark.serializer org.apache.spark.serializer.
JavaSerializer
ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializerのどのようなサブクラスにもなることができます。
spark.serializer.objectStreamReset 100 org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。

メモリ管理

プロパティ名デフォルト意味
spark.memory.fraction 0.6 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。この設定の目的は内部メタデータ、ユーザデータ構造、およびまばらな場合の不正確なサイズの見積もりのためにメモリを取って置くことで、非常に大きなレコードです。これはデフォルトの値にしておくことをお勧めします。この値を増加した場合の正しいJVMガベージコレクションの調整についての重要な情報を含む詳細は、この説明を見てください。
spark.memory.storageFraction 0.5 立ち退きに耐性のあるストレージメモリの量。spark.memory.fractionによって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。
spark.memory.offHeap.enabled false trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。
spark.memory.offHeap.size 0 オフヒープ割り当てのために使うことができる絶対メモリ量のバイト数。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=trueの場合は、これは正の値に設定されなければなりません。
spark.memory.useLegacyMode false Spark 1.5以前で使われていた古いメモリ管理モードを有効にするかどうか。以前のモードではヒープ領域を頑なに固定サイズの領域に分割します。アプリケーションがチューニングされていない場合は過度にこぼれることになります。これが有効にされない場合、以下の非推奨のメモリー断片設定は読み込まれません: spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction 0.2 (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。シャッフルの間に集約および集団を作るために使われるJavaヒープの断片。いつでも、シャッフルに使われる全てのインメモリのマップの全体のサイズはこの制限によって束縛されます。これを超えた内容なディスクにこぼれ始めるでしょう。流し込みが頻繁に起きる場合は、spark.storage.memoryFractionを犠牲にしてこの値を増やすことを考えます。
spark.storage.memoryFraction 0.6 (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。Sparkのメモリキャッシュによって使用されるJava ヒープの断片。これはJVM内のオブジェクトの古い世代より大きくてはいけません。デフォルトではヒープの0.6ですが、古い世代のサイズを独自に設定した場合はそれを増やすことができます。
spark.storage.unrollFraction 0.2 (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。メモリ内の展開されないブロックが使う spark.storage.memoryFraction の断片。新しいブロックを完全に展開するために十分なストレージスペースが無い場合に、既存のブロックを削除することで動的に割り当てられます。
spark.storage.replication.proactive false RDDブロックのために積極的なブロックリプリケーションを有効にする。executorの障害により失われたキャッシュされたRDDのブロックレプリカは、既存の利用可能なレプリカがある限り補充されます。これはブロックのレプリケーション レベルを初期の数にしようとします。
spark.cleaner.periodicGC.interval 30min ガベージコレクションどれだけの頻度で起動するかを制御します。

このコンテキストクリーナーは弱い参照がガベージコレクトされた時のみ掃除されます。ドライバ上でほとんどメモリの圧迫が無いような大きなドライバJVMを使った長く実行中のアプリケーション内では、これは極めて稀か全く起きないかもしれません。全く掃除しないことはしばらくした後でexecutorがディスク空間外で実行することに繋がるかもしれません。
spark.cleaner.referenceTracking true コンテキストの掃除を有効または無効にします。
spark.cleaner.referenceTracking.blocking true クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します (シャッフル以外、これはspark.cleaner.referenceTracking.blocking.shuffle Sparkプロパティによって制御されます)。
spark.cleaner.referenceTracking.blocking.shuffle false シャッフル クリーンアップ タスク上で掃除スレッドが阻止するかどうかを制御します。
spark.cleaner.referenceTracking.cleanCheckpoints false 参照がスコープ外の場合にチェックポイントファイルを掃除するかどうかを制御します。

Executionの挙動

プロパティ名デフォルト意味
spark.broadcast.blockSize 4m TorrentBroadcastFactoryのためのブロックの各断片のサイズ。別に指定されていなければKiB単位。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManagerはパフォーマンスの打撃を受けるかも知れません。
spark.broadcast.checksum true ブロードキャストのためにチェックサムを有効にするかどうか。有効にされた場合、ブロードキャストはチェックサムを含むでしょう。これは少し多くのデータの計算と送信という代償を払って、間違ったブロックの検知を手助けすることができます。ネットワークがブロードキャスト中にデータの間違いが起きない保証をするための他の機構を持つ場合、無効にすることができます。
spark.executor.cores YARNモードの場合は1、スタンドアローンモードとMesos coarse-grainedモードの場合はワーカー上の全ての利用可能なコア。 各executor上で使用されるコアの数。スタンドアローンおよびMesosの coarse-grained モードでの詳細については、this descriptionを見てください。
spark.default.parallelism reduceByKey および joinのような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelizeのような操作については、クラスタマネージャーに依存します:
  • ローカルモード: ローカルマシーン上のコアの数
  • Mesos fine grained mode: 8
  • その他: 全てのexecutorノードのコアの総数あるいは2のどちらか大きいほう
ユーザによって設定されなかった場合は、 join, reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。
spark.executor.heartbeatInterval 10s ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。spark.executor.heartbeatInterval は spark.network.timeout よりかなり小さくなければなりません。
spark.files.fetchTimeout 60s ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。
spark.files.useFetchCache true true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。
spark.files.overwrite false ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。
spark.files.maxPartitionBytes 134217728 (128 MB) ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。
spark.files.openCostInBytes 4194304 (4 MB) ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうすれば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう。
spark.hadoop.cloneConf false trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。
spark.hadoop.validateOutputSpecs true trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。
spark.storage.memoryMapThreshold 2m ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズのバイト数。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 ファイル出力コミッタのアルゴリズムのバージョン。有効なアルゴリズムのバージョン番号: 1 あるいは 2。バージョン 2 はよりパフォーマンスが良いかも知れませんが、ある状況では MAPREDUCE-4815により、バージョン 1が障害をより良く扱うかも知れません。

ネットワーク

プロパティ名デフォルト意味
spark.rpc.message.maxSize 128 "control plane"通信内で許される最大のメッセージ(MB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。RPCメッセージに関するメッセージが表示されるでしょう。
spark.blockManager.port (random) 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。
spark.driver.blockManager.port (spark.blockManager.port の値) ブロックマネージャーがlistenするドライバー固有のポート。executorとして同じ設定を使うことができない場合。
spark.driver.bindAddress (spark.driver.host の値) listenソケットをバインドするホスト名またはIPアドレス。この設定は SPARK_LOCAL_IP 環境変数(以下を見てください)を上書きします。
executorあるいは外部のシステムに知らせるためにローカルのものから異なるアドレスにすることもできます。これは例えばブリッジネットワークのコンテナを動かしている場合に便利です。これが正しく動作するためには、ドライバー(RPC, ブロックマネージャー および UI)によって使われる異なるポートがコンテナのホストからフォワードされる必要があります。
spark.driver.host (ローカル ホスト名) ドライバーのホスト名とIPアドレスexecutorとスタンドアローンマスターが通信するために使われます。
spark.driver.port (random) ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。
spark.network.timeout 120s 全てのネットワークの相互交流のタイムアウト。この設定は、spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout あるいは spark.rpc.lookupTimeout が設定されていない場合に、それらの代わりに使われるでしょう。
spark.port.maxRetries 16 ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。
spark.rpc.numRetries 3 RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。
spark.rpc.retry.wait 3s RPCのask操作が再試行するまで待つ期間。
spark.rpc.askTimeout spark.network.timeout RPCのask操作がタイムアウトするまで待つ期間。
spark.rpc.lookupTimeout 120s RPCリモートエンドポイントがタイムアウトするまで待つ時間。
spark.core.connection.ack.wait.timeout spark.network.timeout 接続がタイムアウトして諦めるまでにどれだけ長くackを待つか。GCのような長い休止で不本意なタイムアウトを避けるために、大きな値を設定することができます。

スケジューリング

プロパティ名デフォルト意味
spark.cores.max (not set) <c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores、Mesos上では無制限(利用可能な全てのコア)になります。
spark.locality.wait 3s データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.nodeなどを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。
spark.locality.wait.node spark.locality.wait ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。
spark.locality.wait.process spark.locality.wait プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。
spark.locality.wait.rack spark.locality.wait rack局地性を待つための局地性をカスタマイズします。
spark.scheduler.maxRegisteredResourcesWaitingTime 30s スケジュールが始まる前にリソースが登録するための最大待ち時間。
spark.scheduler.minRegisteredResourcesRatio KUBERNETES modeでは 0.8; YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードとKubernetesモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTimeによって設定されます。
spark.scheduler.mode FIFO 同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。
spark.scheduler.revive.interval 1s スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。
spark.scheduler.listenerbus.eventqueue.capacity 10000 Spark listenerバスでのイベントキューの容量。0より大きくなければなりません。listenerのイベントが零れる場合は、値を増やすことを考えてください (例えば 20000)。この値を増やすとドライバがもっとメモリを使うことになるかもしれません。
spark.scheduler.blacklist.unschedulableTaskSetTimeout 120s 完全にブラックリスト化されたためにスケジュールできないTaskSetを中止する前に、新しいexecutorの獲得とタスクのスケジュールを待つタイムアウトの秒数。
spark.blacklist.enabled false "true"に設定すると、あまりに多くのタスクの失敗によってブラックリストに入れられたexecutor上でSparkがタスクをスケジュールすることを防ぎます。ブラックリストのアルゴリズムは他の"spark.blacklist"設定オプションによって更に制御することができます。
spark.blacklist.timeout 1h (実験的) ノードあるいはexecutorが新しいタスクを実行しようとしてブラックリストから無条件に削除される前に、どれくらい長くノードあるいはexecutorがアプリケーション全体でブラックリスト化されるか。
spark.blacklist.task.maxTaskAttemptsPerExecutor 1 (実験的) 指定されたタスクについて、1つのexecutorがそのタスクがブラックリストされる前にそのexecutor上でどれだけ再試行できるか。
spark.blacklist.task.maxTaskAttemptsPerNode 2 (実験的) 指定されたタスクについて、1つのノードがそのタスクがブラックリストされる前にそのノード上でどれだけ再試行できるか。
spark.blacklist.stage.maxFailedTasksPerExecutor 2 (実験的) executorが1つのステージについてブラックリストされる前に、1つのexecutor上のそのステージ内でどれだけの異なるタスクを失敗しなければならないか。
spark.blacklist.stage.maxFailedExecutorsPerNode 2 (実験的) ノード全体が指定されたステージについて失敗だとマークされる前に、どれだけの異なるexecutorがそのステージについてブラックリストだとマークされるか。
spark.blacklist.application.maxFailedTasksPerExecutor 2 (実験的) executorがアプリケーション全体でブラックリストされる前に、連続するタスクのセットの中でどれだけの異なるタスクが1つのeecutor上で失敗しなければならないか。ブラックリスト化されたexecutorはspark.blacklist.timeoutで指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、executorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。
spark.blacklist.application.maxFailedExecutorsPerNode 2 (実験的) ノードがアプリケーション全体でブラックリストされる前に、どれだけの異なるexecutorがアプリケーション全体でブラックリストされなければならないか。ブラックリスト化されたノードはspark.blacklist.timeoutで指定されたタイムアウトの後で利用可能なリソースのプールに自動的に戻されます。動的割り当てにより、ノード上のexecutorは仕事をしていないと印を付けられ、クラスタマネージャーによって取り戻されるかも知れないことに注意してください。
spark.blacklist.killBlacklistedExecutors false (実験的) "true" に設定すると、spark.blacklist.application.* によって制御されるため取得時の障害あるいはアプリケーション全体でブラックリストに入れられた時にSparkが自動的にexecutorをkillすることができます。ノード全体がブラックリストに追加された場合、ノード上の全てのexecutorはkillされるだろうことに注意してください。
spark.blacklist.application.fetchFailure.enabled false (実験的) もし "true" に設定すると、取得の失敗が起きた場合にSparkはexecutorをすぐにブラックリストに入れるでしょう。もし外部シャッフルサービスが有効な場合、ノード全体がブラックリストに入れられるでしょう。
spark.speculation false "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。
spark.speculation.interval 100ms どれだけの頻度でSparkが投機するためにタスクをチェックするか。
spark.speculation.multiplier 1.5 平均より遅いタスクが何回投機と見なされるか。
spark.speculation.quantile 0.75 指定のステージで投機が有効になる前にどれだけのタスクの割合が終了していなければならないか。
spark.task.cpus 1 各タスクごとに割り当てるコアの数。
spark.task.maxFailures 4 ジョブを諦める前のタスクの失敗の数。異なるタスクに渡って広がっている失敗の総数はジョブを失敗させないでしょう; 特定のタスクがこの試行の数を失敗しなければなりません。1以上でなければなりません。許可された再試行の数 = この値 - 1.
spark.task.reaper.enabled false killed / interrupted タスクの監視を有効にする。trueに設定した場合、killされた全てのタスクはタスクが実際に実行を完了するまでexecutorによって監視されるでしょう。この監視の正確な挙動を制御する方法についての詳細は、他のspark.task.reaper.*設定を見てください。false (デフォルト)に設定すると、タスクのkill はそのような監視が欠けている古いコードを使うでしょう。
spark.task.reaper.pollingInterval 10s spark.task.reaper.enabled = trueの時、この設定はexecutorがkillされたタスクの状態をpollする頻度を制御します。pollされた時にkillされたタスクがまだ実行中の場合は、警告が記録され、デフォルトではタスクのthread-dumpが記録されるでしょう(この thread dump は spark.task.reaper.threadDump 設定によって無効にすることができます。これは以下で説明されます)。
spark.task.reaper.threadDump true spark.task.reaper.enabled = trueの場合、この設定はkillされたタスクの定期的なpollの際にタスクの thread dumps が記録されるかどうかを制御します。thread dumpの収集を無効にするには、これをfalseに設定します。
spark.task.reaper.killTimeout -1 spark.task.reaper.enabled = trueの場合、この設定は、killされたタスクが実行を停止していない場合にexecutor JVMがそれ自身をkillするだろうタイムアウトを指定します。 デフォルト値 -1 は、この機構を無効にし、executorが自己破壊をすることを避けます。この設定の目的は、キャンセル不可能なタスクの暴走がexecutorを不安定にすることを避けるための安全策として振舞うことです。
spark.stage.maxConsecutiveAttempts 4 ステージが中止されるまでに許される連続するステージの試行の数。

動的割り当て

プロパティ名デフォルト意味
spark.dynamicAllocation.enabled false 動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。詳細は ここの説明を見てください。

これには spark.shuffle.service.enabled が設定されることが必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, spark.dynamicAllocation.initialExecutorsおよび spark.dynamicAllocation.executorAllocationRatio
spark.dynamicAllocation.executorIdleTimeout 60s 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 動的割り当てが有効な場合、実行するexecutorの初期の数。

もし `--num-executors` (あるいは `spark.executor.instances`) が設定され、この値よりも大きい場合は、executorの初期の数として使われるでしょう。
spark.dynamicAllocation.maxExecutors infinity 動的割り当てが有効な場合のexecutorの数の上限。
spark.dynamicAllocation.minExecutors 0 動的割り当てが有効な場合のexecutorの数の下限。
spark.dynamicAllocation.executorAllocationRatio 1 デフォルトで、動的な割り当ては処理するタスクの数に応じて並行度を最大にするのに十分なexecutorを要求するでしょう。これはジョブのレイテンシを最小化しますが、いくつかのexecutorは何も仕事をしないかもしれないため、この設定を持つ小さなタスクはexecutorの割り当てのオーバーヘッドにより多くのリソースを浪費するかもしれません。この設定によりexecutorの数を減らすために使われる割合を設定することができます。完全な並行度に関して。最大の並行度を与えるために、デフォルトは1.0です。0.5 はexecutorの目標値を2で割るでしょう。dynamicAllocationによって計算されるexecutorの目標値は spark.dynamicAllocation.minExecutorsspark.dynamicAllocation.maxExecutors の設定によって上書きすることもできます。
spark.dynamicAllocation.schedulerBacklogTimeout 1s もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeoutと同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は ここの説明を見てください。

セキュリティ

異なるSparkサブシステムを安全にする方法について利用可能なオプションについては、Security ページを参照してください。

Spark SQL

SET -v コマンドの実行は、SQL設定の完全なリストを表示するでしょう。

// spark is an existing SparkSession
spark.sql("SET -v").show(numRows = 200, truncate = false)
// spark is an existing SparkSession
spark.sql("SET -v").show(200, false);
# spark is an existing SparkSession
spark.sql("SET -v").show(n=200, truncate=False)
sparkR.session()
properties <- sql("SET -v")
showDF(properties, numRows = 200, truncate = FALSE)

Spark ストリーミング

プロパティ名デフォルト意味
spark.streaming.backpressure.enabled false Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRateおよびspark.streaming.kafka.maxRatePerPartitionが設定されている場合に、この値で上限を制限されます (以下を見てください)。
spark.streaming.backpressure.initialRate not set これはバックプレッシャー機構が有効な場合に最初のバッチのために各レシーバーがデータを受信する初期の最大受信レートです。
spark.streaming.blockInterval 200ms Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。詳細はSpark ストリーミング プログラミング ガイドのパフォーマンス チューニングの章を見てください。
spark.streaming.receiver.maxRate not set 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。
spark.streaming.receiver.writeAheadLog.enable false レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。
spark.streaming.unpersist true Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。
spark.streaming.stopGracefullyOnShutdown false trueの場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。
spark.streaming.kafka.maxRatePerPartition not set 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。
spark.streaming.kafka.minRatePerPartition 1 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最小レート(秒間あたりのレコード数)。
spark.streaming.kafka.maxRetries 1 ドライバーが各パーティションのリーダー上で最新のオフセットを見つけるために作成する連続する試行の最大数(デフォルト値1はドライバーが最大2の試行をすることを意味します)。stream APIを差している新しいKafkaにのみ適用されます。
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false ドライバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。ドライバー上のメタデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false レシーバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。レシーバー上のデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。

SparkR

プロパティ名デフォルト意味
spark.r.numRBackendThreads 2 SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。
spark.r.command Rscript ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。
spark.r.driver.command spark.r.command ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。
spark.r.shell.command R ドライバーのためのクライアントモードのsparkRシェルの実行のための実行ファイル。クラスターモードでは無視されます。環境変数 SPARKR_DRIVER_R と同じですが、それより優先されます。spark.r.shell.command はsparkRシェルのために使われますが、spark.r.driver.command は R スクリプトを実行するために使われます。
spark.r.backendConnectionTimeout 6000 Rプロセスによって設定されるRBackendへの接続上の接続タイムアウトの秒数。
spark.r.heartBeatInterval 100 接続タイムアウトを防ぐためにSparkRバックエンドからRプロセスに送信されるハートビートの間隔。

GraphX

プロパティ名デフォルト意味
spark.graphx.pregel.checkpointInterval -1 Pregelでのグラフとメッセージのためのチェックポイント間隔。多くの繰り返しの連鎖による stackOverflowError を避けるために使われます。デフォルトではチェックポイントは無効です。

配備

プロパティ名デフォルト意味
spark.deploy.recoveryMode NONE クラスターモードでサブミットされたSparkジョブが失敗し再起動する場合に、回復の設定をするリカバリモード。スタンダードあるいはMesosで動いている場合にクラスタモードでのみ適用可能です。
spark.deploy.zookeeper.url None `spark.deploy.recoveryMode` が ZOOKEEPER に設定されている場合は、この設定は接続するためのzookeeperのURLに設定するために使われます。
spark.deploy.zookeeper.dir None `spark.deploy.recoveryMode` がZOOKEEPERに設定されている場合は、この設定はリカバリー状態を保持するためのzookeeperディレクトリを設定するために使われます。

クラスタマネージャー

Sparkの各クラスタマネージャーは追加の設定オプションを持ちます。各ノードのためのページ上で設定を見つけることができます。

YARN

Mesos

Kubernetes

スタンドアローンモード

環境変数

あるSpark設定は環境変数によって設定することができ、それらはSparkがインストールされたディレクトリ内のconf/spark-env.shスクリプトによって使われます(Windows上ではconf/spark-env.cmd)。スタンドアローンおよびMesosモードでは、このファイルはホスト名のようなマシーン固有の情報を渡すことができるでしょう。ローカルのアプリケーションの実行あるいはスクリプトのサブミットの場合、それは開始場所にもなります。

conf/spark-env.sh はSparkがインストールされた場合はデフォルトでは存在しないことに注意してください。しかし、それを作成するためにconf/spark-env.sh.template をコピーすることができます。コピーを実行可能にすることを忘れないでください。

以下の変数はspark-env.shの中で設定することができます:

環境変数意味
JAVA_HOME Javaがインストールされた場所(デフォルトのPATH上に無い場合)。
PYSPARK_PYTHON PySparkのためにドライバおよびワーカーの両方の中で使うPythonの実行可能バイナリ(デフォルトは、利用可能であればpython2.7。そうでなければpythonです)。設定されている場合はプロパティspark.pyspark.python が優先されます。
PYSPARK_DRIVER_PYTHON PySparkのためにドライバの中でのみ使うPythonの実行可能バイナリ(デフォルトはPYSPARK_PYTHONです)。設定されている場合はプロパティspark.pyspark.driver.python が優先されます。
SPARKR_DRIVER_R SparkRシェルのために使われるRバイナリ実行ファイル(デフォルトは R)。設定されている場合はプロパティspark.r.shell.command が優先されます。
SPARK_LOCAL_IP バインドするマシーンのIPアドレス。
SPARK_PUBLIC_DNS Sparkプログラムが他のマシーンに知らせるホスト名。

上に加えて、各マシーンで使うコアの数や最大メモリのような、Sparkスタンドアローン クラスタ スクリプトを設定するためのオプションもあります。

spark-env.shはシェルスクリプトなので、それらのいくつかはプログラム的に設定することができます - 例えば、特定のネットワークインタフェースのIPを調べることでSPARK_LOCAL_IPを計算することができるかもしれません。

注意: cluster モードでYARN上のSparkを実行する場合は、 conf/spark-defaults.conf ファイル内のspark.yarn.appMasterEnv.[EnvironmentVariableName] を使って環境変数が設定される必要があります。spark-env.shに設定されている環境変数は、clusterモードのYARN マスタープロセス内には反映されないでしょう。詳細についてはYARNに関係する Spark プロパティ を見てください。

ログの設定

Sparkはログのためにlog4jを使います。confディレクトリに log4j.properties ファイルを追加することで設定することができます。開始する一つの方法は、そこに存在する既存のlog4j.properties.template をコピーすることです。

設定ディレクトリの上書き

デフォルトの"SPARK_HOME/conf"以外に異なる設定ディレクトリを指定するために、SPARK_CONF_DIRを設定することができます。Sparkはこのディレクトリから設定ファイル(spark-defaults.conf, spark-env.sh, log4j.properties など)を使用するでしょう。

Hadoopクラスタ設定の継承

Sparkを使ってHDFSから読み込みをするつもりであれば、Sparkのクラスパス上に含まれるべき二つのHadoop設定ファイルがあります。

これらの設定ファイルの場所はHadoopおよびHDPバージョンによって変わりますが、一般的な場所は/etc/hadoop/confの中です。幾つかのツールはその場で設定を生成しますが、それらのコピーをダウンロードするための仕組みを提供します。

それらのファイルがSparkに見えるようにするためには、$SPARK_HOME/conf/spark-env.sh内のHADOOP_CONF_DIRを設定ファイルを含む場所に設定します。

独自の Hadoop/Hive 設定

もしSparkアプリケーションがHadoop, Hive あるいは両方とやり取りをする場合、Sparkのクラスパス内におそらくHadoop/Hiveの設定ファイルがあります。

複数の実行中のアプリケーションは異なるHadoop/Hiveクライアント側の設定を必要とするかもしれません。各アプリケーションごとにSparkのクラスパス内の hdfs-site.xml, core-site.xml, yarn-site.xml, hive-site.xml をコピーし修正することができます。YARN上で実行中のSparkクラスタ内ではこれらの設定ファイルはクラスタ全体で設定され、アプリケーションによって安全に変更することができません。

spark.hadoop.*の形式でspark hadoopプロパティを使うことはより良い選択です。それらは$SPARK_HOME/conf/spark-defaults.conf内で設定することができる通常のsparkプロパティと同じと見なすことができます

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。例えば、Sparkを使って単純に空のconfを作成し、 spark/spark hadoop プロパティを設定することができます。

val conf = new SparkConf().set("spark.hadoop.abc.def","xyz")
val sc = new SparkContext(conf)

また、実行時に設定を修正あるいは追加することができます:

./bin/spark-submit \ 
  --name "My app" \ 
  --master local[4] \  
  --conf spark.eventLog.enabled=false \ 
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ 
  --conf spark.hadoop.abc.def=xyz \ 
  myApp.jar
TOP
inserted by FC2 system