Sparkスタンドアローンモード

MesosあるいはYARNクラスタマネージャー上での実行に加えて、Sparkは単純なスタンドアローン デプロイモードも提供します。マスターおよびワーカーを手動で開始するか、あるいは提供された起動スクリプトを使用するかのどちらかで、スタンドアローンクラスタを手動で起動することができます。テストのために一つのマシーン上でそれらのデーモンを起動することもできます。

Sparkスタンドアローンをクラスタにインストール

Sparkスタンドアローンモードをインストールするには、クラスタ上の各ノード上にコンパイルされたバージョンのSparkを単に配置します。各リリースのSparkのpre-buildバージョンを取得するか、あるいは自身でビルドします。

クラスタを手動で開始

以下を実行することでスタンドアローンマスターサーバを起動することができます:

./sbin/start-master.sh

一度開始すると、マスターは自身でspark://HOST:PORT URLを出力するでしょう。ワーカーがそれに接続するために使うか、SparkContextの"master"引数として渡すことができます。マスターweb UI上でこのURLを見つけることもできます。デフォルトではhttp://localhost:8080 です。

同じように、1つ以上のワーカーを開始し、それらを以下のようにしてマスターに接続することができます:

./sbin/start-slave.sh <master-spark-URL>

一度ワーカーを開始して、マスターのweb UIを見てください(デフォルトでは、http://localhost:8080)。CPUの数とメモリと共に新しいノードがリストされているのを見るでしょう。(OSのために1ギガバイトが残されます)。

最後に、以下の設定オプションをマスターおよびワーカーに渡すことができます:

引数意味
-h HOST, --host HOST listenするホスト名
-i HOST, --ip HOST listenするホスト名(非推奨です。-h あるいは --hostを使ってください)
-p PORT, --port PORT サービスがlistenするポート(デフォルト: マスターは 7077、ワーカーはランダム)
--webui-port PORT web UIのためのポート (デフォルト: マスターは8080、ワーカーは8081)
-c CORES, --cores CORES Sparkアプリケーションがマシーン上で使うことができる総CPUコア (デフォルト: 利用可能な全て); ワーカー上のみ
-m MEM, --memory MEM Sparkアプリケーションがマシーン上で利用可能な総メモリ量。1000Mあるいは2Gのようなフォーマット (デフォルト: マシーンの総RAM から1G引いたもの); ワーカー上のみ
-d DIR, --work-dir DIR 作業場所およびジョブのログ出力に使われるディレクトリ (デフォルト: SPARK_HOME/work); ワーカー上のみ
--properties-file FILE 独自のSparkプロパティファイルをロードするパス (デフォルト: conf/spark-defaults.conf)

クラスタ起動スクリプト

起動スクリプトを使ってSparkスタンドアローンクラスタを起動するには、Sparkディレクトリに conf/slaves と呼ばれるファイルを生成する必要があります。これにはSpark workerを開始しようとする全てのマシーンのホスト名が1行ごとに1つずつ含まれていなければなりません。conf/slavesが無い場合は、起動スクリプトは初期値の1つのマシーン(localhost)に設定されます。これはテストに便利です。マスターマシーンはsshを使って各ワーカーにアクセスすることに注意してください。デフォルトでは、sshは並行して実行され、設定のために(プライベートキーを使って)パスワード無しを必要とします。パスワード無しのセットアップをしていない場合、環境変数 SPARK_SSH_FOREGROUND を設定し、各ワーカーに連続してパスワードを与えることができます。

一旦このファイルをセットアップすると、以下のシェルスクリプトを使ってクラスタを起動あるいは停止することができます。Hadoopのデプロイスクリプトに基づいており、SPARK_HOME/sbinで利用可能です:

これらのスクリプトは、ローカルマシーンではなく、あなたがSparkマスターを実行したいマシーン上で実行されなければならない事に注意してください。

conf/spark-env.sh内で環境変数を設定することで、更にクラスタを任意に設定することができます。conf/spark-env.sh.templateからこのファイルを生成し、設定を有効にするためにそれを全てのワーカーマシーンにコピーします。以下の設定が利用可能です:

環境変数意味
SPARK_MASTER_HOST マスターを特定のホスト名あるいはIPアドレス、例えば公開されているものにバインドします。
SPARK_MASTER_PORT 異なるポート(デフォルト: 7070)でマスターを開始します。
SPARK_MASTER_WEBUI_PORT マスターweb UIのポート (デフォルト: 8080)。
SPARK_MASTER_OPTS "-Dx=y"の形式でマスターにのみ適用される設定プロパティ (デフォルト: 無し)可能なオプションのリストは以下を見てください。
SPARK_LOCAL_DIRS Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。
SPARK_WORKER_CORES Spark アプリケーションがマシーン上で使用する総コア数 (デフォルト: 利用可能な全てのコア)。
SPARK_WORKER_MEMORY Sparkアプリケーションがマシーン上で使用できる総メモリ量。例えば 1000m, 2g (デフォルト: 総メモリから1GBを引いたもの); 各アプリケーションの 個々の メモリはspark.executor.memory プロパティを使って設定されることに注意してください。
SPARK_WORKER_PORT 特定のポートでSparkワーカーを開始する(デフォルト: ランダム)。
SPARK_WORKER_WEBUI_PORT ワーカーweb UIのポート (デフォルト: 8081)。
SPARK_WORKER_DIR 中でアプリケーションを実行するディレクトリで、ログおよび作業スペースの両方を含むでしょう(デフォルト: SPARK_HOME/work)。
SPARK_WORKER_OPTS "-Dx=y"の形式でワーカーにのみ適用される設定プロパティ (デフォルト: 無し)可能なオプションのリストは以下を見てください。
SPARK_DAEMON_MEMORY Spark マスターとワーカーデーモン自身に割り当てるメモリ(デフォルト: 1g)。
SPARK_DAEMON_JAVA_OPTS Sparkマスターとワーカーデーモン自身の"-Dx=y"形式のJVMオプション (デフォルト: none)。
SPARK_PUBLIC_DNS Sparkマスターおよびワーカーの公開DNS名 (デフォルト: none)。

注意: 起動スクリプトは現在のところWindowsをサポートしません。SparkクラスタをWindows上で動作するには、手動でマスターおよびワーカーを開始してください。

SPARK_MASTER_OPTS は以下のシステムプロパティをサポートします:

プロパティ名デフォルト意味
spark.deploy.retainedApplications 200 表示するための完了したアプリケーションの最大数。古いアプリケーションはこの制限を維持するためにUIから落とされるでしょう。
spark.deploy.retainedDrivers 200 表示するための完了したドライバーの最大数。古いドライバーはこの制限を維持するためにUIから落とされるでしょう。
spark.deploy.spreadOut true スタンドアローンクラスタマネージャーがノードを渡ってアプリケーションを拡散しなければならないか、あるいはできるだけ少ない数のノード上で固めるかどうか。Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads.
spark.deploy.defaultCores (infinite) spark.cores.maxを設定しない場合は、スパークスタンドアローンモードでアプリケーションに渡すデフォルトのコアの数。設定しない場合は、spark.cores.maxを設定しない限りはアプリケーションは常に利用可能な全てのコアを取得します。共有クラスタ上ではユーザがデフォルトでクラスタ全体を占拠することを防ぐためにこれを少なく設定します。
spark.deploy.maxExecutorRetries 10 Limit on the maximum number of back-to-back executor failures that can occur before the standalone cluster manager removes a faulty application. 実行中のexecutorがある場合はアプリケーションは削除されないでしょう。If an application experiences more than spark.deploy.maxExecutorRetries failures in a row, no executors successfully start running in between those failures, and the application has no running executors then the standalone cluster manager will remove the application and mark it as failed. この自動削除を無効にするには、spark.deploy.maxExecutorRetries-1に設定します。
spark.worker.timeout 60 スタンドアローンデプロイマスターがハートビートを受け取らない場合にワーカーを失ったと見なすまでの秒数。

SPARK_WORKER_OPTS は以下のシステムプロパティをサポートします:

プロパティ名デフォルト意味
spark.worker.cleanup.enabled false 定期的な ワーカー/アプリケーション ディレクトリの掃除を有効にする。YARNは異なる動作をするため、これはスタンドアローンモードにのみ影響することに注意してください。停止したアプリケーションのディレクトリのみが掃除されます。
spark.worker.cleanup.interval 1800 (30 分) ワーカーがローカルマシーン上の古いアプリケーションディレクトリを掃除する間隔を秒数で制御します。
spark.worker.cleanup.appDataTtl 7 * 24 * 3600 (7 日) 各ワーカー上でアプリケーションのワークディレクトリを維持する秒数。これは生存時間で、利用可能なディスクの量に依存します。アプリケーションログおよびjarは各アプリケーションワークディレクトリにダウンロードされます。ジョブを頻繁に実行する場合は特に、時間が経つにつれてワークディレクトリは急速にディスクスペースを埋めます。
spark.worker.ui.compressedLogFileLengthCacheSize 100 For compressed log files, the uncompressed file can only be computed by uncompressing the files. Spark は圧縮されたログファイルの非圧縮のファイルサイズをキャッシュします。このプロパティはキャッシュサイズを制御します。

アプリケーションをクラスタに接続

Sparkクラスタ上でアプリケーションを実行するには、SparkContext コンストラクタにするように、単純にマスターのspark://IP:PORT URL を渡します。

クラスタに対して対話的なSparkシェルを実行するには以下のコマンドを実行します:

./bin/spark-shell --master spark://IP:PORT

sparkシェルがクラスタ内で使用するコアの数を制御するために、オプション--total-executor-cores <numCores> を渡すこともできます。

Sparkアプリケーションの起動

spark-submit スクリプト はコンパイルされたSparkアプリケーションをクラスタにサブミットする最も率直な方法です。スタンドアローンクラスタに関しては、Sparkは現在のところ二つのデプロイモードをサポートします:クライアント モードでは、ドライバは同じプロセス内でクライアントがアプリケーションをサブミットしたかのように起動します。クラスタ モードでは、ドライバはクラスタ内のワーカープロセスの一つから起動されますが、クライアントプロセスはアプリケーションが終了するのを待たずにアプリケーションをサブミットする責任を果たすとすぐに終了します。

アプリケーションがSparkサブミットを使って起動されると、アプリケーションのjarは自動的に全てのワーカーノードに分配されます。アプリケーションが依存するどのような追加のjarについても、カンマを区切り文字として使って --jars フラグを使ってそれらを指定しなければなりません。アプリケーションの設定あるいは実行の環境を制御するには、Spark 設定を見てください。

更に、スタンドアローンクラスタモードは、アプリケーションが0ではない終了コードで終了した場合に、自動でのアプリケーションの再起動をサポートします。この機能を使うには、アプリケーションを起動する時にspark-submit--supervise フラグを渡します。次に、繰り返し失敗するアプリケーションをkillしたい場合は、以下のようにして行うことができます:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

http://<master url>:8080のスタンドアローンマスター web UIを使ってドライバIDを見つける事ができます。

リソースのスケジューリング

スタンドアローンクラスターモードは現在のところアプリケーションを跨ぐ単純なFIFOスケジューラのみをサポートします。しかし、複数の同時実行ユーザを許可するために、各アプリケーションが使うであろうリソースの最大数を制御することができます。デフォルトでは、クラスタ内の全ての コアを取得しますが、これは同時に一つのアプリケーションだけを実行している場合にのみ意味があります。SparkConf内でspark.cores.max を設定することで、コアの数に上限を付けることができます。例えば:

val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

spark.cores.maxをinfinite未満の何かに設定していないアプリケーションのためのデフォルトを変更するために、更にクラスタマスタープロセス上のspark.deploy.defaultCoresを設定することができます。以下をconf/spark-env.shに追加することでこれをしてください:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

これはユーザが個々にコアの最大数を設定していないかもしれない共有クラスタで有用かも知れません。

監視および記録

Sparkのスタンドアローンモードはクラスタを監視するためにwebベースのユーザインタフェースを提供します。マスターと各ワーカーは、クラスターとジョブの統計を表示する独自のwebUIを持ちます。デフォルトでは、マスターのためのwebUIにポート8080でアクセスすることができます。ポートは、設定ファイルあるいはコマンドラインオプションのどちらかを使って変更することができます。

更に、各ジョブの詳細なログ出力も各スレーブノードの作業デイlレクトリ(デフォルトではSPARK_HOME/work)に書き込まれます。各ジョブごとに2つのファイル、stdoutstderr、があるでしょう。全ての出力はコンソールに書き込まれます。

Hadoopと一緒に実行

同じマシーン上で別個のサービスとして起動するだけで、Sparkと一緒に既存のHaddopクラスタを実行することができます。SparkからHadoopデータにアクセスするためには、単に hdfs:// URL (一般的には hdfs://<namenode>:9000/path、正しいURLはHadoopネームノードのweb UIで見つけることができます)を使ってください。別のやり方として、Sparkのための個別のクラスタをセットアップし、ネットワーク越しにHDFSにまだアクセスできるようにすることができます; これはディスクのローカルアクセスより遅いですが、もし同じローカルエリアネットワーク内でまだ実行している場合は重要ではありません (例えば、Hadoopがある各ラック上に2、3のSparkマシーンを配置する)。

ネットワークセキュリティのためのポートの設定

Sparkはネットワークを酷く利用し、堅いファイアウォール設定の使用に関して幾つかの環境は厳しい要求をします。設定するポートの完全なリストは、 セキュリティのページを見てください。

高可用性

デフォルトでは、スタンドアローンスケジューリングクラスタはワーカーの障害に対して回復力に富んでいます(Spark自体の範囲においてワークの喪失に対して他のワーカーにそれを移動することで回復力に富んでいます)。しかし、スケジューラーはスケジュールの決定をするためにマスターを使用します。これにより(デフォルト)単一障害点ができます: もしマスターがクラッシュすると新しいアプリケーションを生成することができません。これを回避するために、2つの高可用性の仕組みがあります。以下に詳細を示します。

ZooKeeperを使ったスタンバイマスター

概要

リーダーの選出と幾つかの状態のストレージを提供するためにZooKeeperを使用すると、同じZooKeeperインスタンスに接続しているクラスタ内に複数のマスターを起動することができます。一つが"リーダー"として選出され、他はスタンドバイモードのままでいるでしょう。現在のリーダーが死亡すると、他のマスターが選出され、古いマスターの状態を回復してスケジューリングを再開します。回復の全プロセス(最初のリーダーがダウンしてから)には、1、2分かかるでしょう。この遅延は新しい アプリケーションのスケジューリングにのみ影響することに注意してください - マスター障害の間に既に実行中のアプリケーションは影響を受けません。

ZooKeeperを使った開始についてもっと知るにはここを見てください。

設定

この回復モードを有効にするために、spark.deploy.recoveryMode と関係する spark.deploy.zookeeper.* 設定を設定することで、SPARK_DAEMON_JAVA_OPTS を設定することができます。これらの設定についての詳しい情報は設定ドキュメントを参照してください。

Possible gotcha: クラスタ内に複数のマスターがあるが、ZooKeeperを使うように正しくマスターを設定できていない場合、マスターはお互いを見つける事ができずにそれぞれが全てリーダーだと考えるかもしれません。これは結果として(全てのマスターが独立してスケジュールすることになるため)healthyクラスタ状態ではなくなるでしょう

詳細

ZooKeeperクラスターをセットアップした後は、高可用性を有効にすることは簡単です。異なるノード上の同じZooKeeper設定(ZooKeeper URL およびディレクトリ)を持つ複数のマスタープロセスを単純に開始します。マスターはいつでも追加および削除することができます。

新しいアプリケーションのスケジュールあるいはクラスタにワーカーを追加するためには、それらがクラスタのリーダーのIPアドレスを知る必要があります。一つの時に渡していたマスターのリストに単純に渡すことで行うことができます。例えば、spark://host1:port1,host2:port2を指しているSparkContextを開始しても構いません。これはSparkContextが両方のマスターを登録するようにさせるでしょう - もし host1がダウンすると、新しいリーダーhost2を見つけるのでこの設定はまだ正しいでしょう。

"マスターの登録"と通常の操作の間で重要な違いがあります。開始時にはアプリケーションあるいはワーカーは現在のリーダーマスターを見つけて登録できる必要があります。とはいえ、一旦登録に成功すると、それは"システム内"になります (つまり、ZooKeeperに格納されます)。障害が発生すると、新しいリーダーはリーダーシップの変更を全ての以前に登録されたアプリケーションとワーカーに知らせるために接続します。そのためそれらは起動時に新しいマスターの存在を知る必要さえありません。

この特性のため、新しいマスターはいつでも生成することができ、それがリーダーになった場合に新しいアプリケーションとワーカーがそれを登録するために見つけることができるかを心配するだけで良いです。一旦登録されると、ZooKeeperが面倒を見ます。

ローカルファイルシステムを使ったシングルモードリカバリ

概要

ZooKeeperはプロダクションレベルの高可用性を追求するための最も良い方法ですが、もしマスターがダウンしたら単に再起動できるようにしたいと思う場合は、FILESYSTEMモードがそれを引き受けます。アプリケーションおよびワーカーを登録する時に、それらはマスタープロセスが再起動する場合に回復できるように提供されたディレクトリに十分な状態を書き込みます。

設定

この回復モードを有効にするために、以下の設定を使ってspark-envの中でSPARK_DAEMON_JAVA_OPTSを設定することができます。

システム プロパティ意味
spark.deploy.recoveryMode シングルノードリカバリーモードをFILESYSTEMに設定します (デフォルト: NONE)。
spark.deploy.recoveryDirectory Sparkがリカバリー状態を格納する、マスターからアクセス可能なディレクトリ。

詳細

TOP
inserted by FC2 system