See-KafKa

简单舒适的 PHP-KafKa 拓展
授权协议 GPL
开发语言 PHP
所属分类 服务器软件、 分布式应用/网格
软件类型 开源软件
地区 国产
投 递 者 柯景龙
操作系统 跨平台
开源组织
适用人群 未知
 软件概览

[喵咪KafKa(3)]PHP拓展See-KafKa

前言

(Simple 简单 easy 容易 expand 的拓展)

KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同时能传输的数据量)并且高可用一个消息平台,它是分布式消息队列,分布式日志,数据传输通道的不二之选,但是可惜的时PHP的拓展实在不是很好用(php-kafka拓展已经长期不维护存在非常多的问题,rdkafkaC底层编写不利于使用),希望可以更加方便的来使用KafKa这块肥肉于是基于rdKafKa封装的一个简单舒适KafKa拓展诞生了!

附上:

1. 安装

(See-KafKa支持0.9~0.10版本,对0.8版本以及以前的版本协议不支持)

首先需要安装配置好zookeeper+KafKa:可以参考作者博客下的KafKa模块下的介绍安装,作者博客介绍是对于0.8.2.2的安装方式,但是和0.9和0.10的安装并没有区别,只需要去下载0.9和0.10的包即可

在使用之前需要按照顺序先安装librdkafka,在安装php-rdkafka:

# 安装librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install
# 安装php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
make install
# 在php.ini加入如下信息
vim /usr/local/php/etc/php.ini
extension=rdkafka.so

这个时候使用php -m 可以看到拓展列表内存在 rdkafka这项证明拓展已经安装成功

2. 使用

See-KafKa完美支持PhalApi,只需要把去拓展库中获取kafka拓展即可,当然不是PhalApi的也可以使用只需要include文件下的kafka.php即可使用

2.1 Producer

KafKa最基础的两个角色其中一个就是Producer(可以参考作者博客介绍)

向KafKa中的一个Topic写入一条消息,需要写入多条可以多次使用setMassage

<?php
/**
 * See-kafka Producer例子
 * 循环写入1w条数据15毫秒
 */

// 配置KafKa集群(默认端口9092)通过逗号分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 设置一个Topic
$KafKa_Lite->setTopic("test");
// 单次写入效率ok  写入1w条15 毫秒
$Producer = $KafKa_Lite->newProducer();
// 参数分别是partition,消息内容,消息key(可选)
// partition:可以设置为KAFKA_PARTITION_UA会自动分配,比如有6个分区写入时会随机选择Partition
$Producer->setMessage(0, "hello");

2.2 Consumer

对于Consumer来说支持4种从offset的获取方式分别为:

  • KAFKAOFFSETSTORED #通过group来获取消息的offset(必须设置group)

  • KAFKAOFFSETEND #获取尾部的offset

  • KAFKAOFFSETBEGINNING #获取头部的offset

  • 手动指定offset开始值

2.2.1 例子1

此例子适合获取一段数据就结束的场景,每一次getMassage都会建立连接然后关闭连接,当循环使用getMassage会造成相对严重的效率问题

<?php
/**
 * See-kafka Consumer例子1
 */

// 配置KafKa集群(默认端口9092)通过逗号分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 设置一个Topic
$KafKa_Lite->setTopic("test");
// 设置Consumer的Group分组(不使用自动offset的时候可以不设置)
$KafKa_Lite->setGroup("test");
// 获取Consumer实例
$consumer = $KafKa_Lite->newConsumer();

// 获取一组消息参数分别为:Partition,maxsize最大返回条数,offset(可选)默认KAFKA_OFFSET_STORED
$rs = $consumer->getMassage(0,100);
//返回结果是一个数组,数组元素类型为Kafka_Message

2.2.1 例子2

例子2适合脚本队列任务

<?php
/**
 * See-kafka Consumer例子1
 * 889 毫秒 获取1w条
 */

// 配置KafKa集群(默认端口9092)通过逗号分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 设置一个Topic
$KafKa_Lite->setTopic("test");
// 设置Consumer的Group分组(不使用自动offset的时候可以不设置)
$KafKa_Lite->setGroup("test");

// 此项设置决定 在使用一个新的group时  是从 最小的一个开始 还是从最大的一个开始  默认是最大的(或尾部)
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');
// 此项配置决定在获取数据后回自动作为一家消费 成功 无需在 一定要 stop之后才会 提交 但是也是有限制的
// 时间越小提交的时间越快,时间越大提交的间隔也就越大 当获取一条数据之后就抛出异常时 更具获取之后的时间来计算是否算作处理完成
// 时间小于这个时间时抛出异常 则不会更新offset 如果大于这个时间则会直接更新offset 建议设置为 100~1000之间
$KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);

// 获取Consumer实例
$consumer = $KafKa_Lite->newConsumer();

// 开启Consumer获取,参数分别为partition(默认:0),offset(默认:KAFKA_OFFSET_STORED)
$consumer->consumerStart(0);

for ($i = 0; $i < 100; $i++) {
    // 当获取不到数据时会阻塞默认10秒可以通过$consumer->setTimeout()进行设置
    // 阻塞后由数据能够获取会立即返回,超过10秒回返回null,正常返回格式为Kafka_Message
    $message = $consumer->consume();
}

// 关闭Consumer(不关闭程序不会停止)
$consumer->consumerStop();

3. 配置文件

See-kafka提供两种配置文件的配置,分别传入key和value,具体配置项已经作用参看如下地址:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

配置文件说明:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

$KafKa_Lite->setTopicConf();
$KafKa_Lite->setKafkaConf();

在使用Consumer的Group(KAFKAOFFSETSTORED)中需要注意以下配置项,否则你在使用一个新的group会从当前开始计算offset(根据场景):

// 此项设置决定 在使用一个新的group时  是从 最小的一个开始 还是从最大的一个开始  默认是最大的(或尾部)
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');

Consumer获取之后是需要提交告诉KafKa获取成功并且更新offset,但是如果中途报错没有提交offset则下次还是会从头获取,此项配置设置一个自动提交时间,当失败后之前处理的也会吧offset提交到KafKa:

// 此项配置决定在获取数据后回自动作为一家消费 成功 无需在 一定要 stop之后才会 提交 但是也是有限制的
// 时间越小提交的时间越快,时间越大提交的间隔也就越大 当获取一条数据之后就抛出异常时 更具获取之后的时间来计算是否算作处理完成
// 时间小于这个时间时抛出异常 则不会更新offset 如果大于这个时间则会直接更新offset 建议设置为 100~1000之间
$KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);

4. 异常

在初始化KafKa_Lite会对集群端口进行验证,如果无任何一个可用的则会抛出一个No can use KafKa异常,也可以主动触发ping操作检查集群是否有有可用机器

当获取Consumer异常了会抛出一个KafKaExceptionBase异常,异常有一个code号可参考,Exception/err.php文件,推荐使用try-catch进行处理

5. 总结

See-KafKa的宗旨是为了更加方便把KafKa和PHP相结合,并且能够方便的进行使用,如果大家感兴趣可以使用看看,有问题可以进行反馈,此拓展作者会长期维护下去!

  • 本文介绍基于docker搭建的confluent-kafka及其python接口的使用。 本文只搭建了一个单Broker的confluent-kafka测试环境,考虑到高可用、高吞吐等因素,正式生产环境一般至少要3个节点。 本文采用的系统配置如下: LinuxMint 20.3 (兼容 Ununtu 20.04) docker 20.10.21 docker-compose 2.14.2 pyth

  • Flume部署 解压到指定目录 tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/src/ 重命名 cd /usr/local/src mv apache-flume-1.7.0-bin/ flume 配置环境变量 vi /etc/profile export FLUME_HOME=/usr/local/src/flume expor

  • I have a simple Producer-Consumer setup: 1 producer(as a thread) and 2 consumers(as 2 processes). The run method of producer: def run(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_s

  • logstash-input-kafka的所有版本见: https://rubygems.org/gems/logstash-input-kafka/versions 2.0.2 - October 14, 2015 (11.5 KB) 2.0.1 - October 8, 2015 (11.5 KB) 2.0.0 - September 23, 2015 (11.5 KB) 1.0.1 - Oc

  • 1 集群规划 安装Kafka前需先安装Zookeeper服务,Zookeeper部署参考:https://blog.csdn.net/qq_48671620/article/details/125154389?spm=1001.2014.3001.5501。 node01 node02 node03 Zookeeper follower leader follower Kafka kafka ka

  • 一、前言 二、Kafka Producer 2.1、构造函数 Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。 构造器接收下列参数: 事件被写入的默认输出 topic 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema Kafka

 相关资料
  • 我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于

  • 在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能

  • Kafka-php 使用纯粹的 PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x 更加稳定

  • 主要内容:什么是消息系统?,什么是Kafka?,优点,用例在大数据中,使用了大量的数据。 关于大数据,主要有两个主要挑战。第一个挑战是如何收集大量数据,第二个挑战是分析收集的数据。 为了克服这些挑战,需要使用消息传递系统。 Kafka专为分布式高吞吐量系统而设计。 Kafka倾向于非常好地取代传统的信息中间服务者。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有容错功能,因此非常适合大型消息处理应用程序。 什么是消息系统? 消息

  • 一、简介 ApacheKafka 是一个分布式的流处理平台。它具有以下特点: 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列; 支持数据实时处理; 能保证消息的可靠性投递; 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错; 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量。 二、基本概念 2.1 Messages And Ba

  • 主要内容:1. kafka基本介绍,1 简介,2 kafka的初步认识,2 kafka的高性能的原因,1 优秀架构之磁盘顺序写保证高性能,2 优秀架构之磁盘零拷贝机制保证读数据高性能,3 优秀架构之日志分段保存,稀疏索引文件,4 优秀架构之高并发网络设计,5 优秀架构之冗余副本保证高可用,6 kafka优秀架构思考总结,3 生产者原理,1 生产者案例展示,2 生产者消息发送原理-图解,3 调优 ,生产者如何提供吞吐量(配置参数修改),,,,,,,,,,,,,,,,,,,,,1. kafka基本介