Amazon Web Services (AWS)

Amazon Web Services はFlinkを実行することができるクラウド計算サービスを提供します。

EMR: Elastic MapReduce

Amazon Elastic MapReduce (Amazon EMR) はHadoopクラスタを簡単に素早くセットアップwebサービスです。全てのセットアップの面倒を見るので、これはAWS上でFlinkを実行するお勧めの方法です。

EMR クラスタの作成

EMR ドキュメントはどうやってEMRクラスタを開始するかを示す例を含んでいます。そのガイドに従って任意のEMRリリースをインストールすることができます。EMRリリースの一部として 全てのアプリケーションをインストールする必要はありませんが、Core Hadoopを固持するかもしれません:

クラスタを作成する場合は、必要であればS3バケットへのアクセスを許可してIAM rolesをセットアップするようにしてください。

上に戻る

クラスタを作成した後で、マスターノードに接続し、Flinkをインストールします:

  1. ダウンロードページに行き、EMRクラスタのHadoopバージョンと一致するFlinkのバイナリバージョンをダウンロード します。例えば、Hadoop 2.7 for EMR releases 4.3.0, 4.4.0 あるいは 4.5.0。
  2. Flinkの配布物を解凍すると、Hadoop設定ディレクトリを設定した後で、YARNを軽油してFlink のジョブをデプロイする用意ができます:
HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink run -m yarn-cluster examples/streaming/WordCount.jar

上に戻る

S3: Simple Storage Service

Amazon Simple Storage Service (Amazon S3) は様々な使い方のためのクラウド オブジェクト ストレージを提供します。Flinkと一緒にstreaming state backendsと連結して読み込みおよびデータの書き込みのためにS3を使うことができます。

以下の形式のパスを指定することでS3オブジェクトを通常のファイルのように使うことができます:

s3://<your-bucket>/<endpoint>

終端は1つのファイルまたはディレクトリのどちらかが成りえます。例えば:

// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");

// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");

// Use S3 as FsStatebackend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

Note that these examples are not exhaustive and you can use S3 in other places as well, including your high availability setup or the RocksDBStateBackend; everywhere that Flink expects a FileSystem URI.

S3 ファイルシステムの設定

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

S3はFlinkによって通常のファイルシステムのように扱われます。S3 とのやり取りはHadoopのS3 ファイルシステム クライアントを経由して起こります。

二つの人気のあるS3ファイルシステムの実装が利用可能です:

  1. S3AFileSystem (お勧め): 内部的にAmazonのSDKを使って通常のファイルを読み書きするためのファイルシステム。ファイルの最大サイズが無く、IAM roleと連携します。
  2. NativeS3FileSystem: 通常のファイルを読み書きするためのファイルシステム。最大オブジェクトサイズは5GBで、IAM role と連携しません。

使うことをお勧めするS3ファイルシステム実装です。内部的にAmazonのSDKを使い、IAM role と連携します(アクセス証明書の設定を見てください)。

Flinkが有効なHadoop設定を示す必要があります。これはcore-site.xml内で以下のプロパティを含みます:

<configuration>

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
     large results prior to transmitting them to S3. -->
<property>
  <name>fs.s3.buffer.dir</name>
  <value>/tmp</value>
</property>

</configuration>

これはs3://スキーマを使うURIのためのデフォルトのファイルシステムとしてS3AFileSystemを登録します。

NativeS3FileSystem

このファイルシステムはサイズで5GBまでの提出に制限されていて、IAM rolesが動作しません(アクセス証明書の設定を見てください)。Hadoop設定ファイルの中でAWS証明書を手動で設定しなければならないことを意味します。

Flinkが有効なHadoop設定を示す必要があります。これはcore-site.xml内で以下のプロパティを含みます:

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

これはs3://スキーマを使うURIのためのデフォルトのファイルシステムとしてNativeS3FileSystemを登録します。

Hadoop 設定

You can specify the Hadoop configuration in various ways, for example by configuring the path to the Hadoop configuration directory in flink-conf.yaml:

fs.hdfs.hadoopconf: /path/to/etc/hadoop

This registers /path/to/etc/hadoop as Hadoop’s configuration directory with Flink. Flink will look for the core-site.xml and hdfs-site.xml files in the specified directory.

上に戻る

アクセス証明書の設定

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

S3ファイルシステムをセットアップした後で、FlinkがS3バケットにアクセスすることができることを確実にする必要があります。

When using S3AFileSystem the recommended way of setting up credentials on AWS is via Identity and Access Management (IAM). S3バケットにアクセスするためにFlinkインスタンスに証明書を安全に渡すために、IAM機能を使うことができます。これを行う方法の詳細はこのドキュメントの範囲を超えます。AWSユーザガイドを参照してください。What you are looking for are IAM Roles.

これを正しくセットアップすると、AWS内のS3へのアクセスを管理することができ、Flinkに何もアクセスキーを配布する必要がありません。

これは S3AFileSystem とだけ連携し、NativeS3FileSystemとは連携しないことに注意してください。

上に戻る

Access Keys with S3AFileSystem (Discouraged)

S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。IAM rolesの紹介なので、これは非推奨であることに注意してください。

For S3AFileSystem you need to configure both fs.s3a.access.key and fs.s3a.secret.key in Hadoop’s core-site.xml:

<property>
  <name>fs.s3a.access.key</name>
  <value></value>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <value></value>
</property>

上に戻る

Access Keys with NativeS3FileSystem (Discouraged)

S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。But this is discouraged and you should use S3AFileSystem with the required IAM roles.

For NativeS3FileSystem you need to configure both fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey in Hadoop’s core-site.xml:

<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value></value>
</property>

<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value></value>
</property>

上に戻る

S3 ファイルシステム依存の提供

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

HadoopのS3ファイルシステム クライアントは hadoop-awsの中にパッケージされています。このJARと全ての依存はFlinkのクラスパスに追加される必要があります。別の言い方をすると、ジョブとタスクマネージャーの両方のクラスパスです。どのファイルシステム実装およびどのFlinkおよびHadoopバージョンを使うかに依存して、異なる依存を提供する必要があります(以下を見てください)。

JARをFlinkのクラスパスに追加する複数の方法があります。もっとも簡単なのは単純にJARを Flinkの /lib フォルダに入れることです。全ての依存と一緒にhadoop-aws JARをコピーする必要があります。全てのマシーン上でHADOOP_CLASSPATH 環境変数の一部としてこれらのJARを含むディレクトリもエクスポートします。

どのファイルシステムを使っているかに依存して、以下の依存を追加してください。hadoop-2.7/share/hadoop/tools/libの中のHadoopバイナリの一部として見つけることができます:

  • S3AFileSystem:
    • hadoop-aws-2.7.2.jar
    • aws-java-sdk-1.7.4.jar
    • httpcore-4.2.5.jar
    • httpclient-4.2.5.jar
  • NativeS3FileSystem:
    • hadoop-aws-2.7.2.jar
    • guava-11.0.2.jar

hadoop-common はFlinkの一部として利用可能ですが、GuavaはFlinkによって共有されます。

どのファイルシステムを使っているかに依存して、以下の依存を追加してください。hadoop-2.6/share/hadoop/tools/libの中のHadoopバイナリの一部として見つけることができます:

  • S3AFileSystem:
    • hadoop-aws-2.6.4.jar
    • aws-java-sdk-1.7.4.jar
    • httpcore-4.2.5.jar
    • httpclient-4.2.5.jar
  • NativeS3FileSystem:
    • hadoop-aws-2.6.4.jar
    • guava-11.0.2.jar

hadoop-common はFlinkの一部として利用可能ですが、GuavaはFlinkによって共有されます。

これらのHadoopのバージョンはNativeS3FileSystemのためのサポートのみを持ちます。This comes pre-packaged with Flink for Hadoop 2 as part of hadoop-common. クラスパスには何も追加する必要はありません。

上に戻る

一般的な問題

以下の章はAWS上でFlinkと連携する時の一般的な問題をリスト化します。

S3 ファイルシステム設定の失敗

ジョブのサブミットがNo file system found with scheme s3に注目した例外メッセージを伴って失敗した場合、このことはS3のために設置されたファイルシステムが無いことを意味します。これを適切に設定する方法についての詳細はFileSystem 設定の章 を調べてください。

org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
  No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
Caused by: java.io.IOException: No file system found with scheme s3,
  referenced in file URI 's3://<bucket>/<endpoint>'.
    at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
    at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
    at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
    at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)

上に戻る

AWS アクセスキーID と秘密アクセスキーが指定されていない

ジョブがAWS Access Key ID and Secret Access Key must be specified as the username or passwordに注目した例外を伴って失敗した場合、アクセス証明書が適切にセットアップされていません。これを設定する方法の詳細はアクセス証明書の章 を参照してください。

org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
  HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
  AWS Access Key ID and Secret Access Key must be specified as the username or password
  (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
  or fs.s3n.awsSecretAccessKey properties (respectively) [...]
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
  be specified as the username or password (respectively) of a s3 URL, or by setting
  the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
    at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
    at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
    at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)

上に戻る

ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found

この例外を見た場合は、S3ファイルシステムがFlinkのクラスパスの一部ではありません。これを適切に設定する方法の詳細は S3 ファイルシステム依存の章を参照してください。

Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
  at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
  at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
  ... 25 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
  ... 32 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
  ... 33 more

上に戻る

IOException: 400: Bad Request

全てを適切に設定したが、Bad Request 例外を受け取り、かつ S3 バケットがリージョンeu-central-1にある場合は、Amazon’s signature version 4をサポートしないS3クライアントを実行しているかも知れません。

Currently, this includes all Hadoop versions up to 2.7.2 running NativeS3FileSystem, which depend on JetS3t 0.9.0 instead of a version >= 0.9.4.

回避策はバケットのリージョンを変更することです。

[...]
Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
Caused by: org.jets3t.service.impl.rest.HttpException [...]

上に戻る

NullPointerException at org.apache.hadoop.fs.LocalDirAllocator

この例外は通常S3AFileSystemのためのローカルバッファディレクトリ設定fs.s3.buffer.dirをスキップすることで起こります。S3AFileSystemを適切に設定する方法を理解するために、S3AFileSystem 設定を参照してください。

[...]
Caused by: java.lang.NullPointerException at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
o.a.h.fs.FileSystem.create(FileSystem.java:907) at
o.a.h.fs.FileSystem.create(FileSystem.java:888) at
o.a.h.fs.FileSystem.create(FileSystem.java:785) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
... 25 more
TOP
inserted by FC2 system