マトリックス

Flink exposes a metric system that allows gathering and exposing metrics to external systems.

メトリクスの登録

getRuntimeContext().getMetricGroup()を呼び出すことで、RichFunction を継承した全てのユーザ関数からメトリック システムにアクセスすることができます。このメソッドは新しいメトリックを生成および登録できるMetricGroup オブジェクトを返します。

メトリックの型

Flink はCounters, Gauges, Histograms および Metersをサポートします。

Counter

Counter は何かの数を数えるために使われます。現在値はinc()/inc(long n) または dec()/dec(long n)を使って、増減あるいは加減することができます。MetricGroup上でcounter(String name) を呼ぶことでCounterを生成および登録することができます。

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

別のやり方として、独自の Counter実装を使うこともできます。

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

Gauge

Gauge はその場で任意の型の値を提供します。Gaugeを使うには、まずorg.apache.flink.metrics.Gaugeインタフェースを実装するクラスを生成する必要があります。帰り値の型についての制限はありません。MetricGroup上でgauge(String name, Gauge gauge)を呼び出すことで Gaugeを登録することができます。

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}
new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

レポーターは公開されたオブジェクトを Stringに変化します。このことは意味のあるtoString() 実装が必要とされることを意味します。

ヒストグラム

Histogram は長い値の分散を測定します。MetricGroup上でhistogram(String name, Histogram histogram) を呼び出すことで登録することができます。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Flink はHistogramのためのデフォルトの実装を提供しませんが、Codahale/DropWizard ヒストグラムを使うことができるラッパーを提供します。このラッパーを使用するには、pom.xmlの中に以下の依存を追加します:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.5-SNAPSHOT</version>
</dependency>

そして、以下のように Codahale/DropWizard ヒストグラムを登録することができます。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
  }
  
  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}
class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
        
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }
  
  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Meter

A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. Occurrence of multiple events at the same time can be registered with markEvent(long n) method. You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

Flink はCodahale/DropWizard meters の利用ができる Wrapper を提供します。このラッパーを使うには、以下の依存をpom.xmlに追加してください:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.5-SNAPSHOT</version>
</dependency>

以下のように Codahale/DropWizard meter を登録することができます:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
  
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

スコープ

各メトリックは識別子とメトリックが報告されるキー-値のペアのセットが割り当てられます。

識別子は3つのコンポーネントに基づきます: メトリックを登録する時のユーザ定義名、任意のユーザ定義のスコープ、およびシステムが提供するスコープ。例えば、もしA.B がシステムスコープだhであれば、C.Dはユーザスコープで、Eはその名前です。そして、メトリックの識別子はA.B.C.D.Eでしょう。

conf/flink-conf.yamlの中のmetrics.scope.delimiterキーを設定することで、識別子のためのデリミタ(デフォルト: .) を使うかを設定することができます。

ユーザ スコープ

MetricGroup#addGroup(String name), MetricGroup#addGroup(int name) あるいは Metric#addGroup(String key, String value)を呼び出すことで、ユーザスコープを定義することができます。これらのメソッドはMetricGroup#getMetricIdentifier および MetricGroup#getScopeComponents が何を返すかに影響します。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

システム スコープ

システムスコープはメトリックについてのコンテキスト情報を含みます。例えば、どのタスク内で登録されたか、あるいはタスクが何のジョブに所属しているか。

conf/flink-conf.yaml内に以下のキーを設定することで、どのコンテキスト情報が含まれなければならないかを設定することができます。これらのキーのそれぞれは実行時に置き換えられる定数(例えば"taskmanager") および変数(例えば"<task_id>")を含むかも知れない文字列形式を期待します。

  • metrics.scope.jm
    • デフォルト: <host>.jobmanager
    • ジョブマネージャーにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.jm.job
    • デフォルト: <host>.jobmanager.<job_name>
    • ジョブマネージャーとジョブにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm
    • デフォルト: <host>.taskmanager.<tm_id>
    • タスクマネージャーにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm.job
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>
    • タスクマネージャーおよびジョブにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.task
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    • タスクにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.operator
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • オペレータにスコープされていた全てのメトリックに適用されます。

