YARN上でSparkを実行する
- セキュリティ
- YARN上でSparkを起動
- 準備
- 設定
- アプリケーションのデバッグ
- リソース割り当てと設定の概要
- ステージレベルのスケジューリングの概要
- 重要事項
- Kerberos
- 外部シャッフルサービスの設定
- Apache Oozieを使ってアプリケーションを起動
- Spark Web UIを置き換えるためのSpark履歴サーバの使用
- 複数のバージョンのSparkシャッフルサービスの実行
YARN (Hadoop NextGen)上での実行はバージョン0.6.0でSparkに追加され、続くリリースで改良されました。
セキュリティ
認証のようなセキュリティ機能はデフォルトでは有効にされていません。インターネットや信頼できないネットワークに開かれたクラスタを配備する場合、認証されていないアプリケーションをクラスタ上で実行することを防ぐためにクラスタへのアクセスを安全にすることが重要です。Sparkを実行する前にSpark セキュリティとこのドキュメントの特定のセキュリティの章を見てください。
YARN上でSparkを起動
HADOOP_CONF_DIR
あるいはYARN_CONF_DIR
がHadoopクラスタのための(クライアントサイドの)設定ファイルを含むディレクトリを指すようにします。これらの設定はHDFSに書き込み、YARNリソースマネージャーに接続するために使用されます。このディレクトリに含まれる設定はYARNクラスタに分配され、アプリケーションに使用される全てのコンテナは同じ設定を使用します。設定がJavaシステムプロパティを参照あるいは環境変数がYARNによって管理されていない場合は、それらはSparkアプリケーション設定(ドライバー、executor、およびクライアントモードで実行している場合はAM)にも設定されていなければなりません。
SparkアプリケーションをYARNで起動するのに使われる2つのデプロイモードがあります。cluster
モードでは、Sparkドライバはクラスタ上のYARNによって管理されるアプリケーションマスタープロセスの中で実行され、クライアントはアプリケーションの初期化の後で終了することができます。client
モードでは、ドライバーはクライアントプロセスの中で実行され、アプリケーションマスターはYARNからのリソースの要求のためだけに使用されます。
マスターのアドレスが --master
パラメータで指定されるSparkによってサポートされる他のクラスタマネージャーと異なり、YARNモードではリソースマネージャーのアドレスがHadoop設定から取り出されます。そして、--master
パラメータは yarn
です。
Sparkアプリケーションをcluster
モードで起動するには:
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
例えば:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
上記はデフォルトのアプリケーションマスターを開始するYARNクライアントプログラムを開始します。そして、SparkPiはアプリケーションマスターの子スレッドとして実行されるでしょう。クライアントはステータスの更新とそれらをコンソールに表示するために定期的にアプリケーションマスターをポーリングします。一旦アプリケーションの実行が終了するとクライアントは終了するでしょう。ドライバーおよびexecutorのログをどうやって見るかについては以下の Debugging your Applicationセクションを参照してください。
client
モードのSparkアプリケーションを起動するには同じことをしますが、 cluster
を client
と置き換えてください。以下は、spark-shell
を client
モードでどうやって実行することができるかを示します:
$ ./bin/spark-shell --master yarn --deploy-mode client
他のJARを追加
cluster
モードでは、ドライバーはクライアントではなく異なるマシーンで動作します。つまりSparkContext.addJar
はローカルからクライアントへの追加設定のファイル無しには動作しないでしょう。SparkContext.addJar
が利用可能なクライアントでファイルを作成するために、起動コマンドの中に--jars
オプションを使ってそれらを含めます。
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2
準備
Sparkを自分自身でビルドするには、Sparkのビルドを参照してください。
YARN側からSparkランタイムのjarをアクセス可能にするために、spark.yarn.archive
あるいは spark.yarn.jars
を指定することができます。詳細はSpark Propertiesを参照してください。spark.yarn.archive
あるいは spark.yarn.jars
のどちらも指定されない場合は、Sparkは $SPARK_HOME/jars
の下にある全てのjarを使ってzipファイルを生成し、それを分散キャッシュにアップロードするでしょう。
設定
他のデプロイモードについては、ほとんどの設定はYARN上のSPARKと同じです。これらの詳細な情報は設定ページを見てください。これらはYARN上のSPARKに固有です。
アプリケーションのデバッグ
YARNの用語では、executorおよびアプリケーションマスターは"コンテナ"の中で実行されます。アプリケーションが完了した後でコンテナのログを処理するためにYARNには2つのモードがあります。ログの集約が有効な場合(yarn.log-aggregation-enable
設定を使用)、コンテナログはHDFSにコピーされ、ローカルマシーンからは削除されます。これらのログはyan log
コマンドを使ってクラスタのどこからでも見ることができます。
yarn logs -applicationId <app ID>
指定されたアプリケーションについて全てのコンテナからの全てのログファイルの内容を出力するでしょう。HDFSシェルあるいはAPIを使って直接HDFS内のコンテナログを見ることもできます。YARNの設定を調べることでそれらがあるディレクトリを見つけることができます(yarn.nodemanager.remote-app-log-dir
および yarn.nodemanager.remote-app-log-dir-suffix
)。このログはSpark Web UIのexecutorタブでも利用可能です。Spark履歴サーバとMapReduce履歴サーバの両方の動作が必要で、yarn-site.xml
適切にyarn.log.server.url
を設定する必要があります。Spark履歴サーバUI上のログURLは集約されたログを表示するためにMapReduce履歴サーバにリダイレクトするでしょう。
ログの集約をオンにしていない場合は、ログは各マシーン上のYARN_APP_LOGS_DIR
の下にローカルに保持されます。これは通常/tmp/logs
あるいは、Hadoopのバージョンおよびインストレーションに依存する $HADOOP_HOME/logs/userlogs
に設定されます。コンテナンのログを見るにはそれらを含むホストに行き、このディレクトリを調べる必要があります。サブディレクトリはアプリケーションIDとコンテナIDでログファイルを整理します。このログはSpark Web UIのexecutorタブでも利用可能で、MapReduce履歴サーバの実行を必要としません。
コンテナごとの起動環境をレビューするには、yarn.nodemanager.delete.debug-delay-sec
を大きな値(例えば、36000
)に増やし、コンテナが起動されたノード上のyarn.nodemanager.local-dirs
を使ってアプリケーションのキャッシュにアクセスします。このディレクトリは起動スクリプト、JAR、各コンテナを起動するために使われる全ての環境変数を含んでいます。このプロセスは特にクラスパスの問題をデバッグするのに役立ちます。(これを有効にするにはクラスタ設定の管理権限が必要で全てのノードマネージャーを再起動が必要になる事に注意してください。従ってこれはホストとされているクラスタでは適用できません)。
アプリケーションマスターあるいはexecutorのための独自log4j設定を使うには、以下の選択肢があります:
- アプリケーションと一緒にアップロードされる
--files
のファイルのリストにlog4j.propertiesを追加することで、spark-submit
を使って独自のlog4j.properties
をアップロードします。 -Dlog4j.configuration=<location of configuration file>
にspark.driver.extraJavaOptions
(ドライバーのため)、あるいはspark.executor.extraJavaOptions
(executorのため)を追加します。ファイルを使う場合は明示的にfile:
プロトコルが提供されなければならず、そのファイルは全てのノード上でローカルに存在しなければならない事に注意してください。$SPARK_CONF_DIR/log4j.properties
ファイルを更新すると、他の設定と一緒に自動的にアップロードされるでしょう。複数のオプションが指定された場合は、このオプションよりも他の2つのオプションが優先されることに注意してください。
最初の選択肢は、executorとアプリケーションマスターの両方が同じlog4j設定を共有するでしょう。これは同じノード上で動作する場合に問題を起こすかもしれません(例えば、同じログファイルに書き込もうとします)。
YARNがログを適切に表示および集約できるようにYARN内でログファイルを置く適切な場所への参照を必要とする場合は、log4j.propertiesのspark.yarn.app.container.log.dir
を使ってください。例えば、log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
。ストリーミングアプリケーションについては、RollingFileAppender
の設置とファイルの場所をYARNのログディレクトリに設定することで、大きなログファイルによって起こるディスクオーバーフローを避けることができ、ログはYARNのログユーティリティを使ってアクセスすることができるでしょう。
アプリケーションマスターとexecutorのために独自の metrics.properties を使うには、$SPARK_CONF_DIR/metrics.properties
ファイルを更新します。他の設定と一緒に自動的にアップロードされるでしょう。つまり、手動で --files
を使って指定する必要はありません。
Sparkのプロパティ
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.yarn.am.memory |
512m |
クライアントモードのYARNアプリケーションのために使われるメモリの量は、JVMメモリ設定と同じフォーマットです。(例えば、 512m , 2g )。クラスタモードでは、代わりにspark.driver.memory を使います。
小文字のサフィックスを使ってください。例えば k , m , g , t と p は、それぞれ kibi-, mebi-, gibi-, tebi- および pebibytes です。
|
1.3.0 |
spark.yarn.am.resource.{resource-type}.amount |
(none) |
クライアントモードでYARNアプリケーションマスターのために使うリソースの量。クラスタモードでは、代わりにspark.yarn.driver.resource.<resource-type>.amount を使用します。この機能は、YARM 3.0+ でのみ使えることに注意してください。参考のために、YARN リソースモデルドキュメントを見てください: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
例: YARM から GPU リソースをリクエストするには、spark.yarn.am.resource.yarn.io/gpu.amount を使ってください
|
3.0.0 |
spark.yarn.applicationType |
SPARK |
アプリケーションの型をもっと明確に定義します。例えば、SPARK , SPARK-SQL , SPARK-STREAMING , SPARK-MLLIB , SPARK-GRAPH 。20文字を超えないように注意してください。
|
3.1.0 |
spark.yarn.driver.resource.{resource-type}.amount |
(none) |
クラスタモードでYARNアプリケーションマスターのために使うリソースの量。この機能は、YARM 3.0+ でのみ使えることに注意してください。参考のために、YARN リソースモデルドキュメントを見てください: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
例: YARM から GPU リソースをリクエストするには、spark.yarn.driver.resource.yarn.io/gpu.amount を使ってください
|
3.0.0 |
spark.yarn.executor.resource.{resource-type}.amount |
(none) |
executor プロセスごとに使用するリソースの量。この機能は、YARM 3.0+ でのみ使えることに注意してください。参考のために、YARN リソースモデルドキュメントを見てください: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
例: YARM から GPU リソースをリクエストするには、spark.yarn.executor.resource.yarn.io/gpu.amount を使ってください
|
3.0.0 |
spark.yarn.resourceGpuDeviceName |
yarn.io/gpu |
Sparkリソース型のgpu のマッピングをGPUを表すYARNリソースに指定します。デフォルトではYARNはyarn.io/gpu を使いますが、YARNが独自のリソース型で構成されている場合、これにより再マッピングが可能になります。spark.{driver/executor}.resource.gpu.* 設定を使う場合に適用されます。
|
3.2.1 |
spark.yarn.resourceFpgaDeviceName |
yarn.io/fpga |
Sparkリソース型のfpga のマッピングをFPGAを表すYARNリソースに指定します。デフォルトではYARNはyarn.io/fpga を使いますが、YARNが独自のリソース型で構成されている場合、これにより再マッピングが可能になります。spark.{driver/executor}.resource.fpga.* 設定を使う場合に適用されます。
|
3.2.1 |
spark.yarn.am.cores |
1 |
クライアントモードでYARNアプリケーションマスターのために使うコアの数。クラスタモードでは、代わりにspark.driver.cores を使います。
|
1.3.0 |
spark.yarn.am.waitTime |
100s |
cluster モードだけで使われます。YARNアプリケーション マスターがSparkContextが初期化されるのを待つ時間。
|
1.3.0 |
spark.yarn.submit.file.replication |
デフォルトのHDFSレプリケーション(通常は3 )。 |
アプリケーションのためにHDFSにアップロードされたファイルのHDFSレプリケーションレベル。これらにはSpark jar, アプリjar, および全ての分散キャッシュファイル/圧縮ファイル が含まれます。 | 0.8.1 |
spark.yarn.stagingDir |
ファイルシステム内の現在のユーザのホームディレクトリ | アプリケーションのサブミット中に使われるステージングディレクトリ。 | 2.0.0 |
spark.yarn.preserve.staging.files |
false |
ステージ化されたファイル(Spark jar, app jar, 分散キャッシュファイル)を削除せずにジョブの最後で保持するためにはtrue に設定します。
|
1.1.0 |
spark.yarn.scheduler.heartbeat.interval-ms |
3000 |
SparkアプリケーションマスターがYARNリソースマネージャーの生存確認をするミリ秒単位の間隔。値はYARNの期限切れ間隔、つまりyarn.am.liveness-monitor.expiry-interval-ms 、の設定の半分の値で上限を設定されます。
|
0.8.1 |
spark.yarn.scheduler.initial-allocation.interval |
200ms |
延期されているコンテナ割り当てリクエストがある場合は、SparkアプリケーションマスターがしきりにYARN ResourceManagerにハートビートを送信する初期間隔。spark.yarn.scheduler.heartbeat.interval-ms より長くてはいけません。もし延期されているコンテナがまだある場合は、spark.yarn.scheduler.heartbeat.interval-ms に到達するまで割り当てられる連続するハートビートは2倍の間隔に割り当てられるでしょう。
|
1.4.0 |
spark.yarn.max.executor.failures |
最小は3で、numExecutors * 2。 | アプリケーションが失敗するまでのexecutorの失敗の最大の数。 | 1.0.0 |
spark.yarn.historyServer.address |
(none) |
Spark履歴サーバのアドレス、例えばhost.com:18080 。アドレスはスキーマ(http:// )を含んではいけません。履歴サーバは任意のサービスのため、デフォルトは設定されません。SparkアプリケーションがResourceManger UIからSpark履歴サーバUIにアプリケーションをリンクし終わった場合は、このアドレスはYARN ResourceManagerに渡されます。このプロパティに関しては、YARNプロパティは変数として使用することができ、これらは実行時にSparkによって代用されます。例えば、もしSpark履歴サーバがYARNリソースマネージャーとして同じノード上で実行している場合は、${hadoopconf-yarn.resourcemanager.hostname}:18080 に設定することができます。
|
1.0.0 |
spark.yarn.dist.archives |
(none) | 各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。 | 1.0.0 |
spark.yarn.dist.files |
(none) | 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。 | 1.0.0 |
spark.yarn.dist.jars |
(none) | 各executorの作業ディレクトリに配置されるカンマ区切りのjarのリスト。 | 2.0.0 |
spark.yarn.dist.forceDownloadSchemes |
(none) |
YARNの分散キャッシュに追加される前にローカルディスクにダウンロードされるリソースのスキーマのカンマ区切りのリスト。http, https および ftpのような、YARNサービスがSparkによってサポートされるスキーマをサポートしない場合、あるいはローカルのYARNクライアントのクラスパス内にある必要があるjarに使います。ワイルドカード '*' は全てのスキーマについてリソースをダウンロードすることを示します。 | 2.3.0 |
spark.executor.instances |
2 |
静的な割り当てのためのexecutorの数。spark.dynamicAllocation.enabled を使って、executorの初期セットは少なくともこの多さになるでしょう。
|
1.0.0 |
spark.yarn.am.memoryOverhead |
最小384で、AM memory * 0.10 |
spark.driver.memoryOverhead と同じですが、クライアントモードのYARNアプリケーションマスターのためのものです。
|
1.3.0 |
spark.yarn.queue |
デフォルト: |
アプリケーションがサブミットされるYARNキューの名前。 | 1.0.0 |
spark.yarn.jars |
(none) |
YARNコンテナに配布するSparkコードを含むライブラリのリスト。デフォルトでは、YARN上のSparkはローカルにインストールされたSpark jarを使いますが、Spark jarはHDFS上のworld-readableな場所にもあります。これにより、YARNはアプリケーションが実行される度に分配されないようにキャッシュすることができます。HDFS上のjarを示すには、例えば、この設定をhdfs:///some/path にします。globも可能です。
|
2.0.0 |
spark.yarn.archive |
(none) |
YARNキャッシュに配布するために必要とされるSpark jarを含む書庫。設定された場合は、この設定はspark.yarn.jars を置き換え、全てのアプリケーションのコンテナの中で書庫が使われます。書庫はそのルートディレクトリにjarファイルを含まなければなりません。前のオプションを使う場合と同じように、ファイルの分散を高速化するため書庫はHDFS上にホストすることもできます。
|
2.0.0 |
spark.yarn.appMasterEnv.[EnvironmentVariableName] |
(none) |
EnvironmentVariableName によって指定される環境変数を、YARN上で起動されたアプリケーションマスタープロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。cluster モードでは、これはSPARKドライバの環境を制御し、client モードではexecutorのlauncherの環境のみを制御します。
|
1.1.0 |
spark.yarn.containerLauncherMaxThreads |
25 |
YARNアプリケーションマスタ内でexecutorコンテナを起動するために使用するスレッドの最大数。 | 1.2.0 |
spark.yarn.am.extraJavaOptions |
(none) |
クライアントモードでYARNアプリケーションマスターに渡す特別なJVMオプションのリスト。クラスタモードでは、代わりにspark.driver.extraJavaOptions を使用します。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は spark.yarn.am.memory を使って設定することができます。
|
1.3.0 |
spark.yarn.am.extraLibraryPath |
(none) | クライアントモードでYARNアプリケーションマスターを起動する場合に設定する特別なライブラリのパス。 | 1.4.0 |
spark.yarn.populateHadoopClasspath |
with-hadoop Spark配布物の場合、これはfalseに設定されます; no-hadoop 配布物の場合、これはtrueに設定されます。
|
Hadoop クラスパスをyarn.application.classpath とmapreduce.application.classpath から設定するかどうか。これがfalse に設定されている場合、Hadoopランタイムを同梱するwith-Hadoop Spark配布物を必要とするか、ユーザが別にHadoopインストールを提供する必要があります。
|
2.4.6 |
spark.yarn.maxAppAttempts |
YARN内のyarn.resourcemanager.am.max-attempts |
アプリケーションをサブミットするために行われる試行の最大数。YARN設定内の最大試行数のグローバル数より大きくてはなりません。 | 1.3.0 |
spark.yarn.am.attemptFailuresValidityInterval |
(none) | AM障害追跡のための有効期間を定義します。AMが少なくとも定義された期間実行している場合は、AM障害回数はリセットされます。設定されていない場合はこの機能は有効ではありません。 | 1.6.0 |
spark.yarn.executor.failuresValidityInterval |
(none) | executorの障害追跡のための有効期間を定義します。有効期間より古いexecutorの障害は無視されるでしょう。 | 2.0.0 |
spark.yarn.submit.waitAppCompletion |
true |
YARNクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。true に設定した場合は、クライアントプロセスはアプリケーションの活動状態を報告し続けるでしょう。そうでなければ、クライアントプロセスはサブミットのあとで終了するでしょう。
|
1.4.0 |
spark.yarn.am.nodeLabelExpression |
(none) | ノードのAMのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。 | 1.6.0 |
spark.yarn.executor.nodeLabelExpression |
(none) | ノードのexecutorのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。 | 1.4.0 |
spark.yarn.tags |
(none) | YARNアプリケーションレポートの中に現れるYARNアプリケーションのタグとして渡される文字列のカンマ区切りのリスト。これはYARNアプリケーションを問い合わせる時にフィルターとして使うことができます。 | 1.5.0 |
spark.yarn.priority |
(none) | YARN が保留中のアプリケーションの順序付けポリシーを定義するためのアプリケーションの優先順位。整数値が大きいほど、アクティブ化される機会が多くなります。現在のところ、YARN は FIFO 順序付けポリシーを使っている場合のみアプリケーションの優先順位をサポートします。 | 3.0.0 |
spark.yarn.config.gatewayPath |
(none) |
(Sparkアプリケーションが開始された)ゲートウェイホスト上で有効なパスですが、クラスタ内の他のノードの同じリソースでは異なるパスかも知れません。spark.yarn.config.replacementPath と合わせて、Sparkがリモートプロセスを正しく起動できるように、異質な設定を持つクラスタをサポートするために使われます。
置換パスは通常なんらかのYARNによってexportされた環境変数へのリファレンスを含みます(従ってSparkコンテナから見えます)。
例えば、もしゲートウェイノードが /disk1/hadoop 上にインストールされたHadoopライブラリを持ち、Hadoopがインストールされた場所がYARNによってHADOOP_HOME 環境変数としてexportされた場所であれば、この値を /disk1/hadoop に設定して置換パスを$HADOOP_HOME に設定することは、リモートプロセスによって起動するために使われるパスが適切にローカルのYARN設定を参照するようにするでしょう。
|
1.5.0 |
spark.yarn.config.replacementPath |
(none) |
spark.yarn.config.gatewayPath を見てください。
|
1.5.0 |
spark.yarn.rolledLog.includePattern |
(none) |
定義されたincludeパターンに合致するログファイルをフィルタするJava Regexで、それらのログファイルはrolling形式で集約されるでしょう。これはYARNのrollingログ集約と一緒に使われ、この機能を有効にするためにYARN側では yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds が設定されるべきです。Spark log4j appender は、FileAppenderあるいは実行中に削除されるファイルを扱うことができる他のappenderを使うように変更する必要があります。(spark.logのような)log4j設定の中で設定されたファイル名に基づいて、ユーザは集約する必要がある全てのログファイルを含むようにregex(spark*)を設定する必要があります。
|
2.0.0 |
spark.yarn.rolledLog.excludePattern |
(none) | 定義されたincludeパターンを除外するログファイルをフィルタするJava Regexで、それらのログファイルはrolling形式で集約されないでしょう。ログファイル名がincludeとexcludeのパターンの両方に合致する場合、このファイルは結果的に除外されるでしょう。 | 2.0.0 |
spark.yarn.executor.launch.excludeOnFailure.enabled |
false |
YARNのリソース割り当ての問題を持つノードの除外を有効にするためのフラグ。除外のためのエラーの制限は、spark.excludeOnFailure.application.maxFailedExecutorsPerNode で設定できます。
|
2.4.0 |
spark.yarn.exclude.nodes |
(none) | リソース割り当てから除外される YARN ノード名のカンマ区切りのリスト。 | 3.0.0 |
spark.yarn.metrics.namespace |
(none) | AMメトリクスのレポートのためのルートの名前空間。設定されない場合は、YARNアプリケーションIDが使われます。 | 2.4.0 |
SHS カスタム executor ログ URL で利用可能なパターン
Pattern | 意味 |
---|---|
{{HTTP_SCHEME}} | YARN HTTPポリシーに応じて、http:// やhttps:// 。(yarn.http.policy を介して設定されます) |
{{NM_HOST}} | コンテナが実行されたノードの "ホスト"。 |
{{NM_PORT}} | コンテナが実行されたノードマネージャの "ポート"。 |
{{NM_HTTP_PORT}} | コンテナが実行されたノードマネージャの http サーバの "ポート"。 |
{{NM_HTTP_ADDRESS}} | コンテナが割り当てられているノードの Http URI。 |
{{CLUSTER_ID}} | リソースマネージャのクラスタ ID。(yarn.resourcemanager.cluster-id を介して設定されます) |
{{CONTAINER_ID}} | コンテナの ID。 |
{{USER}} | システム環境上のSPARK_USER 。 |
{{FILE_NAME}} | stdout , stderr 。 |
例えば、ノードマネージャの http サーバにリダイレクトさせずに、ログ URL リンクをジョブ履歴サーバに直接ポイントする場合は、以下のように spark.history.custom.executor.log.url
を設定することができます:
{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096
注意: <JHS_HOST>
と<JHS_PORT>
を実際の値で置き換える必要があります。
リソース割り当てと設定の概要
設定ページのカスタムリソーススケジューリングと設定概要セクションを必ず読んでください。このセクションでは、リソーススケジュールの YARN 固有の側面についてのみ説明します。
YARN は、ユーザが Spark で使用したいリソースをサポートするように設定する必要があります。YARN のリソーススケジューリングは YARN 3.1.0 で追加されました。リソースの設定と分離の適切な設定の詳細については、YARN ドキュメントを見てください。理想的には、リソースは executor が割り当てられたリソースのみを見ることができるように、分離してセットアップされます。分離を有効にしていない場合、ユーザはリソースが executor 間で共有されないようにする検出スクリプトを作成する必要があります。
YARNはユーザ定義のリソース型をサポートしますが、GPU (yarn.io/gpu
)とFPGA (yarn.io/fpga
)の組み込み型があります。そのため、これらのリソースのいずれかを使っている場合は、Spark はSpark リソースのリクエストを YARN リソースに変換でき、spark.{driver/executor}.resource.
設定を指定するだけで済みます。YARNでGPUやFPGAに独自のリソース型を使う場合は、spark.yarn.resourceGpuDeviceName
とspark.yarn.resourceFpgaDeviceName
を使ってSparkのマッピングを変更できることに注意してください。FPGA あるいは GPU 以外のリソースを使っている場合は、ユーザは YARN (spark.yarn.{driver/executor}.resource.
) と Spark (spark.{driver/executor}.resource.
) の両方の設定を指定する必要があります。
例えば、ユーザは executor ごとに2つの GPU を要求したいとします。ユーザは、spark.executor.resource.gpu.amount=2
を指定するだけで、Spark は YARN からyarn.io/gpu
リソース型のリクエストを処理します。
ユーザがユーザ定義の YARN リソースを持つ場合は、それを acceleratorX
と呼び、ユーザは spark.yarn.executor.resource.acceleratorX.amount=2
と spark.executor.resource.acceleratorX.amount=2
を指定する必要があります。
YARN は、各コンテナに割り当てられたリソースのアドレスを Spark に通知しません。そのため、ユーザは起動時に executor によって実行される検出スクリプトを指定して、その executor が利用可能なリソースを検出する必要があります。サンプルスクリプトは examples/src/main/scripts/getGpusResources.sh
にあります。スクリプトには実行権限が設定されている必要があり、ユーザは悪意のあるユーザがスクリプトを変更できないように権限を設定する必要があります。スクリプトは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、リソース名とその executor だけが利用できるリソースアドレスの配列を持ちます。
ステージレベルのスケジューリングの概要
動的割り当てが有効な場合、YARNでステージレベルのスケジューリングがサポートされます。YARN固有で1つ注意すべきことは、各ResourceProfileはYARNで異なるコンテナ優先度を必要とすることです。The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. これは、先に作成されたプロファイルがYARNでより高い優先度を持つことを意味します。Sparkは別のステージを開始する前に1つのステージを終了するため、通常これは問題になりません。これが影響を与える可能性があるのは、ジョブサーバ型のシナリオのみであるため、注意が必要です。基本のデフォルトプロファイルと独自のResourceProfileの間でカスタムリソースの処理方法に違いがあることに注意してください。ユーザがSparkスケジューリング無しで追加のリソースを含むYARNコンテナを要求できるようにするために、ユーザはspark.yarn.executor.resource.
設定を介してリソースを指定できます。ただしこれらの設定は基本のデフォルトプロファイルのみで使われ、他の独自のResourceProfileには伝搬されません。これはステージにそれらを持たせたくない場合、それらを削除する方法がないためです。これによりデフォルトのプロファイルでspark.yarn.executor.resource.
で定義された独自のリソースと、GPUまたはFPGAのsparkで定義されたリソースが取得されることになります。SparkはGPUとFPGAリソースをYARN組み込み型yarn.io/gpu
とyarn.io/fpga
に変換しますが、他のリソースのマッピングを認識しません。その他のSpark独自のリソースは、デフォルトプロファイルのYARNに伝播されません。したがって、独自のリソースに基づいてSparkをスケジュールし、YARNから要求する場合は、YARN(spark.yarn.{driver/executor}.resource.
)とSpark(spark.{driver/executor}.resource.
)設定から指定しなければなりません。追加のリソースを含むYARNコンテナのみが必要で、Sparkがそれらの使用をスケジュールしない場合は、Spark設定をオフのままにします。現在のところ、独自のResourceProfileの場合、SparkがそれらをスケジューリングせずにYARNリソースのみを指定する方法はありません。これは、独自のResourceProfileの場合、ResourceProfileで定義されたすべてのリソースをYARNに伝播することを意味します。まだ、GPUとFPGAをYARNの組み込み型に変換します。これは、指定する独自のリソースの名前がYARNで定義されているものと一致する必要があります。
重要事項
- スケジュールの決定時にコアのリスクエストが尊重されるかどうかは、どのスケジューラが使われているか、およびどのように設定されているかに依存します。
cluster
モードではSpark executorおよびSpark ドライバによって使われるローカルディレクトリは、YARNのための設定のローカルディレクトリでしょう((Hadoop YARN configyarn.nodemanager.local-dirs
)。ユーザがspark.local.dir
を指定した場合は、無視されるでしょう。client
モードでは、Spark executorはYARNのために設定されたローカルディレクトリを使うでしょう。一方でSparkドライバはspark.local.dir
で定義されたそれらを使うでしょう。これはSparkドライバはclient
モードではYARNクラスタ上で実行されないからです。Spark executorのみがそうします。--files
および--archives
オプションはHadoopに似て # のファイル名の指定をサポートします。例えば、以下のように指定することができます:--files localtest.txt#appSees.txt
とこれは、ローカルでlocaltest.txt
と名前をつけたファイルをHDFSにアップロードするでしょうが、これはappSees.txt
という名前によってリンクされ、YARN上で実行する場合にアプリケーションはappSees.txt
として名前を使わなければなりません。--jars
オプションにより、ローカルファイルを使っていてcluster
モードで実行している場合は、SparkContext.addJar
関数を動作することができます。 HDFS, HTTP, HTTPS あるいはFTPファイルを使っている場合は使用する必要はありません。
Kerberos
Sparkでの標準的なKerberosサポートはセキュリティのページでカバーされます。
YARN モードでは、Hadoop ファイルシステムにアクセスすると、hadoop 設定のデフォルトのファイルシステム以外に、Spark は Spark アプリケーションのステージングディレクトリをホストするサービスの移譲トークンも自動的に取得します。
YARN固有のKerberos設定
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.kerberos.keytab |
(none) |
上で指定されたプリンシパルのためのキータブを含むファイルへのフルパス。このキータブはYARN分散キャッシュを使ってYARNアプリケーションマスターを実行しているノードにコピーされるでしょう。ログインチケットと委任トークンを定期的に更新するために使われるでしょう。--keytab コマンドラインの引数と同じです。("local"マスターとも協調して動作します。) |
3.0.0 |
spark.kerberos.principal |
(none) |
secure クラスタ上で動作する間にKDCにログインするために使われるプリンシパル。--principal コマンドラインの引数と同じです。("local"マスターとも協調して動作します。) |
3.0.0 |
spark.yarn.kerberos.relogin.period |
1m | kerberos TGTが更新されるべきかどうかをどれだけの頻度で調べるか。これはTGTの更新の期間(あるいはもしTGT更新が無効な場合はTGTの生存期間)よりも短い値に設定されるべきです。デフォルト値はほとんどの配備で十分な筈です。 | 2.3.0 |
spark.yarn.kerberos.renewal.excludeHadoopFileSystems |
(none) |
リソーススケジュールでの委任トークンからホストが除外されるHadoopファイルシステムのカンマ区切りのリスト。例えば、spark.yarn.kerberos.renewal.excludeHadoopFileSystems=hdfs://nn1.com:8032, hdfs://nn2.com:8032 。これは今のところYARNで動作することが分かっているため、YARNリソースマネージャーはアプリケーションのトークンを更新しません。リソーススケジューラはトークンを更新しないため、そのトークンを使おうとする元のトークンの有効期限よりも長く実行されているアプリケーションは失敗する可能性があることに注意してください。
|
3.2.0 |
Kerberosのトラブルシューティング
Hadoop/Kerberos 問題は"困難"になる可能性があります。有用な方法のひとつはHADOOP_JAAS_DEBUG
環境変数を設定することでHadoop内のKerberos操作の特別なログを有効にすることです。
export HADOOP_JAAS_DEBUG=true
JDK クラスはシステムプロパティ sun.security.krb5.debug
と sun.security.spnego.debug=true
を設定することで、Kerberosおよび SPNEGO/REST 認証の特別なログを有効に設定することができます。
-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
これら全てのオプションはアプリケーションマスター内で有効にすることができます:
spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true
spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
最後に、org.apache.spark.deploy.yarn.Client
のログレベルが DEBUG
に設定された場合、ログには全ての取得されたトークンのリストとそれらの失効の詳細が含まれるでしょう。
外部シャッフルサービスの設定
YARNクラスタ内の各NodeManager
上でSparkシャッフルサービスを開始するには、以下の説明に従ってください:
- SparkをYARN profileを使ってビルドします。pre-packaged配布物を使っている場合はこのステップをスキップします。
spark-<version>-yarn-shuffle.jar
を配置します。Sparkを自身で構築した場合は$SPARK_HOME/common/network-yarn/target/scala-<version>
の下に無ければならず、配布物を使っている場合はyarn
の下に無ければなりません。- このjarをクラスタ内の全ての
NodeManager
のクラスパスに追加します。 - 各ノード上の
yarn-site.xml
の中で、spark_shuffle
をyarn.nodemanager.aux-services
に追加し、その後、yarn.nodemanager.aux-services.spark_shuffle.class
をorg.apache.spark.network.yarn.YarnShuffleService
に設定します。 - シャッフル中のガベージコレクションの問題を避けるために
etc/hadoop/yarn-env.sh
内のYARN_HEAPSIZE
(デフォルトは1000) を設定することで、NodeManager
のヒープサイズを増やします。 - クラスタ内の全ての
NodeManager
を再起動します。
YARN上でシャッフルサービスが動作している場合は、以下の特別な設定オプションが利用可能です:
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.yarn.shuffle.stopOnFailure |
false |
Sparkシャッフルサービスの初期化で障害がある場合に、ノードマネージャーを停止するかどうか。これは、Sparkシャッフルサービスが実行中では無いノードマネージャー上でコンテナを実行することで起こるアプリケーション障害を防ぎます。 |
spark.yarn.shuffle.service.metrics.namespace |
sparkShuffleService |
NodeManagerのHadoop metrics2 システムへシャッフルサービスメトリクスを発行する時に使う名前空間。 |
上記の手順はデフォルトのシャッフルサービス名spark_shuffle
が使われることを前提としていることに注意してください。ここでは任意の名前を使えますが、YARN NodeManager設定で使われる値はSparkアプリケーションのspark.shuffle.service.name
の値と一致する必要があります。
シャッフルサービスは、デフォルトでNodeManagerによって使われるHadoop設定から全ての設定を取得します(例、yarn-site.xml
)。ただし、spark-shuffle-site.xml
という名前のファイルを使って、シャッフルサービスを個別に設定することもできます。このファイルはシャッフルサービスのクラスパスに配置する必要があります(デフォルトでは、NodeManagerのクラスパスで共有されます)。シャッフルサービスはこれを標準のHadoop設定リソースとして扱い、NodeManagerの設定を上書きします。
Apache Oozieを使ってアプリケーションを起動
Apache Oozie はSparkアプリケーションをワークフローの一部として起動することができます。secureクラスタ内で、起動されたアプリケーションはクラスタのサービスにアクセスするために関係するトークンを必要とするでしょう。Sparkがkeytabを使って起動された場合は、これは自動的です。しかし、もしSparkがkeytab無しで起動された場合は、securityをセットアップする責任はOozieに移譲されなければなりません。
secureなクラスタのためにOozieを設定およびジョブのための証明書を取得する詳細は、特定のリリースドキュメントの"認証"の章のOozie web サイトで見つけることができます。
Sparkのアプリケーションのために、OozieワークフローはOozieがアプリケーションが必要とする以下を含む全てのトークンを要求するようにセットアップしなければなりません:
- YARN リソースマネージャー。
- ローカルのHadoopファイルシステム。
- I/Oの起点あるいは宛先として使用されるリモートのHadoopファイルシステム。
- Hive - もし使っていれば。
- HBase - もし使っていれば。
- アプリケーションがYARNタイムラインサーバとやり取りするのであれば、YARNタイムラインサーバ。
SparkがHive, HBaseおよびリモートのHDFSトークンを取得しようと試行 - そして失敗 - するのを避けるために、Sparkの設定はサービスのためのトークンのコレクションを無効に設定しなければなりません。
Sparkの設定は以下の行を含む必要があります:
spark.security.credentials.hive.enabled false
spark.security.credentials.hbase.enabled false
設定オプション spark.kerberos.access.hadoopFileSystems
は未設定でなければなりません。
Spark Web UIを置き換えるためのSpark履歴サーバの使用
アプリケーション UIが無効の時に、実行中のアプリケーションのための追跡のURLとしてSpark履歴サーバ アプリケーションを使うことができます。これはsecureなクラスタ、あるいはSparkドライバのメモリの使用を減らすために望ましいかも知れません。Spark履歴サーバを使って追跡を設定するには、以下を行います:
- アプリケーション側で、Sparkの設定で
spark.yarn.historyServer.allowTracking=true
を設定します。これはアプリケーションのUIが無効の時に、追跡URLとして履歴サーバのURIを使うようにSparkに伝えます。 - Spark履歴サーバ側で、
spark.ui.filters
設定の中のフィルタのリストにorg.apache.spark.deploy.yarn.YarnProxyRedirectFilter
を追加します。
履歴サーバの情報はアプリケーションの状態が原因で最新ではないかも知れないことに注意してください。
複数のバージョンのSparkシャッフルサービスの実行
このセクションは、YARNのバージョン 2.9.0 以上で実行される時にのみ適用されることに注意してください。
場合によってでゃ、異なるバージョンのSparkを使っているSparkシャッフルサービスの複数のインスタンスを実行することが望ましい場合があります。これは、例えば、複数のSparkバージョンを実行するアプリケーションのワークロードが混在するYARNクラスタを実行する場合に役立ちます。これは、特定のバージョンのシャッフルサービスが他のバージョンのSparkと常に互換性があるとは限らないためです。2.9.0以降のYARNバージョンは分離されたクラスローダ内でシャッフルサービスを実行する機能をサポートします(YARN-4577を見てください)。つまり、単一のNodeManager内で複数のSparkバージョンが共存できます。yarn.nodemanager.aux-services.<service-name>.classpath
、YARN 2.10.2/3.1.1/3.2.0 以降、yarn.nodemanager.aux-services.<service-name>.remote-classpath
オプションを使ってこれを設定できます。別々のクラスパスを設定することに加えて、2つのバージョンが異なるポートに広告できることを確認する必要があります。これは上記のspark-shuffle-site.xml
ファイルを使って実現できます。例えば、以下のような構成に設定することができます:
yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y
yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-yarn-shuffle.jar,/path/to/spark-x-config
yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-yarn-shuffle.jar,/path/to/spark-y-config
2つのspark-*-config
ディレクトリには、それぞれのファイル spark-shuffle-site.xml
が含まれています。これらはHadoop Configuration formatのXMLであり、それぞれが使うポート番号とメトリクス名のプリフィックスを調整するためのいくつかの設定が含まれます。
<configuration>
<property>
<name>spark.shuffle.service.port</name>
<value>7001</value>
</property>
<property>
<name>spark.yarn.shuffle.service.metrics.namespace</name>
<value>sparkShuffleServiceX</value>
</property>
</configuration>
2つの異なるサービスでは値が両方とも異なる必要があります。
次に、Sparkアプリケーションの設定で、1つは以下のように設定する必要があります。
spark.shuffle.service.name = spark_shuffle_x
spark.shuffle.service.port = 7001
もう1つは以下のように設定する必要があります:
spark.shuffle.service.name = spark_shuffle_y
spark.shuffle.service.port = <other value>