当前位置: 首页 > 文档资料 > gRPC 学习笔记 >

NameResolver - 类DnsNameResolver

优质
小牛编辑
130浏览
2023-12-01

类定义

DnsNameResolver 是 io.grpc.internal 下的类,包级私有,通过DnsNameResolverFactory类来创建。

研究一下它的实现,以便理解 NameResolver 的使用。

  1. package io.grpc.internal;
  2. class DnsNameResolver extends NameResolver {}

属性和构造函数

属性比较多,先只看和 URI 相关的几个属性 authority/host/port:

  1. private final String authority;
  2. private final String host;
  3. private final int port;
  4. DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params,
  5. Resource<ScheduledExecutorService> timerServiceResource,
  6. Resource<ExecutorService> executorResource,
  7. ProxyDetector proxyDetector) {
  8. // 必须在 name 前加上"//",否则将被当成含糊的URI,导致生成的URI的authority和host会变成null。
  9. URI nameUri = URI.create("//" + name);
  10. // 从生成的URI中得到 authority 并赋值,如果为空则报错
  11. authority = Preconditions.checkNotNull(nameUri.getAuthority(),
  12. "nameUri (%s) doesn't have an authority", nameUri);
  13. // 同样方式得到host,如果为空则报错
  14. host = Preconditions.checkNotNull(nameUri.getHost(), "host");
  15. // 端口特殊一点,因为可以通过params参数传入默认端口
  16. if (nameUri.getPort() == -1) {
  17. // 如果name中没有包含端口,则尝试从参数中获取默认端口
  18. Integer defaultPort = params.get(NameResolver.Factory.PARAMS_DEFAULT_PORT);
  19. if (defaultPort != null) {
  20. port = defaultPort;
  21. } else {
  22. throw new IllegalArgumentException(
  23. "name '" + name + "' doesn't contain a port, and default port is not set in params");
  24. }
  25. } else {
  26. port = nameUri.getPort();
  27. }
  28. }

构造函数中还传入了两个和Executor相关的参数,构造函数只是简单的保存起来:

  1. private final Resource<ScheduledExecutorService> timerServiceResource;
  2. private final Resource<ExecutorService> executorResource;
  3. DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params,
  4. ......
  5. this.timerServiceResource = timerServiceResource;
  6. this.executorResource = executorResource;
  7. ......
  8. }

方法实现

NameResolver 定义的方法

getServiceAuthority()

按照要求,这个方法直接返回在构造函数中就设置好的 authority 属性。考虑到 authority 属性是 final 的,因此也满足 NameResolver 接口中要求的: “实现必须非阻塞式的生成它,而且必须保持不变。使用同样的参数从同一个的 factory 中创建出来的 NameResolver 必须返回相同的 authority “。

  1. public final String getServiceAuthority() {
  2. return authority;
  3. }

start()

按照要求,start() 开始解析。listener 用于接收目标的更新。

实现中,this.listener 属性用于保存传入的 listener ,而在 start() 时,timerService 和 executor 两个属性才会从之前传入的 timerServiceResource/executorResource 这两个SharedResourceHolder中获取实际的对象实例。

  1. public final synchronized void start(Listener listener) {
  2. // 先检查之前没有start过,判断依据是 this.listener 是否已有赋值
  3. Preconditions.checkState(this.listener == null, "already started");
  4. timerService = SharedResourceHolder.get(timerServiceResource);
  5. executor = SharedResourceHolder.get(executorResource);
  6. // 为 this.listener 赋值,配合上面的检查
  7. this.listener = Preconditions.checkNotNull(listener, "listener");
  8. resolve();
  9. }

resolve()方法开始做实际的解析,具体内容后面再看。

refresh()

DnsNameResolver 的 refresh() 实现直接调用了 resolve() 方法,和 start() 方法相比, start()中只是多了开始时的检查和resource获取,后面都是同样的调用resolve() 方法。

  1. public final synchronized void refresh() {
  2. // refresh()方法必须在 start() 之后调用,因此这里做了检查,同样判断依据是 listener 属性
  3. Preconditions.checkState(listener != null, "not started");
  4. resolve();
  5. }

shutdown()

按照要求,shutdown()方法将停止解析,同时更新 listener 将会停止。实现中,将

shutdown属性用于标记是否已经关闭。

  1. @GuardedBy("this")
  2. private boolean shutdown;
  3. public final synchronized void shutdown() {
  4. // 通过 shutdown 属性来判断是否已经关闭
  5. if (shutdown) {
  6. return;
  7. }
  8. shutdown = true;
  9. if (resolutionTask != null) {
  10. // 如果 resolutionTask 不为空,取消它
  11. resolutionTask.cancel(false);
  12. }
  13. if (timerService != null) {
  14. // 释放 timerService 资源,返回null将清理属性 timerService
  15. timerService = SharedResourceHolder.release(timerServiceResource, timerService);
  16. }
  17. if (executor != null) {
  18. // 释放 executor 资源,返回null将清理属性 executor
  19. executor = SharedResourceHolder.release(executorResource, executor);
  20. }
  21. }

解析的实际实现

从 resolve() 方法开始:

  1. private void resolve() {
  2. // resolving 属性和 shutdown 协助判断一下状态
  3. if (resolving || shutdown) {
  4. return;
  5. }
  6. // 将 resolutionRunnable 作为任务扔给executor
  7. // 然后方法返回,异步做解析
  8. // 这样 start()和 refresh() 方法就都是快速返回,异步解析后通过 listener 做数据更新
  9. executor.execute(resolutionRunnable);
  10. }

