我有一个用例,我需要使用Kafka进行批处理。假设在1分钟内有大约100个请求,我不想立即发布每个请求,而是想将所有100个请求分批发布到topic一次。
但是使用以下配置,批处理不会发生,一旦发送消息,它就会发布到主题并同时在消费者中接收
生产者配置
public class KafkaProducerConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, 1);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.LINGER_MS_CONFIG, 60000);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100000);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
消费者配置
public class KafkaConfig {
ConsumerFactory<String, String> kafkaConsumerFactory(Boolean autoCommit) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
factory.setBatchListener(true);
factory.setConcurrency(1);
return factory;
}
}
在这里,我设置了 linger.ms = 60000,根据我的理解,如果 linger.ms 设置为某个值,那么即使发送方线程更早变得可用并且未达到批大小,生产者也将至少等待该时间。
但是在我的情况下,一旦发送消息,它就会发布到主题,而无需等待60000毫秒或批大小达到正在设置的值
生产者
@Autowired
private KafkaTemplate<String, String> kafka;
kafka.send("batch-test", message);
消费者
@KafkaListener(id = "testGroup", topics = {"batch-test"})
public void test(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
System.out.println(messages.get(i) + partitions.get(i) + "-" + offsets.get(i) + "");
}
}
这里的小更新我检查了Spring kakfa日志,发现ProducerConfig值设置不正确,它们正在退回到默认值。
roducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
为什么 linger.ms,而 batch.size 会回退到默认值?
就我的理解而言,BATCH_SIZE_CONFIG (batch.size)将与LINGER_MS_CONFIG (linger.ms)一起完美地工作,只有当多个记录打算发送到同一个分区,而不是跨多个分区时,这意味着,您应该有一些键控机制或更好的分区策略来实现批处理,同时将事件生成到主题中。
来自文档:
当多条记录发送到同一个分区时,生产者将把它们批处理在一起。此参数控制每个批次将使用的字节内存量(不是消息!)。当批次已满时,批次中的所有消息都将被发送。然而,这并不意味着生产者将等待批次已满。生产者将发送半满批次,甚至批次中只有一条单页消息。因此,将批次大小设置得太大不会导致发送消息的延迟;它只会为批次使用更多内存。将批次设置得太小会增加一些开销,因为生产者需要更频繁地发送mesages。
linger.ms 控制在发送当前批处理之前等待其他消息的时间量。KafkaProducer 会在当前租金批次已满或达到 linger.ms 限制时发送一批消息。默认情况下,只要有发送者html" target="_blank">线程可以发送消息,即使批处理中只有一条消息,创建者也会发送消息。通过将 linger.ms 设置为大于 0,我们指示生产者等待几毫秒以向批处理添加其他消息,然后再将其发送到代理。这会增加延迟,但也会增加吞吐量(因为我们一次发送更多消息,因此每条消息的开销更少)。
找到了解决方案,linger。ms
没有造成任何影响,因为没有从代码中设置该值(仍不知道原因)。因此,仍然存在。ms
的默认值为0,因为没有进行批处理。
稍后,我设置了两个linger的值。ms
和批处理。应用程序中的size
。属性,然后它工作。
spring.kafka.producer.batch-size=1000000
spring.kafka.producer.properties.linger.ms=10000
我正试图将一个阻塞消费者集成为Reactor铝-SR1中的助焊剂订户。我想使用一个并行调度器,并发地执行阻塞操作。 我实现了一个主类来描述我意图:
我在使用R的group_by和SUMMARY函数时遇到了一些问题,我想知道你们是否可以帮我一些忙。我有一张类似的表格: 我试图使用dplyr的group_by和SUMMARY来找到频率列的平均值。下面是我的示例代码: 我所期望的是,一个表格被吐出来,分解按单个类别分组的平均频率,如下所示: 但是,我收到的是一个按类别分组的表,每个类别接收整个表的平均值,如下所示: 有什么线索吗?我应该说我是初学者
以下switch语句具有奇怪的行为: 我认为,当cat命令在被执行时失败时,“after cat”将被写入,而||之后的部分将被执行。但是,当我查看输出时,似乎在回显“后猫”后会发生中断,因此实际状态不会改变,将再次进入。然后stty也会失败(因为串行适配器丢失)。之后,cat命令againt在开始时失败,但现在进入“catch”块。。。。 下面是相关的输出: 我做错了什么?
我已经配置了log4j2.xml文件,application.log文件将被创建,它应该每天翻转。 但是在JVM中,applicatoin.log文件在10MB之后会翻转,如果翻转三次,第一个文件会被覆盖。也就是说我随时都application.logapplication-2020-10-16.log.zip. 为什么log4j2(v2.13)即使配置为每日,也会每10MB滚动一次文件?任何在l
我目前正在使用Cucumber和Java开发一个基于Selenium的BDD测试自动化框架。我的框架使用Junit,由于Junit不支持软断言,所以我尝试在测试中使用AssertJ断言。然而,这些断言似乎不起作用。让我试着借助下面的代码来解释这一点:
现在,在我的drools项目中,我在单独的DRL文件中有两组规则,它们由议程组分割。对于议程组“preCheck”,我将该议程组中的每个规则的自动聚焦设置为true。例子: 对于另一个议程组-“default规则”-规则没有设置自动焦点属性。示例: 在通过RESTAPI调用规则时,我还试图通过JSON负载将焦点设置为“preCheck”议程组。例子: 然而,在执行规则时,似乎首先要评估“defau