This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
メトリクス #
Flinkは、メトリクスを収集し、外部システムに公開するメトリックシステムを公開します。
メトリクスの登録 #
You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup()
.
This method returns a MetricGroup
object on which you can create and register new metrics.
メトリクスの型 #
Flink supports Counters
, Gauges
, Histograms
and Meters
.
カウンター #
カウンター
は何かを数えるために使われます。The current value can be in- or decremented using inc()/inc(long n)
or dec()/dec(long n)
.
You can create and register a Counter
by calling counter(String name)
on a MetricGroup
.
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
値
}
}
class MyMapper(MapFunction):
def __init__(self):
self.counter = None
def open(self, runtime_context: RuntimeContext):
self.counter = runtime_context \
.get_metrics_group() \
.counter("my_counter")
def map(self, value: str):
self.counter.inc()
return value
Alternatively you can also use your own Counter
implementation:
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter())
}
override def map(value: String): String = {
counter.inc()
値
}
}
また、Python APIではサポートされません。
ゲージ #
A Gauge
provides a value of any type on demand. In order to use a Gauge
you must first create a class that implements the org.apache.flink.metrics.Gauge
interface.
帰り値の型についての制限はありません。
You can register a gauge by calling gauge(String name, Gauge gauge)
on a MetricGroup
.
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
@Override
public String map(String value) throws Exception {
valueToExpose++;
return value;
}
}
new class MyMapper extends RichMapFunction[String,String] {
@transient private var valueToExpose = 0
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
override def map(value: String): String = {
valueToExpose += 1
値
}
}
class MyMapper(MapFunction):
def __init__(self):
self.value_to_expose = 0
def open(self, runtime_context: RuntimeContext):
runtime_context \
.get_metrics_group() \
.gauge("my_gauge", lambda: self.value_to_expose)
def map(self, value: str):
self.value_to_expose += 1
return value
Note that reporters will turn the exposed object into a String
, which means that a meaningful toString()
implementation is required.
ヒストグラム #
A Histogram
measures the distribution of long values.
You can register one by calling histogram(String name, Histogram histogram)
on a MetricGroup
.
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var histogram: Histogram = _
override def open(parameters: Configuration): Unit = {
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram())
}
override def map(value: Long): Long = {
histogram.update(value)
値
}
}
また、Python APIではサポートされません。
Flink does not provide a default implementation for Histogram
, but offers a
Wrapper
that allows usage of Codahale/DropWizard histograms.
To use this wrapper add the following dependency in your pom.xml
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
そして、以下のように Codahale/DropWizard ヒストグラムを登録することができます。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
}
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
class MyMapper extends RichMapFunction[Long, Long] {
@transient private var histogram: Histogram = _
override def open(config: Configuration): Unit = {
val dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}
override def map(value: Long): Long = {
histogram.update(value)
値
}
}
また、Python APIではサポートされません。
メーター #
A Meter
measures an average throughput. An occurrence of an event can be registered with the markEvent()
method. Occurrence of multiple events at the same time can be registered with markEvent(long n)
method.
You can register a meter by calling meter(String name, Meter meter)
on a MetricGroup
.
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@Override
public Long map(Long value) throws Exception {
this.meter.markEvent();
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter())
}
override def map(value: Long): Long = {
meter.markEvent()
値
}
}
class MyMapperMeter(MapFunction):
def __init__(self):
self.meter = None
def open(self, runtime_context: RuntimeContext):
# an average rate of events per second over 120s, default is 60s.
self.meter = runtime_context
.get_metrics_group()
.meter("my_meter", time_span_in_seconds=120)
def map(self, value: str):
self.meter.mark_event()
return value
Flink offers a
Wrapper
that allows usage of Codahale/DropWizard meters.
To use this wrapper add the following dependency in your pom.xml
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
以下のように Codahale/DropWizard meter を登録することができます:
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public Long map(Long value) throws Exception {
this.meter.markEvent();
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
val dropwizardMeter: com.codahale.metrics.Meter = new com.codahale.metrics.Meter()
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
}
override def map(value: Long): Long = {
meter.markEvent()
値
}
}
また、Python APIではサポートされません。
スコープ #
各メトリックは識別子とメトリックが報告されるキー-値のペアのセットが割り当てられます。
識別子は3つのコンポーネントに基づきます: メトリックを登録する時のユーザ定義名、任意のユーザ定義のスコープ、およびシステムが提供するスコープ。
For example, if A.B
is the system scope, C.D
the user scope and E
the name, then the identifier for the metric will be A.B.C.D.E
.
You can configure which delimiter to use for the identifier (default: .
) by setting the metrics.scope.delimiter
key in conf/flink-conf.yaml
.
ユーザ スコープ #
You can define a user scope by calling MetricGroup#addGroup(String name)
, MetricGroup#addGroup(int name)
or MetricGroup#addGroup(String key, String value)
.
These methods affect what MetricGroup#getMetricIdentifier
and MetricGroup#getScopeComponents
return.
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter")
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
counter = runtime_context \
.get_metric_group() \
.add_group("my_metrics") \
.counter("my_counter")
counter = runtime_context \
.get_metric_group() \
.add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
システム スコープ #
システムスコープはメトリックについてのコンテキスト情報を含みます。例えば、どのタスク内で登録されたか、あるいはタスクが何のジョブに所属しているか。
Which context information should be included can be configured by setting the following keys in conf/flink-conf.yaml
.
Each of these keys expect a format string that may contain constants (e.g. “taskmanager”) and variables (e.g. “<task_id>”) which will be replaced at runtime.
metrics.scope.jm
- Default: <host>.jobmanager
- ジョブマネージャーにスコープされていた全てのメトリックに適用されます。
metrics.scope.jm-job
- Default: <host>.jobmanager.<job_name>
- ジョブマネージャーとジョブにスコープされていた全てのメトリックに適用されます。
metrics.scope.tm
- Default: <host>.taskmanager.<tm_id>
- タスクマネージャーにスコープされていた全てのメトリックに適用されます。
metrics.scope.tm-job
- Default: <host>.taskmanager.<tm_id>.<job_name>
- タスクマネージャーおよびジョブにスコープされていた全てのメトリックに適用されます。
metrics.scope.task
- Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
- タスクにスコープされていた全てのメトリックに適用されます。
metrics.scope.operator
- Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
- オペレータにスコープされていた全てのメトリックに適用されます。
変数の数あるいは順番に制限はありません。変数は大文字小文字を区別します。
The default scope for operator metrics will result in an identifier akin to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
タスク名を含めたいがタスクマネージャーの情報を省略したい場合は、以下の形式を指定することもできます:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
.
Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g <job_id>) or by assigning unique names to jobs and operators.
全ての変数のリスト #
- JobManager: <host>
- TaskManager: <host>, <tm_id>
- Job: <job_id>, <job_name>
- Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
- Operator: <operator_id>,<operator_name>, <subtask_index>
Important: For the Batch API, <operator_id> is always equal to <task_id>.
ユーザ変数 #
You can define a user variable by calling MetricGroup#addGroup(String key, String value)
.
This method affects what MetricGroup#getMetricIdentifier
, MetricGroup#getScopeComponents
and MetricGroup#getAllVariables()
returns.
重要: ユーザ変数はスコープ形式では使えません。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
Reporter #
For information on how to set up Flink’s metric reporters please take a look at the metric reporters documentation.
システム メトリクス #
デフォルトでは、Flinkは現在の状態の深い洞察を提供する幾つかのメトリクスを集めます。 この章はこれら全てのメトリクスのリファレンスです。
以下の表は一般的に5つのカラムを特色とします:
-
The “Scope” column describes which scope format is used to generate the system scope. For example, if the cell contains “Operator” then the scope format for “metrics.scope.operator” is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.
-
The (optional)“Infix” column describes which infix is appended to the system scope.
-
The “Metrics” column lists the names of all metrics that are registered for the given scope and infix.
-
The “Description” column provides information as to what a given metric is measuring.
-
The “Type” column describes which metric type is used for the measurement.
Note that all dots in the infix/metric name columns are still subject to the “metrics.delimiter” setting.
従って、メトリックの識別子を推測するには:
- Take the scope-format based on the “Scope” column
- Append the value in the “Infix” column if present, and account for the “metrics.delimiter” setting
- メトリック名を追加する。
CPU #
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.CPU | Load | JVMの最近のCPU使用率。 | ゲージ |
時間 | JVMによって使われたCPU時間。 | ゲージ |
メモリ #
The memory-related metrics require Oracle’s memory management (also included in OpenJDK’s Hotspot implementation) to be in place. Some metrics might not be exposed when using other JVM implementations (e.g. IBM’s J9).
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Heap.Used | 現在使われているヒープメモリの量(バイト)。 | ゲージ |
Heap.Committed | JVMに利用可能だと保証されているヒープメモリの量(バイト)。 | ゲージ | ||
Heap.Max | メモリ管理のために使うことができるヒープメモリの最大量(バイト) This value might not be necessarily equal to the maximum value specified through -Xmx or the equivalent Flink configuration parameter. Some GC algorithms allocate heap memory that won't be available to the user code and, therefore, not being exposed through the heap metrics. |
ゲージ | ||
NonHeap.Used | 現在使われている非ヒープメモリの量(バイト)。 | ゲージ | ||
NonHeap.Committed | JVMに利用可能だと保証されている非ヒープメモリの量(バイト)。 | ゲージ | ||
NonHeap.Max | メモリ管理のために使うことができる非ヒープメモリの最大量(バイト) | ゲージ | ||
Metaspace.Used | The amount of memory currently used in the Metaspace memory pool (in bytes). | ゲージ | ||
Metaspace.Committed | The amount of memory guaranteed to be available to the JVM in the Metaspace memory pool (in bytes). | ゲージ | ||
Metaspace.Max | The maximum amount of memory that can be used in the Metaspace memory pool (in bytes). | ゲージ | ||
Direct.Count | 直接のバッファプール内のバッファ数。 | ゲージ | ||
Direct.MemoryUsed | 直接のバッファプールのためのJVMによって使われているメモリ量(バイト)。 | ゲージ | ||
Direct.TotalCapacity | 直接のバッファプール内の全てのバッファの総許容量(バイト)。 | ゲージ | ||
Mapped.Count | マップされたバッファプール内のバッファの数。 | ゲージ | ||
Mapped.MemoryUsed | マップされたバッファプールのためのJVMによって使われているメモリ量(バイト)。 | ゲージ | ||
Mapped.TotalCapacity | マップされたバッファプール内のバッファの数(バイト)。 | ゲージ | ||
Status.Flink.Memory | Managed.Used | The amount of managed memory currently used. | ゲージ | |
Managed.Total | The total amount of managed memory. | ゲージ |
スレッド #
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Threads | Count | 有効なスレッドの総数。 | ゲージ |
GarbageCollection #
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.GarbageCollector | <GarbageCollector>.Count | 発生したコレクションの総数。 | ゲージ |
<GarbageCollector>.Time | ガベージコレクションを実施するために使われた総時間。 | ゲージ |
ClassLoader #
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | JVMが開始してからのロードされたクラスの総数。 | ゲージ |
ClassesUnloaded | JVMが開始してからのアンロードされたクラスの総数。 | ゲージ |
ネットワーク #
Deprecated: use Default shuffle service metrics
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
TaskManager | Status.Network | AvailableMemorySegments | 使用されていないメモリのセグメントの数。 | ゲージ |
TotalMemorySegments | 割り当てられたメモリのセグメントの数。 | ゲージ | ||
タスク | buffers | inputQueueLength | キューされた入力バッファの数。(ブロッキング サブパーティションを使っている LocalInputChannel を無視します) | ゲージ |
outputQueueLength | キューされた出力バッファの数。 | ゲージ | ||
inPoolUsage | 入力バッファの使用率の推定。(ignores LocalInputChannels) | ゲージ | ||
inputFloatingBuffersUsage | An estimate of the floating input buffers usage. (ignores LocalInputChannels) | ゲージ | ||
inputExclusiveBuffersUsage | An estimate of the exclusive input buffers usage. (ignores LocalInputChannels) | ゲージ | ||
outPoolUsage | 出力バッファの使用率の推定。The pool usage can be > 100% if overdraft buffers are being used. | ゲージ | ||
Network.<Input|Output>.<gate|partition> (only available if taskmanager.network.detailed-metrics config option is set) |
totalQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの総数 | ゲージ | |
minQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの最小数。 | ゲージ | ||
maxQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの最大数。 | ゲージ | ||
avgQueueLen | 全ての入力/出力のチャネルの中でキューされたバッファの平均数。 | ゲージ |
デフォルトのシャッフル サービス #
nettyネットワーク通信を使用したタスク エグゼキュータ間のデータ交換に関係するメトリクス。
スコープ | Infix | メトリクス | 説明 | 種類 |
---|---|---|---|---|
TaskManager | Status.Shuffle.Netty | AvailableMemorySegments | 使用されていないメモリのセグメントの数。 | ゲージ |
UsedMemorySegments | The number of used memory segments. | ゲージ | ||
TotalMemorySegments | 割り当てられたメモリのセグメントの数。 | ゲージ | ||
AvailableMemory | The amount of unused memory in bytes. | ゲージ | ||
UsedMemory | The amount of used memory in bytes. | ゲージ | ||
TotalMemory | The amount of allocated memory in bytes. | ゲージ | ||
RequestedMemoryUsage | Experimental: The usage of the network memory. Shows (as percentage) the total amount of requested memory from all of the subtasks. It can exceed 100% as not all requested memory is required for subtask to make progress. However if usage exceeds 100% throughput can suffer greatly and please consider increasing available network memory, or decreasing configured size of network buffer pools. | ゲージ | ||
タスク | Shuffle.Netty.Input.Buffers | inputQueueLength | キューされた入力バッファの数。 | ゲージ |
inputQueueSize | The real size of queued input buffers in bytes. The size for local input channels is always `0` since the local channel takes records directly from the output queue. | ゲージ | ||
inPoolUsage | 入力バッファの使用率の推定。(ignores LocalInputChannels) | ゲージ | ||
inputFloatingBuffersUsage | An estimate of the floating input buffers usage. (ignores LocalInputChannels) | ゲージ | ||
inputExclusiveBuffersUsage | An estimate of the exclusive input buffers usage. (ignores LocalInputChannels) | ゲージ | ||
Shuffle.Netty.Output.Buffers | outputQueueLength | キューされた出力バッファの数。 | ゲージ | |
outputQueueSize | The real size of queued output buffers in bytes. | ゲージ | ||
outPoolUsage | 出力バッファの使用率の推定。The pool usage can be > 100% if overdraft buffers are being used. | ゲージ | ||
Shuffle.Netty.<Input|Output>.<gate|partition> (only available if taskmanager.network.detailed-metrics config option is set) |
totalQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの総数 | ゲージ | |
minQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの最小数。 | ゲージ | ||
maxQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの最大数。 | ゲージ | ||
avgQueueLen | 全ての入力/出力のチャネルの中でキューされたバッファの平均数。 | ゲージ | ||
Shuffle.Netty.Input | numBytesInLocal | このタスクがローカルソースから読み込んだ総バイト数。 | カウンター | |
numBytesInLocalPerSecond | このタスクがローカルのソースから読み込んだ秒あたりのバイト数。 | メーター | ||
numBytesInRemote | このタスクがリモートソースから読み込んだ総バイト数。 | カウンター | ||
numBytesInRemotePerSecond | このタスクがリモートのソースから読み込んだ秒あたりのバイト数。 | メーター | ||
numBuffersInLocal | このタスクがローカルソースから読み込んだネットワーク バッファの総数。 | カウンター | ||
numBuffersInLocalPerSecond | このタスクがローカルのソースから読み込んだ秒あたりのネットワークバッファの数 | メーター | ||
numBuffersInRemote | このタスクがリモートソースから読み込んだネットワーク バッファの総数。 | カウンター | ||
numBuffersInRemotePerSecond | このタスクがリモートのソースから読み込んだ秒あたりのネットワークバッファの数 | メーター |
クラスタ #
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
JobManager | numRegisteredTaskManagers | 登録れたタスク マネージャの数。 | ゲージ |
numPendingTaskManagers | (only applicable to Native Kubernetes / YARN) The number of outstanding taskmanagers that Flink has requested. | ゲージ | |
numRunningJobs | 実行中のジョブの数。 | ゲージ | |
taskSlotsAvailable | 利用可能なタスク スロットの数。 | ゲージ | |
taskSlotsTotal | タスクのスロットの総数。 | ゲージ |
可用性 #
The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING. Whether these metrics are reported depends on the metrics.job.status.enable setting.
Evolving The semantics of these metrics may change in later releases.
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Job (only available on JobManager) | <jobStatus>State | For a given state, return 1 if the job is currently in that state, otherwise return 0. | ゲージ |
<jobStatus>Time | For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0. | ゲージ | |
<jobStatus>TimeTotal | For a given state, return how much time (in milliseconds) the job has spent in that state in total. | ゲージ |
Experimental
While the job is in the RUNNING state the metrics in this table provide additional details on what the job is currently doing. Whether these metrics are reported depends on the metrics.job.status.enable setting.
スコープ メトリクス 説明 種類 Job (only available on JobManager) deployingState Return 1 if the job is currently deploying* tasks, otherwise return 0. ゲージ deployingTime Return the time (in milliseconds) since the job has started deploying* tasks, otherwise return 0. ゲージ deployingTimeTotal Return how much time (in milliseconds) the job has spent deploying* tasks in total. ゲージ *A job is considered to be deploying tasks when:
- for streaming jobs, any task is in the DEPLOYING state
- for batch jobs, if at least 1 task is in the DEPLOYING state, and there are no INITIALIZING/RUNNING tasks
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Job (only available on JobManager) | uptime | Attention: deprecated, use runningTime. | ゲージ |
downtime | Attention: deprecated, use restartingTime, cancellingTime failingTime. | ゲージ | |
fullRestarts | Attention: deprecated, use numRestarts. | ゲージ | |
numRestarts | The total number of restarts since this job was submitted, including full restarts and fine-grained restarts. | ゲージ |
チェックポイント #
Note that for failed checkpoints, metrics are updated on a best efforts basis and may be not accurate.
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Job (only available on JobManager) | lastCheckpointDuration | 最後のチェックポイントを完了するためにかかった時間 (ミリ秒) | ゲージ |
lastCheckpointSize | The checkpointed size of the last checkpoint (in bytes), this metric could be different from lastCheckpointFullSize if incremental checkpoint or changelog is enabled. | ゲージ | |
lastCompletedCheckpointId | The identifier of the last completed checkpoint. | ゲージ | |
lastCheckpointFullSize | The full size of the last checkpoint (in bytes). | ゲージ | |
lastCheckpointExternalPath | 最後の外部チェックポイントが格納されたパス。 | ゲージ | |
lastCheckpointRestoreTimestamp | 最後のチェックポイントがコーディネーターで回復された時のタイムスタンプ (ミリ秒)。 | ゲージ | |
numberOfInProgressCheckpoints | 進行中のチェックポイントの数。 | ゲージ | |
numberOfCompletedCheckpoints | チェックポイントが完了した数。 | ゲージ | |
numberOfFailedCheckpoints | 失敗したチェックポイントの数。 | ゲージ | |
totalNumberOfCheckpoints | チェックポイントの総数 (進行中、完了、失敗)。 | ゲージ | |
Task | checkpointAlignmentTime | 最後の境界の割り当てが完了するまで掛かった時間のナノ秒数、あるいは現在の割り当てが始まってからどれくらい経ったか(ナノ秒)。This is the time between receiving first and the last checkpoint barrier. You can find more information in the [Monitoring State and Checkpoints section]({{< ref "docs/ops/state/large_state_tuning" >}}#monitoring-state-and-checkpoints) | ゲージ |
checkpointStartDelayNanos | The time in nanoseconds that elapsed between the creation of the last checkpoint and the time when the checkpointing process has started by this Task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew. | ゲージ |
State Access Latency #
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Task/Operator | stateClearLatency | The latency of clear operation for state | ヒストグラム |
valueStateGetLatency | The latency of Get operation for value state | ヒストグラム | |
valueStateUpdateLatency | The latency of update operation for value state | ヒストグラム | |
listStateGetLatency | The latency of get operation for list state | ヒストグラム | |
listStateAddLatency | The latency of add operation for list state | ヒストグラム | |
listStateAddAllLatency | The latency of addAll operation for list state | ヒストグラム | |
listStateUpdateLatency | The latency of update operation for list state | ヒストグラム | |
listStateMergeNamespacesLatency | The latency of merge namespace operation for list state | ヒストグラム | |
mapStateGetLatency | The latency of get operation for map state | ヒストグラム | |
mapStatePutLatency | The latency of put operation for map state | ヒストグラム | |
mapStatePutAllLatency | The latency of putAll operation for map state | ヒストグラム | |
mapStateRemoveLatency | The latency of remove operation for map state | ヒストグラム | |
mapStateContainsLatency | The latency of contains operation for map state | ヒストグラム | |
mapStateEntriesInitLatency | The init latency of entries operation for map state | ヒストグラム | |
mapStateKeysInitLatency | The init latency of keys operation for map state | ヒストグラム | |
mapStateValuesInitLatency | The init latency of values operation for map state | ヒストグラム | |
mapStateIteratorInitLatency | The init latency of iterator operation for map state | ヒストグラム | |
mapStateIsEmptyLatency | The latency of isEmpty operation for map state | ヒストグラム | |
mapStateIteratorHasNextLatency | The latency of iterator#hasNext operation for map state | ヒストグラム | |
mapStateIteratorNextLatency | The latency of iterator#next operation for map state | ヒストグラム | |
mapStateIteratorRemoveLatency | The latency of iterator#remove operation for map state | ヒストグラム | |
aggregatingStateGetLatency | The latency of get operation for aggregating state | ヒストグラム | |
aggregatingStateAddLatency | The latency of add operation for aggregating state | ヒストグラム | |
aggregatingStateMergeNamespacesLatency | The latency of merge namespace operation for aggregating state | ヒストグラム | |
reducingStateGetLatency | The latency of get operation for reducing state | ヒストグラム | |
reducingStateAddLatency | The latency of add operation for reducing state | ヒストグラム | |
reducingStateMergeNamespacesLatency | The latency of merge namespace operation for reducing state | ヒストグラム |
RocksDB #
Certain RocksDB native metrics are available but disabled by default, you can find full documentation here
State Changelog #
Note that the metrics are only available via reporters.
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Job (only available on TaskManager) | numberOfUploadRequests | Total number of upload requests made | カウンター |
numberOfUploadFailures | Total number of failed upload requests (request may be retried after the failure) | カウンター | |
attemptsPerUpload | The number of attempts per upload | ヒストグラム | |
totalAttemptsPerUpload | The total count distributions of attempts for per upload | ヒストグラム | |
uploadBatchSizes | The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together and form a single upload resulting in a single file | ヒストグラム | |
uploadLatenciesNanos | The latency distributions of uploads | ヒストグラム | |
uploadSizes | The size distributions of uploads | ヒストグラム | |
uploadQueueSize | Current size of upload queue. Queue items can be packed together and form a single upload. | ゲージ | |
Task/Operator | startedMaterialization | The number of started materializations. | カウンター |
completedMaterialization | The number of successfully completed materializations. | カウンター | |
failedMaterialization | The number of failed materializations. | カウンター | |
lastDurationOfMaterialization | The duration of the last materialization (in milliseconds). | ゲージ | |
lastFullSizeOfMaterialization | The full size of the materialization part of the last reported checkpoint (in bytes). | ゲージ | |
lastIncSizeOfMaterialization | The incremental size of the materialization part of the last reported checkpoint (in bytes). | ゲージ | |
lastFullSizeOfNonMaterialization | The full size of the non-materialization part of the last reported checkpoint (in bytes). | ゲージ | |
lastIncSizeOfNonMaterialization | The incremental size of the non-materialization part of the last reported checkpoint (in bytes). | ゲージ |
IO #
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Job (only available on TaskManager) | [<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency | The latency distributions from a given source (subtask) to an operator subtask (in milliseconds), depending on the }}#metrics-latency-granularity">latency granularity. | ヒストグラム |
Task | numBytesInLocal | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | カウンター |
numBytesInLocalPerSecond | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | メーター | |
numBytesInRemote | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | カウンター | |
numBytesInRemotePerSecond | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | メーター | |
numBuffersInLocal | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | カウンター | |
numBuffersInLocalPerSecond | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | メーター | |
numBuffersInRemote | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | カウンター | |
numBuffersInRemotePerSecond | Attention: deprecated, use }}#default-shuffle-service">Default shuffle service metrics. | メーター | |
numBytesOut | このタスクが発行されてからの総バイト数。 | カウンター | |
numBytesOutPerSecond | このタスクが発行した秒あたりのバイト数。 | メーター | |
numBuffersOut | このタスクが発行されてからのネットワークバッファの総数。 | カウンター | |
numBuffersOutPerSecond | このタスクが発行した秒あたりのネットワークバッファ数。 | メーター | |
isBackPressured | Whether the task is back-pressured. | ゲージ | |
idleTimeMsPerSecond | The time (in milliseconds) this task is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle. | メーター | |
busyTimeMsPerSecond | The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated. | ゲージ | |
backPressuredTimeMsPerSecond | The time (in milliseconds) this task is back pressured (soft or hard) per second. It's a sum of softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond. | ゲージ | |
softBackPressuredTimeMsPerSecond | The time (in milliseconds) this task is softly back pressured per second. Softly back pressured task will be still responsive and capable of for example triggering unaligned checkpoints. | ゲージ | |
hardBackPressuredTimeMsPerSecond | The time (in milliseconds) this task is back pressured in a hard way per second. During hard back pressured task is completely blocked and unresponsive preventing for example unaligned checkpoints from triggering. | ゲージ | |
maxSoftBackPressuredTimeMs | Maximum recorded duration of a single consecutive period of the task being softly back pressured in the last sampling period. Please check softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond for more information. | ゲージ | |
maxHardBackPressuredTimeMs | Maximum recorded duration of a single consecutive period of the task being in the hard back pressure state in the last sampling period. Please check softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond for more information. | ゲージ | |
changelogBusyTimeMsPerSecond | The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information. | ゲージ | |
mailboxMailsPerSecond | The number of actions processed from the task's mailbox per second which includes all actions, e.g., checkpointing, timer, or cancellation actions. | メーター | |
mailboxLatencyMs | The latency is the time that actions spend waiting in the task's mailbox before being processed. The metric is a statistic of the latency in milliseconds that is measured approximately once every second and includes the last 60 measurements. | ヒストグラム | |
mailboxQueueSize | The number of actions in the task's mailbox that are waiting to be processed. | ゲージ | |
initializationTime | The time in milliseconds that one task spends on initialization, return 0 when the task is not in initialization/running status. Most of the initialization time is usually spent in restoring from the checkpoint. | カウンター | |
Task (only if buffer debloating is enabled and in non-source tasks) | estimatedTimeToConsumeBuffersMs | The estimated time (in milliseconds) by the buffer debloater to consume all of the buffered data in the network exchange preceding this task. This value is calculated by approximated amount of the in-flight data and calculated throughput. | ゲージ |
debloatedBufferSize | The desired buffer size (in bytes) calculated by the buffer debloater. Buffer debloater is trying to reduce buffer size when the amount of in-flight data (after taking into account current throughput) exceeds the configured target value. | ゲージ | |
Task/Operator | numRecordsIn | このオペレータ/タスクが受け取ったレコードの総数。 | カウンター |
numRecordsInPerSecond | このオペレータ/タスクが受け取った秒あたりのレコード数。 | メーター | |
numRecordsOut | このオペレータ/タスクが発行したレコードの総数。 | カウンター | |
numRecordsOutPerSecond | このオペレータ/タスクが送信した秒あたりのレコード数。 | メーター | |
numLateRecordsDropped | このオペレータ/タスクが到着の遅延のために取り零したレコードの数。 | カウンター | |
currentInputWatermark |
このオペレータ/タスクが受信した最後のウォーターマーク(ミリ秒)。
Note: For operators/tasks with 2 inputs this is the minimum of the last received watermarks. |
ゲージ | |
Operator | currentInputNWatermark |
The last watermark this operator has received in its N'th input (in milliseconds), with index N starting from 1. For example currentInput1Watermark, currentInput2Watermark, ...
Note: Only for operators with 2 or more inputs. |
ゲージ |
currentOutputWatermark | このオペレータが発行した最後のウォーターマーク(ミリ秒)。 | ゲージ | |
watermarkAlignmentDrift |
The current drift from the minimal watermark emitted by all sources/tasks/splits that belong
to the same watermark group.
Note: Available only when watermark alignment is enabled and the first common watermark is announced. You can configure the update interval in the WatermarkStrategy. |
ゲージ | |
numSplitsProcessed | このデータソースが処理されてからの総InputSplit数 (もしオペレータがデータソースの場合)。 | ゲージ |
コネクタ #
Kafka コネクタ #
Please refer to Kafka monitoring.
Kinesis Source #
スコープ | メトリクス | ユーザ変数 | 説明 | 種類 |
---|---|---|---|---|
オペレータ | millisBehindLatest | stream, shardId | The number of milliseconds the consumer is behind the head of the stream, indicating how far behind current time the consumer is, for each Kinesis shard. 特定のシャードのメトリックは、ストリーム名とシャードidで指定することができます。 A value of 0 indicates record processing is caught up, and there are no new records to process at this moment. -1の値はメトリックにまだレポートされた値が無いことを示します。 | ゲージ |
オペレータ | sleepTimeMillis | stream, shardId | コンシューマがKinesisからレコードを取得する前にスリープに費やすミリ秒。 特定のシャードのメトリックは、ストリーム名とシャードidで指定することができます。 | ゲージ |
オペレータ | maxNumberOfRecordsPerFetch | stream, shardId | Kinesisへの1回のgetRecord呼び出しでコンシューマが要求するレコードの最大数。If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS is set to true, this value is adaptively calculated to maximize the 2 Mbps read limits from Kinesis. | ゲージ |
オペレータ | numberOfAggregatedRecordsPerFetch | stream, shardId | Kinesisへの1回のgetRecordの呼び出しでコンシューマによってフェッチされる集約されたKinesisのレコード数。 | ゲージ |
オペレータ | numberOfDeggregatedRecordsPerFetch | stream, shardId | Kinesisへの1回のgetRecordの呼び出しでコンシューマによってフェッチされる集約されていないKinesisのレコード数。 | ゲージ |
オペレータ | averageRecordSizeBytes | stream, shardId | 1回のgetRecordの呼び出しでコンシューマによってフェッチされるKinesisレコードの平均サイズのバイト数。 | ゲージ |
オペレータ | runLoopTimeNanos | stream, shardId | 実行ループでコンシューマが実際に掛かった時間のナノ秒。 | ゲージ |
オペレータ | loopFrequencyHz | stream, shardId | 1秒間のgetRecordの呼び出し回数。 | ゲージ |
オペレータ | bytesRequestedPerFetch | stream, shardId | getRecordへの1回の呼び出しで要求されたバイト数 (2 Mbps / loopFrequencyHz)。 | ゲージ |
Kinesis Sink #
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
オペレータ | numRecordsOutErrors (deprecated, please use numRecordsSendErrors) | Number of rejected record writes. | カウンター |
オペレータ | numRecordsSendErrors | Number of rejected record writes. | カウンター |
オペレータ | CurrentSendTime | Number of ms taken for 1 round trip of the last request batch. | ゲージ |
HBase Connectors #
スコープ | メトリクス | ユーザ変数 | 説明 | 種類 |
---|---|---|---|---|
オペレータ | lookupCacheHitRate | n/a | Cache hit ratio for lookup. | ゲージ |
システム リソース #
システムリソースのレポートはデフォルトで無効です。When metrics.system-resource
is enabled additional metrics listed below will be available on Job- and TaskManager.
System resources metrics are updated periodically and they present average values for a
configured interval (metrics.system-resource-probing-interval
).
System resources reporting requires an optional dependency to be present on the
classpath (for example placed in Flink’s lib
directory):
com.github.oshi:oshi-core:6.1.5
(licensed under MIT license)
Including it’s transitive dependencies:
net.java.dev.jna:jna-platform:jar:5.10.0
net.java.dev.jna:jna:jar:5.10.0
Failures in this regard will be reported as warning messages like NoClassDefFoundError
logged by SystemResourcesMetricsInitializer
during the startup.
システム CPU #
スコープ | Infix | メトリクス | 説明 |
---|---|---|---|
Job-/TaskManager | System.CPU | 使い方 | マシーンのCPU使用率の全体的な%。 |
Idle | % of CPU Idle time on the machine. | ||
Sys | % of System CPU time on the machine. | ||
ユーザ | % of User CPU time on the machine. | ||
IOWait | % of IOWait CPU time on the machine. | ||
Irq | % of Irq CPU time on the machine. | ||
SoftIrq | % of SoftIrq CPU time on the machine. | ||
Nice | % of Nice CPU time on the machine. | ||
Steal | % of Steal CPU time on the machine. | ||
Load1min | 1分間の平均CPUロード | ||
Load5min | 5分間の平均CPUロード | ||
Load15min | 15分間の平均CPUロード | ||
UsageCPU* | 各プロセッサごとのCPU使用率の% |
システム メモリ #
スコープ | Infix | メトリクス | 説明 |
---|---|---|---|
Job-/TaskManager | System.Memory | 利用可能 | 利用可能なメモリのバイト数 |
Total | 総メモリのバイト数 | ||
System.Swap | Used | 使用済みswapバイト数 | |
Total | 総swapバイト数 |
システム ネットワーク #
スコープ | Infix | メトリクス | 説明 |
---|---|---|---|
Job-/TaskManager | System.Network.INTERFACE_NAME | ReceiveRate | 秒間あたりの平均受信レートのバイト数 |
SendRate | 秒間あたりの平均送信レートのバイト数 |
投機的実行 #
Metrics below can be used to measure the effectiveness of speculative execution.
スコープ | メトリクス | 説明 | 種類 |
---|---|---|---|
Job (only available on JobManager) | numSlowExecutionVertices | Number of slow execution vertices at the moment. | ゲージ |
numEffectiveSpeculativeExecutions | Number of effective speculative execution attempts, i.e. speculative execution attempts which finish earlier than their corresponding original attempts. | カウンター |
End-to-End latency tracking #
Flinkによりシステム内を移動するレコードのレイテンシを追跡することができます。この機能はデフォルトで無効です。
To enable the latency tracking you must set the latencyTrackingInterval
to a positive number in either the
Flink configuration or ExecutionConfig
.
At the latencyTrackingInterval
, the sources will periodically emit a special record, called a LatencyMarker
.
マーカーはレコードがソースから発行された時間からのタイムスタンプを含みます。
Latency markers can not overtake regular user records, thus if records are queuing up in front of an operator,
it will add to the latency tracked by the marker.
Note that the latency markers are not accounting for the time user records spend in operators as they are bypassing them. 特にマーカーは例えばウィンドウのバッファでレコードが費やした時間を計上しません。 Only if operators are not able to accept new records, thus they are queuing up, the latency measured using the markers will reflect that.
The LatencyMarker
s are used to derive a distribution of the latency between the sources of the topology and each
downstream operator. これらの分布はヒストグラム メトリクスとして報告されます。The granularity of these distributions can
be controlled in the Flink configuration. For the highest
granularity subtask
Flink will derive the latency distribution between every source subtask and every downstream
subtask, which results in quadratic (in the terms of the parallelism) number of histograms.
現在のところ、Flinkはクラスタ内の全てのマシーンのクロックが同期していることを仮定します。We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.
Warning Enabling latency metrics can significantly impact the performance
of the cluster (in particular for subtask
granularity). It is highly recommended to only use them for debugging
purposes.
State access latency tracking #
Flink also allows to track the keyed state access latency for standard Flink state-backends or customized state backends which extending from AbstractStateBackend
. この機能はデフォルトで無効です。
To enable this feature you must set the state.backend.latency-track.keyed-state-enabled
to true in the Flink configuration.
Once tracking keyed state access latency is enabled, Flink will sample the state access latency every N
access, in which N
is defined by state.backend.latency-track.sample-interval
.
This configuration has a default value of 100. A smaller value will get more accurate results but have a higher performance impact since it is sampled more frequently.
As the type of this latency metrics is histogram, state.backend.latency-track.history-size
will control the maximum number of recorded values in history, which has the default value of 128.
A larger value of this configuration will require more memory, but will provide a more accurate result.
Warning Enabling state-access-latency metrics may impact the performance. It is recommended to only use them for debugging purposes.
REST API の統合 #
Metrics can be queried through the Monitoring REST API.
以下はサンプルのJSON応答を使った利用可能なエンドポイントのリストです。All endpoints are of the sample form http://hostname:8081/jobmanager/metrics
, below we list only the path part of the URLs.
Values in angle brackets are variables, for example http://hostname:8081/jobs/<jobid>/metrics
will have to be requested for example as http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics
.
特定のエンティティのためのメトリクスのリクエスト:
/jobmanager/metrics
/taskmanagers/<taskmanagerid>/metrics
/jobs/<jobid>/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
それぞれの型の全てのエンティティを横断して集約されたメトリクスのリクエスト:
/taskmanagers/metrics
/jobs/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
それぞれの型の全てのエンティティの部分集合上の集約されたメトリクスのリクエスト:
/taskmanagers/metrics?taskmanagers=A,B,C
/jobs/metrics?jobs=D,E,F
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
Warning Metric names can contain special characters that you need to escape when querying metrics.
For example, “a_+_b
” would be escaped to “a_%2B_b
”.
List of characters that should be escaped:
Character | Escape Sequence |
---|---|
# | %23 |
$ | %24 |
& | %26 |
+ | %2B |
/ | %2F |
; | %3B |
= | %3D |
? | %3F |
@ | %40 |
利用可能なメトリクスのリストのリクエスト:
GET /jobmanager/metrics
[
{
"id": "metric1"
},
{
"id": "metric2"
}
]
特定の(集約されていない)メトリクスの値のリクエスト:
GET taskmanagers/ABCDE/metrics?get=metric1,metric2
[
{
"id": "metric1",
"value": "34"
},
{
"id": "metric2",
"value": "2"
}
]
特定のメトリクスについての集約された値のリクエスト:
GET /taskmanagers/metrics?get=metric1,metric2
[
{
"id": "metric1",
"min": 1,
"max": 34,
"avg": 15,
"sum": 45
},
{
"id": "metric2",
"min": 2,
"max": 14,
"avg": 7,
"sum": 16
}
]
特定のメトリクスについての集約された値のリクエスト:
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
{
"id": "metric1",
"min": 1,
"max": 34
},
{
"id": "metric2",
"min": 2,
"max": 14
}
]
ダッシュボードの統合 #
各タスクあるいはオペレータについて集められたメトリクスはダッシュボード内で可視化することもできます。On the main page for a
job, select the Metrics
tab. After selecting one of the tasks in the top graph you can select metrics to display using
the Add Metric
drop-down menu.
- Task metrics are listed as
<subtask_index>.<metric_name>
. - Operator metrics are listed as
<subtask_index>.<operator_name>.<metric_name>
.
各メトリックはx軸が時間をy軸が計測された値を表す、個別のグラフとして可視化されるでしょう。 全てのグラフは自動的に10秒ごとに更新され、他のページに移動するまでそうし続けます。
可視化されるメトリクスの数には制限がありません; しかしながら数値メトリクスのみが可視化可能です。