Sparkの設定
Sparkはシステムを設定するための3つの場所を提供します:
- Spark プロパティ はほとんどのアプリケーションのパラメータを制御し、SparkConfオブジェクトあるいはJavaシステムプロパティを使って設定することができます。
- 環境変数は各ノード上で
conf/spark-env.sh
スクリプトを使ってIPアドレスのようなマシンごとの設定を設定するために使うことができます。 - ログは
log4j.properties
を使って設定することができます。
Sparkのプロパティ
Spark プロパティはほとんどのアプリケーションの設定を制御し、各アプリケーションに対して別々に設定されます。これらのプロパティは SparkContext
に渡して直接SparkConf に設定することができます。SparkConf
を使って共通プロパティの幾つかを設定(例えば、マスターURLおよびアプリケーション名)することができ、set()
メソッドを使って任意のキー値ペアを設定することができます。例えば、以下のように2つのスレッドを使ってアプリケーションを初期化することができます:
local[2]をつけて実行、2つのスレッド - "最小限"の並列処理、することにより、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立つことに注意してください。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
ローカルモードで1つ以上のスレッドを持つことができ、実際にSpark Streamingのような場合にはリソース不足を避けるために1つ以上のスレッドを要求するかも知れないということに注意してください。
なんらかの期間を指定するプロパティは時間の単位で設定されなければなりません。以下の書式が受け付けられます:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
バイトサイズを指定するプロパティはサイズの単位を使って設定されなければなりません。以下の書式が受け付けられます:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
Sparkプロパティの動的なロード
時には、ある設定をSparkConf
にハードコーディングしたくないかも知れません。例えば、もし同じアプリケーションを異なるマスターあるいは異なるメモリ量で実行したいなど。Sparkは空のconfを単純に作成することができます:
val sc = new SparkContext(new SparkConf())
そうすると、実行時に設定値を提供することができます:
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark シェルおよび spark-submit
ツールは動的に設定をロードする2つの方法を提供します。最初の一つは、上で説明したように、--master
のようなコマンドラインオプションです。spark-submit
は--conf
フラグを使ってどのようなSpark プロパティも受け付けることができますが、Sparkアプリケーションを起動する時に役割があるプロパティのために特別なフラグを使うようにしてください。./bin/spark-submit --help
を実行すると、これらのオプションの完全なリストが表示されるでしょう。
bin/spark-submit
は設定オプションを conf/spark-defaults.conf
からも読み込むでしょう。各行はホワイトスペースで区切られたキーと値です。例えば:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
フラグで指定された、あるいはプロパティファイルの全ての値はアプリケーションに渡され、SparkConfを通じてマージされるでしょう。SparkConfで直接設定されたプロパティは優先度が高く、その次はspark-submit
あるいは spark-shell
に渡されたもので、その次はspark-defaults.conf
ファイル内のオプションです。以前のSparkのバージョンから2,3の設定キーの名前が変更されました; そのような場合、古いキーの名前はまだ受け付けられますが、新しいキーのどのインスタンスよりも優先度が低くなります。
Sparkプロパティのビュー
http://<driver>:4040
のアプリケーションUIは、"Environment"タブの中でSparkプロパティをリスト表示します。ここはプロパティが正しく設定されたか確認するのに便利な場所です。spark-defaults.conf
, SparkConf
, あるいはコマンドラインで明示的に指定された値だけが表示されるだろうことに注意してください。他の全ての設定プロパティについては、デフォルトの値が使われると見なすことができます。
利用可能なプロパティ
内部設定を制御するほとんどのプロパティは意味のあるデフォルト値を持ちます。最も一般的なオプションの幾つかは以下のように設定されます。
アプリケーションのプロパティ
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.app.name |
(none) | アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。 |
spark.driver.cores |
1 | ドライバープロセスのために使われるコアの数。クラスターモードのみです。 |
spark.driver.maxResultSize |
1g | 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズの制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。 |
spark.driver.memory |
1g |
ドライバープロセスのために使われるメモリの量。例えば SparkContextが初期化される場所。(例えば、1g , 2g ). 注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接 SparkConf を使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
spark.executor.memory |
1g |
executorプロセスあたりに使用するメモリ量(例えば2g , 8g )。
|
spark.extraListeners |
(none) |
SparkListener を実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。
|
spark.local.dir |
/tmp | Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。注意: Spark 1.0 以降では、これはクラスタマネージャーによって、SPARK_LOCAL_DIRS (スタンドアローン, Mesos) あるいは LOCAL_DIRS (YARN) 環境変数で上書きされます。 |
spark.logConf |
false | SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。 |
spark.master |
(none) | 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。 |
spark.submit.deployMode |
(none) | Sparkドライバプログラムの配備モード、"client"あるいは"cluster"、これはドライバープログラムをローカル("client")あるいはクラスタ内のノードの1つの上で遠隔で("cluster")起動することを意味します。 |
これとは別に、以下のプロパティも利用可能で、ある状況では有用かも知れません。
ランタイム環境
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.driver.extraClassPath |
(none) |
ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。 注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接 SparkConf を使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
spark.driver.extraJavaOptions |
(none) |
ドライバに渡す特別なJVMオプションの文字列。例えば、GC設定あるいは他のログ。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は、クラスタモードではspark.driver.memory を、クライアントモードでは--driver-memory コマンドラインオプションを使って設定することができます。注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接 SparkConf を使って設定するべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
spark.driver.extraLibraryPath |
(none) |
ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。 注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接 SparkConf を使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
spark.driver.userClassPathFirst |
false | (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。 |
spark.executor.extraClassPath |
(none) | executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。 |
spark.executor.extraJavaOptions |
(none) | executorに渡すための特別なJVMオプションの文字列例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいは最大のヒープサイズ(-Xmx)の設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。最大のヒープサイズの設定はspark.executor.memoryを使って設定することができます。 |
spark.executor.extraLibraryPath |
(none) | executor JVMを起動する場合に使う特別なライブラリパスを設定する。 |
spark.executor.logs.rolling.maxRetainedFiles |
(none) | システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。 |
spark.executor.logs.rolling.enableCompression |
false | executor ログの圧縮を有効にします。有効な場合、ロールされたexecutorログは圧縮されるでしょう。デフォルトは無効です。 |
spark.executor.logs.rolling.maxSize |
(none) |
executorのログがロールオーバーされる最大のファイルサイズをバイトで設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。
|
spark.executor.logs.rolling.strategy |
(none) |
executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するために>spark.executor.logs.rolling.maxSize を使用します。
|
spark.executor.logs.rolling.time.interval |
daily |
executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily , hourly , minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFiles を見てください。
|
spark.executor.userClassPathFirst |
false |
(実験的なもの) spark.driver.userClassPathFirst と同じ機能ですが、executorインスタンスに適用されます。
|
spark.executorEnv.[EnvironmentVariableName] |
(none) |
EnvironmentVariableName によって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。
|
spark.python.profile |
false |
Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles() によって表示されるか、ドライバーが終了する前に表示されるでしょう。sc.dump_profiles(path) によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。
|
spark.python.profile.dump |
(none) | ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはptats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。 |
spark.python.worker.memory |
512m |
集約の間にpythonワーカープロセスごとに使用されるメモリの量。JVMメモリ文字列と同じフォーマット(例えば512m , 2g )。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。
|
spark.python.worker.reuse |
true | Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。 |
spark.files |
各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。 | |
spark.submit.pyFiles |
PythonアプリのためのPYTHONPATH上に配置されるカンマ区切りの .zip, .egg あるいは .py ファイル。 | |
spark.jars |
ドライバーとexecutorのクラスパス上でインクルードされるローカルjarのカンマ区切りのリスト | |
spark.jars.packages |
ドライバーとexecutorのクラスパス上でインクルードされるmaven coordinateのjarのカンマ区切りのリストローカルのmavenリポジトリ、次にmaven central、そしてspark.jars.ivy で与えられた追加のリモートリポジトリを検索するでしょう。調整のためのフォーマットは groupId:artifactId:version でなければなりません。
|
|
spark.jars.excludes |
spark.jars.packages で与えらえた依存を解決する間に依存性の衝突を避けるために除外する、groupId:artifactId のカンマ区切りのリスト。
|
|
spark.jars.ivy |
spark.jars.packages を使って与えられるcoordinateのために検索される、カンマ区切りの追加のリモートリポジトリ。
|
シャッフルの挙動
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.reducer.maxSizeInFlight |
48m | 各reduceタスクから同時に取り出すmap出力の最大サイズ。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | この設定はリモートリクエストの数を指定された値までブロックの検索を制限します。クラスタ内のホストの数が増えた場合は、1つ以上のノードからやってくる接続の数が大量になり、ワーカーが負荷のために落ちるかも知れません。取得リクエストの数を制限できるようにすることで、このシナリオは緩和されるかも知れません。 |
spark.shuffle.compress |
true |
map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codec を使うでしょう。
|
spark.shuffle.file.buffer |
32k | 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。 |
spark.shuffle.io.maxRetries |
3 | (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。 |
spark.shuffle.io.preferDirectBufs |
true | (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 |
spark.shuffle.io.retryWait |
5s |
(Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWait として計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。
|
spark.shuffle.service.enabled |
false |
外部のシャッフルサービスを有効にします。このサービスはexecutorが安全に削除されるようにexecutorによって書き込まれるシャッフルファイルを保持します。もしspark.dynamicAllocation.enabled が"true"であれば、これは有効でなければなりません。外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentation を見てください。
|
spark.shuffle.service.port |
7337 | 外部のシャッフルサービスが実行されるだろうポート。 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。 |
spark.shuffle.spill.compress |
true |
シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codec を使うでしょう。
|
Spark UI
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.eventLog.compress |
false |
もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。
|
spark.eventLog.dir |
file:///tmp/spark-events |
spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。
|
spark.eventLog.enabled |
false | アプリケーションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。 |
spark.ui.killEnabled |
true | web uiからステージおよび対応するジョブをkillすることを許可する。 |
spark.ui.port |
4040 | 芽折および作業データを表示する、アプリケーションのダッシュボードのポート。 |
spark.ui.retainedJobs |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。 |
spark.ui.retainedStages |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。 |
spark.ui.retainedTasks |
100000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのタスクを記憶するか。 |
spark.worker.ui.retainedExecutors |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。 |
spark.worker.ui.retainedDrivers |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。 |
spark.sql.ui.retainedExecutions |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。 |
spark.streaming.ui.retainedBatches |
1000 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。 |
spark.ui.retainedDeadExecutors |
100 | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのdeadのexecutorを記憶するか。 |
圧縮および直列化
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.broadcast.compress |
true | ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。 |
spark.io.compression.codec |
lz4 |
RDDパーティション、ブロードキャスト変数およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは3つのコーディックを提供します: lz4 , lzf およびsnappy . コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec , org.apache.spark.io.LZFCompressionCodec および org.apache.spark.io.SnappyCompressionCodec .
|
spark.io.compression.lz4.blockSize |
32k | LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。 |
spark.io.compression.snappy.blockSize |
32k | Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。 |
spark.kryo.classesToRegister |
(none) | Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。 |
spark.kryo.referenceTracking |
true (Spark SQL Thrift サーバを使う場合は false) | Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。 |
spark.kryo.registrationRequired |
false | Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。 |
spark.kryo.registrator |
(none) |
Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにカンマ区切りのクラスのリストを渡してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。 KryoRegistrator を拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。
|
spark.kryoserializer.buffer.max |
64m | kryoのシリアライズバッファの最大許容サイズ。これはシリアライズしようと思っているどのオブジェクトよりも大きくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。 |
spark.kryoserializer.buffer |
64k |
Kryoのシリアライズバッファの初期サイズ。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。
|
spark.rdd.compress |
false |
シリアライズされたパーティションを圧縮するかどうか(例えば、JavaとScalaでのStorageLevel.MEMORY_ONLY_SER 、あるいは PythonでのStorageLevel.MEMORY_ONLY )。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。
|
spark.serializer |
org.apache.spark.serializer. JavaSerializer (org.apache.spark.serializer. Spark SQL Thriftサーバを使う場合のKryoSerializer) |
ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializer のどのようなサブクラスにもなることができます。
|
spark.serializer.objectStreamReset |
100 | org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。 |
メモリ管理
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.memory.fraction |
0.6 | 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。この設定の目的は内部メタデータ、ユーザデータ構造、およびまばらな場合の不正確なサイズの見積もりのためにメモリを取って置くことで、非常に大きなレコードです。これはデフォルトの値にしておくことをお勧めします。この値を増加した場合の正しいJVMガベージコレクションの調整についての重要な情報を含む詳細は、この説明を見てください。 |
spark.memory.storageFraction |
0.5 |
立ち退きに耐性のあるストレージメモリの量。spark.memory.fraction によって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。
|
spark.memory.offHeap.enabled |
false |
trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。
|
spark.memory.offHeap.size |
0 |
オフヒープ割り当てのために使うことができる絶対メモリ量のバイト数。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=true の場合は、これは正の値に設定されなければなりません。
|
spark.memory.useLegacyMode |
false |
Spark 1.5以前で使われていた古いメモリ管理モードを有効にするかどうか。以前のモードではヒープ領域を頑なに固定サイズの領域に分割します。アプリケーションがチューニングされていない場合は過度にこぼれることになります。これが有効にされない場合、以下の非推奨のメモリー断片設定は読み込まれません: spark.shuffle.memoryFraction spark.storage.memoryFraction spark.storage.unrollFraction
|
spark.shuffle.memoryFraction |
0.2 |
(非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。シャッフルの間に集約および集団を作るために使われるJavaヒープの断片。いつでも、シャッフルに使われる全てのインメモリのマップの全体のサイズはこの制限によって束縛されます。これを超えた内容なディスクにこぼれ始めるでしょう。流し込みが頻繁に起きる場合は、spark.storage.memoryFraction を犠牲にしてこの値を増やすことを考えます。
|
spark.storage.memoryFraction |
0.6 |
(非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。Sparkのメモリキャッシュによって使用されるJava ヒープの断片。これはJVM内のオブジェクトの古い世代より大きくてはいけません。デフォルトではヒープの0.6ですが、古い世代のサイズを独自に設定した場合はそれを増やすことができます。
|
spark.storage.unrollFraction |
0.2 |
(非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。メモリ内の展開されないブロックが使う spark.storage.memoryFraction の断片。新しいブロックを完全に展開するために十分なストレージスペースが無い場合に、既存のブロックを削除することで動的に割り当てられます。
|
Executionの挙動
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.broadcast.blockSize |
4m |
TorrentBroadcastFactory のためのブロックの各部分のサイズ。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManager はパフォーマンスの打撃を受けるかも知れません。
|
spark.executor.cores |
YARNモードの場合は1、スタンドアローンモードとMesos coarse-grainedモードの場合はワーカー上の全ての利用可能なコア。 | 各executor上で使用されるコアの数。スタンドアローンモードとMesos coarse-grainedモードの場合は、ワーカー上で十分なコアがあると仮定すると、このパラメータを設定することでアプリケーションが同じワーカー上で複数のexecutorを実行することができます。そうでなければ、アプリケーションごとに1つのexecutorが各ワーカー上で実行されるでしょう。 |
spark.default.parallelism |
reduceByKey および join のような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelize のような操作については、クラスタマネージャーに依存します:
|
ユーザによって設定されなかった場合は、 join , reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。
|
spark.executor.heartbeatInterval |
10s | ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。 |
spark.files.fetchTimeout |
60s | ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。 |
spark.files.useFetchCache |
true | true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。 |
spark.files.overwrite |
false | ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。 |
spark.hadoop.cloneConf |
false | trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定 スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。 |
spark.hadoop.validateOutputSpecs |
true | trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。 |
spark.storage.memoryMapThreshold |
2m | ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。 |
ネットワーク
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.rpc.message.maxSize |
128 | "control plane"通信内で許される最大のメッセージ(MB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。RPCメッセージに関するメッセージが表示されるでしょう。 |
spark.blockManager.port |
(random) | 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。 |
spark.driver.host |
(local hostname) | ドライバがlistenするホスト名あるいはIPアドレス。executorとスタンドアローンマスターが通信するために使われます。 |
spark.driver.port |
(random) | ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。 |
spark.network.timeout |
120s |
全てのネットワークの相互交流のタイムアウト。この設定は、spark.core.connection.ack.wait.timeout , spark.storage.blockManagerSlaveTimeoutMs , spark.shuffle.io.connectionTimeout , spark.rpc.askTimeout あるいは spark.rpc.lookupTimeout が設定されていない場合に、それらの代わりに使われるでしょう。
|
spark.port.maxRetries |
16 | ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。 |
spark.rpc.numRetries |
3 | RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。 |
spark.rpc.retry.wait |
3s | RPCのask操作が再試行するまで待つ期間。 |
spark.rpc.askTimeout |
120s | RPCのask操作がタイムアウトするまで待つ期間。 |
spark.rpc.lookupTimeout |
120s | RPCリモートエンドポイントがタイムアウトするまで待つ時間。 |
スケジューリング
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.cores.max |
(not set) |
<c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores 、Mesos上では無制限(利用可能な全てのコア)になります。
|
spark.locality.wait |
3s |
データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.node などを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。
|
spark.locality.wait.node |
spark.locality.wait | ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。 |
spark.locality.wait.process |
spark.locality.wait | プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。 |
spark.locality.wait.rack |
spark.locality.wait | rack局地性を待つための局地性をカスタマイズします。 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | スケジュールが始まる前にリソースが登録するための最大待ち時間。 |
spark.scheduler.minRegisteredResourcesRatio |
YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 |
スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTime によって設定されます。
|
spark.scheduler.mode |
FIFO |
同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。
|
spark.scheduler.revive.interval |
1s | スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。 |
spark.speculation |
false | "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。 |
spark.speculation.interval |
100ms | どれだけの頻度でSparkが投機するためにタスクをチェックするか。 |
spark.speculation.multiplier |
1.5 | 平均より遅いタスクが何回投機と見なされるか。 |
spark.speculation.quantile |
0.75 | 指定のステージで投機が有効になる前にどれだけのタスクの割合が終了していなければならないか。 |
spark.task.cpus |
1 | 各タスクごとに割り当てるコアの数。 |
spark.task.maxFailures |
4 | ジョブを諦める前のタスクの失敗の数。異なるタスクに渡って広がっている失敗の総数はジョブを失敗させないでしょう; 特定のタスクがこの試行の数を失敗しなければなりません。1以上でなければなりません。許可された再試行の数 = この値 - 1. |
動的割り当て
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.dynamicAllocation.enabled |
false |
動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。詳細は ここの説明を見てください。 これには spark.shuffle.service.enabled が設定されることが必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors , spark.dynamicAllocation.maxExecutors および spark.dynamicAllocation.initialExecutors
|
spark.dynamicAllocation.executorIdleTimeout |
60s | 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
動的割り当てが有効な場合、実行するexecutorの初期の数。 もし `--num-executors` (あるいは `spark.executor.instances`) が設定され、この値よりも大きい場合は、executorの初期の数として使われるでしょう。 |
spark.dynamicAllocation.maxExecutors |
infinity | 動的割り当てが有効な場合のexecutorの数の上限。 |
spark.dynamicAllocation.minExecutors |
0 | 動的割り当てが有効な場合のexecutorの数の下限。 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
spark.dynamicAllocation.schedulerBacklogTimeout と同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は ここの説明を見てください。
|
セキュリティ
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.acls.enable |
false | Sparkのaclを有効にしなければならないかどうか。有効な場合は、これはユーザがジョブを表示あるいは修正するアクセス権限をもつかどうかを調べます。これはユーザが分かっていなければならず、もしユーザがnullとして渡された場合はなにも調べられないことに注意してください。ユーザを認証および設定するために、UIを使ってフィルターを使うことができます。 |
spark.admin.acls |
Empty | 全てのSparkジョブを表示および修正アクセスを持つユーザ/管理者のカンマ区切りのリスト。もし共有クラスタ上で実行し、動作する時にデバッグを手伝ってくれる一連の管理者あるいは開発者がいる場合は、これが使われるかも知れません。リストに "*"を入れることは、全てのユーザが管理権限を持つことを意味します。 |
spark.admin.acls.groups |
Empty |
全てのSparkジョブを表示および修正アクセスを持つグループのカンマ区切りのリスト。基礎となるインフラストラクチャーを維持しデバッグしてくれる一連の管理者あるいは開発者がいる場合は、これが使われるかも知れません。リストに "*"を入れることは、全てのグループの全てのユーザが管理権限を持つことを意味します。ユーザグループはspark.user.groups.mapping によって指定されるグループ マッピング プロバイダのインスタンスから取得されます。詳細は spark.user.groups.mapping のエントリを調べてください。
|
spark.user.groups.mapping |
org.apache.spark.security.ShellBasedGroupsMappingProvider |
ユーザのためのグループのリストは、プロパティによって設定することができるorg.apache.spark.security.GroupMappingServiceProviderによって定義されるグループマッピングサービスによって決定されます。デフォルトのunixシェルに基づいた実装は、ユーザのためのグループのリストを解決するために指定することができるorg.apache.spark.security.ShellBasedGroupsMappingProvider によって提供されます。注意: この実装は Unix/Linux に基づく環境でのみサポートされます。Windows 環境は現在のところサポートされません。しかし、特性 org.apache.spark.security.GroupMappingServiceProvider を実装することで新しいプラットフォーム/プロトコルでサポートされることができます。
|
spark.authenticate |
false |
Sparkが内部接続を認証するかどうか。YARN上で実行しない場合はspark.authenticate.secret を見てください。
|
spark.authenticate.secret |
None | Sparkがコンポーネント間で認証するために使われる秘密キーを設定します。YARN上で実行しておらず、認証が有効な場合は、これが設定される必要があります。 |
spark.authenticate.enableSaslEncryption |
false | 認証が有効な場合に暗号化通信を有効にする。これはブロックの転送サービスとRPCエンドポイントによってサポートされます。 |
spark.network.sasl.serverAlwaysEncrypt |
false | SASL認証をサポートするサービスに対して非暗号化通信を無効にする。これは現在のところ外部シャッフルサービスでのみサポートされています。 |
spark.core.connection.ack.wait.timeout |
60s | 接続がタイムアウトして諦めるまでにどれだけ長くackを待つか。GCのような長い休止で不本意なタイムアウトを避けるために、大きな値を設定することができます。 |
spark.core.connection.auth.wait.timeout |
30s | 接続がタイムアウトして諦めるまでにどれだけ長く認証を待つか。 |
spark.modify.acls |
Empty | Sparkジョブへの修正アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザがそれの修正(例えばkill)アクセスを持ちます。リストに "*"を入れることは、全てのユーザが修正する権限を持つことを意味します。 |
spark.modify.acls.groups |
Empty |
全てのSparkジョブを表示および修正アクセスを持つグループのカンマ区切りのリスト。同じチームからジョブを制御するためにアクセスする一連の管理者あるいは開発者がいる場合にはこれが使われるかも知れません。リスト内への"*"の配置は全てのグループ内の全てのユーザがSparkのジョブを編集する権限を持つことを意味します。ユーザグループはspark.user.groups.mapping によって指定されるグループ マッピング プロバイダのインスタンスから取得されます。詳細は spark.user.groups.mapping のエントリを調べてください。
|
spark.ui.filters |
None |
Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。各フィルターへのパラメータは以下のようなjavaシステムプロパティの設定により指定することもできます:spark.<フィルター名>.params='param1=value1,param2=value2' 例えば: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing'
|
spark.ui.view.acls |
Empty | Spark web uiへの表示アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザのみが表示アクセスを持ちます。リストに "*"を入れることは、全てのユーザがこのSparkジョブを見る権限を持つことを意味します。 |
spark.ui.view.acls.groups |
Empty |
Sparkのジョブの詳細を見るためにSpark web ui への view権限を持つカンマで区切られたグループのリスト。サブミットされたSparkジョブを監視することができる一連の管理者あるいは開発者がいる場合はこれが使われるかも知れません。リスト内への"*"の配置は全てのグループ内の全てのユーザがSpark web ui上のSparkジョブの詳細を見ることができる権限を持つことを意味します。ユーザグループはspark.user.groups.mapping によって指定されるグループ マッピング プロバイダのインスタンスから取得されます。詳細は spark.user.groups.mapping のエントリを調べてください。
|
暗号化
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.ssl.enabled |
false |
全てのサポートされたプロトコル上でSSL接続を有効にするかどうか。
|
spark.ssl.enabledAlgorithms |
Empty | cipherのカンマ区切りのリスト。指定されたcipherはJVMによってサポートされていなければなりません。プロトコルの参照リストは この ページで見つけることができます。注意: 設定されていない場合、JVMのデフォルトのcipherスイーツを使うでしょう。 |
spark.ssl.keyPassword |
None | キーストア内のプライベートキーのパスワード。 |
spark.ssl.keyStore |
None | キーストアファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。 |
spark.ssl.keyStorePassword |
None | キーストアへのパスワード。 |
spark.ssl.keyStoreType |
JKS | キー ストアの種類 |
spark.ssl.protocol |
None | プロトコル名。プロトコルはJVMによってサポートされていなければなりません。The reference list of protocols one can find on this page. |
spark.ssl.needClientAuth |
false | SSLがクライアント認証を必要とする場合はtrueを設定します。 |
spark.ssl.trustStore |
None | trust-stoerファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。 |
spark.ssl.trustStorePassword |
None | trust-store のパスワード。 |
spark.ssl.trustStoreType |
JKS | trust-storeの種類 |
Spark SQL
SET -v
コマンドの実行は、SQL設定の完全なリストを表示するでしょう。
// spark is an existing SparkSession
spark.sql("SET -v").show(numRows = 200, truncate = false)
// spark is an existing SparkSession
spark.sql("SET -v").show(200, false);
# spark is an existing SparkSession
spark.sql("SET -v").show(n=200, truncate=False)
sparkR.session()
properties <- sql("SET -v")
showDF(properties, numRows = 200, truncate = FALSE)
Spark ストリーミング
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.streaming.backpressure.enabled |
false |
Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRate およびspark.streaming.kafka.maxRatePerPartition が設定されている場合に、この値で上限を制限されます (以下を見てください)。
|
spark.streaming.backpressure.initialRate |
not set | これはバックプレッシャー機構が有効な場合に最初のバッチのために各レシーバーがデータを受信する初期の最大受信レートです。 |
spark.streaming.blockInterval |
200ms | Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。詳細はSpark ストリーミング プログラミング ガイドのパフォーマンス チューニングの章を見てください。 |
spark.streaming.receiver.maxRate |
not set | 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。 |
spark.streaming.receiver.writeAheadLog.enable |
false | レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。 |
spark.streaming.unpersist |
true | Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。 |
spark.streaming.stopGracefullyOnShutdown |
false |
true の場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。
|
spark.streaming.kafka.maxRatePerPartition |
not set | 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。 |
spark.streaming.kafka.maxRetries |
1 | ドライバーが各パーティションのリーダー上で最新のオフセットを見つけるために作成する連続する試行の最大数(デフォルト値1はドライバーが最大2の試行をすることを意味します)。stream APIを差している新しいKafkaにのみ適用されます。 |
spark.streaming.ui.retainedBatches |
1000 | ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | ドライバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。ドライバー上のメタデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | レシーバー上で先行書き込みログを書き込んだ後でファイルを閉じるかどうか。レシーバー上のデータ WALのために、S3(あるいはフラッシュをサポートしないファイルシステム)を使いたい場合は 'true' に設定します。 |
SparkR
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.r.numRBackendThreads |
2 | SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。 |
spark.r.command |
Rscript | ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。 |
spark.r.driver.command |
spark.r.command | ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。 |
配備
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.deploy.recoveryMode |
NONE | クラスターモードでサブミットされたSparkジョブが失敗し再起動する場合に、回復の設定をするリカバリモード。スタンダードあるいはMesosで動いている場合にクラスタモードでのみ適用可能です。 |
spark.deploy.zookeeper.url |
None | `spark.deploy.recoveryMode` が ZOOKEEPER に設定されている場合は、この設定は接続するためのzookeeperのURLに設定するために使われます。 |
spark.deploy.zookeeper.dir |
None | `spark.deploy.recoveryMode` がZOOKEEPERに設定されている場合は、この設定はリカバリー状態を保持するためのzookeeperディレクトリを設定するために使われます。 |
クラスタマネージャー
Sparkの各クラスタマネージャーは追加の設定オプションを持ちます。各ノードのためのページ上で設定を見つけることができます。
YARN
Mesos
スタンドアローンモード
環境変数
あるSpark設定は環境変数によって設定することができ、それらはSparkがインストールされたディレクトリ内のconf/spark-env.sh
スクリプトによって使われます(Windows上ではconf/spark-env.cmd
)。スタンドアローンおよびMesosモードでは、このファイルはホスト名のようなマシーン固有の情報を渡すことができるでしょう。ローカルのアプリケーションの実行あるいはスクリプトのサブミットの場合、それは開始場所にもなります。
conf/spark-env.sh
はSparkがインストールされた場合はデフォルトでは存在しないことに注意してください。しかし、それを作成するためにconf/spark-env.sh.template
をコピーすることができます。コピーを実行可能にすることを忘れないでください。
以下の変数はspark-env.sh
の中で設定することができます:
環境変数 | 意味 |
---|---|
JAVA_HOME |
Javaがインストールされた場所(デフォルトのPATH 上に無い場合)。 |
PYSPARK_PYTHON |
PySparkのためにドライバおよびワーカーの両方の中で使うPythonの実行可能バイナリ(デフォルトは、利用可能であればpython2.7 。そうでなければpython です)。 |
PYSPARK_DRIVER_PYTHON |
PySparkのためにドライバの中でのみ使うPythonの実行可能バイナリ(デフォルトはPYSPARK_PYTHON です)。 |
SPARKR_DRIVER_R |
SparkRシェルのために使われるRバイナリ実行ファイル(デフォルトは R )。 |
SPARK_LOCAL_IP |
バインドするマシーンのIPアドレス。 |
SPARK_PUBLIC_DNS |
Sparkプログラムが他のマシーンに知らせるホスト名。 |
上に加えて、各マシーンで使うコアの数や最大メモリのような、Sparkスタンドアローン クラスタ スクリプトを設定するためのオプションもあります。
spark-env.sh
はシェルスクリプトなので、それらのいくつかはプログラム的に設定することができます - 例えば、特定のネットワークインタフェースのIPを調べることでSPARK_LOCAL_IP
を計算することができるかもしれません。
注意: cluster
モードでYARN上のSparkを実行する場合は、 conf/spark-defaults.conf
ファイル内のspark.yarn.appMasterEnv.[EnvironmentVariableName]
を使って環境変数が設定される必要があります。spark-env.sh
に設定されている環境変数は、cluster
モードのYARN マスタープロセス内には反映されないでしょう。詳細についてはYARNに関係する Spark プロパティ を見てください。
ログの設定
Sparkはログのためにlog4jを使います。conf
ディレクトリに log4j.properties
ファイルを追加することで設定することができます。開始する一つの方法は、そこに存在する既存のlog4j.properties.template
をコピーすることです。
設定ディレクトリの上書き
デフォルトの"SPARK_HOME/conf"以外に異なる設定ディレクトリを指定するために、SPARK_CONF_DIRを設定することができます。Sparkはこのディレクトリから設定ファイル(spark-defaults.conf, spark-env.sh, log4j.properties など)を使用するでしょう。
Hadoopクラスタ設定の継承
Sparkを使ってHDFSから読み込みをするつもりであれば、Sparkのクラスパス上に含まれるべき二つのHadoop設定ファイルがあります。
hdfs-site.xml
は、HDFSクライアントのデフォルトの挙動を提供します。core-site.xml
は、デフォルトのファイル名を設定します。
これらの設定ファイルの場所はCDHおよびHDPバージョンによって変わりますが、一般的な場所は/etc/hadoop/conf
の中です。Cloudera マネージャのような幾つかのツールはその場で設定を生成しますが、それらのコピーをダウンロードするための仕組みを提供します。
それらのファイルがSparkに見えるようにするためには、$SPARK_HOME/spark-env.sh
内のHADOOP_CONF_DIR
を設定ファイルを含む場所に設定します。