ジョブ スケジューリング
概要
Sparkは計算の間のリソースのスケジューリングのために幾つかの設備を持ちます。まず、クラスタモードの概要を思い出してください。各Sparkアプリケーション(SparkContextのインスタンスは独立したexecutorのプロセスのセットを実行します。Sparkが実行されるクラスタマネージャーは アプリケーションを横断したスケジューリングを提供します。次に、各Sparkアプリケーション内で、複数の"ジョブ"(Sparkアクション)が異なるスレッドによってサブミットされた場合は同時に実行されるかも知れません。アプリケーションがネットワーク越しにリクエストに応えている場合は、これは一般的です。Sparkは各Sparkコンテキスト内でリソースをスケジュールするために 公平なスケジューラを含んでいます。
アプリケーションを横断したスケジューリング
クラスタ上で動作する場合は、各Sparkアプリケーションはそのアプリケーションのためだけにタスクを実行しデータを保持するexecutor JVMの独立したセットを持ちます。複数のユーザがクラスタを共有する必要がある場合は、クラスタマネージャに依存する割り当てを管理する異なるオプションがあります。
全てのクラスタマネージャ上で利用可能なもっとも簡単なオプションは、リソースの静的な分割です。このやり方で、各アプリケーションは利用可能なリソースの最大量を指定され、その間それらを保持します。これはSparkのstandalone および YARN モード、そして がさつな Mesos モードで使用されるやり方です。リソースの割り当てはクラスタのタイプに基づいて以下のように設定することができます:
- スタンドアローンモード: デフォルトでは、スタンドアローンモードクラスタにサブミットされたアプリケーションはFIFO (first-in-first-out) 順に実行され、各アプリケーションは全ての利用可能なノードを使用しようとするでしょう。
spark.cores.max
設定プロパティを設定、あるいはこの設定をしないアプリケーションのデフォルトをspark.deploy.defaultCores
を使って変更することで、アプリケーションが使用するノードの数を制限することができます。最後に、コアの制御に加えて、各アプリケーションのspark.executor.memory
設定がメモリの使用を制御します。 - Mesos: Mesosで静的な割り当てを使うために、
spark.mesos.coarse
設定プロパティをtrue
に設定し、スタンドアローンモードと同じように各アプリケーションのリソースを制限するために任意でspark.cores.max
を設定します。executorメモリーを制御するためにspark.executor.memory
も設定する必要があります。 - YARN: Spark YARNクライアントへの
--num-executors
オプションはどれだけの数のexecutorをクラスタ上で割り当てるかを制御します。一方で、--executor-memory
および--executor-cores
はexecutorあたりのリソースを制御します。
Mesosで利用可能な二つ目のオプションは、CPUコアの動的共有です。このモードでは、各Sparkアプリケーションは固定および独立したメモリ割り当て(spark.executor.memory
で設定される)を持ちますが、アプリケーションがマシーン上で実行していない場合はほかのアプリケーションがタスクをそれらのコア上で実行するかも知れません。このモードは別個のユーザからのシェルのセッションなど過剰ではない数のアプリケーションが見込まれる場合に効果的です。しかし、仕事がある場合にアプリケーションが一つのノード上のコアを取り戻すのに時間が掛かるかも知れないため、それには予測しがたい遅延のリスクが付いて回ります。このモードを使用するには、単にmesos://
URLを使用し、spark.mesos.coarse
をfalseに設定ます。
どのモードも現在のところアプリケーション間のメモリの共有を提供していないことに注意してください。このやりかたでデータを共有したい場合は、同じRDDをクエリすることで複数のリクエストを扱うことができる単独のサーバアプリケーションを実行することをお勧めします。将来のリリースでは、TachyonのようなインメモリストレージシステムがRDDを共有するための他のやり方を提供するでしょう。
動的なリソースの割り当て
Sparkは負荷に応じてアプリケーションに占有されているリソースを動的に調整する仕組みを提供します。このことは、もしリソースがもう使われていなくて後で再び要求があった場合に、アプリケーションはクラスタにリソースを返すかもしれないことを意味します。この機能はSparkクラスタ内で複数のアプリケーションがリソースを共有している場合に特に有用です。
この機能はデフォルトで無効であり、全てのcoarse-grainedマネージャー上で利用可能です。つまりスタンドアローンモード、YARN モード、およびMesos coarse-grained モード。
設定とセットアップ
この機能を使うには2つの必要条件があります。まず、アプリケーションがspark.dynamicAllocation.enabled
をtrue
に設定していなければなりません。次に同じクラスタ内の各ワーカーノード上で外部シャッフルサービス をセットアップし、アプリケーション内で spark.shuffle.service.enabled
をtrueに設定しなければなりません。外部シャッフルサービスの目的は、executorによって書き込まれたシャッフルファイルを削除すること無しに、executorを削除することができるようにするためです。このサービスのセットアップの方法は、クラスタマネージャーによって変わります:
スタンドアローンモードでは、単純にspark.shuffle.service.enabled
を true
に設定してワーカーを開始します。
Mesos coarse-grained モードでは、全てのスレーブノード上でspark.shuffle.service.enabled
を true
に設定して.$SPARK_HOME/sbin/start-mesos-shuffle-service.sh
を実行します。例えば、Marathonを使ってそうするかも知れません。
YARNモードでは、以下のように各NodeManager
上でシャッフルサービスを開始します:
- SparkをYARN profileを使ってビルドします。pre-packaged配布物を使っている場合はこのステップをスキップします。
spark-<version>-yarn-shuffle.jar
を配置します。Sparkを自身で構築した場合は$SPARK_HOME/network/yarn/target/scala-<version>
の下に無ければならず、配布物を使っている場合はlib
の下に無ければなりません。- このjarをクラスタ内の全ての
NodeManager
のクラスパスに追加します。 - 各ノード上の
yarn-site.xml
の中で、spark_shuffle
をyarn.nodemanager.aux-services
に追加し、その後、yarn.nodemanager.aux-services.spark_shuffle.class
をorg.apache.spark.network.yarn.YarnShuffleService
に設定し、>spark.shuffle.service.enabled
をtrueにします。 - クラスタ内の全ての
NodeManager
を再起動します。
他の全ての関係する設定は任意で、spark.dynamicAllocation.*
および spark.shuffle.service.*
の名前空間の下にあります。詳細は設定ページを見てください。
リソースの割り当てポリシー
高レベルでは、Sparkはexecutorがもう使用されなくなった場合にexecutorを放棄し、executorが必要となった時にexecutorを要求しなければなりません。削除されようとしているexecutorが近い将来にタスクを実行するかどうか、あるいは追加されようとしている新しいexecutorが実際には仕事をしないかも知れないかどうかを予測する決定的な方法は無いため、いつexecutorを削除あるいは要求するかを決定するために一連の実践的な方法を必要とします。
リクエストのポリシー
動的割り当てが有効なSparkアプリケーションはスケジュールされるのを待っている待機中のタスクがある場合に、追加のexecutorを要求します。この条件は、サブミットされたが完了していない全てのタスクを同時に没頭させるには既存のexecutorのセットが足りないことを必然的に暗示します。
Sparkはexecutorを概数で要求します。実際のリクエストはspark.dynamicAllocation.schedulerBacklogTimeout
秒の間待機中のタスクがある場合に引き起こされます。そして待機中のタスクのキューが存在していればspark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒ごとに再び引き起こされます。更に、各回ごとにリクエストされるexecutorの数は以前の回から指数関数的に増大します。例えば、アプリケーションが最初の回で1つのexecutorを追加し、その後の回では2,4,8等々のexecutorが追加されるでしょう。
指数関数的な増加のポリシーの理由は2つあります。まず、少しの追加のexecutorで十分だと分かる場合に備えて、アプリケーションは最初のうちは慎重にexecutorをリクエストしなければなりません。これはTCPの遅い開始のための正当化と呼応します。次に、多くのexecutorが実際に必要だと分かる場合に備えて、アプリケーションは状況に応じてリソースの使用を増やすことができなければなりません。
削除のポリシー
executorの削除のためのポリシーはもっと単純です。Sparkのアプリケーションはspark.dynamicAllocation.executorIdleTimeout
秒以上仕事をしていないexecutorがある場合に削除します。スケジュールされるべきまだ待機中のタスクがある場合にexecutorは何もしないでいるべきでは無いという点で、ほとんどの状況においてこの条件はリクエストの条件とはお互いに相容れないものであることに注意してください。
executorのgracefulな退役
動的な割り当ての前は、Spark executorは失敗した場合あるいは関連するアプリケーションも終了した場合のどちらかで終了します。両方の場合において、executorに関する全ての状態はもう必要ないものであり、安全に破棄することができます。しかし、動的な割り当てを使う場合、executorが明示的に削除された場合、アプリケーションはまだ実行されています。アプリケーションがexecutorによって格納あるいは書き込まれた状態にアクセスしようとすると、アプリケーションは状態の再計算を行わなければならないでしょう。従って、Sparkは状態を削除する前にそれを維持することによってexecutorをgracefullyに退役させる仕組みが必要です。
この要求はシャッフルの場合に特に重要です。シャッフルの間、Spark executorはまず自身のmap出力をローカルのディスクに出力し、それから他のexecutorがそれらを取り出そうとした場合にそれらのファイルに対してサーバとして振る舞います。In the event of stragglers, which are tasks that run for much longer than their peers, dynamic allocation may remove an executor before the shuffle completes, in which case the shuffle files written by that executor must be recomputed unnecessarily.
シャッフルファイルを維持するための解決策は外部のシャッフルサービスを使用することで、Spark 1.2でも導入されています。このサービスはSparkアプリケーションおよびそれらのexecutorと関係なく各クラスタのノード上で実行する長く実行中のプロセスを参照します。サービスが有効であれば、Spark executorはお互いからの代わりにサービスからシャッフルファイルを取得します。この事はexecutorが書き込んだどのシャッフル状態もexecutorのライフタイムを超えて提供され続けるかもしれないことを意味します。
シャッフルファイルの書き込みに加えて、executorもディスクあるいはメモリのどちらかにデータをキャッシュします。しかし、executorが削除される場合に全てのキャッシュされたデータはもうアクセスできなくなるでしょう。 コレの解決策はSpark1.2では現在のところありません。In future releases, the cached data may be preserved through an off-heap storage similar in spirit to how shuffle files are preserved through the external shuffle service.
アプリケーション内のスケジューリング
指定されたSparkアプリケーション(SparkContextインスタンス)では、複数の並行ジョブが異なるスレッドからサブミットされた場合には並行して実行することができます。この章では、"ジョブ"という言葉で、Sparkアクション(例えば、save
, collect
) およびアクションを評価するために実行が必要な全てのタスクを意味します。Sparkのスケジューラは完全にスレッドセーフで、複数のリクエストを提供するアプリケーションを有効にするためにこのやり方をサポートします。
デフォルトでは、SparkのスケジューラはFIFO形式でジョブを実行します。各ジョブは"ステージ"(つまり、mapおよびreduceフェーズ)に分割され、起動するためのタスクをステージが持つ間、最初のジョブは利用可能な全てのリソースの優先権を持ちます。そして2つ目等々のジョブが優先権を持ちます。キューの先頭のジョブがクラスタ全体を使う必要が無い場合、後のジョブがすぐに実行を開始することができますが、キューの先頭のジョブが大きければ、後のジョブはかなり遅れるかもしれません。
Spark0.8から、ジョブ間の公平な共有も利用可能です。公平な共有の下では、Sparkはジョブ間で"ラウンドロビン"形式でタスクを割り当て、その結果全てのジョブはクラスタのリソースをほぼ等しく得ます。このことは、長いジョブが実行中にサブミットされた短いジョブはすぐにリソースを受け取って開始することができ、長いジョブが終了するまで待つことなく良い応答タイムを得ることができることを意味します。このモードは複数ユーザ設定のためには最善です。
公平なスケジューラを有効にするには、SparkContextを設定する時に単にspark.scheduler.mode
プロパティをFAIR
に設定します。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平なスケジューラのプール
公平なスケジューラはジョブのプールへのグルーピング、および各プールごとに異なるスケジューリングオプション(例えば weight)の設定もサポートしています。This can be useful to create a “high-priority” pool for more important jobs, for example, or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. このやり方は Hadoopの公平なスケジューラの後でモデル化されます。
いかなる干渉も無しに新しくサブミットされたジョブはデフォルト プールに投入されますが、ジョブのプールはサブミットする時のスレッド内のSparkContextにspark.scheduler.pool
"local property"を追加することで設定することができます。これは以下のようにして行われます:
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
このlocalプロパティを設定した後で、このスレッドにサブミットされた全てのジョブ(このスレッド内で RDD.save
, count
, collect
など)はこのプール名を使用するでしょう。The setting is per-thread to make it easy to have a thread run multiple jobs on behalf of the same user. 関係するスレッドのプールを明確にしたい場合は、以下のように単に呼び出します:
sc.setLocalProperty("spark.scheduler.pool", null)
プールのデフォルトの挙動
デフォルトでは、各プールはクラスタの均等な共有(デフォルトプール内の各ジョブの均等な共有も)しますが、各プール内ではジョブはFIFO順に実行されます。例えば、ユーザごとに一つのプールを作成した場合、このことは各ユーザがクラスタの均等な共有を取得することを意味し、各ユーザのクエリは後のクエリがユーザの前のリソースを取らずに順番に実行されるだろうことを意味します。
プールのプロパティの設定
特定のプールのプロパティも設定ファイルを使って修正することができます。各プールは3つのプロパティをサポートします:
schedulingMode
: This can be FIFO or FAIR, to control whether jobs within the pool queue up behind each other (the default) or share the pool’s resources fairly.weight
: これはクラスタの他のプールに対するプールの共有を制御します。デフォルトでは、全てのプールはweightが1です。例えば、特定のプールのweightを2にすると、他のアクティブなプールに比べて2倍のリソースを取得するでしょう。1000のような高いweightの設定は、プール間の優先度を実装することも可能にします - 本質的に、1000のweightのプールはアクティブなジョブを持つ時は常に最初にタスクを起動させるでしょう。minShare
: 全体のweightとは別に、各プールは 管理者がしたがるようなminimum shares (CPUのコアの数として)を指定することができます。公平なスケジューラは、weightに応じて余分なリソースを再分配する前に、全てのアクティブなプールのminimum sharesが一致するようにしようとします。TheminShare
property can therefore be another way to ensure that a pool can always get up to a certain number of resources (e.g. 10 cores) quickly without giving it a high priority for the rest of the cluster. デフォルトでは、各プールのminShare
は0です。
プールのプロパティはconf/fairscheduler.xml.template
に似たXMLファイルを作成し、SparkConfに spark.scheduler.allocation.file
プロパティを設定することで、設定することができます。
conf.set("spark.scheduler.allocation.file", "/path/to/file")
XMLファイルのフォーマットは単に各プールごとの<pool>
エレメントであり、その中に様々な設定のための異なるエレメントがあります。例えば:
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
完全な例はconf/fairscheduler.xml.template
の中でも利用可能です。XMLファイルに設定されていない全てのプールは全ての設定(スケジューリングモード FIFO、weight 1 および minShare 0)について単純にデフォルトの値を取る事に注意してください。