Flink Operations Playground
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Flinkオペレーションのプレイグラウンド #

様々な環境でApache Flinkをデプロイして操作する、いろいろな方法があります。この多様性に関係なく、Flinkクラスタの基本的な構成要素は同じであり、同様の動作原理が適用されます。

このプレイグラウンドで、Flinkジョブを管理して実行する方法を学びます。アプリケーションをデプロイして監視する方法、Flinkがジョブの失敗からどのように回復するかの体験、アップグレードや再スケーリングのような日常的な運用タスクを実行する方法が分かります。

注意: このプレイグラウンドで使われるApache Flink Dockerイメージは、Apache Flinkのリリースされたバージョンでのみ使えます。

現在、ドキュメントの最新のスナップショットバージョンを見ているため、以下の全てのバージョン参照は動作しません。 メニューの左下にあるリリースピッカーを使って、ドキュメントを最新のリリースバージョンに切り替えてください。

このプレイグラウンドの構造 #

このプレイグラウンドは、長時間存続するFlinkセッションクラスタとKafkaクラスタで構成されます。

Flinkクラスタは常にJobManagerと1つ以上のFlinkタスクマネージャで構成されます。JobManagerはジョブの送信の処理、ジョブの監督、リソース管理を担当します。Flink TaskManagersはワーカープロセスであり、Flinkジョブを構成する実際のタスクの実行を担当します。このプレイグラウンドでは、1つのTaskManagerから開始しますが、後でさらに多くのTaskManagersにスケールアウトします。 さらに、このプレイグラウンドには専用のクライアントコンテナが付属しており、これは最初にFlinkジョブを送信し、後で様々な操作タスクを実行するために使います。クライアントコンテナはFlinkクラスタ自体には必要ありませんが、使いやすくするためにのみ含まれています。

Kafkaクラスタは、ZookeeperサーバとKafkaブローカーで構成されます。

Flink Docker Playground

プレイグラウンドが開始されると、Flink Event CountというFlinkジョブがJobManagerに送信されます。さらに、2つのKafkaトピックinputoutputが作成されます。

Click Event Count Example

ジョブはinputトピックからClickEventを消費します。それぞれにtimestamppageが付いています。次に、イベントはpageでキー付けされ、15秒のwindowsでカウントされます。結果はoutputトピックに書き込まれます。

6つの異なるページがあり、ページごとに1000個のクリックイベントを15秒生成します。したがって、Flinkジョブの出力はページおよびウィンドウごとに1000のビューが表示されるはずです。

Back to top

プレイグラウンドの開始 #

プレイグラウンド環境はわずか数ステップでセットアップできます。必要なコマンドを順に説明し、全てが正しく実行されていることを検証する方法を説明します。

マシンにDocker (1.12+)とdocker-compose (2.1+)がインストールされていることを前提とします。

必要な設定ファイルはflink-playgroundsリポジトリにあります。まずコードをチェックアウトし、dockerイメージをビルドします:

git clone https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build

次に、プレイグラウンドを開始します:

docker-compose up -d

その後、次のコマンドを使って実行中のDockerコンテナを検査できます:

docker-compose ps

                    Name                                  Command               State                   Ports                
-----------------------------------------------------------------------------------------------------------------------------
operations-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp                  
operations-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0                                       
operations-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp    
operations-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp              
operations-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp                  
operations-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd  ...   Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

これは、クライアントコンテナがFlinkジョブを正常に送信し(Exit 0)、全てのクラスタコンポーネントとデータジェネレータが実行中である(Up)ことを示します。

以下を呼び出すことで、プレイグラウンド環境を停止できます:

docker-compose down -v

プレイグラウンドに入る #

このプレイグラウンドでは、試したりチェックしたりできることがたくさんあります。次の2つのセクションでは、Flinkクラスタとやりとりをする方法を示し、Flinkの主要な機能のいくつかを示します。

Flinkクラスタを観察するのに最も適した開始点は、http://localhost:8081で公開されているWebUIです。全てがうまくいっていれば、クラスタは最初に1つのTaskManagerで構成され、Click Event Countと呼ばれるジョブを実行していることが分かります。

Playground Flink WebUI

Flink WebUIには、Flinkクラスタとそのジョブ(JobGraph、Metrics、Checkpointing Statistics、TaskManager Statusなど)に関する多くの有益で興味深い情報が含まれています。

ログ #

JobManager

JobManagerのログはdocker-composeを介してtailできます。

docker-compose logs -f jobmanager

最初の起動後は、主にチェックポイントの完了ごとにログメッセージが表示されているはずです。

TaskManager

TaskManagerのログも同じ方法でtalできます。

docker-compose logs -f taskmanager

最初の起動後は、主にチェックポイントの完了ごとにログメッセージが表示されているはずです。

Flink CLIはクライアントコンテナ内から使えます。例えば、Flink CLIのhelpメッセージを表示するには、次のコマンドを実行します

docker-compose run --no-deps client flink --help

Flink REST APIは、ホスト上のlocalhost:8081またはクライアントコンテナのjobmanager:8081を介して公開されます。例えば、現在実行中の全てのジョブを一覧表示するには、次を実行します:

curl localhost:8081/jobs

注意: _curl_コマンドがマシンで利用できない場合、(Flink CLIと同様に)クライアントコンテナから実行できます:

docker-compose run --no-deps client curl jobmanager:8081/jobs 

Kafkaトピック #

次のコマンドを実行することで、Kafkaトピックに書き込まれているレコードを見ることができます

//input topic (1000 records/s)
docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic input

//output topic (24 records/min)
docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

Back to top

遊びの時間です! #

FlinkとDockerコンテナを操作する方法を学習したので、プレイグラウンドで試してみることができる一般的な操作タスクをいくつか見てみましょう。 これらのタスクは全てお互いに独立しているため、任意の順番で実行できます。 ほとんどのタスクはCLIREST APIを介して実行できます。

実行中のジョブの一覧表示 #

Command

docker-compose run --no-deps client flink list

Expected Output

Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Request

curl localhost:8081/jobs

Expected Response (pretty-printed)

{
  "jobs": [
    {
      "id": "<job-id>",
      "status": "RUNNING"
    }
  ]
}

JobIDは送信時にジョブに割り当てられ、CLIまたはREST APIを介してジョブに対してアクションを実行するために必要です。

障害と回復の監視 #

Flinkは(部分的な)障害が発生した場合に確実に1回の処理を保証します。このプレイグラウンドでは、この動作をある程度まで検証できます。

ステップ 1: 出力の観察 #

上記で説明したように、このプレイグラウンドのイベントは、各ウィンドウにちょうど1000件のレコードが含まれるように生成されます。したがって、Flinkがデータの損失や重複なしにTaskManagerの障害から正常に回復したことを確認するには、outputトピックをtailし、回復後に全てのウィンドウが存在してカウントが正しいことを確認します。

このためには、outputトピックから読み取りを開始し、回復するまでこのコマンドを実行したままにしておきます(ステップ 3)。

docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

ステップ 2: 障害の導入 #

部分的な障害をシミュレートするために、TaskManagerを強制終了します。プロダクションセットアップでは、これはTaskManagerプロセス、TaskManagerマシーンの喪失、またはフレームワークまたはユーザコードから投げられた単なる一時的な例外(例えば、外部的リソースが一時的にりようできないためなど)に相当します。

docker-compose kill taskmanager

数秒後にJobManagerはTaskManagerの喪失に気づき、影響を受けたジョブを取り消し、回復のためにすぐに再送信します。 ジョブが再開されると、タスクはSCHEDULED状態のままになりmさう。これは紫色の四角で示されます(以下のスクリーンショットを見てください)。

Playground Flink WebUI
注意: ジョブのタスクがSCHEDULED状態にあり、まだRUNNINGではない場合でも、ジョブの全体的な状態はRUNNINGと表示されます。

この時点では、ジョブのタスクはSCHEDULED状態からRUNNINGに移行できません。リソース(TaskManagerによって提供されるTaskSlots)がないためです。 新しいTaskManagerが利用可能になるまで、ジョブはキャンセルと再送信のサイクルを繰り返します。

その間、データジェネレータはClickEventinputトピックにプッシュし続けます。これは、データを処理するジョブが停止している間にデータが生成される実際のプロダクションセットアップに似ています。

ステップ 3: リカバリ #

TaskManagerを再起動すると、JobManagerに再接続します。

docker-compose up -d taskmanager

JobManagerは、新しいTaskManagerについて通知されると、新しく利用可能なTaskSlotsに回復ジョブのタスクをスケジュールします。再起動すると、タスクは失敗する前に取得された最後に成功したチェックポイントから状態を回復し、RUNNING状態に切り替えます。

ジョブは、Kafkaからの入力イベント(停止中に蓄積された)の完全なバックログを迅速に処理し、ストリームの先頭に到達するまではるかに速い速度(24レコード/分より速く)で出力を生成します。outputで、全ての時間ウィンドウで全てのキー(pages)が存在し、それぞれのカウントがちょうど1000であることが分かります。 windows and that every count is exactly one thousand. FlinkKafkaProducerを"at-least-once"モードで使っているため、重複した出力レコードが表示される可能性があります。

注意: ほとんどのプロダクションセットアップは、リソースマネージャ(Kubernetes, Yarn)に依存していて、失敗したプロセスを自動的に再起動します。

ジョブのアップグレードと再スケーリング #

Flinkジョブのアップグレードには常に2つの手順が必要です: まず、Flinkジョブはセーブポイントでグレースフルに停止されます。セーブポイントは、明確に定義されたグローバルに一貫した時点での完全なアプリケーションの状態の一貫したスナップショットです(チェックポイントに似ています)。次に、アップグレードされたFlinkジョブがセーブポイントから開始されます。この文脈では、“upgrade"は次のような様々な意味を持ちます:

  • 設定のアップグレード(ジョブの並列実行を含む)
  • ジョブのトポロジーへのアップグレード(オペレータの追加/削除)
  • ジョブのユーザ定義の関数のアップグレード

アップグレードを開始する前に、アップグレード中にデータが失われたり破損したりしないことを確認するために、outputトピックのtailを開始することをお勧めします。

docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

ステップ 1: ジョブの停止 #

グレースフルにジョブを停止するために、CLIまたはREST APIのいずれかの"stop"コマンドを使う必要があります。 このために、ジョブのJobIDが必要です。これはlisting all running JobsまたはWebUIから取得できます。JobIDを使って、ジョブの停止を進めることができます:

Command

docker-compose run --no-deps client flink stop <job-id>

Expected Output

Suspending job "<job-id>" with a savepoint.
Savepoint completed. Path: file:<savepoint-path>

セーブポイントは、flink-conf.yamlで設定されたstate.savepoints.dirに保存されています。これはローカルマシンの*/tmp/flink-savepoints-directory/*にマウントされます。次のステップでこのセーブポイントへのパスが必要になります。

Request

# triggering stop
curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}'

Expected Response (pretty-printed)

{
  "request-id": "<trigger-id>"
}

Request

# check status of stop action and retrieve savepoint path
 curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>

Expected Response (pretty-printed)

{
  "status": {
    "id": "COMPLETED"
  },
  "operation": {
    "location": "<savepoint-path>"
  }
}

ステップ 2a: 変更せずにジョブを再起動します #

これで、このセーブポイントからアップグレードされたジョブを再開できます。簡単にするために、何も変更せずに再起動することで、再開できます。

Command

docker-compose run --no-deps client flink run -s <savepoint-path> \
  -d /opt/ClickCountJob.jar \
  --bootstrap.servers kafka:9092 --checkpointing --event-time

Expected Output

Job has been submitted with JobID <job-id>

Request

# Uploading the JAR from the Client container
docker-compose run --no-deps client curl -X POST -H "Expect:" \
  -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

Expected Response (pretty-printed)

{
  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
  "status": "success"
}

Request

# Submitting the Job
curl -X POST http://localhost:8081/jars/<jar-id>/run \
  -d '{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'

Expected Response (pretty-printed)

{
  "jobid": "<job-id>"
}

ジョブが再びRUNNINGになると、停止中に蓄積されたバックログをジョブが処理している間、レコードがより速い速度で生成されることがoutputトピックに表示されます。さらに、アップグレード中にデータが失われていないことが分かります: 正確に1000個が全てのウィンドウに存在します。

ステップ 2b: 異なる並列処理でジョブを再起動します(再スケーリング) #

あるいは、再送中に別の並列処理を渡すことで、このセーブポイントからジョブを再スケールすることもできます。

Command

docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
  -d /opt/ClickCountJob.jar \
  --bootstrap.servers kafka:9092 --checkpointing --event-time

Expected Output

Starting execution of program
Job has been submitted with JobID <job-id>

Request

# Uploading the JAR from the Client container
docker-compose run --no-deps client curl -X POST -H "Expect:" \
  -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

Expected Response (pretty-printed)

{
  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
  "status": "success"
}

Request

# Submitting the Job
curl -X POST http://localhost:8081/jars/<jar-id>/run \
  -d '{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'

Expected Response (pretty-printed

{
  "jobid": "<job-id>"
}
ジョブは再送信されまsちあが、並列処理を増やして実行するのに十分なTaskSlotsがないため(2つ利用可、3つ必要)、ジョブは開始されません。With

docker-compose scale taskmanager=2

2つのTaskSlotを持つ2つ目のTaskManagerをFlinkクラスタに追加でき、JobManagerに自動的に追加されます。TaskManagerを追加した直後に、ジョブは再び実行を開始するはずです。

ジョブが再び"RUNNING"になると、outputトピックで再スケーリング中にデータが失われていないことが分かります: 正確に1000個が全てのウィンドウに存在します。

ジョブのメトリクスのクエリ #

JobManagerは、REST APIを介して、システムとユーザのmetricsを公開します。

エンドポイントはこれらのメトリクスの範囲によって異なります。ジョブにスコープされたメトリクスは、jobs/<job-id>/metricsで一覧表示できます。メトリクスの実際の値は、getクエリパラメータを介してクエリできます。

Request

curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"

Expected Response (pretty-printed; no placeholders)

[
  {
    "id": "lastCheckpointSize",
    "value": "9378"
  }
]

REST APIはメトリクスのクエリに使えるだけでなく、実行中のジョブのステータスについての詳細情報を取得することもできます。

Request

# find the vertex-id of the vertex of interest
curl localhost:8081/jobs/<jod-id>

Expected Response (pretty-printed)

{
  "jid": "<job-id>",
  "name": "Click Event Count",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1564467066026,
  "end-time": -1,
  "duration": 374793,
  "now": 1564467440819,
  "timestamps": {
    "CREATED": 1564467066026,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "FAILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RECONCILING": 0,
    "RUNNING": 1564467066126,
    "FAILED": 0,
    "RESTARTING": 0
  },
  "vertices": [
    {
      "id": "<vertex-id>",
      "name": "ClickEvent Source",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066423,
      "end-time": -1,
      "duration": 374396,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 5033461,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 166351,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "ClickEvent Counter",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066469,
      "end-time": -1,
      "duration": 374350,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 5085332,
        "read-bytes-complete": true,
        "write-bytes": 316,
        "write-bytes-complete": true,
        "read-records": 166305,
        "read-records-complete": true,
        "write-records": 6,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "ClickEventStatistics Sink",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066476,
      "end-time": -1,
      "duration": 374343,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 20668,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 6,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "FINISHED": 0,
    "DEPLOYING": 0,
    "RUNNING": 4,
    "CANCELING": 0,
    "FAILED": 0,
    "CANCELED": 0,
    "RECONCILING": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "<job-id>",
    "name": "Click Event Count",
    "type": "STREAMING",
    "nodes": [
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEventStatistics Sink",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "FORWARD",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEvent Counter",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "HASH",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEvent Source",
        "optimizer_properties": {}
      }
    ]
  }
}

様々なスコープ(例えば、TaskManagerのメトリクス)のメトリクスをクエリする方法を含む、可能なクエリの完全なリストについては、REST APIリファレンスを参照してください。

Back to top

バリエーション #

Click Event Countアプリケーションが常に--checkpointing--event-timeのプログラムパラメータで開始されることに気づいたかもしれません。docker-compose.yamlclientコンテナのコマンドでこれらを省略すると、ジョブの動作を変更できます。

  • --checkpointingは、Flinkの耐障害性の仕組みであるチェックポイントを有効にします。これを使わずに実行して障害と回復を経ると、データが実際には喪失していることに気づくはずです。

  • --event-timeは、ジョブのイベント時間セマンティクスを有効にします。無効にすると、ジョブはClickEventのタイムスタンプではなく、実時間に基づいてウィンドウにイベントを割り当てます。従って、ウィンドウごとのイベントの数はちょうど1000ではなくなります。

Click Event Countアプリケーションには、デフォルトではオフになっているオプションがあり、バックプレッシャー下でこのジョブの動作を調査できます。このオプションはdocker-compose.yamlclientコンテナのコマンドに追加できます。

  • --backpressureは、ジョブの途中に追加のオペレータを追加し、偶数の分の間(例えば、10:12の間は含み、10:13の間は除く)に厳しいバックプレッシャーを追加します。これは、outputQueueLengthoutPoolUsageなどの様々なネットワークメトリクスを検査することで確認でき、WebUIで利用可能なバックプレッシャー監視を使うことで確認できます。
inserted by FC2 system