Sparkの設定

Sparkはシステムを設定するための3つの場所を提供します:

Sparkのプロパティ

Spark プロパティはほとんどのアプリケーションの設定を制御し、各アプリケーションに対して別々に設定されます。これらのプロパティは SparkContextに渡して直接SparkConf に設定することができます。SparkConfを使って共通プロパティの幾つかを設定(例えば、マスターURLおよびアプリケーション名)することができ、set()メソッドを使って任意のキー値ペアを設定することができます。例えば、以下のように2つのスレッドを使ってアプリケーションを初期化することができます:

local[2]をつけて実行、2つのスレッド - "最小限"の並列処理、することにより、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立つことに注意してください。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

ローカルモードで1つ以上のスレッドを持つことができ、実際にSpark Streamingのような場合にはリソース不足を避けるために1つ以上のスレッドを要求するかも知れないということに注意してください。

なんらかの期間を指定するプロパティは時間の単位で設定されなければなりません。以下の書式が受け付けられます:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

バイトサイズを指定するプロパティはサイズの単位を使って設定されなければなりません。以下の書式が受け付けられます:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

Sparkプロパティの動的なロード

時には、ある設定をSparkConfにハードコーディングしたくないかも知れません。例えば、もし同じアプリケーションを異なるマスターあるいは異なるメモリ量で実行したいなど。Sparkは空のconfを単純に作成することができます:

val sc = new SparkContext(new SparkConf())

そうすると、実行時に設定値を提供することができます:

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark シェルおよび spark-submit ツールは動的に設定をロードする2つの方法を提供します。最初の一つは、上で説明したように、--masterのようなコマンドラインオプションです。spark-submit--conf フラグを使ってどのようなSpark プロパティも受け付けることができますが、Sparkアプリケーションを起動する時に役割があるプロパティのために特別なフラグを使うようにしてください。./bin/spark-submit --helpを実行すると、これらのオプションの完全なリストが表示されるでしょう。

bin/spark-submitは設定オプションを conf/spark-defaults.confからも読み込むでしょう。各行はホワイトスペースで区切られたキーと値です。例えば:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

フラグで指定された、あるいはプロパティファイルの全ての値はアプリケーションに渡され、SparkConfを通じてマージされるでしょう。SparkConfで直接設定されたプロパティは優先度が高く、その次はspark-submit あるいは spark-shellに渡されたもので、その次はspark-defaults.conf ファイル内のオプションです。以前のSparkのバージョンから2,3の設定キーの名前が変更されました; そのような場合、古いキーの名前はまだ受け付けられますが、新しいキーのどのインスタンスよりも優先度が低くなります。

Sparkプロパティのビュー

http://<driver>:4040 のアプリケーションUIは、"Environment"タブの中でSparkプロパティをリスト表示します。ここはプロパティが正しく設定されたか確認するのに便利な場所です。spark-defaults.conf, SparkConf, あるいはコマンドラインで明示的に指定された値だけが表示されるだろうことに注意してください。他の全ての設定プロパティについては、デフォルトの値が使われると見なすことができます。

利用可能なプロパティ

内部設定を制御するほとんどのプロパティは意味のあるデフォルト値を持ちます。最も一般的なオプションの幾つかは以下のように設定されます。

アプリケーションのプロパティ

プロパティ名デフォルト意味
spark.app.name (none) アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。
spark.driver.cores 1 ドライバープロセスのために使われるコアの数。クラスターモードのみです。
spark.driver.maxResultSize 1g 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズの制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。
spark.driver.memory 1g ドライバープロセスのために使われるメモリの量。例えば SparkContextが初期化される場所。(例えば、1g, 2g).
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.executor.memory 1g executorプロセスあたりに使用するメモリ量(例えば2g, 8g)。
spark.extraListeners (none) SparkListenerを実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。
spark.local.dir /tmp Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。注意: Spark 1.0 以降では、これはクラスタマネージャーによって、SPARK_LOCAL_DIRS (スタンドアローン, Mesos) あるいは LOCAL_DIRS (YARN) 環境変数で上書きされます。
spark.logConf false SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。
spark.master (none) 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。
spark.submit.deployMode (none) Sparkドライバプログラムの配備モード、"client"あるいは"cluster"、これはドライバープログラムをローカル("client")あるいはクラスタ内のノードの1つの上で遠隔で("cluster")起動することを意味します。

これとは別に、以下のプロパティも利用可能で、ある状況では有用かも知れません。

ランタイム環境

プロパティ名デフォルト意味
spark.driver.extraClassPath (none) ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.extraJavaOptions (none) ドライバに渡す特別なJVMオプションの文字列。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memoryを、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.extraLibraryPath (none) ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.userClassPathFirst false (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。
spark.executor.extraClassPath (none) executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。
spark.executor.extraJavaOptions (none) executorに渡すための特別なJVMオプションの文字列例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。
spark.executor.extraLibraryPath (none) executor JVMを起動する場合に使う特別なライブラリパスを設定する。
spark.executor.logs.rolling.maxRetainedFiles (none) システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。
spark.executor.logs.rolling.enableCompression false executor ログの圧縮を有効にします。有効な場合、ロールされたexecutorログは圧縮されるでしょう。デフォルトは無効です。
spark.executor.logs.rolling.maxSize (none) executorのログがロールオーバーされる最大のファイルサイズをバイトで設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。
spark.executor.logs.rolling.strategy (none) executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するために>spark.executor.logs.rolling.maxSize を使用します。
spark.executor.logs.rolling.time.interval daily executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily, hourly, minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFilesを見てください。
spark.executor.userClassPathFirst false (実験的なもの) spark.driver.userClassPathFirstと同じ機能ですが、executorインスタンスに適用されます。
spark.executorEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。
spark.python.profile false Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles()によって表示されるか、ドライバーが終了する前に表示されるでしょう。sc.dump_profiles(path)によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。
spark.python.profile.dump (none) ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはptats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。
spark.python.worker.memory 512m 集約の間にpythonワーカープロセスごとに使用されるメモリの量。JVMメモリ文字列と同じフォーマット(例えば512m, 2g)。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。
spark.python.worker.reuse true Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。
spark.files 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。
spark.submit.pyFiles PythonアプリのためのPYTHONPATH上に配置されるカンマ区切りの .zip, .egg あるいは .py ファイル。
spark.jars ドライバーとexecutorのクラスパス上でインクルードされるローカルjarのカンマ区切りのリスト
spark.jars.packages ドライバーとexecutorのクラスパス上でインクルードされるmaven coordinateのjarのカンマ区切りのリストローカルのmavenリポジトリ、次にmaven central、そしてspark.jars.ivyで与えられた追加のリモートリポジトリを検索するでしょう。調整のためのフォーマットは groupId:artifactId:version でなければなりません。
spark.jars.excludes spark.jars.packagesで与えらえた依存を解決する間に依存性の衝突を避けるために除外する、groupId:artifactId のカンマ区切りのリスト。
spark.jars.ivy spark.jars.packagesを使って与えられるcoordinateのために検索される、カンマ区切りの追加のリモートリポジトリ。

シャッフルの挙動

プロパティ名デフォルト意味
spark.reducer.maxSizeInFlight 48m 各reduceタスクから同時に取り出すmap出力の最大サイズ。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。
spark.reducer.maxReqsInFlight Int.MaxValue この設定はリモートリクエストの数を指定された値までブロックの検索を制限します。クラスタ内のホストの数が増えた場合は、1つ以上のノードからやってくる接続の数が大量になり、ワーカーが負荷のために落ちるかも知れません。取得リクエストの数を制限できるようにすることで、このシナリオは緩和されるかも知れません。
spark.shuffle.compress true map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。
spark.shuffle.file.buffer 32k 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。
spark.shuffle.io.maxRetries 3 (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。
spark.shuffle.io.numConnectionsPerPeer 1 (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。
spark.shuffle.io.preferDirectBufs true (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。
spark.shuffle.io.retryWait 5s (Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWaitとして計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。
spark.shuffle.service.enabled false 外部のシャッフルサービスを有効にします。このサービスはexecutorが安全に削除されるようにexecutorによって書き込まれるシャッフルファイルを保持します。もしspark.dynamicAllocation.enabled が"true"であれば、これは有効でなければなりません。外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentation を見てください。
spark.shuffle.service.port 7337 外部のシャッフルサービスが実行されるだろうポート。
spark.shuffle.sort.bypassMergeThreshold 200 (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。
spark.shuffle.spill.compress true シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codecを使うでしょう。

Spark UI

プロパティ名デフォルト意味
spark.eventLog.compress false もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。
spark.eventLog.dir file:///tmp/spark-events spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。
spark.eventLog.enabled false アプリケーションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。
spark.ui.killEnabled true web uiからステージおよび対応するジョブをkillすることを許可する。
spark.ui.port 4040 芽折および作業データを表示する、アプリケーションのダッシュボードのポート。
spark.ui.retainedJobs 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。
spark.ui.retainedStages 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。
spark.ui.retainedTasks 100000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのタスクを記憶するか。
spark.worker.ui.retainedExecutors 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。
spark.worker.ui.retainedDrivers 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。
spark.sql.ui.retainedExecutions 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。
spark.ui.retainedDeadExecutors 100 ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのdeadのexecutorを記憶するか。

圧縮および直列化

プロパティ名デフォルト意味
spark.broadcast.compress true ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。
spark.io.compression.codec lz4 RDDパーティション、ブロードキャスト変数およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは3つのコーディックを提供します: lz4, lzfおよびsnappy. コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodecおよび org.apache.spark.io.SnappyCompressionCodec.
spark.io.compression.lz4.blockSize 32k LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。
spark.io.compression.snappy.blockSize 32k Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。
spark.kryo.classesToRegister (none) Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。
spark.kryo.referenceTracking true (Spark SQL Thrift サーバを使う場合は false) Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。
spark.kryo.registrationRequired false Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。
spark.kryo.registrator (none) Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにカンマ区切りのクラスのリストを渡してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。 KryoRegistratorを拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。
spark.kryoserializer.buffer.max 64m kryoのシリアライズバッファの最大許容サイズ。これはシリアライズしようと思っているどのオブジェクトよりも大きくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。
spark.kryoserializer.buffer 64k Kryoのシリアライズバッファの初期サイズ。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。
spark.rdd.compress false シリアライズされたパーティションを圧縮するかどうか(例えば、JavaとScalaでのStorageLevel.MEMORY_ONLY_SER、あるいは PythonでのStorageLevel.MEMORY_ONLY)。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。
spark.serializer org.apache.spark.serializer.
JavaSerializer (org.apache.spark.serializer.
Spark SQL Thriftサーバを使う場合のKryoSerializer)
ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializerのどのようなサブクラスにもなることができます。
spark.serializer.objectStreamReset 100 org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。

メモリ管理

プロパティ名デフォルト意味
spark.memory.fraction 0.6 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。この設定の目的は内部メタデータ、ユーザデータ構造、およびまばらな場合の不正確なサイズの見積もりのためにメモリを取って置くことで、非常に大きなレコードです。これはデフォルトの値にしておくことをお勧めします。この値を増加した場合の正しいJVMガベージコレクションの調整についての重要な情報を含む詳細は、この説明を見てください。
spark.memory.storageFraction 0.5 立ち退きに耐性のあるストレージメモリの量。s​park.memory.fractionによって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。
spark.memory.offHeap.enabled false trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。
spark.memory.offHeap.size 0 オフヒープ割り当てのために使うことができる絶対メモリ量のバイト数。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=trueの場合は、これは正の値に設定されなければなりません。
spark.memory.useLegacyMode false Spark 1.5以前で使われていた古いメモリ管理モードを有効にするかどうか。以前のモードではヒープ領域を頑なに固定サイズの領域に分割します。アプリケーションがチューニングされていない場合は過度にこぼれることになります。これが有効にされない場合、以下の非推奨のメモリー断片設定は読み込まれません: spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction 0.2 (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。シャッフルの間に集約および集団を作るために使われるJavaヒープの断片。いつでも、シャッフルに使われる全てのインメモリのマップの全体のサイズはこの制限によって束縛されます。これを超えた内容なディスクにこぼれ始めるでしょう。流し込みが頻繁に起きる場合は、spark.storage.memoryFractionを犠牲にしてこの値を増やすことを考えます。
spark.storage.memoryFraction 0.6 (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。Sparkのメモリキャッシュによって使用されるJava ヒープの断片。これはJVM内のオブジェクトの古い世代より大きくてはいけません。デフォルトではヒープの0.6ですが、古い世代のサイズを独自に設定した場合はそれを増やすことができます。
spark.storage.unrollFraction 0.2 (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。メモリ内の展開されないブロックが使う spark.storage.memoryFraction の断片。新しいブロックを完全に展開するために十分なストレージスペースが無い場合に、既存のブロックを削除することで動的に割り当てられます。

Executionの挙動

プロパティ名デフォルト意味
spark.broadcast.blockSize 4m TorrentBroadcastFactoryのためのブロックの各部分のサイズ。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManagerはパフォーマンスの打撃を受けるかも知れません。
spark.executor.cores YARNモードの場合は1、スタンドアローンモードとMesos coarse-grainedモードの場合はワーカー上の全ての利用可能なコア。 各executor上で使用されるコアの数。スタンドアローンモードとMesos coarse-grainedモードの場合は、ワーカー上で十分なコアがあると仮定すると、このパラメータを設定することでアプリケーションが同じワーカー上で複数のexecutorを実行することができます。そうでなければ、アプリケーションごとに1つのexecutorが各ワーカー上で実行されるでしょう。
spark.default.parallelism reduceByKey および joinのような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelizeのような操作については、クラスタマネージャーに依存します:
  • ローカルモード: ローカルマシーン上のコアの数
  • Mesos fine grained mode: 8
  • その他: 全てのexecutorノードのコアの総数あるいは2のどちらか大きいほう
ユーザによって設定されなかった場合は、 join, reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。
spark.executor.heartbeatInterval 10s ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。
spark.files.fetchTimeout 60s ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。
spark.files.useFetchCache true true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。
spark.files.overwrite false ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。
spark.hadoop.cloneConf false trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。
spark.hadoop.validateOutputSpecs true trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。
spark.storage.memoryMapThreshold 2m ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。

ネットワーク

プロパティ名デフォルト意味
spark.rpc.message.maxSize 128 "control plane"通信内で許される最大のメッセージ(MB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。RPCメッセージに関するメッセージが表示されるでしょう。
spark.blockManager.port (random) 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。
spark.driver.host (local hostname) ドライバがlistenするホスト名あるいはIPアドレス。executorとスタンドアローンマスターが通信するために使われます。
spark.driver.port (random) ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。
spark.network.timeout 120s 全てのネットワークの相互交流のタイムアウト。この設定は、spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout あるいは spark.rpc.lookupTimeout が設定されていない場合に、それらの代わりに使われるでしょう。
spark.port.maxRetries 16 ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。
spark.rpc.numRetries 3 RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。
spark.rpc.retry.wait 3s RPCのask操作が再試行するまで待つ期間。
spark.rpc.askTimeout 120s RPCのask操作がタイムアウトするまで待つ期間。
spark.rpc.lookupTimeout 120s RPCリモートエンドポイントがタイムアウトするまで待つ時間。

スケジューリング

プロパティ名デフォルト意味
spark.cores.max (not set) <c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores、Mesos上では無制限(利用可能な全てのコア)になります。
spark.locality.wait 3s データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.nodeなどを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。
spark.locality.wait.node spark.locality.wait ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。
spark.locality.wait.process spark.locality.wait プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。
spark.locality.wait.rack spark.locality.wait rack局地性を待つための局地性をカスタマイズします。
spark.scheduler.maxRegisteredResourcesWaitingTime 30s スケジュールが始まる前にリソースが登録するための最大待ち時間。
spark.scheduler.minRegisteredResourcesRatio YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTimeによって設定されます。
spark.scheduler.mode FIFO 同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。
spark.scheduler.revive.interval 1s スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。
spark.speculation false "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。
spark.speculation.interval 100ms どれだけの頻度でSparkが投機するためにタスクをチェックするか。
spark.speculation.multiplier 1.5 平均より遅いタスクが何回投機と見なされるか。
spark.speculation.quantile 0.75 指定のステージで投機が有効になる前にどれだけのタスクの割合が終了していなければならないか。
spark.task.cpus 1 各タスクごとに割り当てるコアの数。
spark.task.maxFailures 4 ジョブを諦める前のタスクの失敗の数。異なるタスクに渡って広がっている失敗の総数はジョブを失敗させないでしょう; 特定のタスクがこの試行の数を失敗しなければなりません。1以上でなければなりません。許可された再試行の数 = この値 - 1.

動的割り当て

プロパティ名デフォルト意味
spark.dynamicAllocation.enabled false 動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。詳細は ここの説明を見てください。

これには spark.shuffle.service.enabled が設定されることが必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors および spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.executorIdleTimeout 60s 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 動的割り当てが有効な場合、実行するexecutorの初期の数。

もし `--num-executors` (あるいは `spark.executor.instances`) が設定され、この値よりも大きい場合は、executorの初期の数として使われるでしょう。
spark.dynamicAllocation.maxExecutors infinity 動的割り当てが有効な場合のexecutorの数の上限。
spark.dynamicAllocation.minExecutors 0 動的割り当てが有効な場合のexecutorの数の下限。
spark.dynamicAllocation.schedulerBacklogTimeout 1s もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeoutと同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は ここの説明を見てください。

セキュリティ

プロパティ名デフォルト意味
spark.acls.enable false Sparkのaclを有効にしなければならないかどうか。有効な場合は、これはユーザがジョブを表示あるいは修正するアクセス権限をもつかどうかを調べます。これはユーザが分かっていなければならず、もしユーザがnullとして渡された場合はなにも調べられないことに注意してください。ユーザを認証および設定するために、UIを使ってフィルターを使うことができます。
spark.admin.acls Empty 全てのSparkジョブを表示および修正アクセスを持つユーザ/管理者のカンマ区切りのリスト。もし共有クラスタ上で実行し、動作する時にデバッグを手伝ってくれる一連の管理者あるいは開発者がいる場合は、これが使われるかも知れません。リストに "*"を入れることは、全てのユーザが管理権限を持つことを意味します。
spark.admin.acls.groups Empty 全てのSparkジョブを表示および修正アクセスを持つグループのカンマ区切りのリスト。基礎となるインフラストラクチャーを維持しデバッグしてくれる一連の管理者あるいは開発者がいる場合は、これが使われるかも知れません。リストに "*"を入れることは、全てのグループの全てのユーザが管理権限を持つことを意味します。ユーザグループはspark.user.groups.mappingによって指定されるグループ マッピング プロバイダのインスタンスから取得されます。詳細は spark.user.groups.mapping のエントリを調べてください。
spark.user.groups.mapping org.apache.spark.security.ShellBasedGroupsMappingProvider ユーザのためのグループのリストは、プロパティによって設定することができるorg.apache.spark.security.GroupMappingServiceProviderによって定義されるグループマッピングサービスによって決定されます。デフォルトのunixシェルに基づいた実装は、ユーザのためのグループのリストを解決するために指定することができるorg.apache.spark.security.ShellBasedGroupsMappingProviderによって提供されます。注意: この実装は Unix/Linux に基づく環境でのみサポートされます。Windows 環境は現在のところサポートされません。しかし、特性 org.apache.spark.security.GroupMappingServiceProviderを実装することで新しいプラットフォーム/プロトコルでサポートされることができます。
spark.authenticate false Sparkが内部接続を認証するかどうか。YARN上で実行しない場合はspark.authenticate.secretを見てください。
spark.authenticate.secret None Sparkがコンポーネント間で認証するために使われる秘密キーを設定します。YARN上で実行しておらず、認証が有効な場合は、これが設定される必要があります。
spark.authenticate.enableSaslEncryption false 認証が有効な場合に暗号化通信を有効にする。これはブロックの転送サービスとRPCエンドポイントによってサポートされます。
spark.network.sasl.serverAlwaysEncrypt false SASL認証をサポートするサービスに対して非暗号化通信を無効にする。これは現在のところ外部シャッフルサービスでのみサポートされています。
spark.core.connection.ack.wait.timeout 60s 接続がタイムアウトして諦めるまでにどれだけ長くackを待つか。GCのような長い休止で不本意なタイムアウトを避けるために、大きな値を設定することができます。
spark.core.connection.auth.wait.timeout 30s 接続がタイムアウトして諦めるまでにどれだけ長く認証を待つか。
spark.modify.acls Empty Sparkジョブへの修正アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザがそれの修正(例えばkill)アクセスを持ちます。リストに "*"を入れることは、全てのユーザが修正する権限を持つことを意味します。
spark.modify.acls.groups Empty 全てのSparkジョブを表示および修正アクセスを持つグループのカンマ区切りのリスト。同じチームからジョブを制御するためにアクセスする一連の管理者あるいは開発者がいる場合にはこれが使われるかも知れません。リスト内への"*"の配置は全てのグループ内の全てのユーザがSparkのジョブを編集する権限を持つことを意味します。ユーザグループはspark.user.groups.mappingによって指定されるグループ マッピング プロバイダのインスタンスから取得されます。詳細は spark.user.groups.mapping のエントリを調べてください。
spark.ui.filters None Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。各フィルターへのパラメータは以下のようなjavaシステムプロパティの設定により指定することもできます:
spark.<フィルター名>.params='param1=value1,param2=value2'
例えば:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params='param1=foo,param2=testing'
spark.ui.view.acls Empty Spark web uiへの表示アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザのみが表示アクセスを持ちます。リストに "*"を入れることは、全てのユーザがこのSparkジョブを見る権限を持つことを意味します。
spark.ui.view.acls.groups Empty Sparkのジョブの詳細を見るためにSpark web ui への view権限を持つカンマで区切られたグループのリスト。サブミットされたSparkジョブを監視することができる一連の管理者あるいは開発者がいる場合はこれが使われるかも知れません。リスト内への"*"の配置は全てのグループ内の全てのユーザがSpark web ui上のSparkジョブの詳細を見ることができる権限を持つことを意味します。ユーザグループはspark.user.groups.mappingによって指定されるグループ マッピング プロバイダのインスタンスから取得されます。詳細は spark.user.groups.mapping のエントリを調べてください。

暗号化

プロパティ名デフォルト意味
spark.ssl.enabled false

全てのサポートされたプロトコル上でSSL接続を有効にするかどうか。

spark.ssl.xxxのような全てのSSL設定、xxx は特定の設定プロパティ、は、全てのサポートされたプロトコルのためのグローバル設定を意味します。特定のプロトコルのためにグローバル設定を上書きするためには、そのプロパティがプロトコル固有の名前空間で上書きされなければなりません。

YYYで表される特定のプロトコルのためのグローバル設定を上書くためにspark.ssl.YYY.XXX 設定を使ってください。YYY のための例の値として fs, ui, standalone および historyServer を含みます。サービスのための階層的なSSL設定の詳細については、SSL 設定 を見てください。

spark.ssl.enabledAlgorithms Empty cipherのカンマ区切りのリスト。指定されたcipherはJVMによってサポートされていなければなりません。プロトコルの参照リストは この ページで見つけることができます。注意: 設定されていない場合、JVMのデフォルトのcipherスイーツを使うでしょう。
spark.ssl.keyPassword None キーストア内のプライベートキーのパスワード。
spark.ssl.keyStore None キーストアファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。
spark.ssl.keyStorePassword None キーストアへのパスワード。
spark.ssl.keyStoreType JKS キー ストアの種類
spark.ssl.protocol None プロトコル名。プロトコルはJVMによってサポートされていなければなりません。The reference list of protocols one can find on this page.
spark.ssl.needClientAuth false SSLがクライアント認証を必要とする場合はtrueを設定します。
spark.ssl.trustStore None trust-stoerファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。
spark.ssl.trustStorePassword None trust-store のパスワード。
spark.ssl.trustStoreType JKS trust-storeの種類

Spark SQL

SET -v コマンドの実行は、SQL設定の完全なリストを表示するでしょう。

// spark is an existing SparkSession
spark.sql("SET -v").show(numRows = 200, truncate = false)
// spark is an existing SparkSession
spark.sql("SET -v").show(200, false);
# spark is an existing SparkSession
spark.sql("SET -v").show(n=200, truncate=False)
sparkR.session()
properties <- sql("SET -v")
showDF(properties, numRows = 200, truncate = FALSE)

Spark ストリーミング

プロパティ名デフォルト意味
spark.streaming.backpressure.enabled false Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRateおよびspark.streaming.kafka.maxRatePerPartitionが設定されている場合に、この値で上限を制限されます (以下を見てください)。
spark.streaming.backpressure.initialRate not set これはバックプレッシャー機構が有効な場合に最初のバッチのために各レシーバーがデータを受信する初期の最大受信レートです。
spark.streaming.blockInterval 200ms Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。詳細はSpark ストリーミング プログラミング ガイドのパフォーマンス チューニングの章を見てください。
spark.streaming.receiver.maxRate not set 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。
spark.streaming.receiver.writeAheadLog.enable false レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。
spark.streaming.unpersist true Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。
spark.streaming.stopGracefullyOnShutdown false trueの場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。
spark.streaming.kafka.maxRatePerPartition not set 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。
spark.streaming.kafka.maxRetries 1 ドライバーが各パーティションのリーダー上で最新のオフセットを見つけるために作成する連続する試行の最大数(デフォルト値1はドライバーが最大2の試行をすることを意味します)。stream APIを差している新しいKafkaにのみ適用されます。
spark.streaming.ui.retainedBatches 1000 ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false ドライバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。ドライバー上のメタデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false レシーバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。レシーバー上のデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。

SparkR

プロパティ名デフォルト意味
spark.r.numRBackendThreads 2 SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。
spark.r.command Rscript ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。
spark.r.driver.command spark.r.command ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。

配備

プロパティ名デフォルト意味
spark.deploy.recoveryMode NONE クラスターモードでサブミットされたSparkジョブが失敗し再起動する場合に、回復の設定をするリカバリモード。スタンダードあるいはMesosで動いている場合にクラスタモードでのみ適用可能です。
spark.deploy.zookeeper.url None `spark.deploy.recoveryMode` が ZOOKEEPER に設定されている場合は、この設定は接続するためのzookeeperのURLに設定するために使われます。
spark.deploy.zookeeper.dir None `spark.deploy.recoveryMode` がZOOKEEPERに設定されている場合は、この設定はリカバリー状態を保持するためのzookeeperディレクトリを設定するために使われます。

クラスタマネージャー

Sparkの各クラスタマネージャーは追加の設定オプションを持ちます。各ノードのためのページ上で設定を見つけることができます。

YARN
Mesos
スタンドアローンモード

環境変数

あるSpark設定は環境変数によって設定することができ、それらはSparkがインストールされたディレクトリ内のconf/spark-env.shスクリプトによって使われます(Windows上ではconf/spark-env.cmd)。スタンドアローンおよびMesosモードでは、このファイルはホスト名のようなマシーン固有の情報を渡すことができるでしょう。ローカルのアプリケーションの実行あるいはスクリプトのサブミットの場合、それは開始場所にもなります。

conf/spark-env.sh はSparkがインストールされた場合はデフォルトでは存在しないことに注意してください。しかし、それを作成するためにconf/spark-env.sh.template をコピーすることができます。コピーを実行可能にすることを忘れないでください。

以下の変数はspark-env.shの中で設定することができます:

環境変数意味
JAVA_HOME Javaがインストールされた場所(デフォルトのPATH上に無い場合)。
PYSPARK_PYTHON PySparkのためにドライバおよびワーカーの両方の中で使うPythonの実行可能バイナリ(デフォルトは、利用可能であればpython2.7。そうでなければpythonです)。
PYSPARK_DRIVER_PYTHON PySparkのためにドライバの中でのみ使うPythonの実行可能バイナリ(デフォルトはPYSPARK_PYTHONです)。
SPARKR_DRIVER_R SparkRシェルのために使われるRバイナリ実行ファイル(デフォルトは R)。
SPARK_LOCAL_IP バインドするマシーンのIPアドレス。
SPARK_PUBLIC_DNS Sparkプログラムが他のマシーンに知らせるホスト名。

上に加えて、各マシーンで使うコアの数や最大メモリのような、Sparkスタンドアローン クラスタ スクリプトを設定するためのオプションもあります。

spark-env.shはシェルスクリプトなので、それらのいくつかはプログラム的に設定することができます - 例えば、特定のネットワークインタフェースのIPを調べることでSPARK_LOCAL_IPを計算することができるかもしれません。

注意: cluster モードでYARN上のSparkを実行する場合は、 conf/spark-defaults.conf ファイル内のspark.yarn.appMasterEnv.[EnvironmentVariableName] を使って環境変数が設定される必要があります。spark-env.shに設定されている環境変数は、clusterモードのYARN マスタープロセス内には反映されないでしょう。詳細についてはYARNに関係する Spark プロパティ を見てください。

ログの設定

Sparkはログのためにlog4jを使います。confディレクトリに log4j.properties ファイルを追加することで設定することができます。開始する一つの方法は、そこに存在する既存のlog4j.properties.template をコピーすることです。

設定ディレクトリの上書き

デフォルトの"SPARK_HOME/conf"以外に異なる設定ディレクトリを指定するために、SPARK_CONF_DIRを設定することができます。Sparkはこのディレクトリから設定ファイル(spark-defaults.conf, spark-env.sh, log4j.properties など)を使用するでしょう。

Hadoopクラスタ設定の継承

Sparkを使ってHDFSから読み込みをするつもりであれば、Sparkのクラスパス上に含まれるべき二つのHadoop設定ファイルがあります。

これらの設定ファイルの場所はCDHおよびHDPバージョンによって変わりますが、一般的な場所は/etc/hadoop/confの中です。Cloudera マネージャのような幾つかのツールはその場で設定を生成しますが、それらのコピーをダウンロードするための仕組みを提供します。

それらのファイルがSparkに見えるようにするためには、$SPARK_HOME/spark-env.sh内のHADOOP_CONF_DIRを設定ファイルを含む場所に設定します。

TOP
inserted by FC2 system