以下の質問は一般的なFlinkプロジェクトについてよく聞かれる質問です。さらに質問がある場合は、ドキュメントに質問するか、あるいは コミュニティに質問してください。
Flink はデータ処理システムで、もう一つのHadoopの MapReduce コンポーネントです。MapReduce上でビルドされるというより、独自のランタンムが付属しています。そのように、Hadoopエコシステムとは完全に独立して動作することができます。しかしながら、Flinkはデータを読み書きするためにHadoopの分散ファイルシステム(HDFS)にアクセスすることもでき、クラスタリソースを供給するためにHadoopの次世代のリソースマネージャー(YARNを)にアクセスすることができます。ほとんどのFlinkユーザはデータを格納するためにHadoop HDFSを使っているため、すでにFlinkはHDFSにアクセスするために必要なライブラリを積み込んでいます。
No. Flink はHadoopのインストレーション無しに実行することができます。しかし、Hadoop分散ファイルシステム(HDFS)内に格納されているデータを解析するためにFlinkを使うためのとても一般的 なセットアップです。それらのセットアップをそのままで動作するようにするために、FlinkはデフォルトでHadoopクライアントライブラリをバンドルします。
さらに、既存のHadoop YARNクラスタを使うユーザのための特別なFlinkのYARN組み込みダウンロードを提供します。Apache Hadoop YARN is Hadoop’s cluster resource manager that allows to use different execution engines next to each other on a cluster.
Flinkプログラムの進捗を追跡する多くの方法があります:
conf/flink-config.yml
の中で設定されます)。log/flink-<user>-jobmanager-<host>.log
と log/flink-<user>-taskmanager-<host>.log
)。Flink プログラムの中で、並行化は操作がどのようにタスクスロットに割り当てられた個々のタスク内に分割するかを決めます。クラスタ内の各ノードは少なくとも一つのタスクスロットを持ちます。タスクスロットの総数は全てのマシーン上のタスクスロットの数です。並行化がN
に設定された場合、Flinkは操作を利用可能なタスクスロットを使って同時に計算可能なN
個の並行タスクに分割しようとします。タスクスロットの数は全てのタスクが同時にタスクスロット内で計算されるように並行化と一致しなければなりません。
注意: 全ての操作が複数のタスクに分割することができるわけではありません。例えば、グルーピング無しのGroupReduce
は、reduce操作を実施するためにグループ全体が確実に1つのノードになければならないので、並行度1で実施されなければなりません。 Flink は並行度が1でなければならないかどうかを決定し、それに応じて設定します。
並行度はFlinkプログラムの実行上でfine-grained制御を保証するために多くの方法で設定することができます。並行度を設定する方法の詳細な説明は 設定ガイドを見てください。また、処理スロットと並行度がお互いにどう関係しているかを説明しているこの図もチェックしてください。
Flinkの全ての機能はjava.io.Serializableによって定義されたように並列化されなければなりません。全ての関数のインタフェースが並行化されるので、例外は関数の中で使われるフィールドの一つが並行化されていないことを意味します。
特に、関数が内部クラスあるいは匿名内部クラスの場合、それは覆っているクラスへの隠された参照を含んでいます(デバッガ内で関数を見ると、通常はthis$0
で呼ばれます)。覆っているクラスが並行化されていない場合は、これはおそらくソースのエラーです。解決方法は、
それはタイプ情報のための明示的な値が提供されていないかもしれないことを意味します。コード内にimport org.apache.flink.api.scala._
文を持つようにします。
ジェネリックパラメータを取る関数あるいはクラス内でflinkのオペレーションを使っている場合は、そのパラメータのためにTypeInformation が利用可能でなければなりません。これはコンテキストバウンドを使うことで達成されます:
def myFunction[T: TypeInformation](input: DataSet[T]): DataSet[Seq[T]] = {
input.reduceGroup( i => i.toSeq )
}
Flinkがどうやってタイプを扱うかについての深い議論は、Type Extraction and Serialization を見てください。
超並列の設定(100+並行スレッド)でFlinkを実行する場合、設定パラメータ taskmanager.network.numberOfBuffers
を使って大量のネットワークバッファを適用する必要があります。大まかには、バッファの数は少なくとも 4 * numberOfTaskManagers * numberOfSlotsPerTaskManager^2
でなければなりません。詳細はConfiguration Reference を見てください。
これらの例外の最も一般的なのは、Flinkが間違ったHDFSバージョンでセットアップされている場合です。異なるHDFSバージョンはしばしばお互いに互換性がなく、マスターとクライアントのファイルシステム間の接続を破壊します。
Call to <host:port> failed on local exception: java.io.EOFException
at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
at org.apache.hadoop.ipc.Client.call(Client.java:743)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
at $Proxy0.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
at org.apache.flinkruntime.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:276
異なるHadoopとHDFSバージョンのためにFlinkをセットアップする詳細な方法は、ダウンロードページ と ビルドの説明 を参照してください。
Flink はデフォルトでHadoop 2.2.バイナリと一緒に提供されます。これらのバイナリはHDFSあるいはYARNに接続するために使われます。(特に高負荷の時に)HDFSに書き込む時に例外を起こすHDFSのクライアントのバグがあるようです。共通する例外は以下の通りです:
HDFS client trying to connect to the standby Namenode "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby"
java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)
これらのいずれかを経験した場合は、ローカルのHDFSのバージョンと一致するHadoopのバージョンを使ってビルドされたFlinkを使うことをお勧めします。正確なHadoopバージョンに対してFlinkを手動でビルドすることもできます(例えば、カスタムパッチレベルを使ったHadoop配布物を使う場合)。
FlinkはEclipseのScalaプラグインにまだ適切に統合されていないScalaコンパイラの新しい機能("quasiquotes"と呼ばれます)を使います。この機能をEclipseで利用可能にするには、手動でflink-scala プロジェクトをcompiler pluginを使うように設定する必要があります:
are not grouped/joined correctly?
キーはメソッドjava.lang.Object#hashCode()
, java.lang.Object#equals(Object o)
と java.util.Comparable#compareTo(...)
を正しく実装しなければなりません。これらのメソッドは通常不適切なデフォルトの実装を使って常に支援されています。従って、全てのキーは hashCode()
と equals(Object o)
を上書きしなければなりません。
全てのデータタイプクラスはpublicでpublicなnullコンストラクタ(引数無しのコンストラクタ)を持たなければなりません。さらに、クラスはabstractあるいはinterfaceであってはなりません。クラスが内部クラスの場合、それらはpubilcかつstaticでなければなりません。
シャットダウンはいくつかのクリーンアップ作業を行うかも知れないので、プロセスの停止は時に数秒かかります。
いくつかのエラーの場合において、JobManagerあるいはTaskManagerが提供されたstopスクリプト(bin/stop-local.sh
あるいは bin/stop- cluster.sh
)では停止されない事がありえます。Linux/Macでは以下のようにしてそれらのプロセスをkillすることができます:
jps
コマンドを使うか、全てのJavaプロセスを見つけるために ps -ef | grep java
コマンドを使うことができます。kill -9 <pid>
を使ってプロセスをkillします。ここで pid
は影響を受けた JobManager あるいは TaskManager プロセスのプロセスidです。Windows上では、TaskManager はすべてのプロセスの表を表示し、そのエントリを右クリックしてプロセスを破壊することができます。
JobManager とTaskManager サービスの両方とも、それぞれのログファイルに(SIGKILL と SIGTERMのような)シグナルを書き込むでしょう。これは停止の動きをする問題のデバッグに有用です。
これらの例外は通常プログラム内の関数が例えばリストあるいはマップ内に大量のオブジェクトを集めるために多くのメモリを消費する場合に起こります。Javaでの OutOfMemoryExceptionsはちょっとトリッキーなものです。その例外はほとんどのメモリを割り当てられているコンポーネントによって必ずしも投げられるわけではなく、提供されない最後のメモリのビットをリクエストしようとしたコンポーネントによって投げられます。
これを動作させるには二つの方法があります:
関数内でメモリを使うのを少なくできないかを調べる。例えば、オブジェクトタイプの代わりにプリミティブなタイプの配列を使う。
Flinkが地震の処理のために保持しているメモリを減らす。TaskManagerはソート、ハッシュ、キャッシュ、ネットワークバッファなどのために利用可能なメモリをいくらか取り分けています。その部分の目織はユーザ定義の関数では利用できません。By reserving it, the system can guarantee to not run out of memory on large inputs, but to plan with the available memory and destage operations to disk, if necessary. デフォルトでシステムはメモリの約70%を保持します。ユーザ定義関数内でもっとメモリを必要とするアプリケーションを頻繁に実行する場合は、taskmanager.memory.fraction
あるいは taskmanager.memory.size
設定エントリを使ってその値を減らすことができます。. 詳細は設定リファレンス を見てください。これはJVMヒープへメモリをもっと残しますが、もっと頻繁にディスクにデータ処理タスクを持っていくでしょう。
OutOfMemoryExceptions の他の理由には、間違った状態のバックエンドの使用です。デフォルトでは、Flinkはストリーミングジョブの中でオペレーターのための状態のヒープに基づいた状態バックエンドを使っています。RocksDBStateBackend
を使って利用可能なヒープ空間より大きなサイズの状態を持つことができます。
ジョブのログの挙動を調べる。Emitting logging per or tuple may be helpful to debug jobs in small setups with tiny data sets, it becomes very inefficient and disk space consuming if used for large input data.
java.lang.Exception: タスクが実行されたスロットが解放されました。おそらく TaskManagerの喪失
は通常大きなガベージコレクションのストールがある場合に起こります。この場合、素早い修正はG1ガベージコレクターを使うことです。それは徐々に動作し、停止の頻度が少なくなります。さらに、もっと多くのメモリをユーザコードに取っておくことができます(例えばシステムあたり0.4およびユーザあたり0.6)。
これらのやり方の両方が失敗し、エラーが続く場合は、AKKA_WATCH_HEARTBEAT_PAUSE (akka.watch.heartbeat.pause)をもっと大きな値(例えば、 600s)に設定することで単純にTaskManagerのハートビートの中止を増加します。これは、タスクマネージャーが喪失したと見なす前にさらに長い間隔でジョブマネージャーがハートビートを待つようにするでしょう。
./bin/yarn-session.sh
スクリプトはYARNセッションが開いている間実行することを意図しています。しかし、いくつかのエラーの場合、そのスクリプトは突然実行を停止します。出力は以下のように見えます:
07:34:27,004 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host
Flink JobManager is now running on worker1:6123
JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/
07:34:51,528 INFO org.apache.flinkyarn.Client - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553
07:34:51,529 INFO org.apache.flinkyarn.Client - Killing the Flink-YARN application.
07:34:51,529 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killing application application_1295604279745_273123
07:34:51,534 INFO org.apache.flinkyarn.Client - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123
07:34:51,559 INFO org.apache.flinkyarn.Client - YARN Client is shutting down
ここでの問題は、Application Master (AM)が停止しYARNクライアントがアプリケーションが終了したと見なしていることです。
この挙動には3つの理由が考えられます。
ApplicationMaster が例外で終了した。エラーをデバッグするには、コンテナーのログファイルを見てください。yarn-site.xml
ファイルはその設定パスを含んでいます。パスのキーは yarn.nodemanager.log-dirs
で、デフォルトの値は ${yarn.log.dir}/userlogs
です。
YARN hがApplicationMasterを実行しているコンテナーをkillした。AMがあまりに多くのメモリを使用したか、ほかのリソースがYARNの限界を超えたかの場合に起こります。この場合、ホスト上のnodemanagerのログにエラーメッセージを見つけるでしょう。
オペレーティングシステムはAMのJVMをシャットダウンします。これはもしYARNの設定が間違っていて、物理的に利用可能な量より多くのメモリを設定している場合に起こるかも知れません。これが起きたかどうかを見るために、AMが実行されていたマシーン上でdmesg
を実行します。LinuxのOOM killerのメッセージを見るでしょう。
YARNセッションを開始する間、以下のような例外を受け取ります:
Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5193)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5175)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5149)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2090)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1393)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1382)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1307)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:384)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:380)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:380)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:324)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2021)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1989)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1954)
at org.apache.flinkyarn.Utils.setupLocalResource(Utils.java:176)
at org.apache.flinkyarn.Client.run(Client.java:362)
at org.apache.flinkyarn.Client.main(Client.java:568)
このエラーの理由は、HDFS内のユーザのホームディレクトリが間違ったパーミッションを持っているからです。ユーザ (この場合 robert
) は、自身のホームディレクトリ内にディレクトリを生成することができません。
Flink はユーザのホームディレクトリ内にFlink jar と設定ファイルを格納する.flink/
ディレクトリを作成します。
Flink は全てのユーザタスク上でcancel()
を呼ぶことでジョブを取り消します。理想的には、タスクはその呼び出しに適切に反応し、現在実行しているものを停止します。そして全てのスレッドをシャットダウンすることができます。
タスクが一定の時間の間に反応しない場合は、Flinkは定期的にスレッドの割り込みを開始するでしょう。
タスクマネージャーのログにはユーザコードがブロックされた現在のメソッドのスタックも含まれるでしょう。
ストリーミングプログラムについては、Flinkはストリーミングデータフローの状態のスナップショットを取るために新しい方法を持ち、回復のためにそれらを使います。この仕組みは効果的で柔軟性があります。詳細はストリーミングの耐障害性 のドキュメントを見てください。
バッチ処理プログラムについては、Flinkは変換の順番を記憶し、失敗したジョブを再開することができます。
Flinkの Accumulators はHadoopのカウンターにとてもよく似た動作をしますが、もっと強力です。
Flink はAPIを使って深く統合された分散キャッシュを持ちます。それを使う方法の詳細はJavaDocsを参照してください。
全てのタスク上でデータセットを利用可能にするためには、代わりにブロードキャスト変数 を使うことをお勧めします。それらは分散キャッシュよりももっと効率的で使いやすいです。