当前位置: 首页 > 知识库问答 >
问题:

在Spring批处理作业中,在单个事务中写入两个kafka主题的最佳方法是什么

上官季
2023-03-14

当前实施情况

我有一个Spring批量工作,写一个Kafka主题。我从数据库中读取记录,转换它们,然后写入Kafka主题。

对现有作业的新更改

我应该再写一个审计主题和主要主题。

对于从数据库中读取的每个记录,我正在将一条消息(例如类Abc类型)写入主主题,对于同一条记录,我假设将另一个实体类类型的消息写入审计主题。

问题陈述

目前,我正在使用不同的KakfaTemplate来写入这两个主题,但问题是如果作业在写入主主题后失败,而它从未写入审计主题,该如何处理。如何回滚事务(我尚未在当前实现中实现事务)。

我需要更改应用程序的整个实现吗??我应该在一个事务中写这两个主题,还是有任何解决方案可以用于我当前的实现?

交易经理

@Override
protected JobRepository createJobRepo(){
JobRepositoryFactoryBean fac = new JobRepositoryFactoryBean;
fac.setDataSource(ds);
fac.setTransactionManger(transactionManger);
fac.set();
return fac.getObject();

共有2个答案

秦昊穹
2023-03-14

要正确实现这一点,您需要使用JTA事务管理器配置Spring Batch,该事务管理器协调DatasourceTransactionManager(用于Spring Batch的技术元数据)和KafkaTransactionManager(用于您的业务数据)。

在Spring批处理作业中,在单个事务中写入两个kafka主题的最佳方法是什么

如果你在这里使用了之前问题的建议:https://stackoverflow.com/a/65287130/5019386,两个写入程序将在Spring批处理驱动的同一事务中执行。

茅星雨
2023-03-14

从长远来看,改变实现方式会让你的生活变得更加轻松。您描述的问题称为事务发件箱模式,有许多公认的实现。

批处理作业适用于Kafka连接器(Debezium是一种更复杂、更灵活的解决方案)。连接器自动处理扩展、协调、偏移处理和并发,否则您必须使用select for update等实现这些功能。

我更喜欢的解决方案是简化问题。将其分为两部分。

使用连接器将记录写入主题。使用具有精确一次语义的SMT(无状态单消息转换)的kafka streams应用程序,将转换后的消息生成到审核日志。这样,只有当原始主题中的消息已生成时,adit日志中才会有消息。事务复杂性已经过时了。

kafka连接器(Debezium)将处理重试、故障转移、偏移等。

另一种较旧的方法是事务发件箱,可以使用Debezium TX发件箱

 类似资料:
  • 然后,我对一个方法使用了注释,该方法执行以下操作: 这不起作用。是事务性的,但是当调用方法时,没有正在进行的事务,并且我得到一个。 我打算尝试方法,但Javadoc声明这只用于本地事务,因此它似乎不符合我的需要。 我的下一步是尝试直接使用Kafka的Producer API,看看这种模式是否有效,但如果有人能告诉我知道我在浪费时间,Kafka不支持事务性地写多个主题,我会很感激。 我确实在Conf

  • 我有一个瞬移工作,接受Kafka的主题,通过一堆操作员。我想知道什么是最好的方法来处理中间发生的异常。 假设存在异常,使用并在catch块中输出到,并在调用外部服务以更新另一个相关作业状态的末尾为提供单独的接收器函数 但是,我的问题是,通过这样做,我似乎仍然需要调用并传入一个空值,以便继续到下面的运算符并进入最后一个阶段,在这个阶段,将流入单独的接收器函数。这样做对吗? 另外,我不确定如果不在操作

  • 需要读取spring批处理中的文件,对其进行处理并将其作为一个提要保存。一个提要包含50%的信息。当我必须持久化提要的最终结果时,我需要使用公共字段将它们组合起来,并像一个项目一样持久化。请参见下面的示例。 我需要保留的最终信息如下: 请建议我如何在我的Spring批工作中实现这一点。 谢谢

  • 我使用FlatFileItemReader创建了一个spring批处理作业,它从一个分隔文件中读取数据,然后使用JdbcBatchItemWriter写入DB。我的setp配置如下所示。 上面的配置是为每100行打开单独的事务,因此,如果在完成tasklet(步骤1)之前发生故障,则我无法恢复之前提交的行。有没有办法在一个事务中运行整个tasklet?。 另外:我使用MapJobRepositor

  • 我正在使用Spring AMQP侦听RabbitMQ队列。在侦听队列时,根据业务逻辑,我的服务可以引发RuntimeException,在这种情况下,消息将重试多次。在最大次数重试后,消息将保留在DLQ中。我想知道,在DLQ中处理这些消息的最佳方法是什么?我从博客上读到我可以使用停车场队列。但在这种情况下,如何监控队列并通知人们死信消息?P、 对不起,我的英语不好。希望我能够解释我的问题:)

  • 我有几个不同的Spring批处理作业需要写入同一个平面文件。在平面文件中写入数据的顺序并不重要。这些批处理作业可能同时运行。 FlatFileItemWriter是否合适?我担心的是,如果多个作业同时尝试写入平面文件,数据可能会混合在一起。