ジョブ スケジューリング

概要

Sparkは計算の間のリソースのスケジューリングのために幾つかの設備を持ちます。まず、クラスタモードの概要を思い出してください。各Sparkアプリケーション(SparkContextのインスタンスは独立したexecutorのプロセスのセットを実行します。Sparkが実行されるクラスタマネージャーは アプリケーションを横断したスケジューリングを提供します。次に、各Sparkアプリケーションで、複数の"ジョブ"(Sparkアクション)が異なるスレッドによってサブミットされた場合は同時に実行されるかも知れません。アプリケーションがネットワーク越しにリクエストに応えている場合は、これは一般的です。Sparkは各Sparkコンテキスト内でリソースをスケジュールするために 公平なスケジューラを含んでいます。

アプリケーションを横断したスケジューリング

クラスタ上で動作する場合は、各Sparkアプリケーションはそのアプリケーションのためだけにタスクを実行しデータを保持するexecutor JVMの独立したセットを持ちます。複数のユーザがクラスタを共有する必要がある場合は、クラスタマネージャに依存する割り当てを管理する異なるオプションがあります。

全てのクラスタマネージャ上で利用可能なもっとも簡単なオプションは、リソースの静的な分割です。このやり方で、各アプリケーションは利用可能なリソースの最大量を指定され、その間それらを保持します。これはSparkのstandalone および YARN モード、そして がさつな Mesos モードで使用されるやり方です。リソースの割り当てはクラスタのタイプに基づいて以下のように設定することができます:

Mesosで利用可能な二つ目のオプションは、CPUコアの動的共有です。このモードでは、各Sparkアプリケーションは固定および独立したメモリ割り当て(spark.executor.memoryで設定される)を持ちますが、アプリケーションがマシーン上で実行していない場合はほかのアプリケーションがタスクをそれらのコア上で実行するかも知れません。このモードは別個のユーザからのシェルのセッションなど過剰ではない数のアプリケーションが見込まれる場合に効果的です。しかし、仕事がある場合にアプリケーションが一つのノード上のコアを取り戻すのに時間が掛かるかも知れないため、それには予測しがたい遅延のリスクが付いて回ります。このモードを使用するには、単にmesos:// URLを使用し、spark.mesos.coarseをfalseに設定ます。

どのモードも現在のところアプリケーション間のメモリの共有を提供していないことに注意してください。このやりかたでデータを共有したい場合は、同じRDDをクエリすることで複数のリクエストを扱うことができる単独のサーバアプリケーションを実行することをお勧めします。将来のリリースでは、TachyonのようなインメモリストレージシステムがRDDを共有するための他のやり方を提供するでしょう。

動的なリソースの割り当て

Sparkは負荷に応じてアプリケーションに占有されているリソースを動的に調整する仕組みを提供します。このことは、もしリソースがもう使われていなくて後で再び要求があった場合に、アプリケーションはクラスタにリソースを返すかもしれないことを意味します。この機能はSparkクラスタ内で複数のアプリケーションがリソースを共有している場合に特に有用です。

この機能はデフォルトで無効であり、全てのcoarse-grainedマネージャー上で利用可能です。つまりスタンドアローンモードYARN モード、およびMesos coarse-grained モード

設定とセットアップ

この機能を使うには2つの必要条件があります。まず、アプリケーションがspark.dynamicAllocation.enabledtrueに設定していなければなりません。次に同じクラスタ内の各ワーカーノード上で外部シャッフルサービス をセットアップし、アプリケーション内で spark.shuffle.service.enabledをtrueに設定しなければなりません。外部シャッフルサービスの目的は、executorによって書き込まれたシャッフルファイルを削除すること無しに、executorを削除することができるようにするためです。このサービスのセットアップの方法は、クラスタマネージャーによって変わります:

スタンドアローンモードでは、単純にspark.shuffle.service.enabledtrueに設定してワーカーを開始します。

Mesos coarse-grained モードでは、全てのスレーブノード上でspark.shuffle.service.enabledtrueに設定して.$SPARK_HOME/sbin/start-mesos-shuffle-service.shを実行します。例えば、Marathonを使ってそうするかも知れません。

YARNモードでは、以下のように各NodeManager上でシャッフルサービスを開始します:

  1. SparkをYARN profileを使ってビルドします。pre-packaged配布物を使っている場合はこのステップをスキップします。
  2. spark-<version>-yarn-shuffle.jarを配置します。Sparkを自身で構築した場合は $SPARK_HOME/network/yarn/target/scala-<version> の下に無ければならず、配布物を使っている場合はlibの下に無ければなりません。
  3. このjarをクラスタ内の全てのNodeManagerのクラスパスに追加します。
  4. 各ノード上のyarn-site.xml の中で、spark_shuffleyarn.nodemanager.aux-servicesに追加し、その後、yarn.nodemanager.aux-services.spark_shuffle.classorg.apache.spark.network.yarn.YarnShuffleServiceに設定し、>spark.shuffle.service.enabledをtrueにします。
  5. クラスタ内の全ての NodeManagerを再起動します。

他の全ての関係する設定は任意で、spark.dynamicAllocation.* および spark.shuffle.service.* の名前空間の下にあります。詳細は設定ページを見てください。

リソースの割り当てポリシー

高レベルでは、Sparkはexecutorがもう使用されなくなった場合にexecutorを放棄し、executorが必要となった時にexecutorを要求しなければなりません。削除されようとしているexecutorが近い将来にタスクを実行するかどうか、あるいは追加されようとしている新しいexecutorが実際には仕事をしないかも知れないかどうかを予測する決定的な方法は無いため、いつexecutorを削除あるいは要求するかを決定するために一連の実践的な方法を必要とします。

リクエストのポリシー

動的割り当てが有効なSparkアプリケーションはスケジュールされるのを待っている待機中のタスクがある場合に、追加のexecutorを要求します。この条件は、サブミットされたが完了していない全てのタスクを同時に没頭させるには既存のexecutorのセットが足りないことを必然的に暗示します。

Sparkはexecutorを概数で要求します。実際のリクエストはspark.dynamicAllocation.schedulerBacklogTimeout 秒の間待機中のタスクがある場合に引き起こされます。そして待機中のタスクのキューが存在していればspark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒ごとに再び引き起こされます。更に、各回ごとにリクエストされるexecutorの数は以前の回から指数関数的に増大します。例えば、アプリケーションが最初の回で1つのexecutorを追加し、その後の回では2,4,8等々のexecutorが追加されるでしょう。

指数関数的な増加のポリシーの理由は2つあります。まず、少しの追加のexecutorで十分だと分かる場合に備えて、アプリケーションは最初のうちは慎重にexecutorをリクエストしなければなりません。これはTCPの遅い開始のための正当化と呼応します。次に、多くのexecutorが実際に必要だと分かる場合に備えて、アプリケーションは状況に応じてリソースの使用を増やすことができなければなりません。

削除のポリシー

executorの削除のためのポリシーはもっと単純です。Sparkのアプリケーションはspark.dynamicAllocation.executorIdleTimeout 秒以上仕事をしていないexecutorがある場合に削除します。スケジュールされるべきまだ待機中のタスクがある場合にexecutorは何もしないでいるべきでは無いという点で、ほとんどの状況においてこの条件はリクエストの条件とはお互いに相容れないものであることに注意してください。

executorのgracefulな退役

動的な割り当ての前は、Spark executorは失敗した場合あるいは関連するアプリケーションも終了した場合のどちらかで終了します。両方の場合において、executorに関する全ての状態はもう必要ないものであり、安全に破棄することができます。しかし、動的な割り当てを使う場合、executorが明示的に削除された場合、アプリケーションはまだ実行されています。アプリケーションがexecutorによって格納あるいは書き込まれた状態にアクセスしようとすると、アプリケーションは状態の再計算を行わなければならないでしょう。従って、Sparkは状態を削除する前にそれを維持することによってexecutorをgracefullyに退役させる仕組みが必要です。

この要求はシャッフルの場合に特に重要です。シャッフルの間、Spark executorはまず自身のmap出力をローカルのディスクに出力し、それから他のexecutorがそれらを取り出そうとした場合にそれらのファイルに対してサーバとして振る舞います。In the event of stragglers, which are tasks that run for much longer than their peers, dynamic allocation may remove an executor before the shuffle completes, in which case the shuffle files written by that executor must be recomputed unnecessarily.

シャッフルファイルを維持するための解決策は外部のシャッフルサービスを使用することで、Spark 1.2でも導入されています。このサービスはSparkアプリケーションおよびそれらのexecutorと関係なく各クラスタのノード上で実行する長く実行中のプロセスを参照します。サービスが有効であれば、Spark executorはお互いからの代わりにサービスからシャッフルファイルを取得します。この事はexecutorが書き込んだどのシャッフル状態もexecutorのライフタイムを超えて提供され続けるかもしれないことを意味します。

シャッフルファイルの書き込みに加えて、executorもディスクあるいはメモリのどちらかにデータをキャッシュします。しかし、executorが削除される場合に全てのキャッシュされたデータはもうアクセスできなくなるでしょう。 コレの解決策はSpark1.2では現在のところありません。In future releases, the cached data may be preserved through an off-heap storage similar in spirit to how shuffle files are preserved through the external shuffle service.

アプリケーション内のスケジューリング

指定されたSparkアプリケーション(SparkContextインスタンス)では、複数の並行ジョブが異なるスレッドからサブミットされた場合には並行して実行することができます。この章では、"ジョブ"という言葉で、Sparkアクション(例えば、save, collect) およびアクションを評価するために実行が必要な全てのタスクを意味します。Sparkのスケジューラは完全にスレッドセーフで、複数のリクエストを提供するアプリケーションを有効にするためにこのやり方をサポートします。

デフォルトでは、SparkのスケジューラはFIFO形式でジョブを実行します。各ジョブは"ステージ"(つまり、mapおよびreduceフェーズ)に分割され、起動するためのタスクをステージが持つ間、最初のジョブは利用可能な全てのリソースの優先権を持ちます。そして2つ目等々のジョブが優先権を持ちます。キューの先頭のジョブがクラスタ全体を使う必要が無い場合、後のジョブがすぐに実行を開始することができますが、キューの先頭のジョブが大きければ、後のジョブはかなり遅れるかもしれません。

Spark0.8から、ジョブ間の公平な共有も利用可能です。公平な共有の下では、Sparkはジョブ間で"ラウンドロビン"形式でタスクを割り当て、その結果全てのジョブはクラスタのリソースをほぼ等しく得ます。このことは、長いジョブが実行中にサブミットされた短いジョブはすぐにリソースを受け取って開始することができ、長いジョブが終了するまで待つことなく良い応答タイムを得ることができることを意味します。このモードは複数ユーザ設定のためには最善です。

公平なスケジューラを有効にするには、SparkContextを設定する時に単にspark.scheduler.mode プロパティをFAIRに設定します。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平なスケジューラのプール

公平なスケジューラはジョブのプールへのグルーピング、および各プールごとに異なるスケジューリングオプション(例えば weight)の設定もサポートしています。This can be useful to create a “high-priority” pool for more important jobs, for example, or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. このやり方は Hadoopの公平なスケジューラの後でモデル化されます。

いかなる干渉も無しに新しくサブミットされたジョブはデフォルト プールに投入されますが、ジョブのプールはサブミットする時のスレッド内のSparkContextにspark.scheduler.pool "local property"を追加することで設定することができます。これは以下のようにして行われます:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

このlocalプロパティを設定した後で、このスレッドにサブミットされた全てのジョブ(このスレッド内で RDD.save, count, collect など)はこのプール名を使用するでしょう。The setting is per-thread to make it easy to have a thread run multiple jobs on behalf of the same user. 関係するスレッドのプールを明確にしたい場合は、以下のように単に呼び出します:

sc.setLocalProperty("spark.scheduler.pool", null)

プールのデフォルトの挙動

デフォルトでは、各プールはクラスタの均等な共有(デフォルトプール内の各ジョブの均等な共有も)しますが、各プール内ではジョブはFIFO順に実行されます。例えば、ユーザごとに一つのプールを作成した場合、このことは各ユーザがクラスタの均等な共有を取得することを意味し、各ユーザのクエリは後のクエリがユーザの前のリソースを取らずに順番に実行されるだろうことを意味します。

プールのプロパティの設定

特定のプールのプロパティも設定ファイルを使って修正することができます。各プールは3つのプロパティをサポートします:

プールのプロパティはconf/fairscheduler.xml.templateに似たXMLファイルを作成し、SparkConfspark.scheduler.allocation.file プロパティを設定することで、設定することができます。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

XMLファイルのフォーマットは単に各プールごとの<pool>エレメントであり、その中に様々な設定のための異なるエレメントがあります。例えば:


<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完全な例はconf/fairscheduler.xml.templateの中でも利用可能です。XMLファイルに設定されていない全てのプールは全ての設定(スケジューリングモード FIFO、weight 1 および minShare 0)について単純にデフォルトの値を取る事に注意してください。

TOP
inserted by FC2 system