当前位置: 首页 > 知识库问答 >
问题:

使用分区步骤在spring批处理中访问@JobScope bean

桂德义
2023-03-14

是否有方法访问分区步骤中定义为JobScope的bean?

我们将http客户端bean定义为JobScope,因为每个作业都是唯一的,但它是动态创建的,我们需要在从属步骤中使用它来发出post请求。当我们自动连线我们得到的一切

Error creating bean with name 'scopedTarget.captureErpStockTasklet':
Scope 'step' is not active for the current thread; consider defining a 
scoped proxy for this bean if you intend to refer to it from a singleton;
nested exception is java.lang.IllegalStateException: No context holder 
available for step scope

这里是作业配置类(我删除了所有不需要的内容,只剩下分区步骤和客户机类的配置,它们位于作业范围中,因为在每次作业运行时都需要特定于作业:

@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = {"com.example.importjob"})
public class FrozenFileImportJobConfig {
@Value("${thread.pool:10}")
private int threadPoolSize;
@Value("${partitioner.max.thread.pool:10}")
private int maxThreadPoolSize;
@Value("${grid.size:10}")
private int gridSize;
@Value("${client.url:some_url}")
private int url;
@Value("${client.id:client_id}")
private int clientId;
@Value("${client.secret:secret}")
private int clientSecret;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
 @Autowired
private ErpStockIdPartitioner erpStockIdPartitioner;
@Autowired
private CaptureErpStockTasklet captureErpStockTasklet;
@Bean(destroyMethod = "destroy")
@JobScope
public Client client() {
  return new Client(url, clientId, clientSecret);
}

public ExecutionContextPromotionListener contextPromotionListener() {
  final ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
  executionContextPromotionListener.setKeys(new String[] {"erp_stock_ids"});
  return executionContextPromotionListener;
}

public Step captureErpStockDBTaskletStep() {
  return stepBuilderFactory.get("captureErpStockDBTaskletStep").tasklet(captureErpStockTasklet).build();
}

@Bean
public ThreadPoolTaskExecutor erpStockIdTaskExecutor() {
  final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  threadPoolTaskExecutor.setCorePoolSize(threadPoolSize);
  threadPoolTaskExecutor.setMaxPoolSize(maxThreadPoolSize);
  threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
  return threadPoolTaskExecutor;
}
public Step partitioningCaptureStep() {
  return stepBuilderFactory.get("partitioningCaptureStep").partitioner(captureErpStockDBTaskletStep())
.partitioner("erpStockIdPartitioner", erpStockIdPartitioner).taskExecutor(erpStockIdTaskExecutor())
.gridSize(gridSize).allowStartIfComplete(true).build();
}
@Bean
public Job riverIslandFrozenFileImportJob() {
  return jobBuilderFactory.get("riverIslandFrozenFileImportJob").start(partitioningCaptureStep()).build();
}
}

下面是CaptureerStockDBTasklet,它作为主分区CaptureStep的分区步骤调用

@Component
@StepScope
public class CaptureErpStockTasklet implements Tasklet {
  private static final Logger LOG = LoggerFactory.getLogger(CaptureErpStockTasklet.class);
  @Value("#{jobParameters[" + FtpJobParameters.ORGANIZATION_ID + "]}")
  private Long organizationId;
  @Autowired
  private ErpStockRepository erpStockRepository;
  @Autowired
  private Client client;
  @Override
  public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
    //importing of erp stock from DB, nothing special in that code so removed
     client.captureErpStock(stock);
     LOG.info("[{}] Finished importing ERP {}", organizationId, erpStock);
     return RepeatStatus.FINISHED;
  }
}

当我将Clinet的配置更改为StepScope时,它工作得很好,但我会为每个步骤创建客户端,这不是我想要的,我希望整个作业都有一个客户端。

共有1个答案

严锐
2023-03-14

我认为,当在分区步骤中使用作业范围bean时,这是spring批处理中的一个问题。

请参阅:https://jira.spring.io/browse/BATCH-2269和多线程访问Spring Batch 3.0中的作业范围bean

 类似资料:
  • 我正在尝试为分区配置Spring批处理步骤。这里很好的示例显示了一个关于“ID范围”的分区,但我不知道如何从“数据页”范围开始。 在我的顺序步骤中,我有: null

  • 我试图配置我的第一个多线程作业。我们有大约200,000条记录的主目录,我们需要处理。我想将文件分解为10个文件并处理它们。拆分文件tasklet工作正常 主步骤在我的配置中运行,但从步骤不运行。下面是我的配置。 分割者: MultiResourceItemReader: FlatFileItemWriter: 作业配置: 从属步骤配置: 请告知我做错了什么。我没有看到处理器urlFileItem

  • 给定一个使用分区的Spring批处理作业,是否可能有多个分区步骤? 例如: 在上述示例中,是否可以将另一个添加到(最好不需要为每个分区步骤提供分区器)?如果没有,是否有其他方法来配置多个步骤,这些步骤将针对每个分区逐个执行?

  • 我有以下Spring批处理作业配置: 我用以下代码开始这项工作: 如何从作业步骤访问参数?

  • 我们在POC中使用远程分区,处理大约2000万条记录。为了处理这些记录,slave需要一些大约5000行的静态元数据。我们当前的POC使用EhCache从数据库一次将元数据加载到从机中,并将其放入缓存中,这样子用户调用就可以从缓存中获取这些数据,从而获得更好的性能。 现在,由于我们使用远程分区,我们的从机大约有20个MDP/线程,因此每个消息侦听器首先调用以从数据库获取元数据,因此基本上每个远程机

  • 使用Spring Batch 3.0.4.Release。 我将作业配置为使用分区步骤。从机步骤使用块大小1。任务执行器中有六个线程。我使用从六到数百的各种网格大小来运行这个测试。我的网格大小是从StepExecutions的数量,我希望==我的分区器创建的ExecutionContexts的数量。 下面是Java配置代码: