本文着重从源码的角度探究Motan客户端的如下几个方面:
之前了解了一些Motan的服务启动的底层代码,了解到服务端其实是在Netty服务的基础上绑定处理Handler并且进行包装,以motan协议为通讯方式,并最终监听在发布端口上。
本篇将涉及到以上几个关键点,分析(走马观花),如有纰漏,望乞斧正:
Motan客户端的结合Spring的XML配置方式:
<motan:registry regProtocol="zookeeper" name="registry" address="127.0.0.1:2181" connectTimeout="2000"/>
<!-- motan协议配置 -->
<motan:protocol default="true" name="motan" haStrategy="failover"
loadbalance="roundrobin" maxClientConnection="10" minClientConnection="2"/>
<!-- 通用referer基础配置 -->
<motan:basicReferer requestTimeout="200" accessLog="false"
retries="2" group="motan-demo-rpc" module="motan-demo-rpc"
application="myMotanDemo" protocol="motan" registry="registry"
id="motantestClientBasicConfig" throwException="true" check="true"/>
<!-- 具体referer配置。使用方通过beanid使用服务接口类 -->
<motan:referer id="motanDemoReferer" retries="3"
interface="com.weibo.motan.demo.service.MotanDemoService"
connectTimeout="1000" requestTimeout="1000" basicReferer="motantestClientBasicConfig"/>
和服务器端的配置很相似,协议、registry注册中心、referer引用。配置属性,在这里先不深究。
该配置告诉Motan:该客户端需要调用com.weibo.motan.demo.service.MotanDemoService
这个接口声明的服务,使用zookeeper
注册中心,以及motan协议
。
public static void main(String[] args) throws InterruptedException {
ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"classpath:motan_demo_client.xml"});
MotanDemoService service = (MotanDemoService) ctx.getBean("motanDemoReferer");
for (int i = 0; i < Integer.MAX_VALUE; i++) {
System.out.println(service.hello("motan" + i));
Thread.sleep(1000);
}
System.out.println("motan demo is finish.");
System.exit(0);
}
从一开始就注定不一般
客户端的任务是调用远程接口定义的服务,在不同的JVM中调用。
首先要确认的一点是,客户端持有的只是一个服务的接口定义,并不能直接实例化。那么
MotanDemoService service = (MotanDemoService) ctx.getBean("motanDemoReferer");
这句就注定是不一般的操作。
从XML到Java对象
再谈到MotanNamespaceHandler
其实也并不陌生了,之前已经对它做过介绍了。
registerBeanDefinitionParser("referer", new MotanBeanDefinitionParser(RefererConfigBean.class, false));
上面配置的motan:referer
将会被解析成RefererConfigBean
对象,其类声明如下:
public class RefererConfigBean<T> extends RefererConfig<T> implements FactoryBean<T>, BeanFactoryAware, InitializingBean, DisposableBean
它实现了FactoryBean
接口,这个接口可以在Spring加载类的时候,可以允许自定义。
Spring有两种类型的Bean,一种是普通Bean,另外一种是
FactoryBean
,当类的构造方式比较复杂,
或者类需要在外部控制其构造方式和是否单例,也有可能是自定义类的实现的时候,可以实现该接口,
常见于各类框架,像Mybatis中的SqlSessionFactoryBean
类,Motan中的客户端对象RefererConfigBean
也是这样。
它允许通过
T getObject() throws Exception;返回Spring加载生成的对象;
Class getObjectType();返回对象类型;
boolean isSingleton(); 返回是否为单例;
查看Motan的源码,发现如下的实现(片段):
public class RefererConfigBean<T> extends RefererConfig<T> implements FactoryBean<T>, BeanFactoryAware, InitializingBean, DisposableBean {
...
@Override
public T getObject() throws Exception {
return getRef();
}
@Override
public Class<?> getObjectType() {
return getInterface();
}
@Override
public boolean isSingleton() {
return true;
}
...
}
是的,我们定义的motan:referer
将返回interface声明的类型,并且由getRef
方法负责创建具体的对象,这样就可以任框架自由发挥实现了。
所以客户端初始化的代码是由下面的这个方法引出的:
public synchronized void initRef() {
if (initialized.get()) {
return;
}
try {
//1、加载ref接口类型
interfaceClass = (Class) Class.forName(interfaceClass.getName(), true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new MotanFrameworkException("ReferereConfig initRef Error: Class not found " + interfaceClass.getName(), e,
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
if (CollectionUtil.isEmpty(protocols)) {
throw new MotanFrameworkException(String.format("%s RefererConfig is malformed, for protocol not set correctly!",
interfaceClass.getName()));
}
checkInterfaceAndMethods(interfaceClass, methods);
clusterSupports = new ArrayList<ClusterSupport<T>>(protocols.size());
List<Cluster<T>> clusters = new ArrayList<Cluster<T>>(protocols.size());
String proxy = null;
//2、加载ConfigHandler
ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
List<URL> registryUrls = loadRegistryUrls();
String localIp = getLocalHostAddress(registryUrls);
//为了便于管理,client只能使用一种协议
for (ProtocolConfig protocol : protocols) {
Map<String, String> params = new HashMap<String, String>();
params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
params.put(URLParamType.version.getName(), URLParamType.version.getValue());
params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
collectConfigParams(params, protocol, basicReferer, extConfig, this);
collectMethodConfigParams(params, this.getMethods());
//构建客户端的URL
URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, interfaceClass.getName(), params);
//3、根据客户端URL和注册中心URL创建集群对象
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);
clusterSupports.add(clusterSupport);
clusters.add(clusterSupport.getCluster());
proxy = (proxy == null) ? refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue()) : proxy;
}
//4、创建代理对象
ref = configHandler.refer(interfaceClass, clusters, proxy);
initialized.set(true);
}
为了便于理解,抽出4个重要的部分:
* 加载接口类型
* 创建ConfigHandler
* 创建集群对象
* 生成代理对象
ConfigHandler
默认使用的是SimpleConfigHandler
,使用SPI机制扩展加载。
之前提到过ConfigHandler,对于客户端来说,它也是从配置到协议的转化,它为客户端提供构建集群的支持,并且提供了创建代理的功能。
@Override
public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);
clusterSupport.init();
return clusterSupport;
}
@Override
public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
}
其中关于集群,它的类声明和类变量:
public class ClusterSupport<T> implements NotifyListener {
private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
private Cluster<T> cluster;
private List<URL> registryUrls;
private URL url;
private Class<T> interfaceClass;
private Protocol protocol;
private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();
...
它有一个协议列表,集群对象,注册中心地址,客户端的URL,类型对象。
其中集群对象是用来处理集群的LB和HA的,注册中心地址是用来发现服务的,客户端URL可以用来做服务治理和统计
它实现了一个NotifyListener接口,这个接口是用来让这个集群对象处理注册中心数据变化同步服务的。
有了集群处理的供,接下来需要把对方法的调用也进行包装,使得客户端调用方法,像是在服务器本地调用方法一样。
SimpleConfigHandler的方法:
@Override
public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
}
使用Spi机制加载出代理工厂,Motan默认使用JdkProxyFactory
代理工厂,使用原生的代理方式。
传入的代理对象为RefererInvocationHandler
对象。
public class RefererInvocationHandler<T> implements InvocationHandler {
private List<Cluster<T>> clusters;
private Class<T> clz;
private SwitcherService switcherService = null;
private String interfaceName;
它持有集群列表,和用来选择服务的SwitcherService
,根据SPI方式加载。
重点来了,InvokerHandler的invoke方法,每次调用我们声明的接口对象,按本文的例子,就是这句:
service.hello("motan" + i)
的时候,将触发的代理方法:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (isLocalMethod(method)) {
if ("toString".equals(method.getName())) {
return clustersToString();
}
if ("equals".equals(method.getName())) {
return proxyEquals(args[0]);
}
throw new MotanServiceException("can not invoke local method:" + method.getName());
}
DefaultRequest request = new DefaultRequest();
request.setRequestId(RequestIdGenerator.getRequestId());
request.setArguments(args);
String methodName = method.getName();
boolean async = false;
if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
async = true;
}
RpcContext.getContext().putAttribute(MotanConstants.ASYNC_SUFFIX, async);
request.setMethodName(methodName);
request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
request.setInterfaceName(interfaceName);
RpcContext curContext = RpcContext.getContext();
Map<String, String> attachments = curContext.getRpcAttachments();
if (!attachments.isEmpty()) { // set rpccontext attachments to request
for (Map.Entry<String, String> entry : attachments.entrySet()) {
request.setAttachment(entry.getKey(), entry.getValue());
}
}
if (StringUtils.isNotBlank(curContext.getClientRequestId())) {// add to attachment if client request id is set
request.setAttachment(URLParamType.requestIdFromClient.getName(), curContext.getClientRequestId());
}
// 当 referer配置多个protocol的时候,比如A,B,C,
// 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
for (Cluster<T> cluster : clusters) {
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
if (switcher != null && !switcher.isOn()) {
continue;
}
request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
// 带上client的application和module
request.setAttachment(URLParamType.application.getName(), cluster.getUrl().getApplication());
request.setAttachment(URLParamType.module.getName(), cluster.getUrl().getModule());
Response response = null;
boolean throwException =
Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(),
URLParamType.throwException.getValue()));
Class returnType = getRealReturnType(async, this.clz, method, methodName);
try {
response = cluster.call(request);
if (async) {
if (response instanceof ResponseFuture) {
((ResponseFuture) response).setReturnType(returnType);
return response;
} else {
ResponseFuture responseFuture = new DefaultResponseFuture(request, 0, cluster.getUrl());
if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}
responseFuture.setReturnType(returnType);
return responseFuture;
}
} else {
Object value = response.getValue();
if (value != null && value instanceof DeserializableObject) {
try {
value = ((DeserializableObject) value).deserialize(returnType);
} catch (IOException e) {
LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType, e);
throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
}
}
return value;
}
} catch (RuntimeException e) {
if (ExceptionUtil.isBizException(e)) {
Throwable t = e.getCause();
// 只抛出Exception,防止抛出远程的Error
if (t != null && t instanceof Exception) {
throw t;
} else {
String msg =
t == null ? "biz exception cause is null. origin error msg : " + e.getMessage() : ("biz exception cause is throwable error:" + t.getClass()
+ ", errmsg:" + t.getMessage());
throw new MotanServiceException(msg, MotanErrorMsgConstant.SERVICE_DEFAULT_ERROR);
}
} else if (!throwException) {
LoggerUtil.warn("RefererInvocationHandler invoke false, so return default value: uri=" + cluster.getUrl().getUri()
+ " " + MotanFrameworkUtil.toString(request), e);
return getDefaultReturnValue(returnType);
} else {
LoggerUtil.error(
"RefererInvocationHandler invoke Error: uri=" + cluster.getUrl().getUri() + " "
+ MotanFrameworkUtil.toString(request), e);
throw e;
}
}
}
throw new MotanServiceException("Referer call Error: cluster not exist, interface=" + clz.getName() + " "
+ MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);
}
方法有点长,它做了如下三件事:
* 判定方法是不是toString
方法和equals
方法,不允许调用这两个方法
* 创建DefaultRequest
对象,这个对象用来远程调用
* 通过集群对象选出一个可以调用的远程服务,把请求发送过去,并且接收到结果或者异步结果
细节方面,如何使用HA和BL配置,以后再分析。
所以Motan的客户端,实际上是用到了JDK原生代理,附加上集群和注册中心的功能,调用方法的时候触发集群HA和BL模块,选出一个可以使用的服务端,发送DefaultRequest对象进行调用。
下次将以一次完整的客户端调用为基础,分析Motan客户端和服务器端的调用过程。
-EOF-