@EnableKafka
@Configuration
public class ListenerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic1Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setIdleEventInterval(60000L);
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic2Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
}
Listner代码
@Service
public class Listener {
private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
public void receive(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received first='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
public void receiveRel(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received second='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@EventListener()
public void eventHandler(ListenerContainerIdleEvent event) {
LOG.info("Inside event");
this.registry.getListenerContainer("second-listener").start();
}
请帮助我解决问题,因为这样的循环每天都会发生。完全读取topic1消息,然后从Topic2读取消息。
您已经在使用一个空闲的事件侦听器来启动第二个侦听器--它也应该停止第一个侦听器。
当第二个监听器空闲时;别闹了。
您应该检查事件用于哪个容器,以决定停止和/或启动哪个容器。
我正在使用Kafka Consumer阅读多个主题,我需要其中一个具有更高优先级。处理需要很多时间,而且(低优先级)主题中总是有很多消息,但我需要尽快处理来自另一个主题的消息。 这和Kafka是否支持主题或消息的优先级类似?但这一个使用的是旧的API。 在新的API(0.10.1.1)中,有一些方法 但我不清楚,如何有效地检测高优先级主题中有新消息,有必要暂停其他主题的消费。 有什么想法/例子吗?
我正在研究Kafka是否支持任何要处理的队列或消息的优先级。 看来它不支持任何这样的事情。我在谷歌上找到了这个也支持这一点的邮件存档:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOEJIJHVHSR=d6astihpsqwvg6vk5xylam6ymdcd6uauoxf-dq@mai
问题内容: 默认情况下,优先级队列的元素如何按照自然顺序排序,因为它没有实现可比的接口。 从文档中可以看出,元素是根据自然顺序进行排序的,但是我找不到任何关于equals方法或可比性的东西。它在内部如何发生? 所有实现的接口:可序列化,可迭代,集合,队列。 如果它实现可比性,那么为什么不在上一行说 例: 第三条打印语句也将打印[1、3、2、4],而不是打印[1、2、3、4]。为什么?应该自然排序吧
我有一个,名为,其中包含类型的对象。 您可以在所有车辆上调用该方法。 我要做的是排序,这样车辆被赋予更高的优先级,并被放在队列的前面。 我假设我必须在这里使用一个比较器,但不知道怎么做。
非常感谢您抽出时间!
假设我有三个指数:城市、博物馆和景点。 现在我正在查询一个术语的所有索引(),例如“维也纳” 作为结果,我得到: 维也纳:维也纳艺术博物馆 有没有办法优先考虑指数,这样我就可以得到第一个城市,而不是景点,最后是博物馆,就像这样: 维也纳 维也纳的Riesenrad 维也纳:维也纳艺术博物馆 维也纳:维也纳历史博物馆