当前位置: 首页 > 知识库问答 >
问题:

为什么Spring批处理是单线程而不是多线程

祁聪
2023-03-14

我正在通过quartz调度程序调用spring批处理作业,它应该每1分钟运行一次。当作业第一次运行时,成功打开ItemReader并运行作业。但是,当作业尝试第二次运行时,它使用的是第一次运行的相同实例,该实例已经初始化,并接收“java.lang.IllegalStateException:Stream is eignitialized.Close before re-opening”。我已经将itemreader和ItemWriter的作用域设置为步骤。

如果我在配置上做错了什么,请告诉我?

<?xml version="1.0" encoding="UTF-8"?>

<import resource="context.xml"/>
<import resource="database.xml"/>   
<bean id="MyPartitioner" class="com.MyPartitioner" />
<bean id="itemProcessor" class="com.MyProcessor" scope="step" />

<bean id="itemReader" class="com.MyItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="query...."/>
    <property name="rowMapper">
        <bean class="com.MyRowMapper" scope="step"/>
    </property>
</bean>

<job id="MyJOB" xmlns="http://www.springframework.org/schema/batch">
    <step id="masterStep">
        <partition step="slave" partitioner="MyPartitioner">
            <handler grid-size="10" task-executor="taskExecutor"/>
        </partition>
    </step>
</job>
<step id="slave" xmlns="http://www.springframework.org/schema/batch">
    <tasklet>
        <chunk reader="itemReader" writer="mysqlItemWriter" processor="itemProcessor" commit-interval="100"/>
    </tasklet>
</step>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>

<bean id="mysqlItemWriter" class="com.MyItemWriter" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[      
            query.....
        ]]>
        </value>
    </property>

<property name="itemPreparedStatementSetter">
        <bean class="com.MyPreparedStatementSetter" scope="step"/>
    </property>
</bean>

石英作业调用程序-

Scheduler scheduler = new    StdSchedulerFactory("quartz.properties").getScheduler();
JobKey jobKey = new JobKey("QUARTZJOB", "QUARTZJOB");
JobDetail jobDetail =     JobBuilder.newJob("com.MySpringJobInvoker").withIdentity(jobKey).build();
jobDetail.getJobDataMap().put("jobName", "SpringBatchJob");
SimpleTrigger smplTrg = newTrigger().withIdentity("QUARTZJOB", "QUARTZJOB").startAt(new Date(startTime))  
                            .withSchedule(simpleSchedule().withIntervalInSeconds(frequency).withRepeatCount(repeatCnt))
                            .forJob(jobDetail).withPriority(5).build();
scheduler.scheduleJob(jobDetail, smplTrg);

石英工单-

public class MySpringJobInvoker implements Job
{
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException
{
    JobDataMap data = jobExecutionContext.getJobDetail().getJobDataMap();

    ApplicationContext applicationContext =ApplicationContextUtil.getInstance();

    JobLauncher jobLauncher = (JobLauncher) applicationContext.getBean("jobLauncher");
    org.springframework.batch.core.Job job = (org.springframework.batch.core.Job) applicationContext.getBean(data.getString("jobName"));
    JobParameters param = new JobParametersBuilder().addString("myparam","myparam").addString(Long.toString(System.currentTimeMillis(),Long.toString(System.currentTimeMillis())).toJobParameters();
    JobExecution execution = jobLauncher.run(job, param);
}

public class ApplicationContextUtil{
private static ApplicationContext ApplicationContext;

public static synchronized ApplicationContext getInstance()
{
    if(applicationContext == null)
    {
        applicationContext = new ClassPathXmlApplicationContext("myjob.xml");
    }
    return applicationContext;
}

}

共有1个答案

澹台昆
2023-03-14

您从Quartz传递给Spring批处理作业的参数是什么?您能发布异常堆栈跟踪吗?

如果您试图用相同的参数执行批处理的第二个实例--它将不起作用。Spring批处理基于传递的参数来标识作业的唯一实例--因此作业的每个新实例都需要传递不同的参数。

 类似资料:
  • 本文向大家介绍请为什么说js是单线程,而不是多线程呢?相关面试题,主要包含被问及请为什么说js是单线程,而不是多线程呢?时的应答技巧和注意事项,需要的朋友参考一下 JavaScript语言的一大特点就是单线程,也就是说,同一个时间只能做一件事。那么,为什么JavaScript不能有多个线程呢?这样能提高效率啊。 JavaScript的单线程,与它的用途有关。作为浏览器脚本语言,JavaScript

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 此答案指示如何将转换为,同时管理将发生阻塞的位置: 我的问题和评论中的问题一样: 怎么了?为什么你使用一个额外的线程与Promise结合? 答复如下: 它会在你拉线的时候卡住线。如果您已经为这样的未来配置了ExecutionContext,这很好,但是默认的ExecutionContext包含的线程与您拥有的处理器一样多。 我不确定我是否理解这个解释。重申: 有什么问题?在未来内部阻塞不是和手动创

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 问题内容: 我一直在寻找一些方法来轻松地对我的一些简单分析代码进行多线程处理,因为我注意到numpy仅使用一个内核,尽管事实上它应该是多线程的。 我知道numpy是为多个内核配置的,因为我可以看到使用numpy.dot的测试使用了我的所有内核,因此我只是将Mean重新实现为点积,并且运行速度更快。是否有某些原因意味着不能自己快速运行?我发现较大的数组具有类似的行为,尽管该比率比示例中显示的3接近2

  • 我想编写一个spring boot批处理应用程序,其中我有一个充满事件的数据库表。我想做的是有一个多线程的spring boot批处理应用程序,它将以这种方式工作: 我想有5个线程运行,每个线程将保留一个偏移量来跟踪它读取的事件,以便没有其他线程再次读取相同的事件。我想怎么做: 所以我希望能够在数据库表中为每个线程保留偏移量。有没有办法让Spring Boot环境以这种方式工作?