当前位置: 首页 > 知识库问答 >
问题:

@KafkaListener从特定的kafka主题获取所有消息

华修永
2023-03-14

我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息?

这是我的课;

@Slf4j
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    @Autowired
    private SimpMessagingTemplate webSocket;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private BrokerProducerService brokerProducerService;

    @Autowired
    private GlobalConfig globalConfig;

    @Override
    @KafkaListener(id = "snapshotOfOutagesId", topics = Constants.KAFKA_TOPIC, groupId = "snapshotOfOutages", autoStartup = "false")
    public void consumeToSnapshot(ConsumerRecord<String, OutageDTO> cr, @Payload String content) {
        log.info("Received content from Kafka notification to notification-snapshot topic: {}", content);
        MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
        JSONObject jsonObject= new JSONObject(content);
        Map<String, Object> outageMap = jsonToMap(jsonObject);
        brokerProducerService.sendMessage(globalConfig.getTopicProperties().getSnapshotTopicName(),
                outageMap.get("outageId").toString(), toJson(outageMap));
        listenerContainer.stop();
    }

    @Scheduled(initialDelayString = "${scheduler.kafka.snapshot.monitoring}",fixedRateString = "${scheduler.kafka.snapshot.monitoring}")
    private void consumeWithScheduler() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
        if (listenerContainer != null){
            listenerContainer.start();
        }
    }

这是我在应用程序中的Kafka属性。yml;

kafka:
  streams:
    common:
      configs:
        "[bootstrap.servers]": 192.168.99.100:9092
        "[client.id]": event
        "[producer.id]": event-producer
        "[max.poll.interval.ms]": 300000
        "[group.max.session.timeout.ms]": 300000
        "[session.timeout.ms]": 200000
        "[auto.commit.interval.ms]": 1000
        "[auto.offset.reset]": latest
        "[group.id]": event-consumer-group
        "[max.poll.records]": 1

还有我的KafkaConfiguration课程;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(globalConfig.getBrokerProperties().getConfigs());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

共有1个答案

边国安
2023-03-14

你目前正在做的是:

  1. 创建一个侦听器,但不要启动它(autoStartup=false
  2. 当计划的作业启动时,启动容器(将开始使用主题中的第一条消息)
  3. 当第一条消息被消耗时,您停止容器(导致不再消耗任何消息)

所以你描述的行为并不令人惊讶。

@KafkaListener不需要预定任务即可开始使用消息。我认为您可以删除autoStartup=false并删除计划的作业,之后监听器将逐个消耗主题上的所有消息,并等待新消息出现在主题上。

另外,我注意到了一些其他的事情:

这些属性适用于Kafka流,对于常规SpringKafka,您需要如下属性:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      ...etc

还有:为什么使用@Payload String Content而不是已经序列化的cr.getVaue()

 类似资料:
  • 我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。

  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行

  • 有人能帮我弄清楚这件事吗。 谢了!