我想通过属性为Spring Cloud Stream ;在Kafka consumer上设置一个自动偏移提交间隔。
正如我从度量中看到的,默认情况下,Spring Cloud Stream ;Kafka对每个消耗的消息提交偏移量。对于高负载的主题(例如,如果流量为每秒10K消息),它会变得非常明显,并增加Kafka broker的负载。
我们以以下方式声明消费者:
@Bean
public Consumer<TestEvent> testEvents() {
…
}
我尝试了几个选择,但都没有帮助我。
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
consumer-properties:
auto.offset.reset: latest
auto.commit.interval.ms: 2000
enable.auto.commit: true
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
{
"status": "DOWN",
"components": {
"binders": {
"status": "DOWN",
"components": {
"kafka": {
"status": "DOWN",
"details": {
"error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
}
}
}
}, …
据我所知,enable.auto.commit:true
不建议用于Spring Cloud Stream。
备选方案2
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
ack-mode: TIME
ack-time: 2000
但是,这样的配置属性ack-mode:time
并没有帮助,因为我们看到每个消耗的消息都有偏移量提交。
我使用maven dependencyspring-cloud-starter-stream-kafka
version3.0.12.release
首先,ack-mode
是一个特定于Kafka的使用者绑定属性,您在公共属性中有它。
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: localhost:9092
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
consumer:
ack-mode: TIME
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
其次,没有ACK-TIME
属性,您必须通过容器定制器在容器上设置它:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
return (container, dest, group) -> {
container.getContainerProperties().setAckTime(2000L);
container.getContainerProperties().setLogContainerConfig(true);
};
}
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
问题内容: 使用javascript,我知道我的用户时区为UTC +3。 现在,我想用此知识创建DateTime对象: 我得到的回应是: 我究竟做错了什么?我该如何解决? 问题答案: 这个怎么样…
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
问题内容: 我想运行一组通用的食谱,这些食谱将在我们的实验室中供用户选择我们产品的版本,并希望使用jenkins来完成。Jenkins作业在节点引导期间传递此信息的最佳方法是什么?是否通过- j选项,例如-j’{“ load_version”:“ $ LOAD_VERSION”}’,其中$ LOAD_VERSION是Jenins作业的参数,有几种选择? 问题答案: 是的,可以让您为食谱的第一个Ch