Kafka-php 使用纯粹的 PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x 更加稳定高效, 由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本。
PHP 版本大于 5.5
Kafka Server 版本大于 0.8.0
消费模块 Kafka Server 版本需要大于 0.9.0
添加 composer 依赖 nmred/kafka-php
到项目的 composer.json
文件中即可,如:
{ "require": { "nmred/kafka-php": "0.2.*" } }
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); // 设置生产相关配置,具体配置参数见 [Configuration](Configuration.md) $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('0.9.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode, $context) { var_dump($errorCode); }); $producer->send();
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('0.9.0.1'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
基础协议 API 调用方式见 Example
软件简介 Kafka-php 使用纯粹的 PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x
kafka-php 测试遇到掉数据的问题 遇到的现象 现象1.当消费者进程未启动时 生产者进行生产一条数据 待消费Lag为1 LOG-END-OFFSET-CURRENT-OFFSET=1 |GROUP|TOPIC|PARTITION|CURRENT-OFFSET|LOG-END-OFFSET|LAG| CONSUMER-ID|HOST|CLIENT-ID| |--|--|--|--|--|--|
http://www.bkjia.com/PHPjc/1100706.html 话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。 实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装
安装rdkafka rdkafka 依赖 libkafka yum install rdkafka rdkafka-devel pecl install rdkafka php --ri rdkafka 生产者 连接集群,创建 topic,生产数据。 $rk = new Rdkafka\Producer(); $rk->setLogLevel(LOG_DEBUG); // 链接kafka集群 $r
kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展 话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。 实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装
PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。 1.安装 librdkafka 库 2.安装 php-kafka 扩展 二、代码实现 生产逻辑如下: 配置生产者客户端参数及创建相应的生产者实例 /** * Create a producer */ $conf = new RdKafka\Conf(); $conf->set('log_level'
what is kafka kafka是分布式发布订阅系统 安装brew install kafka 可能会提示brew cask install java 中途会被安装zookeeper 修改server.properties vim /usr/local/etc/kafka/server.properties 增加一行配置 listeners=PLAINTEXT://localhost:909
软件简介 php-kafka-consumer 主要是对 php_rdkafka 的 consumer 的 API 进行一层封装,增加了原程序中所没有的与 zookeeper 交互的功能。在此基础上实现了 rebalance 功能以及 group 功能。 经过简单的压力测试,单个进程的消费能力能达到每秒钟7.8W条,压测详细内容见压力测试。 依赖 php_zookeeper php_rdkafka
# 使用kafka-log4j-appender实现日志采集 ### 背景 kafka内核实现了log4j.AppenderSkeleton接口,通过配置log4j即可将日志生产到topic ### 配置方法 **step0:** 配置pom.xml ```pom org.apache.kafka kafka-clients 2.4.0 compile org.apache.kafka kafka
低级方式(Low level) <?php $rk = new RdKafka\Consumer(); $rk->setLogLevel(LOG_DEBUG); // 指定 broker 地址,多个地址用"," 分割 $rk->addBrokers("192.168.33.1:9092"); $topic = $rk->newTopic("test"); $topic->consumeSta
原标题:Kafka-php:使用 PHP 编写的 Kafka 客户端 Kafka-php Kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 , 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka brok
我发现maven repo中有几个Kafka。 阿帕奇的maven回购协议中有两个Kafka。https://mvnrepository.com/artifact/org.apache.kafka/kafka https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients 它们都可以从kafka服务器生成Mesg并消耗msg。 我
我对Kafka很陌生,我想了解配额制是如何为Kafka工作的。 到现在为止我一直在跟踪这里的文件 bin/kafka-configs.sh--zookeeper 10.11.10.2:2181--alter--add-config'producer_byte_rate=1024,consumer_byte_rate=1024'--实体类型客户端--实体名称客户端A 我还使用以下命令来测量Kafka
执行kafka客户端的生产者/消费者连接池有意义吗? kafka是否在内部维护已初始化并准备好使用的连接对象列表? 我们希望最小化连接创建的时间,这样在发送/接收消息时就不会有额外的开销。 目前,我们正在使用apache共享池库来保持连接。 任何帮助都将不胜感激。
我使用的是kafka-clients-0.10.1.1(单节点单代理) auto.create.topics.enable的默认值为true。 1.我正在使用以下方式向主题发送消息: 用于消费:
https://spring.io/projects/spring-kafka指出,“所有brokers>=0.10.x.x的用户(以及所有spring boot 1.5.x用户)建议使用spring-kafka 1.3.x或更高版本”我已经在pom.xml中设置了spring-kafka 1.3.9版本,并将Kafka客户端从0.11.x.x覆盖到0.10.2.2 当我运行实例时,我会出现以下错
我是Apache Kafka的一个新用户,我还在了解它的内部结构。 在我的用例中,我需要从Kafka Producer客户端动态增加一个主题的分区数量。 我发现了关于增加分区大小的其他类似问题,但它们使用了zookeeper配置。但是我的kafkaProducer只有Kafka broker配置,而没有zookeeper配置。 有没有什么方法我可以增加一个主题的分区数量从生产者端?我运行的是Kaf
我在kafka消费者文档中看到了这个注释-
我的设置: JDK 11.0.6 Spring护套2.2.4.释放 Springkafka 2.4.1 我已经在PLAINTEXT中验证了我的Zookeeper/Kafka/client应用程序,一切正常。我还用Kafka客户端工具验证了我的密钥库/信任库。 我正在使用KafkaAdmin bean配置我的主题,它似乎在SSL连接上失败了: 我的两个JKS文件位于项目的src/main/resou