Sidekiq 是Ruby社区最受欢迎的异步任务框架之一,几乎是Rails项目标配。
本文,我将从实际使用者的角度来提出疑问,通过一一解答这些问题来剖析Sidekiq是如何工作的。(代码基于Sidekiq-5.1.3)
在Web请求中,有很多任务是可以放到后台执行的,比如用户购买商品付款成功后,就可以直接向用户购买成功,相应的短信发送,物流通知等等就可以放到后台任务去做,不用在用户购买的同时立即执行,这些任务也称作异步任务。在Rails中,使用Sidekiq是这样的:
class HardWorker
include Sidekiq::Worker
sidekiq_options :retry => 5, queue: 'hard'
def perform(name)
# do something
end
end
HardWorker.perform_async('bob') # 异步执行任务
我们通常使用上面的方式,注册一个任务,让它异步执行。那么本文的第一个问题来了:
很容易看出这个类方法定义在 Sidekiq::Worker 里面,在源码中找到相关的代码:
def perform_async(*args)
client_push('class' => self, 'args' => args)
end
def client_push(item) # :nodoc:
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
# 有省略
Sidekiq::Client.new(pool).push(item)
end
可以看出,把自己的Class和方法的参数生成了一个hash,传递进了client_push方法, pool是一个redis连接, 调用了Client的实例方法, 通过一层层代码的查看,最终来到了我们最值得关注的一个方法:
def atomic_push(conn, payloads)
if payloads.first['at'] # 延时任务,例如指定了一分钟之后才执行
conn.zadd('schedule', payloads.map do |hash|
at = hash.delete('at').to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
now = Time.now.to_f
to_push = payloads.map do |entry|
entry['enqueued_at'] = now
Sidekiq.dump_json(entry)
end
conn.sadd('queues', q)
conn.lpush("queue:#{q}", to_push) #重点
end
end
payloads是通过上面item参数加工后得到的,里面除了之前的class,args参数之外,还会有sidekiq_options方法定制的部分参数,例如queue。 我们重新专注于上面的代码,第一个问题的答案已经很明显了。
从上面可以清晰的看出,异步执行相关的数据被打包成json,然后使用redis的lpush命令塞进了一个队列中,这个队列的名字和指定的queue有关系,按照我们上面的配置,这个队列的就是 "queue:hard"。写入队列的数据大概像这样:
{'class' => MyWorker, 'args' => [1, 2, 3]}
通过这些参数,就可以知道需要被执行的是哪个Worker,并且参数是什么。 注册异步任务到此就结束了,第二个问题随之来了。
通过第一个问题,我们知道了,所谓注册异步任务,就是将要执行的相关Class和参数写入redis队列,然后有其他专门的角色来处理这些异步任务,这样便实现了任务异步化,通过redis队列实现了完美的解耦合。那么这些队列数据是被谁消费了呢? 实际进行异步任务处理的就是我们熟悉的Sidekiq进程,通过这样可以启动Sidekiq进程:
bundle exec sidekiq
启动的进程会去消费相应的队列数据,然后执行对应的代码。接下来,我们深入源码,看看这些具体是怎么回事。
首先简单的梳理一下Sidekiq的启动流程,通过启动流程顺藤摸瓜,找到我们的答案。Sidekiq命令本质上是下面的代码:
#!/usr/bin/env ruby
$TESTING = false
require_relative '../lib/sidekiq/cli'
begin
cli = Sidekiq::CLI.instance
cli.parse
cli.run
rescue => e
raise e if $DEBUG
STDERR.puts e.message
STDERR.puts e.backtrace.join("\n")
exit 1
end
重点在cli.parse 和cli.run上,我们依次看看它们到底是什么:
def parse(args=ARGV)
@code = nil
setup_options(args)
initialize_logger
validate!
daemonize
write_pid
end
parse方法就是解析了一下配置数据,例如日志路径,要消费的队列等等,关键在于run方法上,下面截取一部分:
def run
boot_system
print_banner
self_read, self_write = IO.pipe
require 'sidekiq/launcher'
@launcher = Sidekiq::Launcher.new(options)
begin
launcher.run # 关键方法
while readable_io = IO.select([self_read]) # 死循环,等待消息
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
logger.info 'Shutting down'
launcher.stop
logger.info "Bye!"
exit(0)
end
end
可以看出,在进行了一些初始化之后,执行了 launcher.run方法,然后就进入了一个死循环,这个死循环会一直读pipe消息,根据消息执行相关的命令,常见的就是重启等等命令。 进程中的秘密自然的被聚焦到了launcher.run方法上,我们去一探究竟。
class Launcher # 部分截取
include Util
attr_accessor :manager, :poller, :fetcher
def initialize(options)
@manager = Sidekiq::Manager.new(options)
@poller = Sidekiq::Scheduled::Poller.new
@done = false
@options = options
end
def run
@thread = safe_thread("heartbeat", &method(:start_heartbeat)) # 第一行
@poller.start # 第二行
@manager.start # 第三行
end
def start_heartbeat
while true
heartbeat
sleep 5
end
Sidekiq.logger.info("Heartbeat stopping...")
end
end
可以看到,执行了三行代码,先来看看safe_thread 方法:
def safe_thread(name, &block)
Thread.new do
Thread.current['sidekiq_label'] = name
watchdog(name, &block)
end
end
safe_thread方法就是返回了一个Thread实例,线程中会执行传递进去的block,上面传递的是一个start_heartbeat方法。可以看出其实是启动了一个心跳线程, 用来定时做一些数据统计,同时也是一种TCP连接保活机制。
第二行执行了@poller.start, 它是Poller类的一个实例,下面看看源码中的start方法:
class Poller # 有删减
def initialize
@enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
@sleeper = ConnectionPool::TimedStack.new
@done = false
@thread = nil
end
def start
@thread ||= safe_thread("scheduler") do
initial_wait
while !@done
enqueue
wait
end
Sidekiq.logger.info("Scheduler exiting...")
end
end
end
通过safe_thread方法返回了一个线程,线程中和start_heartbeat一样是一个死循环,wait方法就是随机地sleep几秒,重点在enqueue方法:
# @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
def enqueue
begin
@enq.enqueue_jobs
rescue => ex
logger.error ex.message
handle_exception(ex)
end
end
再转到enqueue方法:
SETS = %w(retry schedule)
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
end
可以看出@poller.start就是在 retry, schedule 队列间分别去取重试和延时的任务,使用的是redis的sorted_set,具体的细节,第三个问题还会继续详细探讨。总之到这里,只要知道,@poller.start这个方法就是起了一个线程,这个线程去retry和schedule队列中取任务,然后将取到的任务放到任务队列中等待执行,注意这里的任务队列,其实就是上面问题1我们所说的注册异步任务的队列。
到现在为止,真正的任务还是没有被执行,接下来,我们就看看任务如何被执行,关注第三行代码: @manager.start :
class Manager # 有删减
def initialize(options={})
@options = options
@count = options[:concurrency] || 25
@workers = Set.new
@count.times do
@workers << Processor.new(self)
end
@plock = Mutex.new
end
def start
@workers.each do |x|
x.start
end
end
end
start方法启动了@count个线程,@count的值可以自己设定,默认为20。Sidekiq默认每个进程中有20个线程worker,这些@worker就是用来消费上面所说的任务队列中的数据,并执行相关任务的代码, 看看他们是如何执行的:
class Processor # 有删减
def start
@thread ||= safe_thread("processor", &method(:run))
end
private unless $TESTING
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
end
def process_one
@job = fetch
process(@job) if @job
@job = nil
end
end
注意力转到核心的process_one方法,可以看出流程大概为: 去任务队列中取出一个任务,如果任务存在,就执行这个任务。
去任务队列取数据的细节值得我们关注,将注意力放到fetch方法,随着调用往下走,我们会发现一个关键的方法:
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle.uniq
queues << TIMEOUT
queues
end
end
def retrieve_work # 关键方法
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
UnitOfWork.new(*work) if work
end
可以看到,@worker取数据的方式就是使用redis的brpop命令去指定的队列中,阻塞式的取数据。queue_cmd参数中指定了要监听的队列以及超时时间。
这里引申一下,在日常的开发中,我们常常会为不同的异步任务类型指定不同的queue,从上面我们知道,通过参数传递,可以指定@worker去哪个队列消费数据。也就是说,我们可以启动不同的进程去消费不同队列的数据,一对一,或者一对多都行。这些进程可以在同一台机器上,也可以在不同的机器上。
拿到数据后,实际执行任务的方式就是,通过任务参数中的 class 参数,通过class.constantize拿实际的class,然后执行 class.new.perform(*args), 细节可以查看源码,这里就不再赘述。
总结一下,我们谈到了3种队列:
@poller.start线程会定时去retry和schedule中取数据,将它放入 queue中,@wokers会自动去执行queue中的任务。
上面两个问题,分别回答了异步任务是如何注册的,同时任务又是如何被消费执行的,其中有些细节问题,还是值得我们思考,在使用Sidekiq的时候,会有如下的使用方式:
HardWorker.perform_in(5.minutes, 'bob', 5)
这种延时任务是如何实现的呢? 这也是第二个问题中我留的一个尾巴,接下来回答第三个问题:
在第二个问题中,我们解释了
@poller.start
这条代码的工作内容就是在 retry和schedule队列中去取数据,然后将取到的数据写入任务队列中,等待@workers去执行,这就是延时和重试任务的关键之处。
上面我们说了,这两个队列是redis的sorted_set。 当有延时任务时,perform_in方法就会将相应的 {class: 'HardWorker', args: *args} 参数写入schedule队列, 并且将它被指定执行的时间点的时间戳当作score写入。上面延时5分钟,那么写入的score就大致为 (Time.now + 5.minutes).to_i 。
当有任务失败的时候,也是同理,将下一次重试的时间当作score,和对应的class等数据写入retry队列。
@poller.start 通过读取score比现在小的数据,然后将这些任务数据写入 任务队列。
job = conn.zrangebyscore(sorted_set, '-inf', Time.now.to_i, :limit => [0, 1])
当score比当前时间戳小,说明指定的执行时间已经到了,那么就将它读取出来,写入任务队列,等待被@workers执行。
在日常的使用中,我们常常需要有定时的周期任务需要执行,比如每分钟需要执行一次,每天需要跑一个统计等等。Sidekiq的免费版中没有提供这个功能,然而有很多的其他插件提供了这个功能,比如sidekiq-scheduler,接下来,我们就来看看它是如何实现周期性任务的。
在这之前,我们先思考一下,通过上面的内容,我们明白,只要有什么东西周期性地往任务队列中写入任务,然后@worker本身会自动去执行这些任务,这样就实现了周期性任务。
通过使用这个插件,我发现它只需要增加一个配置文件就行了,其他的什么都没有做,便实现了这点,看源码:
Sidekiq.configure_server do |config| # 有删减
config.on(:startup) do
scheduler_options = {
dynamic: dynamic,
dynamic_every: dynamic_every,
enabled: enabled,
schedule: schedule,
listened_queues_only: listened_queues_only
}
schedule_manager = SidekiqScheduler::Manager.new(scheduler_options)
config.options[:schedule_manager] = schedule_manager
config.options[:schedule_manager].start # 重点
end
在自动加载这段配置代码的时候, 它会先解析配置文件,然后执行一个start方法, 这个start方法通过rufus-scheduler,通过线程模拟了类似crontab的功能,周期性地往任务队列写入数据。原理大概就是通过线程,在线程中执行死循环,不断的检测时间,时间点到了,就往队列中写数据。
Rails在4之后的版本中就集成了Active::Job的功能,它本质上就是ruby异步任务框架的一种再封装,实际的任务还是依靠像Sidekiq这种来执行的。
ruby社区有众多的异步任务框架,Sidekiq,Resque,DelayJob等等, 这些异步框架的使用都有不少差异。如果项目早期使用的Resque,但是随着项目的发展,后面觉得Sidekiq更加适合,那么从Resque切换到Sidekiq就会有很多代码的改动,只要有改动,则就有可能出问题。
Active::Job就是在异步任务框架和Rails之间扮演了一个解耦合的角色,并且更方便地提供了一些回调功能。换任务框架,不需要改Rails相关的代码,只需要修改Active::Job的任务适配器就行。在有任务框架切换背景的时候还是很有价值。
但是从我个人的角度来看,如果切换任务框架的可能性不大时,就不太值得使用它,Active::Job多了一层封装,无论从性能和复杂度上来讲都有所增加,毕竟简单的场景在很多时候会更好,Rails已经封装的有些臃肿了。
文章也发布在: https://ruby-china.org/topics/36825