当前位置: 首页 > 知识库问答 >
问题:

Spring云流:如何将@Transactional与新的消费者函数式编程模型一起使用

蔚宏大
2023-03-14

我有一个StreamListener,我想用新的功能模型和消费者替换它

@Transactional
@StreamListener(PaymentChannels.PENDING_PAYMENTS_INPUT)
public void executePayments(PendingPaymentEvent event) throws Exception {

    paymentsService.triggerInvoicePayment(event.getInvoiceId());
}

我厌倦了某些事情。下面是示例代码。我将日志消息添加到另一个队列中进行测试。然后我抛出一个异常以触发回滚。不幸的是,消息是排队的,即使它们在方法完成之前不在那里(我使用制动点对此进行了测试)。尽管有错误,但事务似乎是自动提交的。

@Transactional
@RequiredArgsConstructor
@Component
public class functionalPayment implements Consumer<PendingPaymentEvent> {
    private final PaymentsService paymentsService;
    private final StreamBridge streamBridge;

    public void accept(PendingPaymentEvent event) {
        paymentsService.triggerInvoicePayment(event.getInvoiceId());

        streamBridge.send("log-out-0",event);
        throw new RuntimeException("Test exception to rollback message from log-out-0");
    }
}

配置:

spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.declare-exchange=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.bind-queue=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.transacted=true

spring.cloud.stream.source=log

spring.cloud.stream.bindings.log-out-0.content-type=application/json
spring.cloud.stream.bindings.log-out-0.destination=log_a
spring.cloud.stream.bindings.log-out-0.group=log_a
spring.cloud.stream.rabbit.bindings.log-out-0.producer.declare-exchange=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.bind-queue=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.binding-routing-key=log
spring.cloud.stream.rabbit.bindings.log-out-0.producer.transacted=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.log-out-0.producer.routing-key-expression='log'

共有1个答案

杜辰龙
2023-03-14

你有没有尝试过

@Transactional
public class ExecutePaymentConsumer implements Consumer<PendingPaymentEvent> {
   public void accept(PendingPaymentEvent event) {
       paymentsService.triggerInvoicePayment(event.getInvoiceId());
   }
}
. . .
@Bean
public ExecutePaymentConsumer executePayments() {
    return new ExecutePaymentConsumer();
}

 类似资料:
  • 我有一个基于Webflux的微服务,它有一个简单的反应存储库: 而且在这个演示视频中,他们提到他们使用project Reactor有反应性编程支持。所以我想有一种方法我只是不知道。你能教我怎么做对吗? 如果这一切听起来太愚蠢,我很抱歉,但我对Spring、Cloud、Stream和reactive编程非常陌生,还没有找到很多描述这方面的文章。

  • Spring的云流是否也支持Kafka式的动觉再平衡?最近有人promise要解决这个问题https://github.com/spring-projects/spring-integration-aws/issues/99 谢谢

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我有一个基于服务的应用程序,它使用Amazon SQS,具有多个队列和多个消费者。我这样做是为了实现一个基于事件的架构,并解耦所有服务,其中不同的服务对其他系统状态的变化做出反应。例如: 注册服务: 当新用户注册时,发出事件“registration new” 在用户更新时发出事件'user-更新'。 从队列“registration new”(注册新)中读取,并为搜索中的用户编制索引 从“注册-

  • 我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC

  • 在Martin Fowler的书中,我读到了和模式。 作者提到,将identityMap放在UnitOfWork内部是一个好主意。但怎么做呢? 据我所知,受会话限制,但作者没有提到 每个unitOfWork实例需要多少个IdentityMap实例? 如果我们有两个并发请求呢?