YARN上でSparkを実行する

YARN (Hadoop NextGen)上での実行はバージョン0.6.0でSparkに追加され、続くリリースで改良されました。

YARN上でSparkを起動

HADOOP_CONF_DIR あるいはYARN_CONF_DIR がHadoopクラスタのための(クライアントサイドの)設定ファイルを含むディレクトリを指すようにします。これらの設定はHDFSに書き込み、YARNリソースマネージャーに接続するために使用されます。このディレクトリに含まれる設定はYARNクラスタに分配され、アプリケーションに使用される全てのコンテナは同じ設定を使用します。設定がJavaシステムプロパティを参照あるいは環境変数がYARNによって管理されていない場合は、それらはSparkアプリケーション設定(ドライバー、executor、およびクライアントモードで実行している場合はAM)にも設定されていなければなりません。

SparkアプリケーションをYARNで起動するのに使われる2つのデプロイモードがあります。clusterモードでは、Sparkドライバはクラスタ上のYARNによって管理されるアプリケーションマスタープロセスの中で実行され、クライアントはアプリケーションの初期化の後で終了することができます。client モードでは、ドライバーはクライアントプロセスの中で実行され、アプリケーションマスターはYARNからのリソースの要求のためだけに使用されます。

マスターアドレスが--master パラメータで指定されるSparkスタンドアローンおよびMesosモードとは異なり、YARNモードではリソースマネージャーのアドレスがHadoop設定から取り出されます。そして、--master パラメータはyarnです。

Sparkアプリケーションをclusterモードで起動するには:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

例えば:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    lib/spark-examples*.jar \
    10

上記はデフォルトのアプリケーションマスターを開始するYARNクライアントプログラムを開始します。そして、SparkPiはアプリケーションマスターの子スレッドとして実行されるでしょう。クライアントはステータスの更新とそれらをコンソールに表示するために定期的にアプリケーションマスターをポーリングします。一旦アプリケーションの実行が終了するとクライアントは終了するでしょう。ドライバーおよびexecutorのログをどうやって見るかについては以下の"Debugging your Application"セクションを参照してください。

client モードのSparkアプリケーションを起動するには同じことをしますが、 clusterclientと置き換えてください。以下は、spark-shellclient モードでどうやって実行することができるかを示します:

$ ./bin/spark-shell --master yarn --deploy-mode client

他のJARを追加

cluster モードでは、ドライバーはクライアントではなく異なるマシーンで動作します。つまりSparkContext.addJarはローカルからクライアントへの追加設定のファイル無しには動作しないでしょう。SparkContext.addJarが利用可能なクライアントでファイルを作成するために、起動コマンドの中に--jarsオプションを使ってそれらを含めます。

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2

準備

YARN上のSparkの実行にはYARNサポートでビルドしたSparkのバイナリ配布物が必要です。バイナリ配布物はプロジェクトのwebサイトのダウンロードページ</a>からダウンロードすることができます。Sparkを自分自身でビルドするには、Sparkのビルドを参照してください。

YARN側からSparkランタイムのjarをアクセス可能にするために、spark.yarn.archive あるいは spark.yarn.jars を指定することができます。詳細はSpark Propertiesを参照してください。spark.yarn.archive あるいは spark.yarn.jars のどちらも指定されない場合は、Sparkは $SPARK_HOME/jars の下にある全てのjarを使ってzipファイルを生成し、それを分散キャッシュにアップロードするでしょう。

設定

他のデプロイモードについては、ほとんどの設定はYARN上のSPARKと同じです。これらの詳細な情報は設定ページを見てください。これらはYARN上のSPARKに固有です。

アプリケーションのデバッグ

YARNの用語では、executorおよびアプリケーションマスターは"コンテナ"の中で実行されます。アプリケーションが完了した後でコンテナのログを処理するためにYARNには2つのモードがあります。ログの集約が有効な場合(yarn.log-aggregation-enable 設定を使用)、コンテナログはHDFSにコピーされ、ローカルマシーンからは削除されます。これらのログはyan logコマンドを使ってクラスタのどこからでも見ることができます。

yarn logs -applicationId <app ID>

指定されたアプリケーションについて全てのコンテナからの全てのログファイルの内容を出力するでしょう。HDFSシェルあるいはAPIを使って直接HDFS内のコンテナログを見ることもできます。YARNの設定を調べることでそれらがあるディレクトリを見つけることができます(yarn.nodemanager.remote-app-log-dir および yarn.nodemanager.remote-app-log-dir-suffix)。このログはSpark Web UIのexecutorタブでも利用可能です。Spark履歴サーバとMapReduce履歴サーバの両方の動作が必要で、yarn-site.xml適切にyarn.log.server.url を設定する必要があります。Spark履歴サーバUI上のログURLは集約されたログを表示するためにMapReduce履歴サーバにリダイレクトするでしょう。

ログの集約をオンにしていない場合は、ログは各マシーン上のYARN_APP_LOGS_DIRの下にローカルに保持されます。これは通常/tmp/logs あるいは、Hadoopのバージョンおよびインストレーションに依存する $HADOOP_HOME/logs/userlogsに設定されます。コンテナンのログを見るにはそれらを含むホストに行き、このディレクトリを調べる必要があります。サブディレクトリはアプリケーションIDとコンテナIDでログファイルを整理します。このログはSpark Web UIのexecutorタブでも利用可能で、MapReduce履歴サーバの実行を必要としません。

コンテナごとの起動環境をレビューするには、yarn.nodemanager.delete.debug-delay-sec を大きな値(例えば、36000)に増やし、コンテナが起動されたノード上のyarn.nodemanager.local-dirsを使ってアプリケーションのキャッシュにアクセスします。このディレクトリは起動スクリプト、JAR、各コンテナを起動するために使われる全ての環境変数を含んでいます。このプロセスは特にクラスパスの問題をデバッグするのに役立ちます。(これを有効にするにはクラスタ設定の管理権限が必要で全てのノードマネージャーを再起動が必要になる事に注意してください。従ってこれはホストとされているクラスタでは適用できません)。

アプリケーションマスターあるいはexecutorのための独自log4j設定を使うには、以下の選択肢があります:

最初の選択肢は、executorとアプリケーションマスターの両方が同じlog4j設定を共有するでしょう。これは同じノード上で動作する場合に問題を起こすかもしれません(例えば、同じログファイルに書き込もうとします)。

YARNがログを適切に表示および集約できるようにYARN内でログファイルを置く適切な場所への参照を必要とする場合は、log4j.propertiesのspark.yarn.app.container.log.dirを使ってください。例えば、log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log。ストリーミングアプリケーションについては、RollingFileAppenderの設置とファイルの場所をYARNのログディレクトリに設定することで、大きなログファイルによって起こるディスクオーバーフローを避けることができ、ログはYARNのログユーティリティを使ってアクセスすることができるでしょう。

アプリケーションマスターとexecutorのために独自の metrics.properties を使うには、$SPARK_CONF_DIR/metrics.properties ファイルを更新します。他の設定と一緒に自動的にアップロードされるでしょう。つまり、手動で --filesを使って指定する必要はありません。

Sparkのプロパティ

プロパティ名デフォルト意味
spark.yarn.am.memory 512m クライアントモードのYARNアプリケーションのために使われるメモリの量は、JVMメモリ設定と同じフォーマットです。(例えば、 512m, 2g)。クラスタモードでは、代わりにspark.driver.memory を使います。

小文字のサフィックスを使ってください。例えば k, m, g, tpは、それぞれ kibi-, mebi-, gibi-, tebi- および pebibytes です。

spark.driver.memory 1g ドライバープロセスのために使われるメモリの量。例えば SparkContextが初期化される場所。(例えば、1g, 2g).
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
spark.driver.cores 1 YARNクラスタモードでドライバによって使われるコアの数。ドライバはクラスタモードのYARNアプリケーションマスターとして同じJVM内で実行されるため、これはYARNアプリケーションマスターによって使われるコアも制御します。クライアントモードでは、YARNアプリケーションマスターによって使用されるコアの数を制御するために、代わりにspark.yarn.am.coresを使用します。
spark.yarn.am.cores 1 クライアントモードでYARNアプリケーションマスターのために使うコアの数。クラスタモードでは、代わりにspark.driver.cores を使います。
spark.yarn.am.waitTime 100s clusterモードでは、YARNアプリケーションマスターがSparkContextを初期化するために待つ時間です。clientモードでは、YARNアプリケーションマスターがドライバーが接続するまで待つ時間です。
spark.yarn.submit.file.replication デフォルトのHDFSレプリケーション(通常は3)。 アプリケーションのためにHDFSにアップロードされたファイルのHDFSレプリケーションレベル。これらにはSpark jar, アプリjar, および全ての分散キャッシュファイル/圧縮ファイル が含まれます。
spark.yarn.stagingDir ファイルシステム内の現在のユーザのホームディレクトリ アプリケーションのサブミット中に使われるステージングディレクトリ。
spark.yarn.preserve.staging.files false ステージ化されたファイル(Spark jar, app jar, 分散キャッシュファイル)を削除せずにジョブの最後で保持するためにはtrueに設定します。
spark.yarn.scheduler.heartbeat.interval-ms 3000 SparkアプリケーションマスターがYARNリソースマネージャーの生存確認をするミリ秒単位の間隔。値はYARNの期限切れ間隔、つまりyarn.am.liveness-monitor.expiry-interval-ms、の設定の半分の値で上限を設定されます。
spark.yarn.scheduler.initial-allocation.interval 200ms 延期されているコンテナ割り当てリクエストがある場合は、SparkアプリケーションマスターがしきりにYARN ResourceManagerにハートビートを送信する初期間隔。spark.yarn.scheduler.heartbeat.interval-msより長くてはいけません。もし延期されているコンテナがまだある場合は、spark.yarn.scheduler.heartbeat.interval-ms に到達するまで割り当てられる連続するハートビートは2倍の間隔に割り当てられるでしょう。
spark.yarn.max.executor.failures 最小は3で、numExecutors * 2。 アプリケーションが失敗するまでのexecutorの失敗の最大の数。
spark.yarn.historyServer.address (none) Spark履歴サーバのアドレス、例えばhost.com:18080。アドレスはスキーマ(http://)を含んではいけません。履歴サーバは任意のサービスのため、デフォルトは設定されません。SparkアプリケーションがResourceManger UIからSpark履歴サーバUIにアプリケーションをリンクし終わった場合は、このアドレスはYARN ResourceManagerに渡されます。このプロパティに関しては、YARNプロパティは変数として使用することができ、これらは実行時にSparkによって代用されます。例えば、もしSpark履歴サーバがYARNリソースマネージャーとして同じノード上で実行している場合は、${hadoopconf-yarn.resourcemanager.hostname}:18080 に設定することができます。
spark.yarn.dist.archives (none) 各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。
spark.yarn.dist.files (none) 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。
spark.yarn.dist.jars (none) 各executorの作業ディレクトリに配置されるカンマ区切りのjarのリスト。
spark.executor.cores YARNモードの場合は1、スタンドアローンモードの場合はワーカー上の全ての利用可能なコア。 各executor上で使用されるコアの数。YARNおよびスタンドアローンモードのみ。
spark.executor.instances 2 静的な割り当てのためのexecutorの数。spark.dynamicAllocation.enabledを使って、executorの初期セットは少なくともこの多さになるでしょう。
spark.executor.memory 1g executorプロセスあたりに使用するメモリ量(例えば2g, 8g)。
spark.yarn.executor.memoryOverhead 最小384で、executorMemory * 0.10。 executorごとに割り当てられるオフヒープメモリ(メガバイト)の量。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはexecutorのサイズと共に(一般的に 6-10%)大きくなりがちです。
spark.yarn.driver.memoryOverhead 最小384で、driverMemory * 0.10 クラスタモード時のexecutorごとに割り当てられるオフヒープメモリ(メガバイト)の量。これはVMのオーバーヘッド、中間文字列、他のネイティブオーバーヘッドのように見なされるメモリです。これはコンテナのサイズと共に(一般的に 6-10%)大きくなりがちです。
spark.yarn.am.memoryOverhead 最小384で、AM memory * 0.10 spark.yarn.driver.memoryOverheadと同じですが、クライアントモードのYARNアプリケーションマスターのためのものです。
spark.yarn.am.port (random) YARNアプリケーションがlistenするためのポート。YARNクライアントモードでは、これはゲートウェイ上で実行しているSparkドライバとYARN上で実行しているYARNアプリケーションマスターの間の通信で使われます。YARNクラスタモードでは、これは動的executor機能に使われます。スケジューラバックエンドからのkill を処理します。
spark.yarn.queue デフォルト: アプリケーションがサブミットされるYARNキューの名前。
spark.yarn.jars (none) YARNコンテナに配布するSparkコードを含むライブラリのリスト。デフォルトでは、YARN上のSparkはローカルにインストールされたSpark jarを使いますが、Spark jarはHDFS上のworld-readableな場所にもあります。これにより、YARNはアプリケーションが実行される度に分配されないようにキャッシュすることができます。HDFS上のjarを示すには、例えば、この設定をhdfs:///some/pathにします。globも可能です。
spark.yarn.archive (none) YARNキャッシュに配布するために必要とされるSpark jarを含む書庫。設定された場合は、この設定はspark.yarn.jars を置き換え、全てのアプリケーションのコンテナの中で書庫が使われます。書庫はそのルートディレクトリにjarファイルを含まなければなりません。前のオプションを使う場合と同じように、ファイルの分散を高速化するため書庫はHDFS上にホストすることもできます。
spark.yarn.access.namenodes (none) Sparkアプリケーションがアクセスしようとする安全なHDFSネームノードのカンマ区切りのリスト。例えば、spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032, webhdfs://nn3.com:50070。Sparkアプリケーションはリストされているネームノードへのアクセスしなければならず、それらアクセスできるように適切に設定されていなければなりません(同じrealmあるいは信頼されたrealmのどちらか)。SparkアプリケーションがそれらのリモートのHDFSクラスタにアクセスできるように、Sparkは各ネームノードのためのセキュリティトークンを取得します。
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数を、YARN上で起動されたアプリケーションマスタープロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。clusterモードでは、これはSPARKドライバの環境を制御し、clientモードではexecutorのlauncherの環境のみを制御します。
spark.yarn.containerLauncherMaxThreads 25 YARNアプリケーションマスタ内でexecutorコンテナを起動するために使用するスレッドの最大数。
spark.yarn.am.extraJavaOptions (none) クライアントモードでYARNアプリケーションマスターに渡す特別なJVMオプションのリスト。クラスタモードでは、代わりにspark.driver.extraJavaOptionsを使用します。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は spark.yarn.am.memoryを使って設定することができます。
spark.yarn.am.extraLibraryPath (none) クライアントモードでYARNアプリケーションマスターを起動する場合に設定する特別なライブラリのパス。
spark.yarn.maxAppAttempts YARN内のyarn.resourcemanager.am.max-attempts アプリケーションをサブミットするために行われる試行の最大数。YARN設定内の最大試行数のグローバル数より大きくてはなりません。
spark.yarn.am.attemptFailuresValidityInterval (none) AM障害追跡のための有効期間を定義します。AMが少なくとも定義された期間実行している場合は、AM障害回数はリセットされます。設定されていない場合この機能は有効ではありません。Hadoop 2.6+でのみサポートされます。
spark.yarn.executor.failuresValidityInterval (none) executorの障害追跡のための有効期間を定義します。有効期間より古いexecutorの障害は無視されるでしょう。
spark.yarn.submit.waitAppCompletion true YARNクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。trueに設定した場合は、クライアントプロセスはアプリケーションの活動状態を報告し続けるでしょう。そうでなければ、クライアントプロセスはサブミットのあとで終了するでしょう。
spark.yarn.am.nodeLabelExpression (none) ノードのAMのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。
spark.yarn.executor.nodeLabelExpression (none) ノードのexecutorのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。
spark.yarn.tags (none) YARNアプリケーションレポートの中に現れるYARNアプリケーションのタグとして渡される文字列のカンマ区切りのリスト。これはYARNアプリケーションを問い合わせる時にフィルターとして使うことができます。
spark.yarn.keytab (none) 上で指定されたプリンシパルのためのキータブを含むファイルへのフルパス。ログインチケットと委任トークンを定期的に更新するために、このキータブはSecure Distributed Cacheを使ってYARNアプリケーションマスターを実行しているノードにコピーされるでしょう。("local"マスターとも協調して動作します)
spark.yarn.principal (none) secure HDFS上で動作する間にKDCにログインするために使われるプリンシパル。("local"マスターとも協調して動作します)
spark.yarn.config.gatewayPath (none) (Sparkアプリケーションが開始された)ゲートウェイホスト上で有効なパスですが、クラスタ内の他のノードの同じリソースでは異なるパスかも知れません。spark.yarn.config.replacementPathと合わせて、Sparkがリモートプロセスを正しく起動できるように、異質な設定を持つクラスタをサポートするために使われます。

置換パスは通常なんらかのYARNによってexportされた環境変数へのリファレンスを含みます(従ってSparkコンテナから見えます)。

例えば、もしゲートウェイノードが /disk1/hadoop上にインストールされたHadoopライブラリを持ち、Hadoopがインストールされた場所がYARNによってHADOOP_HOME環境変数としてexportされた場所であれば、この値を /disk1/hadoop に設定して置換パスを$HADOOP_HOMEに設定することは、リモートプロセスによって起動するために使われるパスが適切にローカルのYARN設定を参照するようにするでしょう。

spark.yarn.config.replacementPath (none) spark.yarn.config.gatewayPathを見てください。
spark.yarn.security.tokens.${service}.enabled true セキュリティが有効な場合に、非HDFSのための委譲トークンを扱うかどうかを制御します。デフォルトではサービスが設定されている場合に全てのサポートされるサービスのための移譲トークンが取り出されるようにしますが、アプリケーションが実行されるのに何かしらの衝突がある場合はその挙動を無効にすることができます。

現在のところ、サポートされるサービスは以下のとおりです: hive, hbase

重要事項

Secureなクラスタでの実行

securityでカバーされるように、Kerberosはサービスとクライアントに関連したprincipalを認証するためにsecureなHadoopクラスタで使われます。これにより、クライアントはそれらの認証サービスにリクエストを送ることができます; 認証されたprincipalへの権限の認可をするサービス。

Hadoop サービスはサービスとデータへのアクセスを認可するhadoop トークン を発行します。YARNクラスタ内で起動される時にクライアントはアプリケーションと一緒にアクセスおよびパスするだろうサービスのためのトークンを最初に取得する必要があります。

SparkアプリケーションがHDFS, HBaseおよびHiveとやり取りをするために、ユーザが起動するアプリケーションのKerberos証明書を使って関連するトークンを取得しなければなりません - つまり、証明書のprincipalが起動されたSparkアプリケーションのそれになるでしょう。

これは通常起動時に行われます: secureなクラスタでは、Sparkは自動的にクラスタのHDFSファイルシステム、もしかするとHBaseおよびHive、のためのトークンを取得するでしょう。

もしHBaseがクラスパス上にある場合には、HBaseトークンが取得されるでしょう。HBase設定はアプリケーションがsecureであると宣言します。(つまり、 hbase-site.xmlhbase.security.authenticationkerberosに設定します)。そして spark.yarn.security.tokens.hbase.enabledfalseに設定されません。

同様に、もしHiveがクラスパス上にある場合には、Hiveトークンが取得されるでしょう。その設定は"hive.metastore.urisにあるメタストアのURIに含まれ、spark.yarn.security.tokens.hive.enabledfalseに設定されません。

アプリケーションが他のsecure HDFSクラスタとやり取りをする必要がある場合は、それらのクラスタにアクセスするために必要なトークンは起動時に明示的に要求されなければなりません。spark.yarn.access.namenodes プロパティの中でそれらをリスト化することで行われます。

spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/

Apache Oozieを使ってアプリケーションを起動

Apache Oozie はSparkアプリケーションをワークフローの一部として起動することができます。secureクラスタ内で、起動されたアプリケーションはクラスタのサービスにアクセスするために関係するトークンを必要とするでしょう。Sparkがkeytabを使って起動された場合は、これは自動的です。しかし、もしSparkがkeytab無しで起動された場合は、securityをセットアップする責任はOozieに移譲されなければなりません。

secureなクラスタのためにOozieを設定およびジョブのための証明書を取得する詳細は、特定のリリースドキュメントの"認証"の章のOozie web サイトで見つけることができます。

Sparkのアプリケーションのために、OozieワークフローはOozieがアプリケーションが必要とする以下を含む全てのトークンを要求するようにセットアップしなければなりません:

SparkがHive, HBaseおよびリモートのHDFSトークンを取得しようと試行 - そして失敗 - するのを避けるために、Sparkの設定はサービスのためのトークンのコレクションを無効に設定しなければなりません。

Sparkの設定は以下の行を含む必要があります:

spark.yarn.security.tokens.hive.enabled false spark.yarn.security.tokens.hbase.enabled false

設定オプション spark.yarn.access.namenodes は未設定でなければなりません。

Kerberosのトラブルシューティング

Hadoop/Kerberos 問題は"困難"になる可能性があります。有用な方法のひとつはHADOOP_JAAS_DEBUG 環境変数を設定することでHadoop内のKerberos操作の特別なログを有効にすることです。

bash export HADOOP_JAAS_DEBUG=true

JDK クラスはシステムプロパティ sun.security.krb5.debugsun.security.spnego.debug=trueを設定することで、Kerberosおよび SPNEGO/REST 認証の特別なログを有効に設定することができます。

-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

これら全てのオプションはアプリケーションマスター内で有効にすることができます:

spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

最後に、org.apache.spark.deploy.yarn.Clientのログレベルが DEBUGに設定された場合、ログには全ての取得されたトークンのリストとそれらの失効の詳細が含まれるでしょう。

inserted by FC2 system