当前位置: 首页 > 工具软件 > librdkafka > 使用案例 >

Qt 中使用librdkafka librdkafka++ 创建生产者

顾恺
2023-12-01

1、如果还没有编译rdkafka 可以查看我的另一篇博客 创建消费者

2 、Qt 中使用rdkafka 创建生产者

      生产者.h

#ifndef PRODUCER_H
#define PRODUCER_H

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <QDebug>
#include "rdkafkacpp.h"
#include <QObject>


static bool m_run = true;

static void sigterm (int sig) {
    m_run = false;
}

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb (RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                m_run = false;
            break;

        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;

        default:
            std::cerr << "EVENT " << event.type() <<
                         " (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            break;
        }
    }
};


class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb (RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " <<
                     message.errstr() << std::endl;
//        if (message.key())
//            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};


class Producer : public QObject
{
    Q_OBJECT
public:
    explicit Producer(std::string, QObject *parent=nullptr);
    ~Producer();

signals:

private:
    RdKafka::Conf *conf;
    RdKafka::Conf *tconf;
    RdKafka::Producer *producer;
    RdKafka::Topic *topic;
    ExampleDeliveryReportCb ex_dr_cb;
    ExampleEventCb ex_event_cb;


public:
    void produceMessage(std::string message, std::string key);
};

#endif // PRODUCER_H

源文件.cpp

#include "producer.h"



std::string errstr;
std::string topic_str="主题名";
int32_t partition = RdKafka::Topic::PARTITION_UA;


Producer::Producer(std::string ip, QObject *parent):
    QObject(parent)
{
    std::string brokers = ip;
    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("bootstrap.servers", brokers, errstr);

    conf->set("event_cb", &ex_event_cb, errstr);

//    signal(SIGINT, sigterm);
//    signal(SIGTERM, sigterm);


    conf->set("dr_cb", &ex_dr_cb, errstr);

    producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;

    topic = RdKafka::Topic::create(producer, topic_str,
                                                   tconf, errstr);
    if (!topic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        exit(1);
    }
}

Producer::~Producer()
{

    m_run = true;
    // 退出前处理完输出队列中的消息
    while (m_run && producer->outq_len() > 0) {
//        qDebug()<< "Waiting for " << producer->outq_len();
        producer->poll(1000);
    }
    RdKafka::wait_destroyed(1000);

    delete conf;
    delete tconf;
    delete topic;
    delete producer;

}



void Producer::produceMessage(std::string message, std::string key)
{

    RdKafka::ErrorCode resp =
            producer->produce(topic, partition,
                              RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                              const_cast<char *>(message.c_str()), message.size(),
                              const_cast<char *>(key.c_str()), key.size(),
                              NULL);

    if (resp != RdKafka::ERR_NO_ERROR)
        qDebug()<< "% Produce failed: " <<QString::fromStdString(RdKafka::err2str(resp));
    else
//        qDebug()<< "% Produced message (" << message.size() << " bytes)";
    qDebug()<<QString::fromStdString(message)<<QString::fromStdString(key);
    producer->poll(0);
}

3、如果要修改主题名只需要将 topic_str 的值改成你想要设置的主题

4、发送消息的时候只需要调用 produceMessage(msg, key)这个函数

 类似资料: