
php rdkafka offsets: Kafka - Producing and Consuming Data from PHP with Rdkafka

葛雪松
2023-12-01

Installing rdkafka

The rdkafka extension depends on librdkafka.

yum install librdkafka librdkafka-devel

pecl install rdkafka

php --ri rdkafka
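
The same check can be made from inside a script, which helps when the CLI and the web server use different php.ini files. This is a minimal sketch; rdkafkaStatus() is a hypothetical helper name, while extension_loaded() and phpversion() are core PHP functions.

```php
<?php
// Hypothetical helper: reports whether the rdkafka extension is available
// to the PHP binary that runs this script.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded';
    }
    // phpversion() returns the extension's version string when it is loaded.
    return 'rdkafka extension loaded, version ' . phpversion('rdkafka');
}

echo rdkafkaStatus() . PHP_EOL;
```

If the message says the extension is not loaded, the usual cause is a missing extension=rdkafka line in the php.ini used by that binary.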

Producer

Connect to the cluster, create a topic, and produce data.

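$conf = new RdKafka\Conf();

// Log at debug level. setLogLevel() is deprecated in newer php-rdkafka
// releases; the log_level property is the replacement.
$conf->set('log_level', (string) LOG_DEBUG);

$rk = new RdKafka\Producer($conf);

// Connect to the Kafka cluster
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// Get a handle for the topic. The broker creates the topic on first use
// only if auto.create.topics.enable is turned on.
$topic = $rk->newTopic("topic_1");

while (true) {
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo $message . PHP_EOL;

    try {
        // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        // Serve delivery reports and other queued events
        $rk->poll(0);
        sleep(2);
    } catch (\Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}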
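
produce() only queues a message locally, so a script that exits right after producing can lose data. The sketch below shows one way to confirm delivery, assuming a php-rdkafka version that provides Producer::flush(). produceWithReports() is a hypothetical helper name, and the broker list and topic are the ones used in this article.

```php
<?php
// Sketch only: requires the rdkafka extension and a reachable broker.
// produceWithReports() is a hypothetical helper, not part of php-rdkafka.
function produceWithReports(string $brokers, string $topicName, array $payloads): int
{
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);

    // Invoked once per message when poll() or flush() serves its delivery report.
    $conf->setDrMsgCb(function ($kafka, $message) {
        if ($message->err) {
            echo "Delivery failed: " . rd_kafka_err2str($message->err) . PHP_EOL;
        } else {
            echo "Delivered to partition {$message->partition}, offset {$message->offset}" . PHP_EOL;
        }
    });

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic($topicName);

    foreach ($payloads as $payload) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        $producer->poll(0); // serve any delivery reports that are ready
    }

    // Wait up to 10 seconds for outstanding messages.
    // The return value is an RD_KAFKA_RESP_ERR_* code.
    return $producer->flush(10000);
}
```

A call such as produceWithReports('192.168.20.6:9092,192.168.20.6:9093', 'topic_1', ['a', 'b']) returns RD_KAFKA_RESP_ERR_NO_ERROR when everything was flushed in time.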

Consumer - HighLevel

Automatic partition assignment, rebalancing, and consumer groups.

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;

        default:
            // $err is an integer code; convert it to a readable message
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});

// Configure the group.id. Consumers with the same group.id share the
// topic's partitions, each consuming a different subset.
$conf->set('group.id', 'group_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// the offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'topic_1'
$consumer->subscribe(['topic_1']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Wait up to 3 seconds for a message
    $message = $consumer->consume(3000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // End of partition reached: pause, then fall through to print the status
            sleep(2);
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
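
The loop above relies on automatic offset commits. When a message must not be marked as consumed until it has been processed, offsets can be committed by hand. This is a minimal sketch that assumes the consumer was created with $conf->set('enable.auto.commit', 'false'). consumeWithManualCommit() and its $maxMessages bound are hypothetical, added only to keep the example finite.

```php
<?php
// Sketch only: requires the rdkafka extension and a subscribed KafkaConsumer.
// consumeWithManualCommit() is a hypothetical helper, not part of php-rdkafka.
function consumeWithManualCommit(RdKafka\KafkaConsumer $consumer, callable $handle, int $maxMessages): int
{
    $processed = 0;

    while ($processed < $maxMessages) {
        $message = $consumer->consume(3000);

        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                $handle($message);
                // Commit only after the handler returned without throwing.
                $consumer->commit($message);
                $processed++;
                break;

            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                // Nothing to process yet; poll again.
                break;

            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }

    return $processed;
}
```

If the handler throws, the offset is not committed, so the same message is delivered again after a restart or rebalance. The handler therefore needs to tolerate duplicates.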

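Consumer - LowLevel

Consume from a specified partition.

php consumer_lowlevel.php [partitionNo]

The LowLevel consumer has no concept of a consumer group; you can also think of each consumer as belonging to its own independent group.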

if (!isset($argv[1])) {
    // No partition given on the command line: ask for one
    fwrite(STDOUT, "Please specify the partition to consume: ");
    $partition = (int) trim((string) fgets(STDIN));
} else {
    $partition = (int) $argv[1];
}
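
An (int) cast silently turns non-numeric input such as "abc" into partition 0. A stricter check is sketched below; parsePartition() is a hypothetical helper name, and it relies only on PHP's built-in filter_var().

```php
<?php
// Hypothetical helper: accepts only a non-negative integer string.
function parsePartition(string $raw): int
{
    $value = filter_var(
        trim($raw),
        FILTER_VALIDATE_INT,
        ['options' => ['min_range' => 0]]
    );

    // filter_var() returns false on failure; 0 is a valid partition,
    // so the comparison must be strict.
    if ($value === false) {
        throw new InvalidArgumentException("Invalid partition number: '" . trim($raw) . "'");
    }

    return $value;
}
```

With this helper, $partition = parsePartition($argv[1]) stops the script with an exception instead of quietly consuming partition 0.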

$topicName = "topic_1";

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'group_2');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers('192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();

// Commit stored offsets every 2 seconds (configuration values are strings)
$topicConf->set('auto.commit.interval.ms', '2000');

// Set the offset store method to 'file'
// $topicConf->set('offset.store.method', 'file');
// $topicConf->set('offset.store.path', sys_get_temp_dir());

// Alternatively, set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// the offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic($topicName, $topicConf);

// Start consuming the selected partition from the last stored offset
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    // Wait up to 3 seconds for a message from the selected partition
    $message = $topic->consume($partition, 3 * 1000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
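
The second argument of consumeStart() decides where reading begins. Besides RD_KAFKA_OFFSET_STORED it accepts RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, or an absolute offset. The sketch below maps a keyword to one of these; startConsumingFrom() and its keyword names are hypothetical and chosen only for illustration.

```php
<?php
// Sketch only: requires the rdkafka extension.
// startConsumingFrom() is a hypothetical helper, not part of php-rdkafka.
function startConsumingFrom(RdKafka\ConsumerTopic $topic, int $partition, string $from): void
{
    switch ($from) {
        case 'beginning':
            $offset = RD_KAFKA_OFFSET_BEGINNING; // oldest available message
            break;

        case 'end':
            $offset = RD_KAFKA_OFFSET_END; // only messages produced from now on
            break;

        case 'stored':
            $offset = RD_KAFKA_OFFSET_STORED; // last committed offset
            break;

        default:
            // Anything else must be an absolute, non-negative offset.
            if (!ctype_digit($from)) {
                throw new InvalidArgumentException("Unknown start position: '$from'");
            }
            $offset = (int) $from;
    }

    $topic->consumeStart($partition, $offset);
}
```

For example, startConsumingFrom($topic, $partition, 'beginning') replays the whole partition regardless of any stored offset.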