変数の数あるいは順番に制限はありません。変数は大文字小文字を区別します。

オペレータ メトリックのためのデフォルトのスコープは、結果的にlocalhost.taskmanager.1234.MyJob.MyOperator.0.MyMetricへの識別子の類似になるでしょう。

タスク名を含めたいがタスクマネージャーの情報を省略したい場合は、以下の形式を指定することもできます:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

これは、識別子 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric を生成するかも知れません。

Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. そのように、ジョブおよびオペレータにID (例えば <job_id>)を含めるか、ユニークな名前を割り当てることでユニークさをある程度提供するような形式の文字列を使うことをお勧めします。

全ての変数のリスト

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_id>,<operator_name>, <subtask_index>

重要: バッチAPIについては、<operator_id> は常に <task_id> に等しいです。

ユーザ変数

MetricGroup#addGroup(String key, String value)を呼び出すことでユーザ変数を定義することができます。このメソッドはMetricGroup#getMetricIdentifier, MetricGroup#getScopeComponents および MetricGroup#getAllVariables()が何を返すかに影響します。

重要: ユーザ変数はスコープの形式内で使うことができません。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

Reporter

conf/flink-conf.yaml内の1つ以上のレポーターを設定することで、外部死すt無にメトリクスを公開することができます。これらのレポートは開始時に各ジョブおよびタスクマネージャ上でインスタンス化されるでしょう。

  • metrics.reporter.<name>.<config>: <name>という名前のレポーターのための一般的な設定<config>
  • metrics.reporter.<name>.class: <name>という名前のレポーターが使うレポータークラス。
  • metrics.reporter.<name>.interval: <name>という名前のレポーターが使うレポーター間隔。
  • metrics.reporter.<name>.scope.delimiter: <name>という名前のレポーターのための識別子に使われるデリミタ(デフォルトの値は metrics.scope.delimiterを使います)。
  • metrics.reporters: (任意) レポーター名のカンマ区切りのインクルード リスト。デフォルトでは全ての設定されたレポーターが使われるでしょう。

全てのレポーターは少なくともclass プロパティを持つ必要があります。幾つかはレポートの間隔を指定することができます。以下で、各レポーターに固有のもっと多くの設定をリスト化する予定です:

複数のレポーターを指定するレポーター設定の例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

重要: レポーターを含むjarはFlinkが開始した時にそれを/libフォルダ内に置くことでアクセス可能でなければなりません。

org.apache.flink.metrics.reporter.MetricReporterインタフェースを実装することで、独自のReporterを書くことができます。レポーターが定期的にレポートを送信しなければならない場合、Scheduled インタフェースも実装しなければなりません。

以下の章はサポートされるレポーターをリスト表示します。

JMX (org.apache.flink.metrics.jmx.JMXReporter)

JMXレポーターはデフォルトで利用可能ですが有効では無いため、追加の依存を含める必要はありません。

パラメータ:

  • port - (任意) 接続のためにJMXがlistenするポート。これはポート範囲でも可能です。範囲が指定された場合は、実際のポートが関係するジョブあるいはタスクマネージャーのログに現れます。この設定が設定されている場合、Flinkは指定されたポート/範囲の特別なJMXコネクタを開始するでしょう。メトリクスはデフォルトのローカルJMXインタフェース上で常に利用可能です。

設定例

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

JMXを使って公開されるMetricsはドメインとキー-プロパティのリストによって識別されます。これらは共にオブジェクト名からきています。

ドメインは常に生成されたメトリックの識別子が後ろに続くorg.apache.flinkで始まります。通常の識別子と対称的に、それはスコープの形式に影響されず、変数を含まず、ジョブを横断して不変です。そのようなドメインのための例はorg.apache.flink.job.task.numBytesOutでしょう。

キー-プロパティ リストは、設定されたスコープ形式に関係なく、指定されたメトリックと関連する全ての変数のための値を含みます。そのようなリストの例はhost=localhost,job_name=MyJob,task_name=MyTaskでしょう。

従ってドメインメトリッククラスを識別しますが、キー-プロパティリストはメトリックの1つ(あるいは複数の)インスタンスを識別します。

Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)

このレポーターを使うには、/opt/flink-metrics-ganglia-1.5-SNAPSHOT.jarをFlink配布物の/lib フォルダにコピーする必要があります。

パラメータ:

  • host - gmond.conf内のudp_recv_channel.bind の元で設定されたgmondホストアドレス
  • port - gmond.conf内のudp_recv_channel.port の元で設定されたgmondポート
  • tmax - 古いメトリックがどれだけ維持されなければならないかのソフトリミット
  • dmax - 古いメトリックがどれだけ維持されなければならないかのハードリミット
  • ttl - 転送されたUDPパケットのためのtime-to-live
  • addressingMode - 使用するUDPアドレスモード (UNICAST/MULTICAST)

設定例

metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)

このレポーターを使うには、/opt/flink-metrics-graphite-1.5-SNAPSHOT.jarをFlink配布物の/lib フォルダにコピーする必要があります。

パラメータ:

  • host - Graphite サーバのホスト
  • port - Graphite サーバのポート
  • protocol - 使用するプロトコル (TCP/UDP)

設定例

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)

このレポーターを使うには、/opt/flink-metrics-prometheus-1.5-SNAPSHOT.jarをFlink配布物の/lib フォルダにコピーする必要があります。

パラメータ:

  • port - (任意) Prometheus exporter がlistenするポート、デフォルトは9249。1つのホスト上でレポーターの幾つかのインスタンスが実行できるようにするために(例えば、1つのタスクマネージャがジョブマネージャと同じ場所に配置される)、9250-9260のようなポート範囲を使うことをお勧めします。

設定例

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink のメトリックの型は以下のように Prometheus メトリックにマップされます:

Flink Prometheus 注意
Counter Gauge Prometheus counters は減らすことができません。
Gauge Gauge 数値と真偽値のみがサポートされます。
ヒストグラム 概要 変位値 .5, .75, .95, .98, .99 および .999
Meter Gauge gauge はメーターのレートを出力します。

全てのFlinkのメトリクス変数(全ての変数のリストを見てください) はラベルとして Prometheus に出力されます。

StatsD (org.apache.flink.metrics.statsd.StatsDReporter)

このレポーターを使うには、/opt/flink-metrics-statsd-1.5-SNAPSHOT.jarをFlink配布物の/lib フォルダにコピーする必要があります。

パラメータ:

  • host - StatsD サーバのホスト
  • port - StatsD サーバのポート

設定例

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)

このレポーターを使うには、/opt/flink-metrics-datadog-1.5-SNAPSHOT.jarをFlink配布物の/lib フォルダにコピーする必要があります。

<host>, <job_name>, <tm_id>, <subtask_index>, <task_name>および <operator_name>のようなFlinkのメトリクス内の全ての変数はタグとしてDatadogに送信されるでしょう。タグはhost:localhost およびjob_name:myjobnameのように見えるでしょう。

パラメータ:

  • apikey - Datadog API キー
  • tags - (任意) Datadogに送信する時にメトリクスに適用されるグローバルタグ。タグはカンマでのみ区切られるべきです。

設定例

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

Slf4j (org.apache.flink.metrics.slf4j.Slf4jReporter)

このレポーターを使うには、/opt/flink-metrics-slf4j-1.5-SNAPSHOT.jarをFlink配布物の/lib フォルダにコピーする必要があります。

設定例

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

システム メトリクス

デフォルトでは、Flinkは現在の状態の深い洞察を提供する幾つかのメトリクスを集めます。この章はこれら全てのメトリクスのリファレンスです。

