当前位置: 首页 > 知识库问答 >
问题:

单文件到多文件的Spring批处理多线程处理

能正青
2023-03-14

我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间

我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring batch(我是spring batch的新手)

这是我的问题
1。在任务中,是否可以使ItemReader到Item Writer多线程(当我读取文件时,在线程写入数据之前创建一个新线程来处理数据)?如果不是,我需要创建两个步骤,第一步读取单线程文件,另一个步骤是多线程写入单个文件,但是如何将数据列表从以前的任务传递给另一个任务。
2.如果单个线程出现任何故障,我如何停止整个批处理作业处理。
3.如果在一定间隔后发生故障,如何重试批处理作业。我知道在失败的情况下有重试选项,但我找不到在失败的情况下在一定时间间隔后重试任务的选项。这里我不是在谈论调度程序,因为我已经在调度程序下运行了批处理作业,但在失败时,它必须在3分钟后重新运行。

共有3个答案

简滨海
2023-03-14

您可以将输入文件拆分为多个文件,使用分区器并使用线程加载小文件,但如果出错,您必须在DB清理后重新启动所有作业。

<batch:job id="transformJob">
    <batch:step id="deleteDir" next="cleanDB">
        <batch:tasklet ref="fileDeletingTasklet" />
    </batch:step>
    <batch:step id="cleanDB" next="split">
        <batch:tasklet ref="countThreadTasklet" />
    </batch:step>
    <batch:step id="split" next="partitionerMasterImporter">
        <batch:tasklet>
            <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" />
        </batch:tasklet>
    </batch:step>
    <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
        <partition step="importChunked" partitioner="filePartitioner">
            <handler grid-size="10" task-executor="taskExecutor" />
        </partition>
    </batch:step>
</batch:job>

完整的示例代码(在Github上)。

希望这能有所帮助。

米迪
2023-03-14
  1. 关于多线程阅读如何在Spring Batch中设置多线程?答复它会为你指明正确的方向。此外,在这个示例中,还有一些关于重新启动CSV文件的考虑

让我知道这是否有帮助,或者你如何解决问题,因为我对我(未来)的工作感兴趣<我希望我的指示能有所帮助。

晋安国
2023-03-14

下面是我如何解决这个问题的。

>

以目录位置作为作业参数启动作业。

 类似资料:
  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 在Spring批处理中,我试图读取CSV文件,并希望将每一行分配给一个单独的线程并对其进行处理。我试图通过使用TaskExecutor来实现它,但所有线程都在一次拾取同一行。我还尝试使用Partioner实现这个概念,同样的事情也发生了。请参阅下面我的配置Xml。 步骤说明 我尝试过不同类型的任务执行器,但它们的行为方式都是一样的。如何将每一行分配给单独的线程?

  • 我想编写一个spring boot批处理应用程序,其中我有一个充满事件的数据库表。我想做的是有一个多线程的spring boot批处理应用程序,它将以这种方式工作: 我想有5个线程运行,每个线程将保留一个偏移量来跟踪它读取的事件,以便没有其他线程再次读取相同的事件。我想怎么做: 所以我希望能够在数据库表中为每个线程保留偏移量。有没有办法让Spring Boot环境以这种方式工作?

  • 我使用的是Spring Batch 2.1.8。释放我有一个文件,它由一些头信息和一些需要处理的记录组成。 我有一个使用面向块处理的步骤。该步骤包含ItemReader和ItemWriter的实现。ItemReader实现是线程安全的,而ItemWriter不是。 我想在处理(或写入)任何记录之前使用标题信息。在继续使用面向块的处理时,如何确保这一点? 建议的解决方案:一种解决方案可以是编写一个预

  • 我必须使用Spring Batch配置一个作业。是否可以有一个单线程的项目阅读器,但多线程处理器? 在这种情况下,ItemReader将通过从数据库中读取工作项(通过执行预定义的查询)来创建要处理的工作项,每个处理器将并行处理项/块。