我试图使用kafka-node从kafka主题读取压缩消息。
问题是,最近插入的消息留在EOL上方,在插入其他消息之前无法访问。实际上,EOL和高水位偏移之间存在间隙,这会阻止读取最新消息。原因尚不清楚。
已使用创建主题
kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1
主题中产生了许多关键值。有些钥匙是一样的。
var client = new kafka.KafkaClient({kafkaHost: "<host:port>",autoConnect: true})
var producer = new HighLevelProducer(client);
producer.send(payload, function(error, result) {
debug('Sent payload to Kafka: ', payload);
if (error) {
console.error(error);
} else {
res(true)
}
client.close()
});
});
这是插入的键和值
key - 1
key2 - 1
key3 - 1
key - 2
key2 - 2
key3 - 2
key1 - 3
key - 3
key2 - 3
key3 - 3
然后请求主题键集。
var options = {
id: 'consumer1',
kafkaHost: "<host:port>",
groupId: "consumergroup1",
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest'
};
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('done', function(message) {
consumerGroup.close(true,function(){ });
})
function onError (error) {
console.error(error);
}
function onMessage (message) {)
console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
}
})
consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
有一个高水位偏移量,表示最新的值10。然而,消费者看到的偏移值只有7。不知何故,压缩阻止了消费者看到最新消息。
目前尚不清楚如何避免这种限制并允许消费者查看最新消息。
感谢您的建议。谢谢
在与Kafka进行了更多的合作之后,Kafka节点api似乎具有以下行为(我认为这实际上源于Kafka本身)。
当在高水位关闭之前查询消息时,只有高水位关闭之前的消息才会返回给消费者组。如果消息没有被复制,这是有意义的,因为组中的另一个消费者不一定会看到这些消息。
仍然可以使用消费者而不是消费者组并通过查询特定分区来请求和接收高WaterOffset之外的消息。
此外,当偏移量不一定在latestOffset时,似乎会触发“done”事件。在这种情况下,有必要在邮件中提交进一步的查询。偏移量1。如果你继续这样做,你可以得到所有消息的最新补偿。
我不清楚为什么Kafka会有这种行为,但可能有一些较低层次的细节揭示了这种紧急行为。
不知何故,压缩会阻止消费者看到最新的消息。
是的,你错过了一些信息,但你也看到了其他信息。
压缩是删除早期关键帧。
请注意,根本没有url-1值
Key=key2 value={"name":"key2","url":"2"}
Key=key3 value={"name":"key3","url":"2"}
Key=key1 value={"name":"key1","url":"3"}
Key=key value={"name":"key","url":"3"}
这是因为您为同一键发送了新值。
你发送了10条消息,所以这个主题的高潮偏移量是10
你的代码看起来不一定是错的,但你应该还有两个3值。打印的偏移量对应于此逻辑。
key - 1 | 0
key2 - 1 | 1
key3 - 1 | 2
key - 2 | 3
key2 - 2 | 4
key3 - 2 | 5
key1 - 3 | 6
key - 3 | 7
key2 - 3 | 8
key3 - 3 | 9
一般来说,我建议不要让Kafka尝试压缩主题,每秒编写10倍的日志段,也不要使用不同的库,例如节点rdkafka
我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从
我收到了一个数据库更改流,这些更改最终形成了一个压缩的主题。流基本上是键/值对,并且键空间很大(~4 GB)。 这个主题由一个kafka流进程使用,该进程将数据存储在RockDB中(每个消费者/碎片单独使用)。处理器做两件不同的事情: 将数据连接到另一个流中。 检查来自主题的邮件是新密钥还是对现有密钥的更新。如果是更新,则将旧的键/值和新的键/值对发送到不同的主题(更新很少)。 null
我只是Kafka的新手,我有个问题: 我在Kafka中有一个主题“A”,我启动Spring boot应用程序并使用MessageChannel向主题“A”发送一些消息,然后我停止应用程序。 当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(并非所有消息)?我搜索了所有的解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果你有可运行的代码,请分享,我非常感谢:
我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话
只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)
当我向Kafka主题发送消息时,我可能会收到一条比其他消息大得多的消息。 因此需要在单消息级进行压缩。根据https://cwiki.apache.org/confluence/display/kafka/compression, 一组消息可以被压缩并表示为一个压缩消息。 同样,根据https://github.com/apache/kafka/blob/0.10.1/clients/src/ma