1.SOFA RPC源码解析
1.1 RPC配置
1.1.1 源码解析
在使用SOFA RPC的过程中,RPC相关的配置通过RpcConfigs实现。
RpcConfigs存在两个属性:
1. /**
2. * 全部配置
3. */
4. private final staticConcurrentHashMap<String, Object> CFG = new ConcurrentHashMap<String,Object>();
5. /**
6. * 配置变化监听器
7. */
8. private final staticConcurrentHashMap<String, List<RpcConfigListener>> CFG_LISTENER =new ConcurrentHashMap<String, List<RpcConfigListener>>();
其中:
- CFG用于缓存所有的配置;
- CFG_LISTENER缓存所有的配置监听器,当配置发生变化时,做相应的处理。
RpcConfigs在第一次使用时,通过静态代码块进行初始化,加载所有RPC相关的配置。
1. static {
2. init(); // 加载配置文件
3. }
RpcConfigs初始化:
1. private static void init() {
2. try {
3. // loadDefault
4. String json =FileUtils.file2String(RpcConfigs.class, "rpc-config-default.json","UTF-8");
5. Map map = JSON.parseObject(json,Map.class);
6. CFG.putAll(map);
7.
8. // loadCustom
9. loadCustom("sofa-rpc/rpc-config.json");
10. loadCustom("META-INF/sofa-rpc/rpc-config.json");
11.
12. // load system properties
13. CFG.putAll(newHashMap(System.getProperties())); // 注意部分属性可能被覆盖为字符串
14. } catch (Exception e) {
15. throw newSofaRpcRuntimeException("Catch Exception when load RpcConfigs", e);
16. }
17. }
第一步,加载RpcConfigs类所在目录下rpc-config-default.json文件,该文件为RPC缺省配置,具体内容参考如下:
1. /*
2. 本文件为SOFA RPC客户端内置的默认配置文件。
3. 用户可以在自己的工程中自定义rpc-config.json进行默认配置覆盖。
4. 所有的属性均可配置。(通过-D或者setProperty)
5.
6. PS:大家也看到了,本JSON文档是支持注释的,而标准JSON是不支持的,这属于宽松的的json格式
7. */
8. {
9. /*-------------RPC框架内部使用配置项-------------*/
10. //本配置文件加载顺序,越大越后加载
11. "rpc.config.order": 0,
12. //日志实现,日志早于配置加载,所以不能适应Extension机制
13. "logger.impl":"com.alipay.sofa.rpc.log.SLF4JLoggerImpl",
14. //扩展点加载的路径
15. "extension.load.path": [
16. "META-INF/services/sofa-rpc/",
17. "META-INF/services/"
18. ],
19. //需要被加载的模块列表,多个用逗号隔开
20. "module.load.list" : "*",
21. /*-------------RPC框架内部使用配置项-------------*/
22.
23.
24. /*-------------系统运行时相关配置开始-------------*/
25. //0代表自动判断,也可以自定义注入,或者启动参数注入
26. "system.cpu.cores": 0,
27. //是否允许线程上下文携带自定义参数,默认true,关闭后,可能tracer等会失效,但是会提高性能
28. "context.attachment.enable": true,
29. //是否启动事件总线,关闭后,可能tracer等会失效,但是可以提高性能
30. "event.bus.enable": true,
31. //主动监听JVM关闭事件,默认true,如果有外部管理框架,可以由外部开启回收
32. "jvm.shutdown.hook": true,
33. //是否增加序列化安全黑名单,关闭后可提供性能
34. "serialize.blacklist.enable":false,
35. //是否支持多ClassLoader支持,如果是单ClassLoader环境,可以关闭提高性能
36. "multiple.classloader.enable":false,
37. //是否允许请求和响应透传数据,关闭后,会提高性能
38. "invoke.baggage.enable": false,
39. /*-------------系统运行时相关配置开始-------------*/
40.
41.
42. /*-------------默认配置值开始-------------*/
43. //默认Provider启动器
44. "default.provider.bootstrap":"bolt",
45. //默认Consumer启动器
46. "default.consumer.bootstrap":"bolt",
47. //默认uniqueId 做为服务唯一标识
48. "default.uniqueId": "",
49. //默认version 做为服务唯一标识
50. "default.version": "1.0",
51. //默认分组不做为服务唯一标识
52. "default.group": "",
53. //默认注册中心
54. "default.registry":"zookeeper",
55. //默认协议
56. "default.protocol":"bolt",
57. //默认序列化
58. "default.serialization":"hessian2",
59. //默认压缩算法
60. "default.compress":"snappy",
61. //默认代理类
62. "default.proxy":"javassist",
63. //默认字符集
64. "default.charset":"UTF-8",
65. //默认网络层
66. "default.transport":"netty4",
67. //默认tracer实现
68. "default.tracer": "",
69. /*-------------默认配置值结束-------------*/
70.
71.
72. /*-------------Registry相关配置开始-------------*/
73. //注册中心地址发现服务的地址
74. "registry.index.address":"",
75. /*默认连注册中心的超时时间*/
76. "registry.connect.timeout": 20000,
77. //注册中心等待结果的超时时间
78. "registry.disconnect.timeout":10000,
79. //注册中心调用超时时间
80. "registry.invoke.timeout": 10000,
81. //注册中心心跳发送间隔
82. "registry.heartbeat.period": 30000,
83. //注册中心重建连接的间隔
84. "registry.reconnect.period": 30000,
85. //是否开启有注册中心的批量注册/反注册 boolean 默认true,重连或者销毁时采用批量的方式。配为false则采取旧方式。 1.6.0
86. "registry.batch": false,
87. //如果开启,批量的条数
88. "registry.batch.size": 10,
89. /*-------------Registry相关配置开始-------------*/
90.
91. /*-------------Server相关配置开始-------------*/
92. //默认绑定网卡
93. "server.host": "0.0.0.0",
94. //端口段开始
95. "server.port.start": 12200,
96. //端口段结束
97. "server.port.end": 65535,
98. //默认contextPath
99. "server.context.path":"",
100. //默认io线程大小,推荐自动设置
101. "server.ioThreads": 0,
102. //默认业务线程池类型
103. "server.pool.type":"cached",
104. //默认业务线程池最小
105. "server.pool.core": 20,
106. //默认业务线程池最大
107. "server.pool.max": 200,
108. //是否允许telnet,针对自定义协议
109. "server.telnet": true,
110. //默认普通业务线程池队列
111. "server.pool.queue.type":"normal",
112. //默认业务线程池队列大小
113. "server.pool.queue": 0,
114. // 默认业务线程池回收时间
115. "server.pool.aliveTime": 60000,
116. //默认业务线程池是否初始化核心线程
117. "server.pool.pre.start": false,
118. //最大支持长连接
119. "server.accepts": 100000,
120. //是否启动epoll
121. "server.epoll": false,
122. //是否hold住端口,true的话随主线程退出而退出,false的话则要主动退出
123. "server.daemon": true,
124. //端口是否自适应,如果端口被占用,自动+1
125. "server.adaptive.port": false,
126. //服务端是否自动启动
127. "server.auto.start": true,
128. //服务端关闭超时时间
129. "server.stop.timeout": 20000,
130. /*-------------Server相关配置结束-------------*/
131.
132.
133. /*-------------Provider&Consumer公共配置开始-------------*/
134. //默认服务是否注册
135. "service.register": true,
136. //默认服务是否订阅
137. "service.subscribe": true,
138. /*-------------Provider&Consumer公共配置公共配置结束-------------*/
139.
140.
141. /*-------------Provider相关配置开始-------------*/
142. //默认服务端权重
143. "provider.weight": 100,
144. //默认发布延迟,代表spring启动后触发
145. "provider.delay": -1,
146. //默认发布方法:全部
147. "provider.include": "*",
148. //默认不发布方法
149. "provider.exclude": "",
150. //默认动态注册,false代表不主动发布
151. "provider.dynamic": true,
152. //接口优先级
153. "provider.priority": 0,
154. //服务端调用超时,不打断执行。0表示不判断
155. "provider.invoke.timeout": 0,
156. //接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制
157. "provider.concurrents": 0,
158. //同一个服务(接口协议uniqueId相同)的最大发布次数,防止由于代码bug导致重复发布。注意:后面的发布可能会覆盖前面的实现
159. "provider.repeated.export.limit":1,
160. /*-------------Provider相关配置结束-------------*/
161.
162.
163. /*-------------Consumer相关配置开始-------------*/
164. //集群策略
165. "consumer.cluster":"failover",
166. //默认连接全部建立长连接
167. "consumer.connectionHolder":"all",
168. //默认单分组
169. "consumer.addressHolder":"singleGroup",
170. //负载均衡
171. "consumer.loadBalancer":"random",
172. //默认失败重试次数
173. "consumer.retries": 0,
174. //接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制
175. "consumer.concurrents": 0,
176. //默认是否异步
177. "consumer.invokeType":"sync",
178. //默认不延迟加载
179. "consumer.lazy": false,
180. //默认粘滞连接
181. "consumer.sticky": false,
182. //是否jvm内部调用(provider和consumer配置在同一个jvm内,则走本地jvm内部,不走远程)
183. "consumer.inJVM": false,
184. //是否强依赖(即没有服务节点就启动失败)
185. "consumer.check": false,
186. //默认长连接数
187. "consumer.connection.num": 1,
188. //默认consumer连provider超时时间
189. "consumer.connect.timeout": 5000,
190. //默认consumer断开时等待结果的超时时间
191. "consumer.disconnect.timeout":10000,
192. //默认consumer调用provider超时时间
193. "consumer.invoke.timeout": 5000,
194. //心跳发送间隔
195. "consumer.heartbeat.period": 30000,
196. //重建连接间隔
197. "consumer.reconnect.period": 10000,
198. //等待地址时间,-1表示一直等
199. "consumer.address.wait": -1,
200. //同一个服务(接口协议uniqueId相同)的最大引用次数,防止由于代码bug导致重复引用,每次引用都会生成一个代理类对象
201. "consumer.repeated.reference.limit":3,
202. //本地缓存的StreamObserver最大实例数
203. "stream.observer.max.size": 10000,
204. //本地缓存的Callback最大实例数
205. "callback.max.size": 1000,
206. /*-------------Consumer相关配置结束-------------*/
207.
208.
209. /*-------------异步队列相关配置开始-------------*/
210. //默认回调线程池最小
211. "async.pool.core": 10,
212. //默认回调线程池最大
213. "async.pool.max": 200,
214. //默认回调线程池队列
215. "async.pool.queue": 256,
216. // 默认回调线程池回收时间
217. "async.pool.time": 60000,
218. /*-------------异步队列相关配置结束-------------*/
219.
220.
221. /*-------------Transport层相关配置开始-------------*/
222. //使用epoll
223. "transport.use.epoll": false,
224. //默认数据包大小 8*1024*1024
225. "transport.payload.max": 8388608,
226. //客户端io线程数,默认 max(4,cpu+1)
227. "transport.client.io.threads": 0,
228. //即I/O操作和用户自定义任务的执行时间比为1:1
229. "transport.client.io.ratio": 50,
230. "transport.server.backlog": 35536,
231. "transport.server.reuseAddr": true,
232. "transport.server.keepAlive": true,
233. "transport.server.tcpNoDelay":true,
234. "transport.server.io.ratio": 50,
235. //boss线程,默认max(4,cpu/2)
236. "transport.server.boss.threads": 0,
237. //服务端IO线程 max(8,cpu+1)
238. "transport.server.io.threads": 0,
239. //最大长连接
240. //"transport.server.max.connection": 65536,
241. //是否允许telnet
242. //"transport.server.telnet" : true,
243. //是否守护线程,true随主线程退出而退出,false需要主动退出
244. //"transport.server.daemon" : true,
245. //线程方法模型
246. "transport.server.dispatcher":"",
247. //是否一个端口支持多协议
248. "transport.server.protocol.adaptive":true,
249. //buffer默认大小
250. "transport.buffer.size": 8192,
251. //写入的buffer水位最大值
252. "transport.buffer.max": 32768,
253. //写入的buffer水位最小值
254. "transport.buffer.min": 1024,
255. //是否跨接口长链接复用
256. "transport.connection.reuse": true,
257. //是否开启压缩
258. "compress.open": false,
259. //开启压缩的大小基线
260. "compress.size.baseline": 2048,
261. /*-------------Transport层相关配置结束-------------*/
262.
263. /*
264. *其它 TODO
265. */
266. // Consumer调用时是否发生信息 boolean 默认true,服务端拿到调用端的自动部署应用信息。配为false,服务端拿不到调用者信息 1.6.0
267. "invoke.send.app": false,
268. // 序列化时是否检查父子类(声明参数和传入值是父子类) boolean 默认true,如果业务不存在这种情况可以配置为false,提高性能 1.0.0
269. "serialize.check.class": false,
270. // 序列化是否检测循环引用类型 string 默认true,针对c++等调用者进行关闭 1.6.0
271. "serialize.check.reference": false,
272. // 客户端是否使用epoll(针对linux) boolean 默认false,Linux开启epoll可提高性能 1.0.3
273. "transport.consumer.epoll": false,
274. //是否检测系统时间,需要filter配合
275. "check.system.time": false,
276. //计数器服务调用步长 int 默认1,每次丢调用计数器,调大可以提高性能但会减低精准 1.2.0
277. "counter.batch": false,
278. //json序列化是否返回null字段。 boolean 默认false,默认只返回有属性的字段。打开会降低效率。 1.5.1
279. "json.serialize.fill.empty": false,
280. // consumer的服务端列表是否可被清空 boolean 默认false,默认不能被清空(最后的清空请求被忽略),防止删全部节点等误操作,但是会影响列表的准确性。 1.6.0
281. "consumer.provider.nullable":false,
282. // json序列化的时候,开启的特性 string 默认空,参见fastjson.serializer.SerializerFeature,多个逗号分隔 1.6.0
283. "json.serializer.features": false,
284. // json解析的时候,开启的特性 string 默认空,参见fastjson.parser.Feature,多个逗号分隔 1.6.0
285. "json.parser.features": false,
286. //心跳在IO线程吗? TODO
287. "transport.heart.io": true,
288. //协商请求在IO线程执行吗? TODO
289. "transport.negotiator.async":false,
290. //是否所有客户端共享一个重连线程
291. "consumer.share.reconnect.thread":false
292. }
第二步,通过类加载器加载当前类加载器路径上的所有sofa-rpc/rpc-config.json文件以及父类加载器上的所有sofa-rpc/rpc-config.json文件,包括classpath路径和所有该路径上的Jar包,具体内容参考如下:
1. {
2. /*-------------RPC框架内部使用配置项-------------*/
3. //本配置文件加载顺序,越大越后加载
4. "rpc.config.order": 200,
5. //日志实现,日志早于配置加载,所以不能适应Extension机制
6. "logger.impl":"com.alipay.sofa.rpc.log.MiddlewareLoggerImpl",
7. /*-------------RPC框架内部使用配置项-------------*/
8.
9. /*-------------系统运行时相关配置开始-------------*/
10. //是否增加序列化安全黑名单,关闭后可提供性能
11. "serialize.blacklist.enable": true,
12. //是否支持多ClassLoader支持,如果是单ClassLoader环境,可以关闭提高性能
13. "multiple.classloader.enable":true,
14. //是否允许请求和响应透传数据,关闭后,会提高性能
15. "invoke.baggage.enable": true,
16. /*-------------系统运行时相关配置开始-------------*/
17.
18.
19. /*-------------默认配置值开始-------------*/
20. //默认Provider启动器
21. "default.provider.bootstrap":"bolt",
22. //默认Consumer启动器
23. "default.consumer.bootstrap":"bolt",
24. //默认代理类型
25. "default.proxy" : "jdk",
26. //默认网络层
27. "default.transport":"bolt",
28. //默认tracer实现
29. "default.tracer": "",
30. /*-------------默认配置值结束-------------*/
31.
32.
33. /*-------------Server相关配置开始-------------*/
34. // 默认业务线程池回收时间
35. "server.pool.aliveTime": 300000,
36. //服务端关闭超时时间
37. "server.stop.timeout": 10000,
38. /*-------------Server相关配置结束-------------*/
39.
40.
41. /*-------------Consumer相关配置开始-------------*/
42. //默认consumer连provider超时时间
43. "consumer.connect.timeout": 3000,
44. //默认consumer断开时等待结果的超时时间
45. "consumer.disconnect.timeout":5000,
46. //默认consumer调用provider超时时间
47. "consumer.invoke.timeout": 3000
48.
49. /*-------------Consumer相关配置结束-------------*/
50. }
第三步,通过类加载器加载当前类加载器路径上的所有META-INF/sofa-rpc/rpc-config.json文件以及父类加载器上的所有META-INF/sofa-rpc/rpc-config.json文件,包括classpath路径和所有该路径上的Jar包,具体内容和格式参考上述内容。
最后,把System.getProperties()系统属性增加到CFG中,具体内容参考如下:
1. java.version Java Runtime Environmentversion
2. java.vendor Java Runtime Environmentvendor
3. java.vendor.url Java vendor URL
4. java.home Java installation directory
5. java.vm.specification.version Java VirtualMachine specification version
6. java.vm.specification.vendor Java VirtualMachine specification vendor
7. java.vm.specification.name Java VirtualMachine specification name
8. java.vm.version Java Virtual Machineimplementation version
9. java.vm.vendor Java Virtual Machineimplementation vendor
10. java.vm.name Java Virtual Machineimplementation name
11. java.specification.version Java RuntimeEnvironment specification version
12. java.specification.vendor Java RuntimeEnvironment specification vendor
13. java.specification.name Java RuntimeEnvironment specification name
14. java.class.version Java class formatversion number
15. java.class.path Java class path
16. java.library.path List of paths to searchwhen loading libraries
17. java.io.tmpdir Default temp file path
18. java.compiler Name of JIT compiler to use
19. java.ext.dirs Path of extension directoryor directories
20. os.name Operating system name
21. os.arch Operating system architecture
22. os.version Operating system version
23. file.separator File separator("/" on UNIX)
24. path.separator Path separator(":" on UNIX)
25. line.separator Line separator("\n" on UNIX)
26. user.name User's account name
27. user.home User's home directory
28. user.dir User's current working directory
至此,RpcConfigs初始化完毕。
1.1.2 总结
1.1.2.1 设置RPC属性
通过RpcConfigs初始化过程可以看出,如果想修改Rpc相关配置,可以通过在META-INF/sofa-rpc或sofa-rpc目录下创建rpc-config.json文件,在文件中设置相关配置即可。先sofa-rpc目录,然后META-INF/sofa-rpc目录。如果某个目录下存在多个rpc-config.json文件(分别在多个Jar包),则按照文件中rpc.config.order的顺序,从小到大依次加载。所以,如果多个rpc-config.json文件存在相关的属性,则后者覆盖前者。
另外,也可以在运行程序时,在Java后使用-D参数来设置系统属性,修改Rpc配置。
Rpc属性的优先级,从低到高:
1. RpcConfigs目录下rpc-config-default.json
2. sofa-rpc/rpc-config.json
如果存在多个rpc-config.json文件(分别在多个Jar包),则按照文件中rpc.config.order的顺序,从小到大依次加载。
3. META-INF/sofa-rpc/rpc-config.json
如果存在多个rpc-config.json文件(分别在多个Jar包),则按照文件中rpc.config.order的顺序,从小到大依次加载。
4. 系统属性:通过在Java后使用-D参数来设置一个系统属性值;
1.1.2.2 监听RPC属性变化
如果要监听Rpc属性变化,并根据变化做出响应,则实现RpcConfigListener接口的onChange(T oldValue, TnewValue)方法,然后调用RpcConfigs类的subscribe(Stringkey, RpcConfigListener listener)方法,把RpcConfigListener的实例注册到RpcConfigs。当RpcConfigs的CFG中的属性发生变化(新增、值变化)时,都会通知监听此属性的监听器。