当前位置: 首页 > 工具软件 > librdkafka > 使用案例 >

Qt 中使用librdkafka librdkafka++ 创建消费者

邓鸿雪
2023-12-01

1、首先需要编译rdkafka  编译kafka

2、编译好kafka 我们只需要用到 librdkafka.lib librdkafkacpp.lib librdkafka.dll librdkafka.dll (本人编译的是 windows下的release x64位 版本)这四个文件

3、在Qt 中将kafka消费者封装在线程中

      头文件.h

#ifndef UDPCLIENT_H
#define UDPCLIENT_H
#include <QThread>
#include "rdkafkacpp.h"


class udpclient : public QThread
{
    Q_OBJECT

public:
    explicit udpclient(std::string, QObject *parent = nullptr);
    ~udpclient();

public:
    void msg_consume(RdKafka::Message* message, void* opaque);

    void setRun();


private:
    //重写线程执行函数
    void run();
    std::string m_kafkaIp;

signals:
    void valueUpdate(const QString& str, const std::string& str1);


};

#endif // UDPCLIENT_H

源文件.cpp

#include "udpclient.h"
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <signal.h>


static bool m_run = true;
static bool exit_eof = true;
static int eof_cnt = 0;
static int partition_cnt = 0;
static int verbosity = 1;
static long msg_cnt = 0;
static int64_t msg_bytes = 0;

static long msg_cnt1 = 0;
static int64_t msg_bytes1 = 0;

static void sigterm (int sig) {
    m_run = false;
}


class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb (RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr<<"ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str();
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                m_run = false;
            break;

        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;

        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                         event.broker_name() << " id " << (int)event.broker_id() << std::endl;
            break;

        default:
            std::cerr << "EVENT " << event.type() <<
                         " (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            break;
        }
    }
};



class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:

    void msg_consume(RdKafka::Message* message, void* opaque) {
    }

    void consume_cb (RdKafka::Message &msg, void *opaque) {
//        msg_consume(&msg, opaque);
    }
};

udpclient::udpclient(std::string ip, QObject *parent):
    QThread(parent)
{
    m_kafkaIp = ip;
}

udpclient::~udpclient()
{

}

void udpclient::msg_consume(RdKafka::Message* message, void* opaque) {
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        //std::cerr << "RdKafka::ERR__TIMED_OUT"<<std::endl;
        break;

    case RdKafka::ERR_NO_ERROR:
        /* Real message */
        msg_cnt++;
        msg_bytes += message->len();
        if (verbosity >= 3)
            std::cerr << "Read msg at offset " << message->offset() << std::endl;
        RdKafka::MessageTimestamp ts;
        ts = message->timestamp();
        if (verbosity >= 2 &&
                ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
            std::string tsname = "?";
            if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
                tsname = "create time";
            else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)
                tsname = "log append time";
            std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
        }
        if (verbosity >= 2 && message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        if (verbosity >= 1) {
            //数据在这里可以进行处理
            /*
                处理代码
            */
            emit valueUpdate(static_cast<const char *>(message->payload()), *message->key());

        }
        break;

    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        if (exit_eof && ++eof_cnt == partition_cnt) {
            std::cerr << "%% EOF reached for all " << partition_cnt <<
                         " partition(s)" << std::endl;
            m_run = false;
        }
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        m_run = false;
        break;

    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        m_run = false;
    }
}


void udpclient::setRun()
{
    m_run = false;
}

//重写线程执行函数
void udpclient::run()
{

    std::string brokers = m_kafkaIp;
    std::string errstr;
    std::string topic_str="主题名";
//  std::string topic_str1 = "xxx";
//  std::string topic_str2 = "xxx";
//  std::string topic_str3 = "xxxs";
    std::vector<std::string> topics, topics1;
    std::string group_id="2221";
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    //group.id必须设置
    if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
           std::cerr << errstr << std::endl;
           exit(1);
         }

    topics.push_back(topic_str);
//   topics.push_back(topic_str2);
//   topics1.push_back(topic_str1);
//   topics1.push_back(topic_str3);
   //bootstrap.servers可以替换为metadata.broker.list
    conf->set("bootstrap.servers", brokers, errstr);
    ExampleConsumeCb ex_consume_cb;
    conf->set("consume_cb", &ex_consume_cb, errstr);

    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);
    conf->set("default_topic_conf", tconf, errstr);

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);

    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created consumer " << consumer->name() << std::endl;
    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
                  << RdKafka::err2str(err) << std::endl;
        exit(1);
    }
    while (m_run)
    {

        //0毫秒未订阅到消息,触发RdKafka::ERR__TIMED_OUT
        RdKafka::Message *msg = consumer->consume(0);
        consumer->commitSync();
        msg_consume(msg, NULL);
        delete msg;
        msleep(100);

    }

    consumer->close();
    delete conf;
    delete tconf;
    delete consumer;

    std::cerr << "% Consumed " << msg_cnt << " messages ("
              << msg_bytes << " bytes)" << std::endl;

    //应用退出之前等待rdkafka清理资源
    RdKafka::wait_destroyed(1000);
    quit();

}



4、使用前需要将librdkafka.lib 和 librdkafkacpp.lib库文件添加到你自己的项目中

5、添加完库后,编译你自己的项目的时候需要注意编译器的选择需要和 librdkafka.lib库文件配套。比如你编译完的librdkafka.lib 是release x64位的,那么选择编译器来编译项目的时候也需要选择 release x64的

6、项目如果编译没有报错,但是运行的时候直接奔溃的话,只需要把librdkafka.dll librdkafka.dll这两个dll 放到你编译输出的xx.exe同级目录下即可

 类似资料: