当前位置: 首页 > 知识库问答 >
问题:

暂停和恢复后,Kafka消费者未消费消息

丁立果
2023-03-14

我正在使用这个node-rdkafka库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中暂停消费者恢复消费者,但问题是在恢复消费者后它停止了消费消息。

这是我的代码。

const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const log_divider = '-----------------------------------';
const consumer = new Kafka.KafkaConsumer({
    'group.id':'gsuite_consumer',
    'metadata.broker.list': '*******',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '********',
    'sasl.password': '********',
    'security.protocol': 'SASL_SSL',
    'enable.auto.commit':false
}, {});

// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
    if (err) {
        console.log(`Error connecting to Kafka broker: ${err}`);
        process.exit(-1);
    }

    console.log("Connected to Kafka broker");
});

consumer.on('disconnected', (args) => {
    console.error(`Consumer got disconnected: ${JSON.stringify(args)}`);
});

let max_queue_size = 3;
let current_queue = [];
let is_pause = false;
// register ready handler.
consumer.on('ready', (arg)=>{
    console.log('consumer ready.' + JSON.stringify(arg));
    console.log('Consumer is ready');
    consumer.subscribe([topic]);
    setInterval(function() {
        console.log('consumer has consume on :'+timeMs());  
        consumer.consume();
      }, 1000);
});

consumer.on('data',async (data)=>{
    console.log('************consumer is consuming data***********:'+timeMs());
    if(!is_pause) {
        is_pause = true;
        if(data && typeof data !== 'undefined') {
            try {
                console.log('consumer has received the data:'+timeMs());
                consumer.pause([topic]);
                console.log('consumer has pause the consuming:'+timeMs());
                await processMessage(data);
                console.log('consumer is resumed:'+timeMs());
                consumer.resume([topic]);
                console.log(log_divider);
                is_pause = false;
            } catch(error) {
                console.log('data consuming error');
                console.log(error);
            }
        } else {
            is_pause = false;
        }
    }
});

async function processMessage(data) {
   // await print_bulk(data);
    await processData(0,data);
}

async function print_bulk(data) {
    for(var i=0;i<data.length;i++) {
        await processData(i,data[i]);
    }
}


/**
 * Wait specified number of milliseconds.
 * @param ms
 */
async function wait(ms) {
    console.log('wait for the 3 sec');
    return new Promise((resolve) => setTimeout(resolve, ms));
}

var timeMs = ()=> {
    var d = new Date();
    var h = addZero(d.getHours(), 2);
    var m = addZero(d.getMinutes(), 2);
    var s = addZero(d.getSeconds(), 2);
    var ms = addZero(d.getMilliseconds(), 3);
    return h + ":" + m + ":" + s + ":" + ms;
}

var addZero = (x, n)=> {
    while (x.toString().length < n) {
        x = "0" + x;
    }
    return x;
}
async function processData(i,m) {
    if (m) {
        console.log('processing a data start:'+timeMs());
        console.log('Received a message:');
        console.log('  message: ' + m.value.toString());
        console.log('  key: ' + m.key);
        console.log('  size: ' + m.size);
        console.log('  topic: ' + m.topic);
        console.log('  offset: ' + m.offset);
        console.log('  partition: ' + m.partition);
        consumer.commitMessage(m);
    }
    await wait(3000);
    console.log('process a data completed:'+timeMs());
    // delete current_queue[i];
    // console.log('after delting lenght of current queue:'+current_queue.length);
    // console.log(log_divider);
    return true;
}

任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

共有1个答案

狄赞
2023-03-14

我已经解决了这个问题。以及< code>consumer.pause()

所以会像这样

consumer.pause(consumer.assignments());
consumer.resume(consumer.assignments());
 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个Spring靴Kafka消费者 为了避免重新平衡,我尝试在KafkaContainer上调用pause()和resume(),但消费者总是在运行 我错过了什么吗?有人能指导我如何正确地达到要求的行为吗?

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: