YARN上でSparkを実行する
YARN (Hadoop NextGen)上での実行はバージョン0.6.0でSparkに追加され、続くリリースで改良されました。
YARN上でSparkを起動
HADOOP_CONF_DIR
あるいはYARN_CONF_DIR
がHadoopクラスタのための(クライアントサイドの)設定ファイルを含むディレクトリを指すようにします。これらの設定はHDFSに書き込み、YARNリソースマネージャーに接続するために使用されます。このディレクトリに含まれる設定はYARNクラスタに分配され、アプリケーションに使用される全てのコンテナは同じ設定を使用します。設定がJavaシステムプロパティを参照あるいは環境変数がYARNによって管理されていない場合は、それらはSparkアプリケーション設定(ドライバー、executor、およびクライアントモードで実行している場合はAM)にも設定されていなければなりません。
SparkアプリケーションをYARNで起動するのに使われる2つのデプロイモードがあります。cluster
モードでは、Sparkドライバはクラスタ上のYARNによって管理されるアプリケーションマスタープロセスの中で実行され、クライアントはアプリケーションの初期化の後で終了することができます。client
モードでは、ドライバーはクライアントプロセスの中で実行され、アプリケーションマスターはYARNからのリソースの要求のためだけに使用されます。
マスターアドレスが--master
パラメータで指定されるSparkスタンドアローンおよびMesosモードとは異なり、YARNモードではリソースマネージャーのアドレスがHadoop設定から取り出されます。そして、--master
パラメータはyarn
です。
Sparkアプリケーションをcluster
モードで起動するには:
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
例えば:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
上記はデフォルトのアプリケーションマスターを開始するYARNクライアントプログラムを開始します。そして、SparkPiはアプリケーションマスターの子スレッドとして実行されるでしょう。クライアントはステータスの更新とそれらをコンソールに表示するために定期的にアプリケーションマスターをポーリングします。一旦アプリケーションの実行が終了するとクライアントは終了するでしょう。ドライバーおよびexecutorのログをどうやって見るかについては以下の"Debugging your Application"セクションを参照してください。
client
モードのSparkアプリケーションを起動するには同じことをしますが、 cluster
を client
と置き換えてください。以下は、spark-shell
を client
モードでどうやって実行することができるかを示します:
$ ./bin/spark-shell --master yarn --deploy-mode client
他のJARを追加
cluster
モードでは、ドライバーはクライアントではなく異なるマシーンで動作します。つまりSparkContext.addJar
はローカルからクライアントへの追加設定のファイル無しには動作しないでしょう。SparkContext.addJar
が利用可能なクライアントでファイルを作成するために、起動コマンドの中に--jars
オプションを使ってそれらを含めます。
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar
my-main-jar.jar
app_arg1 app_arg2
準備
YARN上のSparkの実行にはYARNサポートでビルドしたSparkのバイナリ配布物が必要です。バイナリ配布物はプロジェクトのwebサイトのダウンロードページ</a>からダウンロードすることができます。Sparkを自分自身でビルドするには、Sparkのビルドを参照してください。
設定
他のデプロイモードについては、ほとんどの設定はYARN上のSPARKと同じです。これらの詳細な情報は設定ページを見てください。これらはYARN上のSPARKに固有です。
アプリケーションのデバッグ
YARNの用語では、executorおよびアプリケーションマスターは"コンテナ"の中で実行されます。アプリケーションが完了した後でコンテナのログを処理するためにYARNには2つのモードがあります。ログの集約が有効な場合(yarn.log-aggregation-enable
設定を使用)、コンテナログはHDFSにコピーされ、ローカルマシーンからは削除されます。これらのログはyan log
コマンドを使ってクラスタのどこからでも見ることができます。
yarn logs -applicationId <app ID>
指定されたアプリケーションについて全てのコンテナからの全てのログファイルの内容を出力するでしょう。HDFSシェルあるいはAPIを使って直接HDFS内のコンテナログを見ることもできます。YARNの設定を調べることでそれらがあるディレクトリを見つけることができます(yarn.nodemanager.remote-app-log-dir
および yarn.nodemanager.remote-app-log-dir-suffix
)。このログはSpark Web UIのexecutorタブでも利用可能です。Spark履歴サーバとMapReduce履歴サーバの両方の動作が必要で、yarn-site.xml
適切にyarn.log.server.url
を設定する必要があります。Spark履歴サーバUI上のログURLは集約されたログを表示するためにMapReduce履歴サーバにリダイレクトするでしょう。
ログの集約をオンにしていない場合は、ログは各マシーン上のYARN_APP_LOGS_DIR
の下にローカルに保持されます。これは通常/tmp/logs
あるいは、Hadoopのバージョンおよびインストレーションに依存する $HADOOP_HOME/logs/userlogs
に設定されます。コンテナンのログを見るにはそれらを含むホストに行き、このディレクトリを調べる必要があります。サブディレクトリはアプリケーションIDとコンテナIDでログファイルを整理します。このログはSpark Web UIのexecutorタブでも利用可能で、MapReduce履歴サーバの実行を必要としません。
コンテナごとの起動環境をレビューするには、yarn.nodemanager.delete.debug-delay-sec
を大きな値(例えば、36000
)に増やし、コンテナが起動されたノード上のyarn.nodemanager.local-dirs
を使ってアプリケーションのキャッシュにアクセスします。このディレクトリは起動スクリプト、JAR、各コンテナを起動するために使われる全ての環境変数を含んでいます。このプロセスは特にクラスパスの問題をデバッグするのに役立ちます。(これを有効にするにはクラスタ設定の管理権限が必要で全てのノードマネージャーを再起動が必要になる事に注意してください。従ってこれは干すとされているクラスタでは適用できません)。
アプリケーションマスターあるいはexecutorのための独自log4j設定を使うには、以下の選択肢があります:
- アプリケーションと一緒にアップロードされる
--files
のファイルのリストにlog4j.propertiesを追加することで、spark-submit
を使って独自のlog4j.properties
をアップロードします。 -Dlog4j.configuration=<location of configuration file>
にspark.driver.extraJavaOptions
(ドライバーのため)、あるいはspark.executor.extraJavaOptions
(executorのため)を追加します。ファイルを使う場合は明示的にfile:
プロトコルが提供されなければならず、そのファイルは全てのノード上でローカルに存在しなければならない事に注意してください。$SPARK_CONF_DIR/log4j.properties
ファイルを更新すると、他の設定と一緒に自動的にアップロードされるでしょう。複数のオプションが指定された場合は、このオプションよりも他の2つのオプションが優先されることに注意してください。
最初の選択肢は、executorとアプリケーションマスターの両方が同じlog4j設定を共有するでしょう。これは同じノード上で動作する場合に問題を起こすかもしれません(例えば、同じログファイルに書き込もうとします)。
YARNがログを適切に表示および集約できるようにYARN内でログファイルを置く適切な場所への参照を必要とする場合は、log4j.propertiesのspark.yarn.app.container.log.dir
を使ってください。例えば、log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
。ストリーミングアプリケーションについては、RollingFileAppender
の設置とファイルの場所をYARNのログディレクトリに設定することで、大きなログファイルによって起こるディスクオーバーフローを避けることができ、ログはYARNのログユーティリティを使ってアクセスすることができるでしょう。
Sparkのプロパティ
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.yarn.am.memory |
512m |
クライアントモードのYARNアプリケーションのために使われるメモリの量は、JVMメモリ設定と同じフォーマットです。(例えば、 512m , 2g )。クラスタモードでは、代わりにspark.driver.memory を使います。
小文字のサフィックスを使ってください。例えば k , m , g , t と p は、それぞれ kibi-, mebi-, gibi-, tebi- および pebibytes です。
|
spark.driver.cores |
1 |
YARNクラスタモードでドライバによって使われるコアの数。ドライバはクラスタモードのYARNアプリケーションマスターとして同じJVM内で実行されるため、これはYARNアプリケーションマスターによって使われるコアも制御します。クライアントモードでは、YARNアプリケーションマスターによって使用されるコアの数を制御するために、代わりにspark.yarn.am.cores を使用します。
|
spark.yarn.am.cores |
1 |
クライアントモードでYARNアプリケーションマスターのために使うコアの数。クラスタモードでは、代わりにspark.driver.cores を使います。
|
spark.yarn.am.waitTime |
100s |
cluster モードでは、YARNアプリケーションマスターがSparkContextを初期化するために待つ時間です。client モードでは、YARNアプリケーションマスターがドライバーが接続するまで待つ時間です。
|
spark.yarn.submit.file.replication |
デフォルトのHDFSレプリケーション(通常は3 )。 |
アプリケーションのためにHDFSにアップロードされたファイルのHDFSレプリケーションレベル。これらにはSpark jar, アプリjar, および全ての分散キャッシュファイル/圧縮ファイル が含まれます。 |
spark.yarn.preserve.staging.files |
false |
ステージ化されたファイル(Spark jar, app jar, 分散キャッシュファイル)を削除せずにジョブの最後で保持するためにはtrue に設定します。
|
spark.yarn.scheduler.heartbeat.interval-ms |
3000 |
SparkアプリケーションマスターがYARNリソースマネージャーの生存確認をするミリ秒単位の間隔。値はYARNの期限切れ間隔、つまりyarn.am.liveness-monitor.expiry-interval-ms 、の設定の半分の値で上限を設定されます。
|
spark.yarn.scheduler.initial-allocation.interval |
200ms |
延期されているコンテナ割り当てリクエストがある場合は、SparkアプリケーションマスターがしきりにYARN ResourceManagerにハートビートを送信する初期間隔。spark.yarn.scheduler.heartbeat.interval-ms より長くてはいけません。もし延期されているコンテナがまだある場合は、spark.yarn.scheduler.heartbeat.interval-ms に到達するまで割り当てられる連続するハートビートは2倍の間隔に割り当てられるでしょう。
|
spark.yarn.max.executor.failures |
最小は3で、numExecutors * 2。 | アプリケーションが失敗するまでのexecutorの失敗の最大の数。 |
spark.yarn.historyServer.address |
(none) |
Spark履歴サーバのアドレス、例えばhost.com:18080 。アドレスはスキーマ(http:// )を含んではいけません。履歴サーバは任意のサービスのため、デフォルトは設定されません。SparkアプリケーションがResourceManger UIからSpark履歴サーバUIにアプリケーションをリンクし終わった場合は、このアドレスはYARN ResourceManagerに渡されます。このプロパティに関しては、YARNプロパティは変数として使用することができ、これらは実行時にSparkによって代用されます。例えば、もしSpark履歴サーバがYARNリソースマネージャーとして同じノード上で実行している場合は、${hadoopconf-yarn.resourcemanager.hostname}:18080 に設定することができます。
|
spark.yarn.dist.archives |
(none) | 各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。 |
spark.yarn.dist.files |
(none) | 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。 |
spark.executor.instances |
2 |
executorの数。このプロパティは spark.dynamicAllocation.enabled と互換性が無いことに注意してください。spark.dynamicAllocation.enabled とspark.executor.instances の両方が指定された場合、動的割り当ては無効にされ、spark.executor.instances の指定された数が使われます。
|
spark.yarn.executor.memoryOverhead |
最小384で、executorMemory * 0.10。 | executorごとに割り当てられるオフヒープメモリ(メガバイト)の量。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。 |
spark.yarn.driver.memoryOverhead |
最小384で、driverMemory * 0.10 | クラスタモード時のexecutorごとに割り当てられるオフヒープメモリ(メガバイト)の量。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。 |
spark.yarn.am.memoryOverhead |
最小384で、AM memory * 0.10 |
spark.yarn.driver.memoryOverhead と同じですが、クライアントモードのYARNアプリケーションマスターのためのものです。
|
spark.yarn.am.port |
(random) | YARNアプリケーションがlistenするためのポート。YARNクライアントモードでは、これはゲートウェイ上で実行しているSparkドライバとYARN上で実行しているYARNアプリケーションマスターの間の通信で使われます。YARNクラスタモードでは、これは動的executor機能に使われます。スケジューラバックエンドからのkill を処理します。 |
spark.yarn.queue |
デフォルト: |
アプリケーションがサブミットされるYARNキューの名前。 |
spark.yarn.jar |
(none) |
デフォルトの値を上書きすることが望ましい場合の、Spark jarファイルの場所。デフォルトでは、YARN上のSparkはローカルにインストールされたSpark.jarを使いますが、Spark jarはHDFS上のworld-readableな場所にもあります。これにより、YARNはアプリケーションが実行される度に分配されないようにキャッシュすることができます。HDFS上のjarを示すには、例えば、この設定をhdfs:///some/path にします。
|
spark.yarn.access.namenodes |
(none) |
Sparkアプリケーションがアクセスしようとする安全なHDFSネームノードのカンマ区切りのリスト。例えば、spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032 。Sparkアプリケーションはリストされているネームノードへのアクセスしなければならず、それらアクセスできるように適切に設定されていなければなりません(同じrealmあるいは信頼されたrealmのどちらか)。SparkアプリケーションがそれらのリモートのHDFSクラスタにアクセスできるように、Sparkは各ネームノードのためのセキュリティトークンを取得します。
|
spark.yarn.appMasterEnv.[EnvironmentVariableName] |
(none) |
EnvironmentVariableName によって指定される環境変数を、YARN上で起動されたアプリケーションマスタープロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。cluster モードでは、これはSPARKドライバの環境を制御し、client モードではexecutorのlauncherの環境のみを制御します。
|
spark.yarn.containerLauncherMaxThreads |
25 |
YARNアプリケーションマスタ内でexecutorコンテナを起動するために使用するスレッドの最大数。 |
spark.yarn.am.extraJavaOptions |
(none) |
クライアントモードでYARNアプリケーションマスターに渡す特別なJVMオプションのリスト。クラスタモードでは、代わりにspark.driver.extraJavaOptions を使用します。
|
spark.yarn.am.extraLibraryPath |
(none) | クライアントモードでYARNアプリケーションマスターを起動する場合に設定する特別なライブラリのパス。 |
spark.yarn.maxAppAttempts |
YARN内のyarn.resourcemanager.am.max-attempts |
アプリケーションをサブミットするために行われる試行の最大数。YARN設定内の最大試行数のグローバル数より大きくてはなりません。 |
spark.yarn.am.attemptFailuresValidityInterval |
(none) | AM障害追跡のための有効期間を定義します。AMが少なくとも定義された期間実行している場合は、AM障害回数はリセットされます。設定されていない場合この機能は有効ではありません。Hadoop 2.6+でのみサポートされます。 |
spark.yarn.submit.waitAppCompletion |
true |
YARNクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。true に設定した場合は、クライアントプロセスはアプリケーションの活動状態を報告し続けるでしょう。そうでなければ、クライアントプロセスはサブミットのあとで終了するでしょう。
|
spark.yarn.am.nodeLabelExpression |
(none) | ノードのAMのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。 |
spark.yarn.executor.nodeLabelExpression |
(none) | ノードのexecutorのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。 |
spark.yarn.tags |
(none) | Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps. |
spark.yarn.keytab |
(none) | 上で指定されたプリンシパルのためのキータブを含むファイルへのフルパス。ログインチケットと委任トークンを定期的に更新するために、このキータブはSecure Distributed Cacheを使ってYARNアプリケーションマスターを実行しているノードにコピーされるでしょう。("local"マスターとも協調して動作します) |
spark.yarn.principal |
(none) | secure HDFS上で動作する間にKDCにログインするために使われるプリンシパル。("local"マスターとも協調して動作します) |
spark.yarn.config.gatewayPath |
(none) |
(Sparkアプリケーションが開始された)ゲートウェイホスト上で有効なパスですが、クラスタ内の他のノードの同じリソースでは異なるパスかも知れません。Coupled with spark.yarn.config.replacementPath , this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes.
置換パスは通常なんらかのYARNによってexportされた環境変数へのリファレンスを含みます(従ってSparkコンテナから見えます)。
例えば、もしゲートウェイノードが /disk1/hadoop 上にインストールされたHadoopライブラリを持ち、Hadoopがインストールされた場所がYARNによってHADOOP_HOME 環境変数としてexportされた場所であれば、この値を /disk1/hadoop に設定して置換パスを$HADOOP_HOME に設定することは、リモートプロセスによって起動するために使われるパスが適切にローカルのYARN設定を参照するようにするでしょう。
|
spark.yarn.config.replacementPath |
(none) |
spark.yarn.config.gatewayPath を見てください。
|
spark.yarn.security.tokens.${service}.enabled |
true |
セキュリティが有効な場合に、非HDFSのための委譲トークンを扱うかどうかを制御します。By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.
現在のところ、サポートされるサービスは以下のとおりです: hive , hbase
|
重要事項
- スケジュールの決定時にコアのリスクエストが尊重されるかどうかは、どのスケジューラが使われているか、およびどのように設定されているかに依存します。
cluster
モードではSpark executorおよびSpark ドライバによって使われるローカルディレクトリは、YARNのための設定のローカルディレクトリでしょう((Hadoop YARN configyarn.nodemanager.local-dirs
)。ユーザがspark.local.dir
を指定した場合は、無視されるでしょう。client
モードでは、Spark executorはYARNのために設定されたローカルディレクトリを使うでしょう。一方でSparkドライバはspark.local.dir
で定義されたそれらを使うでしょう。これはSparkドライバはclient
モードではYARNクラスタ上で実行されないからです。Spark executorのみがそうします。--files
および--archives
オプションはHadoopに似て # のファイル名の指定をサポートします。例えば、以下のように指定することができます:--files localtest.txt#appSees.txt
とこれは、ローカルでlocaltest.txt
と名前をつけたファイルがHDFSにアップロードするでしょうが、これはappSees.txt
という名前によってリンクされ、YARN上で実行する場合にアプリケーションはappSees.txt
として名前を使わなければなりません。--jars
オプションにより、ローカルファイルを使っていてluster
モードで実行している場合は、SparkContext.addJar
関数は動作することができます。 HDFS, HTTP, HTTPS あるいはFTPファイルを使っている場合は使用する必要はありません。