当前位置: 首页 > 知识库问答 >
问题:

Spring batch单线程读取器和多线程写入器

饶明亮
2023-03-14

试图找出是否有人问过这个问题,但没有。

问题就在这里。以下必须通过Spring batch实现,有一个文件需要读取和处理。项读取器不是线程安全的。计划是让多线程同构处理器和多线程同构写入器通过单线程读取器读取项目。

有点像下面:

        ----------> Processor #1 ----------> Writer #1
       |
    Reader -------> Processor #2 ----------> Writer #2
       |
        ----------> Processor #3 ----------> Writer #3

尝试了AsyncItemProcessor和AsyncItemWriter,但在处理器上保留调试点导致在释放该点(即单线程处理)之前不执行读取器。

任务执行程序已尝试如下:

<tasklet task-executor="taskExecutor" throttle-limit="20">

在读取器上启动了多个线程。

同步阅读器也不起作用。

我试着阅读关于partitioner的内容,但它似乎很复杂。

是否有注释将阅读器标记为单线程?将读取数据推送到全局上下文是一个好主意吗?

请指导解决方案

共有2个答案

杨利
2023-03-14

遇到了类似的问题。

这是我目前的做法。正如@mminella建议的那样,将itemReader与平面文件ItemReader作为委托同步。这具有不错的性能。代码目前每秒写入大约4K条记录,但速度并不完全取决于设计,其他属性也有所贡献。

尝试了其他方法来提高性能,但都失败了。

  1. 定义同步ItemReader,将FlatFileItemReader作为委托进行聚合,但我最终维护了大量导致性能下降的状态。也许需要优化或同步的代码会更快
  2. 在不同的线程中激发了每个insert PreparedStatement批处理,但并没有提高多少性能,但我仍然指望这一点,以防我遇到一个环境,在该环境中,批处理的各个线程将显著提高性能
广楚
2023-03-14

我猜Spring Batch API中没有你正在寻找的模式。编码对你的一部分将需要实现你在寻找什么。

方法ItemWriter。write已经根据块大小获取了一个已处理项的列表,因此您可以将该清单分成任意多个线程。您生成自己的线程,并将一段列表传递给每个要写入的线程。

问题出在ItemProcesor.process()方法上,因为它是一项一项地处理,所以你会受到一项的限制,你不能对一项进行太多的线程处理。

所以,挑战是编写自己的阅读器,而不是将一个项目列表交给处理器,这样您就可以并行处理这些项目

在所有这些设置中,您必须记住,您生成的线程将超出Spring batch的读-处理-写事务边界,因此您必须自己处理这个问题——合并所有线程的已处理输出,等待所有线程完成并处理任何错误。总而言之,风险很大。

使条目读取器返回列表而不是单个对象-Spring batch

 类似资料:
  • 我有多个线程从同一队列中写入和读取。我正在使用ConcurrentLinkedQueue。任何线程都可以向队列中添加元素,任何其他线程都可以轮询元素并对其进行处理。我的问题是,如果队列是空的,我想让在队列上轮询的线程等待,直到其他线程将某个元素添加到队列中,如果所有线程都在等待,那么整个进程都应该退出,即不再有线程在写。我用它列出一个目录及其子目录中的所有文件#更快地扫描。 以下是代码片段1: {

  • 我应该设计一个组件,该组件应该通过在Java中使用多线程来实现以下任务,因为文件非常大/多,而且任务必须在很短的窗口内完成: 读取多个csv/xml文件并将所有数据保存在数据库中 读取数据库并将数据写入单独的csv文件 我对多线程很陌生 请告诉我您建议在Java或Spring Batch中使用传统的多线程。这里的输入源是多个的,输出源也是多个的。

  • 我正在开发一个应用程序(控制台),其中包含一个while循环,我希望允许用户暂停和恢复。经过一些研究,似乎一个额外的线程负责等待用户输入是正确的解决方案。 我已经能够实现持续等待输入的额外线程,并且还有一个kill方法,它设置flag变量来停止线程while()循环。不幸的是,除非有输入,否则while循环不会结束,只有在输入线程的while环至少完成一次后,这个kill标志才会生效。由于主线程也

  • 我必须使用Spring Batch配置一个作业。是否可以有一个单线程的项目阅读器,但多线程处理器? 在这种情况下,ItemReader将通过从数据库中读取工作项(通过执行预定义的查询)来创建要处理的工作项,每个处理器将并行处理项/块。

  • 问题内容: 我正在测试本地计算机上的套接字。我正在尝试使用线程在一个程序中同时运行套接字和服务器。我的服务器是回显服务器,因此它可以将收到的所有消息发回。我的问题是,当我同时在客户端和服务器上启动两个线程时,当它们到达我从输入流读取的部分时,它们将“冻结”。它可以很好地工作到客户端发送消息的那部分。然后,它停止了,因为似乎客户端正在等待消息,服务器也是如此,即使我已经通过写入输出流向服务器发送了消

  • 有一天我在想线程,我开始怀疑如果多个线程根本不修改它,它们是否可以访问相同的信息(不使用同步(锁))。 一个例子是10个线程读取1个类,由于我们称之为M的线程,这些线程调用访问器从这个类中获取信息,但不以任何方式修改信息。 同时,线程M偶尔会修改这个类中的数据(这将改变10个线程在“重启”后正在进行的计算结果) 会有问题吗?问题是其中一个线程将线程M修改过的信息“放回”。最重要的是,是否有一天访问