我编写了一个Spring-Cloud-Stream应用程序,其中制作人将消息发布到指定的Kafka主题。我的问题是,如何添加生产者回调以接收确认/确认消息已成功发布到主题上?就像我们在《KafkaSpring》中所做的那样。发送(记录,新回调{…}) (维护异步生产者)。下面是我的代码: 如何确保tryEmitNext已成功写入主题? 实施ProducerListener是否是一种解决方案,是否可
我们正在为Kafka使用Spring Cloud Stream,并寻找精确的一次性语义。我们有一个解决方案,正如预期的那样运行良好1)启用幂等元 但现在我们发现有一个属性(producer.sendfoffsetstotransaction)Kafka API,它可以帮助我们修复消费者端的重复处理,而无需任何元数据存储逻辑。现在,我不确定如何使用具有此属性的spring cloud stream实
我有一个场景,我看到了不同的行为。共有3种不同的服务 第一个服务将从Solace队列监听并将其生成到kafka topic-1(启用事务的地方) 第二服务将从上面的kafka主题-1监听并将其写入另一个kafka主题-2(我们没有手动提交,启用事务以生成到其他主题,自动提交偏移为false 现在,在我在第二个服务中启用事务和隔离级别后,问题是我无法读取任何消息,如果我在第二个服务中禁用了事务,我就
因此,我认为我自己陷入了困惑,因为我知道SpringCloudStreams有两种不同的Kafka活页夹: 《春云流》Kafka活页夹 我正在寻找正确的YAML设置,以便在spring cloud streams的常规kafka活页夹中定义序列化器和反序列化器: 我可以使用以下逻辑调整默认值: 即: 我想我应该能够在每个主题的基础上做到这一点: 我遇到了设置: 但这似乎不起作用。我可以设置一个简单
如何对反应流管道进行错误处理。喜欢 应用程序错误处理(例如:errorChannel) 系统错误处理(使用DLQ、再处理等) 当前文档仅描述了非反应性管道的错误处理。https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_hand
我希望有一个Spring云流侦听器处理有关其中发送的所有消息的完整事务。或者,即使之后有异常,也会提交函数中使用StreamBridge手动发送的所有消息。 这是我的lib版本: 我的Spring云流形态: 我的测试java代码: 要运行的测试类: 我还添加了TransactionManager: 在最后的这个示例中,我的兔子队列中有: 或者我应该只有两条消息test.other.request.
对于已由侦听器容器启动的事务,我们需要为所有应用实例设置相同的事务id前缀。对于仅生产事务,我们需要为每个实例设置不同的值。 我在应用程序中使用了Spring Cloud Stream Kafka活页夹,它既有事务类型,也有属性Spring。云流动Kafka。粘合剂交易事务id前缀用于创建公共事务管理器。 我想知道如何使这一切正常工作,因为似乎你不能同时拥有这两种方式。
我见过几个这样的例子: 使用流的目的是什么,它是否能提高传输效率? 理论上-是的-我显然不想流式传输我的文件传输,但这就是在连接上发生的。。。那么,这个关键字的实际好处是什么呢?它是否强制执行某种形式的特殊缓冲以减少一些开销?无论哪种方式,数据都在完整传输!
我想使用输出通道在仅生产者(不在读写过程中)事务中向Kafka主题发送一些东西。我在StackOverflow(生产者端的Spring Cloud stream kafka事务)上阅读了留档和另一个主题。 问题是我需要为每个节点设置唯一的transactionIdPrefix。有什么建议吗?
Spring Kafka和Spring Cloud Stream允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能的实际应用:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples: 在这个摘录中,有来自Kafka主题的读取、数据库中的写入
问题与https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455但对于功能性方法 当产生适当的“Foo”(json)时,会调用fo消费者。但是当产生“null”有效负载/墓碑时,永远不会调用消费者函数。 我尝试使用自定义“Convertor”方法的变通方法,该方法正在发挥作用: 然后,食品消费者可以检查有效
我正在使用com的Spring Cloud Streams。蔚蓝色的spring:azure Spring-Cloud-Stream绑定器eventhubs:2.8.0,我正在使用供应商。 它适用于Spring boot 2.3.12。发布。 但是,如果我去Spring启动版本 有什么想法如何使用更高版本的Spring靴吗? 如下所述,对于当前最新的spring boot 2.4.5,org需要2
目的: > 我想测试文档https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_spring_cloud_stream_sendto_destination上的spring.cloud.stream.sendto.destination功能 然后我不想用函数Bean
在本页中,您不能只进行一次输出 但我只需要使用Spring Cloud Stream Kafka活页夹进行一次输出 那我该怎么办? 一些文章说使用org.springframework.cloud.stream.function.StreamBridge但它不适合我 我让StreamBridge向Kafka发送主题,但Kafka不会为我的Spring Boot应用程序生成主题 这是我的applic
我正在尝试迁移到Spring Cloud Stream的新函数式编程模型,替换如下的条件StreamListener注释 用类似的东西 具有将通道链接到主题的关联属性 我需要这种间接级别,因为有多种Avro消息类型都到达MyTopic,需要反序列化并以不同的方式路由 这一切都很顺利,我可以按预期使用和路由消息。然而,以这种方式使用functionRouter会产生意想不到的副作用,即当没有可用的主