YARN上でSparkを実行する

YARN (Hadoop NextGen)上での実行はバージョン0.6.0でSparkに追加され、続くリリースで改良されました。

YARN上でSparkを起動

HADOOP_CONF_DIR あるいはYARN_CONF_DIR がHadoopクラスタのための(クライアントサイドの)設定ファイルを含むディレクトリを指すようにします。これらの設定はHDFSに書き込み、YARNリソースマネージャーに接続するために使用されます。このディレクトリに含まれる設定はYARNクラスタに分配され、アプリケーションに使用される全てのコンテナは同じ設定を使用します。設定がJavaシステムプロパティを参照あるいは環境変数がYARNによって管理されていない場合は、それらはSparkアプリケーション設定(ドライバー、executor、およびクライアントモードで実行している場合はAM)にも設定されていなければなりません。

SparkアプリケーションをYARNで起動するのに使われる2つのデプロイモードがあります。clusterモードでは、Sparkドライバはクラスタ上のYARNによって管理されるアプリケーションマスタープロセスの中で実行され、クライアントはアプリケーションの初期化の後で終了することができます。client モードでは、ドライバーはクライアントプロセスの中で実行され、アプリケーションマスターはYARNからのリソースの要求のためだけに使用されます。

マスターアドレスが--master パラメータで指定されるSparkスタンドアローンおよびMesosモードとは異なり、YARNモードではリソースマネージャーのアドレスがHadoop設定から取り出されます。そして、--master パラメータはyarnです。

Sparkアプリケーションをclusterモードで起動するには:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

例えば:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    lib/spark-examples*.jar \
    10

上記はデフォルトのアプリケーションマスターを開始するYARNクライアントプログラムを開始します。そして、SparkPiはアプリケーションマスターの子スレッドとして実行されるでしょう。クライアントはステータスの更新とそれらをコンソールに表示するために定期的にアプリケーションマスターをポーリングします。一旦アプリケーションの実行が終了するとクライアントは終了するでしょう。ドライバーおよびexecutorのログをどうやって見るかについては以下の"Debugging your Application"セクションを参照してください。

client モードのSparkアプリケーションを起動するには同じことをしますが、 clusterclientと置き換えてください。以下は、spark-shellclient モードでどうやって実行することができるかを示します:

$ ./bin/spark-shell --master yarn --deploy-mode client

他のJARを追加

cluster モードでは、ドライバーはクライアントではなく異なるマシーンで動作します。つまりSparkContext.addJarはローカルからクライアントへの追加設定のファイル無しには動作しないでしょう。SparkContext.addJarが利用可能なクライアントでファイルを作成するために、起動コマンドの中に--jarsオプションを使ってそれらを含めます。

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar
    my-main-jar.jar
    app_arg1 app_arg2

準備

YARN上のSparkの実行にはYARNサポートでビルドしたSparkのバイナリ配布物が必要です。バイナリ配布物はプロジェクトのwebサイトのダウンロードページ</a>からダウンロードすることができます。Sparkを自分自身でビルドするには、Sparkのビルドを参照してください。

設定

他のデプロイモードについては、ほとんどの設定はYARN上のSPARKと同じです。これらの詳細な情報は設定ページを見てください。これらはYARN上のSPARKに固有です。

アプリケーションのデバッグ

YARNの用語では、executorおよびアプリケーションマスターは"コンテナ"の中で実行されます。アプリケーションが完了した後でコンテナのログを処理するためにYARNには2つのモードがあります。ログの集約が有効な場合(yarn.log-aggregation-enable 設定を使用)、コンテナログはHDFSにコピーされ、ローカルマシーンからは削除されます。これらのログはyan logコマンドを使ってクラスタのどこからでも見ることができます。

yarn logs -applicationId <app ID>

指定されたアプリケーションについて全てのコンテナからの全てのログファイルの内容を出力するでしょう。HDFSシェルあるいはAPIを使って直接HDFS内のコンテナログを見ることもできます。YARNの設定を調べることでそれらがあるディレクトリを見つけることができます(yarn.nodemanager.remote-app-log-dir および yarn.nodemanager.remote-app-log-dir-suffix)。このログはSpark Web UIのexecutorタブでも利用可能です。Spark履歴サーバとMapReduce履歴サーバの両方の動作が必要で、yarn-site.xml適切にyarn.log.server.url を設定する必要があります。Spark履歴サーバUI上のログURLは集約されたログを表示するためにMapReduce履歴サーバにリダイレクトするでしょう。

ログの集約をオンにしていない場合は、ログは各マシーン上のYARN_APP_LOGS_DIRの下にローカルに保持されます。これは通常/tmp/logs あるいは、Hadoopのバージョンおよびインストレーションに依存する $HADOOP_HOME/logs/userlogsに設定されます。コンテナンのログを見るにはそれらを含むホストに行き、このディレクトリを調べる必要があります。サブディレクトリはアプリケーションIDとコンテナIDでログファイルを整理します。このログはSpark Web UIのexecutorタブでも利用可能で、MapReduce履歴サーバの実行を必要としません。

コンテナごとの起動環境をレビューするには、yarn.nodemanager.delete.debug-delay-sec を大きな値(例えば、36000)に増やし、コンテナが起動されたノード上のyarn.nodemanager.local-dirsを使ってアプリケーションのキャッシュにアクセスします。このディレクトリは起動スクリプト、JAR、各コンテナを起動するために使われる全ての環境変数を含んでいます。このプロセスは特にクラスパスの問題をデバッグするのに役立ちます。(これを有効にするにはクラスタ設定の管理権限が必要で全てのノードマネージャーを再起動が必要になる事に注意してください。従ってこれは干すとされているクラスタでは適用できません)。

アプリケーションマスターあるいはexecutorのための独自log4j設定を使うには、以下の選択肢があります:

最初の選択肢は、executorとアプリケーションマスターの両方が同じlog4j設定を共有するでしょう。これは同じノード上で動作する場合に問題を起こすかもしれません(例えば、同じログファイルに書き込もうとします)。

YARNがログを適切に表示および集約できるようにYARN内でログファイルを置く適切な場所への参照を必要とする場合は、log4j.propertiesのspark.yarn.app.container.log.dirを使ってください。例えば、log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log。ストリーミングアプリケーションについては、RollingFileAppenderの設置とファイルの場所をYARNのログディレクトリに設定することで、大きなログファイルによって起こるディスクオーバーフローを避けることができ、ログはYARNのログユーティリティを使ってアクセスすることができるでしょう。

Sparkのプロパティ

プロパティ名デフォルト意味
spark.yarn.am.memory 512m クライアントモードのYARNアプリケーションのために使われるメモリの量は、JVMメモリ設定と同じフォーマットです。(例えば、 512m, 2g)。クラスタモードでは、代わりにspark.driver.memory を使います。

小文字のサフィックスを使ってください。例えば k, m, g, tpは、それぞれ kibi-, mebi-, gibi-, tebi- および pebibytes です。

spark.driver.cores 1 YARNクラスタモードでドライバによって使われるコアの数。ドライバはクラスタモードのYARNアプリケーションマスターとして同じJVM内で実行されるため、これはYARNアプリケーションマスターによって使われるコアも制御します。クライアントモードでは、YARNアプリケーションマスターによって使用されるコアの数を制御するために、代わりにspark.yarn.am.coresを使用します。
spark.yarn.am.cores 1 クライアントモードでYARNアプリケーションマスターのために使うコアの数。クラスタモードでは、代わりにspark.driver.cores を使います。
spark.yarn.am.waitTime 100s clusterモードでは、YARNアプリケーションマスターがSparkContextを初期化するために待つ時間です。clientモードでは、YARNアプリケーションマスターがドライバーが接続するまで待つ時間です。
spark.yarn.submit.file.replication デフォルトのHDFSレプリケーション(通常は3)。 アプリケーションのためにHDFSにアップロードされたファイルのHDFSレプリケーションレベル。これらにはSpark jar, アプリjar, および全ての分散キャッシュファイル/圧縮ファイル が含まれます。
spark.yarn.preserve.staging.files false ステージ化されたファイル(Spark jar, app jar, 分散キャッシュファイル)を削除せずにジョブの最後で保持するためにはtrueに設定します。
spark.yarn.scheduler.heartbeat.interval-ms 3000 SparkアプリケーションマスターがYARNリソースマネージャーの生存確認をするミリ秒単位の間隔。値はYARNの期限切れ間隔、つまりyarn.am.liveness-monitor.expiry-interval-ms、の設定の半分の値で上限を設定されます。
spark.yarn.scheduler.initial-allocation.interval 200ms 延期されているコンテナ割り当てリクエストがある場合は、SparkアプリケーションマスターがしきりにYARN ResourceManagerにハートビートを送信する初期間隔。spark.yarn.scheduler.heartbeat.interval-msより長くてはいけません。もし延期されているコンテナがまだある場合は、spark.yarn.scheduler.heartbeat.interval-ms に到達するまで割り当てられる連続するハートビートは2倍の間隔に割り当てられるでしょう。
spark.yarn.max.executor.failures 最小は3で、numExecutors * 2。 アプリケーションが失敗するまでのexecutorの失敗の最大の数。
spark.yarn.historyServer.address (none) Spark履歴サーバのアドレス、例えばhost.com:18080。アドレスはスキーマ(http://)を含んではいけません。履歴サーバは任意のサービスのため、デフォルトは設定されません。SparkアプリケーションがResourceManger UIからSpark履歴サーバUIにアプリケーションをリンクし終わった場合は、このアドレスはYARN ResourceManagerに渡されます。このプロパティに関しては、YARNプロパティは変数として使用することができ、これらは実行時にSparkによって代用されます。例えば、もしSpark履歴サーバがYARNリソースマネージャーとして同じノード上で実行している場合は、${hadoopconf-yarn.resourcemanager.hostname}:18080 に設定することができます。
spark.yarn.dist.archives (none) 各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。
spark.yarn.dist.files (none) 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。
spark.executor.instances 2 executorの数。このプロパティは spark.dynamicAllocation.enabledと互換性が無いことに注意してください。spark.dynamicAllocation.enabledspark.executor.instances の両方が指定された場合、動的割り当ては無効にされ、spark.executor.instances の指定された数が使われます。
spark.yarn.executor.memoryOverhead 最小384で、executorMemory * 0.10。 executorごとに割り当てられるオフヒープメモリ(メガバイト)の量。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。
spark.yarn.driver.memoryOverhead 最小384で、driverMemory * 0.10 クラスタモード時のexecutorごとに割り当てられるオフヒープメモリ(メガバイト)の量。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。
spark.yarn.am.memoryOverhead 最小384で、AM memory * 0.10 spark.yarn.driver.memoryOverheadと同じですが、クライアントモードのYARNアプリケーションマスターのためのものです。
spark.yarn.am.port (random) YARNアプリケーションがlistenするためのポート。YARNクライアントモードでは、これはゲートウェイ上で実行しているSparkドライバとYARN上で実行しているYARNアプリケーションマスターの間の通信で使われます。YARNクラスタモードでは、これは動的executor機能に使われます。スケジューラバックエンドからのkill を処理します。
spark.yarn.queue デフォルト: アプリケーションがサブミットされるYARNキューの名前。
spark.yarn.jar (none) デフォルトの値を上書きすることが望ましい場合の、Spark jarファイルの場所。デフォルトでは、YARN上のSparkはローカルにインストールされたSpark.jarを使いますが、Spark jarはHDFS上のworld-readableな場所にもあります。これにより、YARNはアプリケーションが実行される度に分配されないようにキャッシュすることができます。HDFS上のjarを示すには、例えば、この設定をhdfs:///some/pathにします。
spark.yarn.access.namenodes (none) Sparkアプリケーションがアクセスしようとする安全なHDFSネームノードのカンマ区切りのリスト。例えば、spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032。Sparkアプリケーションはリストされているネームノードへのアクセスしなければならず、それらアクセスできるように適切に設定されていなければなりません(同じrealmあるいは信頼されたrealmのどちらか)。SparkアプリケーションがそれらのリモートのHDFSクラスタにアクセスできるように、Sparkは各ネームノードのためのセキュリティトークンを取得します。
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数を、YARN上で起動されたアプリケーションマスタープロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。clusterモードでは、これはSPARKドライバの環境を制御し、clientモードではexecutorのlauncherの環境のみを制御します。
spark.yarn.containerLauncherMaxThreads 25 YARNアプリケーションマスタ内でexecutorコンテナを起動するために使用するスレッドの最大数。
spark.yarn.am.extraJavaOptions (none) クライアントモードでYARNアプリケーションマスターに渡す特別なJVMオプションのリスト。クラスタモードでは、代わりにspark.driver.extraJavaOptionsを使用します。
spark.yarn.am.extraLibraryPath (none) クライアントモードでYARNアプリケーションマスターを起動する場合に設定する特別なライブラリのパス。
spark.yarn.maxAppAttempts YARN内のyarn.resourcemanager.am.max-attempts アプリケーションをサブミットするために行われる試行の最大数。YARN設定内の最大試行数のグローバル数より大きくてはなりません。
spark.yarn.am.attemptFailuresValidityInterval (none) AM障害追跡のための有効期間を定義します。AMが少なくとも定義された期間実行している場合は、AM障害回数はリセットされます。設定されていない場合この機能は有効ではありません。Hadoop 2.6+でのみサポートされます。
spark.yarn.submit.waitAppCompletion true YARNクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。trueに設定した場合は、クライアントプロセスはアプリケーションの活動状態を報告し続けるでしょう。そうでなければ、クライアントプロセスはサブミットのあとで終了するでしょう。
spark.yarn.am.nodeLabelExpression (none) ノードのAMのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。
spark.yarn.executor.nodeLabelExpression (none) ノードのexecutorのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。
spark.yarn.tags (none) Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
spark.yarn.keytab (none) 上で指定されたプリンシパルのためのキータブを含むファイルへのフルパス。ログインチケットと委任トークンを定期的に更新するために、このキータブはSecure Distributed Cacheを使ってYARNアプリケーションマスターを実行しているノードにコピーされるでしょう。("local"マスターとも協調して動作します)
spark.yarn.principal (none) secure HDFS上で動作する間にKDCにログインするために使われるプリンシパル。("local"マスターとも協調して動作します)
spark.yarn.config.gatewayPath (none) (Sparkアプリケーションが開始された)ゲートウェイホスト上で有効なパスですが、クラスタ内の他のノードの同じリソースでは異なるパスかも知れません。Coupled with spark.yarn.config.replacementPath, this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes.

置換パスは通常なんらかのYARNによってexportされた環境変数へのリファレンスを含みます(従ってSparkコンテナから見えます)。

例えば、もしゲートウェイノードが /disk1/hadoop上にインストールされたHadoopライブラリを持ち、Hadoopがインストールされた場所がYARNによってHADOOP_HOME環境変数としてexportされた場所であれば、この値を /disk1/hadoop に設定して置換パスを$HADOOP_HOMEに設定することは、リモートプロセスによって起動するために使われるパスが適切にローカルのYARN設定を参照するようにするでしょう。

spark.yarn.config.replacementPath (none) spark.yarn.config.gatewayPathを見てください。
spark.yarn.security.tokens.${service}.enabled true セキュリティが有効な場合に、非HDFSのための委譲トークンを扱うかどうかを制御します。By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.

現在のところ、サポートされるサービスは以下のとおりです: hive, hbase

重要事項

TOP
inserted by FC2 system