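Flink has a monitoring API that can be used to query the status and statistics of running jobs as well as recently completed jobs. This monitoring API is used by Flink's own dashboard, but it is designed to be used by custom monitoring tools as well.
The monitoring API is a RESTful API that accepts HTTP GET requests and responds with JSON data.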
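The monitoring API is backed by a web server that runs as part of the JobManager. By default, this server listens on port 8081, which can be configured in flink-conf.yaml via jobmanager.web.port. The monitoring API web server and the web dashboard web server are currently the same server and therefore run together on the same port. They respond at different HTTP URLs, though.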
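In the case of multiple JobManagers (for high availability), each JobManager runs its own instance of the monitoring API. An instance offers information about completed and running jobs while its JobManager is elected the cluster leader.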
The REST API backend is in the flink-runtime-web project. The core class is org.apache.flink.runtime.webmonitor.WebRuntimeMonitor, which sets up the server and the request routing.
We use Netty and the Netty Router library to handle REST requests and translate URLs. This choice was made because the combination has lightweight dependencies and the performance of Netty HTTP is very good.
To add a new request, add a new request handler class. A good example to look at is org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler. After creating the handler, register it with the request router in org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.
Below is a list of available requests, each with a sample JSON response. All requests are of the sample form http://hostname:8081/jobs; below we list only the path part of the URLs.
Values in angle brackets are variables. For example, http://hostname:8081/jobs/<jobid>/exceptions has to be requested as, for example, http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions.
/config
/overview
/jobs
/joboverview/running
/joboverview/completed
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/jars/upload
/jars
/jars/:jarid
/jars/:jarid/plan
/jars/:jarid/run
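As a minimal sketch of how the path templates above map to concrete URLs, the following Python snippet substitutes the angle-bracket placeholders and fetches the JSON response with an HTTP GET. The helper names build_url and get_json are hypothetical, hostname is the same placeholder host used in the sample URLs, and only the standard library is used.

```python
import json
import re
import urllib.request

# Placeholder host from the sample URLs; 8081 is the default port.
BASE_URL = "http://hostname:8081"


def build_url(path_template, base_url=BASE_URL, **variables):
    """Replace every <name> placeholder in the path with the given value."""
    def substitute(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"missing value for <{name}>")
        return str(variables[name])

    return base_url.rstrip("/") + re.sub(r"<(\w+)>", substitute, path_template)


def get_json(url, timeout=10.0):
    """Issue an HTTP GET request and decode the JSON response body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


# Build the URL from the example in the text (no request is sent here).
url = build_url("/jobs/<jobid>/exceptions",
                jobid="7684be6004e4e955c2a558a9bc463f65")
print(url)
```

Calling get_json(url) against a reachable JobManager would then return the parsed response as a Python dictionary.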
/config
Some information about the monitoring API and the server setup.
Sample Result:
{
"refresh-interval": 3000,
"timezone-offset": 3600000,
"timezone-name": "Central European Time",
"flink-version": "1.3-SNAPSHOT",
"flink-revision": "8124545 @ 16.09.2015 @ 15:38:42 CEST"
}
/overview
Simple summary of the Flink cluster status.
Sample Result:
{
"taskmanagers": 17,
"slots-total": 68,
"slots-available": 68,
"jobs-running": 0,
"jobs-finished": 3,
"jobs-cancelled": 1,
"jobs-failed": 0
}
/jobs
IDs of jobs, grouped by status: running, finished, failed, canceled.
Sample Result:
{
"jobs-running": [],
"jobs-finished": ["7684be6004e4e955c2a558a9bc463f65","49306f94d0920216b636e8dd503a6409"],
"jobs-cancelled":[],
"jobs-failed":[]
}
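A monitoring tool often needs the reverse mapping, from job ID to status. The sketch below derives it from the sample response above; the function name job_status_index is hypothetical, and stripping the "jobs-" prefix from the keys is an assumption based only on the key names shown in the sample.

```python
# Sample response for /jobs, copied from the example above.
sample_jobs = {
    "jobs-running": [],
    "jobs-finished": ["7684be6004e4e955c2a558a9bc463f65",
                      "49306f94d0920216b636e8dd503a6409"],
    "jobs-cancelled": [],
    "jobs-failed": [],
}


def job_status_index(jobs_response):
    """Map each job ID to the status group it is listed under."""
    index = {}
    for key, job_ids in jobs_response.items():
        # Assumption: every group key has the form "jobs-<status>".
        status = key[len("jobs-"):] if key.startswith("jobs-") else key
        for job_id in job_ids:
            index[job_id] = status
    return index


status_by_job = job_status_index(sample_jobs)
print(status_by_job)
```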
/joboverview
Jobs, grouped by status, each with a small summary of its status.
Sample Result:
{
"running":[],
"finished":[
{
"jid": "7684be6004e4e955c2a558a9bc463f65",
"name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
"state": "FINISHED",
"start-time": 1442419702857,
"end-time": 1442419975312,
"duration":272455,
"last-modification": 1442419975312,
"tasks": {
"total": 6,
"pending": 0,
"running": 0,
"finished": 6,
"canceling": 0,
"canceled": 0,
"failed": 0
}
},
{
"jid": "49306f94d0920216b636e8dd503a6409",
"name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
...
}]
}
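The time fields in the sample appear to be Unix epoch values in milliseconds, which is an assumption drawn from the sample rather than a documented guarantee. Under that assumption, a short sketch can convert them to readable timestamps and confirm that duration equals end-time minus start-time. The helper name to_utc is hypothetical.

```python
from datetime import datetime, timezone

# Fields of the first finished job in the sample above.
job = {
    "jid": "7684be6004e4e955c2a558a9bc463f65",
    "state": "FINISHED",
    "start-time": 1442419702857,
    "end-time": 1442419975312,
    "duration": 272455,
}


def to_utc(epoch_millis):
    """Interpret a value as milliseconds since the Unix epoch (assumed)."""
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)


started = to_utc(job["start-time"])
elapsed_ms = job["end-time"] - job["start-time"]

print("started (UTC):", started.isoformat())
print("elapsed ms:", elapsed_ms, "reported duration:", job["duration"])
```

For this sample the computed difference and the reported duration agree, and the start time falls on 16 September 2015, consistent with the date in the job name.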
/joboverview/running
Jobs, grouped by status, each with a small summary of its status. Same as /joboverview, but contains only currently running jobs.
/joboverview/completed
Jobs, grouped by status, each with a small summary of its status. Same as /joboverview, but contains only completed (finished, canceled, or failed) jobs.
/jobs/<jobid>
Summary of one job, listing the dataflow plan, the status, the timestamps of state transitions, and aggregate information for each vertex (operator).
Sample Result:
{
"jid": "ab78dcdbb1db025539e30217ec54ee16",
"name": "WordCount Example",
"state":"FINISHED",
"start-time":1442421277536,
"end-time":1442421299791,
"duration":22255,
"now":1442421991768,
"timestamps": {
"CREATED": 1442421277536, "RUNNING": 1442421277609, "FAILING": 0, "FAILED": 0, "CANCELLING": 0, "CANCELED": 0, "FINISHED": 1442421299791, "RESTARTING": 0
},
"vertices": [ {
"id": "19b5b24062c48a06e4eac65422ac3317",
"name": "CHAIN DataSource (at getTextDataSet(WordCount.java:142) ...",
"parallelism": 2,
"status": "FINISHED",
"start-time": 1442421277609,
"end-time": 1442421299469,
"duration": 21860,
"tasks": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 2, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
},
"metrics": {
"read-bytes": 0, "write-bytes": 37098, "read-records": 0, "write-records": 3312
}
}, {
"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
"name":"Reduce (SUM(1), at main(WordCount.java:67)",
"parallelism": 2,
....
}, {
"id": "0a36cbc29102d7bc993d0a9bf23afa12",
"name": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: ))",
...
} ],
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 3, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
},
"plan": {
// see plan details below
}
}
/jobs/<jobid>/vertices
Currently the same as /jobs/<jobid>.
/jobs/<jobid>/config
The user-defined execution config used by the job.
Sample Result:
{
"jid": "ab78dcdbb1db025539e30217ec54ee16",
"name": "WordCount Example",
"execution-config": {
"execution-mode": "PIPELINED",
"restart-strategy": "Restart deactivated",
"job-parallelism": -1,
"object-reuse-mode": false
}
}
/jobs/<jobid>/exceptions
The non-recoverable exceptions that have been observed by the job. The truncated flag indicates whether more exceptions occurred but are not listed because the response would otherwise get too big.
Sample Result:
{
"root-exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink.core.fs.local.LocalFileSystem. ...",
"all-exceptions": [ {
"exception": "java.io.IOException: File already exists:/tmp/abzs/1\n\tat org.apache.flink...",
"task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (1/2)",
"location": "localhost:49220"
}, {
"exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink...",
"task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (2/2)",
"location": "localhost:49220"
} ],
"truncated":false
}
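One way a client might present this response is sketched below: it reports the first line of each stack trace per task and adds a notice when the truncated flag is set. The function name summarize_exceptions is hypothetical, and the sample strings are kept exactly as abbreviated above.

```python
# Sample response for /jobs/<jobid>/exceptions, abbreviated as in the example.
sample_exceptions = {
    "root-exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink.core.fs.local.LocalFileSystem. ...",
    "all-exceptions": [
        {
            "exception": "java.io.IOException: File already exists:/tmp/abzs/1\n\tat org.apache.flink...",
            "task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (1/2)",
            "location": "localhost:49220",
        },
        {
            "exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink...",
            "task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (2/2)",
            "location": "localhost:49220",
        },
    ],
    "truncated": False,
}


def summarize_exceptions(response):
    """Return one line per listed exception, plus a notice if truncated."""
    lines = []
    for entry in response.get("all-exceptions", []):
        # Keep only the first line of the stack trace.
        first_line = entry["exception"].split("\n", 1)[0]
        lines.append(f"{entry['task']} @ {entry['location']}: {first_line}")
    if response.get("truncated"):
        lines.append("(more exceptions occurred but are not listed)")
    return lines


summary = summarize_exceptions(sample_exceptions)
for line in summary:
    print(line)
```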
/jobs/<jobid>/accumulators
The aggregated user accumulators plus job accumulators.
Sample Result:
{
"job-accumulators":[],
"user-task-accumulators": [ {
"name": "avglen",
"type": "DoubleCounter",
"value": "DoubleCounter 61.5162972"
},
{
"name": "genwords",
"type": "LongCounter",
"value": "LongCounter 37500000"
} ]
}
/jobs/<jobid>/vertices/<vertexid>
Information about one specific vertex, with a summary for each of its subtasks.
Sample Result:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"name": "CHAIN DataSource -> Map -> FlatMap -> Combine(SUM(1))",
"parallelism": 2,
"now": 1442424002154,
"subtasks": [ {
"subtask":0,
"status": "FINISHED",
"attempt": 0,
"host": "localhost",
"start-time": 1442421093762,
"end-time": 1442421386680,
"duration": 292918,
"metrics": {
"read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
}
}, {
"subtask": 1,
"status": "FINISHED",
"attempt": 0,
"host": "localhost",
"start-time": 1442421093774,
"end-time": 1442421386267,
"duration": 292493,
"metrics": {
"read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
}
} ]
}
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
This request returns the timestamps for the state transitions of all subtasks of a given vertex. These can be used, for example, to create timeline comparisons between subtasks.
Sample Result:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"name": "CHAIN DataSource -> Map -> Combine(SUM(1))",
"now":1442423745088,
"subtasks": [ {
"subtask": 0,
"host": "localhost",
"duration": 292924,
"timestamps": {
"CREATED": 1442421093741, "SCHEDULED": 1442421093756, "DEPLOYING": 1442421093762, "RUNNING": 1442421094026, "FINISHED": 1442421386680, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
}, {
"subtask": 1,
"host": "localhost",
"duration": 292494,
"timestamps": {
"CREATED": 1442421093741, "SCHEDULED": 1442421093773, "DEPLOYING": 1442421093774, "RUNNING": 1442421094013, "FINISHED": 1442421386267, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
} ]
}
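The timeline comparison mentioned above could be sketched as follows: for each subtask, compute how many milliseconds it spent in each state before moving on to the next. The function name phase_durations is hypothetical. Two assumptions are made from the sample alone: a timestamp of 0 means the state was never entered, and the states were passed in the order CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED.

```python
# Assumed order of the states a successful subtask passes through.
STATE_ORDER = ["CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED"]

# Timestamps of the two subtasks in the sample above.
subtask_timestamps = {
    0: {"CREATED": 1442421093741, "SCHEDULED": 1442421093756,
        "DEPLOYING": 1442421093762, "RUNNING": 1442421094026,
        "FINISHED": 1442421386680, "CANCELING": 0, "CANCELED": 0, "FAILED": 0},
    1: {"CREATED": 1442421093741, "SCHEDULED": 1442421093773,
        "DEPLOYING": 1442421093774, "RUNNING": 1442421094013,
        "FINISHED": 1442421386267, "CANCELING": 0, "CANCELED": 0, "FAILED": 0},
}


def phase_durations(timestamps):
    """Milliseconds spent in each reached state before the next one began."""
    # Assumption: a timestamp of 0 means the state was never entered.
    reached = [(s, timestamps[s]) for s in STATE_ORDER if timestamps.get(s, 0) > 0]
    return {
        state: next_ts - ts
        for (state, ts), (_, next_ts) in zip(reached, reached[1:])
    }


durations = {num: phase_durations(ts) for num, ts in subtask_timestamps.items()}
for num, phases in durations.items():
    print(f"subtask {num}: {phases}")
```

In this sample, the reported duration of each subtask (292924 and 292494) equals its FINISHED timestamp minus its SCHEDULED timestamp.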
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
TaskManager statistics for one specific vertex. This is an aggregation of the subtask statistics returned by /jobs/<jobid>/vertices/<vertexid>.
Sample Result:
{
"id": "fe20bcc29b87cdc76589ca42114c2499",
"name": "Reduce (SUM(1), at main(WordCount.java:72)",
"now": 1454348282653,
"taskmanagers": [ {
"host": "ip-10-0-43-227:35413",
"status": "FINISHED",
"start-time": 1454347870991,
"end-time": 1454347872111,
"duration": 1120,
"metrics": {
"read-bytes": 32503056, "write-bytes": 9637041, "read-records": 2906087, "write-records": 849467
},
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
},{
"host": "ip-10-0-43-227:41486",
"status": "FINISHED",
"start-time": 1454347871001,
"end-time": 1454347872395,
"duration": 1394,
"metrics": {
"read-bytes": 32389499, "write-bytes": 9608829, "read-records": 2895999, "write-records": 846948
},
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
} ]
}
/jobs/<jobid>/vertices/<vertexid>/accumulators
The aggregated user-defined accumulators for one specific vertex.
Sample Result:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"user-accumulators": [ {
"name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 123.03259440000001"
}, {
"name": "genwords", "type": "LongCounter", "value": "LongCounter 75000000"
} ]
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
Gets all user-defined accumulators for all subtasks of a given vertex. These are the individual accumulators that the request /jobs/<jobid>/vertices/<vertexid>/accumulators returns in aggregated form.
Sample Result:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"parallelism": 2,
"subtasks": [ {
"subtask": 0,
"attempt": 0,
"host": "localhost",
"user-accumulators": [ {
"name": "genwords", "type": "LongCounter", "value": "LongCounter 62500000"
}, {
"name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"
} ]
}, {
"subtask": 1,
"attempt": 0,
"host": "localhost",
"user-accumulators": [ {
"name": "genwords", "type": "LongCounter", "value": "LongCounter 12500000"
}, {
"name": "genletters", "type": "LongCounter", "value": "LongCounter 256317905"
} ]
} ]
}
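To illustrate the relationship between per-subtask and aggregated accumulators, the sketch below sums the LongCounter values across the subtasks of the sample above. The function name sum_long_counters is hypothetical, and parsing the value as "<type> <number>" is an assumption based only on the strings shown in the samples.

```python
from collections import defaultdict

# Per-subtask accumulators from the sample above.
sample_subtask_accumulators = {
    "id": "dceafe2df1f57a1206fcb907cb38ad97",
    "parallelism": 2,
    "subtasks": [
        {"subtask": 0, "attempt": 0, "host": "localhost", "user-accumulators": [
            {"name": "genwords", "type": "LongCounter", "value": "LongCounter 62500000"},
            {"name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"},
        ]},
        {"subtask": 1, "attempt": 0, "host": "localhost", "user-accumulators": [
            {"name": "genwords", "type": "LongCounter", "value": "LongCounter 12500000"},
            {"name": "genletters", "type": "LongCounter", "value": "LongCounter 256317905"},
        ]},
    ],
}


def sum_long_counters(response):
    """Add up LongCounter accumulators with the same name across subtasks."""
    totals = defaultdict(int)
    for subtask in response["subtasks"]:
        for accumulator in subtask["user-accumulators"]:
            if accumulator["type"] == "LongCounter":
                # Assumption: the value string ends with the number.
                number = accumulator["value"].rsplit(" ", 1)[-1]
                totals[accumulator["name"]] += int(number)
    return dict(totals)


totals = sum_long_counters(sample_subtask_accumulators)
print(totals)
```

The genwords total of 75000000 matches the aggregated value shown in the sample for /jobs/<jobid>/vertices/<vertexid>/accumulators.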
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
Summary of the current or latest execution attempt of a specific subtask. See below for a sample.
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
Summary of a specific execution attempt of a specific subtask. Multiple execution attempts can happen in case of failure/recovery.
Sample Result:
{
"subtask": 0,
"status": "FINISHED",
"attempt": 0,
"host": "localhost",
"start-time": 1442421093762,
"end-time": 1442421386680,
"duration": 292918,
"metrics": {
"read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
}
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
The accumulators collected for one specific subtask during one specific execution attempt (multiple attempts happen in case of failure/recovery).
Sample Result:
{
"subtask": 0,
"attempt": 0,
"id": "b22f94d91bf41ddb",
"user-accumulators": [ {
"name": "genwords", "type":"LongCounter", "value":"LongCounter 62500000"
}, {
"name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"
}, {
"name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 102.527162"
} ]
}
/jobs/<jobid>/plan
The dataflow plan of a job. The plan is also included in the job summary (/jobs/<jobid>).
Sample Result:
{
"jid":"ab78dcdbb1db025539e30217ec54ee16",
"name":"WordCount Example",
"nodes": [ {
"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
"parallelism": 2,
"operator": "GroupReduce",
"operator_strategy": "Sorted Group Reduce",
"description": "Reduce (SUM(1), at main(WordCount.java:67)",
"inputs": [ {
"num": 0,
"id":"19b5b24062c48a06e4eac65422ac3317",
"ship_strategy": "Hash Partition on [0]",
"local_strategy":"Sort (combining) on [0:ASC]",
"exchange":"pipelined"
} ],
"optimizer_properties": {
"global_properties": [
{ "name":"Partitioning", "value":"HASH_PARTITIONED" },
{ "name":"Partitioned on", "value":"[0]" },
{ "name":"Partitioning Order", "value":"(none)" },
{ "name":"Uniqueness", "value":"not unique" }
],
"local_properties": [
{ "name":"Order", "value":"[0:ASC]" },
{ "name":"Grouped on", "value":"[0]" },
{ "name":"Uniqueness", "value":"not unique" }
],
"estimates": [
{ "name":"Est. Output Size", "value":"(unknown)" },
{ "name":"Est. Cardinality", "value":"(unknown)" }
],
"costs": [
{ "name":"Network", "value":"(unknown)" },
{ "name":"Disk I/O", "value":"(unknown)" },
{ "name":"CPU", "value":"(unknown)" },
{ "name":"Cumulative Network", "value":"(unknown)" },
{ "name":"Cumulative Disk I/O", "value":"(unknown)" },
{ "name":"Cumulative CPU","value":"(unknown)" }
],
"compiler_hints": [
{ "name":"Output Size (bytes)", "value":"(none)" },
{ "name":"Output Cardinality", "value":"(none)" },
{ "name":"Avg. Output Record Size (bytes)", "value":"(none)" },
{ "name":"Filter Factor", "value":"(none)" }
]
}
},
{
"id": "19b5b24062c48a06e4eac65422ac3317",
"parallelism": 2,
"operator": "Data Source -> FlatMap -> GroupCombine",
"operator_strategy":" (none) -> FlatMap -> Sorted Combine",
"description":"DataSource (at getTextDataSet(WordCount.java:142) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:67)) -> Combine(SUM(1), at main(WordCount.java:67)",
"optimizer_properties": {
...
}
},
{
"id": "0a36cbc29102d7bc993d0a9bf23afa12",
"parallelism": 2,
"operator": "Data Sink",
"operator_strategy": "(none)",
"description": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: ))",
"inputs":[ {
"num": 0,
"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
"ship_strategy": "Forward",
"exchange": "pipelined"
} ],
"optimizer_properties": {
...
}
} ]
}
DELETE request to /jobs/:jobid/cancel.
Triggers job cancellation; the result on success is {}.
Triggers a savepoint and cancels the job after the savepoint succeeds.
GET request to /jobs/:jobid/cancel-with-savepoint/ triggers a savepoint to the default savepoint directory and cancels the job.
GET request to /jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory triggers a savepoint to the given target directory and cancels the job.
Since savepoints can take some time to complete, this operation happens asynchronously. The result of this request is the location of the in-progress cancellation.
Sample Trigger Result:
{
"status": "accepted",
"request-id": 1,
"location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
}
The progress of the cancellation has to be monitored by the user at /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId. The request ID is returned by the trigger result.
{
"status": "in-progress",
"request-id": 1
}
{
"status": "success",
"request-id": 1,
"savepoint-path": "<savepointPath>"
}
The savepointPath points to the external path of the savepoint, which can be used to resume the savepoint.
{
"status": "failed",
"request-id": 1,
"cause": "<error message>"
}
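Because the cancellation is asynchronous, a client has to poll the in-progress location until the status changes. The following is a minimal polling sketch under stated assumptions: wait_for_cancellation is a hypothetical helper, and fetch_status stands for any callable that performs the GET request on the in-progress URL and returns the parsed JSON. Here it is simulated with canned responses shaped like the three results above, so nothing is sent over the network.

```python
import time


def wait_for_cancellation(fetch_status, poll_interval=1.0, max_polls=60,
                          sleep=time.sleep):
    """Poll until the cancel-with-savepoint request succeeds or fails.

    Returns the savepoint path on success, raises RuntimeError on failure,
    and raises TimeoutError if the status is still in progress after
    max_polls attempts.
    """
    for _ in range(max_polls):
        result = fetch_status()
        status = result["status"]
        if status == "success":
            return result["savepoint-path"]
        if status == "failed":
            raise RuntimeError(result.get("cause", "unknown cause"))
        # Any other status (e.g. "in-progress"): wait and ask again.
        sleep(poll_interval)
    raise TimeoutError(f"cancellation still in progress after {max_polls} polls")


# Simulated server responses, shaped like the samples above.
responses = iter([
    {"status": "in-progress", "request-id": 1},
    {"status": "in-progress", "request-id": 1},
    {"status": "success", "request-id": 1, "savepoint-path": "<savepointPath>"},
])

# Skip the real delay in this demonstration.
savepoint_path = wait_for_cancellation(lambda: next(responses),
                                       sleep=lambda seconds: None)
print(savepoint_path)
```

Passing the fetch function and the sleep function as parameters keeps the loop independent of any particular HTTP client and makes it easy to exercise without a running cluster.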
It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
Send a POST request to /jars/:jarid/run. The jarid parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key jobmanager.web.upload.dir).
You can specify the following query parameters (all optional):
program-args=arg1 arg2 arg3
entry-class=EntryClassName.class
parallelism=4
savepointPath=hdfs://path/to/savepoint
allowNonRestoredState=true
If the call succeeds, you will get a response with the ID of the submitted job.
Example: Run program with a savepoint
Request:
POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true
Response:
{"jobid": "869a9868d49c679e7355700e0857af85"}
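A sketch of how a client might assemble this request with the Python standard library is shown below. The helper name build_run_request is hypothetical, and hostname is the placeholder host from the sample URLs. Note that urlencode percent-encodes the slashes in the savepoint path; this sketch assumes the server decodes query parameters in the usual way.

```python
import urllib.request
from urllib.parse import quote, urlencode

# Placeholder host from the sample URLs; 8081 is the default port.
BASE_URL = "http://hostname:8081"


def build_run_request(jar_id, params=None, base_url=BASE_URL):
    """Create (but do not send) a POST request for /jars/:jarid/run."""
    url = f"{base_url.rstrip('/')}/jars/{quote(jar_id, safe='')}/run"
    # All query parameters are optional, so drop the ones that are unset.
    query = urlencode({k: v for k, v in (params or {}).items() if v is not None})
    if query:
        url = f"{url}?{query}"
    return urllib.request.Request(url, method="POST")


request = build_run_request("MyProgram.jar", {
    "savepointPath": "/my-savepoints/savepoint-1bae02a80464",
    "allowNonRestoredState": "true",
})
print(request.get_method(), request.full_url)
```

Sending the request with urllib.request.urlopen(request) and decoding the body as JSON would then yield the jobid shown in the response above.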