我尝试在下面的步骤中使用多线程,但下面出现了一个异常:
我的步骤:代码:
<step id="generateRecordFile" >
<tasklet>
<chunk reader="inputFileReader" writer="outputFileWriter"
commit-interval="100" task-executor="asyncTaskExecutor">
<streams>
<stream ref="inputFileReader" />
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="asyncTaskExecutor"
class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
Reader:
<bean id="inputFileReader"
class="org.springframework.batch.item.file.MultiResourceItemReader"
scope="step">
<property name="resources" value="#{jobParameters['fileLocation']}" />
<property name="delegate" ref="fileInputFileReader" />
</bean>
<bean id="fileProductFeeReader" class="<package>.SynchronizedItemStreamReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<!-- split it -->
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names"
value="name,age,address,mobileno" />
<property name="delimiter">
<util:constant
static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
</property>
</bean>
</property>
<property name="fieldSetMapper">
<bean
class="com.services.extractor.Mapper">
</bean>
</property>
</bean>
</property>
</bean>
<bean id="outputFileWriter"
class="org.springframework.batch.item.support.CompositeItemWriter"
scope="step">
<property name="delegates">
<list>
<ref bean="routeWriter" />
</list>
</property>
</bean>
<bean id="routeWriter"
class="com.services.extractor.processor.IngestionWriter"
scope="step">
</bean>
公共类SynchronizedItemStreamReader实现ResourceAwareItemReaderItemStream{
private FlatFileItemReader<T> delegate;
private Resource resource;
public void setDelegate(FlatFileItemReader<T> delegate) {
this.delegate = delegate;
}
/**
* This delegates to the read method of the <code>delegate</code>
*/
public synchronized T read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return this.delegate.read();
}
public void close() {
this.delegate.close();
}
public void open(ExecutionContext executionContext) {
this.delegate.open(executionContext);
}
public void update(ExecutionContext executionContext) {
this.delegate.update(executionContext);
}
@Override
public void setResource(Resource resource) {
this.resource = resource;
}
writer class:
public class IngestionWriter implements ItemWriter<Person> {
@Override
public void write(List<? extends Person> items) throws IOException {
//logic to set into db
}
org.springframework.batch.item.ReaderNotOpenException:Reader must be open before it can be read.:org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195)
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83)
at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
谢谢!!!
AbstractemCountingItemStreamItemReader实现不是线程安全的。您可以尝试解决这个问题,实现自己的读取器,并同步改变共享状态的方法,如close()、open()和update()。
null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?
我知道子进程是进程,而不是线程。我使用了错误的语义,因为大多数人在谈到“多线程”时都知道您的意图。所以我会把它保留在标题中。 想象一下这样一个场景:使用一个自定义函数或模块,您连续有多个类似和复杂的事情要做。使用所有可用的核心/线程(例如8/16)非常有意义,这就是的目的。 理想情况下,您需要多个同时工作的人员,并向一个控制器发送/从一个控制器发送/回调消息。 node cpool、fork po
我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出
我想编写一个spring boot批处理应用程序,其中我有一个充满事件的数据库表。我想做的是有一个多线程的spring boot批处理应用程序,它将以这种方式工作: 我想有5个线程运行,每个线程将保留一个偏移量来跟踪它读取的事件,以便没有其他线程再次读取相同的事件。我想怎么做: 所以我希望能够在数据库表中为每个线程保留偏移量。有没有办法让Spring Boot环境以这种方式工作?
以下是我的步骤: 它工作得很好。 现在,我需要处理bt“chuck专用线程”。 我添加了以下配置: 问题出现在这里,因为我收到了这样的信息: 或者 我相关的是: 我的数据源是: 和财产: 有什么想法吗?
我有一个要求,我想在下面的场景中使用Spring批处理框架。 我有一个在交易日期列上分区的表。我想使用Spring批处理框架的阅读器、处理器和写入器来处理该表的记录。我想做的是根据交易日期创建单独的线程进行读取、写入和处理。假设有4个交易日期,那么我想为单独的交易日期创建4个单独的线程。在每个线程中,阅读器将从该交易日期的表中读取记录,在处理器中丰富记录,然后在写入器中发布/写入。 我是Sprin