1、如果还没有编译rdkafka 可以查看我的另一篇博客 创建消费者
2 、Qt 中使用rdkafka 创建生产者
生产者.h
#ifndef PRODUCER_H
#define PRODUCER_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <QDebug>
#include "rdkafkacpp.h"
#include <QObject>
static bool m_run = true;
static void sigterm (int sig) {
m_run = false;
}
class ExampleEventCb : public RdKafka::EventCb {
public:
void event_cb (RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
m_run = false;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb (RdKafka::Message &message) {
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
message.errstr() << std::endl;
// if (message.key())
// std::cout << "Key: " << *(message.key()) << ";" << std::endl;
}
};
class Producer : public QObject
{
Q_OBJECT
public:
explicit Producer(std::string, QObject *parent=nullptr);
~Producer();
signals:
private:
RdKafka::Conf *conf;
RdKafka::Conf *tconf;
RdKafka::Producer *producer;
RdKafka::Topic *topic;
ExampleDeliveryReportCb ex_dr_cb;
ExampleEventCb ex_event_cb;
public:
void produceMessage(std::string message, std::string key);
};
#endif // PRODUCER_H
源文件.cpp
#include "producer.h"
std::string errstr;
std::string topic_str="主题名";
int32_t partition = RdKafka::Topic::PARTITION_UA;
Producer::Producer(std::string ip, QObject *parent):
QObject(parent)
{
std::string brokers = ip;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("event_cb", &ex_event_cb, errstr);
// signal(SIGINT, sigterm);
// signal(SIGTERM, sigterm);
conf->set("dr_cb", &ex_dr_cb, errstr);
producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
topic = RdKafka::Topic::create(producer, topic_str,
tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
Producer::~Producer()
{
m_run = true;
// 退出前处理完输出队列中的消息
while (m_run && producer->outq_len() > 0) {
// qDebug()<< "Waiting for " << producer->outq_len();
producer->poll(1000);
}
RdKafka::wait_destroyed(1000);
delete conf;
delete tconf;
delete topic;
delete producer;
}
void Producer::produceMessage(std::string message, std::string key)
{
RdKafka::ErrorCode resp =
producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(message.c_str()), message.size(),
const_cast<char *>(key.c_str()), key.size(),
NULL);
if (resp != RdKafka::ERR_NO_ERROR)
qDebug()<< "% Produce failed: " <<QString::fromStdString(RdKafka::err2str(resp));
else
// qDebug()<< "% Produced message (" << message.size() << " bytes)";
qDebug()<<QString::fromStdString(message)<<QString::fromStdString(key);
producer->poll(0);
}
3、如果要修改主题名只需要将 topic_str 的值改成你想要设置的主题
4、发送消息的时候只需要调用 produceMessage(msg, key)这个函数