当前位置: 首页 > 工具软件 > Sidekiq > 使用案例 >

五问Sidekiq # W11

东郭元魁
2023-12-01

Sidekiq 是Ruby社区最受欢迎的异步任务框架之一,几乎是Rails项目标配。

本文,我将从实际使用者的角度来提出疑问,通过一一解答这些问题来剖析Sidekiq是如何工作的。(代码基于Sidekiq-5.1.3)

在Web请求中,有很多任务是可以放到后台执行的,比如用户购买商品付款成功后,就可以直接向用户购买成功,相应的短信发送,物流通知等等就可以放到后台任务去做,不用在用户购买的同时立即执行,这些任务也称作异步任务。在Rails中,使用Sidekiq是这样的:

class HardWorker
  include Sidekiq::Worker
  sidekiq_options :retry => 5, queue: 'hard'
  def perform(name)
    # do something
  end
end
HardWorker.perform_async('bob')     # 异步执行任务

我们通常使用上面的方式,注册一个任务,让它异步执行。那么本文的第一个问题来了:

1. 上面的这条代码 HardWorker.perform_async('bob')  到底做了什么

很容易看出这个类方法定义在 Sidekiq::Worker 里面,在源码中找到相关的代码:

  def perform_async(*args)
    client_push('class' => self, 'args' => args)
  end
  def client_push(item) # :nodoc:
    pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
    # 有省略
    Sidekiq::Client.new(pool).push(item)
  end

可以看出,把自己的Class和方法的参数生成了一个hash,传递进了client_push方法, pool是一个redis连接, 调用了Client的实例方法, 通过一层层代码的查看,最终来到了我们最值得关注的一个方法:

    def atomic_push(conn, payloads)
      if payloads.first['at']  # 延时任务,例如指定了一分钟之后才执行
        conn.zadd('schedule', payloads.map do |hash|
          at = hash.delete('at').to_s
          [at, Sidekiq.dump_json(hash)]
        end)
      else
        q = payloads.first['queue']
        now = Time.now.to_f
        to_push = payloads.map do |entry|
          entry['enqueued_at'] = now
          Sidekiq.dump_json(entry)
        end
        conn.sadd('queues', q)
        conn.lpush("queue:#{q}", to_push)  #重点
      end
    end

payloads是通过上面item参数加工后得到的,里面除了之前的class,args参数之外,还会有sidekiq_options方法定制的部分参数,例如queue。 我们重新专注于上面的代码,第一个问题的答案已经很明显了。

从上面可以清晰的看出,异步执行相关的数据被打包成json,然后使用redis的lpush命令塞进了一个队列中,这个队列的名字和指定的queue有关系,按照我们上面的配置,这个队列的就是 "queue:hard"。写入队列的数据大概像这样:

{'class' => MyWorker, 'args' => [1, 2, 3]}

通过这些参数,就可以知道需要被执行的是哪个Worker,并且参数是什么。 注册异步任务到此就结束了,第二个问题随之来了。

2. 上面这些队列里的数据是怎么被消费的

通过第一个问题,我们知道了,所谓注册异步任务,就是将要执行的相关Class和参数写入redis队列,然后有其他专门的角色来处理这些异步任务,这样便实现了任务异步化,通过redis队列实现了完美的解耦合。那么这些队列数据是被谁消费了呢?  实际进行异步任务处理的就是我们熟悉的Sidekiq进程,通过这样可以启动Sidekiq进程:

bundle exec sidekiq 
启动的进程会去消费相应的队列数据,然后执行对应的代码。接下来,我们深入源码,看看这些具体是怎么回事。

首先简单的梳理一下Sidekiq的启动流程,通过启动流程顺藤摸瓜,找到我们的答案。Sidekiq命令本质上是下面的代码

#!/usr/bin/env ruby
$TESTING = false
require_relative '../lib/sidekiq/cli'
begin
  cli = Sidekiq::CLI.instance
  cli.parse
  cli.run
rescue => e
  raise e if $DEBUG
  STDERR.puts e.message
  STDERR.puts e.backtrace.join("\n")
  exit 1
end

重点在cli.parse 和cli.run上,我们依次看看它们到底是什么:

    def parse(args=ARGV)
      @code = nil
      setup_options(args)
      initialize_logger
      validate!
      daemonize
      write_pid
    end

parse方法就是解析了一下配置数据,例如日志路径,要消费的队列等等,关键在于run方法上,下面截取一部分:

    def run
      boot_system
      print_banner
      self_read, self_write = IO.pipe
      require 'sidekiq/launcher'
      @launcher = Sidekiq::Launcher.new(options)
      begin
        launcher.run                                # 关键方法
        while readable_io = IO.select([self_read])  # 死循环,等待消息
          signal = readable_io.first[0].gets.strip
          handle_signal(signal)
        end
      rescue Interrupt
        logger.info 'Shutting down'
        launcher.stop
        logger.info "Bye!"
        exit(0)
      end
    end
可以看出,在进行了一些初始化之后,执行了 launcher.run方法,然后就进入了一个死循环,这个死循环会一直读pipe消息,根据消息执行相关的命令,常见的就是重启等等命令。 进程中的秘密自然的被聚焦到了launcher.run方法上,我们去一探究竟。
  class Launcher  # 部分截取
    include Util
    attr_accessor :manager, :poller, :fetcher
    def initialize(options)
      @manager = Sidekiq::Manager.new(options)
      @poller = Sidekiq::Scheduled::Poller.new
      @done = false
      @options = options
    end

    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))  # 第一行
      @poller.start                                                  # 第二行
      @manager.start                                                 # 第三行
    end 
    def start_heartbeat
      while true
        heartbeat
        sleep 5
      end
      Sidekiq.logger.info("Heartbeat stopping...")
    end
end

可以看到,执行了三行代码,先来看看safe_thread 方法:

    def safe_thread(name, &block)
      Thread.new do
        Thread.current['sidekiq_label'] = name
        watchdog(name, &block)
      end
    end
safe_thread方法就是返回了一个Thread实例,线程中会执行传递进去的block,上面传递的是一个start_heartbeat方法。可以看出其实是启动了一个心跳线程, 用来定时做一些数据统计,同时也是一种TCP连接保活机制。 第二行执行了@poller.start, 它是Poller类的一个实例,下面看看源码中的start方法:
    class Poller  # 有删减
      def initialize
        @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
        @sleeper = ConnectionPool::TimedStack.new
        @done = false
        @thread = nil
      end

      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait
          while !@done
            enqueue
            wait
          end
          Sidekiq.logger.info("Scheduler exiting...")
        end
      end
   end

通过safe_thread方法返回了一个线程,线程中和start_heartbeat一样是一个死循环,wait方法就是随机地sleep几秒,重点在enqueue方法:

     # @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new      
     def enqueue
        begin
          @enq.enqueue_jobs
        rescue => ex
          logger.error ex.message
          handle_exception(ex)
        end
     end

再转到enqueue方法:

     SETS = %w(retry schedule)
     def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
            while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
              if conn.zrem(sorted_set, job)
                Sidekiq::Client.push(Sidekiq.load_json(job))
                Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
              end
            end
          end
        end
      end

可以看出@poller.start就是在 retry, schedule 队列间分别去取重试和延时的任务,使用的是redis的sorted_set,具体的细节,第三个问题还会继续详细探讨。总之到这里,只要知道,@poller.start这个方法就是起了一个线程,这个线程去retry和schedule队列中取任务,然后将取到的任务放到任务队列中等待执行,注意这里的任务队列,其实就是上面问题1我们所说的注册异步任务的队列。

到现在为止,真正的任务还是没有被执行,接下来,我们就看看任务如何被执行,关注第三行代码: @manager.start :

  class Manager # 有删减
    def initialize(options={})
      @options = options
      @count = options[:concurrency] || 25
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
      @plock = Mutex.new
    end

    def start
      @workers.each do |x|
        x.start
      end
    end
end

start方法启动了@count个线程,@count的值可以自己设定,默认为20。Sidekiq默认每个进程中有20个线程worker,这些@worker就是用来消费上面所说的任务队列中的数据,并执行相关任务的代码, 看看他们是如何执行的:

  class Processor  # 有删减
    def start
      @thread ||= safe_thread("processor", &method(:run))
    end

    private unless $TESTING

    def run
      begin
        while !@done
          process_one
        end
        @mgr.processor_stopped(self)
    end

    def process_one
      @job = fetch
      process(@job) if @job
      @job = nil
    end
end

注意力转到核心的process_one方法,可以看出流程大概为: 去任务队列中取出一个任务,如果任务存在,就执行这个任务。

去任务队列取数据的细节值得我们关注,将注意力放到fetch方法,随着调用往下走,我们会发现一个关键的方法:

    def queues_cmd
      if @strictly_ordered_queues
        @queues
      else
        queues = @queues.shuffle.uniq
        queues << TIMEOUT
        queues
      end
    end
    def retrieve_work # 关键方法
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      UnitOfWork.new(*work) if work
    end

可以看到,@worker取数据的方式就是使用redis的brpop命令去指定的队列中,阻塞式的取数据。queue_cmd参数中指定了要监听的队列以及超时时间。

这里引申一下,在日常的开发中,我们常常会为不同的异步任务类型指定不同的queue,从上面我们知道,通过参数传递,可以指定@worker去哪个队列消费数据。也就是说,我们可以启动不同的进程去消费不同队列的数据,一对一,或者一对多都行。这些进程可以在同一台机器上,也可以在不同的机器上。

拿到数据后,实际执行任务的方式就是,通过任务参数中的 class 参数,通过class.constantize拿实际的class,然后执行 class.new.perform(*args), 细节可以查看源码,这里就不再赘述。

总结一下,我们谈到了3种队列:

  • retry , 放重试任务的队列
  • schedule,放延迟任务的队列
  • "queue:#{queue}",可能是多个队列,存放即将被执行的任务

@poller.start线程会定时去retry和schedule中取数据,将它放入 queue中,@wokers会自动去执行queue中的任务。

上面两个问题,分别回答了异步任务是如何注册的,同时任务又是如何被消费执行的,其中有些细节问题,还是值得我们思考,在使用Sidekiq的时候,会有如下的使用方式:

HardWorker.perform_in(5.minutes, 'bob', 5)

这种延时任务是如何实现的呢? 这也是第二个问题中我留的一个尾巴,接下来回答第三个问题:

3. 延时任务以及重试任务是如何实现的?

在第二个问题中,我们解释了 

@poller.start

这条代码的工作内容就是在 retry和schedule队列中去取数据,然后将取到的数据写入任务队列中,等待@workers去执行,这就是延时和重试任务的关键之处。

上面我们说了,这两个队列是redis的sorted_set。 当有延时任务时,perform_in方法就会将相应的 {class: 'HardWorker', args: *args} 参数写入schedule队列, 并且将它被指定执行的时间点的时间戳当作score写入。上面延时5分钟,那么写入的score就大致为 (Time.now + 5.minutes).to_i 。

当有任务失败的时候,也是同理,将下一次重试的时间当作score,和对应的class等数据写入retry队列。

@poller.start 通过读取score比现在小的数据,然后将这些任务数据写入 任务队列。

job = conn.zrangebyscore(sorted_set, '-inf', Time.now.to_i, :limit => [0, 1])

当score比当前时间戳小,说明指定的执行时间已经到了,那么就将它读取出来,写入任务队列,等待被@workers执行。

4. 定时任务如何实现的?

在日常的使用中,我们常常需要有定时的周期任务需要执行,比如每分钟需要执行一次,每天需要跑一个统计等等。Sidekiq的免费版中没有提供这个功能,然而有很多的其他插件提供了这个功能,比如sidekiq-scheduler,接下来,我们就来看看它是如何实现周期性任务的。

在这之前,我们先思考一下,通过上面的内容,我们明白,只要有什么东西周期性地往任务队列中写入任务,然后@worker本身会自动去执行这些任务,这样就实现了周期性任务。

通过使用这个插件,我发现它只需要增加一个配置文件就行了,其他的什么都没有做,便实现了这点,看源码

Sidekiq.configure_server do |config|  # 有删减
  config.on(:startup) do
    scheduler_options = {
      dynamic:       dynamic,
      dynamic_every: dynamic_every,
      enabled:       enabled,
      schedule:      schedule,
      listened_queues_only: listened_queues_only
    }
    schedule_manager = SidekiqScheduler::Manager.new(scheduler_options)
    config.options[:schedule_manager] = schedule_manager
    config.options[:schedule_manager].start   # 重点
  end

在自动加载这段配置代码的时候, 它会先解析配置文件,然后执行一个start方法, 这个start方法通过rufus-scheduler,通过线程模拟了类似crontab的功能,周期性地往任务队列写入数据。原理大概就是通过线程,在线程中执行死循环,不断的检测时间,时间点到了,就往队列中写数据。

5. Active::Job 有什么用?

Rails在4之后的版本中就集成了Active::Job的功能,它本质上就是ruby异步任务框架的一种再封装,实际的任务还是依靠像Sidekiq这种来执行的。

ruby社区有众多的异步任务框架,Sidekiq,Resque,DelayJob等等, 这些异步框架的使用都有不少差异。如果项目早期使用的Resque,但是随着项目的发展,后面觉得Sidekiq更加适合,那么从Resque切换到Sidekiq就会有很多代码的改动,只要有改动,则就有可能出问题。

Active::Job就是在异步任务框架和Rails之间扮演了一个解耦合的角色,并且更方便地提供了一些回调功能。换任务框架,不需要改Rails相关的代码,只需要修改Active::Job的任务适配器就行。在有任务框架切换背景的时候还是很有价值。

但是从我个人的角度来看,如果切换任务框架的可能性不大时,就不太值得使用它,Active::Job多了一层封装,无论从性能和复杂度上来讲都有所增加,毕竟简单的场景在很多时候会更好,Rails已经封装的有些臃肿了。

文章也发布在: https://ruby-china.org/topics/36825

 类似资料: