我试图在Spring批处理作业中使用多线程步骤,但我得到一个“范围‘作业’对于当前线程不活动……”。我在Spring中尝试了几种方法,但目前我正在使用我认为是OOTB Spring构造的方法,但仍然失败。
错误是:
2021-09-19 22:40:03,432 ERROR [https-jsse-nio-8448-exec-4]: org.springframework.batch.core.step.AbstractStep Encountered an error executing step writeToDatabaseStep in job softlayerUploadJob
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.softLayerDataItemQueue': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:365)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
at com.sun.proxy.$Proxy126.read(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
at org.html" target="_blank">springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353)
... 19 common frames omitted
基本作业结构简化:作业SoftLayerUpload作业步骤:softlayerUploadFileStep(不能多线程)从Excel文件读取写入SoftLayerDataItemQueue)bean,最终写入java.util.队列步骤:WriteToDatabase aseStep从SoftLayerDataItemQueue bean读取使用JpaWriter写入数据库
SoftLayerJob配置。JAVA
public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);
private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();
// private Queue<SoftLayerData> queue = new LinkedList<>();
// @Value("#{stepExecution.jobExecution.jobInstance.instanceId}")
@Value("#{jobExecution.jobInstance.instanceId}")
private int jobInstanceId;
public Queue<SoftLayerData> getQueue() {
Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
if (result == null) {
result = new LinkedList<>();
queueMap.put(jobInstanceId, result);
}
logger.info("Returning queue with item count=" + result.size());
return result;
}
@Override
public void write(List<? extends SoftLayerData> items) throws Exception {
logger.info("@@@ Attempting to add item to queue with bean hashCode=" + this.hashCode() + " job instanceid="
+ jobInstanceId + " ");
logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
if (logger.isDebugEnabled()) {
for (SoftLayerData item : items) {
logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
}
}
getQueue().addAll(items);
}
@Override
public SoftLayerData read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
logger.info("@@@ Attempting to remove item from queue with bean hashCode=" + this.hashCode()
+ " job instanceid=" + jobInstanceId + " ");
SoftLayerData result = null;
if (getQueue() != null && getQueue().size() > 0) {
result = getQueue().remove();
logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
} else {
logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
}
return result;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: open()");
/* Unused method */
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: update()");
/* Unused method */
}
@Override
public void close() throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: close()");
/* Unused method */
}
}
SoftLayerDataItemQueue。JAVA
public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);
private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();
@Value("#{jobExecution.jobInstance.instanceId}")
private int jobInstanceId;
public Queue<SoftLayerData> getQueue() {
Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
if (result == null) {
result = new LinkedList<>();
queueMap.put(jobInstanceId, result);
}
logger.info("Returning queue with item count=" + result.size());
return result;
}
@Override
public void write(List<? extends SoftLayerData> items) throws Exception {
logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
if (logger.isDebugEnabled()) {
for (SoftLayerData item : items) {
logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
}
}
getQueue().addAll(items);
}
@Override
public SoftLayerData read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
+ " job instanceid=" + jobInstanceId + " ");
SoftLayerData result = null;
if (getQueue() != null && getQueue().size() > 0) {
result = getQueue().remove();
logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
} else {
logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
}
return result;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: open()");
/* Unused method */
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: update()");
/* Unused method */
}
@Override
public void close() throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: close()");
/* Unused method */
}
}
注意:我不喜欢使用SoftLayerDataItemQueue,但我想不出任何其他方法来写入在一个步骤中处理的项目,并在另一个步骤中读取它们,尤其是对于大容量和并行处理。我希望Spring能有某种方式将数据从一个步骤写入另一个步骤,但我找不到它。其他人建议写一个文件,或者在工作或步骤上下文中。
从某种意义上说,这只是一个部分答案,因为这解决了我的问题,但我无法解释为什么这是必要的。遵循https://github.com/spring-projects/spring-batch/issues/1335
我创建了自己的SimpleAsyncTaskExecutor,如下所示:
public class ParallelSimpleAsyncTaskExecutor extends SimpleAsyncTaskExecutor {
/**
*
*/
private static final long serialVersionUID = 1L;
public ParallelSimpleAsyncTaskExecutor(String prefix) {
super(prefix);
}
@Override
protected void doExecute(Runnable task) {
JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
super.doExecute(new Runnable() {
@Override
public
void run() {
JobSynchronizationManager.register(jobExecution);
try {
task.run();
} finally {
JobSynchronizationManager.release();
}
}
});
}
}
在我的Spring批处理作业中,我试图使用JobExecutionContext在步骤之间共享数据,只有当我将步骤保持为单线程时,它才会起作用,如下所示: 但是,添加时发生错误: 我试着像这样解决这个问题:https://github.com/spring-projects/spring-batch/issues/1335,但它似乎只使用了主线程之外的一个线程。 有没有办法在不添加经过调整的代码的
这是我的简单工作配置: 如您所见,我已按照此处的说明配置了bean,并将以下行添加到我的中: 这就是我犯的错误。我哪里做错了? 请注意,我的项目配置了2个数据源,以及Spring MVC Web: 数据源1,对于JPA/Hibrate: 数据源2,用于Spring批处理JDBC作业存储库: 我找不到这里提到的神秘XML。 出什么事了? 谢谢。
如果我有一个Web应用程序,它的应用程序上下文加载了我的webapp和所有作业配置文件的所有内容,如果我的作业中有一个没有范围="步骤"的简单ItemReader,那么阅读器是单例的,对吗?所以如果我通过SimpleJobLauncher从控制器启动两次作业,我会使用同一个bean,对吗?除非我放入范围="步骤",以便每个作业执行一个bean? 另一方面,如果我从CommandLineJobRun
我试图在运行时设置spring批处理作业的块大小,方法是将其注入我的步骤中,如下所示: 但我得到以下错误:java.lang.非法状态异常:没有上下文持有人可用的工作范围 我在网上做了一些研究,但不明白为什么我会遇到这个异常。如果您能帮助我理解这个错误的含义以及如何解决它,我将不胜感激。谢谢!
我只想知道spring batch中是否有“job”范围,就像“step”范围一样?如果没有,我们应该开发我们的自定义范围,还是有更好的替代方案? 提前谢了。
我正在处理一个使用Spring批处理的项目。在本项目中,我使用Spring批处理后期绑定,其中我使用JobParameters注入了一个参数(将用作SQL读取器查询的条件的字符串)。目前,我正在使用进行后期绑定,所有操作都非常正常。 这里我要问的是何时使用以及何时使用。我已经阅读了Spring批处理参考文件,并在谷歌上搜索了StepScope和jobscope。我得到的只是: a.StepScop