我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息?
这是我的课;
@Slf4j
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
@Autowired
private SimpMessagingTemplate webSocket;
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private BrokerProducerService brokerProducerService;
@Autowired
private GlobalConfig globalConfig;
@Override
@KafkaListener(id = "snapshotOfOutagesId", topics = Constants.KAFKA_TOPIC, groupId = "snapshotOfOutages", autoStartup = "false")
public void consumeToSnapshot(ConsumerRecord<String, OutageDTO> cr, @Payload String content) {
log.info("Received content from Kafka notification to notification-snapshot topic: {}", content);
MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
JSONObject jsonObject= new JSONObject(content);
Map<String, Object> outageMap = jsonToMap(jsonObject);
brokerProducerService.sendMessage(globalConfig.getTopicProperties().getSnapshotTopicName(),
outageMap.get("outageId").toString(), toJson(outageMap));
listenerContainer.stop();
}
@Scheduled(initialDelayString = "${scheduler.kafka.snapshot.monitoring}",fixedRateString = "${scheduler.kafka.snapshot.monitoring}")
private void consumeWithScheduler() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
if (listenerContainer != null){
listenerContainer.start();
}
}
这是我在应用程序中的Kafka属性。yml;
kafka:
streams:
common:
configs:
"[bootstrap.servers]": 192.168.99.100:9092
"[client.id]": event
"[producer.id]": event-producer
"[max.poll.interval.ms]": 300000
"[group.max.session.timeout.ms]": 300000
"[session.timeout.ms]": 200000
"[auto.commit.interval.ms]": 1000
"[auto.offset.reset]": latest
"[group.id]": event-consumer-group
"[max.poll.records]": 1
还有我的KafkaConfiguration课程;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(globalConfig.getBrokerProperties().getConfigs());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
你目前正在做的是:
autoStartup=false
)所以你描述的行为并不令人惊讶。
@KafkaListener
不需要预定任务即可开始使用消息。我认为您可以删除autoStartup=false
并删除计划的作业,之后监听器将逐个消耗主题上的所有消息,并等待新消息出现在主题上。
另外,我注意到了一些其他的事情:
这些属性适用于Kafka流,对于常规SpringKafka,您需要如下属性:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
...etc
还有:为什么使用@Payload String Content
而不是已经序列化的cr.getVaue()
?
我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到
我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。
我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行
有人能帮我弄清楚这件事吗。 谢了!