背景
最近资讯asyncload使用的同学越来越多,会有些一些经常性的问题,这里我做一下整理和answer,同时介绍一下asyncload的UserGuide 和一些限制等。
关于asyncload,又名异步并行加载 ,可参见我之前的文章: (业务层)异步并行加载技术分析和设计
UserGuide篇
几个基本概念:
- 线程池 (定义异步处理的线程池模型,包括线程数,队列大小等)
- 匹配信息 (定义哪些方法需要实施,包括超时时间等)
- 匹配主体 (比如常见的service,dao等,需要进行异步并行加载处理的对象)
声明式: 常规配置(半侵入)
基本步骤:
1. 配置线程池
<bean id="asyncLoadExecutor" class="com.agapple.asyncload.AsyncLoadExecutor" init-method="initital" destroy-method="destory">
<property name="poolSize" value="10" />
<property name="acceptCount" value="20" />
<property name="mode" value="CALLSRUN" />
</bean>
- 关于poolSize/acceptCount的建议参数,请参考我的另一篇文章: ThreadPoolExecutor几点使用建议
- 关于mode参数,目前支持REJECT和CALLSRUN。
REJECT:当异步提交的任务数超过了acceptCount后,直接返回Reject异常
CALLSRUN:当异步提交的任务数超过了acceptCount后,由当前提交的线程执行runnable任务。此时的线程模型就变为了poolSize+1线程数,你的提交线程也就成为了其中的一个工作线程。建议使用该参数
2. 匹配信息配置
<bean id="asyncLoadConfig" class="com.agapple.asyncload.AsyncLoadConfig">
<property name="defaultTimeout" value="3000" />
<property name="needThreadLocalSupport" value="false" />
<property name="needBarrierSupport" value="false" />
<property name="matches">
<map>
<entry key-ref="asyncLoadMethodMatch" value="2000" />
</map>
</property>
</bean>
- defaultTimeout:指异步提交任务后,等待返回结果的超时时间,可以有效保护系统的健壮性(当外部系统不可用时)。如果不想进行超时控制,可设置为0。默认值为0
- needThreadLocalSupport:指异步任务提交后,原先的正常业务处理线程A和异步任务的处理线程B,是否共享ThreadLocal变量,使用需慎用,一般不建议开启。默认值为false,不开启
- needBarrierSupport:指当原先一个业务处理线程,被拆分为N多个异步任务并行处理后,可以通过设置栅栏,在某一代码处要求所有的异步结果均返回后才进行下一步操作。默认值为false,不开启
- matches:匹配点定义,asyncload自带的匹配方式,如果使用spring拦截器处理可不配置该属性,使用spring pointcut定义匹配点。
<bean id="asyncLoadMethodMatch" class="com.agapple.asyncload.impl.AsyncLoadPerl5RegexpMethodMatcher" >
<property name="patterns">
<list>
<value>(.*)RemoteModel(.*)</value>
</list>
</property>
<property name="excludedPatterns">
<list>
<value>(.*)listRemoteModel(.*)</value>
</list>
</property>
<property name="excludeOveride" value="false" />
</bean>
- patterns:代表满足该正则的匹配
- excludedPatterns:代表需要被排除的匹配
- excludeOveride:true/false
true: 优先执行excluded排除匹配
false:优先执行满足匹配,在满足匹配通过后,再执行排除匹配
3. 匹配主体配置
<bean id="asyncLoadTestFactoryBean" class="com.agapple.asyncload.impl.spring.AsyncLoadFactoryBean">
<property name="targetClass" value="com.agapple.asyncload.domain.AsyncLoadTestService" /><!-- 指定具体的代理目标class -->
<property name="target">
<ref bean="asyncLoadTestService" />
</property>
<property name="executor" ref="asyncLoadExecutor" />
<property name="config" ref="asyncLoadConfig" />
</bean>
- 类似于spring ProxyFactoryBean配置模式,对应的target即为你需要操作的服务对象。
- config即为匹配信息定义,步骤2中的定义
- executor即为线程池定义,步骤1中的定义
public class AsyncLoadFactoryBeanTest extends BaseAsyncLoadNoRunTest {
@Resource(name = "asyncLoadTestFactoryBean")
private AsyncLoadTestService asyncLoadTestFactoryBean;
@Test
public void testFactoryBean() {
AsyncLoadTestModel model1 = asyncLoadTestFactoryBean.getRemoteModel("first", 1000);
.............
}
}
注意:此时需要引用的bean name为步骤3中定义的主体配置中的名字。
声明式: 集成spring拦截器模式 (更少侵入)
步骤1: 配置线程池
见上一章节的步骤1定义
步骤2: 匹配信息配置
和上一章节的步骤2定义,基本类似。不过可以不需配置匹配点,可省略asyncLoadMethodMatch定义
步骤3:匹配主体配置
a. 定义拦截器
<bean id="asyncLoadInterceptor" class="com.agapple.asyncload.impl.spring.AsyncLoadInterceptor" >
<property name="asyncLoadTemplate" ref="asyncLoadTemplate" />
</bean>
- 注意这里依赖了一个asyncLoadTemplate配置,后面再介绍下对应的配置。
定义一个pointcut
<bean id="asyncLoadPointcut" class="org.springframework.aop.support. Perl5RegexpMethodPointcut">
<property name="pattern">
<value>(.*)RemoteModel(.*)</value>
</property>
<property name="ExcludedPattern">
<value>(.*)listRemoteModel(.*)</value>
</property>
</bean>
<bean id="asyncloadAdvisor" class="org.springframework.aop.support.DefaultPointcutAdvisor">
<property name="advice" ref="asyncLoadInterceptor"></property>
<property name="pointcut" ref="asyncLoadPointcut"></property>
</bean>
<bean id="asyncLoadTestProxy" class="org.springframework.aop.framework.ProxyFactoryBean">
<property name="proxyTargetClass" value="true" />
<property name="target" ref="asyncLoadTestService" />
<property name="interceptorNames">
<list>
<value>asyncLoadInterceptor</value>
</list>
</property>
</bean>
这样就完成了配置,是不是觉得比较easy.
步骤4:使用 (AsyncLoadSpringInteceptorTest)
public class AsyncLoadSpringInteceptorTest extends BaseAsyncLoadNoRunTest {
@Resource(name = "asyncLoadTestServiceForInteceptor")
private AsyncLoadTestService asyncLoadTestServiceForInteceptor;
@Test
public void testSpringInteceptor() {
AsyncLoadTestModel model1 = asyncLoadTestServiceForInteceptor.getRemoteModel("first", 1000);
AsyncLoadTestModel model2 = asyncLoadTestServiceForInteceptor.getRemoteModel("two", 1000);
long start = 0, end = 0;
start = System.currentTimeMillis();
System.out.println(model1.getDetail());
end = System.currentTimeMillis();
Assert.assertTrue((end - start) > 500l); // 第一次会阻塞, 响应时间会在1000ms左右
start = System.currentTimeMillis();
System.out.println(model2.getDetail());
end = System.currentTimeMillis();
Assert.assertTrue((end - start) < 500l); // 第二次不会阻塞,第一个已经阻塞了1000ms
}
}
可以直接操作原先的主体bean
--------------------------------------------------------------------------------分割线--------------------------------------------------------------------------------------------------------------
如果是兼容老系统,减少配置变更,可以考虑使用spring auto-proxy机制,对原先的配置侵入几乎为0
spring: BeanNameAutoProxyCreator auto-proxy,直接针对现有的bean name实施拦截器切入
<bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
<property name="optimize" value="false"/>
<property name="proxyTargetClass" value="false" />
<property name="beanNames">
<list>
<value>asyncLoadTestServiceForInteceptor</value>
</list>
</property>
<property name="interceptorNames">
<list>
<value>asyncLoadInterceptor</value>
</list>
</property>
</bean>
asyncload : CompositeAutoProxyCreator ,使用: AsyncLoadSpringCompsiteTest
<bean class="com.agapple.asyncload.impl.spring.CompositeAutoProxyCreator">
<property name="optimize" value="false"/>
<property name="proxyTargetClass" value="false" />
<property name="beanNames">
<list>
<value>asyncLoadTestServiceForInteceptor</value>
</list>
</property>
<property name="interceptorNames">
<list>
<value>asyncLoadInterceptor</value>
</list>
</property>
</bean>
相比于BeanNameAutoProxyCreator,CompositeAutoProxyCreator会有一种融合机制,假如发现需要操作的bean已经进行了spring aop代理配置后,直接将当前的interceptor加入到原先aop配置定义中,而不会是两次代理封装)
两次代理封装问题:
- 第一次封装为cglib代理后,生成的对象为final类,无法再次生成cglib代理类。如果无接口,无法再次生成jdk代理
(编程式)模板模式
模板模式: AsyncLoadTemplate
配置:
<bean id="asyncLoadTemplate" class="com.agapple.asyncload.impl.template.AsyncLoadTemplate" >
<property name="executor" ref="asyncLoadExecutor" />
<property name="config" ref="asyncLoadConfig" />
</bean>
- 需要依赖线程池定义 和 匹配信息定义
public class AsyncLoadTemplateTest extends BaseAsyncLoadNoRunTest {
@Resource(name = "asyncLoadTemplate")
private AsyncLoadTemplate asyncLoadTemplate;
@Resource(name = "asyncLoadTestService")
private AsyncLoadTestService asyncLoadTestService;
@Test
public void testTemplate() {
AsyncLoadTestModel model2 = asyncLoadTemplate.execute(new AsyncLoadCallback<AsyncLoadTestModel>() {
public AsyncLoadTestModel doAsyncLoad() {
// 总共sleep 2000ms
return asyncLoadTestService.getRemoteModel("ljhtest", 1000);
}
});
}
}
- 接受AsyncLoadCallback进行异步并行业务处理单元封装
使用模板模式的好处: (自由定义异步并行处理单元)
- 比如针对服务B依赖服务A,两者依赖的间隔时间很多,当将A和B的调用各自做异步并行加载,会发现A的调用几乎都是阻塞式。此时可以选择将A和B的一个完整处理,做为一个异步并行处理的单元。
- 比如一个方法调用中,可以考虑将部分代码进行异步调用,而不是以方法为一个单元。
(编程式)非spring容器
public class AsyncLoadProxyTest extends BaseAsyncLoadNoRunTest {
@Test
public void testProxy() {
AsyncLoadTestService asyncLoadTestService = xxxxx; //你原先的业务处理主体
// 初始化config
AsyncLoadConfig config = new AsyncLoadConfig(3 * 1000l);
// 初始化executor
AsyncLoadExecutor executor = new AsyncLoadExecutor(10, 100);
executor.initital();
// 初始化proxy
AsyncLoadEnhanceProxy<AsyncLoadTestService> proxy = new AsyncLoadEnhanceProxy<AsyncLoadTestService>();
proxy.setService(asyncLoadTestService); //传递你原先的业务对象
proxy.setConfig(config);
proxy.setExecutor(executor);
AsyncLoadTestService service = proxy.getProxy(); //获取到异步并行处理包装过的服务对象
// 执行测试
AsyncLoadTestModel model1 = service.getRemoteModel("first", 1000); // 进行业务请求
}
}
DevGuide 篇
1. asyncload是否存在一些使用限制?
ans : 存在一定的使用限制和建议
使用限制:
- 不支持 == null的判断 (原因: asyncload因为需要做异步处理,所以在执行方法调用时,比如xxxService.getRemote()。会预先生成一个假的返回对象,永远不会为null, 所以==null一定返回为false)
规避: 可以使用AsyncLoadUtils.isNull(xxxModel)进行判断,注意调用此方法,会阻塞直到原先的调用结果返回,然后再依据返回结果进行==null判断 - 不支持以下几种的方法调用,主要是针对返回结果类型:
a. void, 没有返回对象。
b. final类,比如java.lang.String
c. java.lang.Object,因为asyncload分析是基于当前的class,不能基于运行时对象进行处理。所以针对方法定义中返回为java.lang.Object的不支持
d. 原生类型, 比如int,long等
e. array类型,比如int[],long[]等
f. 非public的类型,比如在一些类的返回结果,返回了一个内部protected类 - threadlocal使用限制 (原先:引入asyncload后,原先在一个线程中处理的业务会分散到多个线程中,所以ThreadLocal默认无法进行共享处理,虽然asyncload可以有技术做到共享ThreadLocal,但这样会打破原先ThreadLocal的语义,导致出现线程安全问题。严重慎用)
规避:使用模板模式控制拦截器粒度,尽量将ThreadLocal的操作放在一个异步并行处理单元,或者不进行异步处理。
使用建议:
- 应用在I/O只读查询操作上,比如查询数据库,调用远程服务接口,调用cache等。
- 将请求发起放在前面调用,数据结果处理统一在最后处理。尽量让请求可以走到并行处理
- 使用spring无嵌入配置(composite配置)
2. asyncload异步并行处理后,如何确保返回结果?提交给ThreadPoolExectuor后的future是如何存储的?
比如有个ProductService中,有个方法ProductModel product.getProductById(Long productId)
ans : asyncload会在两个层面进行扩展处理(字节码或者拦截器)
- 服务主体层(ProductService),asyncload会通过常规配置(字节码处理)或者拦截器配置,会改变原先对于getProductById()的方法调用逻辑,这里就会提交到一个线程池中进行处理。
return asyncLoadTemplate.execute(new AsyncLoadCallback() { public Object doAsyncLoad() { try { return temp.proceed(); } catch (Throwable e) { throw new AsyncLoadException("AsyncLoadInterceptor invoke error!", e); } } }, invocation.getMethod().getReturnType()); // 这里指定了返回目标class
- 服务返回对象(ProductModel):asyncload提交任务到线程池之后,会根据原先method.getReturnType()获取到返回结果的类型定义,通过字节码处理技术生成了一个原先retrunType类型的子类,同时覆盖了原先ProductModel中的所有方法,比如getId()方法就会变为:
调用productModel.getId()方法就会先调用loadFuture(),阻塞等待future的返回,然后再委托给future的返回对象调用getId方法进行返回public ProductModelSub extends ProductModel { public Future future; //持有线程池返回对象 public Object loadObject() throws Exception { return loadFuture(); } private Object loadFuture() throws AsyncLoadException { try { // 使用cglib lazyLoader,避免每次调用future if (timeout <= 0) {// <=0处理,不进行超时控制 return future.get(); } else { return future.get(timeout, TimeUnit.MILLISECONDS); } } catch (TimeoutException e) { future.cancel(true); throw new AsyncLoadException(e); } catch (InterruptedException e) { throw new AsyncLoadException(e); } catch (Exception e) { throw new AsyncLoadException(e); } } public Long getId(){ ProductModel model = loadObject(); //先阻塞等待future返回 return model.getId(); } }
3. asyncload的服务依赖关系链的处理?
ans :
首先依赖关系的定义:如果服务B依赖了服务A的返回结果。(比如这里是ProductModel.getId()的返回结果,将做为服务B ProductDetailService.getProudctDetailByProductId(Long productId)),进行服务B的返回调用参数)
出现依赖关系后的处理:其实很简单,当B需要ProductModel.getId()的结果,进行构造自己的参数时,此时服务A的调用就会。也就是转变为了A,B是一个串行调用。
4. asyncload的线程池配置是否有讲究 ?
ans: poolSize不宜开的过大,一般建议为20~30,acceptCount建议为poolSize的两倍,model建议为CALLSRUN。
关于poolSize/acceptCount的建议参数,请参考我的另一篇文章: ThreadPoolExecutor几点使用建议
4. asyncload是否可以提升性能? 比如tps ,响应时间?
ans :
- 针对响应时间,为你最长依赖关系链的时间之后。所以只要你配置后的依赖关系链有一处做了并行,就可以得到提升。这里需要注意线程池的设置,避免出现大量的异步任务进行等待,导致单个任务的处理时间过长。
- 针对tps,计算公式 tps = 1000 / (每个request的响应时间),只要响应时间减少了,可支持的tps就会上升。注意如果你当前统计的访问tps只有100个,没有出现竞争资源瓶颈,使用asyncload后,当前的tps是不会增加,说白了你每秒就100个request。 (可支持tps会增加,如果当前tps已经存在竞争瓶颈,就会有所增加)
可以分享一下我当时一个实施场景的数据:(2o并发的持续高压)
对比项 | 主干代码 | 并行加载实施代码 | 提升幅度 | 提升百分比 |
响应时间 | 347ms | 281ms | 66ms | 19% |
tps | 60.7 | 70.9 | 10.2 | 16.8% |
cpu使用率 | user:44.97% sys:4.67% | user:53.12% sys:5.87% | user:8.15 sys:1.2 | user:18.1% sys:25.7% |
load | 9.39 | 6.21 | 3.18 | 51.2% |
最后
目前asyncload的实施场景已经有好多个,包括阿里巴巴,淘宝,良无限等。至于提升多少性能要结合具体的业务,也就是说你的可提升空间有多少。当然异步并行后,会适当的增加系统资源的消耗。