vertx.akka
刚刚发布了ElasticMQ 0.7.0 ,它是一个基于角色的Scala和与Amazon SQS兼容的接口的消息队列系统。
这是一次重大的重写,在核心使用Akka actor,在REST层使用Spray 。 到目前为止,只有核心模块和SQS模块已被重写。 日志记录,SQL后端和复制尚未完成。
客户端的主要改进是:
- 长期的投票支持,它在一段时间前已添加到SQS中
- 更简单的独立服务器-只需下载一个jar
对于长轮询,当接收到一条消息时,您可以指定一个附加的MessageWaitTime
属性。 如果在队列中没有消息,而不是完成空响应请求,ElasticMQ将等待MessageWaitTime
秒钟,直到邮件到达。 这不仅有助于减少所使用的带宽(不需要非常频繁的请求),还可以提高整体系统性能(在发送消息后立即接收消息)以及降低SQS成本。
现在,独立服务器是一个jar。 要运行内存中的本地SQS实现(例如,用于测试使用SQS的应用程序),您需要做的就是下载jar文件并运行:
java -jar elasticmq-server-0.7.0.jar
这将在http://localhost:9324
上启动服务器。 当然,接口和端口是可配置的,有关详细信息,请参见自述文件 。 和以前一样,您也可以使用任何基于JVM的语言来运行嵌入式服务器。
实施说明
出于好奇,这里简要介绍一下ElasticMQ的实现方式,包括核心系统,REST层,Akka Dataflow的用法和长轮询的实现。 所有代码都可以在GitHub上找到 。
如前所述,ElasticMQ现在使用Akka和Spray实现,并且不包含任何阻塞调用 。 一切都是异步的。
核心
核心系统是基于参与者的。 有一个主要角色( QueueManagerActor ),它知道系统中当前创建了哪些队列,并提供了创建和删除队列的可能性。
为了与演员进行交流,使用了键入的询问模式 。 例如,要查找队列(队列也是参与者),则定义一条消息:
case class LookupQueue(queueName: String) extends Replyable[Option[ActorRef]]
用法如下所示:
import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")
如前所述,每个队列都是一个参与者,并且封装了队列状态。 我们可以使用简单的可变数据结构,而不需要线程同步,因为参与者模型会为我们处理这些事情。 有很多消息可以发送到队列执行者,例如:
case class SendMessage(message: NewMessageData) extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int,
waitForMessages: Option[Duration]) extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
休息层
SQS查询/ REST层是使用Spray (基于Akka的轻量级REST / HTTP工具包)实现的。
除了基于角色的非阻塞IO实现外,Spray还提供了强大的路由库spray-routing
。 它包含许多内置指令,用于在请求方法(get / post等)上进行匹配,提取表单参数查询或在请求路径上进行匹配。 但是,它还允许您使用简单的指令组合来定义自己的指令。 典型的ElasticMQ路由如下所示:
val listQueuesDirective =
action("ListQueues") {
rootPath {
anyParam("QueueNamePrefix"?) { prefixOption =>
// logic
}
}
}
如果action
与body参数的"Action"
URL中指定的action名称匹配并接受/拒绝请求,则rootPath
在空路径上匹配,依此类推。 Spray有一个很好的教程 ,因此,如果您有兴趣的话,我鼓励您去那里看看。
如何使用路由中的队列参与者来完成HTTP请求?
Spray的好处在于,它所做的只是将RequestContext
实例传递到您的路由,没有任何回报。 完全放弃请求或使用值完成请求取决于路由。 该请求也可能在另一个线程中完成,或者例如在某个将来完成时。 ElasticMQ正是这样做的。 这里map
, flatMap
和for-comprehensions
(这是map
/ flatMap
的更好语法)非常方便,例如(简化):
// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
queueActor <- queueManagerActor ? LookupQueue(queueName)
_ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
requestContext.complete(200, "message deleted")
}
有时,当流程更复杂时,ElasticMQ使用Akka Dataflow ,这要求启用continuations插件。 还有一个使用宏的类似项目Scala Async ,但是它处于早期开发中。
使用Akka Dataflow,您可以编写使用Future
的代码,就像正常的顺序代码一样。 CPS插件会将其转换为在需要的地方使用回调。 一个例子,取自CreateQueueDirectives :
flow {
val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
queueActorOption match {
case None => {
val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
createResult match {
case Left(e) => throw new SQSException("Queue already created: " + e.message)
case Right(_) => newQueueData
}
}
case Some(queueActor) => {
(queueActor ? GetQueueData()).apply()
}
}
}
这里的重要部分是flow
块,它确定了转换的范围,而apply()
调用Future
,它提取了Future
的内容。 这看起来像是完全正常的顺序代码,但是在执行时,因为第一个Future
用法将异步运行。
长时间轮询
由于所有代码都是异步且非阻塞的,因此实现长轮询非常容易。 请注意,当从队列接收消息时,我们会得到Future[List[MessageData]]
。 响应于完成此将来,HTTP请求也将以适当的响应完成。 但是,这种未来可能会立即完成(通常是正常情况),或者例如在10秒钟后完成–支持此操作的代码无需更改。 因此,唯一要做的就是延迟完成将来,直到经过指定的时间量或收到新消息为止。
该实现在QueueActorWaitForMessagesOps中 。 当接收消息的请求到达时,队列中没有任何内容,而不是立即答复(即,将空列表发送给发送方参与者),我们将对原始请求和发送方参与者的引用存储在映射中。 使用Akka调度程序,我们还计划在指定的超时后发回一个空列表并删除该条目。
当收到新消息时,我们只是从地图上等待请求,然后尝试完成它。 同样,所有同步和并发问题都由Akka和参与者模型处理。
vertx.akka