Question:

Reading one item and writing a list of items to a file in Spring Batch

谢旻
2023-03-14

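I am trying to build a Spring Batch job in which:
the Reader reads one item from the database,
the Processor creates a list of items from it,
the UnpackingItemWriter takes the list of items and sends the individual items to a FlatFileItemWriter.

Below is my code. I took the ListUnpackingItemWriter from another answer. Something in this configuration is wrong, because when I run it the job does not even start. Please point me in the right direction.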

    @EnableBatchProcessing
    public class CreateFile extends ApplicationCommonConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Bean
    @StepScope
    public JdbcCursorItemReader<ItemA> reader(
            @Value("#{jobParameters}") Map<String, JobParameter> jobParameters) {
        JdbcCursorItemReader<ItemA> reader = new JdbcCursorItemReader<ItemA>();

        final String QUERY_SELECT = "SELECT * from table";
        reader.setDataSource(dataSource);
        reader.setSql(QUERY_SELECT);
        reader.setRowMapper(new ItemRowMapper());

        return reader;
    }

    public class ItemRowMapper implements RowMapper<ItemA> {

        @Override
        public ItemA mapRow(ResultSet rs, int rowNum) throws SQLException {

            ItemA item = new ItemA();

            item.setCode(rs.getString("Code"));
            return item;
        }

    }



    @Bean
    @StepScope
    public ItemProcessor<ItemA, List<ItemA>> processor(
            @Value("#{jobParameters}") Map<String, JobParameter> jobParameters,
            @Value("#{stepExecution}") StepExecution stepExecution) {

        return new ItemAProcessor();
    }

    public class ItemAProcessor
            implements ItemProcessor<ItemA, List<ItemA>> {

        @Override
        public List<ItemA> process(ItemA item) throws Exception {
            List<ItemA> itemList = new ArrayList<ItemA>();
            List<String> IdList = new ArrayList<>();

        // creating List of ItemA from one ItemA

            return itemList;
        }
    }


    public class ListUnpackingItemWriter<ItemA> implements ItemWriter<List<ItemA>>, ItemStream, InitializingBean {

        private ItemWriter<ItemA> delegate;

        @Override
        public void write(final List<? extends List<ItemA>> lists) throws Exception {
            List<ItemA> consolidatedList = new ArrayList<>();
            for (List<ItemA> list : lists) {
                consolidatedList.addAll(list);
            }
            delegate.write(consolidatedList);
        }

        @Override
        public void afterPropertiesSet() {
            Assert.notNull(delegate, "You must set a delegate!");
        }

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            if (delegate instanceof ItemStream) {
                ((ItemStream) delegate).open(executionContext);
            }
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException{
            if (delegate instanceof ItemStream) {
                ((ItemStream) delegate).update(executionContext);
            }
        }

        @Override
        public void close() {
            if (delegate instanceof ItemStream) {
                ((ItemStream) delegate).close();
            }
        }

        public void setDelegate(ItemWriter<ItemA> delegate) {
            this.delegate = delegate;
        }


    }


    @Bean
    @StepScope
    public FlatFileItemWriter<ItemA> flatWriter() {

        FlatFileItemWriter<ItemA> writer = new FlatFileItemWriter<ItemA>();

        writer.setResource(new FileSystemResource("C:/dev/doc.txt"));

        writer.setHeaderCallback(new FlatFileHeaderCallback() {

            @Override
            public void writeHeader(Writer writer) throws IOException {

                writer.write("Header");

            }
        });

        writer.setLineAggregator(new DelimitedLineAggregator<ItemA>() {
            {
                setDelimiter("|");
                setFieldExtractor(new BeanWrapperFieldExtractor<ItemA>() {
                    {
                        setNames(new String[] { "Code" });
                    }
                });
            }
        });

        writer.setFooterCallback(new FlatFileFooterCallback() {

            @Override
            public void writeFooter(Writer writer) throws IOException {
                writer.write("Footer");

            }
        });

        return writer;

    }



    @Bean
    @StepScope
    public ItemWriter<List<ItemA>> writer(@Value("#{jobParameters}") Map<String, JobParameter> jobParameters) {
        ListUnpackingItemWriter<ItemA> listUnpackingItemWriter = new ListUnpackingItemWriter<ItemA>();
        listUnpackingItemWriter.setDelegate(flatWriter());
        return listUnpackingItemWriter;
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<ItemA> reader,
            ItemProcessor<ItemA, List<ItemA>> processor,
            ItemWriter<List<ItemA>> flatItemWriter) {
        return stepBuilderFactory.get("step1").<ItemA, List<ItemA>>chunk(1).reader(reader)
                .processor(processor).writer(flatItemWriter).build();
    }

    @Bean
    public Job createFileJob(JobBuilderFactory jobs, Step s1) {

        return jobBuilderFactory.get("createFileJob").incrementer(new RunIdIncrementer())
                .listener(listener()).flow(s1).end().build();
    }



}

Edit: stack trace

2018-11-11 22:17:53 [main] INFO  c.t.t.l.Launcher - inside try
2018-11-11 22:17:54 [main] INFO  c.t.t.c.CreateFile - inside JdbcCursorItemReader
2018-11-11 22:17:55 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step step1 in job createJob
org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
    at org.springframework.batch.item.file.FlatFileItemWriter.write(FlatFileItemWriter.java:255) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at com.stackoverflow.spring.config.CreateFile$ListUnpackingItemWriter.write(CreateFile.java:209) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_181]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) ~[spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) ~[spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at com.sun.proxy.$Proxy19.write(Unknown Source) ~[na:na]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:175) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:151) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:274) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:199) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_181]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at com.sun.proxy.$Proxy26.run(Unknown Source) [na:na]
    at com.stackoverflow.spring.launchers.Launcher.Create(Launcher.java:701) [classes/:na]
    at com.stackoverflow.spring.launchers.Launcher.main(Launcher.java:73) [classes/:na]

2 answers

锺星腾
2023-03-14

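Remove ItemStream (and its method implementations) from ListUnpackingItemWriter, and register flatWriter() as a stream on the step using .stream(), so that Spring Batch manages the flat-file stream automatically.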
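
The lifecycle behind this advice can be modeled in plain Java. The sketch below is only an illustration under stated assumptions: SketchWriter, SketchStream, SketchFileWriter, SketchUnpackingWriter and SketchStep are hypothetical stand-ins invented here, not Spring Batch types, and the model assumes that a step opens and closes only the streams that were registered with it. With no stream registered, the file writer is never opened and the write fails with the same message as in the stack trace; once the file writer itself is registered, the write succeeds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins that only model the open/write/close lifecycle.
// They are NOT the Spring Batch interfaces with similar names.
interface SketchWriter<T> { void write(List<? extends T> items); }
interface SketchStream { void open(); void close(); }

// Models a file writer that refuses to write until it has been opened.
class SketchFileWriter implements SketchWriter<String>, SketchStream {
    final List<String> lines = new ArrayList<>();
    private boolean opened;

    public void open() { opened = true; lines.add("Header"); }
    public void close() { lines.add("Footer"); opened = false; }

    public void write(List<? extends String> items) {
        if (!opened) {
            throw new IllegalStateException("Writer must be open before it can be written to");
        }
        lines.addAll(items);
    }
}

// Unpacking writer without any stream handling: it only flattens and delegates.
class SketchUnpackingWriter<T> implements SketchWriter<List<T>> {
    private final SketchWriter<T> delegate;

    SketchUnpackingWriter(SketchWriter<T> delegate) { this.delegate = delegate; }

    public void write(List<? extends List<T>> lists) {
        List<T> flat = new ArrayList<>();
        for (List<T> list : lists) {
            flat.addAll(list);
        }
        delegate.write(flat);
    }
}

// Models a step: open the registered streams, write one chunk, close the streams.
class SketchStep {
    static <T> void run(SketchWriter<T> writer, List<T> chunk, List<SketchStream> streams) {
        for (SketchStream s : streams) { s.open(); }
        try {
            writer.write(chunk);
        } finally {
            for (SketchStream s : streams) { s.close(); }
        }
    }
}

class StreamRegistrationDemo {
    public static void main(String[] args) {
        SketchFileWriter file = new SketchFileWriter();
        SketchUnpackingWriter<String> unpacking = new SketchUnpackingWriter<>(file);
        List<List<String>> chunk = new ArrayList<>();
        chunk.add(Arrays.asList("A1", "A2"));

        try {
            // No stream registered: nothing opens the file writer.
            SketchStep.run(unpacking, chunk, new ArrayList<SketchStream>());
        } catch (IllegalStateException e) {
            System.out.println("Without registration: " + e.getMessage());
        }

        // File writer registered as a stream: the step opens and closes it.
        List<SketchStream> streams = new ArrayList<>();
        streams.add(file);
        SketchStep.run(unpacking, chunk, streams);
        System.out.println("With registration: " + file.lines);
    }
}
```

In the question's configuration, the corresponding change would be to add .stream(...) with the FlatFileItemWriter bean to the builder chain in step1, assuming that bean is also passed into step1 as a parameter.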

缑赤岩
2023-03-14

Just call the open() method with a new ExecutionContext() as its argument, right before delegate.write(consolidatedList);, so that the writer is opened:

    open(new ExecutionContext());
    delegate.write(consolidatedList);