当前位置: 首页 > 知识库问答 >
问题:

如何从Spring Cloud Stream Kafka活页夹中的偏移量获得消息?

欧阳鸿哲
2023-03-14
Setting offset for partition MY_TOPIC-0 to the committed offset FetchPosition{offset=1076, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=broker.test.com:6667 (id: 1003 rack: /default-rack), epoch=2}}
public interface EventConsumer {

    @Input("my-group-id")
    SubscribableChannel consumeMessage();

}

侦听器类

@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(EventConsumer.class)
public class EventListener {

     @StreamListener(target = "my-group-id")
     public void processMessage(Object msg) {
         log.info("*** MESSAGE: ***", msg);
         **do something**
         **save messages**
     }
}

在读取日志时,它甚至不会进入我为它放置记录器的侦听器类。对此有什么想法吗?

共有1个答案

范华清
2023-03-14

您没有指定从哪个主题消费

@KafkaListener(topics = "MY_TOPIC", groupId = "foo")
public void listenToYourTopic(String message) {
    System.out.println("Received Message in group foo from topic: " + message);
}

当需要为给定主题指定分区时:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

很好的参考:https://www.baeldung.com/spring-kafka

 类似资料:
  • 我正在使用spring-cloud-kafka绑定器将数据读取到KStream。在阅读其中一个主题的数据时,我需要从头开始阅读。 我尝试设置kafka偏移重置和启动偏移属性。但是,找不到任何参考。 你能帮我提供任何示例application.yaml来重置偏移量吗?这样我就可以从一开始就使用主题中的消息。 添加我使用过的应用程序:

  • 问题内容: 我知道相反。给定一个时区,我可以通过以下代码片段获取时区偏移量: 我想知道如何从时区偏移量获取时区名称。 鉴于 (以毫秒为单位; +6.00偏移) 我想得到以下任何可能的时区名称的结果: 问题答案: 用

  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 问题内容: 有没有一种方法可以获取,样式名称,甚至可以将插入时我给文本的样式在某个位置上甚至与之进行比较?因为我的目的,我创建的自定义,和。因此,我可以选择用于表示常规字母,并用于表示数字的另一种样式。我还具有切换按钮,该按钮在切换时设置为以不同的方式设置数字格式,而在未切换时不定期设置数字格式,因此最后您无法仅根据方法区分哪些数字受到了影响。因此,唯一的方法是比较具有常规和特殊数字样式作为常量的

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步