当前位置: 首页 > 知识库问答 >
问题:

Spring Integration+Spring Batch:工作不会停止

石苏燕
2023-03-14

我想从ftp服务器上读取文件,然后将其保存到本地存储库中并从服务器上删除,运行读取文件的作业,在DB上找到一条记录,更改一个参数并保存。

Spring集成配置:

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer(DefaultFtpSessionFactory sessionFactory) {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
        fileSynchronizer.setRemoteDirectory(remoteDirectory);
        fileSynchronizer.setDeleteRemoteFiles(true);
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(cron = "*/5 * * * * ?"))
    public FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource(FtpInboundFileSynchronizer fileSynchronizer) throws Exception {
        FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(fileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDirectory));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "fileInputChannel")
    public FileWritingMessageHandler fileWritingMessageHandler() {
        FileWritingMessageHandler messageHandler = new FileWritingMessageHandler(new File(localDirectory));
        messageHandler.setOutputChannelName("jobLaunchRequestChannel");
        return messageHandler;
    }

    @ServiceActivator(inputChannel = "jobLaunchRequestChannel", outputChannel = "jobLaunchingGatewayChannel")
    public JobLaunchRequest jobLaunchRequest(File file) throws IOException {
        String[] content = FileUtils.readFileToString(file, "UTF-8").split("\\s+");
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("filename", file.getAbsolutePath())
                .addString("id", content[0]).addString("salary", content[1])
//                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        return new JobLaunchRequest(increaseSalaryJob, jobParameters);
    }

    @Bean
    @ServiceActivator(inputChannel = "jobLaunchingGatewayChannel")
    public JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
        jobLaunchingGateway.setOutputChannelName("finish");
        return jobLaunchingGateway;
    }

    @ServiceActivator(inputChannel = "finish")
    public void finish() {
        System.out.println("FINISH");
    }
}

Spring批处理配置:

 @Bean
    public Job increaseSalaryJob(CustomJobListener listener, Step step1) {
        return jobBuilderFactory.get("increaseSalaryJob")
                .preventRestart()
                .listener(listener)
                .start(step1)
                .build();
    }

    @Bean
    public Step step1(ItemReader<Employee> reader) {
        return stepBuilderFactory.get("step1")
                .transactionManager(transactionManager)
                .<Employee, Employee> chunk(1)
                .reader(reader)
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<Employee> reader(@Value("#{jobParameters[id]}") Integer id) {
        log.info("reader");
        return () -> employeeService.get(id);
    }

    @Bean
    @StepScope
    public ItemProcessor<Employee, Employee> processor() {
        log.info("processor");
        return employee -> {
            log.info(employee.getName() + " had salary " + employee.getSalary());
            Integer salary = employee.getSalary() + 1;
            employee.setSalary(salary);
            log.info(employee.getName() + " have salary " + employee.getSalary());

            return employee;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Employee> writer() {
        log.info("writer");
        return employees -> {
            for (Employee employee : employees) {
                try {
                    employeeService.update(employee);
                    log.info(employee.getName() + " updated with salary " + employee.getSalary());
                } catch (ValidationException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    @Bean
    public MapJobRepositoryFactoryBean jobRepositoryFactoryBean(PlatformTransactionManager transactionManager) {
        return new MapJobRepositoryFactoryBean(transactionManager);
    }

    @Bean
    public JobRepository jobRepository(MapJobRepositoryFactoryBean jobRepositoryFactoryBean) throws Exception {
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

我会很高兴得到任何帮助。

共有1个答案

马弘和
2023-03-14

您需要确保您的读取器在某个时刻返回null。这就是该步骤解释没有更多的数据要处理并退出的方式(如果没有更多的步骤要运行,这又会停止周围的作业)。

也就是说,我看到面向块的步骤的输入是一个ID。对于这个用例,一个简单的tasklet就足够了,不需要具有单个输入记录和chunksize=1的面向块的tasklet。

 类似资料:
  • null GrantPermissionRule是否停止了为其他人所宣传的工作?

  • 问题内容: 我正在尝试使用Linux下的ProcessBuilder类将mp3文件解码为wav文件。由于某些原因,该过程不会停止,因此我必须手动取消它。 有人可以给我一个提示。我认为引用的代码很容易重现: jstack的输出 问题答案: 您需要清空进程的输出(通过)和错误(通过)流,否则可能会阻塞。 引用过程文档: 由于某些本机平台仅为标准输入和输出流提供有限的缓冲区大小,因此未能及时写入子流程的

  • 问题内容: 我想停止redis服务器,并且它一直在运行。我正在使用redis-2.6.7 检查它是否正在运行: 它显示“ … bind:地址已在使用中”,因此它已在运行。 我努力了 它只是挂起,没有任何反应。我中断检查,是的,它仍在运行。 我努力了 我得到“无法打开配置文件’停止’” 我试过了: 仍在运行。 我想要停止它的原因是,当我尝试通过Python设置或获取值时,它只是挂起了。所以我认为我会

  • 对于最近的一个项目,我们使用Cycle2。我已升级到最新版本。我们正在使用Sitecore呈现内容。无论我采取何种方法(见下文),我都无法使autostop正常工作。我们每个幻灯片放映有2-3张幻灯片,我们希望它按以下模式移动:1-2-3-1。 我们是否在如下规则中将其呈现给自动播放: 或者如果我们让它在JS中以编程方式播放,而不使用“循环幻灯片”类: 我们正在正确加载JQuery。 在对功能做了

  • 问题内容: 基本上,一切似乎都可以正常运行并启动,但是由于某些原因,我无法调用任何命令。我已经很轻松地环顾了一个小时,然后看了一些示例/观看视频,但我终生无法找出问题所在。代码如下: 我在中拥有的调试输出实际上可以正常工作并做出响应,并且整个bot都可以运行,没有任何异常,但是它只是不会调用命令。 问题答案: 覆盖提供的默认值将禁止运行任何其他命令。要解决此问题,请在的末尾添加一行。例如: 默认值

  • 我正在用Unity 2018.2.2(最新版本于2018年8月8日)开发一款android小游戏。我在这个游戏上工作了几周,但现在,Unity在试图找到我的JDK时毫无理由地失败了。当我分配文件夹时(编辑- [如果您看不到图像:“未能检测到Java版本。Android开发需要JDK 8(1.8)。安装JavaRunatime环境(JRE)是不够的。”] 第一个我想的是,它可能会因为JDK的更新而失