Flinkは実行中のジョブと最近完了したジョブの状態と統計をクエリするために使うことができる監視APIを持っています。この監視APIはFlink独自のダッシュボードによって使われますが、独自の監視ツールによっても使えるように設計されています。
監視APIはHTTP GETリクエストと受け付け、JSONデータを使って応答するREST-ful APIです。
監視APIはJobManagerの一部として実行するwebサーバによって支援されます。デフォルトでは、このサーバは8081
をlistenします。これはjobmanager.web.port
を使ってflink-conf.yaml
の中で設定することができます。監視API webサーバとwebダッシュボードwebサーバは現在のところ同じもので従って同じポートで一緒に動作します。しかし、それらは異なるHTTP URLで応答します。
複数のジョブマネージャー(高可用性のため)の場合、各ジョブマネージャーは監視APIの独自のインスタンスを実行するでしょう。ジョブマネージャーがクラスタリーダーとして選択されている間、これは完了および実行中のジョブについての情報を提供します。
REST API のバックエンドはflink-runtime-web
プロジェクトの中にあります。コアのクラスは org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
で、サーバとリクエストのルーティングをセットアップします。
RESTリクエストを処理しURLを変換するために、Netty と Netty Router ライブラリを使います。この組み合わせは軽い依存を持ち、Netty HTTPのパフォーマンスがとても良いため、この選択がされました。
新しいリクエストを追加するには、新しいrequest ハンドラ クラスを追加する必要があります。このことを見るための良い例が org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler
にあります。ハンドラを作成した後で、ハンドラはorg.apache.flink.runtime.webmonitor.WebRuntimeMonitor
の中でリクエスト ルータと一緒に登録される必要があります。
以下は利用可能なリクエストのリストと例のJSON応答です。全てのリクエストはhttp://hostname:8081/jobs
からのサンプルで、以下でURLの一部のpath のみをリスト化します。
角括弧内の値は変数です。例えばhttp://hostname:8081/jobs/<jobid>/exceptions
は、例えば http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions
としてリクエストされなければならないでしょう。
/config
/overview
/jobs/overview
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/jars/upload
/jars
/jars/:jarid
/jars/:jarid/plan
/jars/:jarid/run
/config
監視APIとサーバセットアップについてのいくつかの情報
結果の例:
{
"refresh-interval": 3000,
"timezone-offset": 3600000,
"timezone-name": "Central European Time",
"flink-version": "1.5-SNAPSHOT",
"flink-revision": "8124545 @ 16.09.2015 @ 15:38:42 CEST"
}
/overview
Flinkクラスター状態の簡単な概要
結果の例:
{
"taskmanagers": 17,
"slots-total": 68,
"slots-available": 68,
"jobs-running": 0,
"jobs-finished": 3,
"jobs-cancelled": 1,
"jobs-failed": 0
}
/jobs/overview
ステータスによってグループ化され、それぞれがステータスの小さな概要を持つ、ジョブ
結果の例:
{
"jobs":[
{
"jid": "7684be6004e4e955c2a558a9bc463f65",
"name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
"state": "FINISHED",
"start-time": 1442419702857,
"end-time": 1442419975312,
"duration":272455,
"last-modification": 1442419975312,
"tasks": {
"total": 6,
"pending": 0,
"running": 0,
"finished": 6,
"canceling": 0,
"canceled": 0,
"failed": 0
}
},
{
"jid": "49306f94d0920216b636e8dd503a6409",
"name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
...
}]
}
/jobs/<jobid>
一つのジョブについての概要。データフロープランのリスト、ステータス、状態遷移のタイムスタンプ、各頂点(オペレータ)のための集約情報。
結果の例:
{
"jid": "ab78dcdbb1db025539e30217ec54ee16",
"name": "WordCount Example",
"state":"FINISHED",
"start-time":1442421277536,
"end-time":1442421299791,
"duration":22255,
"now":1442421991768,
"timestamps": {
"CREATED": 1442421277536, "RUNNING": 1442421277609, "FAILING": 0, "FAILED": 0, "CANCELLING": 0, "CANCELED": 0, "FINISHED": 1442421299791, "RESTARTING": 0
},
"vertices": [ {
"id": "19b5b24062c48a06e4eac65422ac3317",
"name": "CHAIN DataSource (at getTextDataSet(WordCount.java:142) ...",
"parallelism": 2,
"status": "FINISHED",
"start-time": 1442421277609,
"end-time": 1442421299469,
"duration": 21860,
"tasks": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 2, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
},
"metrics": {
"read-bytes": 0, "write-bytes": 37098, "read-records": 0, "write-records": 3312
}
}, {
"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
"name":"Reduce (SUM(1), at main(WordCount.java:67)",
"parallelism": 2,
....
}, {
"id": "0a36cbc29102d7bc993d0a9bf23afa12",
"name": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: ))",
...
} ],
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 3, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
},
"plan": {
// see plan details below
}
}
/jobs/<jobid>/vertices
現在のところ /jobs/<jobid>
と同じ
/jobs/<jobid>/config
ジョブによって使われるユーザ定義の実行設定
結果の例:
{
"jid": "ab78dcdbb1db025539e30217ec54ee16",
"name": "WordCount Example",
"execution-config": {
"execution-mode": "PIPELINED",
"restart-strategy": "Restart deactivated",
"job-parallelism": -1,
"object-reuse-mode": false
}
}
/jobs/<jobid>/exceptions
ジョブによって観測された復元不可能な例外truncated
フラグは、そうしなければ応答があまりに大きくなるため、リストされないが多くの例外が発生したかどうかを定義します
結果の例:
{
"root-exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink.core.fs.local.LocalFileSystem. ...",
"all-exceptions": [ {
"exception": "java.io.IOException: File already exists:/tmp/abzs/1\n\tat org.apache.flink...",
"task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (1/2)",
"location": "localhost:49220"
}, {
"exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink...",
"task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (2/2)",
"location": "localhost:49220"
} ],
"truncated":false
}
/jobs/<jobid>/accumulators
The aggregated user accumulators plus job accumulators.
結果の例:
{
"job-accumulators":[],
"user-task-accumulators": [ {
"name": "avglen",
"type": "DoubleCounter",
"value": "DoubleCounter 61.5162972"
},
{
"name": "genwords",
"type": "LongCounter",
"value": "LongCounter 37500000"
} ]
}
/jobs/<jobid>/vertices/<vertexid>
一つの特定の頂点についての情報。そのサブタスクのそれぞれについての概要を持つ。
結果の例:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"name": "CHAIN DataSource -> Map -> FlatMap -> Combine(SUM(1))",
"parallelism": 2,
"now": 1442424002154,
"subtasks": [ {
"subtask":0,
"status": "FINISHED",
"attempt": 0,
"host": "localhost",
"start-time": 1442421093762,
"end-time": 1442421386680,
"duration": 292918,
"metrics": {
"read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
}
}, {
"subtask": 1,
"status": "FINISHED",
"attempt": 0,
"host": "localhost",
"start-time": 1442421093774,
"end-time": 1442421386267,
"duration": 292493,
"metrics": {
"read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
}
} ]
}
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
このリクエストは指定された頂点の全てのサブタスクの状態遷移についてタイムスタンプを返します。これらは例えばサブタスク間でタイムラインの比較を生成するために使うことができます。
結果の例:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"name": "CHAIN DataSource -> Map -> Combine(SUM(1))",
"now":1442423745088,
"subtasks": [ {
"subtask": 0,
"host": "localhost",
"duration": 292924,
"timestamps": {
"CREATED": 1442421093741, "SCHEDULED": 1442421093756, "DEPLOYING": 1442421093762, "RUNNING": 1442421094026, "FINISHED": 1442421386680, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
}, {
"subtask": 1,
"host": "localhost",
"duration": 292494,
"timestamps": {
"CREATED": 1442421093741, "SCHEDULED": 1442421093773, "DEPLOYING": 1442421093774, "RUNNING": 1442421094013, "FINISHED": 1442421386267, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
} ]
}
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
一つの特定の頂点のためのタスクマネージャーの統計。これは/jobs/<jobid>/vertices/<vertexid>
によって返されるサブタスクの統計の集約です。
結果の例:
{
"id": "fe20bcc29b87cdc76589ca42114c2499",
"name": "Reduce (SUM(1), at main(WordCount.java:72)",
"now": 1454348282653,
"taskmanagers": [ {
"host": "ip-10-0-43-227:35413",
"status": "FINISHED",
"start-time": 1454347870991,
"end-time": 1454347872111,
"duration": 1120,
"metrics": {
"read-bytes": 32503056, "write-bytes": 9637041, "read-records": 2906087, "write-records": 849467
},
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
},{
"host": "ip-10-0-43-227:41486",
"status": "FINISHED",
"start-time": 1454347871001,
"end-time": 1454347872395,
"duration": 1394,
"metrics": {
"read-bytes": 32389499, "write-bytes": 9608829, "read-records": 2895999, "write-records": 846948
},
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
} ]
}
/jobs/<jobid>/vertices/<vertexid>/accumulators
特定の頂点についての集約されたユーザ定義のアキュムレータ。
結果の例:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"user-accumulators": [ {
"name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 123.03259440000001"
}, {
"name": "genwords", "type": "LongCounter", "value": "LongCounter 75000000"
} ]
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
指定された頂点の全てのサブタスクについての全てのユーザ定義のアキュムレータを取得する。/jobs/<jobid>/vertices/<vertexid>/accumulators
リクエストによって集約された形式で返される個々のアキュムレータです。
結果の例:
{
"id": "dceafe2df1f57a1206fcb907cb38ad97",
"parallelism": 2,
"subtasks": [ {
"subtask": 0,
"attempt": 0,
"host": "localhost",
"user-accumulators": [ {
"name": "genwords", "type": "LongCounter", "value": "LongCounter 62500000"
}, {
"name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"
} ]
}, {
"subtask": 1,
"attempt": 0,
"host": "localhost",
"user-accumulators": [ {
"name": "genwords", "type": "LongCounter", "value": "LongCounter 12500000"
}, {
"name": "genletters", "type": "LongCounter", "value": "LongCounter 256317905"
} ]
} ]
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
特定のサブタスクの現在あるいは最新の実行試行の概要。例は以下を見てください。
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
特定のサブタスクの特定の実行試行の概要。 failure/recoveryの場合は複数の実行試行が起こり得ます。
結果の例:
{
"subtask": 0,
"status": "FINISHED",
"attempt": 0,
"host": "localhost",
"start-time": 1442421093762,
"end-time": 1442421386680,
"duration": 292918,
"metrics": {
"read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
}
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
一つの特定の実行試行の間の一つの特定のサブタスクについて集められたアキュームレータ(failure/recoveryの場合は複数の試行が起こります)。
結果の例:
{
"subtask": 0,
"attempt": 0,
"id": "b22f94d91bf41ddb",
"user-accumulators": [ {
"name": "genwords", "type":"LongCounter", "value":"LongCounter 62500000"
}, {
"name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"
}, {
"name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 102.527162"
} ]
}
/jobs/<jobid>/plan
ジョブのデータフロー計画。計画はジョブの概要(/jobs/<jobid>
)にも含まれます。
結果の例:
{
"jid":"ab78dcdbb1db025539e30217ec54ee16",
"name":"WordCount Example",
"nodes": [ {
"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
"parallelism": 2,
"operator": "GroupReduce",
"operator_strategy": "Sorted Group Reduce",
"description": "Reduce (SUM(1), at main(WordCount.java:67)",
"inputs": [ {
"num": 0,
"id":"19b5b24062c48a06e4eac65422ac3317",
"ship_strategy": "Hash Partition on [0]",
"local_strategy":"Sort (combining) on [0:ASC]",
"exchange":"pipelined"
} ],
"optimizer_properties": {
"global_properties": [
{ "name":"Partitioning", "value":"HASH_PARTITIONED" },
{ "name":"Partitioned on", "value":"[0]" },
{ "name":"Partitioning Order", "value":"(none)" },
{ "name":"Uniqueness", "value":"not unique" }
],
"local_properties": [
{ "name":"Order", "value":"[0:ASC]" },
{ "name":"Grouped on", "value":"[0]" },
{ "name":"Uniqueness", "value":"not unique" }
],
"estimates": [
{ "name":"Est. Output Size", "value":"(unknown)" },
{ "name":"Est. Cardinality", "value":"(unknown)" }
],
"costs": [
{ "name":"Network", "value":"(unknown)" },
{ "name":"Disk I/O", "value":"(unknown)" },
{ "name":"CPU", "value":"(unknown)" },
{ "name":"Cumulative Network", "value":"(unknown)" },
{ "name":"Cumulative Disk I/O", "value":"(unknown)" },
{ "name":"Cumulative CPU","value":"(unknown)" }
],
"compiler_hints": [
{ "name":"Output Size (bytes)", "value":"(none)" },
{ "name":"Output Cardinality", "value":"(none)" },
{ "name":"Avg. Output Record Size (bytes)", "value":"(none)" },
{ "name":"Filter Factor", "value":"(none)" }
]
}
},
{
"id": "19b5b24062c48a06e4eac65422ac3317",
"parallelism": 2,
"operator": "Data Source -> FlatMap -> GroupCombine",
"operator_strategy":" (none) -> FlatMap -> Sorted Combine",
"description":"DataSource (at getTextDataSet(WordCount.java:142) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:67)) -> Combine(SUM(1), at main(WordCount.java:67)",
"optimizer_properties": {
...
}
},
{
"id": "0a36cbc29102d7bc993d0a9bf23afa12",
"parallelism": 2,
"operator": "Data Sink",
"operator_strategy": "(none)",
"description": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: ))",
"inputs":[ {
"num": 0,
"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
"ship_strategy": "Forward",
"exchange": "pipelined"
} ],
"optimizer_properties": {
...
}
} ]
}
/jobs/:jobid/cancel
へのDELETE
リクエスト。
ジョブの取り消しを引き起こします。成功時の結果は {}
です。
セーブポイントを引き起こし、セーブポイントが成功した後でジョブを取り消します。
/jobs/:jobid/cancel-with-savepoint/
へのGET
リクエストは、デフォルトのセーブポイントディレクトリへのセーブポイントを引き起こし、ジョブを取り消します。
/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory
へのGET
リクエストは、指定された宛先のディレクトリへのセーブポイントを引き起こし、ジョブを取り消します。
セーブポイントは完了するのに少しかかるため、この操作は非同期で起こります。このリクエストの結果は、進行中の取り消しの場所です。
トリガーの結果の例:
{
"status": "accepted",
"request-id": 1,
"location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
}
取り消しの進捗は以下でユーザによって監視されなければなりません
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId
リクエスト ID はトリガーの結果によって返されます。
{
"status": "in-progress",
"request-id": 1
}
{
"status": "success",
"request-id": 1,
"savepoint-path": "<savepointPath>"
}
savepointPath
はセーブポイントの外部パスを示します。これはセーブポイントを再開するために使うことができます。
{
"status": "failed",
"request-id": 1,
"cause": "<error message>"
}
REST APIとwebフロントエンドを使ってFlinkプログラムをアップロード、実行、リスト表示することができます。
jarfile
の下のマルチパート データとして送信されたjarファイルを使って、/jars/upload
にPOST
リクエストを送信します。マルチパートデータが自身のContent-Type
を含むようにもしてください。幾つかのhttpライブラリはデフォルトではヘッダを追加しません。
マルチパートの積み荷は以下のように始まるべきです
------BoundaryXXXX
Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
Content-Type: application/x-java-archive
/jars/:jarid/run
にPOST
リクエストを送信します。jarid
パラメータは設定されたwebフロントエンド アップロード ディレクトリ(設定キーはjobmanager.web.upload.dir
)内のプログラム JAR のファイル名です。
以下のクエリパラメータ(全て任意)を指定することができます:
program-args=arg1 arg2 arg3
entry-class=EntryClassName.class
parallelism=4
savepointPath=hdfs://path/to/savepoint
allowNonRestoredState=true
呼び出しが成功すると、サブミットされたジョブのIDを持つ応答を取得するでしょう。
例: セーブポイントを持つプログラムを実行する
リクエスト:
POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true
応答:
{"jobid": "869a9868d49c679e7355700e0857af85"}
以下はFLIP-6のためのREST APIドキュメントです。
/blobserver/port | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/config | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/jobmanager/config | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/jobmanager/metrics | |
動詞: GET |
応答コード: 200 OK |
説明 | |
クエリ パラメータ | |
|
|
|
|
|
/jobs | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/jobs | |
動詞: POST |
応答コード: 202 Accepted |
説明 | |
|
|
|
/jobs/overview | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/jobs/:jobid | |
動詞: PATCH |
応答コード: 202 Accepted |
説明 | |
パスのパラメータ | |
|
|
クエリ パラメータ | |
|
|
|
|
|
/jobs/:jobid | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/accumulators | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/checkpoints | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/checkpoints/config | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/checkpoints/:checkpointid | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/config | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/exceptions | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/metrics | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
クエリ パラメータ | |
|
|
|
|
|
/jobs/:jobid/plan | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/vertices/:vertexid/accumulators | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/jobs/:jobid/vertices/:vertexid/metrics | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
クエリ パラメータ | |
|
|
|
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/metrics | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
クエリ パラメータ | |
|
|
|
|
|
/jobs/:jobid/vertices/:vertexid/subtasktimes | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/overview | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/taskmanagers | |
動詞: GET |
応答コード: 200 OK |
説明 | |
|
|
|
/taskmanagers/:taskmanagerid | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
|
|
|
/taskmanagers/:taskmanagerid/metrics | |
動詞: GET |
応答コード: 200 OK |
説明 | |
パスのパラメータ | |
|
|
クエリ パラメータ | |
|
|
|
|
|