Flink exposes a metric system that allows gathering and exposing metrics to external systems.
getRuntimeContext().getMetricGroup()
を呼び出すことで、RichFunction を継承した全てのユーザ関数からメトリック システムにアクセスすることができます。このメソッドは新しいメトリックを生成および登録できるMetricGroup
オブジェクトを返します。
Flink はCounters
, Gauges
, Histograms
および Meters
をサポートします。
Counter
は何かの数を数えるために使われます。現在値はinc()/inc(long n)
または dec()/dec(long n)
を使って、増減あるいは加減することができます。MetricGroup
上でcounter(String name)
を呼ぶことでCounter
を生成および登録することができます。
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
value
}
}
別のやり方として、独自の Counter
実装を使うこともできます。
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter())
}
override def map(value: String): String = {
counter.inc()
value
}
}
Gauge
はその場で任意の型の値を提供します。Gauge
を使うには、まずorg.apache.flink.metrics.Gauge
インタフェースを実装するクラスを生成する必要があります。帰り値の型についての制限はありません。MetricGroup
上でgauge(String name, Gauge gauge)
を呼び出すことで Gaugeを登録することができます。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
@Override
public String map(String value) throws Exception {
valueToExpose++;
return value;
}
}
new class MyMapper extends RichMapFunction[String,String] {
@transient private var valueToExpose = 0
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
override def map(value: String): String = {
valueToExpose += 1
value
}
}
レポーターは公開されたオブジェクトを String
に変化します。このことは意味のあるtoString()
実装が必要とされることを意味します。
Histogram
は長い値の分散を測定します。MetricGroup
上でhistogram(String name, Histogram histogram)
を呼び出すことで登録することができます。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var histogram: Histogram
override def open(parameters: Configuration): Unit = {
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram())
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
Flink はHistogram
のためのデフォルトの実装を提供しませんが、Codahale/DropWizard ヒストグラムを使うことができるラッパーを提供します。このラッパーを使用するには、pom.xml
の中に以下の依存を追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
そして、以下のように Codahale/DropWizard ヒストグラムを登録することができます。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
}
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
class MyMapper extends RichMapFunction[Long, Long] {
@transient private var histogram: Histogram
override def open(config: Configuration): Unit = {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
A Meter
measures an average throughput. An occurrence of an event can be registered with the markEvent()
method. Occurrence of multiple events at the same time can be registered with markEvent(long n)
method.
You can register a meter by calling meter(String name, Meter meter)
on a MetricGroup
.
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@Override
public Long map(Long value) throws Exception {
this.meter.markEvent();
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter
override def open(config: Configuration): Unit = {
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter())
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
Flink はCodahale/DropWizard meters の利用ができる Wrapper を提供します。このラッパーを使うには、以下の依存をpom.xml
に追加してください:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
以下のように Codahale/DropWizard meter を登録することができます:
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public Long map(Long value) throws Exception {
this.meter.markEvent();
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter
override def open(config: Configuration): Unit = {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
各メトリックは識別子とメトリックが報告されるキー-値のペアのセットが割り当てられます。
識別子は3つのコンポーネントに基づきます: メトリックを登録する時のユーザ定義名、任意のユーザ定義のスコープ、およびシステムが提供するスコープ。例えば、もしA.B
がシステムスコープだhであれば、C.D
はユーザスコープで、E
はその名前です。そして、メトリックの識別子はA.B.C.D.E
でしょう。
conf/flink-conf.yaml
の中のmetrics.scope.delimiter
キーを設定することで、識別子のためのデリミタ(デフォルト: .
) を使うかを設定することができます。
MetricGroup#addGroup(String name)
, MetricGroup#addGroup(int name)
あるいは Metric#addGroup(String key, String value)
を呼び出すことで、ユーザスコープを定義することができます。これらのメソッドはMetricGroup#getMetricIdentifier
および MetricGroup#getScopeComponents
が何を返すかに影響します。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter")
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
システムスコープはメトリックについてのコンテキスト情報を含みます。例えば、どのタスク内で登録されたか、あるいはタスクが何のジョブに所属しているか。
conf/flink-conf.yaml
内に以下のキーを設定することで、どのコンテキスト情報が含まれなければならないかを設定することができます。これらのキーのそれぞれは実行時に置き換えられる定数(例えば"taskmanager") および変数(例えば"<task_id>")を含むかも知れない文字列形式を期待します。
metrics.scope.jm
metrics.scope.jm.job
metrics.scope.tm
metrics.scope.tm.job
metrics.scope.task
metrics.scope.operator
変数の数あるいは順番に制限はありません。変数は大文字小文字を区別します。
オペレータ メトリックのためのデフォルトのスコープは、結果的にlocalhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
への識別子の類似になるでしょう。
タスク名を含めたいがタスクマネージャーの情報を省略したい場合は、以下の形式を指定することもできます:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
これは、識別子 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
を生成するかも知れません。
Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. そのように、ジョブおよびオペレータにID (例えば <job_id>)を含めるか、ユニークな名前を割り当てることでユニークさをある程度提供するような形式の文字列を使うことをお勧めします。
重要: バッチAPIについては、<operator_id> は常に <task_id> に等しいです。
MetricGroup#addGroup(String key, String value)
を呼び出すことでユーザ変数を定義することができます。このメソッドはMetricGroup#getMetricIdentifier
, MetricGroup#getScopeComponents
および MetricGroup#getAllVariables()
が何を返すかに影響します。
重要: ユーザ変数はスコープの形式内で使うことができません。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
conf/flink-conf.yaml
内の1つ以上のレポーターを設定することで、外部死すt無にメトリクスを公開することができます。これらのレポートは開始時に各ジョブおよびタスクマネージャ上でインスタンス化されるでしょう。
metrics.reporter.<name>.<config>
: <name>
という名前のレポーターのための一般的な設定<config>
。metrics.reporter.<name>.class
: <name>
という名前のレポーターが使うレポータークラス。metrics.reporter.<name>.interval
: <name>
という名前のレポーターが使うレポーター間隔。metrics.reporter.<name>.scope.delimiter
: <name>
という名前のレポーターのための識別子に使われるデリミタ(デフォルトの値は metrics.scope.delimiter
を使います)。metrics.reporters
: (任意) レポーター名のカンマ区切りのインクルード リスト。デフォルトでは全ての設定されたレポーターが使われるでしょう。全てのレポーターは少なくともclass
プロパティを持つ必要があります。幾つかはレポートの間隔
を指定することができます。以下で、各レポーターに固有のもっと多くの設定をリスト化する予定です:
複数のレポーターを指定するレポーター設定の例:
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
重要: レポーターを含むjarはFlinkが開始した時にそれを/libフォルダ内に置くことでアクセス可能でなければなりません。
org.apache.flink.metrics.reporter.MetricReporter
インタフェースを実装することで、独自のReporter
を書くことができます。レポーターが定期的にレポートを送信しなければならない場合、Scheduled
インタフェースも実装しなければなりません。
以下の章はサポートされるレポーターをリスト表示します。
JMXレポーターはデフォルトで利用可能ですが有効では無いため、追加の依存を含める必要はありません。
パラメータ:
port
- (任意) 接続のためにJMXがlistenするポート。これはポート範囲でも可能です。範囲が指定された場合は、実際のポートが関係するジョブあるいはタスクマネージャーのログに現れます。この設定が設定されている場合、Flinkは指定されたポート/範囲の特別なJMXコネクタを開始するでしょう。メトリクスはデフォルトのローカルJMXインタフェース上で常に利用可能です。設定例
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
JMXを使って公開されるMetricsはドメインとキー-プロパティのリストによって識別されます。これらは共にオブジェクト名からきています。
ドメインは常に生成されたメトリックの識別子が後ろに続くorg.apache.flink
で始まります。通常の識別子と対称的に、それはスコープの形式に影響されず、変数を含まず、ジョブを横断して不変です。そのようなドメインのための例はorg.apache.flink.job.task.numBytesOut
でしょう。
キー-プロパティ リストは、設定されたスコープ形式に関係なく、指定されたメトリックと関連する全ての変数のための値を含みます。そのようなリストの例はhost=localhost,job_name=MyJob,task_name=MyTask
でしょう。
従ってドメインメトリッククラスを識別しますが、キー-プロパティリストはメトリックの1つ(あるいは複数の)インスタンスを識別します。
このレポーターを使うには、/opt/flink-metrics-ganglia-1.5-SNAPSHOT.jar
をFlink配布物の/lib
フォルダにコピーする必要があります。
パラメータ:
host
- gmond.conf
内のudp_recv_channel.bind
の元で設定されたgmondホストアドレスport
- gmond.conf
内のudp_recv_channel.port
の元で設定されたgmondポートtmax
- 古いメトリックがどれだけ維持されなければならないかのソフトリミットdmax
- 古いメトリックがどれだけ維持されなければならないかのハードリミットttl
- 転送されたUDPパケットのためのtime-to-liveaddressingMode
- 使用するUDPアドレスモード (UNICAST/MULTICAST)設定例
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST
このレポーターを使うには、/opt/flink-metrics-graphite-1.5-SNAPSHOT.jar
をFlink配布物の/lib
フォルダにコピーする必要があります。
パラメータ:
host
- Graphite サーバのホストport
- Graphite サーバのポートprotocol
- 使用するプロトコル (TCP/UDP)設定例
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
このレポーターを使うには、/opt/flink-metrics-prometheus-1.5-SNAPSHOT.jar
をFlink配布物の/lib
フォルダにコピーする必要があります。
パラメータ:
port
- (任意) Prometheus exporter がlistenするポート、デフォルトは9249。1つのホスト上でレポーターの幾つかのインスタンスが実行できるようにするために(例えば、1つのタスクマネージャがジョブマネージャと同じ場所に配置される)、9250-9260
のようなポート範囲を使うことをお勧めします。設定例
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
Flink のメトリックの型は以下のように Prometheus メトリックにマップされます:
Flink | Prometheus | 注意 |
---|---|---|
Counter | Gauge | Prometheus counters は減らすことができません。 |
Gauge | Gauge | 数値と真偽値のみがサポートされます。 |
ヒストグラム | 概要 | 変位値 .5, .75, .95, .98, .99 および .999 |
Meter | Gauge | gauge はメーターのレートを出力します。 |
全てのFlinkのメトリクス変数(全ての変数のリストを見てください) はラベルとして Prometheus に出力されます。
このレポーターを使うには、/opt/flink-metrics-statsd-1.5-SNAPSHOT.jar
をFlink配布物の/lib
フォルダにコピーする必要があります。
パラメータ:
host
- StatsD サーバのホストport
- StatsD サーバのポート設定例
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
このレポーターを使うには、/opt/flink-metrics-datadog-1.5-SNAPSHOT.jar
をFlink配布物の/lib
フォルダにコピーする必要があります。
<host>
, <job_name>
, <tm_id>
, <subtask_index>
, <task_name>
および <operator_name>
のようなFlinkのメトリクス内の全ての変数はタグとしてDatadogに送信されるでしょう。タグはhost:localhost
およびjob_name:myjobname
のように見えるでしょう。
パラメータ:
apikey
- Datadog API キーtags
- (任意) Datadogに送信する時にメトリクスに適用されるグローバルタグ。タグはカンマでのみ区切られるべきです。設定例
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod
このレポーターを使うには、/opt/flink-metrics-slf4j-1.5-SNAPSHOT.jar
をFlink配布物の/lib
フォルダにコピーする必要があります。
設定例
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
デフォルトでは、Flinkは現在の状態の深い洞察を提供する幾つかのメトリクスを集めます。この章はこれら全てのメトリクスのリファレンスです。
以下の表は一般的に5つのカラムを特色とします:
“Scope” カラムはどのスコープ形式がシステムのスコープを生成するために使われるかを記述します。例えば、もしセルが “Operator” を含む場合、“metrics.scope.operator” のためのスコープ形式が使われます。If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.
(任意の)”Infix” カラムはどのinfixがシステムスコープに追加されるかを記述します。
“Metrics” カラムは指定されたスコープとinfixについて登録された全てのメトリクスの名前をリスト化します。
“Description” カラムは指定されたカラムが何について計測しているかの情報を提供します。
“Type” カラムはどのメトリック型が計測のために使われるかを記述します。
infix/metric 名のカラム内の全てのドットはまだ “metrics.delimiter” 設定の支配を受けることに注意してください。
従って、メトリックの識別子を推測するには:
スコープ | Infix | マトリックス | 解説 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.CPU | Load | JVMの最近のCPU使用率。 | Gauge |
時間 | JVMによって使われたCPU時間。 | Gauge |
スコープ | Infix | マトリックス | 解説 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Heap.Used | 現在使われているヒープメモリの量(バイト)。 | Gauge |
Heap.Committed | JVMに利用可能だと保証されているヒープメモリの量(バイト)。 | Gauge | ||
Heap.Max | メモリ管理のために使うことができるヒープメモリの最大量(バイト) | Gauge | ||
NonHeap.Used | 現在使われている非ヒープメモリの量(バイト)。 | Gauge | ||
NonHeap.Committed | JVMに利用可能だと保証されている非ヒープメモリの量(バイト)。 | Gauge | ||
NonHeap.Max | メモリ管理のために使うことができる非ヒープメモリの最大量(バイト) | Gauge | ||
Direct.Count | 直接のバッファプール内のバッファ数。 | Gauge | ||
Direct.MemoryUsed | 直接のバッファプールのためのJVMによって使われているメモリ量(バイト)。 | Gauge | ||
Direct.TotalCapacity | 直接のバッファプール内の全てのバッファの総許容量(バイト)。 | Gauge | ||
Mapped.Count | マップされたバッファプール内のバッファの数。 | Gauge | ||
Mapped.MemoryUsed | マップされたバッファプールのためのJVMによって使われているメモリ量(バイト)。 | Gauge | ||
Mapped.TotalCapacity | マップされたバッファプール内のバッファの数(バイト)。 | Gauge |
スコープ | Infix | マトリックス | 解説 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | Threads.Count | 有効なスレッドの総数。 | Gauge |
スコープ | Infix | マトリックス | 解説 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.GarbageCollector | <GarbageCollector>.Count | 発生したコレクションの総数。 | Gauge |
<GarbageCollector>.Time | ガベージコレクションを実施するために使われた総時間。 | Gauge |
スコープ | Infix | マトリックス | 解説 | 種類 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | JVMが開始してからのロードされたクラスの総数。 | Gauge |
ClassesUnloaded | JVMが開始してからのアンロードされたクラスの総数。 | Gauge |
スコープ | Infix | マトリックス | 解説 | 種類 |
---|---|---|---|---|
TaskManager | Status.Network | AvailableMemorySegments | 使用されていないメモリのセグメントの数。 | Gauge |
TotalMemorySegments | 割り当てられたメモリのセグメントの数。 | Gauge | ||
タスク | buffers | inputQueueLength | キューされた入力バッファの数。 | Gauge |
outputQueueLength | キューされた出力バッファの数。 | Gauge | ||
inPoolUsage | 入力バッファの使用率の推定。 | Gauge | ||
outPoolUsage | 出力バッファの使用率の推定。 | Gauge | ||
Network.<Input|Output>.<gate> taskmanager.net.detailed-metrics の設定オプションが設定されている場合のみ利用可能です) |
totalQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの総数 | Gauge | |
minQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの最小数。 | Gauge | ||
maxQueueLen | 全ての入力/出力チャネルの中でキューされたバッファの最大数。 | Gauge | ||
avgQueueLen | 全ての入力/出力のチャネルの中でキューされたバッファの平均数。 | Gauge |
スコープ | マトリックス | 解説 | 種類 |
---|---|---|---|
JobManager | numRegisteredTaskManagers | 登録れたタスク マネージャの数。 | Gauge |
numRunningJobs | 実行中のジョブの数。 | Gauge | |
taskSlotsAvailable | 利用可能なタスク スロットの数。 | Gauge | |
taskSlotsTotal | タスクのスロットの総数。 | Gauge |
スコープ | マトリックス | 解説 | 種類 |
---|---|---|---|
Job (ジョブマネージャでのみ利用可能) | restartingTime | ジョブを再開した時間、あるいは現在の再起動が進行中になってからどれだけ経ったか (ミリ病)。 | Gauge |
uptime |
ジョブが妨害無しに実行されている時間。
ジョブが完了した場合は -1 を返します。 |
Gauge | |
downtime |
現在失敗/回復状態にあるジョブについて、この停止期間の間に経過した時間。
実行中のジョブについては0、完了したジョブについては -1 を返します (ミリ秒)。 |
Gauge | |
fullRestarts | ジョブがサブミットされてから全ての再起動の総数 (ミリ秒)。 | Gauge |
スコープ | マトリックス | 解説 | 種類 |
---|---|---|---|
Job (ジョブマネージャでのみ利用可能) | lastCheckpointDuration | 最後のチェックポイントを完了するためにかかった時間 (ミリ秒) | Gauge |
lastCheckpointSize | 最後のチェックポイントの総サイズ。 | Gauge | |
lastCheckpointExternalPath | 最後の外部チェックポイントが格納されたパス。 | Gauge | |
lastCheckpointRestoreTimestamp | 最後のチェックポイントがコーディネーターで回復された時のタイムスタンプ (ミリ秒)。 | Gauge | |
lastCheckpointAlignmentBuffered | 最後のチェックポイントについて全てのサブタスク上の割り当ての間にバッファされたバイトの数 (バイト)。 | Gauge | |
numberOfInProgressCheckpoints | 進行中のチェックポイントの数。 | Gauge | |
numberOfCompletedCheckpoints | チェックポイントが完了した数。 | Gauge | |
numberOfFailedCheckpoints | 失敗したチェックポイントの数。 | Gauge | |
totalNumberOfCheckpoints | チェックポイントの総数 (進行中、完了、失敗)。 | Gauge | |
タスク | checkpointAlignmentTime | 最後の境界の割り当てが完了するまで掛かった時間のナノ秒数、あるいは現在の割り当てが始まってからどれくらい経ったか(ナノ秒)。 | Gauge |
スコープ | マトリックス | 解説 | 種類 |
---|---|---|---|
タスク | currentLowWatermark | このタスクが受信した最低のウォーターマーク(ミリ秒)。 | Gauge |
numBytesInLocal | このタスクがローカルソースから読み込んだ総バイト数。 | Counter | |
numBytesInLocalPerSecond | このタスクがローカルのソースから読み込んだ秒あたりのバイト数。 | Meter | |
numBytesInRemote | このタスクがリモートソースから読み込んだ総バイト数。 | Counter | |
numBytesInRemotePerSecond | このタスクがリモートのソースから読み込んだ秒あたりのバイト数。 | Meter | |
numBytesOut | このタスクが発行されてからの総バイト数。 | Counter | |
numBytesOutPerSecond | このタスクが発行した秒あたりのバイト数。 | Meter | |
Task/Operator | numRecordsIn | このオペレータ/タスクが受け取ったレコードの総数。 | Counter |
numRecordsInPerSecond | このオペレータ/タスクが受け取った秒あたりのレコード数。 | Meter | |
numRecordsOut | このオペレータ/タスクが発行したレコードの総数。 | Counter | |
numRecordsOutPerSecond | このオペレータ/タスクが送信した秒あたりのレコード数。 | Meter | |
numLateRecordsDropped | このオペレータ/タスクが到着の遅延のために取り零したレコードの数。 | Counter | |
オペレータ | latency | 全てのやってくるソースからの分散のレイテンシ(ミリ秒)。 | ヒストグラム |
numSplitsProcessed | このデータソースが処理されてからの総InputSplit数 (もしオペレータがデータソースの場合)。 | Gauge |
スコープ | マトリックス | 解説 | 種類 |
---|---|---|---|
オペレータ | commitsSucceeded | もしKafkaのコミットが有効にされチェックポイントが有効な場合のKafkaのオフセットのコミットの成功のカウント。 | Counter |
オペレータ | commitsFailed | もしKafkaのコミットが有効にされチェックポイントが有効な場合のKafkaのオフセットのコミットの失敗のカウント。 | Counter |
Flinkによりシステム内を移動するレコードのレイテンシを追跡することができます。レンテンシの追跡を有効にするにはlatencyTrackingInterval
(ミリ秒) がExecutionConfig
内で正の値に設定されなければなりません。
latencyTrackingInterval
で、ソースは定期的にLatencyMarker
と呼ばれる特別なレコードを発行するでしょう。マーカーはレコードがソースから発行された時間からのタイムスタンプを含みます。レイテンシのマーカーは通常のユーザレコードに追いつくことができません。従ってもしレコードがオペレータの前にキューされている場合は、マーカーによって追跡されているレイテンシに追加されるでしょう。
ユーザレコードはオペレータを迂回するためユーザレコードがオペレータ内で費やした時間をレイテンシ マーカーは計上しないことに注意してください。特にマーカーは例えばウィンドウのバッファでレコードが費やした時間を計上しません。オペレータが新しいレコードを受け付けず、従ってそれらがキューされている場合のみ、レイテンシはマーカーがそれを反映することを使って計測します。
全ての中間オペレータはレイテンシの分散を計算するために、各ソースから最近のn
個のレイテンシのリストを保持します。The sink operators keep a list from each source, and each parallel source instance to allow detecting latency issues caused by individual machines.
現在のところ、Flinkはクラスタ内の全てのマシーンのクロックが同期していることを仮定します。負のレイテンシの結果を避けるために、(NTPのような)自動化されたクロックの同期サービスをセットアップすることをお勧めします。
メトリックはMonitoring REST APIを使ってキューにすることができます。
以下はサンプルのJSON応答を使った利用可能なエンドポイントのリストです。全てのエンドポイントはhttp://hostname:8081/jobmanager/metrics
からのサンプルで、以下でURLの一部のpath のみをリスト化します。
鍵括弧内の値は変数で、例えばhttp://hostname:8081/jobs/<jobid>/metrics
は、例えばhttp://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics
としてリクエストされなければなりません。
特定のエンティティのためのメトリクスのリクエスト:
/jobmanager/metrics
/taskmanagers/<taskmanagerid>/metrics
/jobs/<jobid>/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
それぞれの型の全てのエンティティを横断して集約されたメトリクスのリクエスト:
/taskmanagers/metrics
/jobs/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
それぞれの型の全てのエンティティの部分集合上の集約されたメトリクスのリクエスト:
/taskmanagers/metrics?taskmanagers=A,B,C
/jobs/metrics?jobs=D,E,F
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
利用可能なメトリクスのリストのリクエスト:
GET /jobmanager/metrics
[
{
"id": "metric1"
},
{
"id": "metric2"
}
]
特定の(集約されていない)メトリクスの値のリクエスト:
GET taskmanagers/ABCDE/metrics?get=metric1,metric2
[
{
"id": "metric1",
"value": "34"
},
{
"id": "metric2",
"value": "2"
}
]
特定のメトリクスについての集約された値のリクエスト:
GET /taskmanagers/metrics?get=metric1,metric2
[
{
"id": "metric1",
"min": 1,
"max": 34,
"avg": 15,
"sum": 45
},
{
"id": "metric2",
"min": 2,
"max": 14,
"avg": 7,
"sum": 16
}
]
特定のメトリクスについての集約された値のリクエスト:
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
{
"id": "metric1",
"min": 1,
"max": 34,
},
{
"id": "metric2",
"min": 2,
"max": 14,
}
]
各タスクあるいはオペレータについて集められたメトリクスはダッシュボード内で可視化することもできます。ジョブについてのメインのページ上で、Metrics
タブを選択します。上のグラフ内のタスクの1つを選択した後で、Add Metric
ドロップダウン メニューを使って表示するメトリクスを選択することができます。
<subtask_index>.<metric_name>
としてリスト化されます。<subtask_index>.<operator_name>.<metric_name>
としてリスト化されます。各メトリックはx軸が時間をy軸が計測された値を表す、個別のグラフとして可視化されるでしょう。全てのグラフは自動的に10秒ごとに更新され、他のページに移動するまでそうし続けます。
可視化されるメトリクスの数には制限がありません; しかしながら数値メトリクスのみが可視化可能です。