当前位置: 首页 > 知识库问答 >
问题:

从Spring批处理器调用异步REST api

钱选
2023-03-14

我写了一个处理列表列表的Spring批处理作业。

Reader返回List of List。处理器处理每个ListItem并返回已处理的List。Writer将内容写入DB并从List of List中sftp。

我有一个从spring批处理程序调用异步REST api的用例。在ListenableFuture响应上,我实现了LitenableFutureCallback来处理成功和失败,这正如预期的那样,但在异步调用返回某些内容之前,ItemProcessor不会等待来自异步api的回调,而是将对象(列表)返回给编写器。

我不知道如何实现和处理异步调用从项目处理器。

我确实读过关于AsyncItem处理机和AsyncItemWriter,但我不确定这是否是我应该在这个场景中使用的东西。

我还想过从AsyncRestTemplate对ListenableFuture响应调用get(),但根据文档,它将阻止当前线程,直到收到响应为止。

我正在寻求一些关于如何实现这一点的帮助。下面是代码片段:

处理器:

public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {

... Initialization code

@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
    logger.info("Entering MailingDocsEntity processor");


    List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);


    for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
        System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());

       ..code to get the file

         //If the file is not a pdf convert it
         String fileExtension = readFromSpResponse.getFileExtension();
         String fileName = readFromSpResponse.getFileName();
         byte[] fileBytes = readFromSpResponse.getByteArray();

         try {

             //Do checks to make sure PDF file is being sent
             if (!"pdf".equalsIgnoreCase(fileExtension)) {
                 //Only doc, docx and xlsx conversions are supported

                     ...Building REquest object
                     //make async call to pdf conversion service
            pdfService.convertDocxToPdf(request, mailingDocsEntity);

                 } else {
                     logger.error("The file cannot be converted to a pdf.\n"
                        );

                 }
             }


         } catch (Exception ex){
             logger.error("There has been an exception while processing data", ex);

         }

    }
    return synchronizedList;
}

}

异步PdfConversion服务类:

@Service
public class PdfService{


   @Autowired
   @Qualifier("MicroServiceAsyncRestTemplate")
   AsyncRestTemplate microServiceAsyncRestTemplate;

   public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){

        ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();


            try {

                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);

                HttpEntity<?> entity = new HttpEntity<>(request, headers);



                ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);

                ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
                microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>()  {

                    @Override
                    public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
                        ...code to do stuff on success


                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        pdfResponse.setMessage("Exception while retrieving response");

                    }
                });

            } catch (Exception e) {
                String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
                pdfResponse.setMessage(message);
                logger.error(message, e);
            }


        return pdfResponse;
    }

}

我最初的批处理作业是同步的,我正在转换为异步的,以加快处理速度。我确实试图寻找类似的问题,但找不到足够的信息。非常感谢您的指点和帮助。

非常感谢。

共有1个答案

查修谨
2023-03-14

我确实读过关于AsyncItem处理机和AsyncItemWriter,但我不确定这是否是我应该在这个场景中使用的东西。

是的,AsyncItemProccherAsyncItemWriter适合您的用例。AsyncItemProccher将在新线程上为一个项目执行委托ItemProcencer逻辑(您的Rest调用)。一旦项目完成,结果的Future将传递给要写入的AsynchItemWriter。然后AsynchItemWriter将打开Future并写入该项目。这些组件的优点是您不必自己处理Future的包装,展开等。

您可以找到:

  • 更多详情请点击此处:https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#asynchronous-处理器
  • 举个例子:https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java

希望这有帮助。

 类似资料:
  • 我正在用异步JobLauncher在Spring Batch中配置一个(长时间运行的)作业,我有两个RESTendpoint: null 谢谢朱利奥

  • 我的FlatFileItemWriter回调有一个奇怪的问题。我有一个自定义ItemWriter实现FlatFileFolterCallback和FlatFileHeaderCallback。因此,我在我的FlatFileItemWriter中设置页眉和页脚回调如下: ItemWriter Bean FlatFileItemWriter Bean 步进豆 我的writeFooter、writeHe

  • 我对Spring批处理跳过逻辑有一些问题。我已经配置了一个作业的步骤来跳过两个异常(SQLIntegrityConstraintViolation异常和乐观锁定失败异常): 但当作业运行时,由于我将其配置为跳过的异常,作业以未知状态完成: 我做错什么了吗?我希望这一步跳过负责抛出其中一个异常的项,并继续处理,以便以完成状态结束。

  • 我需要多次调用webservice并传递使用来自多个表的数据创建的数据 UI->控制器->服务->(获取数据(表1,表2)并运行一些验证)1。如果验证失败-返回错误消息并停止。2.如果验证通过-调用JobLauncher并返回“任务启动”消息。 在异步作业中,我想到遵循2个步骤。 > bulkinsertstep > 我需要调用DB查询2个更多的表(Table3、table4)并创建一个大的数据集

  • 我定义了一个块,提交间隔为10,跳过限制为10。处理器类通过应用一些算术运算来操作字段。其中一条记录(比如第6条记录)在处理器类中发生异常。在此之后,再次处理1到5条记录,跳过第6条记录,处理7到10条记录,并将其写入XML(自定义XML编写器类)。由于处理器处理1-5条记录两次,因此预期字段值计算两次是错误的。您能否建议一种解决方案,让处理器只处理一次记录,只跳过失败的记录,并将处理后的记录写入

  • 关于skip,我有一个非常基本的问题。我正在使用spring示例提供的spring batch simple cli项目,并试图理解跳过行为。它有一个非常基本的示例读取器,可以读取字符串数组(我将其修改为从Hellowworld 1到Hellowworld 10的10个字符串列表中读取),还有一个基本的编写器,可以登录到控制台。writer抛出java。每写一次都有例外。我在作业配置中增加了4个跳