PHP-rdkafka は PHP 5 / PHP 7 Kafka 0.8 / 0.9 / 0.10 クライアントの動作を提供する薄いlibrdkafkaバインディングです。
高レベルおよび低レベルのコンシューマ, プロデューサ およびメタデータ API をサポートします。
API はlibrdkafkaのAPIのものをできる限り再構成し、ここで完全に説明されます。
https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples.html
プロデュースのために、まずプロデューサを作成し、それをブローカー(Kafkaサーバ)に追加する必要があります:
<?php
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");
次に、プロデューサからトピック インスタンスを作成します:
<?php
$topic = $rk->newTopic("test");
そこからプロデュース メソッドを使って、必要なだけ多くのメッセージを生成することができます:
<?php
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
最初の引数はパーティションです。RD_KAFKA_PARTITION_UA は割り当てられていないことを意味し、librdkafkaにパーティションを選ばせます。
2つ目の引数はメッセージフラグで、現在のところ常に0でなければなりません。
メッセージの積み荷はなんでも構いません。
RdKafka\KafkaConsumer クラスは自動的なパーティションの割り当て/廃止をサポートします。ここの例を見てください。
最初に低レベルのコンシューマを作成し、それをブローカー(Kafkaサーバ)に追加する必要があります:
<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");
次に、newTopic()
メソッドを呼ぶことでトピック インスタンスを作成し、パーティション 0 上で消費を開始します:
<?php
$topic = $rk->newTopic("test");
// 最初の引数は消費を開始するパーティションです
// 2つ目の引数は消費を開始するオフセットです。有効な値は
// : RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED。
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
次に、消費されたメッセージを引き出します:
<?php
while (true) {
// 最初の引数は(再び)パーティションです。
// 2つ目の引数はタイムアウトです。
$msg = $topic->consume(0, 1000);
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
複数のトピック および/あるいはパーティションからの消費は、librdkafkaにこれらのトピック/パーティションからの全てのメッセージを内部キューに送信するように指示し、このキューから消費することで行うことができます:
キューの作成:
<?php
$queue = $rk->newQueue();
Adding topars to the queue:
<?php
$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
次に、キューから消費されたメッセージを取り出します:
<?php
while (true) {
// 唯一の引数はタイムアウトです。
$msg = $queue->consume(1000);
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
librdkafkaはローカルファイル内、あるいはブローカー上にオフセットを格納することができます。デフォルトはローカルファイルで、消費オフセットとしてRD_KAFKA_OFFSET_STORED
の使用を開始するとすぐにrdkafkaはオフセットを格納し始めます。
デフォルトでは、トピックとパーテイションに基づいた名前を持つファイルが現在のディレクトリに生成されます。ディレクトリはoffset.store.path
設定プロパティを設定することで変更することができます。
他の興味深いプロパティ: offset.store.sync.interval.ms
, offset.store.method
, auto.commit.interval.ms
, auto.commit.enable
, offset.store.method
, group.id
。
<?php
$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", 1e3);
$topicConf->set("offset.store.sync.interval.ms", 60e3);
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
librdkafkaはデフォルトで各消費されたパーティションについて1GBまでのメッセージをバッファするでしょう。コンシューマ上でqueued.max.messages.kbytes
パラメータの値を減らすことでメモリの消費を下げることができます。
各コンシューマおよびプロデューサのインスタンスはtopic.metadata.refresh.interval.ms
パラメータで定義された間隔でトピックのメタデータを取りに行くでしょう。librdkafkaのバージョンに依存して、パラメータのデフォルトは10秒あるいは600秒です。
librdkafka はデフォルトでクラスタの全てのトピックについてのメタデータを取得します。topic.metadata.refresh.sparse
を文字列"true"
に設定することでlibrdkafkaに使用しているトピックだけを取得するようにします。
topic.metadata.refresh.sparse
を "true"
、topic.metadata.refresh.interval.ms
を60秒(幾らかのジッターを加えて)に設定することで、コンシューマとトピックの数に依存して大きく帯域を減らすことができます。
この設定によりlibrdkafkaのスレッドは、それらを持つlibrdkafkaが完了するとすぐに、終了することができます。これにより、効果的にPHPプロセス/リクエストを素早く終了することができます。
これを有効にする時は以下のようにシグナルをマスクする必要があります:
<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);
ブローカー ソケットの操作がブロックされるかもしれない最大時間。値を低くするとわずかなCPU使用率の増加と引き換えに応答性が改善します。
この設定の値を減らすとシャットダウン スピードを改善します。その値はlibrdkafkaが読み込みループの1つの繰り返しの中でブロックするだろう最大時間を定義します。これはまたメインのlibrdkafkaのスレッドが終了をチェックするだろう頻度を定義します。
これはlibrdkafkaがメッセージのバッチを送信する前に待つ最大およびデフォルトの時間を定義します。この設定を例えば1msに設定することは、メッセージをひとまとめにする代わりに、メッセージができる限り早く送信されることを確実にします。
これはrdkafkaのインスタンス、およびPHPプロセス/リクエストのシャットダウン時間を減らすところで見られました。
これは低レンテンシのための最適化された設定です。これによりPHPプロセス/リクエストがメッセージをできるだけ早く送信し素早く終了することができます。
<?php
$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', SIGIO);
} else {
$conf->set('queue.buffering.max.ms', 1);
}
$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);
生成の後でのpollも終了時間を減らすために重要かもしれません:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html
ドキュメントが十分でない場合は、Gitter あるいは Google Groups上のphp-rdkafkaチャンネルで気軽に質問を尋ねてください。
IDEがphp-rdkafkaのapiを自動的に発見できないため、php-rdkafkaクラス、関数および定数のためのスタブのセットを提供する外部パッケージの使用を考慮することができます: kwn/php-rdkafka-stubs
もしあなたが貢献したいのであれば嬉しいです:)
始める前に、どのように変更がマージされるかを見るためにCONTRIBUTING document を見てください。
librdkafkaからコピーされたドキュメント。
著作者: 貢献者を見てください。
php-rdkafkaはMITライセンスのもとにリリースされています。