SSL Setup
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

SSL セットアップ #

このページでは、Flinkプロセスとのネットワーク通信およびFlinkプロセス巻のネットワーク通信で、TLS/SSL認証と暗号化を有効にする方法について説明します。 **注意: TLS/SSL認証はデフォルトでは有効になっていません。

内部および外部の接続性 #

認証と暗号化によってマシーンプロセス間のネットワーク接続を保護する場合、Apache Flinkは内部接続と外部接続を区別します。 内部接続はFlinkプロセス間で行われる全ての接続を指します。これらの接続はFlinkの独自プロトコルを実行します。ユーザが内部接続のエンドポイントに直接接続することはりません。 外部/REST接続エンドポイントは、外部からFlinkプロセスに対して行われる全ての接続を指します。これには、Flink CLIとJobManager/Dispatcherとの通信を含む、実行中のFlinkジョブ/アプリケーションを開始および制御するためのweb UIとRESTコマンドが含まれます。

柔軟性を高めるために、内部および外部接続のセキュリティを個別に有効にして設定できます。

Internal and External Connectivity

内部接続 #

内部接続には以下のものが含まれます:

  • 制御メッセージ: JobManager / TaskManager / Dispatcher / ResourceManager の間RPC
  • データプレーン: シャッフル、ブロードキャスト、再配布などの際にデータを交換するためのTaskManager間の接続。
  • Blobサービス(ライブラリ、その他のartifactの配布)。

全ての内部接続はSSL認証され、暗号化されます。接続では、相互認証が使われます。つまり、各接続のサーバとクライアントの両方が相互に証明書を提示する必要があります。専用CSを使って内部証明書に排他的に署名する場合、証明書は共有secretとして効果的に機能します。 内部通信用の証明書は他の相手がFlinkとやりとりするのに必要がなく、コンテナイメージに単に追加するか、YARNデプロイメントに添付するだけで済みます。

  • このセットアップを実現する最も簡単な方法は、Flinkデプロイメント用の専用の公開/秘密キーペアと事故証明書生成することです。キーとトラストストアは同一で、そのキーペア/証明書のみが含まれます。 例を以下に示します

  • オペレータが企業全体の内部用CSを使うように制約されている(自己署名証明書を生成できない)環境では、そのCAによって署名されたFlinkデプロイメント用の専用のキーペア/証明書を持つことをお勧めします。 ただし、トラストストアにはSSLハンドシェイク中にデプロイメントの証明書を受け入れるCAのパブリック証明書も含める必要があります(JDKトラストストアの実装の要件)。

    注意: そのため、デプロイメント証明書(security.ssl.internal.cert.fingerprint)のフィンガープリントを指定することが重要です。自己署名されていない場合、その証明書が唯一の信頼できる証明書として固定し、トラストストアがそのCAによって署名された全ての証明書を信頼しないようにします。

注意: 内部通信は共有証明書を使って相互認証されるため、Flinkはホスト名の検証をスキップできます。 これにより、コンテナベースのセットアップが容易になります。

外部/REST接続 #

全ての外部接続は、HTTP/RESTエンドポイント経由で公開されます。例えば、web UIとCLIで使われます。

  • ジョブを送信するためのDispatcherとの通信(セッションクラスタ)
  • 実行中のジョブ/アプリケーションを検査および変更するためのJobMasterとの通信

RESTエンドポイントはSSL接続を要求するように設定できます。ただし、サーバはデフォルトであらゆるクライアントからの接続を受け入れます。つまり、RESTエンドポイントはクライアントを認証しないことを意味します。

RESTエンドポイントへの接続の認証が必要な場合は、設定によって単純な相互認証を有効にできますが、“side car proxy"をデプロイすることをお勧めします: RESTエンドポイントをループバックインタフェース(あるいは、Kubernetesのポッドローカルインタフェース)にバインドし、リクエストを認証してFlinkに転送するRESTプロキシを開始します。 Flinkユーザがデプロイしたプロキシの例としては、Envoy ProxyまたはNGINX with MOD_AUTHがあります。

プロキシに認証を移譲する背後にある理論的根拠は、そのようなプロキシは様々な認証オプションを提供し、したがって既存のインフラストラクチャへの統合が向上するということです。

SSLの設定 #

SSLは、内部外部接続に対して個別に有効にできます。

  • security.ssl.internal.enabled: 全ての内部接続に対してSSLを有効にします。
  • security.ssl.rest.enabled: REST / 外部接続に対してSSLを有効にします。

*注意: 後方互換性のために、security.ssl.enabledオプションは引き続き存在し、内部とRESTエンドポイントの両方でSSLを有効にします。

内部接続については、必要に応じて、様々な接続タイプのセキュリティを個別に無効にできます。 security.ssl.internal.enabledtrueの場合、次のパラメータをfalseにして特定の接続タイプのSSLを無効にできます:

  • taskmanager.data.ssl.enabled: TaskManager間のデータ通信
  • blob.service.ssl.enabled: JobManagerからTaskManagerへのBLOBの転送
  • pekko.ssl.enabled: JobManager / TaskManager / ResourceManager間のPekkoベースのRPC接続

キーストアとトラストストア #

SSL設定では、キーストアトラストストアの設定を必要とします。キーストアには公開証明書(公開キー)と秘密キーが含まれ、トラストストアには信頼された証明書または信頼された機関が含まれます。トラストストアがキーストアの証明書を信頼するように、両方のトラストストアを設定する必要があります。

内部接続 #

内部通信はサーバとクライアントの間で相互認証するため、キーストアとトラストストアは通常、共有secretとして機能する専用の証明書を参照します。このようなセットアップでは、証明書でワイルドカードホスト名またはアドレスを使えます。 自己署名証明書をつあく場合、同じファイルをキーストアとトラストストアとして使うことも可能です。

security.ssl.internal.keystore: /path/to/file.keystore
security.ssl.internal.keystore-password: keystore_password
security.ssl.internal.key-password: key_password
security.ssl.internal.truststore: /path/to/file.truststore
security.ssl.internal.truststore-password: truststore_password

自己署名ではなく、CAによって署名された証明書を使う場合は、接続を確立する時に特定の証明書のみを信頼できるように証明書ピンニングを使う必要があります。

security.ssl.internal.cert.fingerprint: 00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00

RESTエンドポイント(外部接続) #

RESTエンドポイントの場合、デフォルトではキーストアはサーバのエンドポイントによって使われ、トラストストアはRESTクライアント(CLIクライアントを含む)によってサーバの証明書を受け入れるために使われます。RESTキーストアに自己署名証明書がある場合、トラストストアはその証明書を直接信頼する必要があります。 RESTエンドポイントが適切な証明書階層を通じて署名された証明書を使う場合、その階層のルートはトラストストア内にある必要があります。

相互認証が有効な場合、キーストアとトラストストアは内部接続と同様にサーバエンドポイントとRESTクライアントの両方で使われます。

security.ssl.rest.keystore: /path/to/file.keystore
security.ssl.rest.keystore-password: keystore_password
security.ssl.rest.key-password: key_password
security.ssl.rest.truststore: /path/to/file.truststore
security.ssl.rest.truststore-password: truststore_password
security.ssl.rest.authentication-enabled: false

暗号スイート #

IETF RFC 7525は、強力なセキュリティのために特定の暗号スイートのセットを使うことを進めています。 これらの暗号スイートは多くのセットアップではそのままでは利用できないため、Flinkのデフォルト値は、若干弱いものの互換性の高い暗号スイートに設定されています。 可能であれば、Flink設定に以下のエントリを追加して、SSLセットアップをより強力な暗号スイートに更新することをお勧めします。

security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

これらの暗号スイートがセットアップでサポートされない場合、Flinkプロセスは相互に接続できないことがわかります。

SSLオプションの完全なリスト #

Key Default Type Description
security.context.factory.classes
"org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory";"org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory" List<String> List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.
security.delegation.token.provider.<serviceName>.enabled
true Boolean Controls whether to obtain credentials for services when security is enabled. By default, credentials for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.
security.delegation.tokens.enabled
true Boolean Indicates whether to start delegation tokens system for external services.
security.delegation.tokens.renewal.retry.backoff
1 h Duration The time period how long to wait before retrying to obtain new delegation tokens after a failure.
security.delegation.tokens.renewal.time-ratio
0.75 Double Ratio of the tokens's expiration time when new credentials should be re-obtained.
security.kerberos.access.hadoopFileSystems
(none) List<String> A semicolon-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002;hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens.
security.kerberos.krb5-conf.path
(none) String Specify the local location of the krb5.conf file. If defined, this conf would be mounted on the JobManager and TaskManager containers/pods for Kubernetes and Yarn. Note: The KDC defined needs to be visible from inside the containers.
security.kerberos.login.contexts
(none) String A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication)
security.kerberos.login.keytab
(none) String Absolute path to a Kerberos keytab file that contains the user credentials.
security.kerberos.login.principal
(none) String Kerberos principal name associated with the keytab.
security.kerberos.login.use-ticket-cache
true Boolean Indicates whether to read from your Kerberos ticket cache.
security.kerberos.relogin.period
1 min Duration The time period when keytab login happens automatically in order to always have a valid TGT.
security.module.factory.classes
"org.apache.flink.runtime.security.modules.HadoopModuleFactory";"org.apache.flink.runtime.security.modules.JaasModuleFactory";"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory" List<String> List of factories that should be used to instantiate security modules. All listed modules will be installed. Keep in mind that the configured security context might rely on some modules being present.
security.ssl.algorithms
"TLS_RSA_WITH_AES_128_CBC_SHA" String The comma separated list of standard SSL algorithms to be supported. Read more here
security.ssl.internal.cert.fingerprint
(none) String The sha1 fingerprint of the internal certificate. This further protects the internal communication to present the exact certificate used by Flink.This is necessary where one cannot use private CA(self signed) or there is internal firm wide CA is required
security.ssl.internal.close-notify-flush-timeout
-1 Integer The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)
security.ssl.internal.enabled
false Boolean Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).
security.ssl.internal.handshake-timeout
-1 Integer The timeout (in ms) during SSL handshake. (-1 = use system default)
security.ssl.internal.key-password
(none) String The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore
(none) String The Java keystore file with SSL Key and Certificate, to be used Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore-password
(none) String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.session-cache-size
-1 Integer The size of the cache used for storing SSL session objects. According to here, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).
security.ssl.internal.session-timeout
-1 Integer The timeout (in ms) for the cached SSL session objects. (-1 = use system default)
security.ssl.internal.truststore
(none) String The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore-password
(none) String The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.protocol
"TLSv1.2" String The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.
security.ssl.provider
"JDK" String The SSL engine provider to use for the ssl transport:
  • JDK: default Java-based SSL engine
  • OPENSSL: openSSL-based SSL engine using system libraries
OPENSSL is based on netty-tcnative and comes in two flavours:
  • dynamically linked: This will use your system's openSSL libraries (if compatible) and requires opt/flink-shaded-netty-tcnative-dynamic-*.jar to be copied to lib/
  • statically linked: Due to potential licensing issues with openSSL (see LEGAL-393), we cannot ship pre-built libraries. However, you can build the required library yourself and put it into lib/:
    git clone https://github.com/apache/flink-shaded.git && cd flink-shaded && mvn clean package -Pinclude-netty-tcnative-static -pl flink-shaded-netty-tcnative-static
security.ssl.rest.authentication-enabled
false Boolean Turns on mutual SSL authentication for external communication via the REST endpoints.
security.ssl.rest.cert.fingerprint
(none) String The sha1 fingerprint of the rest certificate. This further protects the rest REST endpoints to present certificate which is only used by proxy serverThis is necessary where once uses public CA or internal firm wide CA
security.ssl.rest.enabled
false Boolean Turns on SSL for external communication via the REST endpoints.
security.ssl.rest.key-password
(none) String The secret to decrypt the key in the keystore for Flink's external REST endpoints.
security.ssl.rest.keystore
(none) String The Java keystore file with SSL Key and Certificate, to be used Flink's external REST endpoints.
security.ssl.rest.keystore-password
(none) String The secret to decrypt the keystore file for Flink's for Flink's external REST endpoints.
security.ssl.rest.truststore
(none) String The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.
security.ssl.rest.truststore-password
(none) String The password to decrypt the truststore for Flink's external REST endpoints.
security.ssl.verify-hostname
true Boolean Flag to enable peer’s hostname verification during ssl handshake.
zookeeper.sasl.disable
false Boolean
zookeeper.sasl.login-context-name
"Client" String
zookeeper.sasl.service-name
"zookeeper" String

キーストアとトラストストアの作成とデプロイ #

キー、証明書、キーストアとトラストストは、keytoolユーティリティを使って生成できます。 Flinkクラスタ内の各ノードからアクセスできる適切なJavaキーストアとトラストストアが必要です。

  • スタンドアローンセットアップの場合、これはファイルを各ノードにコピーするか、マウントされた共有ディレクトリにファイルを追加することを意味します。
  • コンテナベースのセットアップの場合、キーストアとトラストストアファイルをコンテナイメージに追加します。
  • Yarnセットアップの場合、クラスタのデプロイメントフェーズでキーストアとトラストストアファイルを自動的に配布できます。

外部に面したRESTエンドポイントの場合、証明書のcommon nameまたはsubject alternative namesはノードのホスト名とIPアドレスに一致する必要があります。

スタンドアローンとKubernetesのSSLセットアップの例 #

内部接続

以下のkeytoolコマンドを使ってキーストアにキーペアを作成します:

$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

キーストア内の単一のキー/証明書は、サーバとクライアントのエンドポイントで同じように使われます(相互認証)。 キーペアは内部セキュリティの共通secretとして機能うし、キーストアとトラストストアとして直接使えます。

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password

RESTエンドポイント

RESTエンドポイントは、Flinkの一部ではないツール(例えば、REST APIへのcurlリクエストなど)を含む外部プロセスからの接続を受け取る可能性があります。 CA階層を通じて署名された適切な証明書をセットアップすることは、RESTエンドポイントにとって意味がある場合があります。

ただし、前述したように、RESTエンドポイントはクライアントを認証しないため、通常はいずれにしてもプロキシを介してセキュリティを保護する必要があります。

RESTエンドポイント(単純な自己署名証明書)

この例では、単純なキーストア/トラストストアペアの作成する方法を示します。トラストストアには主キーが含まれていないため、ほかのアプリケーションと共有できます。この例では、myhost.company.org / ip:10.0.2.15はJobManagerのノード(またはサービス)です。

$ keytool -genkeypair -alias flink.rest -keystore rest.keystore -dname "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:10.0.2.15" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12

$ keytool -exportcert -keystore rest.keystore -alias flink.rest -storepass rest_keystore_password -file flink.cer

$ keytool -importcert -keystore rest.truststore -alias flink.rest -storepass rest_truststore_password -file flink.cer -noprompt
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/flink/conf/rest.keystore
security.ssl.rest.truststore: /path/to/flink/conf/rest.truststore
security.ssl.rest.keystore-password: rest_keystore_password
security.ssl.rest.truststore-password: rest_truststore_password
security.ssl.rest.key-password: rest_keystore_password

RESTエンドポイント(自己署名CAを使用)

以下のkyetoolコマンドを使って、自己署名CAを持つトラストストアを作成します。

$ keytool -genkeypair -alias ca -keystore ca.keystore -dname "CN=Sample CA" -storepass ca_keystore_password -keyalg RSA -keysize 4096 -ext "bc=ca:true" -storetype PKCS12

$ keytool -exportcert -keystore ca.keystore -alias ca -storepass ca_keystore_password -file ca.cer

$ keytool -importcert -keystore ca.truststore -alias ca -storepass ca_truststore_password -file ca.cer -noprompt

ここで、上記のCAによって署名された証明書を持つRESTエンドポイントのキーストアを作成します。 flink.company.org / ip:10.0.2.15をJobManagerのホスト名にします。

$ keytool -genkeypair -alias flink.rest -keystore rest.signed.keystore -dname "CN=flink.company.org" -ext "SAN=dns:flink.company.org" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12

$ keytool -certreq -alias flink.rest -keystore rest.signed.keystore -storepass rest_keystore_password -file rest.csr

$ keytool -gencert -alias ca -keystore ca.keystore -storepass ca_keystore_password -ext "SAN=dns:flink.company.org,ip:10.0.2.15" -infile rest.csr -outfile rest.cer

$ keytool -importcert -keystore rest.signed.keystore -storepass rest_keystore_password -file ca.cer -alias ca -noprompt

$ keytool -importcert -keystore rest.signed.keystore -storepass rest_keystore_password -file rest.cer -alias flink.rest -noprompt

Now add the following configuration to your flink-conf.yaml:

security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/flink/conf/rest.signed.keystore
security.ssl.rest.truststore: /path/to/flink/conf/ca.truststore
security.ssl.rest.keystore-password: rest_keystore_password
security.ssl.rest.key-password: rest_keystore_password
security.ssl.rest.truststore-password: ca_truststore_password

curlユーティリティを使ったRESTエンドポイントをクエリするためのヒント

opensslを使って、キーストアをPEM形式に変換できます:

$ openssl pkcs12 -passin pass:rest_keystore_password -in rest.keystore -out rest.pem -nodes

次に、curlを使ってRESTエンドポイントをクエリできます:

$ curl --cacert rest.pem flink_url

相互SSLが有効な場合:

$ curl --cacert rest.pem --cert rest.pem flink_url

YARNデプロイメントのヒント #

YARNの場合、Yarnのツールを使って次のことを行うことができます:

  • 内部通信のためのセキュリティの設定は、上記の例と全く同じです。

  • RESTエンドポイントを保護するには、JobManagerがデプロイされる可能性のあるすべてのホストに対して有効になるように、RESTエンドポイントの証明書を発行する必要があります。これは、ワイルドカードDNS名を使うか、複数のDNS名を追加することで行えます。

  • キーストアとトラストストアをデプロイする最も簡単な方法は、YARNクライアントのship filesオプション(-yt)を使うことです。 キーストアとトラストストアをローカルディレクトリ(deploy-keys/など)にコピーし、次のようにYARNセッションを開始します: flink run -m yarn-cluster -yt deploy-keys/ flinkapp.jar

  • YARNを使ってデプロイする場合、FlinkのwebダッシュボードはYARNプロキシのトラッキングURLを使ってアクセス可能です。 YARNプロキシがFlinkのHTTPS URLにアクセスできるようにするには、FlinkのSSL証明書を受け付けるようにYARNプロキシを設定する必要があります。 そのためには、YARNプロキシノード上のJavaのデフォルトのトラストストアに独自のCA証明書を追加します。

Back to top

inserted by FC2 system