<

ドキュメント

Kafka 2.7 Documentation

以前のリリース: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X.

7. セキュリティ

7.1セキュリティ概要

リリース 0.9.0.0で、Kafkaコミュニティは、個々にあるいは一緒に使われる、Kafkaクラスタのセキュリティを強める多くの機能を追加しました。以下のセキュリティ手段が現在のところサポートされます:
  1. SSLまたはSASLを使って、クライアント(プロデューサとコンシューマ)、他のブローカおよびツールからブローカーへの接続の認証。Kafkaは以下のSASL機構をサポートします:
    • SASL/GSSAPI (Kerberos) - バージョン0.9.0.0から
    • SASL/PLAIN - バージョン0.10.0.0から
    • SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - バージョン 0.10.2.0から
    • SASL/OAUTHBEARER - バージョン 2.0から
  2. ブローカーからZooKeeperへの接続の認証
  3. SSLを使って、ブローカーとクライアント間、ブローカー間、あるいはブローカーとツール間で転送されるデータを暗号化 (SSLが有効な場合にパフォーマンスの低下があり、大きさはCPUの型とJVM実装に依存することに注意してください)。
  4. クライアントによる 読み込み/書き込み操作の認証
  5. 認証は差し込み可能で外部の認証サービスとの統合がサポートされます
セキュリティは任意であることに留意する価値はあります - 非セキュアなクラスタ、認証の混合、非認証、暗号化と非暗号化のクライアントがサポートされます。以下の説明はクライアントとブローカーの両方でセキュリティ機能を設定および使う方法を説明します。

7.2SSLを使った暗号化と認証

Apache Kafka により、クライアントは通信の暗号化と認証のために SSL を使うことができます。デフォルトでは SSL は無効ですが、必要に応じて有効にすることができます。以下の段落では、独自の PKI インフラストラクチャをセットアップし、それを使って証明書を作成し、これらを使うように Kafka を構成する方法について詳しく説明します。
  1. 各KafkaブローカーのためのSSLキーと証明書を生成する

    SSL をサポートする1つ以上のブローカーを配備する最初のステップは、全てのサーバの公開/秘密鍵ペアを生成することです。Kafka は全てのキーと証明書がキーストアに格納されることを想定しているため、このタスクには Java の keytool コマンドを使います。このツールは2つの異なるキーストア形式をサポートします。Java 固有の jks 形式は今では非推奨で、PKCS12 もサポートします。PKCS12 は Java バージョン 9 のデフォルトの形式で、使用中の Java バージョンに関係なくこの形式が使われていることを確認するには、以下の全てのコマンドで明示的に PKCS12 形式を指定します。
                    keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12
    上のコマンドの中で2つのパラメータを指定する必要があります:
    1. keystorefile: このブローカーのキー(および後で証明書)を格納するキーストアファイル。キーストアファイルはこのブローカーの秘密鍵と公開鍵が含まれているため、安全に保管する必要があります。このキーは目的のサーバに転送/残すべきではないため、理想的には、このステップはキーが使われる Kafka ブローカーで実行されます。
    2. validity: キーの有効期間の日数。これは、証明書への署名で決定される証明書の有効期間とは異なることに注意してください。同じキーを使って複数の証明書をリクエストできます: キーの有効期間が10年の場合、CA が署名するのは1年間のみ有効な証明書であり、10個の証明書で同じキーを使うことができます。

    作成されたばかりの秘密鍵で使える証明書を取得するには、証明書の署名リクエストを作成する必要があります。この署名のリクエストは、信頼できる CA によって署名された場合、実際の証明書になり、キーストアにインストールして、認証目的で使うことができます。
    証明書の署名要求を作成するには、これまで作成された全てのサーバのキーストアに対して以下のコマンドを実行します。
                    keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
    このコマンドは、証明書にホスト名情報を追加することを想定しています。そうでない場合は、拡張パラメータ -ext SAN=DNS:{FQDN},IP:{IPADDRESS1} を省略することができます。詳細については、以下を見てください。
    ホスト名の検証
    ホスト名検証は、有効になっている場合、接続しているサーバが掲示する証明書の属性をそのサーバの実際のホスト名または IP アドレスと照合して確認し、実際に正しいサーバに接続していることを確認するプロセスです。このチェックの主な理由は、中間者攻撃を防ぐためです。Kafka の場合、このチェックはデフォルトで長い間無効になっていますが、Kafka 2.0.0 以降、サーバのホスト名検証はクライアント接続とブローカー間接続に対してデフォルトで有効になっています。サーバホスト名検証は、ssl.endpoint.identification.algorithm を空に設定することで無効にすることができます。
    動的に設定されたブローカーのリスナーについては、ホスト名の検証は kafka-configs.sh を使って無効にすることができます:
                    bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

    注意:

    通常、"動作させるだけ" の最も簡単な方法で、"時間があれば後で修正する"という約束を除いて、ホスト名検証を無効にする理由はありません!
    ホスト名検証を正しく行うことは、適切なタイミングで行うとそれほど難しくありませんが、クラスタが稼働するとさらに難しくなります - いいですか。いますぐやってください!

    ホスト名検証が有効になっている場合、クライアントはサーバの完全修飾ドメイン名 (FQDN) あるいは IPアドレスを以下の2つのフィールドのいずれかに対して検証します:

    1. 一般名 (CN)
    2. サブジェクトの別名 (SAN)

    Kafka は両方のフィールドをチェックしますが、ホスト名検証のための一般名フィールドの使用は、2000年以降非推奨であり、可能であれば回避する必要があります。さらに、SAN フィールドはさらに柔軟性が高く、証明書で複数の DNS と IP エントリを宣言できます。
    別の利点は、ホスト名の検証に SAN フィールドが使われる場合、一般名を認証目的でより意味のある値に設定できることです。署名済み証明書に SAN フィールドを含める必要があるため、署名リクエストを生成する時に指定します。キーペアを生成する時に指定することもできますが、これは自動的に署名リクエストにコピーされません。
    SANフィールドを追加するには、以下の引数 -ext SAN=DNS:{FQDN},IP:{IPADDRESS} をキーツールコマンドに追加します:
                    keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
  2. 独自のCAの生成

    この手順の後で、クラスタ内の各マシンはすでにトラフィックを暗号化するために使うことができる公開鍵と秘密鍵のペアを持ちます。これは署名書を作成するための基礎です。認証機能を追加するために、署名リクエストは信頼できる機関によって署名される必要があります。これはこのステップで作られます。

    認証局(CA)は証明書の署名に責任があります。CA はパスポートを発行する政府機関のように機能します - 政府機関はパスポートを偽造することが難しくなるように各パスポートにスタンプを押し(署名)します。他の政府機関はパスポートが信頼できることを保証するためにスタンプを検証します。同様に、CAは証明書を署名し、暗号は署名された証明書が計算上は偽造することが難しいことを保証します。従って、CAが本物で信頼された当局であれば、クライアントは信頼できるマシーンに接続していることを強く保証します。

    このガイドでは、私たち自身が認証局となります。企業の環境で運用クラスタをセットアップする場合、これらの認証局は通常、企業全体で信頼されている企業 CA によって署名されます。この場合の考慮事項については、本番環境での一般的な落とし穴を見てください。

    OpenSSL のバグにより、x509 モジュールは要求された拡張フィールドを CSR から 最終的な証明書にコピーしません。 ホスト名検証を有効にするために証明書に SAN 拡張を含める必要があるため、代わりに ca モジュールを使います。これには、CA キーペアを作成する前に、幾つかの追加設定を行う必要があります。
    以下のリストを openssl-ca.cnf というファイルに保存し、必要に応じて有効性と共通属性の値を調整します。

    HOME            = .
    RANDFILE        = $ENV::HOME/.rnd
    
    ####################################################################
    [ ca ]
    default_ca    = CA_default      # The default ca section
    
    [ CA_default ]
    
    base_dir      = .
    certificate   = $base_dir/cacert.pem   # The CA certifcate
    private_key   = $base_dir/cakey.pem    # The CA private key
    new_certs_dir = $base_dir              # Location for new certs after signing
    database      = $base_dir/index.txt    # Database index file
    serial        = $base_dir/serial.txt   # The current serial number
    
    default_days     = 1000         # How long to certify for
    default_crl_days = 30           # How long before next CRL
    default_md       = sha256       # Use public key default MD
    preserve         = no           # Keep passed DN ordering
    
    x509_extensions = ca_extensions # The extensions to add to the cert
    
    email_in_dn     = no            # Don't concat the email in the DN
    copy_extensions = copy          # Required to copy SANs from CSR to cert
    
    ####################################################################
    [ req ]
    default_bits       = 4096
    default_keyfile    = cakey.pem
    distinguished_name = ca_distinguished_name
    x509_extensions    = ca_extensions
    string_mask        = utf8only
    
    ####################################################################
    [ ca_distinguished_name ]
    countryName         = Country Name (2 letter code)
    countryName_default = DE
    
    stateOrProvinceName         = State or Province Name (full name)
    stateOrProvinceName_default = Test Province
    
    localityName                = Locality Name (eg, city)
    localityName_default        = Test Town
    
    organizationName            = Organization Name (eg, company)
    organizationName_default    = Test Company
    
    organizationalUnitName         = Organizational Unit (eg, division)
    organizationalUnitName_default = Test Unit
    
    commonName         = Common Name (e.g. server FQDN or YOUR name)
    commonName_default = Test Name
    
    emailAddress         = Email Address
    emailAddress_default = test@test.com
    
    ####################################################################
    [ ca_extensions ]
    
    subjectKeyIdentifier   = hash
    authorityKeyIdentifier = keyid:always, issuer
    basicConstraints       = critical, CA:true
    keyUsage               = keyCertSign, cRLSign
    
    ####################################################################
    [ signing_policy ]
    countryName            = optional
    stateOrProvinceName    = optional
    localityName           = optional
    organizationName       = optional
    organizationalUnitName = optional
    commonName             = supplied
    emailAddress           = optional
    
    ####################################################################
    [ signing_req ]
    subjectKeyIdentifier   = hash
    authorityKeyIdentifier = keyid,issuer
    basicConstraints       = CA:FALSE
    keyUsage               = digitalSignature, keyEncipherment
    次に、データベースとシリアル番号ファイルを作成します。これらはこの CA で署名された証明書を追跡するために使われます。これらはどちらも CA キーと同じディレクトリにある単なるテキストファイルです。
                    echo 01 > serial.txt
                    touch index.txt
    これらのステップが完了すると、後で証明書に署名するために使われる CA を生成する準備が整います。
                openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
    CA は、それ自身が署名した公開鍵/秘密鍵ペアと証明書で、他の証明書に署名することのみを目的としています。
    このキーペアは安全に保管する必要があります。誰かがそれにアクセスした場合、インフラストラクチャによって信頼される証明書を作成して署名することができます。つまり、この CA を信頼するサービスに接続する時に、誰にでもなりすますことができることを意味します。
    次のステップはクライアントがこのCAを信頼できるように生成されたCAを **clients' truststore** に追加することです:
                    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
    注意: もしKafka brokers configで ssl.client.auth を "requested" あるいは "required" に設定することでKafkaブローカーがクライアントの認証を必要とするように設定する場合、Kafkaブローカーのためのトラストストアを提供しクライアントのキーが署名された全てのCA証明書を持つようにしなければなりません。
                    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    各マシーンの独自の身元を格納するステップ1でのキーストアとは対照的に、クライアントのトラストストアはクライアントが信頼すべき全ての証明書を格納します。証明書をトラストストアにインポートすることはその証明書によって署名された全ての証明書を信頼することを意味します。上の例えから政府機関(CA)を信頼することは、発行した全てのパスポート(証明書)を信頼することも意味します。この特質は信頼の連鎖と呼ばれ、SSLを大規模なKafkaクラスタに配備する時に特に便利です。1つのCAを使ってクラスタ内の全ての証明書を署名することができ、全てのマシーンにそのCAを信頼する同じトラストストアを共有することができます。そのようにして全てのマシーンは全ての他のマシーンを認証することができます。
  3. 証明書の署名

    そして、CAを使って署名します:
                    openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}
    最後にCAの証明書と署名された証明書をキーストアの両方をインポートする必要があります:
                    keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
                    keytool -keystore {keystore} -alias localhost -import -file cert-signed
    パラメータの定義は以下の通りです:
    1. keystore: キーストアの場所
    2. CA 証明書: CA の証明書
    3. 証明書の署名リクエスト: サーバーキーで作成された csr
    4. サーバ証明書: サーバの署名済み証明書を書き込むファイル
    これは truststore.jks と呼ばれる1つのトラストストアを残します - これは、全てのクライアントとブローカーで同じにすることができ、機密情報が含まれないため、これを保護する必要はありません。
    さらに、ノードのキー、証明書、CA 証明書を含むノードごとに1つの server.keystore.jks があります。これらのファイルの使用法については、 Kafka ブローカーの構成Kafka クライアントの構成 を参照してください。

    このトピックに関するツールの支援については、これらの手順を支援するための広範なスクリプトが用意されている easyRSA プロジェクトをチェックしてください。

    SSL key and certificates in PEM format
    From 2.7.0 onwards, SSL key and trust stores can be configured for Kafka brokers and clients directly in the configuration in PEM format. This avoids the need to store separate files on the file system and benefits from password protection features of Kafka configuration. PEM may also be used as the store type for file-based key and trust stores in addition to JKS and PKCS12. To configure PEM key store directly in the broker or client configuration, private key in PEM format should be provided in ssl.keystore.key and the certificate chain in PEM format should be provided in ssl.keystore.certificate.chain. To configure trust store, trust certificates, e.g. public certificate of CA, should be provided in ssl.truststore.certificates. Since PEM is typically stored as multi-line base-64 strings, the configuration value can be included in Kafka configuration as multi-line strings with lines terminating in backslash ('\') for line continuation.

    Store password configs ssl.keystore.password and ssl.truststore.password are not used for PEM. If private key is encrypted using a password, the key password must be provided in ssl.key.password. Private keys may be provided in unencrypted form without a password when PEM is specified directly in the config value. In production deployments, configs should be encrypted or externalized using password protection feature in Kafka in this case. Note that the default SSL engine factory has limited capabilities for decryption of encrypted private keys when external tools like OpenSSL are used for encryption. Third party libraries like BouncyCastle may be integrated witn a custom SslEngineFactory to support a wider range of encrypted private keys.

  4. 本番環境の落とし中

    上記の段落は、独自の CA を作成し、それを使ってクラスタの証明書に署名するプロセスを説明しています。これはサンドボックス、dev、test、類似のシステムに非常に役立ちますが、通常企業環境で本番クラスタの証明書を作成するための正しいプロセスではありません。企業は通常、独自の CA を運用し、ユーザは CSR を送信して CA で証明することができます。これはユーザが CA を安全に保つ責任が無く、誰もが信頼できる中央機関があるという利点があります。ただし、これは、証明書に署名するプロセスの多くの制御をユーザから奪います。多くの売、企業 CA を運用している人は、Kafka でこれらの証明書を使おうとする時に問題を引き起こす可能性がある証明書に厳しい制限を適用します。
    1. 拡張キーの使用
      証明書には、証明書を拡張できる目的を制御する拡張フィールドが含まれている場合があります。このフィールドが空の場合、使用法に制限はありませんが、ここで使用法が指定されている場合は、有効な SSL 実装でこれらの使用法を強制する必要があります。
      Kafka の関連する使用法は以下の通りです:
      • クライアント認証
      • サーバ認証
      クラスタ内通信の場合、全てのブローカーは他のブローカーに対してクライアントとサーバの両方として動作するため、Kafka ブローカーはこれら両方の使用を許可する必要があります。企業の CA が Web サーバ用の署名プロファイルを持ち、これを Kafka にも使うことは珍しくありません。これは serverAuth の使用値のみを含み、SSL ハンドシェイクが失敗します。
    2. 中間証明書
      企業ルート CA はセキュリティ上の理由で、しばしばオフラインになっています。日常的な使用を可能にするために、いわゆる中間 CA が作成され、最終的な証明書の署名に使われます。中間 CA によって署名された証明書をキーストアにインポートする場合、ルート CA までの信頼チェーン全体を提供する必要があります。これは、単に証明書を1つの結合された証明書ファイルにcatし、これを keytool にインポートすることで実行できます。
    3. 拡張フィールドのコピーに失敗
      CA オペレータは、CSR から拡張フィールドをコピーして要求することを躊躇することが多く、悪意のあるものが誤解を招く可能性のある値または詐欺的な値を持つ証明書を取得することが困難になるため、これらを自分で指定することを好みます。適切なホスト名検証を可能にするために、要求された全ての SAN フィールドが証明書に含まれているかどうか、署名済み証明書をダブルチェックすることをお勧めします。以下のコマンドは、証明書の詳細をコンソールに出力することができます。これを最初に要求されたものと比較する必要がります:
                              openssl x509 -in certificate.crt -text -noout
  5. Kafkaブローカーの設定

    Kafkaブローカーは複数のポート上での接続のlistenをサポートします。server.properties 内で以下のプロパティを設定する必要があります。これは1つ以上のカンマ区切りの値を持つ必要があります:
    リスナー
    もしSSLが内部ブローカーの通信で有効で無い場合(有効にする方法は以下を見てください)、PLAINTEXT と SSL ポートの両方が必要になるでしょう。
                listeners=PLAINTEXT://host.name:port,SSL://host.name:port
    以下のSSL設定がブローカー側で必要とされます
                ssl.keystore.location=/var/private/ssl/server.keystore.jks
                ssl.keystore.password=test1234
                ssl.key.password=test1234
                ssl.truststore.location=/var/private/ssl/server.truststore.jks
                ssl.truststore.password=test1234
    注意: ssl.truststore.password は技術的には任意ですが強くお勧めします。もしパスワードが設定されない場合、truststoreへのアクセスはまだ可能ですが、完全性のチェックは無効にされます。考慮に値する任意の設定:
    1. ssl.client.auth=none ("required" => クライアントの認証が必要です、"requested" => クライアント認証が必要で、証明書なしのクライアントはまだ接続することができます。"requested"の使用は大丈夫だという誤った間隔を与え、間違ったクライアントはまだ接続に成功するだろうためお勧めできません。)
    2. ssl.cipher.suites (任意)。cipherスイートはTLSあるいはSSLネットワークプロトコルを使うネットワーク接続のためのセキュリティ設定を取り決めるために使われる認証、暗号、MACおよびキー交換アルゴリズムの組み合わせで名前を付けられます。(デフォルトは空のリストです)
    3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (クライアントから受け付けるつもりのSSLプロトコルをリスト化する。SSLはTLSに賛成して非推奨で、プロダクションでのSSLの使用はお勧めされないことに注意してください)
    4. ssl.keystore.type=JKS
    5. ssl.truststore.type=JKS
    6. ssl.secure.random.implementation=SHA1PRNG
    内部ブローカー通信のためにSSLを有効にしたい場合、以下を server.properties ファイル (デフォルトは PLAINTEXTです)に追加します
                security.inter.broker.protocol=SSL

    幾つかの国での輸入規定により、Oracleの実装はデフォルトで利用可能な暗号アルゴリズムの挙動が制限されます。より強いアルゴリズム(例えば AES with 256-bit keys)が必要な場合JCE Unlimited Strength Jurisdiction Policy Files を取得しJDK/JRE にインストールする必要があります。詳しくはJCA Providers Documentationを見てください。

    JRE/JDKは暗号操作のために使われるデフォルトの疑似ランダム数字生成器(PRNG)を持つだろうため、ssl.secure.random.implementation によって使われる実装を設定する必要はありませんしかし、いくつかの実装ではパフォーマンスの問題があります (特にLinuxシステム上でのデフォルトの選択である NativePRNG はグローバルロックを使います)。SSL接続のパフォーマンスが問題になる場合は、明示的に使用する実装の設定を考慮してください。SHA1PRNG 実装は非ブロッキングで高負荷でとても良いパフォーマンスの特徴を示しています (ブローカーごとに、50 MB/sec の生成されたメッセージに加えてリプリケーショントラフィック)。

    一度ブローカーを開始すると、server.logで見ることができる筈です
                with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
    もしサーバのキーストアとトラストストアが適切にセットアップされているかをすばやく確認するために、以下のコマンドを実行することができます
    openssl s_client -debug -connect localhost:9093 -tls1
    (注意: TLSv1 は ssl.enabled.protocols の下にリスト化されているべきです)
    このマンドの出力の中にサーバの証明書を見る筈です:
                -----BEGIN CERTIFICATE-----
                {variable sized random bytes}
                -----END CERTIFICATE-----
                subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
                issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
    もし証明書が現れないか、他のエラーメッセージがある場合は、キーストアが適切にセットアップされていません。
  6. Kafkaクライアントの設定

    SSLは新しいKafkaプロデューサとコンシューマでサポートされ、古いAPIはサポートされません。SSLの設定はプロデューサとコンシューマの両方で同じでしょう。
    もしクライアントの認証がブローカーで必要ではない場合、以下が最小の設定の例です:
                security.protocol=SSL
                ssl.truststore.location=/var/private/ssl/client.truststore.jks
                ssl.truststore.password=test1234
    注意: ssl.truststore.password は技術的には任意ですが強くお勧めします。もしパスワードが設定されない場合、truststoreへのアクセスはまだ可能ですが、完全性のチェックは無効にされます。もしクライアントの認証が必要な場合、ステップ1でのようにキーストアが生成されなければならず、以下も設定されなければなりません:
                ssl.keystore.location=/var/private/ssl/client.keystore.jks
                ssl.keystore.password=test1234
                ssl.key.password=test1234
    必要とされるかもしれない他の構成の設定は、要求とブローカーの設定によります:
    1. ssl.provider (任意)。SSL接続のために使われるセキュリティプロバイダの名前。デフォルト値はJVMのデフォルトのセキュリティプロバイダです。
    2. ssl.cipher.suites (任意)。cipherスイートはTLSあるいはSSLネットワークプロトコルを使うネットワーク接続のためのセキュリティ設定を取り決めるために使われる認証、暗号、MACおよびキー交換アルゴリズムの組み合わせで名前を付けられます。
    3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. ブローカー側で設定される少なくとも1つのプロトコルがリスト化されるべきです
    4. ssl.truststore.type=JKS
    5. ssl.keystore.type=JKS

    コンソール-プロデューサとコンソール-コンシューマを使った例:
                kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
                kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

7.3SASLを使った認証

  1. JAAS 設定

    KafkaはSASL設定のためのJavaの認証と権限サービス(JAAS) を使います。

    1. KafkaブローカーのためのJAAS設定

      KafkaServer は各KafkaServer/ブローカーによって使われるJAASファイルでのセクション名です。このセクションは内部ブローカーのためにブローカーによって作成されたSASLクライアント接続を含むブローカーのためのSASL設定オプションを提供します。複数のlistenerがSASLを使うように設定される場合、セクション名はピリオドが続く小文字のlistener名が前に付くかもしれません。例えば sasl_ssl.KafkaServer

      Client セクションはzookeeperを使ったSASL接続を認証するために使われます。またブローカーだけがこれらのノードを変更できるように、これらのノードをロックするSASL ACLをブローカーがzookeeperノード上に設定することができます。全てのブローカーに渡って同じprincipal名を持つ必要があります。クライアント以外のセクション名を使いたい場合は、システムプロパティzookeeper.sasl.clientconfigを適切な名前 (例えば-Dzookeeper.sasl.clientconfig=ZkClient)に設定してください。

      ZooKeeperはデフォルトでサービス名として "zookeeper" を使います。もしこれを変更したい場合は、システムプロパティzookeeper.sasl.client.usernameを適切な名前 (例えば-Dzookeeper.sasl.client.username=zk)に設定してください。

      ブローカーはブローカー設定プロパティ sasl.jaas.configも使ってJAASを設定するかもしれません。プロパティ名はSASL機構を含むlistenerのプリフィックスを前に付けなければなりません。つまりlistener.name.{listenerName}.{saslMechanism}.sasl.jaas.config。設定値の中で1つのログインモジュールだけが指定されるかもしれません。もし複数の仕組みがlistener上で設定された場合、設定はlistenerと仕組みのプリフィックスを使って各機構ごとに提供されなければなりません。例えば:

              listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
                  username="admin" \
                  password="admin-secret";
              listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
                  username="admin" \
                  password="admin-secret" \
                  user_admin="admin-secret" \
                  user_alice="alice-secret";
      もしJAAS設定が異なるレベルで定義される場合、以下の優先度の順番が使われます:
      • ブローカーの設定プロパティ listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
      • 静的なJAAS設定の {listenerName}.KafkaServer セクション
      • 静的なJAAS設定の KafkaServer セクション
      ZooKeeper JAAS設定は静的なJAAS設定を使ってのみ設定されるかもしれないことに注意してください。

      ブローカーの設定の例についてはGSSAPI (Kerberos), PLAIN, SCRAM あるいは OAUTHBEARER を見てください。

  2. KafkaクライアントのためのJAAS設定

    クライアントは、クライアントの設定プロパティ sasl.jaas.config あるいはブローカーのように 静的な JAAS 設定ファイルを使って設定することができます。

    1. クライアントの設定プロパティを使ったJAAS 設定

      クライアントは物理的な設定ファイル無しにプロデューサあるいはコンシューマプロパティとしてJAAS設定を指定することができます。このモードはクライアントごとに異なるプロパティを指定することで同じJVM内の異なるプロデューサとコンシューマに異なる証明書を使うことができるようにもします。もし静的なJAAS設定システムプロパティjava.security.auth.login.config とクライアントプロパティ sasl.jaas.config の両方が指定された場合、クライアントプロパティが使われるでしょう。

      設定の例についてはGSSAPI (Kerberos), PLAIN, SCRAM あるいは OAUTHBEARER を見てください。

    2. 静的な設定ファイルを使ったJAAS設定
      静的なJAAS設定ファイルを使ってクライアント上でSASL認証を設定するには:
      1. KafkaClientという名前のクライアント ログイン セクションを持つJAAS設定ファイルを追加します。GSSAPI (Kerberos), PLAIN SCRAM あるいは OAUTHBEARERを設定するために、例の中で説明されたように選択された仕組みのための KafkaClient 内のログインモジュールを設定する。例えば、GSSAPI証明書は以下のように設定することができます:
                KafkaClient {
                com.sun.security.auth.module.Krb5LoginModule required
                useKeyTab=true
                storeKey=true
                keyTab="/etc/security/keytabs/kafka_client.keytab"
                principal="kafka-client-1@EXAMPLE.COM";
            };
      2. JVMパラメータとしてJAAS設定ファイルの場所を各クライアントのJVMに渡す。例えば:
            -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
  • SASL設定

    SASLはそれぞれセキュリティプロトコル SASL_PLAINTEXT あるいは SASL_SSL を使ってトランスポート層としてPLAINTEXTあるいはSSLと一緒に使われるかもしれません。もし SASL_SSL が使われた場合、SSLも設定されなければなりません

    1. SASL 機構
      Kafkaは以下のSASL機構をサポートします:
    2. Kafkaブローカーのための SASL 設定
      1. 少なくとも SASL_PLAINTEXT あるいは SASL_SSL のうちの1つを listeners パラメータに追加することで、server.properties内のSASLポートを設定します。これは1つ以上のカンマ区切りの値を含みます:
            listeners=SASL_PLAINTEXT://host.name:port
        SASLポートのみを設定している場合 (あるいはKafkaブローカーにSASLを使ってお互いを認証させたい場合)、内部ブローカーの通信のために同じSASLプロトコルを設定するようにしてください:
            security.inter.broker.protocol=SASL_PLAINTEXT (あるいは SASL_SSL)
      2. ブローカー内で有効にするために1つ以上のsupported mechanismsを選択し、仕組みのためのSASLを設定するためにステップに従います。ブローカー内で複数の仕組みを有効にするには、ここのステップに従ってください。
    3. KafkaクライアントのためのSASL設定

      SASL認証は新しいJava Kafkaプロデューサとコンシューマについてのみサポートされ、古いAPIはサポートされません。

      クライアント上でSASL認証を設定するには、クライアント認証のためにブローカー内で有効にされるSASL仕組みを選択し、選択された仕組みのためのSASLを設定するためにステップに従います。

  • SASL/Kerberos を使った認証

    1. 必要条件
      1. Kerberos
        組織がすでにKerberosサーバを使っている場合(例えば、Activeディレクトリを使って)、Kafkaのためだけに新しいサーバをインストールする必要はありません。そうでなければ、インストールする必要があります。LinuxのベンダーがおそらくKerberosのためのパッケージとそれをどうやってインストールおよび設定する方法についての概要を持っているでしょう (Ubuntu, Redhat)。Oracle Javaを使っている場合は、JavaバージョンのためのJCEポリシーファイルをダウンロードし、それらを $JAVA_HOME/jre/lib/security にコピーする必要があるでしょう。
      2. Kerberos Principalsの作成
        組織のKerberosあるいはActiveディレクトリ サーバを使っている場合は、(クライアントとツールを経由して)Kerberos認証を使ってKafkaにアクセスするだろうクラスタ内の各Kafkaブローカーと各オペレーティングシステムのユーザのためにKerberos管理者にprincipalを尋ねてください。
        もし独自のKerberosをインストールしている場合は、以下のコマンドを使ってそれらのprincipalを作成する必要があるでしょう:
                sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
                sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
      3. 全てのホストがホスト名を使って到達可能であるようにしてください - それらのFQDNを使って全てのホストが解決可能であることがKerberosの要求です。
    2. Kafkaブローカーの設定
      1. 以下のものに似た適切に修正されたJAASファイルを各Kafkaブローカーの設定ディレクトリに追加し、この例のために kafka_server_jaas.conf を呼んでみましょう (各ブローカーは独自のkeytabを持つ必要があることに注意してください):
                KafkaServer {
                    com.sun.security.auth.module.Krb5LoginModule required
                    useKeyTab=true
                    storeKey=true
                    keyTab="/etc/security/keytabs/kafka_server.keytab"
                    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
                };
        
                // Zookeeper client authentication
                Client {
                com.sun.security.auth.module.Krb5LoginModule required
                useKeyTab=true
                storeKey=true
                keyTab="/etc/security/keytabs/kafka_server.keytab"
                principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
                };
      2. JAASファイル内のKafkaServerセクションはブローカーにどのprincipalを使用するか、このprincipalが格納されているkeytabの場所を伝えます。それによりブローカーはこのセクションで指定されたkeytabを使ってログインすることができます。Zookeeper SASL設定の詳細についてはnotes を見てください。
      3. 各KafkaサーバにJAASと任意にkerb5ファイルの場所をJVMパラメータとして渡します (詳細は ここ を見てください):
            -Djava.security.krb5.conf=/etc/kafka/krb5.conf
                -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      4. JAASファイル内で設定されたkeytabがkafkaブローカーを開始するオペレーティングシステムのユーザに読み込み可能であるようにしてください:
      5. ここで説明されるようにserver.properties内でSASLポートとSASL機構を設定します。例えば:
            listeners=SASL_PLAINTEXT://host.name:port
                security.inter.broker.protocol=SASL_PLAINTEXT
                sasl.mechanism.inter.broker.protocol=GSSAPI
                sasl.enabled.mechanisms=GSSAPI
      6. server.properties 内でサービス名を設定する必要もあります。これはkafkaブローカーのprincipal名と一致しなければなりません。上の例では、principalは "kafka/kafka1.hostname.com@EXAMPLE.com"です。つまり:
            sasl.kerberos.service.name=kafka
    3. Kafkaクライアントの設定
      クライアント上でSASL認証を設定するには:
      1. クライアント (プロデューサ、コンシューマ、コネクト ワーカーなど)は独自のprincipal (通常はクライアントを実行するユーザと同じ名前と一緒に)を使ってクラスタを認証するでしょう。ですので必要に応じてこれらのprincipalを取得あるいは生成してください。そして、各クライアントについてJAAS設定プロパティを設定します。JVM内の子となるクライアントは異なるprincipalを指定することで異なるユーザとして実行するかもしれません。producer.propertiesあるいはconsumer.properties内のプロパティsasl.jaas.configは、プロデューサとコンシューマのようなクライアントがどのようにKafkaブローカーに接続するかを説明します。以下はkeytab(長く実行するプロセスにお勧めです)を使ったクライアントのための設定例です:
            sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
                useKeyTab=true \
                storeKey=true  \
                keyTab="/etc/security/keytabs/kafka_client.keytab" \
                principal="kafka-client-1@EXAMPLE.COM";
        kafka-console-consumer あるいは kafka-console-producer のようなコマンドラインのユーティリティについては、"useTicketCache=true"と一緒にkinitを使うことができます:
            sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
                useTicketCache=true;
        別のやり方として、クライアントのためのJAAS設定はここで説明されたブローカーに似てJVMパラメータとして指定することができます。クライアントはKafkaClientという名前のログイン セクションを使います。このオプションはJVMからの全てのクライアント接続のためにただ一人だけのユーザを許可します。
      2. JAAS設定内で設定されたkeytabがkafkaクライアントを開始するオペレーティングシステムのユーザに読み込み可能であるようにしてください:
      3. 任意でkrb5ファイルの場所をJVMパラメータとして各クライアントJVMに渡します (詳細はここを見てください):
            -Djava.security.krb5.conf=/etc/kafka/krb5.conf
      4. 以下のプロパティを producer.properties あるいは consumer.properties の中で設定します:
            security.protocol=SASL_PLAINTEXT (or SASL_SSL)
            sasl.mechanism=GSSAPI
            sasl.kerberos.service.name=kafka
  • SASL/PLAIN を使った認証

    SASL/PLAIN は安全な認証を実装するためのTLSで一般的に使われる暗号化のための単純な ユーザ名/パスワード 認証です。Kafkaはここで説明されるようにプロダクションでの使用のために拡張可能なSASL/PLAINのためのデフォルトの実装です。

    ユーザ名はACLなどの設定のための認証されたPrincipalとして使われます。
    1. Kafkaブローカーの設定
      1. 以下のものに似た適切に修正されたJAASファイルを各Kafkaブローカーの設定ディレクトリに追加し、この例のために kafka_server_jaas.conf を呼んでみましょう:
                KafkaServer {
                    org.apache.kafka.common.security.plain.PlainLoginModule required
                    username="admin"
                    password="admin-secret"
                    user_admin="admin-secret"
                    user_alice="alice-secret";
                };
        この設定は2つのユーザを定義します (adminalice)。KafkaServer セクション内のプロパティ usernamepasswordはブローカーによって他のブローカーへの接続を開始するために使われます。この例では、adminは内部ブローカー接続のためのユーザです。プロパティ user_userName のセットはブローカーへ接続する全てのユーザのためのパスワードを定義し、ブローカーはこれらのプロパティを使って他のブルーカーからの全てのクライアント接続を検証します。
      2. JAAS設定ファイルの場所をJVMパラメータとして各Kafkaブローカーに渡します:
            -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. ここで説明されるようにserver.properties内でSASLポートとSASL機構を設定します。例えば:
            listeners=SASL_SSL://host.name:port
                security.inter.broker.protocol=SASL_SSL
                sasl.mechanism.inter.broker.protocol=PLAIN
                sasl.enabled.mechanisms=PLAIN
    2. Kafkaクライアントの設定
      クライアント上でSASL認証を設定するには:
      1. 各クライアントについてのJAAS設定プロパティを producer.properties あるいは consumer.properties の中で設定します。ログイン モジュールはプロデューサとコンシューマのようなクライアントがどのようにKafkaブローカーに接続することができるかを説明します。以下はPLAIN機構についてのクライアントのための設定例です:
            sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
                username="alice" \
                password="alice-secret";

        オプションusernamepassword はクライアント接続のためのユーザを設定するためにクライアントによって使われます。この例では、クライアントはブローカーにユーザ alice として接続します。JVM内の子となるクライアントはsasl.jaas.config内で異なるユーザ名とパスワードを指定することで、異なるユーザとして接続するかもしれません。

        別のやり方として、クライアントのためのJAAS設定はここで説明されたブローカーに似てJVMパラメータとして指定することができます。クライアントはKafkaClientという名前のログイン セクションを使います。このオプションはJVMからの全てのクライアント接続のためにただ一人だけのユーザを許可します。

      2. 以下のプロパティを producer.properties あるいは consumer.properties の中で設定します:
            security.protocol=SASL_SSL
            sasl.mechanism=PLAIN
    3. プロダクションでの SASL/PLAIN の使用
      • SASL/PLAIN は明白なパスワードが暗号化無しに回線上を転送されないようにするために、トランスポート層としてSSLのみが使われるべきです。
      • KafkaでのSASL/PLAINのデフォルトの実装はここで示されるようにJAAS設定ファイル内でユーザ名とパスワードを指定します。Kafkaバージョン 2.0以降、設定オプション sasl.server.callback.handler.classsasl.client.callback.handler.class を使って外部ソースからユーザ名とパスワードを取得する独自のコールバック ハンドラを構成することで、ディスク上に平文のパスワードを格納することを避けることができます。
      • プロダクションのシステムでは、外部認証サーバがパスワード認証を実装するかもしれません。Kafkaバージョン 2.0以降、sasl.server.callback.handler.classを設定することでパスワード検証のための外部認証サーバを使う独自のコールバック ハンドラを差し込むことができます。
  • SASL/SCRAM を使った認証

    Salted Challenge Response Authentication Mechanism (SCRAM) はPLAINおよびDIGEST-MD5のようなユーザ名/パスワード認証を実施する伝統的な仕組みでのセキュリティの懸念を解消する一群のSASL機構です。仕組みはRFC 5802で定義されます。Kafkaは安全な認証を行うためにTLSと一緒に使うことができるSCRAM-SHA-256 と SCRAM-SHA-512 をサポートします。ユーザ名はACLなどの設定のための認証されたPrincipalとして使われます。KafkaでのデフォルトのSCRAM実装はZookeeper内にSCRAM認証を格納し、Zookeeperがプライベートネットワーク上にあるKafkaインストレーション内で使うのに適しています。詳細は セキュリティの考慮を参照してください。

    1. SCRAM 証明書の作成

      KafkaでのSCRAM 実装は証明書のストアとしてZookeeperを使います。証明書はkafka-configs.shを使ってZookeeper内で生成することができます。各SCRAM機構を有効にするために、証明書は機構名を持つ設定を追加することで生成されなければなりません。内部ブローカーの通信のための証明書はKafkaブローカーが開始される前に生成されなければなりません。クライアントの証明書は動的に生成および更新され、更新された証明書は新しい接続の認証のために使われるでしょう。

      パスワードalice-secretを持つユーザaliceのためのSCRAM証明書の作成:

          > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

      iterationが指定されない場合は、デフォルトの繰り返しカウントの4096が使われます。ランダムなsaltが生成され、salt, iterations, StoredKey と ServerKey から成るSCRAM証明がZookeeperに格納されます。SCRAM証明と個々のフィールドについての詳細はRFC 5802 を見てください。

      以下の例は、以下を使って作成することができる内部ブローカー通信のためのユーザadminも必要とします:

          > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

      既存の証明書は--describe オプションを使ってリスト化することができます:

          > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice

      証明書は--alter --delete-config オプションを使って1つ以上のSCRAM機構について削除することができます:

          > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
    2. Kafkaブローカーの設定
      1. 以下のものに似た適切に修正されたJAASファイルを各Kafkaブローカーの設定ディレクトリに追加し、この例のために kafka_server_jaas.conf を呼んでみましょう:
            KafkaServer {
                org.apache.kafka.common.security.scram.ScramLoginModule required
                username="admin"
                password="admin-secret";
            };
        KafkaServerセクション内のプロパティ usernamepasswordはブローカーによって他のブローカーへの接続を始めるために使われます。この例では admin が内部ブローカー通信のためのユーザです。
      2. JAAS設定ファイルの場所をJVMパラメータとして各Kafkaブローカーに渡します:
            -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. ここで説明されるようにserver.properties内でSASLポートとSASL機構を設定します。 例えば:
            listeners=SASL_SSL://host.name:port
            security.inter.broker.protocol=SASL_SSL
            sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (あるいは SCRAM-SHA-512)
            sasl.enabled.mechanisms=SCRAM-SHA-256 (あるいは SCRAM-SHA-512)
    3. Kafkaクライアントの設定
      クライアント上でSASL認証を設定するには:
      1. 各クライアントについてのJAAS設定プロパティを producer.properties あるいは consumer.properties の中で設定します。ログイン モジュールはプロデューサとコンシューマのようなクライアントがどのようにKafkaブローカーに接続することができるかを説明します。以下はSCRAM機構のためのクライアントのための設定の例です:
           sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
                username="alice" \
                password="alice-secret";

        オプションusernamepassword はクライアント接続のためのユーザを設定するためにクライアントによって使われます。この例では、クライアントはブローカーにユーザ alice として接続します。JVM内の子となるクライアントはsasl.jaas.config内で異なるユーザ名とパスワードを指定することで、異なるユーザとして接続するかもしれません。

        別のやり方として、クライアントのためのJAAS設定はここで説明されたブローカーに似てJVMパラメータとして指定することができます。クライアントはKafkaClientという名前のログイン セクションを使います。このオプションはJVMからの全てのクライアント接続のためにただ一人だけのユーザを許可します。

      2. 以下のプロパティを producer.properties あるいは consumer.properties の中で設定します:
            security.protocol=SASL_SSL
            sasl.mechanism=SCRAM-SHA-256 (あるいは SCRAM-SHA-512)
    4. Security Considerations for SASL/SCRAM
      • Kafkaでの SASL/SCRAM のデフォルトの実装はSCRAM証明をZookeeperに格納します。これはZookeeperが安全でプライベートネットワーク上にあるインストレーション内でのプロダクションの使用に適しています。
      • Kafkaは最小繰り返しカウントが4096の強いハッシュ関数 SHA-256 と SHA-512 のみをサポートします。強いパスワードと高い繰り返しカウントとの強いハッシュ関数の組み合わせは、Zookeeperのセキュリティが損なわれた場合でもブルートフォースアタックに対抗します。
      • SCRAMはSCRAMの交換の妨害を避けるためにTLS暗号化と一緒の場合のみ使われるべきです。これは辞書攻撃あるいはブルートフォースアタックから保護し、Zookeeperが損なわれたとしても成り済ましから保護します。
      • Kafkaバージョン 2.0以降、ZooKeeperが安全では無いインストレーション内でsasl.server.callback.handler.class を設定することで、独自のコールバック ハンドラを使ってデフォルトのSASL/SCRAM証明ストアを上書きすることができます。
      • セキュリティの考慮についての詳細はRFC 5802を参照してください。
  • SASL/OAUTHBEARER を使った認証

    OAuth 2 Authorization Framework " は、リソースの所有者とHTTPサービスの間の認証のやり取りを調整することでリソースの所有者の代わりに、あるいはサードパーティアプリケーションが代理のアクセス権限を取得できるようにすることのどちらかで、サードパーティアプリケーションがHTTPサービスへの制限付きアクセスを取得することができます。" SASL OAUTHBEARERの仕組みはSASL(つまり非-HTTP)コンテキスト内でフレームワークを使用することができます; それは RFC 7628 で定義されています。Kafkaでのデフォルトの OAUTHBEARER の実装はUnsecured JSON Web Tokens を生成および検証し、プロダクションではないKafkaインストレーションでの使用にのみ適しています。詳細は セキュリティの考慮を参照してください。

    1. Kafkaブローカーの設定
      1. 以下のものに似た適切に修正されたJAASファイルを各Kafkaブローカーの設定ディレクトリに追加し、この例のために kafka_server_jaas.conf を呼んでみましょう:
            KafkaServer {
                org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
                unsecuredLoginStringClaim_sub="admin";
            };
        KafkaServerのセクションのプロパティunsecuredLoginStringClaim_subは、ブローカが他のブローカーに接続を始める時にブローカーによって使われます。この例では、adminは件名 (sub) 要求に現れ、ブローカー間の通信のためのユーザになるでしょう。
      2. JAAS設定ファイルの場所をJVMパラメータとして各Kafkaブローカーに渡します:
            -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. ここで説明されるようにserver.properties内でSASLポートとSASL機構を設定します。 例えば:
            listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
            security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
            sasl.mechanism.inter.broker.protocol=OAUTHBEARER
            sasl.enabled.mechanisms=OAUTHBEARER
    2. Kafkaクライアントの設定
      クライアント上でSASL認証を設定するには:
      1. 各クライアントについてのJAAS設定プロパティを producer.properties あるいは consumer.properties の中で設定します。ログイン モジュールはプロデューサとコンシューマのようなクライアントがどのようにKafkaブローカーに接続することができるかを説明します。以下はOAUTHBEARER機構のためのクライアントのための設定の例です:
           sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
                unsecuredLoginStringClaim_sub="alice";

        オプションunsecuredLoginStringClaim_subは件名 (sub) 要求を設定するためにクライアントによって使われます。これはクライアント接続のためのユーザを定義します。この例では、クライアントはブローカーにユーザ alice として接続します。JVM内の異なるクライアントはsasl.jaas.config内で異なる件名 (sub) 要求を指定することで異なるユーザとして接続することができます。

        別のやり方として、クライアントのためのJAAS設定はここで説明されたブローカーに似てJVMパラメータとして指定することができます。クライアントはKafkaClientという名前のログイン セクションを使います。このオプションはJVMからの全てのクライアント接続のためにただ一人だけのユーザを許可します。

      2. 以下のプロパティを producer.properties あるいは consumer.properties の中で設定します:
            security.protocol=SASL_SSL (あるいは、非運用環境の場合は SASL_PLAINTEXT)
            sasl.mechanism=OAUTHBEARER
      3. SASL/OAUTHBEARER のデフォルトの実装は jackson-databind ライブラリに依存します。それは任意の依存のため、ユーザはそれらのビルドツールを使って依存としてそれを設定する必要があります。
    3. SASL/OAUTHBEARERのための安全では無いトークンの作成オプション
      • Kafkaでの SASL/OAUTHBEARER のデフォルトの実装は、Unsecured JSON Web Tokensを作成し検証します。非プロダクションの使用にのみ適していますが、開発あるいはテスト環境で任意のトークンを作成するための柔軟性を提供します。
      • これらはクライアント側 (そしてもし OAUTHBEARER が内部ブローカー プロトコルの場合ブローカー側)での様々なサポートされるJAASモジュールオプションです:
        安全では無いトークンの作成のためのJAASモジュールオプション ドキュメント
        unsecuredLoginStringClaim_<claimname>="value" 指定された名前と値の文字列の要求を作成します。'iat' と 'exp' (これらは自動的に生成されます)を除く任意の有効な要求名を指定することができます。
        unsecuredLoginNumberClaim_<claimname>="value" 指定された名前と値の Number 要求を作成します。'iat' と 'exp' (これらは自動的に生成されます)を除く任意の有効な要求名を指定することができます。
        unsecuredLoginListClaim_<claimname>="value" 最初の文字がデリミタとして取られる指定された値からパースされた名前と値を持つ String List 要求を作成します。例えば: unsecuredLoginListClaim_fubar="|value1|value2". 'iat' と 'exp' (これらは自動的に生成されます)を除く任意の有効な要求名を指定することができます。
        unsecuredLoginExtension_<extensionname>="value" 指定された名前と値の文字列の拡張を作成します。例えば: unsecuredLoginExtension_traceId="123"。有効な拡張名は、小文字あるいは大文字のアルファベット文字の連続です。さらに、"auth" 拡張名は予約されています。有効な拡張値は、ASCII コード 1-127 の文字の組み合わせです。
        unsecuredLoginPrincipalClaimName プリンシパル名が 'sub'以外の何かであるString 要求の名前を望む場合、独自の要求名を設定してください。
        unsecuredLoginLifetimeSeconds トークンの有効期限をデフォルトの値である3600秒(これは1時間)以外の何かに設定する場合、整数値を設定してください。'exp' 要求は祐くお起源を反映するために設定されるでしょう。
        unsecuredLoginScopeClaimName 任意のトークンスコープが'scope'以外の何かである String あるいは String List 要求の名前を望む場合、独自の要求名を設定してください。
    4. SASL/OAUTHBEARERのための安全では無いトークンの検証オプション
      • これらはUnsecured JSON Web Token検証のためのブローカー側での様々なサポートされるJAASモジュールオプションです:
        安全では無いトークンの検証のためのJAASモジュールオプション ドキュメント
        unsecuredValidatorPrincipalClaimName="value" プリンシパル名を保持する特定の String 要求が存在することを調べたい場合は、空では無い値を設定します; デフォルトは'sub' 要求の存在をチェックします。
        unsecuredValidatorScopeClaimName="value" 任意のトークンスコープが'scope'以外の何かである String あるいは String List 要求の名前を望む場合、独自の要求名を設定してください。
        unsecuredValidatorRequiredScope="value" トークン スコープを保持する String/String List 要求が特定の値が含むことを確認したい場合は、スコープの値の空白区切りのリストを設定します。
        unsecuredValidatorAllowableClockSkewMs="value" 正の数ミリ秒のクロック スキューを許容したい場合は、正の整数値を設定します(デフォルトは0です)。
      • デフォルトの安全では無い SASL/OAUTHBEARER 実装は、独自のログインおよびSASLサーバ コールバック ハンドラを使って上書きすることができます(そしてプロダクションでは上書きされるべきです)。
      • セキュリティの考慮についての詳細は RFC 6749, Section 10を参照してください。
    5. SASL/OAUTHBEARER のためのトークンのリフレッシュ
      Kafkaはクライアントがブローカーに接続を続けることができるように、期限切れになる前に定期的に全てのトークンをリフレッシュします。リフレッシュアルゴリズムがどのように操作するかに影響を与えるパラメータは プロデューサ/コンシューマ/ブローカー設定の一部として指定され、以下の通りです。これらのプロパティの詳細は他の場所のドキュメントを見てください。デフォルトの値は通常は妥当です。その場合、これらの設定パラメータを明示的に設定する必要はないでしょう。
      プロデューサ/コンシューマ/ブローカー 設定プロパティ
      sasl.login.refresh.window.factor
      sasl.login.refresh.window.jitter
      sasl.login.refresh.min.period.seconds
      sasl.login.refresh.min.buffer.seconds
    6. SASL/OAUTHBEARER の セキュア/プロダクションの使用
      プロダクションの使用例ではorg.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallbackのインスタンスを処理することができるorg.apache.kafka.common.security.auth.AuthenticateCallbackHandlerの実装を書き、それを非ブローカークライアントについてはsasl.login.callback.handler.class 設定オプションまたはブローカー(SASL/OAUTHBEARERが内部ブローカープロトコルの場合)については listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class設定オプションのどちらかを使って宣言する必要があるでしょう。

      プロダクションの使用例ではorg.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackのインスタンスを処理することができるorg.apache.kafka.common.security.auth.AuthenticateCallbackHandlerの実装を書き、それをlistener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class ブローカー設定オプションを使って宣言する必要もあるでしょう。

    7. SASL/OAUTHBEARER のセキュリティの考慮
      • Kafkaでの SASL/OAUTHBEARER のデフォルトの実装は、Unsecured JSON Web Tokensを作成し検証します。これは非プロダクションの使用のためのみに適しています。
      • OAUTHBEARER はプロダクション環境ではトークンの傍受を避けるためにTLS-encryptionと一緒にのみ使われるべきです。
      • デフォルトの安全では無いSASL/OAUTHBEARER 実装は上で説明したように独自のloginおよびSASLサーバ コールバック ハンドラを使って上書きすることができます (そしてプロダクション環境では上書きされるべきです)。
      • 一般的なOAuth 2 セキュリティの考慮についての詳細は RFC 6749, Section 10を参照してください。
  • ブローカー内での複数のSASL機構を有効化

    1. JAAS設定ファイルのKafkaServerセクション内の全ての有効にされた仕組みのログインモジュールのための構成を指定します。例えば:
              KafkaServer {
                  com.sun.security.auth.module.Krb5LoginModule required
                  useKeyTab=true
                  storeKey=true
                  keyTab="/etc/security/keytabs/kafka_server.keytab"
                  principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
      
                  org.apache.kafka.common.security.plain.PlainLoginModule required
                  username="admin"
                  password="admin-secret"
                  user_admin="admin-secret"
                  user_alice="alice-secret";
              };
    2. server.properties内のSASL機構を有効にする:
          sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
    3. 必要であればserver.properties内の内部ブローカー通信のためのSASLセキュリティプロトコルと仕組みを指定します:
          security.inter.broker.protocol=SASL_PLAINTEXT (あるいは SASL_SSL)
          sasl.mechanism.inter.broker.protocol=GSSAPI (あるいは他の有効にされた仕組みのうちの1つ)
    4. 有効にされた機構のためのSASLを設定するためにGSSAPI (Kerberos), PLAIN, SCRAM および OAUTHBEARER内の仕組みに固有のステップに従います。
  • 実行中のクラスタ内のSASL機構を修正する

    以下の手順を使って実行中のクラスタ内のSASL機構を修正することができます:

    1. 各ブローカーについて仕組みをserver.properties内のsasl.enabled.mechanismsに追加することで新しいSASL機構を有効にします。ここで説明される両方の仕組みを含めるためにJASS設定ファイルを更新します。クラスタのノードを逐次的にバウンスします。
    2. 新しい仕組みを使ってクライアントを再起動します。
    3. (必要であれば)内部ブローカー通信の仕組みを変更するために、server.properties内のsasl.mechanism.inter.broker.protocolを新しい仕組みに設定し、再びクラスタを逐次的にバウンスします。
    4. (必要であれば)古い仕組みを削除するために、古い仕組みをserver.properties内のsasl.enabled.mechanismsから削除し、JAAS設定ファイルから古い仕組みのためのエントリーを削除します。再び逐次的にクラスタをバウンスします。
  • 移譲トークンを使った認証

    移譲トークン ベースの認証は既存のSASL/SSLメソッドを補完するための軽量な認証機構です。移譲トークンはkafkaブローカーとクライアント間の共有秘密鍵です。移譲トークンは2方向SSLが使われる時にKerberos TGT/keytabあるいはキーストアの分散の追加の負荷無しに安全な環境内の利用可能なワーカーへ作業を分散するためのフレームワークの処理を助けるでしょう。詳細はKIP-48 を見てください。

    移譲トークンの使用法の一般的なステップは:

    1. ユーザはSASLあるいはSSLを経由してKafkaクラスタを認証し、移譲トークンを取得します。これはAdmin APIあるいはkafka-delegation-tokens.sh スクリプトを使って行うことができます。
    2. Kafkaクラスタとの認証のために、ユーザは安全に移譲トークンをKafkaクライアントに渡します。
    3. Token owner/renewer は移譲トークンを renew/expire することができます。
    1. トークンの管理

      マスター キー/秘密鍵は移譲トークンを生成および検証するために使われます。これは設定オプションdelegation.token.master.keyを使って提供されます。同じ秘密鍵のキーが全てのブローカーに渡って設定される必要があります。秘密鍵が設定されないか空の文字列に設定された場合、ブローカーは移譲トークン認証を無効にするでしょう。

      現在の実装では、トークンの詳細はZookeeperに格納され、Zookeeperがプライベートネットワーク上にあるKafkaインストレーションでの使用に適しています。また、現在のところ、マスター key/secretは server.properties 設定ファイル内に平文として格納されます。これらの設定を将来のKafkaリリースで設定可能にするつもりです。

      トークンは現在の寿命と最大の更新可能な寿命を持ちます。デフォルトでは、トークンは各24時間ごとに7日まで更新されなければなりません。これらはdelegation.token.expiry.time.msdelegation.token.max.lifetime.ms設定オプションを使って設定可能です。

      トークンも明示的に取り消すことができます。トークンが期限切れにより更新あるいはトークンが最大生存時間を超えた場合、全てのブローカーのキャッシュとzookeeperから削除されるでしょう。

    2. 移譲トークンの作成

      Tokens can be created by using Admin APIs or using kafka-delegation-tokens.sh script. 移譲トークンのリクエスト (create/renew/expire/describe) はSASLあるいはSSL認証されたチャンネル上で発行されなければなりません。もし初期の認証が移譲トークンを使って行われた場合は、トークンは要求されないかもしれません。kafka-delegation-tokens.sh スクリプトの例は以下で与えられます。

      移譲トークンの作成:

          > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

      移譲トークンの更新:

          > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

      移譲トークンの期限切れ:

          > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

      既存のトークンは --describe オプションを使って説明することができます:

          > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
    3. トークンの認証

      移譲トークンの認証は現在のSASL/SCRAM認証機構と抱き合わせです。ここで説明されるようにKafkaクラスタ上でSASL/SCRAM機構を有効にしなければなりません。

      Kafkaクライアントの設定:

      1. 各クライアントについてのJAAS設定プロパティを producer.properties あるいは consumer.properties の中で設定します。ログイン モジュールはプロデューサとコンシューマのようなクライアントがどのようにKafkaブローカーに接続することができるかを説明します。以下はトークン認証についてのクライアントのための設定例です:
           sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
                username="tokenID123" \
                password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
                tokenauth="true";

        オプション usernamepassword はクライアントによってトークンidとトークンHMACを設定するために使われます。そして、オプション tokenauth はサーバにトークン認証を示すために使われます。この例では、クライアントはトークンid: tokenID123 を使ってブローカーに接続します。JVM内の異なるクライアントはsasl.jaas.config内で詳述される異なるトークンを指定することで異なるトークンを使って接続することができます。

        別のやり方として、クライアントのためのJAAS設定はここで説明されたブローカーに似てJVMパラメータとして指定することができます。クライアントはKafkaClientという名前のログイン セクションを使います。このオプションはJVMからの全てのクライアント接続のためにただ一人だけのユーザを許可します。

    4. 手動で秘密鍵をローテートする手順:

      秘密鍵がローテートされる必要がある場合、再デプロイが必要です。このプロセスの間、既に接続したクライアントは動作し続けるでしょう。しかし古いトークンを持つどのような新しい接続リクエストおよび更新/期限切れのリクエストは失敗するでしょう。手順は以下で与えられます。

      1. 全ての既存のトークンを期限切れにする。
      2. ローリング更新によって秘密鍵をローテートする。そして
      3. 新しいトークンを生成する

      将来のKafkaのリリースでこれを自動化するつもりです。

    5. 移譲トークンの注意
      • 現在のところ、ユーザはそのユーザだけの移譲トークンを作成することだけができます。所有者/更新者はトークンを更新あるいは期限切れにすることができます。所有者/更新者は常にそれらの独自のトークンを表示することができます。他のトークンを表示するには、トークンのリソースに DESCRIBE パーミッションを追加する必要があります。
  • 7.4認証とACL

    Kafkaは差し込み可能なオーソライザと、全てのaclを格納するためにzookeeperを使う取り出したままで使えるオーソライザの実装を同梱します。オーソライザはserver.properties内で authorizer.class.name を設定することで構成できます。取り出したままで使える実装を有効にするには、以下を使います:
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    Kafkaのaclは一般的な形式 "Principal P はホストHからのリソースパターン RPに合致する任意のリソース R のオペレーション O が [許可される/拒否される]" で定義されます。KIP-11 上のacl構造および KIP-290 上のリソースパターンについてもっと読むことができます。aclを追加、削除あるいはリスト化するために、kafkaオーソライザ CLIを使うことができます。デフォルトでは、もしリソースパターンが特定のリソース R に合致しなければ、R がaclに関係がなくなり、従ってsuperユーザ以外の誰もRにアクセスすることができません。その挙動を変えたい場合は、以下をserver.propertiesに含めることができます。
    allow.everyone.if.no.acl.found=true
    以下のようにしてserver.propertiesにsuperユーザを追加することもできます (SSLのユーザ名はカンマを含むかもしれないため、デリミタはセミコロンであることに注意してください)。デフォルトの PrincipalType 文字列 "User" は大文字小文字を区別します。
    super.users=User:Bob;User:Alice
    SSL ユーザ名のカスタマイズ
    デフォルトでは、SSLのユーザ名は "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"の形式でしょう。server.properties内でssl.principal.mapping.rulesを独自のルールに設定することで変更することができます。この設定によりX.500の識別名を短縮名にマッピングするための規則のリストが許可されます。ルールは順番に評価され、識別名に合致する最初のルールが短縮名にマップするために使われます。リスト内の後のどのようなルールも無視されます。
    ssl.principal.mapping.rules の形式は、各ルールが "RULE:" で始まるリストであり、以下の形式の表現が含まれます。デフォルトの規則は、X.500 証明書の識別名の文字列表現を返します。識別名がパターンに一致する場合、置換コマンドが名前に対して実行されます。これは小文字/大文字オプションもサポートし、変換結果を全て小文字/大文字にすることができます。これは、規則の最後に "/L" あるいは "/U" を追加することで行われます。
            RULE:pattern/replacement/
            RULE:pattern/replacement/[LU]
    例の ssl.principal.mapping.rules 値は:
            RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
            RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
            RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
            デフォルト:
    上記の規則は、識別名 "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" を "serviceuser" に、 "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" を "adminuser@admin" に変換します。
    高度な利用例として、server.properties 内で以下のように PrincipalBuilder を設定することで、名前をカスタマイズすることができます:
    principal.builder.class=CustomizedPrincipalBuilderClass
    SASL ユーザ名のカスタマイズ
    デフォルトでは、SASLユーザ名はkerberos principalの最初の部分でしょう。server.properties内でsasl.kerberos.principal.to.local.rulesを独自のルールに設定することで変更することができます。sasl.kerberos.principal.to.local.rulesの形式は、Kerberos configuration file (krb5.conf)内でauth_to_localが動作するのと同じ方法で各ルールが動作するリストです。変換された結果が全て小文字/大文字になるように強制するために、追加の小文字化/大文字化ルールもサポートします。これは、規則の最後に "/L" あるいは "/U" を追加することで行われます。構文については以下の形式を調べてください。各ルールはRULEで始まります: そして以下の形式の表現を含みます。詳細についてはkerberosのドキュメントを見てください。
            RULE:[n:string](regexp)s/pattern/replacement/
            RULE:[n:string](regexp)s/pattern/replacement/g
            RULE:[n:string](regexp)s/pattern/replacement//L
            RULE:[n:string](regexp)s/pattern/replacement/g/L
            RULE:[n:string](regexp)s/pattern/replacement//U
            RULE:[n:string](regexp)s/pattern/replacement/g/U
    デフォルトのルールも適切な位置に保持しながら、user@MYDOMAIN.COM を適切にユーザに変換するルールを追加する例:
    sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

    コマンドライン インタフェース

    Kafka認証管理CLIは他の全てのCLIと一緒にbinディレクトリの下で見つけることができます。CLIスクリプトは kafka-acls.shと呼ばれます。以下はスクリプトがサポートする全てのオプションのリストです:

    オプション 説明 デフォルト オプションの型
    --add スクリプトにユーザがaclを追加しようとしていることを伝える。 アクション
    --remove スクリプトにユーザがaclを削除しようとしていることを伝える。 アクション
    --list スクリプトにユーザがaclをリスト化しようとしていることを伝える。 アクション
    --authorizer オーソライザの完全修飾クラス名。 kafka.security.authorizer.AclAuthorizer 設定
    --authorizer-properties 初期化のためにオーソライザに渡されるだろう key=val ペア。デフォルトのオーソライザについて、値の例は: zookeeper.connect=localhost:2181 設定
    --bootstrap-server Kafkaクラスタへの接続を確立するために使うホスト/ポートのペアのリスト。--bootstrap-server あるいは --authorizer オプションのうちの1つだけが指定されるべきです。 設定
    --command-config Adminクライアントに渡される設定を含むプロパティファイル。このオプションは --bootstrap-server オプションと一緒でのみ使うことができます。 設定
    --cluster スクリプトに、ユーザが単一のクラスタリソース上のaclとやり取りしようとしていることを伝えます。 ResourcePattern
    --topic [topic-name] スクリプトに、ユーザがトピックリソース パターン(s)のaclとやり取りしようとしていることを伝えます。 ResourcePattern
    --group [group-name] スクリプトに、ユーザがコンシューマ-グループ リソース パターン(s)のaclとやり取りしようとしていることを伝えます。 ResourcePattern
    --transactional-id [transactional-id] ACLを追加あるいは削除する transactionalId。* の値は、ACLが全ての transactionalIds に適用されるべきであることを示します。 ResourcePattern
    --delegation-token [delegation-token] ACLを追加あるいは削除する delegation トークン。* の値は、ACLが全てのトークンに適用されるべきであることを示します。 ResourcePattern
    --resource-pattern-type [pattern-type] スクリプトに、ユーザが使用したいリソースパターンの型 (--add)、あるいはリソース パターンのフィルタ (--list and --remove) を伝えますaclを追加する場合は、これは特定のパターンの型でなければなりません。例えば 'literal' または 'prefixed'。
    acl をリスト化または削除する場合、特定のパターンタイプフィルタを使って、特定のタイプのリソースパターンから acl をリスト化または削除することができます。または、'any' または 'match' のフィルタ値を使うことができます。'any' は全てのパターンタイプに一致しますが、リソース名に正確に一致します。'match' は指定されたリソースに影響する全ての acl をリスト化または削除するためにパターンマッチングを実行します。警告: '--remove' スイッチと組み合わせて使う時は、'match'を使うには注意しなければなりません。
    literal 設定
    --allow-principal PrincipalはAllowパーミッションを使ってACLに追加されるだろう PrincipalType:name 形式です。デフォルトの PrincipalType 文字列 "User" は大文字小文字を区別します。
    複数の --allow-principal を1つのコマンドで指定することができます。
    Principal
    --deny-principal PrincipalはDenyパーミッションを使ってACLに追加されるだろう PrincipalType:name 形式です。デフォルトの PrincipalType 文字列 "User" は大文字小文字を区別します。
    複数の --deny-principal を1つのコマンドで指定することができます。
    Principal
    --principal Principal は --list オプションと一緒に使われる PrincipalType:name 形式です。デフォルトの PrincipalType 文字列 "User" は大文字小文字を区別します。これは指定されたプリンシパルについてのACLをリスト化します。
    1つのコマンドの中に複数の --principal を指定することができます。
    Principal
    --allow-host --allow-principal にリスト化されているプリンシパルからのIPあぢれsyがアクセスできるでしょう。 もし --allow-principal がデフォルトに * を指定される場合は、"all hosts" に翻訳されます ホスト
    --deny-host --deny-principal にリスト化されているプリンシパルからのIPアドレスはアクセスを拒否されるでしょう。 もし --deny-principal がデフォルトに * を指定される場合は、"all hosts" に翻訳されます ホスト
    --operation 許可あるいは拒否されるだろうオペレーション。
    有効な値は:
    • Read
    • Write
    • Create
    • Delete
    • Alter
    • Describe
    • ClusterAction
    • DescribeConfigs
    • AlterConfigs
    • IdempotentWrite
    • All
    All オペレーション
    --producer プロデューサの役割についてaclを add/remove するための便利なオプション。これはトピックに WRITE, DESCRIBE および CREATE を許可するaclを生成するでしょう。 便利なもの
    --consumer コンシューマの役割についてaclを add/remove するための便利なオプション。これはトピックに READ, DESCRIBE を、コンシューマグループに READ を許可するaclを生成するでしょう。 便利なもの
    --idempotent プロデューサの冪等性を有効にします。これは --producer オプションと組み合わせて使われるべきです。
    プロデューサが特定のトランザクションidに対して許可されている場合、冪等性は自動的に有効にされることに注意してください。
    便利なもの
    --force 全ての質問に対してyesを想定し、入力を促さないための便利なオプション。 便利なもの
    --zk-tls-config-file 承認者の ZooKeeper クライアント TLS 接続プロパティが定義されているファイルを識別します。("authorizer." プリフィックスの有無に関係なく)以下のプロパティ以外は無視されます: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type 設定

    • Aclの追加
      acl "Principals User:Bob と User:Alice がIP 198.51.100.0 と IP 198.51.100.1からのTOPIC Test-Topicへの Operation Read と Write の実施が許可されている" を追加したいとします。以下のオプションを使ってCLIを実行することでこれを行うことができます:
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
      デフォルトでは、オペレータがリソースにアクセスを許可するaclを明示的に持たない全てのprincipalは拒否されます。幾つかのprincipalを除いてアクセスを許可するallow aclを定義したい場合は、--deny-principal と --deny-host オプションを使う必要があるでしょう。例えば、Test-topicからのReadを全てのユーザに許可し、IP 198.51.100.3からの User:BadBob だけのみ拒否する場合には、以下のコマンドを使ってそれを行うことができます。
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
      --allow-host--deny-host はIPアドレスのみをサポートすることに注意してください (ホスト名はサポートされません)。上の例はリソース パターン オプションとして--topic [topic-name] を指定することでtopicにaclを追加します。同様に、ユーザは --cluster と指定することでクラスタに、 --group [group-name] を指定することでコンシューマグループ aclを追加することができます。特定のタイプのリソースにaclを追加することができます。例えば、acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.0" を追加したいとします。ワイルドカード リソース '*' を使って行うことができます。例えば、以下のオプションを使ってCLIを実行します:
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *
      プリフィックスのリソースパターンにaclを追加することができます。例えば、acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host" を追加したいとします。以下のオプションを使ってCLIを実行することでこれを行うことができます:
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
      --resource-pattern-type のデフォルトは 'literal' であることに注意してください。これはまったく同じ名前を持つリソースのみ、あるいはワイルドカード リソース名 '*' の場合はどのような名前を持つリソースに影響します。
    • Acl の削除
      aclの削除はとても似ています。唯一の違いは --add オプションの代わりに、--remove オプションを指定しなければならないでしょう。上の最初の例で追加されたaclを削除するために、以下のオプションを使ってCLIを実行することができます:
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic 
      上のプリフィックスのリソースパターンを追加した acl を削除したい場合は、以下のオプションを使って CLI を実行することができます:
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed
    • aclのリスト化
      リソースと一緒に --list オプションを指定することでリソースのためのaclをリスト化することができます。文字 リソース パターン Test-topicについての全てのaclをリスト化するために、以下のオプションを使ってCLIを実行することができます。
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
      ただし、これはこの正確なリソースパターンに追加されたaclのみを返すでしょう。トピックへのアクセスに影響する他のaclが存在するかもしれません。例えば、トピック ワイルドカード '*' 上のacl、あるいはプリフィックスのリソースパターン上のacl。ワイルドカード リソース パターン上のaclは明示的に問い合わせることができます:
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *
      しかし、Test-topicに一致するプリフィックスのリソースパターン上のaclについて明示的に問い合わせることは、そのようなパターンの名前が分からないため、必ずしも可能ではありません。'--resource-pattern-type match' を使ってTest-topicに影響する 全ての aclをリスト化することができます。例えば、
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match
      これは全ての文字通り、ワイルドカード、およびプリフィックス リソースパターンに合致するaclをリスト化するでしょう。
    • プロデューサあるいはコンシューマとしてprincipalを追加あるいは削除
      acl管理の最も一般的な使用法はプロデューサあるいはコンシューマとしてprincipalを追加/削除することです。そこでこれらを処理するために簡単なオプションを追加しました。User:Bob をTest-topicのプロデューサとして追加するために、以下のコマンドを実行することができます:
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
      AliceをTest-topicのコンシューマとしてコンシューマグループ Group-1と一緒に追加するのに似て、単純に --consumer optionを渡す必要があります:
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 
      コンシューマ オプションについては、コンシューマグループを指定する必要もあることに注意してください。プロデューサあるいはコンシューマの役割からprincipalを削除するには、単純に --remove オプションを渡す必要があります。
    • Admin API based acl management
      ClusterResource に対する変更権限を持つユーザは、ACL管理のためにAdmin APIを使うことができます。kafka-acls.sh スクリプトは zookeeper/authorizer と直接やり取りせずにACLを管理するためにAdminClient APIをサポートします。上記の全ての例は --bootstrap-server オプションを使って実行することができます。例えば:
                  bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
                  bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
                  bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic

    認証プリミティブ

    プロトコルの呼び出しは、通常Kafka内の特定のリソースで幾つかの操作を実行します。効果的な保護を設定するために操作とリソースを知る必要があります。この章では、これらの操作とリソースをリスト化し、有効なシナリオを確認するためにこれらとプロトコルの組み合わせをリスト化します。

    Kafkaでの操作

    特権を構築するために使うことができる幾つかの操作プリミティブがあります。これらは指定されたユーザに特定のプロトコルの呼び出しを許可するために、特定のリソースと照合することができます。それらは:

    • Read
    • Write
    • Create
    • Delete
    • Alter
    • Describe
    • ClusterAction
    • DescribeConfigs
    • AlterConfigs
    • IdempotentWrite
    • All
    Kafkaでのリソース

    上記の操作は以下で説明される特定のリソースに適用することができます。

    • トピック: これは単純にトピックを表現します。トピックに作用する全てのプロトコルの呼び出し (読み取り、書き込みなど)は、対応する特権を追加する必要があります。トピック リソースに許可エラーがある場合、TOPIC_AUTHORIZATION_FAILED (error code: 29) が返されます。
    • Group: これはブローカーのコンシューマグループを表します。グループへの参加など、コンシューマ グループと協力する全てのプロトコルの呼び出しは、対象のグループに対する特権が必要です。特権が付与されていない場合、プロトコルの応答で GROUP_AUTHORIZATION_FAILED (error code: 30) が返されます。
    • Cluster: このリソースはクラスタを表します。制御されたシャットダウンなど、クラスタ全体に影響する操作はクラスタリソース上の特権によって保護されます。クラスタ リソースに認証の問題がある場合、CLUSTER_AUTHORIZATION_FAILED (error code: 31) が返されます。
    • TransactionalId: このリソースはコミットのようなトランザクションに関係するアクションを表します。エラーが起きた場合、TRANSACTIONAL_ID_AUTHORIZATION_FAILED (error code: 53) がブローカーによって返されます。
    • DelegationToken: これはクラスタ内の移譲トークンを表します。移譲トークンの記述のようなアクションはDelegationToken リソース上の特権によって保護できます。これらのオブジェクトはKafkaで少し特別な挙動を持つため、KIP-48移譲トークンを使った認証の関連するアップストリーム ドキュメントを読むことをお勧めします。
    プロトコルの操作とリソース

    以下の表で、Kafka API プロトコルによって実行されるリソース上の有効な操作をリスト化します。

    プロトコル (API キー) オペレーション リソース 注意
    PRODUCE (0) Write TransactionalId transactional.id セットを持つトランザクション プロデューサは、以下の特権を必要とします。
    PRODUCE (0) IdempotentWrite クラスタ 冪等なプロデューサのアクションはこの特権を必要とします。
    PRODUCE (0) Write Topic これは通常の生成アクションに適用されます。
    FETCH (1) ClusterAction クラスタ フォロワーはパーティション データをフェッチするためにクラスタ リソースで ClusterAction を持つ必要があります。
    FETCH (1) Read Topic 通常の Kafka コンシューマは、フェッチする各パーティション上でREAD権限を必要とします。
    LIST_OFFSETS (2) Describe Topic
    METADATA (3) Describe Topic
    METADATA (3) Create クラスタ トピックの自動生成が有効な場合、ブローカー側のAPIはクラスタレベルの権限の存在を調べます。見つかった場合、トピックの生成を許可します。そうでなければ、トピックレベルの特権を繰り返し処理します (次を見てください)。
    METADATA (3) Create Topic これは、トピックの自動生成が有効だが、指定されたユーザがクラスタレベルの権限を持たない(上)場合、トピックの自動生成を許可します。
    LEADER_AND_ISR (4) ClusterAction クラスタ
    STOP_REPLICA (5) ClusterAction クラスタ
    UPDATE_METADATA (6) ClusterAction クラスタ
    CONTROLLED_SHUTDOWN (7) ClusterAction クラスタ
    OFFSET_COMMIT (8) Read グループ オフセットは、特定のグループとトピックに対しても承認されている場合にのみコミットすることができます (以下を見てください)。グループのアクセスは最初に調べられ、続いてトピックのアクセスが調べられます。
    OFFSET_COMMIT (8) Read Topic オフセットのコミットは消費プロセスの一部であるため、読み込みアクションの権限を必要とします。
    OFFSET_FETCH (9) Describe グループ OFFSET_COMMIT と似て、アプリケーションはフェッチするにはグループとトピックレベルの権限も必要としなければなりません。ただし、この場合、readの代わりにdescribeアクセスを必要とします。グループのアクセスは最初に調べられ、続いてトピックのアクセスが調べられます。
    OFFSET_FETCH (9) Describe Topic
    FIND_COORDINATOR (10) Describe グループ FIND_COORDINATOR リクエストは、コンシューマ グループのコーディネータを探している場合に、"Group" の型にすることができます。この権限はグループモードを表します。
    FIND_COORDINATOR (10) Describe TransactionalId これはトランザクション プロデューサにのみ適用され、プロデューサがトランザクション コーディネータを見つけようとする時に調べられます。
    JOIN_GROUP (11) Read グループ
    HEARTBEAT (12) Read グループ
    LEAVE_GROUP (13) Read グループ
    SYNC_GROUP (14) Read グループ
    DESCRIBE_GROUPS (15) Describe グループ
    LIST_GROUPS (16) Describe クラスタ ブローカーが list_groups リクエストを承認するために調べる場合、最初にこのクラスタレベルの認証を調べます。見つからない場合は、個々のグループを調べます。この操作は CLUSTER_AUTHORIZATION_FAILED を返しません。
    LIST_GROUPS (16) Describe グループ どのグループも許可されていない場合、エラーの代わりに空の応答が返されます。この操作は CLUSTER_AUTHORIZATION_FAILED を返しません。これは 2.1 リリースから適用可能です。
    SASL_HANDSHAKE (17) SASL ハンドシェイクは認証プロセスの一部であるため、ここで任意の種類の認証を適用することはできません。
    API_VERSIONS (18) API_VERSIONS リクエストはKafkaプロトコルのハンドシェイクの一部であり、接続時および認証前に起こります。従って認証を使ってこれを制御することはできません。
    CREATE_TOPICS (19) Create クラスタ クラスタ レベルの認証が無い場合、CLUSTER_AUTHORIZATION_FAILED を返しませんが、すぐ下のトピック レベルを代用することができます。問題がある場合は、エラーが投げられます。
    CREATE_TOPICS (19) Create Topic これは 2.0 リリースから適用可能です。
    DELETE_TOPICS (20) Delete Topic
    DELETE_RECORDS (21) Delete Topic
    INIT_PRODUCER_ID (22) Write TransactionalId
    INIT_PRODUCER_ID (22) IdempotentWrite クラスタ
    OFFSET_FOR_LEADER_EPOCH (23) ClusterAction クラスタ この操作についてクラスタレベルの権限がない場合、トピックレベルのものが調べられます。
    OFFSET_FOR_LEADER_EPOCH (23) Describe Topic これは 2.1 リリースから適用可能です。
    ADD_PARTITIONS_TO_TXN (24) Write TransactionalId このAPIはトランザクション リクエストにのみ適用可能です。最初に TransactionalId リソースで書き込みアクションをチェックし、次に対象のトピックをチェックします (下)。
    ADD_PARTITIONS_TO_TXN (24) Write Topic
    ADD_OFFSETS_TO_TXN (25) Write TransactionalId ADD_PARTITIONS_TO_TXN と同様に、トランザクション リクエストにのみ適用可能です。最初に TransactionalId リソースで書き込みアクションをチェックし、次に対象のトピックをチェックします (下)。
    ADD_OFFSETS_TO_TXN (25) Read グループ
    END_TXN (26) Write TransactionalId
    WRITE_TXN_MARKERS (27) ClusterAction クラスタ
    TXN_OFFSET_COMMIT (28) Write TransactionalId
    TXN_OFFSET_COMMIT (28) Read グループ
    TXN_OFFSET_COMMIT (28) Read Topic
    DESCRIBE_ACLS (29) Describe クラスタ
    CREATE_ACLS (30) Alter クラスタ
    DELETE_ACLS (31) Alter クラスタ
    DESCRIBE_CONFIGS (32) DescribeConfigs クラスタ ブローカーの設定がリクエストされた場合、ブローカーはクラスタレベルの権限をチェックします。
    DESCRIBE_CONFIGS (32) DescribeConfigs Topic トピックの設定がリクエストされた場合、ブローカーはトピックレベルの権限をチェックします。
    ALTER_CONFIGS (33) AlterConfigs クラスタ ブローカーの設定が変更された場合、ブローカーはクラスタレベルの権限をチェックします。
    ALTER_CONFIGS (33) AlterConfigs Topic トピックの設定が変更された場合、ブローカーはトピックレベルの権限をチェックします。
    ALTER_REPLICA_LOG_DIRS (34) Alter クラスタ
    DESCRIBE_LOG_DIRS (35) Describe クラスタ 認証の失敗時には空の応答が返されます。
    SASL_AUTHENTICATE (36) SASL_AUTHENTICATE は認証プロセスの一部であるため、ここで任意の種類の認証を適用することはできません。
    CREATE_PARTITIONS (37) Alter Topic
    CREATE_DELEGATION_TOKEN (38) 移譲トークンの作成には特別な規則があります。これについては、移譲トークンを使った認証の章を見てください。
    RENEW_DELEGATION_TOKEN (39) 移譲トークンの更新には特別な規則があります。これについては、移譲トークンを使った認証の章を見てください。
    EXPIRE_DELEGATION_TOKEN (40) 移譲トークンの期限切れには特別な規則があります。これについては、移譲トークンを使った認証の章を見てください。
    DESCRIBE_DELEGATION_TOKEN (41) Describe DelegationToken 移譲トークンの記述には特別な規則があります。これについては、移譲トークンを使った認証の章を見てください。
    DELETE_GROUPS (42) Delete グループ
    ELECT_PREFERRED_LEADERS (43) ClusterAction クラスタ
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs クラスタ ブローカーの設定が変更された場合、ブローカーはクラスタレベルの権限をチェックします。
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Topic トピックの設定が変更された場合、ブローカーはトピックレベルの権限をチェックします。
    ALTER_PARTITION_REASSIGNMENTS (45) Alter クラスタ
    LIST_PARTITION_REASSIGNMENTS (46) Describe クラスタ
    OFFSET_DELETE (47) Delete グループ
    OFFSET_DELETE (47) Read Topic

    7.5実行中のクラスタでの重要なセキュリティ機能

    以前に議論されたサポートされるプロトコルの1つ以上を使って実行中のクラスタを安全にすることができます。これは段階的に行われます:

    • 追加の安全なポート(s)を開くために、逐次的にクラスタノードをバウンスします。
    • PLAINTEXTポートではなく安全なものを使ってクライアントを再起動 (クライアント-ブローカーの接続を安全にしていると仮定)。
    • (必要であれば)ブローカー to ブローカーの安全を有効にするために、再び逐次的にクラスタをバウンスします
    • PLAINTEXT ポートを閉じるために最後の逐次的なバウンス。

    SSLとSASLを設定するための固有のステップは、セクション7.27.3 で説明されます。望ましいプロトコル(s)のセキュリティを有効にするためには、これらのステップに従ってください。

    セキュリティの実装によりブローカー-クライアントとブローカー-ブローカー通信の両方について異なるプロトコルを設定します。これらは別個のバウンスの中で有効にされる必要があります。PLAINTEXT ポートは全体に渡って開いたままにされなければなりません。つまりブローカー および/あるいは クライアントは通信し続けることができます。

    増加するバウンスを実行すると、ブローカーはSIGTERMを介して正常に停止します。次のノードに移動する前に再起動したレプリカがISRリストに返されるのを待つことも良い練習です。

    例として、ブローカー-クライアントとブローカー-ブローカー通信の両方でSSLを使って暗号化したいとします。最初の増加するバウンスの中で、SSLポートがそれぞれのノード上で開かれます:
                listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    そして、クライアントの設定が新しく開かれた安全なポートを示すように変更して、クライアントを再起動します:
                bootstrap.servers = [broker1:9092,...]
                security.protocol = SSL
                ...etc
    2つ目の増加サーバのバウンスで、Kafkaにブローカー-ブローカープロトコルとしてSSLを使うように指示します (同じSSLポートを使うでしょう):
                listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
                security.inter.broker.protocol=SSL
    最後のバウンスの中で、PLAINTEXTポートを閉じることでクラスタが安全になります:
                listeners=SSL://broker1:9092
                security.inter.broker.protocol=SSL
    別のやり方として、ブローカー-ブローカーおよびブローカー-クライアント通信のために異なるプロトコルが使えるように複数のポートを開くことができます。全体に渡って(つまりブローカー-ブローカーおよびブローカー-クライアント通信)SSL暗号を使いたいが、ブローカー-クライアント通信にSASL認証も追加したいとしましょう。最初のバウンス時に2つの追加のポートを開くことでこれを行うことができます:
                listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    そしてクライアントを再起動し、それらの設定が新しく開かれたSASL&SSLの安全なポートを指すように変更します:
                bootstrap.servers = [broker1:9093,...]
                security.protocol = SASL_SSL
                ...etc
    2つ目のサーバのバウンスはクラスタが前にポート9092で開いたSSLポートを使って暗号化されたブローカー-ブローカー通信を使うように切り替えるでしょう。
                listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
                security.inter.broker.protocol=SSL
    最後のバウンスはPLAINTEXTポートを閉じることでクラスタを安全にします。
            listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
            security.inter.broker.protocol=SSL
    ZooKeeperはKakfaクラスタとは関係なく安全にすることができます。これを行うステップはセクション7.6.2でカバーされます。

    7.6ZooKeeper 認証

    ZooKeeper は 3.5.x バージョンから、相互 TLS (mTLS) 認証をサポートします。Kafka はバージョン 2.5 から、SASL と mTLS を使った ZooKeeper への認証をサポートします。詳細は、KIP-515: KZ クライアントが新しい TLS でサポートされる認証を使えるようにするを見てください。

    mTLS のみを使う場合、全てのブローカーと CLI ツール (ZooKeeper Security Migration Tool のような) は、ACL化された DN であるため、同じ識別名 (DN) で自分自身を識別する必要があります。これは以下で示すように変更することができますが、カスタム ZooKeeper 認証プロバイダを書き、デプロイする必要があります。通常、ZooKeeper によるブローカーと CLI ツールのホスト名検証が成功するように、各証明書は同じ DN であるが、異なる Subject Alternative Name (SAN) である必要があります。

    mTLS と一緒に ZooKeeper への SASL 認証を使う場合、SASL ID と znode を作成した DN (つまり、ブローカーの証明書を作成) または Security Migration Tool の DN (znode が作成された後に移行を実行した場合)の両方は ACL が適用され、全てのブローカーと CLI tool は、全てが同じ ACL 化された SASL 識別を使うため、異なる DN を使う場合でも認証されます。全ての DN が一致する必要があるのは、mTLS 認証のみを使う場合です (そして、SAN が重要になります -- 繰り返しになりますが、以下で説明するカスタム ZooKeeper 認証プロバイダを書き、配備しない場合です)。

    以下で説明するように、ブローカーのプロパティファイルを使って、ブローカー用の TLS 構成を設定します。

    --zk-tls-config-file <file> オプションを使って、Zookeeper Security Migration Tool に TLS 構成を設定します。kafka-acls.shkafka-configs.sh CLI tools も --zk-tls-config-file <file> オプションをサポートします。

    -zk-tls-config-file <file> オプション(2つのダッシュではなく、1つのダッシュに注意)を使って zookeeper-shell.sh CLI tool 用の TLS 構成を設定します。

    7.6.1新しいクラスタ

    7.6.1.1ZooKeeper SASL 認証
    ブローカー上でZooKeeper SASL 認証を有効にするには、2つの必要なステップがあります:
    1. JAASログインファイルを作成し、上で説明されたように適切なシステムプロパティがそれを指すように設定します
    2. 各ブローカー内で構成プロパティzookeeper.set.aclをtrueに設定する
    KafkaクラスタのためにZooKeeperに格納されたメタデータは全世界から読み込み可能ですが、ブローカーによってのみ修正することができます。この決定の背後にある原理は、ZooKeeperに格納されたデータは感度は良くないですが、不適切なデータの走査はクラスタの崩壊を起こすかもしれないということです。ネットワーク分割によってZooKeeperへのアクセスを制限することもお勧めします (ブローカーと幾つかの管理ツールだけがZooKeeperにアクセスする必要があります)。
    7.6.1.2ZooKeeper Mutual TLS 認証
    ZooKeeper mTLS 認証は、SASL 認証の有無に関わらず、有効にすることができます。上記のように、mTLS を単独で使う場合、全てのブローカーと全ての CLI tools (ZooKeeper Security Migration Toolのような) は、DN は ACL されているため、一般的に同じ識別名 (DN) で自身を識別する必要があります。つまり、ZooKeeper によるブローカーと CLI tool のホスト名検証が成功するように、各証明書には適切な Subject Alternative Name (SAN) が必要です。

    org.apache.zookeeper.server.auth.X509AuthenticationProvider を継承し、メソッド protected String getClientId(X509Certificate clientCert) をオーバーライドするクラスを書くことで、mTLS クライアントの識別のために、DN 以外のものを使うことが可能です。スキーム名を選択し、ZooKeeper で authProvider.[scheme] をカスタム実装の完全修飾クラス名に設定します; 次にそれを使うように ssl.authProvider=[scheme] を設定します。

    TLS 認証を有効にするための(部分的な) ZooKeeper の構成の例は以下の通りです。これらの構成は、ZooKeeper 管理ガイドで説明されています。
            secureClientPort=2182
            serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
            authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
            ssl.keyStore.location=/path/to/zk/keystore.jks
            ssl.keyStore.password=zk-ks-passwd
            ssl.trustStore.location=/path/to/zk/truststore.jks
            ssl.trustStore.password=zk-ts-passwd
    重要: ZooKeeper は、ZooKeeper サーバのキーストア内のキーパスワードを、キーストアパスワード自体とは異なる値に設定することをサポートしていません。キーパスワードをキーストアのパスワードと必ず同じに設定してください。

    mTLS 認証を使って ZooKeeper へ接続するための(部分的な) Kafka ブローカー構成の例は以下の通りです。これらの構成は、上記の ブローカー構成で説明されています。

            # TLS 用に構成された ZooKeeper ポートへの接続
            zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
            # TLS を ZooKeeper に使うために必要 (デフォルトは false)
            zookeeper.ssl.client.enable=true
            # TLS を ZooKeeper に使うために必要
            zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
            # TLS を使って ZooKeeper に キー/トラストストア を定義します; zookeeper.ssl.client.enable=true でない場合は無視されます
            zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
            zookeeper.ssl.keystore.password=kafka-ks-passwd
            zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
            zookeeper.ssl.truststore.password=kafka-ts-passwd
            # znode に ACL を作成するようにブローカーに指示する
            zookeeper.set.acl=true
    重要: ZooKeeper はZooKeeper クライアント(ブローカーなど)のキーストアのキーワードをキーストアパスワード自体とは異なる値に設定することをサポートしません。キーパスワードをキーストアのパスワードと必ず同じに設定してください。

    7.6.2クラスタの移設

    セキュリティをサポートしないバージョンのKafkaを実行しているか、単純にセキュリティを無効にしていて、クラスタを安全にしたい場合、オペレーションへの最小の混乱をもってZooKeeperの認証を有効にするために以下のステップを実行する必要があります:
    1. ZooKeeper で SASL および/または mTLS 認証を有効にします。mTLS を有効にする場合、以下のように、非 TLS ポートと TLS ポートの両方を使うことができます:
          clientPort=2181
          secureClientPort=2182
          serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
          authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
          ssl.keyStore.location=/path/to/zk/keystore.jks
          ssl.keyStore.password=zk-ks-passwd
          ssl.trustStore.location=/path/to/zk/truststore.jks
          ssl.trustStore.password=zk-ts-passwd
    2. ブローカーが JAAS ログインファイルを設定する、かつ/または、必要に応じて ZooKeeper 相互 TLS 構成(TLS対応の ZooKeeper ポートへの接続を含む)を定義する、ローリング再起動を実行します。これによりブローカーは ZooKeeper に対して認証することができます。ローリング リスタートの最後に、ブローカーは厳密なACLを持つznodeを操作することができますが、それらはこれらのACLを持つznodeを作成しないでしょう。
    3. mTLS を有効にした場合は、ZooKeeper の非 TLS ポートを無効にします。
    4. 今度は設定zookeeper.set.aclをtrueに設定して、パラメータブローカーの2つ目のローリング リスタートを実施します。これはznodeを生成する時に安全なACLを使うことを可能にします
    5. ZkSecurityMigrator ツールを実行します。ツールを実行するために、このスクリプトがあります: zookeeper.aclをsecureに設定した./bin/zookeeper-security-migration.sh。このツールはznodeのACLを変更する対応する副木を横断します。mTLS を有効にする場合は、--zk-tls-config-file <file> オプションを使ってください。

    安全なクラスタ内で認証を止めることもできます。そうするには、以下のステップに従います:

    1. JAASログインファイルを設定している、および/または ZooKeeper 相互 TLS 構成を定義する、ブローカーのローリングリスタートを実施します。これはブローカーの認証を有効にしますが、zookeeper.set.aclをfalseに設定します。ローリング リスタートの最後に、ブローカーは安全なACLを持つznodeの生成を停止しますが、まだ認証し全てのznodeを操作することができます。
    2. ZkSecurityMigrator ツールを実行します。ツールを実行するには、zookeeper.acl を unsecure に設定して、このスクリプト bin/zookeeper-security-migration.sh を実行します。このツールはznodeのACLを変更する対応する副木を横断します。TLS 構成を設定する必要がある場合は、--zk-tls-config-file <file> オプションを使ってください。
    3. mTLS を無効にする場合は、ZooKeeper で非 TLS ポートを有効にします。
    4. ブローカーの2回目のローリング再起動を実行します。今回は、JAAS ログインファイルを設定するシステムプロパティを省略、および/または ZooKeeper 相互 LTS 構成(非 TLS 対応の ZooKeeper ポートへの接続を含む)を削除したりします。
    5. mTLS を無効にする場合は、ZooKeeper で TLS ポートを無効にします。
    移設ツールを実行する方法の例です:
        bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

    パラメータの完全なリストを見るために、これを実行します:

        bin/zookeeper-security-migration.sh --help

    7.6.3ZooKeeperアンサンブルの移設

    ZooKeeper アンサンブルで、SASL および/または mTLS 認証を有効にすることも必要です。これを行うには、サーバのローリング リスタートの実施と2,3のプロパティの設定が必要です。mTLS 情報については、上記を見てください。詳細はZooKeeperのドキュメントを参照してください:
    1. Apache ZooKeeperのドキュメント
    2. Apache ZooKeeperのwiki

    7.6.4ZooKeeper Quorum Mutual TLS 認証

    ZooKeeper サーバ間で mTLS 認証を有効にすることができます。詳細は、ZooKeeper ドキュメントを参照してください。

    7.7ZooKeeper 暗号化

    相互 TLS を使う ZooKeeper 接続は暗号化されます。ZooKeeper バージョン 3.5.7 (Kafka バージョン 2.5 に同梱されているバージョン)から、ZooKeeper はサーバ側の構成 ssl.clientAuth (大文字と小文字を区別しない: want/need/none が有効なオプションで、デフォルトは need) をサポートし、Zookeeper でこの値を none に設定することにより、クライアントは独自の証明書を提示せずに、TLS 暗号化接続を介して接続することができます。TLS 暗号のみで ZooKeeper へ接続するための(部分的な) Kafka ブローカー構成の例は以下の通りです。これらの構成は、上記の ブローカー構成で説明されています。
            # TLS 用に構成された ZooKeeper ポートへの接続
            zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
            # TLS を ZooKeeper に使うために必要 (デフォルトは false)
            zookeeper.ssl.client.enable=true
            # TLS を ZooKeeper に使うために必要
            zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
            TLS を使ってトラストストアを ZooKeeper に定義します; zookeeper.ssl.client.enable=true でない限り無視されます
            # ZooKeeper で ssl.clientAuth = none を想定してキーストア情報を設定する必要はありません
            zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
            zookeeper.ssl.truststore.password=kafka-ts-passwd
            # ブローカーに、znode に ACL を作成するように指示します (SASL 認証を使う場合。そうでなければこれを設定しません)
            zookeeper.set.acl=true
    TOP
    inserted by FC2 system