監視および計測器
Sparkアプリケーションを監視するいくつかの方法があります: web UI、マトリクス、および実験的な計測器。
Web インタフェース
各SparkContextはデフォルトではポート4040でWeb UIを起動します。これはアプリケーションに関する有用な情報を表示します。以下のものが含まれます:
- スケジューラのステージおよびタスクのリスト
- RDDサイズおよびメモリの使用量の概要
- 環境情報
- 実行中のexecutorの情報
単にwebブラウザでhttp://<driver-node>:4040
を開くことで、このインタフェースにアクセスできます。同じホスト上で複数のSparkContextが実行中の場合、それらは4040から始まる連続したポートに割り当てられるでしょう(4041, 4042など)。
デフォルトではこの情報はアプリケーションの持続期間のみ利用可能です。後でwebUIを見るためには、アプリケーションの開始前にspark.eventLog.enabled
をtrueに設定します。これはUIに表示された情報を符号化したSparkイベントを永続的なストレージに記録するようにSparkを設定します。
後で見る
アプリケーションのイベントログが存在する場合、Sparkの履歴サーバを使ってアプリケーションのUIを構築することが可能です。以下を実行することで履歴サーバを開始することができます:
./sbin/start-history-server.sh
これはデフォルトで http://<server-url>:18080
にwebインタフェースを生成し、未完了あるいは完了済みのアプリケーションと試行をリスト化します。
ファイルシステム プロバイダ クラス(以下のspark.history.provider
を見てください)を使っている場合、ログの基本ディレクトリはspark.history.fs.logDirectory
設定オプションで提供され、基本ディレクトリはアプリケーションのログイベントを表すサブディレクトリを含まなければなりません。
sparkのジョブ自身はイベントを記録するように設定され、同じ共有、書き込み可能なディレクトリに記録するように設定されていなければなりません。例えば、もしサーバがhdfs://namenode/shared/spark-logs
のログディレクトリを設定されている場合、クライアント側のオプションは以下のようになります:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
履歴サーバは以下のようにして設定することができます:
環境変数
環境変数 | 意味 |
---|---|
SPARK_DAEMON_MEMORY |
履歴サーバに割り当てるメモリ (デフォルト: 1g)。 |
SPARK_DAEMON_JAVA_OPTS |
履歴サーバのためのJVMオプション (デフォルト: none)。 |
SPARK_DAEMON_CLASSPATH |
履歴サーバのためのクラスパス (デフォルト: none). |
SPARK_PUBLIC_DNS |
履歴サーバの公開アドレス。設定されない場合、アプリケーションの履歴へのリンクはサーバの内部アドレスが使われ、壊れたリンクとなります (デフォルト: none)。 |
SPARK_HISTORY_OPTS |
spark.history.* 履歴サーバの設定オプション (デフォルト: none)。
|
ローリングイベントログファイルに圧縮を適用
実行時間の長いアプリケーション(例えばストリーミング)は、1つの巨大なエベントログファイルになる可能性があり、維持にコストが掛かり、また Spark履歴サーバで更新ごとに再生するために大量のリソースが必要になります。
spark.eventLog.rolling.enabled
と spark.eventLog.rolling.maxFileSize
を有効にすると、幾つかのシナリオで役立つが、それでもログの全体的なサイズを減らすのには役に立たないような、1つの巨大なイベントログファイルではなく、ローリングイベントログファイルを使用できるようになります。
Spark履歴サーバは、Spark履歴サーバ上のspark.history.fs.eventLog.rolling.maxFilesToRetain
設定をすることで、ローリングイベントファイルに圧縮を適用してログの全体的なサイズを削減することができます。
詳細は以下で説明しますが、圧縮は損失のある操作であることに先に注意してください。圧縮により UI に表示されなくなるイベントが破棄されます - オプションを有効にする前に破棄されるイベントを確認することをお勧めします。
圧縮が発生すると、履歴サーバはアプリケーションで利用可能な全てのイベントログファイルを列挙し、圧縮のターゲットとして保持される最小のインデックスを持つファイルよりもインデックスが少ないイベントログファイルになるように考慮します。例えば、アプリケーション A に5つのイベントログがあり、spark.history.fs.eventLog.rolling.maxFilesToRetain
が 2 に設定された場合、最初の3つのログファイルが圧縮対象として選択されます。
ターゲットを選択すると、それらを分析して除外できるイベントを特定し、除外することに決定したイベントを破棄して1つの圧縮ファイルに書きなおします。
圧縮は古いデータを指すイベントを除外しようとします。現在、除外するイベントの候補を以下に示します:
- 終了したジョブのイベントと、関連するステージ/タスク イベント
- 終了した executor のイベント
- 終了した SQL 実行のイベントと、関連するジョブ/ステージ/タスク イベント。
書き換えが完了すると、元のログファイルはベストエフォートの方式で削除されます。履歴サーバは元のファイルを削除できない場合がありますが、それは履歴サーバの操作には影響しません。
圧縮中にあまり多くのスペースが削減されないと判明した場合、Spark 履歴サーバは古いイベントログファイルを圧縮しない場合があることに注意してください。ストリーミングクエリの場合、各マイクロバッチが1つ以上のジョブをトリガーしてすぐに終了するため通常は圧縮が実行されると予測されますが、バッチクエリの場合は多くの場合で圧縮は実行されません。
これは Spark 3.0 で導入された新しい機能であり、完全には安定しているとは限らないことにも注意してください。状況によっては、圧縮により予想よりも多くのイベントが除外され、アプリケーションの履歴サーバで UI の問題が発生する場合があります。注意して使ってください。
Spark 履歴サーバの設定オプション
Spark Historyサーバのためのセキュリティ オプションはセキュリティのページの中でより詳細にカバーされます。
プロパティ名 | デフォルト | 意味 | これ以降のバージョンから |
---|---|---|---|
spark.history.provider | org.apache.spark.deploy.history.FsHistoryProvider |
アプリケーション履歴のバックエンド実装のクラス名。今のところ、Sparkによって提供された1つの実装しかありません。これはファイルシステムに保存されたアプリケーションログを探します。 | 1.1.0 |
spark.history.fs.logDirectory | file:/tmp/spark-events |
ファイルシステムの履歴プロバイダのために、URLはロードするアプリケーションのイベントログを含みますディレクトリです。これはローカルの file:// パス、HDFS パスhdfs://namenode/shared/spark-logs 、あるいはHadoop APIによってサポートされる別のファイルシステムのパスです。
|
1.1.0 |
spark.history.fs.update.interval | 10s | ファイルシステム履歴プロバイダがログディレクトリ内で新規あるいは更新ログをチェックする周期。短い周期は、更新されたアプリケーションの再読み込みのサーバ負荷を犠牲にして、新しいアプリケーションを素早く検知します。更新が完了するとすぐに完了あるいは未完了のアプリケーションのリスト化は変更を反映するでしょう。 | 1.4.0 |
spark.history.retainedApplications | 50 | キャッシュ内にUIデータを保持するアプリケーションの数。この上限を超えると、一番古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュに無い場合、それがUIからアクセスされる場合はディスクからロードされなければならないでしょう。 | 1.0.0 |
spark.history.ui.maxApplications | Int.MaxValue | 履歴サマリページ上で表示されるアプリケーションの数。履歴サマリページ上に表示されていない場合でも、それらのURLを直接アクセスすることでアプリケーションのUIがまだ利用可能です。 | 2.0.1 |
spark.history.ui.port | 18080 | 履歴サーバのwebインタフェースが紐付けられるポート。 | 1.0.0 |
spark.history.kerberos.enabled | false | 履歴サーバがログインするためにkerberosを使わなければならないかどうかを示す。もし履歴サーバがsecureなHadoopクラスタ上のHDFSファイルにアクセスする場合にこれが必要とされます。 | 1.0.1 |
spark.history.kerberos.principal | (none) |
spark.history.kerberos.enabled=true の場合、履歴サーバの kerberos プリンシパル名を指定します。
|
1.0.1 |
spark.history.kerberos.keytab | (none) |
spark.history.kerberos.enabled=true の場合、履歴サーバの kerberosキータブファイルの場所を指定します。
|
1.0.1 |
spark.history.fs.cleaner.enabled | false | 履歴サーバが定期的にイベントログをストレージから削除するかどうかを指定します。 | 1.4.0 |
spark.history.fs.cleaner.interval | 1d |
spark.history.fs.cleaner.enabled=true の場合、ファイルシステムジョブ履歴クリーナーが削除するファイルの確認頻度を指定します。2つのうち少なくとも1つの条件が満たされた場合、ファイルが削除されます。まず、spark.history.fs.cleaner.maxAge より古い場合は削除されます。ファイルの数が spark.history.fs.cleaner.maxNum より多い場合にも削除されます。Spark は最も古い試行時間の順にアプリケーションから完了した試行をクリーンアップしようとします。
|
1.4.0 |
spark.history.fs.cleaner.maxAge | 7d |
spark.history.fs.cleaner.enabled=true の場合、これより古いジョブ履歴ファイルは、ファイルシステム履歴クリーナーの実行時に削除されます。
|
1.4.0 |
spark.history.fs.cleaner.maxNum | Int.MaxValue |
spark.history.fs.cleaner.enabled=true の場合、イベントログディレクトリ内のファイルの最大数を指定します。Spark は完了した試行ログをクリーンアップして、ログディレクトリをこの制限内に維持しようとします。これは、HDFS の `dfs.namenode.fs-limits.max-directory-items` のような基本的なファイルシステムの制限よりも小さくする必要があります。
|
3.0.0 |
spark.history.fs.endEventReparseChunkSize | 1m | 最後のイベントを見つけるためにログファイルの最後にどれだけのバイトをパースするか。これはイベントログファイルの不要な部分をスキップすることでアプリケーションのリスティングの生成をスピードアップするために使われます。この設定を0に設定することで無効にすることができます。 | 2.4.0 |
spark.history.fs.inProgressOptimization.enabled | true | 進行中のログの最適化された処理を有効にする。このオプションは進行中としてリスト化されたイベントログのリネームに失敗した完了したアプリケーションをそのままにするかもしれません。 | 2.4.0 |
spark.history.fs.driverlog.cleaner.enabled | spark.history.fs.cleaner.enabled |
履歴サーバが定期的にドライバーログをストレージから削除するかどうかを指定する。 | 3.0.0 |
spark.history.fs.driverlog.cleaner.interval | spark.history.fs.cleaner.interval |
spark.history.fs.driverlog.cleaner.enabled=true の場合、ファイルシステムドライバログクリーナーが削除するファイルの確認頻度を指定します。もしファイルがspark.history.fs.driverlog.cleaner.maxAge より古い場合は、それらは削除されます。
|
3.0.0 |
spark.history.fs.driverlog.cleaner.maxAge | spark.history.fs.cleaner.maxAge |
spark.history.fs.driverlog.cleaner.enabled=true の場合、これより古いドライバログファイルは、ドライバログクリーナーの実行時に削除されます。
|
3.0.0 |
spark.history.fs.numReplayThreads | 利用可能なコアの25% | 履歴サーバによって使われイベントログを処理するためのスレッドの数。 | 2.0.0 |
spark.history.store.maxDiskUsage | 10g | キャッシュ アプリケーション履歴情報が格納されているローカルディレクトリのディスク使用量の最大値。 | 2.3.0 |
spark.history.store.path | (none) | アプリケーションの履歴データをキャッシュするローカルディレクトリ。設定された場合、履歴サーバはアプリケーションのデータをメモリ内に維持する代わりにディスクに格納するでしょう。ディスクに書き込まれたデータは履歴サーバが再起動した時に再利用されるでしょう。 | 2.3.0 |
spark.history.custom.executor.log.url | (none) | 履歴サーバでクラスタマネージャーのアプリケーションログの URL を使う代わりに、外部のログサービスをサポートするための独自の spark executor ログの URL を指定します。Spark はクラスタマネージャーで変更できるパターンを介して、幾つかのパス変数をサポートします。クラスタマネージャーでどのパターンがサポートされるかを調べるには、ドキュメントがある場合はドキュメントを調べてください。この設定はライブアプリケーションには影響せず、履歴サーバにのみ影響します。 現在のところ、YARN モードのみがこの設定をサポートします。 | 3.0.0 |
spark.history.custom.executor.log.url.applyIncompleteApplication | false | カスタム spark executor ログの URL を不完全なアプリケーションにも適用するかどうかを指定します。実行中のアプリケーションの executor ログを元のログの URL として指定する必要がある場合は、これを `false` に設定します。不完全なアプリケーションには、正常にシャットダウンしなかったアプリケーションが含まれる場合があることに注意してください。これが `true` に設定されていても、この設定はライブアプリケーションには影響せず、履歴サーバにのみ影響します。 | 3.0.0 |
spark.history.fs.eventLog.rolling.maxFilesToRetain | Int.MaxValue |
非圧縮として保持されるイベントログファイルの最大数。デフォルトでは、全てのイベントログファイルが保持されます。技術的な理由により、最小値は 1 です。 詳細については、"古いイベントログファイルの圧縮の適用" のセクションを読んでください。 |
3.0.0 |
spark.history.store.hybridStore.enabled | false | イベントログのパース時にHybridStoreをストアとして使うかどうか。HybridStoreはデータを最初にインメモリストアに書き込み、インメモリストアへの書き込みが完了した後でデータをディスクストアにダンプするバックグランドスレッドを持ちます。 | 3.1.0 |
spark.history.store.hybridStore.maxMemoryUsage | 2g | HybridStoreを作成するために使える最大メモリ空間。HybridStoreはヒープメモリを共有利用するため、HybridStoreが有効にされた場合はヒープメモリをSHSのメモリオプションを介して増やす必要があります。 | 3.1.0 |
これら全てのUI内でテーブルはヘッダーをクリックすることでソート可能です。遅いタスク、データのゆがみなどを識別するのを容易にします。
注意
-
履歴サーバは完了および未完了のSparkジョブを表示します。障害の後でアプリケーションが複数回の試行をする場合、外に出ていく全ての未完了の試行あるいは最後に成功した試行と共に、失敗した試行が表示されるでしょう。
-
未完了のアプリケーションは断続的に更新されるだけです。更新の間の時間は変更されたファイルのチェックの間隔によって定義されます(
spark.history.fs.update.interval
)。大きなクラスターほど更新の間隔は大きな値に設定されるかも知れません。実行中のアプリケーションを見る方法は実際には自身のweb UIを見ることです。 -
完了したと自身を登録しないで終了したアプリケーションは、未完了としてリスト化されるでしょう - それらがもう実行していないとしてもです。これはアプリケーションがクラッシュした場合に起こりえます。
-
Sparkジョブの完了の信号を送る1つの方法はSparkのContextを明示的に終了することです (
sc.stop()
)。あるいは、Pythonではwith SparkContext() as sc:
を使ってSparkのContextのsteupとtear downを処理するために構築します。
REST API
UI内でマトリックスを見ることに加えて、それらはJSONとして利用可能です。これにより開発者は新しい表示方法およびSparkのための監視ツールを簡単に作成することができます。JSONは実行中のアプリケーションおよび履歴サーバ内の両方で利用可能です。エンドポイントは /api/v1
にマウントされます。例えば、履歴サーバの場合、それらは一般的にhttp://<server-url>:18080/api/v1
でアクセスすることができ、実行中のアプリケーションの場合http://localhost:4040/api/v1
で利用可能です。
APIの中では、アプリケーションはアプリケーションID [app-id]
によって参照されます。YARN上で実行中の場合、各アプリケーションは複数の試行を持つかも知れませんが、クライアントモードでのアプリケーションと異なり、クラスタモードでのアプリケーションの試行IDです。YARNクラスタモードのアプリケーションはそれらの [attempt-id]
によって識別することができます。以下でリスト化されているAPIでは、[app-id]
は実際には[base-app-id]/[attempt-id]
で、[base-app-id]
はYARNのアプリケーションIDです。
エンドポイント | 意味 |
---|---|
/applications |
全てのアプリケーションのリスト。?status=[completed|running] 選択された状態のアプリケーションのみをリスト化します。?minDate=[date] リスト化する最も早い開始日付/時間。?maxDate=[date] リスト化するもっとも最近の開始日付/時間。?minEndDate=[date] リスト化する最も早い終了日付/時間。?maxEndDate=[date] リスト化する最も最近の終了日付/時間。?limit=[limit] リスト化するアプリケーションの数を制限する。例: ?minDate=2015-02-10 ?minDate=2015-02-03T16:42:40.000GMT ?maxDate=2015-02-11T20:41:30.000GMT ?minEndDate=2015-02-12 ?minEndDate=2015-02-12T09:15:10.000GMT ?maxEndDate=2015-02-14T16:30:45.000GMT ?limit=10 |
/applications/[app-id]/jobs |
指定されたアプリケーションの全てのジョブのリスト。?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみをリスト化します。
|
/applications/[app-id]/jobs/[job-id] |
指定されたジョブの詳細。 |
/applications/[app-id]/stages |
指定されたアプリケーションの全てのステージのリスト。?status=[active|complete|pending|failed] は指定された状態のステージのみをリスト化します。?details=true はタスクデータを持つ全てのステージをリスト化します。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] は指定されたタスク状態を持つタスクのみをリスト化します。クエリパラメータ taskStatus はdetails=true の時のみ効果があります。指定された全てのタスク状態に一致する全てのタスクを返す ?details=true&taskStatus=SUCCESS&taskStatus=FAILED のような複数のtaskStatus もサポートします。?withSummaries=true は、タスクメトリクスの分布とexecutorメトリクスの分布を含むステージをリスト化します。?quantiles=0.0,0.25,0.5,0.75,1.0 は指定された分位のメトリクスを要約します。クエリパラメータ quantiles はwithSummaries=true の時のみ効果があります。デフォルト値は0.0,0.25,0.5,0.75,1.0 です。
|
/applications/[app-id]/stages/[stage-id] |
指定されたステージの全ての試行のリスト。?details=true は指定されたステージのタスクデータを持つ全ての試行をリスト化します。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] は指定されたタスク状態を持つタスクのみをリスト化します。クエリパラメータ taskStatus はdetails=true の時のみ効果があります。指定された全てのタスク状態に一致する全てのタスクを返す ?details=true&taskStatus=SUCCESS&taskStatus=FAILED のような複数のtaskStatus もサポートします。?withSummaries=true は、各試行のタスクメトリクスの分布とexecutorメトリクスの分布を含むステージをリスト化します。?quantiles=0.0,0.25,0.5,0.75,1.0 は指定された分位のメトリクスを要約します。クエリパラメータ quantiles はwithSummaries=true の時のみ効果があります。デフォルト値は0.0,0.25,0.5,0.75,1.0 です。例: ?details=true ?details=true&taskStatus=RUNNING ?withSummaries=true ?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] |
指定されたステージの試行についての詳細。?details=true は指定されたステージの試行の全てのタスクデータをリスト化します。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] は指定されたタスク状態を持つタスクのみをリスト化します。クエリパラメータ taskStatus はdetails=true の時のみ効果があります。指定された全てのタスク状態に一致する全てのタスクを返す ?details=true&taskStatus=SUCCESS&taskStatus=FAILED のような複数のtaskStatus もサポートします。?withSummaries=true は、指定されたステージの試行のタスクメトリクスの分布とexecutorメトリクスの分布をリスト化します。?quantiles=0.0,0.25,0.5,0.75,1.0 は指定された分位のメトリクスを要約します。クエリパラメータ quantiles はwithSummaries=true の時のみ効果があります。デフォルト値は0.0,0.25,0.5,0.75,1.0 です。例: ?details=true ?details=true&taskStatus=RUNNING ?withSummaries=true ?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary |
指定されたステージの試行の中の全てのタスクのマトリックスの概要。?quantiles 指定された変位内の測定基準を要約します。例: ?quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList |
指定されたステージの試行の全てのタスクのリスト。?offset=[offset]&length=[len] 指定された範囲のタスクをリスト化します。?sortBy=[runtime|-runtime] タスクをソートします。?status=[running|success|killed|failed|unknown] はその状態のタスクのみをリスト化します。例: ?offset=10&length=50&sortBy=runtime&status=running
|
/applications/[app-id]/executors |
指定されたアプリケーションの全ての動作中のexecutorのリスト。 |
/applications/[app-id]/executors/[executor-id]/threads |
指定されたアクティブなexecutorの中で実行中の全てのスレッドのトレースを積み重ねます。履歴サーバを介しては利用できません。 |
/applications/[app-id]/allexecutors |
指定されたアプリケーションの全て(動作中および停止)のexecutorのリスト。 |
/applications/[app-id]/storage/rdd |
指定されたアプリケーションのための保存されたRDDのリスト。 |
/applications/[app-id]/storage/rdd/[rdd-id] |
指定されたRDDのストレージ状態の詳細。 |
/applications/[base-app-id]/logs |
指定されたアプリケーションの全ての試みについてのイベントログをzipファイルのファイルとしてダウンロードします。 |
/applications/[base-app-id]/[attempt-id]/logs |
指定されたアプリケーションの試行のイベントログをzipファイルとしてダウンロードします。 |
/applications/[app-id]/streaming/statistics |
ストリーミング コンテキストのための統計。 |
/applications/[app-id]/streaming/receivers |
全てのストリーミング レシーバーのリスト。 |
/applications/[app-id]/streaming/receivers/[stream-id] |
指定されたレシーバーの詳細。 |
/applications/[app-id]/streaming/batches |
全ての保持されたバッチのリスト。 |
/applications/[app-id]/streaming/batches/[batch-id] |
指定されたバッチの詳細。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations |
指定されたバッチの全ての出力操作のリスト。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] |
指定された操作と指定されたバッチの称しあ。 |
/applications/[app-id]/sql |
指定されたアプリケーションの全てのクエリをリスト化します。?details=[true (default) | false] はSparkプランノードの詳細をリスト化/隠します。?planDescription=[true (default) | false] は、物理プランのサイズが大きい時にオンデマンドで物理プランの説明 を有効/無効にします。?offset=[offset]&length=[len] は指定された範囲のクエリをリスト化します。
|
/applications/[app-id]/sql/[execution-id] |
指定されたクエリの詳細?details=[true (default) | false] は指定されたクエリの詳細に加えてメトリクスの詳細をリスト化/隠します。?planDescription=[true (default) | false] は、物理プランのサイズが大きい時にオンデマンドで指定されたクエリについての物理プランの説明 を有効/無効にします。
|
/applications/[app-id]/environment |
指定されたアプリケーションの環境の詳細。 |
/version |
現在のsparkバージョンを取得します。 |
取り出すことができるジョブとステージの数はスタンドアローンのSpark UIの保持機構によって制限されます; "spark.ui.retainedJobs"
はジョブ上でトリガーするガベージコレクションの閾値を定義し、spark.ui.retainedStages
はステージ上のそれを定義します。ガベージコレクションは再生によって起こることに注意してください: それらの値を増加し履歴サーバを再起動することで、より多くのエントリを取り出すことができます。
Executorのタスクのメトリクス
REST APIはタスクの実行を保証しながらSpark executorによって収集されたタスクのメトリクスの値を公開します。このメソッドはパフォーマンスのトラブルシューティングとワークロードの特徴付けに使うことができます。簡単な説明と利用可能なメトリクスのリスト:
Spark Executor のタスクのメトリクス名 | 簡単な説明 |
---|---|
executorRunTime | executorがこのタスクを実行するために費やす時間。これはシャッフルデータを取得する時間を含みます。値はミリ秒で表現されます。 |
executorCpuTime | executorがこのタスクを実行するために費やすCPU時間。これはシャッフルデータを取得する時間を含みます。値はナノ秒で表現されます。 |
executorDeserializeTime | このタスクをデシリアライズするために費やす時間。値はミリ秒で表現されます。 |
executorDeserializeCpuTime | executor上でこのタスクをデシリアライズするために掛かるCPU時間。値はナノ秒で表現されます。 |
resultSize | このタスクがTaskResultとしてドライバに送り返されるバイト数。 |
jvmGCTime | JVMがこのタスクを実行する間にガベージコレクションで費やす時間。値はミリ秒で表現されます。 |
resultSerializationTime | タスクの結果をシリアライズするために費やす時間。値はミリ秒で表現されます。 |
memoryBytesSpilled | このタスクによって浪費されたインメモリのバイト数。 |
diskBytesSpilled | このタスクによって浪費されたディスク上のバイト数。 |
peakExecutionMemory | シャッフル、集約およびjoin中に作成された内部データ構造によって使われるピークメモリ。このaccumulatorの値はこのタスク内で作成されたそのような全てのデータ構造に渡るピークサイズのおおよその合計でなければなりません。SQLジョブについては、これは全ての安全では無いオペレータと ExternalSort のみを追跡します。 |
inputMetrics.* | org.apache.spark.rdd.HadoopRDD または永続データから読み込まれるデータに関係するメトリクス。 |
.bytesRead | 読み込まれた総バイト数。 |
.recordsRead | 読み込まれた総レコード数。 |
outputMetrics.* | 外部(例えば、分散ファイルシステム)へのデータ書き込みに関係するメトリクス。出力を持つタスクのみで定義されます。 |
.bytesWritten | 書き込まれた総バイト数。 |
.recordsWritten | 書き込まれた総レコード数。 |
shuffleReadMetrics.* | シャッフル読み込みオペレーションに関係するメトリクス。 |
.recordsRead | シャッフル操作で読み込まれたレコード数。 |
.remoteBlocksFetched | シャッフル操作で取得されたリモートブロックの数。 |
.localBlocksFetched | シャッフルオペレーションで取得された(リモートのexecutorからの読み込みに対して)ローカルブロックの数 |
.totalBlocksFetched | シャッフル操作で取得されたブロックの数 (ローカルおよびリモートの両方) |
.remoteBytesRead | シャッフル操作で読み込まれたリモートバイトの数 |
.localBytesRead | (リモートexecutorからの読み込みに対して)ローカルディスクからのシャッフルオペレーションの読み込みのバイト数 |
.totalBytesRead | シャッフル操作で読み込まれたバイト数 (ローカルおよびリモートの両方) |
.remoteBytesReadToDisk | シャッフル操作でディスクに読み込まれたリモートバイトの数メモリへの読み込みに対して、シャッフルの読み込みオペレーションで大きなブロックがディスクへ取得されます。これはデフォルトの挙動です。 |
.fetchWaitTime | タスクがリモートのシャッフルブロックを待つ時間。これはシャッフル入力データでブロックする時間のみを含みます。例えば、もしタスクが処理中のブロックAをまだ完了していない間にブロックBが取得される場合、ブロックB上でブロックしているとは見なされません。値はミリ秒で表現されます。 |
shuffleWriteMetrics.* | シャッフルデータを書き込む操作に関係するメトリクス。 |
.bytesWritten | シャッフル操作で書き込まれるバイト数 |
.recordsWritten | シャッフル操作で書き込まれるレコードの数 |
.writeTime | ディスクあるいはバッファキャッシュへの書き込みのブロックで費やされる時間。値はナノ秒で表現されます。 |
Executor のメトリクス
executor レべルのメトリクスは Heartbeat の一部として各 executor からドライバに送信され、JVM ヒープメモリや GC 情報などの executor 自身のパフォーマンスメトリクスを説明します。executor メトリクスの値と executor ごとの測定されたメモリピーク値は、REST API を使って JSON 形式および Prometheus 形式で公開されます。JSON のエンドポイントは、以下で公開されます: /applications/[app-id]/executors
。そして、Prometheusのエンドポイントは以下で公開されます: /metrics/executors/prometheus
。The Prometheus endpoint is conditional to a configuration parameter: spark.ui.prometheus.enabled=true
(the default is false
). さらに、spark.eventLog.logStageExecutorMetrics
が true の場合、executor メモリメトリクスの集計されたステージごとのピーク値がイベントログに書き込まれます。
executorのメモリメトリクスもDropwizard metrics libraryに基づくSparkメトリクスシステムを介して公開されます。簡単な説明と利用可能なメトリクスのリスト:
executor レベルメトリクス名 | 簡単な説明 |
---|---|
rddBlocks | この executor のブロックマネージャーの RDD ブロック。 |
memoryUsed | この executor で使われるストレージメモリ。 |
diskUsed | この executor で RDD ストレージのために使われるディスク容量。 |
totalCores | この executor で利用可能なコア数。 |
maxTasks | この executor で同時に実行できるタスクの最大数。 |
activeTasks | 現在実行中のタスクの数。 |
failedTasks | この executor で失敗したタスクの数。 |
completedTasks | この executor で完了したタスクの数。 |
totalTasks | この executor のタスク(実行中、失敗、完了)の総数。 |
totalDuration | この executor で JVM がタスクの実行に費やした経過時間。値はミリ秒で表現されます。 |
totalGCTime | この executor で JVM がガベージコレクションに費やした経過時間の合計。値はミリ秒で表現されます。 |
totalInputBytes | この executor の入力バイトの合計。 |
totalShuffleRead | この executor のシャッフル読み込みバイトの合計。 |
totalShuffleWrite | この executor のシャッフル書き込みバイトの合計。 |
maxMemory | ストレージに利用可能なメモリの総量のバイト数。 |
memoryMetrics.* | メモリメトリクスの現在の値: |
.usedOnHeapStorageMemory | 現在ストレージ用に使用されているオンヒープメモリのバイト数。 |
.usedOffHeapStorageMemory | 現在ストレージ用に使用されているオフヒープメモリのバイト数。 |
.totalOnHeapStorageMemory | ストレージに利用可能なオンヒープメモリの総バイト数。この量は、MemoryManager の実装によって時間とともに変化するかもしれません。 |
.totalOffHeapStorageMemory | ストレージに利用可能なオフヒープメモリの総バイト数。この量は、MemoryManager の実装によって時間とともに変化するかもしれません。 |
peakMemoryMetrics.* | メモリ(および GC)メトリクスのピーク値: |
.JVMHeapMemory | オブジェクト割り当てに使われるヒープのピークメモリ使用量。ヒープは1つ以上のメモリプールで構成されます。The used and committed size of the returned memory usage is the sum of those values of all heap memory pools whereas the init and max size of the returned memory usage represents the setting of the heap memory which may not be the sum of those of all heap memory pools. The amount of used memory in the returned memory usage is the amount of memory occupied by both live objects and garbage objects that have not been collected, if any. |
.JVMOffHeapMemory | Java 仮想マシーンによって使われる非ヒープメモリのピークメモリ使用量。非ヒープメモリは1つ以上のメモリプールから成ります。The used and committed size of the returned memory usage is the sum of those values of all non-heap memory pools whereas the init and max size of the returned memory usage represents the setting of the non-heap memory which may not be the sum of those of all non-heap memory pools. |
.OnHeapExecutionMemory | 使用中のオンヒープ実行メモリのバイト数。 |
.OffHeapExecutionMemory | 使用中のオフヒープ実行メモリのバイト数。 |
.OnHeapStorageMemory | 使用中のオンヒープストレージメモリのバイト数。 |
.OffHeapStorageMemory | 使用中のオフヒープストレージメモリのバイト数。 |
.OnHeapUnifiedMemory | オンヒープメモリのピーク (実行およびストレージ)。 |
.OffHeapUnifiedMemory | オフヒープメモリのピーク (実行およびストレージ)。 |
.DirectPoolMemory | JVM がダイレクトバッファプールに使っているピークメモリ (java.lang.management.BufferPoolMXBean ) |
.MappedPoolMemory | JVM がマップバッファプールに使っているピークメモリ (java.lang.management.BufferPoolMXBean ) |
.ProcessTreeJVMVMemory | 仮想メモリサイズのバイト数。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
.ProcessTreeJVMRSSMemory | 常駐セットサイズ: プロセスが実メモリに保持しているページ数。これは、テキスト、データ、またはスタック領域にカウントされるページです。これには、デマンドロードされていないページやスワップアウトされたページは含まれません。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
.ProcessTreePythonVMemory | Python の仮想メモリサイズのバイト数。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
.ProcessTreePythonRSSMemory | Python の常駐セットサイズ。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
.ProcessTreeOtherVMemory | 他の種類のプロセスの仮想メモリサイズのバイト数。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
.ProcessTreeOtherRSSMemory | 他の種類のプロセスの常駐セットサイズ。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
.MinorGCCount | マイナー GC カウントの合計。例えば、ガベージコレクタは Copy、PS Scavenge、ParNew、G1 Young Generation などのいずれかです。 |
.MinorGCTime | マイナー GC 経過時間の合計。値はミリ秒で表現されます。 |
.MajorGCCount | メジャー GC カウントの合計。例えば、ガベージコレクタは MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation などのいずれかです。 |
.MajorGCTime | メジャー GC 経過時間の合計。値はミリ秒で表現されます。 |
RSS と Vmem の計算は proc(5) に基づいています。
API バージョニング ポリシー
これらのエンドポイントはその上でアプリケーションを開発を容易にするために熱心にバージョン付けされています。特に、Sparkは以下を保証します:
- エンドポイントは一つのバージョンからは削除されることはないでしょう
- 各フィールドは全ての指定されたエンドポイントにおいて削除されないでしょう
- 新しいエンドポイントが追加されるかも知れません
- 新しいフィールドが既存のエンドポイントに追加されるかも知れません
- APIの新しいバージョンは将来別個のエンドポイント(例えば、
api/v2
)に追加されるかも知れません。新しいバージョンは後方互換性が必要とされません。 - APIのバージョンは削除されるかも知れませんが、新しいapiバージョンの少なくとも一つの既存のマイナーバージョンがあります。
実行中のアプリケーションのUIを検証する時でも、applications/[app-id]
の分割はたった一つのアプリケーションだけが利用可能だとしてもまだ必要です。例えば、実行中のアプリケーションのジョブのリストを見るには、http://localhost:4040/api/v1/applications/[app-id]/jobs
に行きます。これは両方のモードにおいてパスを矛盾が無い状態にします。
マトリックス
SparkはDropwizard Metrics Libraryに基づいた設定可能なマトリックスシステムを持っています。これにより、ユーザはSparkマトリックスにHTTP, JMX およびCSV ファイルを含む様々なsinkを伝えることができます。メトリクスは Spark コードベースに埋め込まれたソースによって生成されます。特定のアクティビティと Spark コンポーネントのインストルメンテーションを提供します。このマトリクスシステムは、Sparkが $SPARK_HOME/conf/metrics.properties
に存在すると期待する設定ファイルを通じて設定することができます。独自のファイルの場所はspark.metrics.conf
の設定プロパティによって指定することができます。設定ファイルを使う代わりに、接頭辞 spark.metrics.conf.
が付いた一連の設定パラメータを使うことができます。デフォルトでは、ドライバまたは executor のメトリクスに使われるルート名前空間は、spark.app.id
の値です。しかし、しばしば、ユーザはドライバーとexecutorについてアプリケーションを横断してメトリックスを追跡したいと思います。これはアプリケーションID(つまり、spark.app.id
) を使って行うのは、アプリケーションの各起動の度に変更するため、難しいです。そのようなユースケースについては、spark.metrics.namespace
設定プロパティを使ってメトリクスのレポートのために独自の名前空間を指定することができます。例えば、ユーザがメトリクスの名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace
プロパティを ${spark.app.name}
のような値に設定することができます。この値はSparkによって適切に展開され、メトリックス システムのルートの名前空間として使われます。どのようなドライバおよびexecutorのメトリックスも spark.app.id
を接頭語に使いませんし、どのような spark.metrics.namespace
プロパティもそのようなメトリックスに影響しません。
SparkのマトリックスはSparkコンポーネントに対応する異なる インスタンス に分離されます。各インスタンスの中で、マトリックが伝える一連のsinkを設定することができます。現在のところ以下のインスタンスがサポートされています:
master
: Spark スタンドアローンマスタープロセス。applications
: 様々なアプリケーション上で報告するマスター内のコンポーネント。worker
: Spark スタンドアローン ワーカープロセス。executor
: Sparkの executor。driver
: Spark ドライバープロセス (SparkContextが生成されるプロセス)。shuffleService
: Sparkのシャッフルサービス。applicationMaster
: YARN上で実行する時のSparkアプリケーションマスタ。mesos_cluster
: Mesos で実行中の場合、Spark クラスタスケジューラ。
各インスタンスは0個以上のsinksに対して報告することができます。sinkはorg.apache.spark.metrics.sink
パッケージに含まれています。
ConsoleSink
: コンソールにマトリックス情報を記録します。CSVSink
: 定期的な間隔でマトリックスデータをCSVファイルにエクスポートします。JmxSink
: JMXコンソールで見るためにマトリックスを登録します。MetricsServlet
: マトリックスデータをJSONデータとして提供するために既存のSpark UI内にservletを追加する。PrometheusServlet
: (実験的) マトリックスデータを Prometheus 形式で提供するために既存の Spark UI 内に servlet を追加する。GraphiteSink
: マトリックスをGraphiteノードに送信する。Slf4jSink
: マトリックスをログエントリーとしてslf4jに送信する。StatsdSink
: メトリクスをStatsDノードに送信します。
Sparkはライセンスの制限のためにデフォルトのビルドに含まれていないGanglia sinkもサポートします:
GangliaSink
: マトリックスをGangliaノードあるいはマルチキャストグループに送信する。
GangliaSink
をインストールするには、Sparkのカスタムビルドを実行する必要があるでしょう。このライブラリを組み込むことでSparkパッケージにLGPLでライセンスされたコードが含まれることに注意してください。sbt ユーザに関しては、ビルドする前に SPARK_GANGLIA_LGPL
環境変数を設定してください。Mavenユーザに関しては、 -Pspark-ganglia-lgpl
プロファイルを有効にします。クラスタのSparkビルドを修正することに加えて、ユーザアプリケーションは spark-ganglia-lgpl
アーティファクトにリンクする必要があります。
メトリクス設定ファイルの構文と各シンクで利用可能なパラメータは、サンプル設定ファイル $SPARK_HOME/conf/metrics.properties.template
で定義されています。
メトリクス設定ファイルの代わりに Spark 設定パラメータを使う場合、関連するパラメータ名は接頭辞 spark.metrics.conf.
とそれに続く設定詳細で構成されます。つまり、パラメータは以下の形式をとります: spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]
。この例は、Graphite シンクの Spark 構成パラメータのリストを示します:
"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"
Spark メトリクス設定のデフォルト値は、以下の通りです:
"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"
追加のソースは、メトリクス設定ファイルあるいは設定パラメータ spark.metrics.conf.[component_name].source.jvm.class=[source_name]
を使って設定できます。現在のところ、JVM ソースが唯一利用可能なオプションのソースです。例えば、以下の設定パラメータは JVM ソースをアクティブにします: "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"
利用可能なメトリクスプロバイダのリスト
Spark によって使われるメトリクスには、ゲージ、カウンタ、ヒストグラム、メータ、タイマーなど、複数のタイプがあります。詳細は、Dropwizard ライブラリのドキュメントを見てください。以下のコンポーネントとメトリクスのリストは、コンポーネントインスタンスとソース名前空間ごとにグループ化された、使用可能なメトリクスについての名前と幾つかの詳細を報告します。Spark インスツルメンツで使われるメトリクスのもっとも一般的なものは、ゲージとカウンタです。カウンタは.count
サフィックスを持つものとして認識されます。タイマー、メータおよびヒストグラムはリストで注釈を付けられ、リストの残りの要素はゲージタイプのメトリクスです。メトリクスの大部分は親コンポーネントインスタンスが設定されるとすぐにアクティブになります。一部のメトリクスは追加の設定パラメータを介して有効にする必要もあります。詳細はリストの中で報告されます。
Component instance = Driver
This is the component with the largest amount of instrumented metrics
- namespace=BlockManager
- disk.diskSpaceUsed_MB
- memory.maxMem_MB
- memory.maxOffHeapMem_MB
- memory.maxOnHeapMem_MB
- memory.memUsed_MB
- memory.offHeapMemUsed_MB
- memory.onHeapMemUsed_MB
- memory.remainingMem_MB
- memory.remainingOffHeapMem_MB
- memory.remainingOnHeapMem_MB
- namespace=HiveExternalCatalog
- note: these metrics are conditional to a configuration parameter:
spark.metrics.staticSources.enabled
(default is true) - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- note: these metrics are conditional to a configuration parameter:
- namespace=CodeGenerator
- note: these metrics are conditional to a configuration parameter:
spark.metrics.staticSources.enabled
(default is true) - compilationTime (histogram)
- generatedClassSize (histogram)
- generatedMethodSize (histogram)
- sourceCodeSize (histogram)
- note: these metrics are conditional to a configuration parameter:
- namespace=DAGScheduler
- job.activeJobs
- job.allJobs
- messageProcessingTime (timer)
- stage.failedStages
- stage.runningStages
- stage.waitingStages
- namespace=LiveListenerBus
- listenerProcessingTime.org.apache.spark.HeartbeatReceiver (timer)
- listenerProcessingTime.org.apache.spark.scheduler.EventLoggingListener (timer)
- listenerProcessingTime.org.apache.spark.status.AppStatusListener (timer)
- numEventsPosted.count
- queue.appStatus.listenerProcessingTime (timer)
- queue.appStatus.numDroppedEvents.count
- queue.appStatus.size
- queue.eventLog.listenerProcessingTime (timer)
- queue.eventLog.numDroppedEvents.count
- queue.eventLog.size
- queue.executorManagement.listenerProcessingTime (timer)
- namespace=appStatus (all metrics of type=counter)
- note: Introduced in Spark 3.0. Conditional to a configuration parameter:
spark.metrics.appStatusSource.enabled
(default is false) - stages.failedStages.count
- stages.skippedStages.count
- stages.completedStages.count
- tasks.blackListedExecutors.count // deprecated use excludedExecutors instead
- tasks.excludedExecutors.count
- tasks.completedTasks.count
- tasks.failedTasks.count
- tasks.killedTasks.count
- tasks.skippedTasks.count
- tasks.unblackListedExecutors.count // deprecated use unexcludedExecutors instead
- tasks.unexcludedExecutors.count
- jobs.succeededJobs
- jobs.failedJobs
- jobDuration
- note: Introduced in Spark 3.0. Conditional to a configuration parameter:
- namespace=AccumulatorSource
- note: User-configurable sources to attach accumulators to metric system
- DoubleAccumulatorSource
- LongAccumulatorSource
- namespace=spark.streaming
- 注意: これはSpark 構造化ストリーミングのみに適用されます。Conditional to a configuration parameter:
spark.sql.streaming.metricsEnabled=true
(default is false) - eventTime-watermark
- inputRate-total
- latency
- processingRate-total
- states-rowsTotal
- states-usedBytes
- 注意: これはSpark 構造化ストリーミングのみに適用されます。Conditional to a configuration parameter:
- namespace=JVMCPU
- jvmCpuTime
- namespace=executor
- 注意: これらのメトリクスはローカルモードのドライバのみで利用可能です。
- この名前空間で利用可能なメトリクスの完全なリストは、executorコンポーネントインスタンスの対応するエントリで見つかります。
- namespace=ExecutorMetrics
- note: these metrics are conditional to a configuration parameter:
spark.metrics.executorMetricsSource.enabled
(default is true) - このソースはメモリに関係するメトリクスを含みます。この名前空間で利用可能なメトリクスの完全なリストは、executorコンポーネントインスタンスの対応するエントリで見つかります。
- note: these metrics are conditional to a configuration parameter:
- namespace=ExecutorAllocationManager
- 注意: これらのメトリクスは動的割り当てを使っている場合のみ発行されます。Conditional to a configuration parameter
spark.dynamicAllocation.enabled
(default is false) - executors.numberExecutorsToAdd
- executors.numberExecutorsPendingToRemove
- executors.numberAllExecutors
- executors.numberTargetExecutors
- executors.numberMaxNeededExecutors
- executors.numberExecutorsGracefullyDecommissioned.count
- executors.numberExecutorsDecommissionUnfinished.count
- executors.numberExecutorsExitedUnexpectedly.count
- executors.numberExecutorsKilledByDriver.count
- 注意: これらのメトリクスは動的割り当てを使っている場合のみ発行されます。Conditional to a configuration parameter
- namespace=plugin.<Plugin Class Name>
- Optional namespace(s). この名前空間内のメトリクスはユーザ定義のコードで定義され、SparkプラグインAPIを使って設定されます。独自のプラグインをSparkにロードする方法については、以下の“Advanced Instrumentation”を見てください。
Component instance = Executor
これらのメトリクスはSpark executorによって公開されます。
- namespace=executor (metrics are of type counter or gauge)
- notes:
spark.executor.metrics.fileSystemSchemes
(default:file,hdfs
) determines the exposed file system metrics.
- bytesRead.count
- bytesWritten.count
- cpuTime.count
- deserializeCpuTime.count
- deserializeTime.count
- diskBytesSpilled.count
- filesystem.file.largeRead_ops
- filesystem.file.read_bytes
- filesystem.file.read_ops
- filesystem.file.write_bytes
- filesystem.file.write_ops
- filesystem.hdfs.largeRead_ops
- filesystem.hdfs.read_bytes
- filesystem.hdfs.read_ops
- filesystem.hdfs.write_bytes
- filesystem.hdfs.write_ops
- jvmGCTime.count
- memoryBytesSpilled.count
- recordsRead.count
- recordsWritten.count
- resultSerializationTime.count
- resultSize.count
- runTime.count
- shuffleBytesWritten.count
- shuffleFetchWaitTime.count
- shuffleLocalBlocksFetched.count
- shuffleLocalBytesRead.count
- shuffleRecordsRead.count
- shuffleRecordsWritten.count
- shuffleRemoteBlocksFetched.count
- shuffleRemoteBytesRead.count
- shuffleRemoteBytesReadToDisk.count
- shuffleTotalBytesRead.count
- shuffleWriteTime.count
- succeededTasks.count
- threadpool.activeTasks
- threadpool.completeTasks
- threadpool.currentPool_size
- threadpool.maxPool_size
- threadpool.startedTasks
- notes:
- namespace=ExecutorMetrics
- notes:
- These metrics are conditional to a configuration parameter:
spark.metrics.executorMetricsSource.enabled
(default value is true) - ExecutorMetrics are updated as part of heartbeat processes scheduled
for the executors and for the driver at regular intervals:
spark.executor.heartbeatInterval
(default value is 10 seconds) - An optional faster polling mechanism is available for executor memory metrics,
it can be activated by setting a polling interval (in milliseconds) using the configuration parameter
spark.executor.metrics.pollingInterval
- These metrics are conditional to a configuration parameter:
- JVMHeapMemory
- JVMOffHeapMemory
- OnHeapExecutionMemory
- OnHeapStorageMemory
- OnHeapUnifiedMemory
- OffHeapExecutionMemory
- OffHeapStorageMemory
- OffHeapUnifiedMemory
- DirectPoolMemory
- MappedPoolMemory
- MinorGCCount
- MinorGCTime
- MajorGCCount
- MajorGCTime
- “ProcessTree*” metric counters:
- ProcessTreeJVMVMemory
- ProcessTreeJVMRSSMemory
- ProcessTreePythonVMemory
- ProcessTreePythonRSSMemory
- ProcessTreeOtherVMemory
- ProcessTreeOtherRSSMemory
- note: “ProcessTree” metrics are collected only under certain conditions.
The conditions are the logical AND of the following:
/proc
filesystem exists,spark.executor.processTreeMetrics.enabled=true
. “ProcessTree” metrics report 0 when those conditions are not met.
- notes:
- namespace=JVMCPU
- jvmCpuTime
- namespace=NettyBlockTransfer
- shuffle-client.usedDirectMemory
- shuffle-client.usedHeapMemory
- shuffle-server.usedDirectMemory
- shuffle-server.usedHeapMemory
- namespace=HiveExternalCatalog
- note: these metrics are conditional to a configuration parameter:
spark.metrics.staticSources.enabled
(default is true) - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- note: these metrics are conditional to a configuration parameter:
- namespace=CodeGenerator
- note: these metrics are conditional to a configuration parameter:
spark.metrics.staticSources.enabled
(default is true) - compilationTime (histogram)
- generatedClassSize (histogram)
- generatedMethodSize (histogram)
- sourceCodeSize (histogram)
- note: these metrics are conditional to a configuration parameter:
- namespace=plugin.<Plugin Class Name>
- Optional namespace(s). この名前空間内のメトリクスはユーザ定義のコードで定義され、SparkプラグインAPIを使って設定されます。独自のプラグインをSparkにロードする方法については、以下の“Advanced Instrumentation”を見てください。
Source = JVM Source
注意:
- Activate this source by setting the relevant
metrics.properties
file entry or the configuration parameter:spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
- These metrics are conditional to a configuration parameter:
spark.metrics.staticSources.enabled
(default is true) - This source is available for driver and executor instances and is also available for other instances.
- This source provides information on JVM metrics using the Dropwizard/Codahale Metric Sets for JVM instrumentation and in particular the metric sets BufferPoolMetricSet, GarbageCollectorMetricSet and MemoryUsageGaugeSet.
Component instance = applicationMaster
Note: applies when running on YARN
- numContainersPendingAllocate
- numExecutorsFailed
- numExecutorsRunning
- numLocalityAwareTasks
- numReleasedContainers
Component instance = mesos_cluster
Note: applies when running on mesos
- waitingDrivers
- launchedDrivers
- retryDrivers
Component instance = master
Note: applies when running in Spark standalone as master
- ワーカー
- aliveWorkers
- apps
- waitingApps
Component instance = ApplicationSource
Note: applies when running in Spark standalone as master
- 状態
- runtime_ms
- cores
Component instance = worker
Note: applies when running in Spark standalone as worker
- executors
- coresUsed
- memUsed_MB
- coresFree
- memFree_MB
Component instance = shuffleService
Note: applies to the shuffle service
- blockTransferRate (meter) - rate of blocks being transferred
- blockTransferMessageRate (meter) - rate of block transfer messages, i.e. if batch fetches are enabled, this represents number of batches rather than number of blocks
- blockTransferRateBytes (meter)
- blockTransferAvgTime_1min (gauge - 1-minute moving average)
- numActiveConnections.count
- numRegisteredConnections.count
- numCaughtExceptions.count
- openBlockRequestLatencyMillis (histogram)
- registerExecutorRequestLatencyMillis (histogram)
- registeredExecutorsSize
- shuffle-server.usedDirectMemory
- shuffle-server.usedHeapMemory
上級の計測器
Sparkのジョブのパフォーマンスをプロファイルするために幾つかの外部ツールを使用することができます。
- Gangliaのようなクラスタ全体の監視ツールはクラスタ全体の使用率やリソースのボトルネックに関する識見を提供することができます。例えば、Gangliaダッシュボードは特定の作業がディスク、ネットワーク、あるいはCPUに束縛しているかどうかを素早く明らかにすることができます。
- dstat, iostat および iotopのようなOSのプロファイルツールは各ノード上の微小なプロファイリングを提供することができます。
- スタックトレースを提供するための
jstack
、ヒープのダンプを作成するためのjmap
、時系列の統計をレポートするためのjstat
および 様々なJVMプロパティを視覚的に調べるためのjconsole
のような JVMユーティリティはJVM内部に慣れるために役に立つでしょう。
Spark はプラグイン API も提供するため、カスタムインストルメンテーションコードを Spark アプリケーションに追加することができます。プラグインを Spark にロードするために2つの設定キーがあります:
spark.plugins
spark.plugins.defaultList
どちらも、org.apache.spark.api.plugin.SparkPlugin
インタフェースを実装するクラス名のカンマ区切りのリストを取ります。2つの名前が存在するため、1つのリストを Spark のデフォルト設定ファイルに配置できるため、ユーザは設定ファイルのリストを上書きせずにコマンドラインから他のプラグインを簡単に追加することができます。重複するプラグインは無視されます。