当前位置: 首页 > 工具软件 > ElasticMQ > 使用案例 >

ElasticMQ 0.7.0:使用Akka和Spray的长轮询,无阻塞实现

夏侯衡
2023-12-01

刚刚发布了ElasticMQ 0.7.0 ,它是一个基于角色的Scala和与Amazon SQS兼容的接口的消息队列系统。

这是一次重大的重写,在核心使用Akka actor,在REST层使用Spray 。 到目前为止,只有核心模块和SQS模块已被重写。 日志记录,SQL后端和复制尚未完成。

客户端的主要改进是:

  • 长期轮询支持,该支持已在一段时间前添加到SQS中
  • 更简单的独立服务器-只需下载一个jar

对于长轮询,当接收到一条消息时,您可以指定一个附加的MessageWaitTime属性。 如果在队列中没有消息,而不是完成空响应请求,ElasticMQ将等待MessageWaitTime秒钟,直到邮件到达。 这不仅有助于减少所使用的带宽(不需要非常频繁的请求),还可以提高整体系统性能(在发送消息后立即接收消息)并降低SQS成本。

现在,独立服务器是一个jar。 要运行内存中的本地SQS实现(例如,用于测试使用SQS的应用程序),您需要做的就是下载jar文件并运行:

java -jar elasticmq-server-0.7.0.jar

这将在http://localhost:9324上启动服务器。 当然,接口和端口是可配置的,有关详细信息,请参见自述文件 。 和以前一样,您也可以使用任何基于JVM的语言来运行嵌入式服务器。

实施说明

出于好奇,这里简要介绍一下ElasticMQ的实现方式,包括核心系统,REST层,Akka Dataflow的用法和长轮询的实现。 所有代码都可以在GitHub找到

如前所述,ElasticMQ现在使用Akka和Spray实现,并且不包含任何阻塞调用 。 一切都是异步的。

核心

核心系统是基于参与者的。 有一个主要角色( QueueManagerActor ),它知道系统中当前正在创建的队列,并提供了创建和删除队列的可能性。

为了与演员进行交流,使用了键入的询问模式 。 例如,要查找队列(队列也是参与者),则定义一条消息:

case class LookupQueue(queueName: String) extends Replyable[Option[ActorRef]]

用法如下所示:

import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")

如前所述,每个队列都是一个参与者,并且封装了队列状态。 我们可以使用简单的可变数据结构,而无需线程同步,因为参与者模型会为我们处理这些事情。 有很多消息可以发送到队列执行者,例如:

case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, 
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]

休息层

SQS查询/ REST层是使用Spray (基于Akka的轻量级REST / HTTP工具包)实现的。

除了基于角色的非阻塞IO实现外,Spray还提供了功能强大的路由库spray-routing 。 它包含许多内置指令,用于在请求方法(get / post等)上进行匹配,提取表单参数的查询或在请求路径上进行匹配。 但是,它还允许您使用简单的指令组合来定义自己的指令。 典型的ElasticMQ路由如下所示:

val listQueuesDirective = 
  action("ListQueues") {
    rootPath {
      anyParam("QueueNamePrefix"?) { prefixOption =>
        // logic
      }
    }
  }

如果action与body参数的"Action" URL中指定的动作名称匹配并接受/拒绝请求,则rootPath在空路径上匹配,依此类推。 Spray有一个很好的教程 ,因此,如果您有兴趣,我建议您去那里看看。

如何使用路由中的队列参与者来完成HTTP请求?

Spray的好处在于,它所做的只是将RequestContext实例传递到您的路由,没有任何回报。 完全放弃请求或使用值完成请求取决于路由。 该请求也可以在另一个线程中完成,或者例如在将来完成时。 ElasticMQ正是这样做的。 在这里, mapflatMapfor-comprehensions (这是map / flatMap的更好语法)非常方便,例如(简化):

// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}

有时,当流程更复杂时,ElasticMQ使用Akka Dataflow ,这要求启用continuations插件。 还有一个类似的项目,它使用宏Scala Async ,但是它处于早期开发中。

使用Akka Dataflow,您可以编写使用Future的代码,就像正常的顺序代码一样。 CPS插件会将其转换为在需要时使用回调。 一个例子,取自CreateQueueDirectives

flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}

这里的重要部分是flow块,它确定了转换的范围,而apply()调用Future ,它提取了Future的内容。 这看起来像是完全正常的顺序代码,但是在执行时,因为第一个Future用法将异步运行。

长时间轮询

由于所有代码都是异步且非阻塞的,因此实现长轮询非常容易。 请注意,当从队列接收消息时,我们将获得Future[List[MessageData]] 。 作为对完成此将来的响应,HTTP请求也将通过相应的响应来完成。 但是,这种未来可能会立即完成(通常是正常情况),或者例如在10秒钟后完成–支持此操作的代码无需更改。 因此,唯一要做的就是延迟完成将来,直到经过指定的时间量或收到新消息为止。

该实现在QueueActorWaitForMessagesOps中 。 当接收消息的请求到达时,队列中没有任何内容,而不是立即答复(即,将空列表发送给发送方参与者),而是将对原始请求和发送方参与者的引用存储在映射中。 使用Akka计划程序,我们还计划在指定的超时后发回空列表并删除条目。

当收到新消息时,我们只是从地图上等待请求,然后尝试完成它。 同样,所有同步和并发问题都由Akka和参与者模型处理。


翻译自: https://www.javacodegeeks.com/2013/06/elasticmq-0-7-0-long-polling-non-blocking-implementation-using-akka-and-spray.html

 类似资料: