This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
メトリックレポーター #
Flinkを使って、外部システムにメトリクスをレポートできます。 Flinkのメトリクスシステムの詳細については、メトリクスシステムドキュメントをご覧ください。
conf/flink-conf.yaml
で1つ以上のレポーターを設定することで、外部システムにメトリクスを公開できます。これらのレポーターは、各ジョブとtask managerの起動時にインスタンス化されます。
以下は、全てのレポーターで一般的に適用されるパラメータのリストです。全てのプロパティは設定でmetrics.reporter.<reporter_name>.<property>
を設定することで設定できます。レポーターはさらに、実装固有のパラメータを提供する場合があります。これについては、それぞれのレポーターのセクションに記載されています。
Key | Default | Type | Description |
---|---|---|---|
factory.class |
(none) | String | The reporter factory class to use for the reporter named <name>. |
interval |
10 s | Duration | The reporter interval to use for the reporter named <name>. Only applicable to push-based reporters. |
scope.delimiter |
"." | String | The delimiter used to assemble the metric identifier for the reporter named <name>. |
scope.variables.additional |
Map | The map of additional variables that should be included for the reporter named <name>. Only applicable to tag-based reporters. | |
scope.variables.excludes |
"." | String | The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters. |
filter.includes |
"*:*:*" | List<String> | The metrics that should be included for the reporter named <name>. Filters are specified as a list, with each filter following this format:<scope>[:<name>[,<name>][:<type>[,<type>]]] A metric matches a filter if the scope pattern and at least one of the name patterns and at least one of the types match.
|
filter.excludes |
List<String> | The metrics that should be excluded for the reporter named <name>. The format is identical to filter.includes |
|
<parameter> |
(none) | String | Configures the parameter <parameter> for the reporter named <name>. |
全てのレポーターの設定はfactory.class
プロパティが含まれている必要があります。
一部のレポーター(Scheduled
と呼ばれる)はレポートの間隔
を指定できます。
複数のレポーターを指定するレポーター設定の例:
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes: job_id;task_attempt_num
metrics.reporter.my_jmx_reporter.scope.variables.additional: cluster_name:my_test_cluster,tag_name:tag_value
metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
重要: レポーターを含むjarは、Flinkが起動した時にアクセス可能でなければなりません。 レポーターはプラグインとしてロードされます。 このページに記載されている全てのレポーターはデフォルトで利用可能です。
org.apache.flink.metrics.reporter.MetricReporter
インタフェースを実装することで、独自のReporter
を書くことができます。
レポーターがレポートを定期的に送信する必要がある場合は、Scheduled
インタフェースも実装する必要があります。
report()
メソッドが長時間ブロックされないように注意してください。より多くの時間を必要とするレポーターは変わりにオペレーションを非同期で実行する必要があります。
MetricReporterFactory
を追加実装することで、レポーターはプラグインとしてもロードすることもできます。
識別子 vs タグ #
レポーターがメトリクスを公開する方法には、通常2つの形式があります。
識別子ベースのレポーターは、全てのスコープの情報とメトリック名を含むフラットな文字列を組み立てます。
例としては、job.MyJobName.numRestarts
があります。
一方、タグベースのレポーターは、論理スコープとメトリック名(例えば、job.numRestarts
)で構成されるメトリクスの汎用クラスを定義します。そして、上記のメトリックの特定のインスタンスを一連のkey-value
ペア、いわゆる"tags"または"variables" (例えば、“jobName=MyJobName”)としてレポートします。
Push vs. Pull #
メトリクスはプッシュまたはプルのいずれかを介して公開されます。
プッシュベースのレポーターは通常Scheduled
インタフェースを実装し、定期的に現在のメトリクスのサマリを外部システムに送信します。
プルベースのレポーターは、代わりに外部システムからクエリされます。
レポーター #
以下の章はサポートされるレポーターをリスト表示します。
JMX #
(org.apache.flink.metrics.jmx.JMXReporter) #
Type: pull/tags
パラメータ:
port
- (オプション) JMXが接続をlistenするポート。 1つのホスト上でレポーターの幾つかのインスタンスが実行できるようにするために(例えば、1つのTaskManagerがJobManagerと同じ場所に配置される)、9250-9260
のようなポート範囲を使うことをお勧めします。 範囲が指定された場合は、実際のポートが関係するジョブあるいはTaskManagerのログに現れます。 この設定が設定されている場合、Flinkは指定されたポート/範囲の特別なJMXコネクタを開始するでしょう。 メトリクスはデフォルトのローカルJMXインタフェース上で常に利用可能です。
設定例
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789
JMXを使って公開されるMetricsはドメインとキー-プロパティのリストによって識別されます。これらは共にオブジェクト名からきています。
ドメインは常に生成されたメトリックの識別子が後ろに続くorg.apache.flink
で始まります。通常の識別子と対称的に、それはスコープの形式に影響されず、変数を含まず、ジョブを横断して不変です。
そのようなドメインのための例に、org.apache.flink.job.task.numBytesOut
がありmす。
キー-プロパティ リストは、設定されたスコープ形式に関係なく、指定されたメトリックと関連する全ての変数のための値を含みます。
そのようなリストの例に、host=localhost,job_name=MyJob,task_name=MyTask
があります。
従ってドメインメトリッククラスを識別しますが、キー-プロパティリストはメトリックの1つ(あるいは複数の)インスタンスを識別します。
Graphite #
(org.apache.flink.metrics.graphite.GraphiteReporter) #
Type: push/identifier
パラメータ:
host
- Graphiteサーバホストport
- Graphiteサーバーポートprotocol
- 使うプロトコル(TCP/UDP)
設定例
metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS
InfluxDB #
(org.apache.flink.metrics.influxdb.InfluxdbReporter) #
Type: push/tags
パラメータ:
Key | Default | Type | Description |
---|---|---|---|
connectTimeout |
10000 | Integer | (optional) the InfluxDB connect timeout for metrics |
consistency |
ONE | Enum |
(optional) the InfluxDB consistency level for metrics Possible values:
|
db |
(none) | String | the InfluxDB database to store metrics |
host |
(none) | String | the InfluxDB server host |
password |
(none) | String | (optional) InfluxDB username's password used for authentication |
port |
8086 | Integer | the InfluxDB server port |
retentionPolicy |
(none) | String | (optional) the InfluxDB retention policy for metrics |
scheme |
http | Enum |
the InfluxDB schema Possible values:
|
username |
(none) | String | (optional) InfluxDB username used for authentication |
writeTimeout |
10000 | Integer | (optional) the InfluxDB write timeout for metrics |
設定例
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.scheme: http
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
metrics.reporter.influxdb.retentionPolicy: one_hour
metrics.reporter.influxdb.consistency: ANY
metrics.reporter.influxdb.connectTimeout: 60000
metrics.reporter.influxdb.writeTimeout: 60000
metrics.reporter.influxdb.interval: 60 SECONDS
レポーターは、httpプロトコルを使って、指定された保持ポリシー (またはサーバで指定されたデフォルトポリシー)を使用して、InfluxDB サーバにメトリクスを送信します。 全てのFlinkメトリクス変数(全ての変数のリストを参照)は、InfluxDBタグとして公開されます。
Prometheus #
(org.apache.flink.metrics.prometheus.PrometheusReporter) #
Type: pull/tags
パラメータ:
port
- (オプション) Prometheus exporterがlistenするポート。デフォルトは9249。1つのホスト上でレポーターの幾つかのインスタンスが実行できるようにするために(例えば、1つのTaskManagerがJobManagerと同じ場所に配置される)、9250-9260
のようなポート範囲を使うことをお勧めします。filterLabelValueCharacters
- (オプション) ラベル値の文字をフィルタするかどうかを指定します。有効にすると、[a-zA-Z0-9:_]に一致しない全ての文字は削除されます。そうでない場合は、文字は削除されません。このオプションを無効にする前に、ラベルの値がPrometheusの要件を満たしていることを確認してください。
設定例
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
Flink のメトリックの型は以下のように Prometheus メトリックにマップされます:
Flink | Prometheus | 注意 |
---|---|---|
カウンター | ゲージ | Prometheus counters は減らすことができません。 |
ゲージ | ゲージ | 数値と真偽値のみがサポートされます。 |
ヒストグラム | 概要 | 変位値 .5, .75, .95, .98, .99 および .999 |
メーター | ゲージ | gaugeはメーターレートを公開します。 |
全てのFlinkメトリクス変数(全ての変数のリストを参照)は、ラベルとしてPrometheusに公開されます。
PrometheusPushGateway #
(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) #
Type: push/tags
パラメータ:
Key | Default | Type | Description |
---|---|---|---|
deleteOnShutdown |
true | Boolean | Specifies whether to delete metrics from the PushGateway on shutdown. Flink will try its best to delete the metrics but this is not guaranteed. See here for more details. |
filterLabelValueCharacters |
true | Boolean | Specifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] will be removed, otherwise no characters will be removed. Before disabling this option please ensure that your label values meet the Prometheus requirements. |
groupingKey |
(none) | String | Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2 . Please ensure that your grouping key meets the Prometheus requirements. |
hostUrl |
(none) | String | The PushGateway server host URL including scheme, host name, and port. |
jobName |
(none) | String | The job name under which metrics will be pushed |
randomJobNameSuffix |
true | Boolean | Specifies whether a random suffix should be appended to the job name. |
設定例
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS
PrometheusPushGatewayReporter はメトリクスをPushgatewayにプッシュします。それらは Prometheus によって掬い取られます。
ユースケースについては、Prometheus documentation を参照してください。
StatsD #
(org.apache.flink.metrics.statsd.StatsDReporter) #
Type: push/identifier
パラメータ:
host
- StatsDサーバホストport
- StatsDサーバポート
設定例
metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
metrics.reporter.stsd.interval: 60 SECONDS
Datadog #
(org.apache.flink.metrics.datadog.DatadogHttpReporter) #
Type: push/tags
<host>
、<job_name>
、<tm_id>
、<subtask_index>
、<task_name>
、<operator_name>
のようなFlinkのメトリクス内の全ての変数はタグとしてDatadogに送信されます。タグはhost:localhost
とjob_name:myjobname
のように見えるでしょう。
注意 legacy reasonsでレポーターはメトリック識別子_と_タグの_両方_を使います。この冗長性はuseLogicalIdentifier
を有効にすることで回避できます。
注意 ヒストグラムは、Datadogヒストグラムの命名規則(<metric_name>.<aggregation>
)に従うことで、一連のgaugesとして公開されます。
min
集計はデフォルトで報告されますが、sum
は使えません。
Datadogが提供するヒストグラムと対照的に、レポートされる集計は特定のレポート間隔で計算されません。
パラメータ:
apikey
- Datadog APIキーproxyHost
- (オプション) Datadogに送信する時に使うプロキシのホストproxyPort
- (オプション) Datadogに送信する時に使うプロキシのポート。デフォルトは8080。dataCenter
- (オプション) 接続するデータセンター(EU
/US
)。デフォルトはUS
。maxMetricsPerRequest
- (オプション) 各リクエストに含むメトリクスの最大数。デフォルトは2000。useLogicalIdentifier
-> (オプション) レポーターが論理メトリック識別子を使うかどうか。デフォルトはfalse
。
設定例
metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.proxyHost: my.web.proxy.com
metrics.reporter.dghttp.proxyPort: 8080
metrics.reporter.dghttp.dataCenter: US
metrics.reporter.dghttp.maxMetricsPerRequest: 2000
metrics.reporter.dghttp.interval: 60 SECONDS
metrics.reporter.dghttp.useLogicalIdentifier: true
Slf4j #
(org.apache.flink.metrics.slf4j.Slf4jReporter) #
Type: push/identifier
設定例
metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
metrics.reporter.slf4j.interval: 60 SECONDS