继续看 resolutionRunnable 的实现,代码有点长,先排除各种细节处理,只看主流程:

  1. private final Runnable resolutionRunnable = new Runnable() {
  2. @Override
  3. public void run() {
  4. //忽略此处的状态检查代码和grpc proxy处理代码
  5. ......
  6. ResolutionResults resolvedInetAddrs;
  7. try {
  8. // step1. 将host解析为ResolutionResults
  9. // 新版本引入了一个 delegateResolver,细节稍后再看
  10. // 开始对 host 做 dns 解析,得到解析结果
  11. resolvedInetAddrs = delegateResolver.resolve(host);
  12. } catch (Exception e) {
  13. ...... // 异常处理的流程后面再细看
  14. return;
  15. }
  16. // 解析出来的每个地址组成一个 EAG
  17. ArrayList<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
  18. for (InetAddress inetAddr : resolvedInetAddrs.addresses) {
  19. step2. 将每个 InetAddress 格式的IP地址包装为 EquivalentAddressGroup 对象
  20. servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
  21. }
  22. // 跳过balancerAddresses和TXT的处理
  23. // step3. 通知listner,有数据更新
  24. savedListener.onAddresses(servers, attrs.build());
  25. }
  26. }

主流程就是上面注释中的3个步骤:

  1. 解析地址
  2. 包装格式
  3. 通知listener

注意这个工作是在异步线程中进行的,无法直接return结果,只能通过listener。

再继续看错误流程,如果解析地址失败:

  1. try {
  2. resolvedInetAddrs = delegateResolver.resolve(host);
  3. } catch (Exception e) {
  4. // 遇到无法解析的情况
  5. synchronized (DnsNameResolver.this) {
  6. if (shutdown) {
  7. // 如果此时已经要求 shutdown,则不用继续处理,直接return
  8. return;
  9. }
  10. // 因为在生产中 timerService 是一个单线程的 GrpcUtil.TIMER_SERVICE
  11. // 我们需要将这个阻塞的工作交给 executor
  12. resolutionTask =
  13. timerService.schedule(new LogExceptionRunnable(resolutionRunnableOnExecutor), 1, TimeUnit.MINUTES);
  14. }
  15. // 通知 listener 遇到错误
  16. savedListener.onError(Status.UNAVAILABLE.withDescription(
  17. "Unable to resolve host " + host).withCause(e));
  18. return;
  19. }

上面的代码,在处理解析失败时,做了两个事情:

  1. 通知 listener 出错了
  2. 安排了一个 resolutionTask

出错了通知 listener 这个容易理解,resolutionTask 是做什幺呢? 我们细看 resolutionTask 相关的代码:

  1. // resolutionTask 的定义,一个标准的 ScheduledFuture
  2. private ScheduledFuture< ? > resolutionTask;
  3. ......
  4. // 唯一一个赋值的地方,就是当解析出错时,也就是上面的处理
  5. resolutionTask = timerService.schedule(new LogExceptionRunnable(resolutionRunnableOnExecutor), 1, TimeUnit.MINUTES);

给 timerService 提交一个 LogExceptionRunnable 的任务,要求延迟1分钟执行,然后将得到的 feature 保存为 resolutionTask 。这个 resolutionTask 在 shutdown()方法和 resolutionRunnable 的 run()方法中有细节处理。

先看看 LogExceptionRunnable 的实现:

  1. // 对 Runnable 的简单包裹,用于记录它抛出的任何异常,在重新抛出之前
  2. public final class LogExceptionRunnable implements Runnable {
  3. // 构造函数只是简单的保存传入的task
  4. public LogExceptionRunnable(Runnable task) {
  5. this.task = checkNotNull(task);
  6. }
  7. public void run() {
  8. try {
  9. // 执行task
  10. task.run();
  11. } catch (Throwable t) {
  12. // 捕获异常,先打印日志,这个是主要目的了
  13. log.log(Level.SEVERE, "Exception while executing runnable " + task, t);
  14. // 如果是RuntimeException或者Error,就直接原样抛出去
  15. MoreThrowables.throwIfUnchecked(t);
  16. // 否则就生成一个新的AssertionError抛出去
  17. throw new AssertionError(t);
  18. }
  19. }
  20. }

只是简单的执行 task 并在出错时记录日志,而这里的 task 是 resolutionRunnableOnExecutor,所以关键还是看 resolutionRunnableOnExecutor 里面的实现内容:

  1. private final Runnable resolutionRunnableOnExecutor = new Runnable() {
  2. @Override
  3. public void run() {
  4. synchronized (DnsNameResolver.this) {
  5. if (!shutdown) {
  6. // 再执行一次 resolutionRunnable,这里和 resolve() 函数差不多的功能
  7. executor.execute(resolutionRunnable);
  8. }
  9. }
  10. }
  11. };

现在错误流程流程就清晰了:

  1. 在解析失败时,就会给 timerService 安排一个一分钟之后执行的任务 resolutionTask
  2. 在 resolutionTask 中,将重新用 executor 跑一次 resolutionRunnable
  3. 如果继续解析失败,则循环上述过程

即解析失败则每隔一分钟尝试一次,直到成功。

注意在 resolutionRunnable 中,每次发现 resolutionTask 存在就会先 cancel 掉它,然后置为null:

  1. if (resolutionTask != null) {
  2. resolutionTask.cancel(false);
  3. resolutionTask = null;
  4. }

所以上述的循环,只是在每次解析失败时,一旦解析成功,就会跳出循环。