PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。
1.安装 librdkafka 库
2.安装 php-kafka 扩展
二、代码实现
生产逻辑如下:
/**
* Create a producer
*/
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");
2.构建主题;
/**
* Create a topic instance from the producer
*/
$topic = $rk->newTopic("test");
3.发送消息
/**
* Producing messages
* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
* 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
* 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
* The message payload can be anything.
* 消息可以是任何内容。
*/
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
4.关闭生产者实例。
/**
* Proper shutdown
* This should be done prior to destroying a producer instance
* to make sure all queued and in-flight produce requests are completed before terminating.
* 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
* Not calling flush can lead to message loss!
* 不调用flush会导致消息丢失!
*/
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);
完整代码如下
<?php
/**
* Created by PhpStorm.
* User: liulu
* Date: 2020/1/1
* Time: 18:38
*/
/**
* Create a producer
*/
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");
/**
* Create a topic instance from the producer
*/
$topic = $rk->newTopic("test");
/**
* Producing messages
* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
* 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
* 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
* The message payload can be anything.
* 消息可以是任何内容。
*/
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
/**
* Proper shutdown
* This should be done prior to destroying a producer instance
* to make sure all queued and in-flight produce requests are completed before terminating.
* 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
* Not calling flush can lead to message loss!
* 不调用flush会导致消息丢失!
*/
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);