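SOFARPC is the RPC framework in Ant Group's SOFAStack. Every middleware that gains traction is worth studying for its design ideas, which broadens our own knowledge. Basic usage is not covered here; readers who are not familiar with it can refer to: https://www.sofastack.tech/projects/sofa-rpc/overview/
This analysis uses ZooKeeper as the registry.
Once the interface is configured, the service implementation looks like this: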
@SofaService(interfaceType = HelloSofaV2.class,
bindings = {@SofaServiceBinding(bindingType = "bolt"),@SofaServiceBinding(bindingType = "rest")})
@Service
public class HelloSofav2Impl implements HelloSofaV2 {
private Logger logger = LoggerFactory.getLogger(HelloSofav2Impl.class);
@Override
public String sayHello(String sofa) {
logger.info("hello sofa...");
return "hellosofa";
}
}
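The interface implementation is marked with the @SofaService annotation. ServiceBeanFactoryPostProcessor implements Spring's BeanFactoryPostProcessor extension point and defines beans programmatically. The call chain is: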
ServiceBeanFactoryPostProcessor#postProcessBeanFactory->transformSofaBeanDefinition->generateSofaServiceDefinitionOnClass->generateSofaServiceDefinition
private void generateSofaServiceDefinition(String beanId, SofaService sofaServiceAnnotation,
                                           Class<?> beanClass, BeanDefinition beanDefinition,
                                           ConfigurableListableBeanFactory beanFactory) {
    if (sofaServiceAnnotation == null) {
        return;
    }
    AnnotationWrapperBuilder<SofaService> wrapperBuilder = AnnotationWrapperBuilder.wrap(
        sofaServiceAnnotation).withBinder(binder);
    // placeholders are resolved through a proxy
    sofaServiceAnnotation = wrapperBuilder.build();
    Class<?> interfaceType = sofaServiceAnnotation.interfaceType();
    if (interfaceType.equals(void.class)) {
        Class<?> interfaces[] = beanClass.getInterfaces();
        if (beanClass.isInterface() || interfaces == null || interfaces.length == 0) {
            interfaceType = beanClass;
        } else if (interfaces.length == 1) {
            interfaceType = interfaces[0];
        } else {
            throw new FatalBeanException("Bean " + beanId + " has more than one interface.");
        }
    }
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition();
    String serviceId = SofaBeanNameGenerator.generateSofaServiceBeanName(interfaceType,
        sofaServiceAnnotation.uniqueId());
    if (!beanFactory.containsBeanDefinition(serviceId)) {
        builder.getRawBeanDefinition().setScope(beanDefinition.getScope());
        builder.setLazyInit(beanDefinition.isLazyInit());
        builder.getRawBeanDefinition().setBeanClass(ServiceFactoryBean.class);
        builder.addPropertyValue(AbstractContractDefinitionParser.INTERFACE_CLASS_PROPERTY, interfaceType);
        builder.addPropertyValue(AbstractContractDefinitionParser.UNIQUE_ID_PROPERTY,
            sofaServiceAnnotation.uniqueId());
        builder.addPropertyValue(AbstractContractDefinitionParser.BINDINGS,
            getSofaServiceBinding(sofaServiceAnnotation, sofaServiceAnnotation.bindings()));
        builder.addPropertyReference(ServiceDefinitionParser.REF, beanId);
        builder.addPropertyValue(ServiceDefinitionParser.BEAN_ID, beanId);
        builder.addPropertyValue(AbstractContractDefinitionParser.DEFINITION_BUILDING_API_TYPE, true);
        builder.addDependsOn(beanId);
        ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(serviceId,
            builder.getBeanDefinition());
    } else {
        SofaLogger.error("SofaService was already registered: {}", serviceId);
    }
}
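A ServiceFactoryBean definition is created through BeanDefinitionBuilder. From the SOFARPC introduction we know that the @SofaService annotation supports placeholders; they are resolved by the PlaceHolderAnnotationInvocationHandler class, and the commented line above is where that happens. Now turn to ServiceFactoryBean: it extends AbstractContractFactoryBean, which implements InitializingBean, following the template method pattern.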
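To make the placeholder step more concrete, here is a minimal sketch of resolving annotation attributes through a JDK dynamic proxy. It is not the code of PlaceHolderAnnotationInvocationHandler: DemoService, DemoBean, resolve and wrap are hypothetical names, and a plain Map stands in for Spring's environment.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderProxySketch {

    /** Hypothetical stand-in for @SofaService with a single String attribute. */
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoService {
        String uniqueId() default "";
    }

    @DemoService(uniqueId = "${demo.uniqueId}")
    static class DemoBean {
    }

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} with its value from props; unknown keys are left untouched. */
    static String resolve(String raw, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(raw);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String value = props.getOrDefault(matcher.group(1), matcher.group());
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Wraps the annotation in a proxy that resolves String attributes when they are read. */
    static DemoService wrap(DemoService delegate, Map<String, String> props) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(delegate, args);
            return result instanceof String ? resolve((String) result, props) : result;
        };
        return (DemoService) Proxy.newProxyInstance(
            DemoService.class.getClassLoader(), new Class<?>[] { DemoService.class }, handler);
    }

    public static void main(String[] args) {
        DemoService raw = DemoBean.class.getAnnotation(DemoService.class);
        DemoService resolved = wrap(raw, Map.of("demo.uniqueId", "v2"));
        System.out.println(raw.uniqueId() + " -> " + resolved.uniqueId());
    }
}
```

The caller keeps working with the annotation type itself, so code such as sofaServiceAnnotation.uniqueId() does not need to know whether the value was written literally or as a placeholder.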
ServiceFactoryBean#doAfterPropertiesSet
@Override
protected void doAfterPropertiesSet() {
if (!apiType && hasSofaServiceAnnotation()) {
throw new ServiceRuntimeException(
"Bean " + beanId + " of type " + ref.getClass()
+ " has already annotated by @SofaService,"
+ " can not be registered using xml. Please check it.");
}
Implementation implementation = new DefaultImplementation();
implementation.setTarget(ref);
service = buildService();
// default add jvm binding and service jvm binding should set serialize as true
if (bindings.size() == 0) {
JvmBindingParam jvmBindingParam = new JvmBindingParam().setSerialize(true);
bindings.add(new JvmBinding().setJvmBindingParam(jvmBindingParam));
}
for (Binding binding : bindings) {
service.addBinding(binding);
}
ComponentInfo componentInfo = new ServiceComponent(implementation, service,
bindingAdapterFactory, sofaRuntimeContext);
sofaRuntimeContext.getComponentManager().register(componentInfo);
}
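The template method structure mentioned earlier can be sketched roughly as follows. ContractFactoryBean and ServiceBean are hypothetical stand-ins, not the real AbstractContractFactoryBean and ServiceFactoryBean; only the split between a fixed afterPropertiesSet and an overridable doAfterPropertiesSet is taken from the source.

```java
import java.util.ArrayList;
import java.util.List;

class TemplateMethodSketch {

    /** Hypothetical base class: fixes the order of the steps, defers one step to subclasses. */
    static abstract class ContractFactoryBean {
        final List<String> steps = new ArrayList<>();

        // The template method is final so that subclasses cannot reorder the steps.
        final void afterPropertiesSet() {
            steps.add("common setup");
            doAfterPropertiesSet();
        }

        // The hook each concrete factory bean fills in.
        protected abstract void doAfterPropertiesSet();
    }

    /** Hypothetical subclass playing the role of ServiceFactoryBean. */
    static class ServiceBean extends ContractFactoryBean {
        @Override
        protected void doAfterPropertiesSet() {
            steps.add("build service and register component");
        }
    }

    public static void main(String[] args) {
        ServiceBean bean = new ServiceBean();
        bean.afterPropertiesSet();
        System.out.println(bean.steps);
    }
}
```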
The key call is **sofaRuntimeContext.getComponentManager().register(componentInfo);**, which ends up in ComponentManagerImpl#doRegister
private ComponentInfo doRegister(ComponentInfo ci) {
ComponentName name = ci.getName();
if (isRegistered(name)) {
SofaLogger.error("Component was already registered: {}", name);
if (ci.canBeDuplicate()) {
return getComponentInfo(name);
}
throw new ServiceRuntimeException("Component can not be registered duplicated: " + name);
}
try {
ci.register();
} catch (Throwable t) {
SofaLogger.error("Failed to register component: {}", ci.getName(), t);
return null;
}
SofaLogger.info("Registering component: {}", ci.getName());
try {
ComponentInfo old = registry.putIfAbsent(ci.getName(), ci);
if (old != null) {
SofaLogger.error("Component was already registered: {}", name);
if (ci.canBeDuplicate()) {
return old;
}
throw new ServiceRuntimeException("Component can not be registered duplicated: "
+ name);
}
if (ci.resolve()) {
typeRegistry(ci);
// expose the service
ci.activate();
}
} catch (Throwable t) {
ci.exception(new Exception(t));
SofaLogger.error("Failed to create the component {}", ci.getName(), t);
}
return ci;
}
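The duplicate handling in doRegister rests on the atomicity of putIfAbsent. Below is a minimal sketch of that idea only; Component and ComponentRegistrySketch are hypothetical simplifications, and the activated flag stands in for resolve() followed by activate().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ComponentRegistrySketch {

    /** Hypothetical, simplified component: a name plus a "duplicates allowed" flag. */
    static class Component {
        final String name;
        final boolean canBeDuplicate;
        boolean activated;

        Component(String name, boolean canBeDuplicate) {
            this.name = name;
            this.canBeDuplicate = canBeDuplicate;
        }
    }

    private final ConcurrentMap<String, Component> registry = new ConcurrentHashMap<>();

    Component register(Component candidate) {
        // putIfAbsent is atomic, so only one thread can win for a given name.
        Component old = registry.putIfAbsent(candidate.name, candidate);
        if (old != null) {
            if (candidate.canBeDuplicate) {
                return old; // reuse the component that is already registered
            }
            // The method quoted above catches this and records it on the component;
            // the sketch simply lets it propagate.
            throw new IllegalStateException(
                "Component can not be registered duplicated: " + candidate.name);
        }
        candidate.activated = true; // stands in for resolve() followed by activate()
        return candidate;
    }

    public static void main(String[] args) {
        ComponentRegistrySketch manager = new ComponentRegistrySketch();
        Component first = manager.register(new Component("service:HelloSofaV2", true));
        Component second = manager.register(new Component("service:HelloSofaV2", true));
        System.out.println("same instance reused: " + (first == second));
    }
}
```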
ci.activate() relies on com.alipay.sofa.runtime.spi.binding.Binding to expose the service. Binding is an extension point in SOFARPC. BindingAdapterFactory wraps each binding in a BindingAdapter<?>, and the service is exposed through its method
Object outBinding(Object contract, T binding, Object target,
SofaRuntimeContext sofaRuntimeContext);
Take RpcBindingAdapter as an example:
@Override
public Object outBinding(Object contract, RpcBinding binding, Object target,
SofaRuntimeContext sofaRuntimeContext) {
ApplicationContext applicationContext = sofaRuntimeContext.getSofaRuntimeManager()
.getRootApplicationContext();
ProviderConfigContainer providerConfigContainer = applicationContext
.getBean(ProviderConfigContainer.class);
ProcessorContainer processorContainer = applicationContext
.getBean(ProcessorContainer.class);
String uniqueName = providerConfigContainer.createUniqueName((Contract) contract, binding);
ProviderConfig providerConfig = providerConfigContainer.getProviderConfig(uniqueName);
processorContainer.processorProvider(providerConfig);
if (providerConfig == null) {
throw new ServiceRuntimeException(LogCodes.getLog(
LogCodes.INFO_SERVICE_METADATA_IS_NULL, uniqueName));
}
try {
// expose the service
providerConfig.export();
} catch (Exception e) {
throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.ERROR_PROXY_PUBLISH_FAIL), e);
}
if (providerConfigContainer.isAllowPublish()) {
providerConfig.setRegister(true);
List<RegistryConfig> registrys = providerConfig.getRegistry();
for (RegistryConfig registryConfig : registrys) {
Registry registry = RegistryFactory.getRegistry(registryConfig);
registry.init();
registry.start();
registry.register(providerConfig);
}
}
return Boolean.TRUE;
}
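providerConfig#export delegates to the provider bootstrap (DefaultProviderBootstrap in SOFARPC), whose export and doExport methods are: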
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // delayed export, in milliseconds
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}
private void doExport() {
if (exported) {
return;
}
// check parameters
checkParameters();
String appName = providerConfig.getAppName();
//key is the protocol of server,for concurrent safe
Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
// register the processor to each server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol;
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}
// note the case of the same interface and the same uniqueId on different servers
AtomicInteger cnt = EXPORTED_KEYS.get(key); // counter
if (cnt == null) { // not exported yet
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
hasExportedInCurrent.put(serverConfig.getProtocol(), true);
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
decrementCounter(hasExportedInCurrent);
// limit exceeded, throw an exception directly
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_PROVIDER_CONFIG, key,
maxProxyCount));
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.WARN_DUPLICATE_PROVIDER_CONFIG, key, c));
}
}
}
}
try {
// build the request invoker
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
preProcessProviderTarget(providerConfig, (ProviderProxyInvoker) providerProxyInvoker);
// initialize the registries
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // initialize the Registry in advance
}
}
}
// register the processor to each server
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// register the request invoker
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
// start the server
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName,
LogCodes.getLog(LogCodes.ERROR_REGISTER_PROCESSOR_TO_SERVER, serverConfig.getId()), e);
}
}
// register to the registry
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
} catch (Exception e) {
decrementCounter(hasExportedInCurrent);
if (e instanceof SofaRpcRuntimeException) {
throw e;
}
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_PROVIDER_PROXY), e);
}
// cache some data
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
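The repeated-export check in doExport boils down to a per-key counter with an optional upper bound. A minimal sketch of just that part follows; countExport is a hypothetical helper, and computeIfAbsent is swapped in for the CommonUtils.putToConcurrentMap call used in the source.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ExportCounterSketch {

    // One counter per "interface:uniqueId:protocol" style key, shared by all exports.
    private static final ConcurrentMap<String, AtomicInteger> EXPORTED_KEYS =
        new ConcurrentHashMap<>();

    /**
     * Counts one export of the given key and returns the new count.
     * A limit of 0 or less means "no limit", mirroring the maxProxyCount > 0 check.
     */
    static int countExport(String key, int limit) {
        AtomicInteger counter = EXPORTED_KEYS.computeIfAbsent(key, k -> new AtomicInteger(0));
        int count = counter.incrementAndGet();
        if (limit > 0 && count > limit) {
            counter.decrementAndGet(); // roll back, like decrementCounter(...) in the source
            throw new IllegalStateException(
                "Duplicate provider config " + key + ", limit is " + limit);
        }
        return count;
    }

    public static void main(String[] args) {
        String key = "HelloSofaV2:1.0:bolt"; // hypothetical key
        System.out.println("first export, count = " + countExport(key, 1));
        try {
            countExport(key, 1);
        } catch (IllegalStateException e) {
            System.out.println("second export rejected: " + e.getMessage());
        }
    }
}
```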
Finally the service is started through BoltServer. At this point the service has been exported.
Now look at SOFABoot. Its spring.factories declares SofaRpcAutoConfiguration, which configures a SofaBootRpcStartListener that listens for SofaBootRpcStartEvent. SofaBootRpcStartEvent is in turn published by ApplicationContextRefreshedListener, which listens for ContextRefreshedEvent. In other words, once Spring Boot has finished starting, a SofaBootRpcStartEvent is published and picked up by SofaBootRpcStartListener:
@Override
public void onApplicationEvent(SofaBootRpcStartEvent event) {
//choose disable metrics lookout
disableLookout();
//extra info
processExtra(event);
//start fault tolerance
faultToleranceConfigurator.startFaultTolerance();
Collection<ProviderConfig> allProviderConfig = providerConfigContainer
.getAllProviderConfig();
if (!CollectionUtils.isEmpty(allProviderConfig)) {
//start server
serverConfigContainer.startServers();
}
//set allow all publish
providerConfigContainer.setAllowPublish(true);
//register registry
providerConfigContainer.publishAllProviderConfig();
//export dubbo
providerConfigContainer.exportAllDubboProvideConfig();
}
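The two-step event chain described above (context refreshed, then RPC start) can be illustrated without Spring. The sketch below uses a tiny hand-written event bus; EventBus, ContextRefreshed and RpcStart are hypothetical stand-ins for Spring's event publisher, ContextRefreshedEvent and SofaBootRpcStartEvent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class StartEventChainSketch {

    static class ContextRefreshed {
    }

    static class RpcStart {
    }

    /** Minimal synchronous event bus keyed by the event's class. */
    static class EventBus {
        private final Map<Class<?>, List<Consumer<Object>>> listeners = new HashMap<>();

        void on(Class<?> type, Consumer<Object> listener) {
            listeners.computeIfAbsent(type, k -> new ArrayList<>()).add(listener);
        }

        void publish(Object event) {
            for (Consumer<Object> listener : listeners.getOrDefault(event.getClass(), List.of())) {
                listener.accept(event);
            }
        }
    }

    /** Wires the chain and returns the order in which the steps ran. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        EventBus bus = new EventBus();
        // Plays the role of ApplicationContextRefreshedListener.
        bus.on(ContextRefreshed.class, e -> {
            log.add("context refreshed");
            bus.publish(new RpcStart());
        });
        // Plays the role of SofaBootRpcStartListener.
        bus.on(RpcStart.class, e -> {
            log.add("start servers");
            log.add("publish providers to registry");
        });
        bus.publish(new ContextRefreshed());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Deferring the registry step to an event fired after the context refresh means providers are only announced once every bean is ready to serve requests.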
As the comments show, providerConfigContainer.publishAllProviderConfig(); registers the services with the registry through Registry:
public void register(ProviderConfig config) {
String appName = config.getAppName();
if (!registryConfig.isRegister()) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE));
}
return;
}
// publish
if (config.isRegister()) {
registerProviderUrls(config);
}
if (config.isSubscribe()) {
// subscribe to the config node
if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) {
// subscribe to interface-level config
subscribeConfig(config, config.getConfigListener());
}
}
}
registerProviderUrls(config); contains the concrete publishing steps. At this point service registration is complete.
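Next, a closer look at how sofarpc uses SPI.
SPI is a mechanism built into the JDK (java.util.ServiceLoader) and is used heavily by other frameworks; sofarpc is no exception. Much of sofarpc's design appears to follow dubbo's plugin-based microkernel. Because JDK SPI is fairly limited, many frameworks build their own extensions on top of it.
ExtensionLoader is the SPI utility class in sofarpc; an ExtensionLoader is obtained through ExtensionLoaderFactory.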
ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
if (loader == null) {
synchronized (ExtensionLoaderFactory.class) {
loader = LOADER_MAP.get(clazz);
if (loader == null) {
loader = new ExtensionLoader<T>(clazz, listener);
LOADER_MAP.put(clazz, loader);
}
}
}
return loader;
}
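The factory method above caches one loader per class using double-checked locking. The same "one instance per key" guarantee can be sketched with ConcurrentHashMap.computeIfAbsent, which is a swapped-in technique and not what the source does; Loader and getLoader are hypothetical simplifications of ExtensionLoader and getExtensionLoader.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LoaderCacheSketch {

    /** Hypothetical, stripped-down loader that only remembers its extensible type. */
    static class Loader<T> {
        final Class<T> type;

        Loader(Class<T> type) {
            this.type = type;
        }
    }

    private static final ConcurrentMap<Class<?>, Loader<?>> LOADERS = new ConcurrentHashMap<>();

    /** Returns the cached loader for the type, creating it at most once. */
    @SuppressWarnings("unchecked")
    static <T> Loader<T> getLoader(Class<T> type) {
        // computeIfAbsent runs the factory atomically per key, so no explicit lock is needed.
        return (Loader<T>) LOADERS.computeIfAbsent(type, key -> new Loader<>(type));
    }

    public static void main(String[] args) {
        Loader<Runnable> first = getLoader(Runnable.class);
        Loader<Runnable> second = getLoader(Runnable.class);
        System.out.println("cached: " + (first == second));
    }
}
```

One trade-off: the factory passed to computeIfAbsent should stay short and must not touch the same map, whereas the explicit synchronized block in the source has no such restriction.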
Each class is bound to exactly one ExtensionLoader, which is cached. The ExtensionLoader is constructed as follows:
protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
if (RpcRunningState.isShuttingDown()) {
this.interfaceClass = null;
this.interfaceName = null;
this.listeners = null;
this.factory = null;
this.extensible = null;
this.all = null;
return;
}
// the class is null, or is neither an interface nor an abstract class
if (interfaceClass == null ||
!(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
}
this.interfaceClass = interfaceClass;
this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
this.listeners = new ArrayList<>();
if (listener != null) {
listeners.add(listener);
}
Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
if (extensible == null) {
throw new IllegalArgumentException(
"Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
} else {
this.extensible = extensible;
}
this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
if (autoLoad) {
List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
for (String path : paths) {
loadFromFile(path);
}
}
}
public @interface Extension {
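/**
* Extension point name
*
* @return the extension point name
*/
String value();
/**
* Extension point code; not needed by default, only when the interface requires coding
*
* @return the extension point code
* @see Extensible#coded()
*/
byte code() default -1;
/**
* Priority order; not needed by default
*
* @return the order
*/
int order() default 0;
/**
* Whether to override other same-named extensions with a lower {@link #order()}
*
* @return whether to override same-named extensions with a lower order
* @since 5.2.0
*/
boolean override() default false;
/**
* Rejects other extensions; can reject extensions with a lower {@link #order()}
*
* @return the rejected extensions
* @since 5.2.0
*/
String[] rejection() default {};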
}
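Extensions are loaded according to these attributes: their priority order, whether an extension overrides an existing one with the same name, and so on.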
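As a rough illustration of the override rule, the sketch below shows one possible reading of the Javadoc on order() and override(): a same-named extension replaces the existing one only if it declares override and its order is not lower. This is an assumption, not the actual ExtensionLoader logic; Ext, load and get are hypothetical names, and rejection() is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExtensionOverrideSketch {

    /** Hypothetical descriptor holding the attributes read from @Extension. */
    static class Ext {
        final String alias;
        final String impl;
        final int order;
        final boolean override;

        Ext(String alias, String impl, int order, boolean override) {
            this.alias = alias;
            this.impl = impl;
            this.order = order;
            this.override = override;
        }
    }

    private final Map<String, Ext> all = new LinkedHashMap<>();

    void load(Ext candidate) {
        Ext existing = all.get(candidate.alias);
        if (existing == null) {
            all.put(candidate.alias, candidate);
            return;
        }
        if (!candidate.override) {
            // Assumed behaviour: a plain duplicate alias is a configuration error.
            throw new IllegalStateException("Duplicate extension alias: " + candidate.alias);
        }
        if (candidate.order >= existing.order) {
            all.put(candidate.alias, candidate); // higher or equal order wins
        }
        // Otherwise the candidate has a lower order and is ignored.
    }

    Ext get(String alias) {
        return all.get(alias);
    }

    public static void main(String[] args) {
        ExtensionOverrideSketch loader = new ExtensionOverrideSketch();
        loader.load(new Ext("random", "BuiltInRandom", 0, false));
        loader.load(new Ext("random", "CustomRandom", 10, true));
        System.out.println("random -> " + loader.get("random").impl);
    }
}
```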
In sofarpc, extension point files live under META-INF/services/sofa-rpc/ and META-INF/services/.
[Figure: all extension points in sofarpc]
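The default load balancing algorithm is random.
The default fault-tolerance strategy is failover.
All of sofarpc's default settings are in rpc-config-default.json.
To override the defaults, put the settings in sofa-rpc/rpc-config.json or META-INF/sofa-rpc/rpc-config.json; the priority among these files is defined by rpc.config.order.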
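The layering of config files can be pictured as a merge in priority order. The sketch below assumes that a file with a larger rpc.config.order overrides one with a smaller value and that a missing order counts as 0; both are assumptions, as are the keys loadBalancer and cluster and the helper names order and merge. Plain maps stand in for the parsed JSON files.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConfigOrderSketch {

    /** Reads rpc.config.order from one parsed file; a missing value counts as 0 (assumption). */
    static int order(Map<String, Object> file) {
        return ((Number) file.getOrDefault("rpc.config.order", 0)).intValue();
    }

    /** Applies the override files on top of the defaults, lowest order first. */
    static Map<String, Object> merge(Map<String, Object> defaults,
                                     List<Map<String, Object>> files) {
        List<Map<String, Object>> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingInt(ConfigOrderSketch::order));
        Map<String, Object> merged = new HashMap<>(defaults);
        for (Map<String, Object> file : sorted) {
            merged.putAll(file); // files applied later (higher order) overwrite earlier values
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = Map.of("loadBalancer", "random", "cluster", "failover");
        Map<String, Object> low = Map.of("rpc.config.order", 1, "loadBalancer", "roundRobin");
        Map<String, Object> high = Map.of("rpc.config.order", 2, "loadBalancer", "consistentHash");
        Map<String, Object> merged = merge(defaults, List.of(high, low));
        System.out.println("loadBalancer = " + merged.get("loadBalancer"));
        System.out.println("cluster = " + merged.get("cluster"));
    }
}
```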
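This completes the basic flow and the key technical points of service publication. The next part analyzes the service reference flow.
My WeChat official account also publishes source code analyses of the spring stack and sofa stack.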
WeChat official account: 奋进的IT创业者