Amazon Web Services はFlinkを実行することができるクラウド計算サービスを提供します。
Amazon Elastic MapReduce (Amazon EMR) はHadoopクラスタを簡単に素早くセットアップwebサービスです。全てのセットアップの面倒を見るので、これはAWS上でFlinkを実行するお勧めの方法です。
EMR ドキュメントはどうやってEMRクラスタを開始するかを示す例を含んでいます。そのガイドに従って任意のEMRリリースをインストールすることができます。EMRリリースの一部として 全てのアプリケーションをインストールする必要はありませんが、Core Hadoopを固持するかもしれません:
クラスタを作成する場合は、必要であればS3バケットへのアクセスを許可してIAM rolesをセットアップするようにしてください。
クラスタを作成した後で、マスターノードに接続し、Flinkをインストールします:
HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink run -m yarn-cluster examples/streaming/WordCount.jar
Amazon Simple Storage Service (Amazon S3) は様々な使い方のためのクラウド オブジェクト ストレージを提供します。Flinkと一緒にstreaming state backendsと連結して読み込みおよびデータの書き込みのためにS3を使うことができます。
以下の形式のパスを指定することでS3オブジェクトを通常のファイルのように使うことができます:
s3://<your-bucket>/<endpoint>
終端は1つのファイルまたはディレクトリのどちらかが成りえます。例えば:
// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");
// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");
// Use S3 as FsStatebackend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));
これらの例は精緻なものではなく、高可用性セットアップ あるいは RocksDBStateBackendを含めて、S3を他の場所でも使えることに注意してください; FlinkがファイルシステムURIを期待するどこでも。
S3はFlinkによって通常のファイルシステムのように扱われます。S3 とのやり取りはHadoopのS3 ファイルシステム クライアントを経由して起こります。
二つの人気のあるS3ファイルシステムの実装が利用可能です:
S3AFileSystem
(お勧め): 内部的にAmazonのSDKを使って通常のファイルを読み書きするためのファイルシステム。ファイルの最大サイズが無く、IAM roleと連携します。NativeS3FileSystem
: 通常のファイルを読み書きするためのファイルシステム。最大オブジェクトサイズは5GBで、IAM role と連携しません。S3AFileSystem
(お勧め)使うことをお勧めするS3ファイルシステム実装です。内部的にAmazonのSDKを使い、IAM role と連携します(アクセス証明書の設定を見てください)。
Flinkが有効なHadoop設定を示す必要があります。これはcore-site.xml
内で以下のプロパティを含みます:
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
これはs3://
スキーマを使うURIのためのデフォルトのファイルシステムとしてS3AFileSystem
を登録します。
NativeS3FileSystem
このファイルシステムはサイズで5GBまでの提出に制限されていて、IAM rolesが動作しません(アクセス証明書の設定を見てください)。Hadoop設定ファイルの中でAWS証明書を手動で設定しなければならないことを意味します。
Flinkが有効なHadoop設定を示す必要があります。これはcore-site.xml
内で以下のプロパティを含みます:
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
これはs3://
スキーマを使うURIのためのデフォルトのファイルシステムとしてNativeS3FileSystem
を登録します。
様々な方法で Hadoop 設定 を指定することができます。例えば、flink-conf.yaml
内でHadoop設定ディレクトリへのパスを設定します:
fs.hdfs.hadoopconf: /path/to/etc/hadoop
これはFlinkを使ってHadoopの設定ディレクトリとして/path/to/etc/hadoop
を登録します。Flinkは指定されたディレクトリのcore-site.xml
とhdfs-site.xml
ファイルを調べないでしょう。
S3ファイルシステムをセットアップした後で、FlinkがS3バケットにアクセスすることができることを確実にする必要があります。
S3AFileSystem
を使う時にAWS上で証明書をセットアップするお勧めの方法は Identity and Access Management (IAM)を使います。S3バケットにアクセスするためにFlinkインスタンスに証明書を安全に渡すために、IAM機能を使うことができます。これを行う方法の詳細はこのドキュメントの範囲を超えます。AWSユーザガイドを参照してください。探すものはIAM Rolesです。
これを正しくセットアップすると、AWS内のS3へのアクセスを管理することができ、Flinkに何もアクセスキーを配布する必要がありません。
これは S3AFileSystem
とだけ連携し、NativeS3FileSystem
とは連携しないことに注意してください。
S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。IAM rolesの紹介なので、これは非推奨であることに注意してください。
S3AFileSystem
のために、Hadoopのcore-site.xml
の中でfs.s3a.access.key
と fs.s3a.secret.key
の両方を設定する必要があります :
<property>
<name>fs.s3a.access.key</name>
<value></value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value></value>
</property>
S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。しかし、これは非推奨で、必要とされるIAM ロールを持つS3AFileSystem
を使う必要があります。
NativeS3FileSystem
のために、Hadoopのcore-site.xml
の中でfs.s3.awsAccessKeyId
と fs.s3.awsSecretAccessKey
の両方を設定する必要があります :
<property>
<name>fs.s3.awsAccessKeyId</name>
<value></value>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value></value>
</property>
HadoopのS3ファイルシステム クライアントは hadoop-aws
の中にパッケージされています。このJARと全ての依存はFlinkのクラスパスに追加される必要があります。別の言い方をすると、ジョブとタスクマネージャーの両方のクラスパスです。どのファイルシステム実装およびどのFlinkおよびHadoopバージョンを使うかに依存して、異なる依存を提供する必要があります(以下を見てください)。
JARをFlinkのクラスパスに追加する複数の方法があります。もっとも簡単なのは単純にJARを Flinkの /lib
フォルダに入れることです。全ての依存と一緒にhadoop-aws
JARをコピーする必要があります。全てのマシーン上でHADOOP_CLASSPATH
環境変数の一部としてこれらのJARを含むディレクトリもエクスポートします。
どのファイルシステムを使っているかに依存して、以下の依存を追加してください。hadoop-2.7/share/hadoop/tools/lib
の中のHadoopバイナリの一部として見つけることができます:
S3AFileSystem
:
hadoop-aws-2.7.2.jar
aws-java-sdk-1.7.4.jar
httpcore-4.2.5.jar
httpclient-4.2.5.jar
NativeS3FileSystem
:
hadoop-aws-2.7.2.jar
guava-11.0.2.jar
hadoop-common
はFlinkの一部として利用可能ですが、GuavaはFlinkによって共有されます。
どのファイルシステムを使っているかに依存して、以下の依存を追加してください。hadoop-2.6/share/hadoop/tools/lib
の中のHadoopバイナリの一部として見つけることができます:
S3AFileSystem
:
hadoop-aws-2.6.4.jar
aws-java-sdk-1.7.4.jar
httpcore-4.2.5.jar
httpclient-4.2.5.jar
NativeS3FileSystem
:
hadoop-aws-2.6.4.jar
guava-11.0.2.jar
hadoop-common
はFlinkの一部として利用可能ですが、GuavaはFlinkによって共有されます。
これらのHadoopのバージョンはNativeS3FileSystem
のためのサポートのみを持ちます。これはhadoop-common
の一部としてHadoop 2 のためのFlinkと一緒に事前パッケージ化されて同梱されています。クラスパスには何も追加する必要はありません。
以下の章はAWS上でFlinkと連携する時の一般的な問題をリスト化します。
ジョブのサブミットがNo file system found with scheme s3
に注目した例外メッセージを伴って失敗した場合、このことはS3のために設置されたファイルシステムが無いことを意味します。これを適切に設定する方法についての詳細はFileSystem 設定の章 を調べてください。
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
Caused by: java.io.IOException: No file system found with scheme s3,
referenced in file URI 's3://<bucket>/<endpoint>'.
at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
ジョブがAWS Access Key ID and Secret Access Key must be specified as the username or password
に注目した例外を伴って失敗した場合、アクセス証明書が適切にセットアップされていません。これを設定する方法の詳細はアクセス証明書の章 を参照してください。
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
AWS Access Key ID and Secret Access Key must be specified as the username or password
(respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
or fs.s3n.awsSecretAccessKey properties (respectively) [...]
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
be specified as the username or password (respectively) of a s3 URL, or by setting
the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
この例外を見た場合は、S3ファイルシステムがFlinkのクラスパスの一部ではありません。これを適切に設定する方法の詳細は S3 ファイルシステム依存の章を参照してください。
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 25 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
... 32 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
... 33 more
400: Bad Request
全てを適切に設定したが、Bad Request
例外を受け取り、かつ S3 バケットがリージョンeu-central-1
にある場合は、Amazon’s signature version 4をサポートしないS3クライアントを実行しているかも知れません。
現在のところ、これはNativeS3FileSystem
を実行する2.7.2の全てのHadoopのバージョンを含みます。バージョン>= 0.9.4の代わりにJetS3t 0.9.0
に依存します。
回避策はバケットのリージョンを変更することです。
[...]
Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
Caused by: org.jets3t.service.impl.rest.HttpException [...]
この例外は通常S3AFileSystem
のためのローカルバッファディレクトリ設定fs.s3.buffer.dir
をスキップすることで起こります。S3AFileSystem
を適切に設定する方法を理解するために、S3AFileSystem 設定を参照してください。
[...]
Caused by: java.lang.NullPointerException at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
o.a.h.fs.FileSystem.create(FileSystem.java:907) at
o.a.h.fs.FileSystem.create(FileSystem.java:888) at
o.a.h.fs.FileSystem.create(FileSystem.java:785) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
... 25 more