当前位置: 首页 > 知识库问答 >
问题:

使用DeployerPartitionHandler在spring cloud任务中运行的spring批处理的launchWorker中获取空指针异常

谭翔
2023-03-14

我正在使用DeployerPartitionHandler(本地变体)对Spring批处理作业进行分区。当我运行作业时,在worker的启动步骤中得到一个空指针异常,如下所示

at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313)
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302)
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy248.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:207)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:181)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:168)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:163)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:781)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:765)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:319)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204).

当作业开始时,我会看到下面的错误消息

调试[O.S.C.task.repository.support.SimpleTaskrepository]创建:TaskExecution{executionid=0,parentexecutionid=null,exitcode=null,taskname='mytask',starttime=tue nov24 17:14:48 IST 2020,endtime=null,exitmessage='null“,exteralexecutionid='null”,errormessage='null“,arguments=[--spring.profiles.active=local,

DEBUG[O.S.C.T.Batch.Partition.DeployerPartitionHandler]返回3个分区[main][]

此外,我在TaskRepository中看到了另一个条目,但这一次它在大多数列DEBUG[o.s.c.task.repository.support.simpleTaskRepository]中都是空值:TaskExecution{executionid=65,parentexecutionid=null,exitcode=null,taskname='null',starttime=null,endtime=null,exitmessage='null',exitmessage='null',externalexecutionid='null',

我认为,由于spring批处理是一个单一的任务,所以我希望TaskRepository中只有一个条目,但无法理解为什么要进行第二个条目。我使用Postgres并遵循示例代码中提到的相同步骤,但无法理解https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/partitioned-batch-job问题

下面是代码,以供参考

@Bean
@Profile("!worker")
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
        TaskRepository taskRepository) throws Exception {
    

    Resource resource = this.resourceLoader.getResource("maven://XXX:YYYY:0.0.2-SNAPSHOT");

    DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);
      
      
    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("MYTASK");

    return partitionHandler;
}


@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
    
    Random random = new Random();
    return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
        .start(step1(partitionHandler))
        .build();
}

@Bean
@Profile("!worker")
public Step step1(PartitionHandler partitionHandler) throws Exception {

    return this.stepBuilderFactory.get("step1")
        .partitioner(workerStep().getName(), partitioner())         
        .step(workerStep())
        .partitionHandler(partitionHandler)
        .build();
}


@Bean
@Profile("!worker")
public Partitioner partitioner() {
    return new Partitioner() {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            System.out.println("In partitioner");
            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

            for (int i = 0; i < 3; i++) {
                ExecutionContext context1 = new ExecutionContext();
                context1.put("partitionNumber", i);

                partitions.put("partition" + i, context1);
            }
            
            return partitions;
        }
    };
}

@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    
    return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}

@Bean

public Step workerStep() {

    return this.stepBuilderFactory.get("workerStep")
        .tasklet(workerTasklet(null))
    
        .build();
}

@Bean
@StepScope
public Tasklet workerTasklet(
    final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {

    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

            return RepeatStatus.FINISHED;
        }
    };
}

共有1个答案

佴德曜
2023-03-14

因为代码似乎有点不完整,所以不能百分之百确定。

但我可以解决类似的问题后,添加

  1. @enableTask

任务2和任务3将在另一个表中有一个映射,以说明它们与任务#1相关联

 类似资料:
  • 我在运行一个简单的helloworld示例drools项目时遇到了以下错误。 代码: 口水代码:

  • 我是Selenium的新手,在运行下面的脚本时,我会遇到空指针异常,一旦站点调用loads,测试就会停止运行。我无法理解例外的原因。 代码如下: 我得到以下错误: 请帮助我,我做错了什么,提前感谢!

  • 我试图存根这个方法:QueryUtils.to顺序(排序,根,构建器)和我正在做 但它进入queryUtils方法体,它会说Sort为null,并抛出一个NPE。但是,当它是存根时,为什么需要进入方法体?我以前没有遇到过这个问题,我认为它不应该关心该方法的内部逻辑是什么。

  • 此函数转换Hbase格式的数据 这是我在第125行得到的错误:hbaseputs.saveasnewapiHadoopDataSet(job.getconfiguration)

  • 我正在学习spring batch,并试图理解在异常期间项目处理器是如何工作的。 我试图通过在我的项目处理器中为一条记录手动抛出异常来模拟异常 现在根据跳过限制,当异常被抛出时,项目处理器将重新处理块并跳过抛出错误的项目,项目写入也将所有记录插入数据库,除了一条异常记录。 这一切都很好,因为我的处理器,它只是转换为大写字母名称,它可以运行很多次,但影响很大。 其他的选择是什么?

  • 让我们考虑一个<代码>父< /代码>类,它只包含一个<代码>整数< /代码>属性。我用一个空变量创建了6个父类对象。然后我将这些对象添加到列表中。 我想通过属性的值检索相应的对象。我使用了Java8流。 但是我得到了,所以我编辑了代码: 但是如果任何对象为null,我想抛出一个异常。如果列表中没有对象为null,那么我想从列表中检索相应的对象。 如何使用Java 8 Streams使用一条语句实现

  • 我正在写一个任务,将文件从一个公共文件夹中的特定位置解压,如下所示 现在,当我执行任务时,它会给我NullPointerException,而没有其他细节。我不知道还需要什么。 这是我在stackTrace中得到的信息: 原因:java。org上的lang.NullPointerException。格雷德尔。应用程序编程接口。内部的文件IdentityFileResolver。doResolve(

  • 我正在使用CSV reader从获取数据,并在使用DataProvider的测试函数中使用该数据。