重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

マトリックス

Flink exposes a metric system that allows gathering and exposing metrics to external systems.

Registering metrics

getRuntimeContext().getMetricGroup()を呼び出すことで、RichFunction を継承した全てのユーザ関数からメトリック システムにアクセスすることができます。このメソッドは新しいメトリックを生成および登録できるMetricGroup オブジェクトを返します。

Metric types

CountersGauges およびHistogramsをサポートします。

Counter

Counter は何かの数を数えるために使われます。現在値はinc()/inc(long n) または dec()/dec(long n)を使って、増減あるいは加減することができます。MetricGroup上でcounter(String name) を呼ぶことでCounterを生成および登録することができます。

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @public Integer map(String value) throws Exception {
    this.counter.inc();
  }
}

別のやり方として、独自の Counter実装を使うこともできます。

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }
}

Gauge

Gauge はその場で任意の型の値を提供します。Gaugeを使うには、まずorg.apache.flink.metrics.Gaugeインタフェースを実装するクラスを生成する必要があります。帰り値の型についての制限はありません。MetricGroup上でgauge(String name, Gauge gauge)を呼び出すことで Gaugeを登録することができます。

public class MyMapper extends RichMapFunction<String, Integer> {
  private int valueToExpose;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }
}

レポーターは公開されたオブジェクトを Stringに変化します。このことは意味のあるtoString() 実装が必要とされることを意味します。

ヒストグラム

Histogram は長い値の分散を測定します。MetricGroup上でhistogram(String name, Histogram histogram) を呼び出すことで登録することができます。

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @public Integer map(Long value) throws Exception {
    this.histogram.update(value);
  }
}

Flink はHistogramのためのデフォルトの実装を提供しませんが、Codahale/DropWizard ヒストグラムを使うことができるラッパーを提供します。このラッパーを使用するには、pom.xmlの中に以下の依存を追加します:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.1-SNAPSHOT</version>
</dependency>

そして、以下のように Codahale/DropWizard ヒストグラムを登録することができます。

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram histogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropWizardHistogramWrapper(histogram));
  }
}

スコープ

Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope. 例えば、A.B がシステムスコープ、C.D がユーザスコープ、そしてE が名前の場合、メトリックのための識別子はA.B.C.D.Eになるでしょう。

conf/flink-conf.yamlの中のmetrics.scope.delimiterキーを設定することで、識別子のためのデリミタ(デフォルト: .) を使うかを設定することができます。

ユーザ スコープ

MetricGroup#addGroup(String name) あるいは MetricGroup#addGroup(int name)のどちらかを呼び出すことで、ユーザスコープを定義することができます。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

システム スコープ

システムスコープはメトリックについてのコンテキスト情報を含みます。例えば、どのタスク内で登録されたか、あるいはタスクが何のジョブに所属しているか。

conf/flink-conf.yaml内に以下のキーを設定することで、どのコンテキスト情報が含まれなければならないかを設定することができます。これらのキーのそれぞれは実行時に置き換えられる定数(例えば"taskmanager") および変数(例えば"<task_id>")を含むかも知れない文字列形式を期待します。

  • metrics.scope.jm
    • デフォルト: <host>.jobmanager
    • ジョブマネージャーにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.jm.job
    • デフォルト: <host>.jobmanager.<job_name>
    • ジョブマネージャーとジョブにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm
    • デフォルト: <host>.taskmanager.<tm_id>
    • タスクマネージャーにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm.job
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>
    • タスクマネージャーおよびジョブにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm.task
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    • タスクにスコープされていた全てのメトリックに適用されます。
  • metrics.scope.tm.operator
    • デフォルト: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • オペレータにスコープされていた全てのメトリックに適用されます。

変数の数あるいは順番に制限はありません。変数は大文字小文字を区別します。

オペレータ メトリックのためのデフォルトのスコープは、結果的にlocalhost.taskmanager.1234.MyJob.MyOperator.0.MyMetricへの識別子の類似になるでしょう。

タスク名を含めたいがタスクマネージャーの情報を省略したい場合は、以下の形式を指定することもできます:

metrics.scope.tm.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

これは、識別子 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric を生成するかも知れません。

Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g <job_id>) or by assigning unique names to jobs and operators.

全ての変数のリスト

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_name>, <subtask_index>

Reporter

conf/flink-conf.yaml内の1つ以上のレポーターを設定することで、外部死すt無にメトリクスを公開することができます。

  • metrics.reporters: 名前付きのレポーターのリスト
  • metrics.reporter.<name>.<config>: <name>という名前のレポーターのための一般的な設定<config>
  • metrics.reporter.<name>.class: <name>という名前のレポーターが使うレポータークラス。
  • metrics.reporter.<name>.interval: <name>という名前のレポーターが使うレポーター間隔。

All reporters must at least have the class property, some allow specifying a reporting interval. 以下で、各レポーターに固有のもっと多くの設定をリスト化する予定です:

複数のレポーターを指定するレポーター設定の例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

org.apache.flink.metrics.reporter.MetricReporterインタフェースを実装することで、独自のReporterを書くことができます。レポーターが定期的にレポートを送信しなければならない場合、Scheduled インタフェースも実装しなければなりません。

以下の章はサポートされるレポーターをリスト表示します。

JMX (org.apache.flink.metrics.jmx.JMXReporter)

JMXレポーターはデフォルトで利用可能ですが有効では無いため、追加の依存を含める必要はありません。

パラメータ:

  • port - JMVが接続のためにlistenするポート。これはポート範囲でも可能です。範囲が指定された場合は、実際のポートが関係するジョブあるいはタスクマネージャーのログに現れます。ポートを指定しない場合は、特別なJMXサーバは開始されないでしょう。デフォルトのローカルJMXインタフェース上でメトリックがまだ利用可能です。

Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)

依存:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-ganglia</artifactId>
      <version>1.1-SNAPSHOT</version>
</dependency>

パラメータ:

  • host - gmond.conf内のudp_recv_channel.bind の元で設定されたgmondホストアドレス
  • port - gmond.conf内のudp_recv_channel.port の元で設定されたgmondポート
  • tmax - 古いメトリックがどれだけ維持されなければならないかのソフトリミット
  • dmax - 古いメトリックがどれだけ維持されなければならないかのハードリミット
  • ttl - time-to-live for transmitted UDP packets
  • addressingMode - 使用するUDPアドレスモード (UNICAST/MULTICAST)

Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)

依存:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-graphite</artifactId>
      <version>1.1-SNAPSHOT</version>
</dependency>

パラメータ:

  • host - Graphite サーバのホスト
  • port - Graphite サーバのポート

StatsD (org.apache.flink.metrics.statsd.StatsDReporter)

依存:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-statsd</artifactId>
      <version>1.1-SNAPSHOT</version>
</dependency>

パラメータ:

  • host - StatsD サーバのホスト
  • port - StatsD サーバのポート

System metrics

Flink は以下のシステム メトリックを公開します:

上に戻る
スコープ マトリックス 解説
JobManager
TaskManager.Status.JVM ClassLoader.ClassesLoaded JVMが開始してからのロードされたクラスの総数。
ClassLoader.ClassesUnloaded JVMが開始してからのアンロードされたクラスの総数。
GargabeCollector.<garbageCollector>.Count 発生したコレクションの総数。
GargabeCollector.<garbageCollector>.Time ガベージコレクションを実施するために使われた総時間。
Memory.Heap.Used 現在使われているヒープメモリの総量。
Memory.Heap.Committed JVMに利用可能だと保証されているヒープメモリの総量。
Memory.Heap.Max メモリ管理のために使うことができるヒープメモリの最大総量。
Memory.NonHeap.Used 現在使われている非ヒープメモリの総量。
Memory.NonHeap.Committed JVMに利用可能だと保証されている非ヒープメモリの総量。
Memory.NonHeap.Max メモリ管理のために使うことができる非ヒープメモリの最大総量。
Memory.Direct.Count 直接のバッファプール内のバッファ数。
Memory.Direct.MemoryUsed 直接のバッファプールのためのJVMによって使われているメモリ総量。
Memory.Direct.TotalCapacity 直接のバッファプール内の全てのバッファの総許容量。
Memory.Mapped.Count マップされたバッファプール内のバッファの数。
Memory.Mapped.MemoryUsed マップされたバッファプールのためのJVMによって使われているメモリ総量。
Memory.Mapped.TotalCapacity マップされたバッファプール内のバッファの数。
Threads.Count 有効なスレッドの総数。
CPU.Load JVMの最新のCPU使用率。
CPU.Time JVMによって使われたCPU時間。
ジョブ
タスク currentLowWatermark タスクが受信した最低のwatermark。
lastCheckpointDuration 最後のチェックポイントを完了するためにかかった時間。
lastCheckpointSize 最後のチェックポイントの総サイズ。
restartingTime ジョブを再起動するために掛かる時間。
numBytesInLocal このタスクがローカルソースから読み込んだ総バイト数。
numBytesInRemote このタスクがリモートソースから読み込んだ総バイト数。
numBytesOut このタスクが発行されてからの総バイト数。
オペレータ numRecordsIn このオペレータが受信した総レコード数。
numRecordsOut このオペレーションが発行されてからの総レコード数。
numSplitsProcessed このデータソースが処理されてからの総InputSplit数。
TOP
inserted by FC2 system