因此,在下面的配置中,当我们将Spring Boot容器扩展到10个JVM时,事件的数量随机多于发布的数量,例如,如果发布了320000条消息,那么事件有时会达到320500条等等。。
//Consumer container bean
private static final int CONCURRENCY = 1;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
props.put("enable.auto.commit", "false");
//props.put("isolation.level", "read_committed");
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
factory.getContainerProperties().setPollTimeout(3000);
factory.setConcurrency(CONCURRENCY);
return factory;
}
//Listener
@KafkaListener(id="claimserror",topics = "${kafka.topic.dataintakeclaimsdqerrors}",groupId = "topic1", containerFactory = "kafkaListenerContainerFactory")
public void receiveClaimErrors(String event,Acknowledgment ack) throws JsonProcessingException {
//save event to table ..
}
更新以下更改现在似乎工作正常,我将在消费者中添加重复检查,以防止消费者失败的情况
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "-1");
//props.put("isolation.level", "read_committed");
return props;
}
这种方法对我有用。
您必须按如下方式配置KafkaListenerContainerFactory:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
并像这样使用ConnettMessageListenerContainer
:
@Bean
public IntegrationFlow inboundFlow() {
final ContainerProperties containerProps = new ContainerProperties(PartitionConfig.TOPIC);
containerProps.setGroupId(GROUP_ID);
ConcurrentMessageListenerContainer concurrentListener = new ConcurrentMessageListenerContainer(kafkaFactory, containerProps);
concurrentListener.setConcurrency(10);
final KafkaMessageDrivenChannelAdapter kafkaMessageChannel = new KafkaMessageDrivenChannelAdapter(concurrentListener);
return IntegrationFlows
.from(kafkaMessageChannel)
.channel(requestsIn())
.get();
}
如需有关how-does-kafka-Guarrance-consumers-doesnt-read-a-single-message-Two遍和文档ConcurrentMessageListenerContainer的更多信息,请参阅此文档
您可以尝试将ENABLE\u idemponence\u CONFIG
设置为true,这将有助于确保生产者在流中只写入每条消息的一个副本。
我试图模拟流流量使用KafkaStorm。我使用KafkaSpout阅读来自一个主题的消息,该主题由一个生产者发送,该生产者阅读这些推文并将其发送到一个主题。我的问题是,在拓扑消耗了本主题中的所有推文发送后,它会继续阅读本主题中的消息两次。如何阻止KafkaSpout阅读两次?复制因子设置为1)
Kafka如何保证消费者不会将一条信息读两遍? 或者上述情况是否可能?同一条信息可以被单个消费者或多个消费者阅读两次吗?
斯坦纳树问题是组合优化问题,与最小生成树相似,是最短网络的一种。最小生成树是在给定的点集和边中寻求最短网络使所有点连通。而最小斯坦纳树允许在给定点外增加额外的点,使生成的最短网络开销最小。 1. 什么是斯坦纳树? 斯坦纳树问题是组合优化学科中的一个问题。将指定点集合中的所有点连通,且边权总和最小的生成树称为最小斯坦纳树(Minimal Steiner Tree),其实最小生成树是最小斯坦纳树的一种
我尝试使用Jenkins上的java项目运行声纳分析,使用标准的maven sonar:sonar goal,并使用post STEP>>Execute SonarQube Scanner。我使用的maven目标是。 对于SonarQube扫描仪,我使用了下面的maven目标 SonarQube版本-7.7声纳扫描仪版本-3.3.0 Jenkins版本-2.164.3 Maven版本-3.6.1
我们能比较詹金斯和声纳吗?如果是,怎么做。我想知道詹金斯和声纳的优缺点。比如为什么要使用声纳,它比詹金斯有什么优势,反之亦然?
我的JUnit测试覆盖范围在jenkins上构建时不会传播到声纳。声纳上的“单元测试覆盖范围”字段保持空白,但“单元测试成功”字段显示正确的值。我正在使用jacoco进行测试覆盖。在jenkins上,jacoco的报告运行良好,并在生成的html中显示了正确的覆盖率。我就是不能把它送到声纳上。 使用jenkins的Jacoco插件,我使用以下参数调用独立的声纳分析。 路径和蚁任务是正确的。也许我错