@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
// Transform events and store them in MongoDB using
// spring-boot-data-mongodb-reactive
...
}
假设我希望实现与在spring-amqp函数周围使用@transaction
时基本相同的功能:
这里有几个问题。
@StreamListener
方法只调用一次,只是为了设置flux
,所以该方法的@transactional
是没有意义的-消息会流经flux,因此与单个消息有关的任何事情都必须在flux的上下文中完成。是的,你需要使用手动工具;大概是基于mongodb存储操作的结果。您可能需要使用flux
,以便能够访问通道和传递标记头。
我的使用者绑定到匿名使用者组,而不是我指定的使用者组。 我的春靴应用 我的输入输出通道接口 我的控制台日志-- :在3.233秒内启动ConsumerApplication(JVM运行于4.004):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]发现组协调器Singh:9092(ID:2
我有以下Spring Cloud Stream Kafka Streams Binder 3. x应用程序: 当我通过这个应用程序运行X消息时,通过使用和从联调将它们发布到,点1和点2的消息计数是相等的,正如我所期望的那样。 当我使用连接到Kafka代理的实时应用程序做同样的事情时,点1和点2的计数仍然显着不同: 消费者在< code >主题2上有很大的滞后,并且该滞后保持不变(在我停止发布消息后
我使用RabbitMQ网络创建了一个主题交换UITX并绑定到交换两个队列TX. Q1和TX. Q2,每个队列都相应地绑定了路由密钥rk1和rk2,并向交换产生了少量消息。 现在,我想使用Spring Cloud Stream创建一个消费者,它只从Q1获取消息。我尝试使用配置: 以及使用消息的方法的注释。 因此,我可以看到使用者创建了一个同名TX.Q1的队列(或绑定),但新队列/绑定的路由键是# 如
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我是Spring云流的新手。我正在使用兔子MQ粘合剂。问题是当我在主类中使用单个 bean 启动应用程序时,它就成功启动了。但是,如果我在主类中注册了 1 个以上的 bean,则应用程序无法从以下日志启动。 application.yml文件
是否可以使用函数()样式,使用多个独立的函数/绑定来实现反应性SCS应用程序?我发现的所有示例总是只注册一个具有默认绑定的函数bean。我想注册多个,每个都有自己的绑定。 传统上,这可以使用来完成,但现在不推荐使用函数支持。