当前位置: 首页 > 知识库问答 >
问题:

如何使用Kafka节点从Kafka主题中删除消息

金钊
2023-03-14

我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。

当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在consumer中得到安慰。on('message',函数(message){})

我只需要最后一条消息,这是生产者发送的。

var kafka = require('kafka-node');
const Admin = require('kafka-node/lib/admin');
var Producer = kafka.Producer,
client = new kafka.KafkaClient(),
producer = new Producer(client);
producer.on('ready', function () {
    console.log('Producer is ready');
});

producer.on('error', function (err) {
    console.log('Producer is in error state');
    console.log(err);
})

const createProducer = async(req,res,next) => {
    var sentMessage = JSON.stringify(req.body.messages);
    payloads = [
        { topic: req.body.topic, messages: sentMessage, partition: 0}
    ];
    producer.createTopics(req.body.topic, function(err,data){
        console.log("createtopic error"+err);
    })
    producer.send(payloads, function( err, data) {
        console.log(payloads);
    })
    res.send({
        message: 'Successfully send data from producer',
        payloads: payloads
    })
}

module.exports = {createProducer}
var kafka = require('kafka-node');

const createConsumer = async(req,res,next) =>{
    var topic = req.params.topic;
    
    Consumer = kafka.Consumer,
    client = new kafka.KafkaClient(),
    // offset = new kafka.Offset(client),
    consumer = new Consumer(client,
        [{ topic: topic, offset: 0}],
        {
            autoCommit: false,

        }
    );
    consumer.on('message', function (message) {
        console.log('message : ',message.value);
    });

    res.send({
        message: "consumer created : ",
        topic: topic
    })
    consumer.on('error', function (err) {
        console.log('Error:',err);
    })
    
    consumer.on('offsetOutOfRange', function (err) {
        console.log("absbs");
        topic.maxNum = 2;
        offset.fetch([topic], function (err, offsets) {
            if (err) {
              return console.error(err);
            }
            var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
            consumer.setOffset(topic.topic, topic.partition, min);
        });
    })
}

module.exports = {createConsumer}
const express = require("express");
const kafkaConsumer = require("../kafka/consumer") // create routes for consumer
const kafkaproducer = require("../kafka/producer") // create routes for producer

router.post("/kafka/producer",kafkaproducer.createProducer);
router.get("/kafka/consumer/:topic",kafkaConsumer.createConsumer);

module.exports = router;
var express = require('express');
var app = express();
var kafka = require('kafka-node');
var router = require('./routes/routes');
const {
    PORT
  } = require("./config/config");

var bodyParser = require('body-parser');
app.use( bodyParser.json());
app.use(bodyParser.urlencoded({
  extended: true
}));

app.listen(PORT, function(){
    console.log('Kafka producer running at 5000');
});

app.use('/', router)

module.export = app;

如何在不使用任何数组或任何东西的情况下获取最后一条消息。

有没有办法删除这个话题?

谢谢

共有2个答案

申屠浩歌
2023-03-14

我对代码做了一些更改,现在我可以从我的主题中检索最新消息。

我已经创建了消费者内部的offset.fetchLatestOffset([主题], cb),并在消费者选项中进行了一些更改。

请检查这个答案

经伟
2023-03-14

您必须设置消费者偏移量最新的,这是给定主题的最后一条发布消息。

consumer = new Consumer(client,
    [{ topic: topic, offset: latest}],
    {
        autoCommit: false,

    }

删除一个主题的命令如下

kafka-topics.sh --zookeeper <zk>:2181 --delete --topic <topic name>

注意:如果删除,这将不会产生影响。话题enable未设置为true。

 类似资料:
  • 向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?

  • 距今已过去数小时,话题仍未删除。 我看到了一些建议,建议我将放在我的中,然后重新启动Kafka。我试过这个。没奏效。 (为什么默认不设置这个?) 我可以关闭kafka和zookeeper,运行,然后再次启动zookeeper和kafka。但这是相当激烈的。确实应该有一些方法来说服实际上删除一个主题?

  • 因此,我是使用Apache Kafka的新手,我正在尝试创建一个简单的应用程序,以便我可以尝试更好地理解API。我知道这个问题在这里被问了很多,但是我该如何清除存储在主题上的消息/记录? 我看到的大部分回答都是说更改消息保留时间或者删除

  • 我想删除所有空的Kafka主题(定期从cron)。我在文档中找不到一个这样做的命令?转到脚本: 但是,这包括已经过期的消息?在不使用消费者的情况下,如何在主题中找到实际的当前计数?

  • 我们执行以下步骤以删除主题-hgpo.llo.prmt.processed 但即使在12小时后,主题文件夹仍未从/var/kafka/kafka-logs中删除 注意-我们set-delete.topic.enable=true 在/var/kafka/kafka-logs下,我们有许多主题文件夹,如: ..

  • 我正在寻找一种从Kafka主题中删除(完全删除)已消费记录的方法。我知道有几种方法可以做到这一点,例如更改主题的保留时间或删除Kafka logs文件夹。但我要寻找的是一种使用Java API删除某个主题的一定数量记录的方法,如果可能的话。 我试过测试AdminClient API,特别是AdminClient。deleteRecords(recordsToDelete)方法。但如果我没弄错的话,