PHP Kafka クライアント - php-rdkafka

Join the chat at https://gitter.im/arnaud-lb/php-rdkafka

サポートされる Kafkaバージョン: 0.8, 0.9, 0.10 サポートされるPHPバージョン: 5.3 .. 7.x ビルド状態

PHP-rdkafka は PHP 5 / PHP 7 Kafka 0.8 / 0.9 / 0.10 クライアントの動作を提供する薄いlibrdkafkaバインディングです。

高レベルおよび低レベルのコンシューマ, プロデューサ およびメタデータ API をサポートします。

API はlibrdkafkaのAPIのものをできる限り再構成し、ここで完全に説明されます。

目次

  1. インストール
  2. 使い方
  3. ドキュメント
  4. 著作権表示
  5. ライセンス

インストール

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.setup.html

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples.html

使い方

プロデュース

プロデュースのために、まずプロデューサを作成し、それをブローカー(Kafkaサーバ)に追加する必要があります:

<?php

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");

次に、プロデューサからトピック インスタンスを作成します:

<?php

$topic = $rk->newTopic("test");

そこからプロデュース メソッドを使って、必要なだけ多くのメッセージを生成することができます:

<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

最初の引数はパーティションです。RD_KAFKA_PARTITION_UA は割り当てられていないことを意味し、librdkafkaにパーティションを選ばせます。

2つ目の引数はメッセージフラグで、現在のところ常に0でなければなりません。

メッセージの積み荷はなんでも構いません。

高レベルの消費

RdKafka\KafkaConsumer クラスは自動的なパーティションの割り当て/廃止をサポートします。ここの例を見てください。

低レベルの消費

最初に低レベルのコンシューマを作成し、それをブローカー(Kafkaサーバ)に追加する必要があります:

<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");

次に、newTopic() メソッドを呼ぶことでトピック インスタンスを作成し、パーティション 0 上で消費を開始します:

<?php

$topic = $rk->newTopic("test");

// 最初の引数は消費を開始するパーティションです
// 2つ目の引数は消費を開始するオフセットです。有効な値は
// : RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED。
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

次に、消費されたメッセージを引き出します:

<?php

while (true) {
    // 最初の引数は(再び)パーティションです。
    // 2つ目の引数はタイムアウトです。
    $msg = $topic->consume(0, 1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

複数のトピック/パーティションからの低レベルの消費

複数のトピック および/あるいはパーティションからの消費は、librdkafkaにこれらのトピック/パーティションからの全てのメッセージを内部キューに送信するように指示し、このキューから消費することで行うことができます:

キューの作成:

<?php
$queue = $rk->newQueue();

Adding topars to the queue:

<?php

$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

次に、キューから消費されたメッセージを取り出します:

<?php

while (true) {
    // 唯一の引数はタイムアウトです。
    $msg = $queue->consume(1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

格納されたオフセットの使用

librdkafkaはローカルファイル内、あるいはブローカー上にオフセットを格納することができます。デフォルトはローカルファイルで、消費オフセットとしてRD_KAFKA_OFFSET_STOREDの使用を開始するとすぐにrdkafkaはオフセットを格納し始めます。

デフォルトでは、トピックとパーテイションに基づいた名前を持つファイルが現在のディレクトリに生成されます。ディレクトリはoffset.store.path 設定プロパティを設定することで変更することができます。

他の興味深いプロパティ: offset.store.sync.interval.ms, offset.store.method, auto.commit.interval.ms, auto.commit.enable, offset.store.method, group.id

<?php

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", 1e3);
$topicConf->set("offset.store.sync.interval.ms", 60e3);

$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

興味深い設定パラメータ

queued.max.messages.kbytes

librdkafkaはデフォルトで各消費されたパーティションについて1GBまでのメッセージをバッファするでしょう。コンシューマ上でqueued.max.messages.kbytesパラメータの値を減らすことでメモリの消費を下げることができます。

topic.metadata.refresh.sparse and topic.metadata.refresh.interval.ms

各コンシューマおよびプロデューサのインスタンスはtopic.metadata.refresh.interval.ms パラメータで定義された間隔でトピックのメタデータを取りに行くでしょう。librdkafkaのバージョンに依存して、パラメータのデフォルトは10秒あるいは600秒です。

librdkafka はデフォルトでクラスタの全てのトピックについてのメタデータを取得します。topic.metadata.refresh.sparseを文字列"true"に設定することでlibrdkafkaに使用しているトピックだけを取得するようにします。

topic.metadata.refresh.sparse"true"topic.metadata.refresh.interval.ms を60秒(幾らかのジッターを加えて)に設定することで、コンシューマとトピックの数に依存して大きく帯域を減らすことができます。

internal.termination.signal

この設定によりlibrdkafkaのスレッドは、それらを持つlibrdkafkaが完了するとすぐに、終了することができます。これにより、効果的にPHPプロセス/リクエストを素早く終了することができます。

これを有効にする時は以下のようにシグナルをマスクする必要があります:

<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);

socket.blocking.max.ms

ブローカー ソケットの操作がブロックされるかもしれない最大時間。値を低くするとわずかなCPU使用率の増加と引き換えに応答性が改善します。

この設定の値を減らすとシャットダウン スピードを改善します。その値はlibrdkafkaが読み込みループの1つの繰り返しの中でブロックするだろう最大時間を定義します。これはまたメインのlibrdkafkaのスレッドが終了をチェックするだろう頻度を定義します。

queue.buffering.max.ms

これはlibrdkafkaがメッセージのバッチを送信する前に待つ最大およびデフォルトの時間を定義します。この設定を例えば1msに設定することは、メッセージをひとまとめにする代わりに、メッセージができる限り早く送信されることを確実にします。

これはrdkafkaのインスタンス、およびPHPプロセス/リクエストのシャットダウン時間を減らすところで見られました。

パフォーマンス / 低レイテンシの設定

これは低レンテンシのための最適化された設定です。これによりPHPプロセス/リクエストがメッセージをできるだけ早く送信し素早く終了することができます。

<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', 1);
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);

生成の後でのpollも終了時間を減らすために重要かもしれません:

$producer->produce(...);
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}

ドキュメント

https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

助けを求める

ドキュメントが十分でない場合は、Gitter あるいは Google Groups上のphp-rdkafkaチャンネルで気軽に質問を尋ねてください。

スタブ

IDEがphp-rdkafkaのapiを自動的に発見できないため、php-rdkafkaクラス、関数および定数のためのスタブのセットを提供する外部パッケージの使用を考慮することができます: kwn/php-rdkafka-stubs

貢献

もしあなたが貢献したいのであれば嬉しいです:)

始める前に、どのように変更がマージされるかを見るためにCONTRIBUTING document を見てください。

著作権表示

librdkafkaからコピーされたドキュメント。

著作者: 貢献者を見てください。

ライセンス

php-rdkafkaはMITライセンスのもとにリリースされています。

TOP
inserted by FC2 system