当前位置: 首页 > 知识库问答 >
问题:

Spring云流数据库事务不回滚

郜德容
2023-03-14

我试图编写一个spring-cloud-stream函数(spring-starter-parent 2.5.3,java 11,spring-cloud-version 2020.0.3),该函数同时具有Kafka和Postgres事务。每当使用的消息以字符串“fail”开始时,该函数将引发一个模拟错误,我希望这将导致数据库事务回滚,然后导致kafka事务回滚。(我知道Kafka交易不是XA,这很好。)到目前为止,我还没有让数据库事务工作,但kafka事务工作。

目前我使用的是@transactional注释,它似乎不会启动数据库事务。(Kafka绑定器文档建议使用ChainedTransactionManager同步数据库+Kafka事务,但Spring Kafka文档声明不赞成使用@transactional注释,而S.C.S.针对此问题的示例使用@transaction注释和由start-jpa库创建的默认事务管理器(我认为))。我可以在调试器中看到,无论我是否@enableTransactionManagement并在我的使用者上使用@transaction,使用者都是在使用堆栈中更高的事务模板的kafka事务中执行的,但我在任何地方都看不到数据库事务。

我有几个问题想弄明白:

  • 无论我是否有@transactional注释,Kafka侦听器容器都在Kafka事务的上下文中运行我的使用者,这是否正确?如果是这样,有没有办法只在Kafka事务中运行特定的函数?
  • 由于容器没有方法拦截对生产者的调用(据我所知),对于生产者来说,上述情况会改变吗?
  • 如何同步Kafka事务和数据库事务,以便在Kafka提交之前进行DB提交?

我有以下Crud存储库、处理程序集合和application.yml

@Repository
public interface AuditLogRepository extends CrudRepository<AuditLog, Long> {

  /**
   * Create a new audit log entry if and only if another with the same message does not already
   * exist. This is idempotent.
   */
  @Transactional
  @Modifying
  @Query(
      nativeQuery = true,
      value = "insert into audit_log (message) values (?1) on conflict (message) do nothing")
  void createIfNotExists(String message);
}
@Profile("ft")
@Configuration
@EnableTransactionManagement
public class FaultTolerantHandlers {

  private static final Logger LOGGER = LoggerFactory.getLogger(FaultTolerantHandlers.class);

  @Bean
  public NewTopic inputTopic() {
    return TopicBuilder.name("input").partitions(1).replicas(1).build();
  }

  @Bean
  public NewTopic inputDltTopic() {
    return TopicBuilder.name("input.DLT").partitions(1).build();
  }

  @Bean
  public NewTopic leftTopic() {
    return TopicBuilder.name("left").partitions(1).build();
  }

  @Bean
  public NewTopic rightTopic() {
    return TopicBuilder.name("right").partitions(1).build();
  }

  @Bean
  public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
      LOGGER.info("Producing messages to input...");
      template.send("input", "pass-1".getBytes());
      template.send("input", "fail-1".getBytes());
      template.send("input", "pass-2".getBytes());
      template.send("input", "fail-2".getBytes());
      LOGGER.info("Produced input.");
    };
  }

  @Bean
  ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
      BinderFactory binders) {
    return (container, dest, group) -> {
      ProducerFactory<byte[], byte[]> pf =
          ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
              .getTransactionalProducerFactory();
      KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(requireNonNull(pf));
      container.setAfterRollbackProcessor(
          new DefaultAfterRollbackProcessor<>(
              new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L)));
    };
  }

  // Receive messages from `input`.
  // For each input, write an audit log to the database.
  // For each input, produce a message to both `left` and `right` atomically.
  // After three failed attempts to achieve the above, shuffle the message
  // off to `input.DLT` and move on.
  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {
    return input -> {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    };
  }

  @Bean
  public Consumer<Message<String>> logger() {
    return message -> {
      var receivedTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
      LOGGER.info("Received on topic=" + receivedTopic + " payload=" + message.getPayload());
    };
  }
}
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
          required-acks: all
      bindings:
        persistAndSplit-in-0:
          destination: input
          group: input
        logger-in-0:
          destination: left,right,input.DLT
          group: logger
          consumer:
            properties:
              isolation.level: read_committed
    function:
      definition: persistAndSplit;logger

谢谢!

共有1个答案

罗昕
2023-03-14
  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {

在本例中,@transactional位于bean定义中(在应用程序初始化期间只执行一次);要获得运行时事务,您需要对lambda中的代码进行这样的注释;比如...

  @Bean
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository,
      TxCode code
  ) {
    return Txcode:run;
  }
@Component
class TxCode {

    @Autowired
    AuditLogRepository repository

    @Autowired
    StreamBridge bridge;

    @Transactional
    void run(String input) {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    };
}

(或者你也可以通过桥并回购)。

return str -> code.run(str, repo, bridge);
 类似资料:
  • 我试图了解运行批处理任务时通过Spring Cloud数据流WRT数据源配置的预期行为。 Spring批处理数据库表(Batch\u JOB\u EXECUTION等)是否在SCDF数据库本身中?当通过SCDF启动任务时,似乎发生了一些神奇的事情,它在SCDF数据库中创建了这些表,并似乎在使用它们。它似乎正在将SCDF数据源注入我的应用程序? 我目前在localhost服务器版本2.0.1上运行。

  • 我正在尝试用Java创建一个PoC应用程序,以了解在使用Kafka进行消息发布时如何在Spring Cloud Stream中进行事务管理。我试图模拟的用例是一个接收消息的处理器。然后,它进行一些处理,并生成两条发送到两个不同主题的新消息。我希望能够将这两条消息作为单个事务发布。因此,如果发布第二条消息失败,我希望滚动(而不是提交)第一条消息。SpringCloudStream支持这样的用例吗?

  • 我希望有一个Spring云流侦听器处理有关其中发送的所有消息的完整事务。或者,即使之后有异常,也会提交函数中使用StreamBridge手动发送的所有消息。 这是我的lib版本: 我的Spring云流形态: 我的测试java代码: 要运行的测试类: 我还添加了TransactionManager: 在最后的这个示例中,我的兔子队列中有: 或者我应该只有两条消息test.other.request.

  • 在服务中定义了一个事务性函数,并从一个调度任务调用它,在这个函数中,对同一对象进行了两轮插入,在第二轮插入过程中,由于重复键的存在,会引发异常。我已经将事务注释配置为针对抛出的异常进行回滚,并且打开了log4j springframework日志,从日志中我可以看到事务管理器正在对回滚进行一些操作,例如确定是否对抛出的异常进行回滚,但是我看不到任何正在进行数据库删除操作的日志,并且检查数据库显示回

  • 在spring cloud dataflow中,根据我的理解,每个流都是一个微服务,但数据流服务器不是。我说的对吗?

  • 我正在使用spring数据jpa为我的服务实现多事务(数据库),带有两个持久的单元名。并在Jboss 6.4 EAP中部署 以下是我的服务详情 在Jboss中,我无法部署它,我遇到以下异常: 原因:org.springframework.beans.factory.BeanCreationException:无法自动装配字段:私有com.test.demo.domain.repository.Tb