当前位置: 首页 > 工具软件 > kafka-php > 使用案例 >

nmred kafka-php,PHP使用Kafka

吕飞翼
2023-12-01

what is kafka

kafka是分布式发布订阅系统

安装brew install kafka

可能会提示brew cask install java

中途会被安装zookeeper

修改server.properties

vim /usr/local/etc/kafka/server.properties

增加一行配置

listeners=PLAINTEXT://localhost:9092

首先我们启动zookeeper

zkserver start

启动kafka server

cd /usr/local/Cellar/kafka/0.10.1.0/bin

./kafka-server-start /usr/local/etc/kafka/server.properties 新建session查看kafka的topic

./kafka-topics --list --zookeeper localhost:2181

一般产出的是test

启动kafka的生产者

./kafka-console-producer --topic [topic-name] --broker-list localhost:9092

开启一个kafka消费者

./kafka-console-consumer --bootstrap-server localhost:9092 -topic [topic-name]

查看kafka的topic数量

./kafka-topics --list --zookeeper localhost:2181

2181 端口是固定的

创建一个topic

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gwr

Broker

Kafka集群包含了一个或多个服务器,这种服务器称为broker

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别称为Topic

Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition

Producer

负责发布消息到Kafka broker

Consumer

消费消费者,向Kafka broker读取消息的客户端

php 使用kafka扩展

composer安装kafka扩展

composer require nmred/kafka-php 生产者producer.php

require 'vendor/autoload.php';

date_default_timezone_set('PRC');

$config = \Kafka\ProducerConfig::getInstance();

$config->setMetadataRefreshIntervalMs(10000);

$config->setMetadataBrokerList('127.0.0.1:9092');

$config->setBrokerVersion('0.10.0.0.');

$config->setRequiredAck(1);

$config->setIsAsyn(false);

//$config->setProduceInterval(500);

$producer = new \Kafka\Producer();

//$producer->send(array(

// array(

// 'topic' => 'test',

// 'value' => 'test1....message.',

// 'key' => '',

// ),

//));

for ($i = 0; $i < 100; $i++) {

$result = $producer->send(array(

array(

'topic' => 'test',

'value' => 'test1....message.',

'key' => '',

),

));

var_dump($result);

}

消费者 cosumer.php

require 'vendor/autoload.php';

date_default_timezone_set('PRC');

$config = \Kafka\ConsumerConfig::getInstance();

$config->setMetadataRefreshIntervalMs(10000);

$config->setMetadataBrokerList('127.0.0.1:9092');

$config->setGroupId('test');

$config->setBrokerVersion('0.10.0.1');

$config->setTopics(array('test'));

//$config->setOffsetReset('earliest');

$consumer = new \Kafka\Consumer();

$consumer->start(function($topic, $part, $message) {

var_dump($message);

});

ERROR [Replica Manager on Broker 0]: Error processing append operation on partition test-0 (kafka.server.ReplicaManager)

Well this bug report is mostly about the fact that a producer that worked with 0.9.0.1 breaks when updating to 0.10.0.0. Whether this is an issue of Kafka or the client isn't that interesting, but for sure something changed on the server side.

需要将broker version从0.9.0.0 切换到0.10.0.0

 类似资料: