ジョブ スケジューリング

概要

Sparkは計算の間のリソースのスケジューリングのために幾つかの設備を持ちます。まず、クラスタモードの概要を思い出してください。各Sparkアプリケーション(SparkContextのインスタンス)は独立したexecutorのプロセスのセットを実行します。Sparkが実行されるクラスタマネージャーは アプリケーションを横断したスケジューリングを提供します。次に、各Sparkアプリケーションで、複数の"ジョブ"(Sparkアクション)が異なるスレッドによってサブミットされた場合は同時に実行されるかも知れません。アプリケーションがネットワーク越しにリクエストに応えている場合は、これは一般的です。Sparkは各Sparkコンテキスト内でリソースをスケジュールするために 公平なスケジューラを含んでいます。

アプリケーションを横断したスケジューリング

クラスタ上で動作する場合は、各Sparkアプリケーションはそのアプリケーションのためだけにタスクを実行しデータを保持するexecutor JVMの独立したセットを持ちます。複数のユーザがクラスタを共有する必要がある場合は、クラスタマネージャに依存する割り当てを管理する異なるオプションがあります。

全てのクラスタマネージャ上で利用可能なもっとも簡単なオプションは、リソースの静的な分割です。このやり方で、各アプリケーションは利用可能なリソースの最大量を指定され、その間それらを保持します。これはSparkのstandalone および YARN モード、そして coarse-grained Mesos モードで使用されるやり方です。リソースの割り当てはクラスタのタイプに基づいて以下のように設定することができます:

Mesosで利用可能な二つ目のオプションは、CPUコアの動的共有です。このモードでは、各Sparkアプリケーションは固定および独立したメモリ割り当て(spark.executor.memoryで設定される)を持ちますが、アプリケーションがマシーン上で実行していない場合はほかのアプリケーションがタスクをそれらのコア上で実行するかも知れません。このモードは別個のユーザからのシェルのセッションなど過剰ではない数のアプリケーションが見込まれる場合に効果的です。しかし、仕事がある場合にアプリケーションが一つのノード上のコアを取り戻すのに時間が掛かるかも知れないため、それには予測しがたい遅延のリスクが付いて回ります。このモードを使用するには、単にmesos:// URLを使用し、spark.mesos.coarseをfalseに設定します。

どのモードも現在のところアプリケーション間のメモリの共有を提供していないことに注意してください。このやりかたでデータを共有したい場合は、同じRDDをクエリすることで複数のリクエストを扱うことができる単独のサーバアプリケーションを実行することをお勧めします。

動的なリソースの割り当て

Sparkは負荷に応じてアプリケーションに占有されているリソースを動的に調整する仕組みを提供します。このことは、もしリソースがもう使われていなくて後で再び要求があった場合に、アプリケーションはクラスタにリソースを返すかもしれないことを意味します。この機能はSparkクラスタ内で複数のアプリケーションがリソースを共有している場合に特に有用です。

この機能はデフォルトで無効であり、全てのcoarse-grainedマネージャー上で利用可能です。つまりスタンドアローンモードYARN モードMesos coarse-grained モードおよびK8s モード

設定とセットアップ

この機能を使うには2つの方法があります。まず、アプリケーションでspark.dynamicAllocation.enabledspark.dynamicAllocation.shuffleTracking.enabledの両方をtrueに設定する必要があります。次に、同じクラスタ内の全てのワーカーノードでexternal shuffle serviceをセットアップした後、spark.dynamicAllocation.enabledspark.shuffle.service.enabledの両方をtrueに設定する必要があります。シャッフル追跡または外部シャッフルサービスの目的は、executorによって書き込まれたシャッフルファイルを削除せずにexecutorを削除できるようにすることです(詳細は、以下で説明します)。シャッフル追跡を有効にするのは簡単ですが、外部シャッフルサービスを設定する方法はクラスタマネージャによって異なります:

スタンドアローンモードでは、単純にspark.shuffle.service.enabledtrueに設定してワーカーを開始します。

Mesosのcoarse-grainedモードで、spark.shuffle.service.enabledtrueに設定された全てのワーカーノードで$SPARK_HOME/sbin/start-mesos-shuffle-service.shを実行します。例えば、Marathonを使ってそうするかも知れません。

YARNモードでは、この手順に従ってください。

他の全ての関係する設定は任意で、spark.dynamicAllocation.* および spark.shuffle.service.* の名前空間の下にあります。詳細は設定ページを見てください。

リソースの割り当てポリシー

高レベルでは、Sparkはexecutorがもう使用されなくなった場合にexecutorを放棄し、executorが必要となった時にexecutorを要求しなければなりません。削除されようとしているexecutorが近い将来にタスクを実行するかどうか、あるいは追加されようとしている新しいexecutorが実際には仕事をしないかも知れないかどうかを予測する決定的な方法は無いため、いつexecutorを削除あるいは要求するかを決定するために一連の実践的な方法を必要とします。

リクエストのポリシー

動的割り当てが有効なSparkアプリケーションはスケジュールされるのを待っている待機中のタスクがある場合に、追加のexecutorを要求します。この条件は、サブミットされたが完了していない全てのタスクを同時に没頭させるには既存のexecutorのセットが足りないことを必然的に暗示します。

Sparkはexecutorを概数で要求します。実際のリクエストはspark.dynamicAllocation.schedulerBacklogTimeout 秒の間待機中のタスクがある場合に引き起こされます。そして待機中のタスクのキューが存在していればspark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒ごとに再び引き起こされます。更に、各回ごとにリクエストされるexecutorの数は以前の回から指数関数的に増大します。例えば、アプリケーションが最初の回で1つのexecutorを追加し、その後の回では2,4,8等々のexecutorが追加されるでしょう。

指数関数的な増加のポリシーの理由は2つあります。まず、少しの追加のexecutorで十分だと分かる場合に備えて、アプリケーションは最初のうちは慎重にexecutorをリクエストしなければなりません。これはTCPの遅い開始のための正当化と呼応します。次に、多くのexecutorが実際に必要だと分かる場合に備えて、アプリケーションは状況に応じてリソースの使用を増やすことができなければなりません。

削除のポリシー

executorの削除のためのポリシーはもっと単純です。Sparkのアプリケーションはspark.dynamicAllocation.executorIdleTimeout 秒以上仕事をしていないexecutorがある場合に削除します。スケジュールされるべきまだ待機中のタスクがある場合にexecutorは何もしないでいるべきでは無いという点で、ほとんどの状況においてこの条件はリクエストの条件とはお互いに相容れないものであることに注意してください。

executorのgracefulな退役

動的割り当ての前に、関連付けられたアプリケーションも終了したときにSpark executorが終了した場合、executorに関連付けられたすべての状態は不要になり、安全に破棄できます。しかし、動的な割り当てを使う場合、executorが明示的に削除された場合、アプリケーションはまだ実行されています。アプリケーションがexecutorによって格納あるいは書き込まれた状態にアクセスしようとすると、アプリケーションは状態の再計算を行わなければならないでしょう。従って、Sparkは状態を削除する前にそれを維持することによってexecutorをgracefullyに退役させる仕組みが必要です。

この要求はシャッフルの場合に特に重要です。シャッフルの間、Spark executorはまず自身のmap出力をローカルのディスクに出力し、それから他のexecutorがそれらを取り出そうとした場合にそれらのファイルに対してサーバとして振る舞います。仲間よりもずっと長い時間走るタスクの場合、動的な割り当てがシャッフルが完了する前にexecutorを削除するかも知れません。その場合、executorによって書き込まれるシャッフルファイルは無駄に再計算されてしまいます。

シャッフルファイルを維持するための解決策は外部のシャッフルサービスを使用することで、Spark 1.2でも導入されています。このサービスはSparkアプリケーションおよびそれらのexecutorと関係なく各クラスタのノード上で実行する長く実行中のプロセスを参照します。サービスが有効であれば、Spark executorはお互いからの代わりにサービスからシャッフルファイルを取得します。この事はexecutorが書き込んだどのシャッフル状態もexecutorのライフタイムを超えて提供され続けるかもしれないことを意味します。

シャッフルファイルの書き込みに加えて、executorもディスクあるいはメモリのどちらかにデータをキャッシュします。しかし、executorが削除される場合に全てのキャッシュされたデータはもうアクセスできなくなるでしょう。 これを緩和するために、デフォルトでキャッシュされたデータを含むexecutorは削除されません。この挙動はspark.dynamicAllocation.cachedExecutorIdleTimeoutを使って設定することができます。将来のリリースでは、シャッフルファイルが外部のシャッフルサービスを使って保持されるやり方と気持ちの上では似た方法で、キャッシュされたデータがoff-heapストレージを使って保持されるかも知れません。

アプリケーション内のスケジューリング

指定されたSparkアプリケーション(SparkContextインスタンス)では、複数の並行ジョブが異なるスレッドからサブミットされた場合には並行して実行することができます。この章では、"ジョブ"という言葉で、Sparkアクション(例えば、save, collect) およびアクションを評価するために実行が必要な全てのタスクを意味します。Sparkのスケジューラは完全にスレッドセーフで、複数のリクエストを提供するアプリケーションを有効にするためにこのやり方をサポートします。

デフォルトでは、SparkのスケジューラはFIFO形式でジョブを実行します。各ジョブは"ステージ"(つまり、mapおよびreduceフェーズ)に分割され、起動するためのタスクをステージが持つ間、最初のジョブは利用可能な全てのリソースの優先権を持ちます。そして2つ目等々のジョブが優先権を持ちます。キューの先頭のジョブがクラスタ全体を使う必要が無い場合、後のジョブがすぐに実行を開始することができますが、キューの先頭のジョブが大きければ、後のジョブはかなり遅れるかもしれません。

Spark0.8から、ジョブ間の公平な共有も利用可能です。公平な共有の下では、Sparkはジョブ間で"ラウンドロビン"形式でタスクを割り当て、その結果全てのジョブはクラスタのリソースをほぼ等しく得ます。このことは、長いジョブが実行中にサブミットされた短いジョブはすぐにリソースを受け取って開始することができ、長いジョブが終了するまで待つことなく良い応答タイムを得ることができることを意味します。このモードは複数ユーザ設定のためには最善です。

公平なスケジューラを有効にするには、SparkContextを設定する時に単にspark.scheduler.mode プロパティをFAIRに設定します。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平なスケジューラのプール

公平なスケジューラはジョブのプールへのグルーピング、および各プールごとに異なるスケジューリングオプション(例えば weight)の設定もサポートしています。例えば、より重要なジョブのために "high-priority"プールを生成するのに便利でしょう。あるいは、各ユーザのジョブをグループ化し、ジョブが均等に共有するのではなく、ユーザがどれだけの同時実行ジョブを持つかに関係なく ユーザ が共有に共有のに便利でしょう。このやり方は Hadoopの公平なスケジューラの後でモデル化されます。

いかなる干渉も無しに新しくサブミットされたジョブはデフォルト プールに投入されますが、ジョブのプールはサブミットする時のスレッド内のSparkContextにspark.scheduler.pool "local property"を追加することで設定することができます。これは以下のようにして行われます:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

このlocalプロパティを設定した後で、このスレッドにサブミットされた全てのジョブ(このスレッド内で RDD.save, count, collect など)はこのプール名を使用するでしょう。これは、スレッドが同じユーザを代表して複数のジョブを実行させることを容易にするために、スレッド単位の設定です。関係するスレッドのプールを明確にしたい場合は、以下のように単に呼び出します:

sc.setLocalProperty("spark.scheduler.pool", null)

プールのデフォルトの挙動

デフォルトでは、各プールはクラスタの均等な共有(デフォルトプール内の各ジョブの均等な共有も)しますが、各プール内ではジョブはFIFO順に実行されます。例えば、ユーザごとに一つのプールを作成した場合、このことは各ユーザがクラスタの均等な共有を取得することを意味し、各ユーザのクエリは後のクエリがユーザの前のリソースを取らずに順番に実行されるだろうことを意味します。

プールのプロパティの設定

特定のプールのプロパティも設定ファイルを使って修正することができます。各プールは3つのプロパティをサポートします:

プールのプロパティはconf/fairscheduler.xml.templateに似たXMLファイルを作成し、クラスパス上にfairscheduler.xmlという名前をファイルを置くかSparkConfspark.scheduler.allocation.file プロパティを設定することで、設定することができます。ファイルパスはhadoop設定が尊重され、ローカルファイルパスまたはHDFSファイルパスのいずれかになります。

// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")

XMLファイルのフォーマットは単に各プールごとの<pool>エレメントであり、その中に様々な設定のための異なるエレメントがあります。例えば:


<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完全な例はconf/fairscheduler.xml.templateの中でも利用可能です。XMLファイルに設定されていない全てのプールは全ての設定(スケジューリングモード FIFO、weight 1 および minShare 0)について単純にデフォルトの値を取る事に注意してください。

JDBC接続を使ったスケジューリング

JDBCクライアントセッションのためのFair Scheduler プールを設定するために、ユーザは spark.sql.thriftserver.scheduler.pool 変数を設定することができます。

SET spark.sql.thriftserver.scheduler.pool=accounting;

PySpark での同時実行ジョブ

デフォルトでは、PySpark は PVMスレッドと JVM スレッドの同期をサポートしません。複数の PVM スレッド内の複数のジョブの起動は、それぞれのジョブがそれぞれ対応する JVM スレッド内で起動されるとは限りません。この制限により、別の PVM スレッド内で sc.setJobGroup を介して別のジョブグループを設定することができません。これにより、後で sc.cancelJobGroup を介してジョブをキャンセルすることができなくなります。

pyspark.InheritableThreadは、PVMスレッドで一緒に使い、JVMスレッドのローカルプロパティなどの継承可能な属性を継承し、リソースリークを回避することをお勧めします。

TOP
inserted by FC2 system