当前位置: 首页 > 工具软件 > MOTAN-C++ > 使用案例 >

Motan客户端源码阅读

百里锋
2023-12-01

本文着重从源码的角度探究Motan客户端的如下几个方面:

客户端的使用

客户端的实现

之前了解了一些Motan的服务启动的底层代码,了解到服务端其实是在Netty服务的基础上绑定处理Handler并且进行包装,以motan协议为通讯方式,并最终监听在发布端口上。
本篇将涉及到以上几个关键点,分析(走马观花),如有纰漏,望乞斧正:

  • 客户端的配置

Motan客户端的结合Spring的XML配置方式:

    <motan:registry regProtocol="zookeeper" name="registry" address="127.0.0.1:2181" connectTimeout="2000"/>
    <!-- motan协议配置 -->
    <motan:protocol default="true" name="motan" haStrategy="failover"
                    loadbalance="roundrobin" maxClientConnection="10" minClientConnection="2"/>
    <!-- 通用referer基础配置 -->
    <motan:basicReferer requestTimeout="200" accessLog="false"
                        retries="2" group="motan-demo-rpc" module="motan-demo-rpc"
                        application="myMotanDemo" protocol="motan" registry="registry"
                        id="motantestClientBasicConfig" throwException="true" check="true"/>
    <!-- 具体referer配置。使用方通过beanid使用服务接口类 -->
    <motan:referer id="motanDemoReferer" retries="3"
                   interface="com.weibo.motan.demo.service.MotanDemoService"
                   connectTimeout="1000" requestTimeout="1000" basicReferer="motantestClientBasicConfig"/>

和服务器端的配置很相似,协议、registry注册中心、referer引用。配置属性,在这里先不深究。
该配置告诉Motan:该客户端需要调用com.weibo.motan.demo.service.MotanDemoService这个接口声明的服务,使用zookeeper注册中心,以及motan协议

  • 使用的基本姿势:
public static void main(String[] args) throws InterruptedException {
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"classpath:motan_demo_client.xml"});
        MotanDemoService service = (MotanDemoService) ctx.getBean("motanDemoReferer");
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            System.out.println(service.hello("motan" + i));
            Thread.sleep(1000);
        }
        System.out.println("motan demo is finish.");
        System.exit(0);
    }
  • 从一开始就注定不一般
    客户端的任务是调用远程接口定义的服务,在不同的JVM中调用。
    首先要确认的一点是,客户端持有的只是一个服务的接口定义,并不能直接实例化。那么
    MotanDemoService service = (MotanDemoService) ctx.getBean("motanDemoReferer");这句就注定是不一般的操作。

  • 从XML到Java对象
    再谈到MotanNamespaceHandler其实也并不陌生了,之前已经对它做过介绍了。

registerBeanDefinitionParser("referer", new MotanBeanDefinitionParser(RefererConfigBean.class, false));

上面配置的motan:referer将会被解析成RefererConfigBean对象,其类声明如下:

public class RefererConfigBean<T> extends RefererConfig<T> implements FactoryBean<T>, BeanFactoryAware, InitializingBean, DisposableBean

它实现了FactoryBean接口,这个接口可以在Spring加载类的时候,可以允许自定义。

Spring有两种类型的Bean,一种是普通Bean,另外一种是FactoryBean,当类的构造方式比较复杂,
或者类需要在外部控制其构造方式和是否单例,也有可能是自定义类的实现的时候,可以实现该接口,
常见于各类框架,像Mybatis中的SqlSessionFactoryBean类,Motan中的客户端对象RefererConfigBean也是这样。
它允许通过
T getObject() throws Exception;返回Spring加载生成的对象;
Class getObjectType();返回对象类型;
boolean isSingleton(); 返回是否为单例;

查看Motan的源码,发现如下的实现(片段):

public class RefererConfigBean<T> extends RefererConfig<T> implements FactoryBean<T>, BeanFactoryAware, InitializingBean, DisposableBean {
...
@Override
    public T getObject() throws Exception {
        return getRef();
    }
    @Override
    public Class<?> getObjectType() {
        return getInterface();
    }
    @Override
    public boolean isSingleton() {
        return true;
    }
...
}

是的,我们定义的motan:referer将返回interface声明的类型,并且由getRef方法负责创建具体的对象,这样就可以任框架自由发挥实现了。
所以客户端初始化的代码是由下面的这个方法引出的:

public synchronized void initRef() {
        if (initialized.get()) {
            return;
        }
        try {
            //1、加载ref接口类型
            interfaceClass = (Class) Class.forName(interfaceClass.getName(), true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new MotanFrameworkException("ReferereConfig initRef Error: Class not found " + interfaceClass.getName(), e,
                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
        }
        if (CollectionUtil.isEmpty(protocols)) {
            throw new MotanFrameworkException(String.format("%s RefererConfig is malformed, for protocol not set correctly!",
                    interfaceClass.getName()));
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        clusterSupports = new ArrayList<ClusterSupport<T>>(protocols.size());
        List<Cluster<T>> clusters = new ArrayList<Cluster<T>>(protocols.size());
        String proxy = null;
        //2、加载ConfigHandler
        ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
        List<URL> registryUrls = loadRegistryUrls();
        String localIp = getLocalHostAddress(registryUrls);
        //为了便于管理,client只能使用一种协议
        for (ProtocolConfig protocol : protocols) {
            Map<String, String> params = new HashMap<String, String>();
            params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
            params.put(URLParamType.version.getName(), URLParamType.version.getValue());
            params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
            collectConfigParams(params, protocol, basicReferer, extConfig, this);
            collectMethodConfigParams(params, this.getMethods());
            //构建客户端的URL
            URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, interfaceClass.getName(), params);
            //3、根据客户端URL和注册中心URL创建集群对象
            ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);
            clusterSupports.add(clusterSupport);
            clusters.add(clusterSupport.getCluster());
            proxy = (proxy == null) ? refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue()) : proxy;
        }
        //4、创建代理对象
        ref = configHandler.refer(interfaceClass, clusters, proxy);
        initialized.set(true);
    }

为了便于理解,抽出4个重要的部分:
* 加载接口类型
* 创建ConfigHandler
* 创建集群对象
* 生成代理对象

ConfigHandler默认使用的是SimpleConfigHandler,使用SPI机制扩展加载。
之前提到过ConfigHandler,对于客户端来说,它也是从配置到协议的转化,它为客户端提供构建集群的支持,并且提供了创建代理的功能。

 @Override
    public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
        ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);
        clusterSupport.init();
        return clusterSupport;
    }
    @Override
    public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
        return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
    }

其中关于集群,它的类声明和类变量:

public class ClusterSupport<T> implements NotifyListener {
    private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
    private Cluster<T> cluster;
    private List<URL> registryUrls;
    private URL url;
    private Class<T> interfaceClass;
    private Protocol protocol;
    private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();
...

它有一个协议列表,集群对象,注册中心地址,客户端的URL,类型对象。
其中集群对象是用来处理集群的LB和HA的,注册中心地址是用来发现服务的,客户端URL可以用来做服务治理和统计
它实现了一个NotifyListener接口,这个接口是用来让这个集群对象处理注册中心数据变化同步服务的。

有了集群处理的供,接下来需要把对方法的调用也进行包装,使得客户端调用方法,像是在服务器本地调用方法一样。
SimpleConfigHandler的方法:

    @Override
    public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
        return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
    }

使用Spi机制加载出代理工厂,Motan默认使用JdkProxyFactory代理工厂,使用原生的代理方式。
传入的代理对象为RefererInvocationHandler对象。

public class RefererInvocationHandler<T> implements InvocationHandler {
    private List<Cluster<T>> clusters;
    private Class<T> clz;
    private SwitcherService switcherService = null;
    private String interfaceName;

它持有集群列表,和用来选择服务的SwitcherService,根据SPI方式加载。
重点来了,InvokerHandler的invoke方法,每次调用我们声明的接口对象,按本文的例子,就是这句:
service.hello("motan" + i)的时候,将触发的代理方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (isLocalMethod(method)) {
            if ("toString".equals(method.getName())) {
                return clustersToString();
            }
            if ("equals".equals(method.getName())) {
                return proxyEquals(args[0]);
            }
            throw new MotanServiceException("can not invoke local method:" + method.getName());
        }
        DefaultRequest request = new DefaultRequest();
        request.setRequestId(RequestIdGenerator.getRequestId());
        request.setArguments(args);
        String methodName = method.getName();
        boolean async = false;
        if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
            methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
            async = true;
        }
        RpcContext.getContext().putAttribute(MotanConstants.ASYNC_SUFFIX, async);
        request.setMethodName(methodName);
        request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
        request.setInterfaceName(interfaceName);
        RpcContext curContext = RpcContext.getContext();
        Map<String, String> attachments = curContext.getRpcAttachments();
        if (!attachments.isEmpty()) { // set rpccontext attachments to request
            for (Map.Entry<String, String> entry : attachments.entrySet()) {
                request.setAttachment(entry.getKey(), entry.getValue());
            }
        }
        if (StringUtils.isNotBlank(curContext.getClientRequestId())) {// add to attachment if client request id is set
            request.setAttachment(URLParamType.requestIdFromClient.getName(), curContext.getClientRequestId());
        }
        // 当 referer配置多个protocol的时候,比如A,B,C,
        // 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
        for (Cluster<T> cluster : clusters) {
            String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
            Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
            if (switcher != null && !switcher.isOn()) {
                continue;
            }
            request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
            request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
            // 带上client的application和module
            request.setAttachment(URLParamType.application.getName(), cluster.getUrl().getApplication());
            request.setAttachment(URLParamType.module.getName(), cluster.getUrl().getModule());
            Response response = null;
            boolean throwException =
                    Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(),
                            URLParamType.throwException.getValue()));
            Class returnType = getRealReturnType(async, this.clz, method, methodName);
            try {
                response = cluster.call(request);
                if (async) {
                    if (response instanceof ResponseFuture) {
                        ((ResponseFuture) response).setReturnType(returnType);
                        return response;
                    } else {
                        ResponseFuture responseFuture = new DefaultResponseFuture(request, 0, cluster.getUrl());
                        if (response.getException() != null) {
                            responseFuture.onFailure(response);
                        } else {
                            responseFuture.onSuccess(response);
                        }
                        responseFuture.setReturnType(returnType);
                        return responseFuture;
                    }
                } else {
                    Object value = response.getValue();
                    if (value != null && value instanceof DeserializableObject) {
                        try {
                            value = ((DeserializableObject) value).deserialize(returnType);
                        } catch (IOException e) {
                            LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType, e);
                            throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
                        }
                    }
                    return value;
                }
            } catch (RuntimeException e) {
                if (ExceptionUtil.isBizException(e)) {
                    Throwable t = e.getCause();
                    // 只抛出Exception,防止抛出远程的Error
                    if (t != null && t instanceof Exception) {
                        throw t;
                    } else {
                        String msg =
                                t == null ? "biz exception cause is null. origin error msg : " + e.getMessage() : ("biz exception cause is throwable error:" + t.getClass()
                                        + ", errmsg:" + t.getMessage());
                        throw new MotanServiceException(msg, MotanErrorMsgConstant.SERVICE_DEFAULT_ERROR);
                    }
                } else if (!throwException) {
                    LoggerUtil.warn("RefererInvocationHandler invoke false, so return default value: uri=" + cluster.getUrl().getUri()
                            + " " + MotanFrameworkUtil.toString(request), e);
                    return getDefaultReturnValue(returnType);
                } else {
                    LoggerUtil.error(
                            "RefererInvocationHandler invoke Error: uri=" + cluster.getUrl().getUri() + " "
                                    + MotanFrameworkUtil.toString(request), e);
                    throw e;
                }
            }
        }
        throw new MotanServiceException("Referer call Error: cluster not exist, interface=" + clz.getName() + " "
                + MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);
    }

方法有点长,它做了如下三件事:
* 判定方法是不是toString方法和equals方法,不允许调用这两个方法
* 创建DefaultRequest对象,这个对象用来远程调用
* 通过集群对象选出一个可以调用的远程服务,把请求发送过去,并且接收到结果或者异步结果
细节方面,如何使用HA和BL配置,以后再分析。

所以Motan的客户端,实际上是用到了JDK原生代理,附加上集群和注册中心的功能,调用方法的时候触发集群HA和BL模块,选出一个可以使用的服务端,发送DefaultRequest对象进行调用。

下次将以一次完整的客户端调用为基础,分析Motan客户端和服务器端的调用过程。

-EOF-

 类似资料: