YARN上でSparkを実行する

YARN (Hadoop NextGen)上での実行はバージョン0.6.0でSparkに追加され、続くリリースで改良されました。

セキュリティ

認証のようなセキュリティ機能はデフォルトでは有効にされていません。インターネットや信頼できないネットワークに開かれたクラスタを配備する場合、認証されていないアプリケーションをクラスタ上で実行することを防ぐためにクラスタへのアクセスを安全にすることが重要です。Sparkを実行する前にSpark セキュリティとこのドキュメントの特定のセキュリティの章を見てください。

YARN上でSparkを起動

HADOOP_CONF_DIR あるいはYARN_CONF_DIR がHadoopクラスタのための(クライアントサイドの)設定ファイルを含むディレクトリを指すようにします。これらの設定はHDFSに書き込み、YARNリソースマネージャーに接続するために使用されます。このディレクトリに含まれる設定はYARNクラスタに分配され、アプリケーションに使用される全てのコンテナは同じ設定を使用します。設定がJavaシステムプロパティを参照あるいは環境変数がYARNによって管理されていない場合は、それらはSparkアプリケーション設定(ドライバー、executor、およびクライアントモードで実行している場合はAM)にも設定されていなければなりません。

SparkアプリケーションをYARNで起動するのに使われる2つのデプロイモードがあります。clusterモードでは、Sparkドライバはクラスタ上のYARNによって管理されるアプリケーションマスタープロセスの中で実行され、クライアントはアプリケーションの初期化の後で終了することができます。client モードでは、ドライバーはクライアントプロセスの中で実行され、アプリケーションマスターはYARNからのリソースの要求のためだけに使用されます。

マスターのアドレスが --master パラメータで指定されるSparkによってサポートされる他のクラスタマネージャーと異なり、YARNモードではリソースマネージャーのアドレスがHadoop設定から取り出されます。そして、--master パラメータは yarnです。

Sparkアプリケーションをclusterモードで起動するには:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

例えば:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10

上記はデフォルトのアプリケーションマスターを開始するYARNクライアントプログラムを開始します。そして、SparkPiはアプリケーションマスターの子スレッドとして実行されるでしょう。クライアントはステータスの更新とそれらをコンソールに表示するために定期的にアプリケーションマスターをポーリングします。一旦アプリケーションの実行が終了するとクライアントは終了するでしょう。ドライバーおよびexecutorのログをどうやって見るかについては以下の Debugging your Applicationセクションを参照してください。

client モードのSparkアプリケーションを起動するには同じことをしますが、 clusterclientと置き換えてください。以下は、spark-shellclient モードでどうやって実行することができるかを示します:

$ ./bin/spark-shell --master yarn --deploy-mode client

他のJARを追加

cluster モードでは、ドライバーはクライアントではなく異なるマシーンで動作します。つまりSparkContext.addJarはローカルからクライアントへの追加設定のファイル無しには動作しないでしょう。SparkContext.addJarが利用可能なクライアントでファイルを作成するために、起動コマンドの中に--jarsオプションを使ってそれらを含めます。

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2

準備

YARN上のSparkの実行にはYARNサポートでビルドしたSparkのバイナリ配布物が必要です。バイナリ配布物はプロジェクトのwebサイトのダウンロードページ</a>からダウンロードすることができます。ダウンロードできるSparkのバイナリ配布には、2つの種類があります。1つは特定のバージョンのApache Hadoopで事前にビルドされたものです; このSparkの配布物はHadoopのランタイムが組み込まれているため、hadoop付き Spark 配布物と呼びます。もう1つは、ユーザ提供のHadoopで事前にビルドされたものです; このSpark配布物は組み込みのHadoopランタイムを含まないため、サイズは小さくなりますが、ユーザはHadoopインストールを別に提供する必要があります。この変種をhadoop無し Spark 配布物と呼びます。hadoop付き Spark 配布物に関しては、組み込みのHadoopランタイムがすでに含まれているため、デフォルトではジョブがHadoop Yarnクラスタにサブミットされる時にジョブの競合を防ぐために、SparkにはYarnのクラスパスが設定されないでしょう。この動作を上書きするために、spark.yarn.populateHadoopClasspath=trueを設定できます。hadoop無し Spark 配布物に関しては、Spark はHadoopランタイムを取得するために、デフォルトでYarnのクラスパスを設定します。hadoop付き Spark配布物の場合、アプリケーションがクラスタでのみ使用可能な特定のライブラリに依存している場合は、上記のプロパティを設定してYarnクラスパスを設定してみてください。そうすることでjarの競合の問題が発生する場合は、それをオフにしてこのライブラリをアプリケーションjarに含める必要があります。

Sparkを自分自身でビルドするには、Sparkのビルドを参照してください。

YARN側からSparkランタイムのjarをアクセス可能にするために、spark.yarn.archive あるいは spark.yarn.jars を指定することができます。詳細はSpark Propertiesを参照してください。spark.yarn.archive あるいは spark.yarn.jars のどちらも指定されない場合は、Sparkは $SPARK_HOME/jars の下にある全てのjarを使ってzipファイルを生成し、それを分散キャッシュにアップロードするでしょう。

設定

他のデプロイモードについては、ほとんどの設定はYARN上のSPARKと同じです。これらの詳細な情報は設定ページを見てください。これらはYARN上のSPARKに固有です。

アプリケーションのデバッグ

YARNの用語では、executorおよびアプリケーションマスターは"コンテナ"の中で実行されます。アプリケーションが完了した後でコンテナのログを処理するためにYARNには2つのモードがあります。ログの集約が有効な場合(yarn.log-aggregation-enable 設定を使用)、コンテナログはHDFSにコピーされ、ローカルマシーンからは削除されます。これらのログはyan logコマンドを使ってクラスタのどこからでも見ることができます。

yarn logs -applicationId <app ID>

指定されたアプリケーションについて全てのコンテナからの全てのログファイルの内容を出力するでしょう。HDFSシェルあるいはAPIを使って直接HDFS内のコンテナログを見ることもできます。YARNの設定を調べることでそれらがあるディレクトリを見つけることができます(yarn.nodemanager.remote-app-log-dir および yarn.nodemanager.remote-app-log-dir-suffix)。このログはSpark Web UIのexecutorタブでも利用可能です。Spark履歴サーバとMapReduce履歴サーバの両方の動作が必要で、yarn-site.xml適切にyarn.log.server.url を設定する必要があります。Spark履歴サーバUI上のログURLは集約されたログを表示するためにMapReduce履歴サーバにリダイレクトするでしょう。

ログの集約をオンにしていない場合は、ログは各マシーン上のYARN_APP_LOGS_DIRの下にローカルに保持されます。これは通常/tmp/logs あるいは、Hadoopのバージョンおよびインストレーションに依存する $HADOOP_HOME/logs/userlogsに設定されます。コンテナンのログを見るにはそれらを含むホストに行き、このディレクトリを調べる必要があります。サブディレクトリはアプリケーションIDとコンテナIDでログファイルを整理します。このログはSpark Web UIのexecutorタブでも利用可能で、MapReduce履歴サーバの実行を必要としません。

コンテナごとの起動環境をレビューするには、yarn.nodemanager.delete.debug-delay-sec を大きな値(例えば、36000)に増やし、コンテナが起動されたノード上のyarn.nodemanager.local-dirsを使ってアプリケーションのキャッシュにアクセスします。このディレクトリは起動スクリプト、JAR、各コンテナを起動するために使われる全ての環境変数を含んでいます。このプロセスは特にクラスパスの問題をデバッグするのに役立ちます。(これを有効にするにはクラスタ設定の管理権限が必要で全てのノードマネージャーを再起動が必要になる事に注意してください。従ってこれはホストとされているクラスタでは適用できません)。

アプリケーションマスターあるいはexecutorのための独自log4j設定を使うには、以下の選択肢があります:

最初の選択肢は、executorとアプリケーションマスターの両方が同じlog4j設定を共有するでしょう。これは同じノード上で動作する場合に問題を起こすかもしれません(例えば、同じログファイルに書き込もうとします)。

YARNがログを適切に表示および集約できるようにYARN内でログファイルを置く適切な場所への参照を必要とする場合は、log4j.propertiesのspark.yarn.app.container.log.dirを使ってください。例えば、log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log。ストリーミングアプリケーションについては、RollingFileAppenderの設置とファイルの場所をYARNのログディレクトリに設定することで、大きなログファイルによって起こるディスクオーバーフローを避けることができ、ログはYARNのログユーティリティを使ってアクセスすることができるでしょう。

アプリケーションマスターとexecutorのために独自の metrics.properties を使うには、$SPARK_CONF_DIR/metrics.properties ファイルを更新します。他の設定と一緒に自動的にアップロードされるでしょう。つまり、手動で --filesを使って指定する必要はありません。

Sparkのプロパティ

プロパティ名デフォルト意味これ以降のバージョンから
spark.yarn.am.memory 512m クライアントモードのYARNアプリケーションのために使われるメモリの量は、JVMメモリ設定と同じフォーマットです。(例えば、 512m, 2g)。クラスタモードでは、代わりにspark.driver.memory を使います。

小文字のサフィックスを使ってください。例えば k, m, g, tpは、それぞれ kibi-, mebi-, gibi-, tebi- および pebibytes です。

1.3.0
spark.yarn.am.resource.{resource-type}.amount (none) クライアントモードでYARNアプリケーションマスターのために使うリソースの量。クラスタモードでは、代わりにspark.yarn.driver.resource.<resource-type>.amount を使用します。この機能は、YARM 3.0+ でのみ使えることに注意してください。参考のために、YARN リソースモデルドキュメントを見てください: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

例: YARM から GPU リソースをリクエストするには、spark.yarn.am.resource.yarn.io/gpu.amount を使ってください

3.0.0
spark.yarn.applicationType SPARK アプリケーションの型をもっと明確に定義します。例えば、SPARK, SPARK-SQL, SPARK-STREAMING, SPARK-MLLIB, SPARK-GRAPH。20文字を超えないように注意してください。 3.1.0
spark.yarn.driver.resource.{resource-type}.amount (none) クラスタモードでYARNアプリケーションマスターのために使うリソースの量。この機能は、YARM 3.0+ でのみ使えることに注意してください。参考のために、YARN リソースモデルドキュメントを見てください: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

例: YARM から GPU リソースをリクエストするには、spark.yarn.driver.resource.yarn.io/gpu.amount を使ってください

3.0.0
spark.yarn.executor.resource.{resource-type}.amount (none) executor プロセスごとに使用するリソースの量。この機能は、YARM 3.0+ でのみ使えることに注意してください。参考のために、YARN リソースモデルドキュメントを見てください: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

例: YARM から GPU リソースをリクエストするには、spark.yarn.executor.resource.yarn.io/gpu.amount を使ってください

3.0.0
spark.yarn.resourceGpuDeviceName yarn.io/gpu Sparkリソース型のgpuのマッピングをGPUを表すYARNリソースに指定します。デフォルトではYARNはyarn.io/gpu を使いますが、YARNが独自のリソース型で構成されている場合、これにより再マッピングが可能になります。spark.{driver/executor}.resource.gpu.* 設定を使う場合に適用されます。 3.2.1
spark.yarn.resourceFpgaDeviceName yarn.io/fpga Sparkリソース型のfpgaのマッピングをFPGAを表すYARNリソースに指定します。デフォルトではYARNはyarn.io/fpga を使いますが、YARNが独自のリソース型で構成されている場合、これにより再マッピングが可能になります。spark.{driver/executor}.resource.fpga.* 設定を使う場合に適用されます。 3.2.1
spark.yarn.am.cores 1 クライアントモードでYARNアプリケーションマスターのために使うコアの数。クラスタモードでは、代わりにspark.driver.cores を使います。 1.3.0
spark.yarn.am.waitTime 100s cluster モードだけで使われます。YARNアプリケーション マスターがSparkContextが初期化されるのを待つ時間。 1.3.0
spark.yarn.submit.file.replication デフォルトのHDFSレプリケーション(通常は3)。 アプリケーションのためにHDFSにアップロードされたファイルのHDFSレプリケーションレベル。これらにはSpark jar, アプリjar, および全ての分散キャッシュファイル/圧縮ファイル が含まれます。 0.8.1
spark.yarn.stagingDir ファイルシステム内の現在のユーザのホームディレクトリ アプリケーションのサブミット中に使われるステージングディレクトリ。 2.0.0
spark.yarn.preserve.staging.files false ステージ化されたファイル(Spark jar, app jar, 分散キャッシュファイル)を削除せずにジョブの最後で保持するためにはtrueに設定します。 1.1.0
spark.yarn.scheduler.heartbeat.interval-ms 3000 SparkアプリケーションマスターがYARNリソースマネージャーの生存確認をするミリ秒単位の間隔。値はYARNの期限切れ間隔、つまりyarn.am.liveness-monitor.expiry-interval-ms、の設定の半分の値で上限を設定されます。 0.8.1
spark.yarn.scheduler.initial-allocation.interval 200ms 延期されているコンテナ割り当てリクエストがある場合は、SparkアプリケーションマスターがしきりにYARN ResourceManagerにハートビートを送信する初期間隔。spark.yarn.scheduler.heartbeat.interval-msより長くてはいけません。もし延期されているコンテナがまだある場合は、spark.yarn.scheduler.heartbeat.interval-ms に到達するまで割り当てられる連続するハートビートは2倍の間隔に割り当てられるでしょう。 1.4.0
spark.yarn.max.executor.failures 最小は3で、numExecutors * 2。 アプリケーションが失敗するまでのexecutorの失敗の最大の数。 1.0.0
spark.yarn.historyServer.address (none) Spark履歴サーバのアドレス、例えばhost.com:18080。アドレスはスキーマ(http://)を含んではいけません。履歴サーバは任意のサービスのため、デフォルトは設定されません。SparkアプリケーションがResourceManger UIからSpark履歴サーバUIにアプリケーションをリンクし終わった場合は、このアドレスはYARN ResourceManagerに渡されます。このプロパティに関しては、YARNプロパティは変数として使用することができ、これらは実行時にSparkによって代用されます。例えば、もしSpark履歴サーバがYARNリソースマネージャーとして同じノード上で実行している場合は、${hadoopconf-yarn.resourcemanager.hostname}:18080 に設定することができます。 1.0.0
spark.yarn.dist.archives (none) 各executorの作業ディレクトリに解凍される圧縮ファイルのカンマ区切りのリスト。 1.0.0
spark.yarn.dist.files (none) 各executorの作業ディレクトリに配置されるカンマ区切りのファイルのリスト。 1.0.0
spark.yarn.dist.jars (none) 各executorの作業ディレクトリに配置されるカンマ区切りのjarのリスト。 2.0.0
spark.yarn.dist.forceDownloadSchemes (none) YARNの分散キャッシュに追加される前にローカルディスクにダウンロードされるリソースのスキーマのカンマ区切りのリスト。http, https および ftpのような、YARNサービスがSparkによってサポートされるスキーマをサポートしない場合、あるいはローカルのYARNクライアントのクラスパス内にある必要があるjarに使います。ワイルドカード '*' は全てのスキーマについてリソースをダウンロードすることを示します。 2.3.0
spark.executor.instances 2 静的な割り当てのためのexecutorの数。spark.dynamicAllocation.enabledを使って、executorの初期セットは少なくともこの多さになるでしょう。 1.0.0
spark.yarn.am.memoryOverhead 最小384で、AM memory * 0.10 spark.driver.memoryOverheadと同じですが、クライアントモードのYARNアプリケーションマスターのためのものです。 1.3.0
spark.yarn.queue デフォルト: アプリケーションがサブミットされるYARNキューの名前。 1.0.0
spark.yarn.jars (none) YARNコンテナに配布するSparkコードを含むライブラリのリスト。デフォルトでは、YARN上のSparkはローカルにインストールされたSpark jarを使いますが、Spark jarはHDFS上のworld-readableな場所にもあります。これにより、YARNはアプリケーションが実行される度に分配されないようにキャッシュすることができます。HDFS上のjarを示すには、例えば、この設定をhdfs:///some/pathにします。globも可能です。 2.0.0
spark.yarn.archive (none) YARNキャッシュに配布するために必要とされるSpark jarを含む書庫。設定された場合は、この設定はspark.yarn.jars を置き換え、全てのアプリケーションのコンテナの中で書庫が使われます。書庫はそのルートディレクトリにjarファイルを含まなければなりません。前のオプションを使う場合と同じように、ファイルの分散を高速化するため書庫はHDFS上にホストすることもできます。 2.0.0
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) EnvironmentVariableNameによって指定される環境変数を、YARN上で起動されたアプリケーションマスタープロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。clusterモードでは、これはSPARKドライバの環境を制御し、clientモードではexecutorのlauncherの環境のみを制御します。 1.1.0
spark.yarn.containerLauncherMaxThreads 25 YARNアプリケーションマスタ内でexecutorコンテナを起動するために使用するスレッドの最大数。 1.2.0
spark.yarn.am.extraJavaOptions (none) クライアントモードでYARNアプリケーションマスターに渡す特別なJVMオプションのリスト。クラスタモードでは、代わりにspark.driver.extraJavaOptionsを使用します。このオプションを使って最大のヒープサイズ(-Xmx)を設定することは違反だということに注意してください。最大のヒープサイズの設定は spark.yarn.am.memoryを使って設定することができます。 1.3.0
spark.yarn.am.extraLibraryPath (none) クライアントモードでYARNアプリケーションマスターを起動する場合に設定する特別なライブラリのパス。 1.4.0
spark.yarn.populateHadoopClasspath with-hadoop Spark配布物の場合、これはfalseに設定されます; no-hadoop 配布物の場合、これはtrueに設定されます。 Hadoop クラスパスをyarn.application.classpathmapreduce.application.classpathから設定するかどうか。これがfalseに設定されている場合、Hadoopランタイムを同梱するwith-Hadoop Spark配布物を必要とするか、ユーザが別にHadoopインストールを提供する必要があります。 2.4.6
spark.yarn.maxAppAttempts YARN内のyarn.resourcemanager.am.max-attempts アプリケーションをサブミットするために行われる試行の最大数。YARN設定内の最大試行数のグローバル数より大きくてはなりません。 1.3.0
spark.yarn.am.attemptFailuresValidityInterval (none) AM障害追跡のための有効期間を定義します。AMが少なくとも定義された期間実行している場合は、AM障害回数はリセットされます。設定されていない場合はこの機能は有効ではありません。 1.6.0
spark.yarn.executor.failuresValidityInterval (none) executorの障害追跡のための有効期間を定義します。有効期間より古いexecutorの障害は無視されるでしょう。 2.0.0
spark.yarn.submit.waitAppCompletion true YARNクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。trueに設定した場合は、クライアントプロセスはアプリケーションの活動状態を報告し続けるでしょう。そうでなければ、クライアントプロセスはサブミットのあとで終了するでしょう。 1.4.0
spark.yarn.am.nodeLabelExpression (none) ノードのAMのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。 1.6.0
spark.yarn.executor.nodeLabelExpression (none) ノードのexecutorのセットを制限するYARN ノードラベル表現がスケジュールされるでしょう。2.6以上のバージョンのYARNのみがノードラベル表現をサポートします。つまり、前のバージョンを実行する場合、このプロパティは無視されます。 1.4.0
spark.yarn.tags (none) YARNアプリケーションレポートの中に現れるYARNアプリケーションのタグとして渡される文字列のカンマ区切りのリスト。これはYARNアプリケーションを問い合わせる時にフィルターとして使うことができます。 1.5.0
spark.yarn.priority (none) YARN が保留中のアプリケーションの順序付けポリシーを定義するためのアプリケーションの優先順位。整数値が大きいほど、アクティブ化される機会が多くなります。現在のところ、YARN は FIFO 順序付けポリシーを使っている場合のみアプリケーションの優先順位をサポートします。 3.0.0
spark.yarn.config.gatewayPath (none) (Sparkアプリケーションが開始された)ゲートウェイホスト上で有効なパスですが、クラスタ内の他のノードの同じリソースでは異なるパスかも知れません。spark.yarn.config.replacementPathと合わせて、Sparkがリモートプロセスを正しく起動できるように、異質な設定を持つクラスタをサポートするために使われます。

置換パスは通常なんらかのYARNによってexportされた環境変数へのリファレンスを含みます(従ってSparkコンテナから見えます)。

例えば、もしゲートウェイノードが /disk1/hadoop上にインストールされたHadoopライブラリを持ち、Hadoopがインストールされた場所がYARNによってHADOOP_HOME環境変数としてexportされた場所であれば、この値を /disk1/hadoop に設定して置換パスを$HADOOP_HOMEに設定することは、リモートプロセスによって起動するために使われるパスが適切にローカルのYARN設定を参照するようにするでしょう。

1.5.0
spark.yarn.config.replacementPath (none) spark.yarn.config.gatewayPathを見てください。 1.5.0
spark.yarn.rolledLog.includePattern (none) 定義されたincludeパターンに合致するログファイルをフィルタするJava Regexで、それらのログファイルはrolling形式で集約されるでしょう。これはYARNのrollingログ集約と一緒に使われ、この機能を有効にするためにYARN側では yarn.nodemanager.log-aggregation.roll-monitoring-interval-secondsが設定されるべきです。Spark log4j appender は、FileAppenderあるいは実行中に削除されるファイルを扱うことができる他のappenderを使うように変更する必要があります。(spark.logのような)log4j設定の中で設定されたファイル名に基づいて、ユーザは集約する必要がある全てのログファイルを含むようにregex(spark*)を設定する必要があります。 2.0.0
spark.yarn.rolledLog.excludePattern (none) 定義されたincludeパターンを除外するログファイルをフィルタするJava Regexで、それらのログファイルはrolling形式で集約されないでしょう。ログファイル名がincludeとexcludeのパターンの両方に合致する場合、このファイルは結果的に除外されるでしょう。 2.0.0
spark.yarn.executor.launch.excludeOnFailure.enabled false YARNのリソース割り当ての問題を持つノードの除外を有効にするためのフラグ。除外のためのエラーの制限は、spark.excludeOnFailure.application.maxFailedExecutorsPerNodeで設定できます。 2.4.0
spark.yarn.exclude.nodes (none) リソース割り当てから除外される YARN ノード名のカンマ区切りのリスト。 3.0.0
spark.yarn.metrics.namespace (none) AMメトリクスのレポートのためのルートの名前空間。設定されない場合は、YARNアプリケーションIDが使われます。 2.4.0

SHS カスタム executor ログ URL で利用可能なパターン

Pattern意味
{{HTTP_SCHEME}} YARN HTTPポリシーに応じて、http://https://。(yarn.http.policyを介して設定されます)
{{NM_HOST}} コンテナが実行されたノードの "ホスト"。
{{NM_PORT}} コンテナが実行されたノードマネージャの "ポート"。
{{NM_HTTP_PORT}} コンテナが実行されたノードマネージャの http サーバの "ポート"。
{{NM_HTTP_ADDRESS}} コンテナが割り当てられているノードの Http URI。
{{CLUSTER_ID}} リソースマネージャのクラスタ ID。(yarn.resourcemanager.cluster-idを介して設定されます)
{{CONTAINER_ID}} コンテナの ID。
{{USER}} システム環境上のSPARK_USER
{{FILE_NAME}} stdout, stderr

例えば、ノードマネージャの http サーバにリダイレクトさせずに、ログ URL リンクをジョブ履歴サーバに直接ポイントする場合は、以下のように spark.history.custom.executor.log.url を設定することができます:

{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096

注意: <JHS_HOST><JHS_PORT>を実際の値で置き換える必要があります。

リソース割り当てと設定の概要

設定ページのカスタムリソーススケジューリングと設定概要セクションを必ず読んでください。このセクションでは、リソーススケジュールの YARN 固有の側面についてのみ説明します。

YARN は、ユーザが Spark で使用したいリソースをサポートするように設定する必要があります。YARN のリソーススケジューリングは YARN 3.1.0 で追加されました。リソースの設定と分離の適切な設定の詳細については、YARN ドキュメントを見てください。理想的には、リソースは executor が割り当てられたリソースのみを見ることができるように、分離してセットアップされます。分離を有効にしていない場合、ユーザはリソースが executor 間で共有されないようにする検出スクリプトを作成する必要があります。

YARNはユーザ定義のリソース型をサポートしますが、GPU (yarn.io/gpu)とFPGA (yarn.io/fpga)の組み込み型があります。そのため、これらのリソースのいずれかを使っている場合は、Spark はSpark リソースのリクエストを YARN リソースに変換でき、spark.{driver/executor}.resource. 設定を指定するだけで済みます。YARNでGPUやFPGAに独自のリソース型を使う場合は、spark.yarn.resourceGpuDeviceNamespark.yarn.resourceFpgaDeviceNameを使ってSparkのマッピングを変更できることに注意してください。FPGA あるいは GPU 以外のリソースを使っている場合は、ユーザは YARN (spark.yarn.{driver/executor}.resource.) と Spark (spark.{driver/executor}.resource.) の両方の設定を指定する必要があります。

例えば、ユーザは executor ごとに2つの GPU を要求したいとします。ユーザは、spark.executor.resource.gpu.amount=2 を指定するだけで、Spark は YARN からyarn.io/gpu リソース型のリクエストを処理します。

ユーザがユーザ定義の YARN リソースを持つ場合は、それを acceleratorX と呼び、ユーザは spark.yarn.executor.resource.acceleratorX.amount=2spark.executor.resource.acceleratorX.amount=2 を指定する必要があります。

YARN は、各コンテナに割り当てられたリソースのアドレスを Spark に通知しません。そのため、ユーザは起動時に executor によって実行される検出スクリプトを指定して、その executor が利用可能なリソースを検出する必要があります。サンプルスクリプトは examples/src/main/scripts/getGpusResources.sh にあります。スクリプトには実行権限が設定されている必要があり、ユーザは悪意のあるユーザがスクリプトを変更できないように権限を設定する必要があります。スクリプトは STDOUT に ResourceInformation クラスの形式で JSON 文字列を書き込む必要があります。これは、リソース名とその executor だけが利用できるリソースアドレスの配列を持ちます。

ステージレベルのスケジューリングの概要

動的割り当てが有効な場合、YARNでステージレベルのスケジューリングがサポートされます。YARN固有で1つ注意すべきことは、各ResourceProfileはYARNで異なるコンテナ優先度を必要とすることです。The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. これは、先に作成されたプロファイルがYARNでより高い優先度を持つことを意味します。Sparkは別のステージを開始する前に1つのステージを終了するため、通常これは問題になりません。これが影響を与える可能性があるのは、ジョブサーバ型のシナリオのみであるため、注意が必要です。基本のデフォルトプロファイルと独自のResourceProfileの間でカスタムリソースの処理方法に違いがあることに注意してください。ユーザがSparkスケジューリング無しで追加のリソースを含むYARNコンテナを要求できるようにするために、ユーザはspark.yarn.executor.resource.設定を介してリソースを指定できます。ただしこれらの設定は基本のデフォルトプロファイルのみで使われ、他の独自のResourceProfileには伝搬されません。これはステージにそれらを持たせたくない場合、それらを削除する方法がないためです。これによりデフォルトのプロファイルでspark.yarn.executor.resource.で定義された独自のリソースと、GPUまたはFPGAのsparkで定義されたリソースが取得されることになります。SparkはGPUとFPGAリソースをYARN組み込み型yarn.io/gpuyarn.io/fpgaに変換しますが、他のリソースのマッピングを認識しません。その他のSpark独自のリソースは、デフォルトプロファイルのYARNに伝播されません。したがって、独自のリソースに基づいてSparkをスケジュールし、YARNから要求する場合は、YARN(spark.yarn.{driver/executor}.resource.)とSpark(spark.{driver/executor}.resource.)設定から指定しなければなりません。追加のリソースを含むYARNコンテナのみが必要で、Sparkがそれらの使用をスケジュールしない場合は、Spark設定をオフのままにします。現在のところ、独自のResourceProfileの場合、SparkがそれらをスケジューリングせずにYARNリソースのみを指定する方法はありません。これは、独自のResourceProfileの場合、ResourceProfileで定義されたすべてのリソースをYARNに伝播することを意味します。まだ、GPUとFPGAをYARNの組み込み型に変換します。これは、指定する独自のリソースの名前がYARNで定義されているものと一致する必要があります。

重要事項

Kerberos

Sparkでの標準的なKerberosサポートはセキュリティのページでカバーされます。

YARN モードでは、Hadoop ファイルシステムにアクセスすると、hadoop 設定のデフォルトのファイルシステム以外に、Spark は Spark アプリケーションのステージングディレクトリをホストするサービスの移譲トークンも自動的に取得します。

YARN固有のKerberos設定

プロパティ名デフォルト意味これ以降のバージョンから
spark.kerberos.keytab (none) 上で指定されたプリンシパルのためのキータブを含むファイルへのフルパス。このキータブはYARN分散キャッシュを使ってYARNアプリケーションマスターを実行しているノードにコピーされるでしょう。ログインチケットと委任トークンを定期的に更新するために使われるでしょう。--keytab コマンドラインの引数と同じです。
("local"マスターとも協調して動作します。)
3.0.0
spark.kerberos.principal (none) secure クラスタ上で動作する間にKDCにログインするために使われるプリンシパル。--principal コマンドラインの引数と同じです。
("local"マスターとも協調して動作します。)
3.0.0
spark.yarn.kerberos.relogin.period 1m kerberos TGTが更新されるべきかどうかをどれだけの頻度で調べるか。これはTGTの更新の期間(あるいはもしTGT更新が無効な場合はTGTの生存期間)よりも短い値に設定されるべきです。デフォルト値はほとんどの配備で十分な筈です。 2.3.0
spark.yarn.kerberos.renewal.excludeHadoopFileSystems (none) リソーススケジュールでの委任トークンからホストが除外されるHadoopファイルシステムのカンマ区切りのリスト。例えば、spark.yarn.kerberos.renewal.excludeHadoopFileSystems=hdfs://nn1.com:8032, hdfs://nn2.com:8032。これは今のところYARNで動作することが分かっているため、YARNリソースマネージャーはアプリケーションのトークンを更新しません。リソーススケジューラはトークンを更新しないため、そのトークンを使おうとする元のトークンの有効期限よりも長く実行されているアプリケーションは失敗する可能性があることに注意してください。 3.2.0

Kerberosのトラブルシューティング

Hadoop/Kerberos 問題は"困難"になる可能性があります。有用な方法のひとつはHADOOP_JAAS_DEBUG 環境変数を設定することでHadoop内のKerberos操作の特別なログを有効にすることです。

export HADOOP_JAAS_DEBUG=true

JDK クラスはシステムプロパティ sun.security.krb5.debugsun.security.spnego.debug=trueを設定することで、Kerberosおよび SPNEGO/REST 認証の特別なログを有効に設定することができます。

-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

これら全てのオプションはアプリケーションマスター内で有効にすることができます:

spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true
spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

最後に、org.apache.spark.deploy.yarn.Clientのログレベルが DEBUGに設定された場合、ログには全ての取得されたトークンのリストとそれらの失効の詳細が含まれるでしょう。

外部シャッフルサービスの設定

YARNクラスタ内の各NodeManager上でSparkシャッフルサービスを開始するには、以下の説明に従ってください:

  1. SparkをYARN profileを使ってビルドします。pre-packaged配布物を使っている場合はこのステップをスキップします。
  2. spark-<version>-yarn-shuffle.jarを配置します。Sparkを自身で構築した場合は $SPARK_HOME/common/network-yarn/target/scala-<version> の下に無ければならず、配布物を使っている場合はyarnの下に無ければなりません。
  3. このjarをクラスタ内の全てのNodeManagerのクラスパスに追加します。
  4. 各ノード上のyarn-site.xml の中で、spark_shuffleyarn.nodemanager.aux-servicesに追加し、その後、yarn.nodemanager.aux-services.spark_shuffle.classorg.apache.spark.network.yarn.YarnShuffleServiceに設定します。
  5. シャッフル中のガベージコレクションの問題を避けるためにetc/hadoop/yarn-env.sh内のYARN_HEAPSIZE (デフォルトは1000) を設定することで、NodeManager のヒープサイズを増やします。
  6. クラスタ内の全ての NodeManagerを再起動します。

YARN上でシャッフルサービスが動作している場合は、以下の特別な設定オプションが利用可能です:

プロパティ名デフォルト意味
spark.yarn.shuffle.stopOnFailure false Sparkシャッフルサービスの初期化で障害がある場合に、ノードマネージャーを停止するかどうか。これは、Sparkシャッフルサービスが実行中では無いノードマネージャー上でコンテナを実行することで起こるアプリケーション障害を防ぎます。
spark.yarn.shuffle.service.metrics.namespace sparkShuffleService NodeManagerのHadoop metrics2 システムへシャッフルサービスメトリクスを発行する時に使う名前空間。

上記の手順はデフォルトのシャッフルサービス名spark_shuffleが使われることを前提としていることに注意してください。ここでは任意の名前を使えますが、YARN NodeManager設定で使われる値はSparkアプリケーションのspark.shuffle.service.nameの値と一致する必要があります。

シャッフルサービスは、デフォルトでNodeManagerによって使われるHadoop設定から全ての設定を取得します(例、yarn-site.xml)。ただし、spark-shuffle-site.xmlという名前のファイルを使って、シャッフルサービスを個別に設定することもできます。このファイルはシャッフルサービスのクラスパスに配置する必要があります(デフォルトでは、NodeManagerのクラスパスで共有されます)。シャッフルサービスはこれを標準のHadoop設定リソースとして扱い、NodeManagerの設定を上書きします。

Apache Oozieを使ってアプリケーションを起動

Apache Oozie はSparkアプリケーションをワークフローの一部として起動することができます。secureクラスタ内で、起動されたアプリケーションはクラスタのサービスにアクセスするために関係するトークンを必要とするでしょう。Sparkがkeytabを使って起動された場合は、これは自動的です。しかし、もしSparkがkeytab無しで起動された場合は、securityをセットアップする責任はOozieに移譲されなければなりません。

secureなクラスタのためにOozieを設定およびジョブのための証明書を取得する詳細は、特定のリリースドキュメントの"認証"の章のOozie web サイトで見つけることができます。

Sparkのアプリケーションのために、OozieワークフローはOozieがアプリケーションが必要とする以下を含む全てのトークンを要求するようにセットアップしなければなりません:

SparkがHive, HBaseおよびリモートのHDFSトークンを取得しようと試行 - そして失敗 - するのを避けるために、Sparkの設定はサービスのためのトークンのコレクションを無効に設定しなければなりません。

Sparkの設定は以下の行を含む必要があります:

spark.security.credentials.hive.enabled   false
spark.security.credentials.hbase.enabled  false

設定オプション spark.kerberos.access.hadoopFileSystems は未設定でなければなりません。

Spark Web UIを置き換えるためのSpark履歴サーバの使用

アプリケーション UIが無効の時に、実行中のアプリケーションのための追跡のURLとしてSpark履歴サーバ アプリケーションを使うことができます。これはsecureなクラスタ、あるいはSparkドライバのメモリの使用を減らすために望ましいかも知れません。Spark履歴サーバを使って追跡を設定するには、以下を行います:

履歴サーバの情報はアプリケーションの状態が原因で最新ではないかも知れないことに注意してください。

複数のバージョンのSparkシャッフルサービスの実行

このセクションは、YARNのバージョン 2.9.0 以上で実行される時にのみ適用されることに注意してください。

場合によってでゃ、異なるバージョンのSparkを使っているSparkシャッフルサービスの複数のインスタンスを実行することが望ましい場合があります。これは、例えば、複数のSparkバージョンを実行するアプリケーションのワークロードが混在するYARNクラスタを実行する場合に役立ちます。これは、特定のバージョンのシャッフルサービスが他のバージョンのSparkと常に互換性があるとは限らないためです。2.9.0以降のYARNバージョンは分離されたクラスローダ内でシャッフルサービスを実行する機能をサポートします(YARN-4577を見てください)。つまり、単一のNodeManager内で複数のSparkバージョンが共存できます。yarn.nodemanager.aux-services.<service-name>.classpath、YARN 2.10.2/3.1.1/3.2.0 以降、yarn.nodemanager.aux-services.<service-name>.remote-classpath オプションを使ってこれを設定できます。別々のクラスパスを設定することに加えて、2つのバージョンが異なるポートに広告できることを確認する必要があります。これは上記のspark-shuffle-site.xmlファイルを使って実現できます。例えば、以下のような構成に設定することができます:

  yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y
  yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-yarn-shuffle.jar,/path/to/spark-x-config
  yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-yarn-shuffle.jar,/path/to/spark-y-config

2つのspark-*-configディレクトリには、それぞれのファイル spark-shuffle-site.xmlが含まれています。これらはHadoop Configuration formatのXMLであり、それぞれが使うポート番号とメトリクス名のプリフィックスを調整するためのいくつかの設定が含まれます。

<configuration>
  <property>
    <name>spark.shuffle.service.port</name>
    <value>7001</value>
  </property>
  <property>
    <name>spark.yarn.shuffle.service.metrics.namespace</name>
    <value>sparkShuffleServiceX</value>
  </property>
</configuration>

2つの異なるサービスでは値が両方とも異なる必要があります。

次に、Sparkアプリケーションの設定で、1つは以下のように設定する必要があります。

  spark.shuffle.service.name = spark_shuffle_x
  spark.shuffle.service.port = 7001

もう1つは以下のように設定する必要があります:

  spark.shuffle.service.name = spark_shuffle_y
  spark.shuffle.service.port = <other value>
TOP
inserted by FC2 system