当前位置: 首页 > 知识库问答 >
问题:

spring批处理中多线程进程中的ReaderNotOpenException

拓拔谭三
2023-03-14

我尝试在下面的步骤中使用多线程,但下面出现了一个异常:

我的步骤:代码:

<step id="generateRecordFile" >
          <tasklet>
             <chunk reader="inputFileReader" writer="outputFileWriter"
            commit-interval="100" task-executor="asyncTaskExecutor">
                  <streams>
            <stream ref="inputFileReader" />
                  </streams>
             </chunk>
          </tasklet>
        </step>

        <beans:bean id="asyncTaskExecutor"
            class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
Reader:    
        <bean id="inputFileReader"
                class="org.springframework.batch.item.file.MultiResourceItemReader"
                scope="step">
                <property name="resources" value="#{jobParameters['fileLocation']}" />
                <property name="delegate" ref="fileInputFileReader" />
            </bean>
            <bean id="fileProductFeeReader" class="<package>.SynchronizedItemStreamReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
                <property name="lineMapper">
                    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                        <!-- split it -->
                        <property name="lineTokenizer">
                            <bean
                                class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                                <property name="names"
                                    value="name,age,address,mobileno" />
                                <property name="delimiter">
                                    <util:constant
                                        static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
                                </property>
                            </bean>
                        </property>
                        <property name="fieldSetMapper">
                            <bean
                                class="com.services.extractor.Mapper">
                            </bean>
                        </property>
                    </bean>
                </property>
        </bean>

        <bean id="outputFileWriter"
                class="org.springframework.batch.item.support.CompositeItemWriter"
                scope="step">
                <property name="delegates">
                    <list>
                        <ref bean="routeWriter" />
                    </list>
                </property>
        </bean>

        <bean id="routeWriter"
                class="com.services.extractor.processor.IngestionWriter"
                scope="step">
            </bean> 

公共类SynchronizedItemStreamReader实现ResourceAwareItemReaderItemStream{

private FlatFileItemReader<T> delegate;
private Resource resource;


public void setDelegate(FlatFileItemReader<T> delegate) {
    this.delegate = delegate;
}

/**
 * This delegates to the read method of the <code>delegate</code>
 */
public synchronized T read()
        throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    return this.delegate.read();
}

public void close() {
    this.delegate.close();
}

public void open(ExecutionContext executionContext) {
    this.delegate.open(executionContext);
}

public void update(ExecutionContext executionContext) {
    this.delegate.update(executionContext);
}

@Override
public void setResource(Resource resource) {
    this.resource = resource;
}

    writer class:       
        public class IngestionWriter implements ItemWriter<Person> {
            @Override
            public void write(List<? extends Person> items) throws IOException {
                //logic to set into db
                }
org.springframework.batch.item.ReaderNotOpenException:Reader must be open before it can be read.:org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195)
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83)
    at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)

谢谢!!!

共有1个答案

咸高谊
2023-03-14

AbstractemCountingItemStreamItemReader实现不是线程安全的。您可以尝试解决这个问题,实现自己的读取器,并同步改变共享状态的方法,如close()、open()和update()。

 类似资料:
  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我知道子进程是进程,而不是线程。我使用了错误的语义,因为大多数人在谈到“多线程”时都知道您的意图。所以我会把它保留在标题中。 想象一下这样一个场景:使用一个自定义函数或模块,您连续有多个类似和复杂的事情要做。使用所有可用的核心/线程(例如8/16)非常有意义,这就是的目的。 理想情况下,您需要多个同时工作的人员,并向一个控制器发送/从一个控制器发送/回调消息。 node cpool、fork po

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 我想编写一个spring boot批处理应用程序,其中我有一个充满事件的数据库表。我想做的是有一个多线程的spring boot批处理应用程序,它将以这种方式工作: 我想有5个线程运行,每个线程将保留一个偏移量来跟踪它读取的事件,以便没有其他线程再次读取相同的事件。我想怎么做: 所以我希望能够在数据库表中为每个线程保留偏移量。有没有办法让Spring Boot环境以这种方式工作?

  • 以下是我的步骤: 它工作得很好。 现在,我需要处理bt“chuck专用线程”。 我添加了以下配置: 问题出现在这里,因为我收到了这样的信息: 或者 我相关的是: 我的数据源是: 和财产: 有什么想法吗?

  • 我有一个要求,我想在下面的场景中使用Spring批处理框架。 我有一个在交易日期列上分区的表。我想使用Spring批处理框架的阅读器、处理器和写入器来处理该表的记录。我想做的是根据交易日期创建单独的线程进行读取、写入和处理。假设有4个交易日期,那么我想为单独的交易日期创建4个单独的线程。在每个线程中,阅读器将从该交易日期的表中读取记录,在处理器中丰富记录,然后在写入器中发布/写入。 我是Sprin