当前位置: 首页 > 知识库问答 >
问题:

如何通过由REST api触发的Spring云流优雅地处理Kafka发布失败

澹台华晖
2023-03-14

我有一个场景,必须通过服务层中RESTAPI的触发器发布多条消息。

此服务还具有db操作(插入、更新)。这里的要求是,如果存在导致publisher无法发布的问题的网络问题,或者当kafka broker不可用时,我们希望回滚包括数据库在内的所有内容,并通过API响应通知用户它失败。

问题1:是否可以通过启用Kafkahtml" target="_blank">事务性来实现需求?

据我所知,默认情况下,同一事务中的数据库将首先提交,然后由Kafka提交,但我不确定的是,如果Kafka没有提交,我们将完全丢失消息,数据库也不会回滚。

我使用以下设置对一个简单的spring boot项目进行了一些测试:

  1. Spring启动版本:2.5.2
  2. Spring云版本:2020.0.3
  3. 使用org.springframework.cloud.stream.function.StreamBridge

我无法再现丢失消息的场景。我所尝试的是在事务方法的末尾放置一个延迟,在事务开始提交和关闭kafka broker之前放置一个暂停(在本地环境中),我可以看到发布者将尝试联系kafka,最终会失败,超时导致整个事务(包括db)回滚。

    @Transactional
    public PurchaseOrder createOrder(PurchaseOrder purchaseOrder) throws InterruptedException {
        log.info("Starting create order");

        var result = purchaseOrderRepository.save(purchaseOrder);
        log.info("purchase order is created: {}", result.getId());

        var kafkaResult = streamBridge.send("toStream-out-0", purchaseOrder);
        log.info("Kafka send result {}", kafkaResult);

        Thread.sleep(10000);
        return result;
    }

这是发布服务器端的超时<代码>组织。阿帕奇。Kafka。常见的错误。TimeoutException:等待EndTxn(true)时,超时在60000毫秒后过期

问题2:如果kafka transactional无法实现此要求,是否有更好的方法?

共有1个答案

单于钊
2023-03-14

您可以在生产者端使用initTransactions:

https://www.baeldung.com/kafka-exactly-once

 类似资料:
  • 我最近在我的Spring4/Hibernate Web应用程序中实现了Spring Security来处理登录/退出和不同的用户角色。 经过大量阅读,它现在似乎工作得很好,但我注意到,由于错误的Spring Security配置而引发的异常没有使用我的自定义处理程序进行优雅的处理,而是显示为一个丑陋的Tomcat错误页面(显示HTTP状态500-UserDetailsService是必需的,后跟一

  • 让我们假设我们有这样一个用python编写的琐碎守护进程: 我们使用< code>start-stop-daemon对其进行守护,默认情况下,它会在< code> - stop上发送< code > SIGTERM (< code > TERM )信号。 假设当前执行的步骤是。此时我们正在发送信号。 发生的情况是执行立即终止。 我发现我可以使用<code>signal.signal(signal.

  • 问题内容: 我正在处理一个将文件附加到电子邮件的PHP表单,并试图妥善处理上传的文件太大的情况。 我了解到,其中有两个设置会影响文件上传的最大大小:和。 如果文件的大小超过,PHP会将文件的大小返回为0。我可以检查一下。 但是,如果超出了范围,我的脚本将以静默方式失败并返回空白表格。 有什么办法可以捕捉到此错误? 问题答案: 从文档中: 如果发布数据的大小大于post_max_size,则 $ _

  • 我的web应用程序是由对服务器端的大量Ajax调用组成的。每次客户登录我的站点时,登录页面将从服务器获取(JSON Web令牌)令牌,并将其作为存储在客户端。(我选择将其存储为cookie,是因为这是唯一让浏览器自动发送的方式,而且据说比HTML5网页存储更安全)。令牌中有一个字段描述令牌的过期日期。对于每个Ajax调用,都会发送令牌进行身份验证。 但是上面的方法有一个问题:“如何在客户端优雅地处

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 按需重新处理大量数据。 在这两种情况下,大约有10,000个石英工作岗位产生并运行。在nightly中,我们有一个quartz作业,该作业产生10,000个作业,每个作业单独处理数据。 我们的问题是,我们正在运行大约30个线程,所以quartz作业自然会失效,并继续失效,直到所有的事情都处理完毕。加工过程可能需要6个小时。这10,000个作业中的每一个都涉及一个特定的域对象,可以并行处理并且完全独