監視および計測器

Sparkアプリケーションを監視するいくつかの方法があります: web UI、マトリクス、および実験的な計測器。

Web インタフェース

各SparkContextはデフォルトではポート4040でweb UIを起動します。これはアプリケーションに関する有用な情報を表示します。以下のものが含まれます:

単にwebブラウザでhttp://<driver-node>:4040を開くことで、このインタフェースにアクセスできます。同じホスト上で複数のSparkContextが実行中の場合、それらは4040から始まる連続したポートに割り当てられるでしょう(4041, 4042など)。

デフォルトではこの情報はアプリケーションの持続期間のみ利用可能です。後でwebUIを見るためには、アプリケーションの開始前にspark.eventLog.enabled をtrueに設定します。これはUIに表示された情報を符号化したSparkイベントを永続的なストレージに記録するようにSparkを設定します。

後で見る

アプリケーションのイベントログが存在する場合、Sparkの履歴サーバを使ってアプリケーションのUIを構築することが可能です。以下を実行することで履歴サーバを開始することができます:

./sbin/start-history-server.sh

これはデフォルトで http://<server-url>:18080 にwebインタフェースを生成し、未完了あるいは完了済みのアプリケーションと試行をリスト化します。

ファイルシステム プロバイダ クラス(以下のspark.history.providerを見てください)を使っている場合、ログの基本ディレクトリはspark.history.fs.logDirectory設定オプションで提供され、基本ディレクトリはアプリケーションのログイベントを表すサブディレクトリを含まなければなりません。

sparkのジョブ自身はイベントを記録するように設定され、同じ共有、書き込み可能なディレクトリに記録するように設定されていなければなりません。例えば、もしサーバがhdfs://namenode/shared/spark-logsのログディレクトリを設定されている場合、クライアント側のオプションは以下のようになります:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

履歴サーバは以下のようにして設定することができます:

環境変数

環境変数意味
SPARK_DAEMON_MEMORY 履歴サーバに割り当てるメモリ (デフォルト: 1g)。
SPARK_DAEMON_JAVA_OPTS 履歴サーバのためのJVMオプション (デフォルト: none)。
SPARK_PUBLIC_DNS 履歴サーバの公開アドレス。設定されない場合、アプリケーションの履歴へのリンクはサーバの内部アドレスが使われ、壊れたリンクとなります (デフォルト: none)。
SPARK_HISTORY_OPTS spark.history.* 履歴サーバの設定オプション (デフォルト: none)。

Spark configuration options

プロパティ名デフォルト意味
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider アプリケーション履歴のバックエンド実装のクラス名。今のところ、Sparkによって提供された1つの実装しかありません。これはファイルシステムに保存されたアプリケーションログを探します。
spark.history.fs.logDirectory file:/tmp/spark-events ファイルシステムの履歴プロバイダのために、URLはロードするアプリケーションのイベントログを含みますディレクトリです。これはローカルの file:// パス、HDFS パスhdfs://namenode/shared/spark-logs、あるいはHadoop APIによってサポートされる別のファイルシステムのパスです。
spark.history.fs.update.interval 10s ファイルシステム履歴プロバイダがログディレクトリ内で新規あるいは更新ログをチェックする周期。短い周期は、更新されたアプリケーションの再読み込みのサーバ負荷を犠牲にして、新しいアプリケーションを素早く検知します。更新が完了するとすぐに完了あるいは未完了のアプリケーションのリスト化は変更を反映するでしょう。
spark.history.retainedApplications 50 キャッシュ内にUIデータを保持するアプリケーションの数。この上限を超えると、一番古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュに無い場合、それがUIからアクセスされる場合はディスクからロードされなければならないでしょう。
spark.history.ui.maxApplications Int.MaxValue 履歴サマリページ上で表示されるアプリケーションの数。履歴サマリページ上に表示されていない場合でも、それらのURLを直接アクセスすることでアプリケーションのUIがまだ利用可能です。
spark.history.ui.port 18080 履歴サーバのwebインタフェースが紐付けられるポート。
spark.history.kerberos.enabled false 履歴サーバがログインするためにkerberosを使わなければならないかどうかを示す。もし履歴サーバがsecureなHadoopクラスタ上のHDFSファイルにアクセスする場合にこれが必要とされます。これがtrueの場合、spark.history.kerberos.principal およびspark.history.kerberos.keytabの設定が使われます。
spark.history.kerberos.principal (none) 履歴サーバのためのKerberosプリンシパル名。
spark.history.kerberos.keytab (none) 履歴サーバのためのkerberosキータブファイルの場所
spark.history.ui.acls.enable false アプリケーションを表示するユーザを認証するためにaclがチェックされなければならないかどうかを指定する。有効な場合、アプリケーションが実行される時に各アプリケーションのspark.ui.acls.enableが設定されているかどうかに関係なく、アクセス制御の調査が行われます。アプリケーションの所有者は自身のアプリケーションを見るために常に認証されることになり、アプリエケ-ションが実行された場合にspark.ui.view.aclsを使って指定された全てのユーザとspark.ui.view.acls.groups を使って指定されたグループもアプリケーションを見るために認証されるでしょう。無効な場合、一切のアクセス制御が行われません。
spark.history.ui.admin.acls empty 履歴サーバ内で全てのSparkアプリケーションへのviewアクセスを持つユーザ/管理者のカンマ区切りのリスト。デフォルトでは実行時のアプリケーションを見る権限を持つユーザのみが関連するアプリケーション履歴にアクセスすることができます。これを使って設定されたユーザ/管理者がアクセスする権限を持つこともできます。リストに "*"を入れることは、全てのユーザが管理権限を持つことを意味します。
spark.history.ui.admin.acls.groups empty 履歴サーバ内で全てのSparkアプリケーションへのviewアクセスを持つグループのカンマ区切りのリスト。デフォルトでは実行時のアプリケーションを見る権限を持つグループのみが関連するアプリケーション履歴にアクセスすることができます。これを使って設定されたグループがアクセスする権限を持つこともできます。リストに "*"を入れることは、全てのグループが管理権限を持つことを意味します。
spark.history.fs.cleaner.enabled false 履歴サーバが定期的にイベントログをストレージから削除するかどうかを指定する。
spark.history.fs.cleaner.interval 1d ファイルシステムのジョブ履歴クリーナーがファイルを削除するためにチェックする頻度。もしファイルがspark.history.fs.cleaner.maxAgeより古い場合は、それらは削除されます。
spark.history.fs.cleaner.maxAge 7d これより古いファイルシステムのジョブ履歴ファイルは履歴クリーナーが実行される時に削除されるでしょう。
spark.history.fs.numReplayThreads 利用可能なコアの25% 履歴サーバによって使われイベントログを処理するためのスレッドの数。

これら全てのUI内でテーブルはヘッダーをクリックすることでソート可能です。遅いタスク、データのゆがみなどを識別するのを容易にします。

注意

  1. 履歴サーバは完了および未完了のSparkジョブを表示します。障害の後でアプリケーションが複数回の試行をする場合、外に出ていく全ての未完了の試行あるいは最後に成功した試行と共に、失敗した試行が表示されるでしょう。

  2. 未完了のアプリケーションは断続的に更新されるだけです。更新の間の時間は変更されたファイルのチェックの間隔によって定義されます(spark.history.fs.update.interval)。大きなクラスターほど更新の間隔は大きな値に設定されるかも知れません。実行中のアプリケーションを見る方法は実際には自身のweb UIを見ることです。

  3. 完了したと自身を登録しないで終了したアプリケーションは、未完了としてリスト化されるでしょう - それらがもう実行していないとしてもです。これはアプリケーションがクラッシュした場合に起こりえます。

  4. Sparkジョブの完了の信号を送る1つの方法はSparkのContextを明示的に終了することです (sc.stop())。あるいは、Pythonではwith SparkContext() as sc: を使ってSparkのContextのsteupとtear downを処理するために構築します。

REST API

UI内でマトリックスを見ることに加えて、それらはJSONとして利用可能です。これにより開発者は新しい表示方法およびSparkのための監視ツールを簡単に作成することができます。JSONは実行中のアプリケーションおよび履歴サーバ内の両方で利用可能です。エンドポイントは /api/v1にマウントされます。例えば、履歴サーバの場合、それらは一般的にhttp://<server-url>:18080/api/v1でアクセスすることができ、実行中のアプリケーションの場合http://localhost:4040/api/v1で利用可能です。

APIの中では、アプリケーションはアプリケーションID [app-id]によって参照されます。YARN上で実行中の場合、各アプリケーションは複数の試行を持つかも知れませんが、クライアントモードでのアプリケーションと異なり、クラスタモードでのアプリケーションの試行IDです。YARNクラスタモードのアプリケーションはそれらの [attempt-id]によって識別することができます。以下でリスト化されているAPIでは、[app-id] は実際には[base-app-id]/[attempt-id]で、[base-app-id] はYARNのアプリケーションIDです。


?status=[active|complete|pending|failed] 状態のステージのみをリスト化します。
エンドポイント意味
/applications 全てのアプリケーションのリスト。
?status=[completed|running] 選択された状態のアプリケーションのみをリスト化します。
?minDate=[date] リスト化する最も早い開始日付/時間。
?maxDate=[date] リスト化するもっとも最近の開始日付/時間。
?minEndDate=[date] リスト化する最も早い終了日付/時間。
?maxEndDate=[date] リスト化する最も最近の終了日付/時間。
?limit=[limit] リスト化するアプリケーションの数を制限する。
例:
?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 指定されたアプリケーションの全てのジョブのリスト。
?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみをリスト化します。
/applications/[app-id]/jobs/[job-id] 指定されたジョブの詳細。
/applications/[app-id]/stages 指定されたアプリケーションの全てのステージのリスト。
/applications/[app-id]/stages/[stage-id] 指定されたステージの全ての試行のリスト。
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] Details for the given stage attempt.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 指定されたステージの試行の中の全てのタスクのマトリックスの概要。
?quantiles 指定された変位内の測定基準を要約します。
例: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 指定されたステージの試行の全てのタスクのリスト。
?offset=[offset]&length=[len] 指定された範囲のタスクをリスト化します。
?sortBy=[runtime|-runtime] タスクをソートします。
例: ?offset=10&length=50&sortBy=runtime
/applications/[app-id]/executors 指定されたアプリケーションの全ての動作中のexecutorのリスト。
/applications/[app-id]/allexecutors 指定されたアプリケーションの全て(動作中および停止)のexecutorのリスト。
/applications/[app-id]/storage/rdd 指定されたアプリケーションのための保存されたRDDのリスト。
/applications/[app-id]/storage/rdd/[rdd-id] 指定されたRDDのストレージ状態の詳細。
/applications/[base-app-id]/logs 指定されたアプリケーションの全ての試みについてのイベントログをzipファイルのファイルとしてダウンロードします。
/applications/[base-app-id]/[attempt-id]/logs 指定されたアプリケーションの試行のイベントログをzipファイルとしてダウンロードします。
/applications/[app-id]/streaming/statistics ストリーミング コンテキストのための統計。
/applications/[app-id]/streaming/receivers 全てのストリーミング レシーバーのリスト。
/applications/[app-id]/streaming/receivers/[stream-id] 指定されたレシーバーの詳細。
/applications/[app-id]/streaming/batches 全ての保持されたバッチのリスト。
/applications/[app-id]/streaming/batches/[batch-id] 指定されたバッチの詳細。
/applications/[app-id]/streaming/batches/[batch-id]/operations 指定されたバッチの全ての出力操作のリスト。
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] 指定された操作と指定されたバッチの称しあ。
/applications/[app-id]/environment 指定されたアプリケーションの環境の詳細。

取り出すことができるジョブとステージの数はスタンドアローンのSpark UIの保持機構によって制限されます; "spark.ui.retainedJobs" はジョブ上でトリガーするガベージコレクションの閾値を定義し、spark.ui.retainedStagesはステージ上のそれを定義します。ガベージコレクションは再生によって起こることに注意してください: それらの値を増加し履歴サーバを再起動することで、より多くのエントリを取り出すことができます。

API バージョニング ポリシー

これらのエンドポイントはその上でアプリケーションを開発を容易にするために熱心にバージョン付けされています。特に、Sparkは以下を保証します:

実行中のアプリケーションのUIを検証する時でも、applications/[app-id]の分割はたった一つのアプリケーションだけが利用可能だとしてもまだ必要です。例えば、実行中のアプリケーションのジョブのリストを見るには、http://localhost:4040/api/v1/applications/[app-id]/jobsに行きます。これは両方のモードにおいてパスを矛盾が無い状態にします。

マトリックス

SparkはDropwizard Metrics Libraryに基づいた設定可能なマトリックスシステムを持っています。これにより、ユーザはSparkマトリックスにHTTP, JMX およびCSV ファイルを含む様々なsinkを伝えることができます。このマトリクスシステムは、Sparkが $SPARK_HOME/conf/metrics.propertiesに存在すると期待する設定ファイルを通じて設定することができます。独自のファイルの場所はspark.metrics.conf設定プロパティによって指定することができます。デフォルトでは、ドライバあるいはexecutorマトリクスのために使われるルートの名前空間はspark.app.idの値です。しかし、しばしば、ユーザはドライバーとexecutorについてアプリケーションを横断してメトリックスを追跡したいと思います。これはアプリケーションID(つまり、spark.app.id) を使って行うのは、アプリケーションの各起動の度に変更するため、難しいです。そのようなユースケースについては、spark.metrics.namespace 設定プロパティを使ってメトリクスのレポートのために独自の名前空間を指定することができます。例えば、ユーザがメトリクスの名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace プロパティを${spark.app.name}のような値に設定することができます。この値はSparkによって適切に展開され、メトリックス システムのルートの名前空間として使われます。どのようなドライバおよびexecutorのメトリックスもspark.app.idを接頭語に使いませんし、どのようなspark.metrics.namespace プロパティもそのようなメトリックスに影響しません。

SparkのマトリックスはSparkコンポーネントに対応する異なる インスタンス に分離されます。各インスタンスの中で、マトリックが伝える一連のsinkを設定することができます。現在のところ以下のインスタンスがサポートされています:

各インスタンスは0個以上のsinksに対して報告することができます。sinkはorg.apache.spark.metrics.sink パッケージに含まれています。

Sparkはライセンスの制限のためにデフォルトのビルドに含まれていないGanglia sinkもサポートします:

GangliaSinkをインストールするには、Sparkのカスタムビルドを実行する必要があるでしょう。このライブラリを組み込むことでSparkパッケージにLGPLでライセンスされたコードが含まれることに注意してください。sbt ユーザに関しては、ビルドする前に SPARK_GANGLIA_LGPL 環境変数を設定してください。Mavenユーザに関しては、 -Pspark-ganglia-lgpl プロファイルを有効にします。クラスタのSparkビルドを修正することに加えて、ユーザアプリケーションは spark-ganglia-lgpl アーティファクトにリンクする必要があります。

マトリックスの設定の構文は $SPARK_HOME/conf/metrics.properties.templateの例の設定ファイル内で定義されています。

上級の計測器

Sparkのジョブのパフォーマンスをプロファイルするために幾つかの外部ツールを使用することができます。

TOP
inserted by FC2 system