当前位置: 首页 > 知识库问答 >
问题:

如何在运行并行作业时安全地将参数从Tasklet传递到步骤

麻学博
2023-03-14

我试图将参数从tasklet安全地传递到同一工作中的一个步骤。

我的工作包括三个小任务(step1、step2、step3),一个接一个,最后是step4(处理器、阅读器、写入器)

这项工作正在并行执行多次。

在tasklet的第1步中,我通过web服务评估param(hashId),然后将其传递到整个链,直到我的阅读器(第4步)

在步骤3中,我创建了一个名为: filePath的新参数,它基于hashid,我将它作为文件资源位置发送到步骤4(阅读器)

我正在使用stepExecution传递这个参数(hashId和filePath)。

我通过tasklet尝试了三种方法:

要将参数(hashId从步骤1传递到步骤2,从步骤2传递到步骤3),我要执行以下操作:

chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("hashId", hashId);

在第4步中,我将基于hashId填充文件路径,并通过这种方式将其传递到我的最后一步(即读处理器和写程序)

public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..

    @Override
     public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext    
     executionContext) throws IOException {

    String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");

          ...

filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys

        chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("filePath", filePath);
    } 

logger.info("filePath + "for hashId=" + hashId);

}
@Override
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

请注意,在完成该步骤(第3步)之前,我正在打印hashId和filePath值。根据日志,它们是一致的,并按预期填充

我还在我的阅读器中添加了日志,以查看日志中的参数。

@Bean
    @StepScope
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
              logger.info("test filePath="+filePath+");

        return itemReader;
    }

当我执行这个作业10次时,我可以看到并行执行时,param filePath值被其他作业filePath值填充

以下是我使用executionContextPromotionListener升级作业密钥的方式:

工作定义:

 @Bean
    public Job processFileJob() throws Exception {
        return this.jobs.get("processFileJob").
                start.(step1).
                next(step2)
                next(downloadFileTaskletStep()). //step3
                next(processSnidFileStep()).build();  //step4

    }

第3步定义

  public Step downloadFileTaskletStep() {
        return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
    }


  @Bean
    public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
        ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
        executionContextPromotionListener.setKeys(new String[]{"filePath"});
        return executionContextPromotionListener;
    }

同样的结果导致参数混乱

我可以通过spring batch数据库表:batch_job_execution_context跟踪结果。简而言之:

在这里,您可以看到由hashid构建的文件补丁与源hashid//不正确的记录不同///

{“map”:[{“entry”:[{“string”:“totalRecords”,“int”:5},{“string”:“segmentId”,“long”:13},{“string”:[“filePath”,“/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87f7f7aa.csv”],{“string”:“hashId”,“20df39d201fffc74423cff23789”]

现在如果我们检查其他记录,它们看起来不错。但总是有一两个搞砸了

//更正记录

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}   

知道为什么我有这些线程问题吗?

共有1个答案

宫子晋
2023-03-14

要查看尝试的方法,请执行以下操作:

  • 方法1-编辑JobParametersJobParameters在作业中是不可变的,因此不应尝试在作业执行期间修改它们。
  • 方法2-编辑JobParametersv2方法2实际上与方法1相同,您只是以不同的方式获取对JobParameters的引用。
  • 方法3-使用ExecutionContext促销监听器。这是正确的方法,但您做的事情不正确。执行上下文促销监听器查看步骤的执行上下文并将您指定的键复制到作业的执行上下文。您直接将键添加到作业执行上下文中,这是一个坏主意。

简而言之,方法3最接近正确,但您应该将要共享的属性添加到步骤的ExecutionContext,然后配置ExecutionContextPromotionListener,以将适当的键升级到作业的ExecutionContext

代码将更新如下:

chunkContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .put("filePath", filePath);
 类似资料:
  • 我试图将一个参数从命令行传递到java类。我关注了这篇文章:http://gradle.1045684.n5.nabble.com/gradle-application-plugin-question-td5539555.html,但是代码对我来说不起作用(也许它不适合JavaExec?)。以下是我所尝试的: 上面硬编码的args值的输出是: 任何关于如何将命令行参数传递给gradle的指针都很感

  • 问题内容: 我希望能够在运行selenium时将参数传递给google chrome。我怎样才能做到这一点?运行selenium时,我使用Java命令Java -jar selenium.jar 如何将诸如no-sandbox,ignore-gpu-blacklist,enable-webgl之类的参数传递给google chrome可执行文件。谢谢 我正在用守夜人 问题答案: 您可以使用启动驱动

  • 我们正在实施Spring批量作业, 我们需要将作业参数从Client/MASTER传递给SLAVE。CLIENT/MASTER是我们的作业和分区代码所在的位置。我们使用传递JOB参数的J Unit调用JOB。 SLAVE是定义所有步骤及其实现(读取器Writer和处理器)的地方。 我们能够以独立的方式实现这一点,但不能与客户一起实现 我们正在使用Weblogic和Spring集成以及JMS来实现同

  • 问题内容: 方法context.getBean(name,user)的文档说 允许指定显式构造函数参数/工厂方法参数 但是无论我做什么(尝试一切),使用最合理的设置在初始化期间加载Bean时都会得到以下信息: 注释说可以做到的,但是如果您在该bean的xml定义中指定构造函数参数,则失败。 问题答案: 在javadoc中说: args-在使用静态工厂方法的显式参数创建原型时使用的参数。 因此,be

  • 假设我在服务中有一个可用的字符串test=“value”,并希望将其发送到我正在运行的mainactivity。如何安全地发送数据?我在寻找一个最简单的例子,其他应用程序看不到数据。当我看意图时,它会说: 广播是任何应用程序都可以接收的消息。系统为系统事件提供各种广播,例如当系统启动或设备开始充电时。您可以将广播发送到 但是,我只想将其发送到我的应用程序,因为它包含私人数据。在我的服务中,我尝试了

  • 我有一个java和groovy类,由gradle Task运行。我已经设法使它工作,但我不喜欢在命令行中传递参数的方式。下面是我当前通过命令行执行的方法: my build。gradle代码采用这些参数如下所示: 我想知道有没有一种方法可以更好地传递参数,比如: 以及如何在我的java类中使用它们。