Upgrading Applications and Flink Versions
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

アプリケーションとFlinkバージョンのアップグレード #

Flinkのデータストリームプログラムは一般的に週、月あるいは年のような長期間実行するように設定されています。全ての長期間実行するサービスのように、Flinkのストリーミングアプリケーションは維持される必要があります。これはバグの修正、改善の実装、あるいはアプリケーションを最新のバージョンのFlinkクラスタに移設することも含まれます。

この文章はFlinkストリーミングアプリケーションを更新する方法と、実行中のストリーミングアプリケーションを異なるFlinkクラスタに移設する方法を説明します。

API compatibility guarantees #

The classes & members of the Java/Scala APIs that are intended for users are annotated with the following stability annotations:

  • Public
  • PublicEvolving
  • Experimental
Annotations on a class also apply to all members of that class, unless otherwise annotated.

Any API without such an annotation is considered internal to Flink, with no guarantees being provided.

An API that is source compatible means that code written against the API will continue to compile against a later version. An API that is binary compatible means that code compiled against the API will continue to run against a later version.

This table lists the source / binary compatibility guarantees for each annotation when upgrading to a particular release:

Annotation Major release
(Source / Binary)
Minor release
(Source / Binary)
Patch release
(Source / Binary)
Public / / /
PublicEvolving / / /
Experimental / / /
Example Code written against a PublicEvolving API in 1.15.2 will continue to run in 1.15.3, without having to recompile the code. That same code would have to be recompiled when upgrading to 1.16.0 though.

ストリーミング アプリケーションの再起動 #

The line of action for upgrading a streaming application or migrating an application to a different cluster is based on Flink’s Savepoint feature. セーブポイントはある時点のアプリケーションの状態の一貫性のあるスナップショットです。

実行中のストリーミングアプリケーションからセーブポイントを取る2つの方法があります。

  • セーブポイントの取得と処理の継続。
> ./bin/flink savepoint <jobID> [pathToSavepoint]

以前の時間の1点からアプリケーションを再起動できるように、定期的にセーブポイントを取ることをお勧めします。

  • 1つのアクションとして、セーブポイントの取得とアプリケーションを停止します。
> ./bin/flink cancel -s [pathToSavepoint] <jobID>

セーブポイントが完了した後ですぐにアプリケーションが取り消されることを意味します。つまり、セーブポイントの後では他のチェックポイントは取られません。

Given a savepoint taken from an application, the same or a compatible application (see Application State Compatibility section below) can be started from that savepoint. セーブポイントからアプリケーションを開始することは、セーブポイント内で維持されているオペレータの状態で初期化されることを意味します。これはセーブポイントを使ってアプリケーションを開始することで行われます。

> ./bin/flink run -d -s [pathToSavepoint] ~/application.jar

開始されたアプリケーションのオペレータはセーブポイントが取られた時点の元のアプリケーション(つまり、セーブポイントが取られたアプリケーション)のオペレータの状態で初期化されます。開始されたアプリケーションは正しくこの場所から処理を続けます。

Note: Even though Flink consistently restores the state of an application, it cannot revert writes to external systems. このことは、アプリケーションを止めずに取られたセーブポイントから再開する場合に問題になるかもしれません。この場合、アプリケーションは恐らくセーブポイントが取られた後のデータを発行します。再開されたアプリケーションは(アプリケーションのロジックを変更したかどうかに依存して)同じデータを再び発行するかもしれません。The exact effect of this behavior can be very different depending on the SinkFunction and storage system. Cassandraのようなキー-値ストアへの等冪な書き込みの場合には2回発行されたデータはOKかもしれませんが、Kafkaのような長く続くログに追加する場合は問題になるかもしれません。どの場合でも、再起動したアプリケーションの挙動を注意深く調べてテストすべきです。

アプリケーションの状態の互換性 #

バグを修正するため、あるいはあプ位ケーションを改善するためにアップグレードする場合は、通常は状態を保持しながら実行中のアプリケーションのアプリケーション ロジックを置き換えることが目的です。アップグレードされたアプリケーションを元のアプリケーションで取られたセーブポイントから開始することで、これを行います。However, this does only work if both applications are state compatible, meaning that the operators of upgraded application are able to initialize their state with the state of the operators of original application.

この章では、状態の互換性を維持するためにどうやってアプリケーションを修正することができるかを議論します。

データストリーム API #

マッチング オペレータの状態 #

アプリケーションがセーブポイントから再起動される時に、Flinkは開始されたアプリケーションのステートフルなオペレータにセーブポイント内のオペレータの状態を合致させます。合致はオペレータのIDに基づいて行われます。これもセーブポイント内に保存されています。Each operator has a default ID that is derived from the operator’s position in the application’s operator topology. 従って、修正されていないアプリケーションは常に独自のセーブポイントのうちの一つから再起動することができます。しかし、オペレータのデフォルトのIDはアプリケーションが修正された場合に変更されがちです。従って、もしオペレータのIDが明示的に指定された場合は、修正されたアプリケーションはセーブポイントからのみ起動することができます。Assigning IDs to operators is very simple and done using the uid(String) method as follows:

val mappedEvents: DataStream[(Int, Long)] = events
  .map(new MyStatefulMapFunc()).uid("mapper-1")

Note: Since the operator IDs stored in a savepoint and IDs of operators in the application to start must be equal, it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future. 幾つかのオペレータはユーザに見えない内部的な状態を持つため、この忠告は全てのオペレータ、つまり明示的に宣言されたオペレータの状態を持つ、持たないオペレータに対して適用されます。Upgrading an application without assigned operator IDs is significantly more difficult and may only be possible via a low-level workaround using the setUidHash() method.

Important: As of 1.3.x this also applies to operators that are part of a chain.

デフォルトでは、セーブポイントに格納された全ての状態は開始するアプリケーションのオペレータに一致しなければなりません。しかし、アプリケーションをセーブポイントから開始する時に、ユーザはオペレータに合致することができない状態を明示的にスキップ(そして結果的に破棄)することに同意することができます。ステートフルなオペレータに関しては、セーブポイント内で見つかった状態の無いオペレータをデフォルトの状態で初期化します。Users may enforce best practices by calling ExecutionConfig#disableAutoGeneratedUIDs which will fail the job submission if any operator does not contain a custom unique ID.

ステートフル オペレータとユーザ関数 #

アプリケーションをアップグレードする時に、ユーザ関数とオペレータは1つの制限付きで自由に修正することができます。オペレータの状態のデータ型を変更することはできません。セーブポイントからの状態は(現在のところ)オペレータにロードされる前に異なるデータ型に変換することができないため、これは重要です。従って、アプリケーションのアップグレード時にオペレータの状態のデータ型を変更をすると、アプリケーションの状態の一貫性を壊し、アップグレードされたアプリケーションのセーブポイントからの再起動を妨げます。

オペレータの状態はユーザ定義あるいは内部的なものかもしれません。

  • User-defined operator state: In functions with user-defined operator state the type of the state is explicitly defined by the user. オペレータの状態のデータ型は変更することができませんが、この制限を克服するための次善策は異なるデータ型を持つ2つ目の状態を定義し、元の状態から新しい状態へ状態を移行するためのロジックを実装することです。This approach requires a good migration strategy and a solid understanding of the behavior of key-partitioned state.

  • Internal operator state: Operators such as window or join operators hold internal operator state which is not exposed to the user. これらのオペレータのために、内部的な状態のデータ型はオペレータの入力型あるいは出力型に依存します。その結果、入力あるいは出力のそれぞれの型の変更はアプリケーションの状態の一貫性を壊しアップグレードを妨げます。以下の表は内部状態を持つオペレータをリスト化し、状態が入力と出力の型にどう関係するかを示します。キー付けされたストリームに適用されるオペレータについては、キーの型(KEY)も常に状態のデータ型の一部です。

オペレータ 内部的なオペレータの状態のデータ型
ReduceFunction[IOT] IOT (Input and output type) [, KEY]
WindowFunction[IT, OT, KEY, WINDOW] IT (Input type), KEY
AllWindowFunction[IT, OT, WINDOW] IT (Input type)
JoinFunction[IT1, IT2, OT] IT1, IT2 (Type of 1. and 2. input), KEY
CoGroupFunction[IT1, IT2, OT] IT1, IT2 (Type of 1. and 2. input), KEY
組み込みの集約 (sum, min, max, minBy, maxBy) Input Type [, KEY]

アプリケーションのトポロジー #

1つ以上の既存のオペレータのロジックの変更に加えて、アプリションのトポロジの変更、つまり オペレータの追加あるいは削除、オペレータの並行度の変更、あるいは挙動に繋がるオペレータの修正、によってアプリケーションをアップグレードすることができます。

トポロジを変更してアプリケーションをアップグレードする場合は、アプリケーションの状態の一貫性を保持するために、2,3のことを考慮する必要があります。

  • Adding or removing a stateless operator: This is no problem unless one of the cases below applies.
  • Adding a stateful operator: The state of the operator will be initialized with the default state unless it takes over the state of another operator.
  • Removing a stateful operator: The state of the removed operator is lost unless another operator takes it over. アップグレードされたアプリケーションを開始する時に、明示的に状態の破棄に同意する必要があります。
  • Changing of input and output types of operators: When adding a new operator before or behind an operator with internal state, you have to ensure that the input or output type of the stateful operator is not modified to preserve the data type of the internal operator state (see above for details).
  • Changing operator chaining: Operators can be chained together for improved performance. 1.3.x から取ったセーブポイントから回復する時に状態の一貫性を保持しながらチェーンを修正することが可能です。statefulのオペレータがチェーンから外れるといったような、チェーンを破壊することが可能です。チェーンに新しいあるいは既存のstatefulオペレータを追加あるいは挿入、またはチェーン内で順番を修正することも可能です。しかし、セーブポイントを 1.3.x へアップグレードする時は、チェーンに関してトポロジーが変化しないことが最高です。All operators that are part of a chain should be assigned an ID as described in the Matching Operator State section above.

テーブル API & SQL #

Due to the declarative nature of Table API & SQL programs, the underlying operator topology and state representation are mostly determined and optimized by the table planner.

Be aware that any change to both the query and the Flink version could lead to state incompatibility. Every new major-minor Flink version (e.g. 1.12 to 1.13) might introduce new optimizer rules or more specialized runtime operators that change the execution plan. However, the community tries to keep patch versions state-compatible (e.g. 1.13.1 to 1.13.2).

See the table state management section for more information.

Flinkフレームワークバージョンのアップグレード #

この章ではバージョンを超えてFlinkをアップグレードする一般的な方法を説明し、そのバージョン間でジョブを移動します。

一言でいうと、この方法は2つの基本的なステップからなります:

  1. 以前のバージョン、移設したいジョブのための古いFlinkのバージョン、でセーブポイントを取ります。
  2. 新しいFlinkバージョンの元で、以前に取ったセーブポイントからジョブを再開します。

Besides those two fundamental steps, some additional steps can be required that depend on the way you want to change the Flink version. In this guide we differentiate two approaches to upgrade across Flink versions: in-place upgrade and shadow copy upgrade.

For in-place update, after taking savepoints, you need to:

  1. 全ての実行中のジョブを 中断/取り消し します。
  2. 古いFlinkバージョンを実行するクラスタをシャットダウンします。
  3. クラスタ上で新しいバージョンへFlinkをアップグレードします。
  4. 新しいバージョンでのクラスタを再起動する。

For shadow copy, you need to:

  1. セーブポイントから再開する前に、古いFlinkのインストレーションに加えて新しいFlinkバージョンの新しいインストレーションをセットアップします。
  2. 新しいFlinkインストレーションを使ってセーブポイントから再開します。
  3. 全てがうまく動作すれば、古いFlinkクラスタを停止およびシャットダウンします。

In the following, we will first present the preconditions for successful job migration and then go into more detail about the steps that we outlined before.

前提条件 #

Before starting the migration, please check that the jobs you are trying to migrate are following the best practices for savepoints.

In particular, we advise you to check that explicit uids were set for operators in your job.

This is a soft precondition, and restore should still work in case you forgot about assigning uids. If you run into a case where this is not working, you can manually add the generated legacy vertex ids from previous Flink versions to your job using the setUidHash(String hash) call. For each operator (in operator chains: only the head operator) you must assign the 32 character hex string representing the hash that you can see in the web ui or logs for the operator.

Besides operator uids, there are currently two hard preconditions for job migration that will make migration fail:

  1. We do not support migration for state in RocksDB that was checkpointed using semi-asynchronous mode. In case your old job was using this mode, you can still change your job to use fully-asynchronous mode before taking the savepoint that is used as the basis for the migration.

  2. Another important precondition is that all the savepoint data must be accessible from the new installation under the same (absolute) path. This also includes access to any additional files that are referenced from inside the savepoint file (the output from state backend snapshots), including, but not limited to additional referenced savepoints from modifications with the State Processor API.

STEP 1: Stop the existing job with a savepoint #

The first major step in version migration is taking a savepoint and stopping your job running on the old Flink version.

以下のコマンドを使ってこれを行うことができます:

$ bin/flink stop [--savepointPath :savepointPath] :jobId

For more details, please read the savepoint documentation.

ステップ 2: クラスタを新しいFlinkバージョンに更新する。 #

このステップでは、クラスタのフレームワークのバージョンを更新します。What this basically means is replacing the content of the Flink installation with the new version. This step can depend on how you are running Flink in your cluster (e.g. standalone, …).

If you are unfamiliar with installing Flink in your cluster, please read the deployment and cluster setup documentation.

ステップ 3: 新しいFlinkバージョン下でセーブポイントからジョブを再開する。 #

ジョブの移設の最後のステップとして、更新されたクラスタ上で上で取られたセーブポイントから再開します。You can do this with the command:

$ bin/flink run -s :savepointPath [:runArgs]

For more details, please take a look at the savepoint documentation.

互換性の表 #

以下の表で示されるように、セーブポイントはFlinkのバージョンを超えて互換性があります:

Created with \ Resumed with 1.1.x 1.2.x 1.3.x 1.4.x 1.5.x 1.6.x 1.7.x 1.8.x 1.9.x 1.10.x 1.11.x 1.12.x 1.13.x 1.14.x 1.15.x 1.16.x 1.17.x 制限事項
1.1.x O O O The maximum parallelism of a job that was migrated from Flink 1.1.x to 1.2.x+ is currently fixed as the parallelism of the job. This means that the parallelism can not be increased after migration. この制限は将来のバグフィックスのリリースで取り除かれるかもしれません。
1.2.x O O O O O O O O O O O O O O O When migrating from Flink 1.2.x to Flink 1.3.x+, changing parallelism at the same time is not supported. Users have to first take a savepoint after migrating to Flink 1.3.x+, and then change parallelism.

Savepoints created for CEP applications cannot be restored in 1.4.x+.

Savepoints from Flink 1.2 that contain a Scala TraversableSerializer are not compatible with Flink 1.8 anymore because of an update in this serializer. You can get around this restriction by first upgrading to a version between Flink 1.3 and Flink 1.7 and then updating to Flink 1.8.
1.3.x O O O O O O O O O O O O O O セーブポイントがScalaのcaseクラスを含む場合は、Flink 1.3.0 から Flink 1.4.[0,1] の移行は失敗します。代わりにユーザは直接 1.4.2+ に移行する必要があります。
1.4.x O O O O O O O O O O O O O
1.5.x O O O O O O O O O O O O There is a known issue with resuming broadcast state created with 1.5.x in versions 1.6.x up to 1.6.2, and 1.7.0: FLINK-11087. Users upgrading to 1.6.x or 1.7.x series need to directly migrate to minor versions higher than 1.6.2 and 1.7.0, respectively.
1.6.x O O O O O O O O O O O
1.7.x O O O O O O O O O O
1.8.x O O O O O O O O O O
1.9.x O O O O O O O O O
1.10.x O O O O O O O O
1.11.x O O O O O O O
1.12.x O O O O O O
1.13.x O O O O O Don't upgrade from 1.12.x to 1.13.x with an unaligned checkpoint. Please use a savepoint for migrating.
1.14.x O O O O
1.15.x O O O For Table API: 1.15.0 and 1.15.1 generated non-deterministic UIDs for operators that make it difficult/impossible to restore state or upgrade to next patch version. A new table.exec.uid.generation config option (with correct default behavior) disables setting a UID for new pipelines from non-compiled plans. Existing pipelines can set table.exec.uid.generation=ALWAYS if the 1.15.0/1 behavior was acceptable due to a stable environment. See FLINK-28861 for more information.
1.16.x O O
1.17.x O

Back to top

inserted by FC2 system