アプリケーションとFlinkバージョンのアップグレード

Flinkのデータストリームプログラムは一般的に週、月あるいは年のような長期間実行するように設定されています。全ての長期間実行するサービスのように、Flinkのストリーミングアプリケーションは維持される必要があります。これはバグの修正、改善の実装、あるいはアプリケーションを最新のバージョンのFlinkクラスタに移設することも含まれます。

この文章はFlinkストリーミングアプリケーションを更新する方法と、実行中のストリーミングアプリケーションを異なるFlinkクラスタに移設する方法を説明します。

ストリーミング アプリケーションの再起動

ストリーミングアプリケーションの更新あるいはアプリケーションを異なるクラスタに移設する行動の方針は、Flinkの セーブポイント 機能に基づいています。セーブポイントはある時点のアプリケーションの状態の一貫性のあるスナップショットです。

実行中のストリーミングアプリケーションからセーブポイントを取る2つの方法があります。

  • セーブポイントの取得と処理の継続。 > ./bin/flink savepoint <jobID> [pathToSavepoint] 以前の時間の1点からアプリケーションを再起動できるように、定期的にセーブポイントを取ることをお勧めします。

  • 1つのアクションとして、セーブポイントの取得とアプリケーションを停止します。 > ./bin/flink cancel -s [pathToSavepoint] <jobID> セーブポイントが完了した後ですぐにアプリケーションが取り消されることを意味します。つまり、セーブポイントの後では他のチェックポイントは取られません。

アプリケーションからセーブポイントが取られると、同じあるいは互換性のあるアプリケーション(以下のApplication State Compatibilityの章を見てください) がセーブポイントから開始することができます。セーブポイントからアプリケーションを開始することは、セーブポイント内で維持されているオペレータの状態で初期化されることを意味します。これはセーブポイントを使ってアプリケーションを開始することで行われます。 > ./bin/flink run -d -s [pathToSavepoint] ~/application.jar

開始されたアプリケーションのオペレータはセーブポイントが取られた時点の元のアプリケーション(つまり、セーブポイントが取られたアプリケーション)のオペレータの状態で初期化されます。開始されたアプリケーションは正しくこの場所から処理を続けます。

注意: Flinkはアプリケーションの状態を一貫性を持って回復しますが、外部システムへの書き込みを戻すことはできません。このことは、アプリケーションを止めずに取られたセーブポイントから再開する場合に問題になるかもしれません。この場合、アプリケーションは恐らくセーブポイントが取られた後のデータを発行します。再開されたアプリケーションは(アプリケーションのロジックを変更したかどうかに依存して)同じデータを再び発行するかもしれません。この挙動の正確な影響はSinkFunction とストレージシステムに依存して異なるかもしれません。Cassandraのようなキー-値ストアへの等冪な書き込みの場合には2回発行されたデータはOKかもしれませんが、Kafkaのような長く続くログに追加する場合は問題になるかもしれません。どの場合でも、再起動したアプリケーションの挙動を注意深く調べてテストすべきです。

アプリケーションの状態の互換性

バグを修正するため、あるいはあプ位ケーションを改善するためにアップグレードする場合は、通常は状態を保持しながら実行中のアプリケーションのアプリケーション ロジックを置き換えることが目的です。アップグレードされたアプリケーションを元のアプリケーションで取られたセーブポイントから開始することで、これを行います。しかし、これは両方のアプリケーションが状態に互換性がある場合にのみ動作します。つまり、アップグレードされたアプリケーションのオペレータは元のアプリケーションのオペレータの状態を使ってそれらの状態を更新できることを意味します。

この章では、状態の互換性を維持するためにどうやってアプリケーションを修正することができるかを議論します。

マッチング オペレータの状態

アプリケーションがセーブポイントから再起動される時に、Flinkは開始されたアプリケーションのステートフルなオペレータにセーブポイント内のオペレータの状態を合致させます。合致はオペレータのIDに基づいて行われます。これもセーブポイント内に保存されています。各オペレータは、アプリケーションのオペレータのトポロジ内のオペレータの位置から派生されたデフォルトのIDを持ちます。従って、修正されていないアプリケーションは常に独自のセーブポイントのうちの一つから再起動することができます。しかし、オペレータのデフォルトのIDはアプリケーションが修正された場合に変更されがちです。従って、もしオペレータのIDが明示的に指定された場合は、修正されたアプリケーションはセーブポイントからのみ起動することができます。オペレータへのIDの割り当てはとても簡単で以下のようにuid(String) メソッドを使って行われます:

val mappedEvents: DataStream[(Int, Long)] = events
  .map(new MyStatefulMapFunc()).uid(“mapper-1”)

注意: セーブポイント内に格納されたオペレータIDと、開始するアプリケーション内のオペレータのIDは、等しくなければならないため、将来アップグレードされるかもしれないアプリケーションの全てのオペレータにユニークなIDを割り当てることを強くお勧めします。幾つかのオペレータはユーザに見えない内部的な状態を持つため、この忠告は全てのオペレータ、つまり明示的に宣言されたオペレータの状態を持つ、持たないオペレータに対して適用されます。割り当てられたオペレータIDを持たないアプリケーションのアップグレードはかなり難しく、setUidHash() メソッドを使う低レベルな回避策を使ってのみ可能かもしれません。

デフォルトでは、セーブポイントに格納された全ての状態は開始するアプリケーションのオペレータに一致しなければなりません。しかし、アプリケーションをセーブポイントから開始する時に、ユーザはオペレータに合致することができない状態を明示的にスキップ(そして結果的に破棄)することに同意することができます。ステートフルなオペレータに関しては、セーブポイント内で見つかった状態の無いオペレータをデフォルトの状態で初期化します。

ステートフル オペレータとユーザ関数

アプリケーションをアップグレードする時に、ユーザ関数とオペレータは1つの制限付きで自由に修正することができます。オペレータの状態のデータ型を変更することはできません。セーブポイントからの状態は(現在のところ)オペレータにロードされる前に異なるデータ型に変換することができないため、これは重要です。従って、アプリケーションのアップグレード時にオペレータの状態のデータ型を変更をすると、アプリケーションの状態の一貫性を壊し、アップグレードされたアプリケーションのセーブポイントからの再起動を妨げます。

オペレータの状態はユーザ定義あるいは内部的なものかもしれません。

  • ユーザ定義のオペレータの状態: ユーザ定義のオペレータの状態を持つ関数内で、ユーザによって状態の型は明示的に定義されます:オペレータの状態のデータ型は変更することができませんが、この制限を克服するための次善策は異なるデータ型を持つ2つ目の状態を定義し、元の状態から新しい状態へ状態を移行するためのロジックを実装することです。このやり方は良い移行ストラテジとキーでパーティション化された状態の挙動のしっかりとした理解を必要とします。

  • 内部的なオペレータの状態: ウィンドウあるいはjoinオペレータのようなオペレータはユーザに公開されない内部的なオペレータの状態を持ちます。これらのオペレータのために、内部的な状態のデータ型はオペレータの入力型あるいは出力型に依存します。その結果、入力あるいは出力のそれぞれの型の変更はアプリケーションの状態の一貫性を壊しアップグレードを妨げます。以下の表は内部状態を持つオペレータをリスト化し、状態が入力と出力の型にどう関係するかを示します。キー付けされたストリームに適用されるオペレータについては、キーの型(KEY)も常に状態のデータ型の一部です。

オペレータ 内部的なオペレータの状態のデータ型
ReduceFunction[IOT] IOT (Input and output type) [, KEY]
FoldFunction[IT, OT] OT (Output type) [, KEY]
WindowFunction[IT, OT, KEY, WINDOW] IT (Input type), KEY
AllWindowFunction[IT, OT, WINDOW] IT (Input type)
JoinFunction[IT1, IT2, OT] IT1, IT2 (Type of 1. and 2. input), KEY
CoGroupFunction[IT1, IT2, OT] IT1, IT2 (Type of 1. and 2. input), KEY
Built-in Aggregations (sum, min, max, minBy, maxBy) Input Type [, KEY]

アプリケーションのトポロジー

1つ以上の既存のオペレータのロジックの変更に加えて、アプリションのトポロジの変更、つまり オペレータの追加あるいは削除、オペレータの並行度の変更、あるいは挙動に繋がるオペレータの修正、によってアプリケーションをアップグレードすることができます。

トポロジを変更してアプリケーションをアップグレードする場合は、アプリケーションの状態の一貫性を保持するために、2,3のことを考慮する必要があります。

  • ステートレス オペレータの追加と削除: 以下が適用される場合ではない場合は問題ではありません。
  • ステートフル オペレータの追加: 他の状態を引き継がない場合は、オペレータの状態はデフォルトの状態で初期化されるでしょう。
  • ステートフル オペレータの削除: 他のオペレータが引き継がない場合は、削除されたオペレータの状態は失われます。アップグレードされたアプリケーションを開始する時に、明示的に状態の破棄に同意する必要があります。
  • オペレータの入力と出力の型を変更: 内部状態を持つオペレータの前あるいは後に新しいオペレータを追加する時には、内部的なオペレータの状態のデータ型を保持するためにステートフルなオペレータの入力あるいは出力の型が変更されないことを保証する必要があります (詳細は上を見てください)。
  • オペレータの繋がりの変更: パフォーマンスの改善のためにオペレータを繋げることができます。しかし、繋がりの最初ではないステートフル オペレータが繋がりに含まれる場合、繋がりはアプリケーションのアップグレードの可能性を制限するかもしれません。そのような場合、繋がりからステートフルなオペレータが削除されたような繋がりを壊すことはできません。既存のステートフルなオペレータを繋がりに追加あるいは挿入することもできません。繋がりの挙動を、繋げられたオペレータの並行度を変更、あるいは明示的なオペレータの繋がりの指示を追加あるいは削除することで、変更することができます。

この章では、バージョン 1.1.x から 1.2.x へのFlinkフレームワークのアップグレード、および2つのバージョン間でジョブを移行する一般的な方法を説明します。

一言でいうと、この方法は2つの基本的なステップからなります:

  1. 移設したいジョブのために Flink 1.1.x内でセーブポイントを取ります。
  2. 前に取ったセーブポイントから Flink 1.2.x でジョブを再開します。

それらの2つの基本的なステップに加えて、Flinkのバージョンを変更したい方法に応じて、いくつかの追加のステップが必要です。このガイドでは、Flink 1.1.x から 1.2.x へのアップグレードする2つの異なる有り方を区別します: in-place アップグレードと shadow copy アップグレード。

in-place アップデートのためには、セーブポイントを取った後で、以下が必要です:

  1. Stop/cancel all running jobs.
  2. Flink 1.1.x を実行するクラスタをシャットダウンする。
  3. クラスタ上で、Flink を 1.2.x にアップグレードする
  4. 新しいバージョンでのクラスタを再起動する。

shadow copyのためには、以下が必要です:

  1. セーブポイントから再開する前に、古いFlink 1.1.x インストレーションに加えて、Flink 1.2.x の新しいインストレーションをセットアップします。
  2. 新しい Flink 1.2.x インストレーションを使ってセーブポイントから再開します。
  3. 全てがうまく実行されれば、古いFlink 1.1.x クラスタを停止しシャットダウンします。

以下では、ジョブの移設の成功のための事前条件を最初に示し、その後以前に概要を説明したステップについての詳細に入ります。

前提条件

移設の前に、移設しようとしているジョブがセーブポイントのベスト プラクティスに従っていることを確認してください。特に、ジョブ内のオペレータのために明示的なuidが設定されていることを確認することをお勧めします。

これは soft事前条件で、uidの割り当てを忘れた場合にも再開は動作する筈です。これが動作しないような状況に遭遇した場合は、Flink 1.1 からsetUidHash(String hash)呼び出しを使っているジョブまで、手動で従来のvertex idを追加することができます。それぞれのオペレータ (オペレータの繋がりの中: 最初のオペレータ)のために、web uiあるいはログ内でオペレータについて見ることができるハッシュを表す32文字の16進文字列を割り当てる必要があります。

オペレータのuidに加えて、移設を失敗させるようなジョブの移設について現在のところ3つhard 事前条件があります:

  1. 前のリリースノートで述べられたように、semi-asynchronousモードを使ってチェックポイントを取られたRocksDB内の状態についての移設をサポートしません。古いジョブがこのモードを使っている場合は、移設の基本として使われるセーブポイントを取る前に、fully-asynchronousモードを使うようにジョブを変更することができます。

  2. CEP オペレータは現在のところ移設をサポートしません。ジョブがこのオペレータを使う場合は、(現在のところ)それを移設することはできません。将来のバグフィックス リリースで、CEPオペレータの移設のサポートを提供することを計画しています。

  3. 他の重要な事前条件として、全てのセーブポイントのデータが新しいインストレーションからアクセス可能であり、同じ絶対パスに存在することがあります。セーブポイントのデータは、一般的にはちょうど作成されたセーブポイントファイルの中には自身が含まれていないことに注意してください。セーブポイントのファイル(例えば、状態のバックエンドのスナップショットからの出力)の中から追加のファイルを参照することができます!現在のところセーブポイントに所属する全てのデータを識別および移動する簡単な方法はありません。

ジョブの移設の最初の大きなステップは、Flink 1.1.x 内で動いているジョブのセーブポイントを取ることです。以下のコマンドを使ってこれを行うことができます:

$ bin/flink savepoint :jobId [:targetDirectory]

詳細は、セーブポイントのドキュメントを読んでください。

このステップでは、クラスタのフレームワークのバージョンを更新します。基本的にこれは新しいバージョンを使ってFlinkインストレーションの内容を置き換えることを意味します。このステップはクラスタ内でFlinkをどのように実行しているかに依存するかもしれません (例えば、スタンドアローン、Mesos、…)。

クラスタ内のFlinkのインストールについて詳しくない場合は、配備とクラスタのセットアップ ドキュメントを読んでください。

ジョブの移設の最後のステップとして、更新されたクラスタ上で上で取られたセーブポイントから再開します。以下のコマンドを使ってこれを行うことができます:

$ bin/flink run -s :savepointPath [:runArgs]

この場合も、詳細についてはセーブポイントのドキュメントを見てください。

互換性の表

以下の表で示されるように、セーブポイントはFlinkのバージョンを超えて互換性があります:

Created with \ Resumed with 1.1.x 1.2.x
1.1.x X X
1.2.x   X
  • Flink 1.1.x から 1.2.x に移設されたジョブの最大の並行度は、現在のところジョブの並行度として固定されています。このことは、移行の後で並行度を増やすことができないことを意味します。この制限は将来のバグフィックスのリリースで取り除かれるかもしれません。
TOP
inserted by FC2 system