Amazon Web Services (AWS)

Amazon Web Services はFlinkを実行することができるクラウド計算サービスを提供します。

EMR: Elastic MapReduce

Amazon Elastic MapReduce (Amazon EMR) はHadoopクラスタを簡単に素早くセットアップwebサービスです。全てのセットアップの面倒を見るので、これはAWS上でFlinkを実行するお勧めの方法です。

Standard EMR Installation

Flink is a supported application on Amazon EMR. Amazon’s documentation describes configuring Flink, creating and monitoring a cluster, and working with jobs.

Custom EMR Installation

Amazon EMR services are regularly updated to new releases but a version of Flink which is not available can be manually installed in a stock EMR cluster.

EMR クラスタの作成

EMR ドキュメントはどうやってEMRクラスタを開始するかを示す例を含んでいます。そのガイドに従って任意のEMRリリースをインストールすることができます。You don’t need to install the All Applications part of the EMR release, but can stick to Core Hadoop.

Note Access to S3 buckets requires configuration of IAM roles when creating an EMR cluster.

EMRクラスタ上にFlinkをインストール

クラスタを作成した後で、マスターノードに接続し、Flinkをインストールします:

  1. Go the Downloads Page and download a binary version of Flink matching the Hadoop version of your EMR cluster, e.g. Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0.
  2. Flinkの配布物を解凍すると、Hadoop設定ディレクトリを設定した後で、YARNを軽油してFlink のジョブをデプロイする用意ができます:
HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar

上に戻る

S3: 簡単なストレージサービス

Amazon Simple Storage Service (Amazon S3) は様々な使い方のためのクラウド オブジェクト ストレージを提供します。You can use S3 with Flink for reading and writing data as well in conjunction with the streaming state backends or even as a YARN object storage.

以下の形式のパスを指定することでS3オブジェクトを通常のファイルのように使うことができます:

s3://<your-bucket>/<endpoint>

終端は1つのファイルまたはディレクトリのどちらかが成りえます。例えば:

// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");

// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");

// Use S3 as FsStatebackend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

これらの例は精緻なものではなく高可用性セットアップ あるいは RocksDBStateBackendを含めて、S3を他の場所でも使えることに注意してください; FlinkがファイルシステムURIを期待するどこでも。

For most use cases, you may use one of our shaded flink-s3-fs-hadoop and flink-s3-fs-presto S3 filesystem wrappers which are fairly easy to set up. For some cases, however, e.g. for using S3 as YARN’s resource storage dir, it may be necessary to set up a specific Hadoop S3 FileSystem implementation. Both ways are described below.

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

To use either flink-s3-fs-hadoop or flink-s3-fs-presto, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g.

cp ./opt/flink-s3-fs-presto-1.6-SNAPSHOT.jar ./lib/

Both flink-s3-fs-hadoop and flink-s3-fs-presto register default FileSystem wrappers for URIs with the s3:// scheme, flink-s3-fs-hadoop also registers for s3a://.

アクセス証明書の設定

After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.

AWS上で証明書をセットアップするお勧めの方法は Identity and Access Management (IAM)を使います。S3バケットにアクセスするためにFlinkインスタンスに証明書を安全に渡すために、IAM機能を使うことができます。これを行う方法の詳細はこのドキュメントの範囲を超えます。AWSユーザガイドを参照してください。探すものはIAM Rolesです。

これを正しくセットアップすると、AWS内のS3へのアクセスを管理することができ、Flinkに何もアクセスキーを配布する必要がありません。

アクセスキー(非推奨)

S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。IAM rolesの紹介なので、これは非推奨であることに注意してください。

You need to configure both s3.access-key and s3.secret-key in Flink’s flink-conf.yaml:

s3.access-key: your-access-key
s3.secret-key: your-secret-key

上に戻る

Hadoop-provided S3 file systems - manual setup

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

This setup is a bit more complex and we recommend using our shaded Hadoop/Presto file systems instead (see above) unless required otherwise, e.g. for using S3 as YARN’s resource storage dir via the fs.defaultFS configuration property in Hadoop’s core-site.xml.

S3 ファイルシステムの設定

Interaction with S3 happens via one of Hadoop’s S3 FileSystem clients:

  1. S3AFileSystem (recommended for Hadoop 2.7 and later): file system for reading and writing regular files using Amazon’s SDK internally. ファイルの最大サイズが無く、IAM roleと連携します。
  2. NativeS3FileSystem (for Hadoop 2.6 and earlier): file system for reading and writing regular files. 最大オブジェクトサイズは5GBで、IAM role と連携しません。

使うことをお勧めするS3ファイルシステム実装です。It uses Amazon’s SDK internally and works with IAM roles (see Configure Access Credentials).

Flinkが有効なHadoop設定を示す必要があります。これはcore-site.xml内で以下のプロパティを含みます:

<configuration>

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
     large results prior to transmitting them to S3. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

</configuration>

This registers S3AFileSystem as the default FileSystem for URIs with the s3a:// scheme.

NativeS3FileSystem

This file system is limited to files up to 5GB in size and it does not work with IAM roles (see Configure Access Credentials), meaning that you have to manually configure your AWS credentials in the Hadoop config file.

Flinkが有効なHadoop設定を示す必要があります。これはcore-site.xml内で以下のプロパティを含みます:

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

これはs3://スキーマを使うURIのためのデフォルトのファイルシステムとしてNativeS3FileSystemを登録します。

上に戻る

Hadoop 設定

You can specify the Hadoop configuration in various ways pointing Flink to the path of the Hadoop configuration directory, for example

  • by setting the environment variable HADOOP_CONF_DIR, or
  • by setting the fs.hdfs.hadoopconf configuration option in flink-conf.yaml:
fs.hdfs.hadoopconf: /path/to/etc/hadoop

これはFlinkを使ってHadoopの設定ディレクトリとして/path/to/etc/hadoopを登録します。Flinkは指定されたディレクトリのcore-site.xmlhdfs-site.xmlファイルを調べないでしょう。

上に戻る

アクセス証明書の設定

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

S3ファイルシステムをセットアップした後で、FlinkがS3バケットにアクセスすることができることを確実にする必要があります。

When using S3AFileSystem, the recommended way of setting up credentials on AWS is via Identity and Access Management (IAM). S3バケットにアクセスするためにFlinkインスタンスに証明書を安全に渡すために、IAM機能を使うことができます。これを行う方法の詳細はこのドキュメントの範囲を超えます。AWSユーザガイドを参照してください。探すものはIAM Rolesです。

これを正しくセットアップすると、AWS内のS3へのアクセスを管理することができ、Flinkに何もアクセスキーを配布する必要がありません。

これは S3AFileSystem とだけ連携し、NativeS3FileSystemとは連携しないことに注意してください。

上に戻る

Access Keys with S3AFileSystem (Discouraged)

S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。IAM rolesの紹介なので、これは非推奨であることに注意してください。

S3AFileSystemのために、Hadoopのcore-site.xmlの中でfs.s3a.access.keyfs.s3a.secret.keyの両方を設定する必要があります :

<property>
  <name>fs.s3a.access.key</name>
  <value></value>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <value></value>
</property>

上に戻る

Access Keys with NativeS3FileSystem (Discouraged)

S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。しかし、これは非推奨で、必要とされるIAM ロールを持つS3AFileSystemを使う必要があります。

NativeS3FileSystemのために、Hadoopのcore-site.xmlの中でfs.s3.awsAccessKeyIdfs.s3.awsSecretAccessKeyの両方を設定する必要があります :

<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value></value>
</property>

<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value></value>
</property>

上に戻る

S3 ファイルシステム依存の提供

注意: EMR上のFlinkを実行している場合は手動でこれを設定する必要はありません。

Hadoop’s S3 FileSystem clients are packaged in the hadoop-aws artifact (Hadoop version 2.6 and later). このJARと全ての依存はFlinkのクラスパスに追加される必要があります。別の言い方をすると、ジョブとタスクマネージャーの両方のクラスパスです。どのファイルシステム実装およびどのFlinkおよびHadoopバージョンを使うかに依存して、異なる依存を提供する必要があります(以下を見てください)。

There are multiple ways of adding JARs to Flink’s class path, the easiest being simply to drop the JARs in Flink’s lib folder. 全ての依存と一緒にhadoop-aws JARをコピーする必要があります。全てのマシーン上でHADOOP_CLASSPATH 環境変数の一部としてこれらのJARを含むディレクトリもエクスポートします。

どのファイルシステムを使っているかに依存して、以下の依存を追加してください。hadoop-2.7/share/hadoop/tools/libの中のHadoopバイナリの一部として見つけることができます:

  • S3AFileSystem:
    • hadoop-aws-2.7.3.jar
    • aws-java-sdk-s3-1.11.183.jar and its dependencies:
      • aws-java-sdk-core-1.11.183.jar
      • aws-java-sdk-kms-1.11.183.jar
      • jackson-annotations-2.6.7.jar
      • jackson-core-2.6.7.jar
      • jackson-databind-2.6.7.jar
      • joda-time-2.8.1.jar
      • httpcore-4.4.4.jar
      • httpclient-4.5.3.jar
  • NativeS3FileSystem:
    • hadoop-aws-2.7.3.jar
    • guava-11.0.2.jar

hadoop-common はFlinkの一部として利用可能ですが、GuavaはFlinkによって共有されます。

どのファイルシステムを使っているかに依存して、以下の依存を追加してください。hadoop-2.6/share/hadoop/tools/libの中のHadoopバイナリの一部として見つけることができます:

  • S3AFileSystem:
    • hadoop-aws-2.6.4.jar
    • aws-java-sdk-1.7.4.jar and its dependencies:
      • jackson-annotations-2.1.1.jar
      • jackson-core-2.1.1.jar
      • jackson-databind-2.1.1.jar
      • joda-time-2.2.jar
      • httpcore-4.2.5.jar
      • httpclient-4.2.5.jar
  • NativeS3FileSystem:
    • hadoop-aws-2.6.4.jar
    • guava-11.0.2.jar

hadoop-common はFlinkの一部として利用可能ですが、GuavaはFlinkによって共有されます。

これらのHadoopのバージョンはNativeS3FileSystemのためのサポートのみを持ちます。これはhadoop-commonの一部としてHadoop 2 のためのFlinkと一緒に事前パッケージ化されて同梱されています。クラスパスには何も追加する必要はありません。

上に戻る

一般的な問題

以下の章はAWS上でFlinkと連携する時の一般的な問題をリスト化します。

S3 ファイルシステム設定の失敗

ジョブのサブミットがNo file system found with scheme s3に注目した例外メッセージを伴って失敗した場合、このことはS3のために設置されたファイルシステムが無いことを意味します。Please check out the configuration sections for our shaded Hadoop/Presto or generic Hadoop file systems for details on how to configure this properly.

org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
  No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
Caused by: java.io.IOException: No file system found with scheme s3,
  referenced in file URI 's3://<bucket>/<endpoint>'.
    at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
    at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
    at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
    at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)

上に戻る

AWS アクセスキーID と秘密アクセスキーが指定されていない

ジョブがAWS Access Key ID and Secret Access Key must be specified as the username or passwordに注目した例外を伴って失敗した場合、アクセス証明書が適切にセットアップされていません。Please refer to the access credential section for our shaded Hadoop/Presto or generic Hadoop file systems for details on how to configure this.

org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
  HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
  AWS Access Key ID and Secret Access Key must be specified as the username or password
  (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
  or fs.s3n.awsSecretAccessKey properties (respectively) [...]
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
  be specified as the username or password (respectively) of a s3 URL, or by setting
  the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
    at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
    at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
    at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)

上に戻る

ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found

この例外を見た場合は、S3ファイルシステムがFlinkのクラスパスの一部ではありません。これを適切に設定する方法の詳細は S3 ファイルシステム依存の章を参照してください。

Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
  at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
  at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
  ... 25 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
  ... 32 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
  ... 33 more

上に戻る

IOException: 400: Bad Request

If you have configured everything properly, but get a Bad Request Exception and your S3 bucket is located in region eu-central-1, you might be running an S3 client, which does not support Amazon’s signature version 4.

[...]
Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
Caused by: org.jets3t.service.impl.rest.HttpException [...]

あるいは

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]

This should not apply to our shaded Hadoop/Presto S3 file systems but can occur for Hadoop-provided S3 file systems. In particular, all Hadoop versions up to 2.7.2 running NativeS3FileSystem (which depend on JetS3t 0.9.0 instead of a version >= 0.9.4) are affected but users also reported this happening with the S3AFileSystem.

Except for changing the bucket region, you may also be able to solve this by requesting signature version 4 for request authentication, e.g. by adding this to Flink’s JVM options in flink-conf.yaml (see configuration):

env.java.opts: -Dcom.amazonaws.services.s3.enableV4

上に戻る

NullPointerException at org.apache.hadoop.fs.LocalDirAllocator

This Exception is usually caused by skipping the local buffer directory configuration fs.s3a.buffer.dir for the S3AFileSystem. S3AFileSystemを適切に設定する方法を理解するために、S3AFileSystem 設定を参照してください。

[...]
Caused by: java.lang.NullPointerException at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
o.a.h.fs.FileSystem.create(FileSystem.java:907) at
o.a.h.fs.FileSystem.create(FileSystem.java:888) at
o.a.h.fs.FileSystem.create(FileSystem.java:785) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
... 25 more

上に戻る

TOP
inserted by FC2 system