what is kafka
kafka是分布式发布订阅系统
安装brew install kafka
可能会提示brew cask install java
中途会被安装zookeeper
修改server.properties
vim /usr/local/etc/kafka/server.properties
增加一行配置
listeners=PLAINTEXT://localhost:9092
首先我们启动zookeeper
zkserver start
启动kafka server
cd /usr/local/Cellar/kafka/0.10.1.0/bin
./kafka-server-start /usr/local/etc/kafka/server.properties 新建session查看kafka的topic
./kafka-topics --list --zookeeper localhost:2181
一般产出的是test
启动kafka的生产者
./kafka-console-producer --topic [topic-name] --broker-list localhost:9092
开启一个kafka消费者
./kafka-console-consumer --bootstrap-server localhost:9092 -topic [topic-name]
查看kafka的topic数量
./kafka-topics --list --zookeeper localhost:2181
2181 端口是固定的
创建一个topic
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gwr
Broker
Kafka集群包含了一个或多个服务器,这种服务器称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别称为Topic
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition
Producer
负责发布消息到Kafka broker
Consumer
消费消费者,向Kafka broker读取消息的客户端
php 使用kafka扩展
composer安装kafka扩展
composer require nmred/kafka-php 生产者producer.php
require 'vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('0.10.0.0.');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
//$producer->send(array(
// array(
// 'topic' => 'test',
// 'value' => 'test1....message.',
// 'key' => '',
// ),
//));
for ($i = 0; $i < 100; $i++) {
$result = $producer->send(array(
array(
'topic' => 'test',
'value' => 'test1....message.',
'key' => '',
),
));
var_dump($result);
}
消费者 cosumer.php
require 'vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.0.1');
$config->setTopics(array('test'));
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition test-0 (kafka.server.ReplicaManager)
Well this bug report is mostly about the fact that a producer that worked with 0.9.0.1 breaks when updating to 0.10.0.0. Whether this is an issue of Kafka or the client isn't that interesting, but for sure something changed on the server side.
需要将broker version从0.9.0.0 切换到0.10.0.0