当前位置: 首页 > 知识库问答 >
问题:

以编程方式停止BlockingQueue上同步的Spring批处理并行流

裴曜灿
2023-03-14

我有一个spring批处理作业,基本上从文件读取、处理每一行并写入输出(另一个文件)。由于处理步骤成本高昂,我希望它在多个线程中运行,但由于读取和写入步骤使用文件,因此这些步骤必须在单个线程上运行。我最终有3个流,每个流并行运行,每个流有一个步骤,在2个阻塞队列上同步。读取步骤从文件中读取并写入一个队列。处理步骤是多线程的,从队列中读取、处理并写入另一个队列。写入步骤,从第二个队列读取并将输出写入另一个文件。

它工作得很好,除了我不能找到一个干净和“快速”的方法来停止工作时,一切都完成了。现在,我在两个队列上使用超时的“poll ”,并假设如果在一定时间内没有项目出现,那么我们就完成了。这将作业终止延迟了指定的秒数,我不能使用很短的时间,因为由于某种外力(如机器负载)作业可能会受到延迟。

我尝试使用类似毒丸的东西,但问题是,如果我重写FlatFileItemReader上的“doRead”方法,在它得到“null”(表示文件结束)时返回毒丸,则该读取器将永远不会结束,作业也永远不会终止。

有人有建议吗?从文档中我知道,我可能只是从读取步骤(文件)的读取器和写入步骤(文件)中的编写器上放置一个“同步”,但我真的更喜欢不同的解决方案。

共有2个答案

壤驷子安
2023-03-14

因此,我将发布我的解决方案,以防有人感兴趣或面临类似问题。

我,总结我最终使用了迪恩克拉克建议的毒丸。我最终简化了工作,只使用一个BlocklingQueue,但我仍然有如何注入毒丸的问题,因为它是在步骤之间共享的队列,而不是在一个步骤内。

基本上,我只是让Spring Batch正常运行,并在负责注入毒丸的步骤中添加了一个监听器,而不是让读者返回毒丸,让处理器检测并忽略它。这个侦听器覆盖“afterStep ”,只是将它添加到队列中。从队列中读取的步骤将得到毒丸和队列的末尾,表示“没有更多的工作要做”,并通过返回null正常终止。

另一个“怪癖”是,在一个作业中,从队列中读取的步骤配置了一个线程池来并行处理项目,因此我需要杀死/取消阻止从队列中读取的所有线程。一个很好的技巧是让Reader从队列中读取,如果它是毒丸,只需将其重新注入队列并返回null。这样,每个线程都会得到一个毒丸并正确终止。

缑泓
2023-03-14

您只需在读取器中添加一个有状态变量即可跟踪作业的结束。

public PoisoningReader<T> extends FlatFileItemReader<T> {
    private boolean endJob = false;

    @Override
    public T doRead() {
        if (endJob) {
            return null;
        }

        T object = super.doRead();
        if (object == null) {
            endJob = true;
            return new PoisonPill();
        }
        return item;
    }
 类似资料:
  • 我有一份刚起步的工作。我希望在应用程序的特定点以编程方式运行此作业,而不是在启动应用程序时。 当在启动时运行时,我没有问题,但是当我尝试以编程方式运行它时,我得到了一个“NoSuchJobException”(

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 问题内容: 我正在开发一个Twitter应用程序,该应用程序直接从Twitter引用图像。如何防止动画gif播放? 在页面末尾使用不适用于Firefox。 是否有更好的JavaScript技巧?最好对所有浏览器都适用 问题答案: 这不是跨浏览器的解决方案,但是可以在Firefox和Opera中使用(不适用于ie8:-/)。采取从这里

  • Spring批处理-需要帮助以并行和多个节点运行批处理作业的独立步骤。一个spring批处理作业(JobA),包含三个步骤[步骤A(在compute1中)和步骤B(在compute2中)以及步骤C] StepA和StepB是独立的步骤,占用大量内存,因此不能在同一计算节点/JVM上并行运行。要使StepC同时启动(StepA和StepB),需要成功完成。我不想为了节省时间而依次执行步骤A和步骤B。

  • 问题内容: 是否可以通过命令在任何行停止执行python脚本? 喜欢 问题答案: sys.exit()可以完全满足您的要求。