監視および計測器

Sparkアプリケーションを監視するいくつかの方法があります: web UI、マトリクス、および実験的な計測器。

Web インタフェース

各SparkContextはデフォルトではポート4040でWeb UIを起動します。これはアプリケーションに関する有用な情報を表示します。以下のものが含まれます:

単にwebブラウザでhttp://<driver-node>:4040を開くことで、このインタフェースにアクセスできます。同じホスト上で複数のSparkContextが実行中の場合、それらは4040から始まる連続したポートに割り当てられるでしょう(4041, 4042など)。

デフォルトではこの情報はアプリケーションの持続期間のみ利用可能です。後でwebUIを見るためには、アプリケーションの開始前にspark.eventLog.enabled をtrueに設定します。これはUIに表示された情報を符号化したSparkイベントを永続的なストレージに記録するようにSparkを設定します。

後で見る

アプリケーションのイベントログが存在する場合、Sparkの履歴サーバを使ってアプリケーションのUIを構築することが可能です。以下を実行することで履歴サーバを開始することができます:

./sbin/start-history-server.sh

これはデフォルトで http://<server-url>:18080 にwebインタフェースを生成し、未完了あるいは完了済みのアプリケーションと試行をリスト化します。

ファイルシステム プロバイダ クラス(以下のspark.history.providerを見てください)を使っている場合、ログの基本ディレクトリはspark.history.fs.logDirectory設定オプションで提供され、基本ディレクトリはアプリケーションのログイベントを表すサブディレクトリを含まなければなりません。

sparkのジョブ自身はイベントを記録するように設定され、同じ共有、書き込み可能なディレクトリに記録するように設定されていなければなりません。例えば、もしサーバがhdfs://namenode/shared/spark-logsのログディレクトリを設定されている場合、クライアント側のオプションは以下のようになります:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

履歴サーバは以下のようにして設定することができます:

環境変数

環境変数意味
SPARK_DAEMON_MEMORY 履歴サーバに割り当てるメモリ (デフォルト: 1g)。
SPARK_DAEMON_JAVA_OPTS 履歴サーバのためのJVMオプション (デフォルト: none)。
SPARK_DAEMON_CLASSPATH 履歴サーバのためのクラスパス (デフォルト: none).
SPARK_PUBLIC_DNS 履歴サーバの公開アドレス。設定されない場合、アプリケーションの履歴へのリンクはサーバの内部アドレスが使われ、壊れたリンクとなります (デフォルト: none)。
SPARK_HISTORY_OPTS spark.history.* 履歴サーバの設定オプション (デフォルト: none)。

ローリングイベントログファイルに圧縮を適用

実行時間の長いアプリケーション(例えばストリーミング)は、1つの巨大なエベントログファイルになる可能性があり、維持にコストが掛かり、また Spark履歴サーバで更新ごとに再生するために大量のリソースが必要になります。

spark.eventLog.rolling.enabledspark.eventLog.rolling.maxFileSize を有効にすると、幾つかのシナリオで役立つが、それでもログの全体的なサイズを減らすのには役に立たないような、1つの巨大なイベントログファイルではなく、ローリングイベントログファイルを使用できるようになります。

Spark履歴サーバは、Spark履歴サーバ上のspark.history.fs.eventLog.rolling.maxFilesToRetain設定をすることで、ローリングイベントファイルに圧縮を適用してログの全体的なサイズを削減することができます。

詳細は以下で説明しますが、圧縮は損失のある操作であることに先に注意してください。圧縮により UI に表示されなくなるイベントが破棄されます - オプションを有効にする前に破棄されるイベントを確認することをお勧めします。

圧縮が発生すると、履歴サーバはアプリケーションで利用可能な全てのイベントログファイルを列挙し、圧縮のターゲットとして保持される最小のインデックスを持つファイルよりもインデックスが少ないイベントログファイルになるように考慮します。例えば、アプリケーション A に5つのイベントログがあり、spark.history.fs.eventLog.rolling.maxFilesToRetain が 2 に設定された場合、最初の3つのログファイルが圧縮対象として選択されます。

ターゲットを選択すると、それらを分析して除外できるイベントを特定し、除外することに決定したイベントを破棄して1つの圧縮ファイルに書きなおします。

圧縮は古いデータを指すイベントを除外しようとします。現在、除外するイベントの候補を以下に示します:

書き換えが完了すると、元のログファイルはベストエフォートの方式で削除されます。履歴サーバは元のファイルを削除できない場合がありますが、それは履歴サーバの操作には影響しません。

圧縮中にあまり多くのスペースが削減されないと判明した場合、Spark 履歴サーバは古いイベントログファイルを圧縮しない場合があることに注意してください。ストリーミングクエリの場合、各マイクロバッチが1つ以上のジョブをトリガーしてすぐに終了するため通常は圧縮が実行されると予測されますが、バッチクエリの場合は多くの場合で圧縮は実行されません。

これは Spark 3.0 で導入された新しい機能であり、完全には安定しているとは限らないことにも注意してください。状況によっては、圧縮により予想よりも多くのイベントが除外され、アプリケーションの履歴サーバで UI の問題が発生する場合があります。注意して使ってください。

Spark 履歴サーバの設定オプション

Spark Historyサーバのためのセキュリティ オプションはセキュリティのページの中でより詳細にカバーされます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider アプリケーション履歴のバックエンド実装のクラス名。今のところ、Sparkによって提供された1つの実装しかありません。これはファイルシステムに保存されたアプリケーションログを探します。 1.1.0
spark.history.fs.logDirectory file:/tmp/spark-events ファイルシステムの履歴プロバイダのために、URLはロードするアプリケーションのイベントログを含みますディレクトリです。これはローカルの file:// パス、HDFS パスhdfs://namenode/shared/spark-logs、あるいはHadoop APIによってサポートされる別のファイルシステムのパスです。 1.1.0
spark.history.fs.update.interval 10s ファイルシステム履歴プロバイダがログディレクトリ内で新規あるいは更新ログをチェックする周期。短い周期は、更新されたアプリケーションの再読み込みのサーバ負荷を犠牲にして、新しいアプリケーションを素早く検知します。更新が完了するとすぐに完了あるいは未完了のアプリケーションのリスト化は変更を反映するでしょう。 1.4.0
spark.history.retainedApplications 50 キャッシュ内にUIデータを保持するアプリケーションの数。この上限を超えると、一番古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュに無い場合、それがUIからアクセスされる場合はディスクからロードされなければならないでしょう。 1.0.0
spark.history.ui.maxApplications Int.MaxValue 履歴サマリページ上で表示されるアプリケーションの数。履歴サマリページ上に表示されていない場合でも、それらのURLを直接アクセスすることでアプリケーションのUIがまだ利用可能です。 2.0.1
spark.history.ui.port 18080 履歴サーバのwebインタフェースが紐付けられるポート。 1.0.0
spark.history.kerberos.enabled false 履歴サーバがログインするためにkerberosを使わなければならないかどうかを示す。もし履歴サーバがsecureなHadoopクラスタ上のHDFSファイルにアクセスする場合にこれが必要とされます。 1.0.1
spark.history.kerberos.principal (none) spark.history.kerberos.enabled=true の場合、履歴サーバの kerberos プリンシパル名を指定します。 1.0.1
spark.history.kerberos.keytab (none) spark.history.kerberos.enabled=true の場合、履歴サーバの kerberosキータブファイルの場所を指定します。 1.0.1
spark.history.fs.cleaner.enabled false 履歴サーバが定期的にイベントログをストレージから削除するかどうかを指定します。 1.4.0
spark.history.fs.cleaner.interval 1d spark.history.fs.cleaner.enabled=true の場合、ファイルシステムジョブ履歴クリーナーが削除するファイルの確認頻度を指定します。2つのうち少なくとも1つの条件が満たされた場合、ファイルが削除されます。まず、spark.history.fs.cleaner.maxAge より古い場合は削除されます。ファイルの数が spark.history.fs.cleaner.maxNum より多い場合にも削除されます。Spark は最も古い試行時間の順にアプリケーションから完了した試行をクリーンアップしようとします。 1.4.0
spark.history.fs.cleaner.maxAge 7d spark.history.fs.cleaner.enabled=true の場合、これより古いジョブ履歴ファイルは、ファイルシステム履歴クリーナーの実行時に削除されます。 1.4.0
spark.history.fs.cleaner.maxNum Int.MaxValue spark.history.fs.cleaner.enabled=true の場合、イベントログディレクトリ内のファイルの最大数を指定します。Spark は完了した試行ログをクリーンアップして、ログディレクトリをこの制限内に維持しようとします。これは、HDFS の `dfs.namenode.fs-limits.max-directory-items` のような基本的なファイルシステムの制限よりも小さくする必要があります。 3.0.0
spark.history.fs.endEventReparseChunkSize 1m 最後のイベントを見つけるためにログファイルの最後にどれだけのバイトをパースするか。これはイベントログファイルの不要な部分をスキップすることでアプリケーションのリスティングの生成をスピードアップするために使われます。この設定を0に設定することで無効にすることができます。 2.4.0
spark.history.fs.inProgressOptimization.enabled true 進行中のログの最適化された処理を有効にする。このオプションは進行中としてリスト化されたイベントログのリネームに失敗した完了したアプリケーションをそのままにするかもしれません。 2.4.0
spark.history.fs.driverlog.cleaner.enabled spark.history.fs.cleaner.enabled 履歴サーバが定期的にドライバーログをストレージから削除するかどうかを指定する。 3.0.0
spark.history.fs.driverlog.cleaner.interval spark.history.fs.cleaner.interval spark.history.fs.driverlog.cleaner.enabled=true の場合、ファイルシステムドライバログクリーナーが削除するファイルの確認頻度を指定します。もしファイルがspark.history.fs.driverlog.cleaner.maxAgeより古い場合は、それらは削除されます。 3.0.0
spark.history.fs.driverlog.cleaner.maxAge spark.history.fs.cleaner.maxAge spark.history.fs.driverlog.cleaner.enabled=true の場合、これより古いドライバログファイルは、ドライバログクリーナーの実行時に削除されます。 3.0.0
spark.history.fs.numReplayThreads 利用可能なコアの25% 履歴サーバによって使われイベントログを処理するためのスレッドの数。 2.0.0
spark.history.store.maxDiskUsage 10g キャッシュ アプリケーション履歴情報が格納されているローカルディレクトリのディスク使用量の最大値。 2.3.0
spark.history.store.path (none) アプリケーションの履歴データをキャッシュするローカルディレクトリ。設定された場合、履歴サーバはアプリケーションのデータをメモリ内に維持する代わりにディスクに格納するでしょう。ディスクに書き込まれたデータは履歴サーバが再起動した時に再利用されるでしょう。 2.3.0
spark.history.custom.executor.log.url (none) 履歴サーバでクラスタマネージャーのアプリケーションログの URL を使う代わりに、外部のログサービスをサポートするための独自の spark executor ログの URL を指定します。Spark はクラスタマネージャーで変更できるパターンを介して、幾つかのパス変数をサポートします。クラスタマネージャーでどのパターンがサポートされるかを調べるには、ドキュメントがある場合はドキュメントを調べてください。この設定はライブアプリケーションには影響せず、履歴サーバにのみ影響します。

現在のところ、YARN モードのみがこの設定をサポートします。

3.0.0
spark.history.custom.executor.log.url.applyIncompleteApplication false カスタム spark executor ログの URL を不完全なアプリケーションにも適用するかどうかを指定します。実行中のアプリケーションの executor ログを元のログの URL として指定する必要がある場合は、これを `false` に設定します。不完全なアプリケーションには、正常にシャットダウンしなかったアプリケーションが含まれる場合があることに注意してください。これが `true` に設定されていても、この設定はライブアプリケーションには影響せず、履歴サーバにのみ影響します。 3.0.0
spark.history.fs.eventLog.rolling.maxFilesToRetain Int.MaxValue 非圧縮として保持されるイベントログファイルの最大数。デフォルトでは、全てのイベントログファイルが保持されます。技術的な理由により、最小値は 1 です。
詳細については、"古いイベントログファイルの圧縮の適用" のセクションを読んでください。
3.0.0
spark.history.store.hybridStore.enabled false イベントログのパース時にHybridStoreをストアとして使うかどうか。HybridStoreはデータを最初にインメモリストアに書き込み、インメモリストアへの書き込みが完了した後でデータをディスクストアにダンプするバックグランドスレッドを持ちます。 3.1.0
spark.history.store.hybridStore.maxMemoryUsage 2g HybridStoreを作成するために使える最大メモリ空間。HybridStoreはヒープメモリを共有利用するため、HybridStoreが有効にされた場合はヒープメモリをSHSのメモリオプションを介して増やす必要があります。 3.1.0

これら全てのUI内でテーブルはヘッダーをクリックすることでソート可能です。遅いタスク、データのゆがみなどを識別するのを容易にします。

注意

  1. 履歴サーバは完了および未完了のSparkジョブを表示します。障害の後でアプリケーションが複数回の試行をする場合、外に出ていく全ての未完了の試行あるいは最後に成功した試行と共に、失敗した試行が表示されるでしょう。

  2. 未完了のアプリケーションは断続的に更新されるだけです。更新の間の時間は変更されたファイルのチェックの間隔によって定義されます(spark.history.fs.update.interval)。大きなクラスターほど更新の間隔は大きな値に設定されるかも知れません。実行中のアプリケーションを見る方法は実際には自身のweb UIを見ることです。

  3. 完了したと自身を登録しないで終了したアプリケーションは、未完了としてリスト化されるでしょう - それらがもう実行していないとしてもです。これはアプリケーションがクラッシュした場合に起こりえます。

  4. Sparkジョブの完了の信号を送る1つの方法はSparkのContextを明示的に終了することです (sc.stop())。あるいは、Pythonではwith SparkContext() as sc: を使ってSparkのContextのsteupとtear downを処理するために構築します。

REST API

UI内でマトリックスを見ることに加えて、それらはJSONとして利用可能です。これにより開発者は新しい表示方法およびSparkのための監視ツールを簡単に作成することができます。JSONは実行中のアプリケーションおよび履歴サーバ内の両方で利用可能です。エンドポイントは /api/v1にマウントされます。例えば、履歴サーバの場合、それらは一般的にhttp://<server-url>:18080/api/v1でアクセスすることができ、実行中のアプリケーションの場合http://localhost:4040/api/v1で利用可能です。

APIの中では、アプリケーションはアプリケーションID [app-id]によって参照されます。YARN上で実行中の場合、各アプリケーションは複数の試行を持つかも知れませんが、クライアントモードでのアプリケーションと異なり、クラスタモードでのアプリケーションの試行IDです。YARNクラスタモードのアプリケーションはそれらの [attempt-id]によって識別することができます。以下でリスト化されているAPIでは、[app-id] は実際には[base-app-id]/[attempt-id]で、[base-app-id] はYARNのアプリケーションIDです。

エンドポイント意味
/applications 全てのアプリケーションのリスト。
?status=[completed|running] 選択された状態のアプリケーションのみをリスト化します。
?minDate=[date] リスト化する最も早い開始日付/時間。
?maxDate=[date] リスト化するもっとも最近の開始日付/時間。
?minEndDate=[date] リスト化する最も早い終了日付/時間。
?maxEndDate=[date] リスト化する最も最近の終了日付/時間。
?limit=[limit] リスト化するアプリケーションの数を制限する。
例:
?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 指定されたアプリケーションの全てのジョブのリスト。
?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみをリスト化します。
/applications/[app-id]/jobs/[job-id] 指定されたジョブの詳細。
/applications/[app-id]/stages 指定されたアプリケーションの全てのステージのリスト。
?status=[active|complete|pending|failed] は指定された状態のステージのみをリスト化します。
?details=true はタスクデータを持つ全てのステージをリスト化します。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] は指定されたタスク状態を持つタスクのみをリスト化します。クエリパラメータ taskStatus はdetails=trueの時のみ効果があります。指定された全てのタスク状態に一致する全てのタスクを返す ?details=true&taskStatus=SUCCESS&taskStatus=FAILED のような複数のtaskStatusもサポートします。
?withSummaries=trueは、タスクメトリクスの分布とexecutorメトリクスの分布を含むステージをリスト化します。
?quantiles=0.0,0.25,0.5,0.75,1.0 は指定された分位のメトリクスを要約します。クエリパラメータ quantiles はwithSummaries=trueの時のみ効果があります。デフォルト値は0.0,0.25,0.5,0.75,1.0です。
/applications/[app-id]/stages/[stage-id] 指定されたステージの全ての試行のリスト。
?details=true は指定されたステージのタスクデータを持つ全ての試行をリスト化します。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] は指定されたタスク状態を持つタスクのみをリスト化します。クエリパラメータ taskStatus はdetails=trueの時のみ効果があります。指定された全てのタスク状態に一致する全てのタスクを返す ?details=true&taskStatus=SUCCESS&taskStatus=FAILED のような複数のtaskStatusもサポートします。
?withSummaries=trueは、各試行のタスクメトリクスの分布とexecutorメトリクスの分布を含むステージをリスト化します。
?quantiles=0.0,0.25,0.5,0.75,1.0 は指定された分位のメトリクスを要約します。クエリパラメータ quantiles はwithSummaries=trueの時のみ効果があります。デフォルト値は0.0,0.25,0.5,0.75,1.0です。
例:
?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 指定されたステージの試行についての詳細。
?details=true は指定されたステージの試行の全てのタスクデータをリスト化します。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] は指定されたタスク状態を持つタスクのみをリスト化します。クエリパラメータ taskStatus はdetails=trueの時のみ効果があります。指定された全てのタスク状態に一致する全てのタスクを返す ?details=true&taskStatus=SUCCESS&taskStatus=FAILED のような複数のtaskStatusもサポートします。
?withSummaries=true は、指定されたステージの試行のタスクメトリクスの分布とexecutorメトリクスの分布をリスト化します。
?quantiles=0.0,0.25,0.5,0.75,1.0 は指定された分位のメトリクスを要約します。クエリパラメータ quantiles はwithSummaries=trueの時のみ効果があります。デフォルト値は0.0,0.25,0.5,0.75,1.0です。
例:
?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 指定されたステージの試行の中の全てのタスクのマトリックスの概要。
?quantiles 指定された変位内の測定基準を要約します。
例: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 指定されたステージの試行の全てのタスクのリスト。
?offset=[offset]&length=[len] 指定された範囲のタスクをリスト化します。
?sortBy=[runtime|-runtime] タスクをソートします。
?status=[running|success|killed|failed|unknown] はその状態のタスクのみをリスト化します。
例: ?offset=10&length=50&sortBy=runtime&status=running
/applications/[app-id]/executors 指定されたアプリケーションの全ての動作中のexecutorのリスト。
/applications/[app-id]/executors/[executor-id]/threads 指定されたアクティブなexecutorの中で実行中の全てのスレッドのトレースを積み重ねます。履歴サーバを介しては利用できません。
/applications/[app-id]/allexecutors 指定されたアプリケーションの全て(動作中および停止)のexecutorのリスト。
/applications/[app-id]/storage/rdd 指定されたアプリケーションのための保存されたRDDのリスト。
/applications/[app-id]/storage/rdd/[rdd-id] 指定されたRDDのストレージ状態の詳細。
/applications/[base-app-id]/logs 指定されたアプリケーションの全ての試みについてのイベントログをzipファイルのファイルとしてダウンロードします。
/applications/[base-app-id]/[attempt-id]/logs 指定されたアプリケーションの試行のイベントログをzipファイルとしてダウンロードします。
/applications/[app-id]/streaming/statistics ストリーミング コンテキストのための統計。
/applications/[app-id]/streaming/receivers 全てのストリーミング レシーバーのリスト。
/applications/[app-id]/streaming/receivers/[stream-id] 指定されたレシーバーの詳細。
/applications/[app-id]/streaming/batches 全ての保持されたバッチのリスト。
/applications/[app-id]/streaming/batches/[batch-id] 指定されたバッチの詳細。
/applications/[app-id]/streaming/batches/[batch-id]/operations 指定されたバッチの全ての出力操作のリスト。
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] 指定された操作と指定されたバッチの称しあ。
/applications/[app-id]/sql 指定されたアプリケーションの全てのクエリをリスト化します。
?details=[true (default) | false] はSparkプランノードの詳細をリスト化/隠します。
?planDescription=[true (default) | false] は、物理プランのサイズが大きい時にオンデマンドで物理プランの説明を有効/無効にします。
?offset=[offset]&length=[len] は指定された範囲のクエリをリスト化します。
/applications/[app-id]/sql/[execution-id] 指定されたクエリの詳細
?details=[true (default) | false] は指定されたクエリの詳細に加えてメトリクスの詳細をリスト化/隠します。
?planDescription=[true (default) | false] は、物理プランのサイズが大きい時にオンデマンドで指定されたクエリについての物理プランの説明を有効/無効にします。
/applications/[app-id]/environment 指定されたアプリケーションの環境の詳細。
/version 現在のsparkバージョンを取得します。

取り出すことができるジョブとステージの数はスタンドアローンのSpark UIの保持機構によって制限されます; "spark.ui.retainedJobs" はジョブ上でトリガーするガベージコレクションの閾値を定義し、spark.ui.retainedStagesはステージ上のそれを定義します。ガベージコレクションは再生によって起こることに注意してください: それらの値を増加し履歴サーバを再起動することで、より多くのエントリを取り出すことができます。

Executorのタスクのメトリクス

REST APIはタスクの実行を保証しながらSpark executorによって収集されたタスクのメトリクスの値を公開します。このメソッドはパフォーマンスのトラブルシューティングとワークロードの特徴付けに使うことができます。簡単な説明と利用可能なメトリクスのリスト:

Spark Executor のタスクのメトリクス名 簡単な説明
executorRunTime executorがこのタスクを実行するために費やす時間。これはシャッフルデータを取得する時間を含みます。値はミリ秒で表現されます。
executorCpuTime executorがこのタスクを実行するために費やすCPU時間。これはシャッフルデータを取得する時間を含みます。値はナノ秒で表現されます。
executorDeserializeTime このタスクをデシリアライズするために費やす時間。値はミリ秒で表現されます。
executorDeserializeCpuTime executor上でこのタスクをデシリアライズするために掛かるCPU時間。値はナノ秒で表現されます。
resultSize このタスクがTaskResultとしてドライバに送り返されるバイト数。
jvmGCTime JVMがこのタスクを実行する間にガベージコレクションで費やす時間。値はミリ秒で表現されます。
resultSerializationTime タスクの結果をシリアライズするために費やす時間。値はミリ秒で表現されます。
memoryBytesSpilled このタスクによって浪費されたインメモリのバイト数。
diskBytesSpilled このタスクによって浪費されたディスク上のバイト数。
peakExecutionMemory シャッフル、集約およびjoin中に作成された内部データ構造によって使われるピークメモリ。このaccumulatorの値はこのタスク内で作成されたそのような全てのデータ構造に渡るピークサイズのおおよその合計でなければなりません。SQLジョブについては、これは全ての安全では無いオペレータと ExternalSort のみを追跡します。
inputMetrics.* org.apache.spark.rdd.HadoopRDD または永続データから読み込まれるデータに関係するメトリクス。
    .bytesRead 読み込まれた総バイト数。
    .recordsRead 読み込まれた総レコード数。
outputMetrics.* 外部(例えば、分散ファイルシステム)へのデータ書き込みに関係するメトリクス。出力を持つタスクのみで定義されます。
    .bytesWritten 書き込まれた総バイト数。
    .recordsWritten 書き込まれた総レコード数。
shuffleReadMetrics.* シャッフル読み込みオペレーションに関係するメトリクス。
    .recordsRead シャッフル操作で読み込まれたレコード数。
    .remoteBlocksFetched シャッフル操作で取得されたリモートブロックの数。
    .localBlocksFetched シャッフルオペレーションで取得された(リモートのexecutorからの読み込みに対して)ローカルブロックの数
    .totalBlocksFetched シャッフル操作で取得されたブロックの数 (ローカルおよびリモートの両方)
    .remoteBytesRead シャッフル操作で読み込まれたリモートバイトの数
    .localBytesRead (リモートexecutorからの読み込みに対して)ローカルディスクからのシャッフルオペレーションの読み込みのバイト数
    .totalBytesRead シャッフル操作で読み込まれたバイト数 (ローカルおよびリモートの両方)
    .remoteBytesReadToDisk シャッフル操作でディスクに読み込まれたリモートバイトの数メモリへの読み込みに対して、シャッフルの読み込みオペレーションで大きなブロックがディスクへ取得されます。これはデフォルトの挙動です。
    .fetchWaitTime タスクがリモートのシャッフルブロックを待つ時間。これはシャッフル入力データでブロックする時間のみを含みます。例えば、もしタスクが処理中のブロックAをまだ完了していない間にブロックBが取得される場合、ブロックB上でブロックしているとは見なされません。値はミリ秒で表現されます。
shuffleWriteMetrics.* シャッフルデータを書き込む操作に関係するメトリクス。
    .bytesWritten シャッフル操作で書き込まれるバイト数
    .recordsWritten シャッフル操作で書き込まれるレコードの数
    .writeTime ディスクあるいはバッファキャッシュへの書き込みのブロックで費やされる時間。値はナノ秒で表現されます。

Executor のメトリクス

executor レべルのメトリクスは Heartbeat の一部として各 executor からドライバに送信され、JVM ヒープメモリや GC 情報などの executor 自身のパフォーマンスメトリクスを説明します。executor メトリクスの値と executor ごとの測定されたメモリピーク値は、REST API を使って JSON 形式および Prometheus 形式で公開されます。JSON のエンドポイントは、以下で公開されます: /applications/[app-id]/executors。そして、Prometheusのエンドポイントは以下で公開されます: /metrics/executors/prometheus。The Prometheus endpoint is conditional to a configuration parameter: spark.ui.prometheus.enabled=true (the default is false). さらに、spark.eventLog.logStageExecutorMetrics が true の場合、executor メモリメトリクスの集計されたステージごとのピーク値がイベントログに書き込まれます。
executorのメモリメトリクスもDropwizard metrics libraryに基づくSparkメトリクスシステムを介して公開されます。簡単な説明と利用可能なメトリクスのリスト:

executor レベルメトリクス名 簡単な説明
rddBlocks この executor のブロックマネージャーの RDD ブロック。
memoryUsed この executor で使われるストレージメモリ。
diskUsed この executor で RDD ストレージのために使われるディスク容量。
totalCores この executor で利用可能なコア数。
maxTasks この executor で同時に実行できるタスクの最大数。
activeTasks 現在実行中のタスクの数。
failedTasks この executor で失敗したタスクの数。
completedTasks この executor で完了したタスクの数。
totalTasks この executor のタスク(実行中、失敗、完了)の総数。
totalDuration この executor で JVM がタスクの実行に費やした経過時間。値はミリ秒で表現されます。
totalGCTime この executor で JVM がガベージコレクションに費やした経過時間の合計。値はミリ秒で表現されます。
totalInputBytes この executor の入力バイトの合計。
totalShuffleRead この executor のシャッフル読み込みバイトの合計。
totalShuffleWrite この executor のシャッフル書き込みバイトの合計。
maxMemory ストレージに利用可能なメモリの総量のバイト数。
memoryMetrics.* メモリメトリクスの現在の値:
    .usedOnHeapStorageMemory 現在ストレージ用に使用されているオンヒープメモリのバイト数。
    .usedOffHeapStorageMemory 現在ストレージ用に使用されているオフヒープメモリのバイト数。
    .totalOnHeapStorageMemory ストレージに利用可能なオンヒープメモリの総バイト数。この量は、MemoryManager の実装によって時間とともに変化するかもしれません。
    .totalOffHeapStorageMemory ストレージに利用可能なオフヒープメモリの総バイト数。この量は、MemoryManager の実装によって時間とともに変化するかもしれません。
peakMemoryMetrics.* メモリ(および GC)メトリクスのピーク値:
    .JVMHeapMemory オブジェクト割り当てに使われるヒープのピークメモリ使用量。ヒープは1つ以上のメモリプールで構成されます。The used and committed size of the returned memory usage is the sum of those values of all heap memory pools whereas the init and max size of the returned memory usage represents the setting of the heap memory which may not be the sum of those of all heap memory pools. The amount of used memory in the returned memory usage is the amount of memory occupied by both live objects and garbage objects that have not been collected, if any.
    .JVMOffHeapMemory Java 仮想マシーンによって使われる非ヒープメモリのピークメモリ使用量。非ヒープメモリは1つ以上のメモリプールから成ります。The used and committed size of the returned memory usage is the sum of those values of all non-heap memory pools whereas the init and max size of the returned memory usage represents the setting of the non-heap memory which may not be the sum of those of all non-heap memory pools.
    .OnHeapExecutionMemory 使用中のオンヒープ実行メモリのバイト数。
    .OffHeapExecutionMemory 使用中のオフヒープ実行メモリのバイト数。
    .OnHeapStorageMemory 使用中のオンヒープストレージメモリのバイト数。
    .OffHeapStorageMemory 使用中のオフヒープストレージメモリのバイト数。
    .OnHeapUnifiedMemory オンヒープメモリのピーク (実行およびストレージ)。
    .OffHeapUnifiedMemory オフヒープメモリのピーク (実行およびストレージ)。
    .DirectPoolMemory JVM がダイレクトバッファプールに使っているピークメモリ (java.lang.management.BufferPoolMXBean)
    .MappedPoolMemory JVM がマップバッファプールに使っているピークメモリ (java.lang.management.BufferPoolMXBean)
    .ProcessTreeJVMVMemory 仮想メモリサイズのバイト数。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreeJVMRSSMemory 常駐セットサイズ: プロセスが実メモリに保持しているページ数。これは、テキスト、データ、またはスタック領域にカウントされるページです。これには、デマンドロードされていないページやスワップアウトされたページは含まれません。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreePythonVMemory Python の仮想メモリサイズのバイト数。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreePythonRSSMemory Python の常駐セットサイズ。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreeOtherVMemory 他の種類のプロセスの仮想メモリサイズのバイト数。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreeOtherRSSMemory 他の種類のプロセスの常駐セットサイズ。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .MinorGCCount マイナー GC カウントの合計。例えば、ガベージコレクタは Copy、PS Scavenge、ParNew、G1 Young Generation などのいずれかです。
    .MinorGCTime マイナー GC 経過時間の合計。値はミリ秒で表現されます。
    .MajorGCCount メジャー GC カウントの合計。例えば、ガベージコレクタは MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation などのいずれかです。
    .MajorGCTime メジャー GC 経過時間の合計。値はミリ秒で表現されます。

RSS と Vmem の計算は proc(5) に基づいています。

API バージョニング ポリシー

これらのエンドポイントはその上でアプリケーションを開発を容易にするために熱心にバージョン付けされています。特に、Sparkは以下を保証します:

実行中のアプリケーションのUIを検証する時でも、applications/[app-id]の分割はたった一つのアプリケーションだけが利用可能だとしてもまだ必要です。例えば、実行中のアプリケーションのジョブのリストを見るには、http://localhost:4040/api/v1/applications/[app-id]/jobsに行きます。これは両方のモードにおいてパスを矛盾が無い状態にします。

マトリックス

SparkはDropwizard Metrics Libraryに基づいた設定可能なマトリックスシステムを持っています。これにより、ユーザはSparkマトリックスにHTTP, JMX およびCSV ファイルを含む様々なsinkを伝えることができます。メトリクスは Spark コードベースに埋め込まれたソースによって生成されます。特定のアクティビティと Spark コンポーネントのインストルメンテーションを提供します。このマトリクスシステムは、Sparkが $SPARK_HOME/conf/metrics.propertiesに存在すると期待する設定ファイルを通じて設定することができます。独自のファイルの場所はspark.metrics.conf設定プロパティによって指定することができます。設定ファイルを使う代わりに、接頭辞 spark.metrics.conf. が付いた一連の設定パラメータを使うことができます。デフォルトでは、ドライバまたは executor のメトリクスに使われるルート名前空間は、spark.app.id の値です。しかし、しばしば、ユーザはドライバーとexecutorについてアプリケーションを横断してメトリックスを追跡したいと思います。これはアプリケーションID(つまり、spark.app.id) を使って行うのは、アプリケーションの各起動の度に変更するため、難しいです。そのようなユースケースについては、spark.metrics.namespace 設定プロパティを使ってメトリクスのレポートのために独自の名前空間を指定することができます。例えば、ユーザがメトリクスの名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace プロパティを ${spark.app.name} のような値に設定することができます。この値はSparkによって適切に展開され、メトリックス システムのルートの名前空間として使われます。どのようなドライバおよびexecutorのメトリックスも spark.app.id を接頭語に使いませんし、どのような spark.metrics.namespace プロパティもそのようなメトリックスに影響しません。

SparkのマトリックスはSparkコンポーネントに対応する異なる インスタンス に分離されます。各インスタンスの中で、マトリックが伝える一連のsinkを設定することができます。現在のところ以下のインスタンスがサポートされています:

各インスタンスは0個以上のsinksに対して報告することができます。sinkはorg.apache.spark.metrics.sink パッケージに含まれています。

Sparkはライセンスの制限のためにデフォルトのビルドに含まれていないGanglia sinkもサポートします:

GangliaSinkをインストールするには、Sparkのカスタムビルドを実行する必要があるでしょう。このライブラリを組み込むことでSparkパッケージにLGPLでライセンスされたコードが含まれることに注意してください。sbt ユーザに関しては、ビルドする前に SPARK_GANGLIA_LGPL 環境変数を設定してください。Mavenユーザに関しては、 -Pspark-ganglia-lgpl プロファイルを有効にします。クラスタのSparkビルドを修正することに加えて、ユーザアプリケーションは spark-ganglia-lgpl アーティファクトにリンクする必要があります。

メトリクス設定ファイルの構文と各シンクで利用可能なパラメータは、サンプル設定ファイル $SPARK_HOME/conf/metrics.properties.template で定義されています。

メトリクス設定ファイルの代わりに Spark 設定パラメータを使う場合、関連するパラメータ名は接頭辞 spark.metrics.conf. とそれに続く設定詳細で構成されます。つまり、パラメータは以下の形式をとります: spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]。この例は、Graphite シンクの Spark 構成パラメータのリストを示します:

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"

Spark メトリクス設定のデフォルト値は、以下の通りです:

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

追加のソースは、メトリクス設定ファイルあるいは設定パラメータ spark.metrics.conf.[component_name].source.jvm.class=[source_name] を使って設定できます。現在のところ、JVM ソースが唯一利用可能なオプションのソースです。例えば、以下の設定パラメータは JVM ソースをアクティブにします: "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

利用可能なメトリクスプロバイダのリスト

Spark によって使われるメトリクスには、ゲージ、カウンタ、ヒストグラム、メータ、タイマーなど、複数のタイプがあります。詳細は、Dropwizard ライブラリのドキュメントを見てください。以下のコンポーネントとメトリクスのリストは、コンポーネントインスタンスとソース名前空間ごとにグループ化された、使用可能なメトリクスについての名前と幾つかの詳細を報告します。Spark インスツルメンツで使われるメトリクスのもっとも一般的なものは、ゲージとカウンタです。カウンタは.count サフィックスを持つものとして認識されます。タイマー、メータおよびヒストグラムはリストで注釈を付けられ、リストの残りの要素はゲージタイプのメトリクスです。メトリクスの大部分は親コンポーネントインスタンスが設定されるとすぐにアクティブになります。一部のメトリクスは追加の設定パラメータを介して有効にする必要もあります。詳細はリストの中で報告されます。

Component instance = Driver

This is the component with the largest amount of instrumented metrics

Component instance = Executor

これらのメトリクスはSpark executorによって公開されます。

Source = JVM Source

注意:

Component instance = applicationMaster

Note: applies when running on YARN

Component instance = mesos_cluster

Note: applies when running on mesos

Component instance = master

Note: applies when running in Spark standalone as master

Component instance = ApplicationSource

Note: applies when running in Spark standalone as master

Component instance = worker

Note: applies when running in Spark standalone as worker

Component instance = shuffleService

Note: applies to the shuffle service

上級の計測器

Sparkのジョブのパフォーマンスをプロファイルするために幾つかの外部ツールを使用することができます。

Spark はプラグイン API も提供するため、カスタムインストルメンテーションコードを Spark アプリケーションに追加することができます。プラグインを Spark にロードするために2つの設定キーがあります:

どちらも、org.apache.spark.api.plugin.SparkPlugin インタフェースを実装するクラス名のカンマ区切りのリストを取ります。2つの名前が存在するため、1つのリストを Spark のデフォルト設定ファイルに配置できるため、ユーザは設定ファイルのリストを上書きせずにコマンドラインから他のプラグインを簡単に追加することができます。重複するプラグインは無視されます。

TOP
inserted by FC2 system