当前位置: 首页 > 工具软件 > Sidekiq > 使用案例 >

Sidekiq 信号处理源码分析

孙言
2023-12-01

引言

在之前的文章《Sidekiq任务调度流程分析》中,我们一起仔细分析了 Sidekiq 是如何基于多线程完成队列任务处理以及调度的。我们在之前的分析里,看到了不管是 Sidekiq::Scheduled::Poller 还是 Sidekiq::Processor 的核心代码里,都会有一个由 @done 实例变量控制的循环体:
<!-- More -->

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done           # 这是 poller 的循环控制
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done           # 这是我们常说的 worker 循环控制
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

也就是说,这些 @done 实例变量决定了 poller 线程跟 worker 线程是否循环执行?一旦 @done 被改为 true,那循环体就不再执行,线程自然也就是退出了。于是,单从这些代码,我们可以断定, Sidekiq 就是通过设置 @done 的值来通知一个线程安全退出(graceful exit)的。我们也知道,生产环境中,我们是通过发送信号的方式来告诉 sidekiq 退出或者进入静默(quiet)状态的,那么,这里的 @done 是怎么跟信号处理联系起来的呢?这些就是今天这篇文章的重点了!

注意

  1. 今天的分析所参考的 sidekiq 的源码对应版本是 4.2.3;

  2. 今天所讨论的内容,将主要围绕系统信号处理进行分析,无关细节将不赘述,如有需要,请自行翻阅 sidekiq 源码;

  3. 今天的文章跟上篇的《Sidekiq任务调度流程分析》紧密相关,上篇文章介绍的启动过程跟任务调度会帮助这篇文章的理解,如果还没有阅读上篇文章的,建议先阅读后再来阅读这一篇信号处理的文章。

你将了解到什么?

  1. Sidekiq 信号处理机制;

  2. 为什么重启 Sidekiq 时,USR1 信号(即进入 quiet 模式)需要尽可能早,而进程的退出重启需要尽可能晚。

从头再来

因为前一篇文章着眼于任务调度,所以略过了其他无关细节,包括信号处理,这篇文章则将镜头对准信号处理,所以让我们从头再来一遍,只是这一次,我们只关心与信号处理有关的代码。

依旧是从 cli.rb 文件开始,它是 Sidekiq 核心代码的生命起点,因为 Sidekiq 命令行启动后,它是第一个被执行的代码,Sidekiq 启动过程中调用了 Sidekiq::CLI#run 方法:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106
def run
  boot_system
  print_banner

  self_read, self_write = IO.pipe

  %w(INT TERM USR1 USR2 TTIN).each do |sig|
    begin
      trap sig do
        self_write.puts(sig)
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

  # ... other codes

  begin
    launcher.run

    while readable_io = IO.select([self_read])
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
  rescue Interrupt
    logger.info 'Shutting down'
    launcher.stop
    # Explicitly exit so busy Processor threads can't block
    # process shutdown.
    logger.info "Bye!"
    exit(0)
  end

以上的代码就是整个 Sidekiq 最顶层的信号处理的核心代码了,让我们慢慢分析!
首先,self_read, self_write = IO.pipe 创建了一个模拟管道的 IO 对象,并且同时返回这个 管道的一个写端以及一个读端,通过这两端,就可以实现对管道的读写了。需要注意的是,IO.pipe 创建的读端在读的时候不会自动生成 EOF 符,所以这就要求读时,写端是关闭的,而写时,读端是关闭的,一句话说,就是这样的管道不允许读写端同时打开。关于 IO.pipe 还有挺多细节跟需要注意的点,如果还需要了解,请阅读官方文档

上面说的管道本质上只是一个 IO 对象而已,暂时不用纠结太多,让我们接着往下读:

%w(INT TERM USR1 USR2 TTIN).each do |sig|
  begin
    trap sig do
      self_write.puts(sig)
    end
  rescue ArgumentError
    puts "Signal #{sig} not supported"
  end
end

这段代码就比较有意思了,最外层遍历了一个系统信号的数组,然后逐个信号进行监听(trap,或者叫捕捉?)。让我们聚焦在 trap 方法的调用跟其 block 上,查阅 Ruby 文档,发现 trapSignal 模块下的一个方法,Signal 主要是处理与系统信号有关的任务,然后 trap 的作用是:

Specifies the handling of signals. The first parameter is a signal name (a string such as “SIGALRM”, “SIGUSR1”, and so on) or a signal number...

所以,前面的那段代码的意思就很容易理解了,Sidekiq 注册了对 INTTERMUSR1USR2以及TTIN等系统信号的处理,而在进程收到这些信号时,就会执行 self_write.puts(sig),也就是将收到的信号通过之前介绍的管道写端 self_write 记录下来。什么?只记录下来,那还得处理啊?!

稍安勿躁,让我们接着往下分析 Sidekiq::CLI#run 方法末尾的代码:

begin
  launcher.run

  while readable_io = IO.select([self_read])
    signal = readable_io.first[0].gets.strip
    handle_signal(signal)
  end
rescue Interrupt
  logger.info 'Shutting down'
  launcher.stop
  # Explicitly exit so busy Processor threads can't block
  # process shutdown.
  logger.info "Bye!"
  exit(0)
end

看到没有,这里有个循环,循环控制条件里,readable_io = IO.select([self_read]) 是从前面的管道的读端 self_read 阻塞地等待信号的到达。对于 IO.selectRuby 官方文档介绍如下:

Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.

所以这里就是说 Sidekiq 主线程首先负责执行完其他初始化工作,最后阻塞在信号等待以及处理。在其等到新的信号之后,进入上面代码展示的循环体:

signal = readable_io.first[0].gets.strip
handle_signal(signal)

这里语法细节先不深究,我们看下这两行代码第一行是从前面说的管道中读取信号,并且将信号传递给 handle_signal 方法,让我们接着往下看 handle_signal 方法的定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153
def handle_signal(sig)
  Sidekiq.logger.debug "Got #{sig} signal"
  case sig
  when 'INT'
    # Handle Ctrl-C in JRuby like MRI
    # http://jira.codehaus.org/browse/JRUBY-4637
    raise Interrupt
  when 'TERM'
    # Heroku sends TERM and then waits 10 seconds for process to exit.
    raise Interrupt
  when 'USR1'
    Sidekiq.logger.info "Received USR1, no longer accepting new work"
    launcher.quiet
  when 'USR2'
    if Sidekiq.options[:logfile]
      Sidekiq.logger.info "Received USR2, reopening log file"
      Sidekiq::Logging.reopen_logs
    end
  when 'TTIN'
    Thread.list.each do |thread|
      Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}"
      if thread.backtrace
        Sidekiq.logger.warn thread.backtrace.join("\n")
      else
        Sidekiq.logger.warn "<no backtrace available>"
      end
    end
  end
end

这里的代码挺长,但是一点都不难理解,我简单解释下就够了。当进程:

  1. 收到 TERM 或者 INT信号时,直接抛出 Interrupt 中断;

  2. 收到 USR1 信号时,则通知 launcher 执行 .quiet 方法,Sidekiq 在这里进入 Quiet 模式(怎么进入?);

  3. 收到 USR2 信号时,重新打开日志;

  4. 收到 TTIN 信号时,打印所有线程当前正在执行的代码列表。

到此,一个信号从收到被存下,到被取出处理的大致过程就是这样的,至于具体的处理方式,我们下个章节详细展开。现在有一点需要补充的是,上面讲当 Sidekiq 收到 TERM 或者 INT 信号时,都会抛出 Interrupt 中断异常,那这个异常又是如何处理的呢?我们回过头去看刚才最开始的 Sidekiq::CLI#run 方法末尾的代码:

begin
  launcher.run

  while readable_io = IO.select([self_read])
    signal = readable_io.first[0].gets.strip
    handle_signal(signal)
  end
rescue Interrupt
  logger.info 'Shutting down'
  launcher.stop
  # Explicitly exit so busy Processor threads can't block
  # process shutdown.
  logger.info "Bye!"
  exit(0)
end

原来是 run 方法在处理信号时,声明了 rescue Interrupt,捕捉了 Interrupt 中断异常,并且在异常处理时打印必要日志,同时执行 launcher.stop 通知各个线程停止工作,最后调用 exit 方法强制退出进程,到此,一个 Sidekiq 进程就彻底退出了。
但是问题又来了,信号处理的大致过程我是知道了,但是具体的 launcher.quietlauncher.stop 都干了些什么呢?

Sidekiq::Launcher#quiet 源码探索

老规矩,先上代码:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36
def quiet
  @done = true
  @manager.quiet
  @poller.terminate
end

代码只有短短三行。 Launcher 对象首先设置自己的实例变量 @done 的值为 true,接着执行 @manager.quiet 以及 @poller.terminate。看方法命名上理解,应该是 Luancher 对象又将 quiet 的消息传递给了 @managerSidekiq::Manager 对象,同时通知 @pollerSidekiq::Scheduled::Poller 对象结束工作。那到底是不是真的这样呢?让我们继续深挖!

Sidekiq::Manager#quiet

让我们来看看 Sidekiq::Manager#quiet 方法的代码

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58
def quiet
  return if @done
  @done = true

  logger.info { "Terminating quiet workers" }
  @workers.each { |x| x.terminate }
  fire_event(:quiet, true)
end

上面的代码也很短,首先将 Sidekiq::Manager 对象自身的 @done 实例变量的值设置为 true,接着对其所管理的每一个 worker,都发出一个 terminate 消息。让我们接着往下看 worker 对象(Sidekiq::Processor 对象)的 #terminate 方法定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46
def terminate(wait=false)
  @done = true
  return if !@thread
  @thread.value if wait
end

这里的代码依然保持了精短的特点!跟上一层逻辑一样,worker 在处理 terminate 时,同样设置自己的 @done 实例变量为 true 后返回,但是,如果其参数 waittrue,则会保持主线程等待,直到 @thread 线程退出(@thread.value 相当于执行 @thread.join并且返回线程的返回值,可参考 Ruby 文档)。

那么,这里就要问了,worker 设置 @done 为 true 是要干嘛?这里好像也没有做什么特别的事啊?!勿急,还记得上篇文章介绍 worker 运行时的核心代码吗?

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

看到了吧,@done 变量可是一个重要的开关,当 @donefalse 时,worker 一直周而复始地从队列中取任务并且老老实实干活;而当 @donetrue 时,worker 在处理完当前的任务之后,便不再执行新的任务,执行 @msg.processor_stopped(self) 通知 worker 管理器自己已经退出工作,最终 #run 方法返回。由于 #run 方法是在独立线程里执行的,所以当 #run 方法返回时,其所在的线程自然也就退出了。

那关于 worker 的 quiet 模式进入过程就是这么简单,通过一个共享变量 @done 便实现了对工作线程的控制。

Sidekiq::Scheduled::Poller#terminate

前面说到 Sidekiq::Launcher#quiet 执行时,先将消息传递给了 worker 管理器,随后执行了 @poller.terminate,那我们来看看 #terminate 方法的定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61
def terminate
  @done = true
  if @thread
    t = @thread
    @thread = nil
    @sleeper << 0
    t.value
  end
end

又是如此简短的代码。poller 退出的逻辑跟 worker 退出的逻辑非常一致,都是同样先设置自己的 @done 实例变量的值为 true,接着等待线程 @thread 退出,最后 poller 返回。

那么,poller 的 @done 是不是也是用来控制线程退出呢?答案是肯定的!

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end

还记得上面这段代码吗? poller 在每次将定时任务压回任务队列之后,等待一定时间,然后重新检查 @done 的值,如果为 true,则 poller 直接返回退出,因为 #start 方法里的循环体在新线程中执行,当循环结束时,线程自然也退出了。

小结

  1. 当 Sidekiq 收到 USR1 系统信号时,Sidekiq 主线程向 @launcher 发送 quiet 消息,@launcher 又将消息传递给 @manager ,同时向 @poller 发出 terminate 消息;

  2. @manager 在收到 quiet 消息时,逐一对运行中的 worker 发送 terminate 消息,worker 收到消息后,设置自己的 @donetrue,标识不再处理新任务,当前任务处理完成后退出线程;

  3. @poller 在收到 terminate 消息后,也是设置自己的 @donetrue,在本次任务执行完毕后,线程也退出;

  4. Sidekiq 进入 quiet 模式之后,所有未处理任务以及新任务都不再处理,直到 sidekiq 的下一次重启。

Sidekiq::Launcher#stop 源码探索

前面介绍的是 Sidekiq 进入 quiet 模式的过程,那 Sidekiq 的停止过程又是怎样的呢?

让我们从 Sidekiq::Launcher#stop 方法开始寻找答案:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56
def stop
  deadline = Time.now + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  strategy = (@options[:fetch] || Sidekiq::BasicFetch)
  strategy.bulk_requeue([], @options)

  clear_heartbeat
end

首先,Sidekiq::Launcher 对象设定了一个强制退出的 deadline,时间是以当前时间加上配置的 timeout,这个时间默认是 8 秒

接着,设定对象本身的 @done 变量的值为 true,然后分别对 @manager@poller 发送 quietterminate 消息,这个过程就是我们上面说的 Sidekiq::Launcher#quiet 的过程,所以,这里的代码主要是 Sidekiq 要确保退出前已经通知各个线程准备退出。

接下来的代码就比较重要了,我们先看这一行:

@manager.stop(deadline)

在通知完 @manager 进入 quiet 模式之后,launcher 向 @manager 发送了 stop 消息,并且同时传递了 deadline 参数。让我们接着继续往下看:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83
PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

def stop(deadline)
  quiet
  fire_event(:shutdown, true)

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { "Pausing to allow workers to finish..." }
  remaining = deadline - Time.now
  while remaining > PAUSE_TIME
    return if @workers.empty?
    sleep PAUSE_TIME
    remaining = deadline - Time.now
  end
  return if @workers.empty?

  hard_shutdown
end

上面的代码,manager 首先调用了自身的 quiet 方法(这里就真的多此一举了,因为外层的 launcher 已经调用过一次了),然后 manager 执行 sleep 系统调用进入休眠,持续时间为 0.5 秒,休眠结束后检查所有 worker 是否已经都退出,如果退出,则直接返回,任务提前结束;如果仍有 worker 未退出,则检查当前时间是否接近强制退出的 deadline,如果不是,则重复“检查所有 worker 退出 - 休眠” 的过程,直到 deadline 来临,或者 worker 线程都已经全部退出。如果最后到达 deadline,仍有 worker 线程未退出,则最后执行 hard_shutdown

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135
def hard_shutdown
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end

  if cleanup.size > 0
    jobs = cleanup.map {|p| p.job }.compact

    # ... other codes

    strategy = (@options[:fetch] || Sidekiq::BasicFetch)
    strategy.bulk_requeue(jobs, @options)
  end

  cleanup.each do |processor|
    processor.kill
  end
end

这里 hard_shutdown 方法在执行时,首先克隆了当前仍未退出的 @workers 列表,接着获取每个 worker 当前正在处理的任务,将这些正在执行中的任务数据通过 strategy.bulk_requeue(jobs, @options) 重新写回队列,而最后对每一个 worker 发送 kill 消息:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58
def kill(wait=false)
  @done = true
  return if !@thread

  @thread.raise ::Sidekiq::Shutdown
  @thread.value if wait
end

worker 在收到 kill 消息时,首先设置自己的 @donetrue,最后向 worker 所关联的线程抛出 ::Sidekiq::Shutdown 异常。让我们看看 worker 的线程又是如何处理异常的:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

又回到 worker 的 run 方法这里,可以看到,run 方法捕捉了 Sidekiq::Shutdown 异常,并且在处理异常时,只是执行 @mgr.processor_stopped(self),通知 manager 自己已经退出,由于已经跳出正常流程,worker 的 run 方法返回,线程也因此得以退出。至此,worker 也都正常退出了。

小结

  1. launcher 在执行退出时,首先按照 quiet 的流程先通知各个线程准备退出;

  2. 接着 launcher 向 manager 下达 stop 指令,并且给出最后期限(deadline);

  3. manager 在给定的限时内,尽可能等待所有 worker 执行完自己退出,对于到达限时仍未退出的 worker,manager 备份了每个 worker 的当前任务,重新加入队列,确保任务至少完整执行一次,然后通过向线程抛出异常的方式,迫使 worker 的线程被动退出。

总结

  1. Sidekiq 简单高效利用了系统信号,并且有比较清晰明了的信号处理过程;

  2. Sidekiq 在信号处理的过程中,各个组件协调很有条理,消息逐级传递,而且对被强制停止的任务也有备份方案;

  3. 我们可以从 Sidekiq 的系统信号处理机制上借鉴不少东西,比如常用系统信号的分类处理等;

  4. 对于多线程的控制,通过共享变量以及异常的方式做到 graceful 以及 hard 两种方式的退出处理。

  5. 还有很多,一百个人心中有一百个哈姆莱特,同样一份代码,不同的人学习阅读,肯定收获不同,你可以在评论区留下你的感悟,跟看到这篇文章的人一起分享!

问题思考

  1. 为了尽可能确保所有 Sidekiq 的任务能够正常主动退出,所以在部署脚本中,都会尽可能早地让 Sidekiq 进入 quiet 模式,但是 Sidekiq 的 quiet 是不可逆的,所以一旦部署脚本中途失败,Sidekiq 得不到重启,将会一直保持 quiet 状态,如果长时间未重启,任务就会积压。所以,一般我都会在部署脚本中,额外捕捉部署脚本失败异常,然后主动执行 sidekiq 的重启。如果你的部署脚本中有涉及 Sidekiq 的,一定要注意检查部署失败是否会影响 Sidekiq 的状态

  2. 虽然 Sidekiq 在强制退出当前长时间未退出的任务时,会将 job 的数据写回队列,等待重启后重新执行,那么这里就有个细节需要注意了,就是你的 job 必须是幂等的,否则就不能允许重新执行了。所以,请注意,如果你有需要长时间运行的 job,请注意检查其幂等性

好了,今天就写到这吧!仍然挺长一篇,啰嗦了。感谢看到这里!

 类似资料: