Sparkスタンドアローンモード
- Sparkスタンドアローンをクラスタにインストール
- クラスタを手動で開始
- クラスタ起動スクリプト
- アプリケーションをクラスタに接続
- Sparkアプリケーションの起動
- リソースのスケジューリング
- 監視および記録
- Hadoopと一緒に実行
- ネットワークセキュリティのためのポートの設定
- 高可用性
MesosあるいはYARNクラスタマネージャー上での実行に加えて、Sparkは単純なスタンドアローン デプロイモードも提供します。マスターおよびワーカーを手動で開始するか、あるいは提供された起動スクリプトを使用するかのどちらかで、スタンドアローンクラスタを手動で起動することができます。テストのために一つのマシーン上でそれらのデーモンを起動することもできます。
Sparkスタンドアローンをクラスタにインストール
Sparkスタンドアローンモードをインストールするには、クラスタ上の各ノード上にコンパイルされたバージョンのSparkを単に配置します。各リリースのSparkのpre-buildバージョンを取得するか、あるいは自身でビルドします。
クラスタを手動で開始
以下を実行することでスタンドアローンマスターサーバを起動することができます:
./sbin/start-master.sh
一度開始すると、マスターは自身でspark://HOST:PORT
URLを出力するでしょう。ワーカーがそれに接続するために使うか、SparkContext
の"master"引数として渡すことができます。マスターweb UI上でこのURLを見つけることもできます。デフォルトではhttp://localhost:8080 です。
同じように、1つ以上のワーカーを開始し、それらを以下のようにしてマスターに接続することができます:
./sbin/start-slave.sh <master-spark-URL>
一度ワーカーを開始して、マスターのweb UIを見てください(デフォルトでは、http://localhost:8080)。CPUの数とメモリと共に新しいノードがリストされているのを見るでしょう。(OSのために1ギガバイトが残されます)。
最後に、以下の設定オプションをマスターおよびワーカーに渡すことができます:
引数 | 意味 |
---|---|
-h HOST , --host HOST |
listenするホスト名 |
-i HOST , --ip HOST |
listenするホスト名(非推奨です。-h あるいは --hostを使ってください) |
-p PORT , --port PORT |
サービスがlistenするポート(デフォルト: マスターは 7077、ワーカーはランダム) |
--webui-port PORT |
web UIのためのポート (デフォルト: マスターは8080、ワーカーは8081) |
-c CORES , --cores CORES |
Sparkアプリケーションがマシーン上で使うことができる総CPUコア (デフォルト: 利用可能な全て); ワーカー上のみ |
-m MEM , --memory MEM |
Sparkアプリケーションがマシーン上で利用可能な総メモリ量。1000Mあるいは2Gのようなフォーマット (デフォルト: マシーンの総RAM から1G引いたもの); ワーカー上のみ |
-d DIR , --work-dir DIR |
作業場所およびジョブのログ出力に使われるディレクトリ (デフォルト: SPARK_HOME/work); ワーカー上のみ |
--properties-file FILE |
独自のSparkプロパティファイルをロードするパス (デフォルト: conf/spark-defaults.conf) |
クラスタ起動スクリプト
起動スクリプトを使ってSparkスタンドアローンクラスタを起動するには、Sparkディレクトリに conf/slaves と呼ばれるファイルを生成する必要があります。これにはSpark workerを開始しようとする全てのマシーンのホスト名が1行ごとに1つずつ含まれていなければなりません。conf/slavesが無い場合は、起動スクリプトは初期値の1つのマシーン(localhost)に設定されます。これはテストに便利です。マスターマシーンはsshを使って各ワーカーにアクセスすることに注意してください。デフォルトでは、sshは並行して実行され、設定のために(プライベートキーを使って)パスワード無しを必要とします。パスワード無しのセットアップをしていない場合、環境変数 SPARK_SSH_FOREGROUND を設定し、各ワーカーに連続してパスワードを与えることができます。
一旦このファイルをセットアップすると、以下のシェルスクリプトを使ってクラスタを起動あるいは停止することができます。Hadoopのデプロイスクリプトに基づいており、SPARK_HOME/sbin
で利用可能です:
sbin/start-master.sh
- スクリプトが実行されるマシーン上のマスターインスタンスを開始する。sbin/start-slaves.sh
-conf/slaves
ファイルで指定された各マシーン上でスレーブインスタンスを開始する。sbin/start-slave.sh
- スクリプトが実行されたマシーン上でスレーブインスタンスを開始する。sbin/start-all.sh
- 上で説明されたマスターと多数のスレーブの両方を開始する。sbin/stop-master.sh
-sbin/start-master.sh
スクリプトで開始されたマスターを停止する。sbin/stop-slaves.sh
-conf/slaves
ファイルで指定されたマシーン上の全てのスレーブインスタンスを停止する。sbin/stop-all.sh
- 上で説明されたマスターとスレーブの両方を停止する。
これらのスクリプトは、ローカルマシーンではなく、あなたがSparkマスターを実行したいマシーン上で実行されなければならない事に注意してください。
conf/spark-env.sh
内で環境変数を設定することで、更にクラスタを任意に設定することができます。conf/spark-env.sh.template
からこのファイルを生成し、設定を有効にするためにそれを全てのワーカーマシーンにコピーします。以下の設定が利用可能です:
環境変数 | 意味 |
---|---|
SPARK_MASTER_HOST |
マスターを特定のホスト名あるいはIPアドレス、例えば公開されているものにバインドします。 |
SPARK_MASTER_PORT |
異なるポート(デフォルト: 7070)でマスターを開始します。 |
SPARK_MASTER_WEBUI_PORT |
マスターweb UIのポート (デフォルト: 8080)。 |
SPARK_MASTER_OPTS |
"-Dx=y"の形式でマスターにのみ適用される設定プロパティ (デフォルト: 無し)可能なオプションのリストは以下を見てください。 |
SPARK_LOCAL_DIRS |
Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。 |
SPARK_WORKER_CORES |
Spark アプリケーションがマシーン上で使用する総コア数 (デフォルト: 利用可能な全てのコア)。 |
SPARK_WORKER_MEMORY |
Sparkアプリケーションがマシーン上で使用できる総メモリ量。例えば 1000m , 2g (デフォルト: 総メモリから1GBを引いたもの); 各アプリケーションの 個々の メモリはspark.executor.memory プロパティを使って設定されることに注意してください。 |
SPARK_WORKER_PORT |
特定のポートでSparkワーカーを開始する(デフォルト: ランダム)。 |
SPARK_WORKER_WEBUI_PORT |
ワーカーweb UIのポート (デフォルト: 8081)。 |
SPARK_WORKER_DIR |
中でアプリケーションを実行するディレクトリで、ログおよび作業スペースの両方を含むでしょう(デフォルト: SPARK_HOME/work)。 |
SPARK_WORKER_OPTS |
"-Dx=y"の形式でワーカーにのみ適用される設定プロパティ (デフォルト: 無し)可能なオプションのリストは以下を見てください。 |
SPARK_DAEMON_MEMORY |
Spark マスターとワーカーデーモン自身に割り当てるメモリ(デフォルト: 1g)。 |
SPARK_DAEMON_JAVA_OPTS |
Sparkマスターとワーカーデーモン自身の"-Dx=y"形式のJVMオプション (デフォルト: none)。 |
SPARK_PUBLIC_DNS |
Sparkマスターおよびワーカーの公開DNS名 (デフォルト: none)。 |
注意: 起動スクリプトは現在のところWindowsをサポートしません。SparkクラスタをWindows上で動作するには、手動でマスターおよびワーカーを開始してください。
SPARK_MASTER_OPTS は以下のシステムプロパティをサポートします:
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.deploy.retainedApplications |
200 |
表示するための完了したアプリケーションの最大数。古いアプリケーションはこの制限を維持するためにUIから落とされるでしょう。 |
spark.deploy.retainedDrivers |
200 |
表示するための完了したドライバーの最大数。古いドライバーはこの制限を維持するためにUIから落とされるでしょう。 |
spark.deploy.spreadOut |
true |
スタンドアローンクラスタマネージャーがノードを渡ってアプリケーションを拡散しなければならないか、あるいはできるだけ少ない数のノード上で固めるかどうか。HDFS内のデータのローカリティのために拡散することは通常良いことですが、合併は計算の集中の負担がより効率的です。 |
spark.deploy.defaultCores |
(infinite) |
spark.cores.max を設定しない場合は、スパークスタンドアローンモードでアプリケーションに渡すデフォルトのコアの数。設定しない場合は、spark.cores.max を設定しない限りはアプリケーションは常に利用可能な全てのコアを取得します。共有クラスタ上ではユーザがデフォルトでクラスタ全体を占拠することを防ぐためにこれを少なく設定します。 |
spark.deploy.maxExecutorRetries |
10 |
スタンドアローンのクラスタマネージャーが障害のあるアプリケーションを削除する前に、連続するexecutorの障害が起こり得る最大の数の制限。実行中のexecutorがある場合はアプリケーションは削除されないでしょう。アプリケーションが連続してspark.deploy.maxExecutorRetries より多くの障害を経験し、executorがそれらの障害の間で実行開始できず、アプリケーションが実行中のexecutorを持たない場合は、スタンドアローン クラスタのマネージャーはアプリケーションを削除し、それを失敗したとマークするでしょう。この自動削除を無効にするには、spark.deploy.maxExecutorRetries を -1 に設定します。
|
spark.worker.timeout |
60 | スタンドアローンデプロイマスターがハートビートを受け取らない場合にワーカーを失ったと見なすまでの秒数。 |
SPARK_WORKER_OPTS は以下のシステムプロパティをサポートします:
プロパティ名 | デフォルト | 意味 |
---|---|---|
spark.worker.cleanup.enabled |
false | 定期的な ワーカー/アプリケーション ディレクトリの掃除を有効にする。YARNは異なる動作をするため、これはスタンドアローンモードにのみ影響することに注意してください。停止したアプリケーションのディレクトリのみが掃除されます。 |
spark.worker.cleanup.interval |
1800 (30 分) | ワーカーがローカルマシーン上の古いアプリケーションディレクトリを掃除する間隔を秒数で制御します。 |
spark.worker.cleanup.appDataTtl |
604800 (7 days, 7 * 24 * 3600) | 各ワーカー上でアプリケーションのワークディレクトリを維持する秒数。これは生存時間で、利用可能なディスクの量に依存します。アプリケーションログおよびjarは各アプリケーションワークディレクトリにダウンロードされます。ジョブを頻繁に実行する場合は特に、時間が経つにつれてワークディレクトリは急速にディスクスペースを埋めます。 |
spark.worker.ui.compressedLogFileLengthCacheSize |
100 | 圧縮されたログファイルについては、ファイルを解凍することで非圧縮のファイルだけが計算されます。Spark は圧縮されたログファイルの非圧縮のファイルサイズをキャッシュします。このプロパティはキャッシュサイズを制御します。 |
アプリケーションをクラスタに接続
Sparkクラスタ上でアプリケーションを実行するには、SparkContext
コンストラクタにするように、単純にマスターのspark://IP:PORT
URL を渡します。
クラスタに対して対話的なSparkシェルを実行するには以下のコマンドを実行します:
./bin/spark-shell --master spark://IP:PORT
sparkシェルがクラスタ内で使用するコアの数を制御するために、オプション--total-executor-cores <numCores>
を渡すこともできます。
Sparkアプリケーションの起動
spark-submit
スクリプト はコンパイルされたSparkアプリケーションをクラスタにサブミットする最も率直な方法です。スタンドアローンクラスタに関しては、Sparkは現在のところ二つのデプロイモードをサポートします:クライアント
モードでは、ドライバは同じプロセス内でクライアントがアプリケーションをサブミットしたかのように起動します。クラスタ
モードでは、ドライバはクラスタ内のワーカープロセスの一つから起動されますが、クライアントプロセスはアプリケーションが終了するのを待たずにアプリケーションをサブミットする責任を果たすとすぐに終了します。
アプリケーションがSparkサブミットを使って起動されると、アプリケーションのjarは自動的に全てのワーカーノードに分配されます。アプリケーションが依存するどのような追加のjarについても、カンマを区切り文字として使って --jars
フラグを使ってそれらを指定しなければなりません。アプリケーションの設定あるいは実行の環境を制御するには、Spark 設定を見てください。
更に、スタンドアローンクラスタ
モードは、アプリケーションが0ではない終了コードで終了した場合に、自動でのアプリケーションの再起動をサポートします。この機能を使うには、アプリケーションを起動する時にspark-submit
に--supervise
フラグを渡します。次に、繰り返し失敗するアプリケーションをkillしたい場合は、以下のようにして行うことができます:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
http://<master url>:8080
のスタンドアローンマスター web UIを使ってドライバIDを見つける事ができます。
リソースのスケジューリング
スタンドアローンクラスターモードは現在のところアプリケーションを跨ぐ単純なFIFOスケジューラのみをサポートします。しかし、複数の同時実行ユーザを許可するために、各アプリケーションが使うであろうリソースの最大数を制御することができます。デフォルトでは、クラスタ内の全ての コアを取得しますが、これは同時に一つのアプリケーションだけを実行している場合にのみ意味があります。SparkConf内でspark.cores.max
を設定することで、コアの数に上限を付けることができます。例えば:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
spark.cores.max
をinfinite未満の何かに設定していないアプリケーションのためのデフォルトを変更するために、更にクラスタマスタープロセス上のspark.deploy.defaultCores
を設定することができます。以下をconf/spark-env.sh
に追加することでこれをしてください:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
これはユーザが個々にコアの最大数を設定していないかもしれない共有クラスタで有用かも知れません。
監視および記録
Sparkのスタンドアローンモードはクラスタを監視するためにwebベースのユーザインタフェースを提供します。マスターと各ワーカーは、クラスターとジョブの統計を表示する独自のwebUIを持ちます。デフォルトでは、マスターのためのwebUIにポート8080でアクセスすることができます。ポートは、設定ファイルあるいはコマンドラインオプションのどちらかを使って変更することができます。
更に、各ジョブの詳細なログ出力も各スレーブノードの作業デイlレクトリ(デフォルトではSPARK_HOME/work
)に書き込まれます。各ジョブごとに2つのファイル、stdout
と stderr
、があるでしょう。全ての出力はコンソールに書き込まれます。
Hadoopと一緒に実行
同じマシーン上で別個のサービスとして起動するだけで、Sparkと一緒に既存のHaddopクラスタを実行することができます。SparkからHadoopデータにアクセスするためには、単に hdfs:// URL (一般的には hdfs://<namenode>:9000/path
、正しいURLはHadoopネームノードのweb UIで見つけることができます)を使ってください。別のやり方として、Sparkのための個別のクラスタをセットアップし、ネットワーク越しにHDFSにまだアクセスできるようにすることができます; これはディスクのローカルアクセスより遅いですが、もし同じローカルエリアネットワーク内でまだ実行している場合は重要ではありません (例えば、Hadoopがある各ラック上に2、3のSparkマシーンを配置する)。
ネットワークセキュリティのためのポートの設定
Sparkはネットワークを酷く利用し、堅いファイアウォール設定の使用に関して幾つかの環境は厳しい要求をします。設定するポートの完全なリストは、 セキュリティのページを見てください。
高可用性
デフォルトでは、スタンドアローンスケジューリングクラスタはワーカーの障害に対して回復力に富んでいます(Spark自体の範囲においてワークの喪失に対して他のワーカーにそれを移動することで回復力に富んでいます)。しかし、スケジューラーはスケジュールの決定をするためにマスターを使用します。これにより(デフォルト)単一障害点ができます: もしマスターがクラッシュすると新しいアプリケーションを生成することができません。これを回避するために、2つの高可用性の仕組みがあります。以下に詳細を示します。
ZooKeeperを使ったスタンバイマスター
概要
リーダーの選出と幾つかの状態のストレージを提供するためにZooKeeperを使用すると、同じZooKeeperインスタンスに接続しているクラスタ内に複数のマスターを起動することができます。一つが"リーダー"として選出され、他はスタンドバイモードのままでいるでしょう。現在のリーダーが死亡すると、他のマスターが選出され、古いマスターの状態を回復してスケジューリングを再開します。回復の全プロセス(最初のリーダーがダウンしてから)には、1,2分かかるでしょう。この遅延は新しい アプリケーションのスケジューリングにのみ影響することに注意してください - マスター障害の間に既に実行中のアプリケーションは影響を受けません。
ZooKeeperを使った開始についてもっと知るにはここを見てください。
設定
この回復モードを有効にするために、spark.deploy.recoveryMode
と関係する spark.deploy.zookeeper.* 設定を設定することで、SPARK_DAEMON_JAVA_OPTS を設定することができます。これらの設定についての詳しい情報は設定ドキュメントを参照してください。
Possible gotcha: クラスタ内に複数のマスターがあるが、ZooKeeperを使うように正しくマスターを設定できていない場合、マスターはお互いを見つける事ができずにそれぞれが全てリーダーだと考えるかもしれません。これは結果として(全てのマスターが独立してスケジュールすることになるため)healthyクラスタ状態ではなくなるでしょう
詳細
ZooKeeperクラスターをセットアップした後は、高可用性を有効にすることは簡単です。異なるノード上の同じZooKeeper設定(ZooKeeper URL およびディレクトリ)を持つ複数のマスタープロセスを単純に開始します。マスターはいつでも追加および削除することができます。
新しいアプリケーションのスケジュールあるいはクラスタにワーカーを追加するためには、それらがクラスタのリーダーのIPアドレスを知る必要があります。一つの時に渡していたマスターのリストに単純に渡すことで行うことができます。例えば、spark://host1:port1,host2:port2
を指しているSparkContextを開始しても構いません。これはSparkContextが両方のマスターを登録するようにさせるでしょう - もし host1
がダウンすると、新しいリーダーhost2
を見つけるのでこの設定はまだ正しいでしょう。
"マスターの登録"と通常の操作の間で重要な違いがあります。開始時にはアプリケーションあるいはワーカーは現在のリーダーマスターを見つけて登録できる必要があります。とはいえ、一旦登録に成功すると、それは"システム内"になります (つまり、ZooKeeperに格納されます)。障害が発生すると、新しいリーダーはリーダーシップの変更を全ての以前に登録されたアプリケーションとワーカーに知らせるために接続します。そのためそれらは起動時に新しいマスターの存在を知る必要さえありません。
この特性のため、新しいマスターはいつでも生成することができ、それがリーダーになった場合に新しいアプリケーションとワーカーがそれを登録するために見つけることができるかを心配するだけで良いです。一旦登録されると、ZooKeeperが面倒を見ます。
ローカルファイルシステムを使ったシングルモードリカバリ
概要
ZooKeeperはプロダクションレベルの高可用性を追求するための最も良い方法ですが、もしマスターがダウンしたら単に再起動できるようにしたいと思う場合は、FILESYSTEMモードがそれを引き受けます。アプリケーションおよびワーカーを登録する時に、それらはマスタープロセスが再起動する場合に回復できるように提供されたディレクトリに十分な状態を書き込みます。
設定
この回復モードを有効にするために、以下の設定を使ってspark-envの中でSPARK_DAEMON_JAVA_OPTSを設定することができます。
システム プロパティ | 意味 |
---|---|
spark.deploy.recoveryMode |
シングルノードリカバリーモードをFILESYSTEMに設定します (デフォルト: NONE)。 |
spark.deploy.recoveryDirectory |
Sparkがリカバリー状態を格納する、マスターからアクセス可能なディレクトリ。 |
詳細
- この解決方法はmonitのような監視/管理プロセスと提携して使われるか、単に再起動することで手動のリカバリをするために使われます。
- ファイルシステムの回復は何も回復しないよりは素直に良いように思えますが、このモードでは特定の開発あるいは実験的な目的のためには最適状態には及ばないかも知れません。特に、stop-master.shを使ってマスターをkillすることはリカバリー状態を掃除しないため、新しいマスターを開始するときはいつでもリカバリーモードに入るでしょう。これは全ての以前の登録済みのワーカー/クライアントがタイムアウトするのを待つ必要がある場合には1分間開始の時間を増加させるでしょう。
- 公式にはサポートされていませんが、リカバリーディレクトリとしてNFSディレクトリをマウントすることができます。元のマスターノードが完全にダウンした場合、異なるノードでマスターを開始することができます。全ての以前の登録済みのワーカー/アプリケーションが正しくリカバーされるでしょう(ZooKeeperのリカバリーに相当します)。しかし、将来のアプリケーションは登録するために新しいマスターを見つけることができる必要があるでしょう。