当前位置: 首页 > 工具软件 > SOFARPC > 使用案例 >

sofarpc高级进阶之服务发布刨析

张伯寅
2023-12-01

SOFARPC 服务发布原理

SOFARPC 是蚂蚁SOFASTACK中的rpc框架,每一个中间件的兴起都值得我们学习它的设计理念,以拓展我们的知识储备。基本应该这里就不再展示了。没了解的过的同学可以参考:https://www.sofastack.tech/projects/sofa-rpc/overview/

本次基于zookeeper作为注册分析。

暴露服务

配置好接口后如下

@SofaService(interfaceType = HelloSofaV2.class,
        bindings = {@SofaServiceBinding(bindingType = "bolt"),@SofaServiceBinding(bindingType = "rest")})
@Service
public class HelloSofav2Impl implements HelloSofaV2 {
    private Logger logger = LoggerFactory.getLogger(HelloSofav2Impl.class);
    @Override
    public String sayHello(String sofa) {
        logger.info("hello sofa...");
        return "hellosofa";
    }
}

会在接口实现上标识@SofaService注解。ServiceBeanFactoryPostProcessor是spring BeanFactoryPostProcessor扩展点的扩展通过编程的方式定义bean

ServiceBeanFactoryPostProcessor#postProcessBeanFactory->transformSofaBeanDefinition->generateSofaServiceDefinitionOnClass->generateSofaServiceDefinition

private void generateSofaServiceDefinition(String beanId, SofaService sofaServiceAnnotation,
                                               Class<?> beanClass, BeanDefinition beanDefinition,
                                               ConfigurableListableBeanFactory beanFactory) {
        if (sofaServiceAnnotation == null) {
            return;
        }
        AnnotationWrapperBuilder<SofaService> wrapperBuilder = AnnotationWrapperBuilder.wrap(
            sofaServiceAnnotation).withBinder(binder);
    	//通过代理实现占位符解析
        sofaServiceAnnotation = wrapperBuilder.build();

        Class<?> interfaceType = sofaServiceAnnotation.interfaceType();
        if (interfaceType.equals(void.class)) {
            Class<?> interfaces[] = beanClass.getInterfaces();

            if (beanClass.isInterface() || interfaces == null || interfaces.length == 0) {
                interfaceType = beanClass;
            } else if (interfaces.length == 1) {
                interfaceType = interfaces[0];
            } else {
                throw new FatalBeanException("Bean " + beanId + " has more than one interface.");
            }
        }

        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition();
        String serviceId = SofaBeanNameGenerator.generateSofaServiceBeanName(interfaceType,
            sofaServiceAnnotation.uniqueId());

        if (!beanFactory.containsBeanDefinition(serviceId)) {
            builder.getRawBeanDefinition().setScope(beanDefinition.getScope());
            builder.setLazyInit(beanDefinition.isLazyInit());
            builder.getRawBeanDefinition().setBeanClass(ServiceFactoryBean.class);
            builder.addPropertyValue(AbstractContractDefinitionParser.INTERFACE_CLASS_PROPERTY,
                interfaceType);
            builder.addPropertyValue(AbstractContractDefinitionParser.UNIQUE_ID_PROPERTY,
                sofaServiceAnnotation.uniqueId());
            builder.addPropertyValue(AbstractContractDefinitionParser.BINDINGS,
                getSofaServiceBinding(sofaServiceAnnotation, sofaServiceAnnotation.bindings()));
            builder.addPropertyReference(ServiceDefinitionParser.REF, beanId);
            builder.addPropertyValue(ServiceDefinitionParser.BEAN_ID, beanId);
            builder.addPropertyValue(AbstractContractDefinitionParser.DEFINITION_BUILDING_API_TYPE,
                true);
            builder.addDependsOn(beanId);
            ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(serviceId,
                builder.getBeanDefinition());
        } else {
            SofaLogger.error("SofaService was already registered: {}", serviceId);
        }
    }

通过BeanDefinitionBuilder创建ServiceFactoryBean。通过看SOFARPC简绍我们知道@SofaService注解支持占位符,占位符的解析通过 PlaceHolderAnnotationInvocationHandler 这个类去实现的,上面注释的部分是实现原理。到这需要讲目光转移到ServiceFactoryBean通过继承关系ServiceFactoryBean继承了AbstractContractFactoryBean实现了InitializingBean,模板的设计模式

