我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。
当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在consumer中得到安慰。on('message',函数(message){})
。
我只需要最后一条消息,这是生产者发送的。
var kafka = require('kafka-node');
const Admin = require('kafka-node/lib/admin');
var Producer = kafka.Producer,
client = new kafka.KafkaClient(),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
const createProducer = async(req,res,next) => {
var sentMessage = JSON.stringify(req.body.messages);
payloads = [
{ topic: req.body.topic, messages: sentMessage, partition: 0}
];
producer.createTopics(req.body.topic, function(err,data){
console.log("createtopic error"+err);
})
producer.send(payloads, function( err, data) {
console.log(payloads);
})
res.send({
message: 'Successfully send data from producer',
payloads: payloads
})
}
module.exports = {createProducer}
var kafka = require('kafka-node');
const createConsumer = async(req,res,next) =>{
var topic = req.params.topic;
Consumer = kafka.Consumer,
client = new kafka.KafkaClient(),
// offset = new kafka.Offset(client),
consumer = new Consumer(client,
[{ topic: topic, offset: 0}],
{
autoCommit: false,
}
);
consumer.on('message', function (message) {
console.log('message : ',message.value);
});
res.send({
message: "consumer created : ",
topic: topic
})
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log("absbs");
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err);
}
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
})
}
module.exports = {createConsumer}
const express = require("express");
const kafkaConsumer = require("../kafka/consumer") // create routes for consumer
const kafkaproducer = require("../kafka/producer") // create routes for producer
router.post("/kafka/producer",kafkaproducer.createProducer);
router.get("/kafka/consumer/:topic",kafkaConsumer.createConsumer);
module.exports = router;
var express = require('express');
var app = express();
var kafka = require('kafka-node');
var router = require('./routes/routes');
const {
PORT
} = require("./config/config");
var bodyParser = require('body-parser');
app.use( bodyParser.json());
app.use(bodyParser.urlencoded({
extended: true
}));
app.listen(PORT, function(){
console.log('Kafka producer running at 5000');
});
app.use('/', router)
module.export = app;
如何在不使用任何数组或任何东西的情况下获取最后一条消息。
有没有办法删除这个话题?
谢谢
我对代码做了一些更改,现在我可以从我的主题中检索最新消息。
我已经创建了消费者内部的offset.fetchLatestOffset([主题], cb)
,并在消费者选项中进行了一些更改。
请检查这个答案
您必须设置消费者偏移量最新的
,这是给定主题的最后一条发布消息。
consumer = new Consumer(client,
[{ topic: topic, offset: latest}],
{
autoCommit: false,
}
删除一个主题的命令如下
kafka-topics.sh --zookeeper <zk>:2181 --delete --topic <topic name>
注意:如果删除,这将不会产生影响。话题enable未设置为true。
向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?
距今已过去数小时,话题仍未删除。 我看到了一些建议,建议我将放在我的中,然后重新启动Kafka。我试过这个。没奏效。 (为什么默认不设置这个?) 我可以关闭kafka和zookeeper,运行,然后再次启动zookeeper和kafka。但这是相当激烈的。确实应该有一些方法来说服实际上删除一个主题?
因此,我是使用Apache Kafka的新手,我正在尝试创建一个简单的应用程序,以便我可以尝试更好地理解API。我知道这个问题在这里被问了很多,但是我该如何清除存储在主题上的消息/记录? 我看到的大部分回答都是说更改消息保留时间或者删除
我想删除所有空的Kafka主题(定期从cron)。我在文档中找不到一个这样做的命令?转到脚本: 但是,这包括已经过期的消息?在不使用消费者的情况下,如何在主题中找到实际的当前计数?
我们执行以下步骤以删除主题-hgpo.llo.prmt.processed 但即使在12小时后,主题文件夹仍未从/var/kafka/kafka-logs中删除 注意-我们set-delete.topic.enable=true 在/var/kafka/kafka-logs下,我们有许多主题文件夹,如: ..
我正在寻找一种从Kafka主题中删除(完全删除)已消费记录的方法。我知道有几种方法可以做到这一点,例如更改主题的保留时间或删除Kafka logs文件夹。但我要寻找的是一种使用Java API删除某个主题的一定数量记录的方法,如果可能的话。 我试过测试AdminClient API,特别是AdminClient。deleteRecords(recordsToDelete)方法。但如果我没弄错的话,