线程和进程

优质
小牛编辑
142浏览
2023-12-01

Ruby给了你两个基本的方法来组织你的程序,使它同时能运行自己的不同部分。你可以使用线程在程序内部将任务分割,或者将任务分解为不同的程序,使用多进程来运行。下面我们轮流看一下这两种方法。

多线程

一般来说在Ruby中同时做两件事情最简单的是使用Ruby线程。线程在进程中,由Ruby解释器实现。这使得Ruby线程也能完全的可移至,因为它不需要依赖特定的操作系统,但是这样你也不能利用本地线程(native threads)的优点了。你也许有过线程饥饿得经验(优先级低的线程没有机会运行)。也许你会遇到线程死锁,整个进程都被挂起。或者一些线程的某些操作占用了CPU的太多时间,以至于其它线程不得不等待。但是,不要被这些潜在的问题吓倒,Ruby线程是使你程序并行运行的轻量而有效的方法。

创建 Ruby 线程

创建一个新的线程十分简单,下面的部分代码并行的下载一些网页,每次有请求调用,这段代码都将产生一个独立的线程处理HTTP传输。

require 'net/http'
 
pages = %w( www.rubycentral.com   
            www.awl.com   
            www.pragmaticprogrammer.com   
           )
 
threads = []
 
for page in pages   
  threads << Thread.new(page) { |myPage|
 
    h = Net::HTTP.new(myPage, 80)   
    puts "Fetching: #{myPage}"   
    resp, data = h.get('/', nil )   
    puts "Got #{myPage}:  #{resp.message}"   
  }   
end
 
threads.each { |aThread|  aThread.join }
 
produces:
Fetching: www.rubycentral.com
Fetching: www.awl.com
Fetching: www.pragmaticprogrammer.com
Got www.rubycentral.com:  OK
Got www.pragmaticprogrammer.com:  OK
Got www.awl.com:  OK

让我们更详细的看看这段代码,这里有一些新技巧在里面。

新线程用Thread.new 创建,这个方法接收一个block作为线程中要运行的代码,在我们的例子里面,这个block使用net/http库从每个指定的站点抓取首页,从我们打印出来的信息来看,这些抓取活动是同时进行的。

当我们创建线程的时候,我们将这个HTML页面作为参数传入,这个参数然后作为myPage参数传给了block。为什么我们这么做而不是直接在block里面用page这个变量那?

一个线程共享了所有在它启动之前已经存在的所有全局变量,实例变量和局部变量。善意的人有时候会告诉你,共享有时候不一定是好事。在这个例子里面,3个线程将共享page变量,第一个线程启动之后,page被设为http://www.rubycentral.com,在这个时候,创建线程的循环还没有结束,第二次,page被设为http://www.awl.com,如果第一个线程还没有使用page变量运行完毕,那么可能这个线程会使用page的新值。这个bug将很难被跟踪发现。

但是,在线程块中创建的局部变量的作用域只在创建它的线程里,而不能被其它线程共享。在我们的例子里面,变量myPage将在线程被创建时赋值,每个线程都有自己的myPage变量的拷贝。

 

多线程

另一个很微妙的地方是程序的最后一行,为什么我们调用所创每个建线程的join方法?

当一个Ruby程序结束退出的时候,它会杀死所有的线程,而不管它们的状态。但是,你可以通过线程的Thread#join 方法,使得主程序在等待这个线程结束后再退出。调用它的线程将会被阻塞,直到给定的线程结束。通过调用3个线程的join方法,你可以确定这三个请求将会在主程序退出之前完成。

除了join,还有其它几个用于控制线程的方便的方法。首先,你可以用Thread.current 来访问当前线程,你可以用Thread.list 来取得所有线程列表,这个列表包括所有可运行或者已停止的线程。为了检测一个线程的状态,可以用方法Thread#statusThread#alive?

另外,你可以使用Thread#priority= 来设置线程的不同的优先级别。高优先级的将会先于低优先级的线程执行。

线程变量

在上面我们已经说过,一个线程可以访问在它定义之前已经存在的,在作用域范围内的变量,但是,在线程内部定义的变量的作用域只在这个线程内部有效。

但是,如果你想一个线程的变量能被其它线程访问,包括主线程,该怎么办呢?Ruby中的线程提供了一个功能,就是能够按名字创建和访问线程内的局部变量。你可以简单的把这个线程对象作为一个哈希,使用[ ]=方法写这个对象的变量值,用 [ ]来读它的值。在这个例子里, 每个线程记录当前变量count的值,把它存到线程的局部变量,名字(key)为mycount。(这里有竞争条件的出现,但是我们还没有谈到同步问题,这里我们只是忽略它们。)

count = 0
arr = []
10.times do |i|
  arr[i] = Thread.new {
    sleep(rand(0)/10.0)
    Thread.current["mycount"] = count
    count += 1
  }
end
arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"
产生结果:
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

 

主线程等待每个子线程结束之后,打印出来每个线程得到的count的值。我们人为的让每个线程在取得count值之前休眠随机的时间,这只是为了增加点趣味而已。

 

线程和异常

 

如果一个线程抛出一个没有被处理的异常,将会怎样呢?这依赖于系统设置Thread.abort_on_exception ,这个设置在第384页和387页。

如果abort_on_exception 被设置为false,这也是默认的缺省值,那么如果一个线程出现错误而没有处理,则这个线程将会被杀死,其它没有遇到异常的线程将继续运行。在下面的例子里,编号为3的线程将产生一个异常,而不会输出任何东西,但是,你仍然可以看到其它线程的输出。

threads = []
6.times { |i|
  threads << Thread.new(i) {
    raise "Boom!" if i == 3
    puts i
  }
}
threads.each {|t| t.join }
产生:
01
2
 
45prog.rb:4: Boom! (RuntimeError)         
	from prog.rb:8:in `join'         
	from prog.rb:8         
	from prog.rb:8:in `each'         
	from prog.rb:8
  

 

但是,如果将abort_on_exception 设为true,一个线程出现没有捕获(处理)的异常,则所有的线程将都被杀死,上面的例子,如果编号为3的线程出错,所有后面的线程都被杀死,不会产生任何输出。

Thread.abort_on_exception = true
threads = []
6.times { |i|
  threads << Thread.new(i) {
    raise "Boom!" if i == 3
    puts i
  }
}
threads.each {|t| t.join }
produces:
01
2
prog.rb:5: Boom! (RuntimeError)
	from prog.rb:7:in `initialize'
	from prog.rb:7:in `new'
	from prog.rb:7
	from prog.rb:3:in `times'
	from prog.rb:3

控制线程调度器

在一个设计良好的应用程序中,你应该让线程只做自己改作的事情;在一个多线程环境中创建一个基于时间的系统一般来说不是一个好主意。

但是,有时候我们需要控制线程的运行。也许我们的自动点唱机有一个线程用来控制指示灯,我们希望在音乐停止播放的时候也停止指示灯。你也许在一个经典的生产者-消费者关系中有两个线程,一个消费者在生产者挂起的时候也必须挂起。

Thread 提供了很多方法用来控制线程调度,调用Thread.stop 能停止当前线程,而Thread#run 将使某个线程启动运行,调用Thread.pass 将告诉线程调度器去执行另外一个线程。Thread#joinThread#value 将使调用者挂起,直到这个线程结束。

我们可以用下面代码来示范一下上面的特点。

t = Thread.new { sleep .1; Thread.pass; Thread.stop; }t.status ?"sleep"t.runt.status ?"run"t.runt.status ?false

但是,使用这些原始的方法来控制线程调度实现同步,不管怎么说,都可能会遇到竞争条件。如果你需要在线程中共享数据,竞争条件将会一直存在并且给调试带来麻烦。幸运的是,线程还有另一个工具:临界区(critical section),使用它,我们能编写一个安全的同步方案。

互斥(Mutual Exclusion)

 

用来阻塞一个线程运行的低层的方法是使用全局的"线程临界"(thread critical)条件。当这个条件被设为true(用Thread.critical= 方法)时,调度器将不会让任何已经存在地线程执行,但是,它不会阻止新线程的建立和运行;一些特定的线程操作(比如停止或者杀死一个线程,在当前线程中休眠,或者抛出一个异常)都会引起一个线程被调度,即使在临界区之内。

直接使用Thread.critical= 虽然可行,但是它并不是很方便。幸运的是,Ruby自带了很多其它选项,当然,最好的两个是thread库模块中的类Mutex和类ConditionVariable。关于它们的文档从第457页开始。

类 Mutex

Mutex 是一个为对互斥地访问某一共享对象而设计的一个简单的信号量锁。也就是说,在一个时候,只有一个线程能持有这个锁。其它线程可以继续等待直到这个锁可用,或者立即返回一个错误不再继续等待。

一个mutex经常用于原子性的对一个共享对象进行修改更新。假设我们需要更新一个事务中的两个变量,比如下面的程序模拟增加两个数的计数。这个更新假定是原子性的,外面的世界不可能看到这两个数有不同的值。如果不使用互斥,则不能达到该目的。

count1 = count2 = 0difference = 0counter = Thread.new doloop docount1 += 1count2 += 1endendspy = Thread.new doloop dodifference += (count1 - count2).absendendsleep 1Thread.critical = 1count1 ?184846count2 ?184846difference ?58126

这个例子显示了在执行的过程中count1和count2的值曾经出现过不同,尽管最后还是一样的。

幸运的是,我们可以用互斥来改善这个例子。
require 'thread'
mutex = Mutex.new
 
count1 = count2 = 0        
difference = 0        
counter = Thread.new do        
  loop do        
    mutex.synchronize do        
      count1 += 1        
      count2 += 1        
    end        
  end        
end        
spy = Thread.new do        
  loop do        
    mutex.synchronize do        
      difference += (count1 - count2).abs        
    end        
  end        
end
        
sleep 1mutex.lockcount1 ?21192count2 ?21192difference ?0

通过把需要访问共享数据的代码放到muxtex的控制下,我们确保了数据的一致性。但不幸的是,你也从这些数字看到了,我们在经受着性能上的损失。

 

条件变量(Condition Variables)

有时候使用互斥(mutex )来保护对临界数据的访问并不能满足要求,比如假设我们在一个临界区内,但是你还需要等待一个特殊的资源,如果你的线程这时候为了等待这个资源而休眠,可能会导致其它线程不能释放这个资源,因为它们都无法进入这个临界区,原来的线程一直在锁定着这个临界区。你也许需要暂时的放弃对临界区的控制,同时告诉其它线程你在等待某一资源。当这个资源可用之后,你的线程同时需要重新得到对临界区的控制权。

条件变量正是用在此处。一个条件变量是一个简单的信号量,它关联一个特定的资源,在临界区的保护范围内使用。当你需要一个资源而这个资源暂时不可用的时候,你等待一个条件变量,这个操作将放弃对这个条件变量所在互斥(临界区?)的锁定。当其它线程发送信号告诉这个变量可用之后,原来的线程停止等待立即取得对临界区的锁定。

 

require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
 
a = Thread.new {    
  mutex.synchronize {    
    puts "A: I have critical section, but will wait for cv"    
    cv.wait(mutex)    
    puts "A: I have critical section again! I rule!"    
  }    
}
 
puts "(Later, back at the ranch...)"
 
b = Thread.new {    
  mutex.synchronize {    
    puts "B: Now I am critical, but am done with cv"    
    cv.signal    
    puts "B: I am still critical, finishing up"    
  }    
}    
a.join    
b.join
结果:
A: I have critical section, but will wait for cv(Later, back at the ranch...)
 
B: Now I am critical, but am done with cv     
B: I am still critical, finishing up     
A: I have critical section again! I rule!

另一个同步机制的实现,可以参考Ruby发布程序中lib文件夹下的文件monitor.rbsync.rb。

 

运行多个进程(Multiple Processes)

有时候你可能需要把一个任务分成几个进程级别的子任务,或者你需要运行一个另外的不使用Ruby写的程序,没关系,Ruby有好几种方法能使你创建和管理其它独立的进程。

 

产生一个新的进程

在Ruby中产生一个新的进程有好几种方法,最简单的方法是运行一个命令并且等到它结束。你也许运行一些其它的独立的命令,并且从主机得到返回的结果,Ruby提供了system方法和反引号方法。(反引号即"`")

 system("tar xzf test.tgz")  ?   tar: test.tgz: Cannot open: No such file or directory\ntar: Error is not recoverable: exiting now\ntar: Child returned status 2\ntar: Error exit delayed from previous errors\nfalse result = `date`      result   ?  "Sun Jun 9 00:08:50 CDT 2002\n"

方法Kernel::system 运行一个指定的命令,如果这个命令存在且正确的运行结束,这个方法返回true,否则返回false。如果这个命令运行失败,你可以从全局变量$?得到这个命令的返回代码。

但system也有一个问题,就是它所运行程序的输出简单的被指定到了你的程序的输出,这可能不是你想要的。为了取得子进程的标准输出,你可以用反引号,比如上面例子的`date` 。注意,你需要用String#chomp 来去除返回结果最后的换行符。

这中方法对简单的场合比较合适,我们只需要运行一个命令,然后取得它的返回结果。但是,很多时候我们都需要对进程有更多的控制,比如我们需要和子进程进行会话,向它输入数据,并且从它取回数据。方法IO.popen 正是具有这样的作用。popen方法以一个子进程来运行一个命令,并且把这个进程的标准输入和标准输出绑定到Ruby的IO对象。向IO对象写数据,子进程就可以从它的标准输入读取数据,而子进程输出的数据,也可以通过Ruby的IO对象读出来。

比如,我们的操作系统中有一个有用的程序叫做pig,这个程序从标准输入读入数据,然后以pig Latin方式打印这些数据。
pig = IO.popen("pig", "w+")
pig.puts "ice cream after they go to bed"
pig.close_write
puts pig.gets
produces:
iceway eamcray afterway eythay ogay otay edbay

这个例子看起来很简单,打开这个管道(pipe),写入一个短语,然后读取返回结果。但是pig程序并不会立即将它写的东西flush。假如上面例子中,如果pig.puts 后面紧跟pig.gets的话,程序将被挂起,pig程序处理了我们的输入,但是返回结果却一直不会被写到管道。我们必须在这两行之间插入pig.close_write,这将给pig的标准输入发送一个文件结束标志(end-of-file),然后我们需要的结果就会返回。

popen方法还有另外一些注意事项。如果指定的命令是一个减号("-"),Ruby将强迫产生一个新的Ruby解释器,它将和原来的解释器一起运行。原来的解释器进程将得到一个IO对象作为返回结果,而子解释器将得到nil。

pipe = IO.popen("-","w+")
if pipe
  pipe.puts "Get a job!"
  $stderr.puts "Child says '#{pipe.gets.chomp}'"
else
  $stderr.puts "Dad says '#{gets.chomp}'"
  puts "OK"
end
produces:
Dad says 'Get a job!'
Child says 'OK'

除了popen方法,传统的Unix调用Kernel::forkIO.pipeKernel::exec 也可以在支持它们的系统上使用。许多IO方法和Kernel::open 也能产生新的子进程,使用方法是将文件名前面加上一个竖线``|'' 。注意你不能用File.new 来产生一个子进程,这个方法只是用于文件。

Independent Children

有时候我们并不需要这样处理:我们只想把产生的子进程赋给一个变量,然后继续处理自己的事务。一段时间以后,我们也许还需要一下这个进程是否结束。比如,我们需要从主程序分离一个需要很长运行时间的外部排序:

exec("sort testfile > output.txt") if fork == nil
# The sort is now running in a child process
# carry on processing in the main program
 
# then wait for the sort to finish                
Process.wait
  

系统调用Kernel::fork 在父进程中返回fork产生的子进程id,在子进程中返回nil,所以,上面例子中子进程将调用Kernel::exec 来运行一个外部的排序。一段时间以后,我们使用Process::wait ,这将等待排序完成,然后返回这个进程的id。(pid)

如果你需要在子进程退出后通知父进程(而不是等待子进程结束),可以用Kernel::trap 来对返回的信号进行处理。比如这里我们建立了一个用于捕获SIGCLD信号的trap,这个信号的意思是“子进程结束(死亡)”

trap("CLD") {
  pid = Process.wait
  puts "Child pid #{pid}: terminated"
  exit
}
 
exec("sort testfile > output.txt") if fork == nil
 
# do other stuff...
 
produces:
Child pid 31842: terminated

块(Block)和子进程

IO.popen 也能像File.open 那样接受一个block。通过传递一个参数给popen 作为一个命令,比如date,然后,这个block将得到一个IO对象作为参数。
IO.popen ("date") { |f| puts "Date is #{f.gets}" }
produces:
Date is Sun Jun  9 00:08:50 CDT 2002

这个IO对象将会在BLOCK结束之后自动关闭,就如同File.open 一样。

如果你给Kernel::fork 提供一个block,那么这些block中的代码将在Ruby的子进程中运行,而父进程在block结束后继续运行。

fork do
  puts "In child, pid = #$$"
  exit 99
end
pid = Process.wait
puts "Child terminated, pid = #{pid}, exit code = #{$? >> 8}"
produces:
In child, pid = 31849
Child terminated, pid = 31849, exit code = 99

最后一个问题,为什么我们子进程的返回代码$? 要右移8位?这是Posix系统的特点,退出代码(exit code)的低8位是程序结束的原因,高8位才是真正的退出代码。