以下の表は一般的に5つのカラムを特色とします:

  • “Scope” カラムはどのスコープ形式がシステムのスコープを生成するために使われるかを記述します。例えば、もしセルが “Operator” を含む場合、“metrics.scope.operator” のためのスコープ形式が使われます。If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.

  • (任意の)”Infix” カラムはどのinfixがシステムスコープに追加されるかを記述します。

  • “Metrics” カラムは指定されたスコープとinfixについて登録された全てのメトリクスの名前をリスト化します。

  • “Description” カラムは指定されたカラムが何について計測しているかの情報を提供します。

  • “Type” カラムはどのメトリック型が計測のために使われるかを記述します。

infix/metric 名のカラム内の全てのドットはまだ “metrics.delimiter” 設定の支配を受けることに注意してください。

従って、メトリックの識別子を推測するには:

  1. “Scope” カラムに基づいたスコープ形式を取る
  2. Append the value in the “Infix” column if present, and account for the “metrics.delimiter” setting
  3. メトリック名を追加する。

CPU

スコープ Infix マトリックス 解説 種類
Job-/TaskManager Status.JVM.CPU Load JVMの最近のCPU使用率。 Gauge
時間 JVMによって使われたCPU時間。 Gauge

メモリ

スコープ Infix マトリックス 解説 種類
Job-/TaskManager Status.JVM.Memory Heap.Used 現在使われているヒープメモリの量(バイト)。 Gauge
Heap.Committed JVMに利用可能だと保証されているヒープメモリの量(バイト)。 Gauge
Heap.Max メモリ管理のために使うことができるヒープメモリの最大量(バイト) Gauge
NonHeap.Used 現在使われている非ヒープメモリの量(バイト)。 Gauge
NonHeap.Committed JVMに利用可能だと保証されている非ヒープメモリの量(バイト)。 Gauge
NonHeap.Max メモリ管理のために使うことができる非ヒープメモリの最大量(バイト) Gauge
Direct.Count 直接のバッファプール内のバッファ数。 Gauge
Direct.MemoryUsed 直接のバッファプールのためのJVMによって使われているメモリ量(バイト)。 Gauge
Direct.TotalCapacity 直接のバッファプール内の全てのバッファの総許容量(バイト)。 Gauge
Mapped.Count マップされたバッファプール内のバッファの数。 Gauge
Mapped.MemoryUsed マップされたバッファプールのためのJVMによって使われているメモリ量(バイト)。 Gauge
Mapped.TotalCapacity マップされたバッファプール内のバッファの数(バイト)。 Gauge

Threads

スコープ Infix マトリックス 解説 種類
Job-/TaskManager Status.JVM.ClassLoader Threads.Count 有効なスレッドの総数。 Gauge

GarbageCollection

スコープ Infix マトリックス 解説 種類
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 発生したコレクションの総数。 Gauge
<GarbageCollector>.Time ガベージコレクションを実施するために使われた総時間。 Gauge

ClassLoader

スコープ Infix マトリックス 解説 種類
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded JVMが開始してからのロードされたクラスの総数。 Gauge
ClassesUnloaded JVMが開始してからのアンロードされたクラスの総数。 Gauge

ネットワーク

スコープ Infix マトリックス 解説 種類
TaskManager Status.Network AvailableMemorySegments 使用されていないメモリのセグメントの数。 Gauge
TotalMemorySegments 割り当てられたメモリのセグメントの数。 Gauge
タスク buffers inputQueueLength キューされた入力バッファの数。 Gauge
outputQueueLength キューされた出力バッファの数。 Gauge
inPoolUsage 入力バッファの使用率の推定。 Gauge
outPoolUsage 出力バッファの使用率の推定。 Gauge
Network.<Input|Output>.<gate>
taskmanager.net.detailed-metrics の設定オプションが設定されている場合のみ利用可能です)
totalQueueLen 全ての入力/出力チャネルの中でキューされたバッファの総数 Gauge
minQueueLen 全ての入力/出力チャネルの中でキューされたバッファの最小数。 Gauge
maxQueueLen 全ての入力/出力チャネルの中でキューされたバッファの最大数。 Gauge
avgQueueLen 全ての入力/出力のチャネルの中でキューされたバッファの平均数。 Gauge

クラスタ

スコープ マトリックス 解説 種類
JobManager numRegisteredTaskManagers 登録れたタスク マネージャの数。 Gauge
numRunningJobs 実行中のジョブの数。 Gauge
taskSlotsAvailable 利用可能なタスク スロットの数。 Gauge
taskSlotsTotal タスクのスロットの総数。 Gauge

可用性

スコープ マトリックス 解説 種類
Job (ジョブマネージャでのみ利用可能) restartingTime ジョブを再開した時間、あるいは現在の再起動が進行中になってからどれだけ経ったか (ミリ病)。 Gauge
uptime ジョブが妨害無しに実行されている時間。

ジョブが完了した場合は -1 を返します。

Gauge
downtime 現在失敗/回復状態にあるジョブについて、この停止期間の間に経過した時間。

実行中のジョブについては0、完了したジョブについては -1 を返します (ミリ秒)。

Gauge
fullRestarts ジョブがサブミットされてから全ての再起動の総数 (ミリ秒)。 Gauge

チェックポイント

スコープ マトリックス 解説 種類
Job (ジョブマネージャでのみ利用可能) lastCheckpointDuration 最後のチェックポイントを完了するためにかかった時間 (ミリ秒) Gauge
lastCheckpointSize 最後のチェックポイントの総サイズ。 Gauge
lastCheckpointExternalPath 最後の外部チェックポイントが格納されたパス。 Gauge
lastCheckpointRestoreTimestamp 最後のチェックポイントがコーディネーターで回復された時のタイムスタンプ (ミリ秒)。 Gauge
lastCheckpointAlignmentBuffered 最後のチェックポイントについて全てのサブタスク上の割り当ての間にバッファされたバイトの数 (バイト)。 Gauge
numberOfInProgressCheckpoints 進行中のチェックポイントの数。 Gauge
numberOfCompletedCheckpoints チェックポイントが完了した数。 Gauge
numberOfFailedCheckpoints 失敗したチェックポイントの数。 Gauge
totalNumberOfCheckpoints チェックポイントの総数 (進行中、完了、失敗)。 Gauge
タスク checkpointAlignmentTime 最後の境界の割り当てが完了するまで掛かった時間のナノ秒数、あるいは現在の割り当てが始まってからどれくらい経ったか(ナノ秒)。 Gauge

IO

スコープ マトリックス 解説 種類
タスク currentLowWatermark このタスクが受信した最低のウォーターマーク(ミリ秒)。 Gauge
numBytesInLocal このタスクがローカルソースから読み込んだ総バイト数。 Counter
numBytesInLocalPerSecond このタスクがローカルのソースから読み込んだ秒あたりのバイト数。 Meter
numBytesInRemote このタスクがリモートソースから読み込んだ総バイト数。 Counter
numBytesInRemotePerSecond このタスクがリモートのソースから読み込んだ秒あたりのバイト数。 Meter
numBytesOut このタスクが発行されてからの総バイト数。 Counter
numBytesOutPerSecond このタスクが発行した秒あたりのバイト数。 Meter
Task/Operator numRecordsIn このオペレータ/タスクが受け取ったレコードの総数。 Counter
numRecordsInPerSecond このオペレータ/タスクが受け取った秒あたりのレコード数。 Meter
numRecordsOut このオペレータ/タスクが発行したレコードの総数。 Counter
numRecordsOutPerSecond このオペレータ/タスクが送信した秒あたりのレコード数。 Meter
numLateRecordsDropped このオペレータ/タスクが到着の遅延のために取り零したレコードの数。 Counter
オペレータ latency 全てのやってくるソースからの分散のレイテンシ(ミリ秒)。 ヒストグラム
numSplitsProcessed このデータソースが処理されてからの総InputSplit数 (もしオペレータがデータソースの場合)。 Gauge

コネクタ

Kafka コネクタ

スコープ マトリックス 解説 種類
オペレータ commitsSucceeded もしKafkaのコミットが有効にされチェックポイントが有効な場合のKafkaのオフセットのコミットの成功のカウント。 Counter
オペレータ commitsFailed もしKafkaのコミットが有効にされチェックポイントが有効な場合のKafkaのオフセットのコミットの失敗のカウント。 Counter

レイテンシの追跡

Flinkによりシステム内を移動するレコードのレイテンシを追跡することができます。レンテンシの追跡を有効にするにはlatencyTrackingInterval (ミリ秒) がExecutionConfig内で正の値に設定されなければなりません。

latencyTrackingIntervalで、ソースは定期的にLatencyMarkerと呼ばれる特別なレコードを発行するでしょう。マーカーはレコードがソースから発行された時間からのタイムスタンプを含みます。レイテンシのマーカーは通常のユーザレコードに追いつくことができません。従ってもしレコードがオペレータの前にキューされている場合は、マーカーによって追跡されているレイテンシに追加されるでしょう。

ユーザレコードはオペレータを迂回するためユーザレコードがオペレータ内で費やした時間をレイテンシ マーカーは計上しないことに注意してください。特にマーカーは例えばウィンドウのバッファでレコードが費やした時間を計上しません。オペレータが新しいレコードを受け付けず、従ってそれらがキューされている場合のみ、レイテンシはマーカーがそれを反映することを使って計測します。

全ての中間オペレータはレイテンシの分散を計算するために、各ソースから最近のn個のレイテンシのリストを保持します。The sink operators keep a list from each source, and each parallel source instance to allow detecting latency issues caused by individual machines.

現在のところ、Flinkはクラスタ内の全てのマシーンのクロックが同期していることを仮定します。負のレイテンシの結果を避けるために、(NTPのような)自動化されたクロックの同期サービスをセットアップすることをお勧めします。

REST API の統合

メトリックはMonitoring REST APIを使ってキューにすることができます。

以下はサンプルのJSON応答を使った利用可能なエンドポイントのリストです。全てのエンドポイントはhttp://hostname:8081/jobmanager/metricsからのサンプルで、以下でURLの一部のpath のみをリスト化します。

鍵括弧内の値は変数で、例えばhttp://hostname:8081/jobs/<jobid>/metrics は、例えばhttp://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metricsとしてリクエストされなければなりません。

特定のエンティティのためのメトリクスのリクエスト:

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

それぞれの型の全てのエンティティを横断して集約されたメトリクスのリクエスト:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

それぞれの型の全てのエンティティの部分集合上の集約されたメトリクスのリクエスト:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

利用可能なメトリクスのリストのリクエスト:

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

特定の(集約されていない)メトリクスの値のリクエスト:

GET taskmanagers/ABCDE/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

特定のメトリクスについての集約された値のリクエスト:

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

特定のメトリクスについての集約された値のリクエスト:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

ダッシュボードの統合

各タスクあるいはオペレータについて集められたメトリクスはダッシュボード内で可視化することもできます。ジョブについてのメインのページ上で、Metrics タブを選択します。上のグラフ内のタスクの1つを選択した後で、Add Metricドロップダウン メニューを使って表示するメトリクスを選択することができます。

  • タスクのメトリクスは<subtask_index>.<metric_name>としてリスト化されます。
  • オペレータのメトリクスは<subtask_index>.<operator_name>.<metric_name>としてリスト化されます。

各メトリックはx軸が時間をy軸が計測された値を表す、個別のグラフとして可視化されるでしょう。全てのグラフは自動的に10秒ごとに更新され、他のページに移動するまでそうし続けます。

可視化されるメトリクスの数には制限がありません; しかしながら数値メトリクスのみが可視化可能です。

上に戻る

TOP
inserted by FC2 system