当前位置: 首页 > 工具软件 > Spring Sync > 使用案例 >

SpringBoot @Async异步多线程

谢承
2023-12-01

一、简介

1、概念

同步: 同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。
异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。

2、异步多线程概述

在实际项目开发中很多业务场景需要使用异步去完成,比如消息通知,日志记录等常用的功能都可以通过异步去执行,提高效率。一般来说,完成异步操作一般有两种,消息队列MQ和线程池处理ThreadPoolExecutor,而在Spring4以后提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接在方法上使用注解启用@Async,即可方便的使用异步线程(这里不要忘记在任一Configuration文件加上@EnableAsync打开注解功能)

3、Spring已实现线程池

  • SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
  • SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。
  • ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
  • SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
  • ThreadPoolTaskExecutor :最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装。

4、异步方法

参考:Java8异步编程

  • 最简单的异步调用,返回值为void
  • 带参数的异步调用,异步方法可以传入参数
  • 存在返回值,常调用返回Future

二、@Async默认线程池

1、默认@Async异步调用例子

1.1 开启异步任务

@Configuration
@EnableAsync
public class SyncConfiguration {

}

1.2 在方法上标记异步调用

增加一个service类,用来做积分处理。 @Async添加在方法上,代表该方法为异步处理。

public class ScoreService {

    private static final Logger logger = LoggerFactory.getLogger(ScoreService.class);

    @Async
    public void addScore(){
        //TODO 模拟睡5秒,用于赠送积分处理
        try {
            Thread.sleep(1000*5);
            logger.info("--------------处理积分--------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2、默认线程池弊端

2.1 Executors弊端

在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。Executors各个方法的弊端:

  • newFixedThreadPoolnewSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM
  • newCachedThreadPoolnewScheduledThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM

2.2 @Async弊端

@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能

三、@Async自定义线程池

1、介绍

自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。自定义线程池有如下模式:

  • 重新实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

通过查看Spring源码关于@Async的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。**但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。**且重新实现 public Executor getAsyncExecutor()方法。

2、Spring自定义异步线程池几种方式

2.1 配置application.yml

这里我使用了配置文件注入的方式,首先配置好配置文件

# 配置核心线程数
async:
  executor:
    thread:
      core_pool_size: 5
      # 配置最大线程数
      max_pool_size: 5
      # 配置队列大小
      queue_capacity: 999
      # 配置线程最大空闲时间
      keep_alive_seconds: 60
      # 配置线程池中的线程的名称前缀
      name:
        prefix: test-async-

2.2 实现接口AsyncConfigurer

@Configuration
@EnableAsync
public class ExecutorConfig1 implements AsyncConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("开启SpringBoot的线程池!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(maxPoolSize);
        // 设置缓冲队列大小
        executor.setQueueCapacity(queueCapacity);
        // 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix(namePrefix);
        // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
        // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
        // 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 线程池初始化
        executor.initialize();

        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncServiceExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> logger.error(String.format("执行异步任务'%s'", method), ex);
    }
}

2.3 继承AsyncConfigurerSupport

@Configuration
@EnableAsync
public class ExecutorConfig2 extends AsyncConfigurerSupport {


    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("开启SpringBoot的线程池!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(maxPoolSize);
        // 设置缓冲队列大小
        executor.setQueueCapacity(queueCapacity);
        // 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix(namePrefix);
        // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
        // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
        // 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 线程池初始化
        executor.initialize();

        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncServiceExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> logger.error(String.format("执行异步任务'%s'", method), ex);
    }
}

2.4 配置自定义的TaskExecutor

由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。

因此在替换默认的线程池时,需设置默认的线程池名称为TaskExecutor,这样的模式,最终底层为TaskExecutor.class,在替换默认的线程池时,可不指定线程池名称。

@Configuration
@EnableAsync
public class ExecutorConfig3 {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
    public Executor taskExecutor() {
        logger.info("开启SpringBoot的线程池!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(maxPoolSize);
        // 设置缓冲队列大小
        executor.setQueueCapacity(queueCapacity);
        // 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix(namePrefix);
        // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
        // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
        // 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 线程池初始化
        executor.initialize();

        return executor;
    }
    
    @Bean(name = "myTask")
    public Executor taskExecutor() {
        logger.info("开启SpringBoot的线程池!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(maxPoolSize);
        // 设置缓冲队列大小
        executor.setQueueCapacity(queueCapacity);
        // 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix(namePrefix);
        // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
        // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
        // 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 线程池初始化
        executor.initialize();

        return executor;
    }

}

2.5 多线程

@Async注解使用系统默认或者自定义的线程池(代替默认线程池),也可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("mytask")


参考文章

https://mp.weixin.qq.com/s/ACJgGFofD9HAYqW4u8qJUw

https://juejin.cn/post/6970927877348917261#heading-0

https://www.cnblogs.com/kenx/p/15268311.html

 类似资料: