Sparkスタンドアローンモード

MesosあるいはYARNクラスタマネージャー上での実行に加えて、Sparkは単純なスタンドアローン デプロイモードも提供します。マスターおよびワーカーを手動で開始するか、あるいは提供された起動スクリプトを使用するかのどちらかで、スタンドアローンクラスタを手動で起動することができます。テストのために一つのマシーン上でそれらのデーモンを起動することもできます。

セキュリティ

認証のようなセキュリティ機能はデフォルトでは有効にされていません。インターネットや信頼できないネットワークに開かれたクラスタを配備する場合、認証されていないアプリケーションをクラスタ上で実行することを防ぐためにクラスタへのアクセスを安全にすることが重要です。Sparkを実行する前にSpark セキュリティとこのドキュメントの特定のセキュリティの章を見てください。

Sparkスタンドアローンをクラスタにインストール

Sparkスタンドアローンモードをインストールするには、クラスタ上の各ノード上にコンパイルされたバージョンのSparkを単に配置します。各リリースのSparkのpre-buildバージョンを取得するか、あるいは自身でビルドします。

クラスタを手動で開始

以下を実行することでスタンドアローンマスターサーバを起動することができます:

./sbin/start-master.sh

一度開始すると、マスターは自身でspark://HOST:PORT URLを出力するでしょう。ワーカーがそれに接続するために使うか、SparkContextの"master"引数として渡すことができます。マスターweb UI上でこのURLを見つけることもできます。デフォルトではhttp://localhost:8080 です。

同じように、1つ以上のワーカーを開始し、それらを以下のようにしてマスターに接続することができます:

./sbin/start-worker.sh <master-spark-URL>

一度ワーカーを開始して、マスターのweb UIを見てください(デフォルトでは、http://localhost:8080)。CPUの数とメモリと共に新しいノードがリストされているのを見るでしょう。(OSのために1ギガバイトが残されます)。

最後に、以下の設定オプションをマスターおよびワーカーに渡すことができます:

引数意味
-h HOST, --host HOST listenするホスト名
-i HOST, --ip HOST listenするホスト名(非推奨です。-h あるいは --hostを使ってください)
-p PORT, --port PORT サービスがlistenするポート(デフォルト: マスターは 7077、ワーカーはランダム)
--webui-port PORT web UIのためのポート (デフォルト: マスターは8080、ワーカーは8081)
-c CORES, --cores CORES Sparkアプリケーションがマシーン上で使うことができる総CPUコア (デフォルト: 利用可能な全て); ワーカー上のみ
-m MEM, --memory MEM Sparkアプリケーションがマシーン上で利用可能な総メモリ量。1000Mあるいは2Gのようなフォーマット (デフォルト: マシーンの総RAM から1GiB引いたもの); ワーカー上のみ
-d DIR, --work-dir DIR 作業場所およびジョブのログ出力に使われるディレクトリ (デフォルト: SPARK_HOME/work); ワーカー上のみ
--properties-file FILE 独自のSparkプロパティファイルをロードするパス (デフォルト: conf/spark-defaults.conf)

クラスタ起動スクリプト

起動スクリプトを使ってSparkスタンドアローンクラスタを起動するには、Sparkディレクトリに conf/workers と呼ばれるファイルを生成する必要があります。これにはSpark workerを開始しようとする全てのマシーンのホスト名が1行ごとに1つずつ含まれていなければなりません。conf/workersが無い場合は、起動スクリプトは初期値の1つのマシーン(localhost)に設定されます。これはテストに便利です。マスターマシーンはsshを使って各ワーカーにアクセスすることに注意してください。デフォルトでは、sshは並行して実行され、設定のために(プライベートキーを使って)パスワード無しを必要とします。パスワード無しのセットアップをしていない場合、環境変数 SPARK_SSH_FOREGROUND を設定し、各ワーカーに連続してパスワードを与えることができます。

一旦このファイルをセットアップすると、以下のシェルスクリプトを使ってクラスタを起動あるいは停止することができます。Hadoopのデプロイスクリプトに基づいており、SPARK_HOME/sbinで利用可能です:

これらのスクリプトは、ローカルマシーンではなく、あなたがSparkマスターを実行したいマシーン上で実行されなければならない事に注意してください。

conf/spark-env.sh内で環境変数を設定することで、更にクラスタを任意に設定することができます。conf/spark-env.sh.templateからこのファイルを生成し、設定を有効にするためにそれを全てのワーカーマシーンにコピーします。以下の設定が利用可能です:

環境変数意味
SPARK_MASTER_HOST マスターを特定のホスト名あるいはIPアドレス、例えば公開されているものにバインドします。
SPARK_MASTER_PORT 異なるポート(デフォルト: 7070)でマスターを開始します。
SPARK_MASTER_WEBUI_PORT マスターweb UIのポート (デフォルト: 8080)。
SPARK_MASTER_OPTS "-Dx=y"の形式でマスターにのみ適用される設定プロパティ (デフォルト: 無し)可能なオプションのリストは以下を見てください。
SPARK_LOCAL_DIRS Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。
SPARK_WORKER_CORES Spark アプリケーションがマシーン上で使用する総コア数 (デフォルト: 利用可能な全てのコア)。
SPARK_WORKER_MEMORY Sparkアプリケーションがマシーン上で使用できる総メモリ量。例えば 1000m, 2g (デフォルト: 総メモリから1GiBを引いたもの); 各アプリケーションの 個々の メモリはspark.executor.memory プロパティを使って設定されることに注意してください。
SPARK_WORKER_PORT 特定のポートでSparkワーカーを開始する(デフォルト: ランダム)。
SPARK_WORKER_WEBUI_PORT ワーカーweb UIのポート (デフォルト: 8081)。
SPARK_WORKER_DIR 中でアプリケーションを実行するディレクトリで、ログおよび作業スペースの両方を含むでしょう(デフォルト: SPARK_HOME/work)。
SPARK_WORKER_OPTS "-Dx=y"の形式でワーカーにのみ適用される設定プロパティ (デフォルト: 無し)可能なオプションのリストは以下を見てください。
SPARK_DAEMON_MEMORY Spark マスターとワーカーデーモン自身に割り当てるメモリ(デフォルト: 1g)。
SPARK_DAEMON_JAVA_OPTS Sparkマスターとワーカーデーモン自身の"-Dx=y"形式のJVMオプション (デフォルト: none)。
SPARK_DAEMON_CLASSPATH Sparkマスターとワーカーのデーモン自身のためのクラスパス (デフォルト: none)。
SPARK_PUBLIC_DNS Sparkマスターおよびワーカーの公開DNS名 (デフォルト: none)。

注意: 起動スクリプトは現在のところWindowsをサポートしません。SparkクラスタをWindows上で動作するには、手動でマスターおよびワーカーを開始してください。

SPARK_MASTER_OPTS は以下のシステムプロパティをサポートします:

プロパティ名デフォルト意味これ以降のバージョンから
spark.deploy.retainedApplications 200 表示するための完了したアプリケーションの最大数。古いアプリケーションはこの制限を維持するためにUIから落とされるでしょう。
0.8.0
spark.deploy.retainedDrivers 200 表示するための完了したドライバーの最大数。古いドライバーはこの制限を維持するためにUIから落とされるでしょう。
1.1.0
spark.deploy.spreadOut true スタンドアローンクラスタマネージャーがノードを渡ってアプリケーションを拡散しなければならないか、あるいはできるだけ少ない数のノード上で固めるかどうか。HDFS内のデータのローカリティのために拡散することは通常良いことですが、合併は計算の集中の負担がより効率的です。
0.6.1
spark.deploy.defaultCores (infinite) spark.cores.maxを設定しない場合は、スパークスタンドアローンモードでアプリケーションに渡すデフォルトのコアの数。設定しない場合は、spark.cores.maxを設定しない限りはアプリケーションは常に利用可能な全てのコアを取得します。共有クラスタ上ではユーザがデフォルトでクラスタ全体を占拠することを防ぐためにこれを少なく設定します。
0.9.0
spark.deploy.maxExecutorRetries 10 スタンドアローンのクラスタマネージャーが障害のあるアプリケーションを削除する前に、連続するexecutorの障害が起こり得る最大の数の制限。実行中のexecutorがある場合はアプリケーションは削除されないでしょう。アプリケーションが連続してspark.deploy.maxExecutorRetries より多くの障害を経験し、executorがそれらの障害の間で実行開始できず、アプリケーションが実行中のexecutorを持たない場合は、スタンドアローン クラスタのマネージャーはアプリケーションを削除し、それを失敗したとマークするでしょう。この自動削除を無効にするには、spark.deploy.maxExecutorRetries-1に設定します。
1.6.3
spark.worker.timeout 60 スタンドアローンデプロイマスターがハートビートを受け取らない場合にワーカーを失ったと見なすまでの秒数。 0.6.2
spark.worker.resource.{resourceName}.amount (none) ワーカーで使用する特定のリソース型の量。 3.0.0
spark.worker.resource.{resourceName}.discoveryScript (none) リソース検出スクリプトへのパス。これはワーカーの起動時に特定のリソースを見つけるために使われます。スクリプトの出力はResourceInformation クラスのように整形される必要があります。 3.0.0
spark.worker.resourcesFile (none) ワーカーの起動時に様々なリソースを見つけるために使われる、リソースファイルへのパス。リソースファイルの内容は、 [{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}] のように整形される必要があります。特定のリソースがリソースファイルで見つからない場合は、検出スクリプトはそのリソースを見つけるために使われます。検出スクリプトもリソースを見つけられない場合は、ワーカーは起動に失敗します。 3.0.0

SPARK_WORKER_OPTS は以下のシステムプロパティをサポートします:

プロパティ名デフォルト意味これ以降のバージョンから
spark.worker.cleanup.enabled false 定期的な ワーカー/アプリケーション ディレクトリの掃除を有効にする。YARNは異なる動作をするため、これはスタンドアローンモードにのみ影響することに注意してください。停止したアプリケーションのディレクトリのみが掃除されます。もし spark.shuffle.service.db.enabled が"true"であれば、これは有効でなければなりません。 1.0.0
spark.worker.cleanup.interval 1800 (30 分) ワーカーがローカルマシーン上の古いアプリケーションディレクトリを掃除する間隔を秒数で制御します。 1.0.0
spark.worker.cleanup.appDataTtl 604800 (7 days, 7 * 24 * 3600) 各ワーカー上でアプリケーションのワークディレクトリを維持する秒数。これは生存時間で、利用可能なディスクの量に依存します。アプリケーションログおよびjarは各アプリケーションワークディレクトリにダウンロードされます。ジョブを頻繁に実行する場合は特に、時間が経つにつれてワークディレクトリは急速にディスクスペースを埋めます。 1.0.0
spark.shuffle.service.db.enabled true 外主シャッフルサービスが再起動した時に現在の executor に自動的に再読み込みされるように、外部シャッフルサービスの状態をローカルディスクに格納します。これはスタンドアローンモードのみに提供します(yarnは常にこの挙動を有効にします)。最終的に状態がクリーンアップされるように、spark.worker.cleanup.enabled も有効にする必要があります。この設定は将来削除される可能性があります。 3.0.0
spark.storage.cleanupFilesAfterExecutorExit true Enable cleanup non-shuffle files(such as temp. shuffle blocks, cached RDD/broadcast blocks, spill files, etc) of worker directories following executor exits. `spark.worker.cleanup.enabled` は停止とタイムアウトしたアプリケーションの全てのファイル/サブディレクトリのクリーンアップを有効にするが、これは死亡したexecutorのローカルディレクトリ内の非シャッフルファイルのクリーンアップを有効にするため、これは`spark.worker.cleanup.enabled`を書き換えないことに注意してください。これはスタンドアローン モードのみに影響し、他のクラスタマネージャーのサポートは将来追加されるかもしれません。 2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize 100 圧縮されたログファイルについては、ファイルを解凍することで非圧縮のファイルだけが計算されます。Spark は圧縮されたログファイルの非圧縮のファイルサイズをキャッシュします。このプロパティはキャッシュサイズを制御します。 2.0.2

リソース割り当てと設定の概要

設定ページのカスタムリソーススケジューリングと設定概要セクションを必ず読んでください。このセクションでは、リソーススケジュールの Spark スタンドアローン固有の側面についてのみ説明します。

Spark スタンドアローンには2つの部分があります。1つはワーカーのリソースの設定、2つ目は特定のアプリケーションのリソースの割り当てです。

ユーザは、一連のリソースを利用できるようにワーカーを構成して、それらを executor に割り当てることができるようにする必要があります。spark.worker.resource.{resourceName}.amount はワーカーが割り当てた各リソースの量を制御するために使われます。ユーザは spark.worker.resourcesFile または spark.worker.resource.{resourceName}.discoveryScript のどちらかを指定して、ワーカーが割り当てられたリソースを検出する方法を指定する必要があります。それぞれの設定について上記の説明を参照して、設定に最適な方法を確認してください。

2つ目の部分は、Spark スタンドアローンでアプリケーションを実行することです。標準の Spark リソース設定の唯一の特殊なケースは、クライアントモードでドライバを実行している場合です。クライアントモードのドライバの場合、ユーザは spark.driver.resourcesFile または spark.driver.resource.{resourceName}.discoveryScript を使って使用するリソースを指定することができます。ドライバが他のドライバと同じホストで実行されている場合、リソースファイルあるいは検出スクリプトが、同じノードで実行している他のドライバと競合しないリソースのみを返すようにしてください。

ワーカーは割り当てられたリソースを使って各 executor を開始するため、ユーザはアプリケーションを送信する時に検出スクリプトを指定する必要が無いことに注意してください。

アプリケーションをクラスタに接続

Sparkクラスタ上でアプリケーションを実行するには、SparkContext コンストラクタにするように、単純にマスターのspark://IP:PORT URL を渡します。

クラスタに対して対話的なSparkシェルを実行するには以下のコマンドを実行します:

./bin/spark-shell --master spark://IP:PORT

sparkシェルがクラスタ内で使用するコアの数を制御するために、オプション--total-executor-cores <numCores> を渡すこともできます。

Client Properties

Sparkアプリケーションは、スタンドアローンモードに固有の以下の設定プロパティをサポートします:

プロパティ名デフォルト値意味これ以降のバージョンから
spark.standalone.submit.waitAppCompletion false スタンドアローンクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。trueに設定した場合は、クライアントプロセスはドライバの状態をポーリングし続けるでしょう。そうでなければ、クライアントプロセスはサブミットのあとで終了するでしょう。 3.1.0

Sparkアプリケーションの起動

spark-submit スクリプト はコンパイルされたSparkアプリケーションをクラスタにサブミットする最も率直な方法です。スタンドアローンクラスタに関しては、Sparkは現在のところ二つのデプロイモードをサポートします:クライアント モードでは、ドライバは同じプロセス内でクライアントがアプリケーションをサブミットしたかのように起動します。クラスタ モードでは、ドライバはクラスタ内のワーカープロセスの一つから起動されますが、クライアントプロセスはアプリケーションが終了するのを待たずにアプリケーションをサブミットする責任を果たすとすぐに終了します。

アプリケーションがSparkサブミットを使って起動されると、アプリケーションのjarは自動的に全てのワーカーノードに分配されます。アプリケーションが依存するどのような追加のjarについても、カンマを区切り文字として使って --jars フラグを使ってそれらを指定しなければなりません。アプリケーションの設定あるいは実行の環境を制御するには、Spark 設定を見てください。

更に、スタンドアローンクラスタモードは、アプリケーションが0ではない終了コードで終了した場合に、自動でのアプリケーションの再起動をサポートします。この機能を使うには、アプリケーションを起動する時にspark-submit--supervise フラグを渡します。次に、繰り返し失敗するアプリケーションをkillしたい場合は、以下のようにして行うことができます:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

http://<master url>:8080のスタンドアローンマスター web UIを使ってドライバIDを見つける事ができます。

リソースのスケジューリング

スタンドアローンクラスターモードは現在のところアプリケーションを跨ぐ単純なFIFOスケジューラのみをサポートします。しかし、複数の同時実行ユーザを許可するために、各アプリケーションが使うであろうリソースの最大数を制御することができます。デフォルトでは、クラスタ内の全ての コアを取得しますが、これは同時に一つのアプリケーションだけを実行している場合にのみ意味があります。SparkConf内でspark.cores.max を設定することで、コアの数に上限を付けることができます。例えば:

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

spark.cores.maxをinfinite未満の何かに設定していないアプリケーションのためのデフォルトを変更するために、更にクラスタマスタープロセス上のspark.deploy.defaultCoresを設定することができます。以下をconf/spark-env.shに追加することでこれをしてください:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

これはユーザが個々にコアの最大数を設定していないかもしれない共有クラスタで有用かも知れません。

Executorのスケジューリング

各executorへ割り当てられるコアの数は設定可能です。spark.executor.cores が明示的に設定された場合、もしワーカーが十分なコアとメモリを持つ場合は、同じアプリケーションからの複数のexecutorが同じワーカー上で起動されるかもしれません。そうでなければ、各executorはデフォルトでワーカー上で利用可能な全てのコアを横取りします。その場合1つのスケジュールの繰り返しの間にアプリケーション毎に1つのexecutorが各ワーカー乗で起動されるかも知れません。

監視および記録

Sparkのスタンドアローンモードはクラスタを監視するためにwebベースのユーザインタフェースを提供します。マスターと各ワーカーは、クラスターとジョブの統計を表示する独自のwebUIを持ちます。デフォルトでは、マスターのためのwebUIにポート8080でアクセスすることができます。ポートは、設定ファイルあるいはコマンドラインオプションのどちらかを使って変更することができます。

更に、各ジョブの詳細なログ出力も各workerノードの作業デイlレクトリ(デフォルトではSPARK_HOME/work)に書き込まれます。各ジョブごとに2つのファイル、stdoutstderr、があるでしょう。全ての出力はコンソールに書き込まれます。

Hadoopと一緒に実行

同じマシーン上で別個のサービスとして起動するだけで、Sparkと一緒に既存のHaddopクラスタを実行することができます。SparkからHadoopデータにアクセスするためには、単に hdfs:// URL (一般的には hdfs://<namenode>:9000/path、正しいURLはHadoopネームノードのweb UIで見つけることができます)を使ってください。別のやり方として、Sparkのための個別のクラスタをセットアップし、ネットワーク越しにHDFSにまだアクセスできるようにすることができます; これはディスクのローカルアクセスより遅いですが、もし同じローカルエリアネットワーク内でまだ実行している場合は重要ではありません (例えば、Hadoopがある各ラック上に2、3のSparkマシーンを配置する)。

ネットワークセキュリティのためのポートの設定

一般的に言って、Spark クラスタとそのサービスは公的なインターネットに配備されません。それらは一般的にプライベートなサービスで、Sparkを配備する組織のネットワーク内でのみアクセス可能な筈です。Sparkサービスによって使われるホストとポートへのアクセスはサービスへアクセスする必要があるオリジンのホストに制限されるべきです。

スタンドアローン リソースマネージャーを使うクラスタは他のリソースマネージャーがする方法でfine-grainedアクセス制御をサポートしないため、これは特にそれにとって重要です。

設定するポートの完全なリストは、 セキュリティのページを見てください。

高可用性

デフォルトでは、スタンドアローンスケジューリングクラスタはワーカーの障害に対して回復力に富んでいます(Spark自体の範囲においてワークの喪失に対して他のワーカーにそれを移動することで回復力に富んでいます)。しかし、スケジューラーはスケジュールの決定をするためにマスターを使用します。これにより(デフォルト)単一障害点ができます: もしマスターがクラッシュすると新しいアプリケーションを生成することができません。これを回避するために、2つの高可用性の仕組みがあります。以下に詳細を示します。

ZooKeeperを使ったスタンバイマスター

概要

リーダーの選出と幾つかの状態のストレージを提供するためにZooKeeperを使用すると、同じZooKeeperインスタンスに接続しているクラスタ内に複数のマスターを起動することができます。一つが"リーダー"として選出され、他はスタンドバイモードのままでいるでしょう。現在のリーダーが死亡すると、他のマスターが選出され、古いマスターの状態を回復してスケジューリングを再開します。回復の全プロセス(最初のリーダーがダウンしてから)には、1,2分かかるでしょう。この遅延は新しい アプリケーションのスケジューリングにのみ影響することに注意してください - マスター障害の間に既に実行中のアプリケーションは影響を受けません。

ZooKeeperを使った開始についてもっと知るにはここを見てください。

設定

この回復モードを有効にするために、spark.deploy.recoveryMode と関係する spark.deploy.zookeeper.* 設定を設定することで、SPARK_DAEMON_JAVA_OPTS を設定することができます。これらの設定についての詳しい情報は設定ドキュメントを参照してください。

Possible gotcha: クラスタ内に複数のマスターがあるが、ZooKeeperを使うように正しくマスターを設定できていない場合、マスターはお互いを見つける事ができずにそれぞれが全てリーダーだと考えるかもしれません。これは結果として(全てのマスターが独立してスケジュールすることになるため)healthyクラスタ状態ではなくなるでしょう

詳細

ZooKeeperクラスターをセットアップした後は、高可用性を有効にすることは簡単です。異なるノード上の同じZooKeeper設定(ZooKeeper URL およびディレクトリ)を持つ複数のマスタープロセスを単純に開始します。マスターはいつでも追加および削除することができます。

新しいアプリケーションのスケジュールあるいはクラスタにワーカーを追加するためには、それらがクラスタのリーダーのIPアドレスを知る必要があります。一つの時に渡していたマスターのリストに単純に渡すことで行うことができます。例えば、spark://host1:port1,host2:port2を指しているSparkContextを開始しても構いません。これはSparkContextが両方のマスターを登録するようにさせるでしょう - もし host1がダウンすると、新しいリーダーhost2を見つけるのでこの設定はまだ正しいでしょう。

"マスターの登録"と通常の操作の間で重要な違いがあります。開始時にはアプリケーションあるいはワーカーは現在のリーダーマスターを見つけて登録できる必要があります。とはいえ、一旦登録に成功すると、それは"システム内"になります (つまり、ZooKeeperに格納されます)。障害が発生すると、新しいリーダーはリーダーシップの変更を全ての以前に登録されたアプリケーションとワーカーに知らせるために接続します。そのためそれらは起動時に新しいマスターの存在を知る必要さえありません。

この特性のため、新しいマスターはいつでも生成することができ、それがリーダーになった場合に新しいアプリケーションとワーカーがそれを登録するために見つけることができるかを心配するだけで良いです。一旦登録されると、ZooKeeperが面倒を見ます。

ローカルファイルシステムを使ったシングルモードリカバリ

概要

ZooKeeperはプロダクションレベルの高可用性を追求するための最も良い方法ですが、もしマスターがダウンしたら単に再起動できるようにしたいと思う場合は、FILESYSTEMモードがそれを引き受けます。アプリケーションおよびワーカーを登録する時に、それらはマスタープロセスが再起動する場合に回復できるように提供されたディレクトリに十分な状態を書き込みます。

設定

この回復モードを有効にするために、以下の設定を使ってspark-envの中でSPARK_DAEMON_JAVA_OPTSを設定することができます。

システム プロパティ意味これ以降のバージョンから
spark.deploy.recoveryMode シングルノードリカバリーモードをFILESYSTEMに設定します (デフォルト: NONE)。 0.8.1
spark.deploy.recoveryDirectory Sparkがリカバリー状態を格納する、マスターからアクセス可能なディレクトリ。 0.8.1

詳細

TOP
inserted by FC2 system