Setting offset for partition MY_TOPIC-0 to the committed offset FetchPosition{offset=1076, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=broker.test.com:6667 (id: 1003 rack: /default-rack), epoch=2}}
public interface EventConsumer {
@Input("my-group-id")
SubscribableChannel consumeMessage();
}
侦听器类
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(EventConsumer.class)
public class EventListener {
@StreamListener(target = "my-group-id")
public void processMessage(Object msg) {
log.info("*** MESSAGE: ***", msg);
**do something**
**save messages**
}
}
在读取日志时,它甚至不会进入我为它放置记录器的侦听器类。对此有什么想法吗?
您没有指定从哪个主题消费
@KafkaListener(topics = "MY_TOPIC", groupId = "foo")
public void listenToYourTopic(String message) {
System.out.println("Received Message in group foo from topic: " + message);
}
当需要为给定主题指定分区时:
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
很好的参考:https://www.baeldung.com/spring-kafka
我正在使用spring-cloud-kafka绑定器将数据读取到KStream。在阅读其中一个主题的数据时,我需要从头开始阅读。 我尝试设置kafka偏移重置和启动偏移属性。但是,找不到任何参考。 你能帮我提供任何示例application.yaml来重置偏移量吗?这样我就可以从一开始就使用主题中的消息。 添加我使用过的应用程序:
问题内容: 我知道相反。给定一个时区,我可以通过以下代码片段获取时区偏移量: 我想知道如何从时区偏移量获取时区名称。 鉴于 (以毫秒为单位; +6.00偏移) 我想得到以下任何可能的时区名称的结果: 问题答案: 用
我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如
问题内容: 有没有一种方法可以获取,样式名称,甚至可以将插入时我给文本的样式在某个位置上甚至与之进行比较?因为我的目的,我创建的自定义,和。因此,我可以选择用于表示常规字母,并用于表示数字的另一种样式。我还具有切换按钮,该按钮在切换时设置为以不同的方式设置数字格式,而在未切换时不定期设置数字格式,因此最后您无法仅根据方法区分哪些数字受到了影响。因此,唯一的方法是比较具有常规和特殊数字样式作为常量的
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步