クラスローディングのデバッグ

Flinkアプリケーションを実行する場合、長い間JVMは様々なクラスをロードするでしょう。これらのクラスは2つの範囲に分類することができます:

  • Flink フレームワーク ドメイン: これはFlinkディレクトリ内の/lib ディレクトリ内の全てのコードを含みます。デフォルトではこれらのApache Flinkとそのコアの依存のクラスです。

  • ユーザ コード ドメイン: これらはCLIあるいはwebインタフェースを使ってサブミットされたJARファイル内に含まれる全てのクラスです。ジョブのクラスおよびJARに入れられた全てのライブラリとコネクタを含みます。

クラスのロードは様々なFlinkセットアップと少し異なる挙動をします:

スタンドアローン

Flinkクラスタを開始する時に、ジョブマネージャとタスクマネージャはクラスパス内のFlinkフレームワーククラスと一緒に開始されます。クラスタに対してサブミットされた全てのジョブからのクラスは動的にロードされます。

YARN

YARN のクラス ローディングは1つのジョブの配備とセッションの間で異なります:

  • Flinkのジョブを(bin/flink run -m yarn-cluster ...を使って)直接YARNにサブミットする時、専用のタスクマネージャとジョブマネージャがそのジョブのために開始されます。これらのJVMはクラスパスの中にFlinkのフレームワークのクラスとユーザのコードのクラスの両方を持ちます。つまり、この場合動的なクラスローディング無しです。

  • YARNセッションを開始する時に、ジョブマネージャとタスクマネージャはクラスパス内のFlinkフレームワーククラスと一緒に開始されます。セッションに対してサブミットされた全てのジョブからのクラスは動的にロードされます。

Mesos

このドキュメントに従ったMesosのセットアップは現在のところYARNセッションにとても良く似た挙動をします: タスクマネージャとジョブマネージャはクラスパス内のFlinkのフレームワーククラスと一緒に開始され、ジョブクラスはジョブがサブミットされた時に動的にロードされます。

クラスロードの解決順の設定

Flinkはユーザコードのjar(s)からクラスをロードするためにクラスローダーの階層構造を使用します。ユーザコードのクラスローダは親のクラスローダへの参照をもいます。これはほとんどの場合でデフォルトのJavaクラスローダです。デフォルトでは、Javaクラスローダは最初に親のクラスローダ内でクラスを探し、それからクラスローダの階層構造を持つ場合の子のクラスローダ内で探すでしょう。これはユーザのjar内にFlinkに同梱されるバージョンと衝突するライブラリのバージョンがある場合に問題になります。Flink設定内でclassloader.resolve-order: child-firstを使ってクラスローダの決定順を設定することでこの挙動を変更することができます。しかし、Flinkのクラスは同様にclassloader.parent-first-patternsを使って設定するkとおができますが、まず最初に親のクラスローダを使って決定されるでしょう (configを見てください)

動的クラスロードの回避

全てのコンポーネント (ジョブマネージャ、タスクマネージャ、クライアント、アプリケーションマスタ、…) は起動時にクラスパスの設定を記録します。ログの開始時の環境情報の一部として見つけることができます。

Flinkのジョブマネージャとタスクマネージャが1つの特定のジョブだけに限定されているセットアップを実行する場合、JARファイルがクラスパスの一部で動的にロードされないように/lib フォルダの中に直接置くことができます。

それは通常ジョブのJARファイルを /lib ディレクトリに配置するように動作します。JAR はクラスパス(AppClassLoader) と動的なクラスローダ(FlinkUserCodeClassLoader)の両方の一部です。AppClassLoader は FlinkUserCodeClassLoaderの親(そしてJavaはデフォルトで親から先にロードする)なので、これはクラスが一度だけロードされることになる筈です。

ジョブのJARファイルが/lib フォルダに配置できないセットアップについては (例えば、セットアップが複数のジョブによって使われるセッション)、共通のライブラリを/libフォルダに配置し、動的なクラスのロードを避けることができます。

ジョブ内での手動のクラスロード

幾つかの場合において、変換関数、ソースあるいはシンクはクラスを手動で(reflectionを使って動的に)ロードする必要があります。そうするには、ジョブのクラスにアクセスするクラスローダが必要です。

In that case, the functions (or sources or sinks) can be made a RichFunction (for example RichMapFunction or RichWindowFunction) and access the user code class loader via getRuntimeContext().getUserCodeClassLoader().

X は X 例外にキャストすることができません

com.foo.X cannot be cast to com.foo.Xの形式の例外を見る場合、それはクラスcom.foo.X の複数のバージョンが異なるクラスローダによってロードされ、そのクラスの型がそれぞれに割り当てられようとしたことを意味します。

ほとんどの場合の理由は、以前の実行の試行からロードされたcom.foo.Xクラスのオブジェクトがまだどこかにキャッシュされていて、コードをリロードした再起動されたタスク/オペレータによって取り上げられたからです。これも動的なクラスのロードを使用する配備でのみ起こることに注意してください。

キャッシュされたオブジェクトのインスタンスの一般的な原因:

  • Apache Avroを使う時: SpecificDatumReaderはレコードのインスタンスをキャッシュします。SpecificData.INSTANCEを使って避けてください。このドキュメントも見てください。

  • (Apache Avroのような)オブジェクトをクローンするためのシリアライズ化フレームワークの使用

  • オブジェクトを拘禁します(例えば GuavaのInterners)

動的にロードされたクラスのアンロード

また、動的なクラスのロードを引き起こす全てのシナリオ(つまり、スタンドアローン、セッション、mesos…) はアンロードされたクラスに依存します。Class unloading means that the Garbage Collector finds that no objects from a class exist and more, and thus removes the class (the code, static variable, metadata, etc).

タスクマネージャがタスクを開始(あるいは再開)する時はいつでも特定のタスクのコードをロードするでしょう。クラスがアンロードすることができない場合は、クラスの新しいバージョンがロードされ、ロードされたクラスの総数が時間超過を蓄積するため、これはメモリ リークになるでしょう。これは一般的にOutOfMemoryError: Metaspaceを通じて宣言します。

クラスのリークの一般的な原因と推奨される修正:

  • Lingering Threads: アプリケーションの関数/ソース/シンクが全てのスレッドをシャットダウンするようにします。Lingering threads cost resources themselves and additionally typically hold references to (user code) objects, preventing garbage collection and unloading of the classes.

  • Interners: 関数/ソース/シンクの生存時間を超えて生存する特別な構造内のオブジェクトをキャッシュすることを避けます。例としては、シリアライザの中での、Guava’s interners、あるいは Avro’s クラス/オブジェクト キャッシュです。

Apache Flink はデフォルトで多くのクラスをクラスパスにロードします。Flinkが使っているライブラリと異なるバージョンをユーザが使う場合、しばしば結果はIllegalAccessExceptions あるいは NoSuchMethodErrorとなります。

Hadoopによって、例えばFlinkはaws-sdk ライブラリ、あるいはprotobuf-javaに依存します。ユーザコードがこれらのライブラリを使っていて、問題に遭遇した場合、ユーザのコードのjar内の依存を移動することをお勧めします。

Apache Maven はmaven-shade-pluginを提供します。これによりコンパイルの後でクラスのパッケージを変更することができます(つまり、書いているコードはシェードによって影響を受けません)。例えば、ユーザコードのjar内でaws sdkからcom.amazonaws パッケージを持つ場合、コードがaws skdバージョンを呼び出すように、シェードプラグインはそれらをorg.myorg.shaded.com.amazonaws パッケージ内に移動するでしょう。

この文章のページはシェード プラグインを使ってクラスを移動を説明します。

guavaのようなFlinkの幾つかの依存は、Flinkの維持者によって取り除かれ、ユーザは通常それを心配する必要はありません。

上に戻る

TOP
inserted by FC2 system