当前位置: 首页 > 知识库问答 >
问题:

重构Spring批处理作业以使用Apache Kafka(分离读写器)

孟浩慨
2023-03-14

我目前有一个Spring批处理作业,只需一个步骤即可从Oracle读取数据,通过多个Spring批处理处理器(CompositeItemProcessor)传递数据,并将数据写入不同的目标,如Oracle和文件(CompositeItemWriter):

<batch:step id="dataTransformationJob">
    <batch:tasklet transaction-manager="transactionManager" task-executor="taskExecutor" throttle-limit="30">
        <batch:chunk reader="dataReader" processor="compositeDataProcessor" writer="compositeItemWriter" commit-interval="100"></batch:chunk>
    </batch:tasklet>
</batch:step>

在上述步骤中,compositeItemWriter配置了两个写入程序,它们一个接一个地运行,向Oracle写入1亿条记录和一个文件。另外,dataReader有一个同步读取方法,以确保多个线程不会从Oracle读取相同的数据。从今天开始,这项工作需要1小时30分钟才能完成。

我计划将上述工作分解为两部分,以便阅读器/处理器生成关于2个Kafka主题的数据(一个用于将数据写入Oracle,另一个用于将数据写入文件)。在等式的另一边,我将有一个具有两个并行流的工作,从每个主题读取数据并将数据分别写入Oracle和文件。

考虑到上述架构,我想了解如何重构Spring Batch作业以使用Kafka。我相信以下领域是我需要解决的问题:

  1. 在不使用Kafka的现有工作中,我的油门限制是30;然而,当我在中间使用Kafka时,如何确定正确的油门限制

注意:我知道Kafka Connect,但不想使用它,因为它需要设置一个Connect群集,而我没有可用的基础设施来支持它。

共有1个答案

宗政学
2023-03-14

回答您的问题:

  1. Kafka制作人无需节流,Kafka中的数据应尽快可用。您的消费者可以根据实现进行节流(如果需要)
  2. Kafka制作人是可html" target="_blank">配置的。100条信息并不一定意味着100次网络通话。您可以向kafka producer写入100条消息(根据配置,它可能会缓冲,也可能不会缓冲),然后刷新缓冲区以强制网络调用。这将导致(几乎)相同的现有行为
  3. 一条消息中可以包含多行,因为Kafka消息的负载完全由您决定。但是在Kafka中,你的推理是将多行信息整合到一条消息中,以避免多次网络呼叫 无效,因为在一次网络调用中可以生成/使用多条消息(行)。对于你的初稿,我建议保持简单,让一行对应一条消息
  4. 据我所知不是这样。(但这一点我可能错了)
  5. 是的,我相信它们应该很好用
 类似资料:
  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中

  • 我试图在spring batch admin中使用mysql数据库,而不是默认的HSQL。根据文件 http://docs.spring.io/spring-batch-admin/reference/reference.xhtml和使用jndi数据源与Spring批处理管理 我复制了to并将其配置值从 到 下面是我的完整配置。 我还尝试应对数据源环境。xml到同一个文件夹,并将其配置更改为mys

  • 我已经使用开始我的作业,当我尝试使用另一个请求停止作业时,然后获取exeption: JobExecutionNotrunningException:JobExecution必须正在运行,才能停止 当打印作业状态总是获取但批处理作业正在运行时 它的web应用程序,首先上传一些CSV文件,并使用spring batch启动一些操作,在执行过程中,如果用户需要停止,则从另一个控制器方法来停止请求,并试

  • 我在运行Spring批处理作业时遇到了一个技术问题。作业只是从DB(MongoDB)读取记录,对记录进行一些计算(聚合)并将记录结果写入另一个表。读取A、处理A、写入记录B B是A的许多记录的聚合。我想使用远程分块来垂直扩展我的系统,从而使处理部分缩放和快速。我面临的问题是,我需要同步A记录,以便在将结果写入B时处理它们不会发生冲突。如果我将10条A记录分发给4个从站,它们在将聚合结果写入B时会发

  • 我有一个批处理任务,从SQLServer读取记录并写入MARIADB。尽管我在批处理过程中实现了分区的概念,但该过程非常缓慢 下面是源系统和目标系统的数据源配置。 以下是配置的步骤和分区步骤 用读者和作者更新帖子 有人能介绍如何使用Spring Batch提高读写性能吗?