Metric Reporters
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

メトリックレポーター #

Flinkを使って、外部システムにメトリクスをレポートできます。 Flinkのメトリクスシステムの詳細については、メトリクスシステムドキュメントをご覧ください。

conf/flink-conf.yamlで1つ以上のレポーターを設定することで、外部システムにメトリクスを公開できます。これらのレポーターは、各ジョブとtask managerの起動時にインスタンス化されます。

以下は、全てのレポーターで一般的に適用されるパラメータのリストです。全てのプロパティは設定でmetrics.reporter.<reporter_name>.<property>を設定することで設定できます。レポーターはさらに、実装固有のパラメータを提供する場合があります。これについては、それぞれのレポーターのセクションに記載されています。

Key Default Type Description
factory.class
(none) String The reporter factory class to use for the reporter named <name>.
interval
10 s Duration The reporter interval to use for the reporter named <name>. Only applicable to push-based reporters.
scope.delimiter
"." String The delimiter used to assemble the metric identifier for the reporter named <name>.
scope.variables.additional
Map The map of additional variables that should be included for the reporter named <name>. Only applicable to tag-based reporters.
scope.variables.excludes
"." String The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters.
filter.includes
"*:*:*" List<String> The metrics that should be included for the reporter named <name>. Filters are specified as a list, with each filter following this format:
<scope>[:<name>[,<name>][:<type>[,<type>]]]
A metric matches a filter if the scope pattern and at least one of the name patterns and at least one of the types match.
  • scope: Filters based on the logical scope.
    Specified as a pattern where * matches any sequence of characters and . separates scope components.

    For example:
    "jobmanager.job" matches any job-related metrics on the JobManager,
    "*.job" matches all job-related metrics and
    "*.job.*" matches all metrics below the job-level (i.e., task/operator metrics etc.).

  • name: Filters based on the metric name.
    Specified as a comma-separate list of patterns where * matches any sequence of characters.

    For example, "*Records*,*Bytes*" matches any metrics where the name contains "Records" or "Bytes".

  • type: Filters based on the metric type. Specified as a comma-separated list of metric types: [counter, meter, gauge, histogram]
Examples:
  • "*:numRecords*" Matches metrics like numRecordsIn.
  • "*.job.task.operator:numRecords*" Matches metrics like numRecordsIn on the operator level.
  • "*.job.task.operator:numRecords*:meter" Matches meter metrics like numRecordsInPerSecond on the operator level.
  • "*:numRecords*,numBytes*:counter,meter" Matches all counter/meter metrics like or numRecordsInPerSecond.
filter.excludes
List<String> The metrics that should be excluded for the reporter named <name>. The format is identical to filter.includes
<parameter>
(none) String Configures the parameter <parameter> for the reporter named <name>.

全てのレポーターの設定はfactory.classプロパティが含まれている必要があります。 一部のレポーター(Scheduledと呼ばれる)はレポートの間隔を指定できます。

複数のレポーターを指定するレポーター設定の例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes: job_id;task_attempt_num
metrics.reporter.my_jmx_reporter.scope.variables.additional: cluster_name:my_test_cluster,tag_name:tag_value

metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

重要: レポーターを含むjarは、Flinkが起動した時にアクセス可能でなければなりません。 レポーターはプラグインとしてロードされます。 このページに記載されている全てのレポーターはデフォルトで利用可能です。

org.apache.flink.metrics.reporter.MetricReporterインタフェースを実装することで、独自のReporterを書くことができます。 レポーターがレポートを定期的に送信する必要がある場合は、Scheduledインタフェースも実装する必要があります。 report()メソッドが長時間ブロックされないように注意してください。より多くの時間を必要とするレポーターは変わりにオペレーションを非同期で実行する必要があります。 MetricReporterFactoryを追加実装することで、レポーターはプラグインとしてもロードすることもできます。

識別子 vs タグ #

レポーターがメトリクスを公開する方法には、通常2つの形式があります。

識別子ベースのレポーターは、全てのスコープの情報とメトリック名を含むフラットな文字列を組み立てます。 例としては、job.MyJobName.numRestartsがあります。

一方、タグベースのレポーターは、論理スコープとメトリック名(例えば、job.numRestarts)で構成されるメトリクスの汎用クラスを定義します。そして、上記のメトリックの特定のインスタンスを一連のkey-valueペア、いわゆる"tags"または"variables" (例えば、“jobName=MyJobName”)としてレポートします。

Push vs. Pull #

メトリクスはプッシュまたはプルのいずれかを介して公開されます。

プッシュベースのレポーターは通常Scheduledインタフェースを実装し、定期的に現在のメトリクスのサマリを外部システムに送信します。

プルベースのレポーターは、代わりに外部システムからクエリされます。

レポーター #

以下の章はサポートされるレポーターをリスト表示します。

JMX #

(org.apache.flink.metrics.jmx.JMXReporter) #

Type: pull/tags

パラメータ:

  • port - (オプション) JMXが接続をlistenするポート。 1つのホスト上でレポーターの幾つかのインスタンスが実行できるようにするために(例えば、1つのTaskManagerがJobManagerと同じ場所に配置される)、9250-9260のようなポート範囲を使うことをお勧めします。 範囲が指定された場合は、実際のポートが関係するジョブあるいはTaskManagerのログに現れます。 この設定が設定されている場合、Flinkは指定されたポート/範囲の特別なJMXコネクタを開始するでしょう。 メトリクスはデフォルトのローカルJMXインタフェース上で常に利用可能です。

設定例

metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789

JMXを使って公開されるMetricsはドメインとキー-プロパティのリストによって識別されます。これらは共にオブジェクト名からきています。

ドメインは常に生成されたメトリックの識別子が後ろに続くorg.apache.flinkで始まります。通常の識別子と対称的に、それはスコープの形式に影響されず、変数を含まず、ジョブを横断して不変です。 そのようなドメインのための例に、org.apache.flink.job.task.numBytesOutがありmす。

キー-プロパティ リストは、設定されたスコープ形式に関係なく、指定されたメトリックと関連する全ての変数のための値を含みます。 そのようなリストの例に、host=localhost,job_name=MyJob,task_name=MyTaskがあります。

従ってドメインメトリッククラスを識別しますが、キー-プロパティリストはメトリックの1つ(あるいは複数の)インスタンスを識別します。

Graphite #

(org.apache.flink.metrics.graphite.GraphiteReporter) #

Type: push/identifier

パラメータ:

  • host - Graphiteサーバホスト
  • port - Graphiteサーバーポート
  • protocol - 使うプロトコル(TCP/UDP)

設定例

metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS

InfluxDB #

(org.apache.flink.metrics.influxdb.InfluxdbReporter) #

Type: push/tags

パラメータ:

Key Default Type Description
connectTimeout
10000 Integer (optional) the InfluxDB connect timeout for metrics
consistency
ONE

Enum

(optional) the InfluxDB consistency level for metrics

Possible values:
  • "ALL"
  • "ANY"
  • "ONE"
  • "QUORUM"
db
(none) String the InfluxDB database to store metrics
host
(none) String the InfluxDB server host
password
(none) String (optional) InfluxDB username's password used for authentication
port
8086 Integer the InfluxDB server port
retentionPolicy
(none) String (optional) the InfluxDB retention policy for metrics
scheme
http

Enum

the InfluxDB schema

Possible values:
  • "http"
  • "https"
username
(none) String (optional) InfluxDB username used for authentication
writeTimeout
10000 Integer (optional) the InfluxDB write timeout for metrics

設定例

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.scheme: http
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
metrics.reporter.influxdb.retentionPolicy: one_hour
metrics.reporter.influxdb.consistency: ANY
metrics.reporter.influxdb.connectTimeout: 60000
metrics.reporter.influxdb.writeTimeout: 60000
metrics.reporter.influxdb.interval: 60 SECONDS

レポーターは、httpプロトコルを使って、指定された保持ポリシー (またはサーバで指定されたデフォルトポリシー)を使用して、InfluxDB サーバにメトリクスを送信します。 全てのFlinkメトリクス変数(全ての変数のリストを参照)は、InfluxDBタグとして公開されます。

Prometheus #

(org.apache.flink.metrics.prometheus.PrometheusReporter) #

Type: pull/tags

パラメータ:

  • port - (オプション) Prometheus exporterがlistenするポート。デフォルトは9249。1つのホスト上でレポーターの幾つかのインスタンスが実行できるようにするために(例えば、1つのTaskManagerがJobManagerと同じ場所に配置される)、9250-9260のようなポート範囲を使うことをお勧めします。
  • filterLabelValueCharacters - (オプション) ラベル値の文字をフィルタするかどうかを指定します。有効にすると、[a-zA-Z0-9:_]に一致しない全ての文字は削除されます。そうでない場合は、文字は削除されません。このオプションを無効にする前に、ラベルの値がPrometheusの要件を満たしていることを確認してください。

設定例

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory

Flink のメトリックの型は以下のように Prometheus メトリックにマップされます:

Flink Prometheus 注意
カウンター ゲージ Prometheus counters は減らすことができません。
ゲージ ゲージ 数値と真偽値のみがサポートされます。
ヒストグラム 概要 変位値 .5, .75, .95, .98, .99 および .999
メーター ゲージ gaugeはメーターレートを公開します。

全てのFlinkメトリクス変数(全ての変数のリストを参照)は、ラベルとしてPrometheusに公開されます。

PrometheusPushGateway #

(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) #

Type: push/tags

パラメータ:

Key Default Type Description
deleteOnShutdown
true Boolean Specifies whether to delete metrics from the PushGateway on shutdown. Flink will try its best to delete the metrics but this is not guaranteed. See here for more details.
filterLabelValueCharacters
true Boolean Specifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] will be removed, otherwise no characters will be removed. Before disabling this option please ensure that your label values meet the Prometheus requirements.
groupingKey
(none) String Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the Prometheus requirements.
hostUrl
(none) String The PushGateway server host URL including scheme, host name, and port.
jobName
(none) String The job name under which metrics will be pushed
randomJobNameSuffix
true Boolean Specifies whether a random suffix should be appended to the job name.

設定例

metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS

PrometheusPushGatewayReporter はメトリクスをPushgatewayにプッシュします。それらは Prometheus によって掬い取られます。

ユースケースについては、Prometheus documentation を参照してください。

StatsD #

(org.apache.flink.metrics.statsd.StatsDReporter) #

Type: push/identifier

パラメータ:

  • host - StatsDサーバホスト
  • port - StatsDサーバポート

設定例

metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
metrics.reporter.stsd.interval: 60 SECONDS

Datadog #

(org.apache.flink.metrics.datadog.DatadogHttpReporter) #

Type: push/tags

<host><job_name><tm_id><subtask_index><task_name><operator_name>のようなFlinkのメトリクス内の全ての変数はタグとしてDatadogに送信されます。タグはhost:localhostjob_name:myjobnameのように見えるでしょう。

注意 legacy reasonsでレポーターはメトリック識別子_と_タグの_両方_を使います。この冗長性はuseLogicalIdentifierを有効にすることで回避できます。

注意 ヒストグラムは、Datadogヒストグラムの命名規則(<metric_name>.<aggregation>)に従うことで、一連のgaugesとして公開されます。 min集計はデフォルトで報告されますが、sumは使えません。 Datadogが提供するヒストグラムと対照的に、レポートされる集計は特定のレポート間隔で計算されません。

パラメータ:

  • apikey - Datadog APIキー
  • proxyHost - (オプション) Datadogに送信する時に使うプロキシのホスト
  • proxyPort - (オプション) Datadogに送信する時に使うプロキシのポート。デフォルトは8080。
  • dataCenter - (オプション) 接続するデータセンター(EU/US)。デフォルトはUS
  • maxMetricsPerRequest - (オプション) 各リクエストに含むメトリクスの最大数。デフォルトは2000。
  • useLogicalIdentifier -> (オプション) レポーターが論理メトリック識別子を使うかどうか。デフォルトはfalse

設定例

metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.proxyHost: my.web.proxy.com
metrics.reporter.dghttp.proxyPort: 8080
metrics.reporter.dghttp.dataCenter: US
metrics.reporter.dghttp.maxMetricsPerRequest: 2000
metrics.reporter.dghttp.interval: 60 SECONDS
metrics.reporter.dghttp.useLogicalIdentifier: true

Slf4j #

(org.apache.flink.metrics.slf4j.Slf4jReporter) #

Type: push/identifier

設定例

metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
metrics.reporter.slf4j.interval: 60 SECONDS

Back to top

inserted by FC2 system