KafkaBridge

Apache Kafka 客户端 SDK
授权协议 MIT
开发语言 C/C++
所属分类 服务器软件、 JMS/消息中间件
软件类型 开源软件
地区 国产
投 递 者 沙靖琪
操作系统 跨平台
开源组织 360
适用人群 未知
 软件概览

KafkaBridge 是奇虎 360 开源的 Kafka 客户端 SDK ,底层基于 librdkafka ,与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的 Kafka 系统细节,只需调用极少量的接口,就可完成消息的生产和消费。此外,针对使用者比较关心的消息生产的可靠性,作了近一步的提升。

特点

  • 支持多种语言:c++/c、php、python、golang,且各语言接口完全统一;

  • 接口少,简单易用;

  • 针对高级用户,支持通过配置文件调整所有的 librdkafka 的配置;

  • 在非按 key 写入数据的情况下,尽最大努力将消息成功写入;

  • 支持同步和异步两种数据写入方式;

  • 在消费时,除默认自动提交 offset 外,允许用户通过配置手动提交 offset ;

  • 在 php-fpm 场景中,复用长连接生产消息,避免频繁创建断开连接的开销;

编译

依赖 liblog4cplus, boost, swig-3.0.12, cmake

git clone

git clone --recursive https://github.com/Qihoo360/kafkabridge.git

cxx/c

进入cxx/c目录,执行build.sh -release,在./lib/release下会产生libqbus.so。

go

进入go目录,执行build.sh,在gopath/src/qbus 目录下生成qbus.go和libQBus_go.so。

python

进入python目录,执行build.sh,在当前目录生成qbus.py和_qbus.so。 编译脚本提供了选项,可以通过-h查看。可以通过-s选项传递python相关头文件路径。默认-s /usr/local/python2.7/include/python2.7

php

进入python目录, 执行build.sh,当前目录生成扩展qbus.so和qbus.php。 编译脚本提供了选项,可以通过-h查看。可以通过s选项传递php相关头文件路径,可以通过-v传递php的版本。默认选项-s /usr/local/php -v php。

使用

数据生产

  • 在非按key写入的情况下,sdk尽最大努力提交每一条消息,只要Kafka集群存有一台broker正常,就会重试发送;

  • 每次写入数据只需要调用produce接口,在异步发送的场景下,通过返回值可以判断发送队列是否填满,发送队列可通过配置文件调整;

  • 在同步发送的场景中,produce接口返回当前消息是否写入成功,但是写入性能会有所下降,CPU使用率会有所上升,推荐还是使用异步写入方式;。

  • 下面是生产接口,以c++为例:

bool QbusProducer::init(const string& broker_list, const string& log_path, const string& config_path, const string& topic)
bool QbusProducer::produce(const char* data, size_t data_len, const std::string& key)
void QbusProducer::uninit()
  • c++ sdk的使用范例:

#include <string>
#include <iostream>
#include "qbus_producer.h"

int main(int argc, const char* argv[]) {
    qbus::QbusProducer qbus_producer;
    if (!qbus_producer.init("127.0.0.1:9092",
                    "./log",
                    "./config",
                    "topic_test")) {
        std::cout << "Failed to init" << std::endl;
        return 0;
    }

    std::string msg("test\n");
    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
        std::cout << "Failed to produce" << std::endl;
    }

    qbus_producer.uninit();

    return 0;
}

数据消费

  • 消费只需调用subscribeOne订阅topic(也支持同时订阅多个topic),然后执行start就开始消费,当前进程非阻塞,每条消息通过callback接口回调给使用者;

  • sdk还支持用户手动提交offset方式,用户可以通过callback中返回的消息体,在代码其他逻辑中进行提交。

  • 下面是消费接口,以c++为例:

bool QbusConsumer::init(string broker_list, string log_path, string config_path, QbusConsumerCallback& callback)
bool QbusConsumer::subscribeOne(string group, string topic)
bool QbusConsumer::start()
void QbusConsumer::stop()
  • c++ sdk的使用范例:

#include <iostream>
#include "qbus_consumer.h"

qbus::QbusConsumer qbus_consumer;
class MyCallback: public qbus::QbusConsumerCallback {
    public:
        virtual void deliveryMsg(const std::string& topic,
                    const char* msg,
                    const size_t msg_len) const {
            std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
        }

};

int main(int argc, char* argv[]) {
    MyCallback my_callback;
    if (qbus_consumer.init("127.0.0.1:9092",
                    "log",
                    "config",
                    my_callback)) {
        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
            if (!qbus_consumer.start()) {
                std::cout << "Failed to start" << std::endl;
                return NULL;
            }

            while (1) sleep(1);  //可以执行其他业务逻辑

            qbus_consumer.stop();
        } else {
            std::cout << "Failed subscribe" << std::endl;
        }
    } else {
        std::cout << "Failed init" << std::endl;
    }
    return 0;
}
  • 导引\\ KafkaBridge 封装了对Kafka集群的读写操作,接口极少,简单易用,稳定可靠,支持c++/c、php、python、golang等多种语言,并特别针对php-fpm场景中作了长连接复用的优化,已在360公司内部广泛使用。\\ 前言\\ 众所周知,Kafka是近几年来大数据领域最流行的分布式流处理平台。它最初由LinkedIn公司开发, 已于2010年贡献给了Apache基金会并

  • 导引\\ KafkaBridge 封装了对Kafka集群的读写操作,接口极少,简单易用,稳定可靠,支持c++/c、php、python、golang等多种语言,并特别针对php-fpm场景中作了长连接复用的优化,已在360公司内部广泛使用。\\ 前言\\ 众所周知,Kafka是近几年来大数据领域最流行的分布式流处理平台。它最初由LinkedIn公司开发, 已于2010年贡献给了Apache基金会并

 相关资料
  • 我想在一些计算机之间建立点对点连接,这样用户就可以在没有外部服务器的情况下聊天和交换文件。我最初的想法如下: 我在服务器上制作了一个中央服务器插座,所有应用程序都可以连接到该插座。此ServerSocket跟踪已连接的套接字(客户端),并将新连接的客户端的IP和端口提供给所有其他客户端。每个客户端都会创建一个新的ServerSocket,所有客户端都可以连接到它。 换句话说:每个客户端都有一个Se

  • URI 方法 URI() string 返回当前客户端使用的服务器地址。 SetURI 方法 SetURI(uri string) 设置当前客户端使用的服务器地址。如果你想要设置多个服务器地址,请使用 SetURIList 方法代替该方法。 URIList 方法 URIList() []string 返回当前客户端可使用的服务器地址列表。 SetURIList 方法 SetURIList(uriL

  • 客户端事件通过 SetEvent 方法进行设置。 客户端事件有两个,它们分别定义为: type onErrorEvent interface { OnError(name string, err error) }   type onFailswitchEvent interface { OnFailswitch(Client) } 因为 go 语言不需要显式实现接口的特点,所以这两

  • 创建客户端有两种方式,一种是直接使用特化的构造器函数,另一种是使用工厂构造器函数。 第一种方式返回的是具体的客户端结构体指针对象,第二种方式返回的是客户端接口对象。 使用特化的构造器函数创建客户端 特化的构造器函数有下面几个: func NewHTTPClient(uri ...string) (client *HTTPClient) func NewTCPClient(uri ...string

  • 整个直播的业务架构是: +---------+ +-----------------+ +---------+ | Encoder +-->---+ SRS/CDN Network +--->---+ Player | +---------+ +-----------------+ +---------+ 客户端推流(Encoder)和播放器(

  • 客户端下载地址: windows 32位安装包 windows 64位安裝包 mac 安装包 Android App iOS App

  • 问题 你想使用网络上提供的服务。 解决方案 创建一个基本的 TCP 客户机。 在 Node.js 中 net = require 'net' domain = 'localhost' port = 9001 connection = net.createConnection port, domain connection.on 'connect', () -> console.log

  • 客户端调用服务端 服务端的proto文件copy到客户端 获取grpc客户端 //sample 为服务名称 var client = await provider.FindGrpcClient<RpcTest.RpcTestClient>("sample"); 调用服务方法 var result = await client.SayHelloAsync(new HelloRequest() {