require 'eventmachine'
EventMachine::run
这不是一个激动人心的例子,确实一个好的起点。有几点要注意。首先, 我们加载eventmachine模块。其次,精明读者已经发现,我用的是EventMachine::,不是EM.,当然也可以用EM::或者EventMachine.。最后,如果你运行这个程序,它永远不会停止。当调用EM#run方法的时候,EventMachine反应器将被启动,一直运行下去直到stop方法被调用。本例子中没有调用EM#stop方法。
那我们停止这个程序呢?很简单:
require 'eventmachine'
EM.run do
EM.add_timer(1) { EM.stop }
end
这里我们提供了一个块(block)给EM#run方法。当reactor实例化之后循环启动之前,block块将会被执行。你可以在block中做你想做的任何初始化。在这个例子中,我们创建了一个定时器,并在定时器中执行EM#stop方法。我们也能调用EM#stop_event_loop。这两种调用没有不同,都会关闭reactor。当reactor终止之后,EM#run之后的方法块block将会得到执行。
定时器
我们接下来要介绍的是EventMachine定时器。定时器分两种:一次性定时器和周期性定时器 。EventMachine添加定时器的方式有两种,通常的方式是下面例子中的用法
EM#add_timer和EM#add_periodic_timer。
require 'eventmachine'
EM.run do
p = EM::PeriodicTimer.new(1) do
puts "Tick ..."
end
EM::Timer.new(5) do
puts "BOOM"
p.cancel
end
EM::Timer.new(8) do
puts "The googles, they do nothing"
EM.stop
end
end
推迟和延迟工作
如果你还记得reactor模式的定义,你会注意到reactor是单线程的。EventMachine秉承了这一单线程机制。Reactor本是是单线程的,EM的方法不是线程安全的。这有两个结果:(1)一些长任务,比如数据库查询,远程HTTP请求等,需要能够移到后台线程执行。(2)一旦那些长任务被移到后台线程,reactor能够立即工作。这是EM#defer和EM#next_tick发挥作用的地方。
使用EM#defer方法可以将一个块调度到EventMachine的线程池中执行。线程池提供了固定的20个线程。很棒,代码可以在后台线程执行,那怎么把线程的执行结果返回呢?这要看EM#defer的第二个参数callback。当defered的后台线程执行完之后,Callback方法将会在主线程(Reactor线程)执行,并接受后台线程的返回作为callback块的参数。我们可以利用call块处理与调用者的通信。
使用EM#next_tick方法可以将一个块调度到reactor的下一次迭代时执行,执行任务的线程还是reactor主线程。
EM#next_tick
使用EM#next_tick和EM#add_timer非常相似。可以为EM#next_tick指定一个块,在reactor的下次迭代执行。
require 'eventmachine'
EM.run do
EM.add_periodic_timer(1) do
puts "Hai"
end
EM.add_timer(5) do
EM.next_tick do
EM.stop_event_loop
end
end
end
EM#next_tick调度的block将在主线程同步执行。如果这个block是一个长任务的话,那整个程序将被阻塞知道任务结束。这可不是好事情。
EM#defer
使用EM#defer,可以提供一个回调块。当调度的块在后台线程执行完之后,回调块将在主线程得到执行。当然,回调块不是必需的。
require 'eventmachine'
require 'thread'
EM.run do
EM.add_timer(2) do
puts "Main #{Thread.current}"
EM.stop_event_loop
end
EM.defer do
puts "Defer #{Thread.current}"
end
end
EM#defer将会把一个任务调度到后台线程。切记,必须保证这个任务不会永远占据线程,因为EventMachine的线程池是固定的,如果任务占据线程不是放,你就没办法回收了。
轻量级并发
EventMachine内置了两种轻量级并发机制:spawned processes和deferrables。spawned processes是从Erlang借鉴而来,它们并不是操作系统进程。在CPU和内存占用上,这两种机制要更轻量。你的应用可能需要处理大量的轻量并发工作,deferrables和spawned processes直到你的应用真正需要的时候才会执行。让我们看看它们是怎么工作的。
EM::Deferrable
我们先来卡看EM::Deferrable。如果一个类混入了EM::Deferrable之后,就可以把一些callback和errback关联到这个类的实例上,当一些特定情况发生时(实例处于了某个deferrable的状态)。callback和errback将会按照他们关联的次序执行。
可以通过设置实例的#set_deferred_status 方法来触发关联的callbacks和errbacks 。如果set_deferred_status 的参数是:succeeded,则触发callbacks,如果参数是:failed,则触发errbacks。一旦触发,这些回调将会在主线程立即得到执行。同样你可以在回调中(callbacks和errbacks)再次调用#set_deferred_status,改变状态。
require 'eventmachine'
class MyDeferrable
include EM::Deferrable
def go(str)
puts "Go #{str} go"
end
end
EM.run do
df = MyDeferrable.new
df.callback do |x|
df.go(x)
EM.stop
end
EM.add_timer(1) do
df.set_deferred_status :succeeded, "SpeedRacer"
end
end
EM::SpawnedProcess
EventMachine spawned processes是从Erlang中借鉴而来。命名有点让人困惑,这里没有任何操作系统层面的进程。Spawned processes,你可以创建一个“进程”,关联一些代码。某个时间点,spawned实例将会被#notify方法触发,执行关联的block。
和deferrables不同的是,当你调用了notify之后,不会立即被执行,而是在某个时间点。
require 'eventmachine'
EM.run do
s = EM.spawn do |val|
puts "Received #{val}"
end
EM.add_timer(1) do
s.notify "hello"
end
EM.add_periodic_timer(1) do
puts "Periodic"
end
EM.add_timer(3) do
EM.stop
end
end
网络编程
网络编程是EventMachin的设计初衷。Eventmachine可以处理一系列protocol,并实现了许多基本的protocol,这使得网络编程非常方便。
服务端
我们先从服务端开始。
回头看看简介里的例子。
require 'eventmachine'
class Echo < EM::Connection
def receive_data(data)
send_data(data)
end
end
EM.run do
EM.start_server("0.0.0.0", 10000, Echo)
end
Echo类继承自EM::Connection。我们使用start_server创建了一个服务器,监听在端口10000,Echo被用来处理connection。
其实还有两种方式。
模块
require 'eventmachine'
module Echo
def receive_data(data)
send_data(data)
end
end
EM.run do
EM.start_server("0.0.0.0", 10000, Echo)
end
块(block)
require 'eventmachine'
EM.run do
EM.start_server("0.0.0.0", 10000) do |srv|
def srv.receive_data(data)
send_data(data)
end
end
end
无论哪种情况,当一个新的连接建立之后,一个包含你的代码的类的实例将会创建。这非常重要,一个连接一个实例。你不能用实例在连接之间交换信息。
我们需要实现receive_data(data)方法来处理接收到的数据,如果你没有实现这个方法,你会在控制台看到类似于“............>>>6”的输出。除了receive_data之外,还有许多其他的方法需要实现。这些方法在一个连接的生命周期中将会被调用。
post_init | 当实例创建好,连接还没有完全建立的时候调用 |
connection_completed | 连接完全建立的时候调用 |
receive_data(data) | 当收到客户端的数据时调用。数据是成块接收的。应用负责解析。 |
unbind | 当客户端断开连接的时候调用 |
在例子中,我们使用send_data(data)方法把接受到的数据返回客户端。 如果要返回的是一个大的数据文件,可以调用send_file_data(filename) 方法,这个方法对流式数据性能很高。还有两个有用的方法#close_connection和close_connection_after_writing。这两个方法非常相似。它们将通知客户端关闭连接。 不同的是close_connection_after_writing会确保连接断开之前,send_data发送的数据已经完全发送到客户端。
前面已经提到,我们不能在我们的类中存储跨连接的信息。EventMachine提供里另外一种机制。
require 'eventmachine'
class Pass < EM::Connection
attr_accessor :a, :b
def receive_data(data)
send_data "#{@a} #{data.chomp} #{b}"
end
end
EM.run do
EM.start_server("127.0.0.1", 10000, Pass) do |conn|
conn.a = "Goodbye"
conn.b = "world"
end
end
客户端
服务端已经运行,那么客户端呢?非常简单
require 'eventmachine'
class Connector < EM::Connection
def post_init
puts "Getting /"
send_data "GET / HTTP/1.1\r\nHost: MagicBob\r\n\r\n"
end
def receive_data(data)
puts "Received #{data.length} bytes"
end
end
EM.run do
EM.connect("www.postrank.com", 80, Connector)
end
除了调用EM#connect创建一个连接,其它的代码和server类似。
结论
我们在本文讨论了EM#run,创建一次性和周期性定时器,推迟和延迟块执行,轻量级并发和网络编程。