当前位置: 首页 > 知识库问答 >
问题:

SpringBoot中的@async正在创建新线程,但控制器在返回响应之前等待async调用完成

谯嘉胜
2023-03-14

上下文

@EnableAsync(proxyTargetClass = true)
@EnableScheduling
@SpringBootApplication
public class CardsApplication {

    public static void main(String[] args) {
        SpringApplication.run(CardsApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

}

控制器

@Validated
@RequiredArgsConstructor
public class CardEligibilityController {
    private final CardEligibilityService cardEligibilityService;

    @PostMapping("/check-eligibility")
    @CrossOrigin(origins = "*")
    public EligibilityResponse checkEligibility(@RequestBody @Valid Applicant applicant){
        return cardEligibilityService.eligibilityService(applicant);
    }
}

服务1

public interface CardEligibilityService {
    EligibilityResponse eligibilityService(Applicant applicant);
}

@Slf4j
@Service
@RequiredArgsConstructor
public class CardEligibilityServiceImpl implements CardEligibilityService {

    private final ThirdPartyEligibilityAdapter thirdPartyEligibilityAdapter;
    private final QueueService queueService;
    private final QueueMessageResponseService queueMessageResponseService;

    @Override
    public EligibilityResponse eligibilityService(Applicant applicant){
        EligibilityResponse eligibilityResponse = checkEligibility(applicant);
        queueService.pushMessage(queueMessageResponseService.createQueueResponse(applicant,eligibilityResponse));
        return eligibilityResponse;
    }

    private EligibilityResponse checkEligibility(Applicant applicant) {
        return thirdPartyEligibilityAdapter.getEligibility(applicant);
    }

}
public interface QueueService {
     void pushMessage(QueueMessage queueMessage);
     void retry();
}


@Service
@RequiredArgsConstructor
@Slf4j
public class QueueServiceImpl implements QueueService{

    private final List<QueueMessage> deadQueue = new LinkedList<>();


    //TODO check why async gets response stuck
    @Override
    @Async
    public void pushMessage(QueueMessage queueMessage){
        try {
            //Push message to a queue - Queue settings Rabbit/Kafka - then this could be
            //used by listeners to persist the data into DB
            log.info("message queued {} ", queueMessage);
        } catch (Exception e) {
            log.error("Error {} , queueMessage {} ", e, queueMessage);
            deadQueue.add(queueMessage);
        }
    }

   
**This method is a fault tolerance mechanism in case push to queue had any issues, The Local Method call to pushMessage isn’t the problem I also tried this by deleting retry method method**

    @Override
    @Scheduled(fixedDelay = 300000)
    public void retry() {
        log.info("Retrying Message push if there are any failure in enqueueing ");
        final List<QueueMessage> temp = new LinkedList<>(deadQueue);
        deadQueue.clear();
        Collections.reverse(temp);
        temp.forEach(this::pushMessage);
    }

}
public interface QueueMessageResponseService {
    QueueMessage createQueueResponse(Applicant applicant, EligibilityResponse eligibilityResponse);
}

@Service
public class QueueMessageResponseServiceServiceImpl implements QueueMessageResponseService {
    @Override
    public QueueMessage createQueueResponse(Applicant applicant, EligibilityResponse eligibilityResponse) {
        return new QueueMessage(applicant,eligibilityResponse);
    }
}

如果我在异步方法中添加thread.sleep(20);,这将按照预期工作,用户将得到响应,而无需等待async完成。但仍无法了解原因。

@Async
    public void pushMessage(QueueMessage queueMessage) {
        try {
            //Push message to a queue - Queue settings Rabbit/Kafka - then this could be
            //used by listeners to persist the data into DB
            Thread.sleep(20);
            log.info("message queued {} ", queueMessage);
        } catch (Exception e) {
            log.error("Error {} , queueMessage {} ", e, queueMessage);
            deadQueue.add(queueMessage);
        }
    }

共有1个答案

尉迟轶
2023-03-14

retry中的pushmessage的调用是本地调用。因此不涉及代理,方法是同步执行的。

您必须将async方法移动到它自己的类中。

 类似资料:
  • 我正在使用Jasmine 2.0和require.js。当我将异步代码放入beforeach函数中时,我无法使异步测试正常工作。在异步调用完成之前,我的it语句仍在运行。 这是我的规格: 当我在it中包含异步时,第一个规范失败,但第二个规范通过。 理想情况下,我希望异步之前的

  • 下面的函数在for循环中调用几个异步函数。它解析不同的CSV文件来构建单个JavaScript对象。我想在for循环完成后返回对象。它在执行异步任务时立即返回空对象。有道理,但是我尝试了各种Promise/异步 /await组合,希望在for循环完成后运行一些东西。我显然不明白发生了什么。对于这样的事情,有更好的模式吗?还是我想错了? 这是我用来调用函数的代码,希望用CSV数据填充我的“retCo

  • 我正在使用Spring Webflow R2DBC将一些数据插入数据库。 要求提供数据- 控制器 服务 道 主要问题是我不知道如何让它等待所有结果返回并添加到最终

  • 我有一个服务,返回一个可观察到的,做一个超文本传输协议请求到我的服务器,并获得数据。我想使用这些数据,但我总是最终得到。有什么问题? 服务: 组成部分: 我检查了如何从异步调用返回响应?发布,但找不到解决方案

  • 我是TPL的新手,我想知道:C#5.0新增的异步编程支持(通过新的异步和等待关键字)与线程的创建有什么关系? 具体来说,每次使用异步/等待是否都会创建一个新线程?如果有许多嵌套方法使用异步/等待,那么是否为这些方法中的每一个都创建了一个新线程?

  • 我是新来Java的,最近在学校学习多线程,我尝试创建一个小程序,可以将任务分割成一个小的部分,使用循环在多个线程中运行。问题是,在循环之后,我需要将研究结果求和并打印出来,在线程完成之前,循环下的打印运行。而我的同学所做的只是在打印结果之前添加睡眠,但是当线程花费太长时它就不起作用了。在运行其他代码之前,是否需要等待所有的线程在循环中首先完成?