よくある質問 (FAQ)

以下の質問は一般的なFlinkプロジェクトについてよく聞かれる質問です。さらに質問がある場合は、ドキュメントに質問するか、あるいは コミュニティに質問してください。

概要

Flink はデータ処理システムで、もう一つのHadoopの MapReduce コンポーネントです。MapReduce上でビルドされるというより、独自のランタンムが付属しています。そのように、Hadoopエコシステムとは完全に独立して動作することができます。しかしながら、Flinkはデータを読み書きするためにHadoopの分散ファイルシステム(HDFS)にアクセスすることもでき、クラスタリソースを供給するためにHadoopの次世代のリソースマネージャー(YARNを)にアクセスすることができます。ほとんどのFlinkユーザはデータを格納するためにHadoop HDFSを使っているため、すでにFlinkはHDFSにアクセスするために必要なライブラリを積み込んでいます。

No. Flink はHadoopのインストレーション無しに実行することができます。しかし、Hadoop分散ファイルシステム(HDFS)内に格納されているデータを解析するためにFlinkを使うためのとても一般的 なセットアップです。それらのセットアップをそのままで動作するようにするために、FlinkはデフォルトでHadoopクライアントライブラリをバンドルします。

さらに、既存のHadoop YARNクラスタを使うユーザのための特別なFlinkのYARN組み込みダウンロードを提供します。Apache Hadoop YARN is Hadoop’s cluster resource manager that allows to use different execution engines next to each other on a cluster.

使い方

Flinkプログラムの進捗を追跡する多くの方法があります:

  • ジョブマネージャー(分散システムのマスター)がプログラムの実行を監視するためにwebインタフェースを開始します。デフォルトではポート8081で実行します(conf/flink-config.ymlの中で設定されます)。
  • コマンドラインからプログラムを開始する場合は、操作を通じたプログラムの進捗としてすべての操作のステータスの変更を出力するでしょう。
  • 全てのステータスの変更はジョブマネージャーのログファイルにも記録されます。

なぜプログラムが失敗したかをどうやって明らかにしますか?

  • ジョブマネージャーのwebフロントエンド(デフォルトではポート8081)は、失敗したタスクの例外を表示します。
  • コマンドラインからプログラムを実行する場合は、タスク例外は標準エラーストリームに出力され、コンソールに表示されます。
  • コマンドラインとwebインタフェースを使ってどの並行タスクが失敗し他のタスクの実行を中止したかを調べることができます。
  • 失敗したタスクと応答の例外がマスターと例外が発生したワーカーのログファイルに報告されます (log/flink-<user>-jobmanager-<host>.loglog/flink-<user>-taskmanager-<host>.log)。
  • LocalExecutorを使ってローカルでプログラムを開始した場合、関数の中にブレークポイントを置くことができ、通常の Java/Scala プログラムのようにデバッグすることができます。
  • Accumulators は並行実行の挙動を追跡するのにとても役立ちます。そして、プログラムの操作内で情報を集めることができ、プログラムの実行の後でそれらを表示することができます。

並行化とは何ですか?それをどうやって設定しますか?

Flink プログラムの中で、並行化は操作がどのようにタスクスロットに割り当てられた個々のタスク内に分割するかを決めます。クラスタ内の各ノードは少なくとも一つのタスクスロットを持ちます。タスクスロットの総数は全てのマシーン上のタスクスロットの数です。並行化がNに設定された場合、Flinkは操作を利用可能なタスクスロットを使って同時に計算可能なN個の並行タスクに分割しようとします。タスクスロットの数は全てのタスクが同時にタスクスロット内で計算されるように並行化と一致しなければなりません。

注意: 全ての操作が複数のタスクに分割することができるわけではありません。例えば、グルーピング無しのGroupReduceは、reduce操作を実施するためにグループ全体が確実に1つのノードになければならないので、並行度1で実施されなければなりません。 Flink は並行度が1でなければならないかどうかを決定し、それに応じて設定します。

並行度はFlinkプログラムの実行上でfine-grained制御を保証するために多くの方法で設定することができます。並行度を設定する方法の詳細な説明は 設定ガイドを見てください。また、処理スロットと並行度がお互いにどう関係しているかを説明しているこの図もチェックしてください。

エラー

なぜ"NonSerializableException"となるのですか?

Flinkの全ての機能はjava.io.Serializableによって定義されたように並列化されなければなりません。全ての関数のインタフェースが並行化されるので、例外は関数の中で使われるフィールドの一つが並行化されていないことを意味します。

特に、関数が内部クラスあるいは匿名内部クラスの場合、それは覆っているクラスへの隠された参照を含んでいます(デバッガ内で関数を見ると、通常はthis$0で呼ばれます)。覆っているクラスが並行化されていない場合は、これはおそらくソースのエラーです。解決方法は、

  • 関数を(覆っているクラスにそれ以上参照を持たない)スタンドアローンのクラスあるいは静的な内部クラスにします。
  • 覆っているクラスを並行化します。
  • Java8のlambda関数を使います。

Scala APIで明示的な値と明白なパラメータについてのエラーがでます。

それはタイプ情報のための明示的な値が提供されていないかもしれないことを意味します。コード内にimport org.apache.flink.api.scala._ 文を持つようにします。

ジェネリックパラメータを取る関数あるいはクラス内でflinkのオペレーションを使っている場合は、そのパラメータのためにTypeInformation が利用可能でなければなりません。これはコンテキストバウンドを使うことで達成されます:

def myFunction[T: TypeInformation](input: DataSet[T]): DataSet[Seq[T]] = {
  input.reduceGroup( i => i.toSeq )
}

Flinkがどうやってタイプを扱うかについての深い議論は、Type Extraction and Serialization を見てください。

十分なバッファが利用できないというエラーメッセージが出ます。どうやってこれを直せますか?

超並列の設定(100+並行スレッド)でFlinkを実行する場合、設定パラメータ taskmanager.network.numberOfBuffersを使って大量のネットワークバッファを適用する必要があります。大まかには、バッファの数は少なくとも 4 * numberOfTaskManagers * numberOfSlotsPerTaskManager^2でなければなりません。詳細はConfiguration Reference を見てください。

私のジョブが早い時間にjava.io.EOFExceptionで失敗します。原因は何でしょうか?

これらの例外の最も一般的なのは、Flinkが間違ったHDFSバージョンでセットアップされている場合です。異なるHDFSバージョンはしばしばお互いに互換性がなく、マスターとクライアントのファイルシステム間の接続を破壊します。

Call to <host:port> failed on local exception: java.io.EOFException
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
    at org.apache.hadoop.ipc.Client.call(Client.java:743)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
    at $Proxy0.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
    at org.apache.flinkruntime.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:276

異なるHadoopとHDFSバージョンのためにFlinkをセットアップする詳細な方法は、ダウンロードページビルドの説明 を参照してください。

私のジョブがHDFS/Hadoopコードからの様々な例外で失敗します。何ができるでしょうか?

Flink はデフォルトでHadoop 2.2.バイナリと一緒に提供されます。これらのバイナリはHDFSあるいはYARNに接続するために使われます。(特に高負荷の時に)HDFSに書き込む時に例外を起こすHDFSのクライアントのバグがあるようです。共通する例外は以下の通りです:

  • HDFS client trying to connect to the standby Namenode "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby"
  • java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010 at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)

  • Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0 at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)

これらのいずれかを経験した場合は、ローカルのHDFSのバージョンと一致するHadoopのバージョンを使ってビルドされたFlinkを使うことをお勧めします。正確なHadoopバージョンに対してFlinkを手動でビルドすることもできます(例えば、カスタムパッチレベルを使ったHadoop配布物を使う場合)。

EclipseでScalaプロジェクトのcompilationエラーがでます。

FlinkはEclipseのScalaプラグインにまだ適切に統合されていないScalaコンパイラの新しい機能("quasiquotes"と呼ばれます)を使います。この機能をEclipseで利用可能にするには、手動でflink-scala プロジェクトをcompiler pluginを使うように設定する必要があります:

  • flink-scalaの上で右クリックし、"Properties"を選択します
  • "Scala Compiler"を選択し、"Advanced"タブをクリックします(それが無い場合は、おそらくScalaのためのEclipseを適切に設定していないでしょう。)
  • "Use Project Settings"ボックスをチェックします
  • "Xplugin"フィールドの中に、"/home//.m2/repository/org/scalamacros/paradise_2.10.4/2.0.1/paradise_2.10.4-2.0.1.jar"パスを設定します
  • 注意: プラグインがダウンロード可能になるように、まずコマンドライン上でMavenを使ったFlinkをビルドする必要があります。

私のプログラムが正しい結果を計算しません。Why are my custom key types

are not grouped/joined correctly?

キーはメソッドjava.lang.Object#hashCode(), java.lang.Object#equals(Object o)java.util.Comparable#compareTo(...)を正しく実装しなければなりません。これらのメソッドは通常不適切なデフォルトの実装を使って常に支援されています。従って、全てのキーは hashCode()equals(Object o)を上書きしなければなりません。

データタイプについてjava.lang.InstantiationException が出ます。何が間違っていますか?

全てのデータタイプクラスはpublicでpublicなnullコンストラクタ(引数無しのコンストラクタ)を持たなければなりません。さらに、クラスはabstractあるいはinterfaceであってはなりません。クラスが内部クラスの場合、それらはpubilcかつstaticでなければなりません。

シャットダウンはいくつかのクリーンアップ作業を行うかも知れないので、プロセスの停止は時に数秒かかります。

いくつかのエラーの場合において、JobManagerあるいはTaskManagerが提供されたstopスクリプト(bin/stop-local.sh あるいは bin/stop- cluster.sh)では停止されない事がありえます。Linux/Macでは以下のようにしてそれらのプロセスをkillすることができます:

  • JobManager / TaskManager プロセスのプロセスid (pid)を決定します。(OpenJDKをインストールしている場合は)Linux上で jps コマンドを使うか、全てのJavaプロセスを見つけるために ps -ef | grep java コマンドを使うことができます。
  • kill -9 <pid>を使ってプロセスをkillします。ここで pid は影響を受けた JobManager あるいは TaskManager プロセスのプロセスidです。

Windows上では、TaskManager はすべてのプロセスの表を表示し、そのエントリを右クリックしてプロセスを破壊することができます。

JobManager とTaskManager サービスの両方とも、それぞれのログファイルに(SIGKILL と SIGTERMのような)シグナルを書き込むでしょう。これは停止の動きをする問題のデバッグに有用です。

OutOfMemoryException が出ました。何ができるでしょうか?

これらの例外は通常プログラム内の関数が例えばリストあるいはマップ内に大量のオブジェクトを集めるために多くのメモリを消費する場合に起こります。Javaでの OutOfMemoryExceptionsはちょっとトリッキーなものです。その例外はほとんどのメモリを割り当てられているコンポーネントによって必ずしも投げられるわけではなく、提供されない最後のメモリのビットをリクエストしようとしたコンポーネントによって投げられます。

これを動作させるには二つの方法があります:

  1. 関数内でメモリを使うのを少なくできないかを調べる。例えば、オブジェクトタイプの代わりにプリミティブなタイプの配列を使う。

  2. Flinkが地震の処理のために保持しているメモリを減らす。TaskManagerはソート、ハッシュ、キャッシュ、ネットワークバッファなどのために利用可能なメモリをいくらか取り分けています。その部分の目織はユーザ定義の関数では利用できません。By reserving it, the system can guarantee to not run out of memory on large inputs, but to plan with the available memory and destage operations to disk, if necessary. デフォルトでシステムはメモリの約70%を保持します。ユーザ定義関数内でもっとメモリを必要とするアプリケーションを頻繁に実行する場合は、taskmanager.memory.fraction あるいは taskmanager.memory.size設定エントリを使ってその値を減らすことができます。. 詳細は設定リファレンス を見てください。これはJVMヒープへメモリをもっと残しますが、もっと頻繁にディスクにデータ処理タスクを持っていくでしょう。

OutOfMemoryExceptions の他の理由には、間違った状態のバックエンドの使用です。デフォルトでは、Flinkはストリーミングジョブの中でオペレーターのための状態のヒープに基づいた状態バックエンドを使っています。RocksDBStateBackendを使って利用可能なヒープ空間より大きなサイズの状態を持つことができます。

TaskManager ログファイルが大きくなるのはなぜですが?

ジョブのログの挙動を調べる。Emitting logging per or tuple may be helpful to debug jobs in small setups with tiny data sets, it becomes very inefficient and disk space consuming if used for large input data.

私のタスクマネージャーに割り当てられたスロットが解放されました。どうすべきですか?

java.lang.Exception: タスクが実行されたスロットが解放されました。おそらく TaskManagerの喪失は通常大きなガベージコレクションのストールがある場合に起こります。この場合、素早い修正はG1ガベージコレクターを使うことです。それは徐々に動作し、停止の頻度が少なくなります。さらに、もっと多くのメモリをユーザコードに取っておくことができます(例えばシステムあたり0.4およびユーザあたり0.6)。

これらのやり方の両方が失敗し、エラーが続く場合は、AKKA_WATCH_HEARTBEAT_PAUSE (akka.watch.heartbeat.pause)をもっと大きな値(例えば、 600s)に設定することで単純にTaskManagerのハートビートの中止を増加します。これは、タスクマネージャーが喪失したと見なす前にさらに長い間隔でジョブマネージャーがハートビートを待つようにするでしょう。

YARN の配備

YARNのセッションは数秒しか実行されません。

./bin/yarn-session.sh スクリプトはYARNセッションが開いている間実行することを意図しています。しかし、いくつかのエラーの場合、そのスクリプトは突然実行を停止します。出力は以下のように見えます:

07:34:27,004 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host
Flink JobManager is now running on worker1:6123
JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/
07:34:51,528 INFO  org.apache.flinkyarn.Client                                   - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553
07:34:51,529 INFO  org.apache.flinkyarn.Client                                   - Killing the Flink-YARN application.
07:34:51,529 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killing application application_1295604279745_273123
07:34:51,534 INFO  org.apache.flinkyarn.Client                                   - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123
07:34:51,559 INFO  org.apache.flinkyarn.Client                                   - YARN Client is shutting down

ここでの問題は、Application Master (AM)が停止しYARNクライアントがアプリケーションが終了したと見なしていることです。

この挙動には3つの理由が考えられます。

  • ApplicationMaster が例外で終了した。エラーをデバッグするには、コンテナーのログファイルを見てください。yarn-site.xml ファイルはその設定パスを含んでいます。パスのキーは yarn.nodemanager.log-dirsで、デフォルトの値は ${yarn.log.dir}/userlogsです。

  • YARN hがApplicationMasterを実行しているコンテナーをkillした。AMがあまりに多くのメモリを使用したか、ほかのリソースがYARNの限界を超えたかの場合に起こります。この場合、ホスト上のnodemanagerのログにエラーメッセージを見つけるでしょう。

  • オペレーティングシステムはAMのJVMをシャットダウンします。これはもしYARNの設定が間違っていて、物理的に利用可能な量より多くのメモリを設定している場合に起こるかも知れません。これが起きたかどうかを見るために、AMが実行されていたマシーン上でdmesgを実行します。LinuxのOOM killerのメッセージを見るでしょう。

YARNのセッションが起動時にHDFSのパーミッション例外でクラッシュします。

YARNセッションを開始する間、以下のような例外を受け取ります:

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5193)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5175)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5149)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2090)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)

  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
  at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
  at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
  at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1393)
  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1382)
  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1307)
  at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:384)
  at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:380)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:380)
  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:324)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
  at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
  at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
  at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2021)
  at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1989)
  at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1954)
  at org.apache.flinkyarn.Utils.setupLocalResource(Utils.java:176)
  at org.apache.flinkyarn.Client.run(Client.java:362)
  at org.apache.flinkyarn.Client.main(Client.java:568)

このエラーの理由は、HDFS内のユーザのホームディレクトリが間違ったパーミッションを持っているからです。ユーザ (この場合 robert) は、自身のホームディレクトリ内にディレクトリを生成することができません。

Flink はユーザのホームディレクトリ内にFlink jar と設定ファイルを格納する.flink/ディレクトリを作成します。

私のジョブはジョブの取り消しに反応しませんか?

Flink は全てのユーザタスク上でcancel() を呼ぶことでジョブを取り消します。理想的には、タスクはその呼び出しに適切に反応し、現在実行しているものを停止します。そして全てのスレッドをシャットダウンすることができます。

タスクが一定の時間の間に反応しない場合は、Flinkは定期的にスレッドの割り込みを開始するでしょう。

タスクマネージャーのログにはユーザコードがブロックされた現在のメソッドのスタックも含まれるでしょう。

特徴

ストリーミングプログラムについては、Flinkはストリーミングデータフローの状態のスナップショットを取るために新しい方法を持ち、回復のためにそれらを使います。この仕組みは効果的で柔軟性があります。詳細はストリーミングの耐障害性 のドキュメントを見てください。

バッチ処理プログラムについては、Flinkは変換の順番を記憶し、失敗したジョブを再開することができます。

CounterやDistributedCacheのようなHadoop的なユーティリティがサポートされますか?

Flinkの Accumulators はHadoopのカウンターにとてもよく似た動作をしますが、もっと強力です。

Flink はAPIを使って深く統合された分散キャッシュを持ちます。それを使う方法の詳細はJavaDocsを参照してください。

全てのタスク上でデータセットを利用可能にするためには、代わりにブロードキャスト変数 を使うことをお勧めします。それらは分散キャッシュよりももっと効率的で使いやすいです。


TOP
inserted by FC2 system