マトリックス

Flink exposes a metric system that allows gathering and exposing metrics to external systems.

Registering metrics

getRuntimeContext().getMetricGroup()を呼び出すことで、RichFunction を継承した全てのユーザ関数からメトリック システムにアクセスすることができます。このメソッドは新しいメトリックを生成および登録できるMetricGroup オブジェクトを返します。

Metric types

Flink supports Counters, Gauges, Histograms and Meters.

Counter

Counter は何かの数を数えるために使われます。現在値はinc()/inc(long n) または dec()/dec(long n)を使って、増減あるいは加減することができます。MetricGroup上でcounter(String name) を呼ぶことでCounterを生成および登録することができます。

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @public Integer map(String value) throws Exception {
    this.counter.inc();
  }
}

別のやり方として、独自の Counter実装を使うこともできます。

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }
}

Gauge

Gauge はその場で任意の型の値を提供します。Gaugeを使うには、まずorg.apache.flink.metrics.Gaugeインタフェースを実装するクラスを生成する必要があります。帰り値の型についての制限はありません。MetricGroup上でgauge(String name, Gauge gauge)を呼び出すことで Gaugeを登録することができます。

public class MyMapper extends RichMapFunction<String, Integer> {
  private int valueToExpose;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }
}
public class MyMapper extends RichMapFunction[String,Int] {
  val valueToExpose = 5

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }
  ...
}

レポーターは公開されたオブジェクトを Stringに変化します。このことは意味のあるtoString() 実装が必要とされることを意味します。

ヒストグラム

Histogram は長い値の分散を測定します。MetricGroup上でhistogram(String name, Histogram histogram) を呼び出すことで登録することができます。

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @public Integer map(Long value) throws Exception {
    this.histogram.update(value);
  }
}

Flink はHistogramのためのデフォルトの実装を提供しませんが、Codahale/DropWizard ヒストグラムを使うことができるラッパーを提供します。このラッパーを使用するには、pom.xmlの中に以下の依存を追加します:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.3-SNAPSHOT</version>
</dependency>

そして、以下のように Codahale/DropWizard ヒストグラムを登録することができます。

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram histogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropWizardHistogramWrapper(histogram));
  }
}

Meter

A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. Occurrence of multiple events at the same time can be registered with markEvent(long n) method. You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @public Integer map(Long value) throws Exception {
    this.meter.markEvent();
  }
}

Flink offers a Wrapper that allows usage of Codahale/DropWizard meters. To use this wrapper add the following dependency in your pom.xml:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.3-SNAPSHOT</version>
</dependency>

You can then register a Codahale/DropWizard meter like this:

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropWizardMeterWrapper(meter));
  }
}

スコープ

Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope. 例えば、A.B がシステムスコープ、C.D がユーザスコープ、そしてE が名前の場合、メトリックのための識別子はA.B.C.D.Eになるでしょう。

conf/flink-conf.yamlの中のmetrics.scope.delimiterキーを設定することで、識別子のためのデリミタ(デフォルト: .) を使うかを設定することができます。

ユーザ スコープ

MetricGroup#addGroup(String name) あるいは MetricGroup#addGroup(int name)のどちらかを呼び出すことで、ユーザスコープを定義することができます。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

システム スコープ

システムスコープはメトリックについてのコンテキスト情報を含みます。例えば、どのタスク内で登録されたか、あるいはタスクが何のジョブに所属しているか。

conf/flink-conf.yaml内に以下のキーを設定することで、どのコンテキスト情報が含まれなければならないかを設定することができます。これらのキーのそれぞれは実行時に置き換えられる定数(例えば"taskmanager") および変数(例えば"<task_id>")を含むかも知れない文字列形式を期待します。

  • metrics.scope.jm
    • デフォルト: <host>.jobmanager
    • ジョブマネージャーにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.jm.job
    • デフォルト: <host>.jobmanager.<job_name>
    • ジョブマネージャーとジョブにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm
    • デフォルト: <host>.taskmanager.<tm_id>
    • タスクマネージャーにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm.job
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>
    • タスクマネージャーおよびジョブにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.task
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    • タスクにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.operator
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • オペレータにスコープされていた全てのメトリックに適用されます。

変数の数あるいは順番に制限はありません。変数は大文字小文字を区別します。

オペレータ メトリックのためのデフォルトのスコープは、結果的にlocalhost.taskmanager.1234.MyJob.MyOperator.0.MyMetricへの識別子の類似になるでしょう。

タスク名を含めたいがタスクマネージャーの情報を省略したい場合は、以下の形式を指定することもできます:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

これは、識別子 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric を生成するかも知れません。

Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g <job_id>) or by assigning unique names to jobs and operators.

全ての変数のリスト

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_name>, <subtask_index>

Reporter

conf/flink-conf.yaml内の1つ以上のレポーターを設定することで、外部死すt無にメトリクスを公開することができます。These reporters will be instantiated on each job and task manager when they are started.

  • metrics.reporters: 名前付きのレポーターのリスト
  • metrics.reporter.<name>.<config>: <name>という名前のレポーターのための一般的な設定<config>
  • metrics.reporter.<name>.class: <name>という名前のレポーターが使うレポータークラス。
  • metrics.reporter.<name>.interval: <name>という名前のレポーターが使うレポーター間隔。
  • metrics.reporter.<name>.scope.delimiter: The delimiter to use for the identifier (default value use metrics.scope.delimiter) for the reporter named <name>.

All reporters must at least have the class property, some allow specifying a reporting interval. 以下で、各レポーターに固有のもっと多くの設定をリスト化する予定です:

複数のレポーターを指定するレポーター設定の例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

Important: The jar containing the reporter must be accessible when Flink is started by placing it in the /lib folder.

org.apache.flink.metrics.reporter.MetricReporterインタフェースを実装することで、独自のReporterを書くことができます。レポーターが定期的にレポートを送信しなければならない場合、Scheduled インタフェースも実装しなければなりません。

以下の章はサポートされるレポーターをリスト表示します。

JMX (org.apache.flink.metrics.jmx.JMXReporter)

JMXレポーターはデフォルトで利用可能ですが有効では無いため、追加の依存を含める必要はありません。

パラメータ:

  • port - (optional) the port on which JMX listens for connections. これはポート範囲でも可能です。範囲が指定された場合は、実際のポートが関係するジョブあるいはタスクマネージャーのログに現れます。If this setting is set Flink will start an extra JMX connector for the given port/range. Metrics are always available on the default local JMX interface.

設定例

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

Metrics exposed through JMX are identified by a domain and a list of key-properties, which together form the object name.

The domain always begins with org.apache.flink followed by a generalized metric identifier. In contrast to the usual identifier it is not affected by scope-formats, does not contain any variables and is constant across jobs. An example for such a domain would be org.apache.flink.job.task.numBytesOut.

The key-property list contains the values for all variables, regardless of configured scope formats, that are associated with a given metric. An example for such a list would be host=localhost,job_name=MyJob,task_name=MyTask.

The domain thus identifies a metric class, while the key-property list identifies one (or multiple) instances of that metric.

Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)

In order to use this reporter you must copy /opt/flink-metrics-ganglia-1.3-SNAPSHOT.jar into the /lib folder of your Flink distribution.

パラメータ:

  • host - gmond.conf内のudp_recv_channel.bind の元で設定されたgmondホストアドレス
  • port - gmond.conf内のudp_recv_channel.port の元で設定されたgmondポート
  • tmax - 古いメトリックがどれだけ維持されなければならないかのソフトリミット
  • dmax - 古いメトリックがどれだけ維持されなければならないかのハードリミット
  • ttl - time-to-live for transmitted UDP packets
  • addressingMode - 使用するUDPアドレスモード (UNICAST/MULTICAST)

設定例

metrics.reporters: gang
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)

In order to use this reporter you must copy /opt/flink-metrics-graphite-1.3-SNAPSHOT.jar into the /lib folder of your Flink distribution.

パラメータ:

  • host - Graphite サーバのホスト
  • port - Graphite サーバのポート
  • protocol - protocol to use (TCP/UDP)

設定例

metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

StatsD (org.apache.flink.metrics.statsd.StatsDReporter)

In order to use this reporter you must copy /opt/flink-metrics-statsd-1.3-SNAPSHOT.jar into the /lib folder of your Flink distribution.

パラメータ:

  • host - StatsD サーバのホスト
  • port - StatsD サーバのポート

設定例

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

System metrics

By default Flink gathers several metrics that provide deep insights on the current state. This section is a reference of all these metrics.

The tables below generally feature 4 columns:

  • The “Scope” column describes which scope format is used to generate the system scope. For example, if the cell contains “Operator” then the scope format for “metrics.scope.operator” is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.

  • The (optional)”Infix” column describes which infix is appended to the system scope.

  • The “Metrics” column lists the names of all metrics that are registered for the given scope and infix.

  • The “Description” column provides information as to what a given metric is measuring.

Note that all dots in the infix/metric name columns are still subject to the “metrics.delimiter” setting.

Thus, in order to infer the metric identifier:

  1. Take the scope-format based on the “Scope” column
  2. Append the value in the “Infix” column if present, and account for the “metrics.delimiter” setting
  3. Append metric name.

CPU:

スコープ Infix マトリックス 解説
Job-/TaskManager Status.JVM.CPU Load The recent CPU usage of the JVM.
時間 JVMによって使われたCPU時間。

Memory:

スコープ Infix マトリックス 解説
Job-/TaskManager Status.JVM.Memory Memory.Heap.Used 現在使われているヒープメモリの総量。
Heap.Committed JVMに利用可能だと保証されているヒープメモリの総量。
Heap.Max メモリ管理のために使うことができるヒープメモリの最大総量。
NonHeap.Used 現在使われている非ヒープメモリの総量。
NonHeap.Committed JVMに利用可能だと保証されている非ヒープメモリの総量。
NonHeap.Max メモリ管理のために使うことができる非ヒープメモリの最大総量。
Direct.Count 直接のバッファプール内のバッファ数。
Direct.MemoryUsed 直接のバッファプールのためのJVMによって使われているメモリ総量。
Direct.TotalCapacity 直接のバッファプール内の全てのバッファの総許容量。
Mapped.Count マップされたバッファプール内のバッファの数。
Mapped.MemoryUsed マップされたバッファプールのためのJVMによって使われているメモリ総量。
Mapped.TotalCapacity マップされたバッファプール内のバッファの数。

Threads:

スコープ Infix マトリックス 解説
Job-/TaskManager Status.JVM.ClassLoader Threads.Count 有効なスレッドの総数。

GarbageCollection:

スコープ Infix マトリックス 解説
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 発生したコレクションの総数。
<GarbageCollector>.Time ガベージコレクションを実施するために使われた総時間。

ClassLoader:

スコープ Infix マトリックス 解説
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded JVMが開始してからのロードされたクラスの総数。
ClassesUnloaded JVMが開始してからのアンロードされたクラスの総数。

Network:

スコープ Infix マトリックス 解説
TaskManager Status.Network AvailableMemorySegments The number of unused memory segments.
TotalMemorySegments The number of allocated memory segments.
タスク buffers inputQueueLength The number of queued input buffers.
outputQueueLength The number of queued output buffers.
inPoolUsage An estimate of the input buffers usage.
outPoolUsage An estimate of the output buffers usage.

Cluster:

スコープ マトリックス 解説
JobManager numRegisteredTaskManagers The number of registered taskmanagers.
numRunningJobs The number of running jobs.
taskSlotsAvailable The number of available task slots.
taskSlotsTotal The total number of task slots.

Checkpointing:

スコープ マトリックス 解説
Job (only available on JobManager) lastCheckpointDuration 最後のチェックポイントを完了するためにかかった時間。
lastCheckpointSize 最後のチェックポイントの総サイズ。
lastCheckpointExternalPath The path where the last checkpoint was stored.
タスク checkpointAlignmentTime The time in nanoseconds that the last barrier alignment took to complete, or how long the current alignment has taken so far.

IO:

スコープ マトリックス 解説
タスク currentLowWatermark The lowest watermark this task has received.
numBytesInLocal このタスクがローカルソースから読み込んだ総バイト数。
numBytesInLocalPerSecond The number of bytes this task reads from a local source per second.
numBytesInRemote このタスクがリモートソースから読み込んだ総バイト数。
numBytesInRemotePerSecond The number of bytes this task reads from a remote source per second.
numBytesOut このタスクが発行されてからの総バイト数。
numBytesOutPerSecond The number of bytes this task emits per second.
Task/Operator numRecordsIn The total number of records this operator/task has received.
numRecordsInPerSecond The number of records this operator/task receives per second.
numRecordsOut The total number of records this operator/task has emitted.
numRecordsOutPerSecond The number of records this operator/task sends per second.
オペレータ latency The latency distributions from all incoming sources.
numSplitsProcessed The total number of InputSplits this data source has processed (if the operator is a data source).

Latency tracking

Flink allows to track the latency of records traveling through the system. To enable the latency tracking a latencyTrackingInterval (in milliseconds) has to be set to a positive value in the ExecutionConfig.

At the latencyTrackingInterval, the sources will periodically emit a special record, called a LatencyMarker. The marker contains a timestamp from the time when the record has been emitted at the sources. Latency markers can not overtake regular user records, thus if records are queuing up in front of an operator, it will add to the latency tracked by the marker.

Note that the latency markers are not accounting for the time user records spend in operators as they are bypassing them. In particular the markers are not accounting for the time records spend for example in window buffers. Only if operators are not able to accept new records, thus they are queuing up, the latency measured using the markers will reflect that.

All intermediate operators keep a list of the last n latencies from each source to compute a latency distribution. The sink operators keep a list from each source, and each parallel source instance to allow detecting latency issues caused by individual machines.

Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.

Dashboard integration

Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for a job, select the Metrics tab. After selecting one of the tasks in the top graph you can select metrics to display using the Add Metric drop-down menu.

  • Task metrics are listed as <subtask_index>.<metric_name>.
  • Operator metrics are listed as <subtask_index>.<operator_name>.<metric_name>.

Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.

There is no limit as to the number of visualized metrics; however only numeric metrics can be visualized.

上に戻る

TOP
inserted by FC2 system