同步: 同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。
异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。
在实际项目开发中很多业务场景需要使用异步去完成,比如消息通知,日志记录等常用的功能都可以通过异步去执行,提高效率。一般来说,完成异步操作一般有两种,消息队列MQ
和线程池处理ThreadPoolExecutor
,而在Spring4
以后提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor
,直接在方法上使用注解启用@Async
,即可方便的使用异步线程(这里不要忘记在任一Configuration文件加上@EnableAsync
打开注解功能)
SimpleAsyncTaskExecutor
:不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。SyncTaskExecutor
:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。ConcurrentTaskExecutor
:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。SimpleThreadPoolTaskExecutor
:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。ThreadPoolTaskExecutor
:最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor
的包装。参考:Java8异步编程
@Configuration
@EnableAsync
public class SyncConfiguration {
}
增加一个service类,用来做积分处理。 @Async添加在方法上,代表该方法为异步处理。
public class ScoreService {
private static final Logger logger = LoggerFactory.getLogger(ScoreService.class);
@Async
public void addScore(){
//TODO 模拟睡5秒,用于赠送积分处理
try {
Thread.sleep(1000*5);
logger.info("--------------处理积分--------------------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。Executors各个方法的弊端:
newFixedThreadPool
和newSingleThreadExecutor
:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOMnewCachedThreadPool
和newScheduledThreadPool
:主要问题是线程数最大数是Integer.MAX_VALUE
,可能会创建数量非常多的线程,甚至OOM@Async
默认异步配置使用的是SimpleAsyncTaskExecutor
,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。
针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0
时开启限流机制,默认关闭限流机制即concurrencyLimit=-1
,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能
自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。自定义线程池有如下模式:
通过查看Spring源码关于@Async
的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。**但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。**且重新实现 public Executor getAsyncExecutor()
方法。
这里我使用了配置文件注入的方式,首先配置好配置文件
# 配置核心线程数
async:
executor:
thread:
core_pool_size: 5
# 配置最大线程数
max_pool_size: 5
# 配置队列大小
queue_capacity: 999
# 配置线程最大空闲时间
keep_alive_seconds: 60
# 配置线程池中的线程的名称前缀
name:
prefix: test-async-
@Configuration
@EnableAsync
public class ExecutorConfig1 implements AsyncConfigurer {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("开启SpringBoot的线程池!");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(corePoolSize);
// 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(maxPoolSize);
// 设置缓冲队列大小
executor.setQueueCapacity(queueCapacity);
// 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(keepAliveSeconds);
// 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix(namePrefix);
// 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
// CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
// 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程池初始化
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return asyncServiceExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> logger.error(String.format("执行异步任务'%s'", method), ex);
}
}
@Configuration
@EnableAsync
public class ExecutorConfig2 extends AsyncConfigurerSupport {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("开启SpringBoot的线程池!");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(corePoolSize);
// 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(maxPoolSize);
// 设置缓冲队列大小
executor.setQueueCapacity(queueCapacity);
// 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(keepAliveSeconds);
// 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix(namePrefix);
// 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
// CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
// 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程池初始化
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return asyncServiceExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> logger.error(String.format("执行异步任务'%s'", method), ex);
}
}
由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)
先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class)
,又查询是否存在默认名称为TaskExecutor的线程池。
因此在替换默认的线程池时,需设置默认的线程池名称为TaskExecutor,这样的模式,最终底层为TaskExecutor.class
,在替换默认的线程池时,可不指定线程池名称。
@Configuration
@EnableAsync
public class ExecutorConfig3 {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
public Executor taskExecutor() {
logger.info("开启SpringBoot的线程池!");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(corePoolSize);
// 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(maxPoolSize);
// 设置缓冲队列大小
executor.setQueueCapacity(queueCapacity);
// 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(keepAliveSeconds);
// 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix(namePrefix);
// 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
// CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
// 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程池初始化
executor.initialize();
return executor;
}
@Bean(name = "myTask")
public Executor taskExecutor() {
logger.info("开启SpringBoot的线程池!");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(corePoolSize);
// 设置最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(maxPoolSize);
// 设置缓冲队列大小
executor.setQueueCapacity(queueCapacity);
// 设置线程的最大空闲时间,超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(keepAliveSeconds);
// 设置线程名字的前缀,设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix(namePrefix);
// 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
// CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务,
// 当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程池初始化
executor.initialize();
return executor;
}
}
@Async
注解使用系统默认或者自定义的线程池(代替默认线程池),也可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("mytask")
。
参考文章
https://mp.weixin.qq.com/s/ACJgGFofD9HAYqW4u8qJUw