当前位置: 首页 > 知识库问答 >
问题:

处理存储库和Kafka事务

邢承弼
2023-03-14

我有一个用例,我需要从一个Kafka主题中消费,做一些工作,生成另一个只有一次语义的Kafka主题,并保存到mongo数据库。看完文档后,我想kafka事务和mongo事务可以同步,但它们仍然是两个不同的事务。在下面的场景中,如果mongo提交失败,是否有方法回滚提交到主题并从消费者处重播的kafka记录。

producer.send()
producer.sendOffsetsToTransaction()
mongoDao.commit()

共有1个答案

归和惬
2023-03-14

如果侦听器抛出异常,kafka事务将回滚并重新传递。

如果mongo提交成功而kafka提交失败,则需要处理重复交付。

如果您将KafkaTransactionManager(或包含一个的KafkaChainedTransactionManager)连接到侦听器容器中,则无需将偏移量发送到事务,容器将在提交之前为您执行

 类似资料:
  • 我已经开始探索Spring Batch,并遇到了一些基本问题。

  • 本文向大家介绍HTML5 / JS存储事件处理程序,包括了HTML5 / JS存储事件处理程序的使用技巧和注意事项,需要的朋友参考一下 仅当存储事件由另一个窗口触发时,才会触发存储事件处理程序。您可以尝试运行以下代码:

  • 这个问题类似于将Kafka用作CQRS EventStore。好主意?,但更具体的实现。当我有数千个事件“源”(DDD中的聚合根)时,如何使用kafka作为事件存储?正如我在链接问题和其他一些地方读到的,我会有每个来源的主题的问题。如果我将事件按类型拆分到主题中,它将更容易使用和存储,但我需要访问特定源的事件流。如何用Kafka做事件来源?

  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 最近,我注意到Spring Data JDBC,所以我决定在一个新的Spring Boot(2.3.1)应用程序中使用它。在我的用例中,我有一个包含两类表的DB模式: 用于存储应用程序(更复杂的)业务逻辑所使用的实体的表。我使用Spring Data JPA(带有底层Hibernate)来处理它们。 表,用于存储彼此之间没有太多关系的简单数据记录(例如来自外部系统的数据记录)。我决定对它们使用Sp

  • 我们目前正在实现一个过程(使用Kafka处理器API),我们需要将来自一个主题的两个相关事件(消息)的信息合并,然后转发这些合并的信息。事件源于物联网设备,由于我们希望保持其有序,因此源主题使用设备标识符作为键。事件还包含相关ID: 钥匙 留言 我们的第一种方法是创建一个具有连接状态存储的处理器,该存储存储每条传入的消息,使用相关ID作为键。这使我们能够查询存储以获取传入消息的相关ID,如果存储中