当前位置: 首页 > 文档资料 > gRPC 学习笔记 >

类ManagedChannelImpl - Name Resolver

优质
小牛编辑
139浏览
2023-12-01

构建 Name Resolver

ManagedChannelImpl中和 Name Resolver 相关的属性:

  1. // 匹配这个正则表达式意味着 target 字符串是一个 URI target,或者至少打算成为一个
  2. // URI target 必须是一个absolute hierarchical URI
  3. // 来自 RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
  4. static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
  5. private final String target;
  6. private final NameResolver.Factory nameResolverFactory;
  7. private final Attributes nameResolverParams;
  8. // 绝不为空。必须在lock之下修改。
  9. // 这里不能是final,因为后面会重新设值,但是绝不会设置为null
  10. private NameResolver nameResolver;

ManagedChannelImpl()的构造函数,传入target/nameResolverFactory/nameResolverParams:

  1. ManagedChannelImpl(String target, ......
  2. NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,......) {
  3. this.target = checkNotNull(target, "target");
  4. this.nameResolverFactory = checkNotNull(nameResolverFactory, "nameResolverFactory");
  5. this.nameResolverParams = checkNotNull(nameResolverParams, "nameResolverParams");
  6. this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
  7. ......
  8. }

getNameResolver() 方法根据这三个参数构建 NameResolver:

  1. static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams) {
  2. //查找NameResolver。尝试使用 target 字符串作为 URI。如果失败,尝试添加前缀
  3. // "dns:///".
  4. URI targetUri = null;
  5. StringBuilder uriSyntaxErrors = new StringBuilder();
  6. try {
  7. targetUri = new URI(target);
  8. // 对于 "localhost:8080" 这将会导致 newNameResolver() 方法返回null
  9. // 因为 "localhost" 被作为 scheme 解析。
  10. // 将会转入下一个分支并尝试 "dns:///localhost:8080"
  11. } catch (URISyntaxException e) {
  12. // 可以发生在类似 "[::1]:1234" or 127.0.0.1:1234 这样的ip地址
  13. uriSyntaxErrors.append(e.getMessage());
  14. }
  15. if (targetUri != null) {
  16. // 如果 target 是有效的URI
  17. NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
  18. if (resolver != null) {
  19. return resolver;
  20. }
  21. // "foo.googleapis.com:8080" 导致 resolver 为null,因为 "foo.googleapis.com" 是一个没有映射的scheme。简单失败并将尝试"dns:///foo.googleapis.com:8080"
  22. }
  23. // 如果走到这里,表明 targetUri 不能使用
  24. if (!URI_PATTERN.matcher(target).matches()) {
  25. // 如果格式都不匹配,说明看上去不像是一个 URI target。可能是 authority 字符串。
  26. // 尝试从factory中获取默认 scheme
  27. try {
  28. targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
  29. } catch (URISyntaxException e) {
  30. // Should not be possible.
  31. throw new IllegalArgumentException(e);
  32. }
  33. if (targetUri != null) {
  34. // 尝试加了默认 scheme 之后的URI
  35. NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
  36. if (resolver != null) {
  37. return resolver;
  38. }
  39. }
  40. }
  41. // 最后如果还是不成,只能报错了
  42. throw new IllegalArgumentException(String.format(
  43. "cannot find a NameResolver for %s%s",
  44. target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
  45. }

构建失败会直接抛异常退出。

Name Resolver 的使用

在 TransportSet.Callback 中被调用:

  1. final TransportManager<ClientTransport> tm = new TransportManager<ClientTransport>() {
  2. public ClientTransport getTransport(final EquivalentAddressGroup addressGroup) {
  3. ......
  4. ts = new TransportSet(......,
  5. new TransportSet.Callback() {
  6. public void onAllAddressesFailed() {
  7. // 所有地址失败时刷新
  8. nameResolver.refresh();
  9. }
  10. public void onConnectionClosedByServer(Status status) {
  11. // 服务器端关闭连接时刷新
  12. nameResolver.refresh();
  13. }
  14. ......

NameResolver.Listener

NameResolver通过 NameResolver.Listener 来通知变化,这里的实现是类NameResolverListenerImpl:

  1. private class NameResolverListenerImpl implements NameResolver.Listener {
  2. final LoadBalancer<ClientTransport> balancer;
  3. NameResolverListenerImpl(LoadBalancer<ClientTransport> balancer) {
  4. this.balancer = balancer;
  5. }
  6. @Override
  7. public void onUpdate(List< ? extends List<ResolvedServerInfo>> servers, Attributes config) {
  8. if (serversAreEmpty(servers)) {
  9. // 如果更新的服务列表为空,表明没有可用的服务器了
  10. onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
  11. } else {
  12. // 如果更新的服务列表不为空,通知balancer
  13. try {
  14. balancer.handleResolvedAddresses(servers, config);
  15. } catch (Throwable e) {
  16. // 这必然是一个bug!将异常推回给 LoadBalancer ,希望LoadBalancer可以提升给应用
  17. balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
  18. .withDescription("Thrown from handleResolvedAddresses(): " + e));
  19. }
  20. }
  21. }
  22. @Override
  23. public void onError(Status error) {
  24. checkArgument(!error.isOk(), "the error status must not be OK");
  25. // 通知 balancer 解析错误
  26. balancer.handleNameResolutionError(error);
  27. }
  28. }

name resolver 的 start()的触发

而类NameResolverListenerImpl只有一个地方使用,在方法 exitIdleMode() 中:

  1. LoadBalancer<ClientTransport> exitIdleMode() {
  2. synchronized (lock) {
  3. ......
  4. if (loadBalancer != null) {
  5. // 如果loadBalancer不为空就直接return
  6. // 意味着如果想继续后面的代码,就必须loadBalancer为空才行
  7. return loadBalancer;
  8. }
  9. balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
  10. this.loadBalancer = balancer;
  11. resolver = this.nameResolver;
  12. }
  13. class NameResolverStartTask implements Runnable {
  14. @Override
  15. public void run() {
  16. // 这将在 LoadBalancer 和 NameResolver 中触发一些不平凡的工作
  17. // 不想在lock里面做(因此封装成task扔给scheduledExecutor)
  18. resolver.start(new NameResolverListenerImpl(balancer));
  19. }
  20. }
  21. scheduledExecutor.execute(new NameResolverStartTask());
  22. return balancer;
  23. }

在 NameResolverStartTask 中才开始调用 name resolver 的 start() 方法启动 name resolver 的工作。而搜索中发现,这里是 name resolver 的 start() 方法的唯一一个使用的地方。也就是说,这是 name resolver 开始工作的唯一入口。

因此,name resolver 要开始工作,需要两个条件:

  1. exitIdleMode() 方法被调用
  2. loadBalancer 属性为null

exitIdleMode() 方法有两个被调用的地方:

  1. inUseStateAggregator中,当发现 Channel 的状态转为使用中时:

    1. final InUseStateAggregator<Object> inUseStateAggregator = new InUseStateAggregator<Object>() {
    2. ......
    3. void handleInUse() {
    4. exitIdleMode();
    5. }
    6. }
  2. ClientTransportProvider的get()方法,通过exitIdleMode()方法获取balancer,然后通过balancer获取ClientTransport:

    1. private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
    2. @Override
    3. public ClientTransport get(CallOptions callOptions) {
    4. LoadBalancer<ClientTransport> balancer = exitIdleMode();
    5. if (balancer == null) {
    6. return SHUTDOWN_TRANSPORT;
    7. }
    8. return balancer.pickTransport(callOptions.getAffinity());
    9. }
    10. };

    这里返回的 ClientTransport 在RealChannel的ClientCall()方法中使用,被传递给新创建的ClientCallImpl:

    1. private class RealChannel extends Channel {
    2. public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
    3. ......
    4. return new ClientCallImpl<ReqT, RespT>(......
    5. transportProvider,......)
    6. }

    而ClientCallImpl中这个get()方法只有一个地方被调用,在ClientCallImpl的start()方法中:

    1. public void start(final Listener<RespT> observer, Metadata headers) {
    2. ......
    3. ClientTransport transport = clientTransportProvider.get(callOptions);
    4. }

name resolver 的总结

name resolver 的 start() 方法,也就是 name resolver 要开始解析name的这个工作,只有两种情况下开始:

  1. 第一次RPC请求: 此时要调用Channel的 newCall() 方法得到ClientCall的实例,然后调 ClientCall 的 start()方法,期间获取ClientTransport时激发一次 name resolver 的 start()
  2. 如果开启了空闲模式:则在每次 Channel 从空闲模式退出,进入使用状态时,再激发一次 name resolver 的 start()