我想使用spring cloud stream手动提交偏移量-仅当消息处理成功时。这是我的代码应用程序。yml公司
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
---------------------------------------------------------------------------------
spring:
application:
name: springCloud
cloud:
stream:
default-binder: kafka
kafka:
bindings:
myChannel:
consumer:
autoCommitOffset: false
但我的确认对象为空,因为在标头中,对象“kafka\u确认”本身不存在。
使用yaml文件时,请使用属性“自动提交偏移量”。
您使用的是什么版本?
在3.1中,不建议使用autoComentOffset
,而支持设置ackMode
(在本例中为手册
);但是,看起来autoComentOffset
现在被完全忽略而不是不建议使用。
我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放
如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?
我们在Spring Cloud Stream上有一个应用程序,它与Project React集成在一起。我们通过在消息标题中设置spring.cloud.stream.sendto.destination来动态设置目标主题并发布消息。我们正在寻找处理错误的情况下,如Kafka服务器断断续续或主题不可用而发布。我们已经实现了@ServiceActivator来处理所有错误。动态设置主题时,Servi
我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。