ServiceFactoryBean#doAfterPropertiesSet

@Override
    protected void doAfterPropertiesSet() {
        if (!apiType && hasSofaServiceAnnotation()) {
            throw new ServiceRuntimeException(
                "Bean " + beanId + " of type " + ref.getClass()
                        + " has already annotated by @SofaService,"
                        + " can not be registered using xml. Please check it.");
        }

        Implementation implementation = new DefaultImplementation();
        implementation.setTarget(ref);
        service = buildService();

        // default add jvm binding and service jvm binding should set serialize as true
        if (bindings.size() == 0) {
            JvmBindingParam jvmBindingParam = new JvmBindingParam().setSerialize(true);
            bindings.add(new JvmBinding().setJvmBindingParam(jvmBindingParam));
        }

        for (Binding binding : bindings) {
            service.addBinding(binding);
        }

        ComponentInfo componentInfo = new ServiceComponent(implementation, service,
            bindingAdapterFactory, sofaRuntimeContext);
        sofaRuntimeContext.getComponentManager().register(componentInfo);
    }

主要注意力集中在**sofaRuntimeContext.getComponentManager().register(componentInfo);**通过调用会到 ComponentManagerImpl#doRegister

private ComponentInfo doRegister(ComponentInfo ci) {
        ComponentName name = ci.getName();
        if (isRegistered(name)) {
            SofaLogger.error("Component was already registered: {}", name);
            if (ci.canBeDuplicate()) {
                return getComponentInfo(name);
            }
            throw new ServiceRuntimeException("Component can not be registered duplicated: " + name);
        }

        try {
            ci.register();
        } catch (Throwable t) {
            SofaLogger.error("Failed to register component: {}", ci.getName(), t);
            return null;
        }

        SofaLogger.info("Registering component: {}", ci.getName());

        try {
            ComponentInfo old = registry.putIfAbsent(ci.getName(), ci);
            if (old != null) {
                SofaLogger.error("Component was already registered: {}", name);
                if (ci.canBeDuplicate()) {
                    return old;
                }
                throw new ServiceRuntimeException("Component can not be registered duplicated: "
                                                  + name);

            }
            if (ci.resolve()) {
                typeRegistry(ci);
                //暴露服务
                ci.activate();
            }
        } catch (Throwable t) {
            ci.exception(new Exception(t));
            SofaLogger.error("Failed to create the component {}", ci.getName(), t);
        }

        return ci;
    }

ci.activate()会依靠com.alipay.sofa.runtime.spi.binding.Binding暴力服务 。Binding是SOFARPC中的扩展点。通过 BindingAdapterFactory包装成BindingAdapter<?>,通过

Object outBinding(Object contract, T binding, Object target,
                  SofaRuntimeContext sofaRuntimeContext);

暴露服务。以RpcBindingAdapter为例

@Override
    public Object outBinding(Object contract, RpcBinding binding, Object target,
                             SofaRuntimeContext sofaRuntimeContext) {

        ApplicationContext applicationContext = sofaRuntimeContext.getSofaRuntimeManager()
            .getRootApplicationContext();
        ProviderConfigContainer providerConfigContainer = applicationContext
            .getBean(ProviderConfigContainer.class);
        ProcessorContainer processorContainer = applicationContext
            .getBean(ProcessorContainer.class);

        String uniqueName = providerConfigContainer.createUniqueName((Contract) contract, binding);
        ProviderConfig providerConfig = providerConfigContainer.getProviderConfig(uniqueName);
        processorContainer.processorProvider(providerConfig);

        if (providerConfig == null) {
            throw new ServiceRuntimeException(LogCodes.getLog(
                LogCodes.INFO_SERVICE_METADATA_IS_NULL, uniqueName));
        }

        try {
            //暴露服务
            providerConfig.export();
        } catch (Exception e) {
            throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.ERROR_PROXY_PUBLISH_FAIL), e);
        }

        if (providerConfigContainer.isAllowPublish()) {
            providerConfig.setRegister(true);
            List<RegistryConfig> registrys = providerConfig.getRegistry();
            for (RegistryConfig registryConfig : registrys) {
                Registry registry = RegistryFactory.getRegistry(registryConfig);
                registry.init();
                registry.start();
                registry.register(providerConfig);
            }
        }
        return Boolean.TRUE;
    }

providerConfig#export

@Override
    public void export() {
        if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
            Thread thread = factory.newThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(providerConfig.getDelay());
                    } catch (Throwable ignore) { // NOPMD
                    }
                    doExport();
                }
            });
            thread.start();
        } else {
            doExport();
        }
    }
private void doExport() {
        if (exported) {
            return;
        }

        // 检查参数
        checkParameters();

        String appName = providerConfig.getAppName();

        //key  is the protocol of server,for concurrent safe
        Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
        // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            String protocol = serverConfig.getProtocol();

            String key = providerConfig.buildKey() + ":" + protocol;

            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
            }

            // 注意同一interface,同一uniqueId,不同server情况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            hasExportedInCurrent.put(serverConfig.getProtocol(), true);
            int maxProxyCount = providerConfig.getRepeatedExportLimit();
            if (maxProxyCount > 0) {
                if (c > maxProxyCount) {
                    decrementCounter(hasExportedInCurrent);
                    // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_PROVIDER_CONFIG, key,
                        maxProxyCount));
                } else if (c > 1) {
                    if (LOGGER.isInfoEnabled(appName)) {
                        LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.WARN_DUPLICATE_PROVIDER_CONFIG, key, c));
                    }
                }
            }

        }

        try {
            // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);

            preProcessProviderTarget(providerConfig, (ProviderProxyInvoker) providerProxyInvoker);
            // 初始化注册中心
            if (providerConfig.isRegister()) {
                List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                if (CommonUtils.isNotEmpty(registryConfigs)) {
                    for (RegistryConfig registryConfig : registryConfigs) {
                        RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                    }
                }
            }
            // 将处理器注册到server
            for (ServerConfig serverConfig : serverConfigs) {
                try {
                    Server server = serverConfig.buildIfAbsent();
                    // 注册请求调用器
                    server.registerProcessor(providerConfig, providerProxyInvoker);
                    if (serverConfig.isAutoStart()) {
                        //启动服务
                        server.start();
                    }

                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    LOGGER.errorWithApp(appName,
                        LogCodes.getLog(LogCodes.ERROR_REGISTER_PROCESSOR_TO_SERVER, serverConfig.getId()), e);
                }
            }

            // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener());
            register();
        } catch (Exception e) {
            decrementCounter(hasExportedInCurrent);
            if (e instanceof SofaRpcRuntimeException) {
                throw e;
            }
            throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_PROVIDER_PROXY), e);
        }

        // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }

最后通过BoltServer启动服务。到这服务就发布完了。

注册服务

通过sofaboot。spring.factories中看到有一个SofaRpcAutoConfiguration。里面配置一了一个SofaBootRpcStartListener,这个监听器监听 SofaBootRpcStartEvent。SofaBootRpcStartEvent又是ApplicationContextRefreshedListener 发布的,ApplicationContextRefreshedListener监听的是 ContextRefreshedEvent,也是就说spring boot 启动完成会发布一个SofaBootRpcStartEvent这个事件被SofaBootRpcStartListener监听到

@Override
    public void onApplicationEvent(SofaBootRpcStartEvent event) {
        //choose disable metrics lookout
        disableLookout();

        //extra info
        processExtra(event);

        //start fault tolerance
        faultToleranceConfigurator.startFaultTolerance();

        Collection<ProviderConfig> allProviderConfig = providerConfigContainer
            .getAllProviderConfig();
        if (!CollectionUtils.isEmpty(allProviderConfig)) {
            //start server
            serverConfigContainer.startServers();
        }

        //set allow all publish
        providerConfigContainer.setAllowPublish(true);

        //register registry
        providerConfigContainer.publishAllProviderConfig();

        //export dubbo
        providerConfigContainer.exportAllDubboProvideConfig();
    }

通过注释可以看到providerConfigContainer.publishAllProviderConfig(); 通过Registry注册服务到注册中心上

public void register(ProviderConfig config) {
        String appName = config.getAppName();
        if (!registryConfig.isRegister()) {
            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE));
            }
            return;
        }

        //发布
        if (config.isRegister()) {
            registerProviderUrls(config);
        }

        if (config.isSubscribe()) {
            // 订阅配置节点
            if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) {
                //订阅接口级配置
                subscribeConfig(config, config.getConfigListener());
            }
        }
    }

通过registerProviderUrls(config);我们可以看到具体的发布流程,到此服务注册完成。

SPI

接下来重点说明sofarpc spi 的应该

spi是jdk中的工具类,大量被其它框架引用sofarpc也不例外,sofarpc好多设计应该是模仿dubbo去设计的基于插件的微内核,因为jdk spi比较单一,很多框架基于jdk spi 做了扩展

ExtensionLoader 是sofarpc中spi 的工具栏,通过ExtensionLoaderFactory可以得到ExtensionLoader

ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
        ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
        if (loader == null) {
            synchronized (ExtensionLoaderFactory.class) {
                loader = LOADER_MAP.get(clazz);
                if (loader == null) {
                    loader = new ExtensionLoader<T>(clazz, listener);
                    LOADER_MAP.put(clazz, loader);
                }
            }
        }
        return loader;
    }

一个类会绑定一个ExtensionLoader绑定,并且会做缓存,构建 ExtensionLoader

protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
        if (RpcRunningState.isShuttingDown()) {
            this.interfaceClass = null;
            this.interfaceName = null;
            this.listeners = null;
            this.factory = null;
            this.extensible = null;
            this.all = null;
            return;
        }
        // 接口为空,既不是接口,也不是抽象类
        if (interfaceClass == null ||
                !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
            throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
        }
        this.interfaceClass = interfaceClass;
        this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
        this.listeners = new ArrayList<>();
        if (listener != null) {
            listeners.add(listener);
        }
        Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
        if (extensible == null) {
            throw new IllegalArgumentException(
                    "Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
        } else {
            this.extensible = extensible;
        }

        this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
        this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
        if (autoLoad) {
            List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
            for (String path : paths) {
                loadFromFile(path);
            }
        }
    }
public @interface Extension {
    /**
     * 扩展点名字
     *
     * @return 扩展点名字
     */
    String value();

    /**
     * 扩展点编码,默认不需要,当接口需要编码的时候需要
     *
     * @return 扩展点编码
     * @see Extensible#coded()
     */
    byte code() default -1;

    /**
     * 优先级排序,默认不需要
     *
     * @return 排序
     */
    int order() default 0;

    /**
     * 是否覆盖其它低{@link #order()}的同名扩展
     *
     * @return 是否覆盖其它低排序的同名扩展
     * @since 5.2.0
     */
    boolean override() default false;

    /**
     * 排斥其它扩展,可以排斥掉其它低{@link #order()}的扩展
     *
     * @return 排斥其它扩展
     * @since 5.2.0
     */
    String[] rejection() default {};
}

会按照优先级、相同的扩展点存在是否覆盖、等特性。

sofarpc中扩展点存在META-INF/services/sofa-rpc/、META-INF/services/路径中,

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tDkD8CgW-1626429409833)(C:\Users\subay\AppData\Roaming\Typora\typora-user-images\image-20210716165212395.png)]

sofarpc 中所有的扩展点

负载均衡

  1. consistentHash
  2. localPref
  3. random
  4. roundRobin
  5. weightRoundRobin
  6. weightConsistentHash
  7. auto

默认的负载均衡算法是 random

集群容错

  1. failfast
  2. failover

默认容错策略 failover

sofarpc 全局配置

sofarpc的默认配置都在 rpc-config-default.json 中

如果想覆盖默认配置需要在sofa-rpc/rpc-config.json或者

META-INF/sofa-rpc/rpc-config.json中配置覆盖通过 rpc.config.order定义优先级

到处服务发布的基本流程和关键的技术点已经完成了。下节我们一起来分析服务引用流程

个人公众号也会发布一些spring stack,sofa stack 源码分析文章

负载均衡

  1. consistentHash
  2. localPref
  3. random
  4. roundRobin
  5. weightRoundRobin
  6. weightConsistentHash
  7. auto

默认的负载均衡算法是 random

集群容错

  1. failfast
  2. failover

默认容错策略 failover

sofarpc 全局配置

sofarpc的默认配置都在 rpc-config-default.json 中

如果想覆盖默认配置需要在sofa-rpc/rpc-config.json或者

META-INF/sofa-rpc/rpc-config.json中配置覆盖通过 rpc.config.order定义优先级

到处服务发布的基本流程和关键的技术点已经完成了。下节我们一起来分析服务引用流程

个人公众号 奋进的IT创业者

 类似资料: