REST APIの監視

Flinkは実行中のジョブと最近完了したジョブの状態と統計をクエリするために使うことができる監視APIを持っています。この監視APIはFlink独自のダッシュボードによって使われますが、独自の監視ツールによっても使えるように設計されています。

監視APIはHTTP GETリクエストと受け付け、JSONデータを使って応答するREST-ful APIです。

概要

監視APIはJobManagerの一部として実行するwebサーバによって支援されます。デフォルトでは、このサーバは8081をlistenします。これはjobmanager.web.portを使ってflink-conf.yamlの中で設定することができます。監視API webサーバとwebダッシュボードwebサーバは現在のところ同じもので従って同じポートで一緒に動作します。しかし、それらは異なるHTTP URLで応答します。

複数のジョブマネージャー(高可用性のため)の場合、各ジョブマネージャーは監視APIの独自のインスタンスを実行するでしょう。ジョブマネージャーがクラスタリーダーとして選択されている間、これは完了および実行中のジョブについての情報を提供します。

開発中

REST API のバックエンドはflink-runtime-web プロジェクトの中にあります。コアのクラスは org.apache.flink.runtime.webmonitor.WebRuntimeMonitor で、サーバとリクエストのルーティングをセットアップします。

RESTリクエストを処理しURLを変換するために、NettyNetty Router ライブラリを使います。この組み合わせは軽い依存を持ち、Netty HTTPのパフォーマンスがとても良いため、この選択がされました。

新しいリクエストを追加するには、新しいrequest ハンドラ クラスを追加する必要があります。このことを見るための良い例が org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandlerにあります。ハンドラを作成した後で、ハンドラはorg.apache.flink.runtime.webmonitor.WebRuntimeMonitorの中でリクエスト ルータと一緒に登録される必要があります。

利用可能なリクエスト

以下は利用可能なリクエストのリストと例のJSON応答です。All requests are of the sample form http://hostname:8081/jobs, below we list only the path part of the URLs.

角括弧内の値は変数です。例えばhttp://hostname:8081/jobs/<jobid>/exceptions は、例えば http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/exceptionsとしてリクエストされなければならないでしょう。

  • /config
  • /overview
  • /jobs
  • /joboverview/running
  • /joboverview/completed
  • /jobs/<jobid>
  • /jobs/<jobid>/vertices
  • /jobs/<jobid>/config
  • /jobs/<jobid>/exceptions
  • /jobs/<jobid>/accumulators
  • /jobs/<jobid>/vertices/<vertexid>
  • /jobs/<jobid>/vertices/<vertexid>/subtasktimes
  • /jobs/<jobid>/vertices/<vertexid>/taskmanagers
  • /jobs/<jobid>/vertices/<vertexid>/accumulators
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
  • /jobs/<jobid>/plan
  • /jars/upload
  • /jars
  • /jars/:jarid
  • /jars/:jarid/plan
  • /jars/:jarid/run

概要

/config

監視APIとサーバセットアップについてのいくつかの情報

結果の例:

{
  "refresh-interval": 3000,
  "timezone-offset": 3600000,
  "timezone-name": "Central European Time",
  "flink-version": "1.3-SNAPSHOT",
  "flink-revision": "8124545 @ 16.09.2015 @ 15:38:42 CEST"
}

/overview

Flinkクラスター状態の簡単な概要

結果の例:

{
  "taskmanagers": 17,
  "slots-total": 68,
  "slots-available": 68,
  "jobs-running": 0,
  "jobs-finished": 3,
  "jobs-cancelled": 1,
  "jobs-failed": 0
}

ジョブの概要

/jobs

ステータスrunning, finished, failed, canceledによってグループ化された、ジョブのID

結果の例:

{
  "jobs-running": [],
  "jobs-finished": ["7684be6004e4e955c2a558a9bc463f65","49306f94d0920216b636e8dd503a6409"],
  "jobs-cancelled":[],
  "jobs-failed":[]
}

/joboverview

ステータスによってグループ化され、それぞれがステータスの小さな概要を持つ、ジョブ

結果の例:

{
  "running":[],
  "finished":[
    {
      "jid": "7684be6004e4e955c2a558a9bc463f65",
      "name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
      "state": "FINISHED",
      "start-time": 1442419702857,
      "end-time": 1442419975312,
      "duration":272455,
      "last-modification": 1442419975312,
      "tasks": {
         "total": 6,
         "pending": 0,
         "running": 0,
         "finished": 6,
         "canceling": 0,
         "canceled": 0,
         "failed": 0
      }
    },
    {
      "jid": "49306f94d0920216b636e8dd503a6409",
      "name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
      ...
    }]
}

/joboverview/running

ステータスによってグループ化され、それぞれがステータスの小さな概要を持つ、ジョブ/joboverviewと同じですが、現在実行中のジョブのみを含みます。

/joboverview/completed

ステータスによってグループ化され、それぞれがステータスの小さな概要を持つ、ジョブ/joboverviewと同じですが、完了した(finished, canceled, あるいは failed) ジョブのみを含みます。

実行中および完了したジョブの詳細

/jobs/<jobid>

一つのジョブについての概要。データフロープランのリスト、ステータス、状態遷移のタイムスタンプ、各頂点(オペレータ)のための集約情報。

結果の例:

{
  "jid": "ab78dcdbb1db025539e30217ec54ee16",
  "name": "WordCount Example",
  "state":"FINISHED",
  "start-time":1442421277536,
  "end-time":1442421299791,
  "duration":22255,
  "now":1442421991768,
  "timestamps": {
    "CREATED": 1442421277536, "RUNNING": 1442421277609, "FAILING": 0, "FAILED": 0, "CANCELLING": 0, "CANCELED": 0, "FINISHED": 1442421299791, "RESTARTING": 0
  },
  "vertices": [ {
    "id": "19b5b24062c48a06e4eac65422ac3317",
    "name": "CHAIN DataSource (at getTextDataSet(WordCount.java:142) ...",
    "parallelism": 2,
    "status": "FINISHED",
    "start-time": 1442421277609,
    "end-time": 1442421299469,
    "duration": 21860,
    "tasks": {
      "CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 2, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
    },
    "metrics": {
      "read-bytes": 0, "write-bytes": 37098, "read-records": 0, "write-records": 3312
    }
  }, {
    "id": "f00c89b349b5c998cfd9fe2a06e50fd0",
    "name":"Reduce (SUM(1), at main(WordCount.java:67)",
    "parallelism": 2,
    ....
  }, {
    "id": "0a36cbc29102d7bc993d0a9bf23afa12",
    "name": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter:  ))",
    ...
  } ],
  "status-counts": {
    "CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 3, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
  },
  "plan": {
    // see plan details below
  }
}

/jobs/<jobid>/vertices

現在のところ /jobs/<jobid>と同じ

/jobs/<jobid>/config

ジョブによって使われるユーザ定義の実行設定

結果の例:

{
  "jid": "ab78dcdbb1db025539e30217ec54ee16",
  "name": "WordCount Example",
  "execution-config": {
    "execution-mode": "PIPELINED",
    "restart-strategy": "Restart deactivated",
    "job-parallelism": -1,
    "object-reuse-mode": false
  }
}

/jobs/<jobid>/exceptions

ジョブによって観測された復元不可能な例外The truncated flag defines whether more exceptions occurred, but are not listed, because the response would otherwise get too big.

結果の例:

{
  "root-exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink.core.fs.local.LocalFileSystem. ...",
  "all-exceptions": [ {
    "exception": "java.io.IOException: File already exists:/tmp/abzs/1\n\tat org.apache.flink...",
    "task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter:  )) (1/2)",
    "location": "localhost:49220"
  }, {
    "exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink...",
    "task": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter:  )) (2/2)",
    "location": "localhost:49220"
  } ],
  "truncated":false
}

/jobs/<jobid>/accumulators

The aggregated user accumulators plus job accumulators.

結果の例:

{
  "job-accumulators":[],
  "user-task-accumulators": [ {
    "name": "avglen",
    "type": "DoubleCounter",
    "value": "DoubleCounter 61.5162972"
  },
  {
    "name": "genwords",
    "type": "LongCounter",
    "value": "LongCounter 37500000"
  } ]
}

/jobs/<jobid>/vertices/<vertexid>

一つの特定の頂点についての情報。そのサブタスクのそれぞれについての概要を持つ。

結果の例:

{
  "id": "dceafe2df1f57a1206fcb907cb38ad97",
  "name": "CHAIN DataSource -> Map -> FlatMap -> Combine(SUM(1))",
  "parallelism": 2,
  "now": 1442424002154,
  "subtasks": [ {
    "subtask":0,
    "status": "FINISHED",
    "attempt": 0,
    "host": "localhost",
    "start-time": 1442421093762,
    "end-time": 1442421386680,
    "duration": 292918,
    "metrics": {
      "read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
    }
  }, {
    "subtask": 1,
    "status": "FINISHED",
    "attempt": 0,
    "host": "localhost",
    "start-time": 1442421093774,
    "end-time": 1442421386267,
    "duration": 292493,
    "metrics": {
      "read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
    }
  } ]
}

/jobs/<jobid>/vertices/<vertexid>/subtasktimes

このリクエストは指定された頂点の全てのサブタスクの状態遷移についてタイムスタンプを返します。これらは例えばサブタスク間でタイムラインの比較を生成するために使うことができます。

結果の例:

{
  "id": "dceafe2df1f57a1206fcb907cb38ad97",
  "name": "CHAIN DataSource -> Map -> Combine(SUM(1))",
  "now":1442423745088,
  "subtasks": [ {
    "subtask": 0,
    "host": "localhost",
    "duration": 292924,
    "timestamps": {
      "CREATED": 1442421093741, "SCHEDULED": 1442421093756, "DEPLOYING": 1442421093762, "RUNNING": 1442421094026, "FINISHED": 1442421386680, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
    }
  }, {
    "subtask": 1,
    "host": "localhost",
    "duration": 292494,
    "timestamps": {
      "CREATED": 1442421093741, "SCHEDULED": 1442421093773, "DEPLOYING": 1442421093774, "RUNNING": 1442421094013, "FINISHED": 1442421386267, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
    }
  } ]
}

/jobs/<jobid>/vertices/<vertexid>/taskmanagers

一つの特定の頂点のためのタスクマネージャーの統計。これは/jobs/<jobid>/vertices/<vertexid>によって返されるサブタスクの統計の集約です。

結果の例:

{
  "id": "fe20bcc29b87cdc76589ca42114c2499",
  "name": "Reduce (SUM(1), at main(WordCount.java:72)",
  "now": 1454348282653,
  "taskmanagers": [ {
    "host": "ip-10-0-43-227:35413",
    "status": "FINISHED",
    "start-time": 1454347870991,
    "end-time": 1454347872111,
    "duration": 1120,
    "metrics": {
      "read-bytes": 32503056, "write-bytes": 9637041, "read-records": 2906087, "write-records": 849467
    },
    "status-counts": {
      "CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
    }
  },{
    "host": "ip-10-0-43-227:41486",
    "status": "FINISHED",
    "start-time": 1454347871001,
    "end-time": 1454347872395,
    "duration": 1394,
    "metrics": {
      "read-bytes": 32389499, "write-bytes": 9608829, "read-records": 2895999, "write-records": 846948
    },
    "status-counts": {
      "CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
    }
  } ]
}

/jobs/<jobid>/vertices/<vertexid>/accumulators

特定の頂点についての集約されたユーザ定義のアキュムレータ。

結果の例:

{
  "id": "dceafe2df1f57a1206fcb907cb38ad97",
  "user-accumulators": [ {
    "name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 123.03259440000001"
  }, {
    "name": "genwords", "type": "LongCounter", "value": "LongCounter 75000000"
  } ]
}

/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators

指定された頂点の全てのサブタスクについての全てのユーザ定義のアキュムレータを取得する。/jobs/<jobid>/vertices/<vertexid>/accumulators リクエストによって集約された形式で返される個々のアキュムレータです。

結果の例:

{
  "id": "dceafe2df1f57a1206fcb907cb38ad97",
  "parallelism": 2,
  "subtasks": [ {
    "subtask": 0,
    "attempt": 0,
    "host": "localhost",
    "user-accumulators": [ {
      "name": "genwords", "type": "LongCounter", "value": "LongCounter 62500000"
    }, {
      "name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"
    } ]
  }, {
    "subtask": 1,
    "attempt": 0,
    "host": "localhost",
    "user-accumulators": [ {
      "name": "genwords", "type": "LongCounter", "value": "LongCounter 12500000"
    }, {
      "name": "genletters", "type": "LongCounter", "value": "LongCounter 256317905"
    } ]
  } ]
}

/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>

特定のサブタスクの現在あるいは最新の実行試行の概要。例は以下を見てください。

/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>

特定のサブタスクの特定の実行試行の概要。 failure/recoveryの場合は複数の実行試行が起こり得ます。

結果の例:

{
  "subtask": 0,
  "status": "FINISHED",
  "attempt": 0,
  "host": "localhost",
  "start-time": 1442421093762,
  "end-time": 1442421386680,
  "duration": 292918,
  "metrics": {
    "read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
  }
}

/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators

一つの特定の実行試行の間の一つの特定のサブタスクについて集められたアキュームレータ(failure/recoveryの場合は複数の試行が起こります)。

結果の例:

{
  "subtask": 0,
  "attempt": 0,
  "id": "b22f94d91bf41ddb",
  "user-accumulators": [ {
    "name": "genwords", "type":"LongCounter", "value":"LongCounter 62500000"
  }, {
    "name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"
  }, {
  "name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 102.527162"
  } ]
}

/jobs/<jobid>/plan

ジョブのデータフロー計画。計画はジョブの概要(/jobs/<jobid>)にも含まれます。

結果の例:

{
  "jid":"ab78dcdbb1db025539e30217ec54ee16",
  "name":"WordCount Example",
  "nodes": [ {
    "id": "f00c89b349b5c998cfd9fe2a06e50fd0",
    "parallelism": 2,
    "operator": "GroupReduce",
    "operator_strategy": "Sorted Group Reduce",
    "description": "Reduce (SUM(1), at main(WordCount.java:67)",
    "inputs": [ {
      "num": 0,
      "id":"19b5b24062c48a06e4eac65422ac3317",
      "ship_strategy": "Hash Partition on [0]",
      "local_strategy":"Sort (combining) on [0:ASC]",
      "exchange":"pipelined"
    } ],
    "optimizer_properties": {
      "global_properties": [
        { "name":"Partitioning", "value":"HASH_PARTITIONED" },
        { "name":"Partitioned on", "value":"[0]" },
        { "name":"Partitioning Order", "value":"(none)" },
        { "name":"Uniqueness", "value":"not unique" }
      ],
      "local_properties": [
        { "name":"Order", "value":"[0:ASC]" },
        { "name":"Grouped on", "value":"[0]" },
        { "name":"Uniqueness", "value":"not unique" }
      ],
      "estimates": [
        { "name":"Est. Output Size", "value":"(unknown)" },
        { "name":"Est. Cardinality", "value":"(unknown)" }
      ],
      "costs": [
        { "name":"Network", "value":"(unknown)" },
        { "name":"Disk I/O", "value":"(unknown)" },
        { "name":"CPU", "value":"(unknown)" },
        { "name":"Cumulative Network", "value":"(unknown)" },
        { "name":"Cumulative Disk I/O", "value":"(unknown)" },
        { "name":"Cumulative CPU","value":"(unknown)" }
      ],
      "compiler_hints": [
        { "name":"Output Size (bytes)", "value":"(none)" },
        { "name":"Output Cardinality", "value":"(none)" },
        { "name":"Avg. Output Record Size (bytes)", "value":"(none)" },
        { "name":"Filter Factor", "value":"(none)" }
      ]
    }
  },
  {
    "id": "19b5b24062c48a06e4eac65422ac3317",
    "parallelism": 2,
    "operator": "Data Source -> FlatMap -> GroupCombine",
    "operator_strategy":" (none) -> FlatMap -> Sorted Combine",
    "description":"DataSource (at getTextDataSet(WordCount.java:142) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:67)) -> Combine(SUM(1), at main(WordCount.java:67)",
    "optimizer_properties": {
      ...
    }
  },
  {
    "id": "0a36cbc29102d7bc993d0a9bf23afa12",
    "parallelism": 2,
    "operator": "Data Sink",
    "operator_strategy": "(none)",
    "description": "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter:  ))",
    "inputs":[ {
      "num": 0,
      "id": "f00c89b349b5c998cfd9fe2a06e50fd0",
      "ship_strategy": "Forward",
      "exchange": "pipelined"
    } ],
    "optimizer_properties": {
      ...
    }
  } ]
}

Job Cancellation

Cancel Job

DELETE request to /jobs/:jobid/cancel.

Triggers job cancellation, result on success is {}.

Cancel Job with Savepoint

Triggers a savepoint and cancels the job after the savepoint succeeds.

GET request to /jobs/:jobid/cancel-with-savepoint/ triggers a savepoint to the default savepoint directory and cancels the job.

GET request to /jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory triggers a savepoint to the given target directory and cancels the job.

Since savepoints can take some time to complete this operation happens asynchronously. The result to this request is the location of the in-progress cancellation.

Sample Trigger Result:

{
  "status": "accepted",
  "request-id": 1,
  "location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
}
Monitoring Progress

The progress of the cancellation has to be monitored by the user at

/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId

The request ID is returned by the trigger result.

In-Progress
{
  "status": "in-progress",
  "request-id": 1
}
Success
{
  "status": "success",
  "request-id": 1,
  "savepoint-path": "<savepointPath>"
}

The savepointPath points to the external path of the savepoint, which can be used to resume the savepoint.

Failed
{
  "status": "failed",
  "request-id": 1,
  "cause": "<error message>"
}

Submitting Programs

It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.

Run a Program (POST)

Send a POST request to /jars/:jarid/run. The jarid parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key jobmanager.web.upload.dir).

You can specify the following query parameters (all optional):

  • Program arguments: program-args=arg1 arg2 arg3
  • Main class to execute: entry-class=EntryClassName.class
  • Default parallelism: parallelism=4
  • Savepoint path to restore from: savepointPath=hdfs://path/to/savepoint
  • Allow non restored state: allowNonRestoredState=true

If the call succeeds, you will get a response with the ID of the submitted job.

Example: Run program with a savepoint

リクエスト:

POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true

Response:

{"jobid": "869a9868d49c679e7355700e0857af85"}
TOP
inserted by FC2 system