当前位置: 首页 > 知识库问答 >
问题:

Spring Batch 3.0中作业范围bean的多线程访问

呼延智明
2023-03-14

在SpringBatch 3.0中,我试图在分区和多线程步骤(配置了一个任务:executor bean)中为bean使用新的作业范围功能,在这两种情况下,我都遇到了异常

  Caused by: java.lang.IllegalStateException: No context holder available for job scope
        at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:153)
        at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:338)

但如果我使豆步范围它的工作正常。

我注意到JobSynsynizationManager上的评论说

N、 B.每个{@link Job}实现都有责任确保{@link JobContext}在作业执行中可能涉及的每个线程上可用,包括池中的工作线程。

所以我想知道我是否需要做些什么来设置它,或者它是否是作业范围实现中的错误,它没有正确设置工作线程?

StepSynchronizationManager有一个类似的注释,但在这种情况下,显然有什么东西在步骤中正确设置了线程。

复制问题的示例代码:

TestItemReader

package test;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.InitializingBean;

public class TestItemReader implements ItemReader<Integer>, InitializingBean {

    private List<Integer> items;

    @Override
    public synchronized Integer read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {

        if (items.size() > 0) {
            return items.remove(0);
        }

        return null;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        System.out.println("Initialising reader");

        items = new ArrayList<Integer>();   
        for (int i=0;i<100;i++) items.add(i);       
    }
}

TestItemWriter

package test;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class TestItemWriter implements ItemWriter<Integer> {

    @Override
    public void write(List<? extends Integer> items) throws Exception {

        for (int i : items) {
            System.out.println(Thread.currentThread().getName() + " Writing " + i);
        }       
    }
}

测试作业上下文。xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                         http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <job id="job" restartable="true" xmlns="http://www.springframework.org/schema/batch">
        <step id="index">   
            <tasklet task-executor="executor">
                <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
            </tasklet>
        </step>
    </job>

    <bean id="itemReader" class="test.TestItemReader" scope="job"/>

    <bean id="itemWriter" class="test.TestItemWriter"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean class="org.springframework.batch.test.JobLauncherTestUtils">
        <property name="job" ref="job"/>
    </bean>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <batch:job-repository id="jobRepository"/>

    <jdbc:embedded-database id="dataSource" type="HSQL">
        <jdbc:script location="classpath:/org/springframework/batch/core/schema-hsqldb.sql"/>
    </jdbc:embedded-database>

    <task:executor id="executor" queue-capacity="0" pool-size="5"/>

</beans>

工作测试

package test;

import java.util.Collection;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.ContextConfiguration;

@ContextConfiguration(locations={"test-job-context.xml"})
public class JobTest extends AbstractJUnit4SpringContextTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @BeforeClass
    public static void beforeClassSetup() {    

        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.WARN);
        Logger.getLogger("org.springframework.batch.core.scope.JobScope").setLevel(Level.DEBUG);
        Logger.getLogger("org.springframework.batch.core.scope.StepScope").setLevel(Level.DEBUG);
    }

    @Test
    public void testJobLaunch() throws Exception {

        JobExecution execution = jobLauncherTestUtils.launchJob();

        System.out.println("After execution "  + execution);

        Collection<StepExecution> stepExecutions = execution.getStepExecutions();
        for (StepExecution stepExecution : stepExecutions) {
            System.out.println("StepExecution " + stepExecution);
        }
    }   
}

运行上述JUnit测试将再现该问题。如果将读卡器上的作用域更改为步骤,或删除该作用域,则测试将正常完成。

共有1个答案

赵高韵
2023-03-14

这是因为当前执行(本例中为JobExecution)存储在线程本地(参见org.springframework.batch.core.scope.context.SynchronizationManagerSupport)中。尽管如此,为其添加多线程支持似乎并不不合理。请随意为其创建一个Jira问题(如果您愿意,还可以创建一个拉动请求)。

 类似资料:
  • 问题内容: 场景:我们有一个在Websphere中运行的Spring托管的Web应用程序。(Spring 3.0.x,WAS 7)Webapp通过Spring的Web应用程序利用Websphere的工作管理器(配置为10的线程池)以执行计算密集型db读取操作。因此,基本上,有一个请求来生成10个不同的文件。要生成文档,只需db读取即可收集/处理数据。因此,我们基本上产生了10个线程来处理10个文档

  • 我正在开发一个Spring Boot应用程序,它在给定的网站中查找给定的,如果找到匹配项,就会删除这些网页。我正在编写一个cron作业,每5分钟刷新一次结果,如下所示: 数据库有100个,在cron作业中,我首先列出了上次获取结果的最旧的10个关键字。因此,例如,第一次运行应该使用id 1到10的,第二次运行应该使用id 11到20,依此类推,第11次运行应该再次使用id 1到10,并且该过程继续

  • 我想在注射过程中给CDI中的pojo bean一个移动范围。 我创建了一个普通的bean,并在托管Bean中注入了与相同的内容: //POJO类 当我使用相同的语法注入时,Pojo bean的填充值无法在新视图bean中恢复。 但是当我在类声明中使用时,它会起作用,然后是非作用域注入,如下所示: 注射: 前一种情况在我做制片人和资格赛时得到了解决,但我觉得这将是一种我不应该做的开销。作为CDI的新

  • 问题内容: 有人可以解释一下我一直只使用“原型”的Spring bean的作用域吗,但是还有其他参数可以代替吗? 我在说什么的例子 问题答案: 从Spring规范开始,支持五种类型的bean作用域: 1.单身人士(默认*) 每个Spring IoC容器将单个bean定义的作用域限定为单个对象实例。 2.原型 将单个bean定义的作用域限定为任意数量的对象实例。 3.要求 将单个bean定义的范围限

  • 我现在尝试了很多东西,但我似乎错过了一块拼图。故事是这样的:我有一个请求范围的bean,它从HttpServletRequest读取一些SessionContext。此属性在过滤器中设置。因此,当代码在正确的线程上运行时,这是非常好的。 现在我开始使用java 8s的新功能CompletableFuture,我有其中三个功能在请求线程等待结果时并行计算东西。我想做的是提升/移交/传播bean或请求

  • 我已经对Spring bean使用的注释进行了注释。我通过spring DI成功地创建了相同的bean,并设置了对象注入的范围。现在,我想用Struts2和di做同样的事情。为此,我在中创建了bean定义 和简单的操作来创建bean并将其注入到我的操作中 在JSP中,我在会话bean上使用简单的迭代器 null