当前位置: 首页 > 工具软件 > play-rabbitmq > 使用案例 >

使用Scala,Play和Akka连接到RabbitMQ(AMQP)

袁凌
2023-12-01
在本文中,我们将研究如何从Scala连接到RabbitMQ,以便可以从应用程序中支持AMQP协议。 在此示例中,我将使用Play Framework 2.0作为容器(有关更多信息,请参阅我在该主题上的其他文章 )在其中运行应用程序,因为Play使得使用Scala进行开发变得更加容易。 本文还将使用Akka actor发送和接收RabbitMQ的消息。

什么是AMQP

首先,快速介绍AMQP。 AMQP代表“高级消息队列协议”,是消息传递的开放标准。 AMQP 主页声明其愿景是:“成为所有消息中间件之间互操作性的标准协议”。 AMQP定义了用于交换消息的传输级别协议,该协议可用于集成来自许多不同平台,语言和技术的应用程序。
有许多工具可以实现此协议,但是RabbitMQ引起了越来越多的关注。 RabbitMQ是使用AMQP的基于Erlang的开源消息代理。 所有会说AMQP的应用程序都可以连接并使用RabbitMQ。 因此,在本文中,我们将展示如何将基于Play2 / Scala / Akka的应用程序连接到RabbitMQ。 在本文中,我们将向您展示如何实现两种最常见的方案:

  • 发送/接收:我们将配置一个发件人每隔几秒钟发送一条消息,并使用两个侦听器以循环方式从队列中读取消息。
  • 发布/订阅:在此示例中,我们将创建几乎相同的方案,但是这次,侦听器将同时获得消息。

我假设您已经安装了RabbitMQ。 如果不是,请按照其网站上的说明进行操作。

设置基本的Play 2 / Scala项目

在此示例中,我创建了一个新的Play 2项目。 这样做很容易:

jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ
       _            _ 
 _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/ 
 
play! 2.0-RC2, http://www.playframework.org
 
The new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQ
 
What is the application name? 
> PlayAndRabbitMQ
 
Which template do you want to use for this new application? 
 
  1 - Create a simple Scala application
  2 - Create a simple Java application
  3 - Create an empty project
 
> 1
 
OK, application PlayAndRabbitMQ is created.
 
Have fun!

我曾经使用scala-ide插件在Eclipse上工作,所以我执行play eclipsify并将项目导入Eclipse。
我们需要做的下一步是建立正确的依赖关系。 Play为此使用sbt,并允许您从项目目录中的build.scala文件配置依赖项。 我们将添加的唯一依赖关系是RabbitMQ的Java客户端库。 即使Lift提供了一个基于Scala的AMQP库,但我发现直接使用RabbitMQ也是一样容易。 添加依赖项后,我的build.scala如下所示:

import sbt._
import Keys._
import PlayProject._
 
object ApplicationBuild extends Build {
 
    val appName         = "PlayAndRabbitMQ"
    val appVersion      = "1.0-SNAPSHOT"
 
    val appDependencies = Seq(
      "com.rabbitmq" % "amqp-client" % "2.8.1"
    )
 
    val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings(
    )
}

将rabbitMQ配置添加到配置文件

对于我们的示例,我们可以配置一些东西。 将消息发送到的队列,要使用的交换以及运行RabbitMQ的主机。 在实际情况下,我们将需要设置更多的配置选项,但是在这种情况下,我们只有这三个。 将以下内容添加到您的application.conf中,以便我们可以从我们的应用程序中引用它。

#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

现在,我们可以使用ConfigFactory访问这些配置文件。 为了便于访问,请创建以下对象:

object Config {
  val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");
  val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");
  val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

初始化与RabbitMQ的连接

在研究如何使用RabbitMQ发送和接收消息之前,我们还有一个要定义的对象。 要使用RabbitMQ,我们需要一个连接。 我们可以使用ConnectionFactory获得与服务器的连接。 查看javadocs以获取有关如何配置连接的更多信息。

object RabbitMQConnection {
 
  private val connection: Connection = null;
 
  /**
   * Return a connection if one doesn't exist. Else create
   * a new one
   */
  def getConnection(): Connection = {
    connection match {
      case null => {
        val factory = new ConnectionFactory();
        factory.setHost(Config.RABBITMQ_HOST);
        factory.newConnection();
      }
      case _ => connection
    }
  }
}

在应用程序启动时启动侦听器

在查看RabbitMQ代码之前,我们还需要做一件事。 我们需要确保在应用程序启动时注册了消息侦听器,并且发件人开始发送。 播放2提供了
为此的GlobalSettings对象,您可以在应用程序启动时扩展该对象以执行代码。 对于我们的示例,我们将使用以下对象(请记住,该对象需要存储在默认名称空间中:

import play.api.mvc._
import play.api._
import rabbitmq.Sender
 
object Global extends GlobalSettings {
 
  override def onStart(app: Application) {
    Sender.startSending
  }
}

我们将在下面的部分中查看此Sender.startSending操作,该操作将初始化所有发送者和接收者。

设置发送和接收方案

让我们看一下Sender.startSending代码,该代码将设置一个将msg发送到特定队列的发送方。 为此,我们使用以下代码:

object Sender {
 
  def startSending = {
    // create the connection
    val connection = RabbitMQConnection.getConnection();
    // create the channel we use to send
    val sendingChannel = connection.createChannel();
    // make sure the queue exists we want to send to
    sendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);
 
   Akka.system.scheduler.schedule(2 seconds, 1 seconds
          , Akka.system.actorOf(Props(
               new SendingActor(channel = sendingChannel, 
                                          queue = Config.RABBITMQ_QUEUE)))
          , "MSG to Queue");
  }
}
 
class SendingActor(channel: Channel, queue: String) extends Actor {
 
  def receive = {
    case some: String => {
      val msg = (some + " : " + System.currentTimeMillis());
      channel.basicPublish("", queue, null, msg.getBytes());
      Logger.info(msg);
    }
    case _ => {}
  }
}

在此代码中,我们采取以下步骤:

  1. 使用工厂检索到RabbitMQ的连接
  2. 在此连接上创建一个通道,用于与RabbitMQ通信
  3. 使用通道创建队列(如果尚不存在)
  4. 安排Akka每秒向演员发送一条消息。

所有这些都应该非常简单。 唯一(有点)复杂的部分是调度部分。 此调度操作的作用是这样的。 我们告诉Akka安排要发送给演员的消息。 我们希望在发射之前有2秒的延迟,并且我们希望每秒重复一次此作业。 应该用于此的actor是SendingActor,您也可以在此清单中看到。 该参与者需要访问通道以发送消息,并且该参与者还需要知道接收消息的位置。 这是队列。
因此,该Actor每秒将收到一条消息,附加一个时间戳,并使用提供的通道将此消息发送到队列:channel.basicPublish(“”,queue,null,msg.getBytes());。 现在我们每秒发送一条消息,在此队列上有可以接收消息的侦听器将是很好的。 为了接收消息,我们还创建了一个Actor,可以在特定队列上无限期地进行监听。

class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {
 
  // called on the initial run
  def receive = {
    case _ => startReceving
  }
 
  def startReceving = {
 
    val consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);
 
    while (true) {
      // wait for the message
      val delivery = consumer.nextDelivery();
      val msg = new String(delivery.getBody());
 
      // send the message to the provided callback function
      // and execute this in a subactor
      context.actorOf(Props(new Actor {
        def receive = {
          case some: String => f(some);
        }
      })) ! msg
    }
  }
}

这个actor比我们以前发送的actor要复杂一些。 当该参与者收到消息(消息的种类无关紧要)时,它将开始侦听创建该消息的队列。 它通过使用提供的通道创建使用者来实现此目的,并告诉使用者开始在指定队列上侦听。 Consumer.nextDelivery()方法将阻塞,直到消息在配置的队列中等待。 收到消息后,将创建一个新的Actor,将消息发送到该Actor。 这个新的参与者将消息传递给提供的方法,您可以在其中放置业务逻辑。
要使用此侦听器,我们需要提供以下参数:

  • 频道:允许访问RabbitMQ
  • 队列:监听消息的队列
  • f:收到消息后我们将执行的功能。

第一个示例的最后一步是将所有内容粘合在一起。 为此,我们向Sender.startSending方法添加了几个方法调用。

def startSending = {
   ...
    val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);
 
    setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);
 
    // create an actor that starts listening on the specified queue and passes the
    // received message to the provided callback
    val callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);
 
    // setup the listener that sends to a specific queue using the SendingActor
    setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);
   ...
  }
 
  private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(2 seconds, 
        Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");
  }

在此代码中,您可以看到我们定义了一个回调函数,并使用此回调函数以及队列和通道来创建ListeningActor。 我们使用scheduleOnce方法在单独的线程中启动此侦听器。 现在,使用此代码,我们可以运行应用程序(播放运行),打开localhost:9000来启动应用程序,我们应该看到类似以下输出的内容。

[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

在这里,您可以清楚地看到消息的循环处理方式。

设置发布和订阅方案

一旦我们运行了上面的代码,添加发布/订阅功能就变得非常简单。 现在,我们使用PublishingActor代替SendingActor:

class PublishingActor(channel: Channel, exchange: String) extends Actor {
 
  /**
   * When we receive a message we sent it using the configured channel
   */
  def receive = {
    case some: String => {
      val msg = (some + " : " + System.currentTimeMillis());
      channel.basicPublish(exchange, "", null, msg.getBytes());
      Logger.info(msg);
    }
    case _ => {}
  }
}

RabbitMQ使用交换来允许多个收件人接收相同的消息(以及许多其他高级功能)。 来自其他参与者的代码唯一的变化是,这次我们将消息发送到交换而不是队列。 侦听器代码完全相同,我们唯一需要做的就是将队列连接到特定的交换机。 这样,该队列上的侦听器就可以接收发送到交换机的消息。 我们再次根据之前使用的设置方法执行此操作。

...
    // create a new sending channel on which we declare the exchange
    val sendingChannel2 = connection.createChannel();
    sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");
 
    // define the two callbacks for our listeners
    val callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);
    val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);
 
    // create a channel for the listener and setup the first listener
    val listenChannel1 = connection.createChannel();
    setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), 
                   Config.RABBITMQ_EXCHANGEE, callback3);
 
    // create another channel for a listener and setup the second listener
    val listenChannel2 = connection.createChannel();
    setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), 
                   Config.RABBITMQ_EXCHANGEE, callback4);
 
    // create an actor that is invoked every two seconds after a delay of
    // two seconds with the message "msg"
    Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(
               new PublishingActor(channel = sendingChannel2
                    , exchange = Config.RABBITMQ_EXCHANGEE))), 
         "MSG to Exchange");
    ...

我们还为setupListener创建了一个重载方法,该方法作为一个附加参数,也接受要使用的交换的名称。

private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {
    channel.queueBind(queueName, exchange, "");
 
    Akka.system.scheduler.scheduleOnce(2 seconds, 
        Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");
  }

在这小段代码中,您可以看到我们将提供的队列(在我们的示例中是一个随机名称)绑定到指定的交易所。 之后,我们将创建一个新的监听器,如我们之前所见。
现在运行此代码将产生以下输出:

[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

如您所见,在这种情况下,两个侦听器都收到相同的消息。 这几乎涵盖了本文的全部内容。 如您所见,为RabbitMQ使用基于Java的客户端api绰绰有余,并且可以从Scala轻松使用。 请注意,尽管该示例尚未准备好投入生产,但您应注意关闭连接,并很好地关闭侦听器和参与者。 这里没有显示所有关闭代码。

参考:通过我们的JCG合作伙伴 Jos Dirksen在Smart Java博客上使用Scala,Play和Akka连接到RabbitMQ(AMQP)


翻译自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html

 类似资料: