先搭好kafka, 创建名为adsonlib的topic, 4个分区
kafka.h
#pragma once
#include "rdkafkacpp.h"
#include "status.h"
#include "glog/logging.h"
#include <iostream>
#include <memory>
namespace adsonlib {
//kafka 对k进行hash分片
class KafkaModeHasher : public RdKafka::PartitionerCb {
public:
int32_t partitioner_cb(const RdKafka::Topic *topic,
const std::string *key,
int32_t partition_cnt,
void *msg_opaque) override {
LOG(INFO) << "partitioner: " << (*(uint64_t*)(key->c_str())) << " mod " << partition_cnt;
return (*(uint64_t*)(key->c_str())) % partition_cnt;
}
};
//连接到kafka后,各种回调事件
class KafkaEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::EVENT_ERROR:
LOG(ERROR) << "kafka: ERROR (" << RdKafka::err2str(event.err())
<< "): " << event.str() ;
break;
case RdKafka::Event::EVENT_STATS:
LOG(ERROR) << "kafka: STATS : " << event.str() ;
break;
case RdKafka::Event::EVENT_LOG:
LOG(INFO) << "kafka: event:" << event.severity() << event.fac() << event.str();
break;
default:
LOG(INFO) << "kafka EVENT " << event.type() << " ("
<< RdKafka::err2str(event.err()) << "): " << event.str()
;
break;
}
}
};
//kafka发送消息后,回调
class KafkaDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
std::string status_name;
switch (message.status()) {
case RdKafka::Message::MSG_STATUS_NOT_PERSISTED:
status_name = "NotPersisted";
break;
case RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED:
status_name = "PossiblyPersisted";
break;
case RdKafka::Message::MSG_STATUS_PERSISTED:
status_name = "Persisted";
break;
default:
status_name = "Unknown?";
break;
}
LOG(INFO) << "Message delivery for (" << message.len()
<< " bytes): " << status_name << ": " << message.errstr()
;
if (message.key())
std::cout << "Key: " << *(message.key()) << ";" ;
}
};
//kafka消费消息回调
class DefaultKafkaConsumeCb : public RdKafka::ConsumeCb {
public:
virtual ~DefaultKafkaConsumeCb(){}
void consume_cb(RdKafka::Message &msg, void *opaque) {
handle_msg(&msg, opaque);
}
void handle_msg(RdKafka::Message *message, void *opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
HandleTimeout(message, opaque);
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
HandleMsg(message, opaque);
break;
case RdKafka::ERR__PARTITION_EOF:
HandlePartitionEOF(message, opaque);
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
HandleError(message, opaque);
break;
default:
HandleError(message, opaque);
}
}
protected:
virtual void HandleTimeout(RdKafka::Message *message, void *opaque) {
}
virtual void HandlePartitionEOF(RdKafka::Message *message, void *opaque) {
}
virtual void HandleError(RdKafka::Message *message, void *opaque) {
LOG(ERROR) << "Consume failed: " << message->errstr();
}
virtual void HandleMsg(RdKafka::Message *message, void *opaque) {
LOG(INFO) << "Read msg at offset " << message->offset();
if (message->key()) {
LOG(INFO) << "Key: " << *(const int64_t*)(message->key()->c_str());
}
auto headers = message->headers();
if (headers) {
std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
for (size_t i = 0; i < hdrs.size(); i++) {
const RdKafka::Headers::Header hdr = hdrs[i];
if (hdr.value() != NULL)
printf(" Header: %s = \"%.*s\"\n", hdr.key().c_str(),
(int)hdr.value_size(), (const char *)hdr.value());
else
printf(" Header: %s = NULL\n", hdr.key().c_str());
}
}
printf("%.*s\n", static_cast<int>(message->len()),
static_cast<const char *>(message->payload()));
}
};
//kafka 基本配置
struct KafkaConfig {
std::string brokers = "localhost:9092";
std::string topic = "adsonlib";
std::string compress_codec;
int64_t statistics_interval_ms = 5000;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
int64_t stop_offset = RdKafka::Topic::OFFSET_END;
int use_ccb = 1;
int msgflags = RdKafka::Producer::RK_MSG_COPY;
virtual ~KafkaConfig(){}
};
//把kafka producer和consumer封装在了同一个client里。也可以分开封装
struct KafkaClient {
std::unique_ptr<RdKafka::PartitionerCb> mod_partitioner = std::make_unique<KafkaModeHasher>();
std::unique_ptr<RdKafka::Conf> conf; //全局配置
std::unique_ptr<RdKafka::Conf> tconf; //topic配置
std::unique_ptr<RdKafka::EventCb> ex_event_cb;
std::unique_ptr<RdKafka::DeliveryReportCb> ex_dr_cb;
std::unique_ptr<RdKafka::Producer> producer;
std::unique_ptr<RdKafka::Consumer> consumer;
std::unique_ptr<RdKafka::Topic> producer_topic; //topic对象
std::unique_ptr<RdKafka::Topic> consumer_topic;
std::shared_ptr<RdKafka::ConsumeCb> consumer_cb;
void SetConf(const KafkaConfig &cfg) {
config = cfg;
}
KafkaClient(){
conf.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
tconf.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
ex_event_cb.reset(new KafkaEventCb);
ex_dr_cb.reset(new KafkaDeliveryReportCb);
consumer_cb.reset(new DefaultKafkaConsumeCb);
}
~KafkaClient(){
}
Status SetParitioner(std::unique_ptr<RdKafka::PartitionerCb>cb = nullptr){
if(cb){
mod_partitioner = std::move(cb);
}
std::string errstr;
if (mod_partitioner && tconf->set("partitioner_cb", mod_partitioner.get(), errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kakfa prtitioner failed: " + errstr);
}
return 0;
}
Status SetCompressCodec(const std::string &codec = ""){
if(codec.size() > 0){
config.compress_codec = codec;
}
std::string errstr;
if (config.compress_codec.size() > 0 && conf->set("compression.codec", config.compress_codec, errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kakfa compress codec failed: " + errstr);
}
return 0;
}
Status SetStatIntvl(int64_t intvl_ms = -1) {
if(intvl_ms <= 0) return 0;
std::string errstr;
config.statistics_interval_ms = intvl_ms;
if (conf->set("statistics.interval.ms", std::to_string(config.statistics_interval_ms), errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kakfa statistics_interval_ms failed: " + errstr);
}
return 0;
}
Status SetBrokers(const std::string &svrs = ""){
if(svrs.size() > 0) {
config.brokers = svrs;
}
std::string errstr;
if (conf->set("metadata.broker.list", config.brokers, errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kafka broker fail: " + errstr);
}
return 0;
}
Status SetEventCb(std::unique_ptr<RdKafka::EventCb> cb = nullptr) {
std::string errstr;
if(cb) {
ex_event_cb = std::move(cb);
}
if (conf->set("event_cb", ex_event_cb.get(), errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kafka event_cb fail: " + errstr);
}
return 0;
}
Status SetDeliveryReport(std::unique_ptr<RdKafka::DeliveryReportCb> cb = nullptr){
if(cb){
ex_dr_cb = std::move(cb);
}
std::string errstr;
if(conf->set("dr_cb", ex_dr_cb.get(), errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kafka delivery report fail: " + errstr);
}
return 0;
}
Status SetTopic(){
std::string errstr;
if(conf->set("default_topic_conf", tconf.get(), errstr) != RdKafka::Conf::CONF_OK) {
return Status("set kafka tconf fail: " + errstr);
}
return 0;
}
Status CreateProducer(){
std::string errstr;
if(!conf) {
return Status("conf not ready");
}
producer.reset(RdKafka::Producer::create(conf.get(), errstr));
if(!producer){
return Status("create kafka producer fail: " + errstr);
}
LOG(INFO) << "created producer: " << producer->name();
return 0;
}
Status CreateConsumer(){
std::string errstr;
consumer.reset(RdKafka::Consumer::create(conf.get(), errstr));
if(!consumer){
return Status("createkafka consumer fail: " + errstr);
}
LOG(INFO) << "created producer: " << consumer->name();
return 0;
}
Status CreateConsumerTopic(){
std::string errstr;
if(!consumer) {
return Status("consumer not ready");
}
consumer_topic.reset(RdKafka::Topic::create(consumer.get(), config.topic, tconf.get(), errstr));
if(!consumer_topic){
return Status("create kafka consumer topic fail: " + errstr);
}
return 0;
}
Status CreateProducerTopic(){
if(!producer) {
return Status("producer not ready");
}
std::string errstr;
producer_topic.reset(RdKafka::Topic::create(producer.get(),config. topic, tconf.get(), errstr));
if(!producer_topic){
return Status("create kafka consumer topic fail: " + errstr);
}
return 0;
}
Status StartConsumer(int partition, int64_t start_offset = -1){
if(start_offset == -1){
start_offset = config.start_offset;
}
RdKafka::ErrorCode resp = consumer->start(consumer_topic.get(), partition, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
return Status("Failed to start consumer: " + RdKafka::err2str(resp));
}
return 0;
}
Status Consume(int partition, int64_t timeout = 1000, std::function<void(RdKafka::Message *, void *)> cb = nullptr){
if (config.use_ccb) {
if(-1 == consumer->consume_callback(consumer_topic.get(), partition, timeout, consumer_cb.get(), &config.use_ccb)) {
return Status("sonsume fail");
}
} else {
RdKafka::Message *msg = consumer->consume(consumer_topic.get(), partition, 1000);
cb(msg, NULL);
delete msg;
}
return 0;
}
Status StopConsumer(int partition = -1, int64_t timeout = 1000) {
auto err = consumer->stop(consumer_topic.get(), partition == -1 ? config.partition : partition);
Status status;
if(err) {
status = "stop kafka consumer fail: " + RdKafka::err2str(err);
LOG(INFO) << status;
}
consumer->poll(timeout);
return status;
}
Status ConsumerPoll(int64_t timeout = 1000){
auto code = consumer->poll(timeout);
return 0;
}
Status ProducerFlush(int64_t timeout = 1000){
if(!producer) return 0;
auto code = producer->flush(timeout);
if(code && code != RdKafka::ERR__TIMED_OUT) {
return Status("flush fail: " + RdKafka::err2str(code));
}
return 0;
}
Status Produce(
const std::string &key,
const std::string &msg,
int partition = RdKafka::Topic::PARTITION_UA, //可以指定partition,也可以不指定,不指定就对key进行partition计算
const std::vector<std::pair<std::string, std::string>> &headers = {},
void *opaque = nullptr,
int64_t ts = 0){
std::unique_ptr<RdKafka::Headers> h;
if(headers.size() > 0) {
h.reset(RdKafka::Headers::create());
for(auto &pair: headers) {
h->add(pair.first, pair.second);
}
}
const char *k = key.size() > 0 ? key.c_str() : nullptr;
size_t len = key.size();
RdKafka::ErrorCode resp = producer->produce(
config.topic,
partition,
config.msgflags /* Copy payload */,
/* Value */
(void*)msg.c_str(), msg.size(),
/* Key */
k, len,
/* Timestamp (defaults to now) */
ts,
/* Message headers, if any */
h.get(),
/* Per-message opaque value passed to
* delivery report */
opaque);
if (resp != RdKafka::ERR_NO_ERROR) {
return Status("produce failed: " + RdKafka::err2str(resp));
}
return 0;
}
Status ProducerPoll(int64_t timeout = 1000){
producer->poll(timeout);
return 0;
}
void DumpConf(){
int pass;
for (pass = 0; pass < 2; pass++) {
std::list<std::string> *dump;
if (pass == 0) {
dump = conf->dump();
std::cout << "# Global config" << std::endl;
} else {
dump = tconf->dump();
std::cout << "# Topic config" << std::endl;
}
for (std::list<std::string>::iterator it = dump->begin();
it != dump->end();) {
std::cout << *it << " = ";
it++;
std::cout << *it << std::endl;
it++;
}
std::cout << std::endl;
}
}
private:
KafkaConfig config;
};
} //namespace adsonlib
kafka_test.cpp
#include "gtest/gtest.h"
#include "glog/logging.h"
#include "gflags/gflags.h"
#include "rdkafkacpp.h"
#include "kafka.h"
using namespace adsonlib;
#define ASSERT_TRUE_LOG(s) \
do{\
auto __st__ = (s);\
if(!__st__){ \
LOG(INFO) << __st__;\
} \
ASSERT_TRUE(__st__);\
}while(0)
class KafkaBase {
public:
virtual ~KafkaBase(){}
void Init(){
ASSERT_TRUE(client.SetParitioner());
ASSERT_TRUE(client.SetCompressCodec());
ASSERT_TRUE(client.SetStatIntvl(5000));
ASSERT_TRUE(client.SetBrokers());
ASSERT_TRUE(client.SetEventCb());
ASSERT_TRUE(client.SetDeliveryReport());
ASSERT_TRUE(client.SetTopic());
}
protected:
KafkaClient client;
};
class ProducerTest : public ::testing::Test, public KafkaBase{
public:
void SetUp() override {
client.SetConf(config);
Init();
ASSERT_TRUE(client.CreateProducer());
ASSERT_TRUE(client.CreateProducerTopic());
// client.DumpConf();
LOG(INFO) << "setup one test";
}
void TearDown() override {
LOG(INFO) << "teardown one fixture";
}
static void SetUpTestCase() {
LOG(INFO) << "setup test begin";
}
static void TearDownTestCase() {
LOG(INFO) << "teardown test finish";
}
KafkaConfig config;
};
class ConsumerTest : public ::testing::Test, public KafkaBase {
public:
void SetUp() override {
LOG(INFO) << "setup one test";
client.SetConf(config);
Init();
ASSERT_TRUE(client.CreateConsumer());
ASSERT_TRUE(client.CreateConsumerTopic());
// client.DumpConf();
LOG(INFO) << "setup one test";
}
void TearDown() override {
LOG(INFO) << "teardown one fixture";
}
static void SetUpTestCase() {
LOG(INFO) << "setup test begin";
}
static void TearDownTestCase() {
LOG(INFO) << "teardown test finish";
}
KafkaConfig config;
};
TEST_F(ProducerTest, Init){
LOG(INFO) << "init kafka producer";
int64_t id = 21;
std::string key((char*)&id, sizeof(id));
std::string msg = "hello test msg auto partition";
ASSERT_TRUE(client.Produce(key, msg));
ASSERT_TRUE(client.ProducerPoll());
ASSERT_TRUE_LOG(client.ProducerFlush(5000));
}
TEST_F(ConsumerTest, Init){
LOG(INFO) << "init kafka consumer";
ASSERT_TRUE_LOG(client.StartConsumer(1));
ASSERT_TRUE_LOG(client.Consume(1));
ASSERT_TRUE_LOG(client.ConsumerPoll());
ASSERT_TRUE_LOG(client.StopConsumer(1));
}