当前位置: 首页 > 知识库问答 >
问题:

Spring批处理条件步骤,未正确执行

程祯
2023-03-14

我对spring batch基本上是新手。第一步,我从SFTP服务器获取一个文件,并使用JSch将其传输到本地目录。第二步,我读取本地目录中的文件,并将内容写入数据库。问题在于,在某些情况下,只执行saveDruchAltransaction(步骤2),而不执行getRemoteFile(步骤1)

配置类:

@Qualifier("writer")
@Autowired
JdbcBatchItemWriter<TopUpTransaction> writer;

@Qualifier("reader")
@Autowired
FlatFileItemReader<TopUpTransaction> reader;

@Bean(name="importWithdrawalTransactionJob")
public Job importWithdrawalTransactionJob() throws Exception {
    return jobBuilderFactory.get("importWithdrawalTransactionJob")
            .incrementer(new RunIdIncrementer())
            .start(getRemoteFile()).on(ExitStatus.FAILED.getExitCode()).end()
            .next(saveWithdrawalTransaction()).end()
            .build();
}

@Bean
public GetRemoteFileTasklet sFtpGetRemoteFilesTasklet() {
    return new GetRemoteFileTasklet();
}

@Bean
public Step getRemoteFile() throws Exception {
    return stepBuilderFactory.get("getRemoteFile")
            .tasklet(sFtpGetRemoteFilesTasklet())
            .build();
}

@Bean
public Step saveWithdrawalTransaction() throws Exception {
    return stepBuilderFactory.get("saveWithdrawalTransaction")
            .<TopUpTransaction, TopUpTransaction> chunk(1)
            .reader(reader)
            .processor(processor())
            .writer(writer)
            .build();
}

读者:

@Bean
public FlatFileItemReader<TopUpTransaction> reader() throws Exception {

File folder = new File(csvFilePath);
File filesToProcess[] = folder.listFiles();
FlatFileItemReader<TopUpTransaction> reader = new FlatFileItemReader<TopUpTransaction>();

if (filesToProcess.length > 0) {
for (File file : filesToProcess) {
    if (file.isFile()) {
        System.out.println(file.getName());
        reader.setResource( new FileSystemResource(file.getAbsolutePath()));
        reader.setLineMapper(new DefaultLineMapper<TopUpTransaction>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "lineIdentifier","transactionDate", "beneAccNum", "beneAccName", "senderName", "senderInfo",
                        "creditedAmountWithCurrency", "originalAmountWithCurrency", "netAmountWithCurrency", 
                        "charges", "otherInfo1", "otherInfo2",
                        "ubTransactionId"});
                setDelimiter("|");
            }});
            setFieldSetMapper(new CustomTopUpMapper());          
        }});
        reader.setRecordSeparatorPolicy(new CustomReaderPolicy());
        System.out.println("end read");
        file.renameTo(new File(file.getName() + "-processed"));
    }
}

作家:

@Bean(name="writer")
public JdbcBatchItemWriter<TopUpTransaction> writer() {

    StringBuffer insertWithdrawalTransaction = new StringBuffer();
    insertWithdrawalTransaction.append("INSERT INTO top_up_transactions (customer_id,txn_date, bene_acc_num, bene_acc_name,");
    insertWithdrawalTransaction.append("sender_name, sender_info, credited_amount, credited_currency,");
    insertWithdrawalTransaction.append("original_amount, original_currency,");
    insertWithdrawalTransaction.append("net_amount, net_currency, charges, other_info_1, other_info_2,");
    insertWithdrawalTransaction.append("ubp_txn_id,status) ");
    insertWithdrawalTransaction.append("VALUES (:customerId, :transactionDate,:beneAccNum,");
    insertWithdrawalTransaction.append(":beneAccName,:senderName,:senderInfo,:creditedAmount,:creditedAmountCurrency,");
    insertWithdrawalTransaction.append(":originalAmount,:originalAmountCurrency,");
    insertWithdrawalTransaction.append(":netAmount,:netAmountCurrency,:charges,:otherInfo1,:otherInfo2,");
    insertWithdrawalTransaction.append(":ubTransactionId,:status)");
    System.out.println("start write");
    JdbcBatchItemWriter<TopUpTransaction> writer = new JdbcBatchItemWriter<TopUpTransaction>();
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<TopUpTransaction>());
    writer.setSql(insertWithdrawalTransaction.toString());
    writer.setDataSource(dataSource);
    System.out.println("end write");

    return writer;
}

微线程:

public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
{
    Session session = null;
    Channel channel = null;
    //deleteLocalFiles();
    System.out.println("=====start tasklet=====");
    try {
        JSch ssh = new JSch();
        session = ssh.getSession("username", "host", 22);
        session.setPassword("password");
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();
        channel = session.openChannel("sftp");
        channel.connect();
        ChannelSftp sftp = (ChannelSftp) channel;
        sftp.get("*.rpt", "src/main/resources/files/");
    } catch (JSchException e) {
        e.printStackTrace();
    } catch (SftpException e) {
        e.printStackTrace();
    } finally {
        if (channel != null) {
            channel.disconnect();
        }
        if (session != null) {
            session.disconnect();
        }
    }

我不知道该怎么做...我一直在尝试我从各地找到的东西。请帮忙谢谢!

共有1个答案

阳福
2023-03-14

您可以使用JobExecutionDeciderdecision在故障期间控制流程。

对于Xml,您可以提到这一点

<decision id="decision" decider="decider">
            <next on="FAILED" to="step2" />
            <next on="COMPLETED" to="step3" />
        </decision>

jobs.get("job")
           .preventRestart()
           .listener(failedCleanupListener)
           .flow(step1)
           .next(step2)
           .next(step3)
           .end()
           .build();
 类似资料:
  • 我正在尝试修复Spring Batch中的一个问题,这个问题最近一直困扰着我们的系统。我们有一份工作,在大多数情况下都很好。下载和处理数据是一个多步骤的工作。 问题是有时工作会爆棚。也许我们试图连接到的服务器抛出了错误,或者我们在工作进行到一半时关闭了服务器。此时,下次我们的quartz调度程序尝试运行该作业时,它似乎什么也不做。以下是此作业定义的删节版本: 委婉地说,我是Spring Batch

  • 我试图配置我的第一个多线程作业。我们有大约200,000条记录的主目录,我们需要处理。我想将文件分解为10个文件并处理它们。拆分文件tasklet工作正常 主步骤在我的配置中运行,但从步骤不运行。下面是我的配置。 分割者: MultiResourceItemReader: FlatFileItemWriter: 作业配置: 从属步骤配置: 请告知我做错了什么。我没有看到处理器urlFileItem

  • 我正在做一个Spring批处理工作,它包含几个步骤(超过10个)。 谢谢你的回答。

  • 我正在做一个包括Spring批处理的项目,在复制代码片段之前,我要简单地总结一下这项工作是如何使用cron的。 cron在我的项目上调用restapi(@PostMapping(“/jobs/external/{jobName}”) 在post方法中,我获取作业并执行它 在每次执行中,我都应该执行一个步骤 该步骤包含一个读卡器(对弹性API的外部rest调用以获取文档)和一个处理器 现在我的问题是

  • 使用Spring Batch 3.0.4.Release。 我将作业配置为使用分区步骤。从机步骤使用块大小1。任务执行器中有六个线程。我使用从六到数百的各种网格大小来运行这个测试。我的网格大小是从StepExecutions的数量,我希望==我的分区器创建的ExecutionContexts的数量。 下面是Java配置代码:

  • 我有一个Spring批处理作业。它的阅读器通过一些复杂的sql从数据库中读取一些记录。现在,对于我从数据库收到的每一条记录,我必须再打一个表来获取一些属性。注意:-我不能在阅读器步骤中加入这个表sql。所以这些获取的属性以及现有记录中的数据需要写入文件。这可以用Spring批处理完成吗?