什么是AMQP
首先,快速介绍AMQP。 AMQP代表“高级消息队列协议”,是消息传递的开放标准。 AMQP 主页声明其愿景是:“成为所有消息中间件之间互操作性的标准协议”。 AMQP定义了用于交换消息的传输级别协议,该协议可用于集成来自许多不同平台,语言和技术的应用程序。
有许多工具可以实现此协议,但是RabbitMQ引起了越来越多的关注。 RabbitMQ是使用AMQP的基于Erlang的开源消息代理。 所有会说AMQP的应用程序都可以连接并使用RabbitMQ。 因此,在本文中,我们将展示如何将基于Play2 / Scala / Akka的应用程序连接到RabbitMQ。 在本文中,我们将向您展示如何实现两种最常见的方案:
- 发送/接收:我们将配置一个发件人每隔几秒钟发送一条消息,并使用两个侦听器以循环方式从队列中读取消息。
- 发布/订阅:在此示例中,我们将创建几乎相同的方案,但是这次,侦听器将同时获得消息。
我假设您已经安装了RabbitMQ。 如果不是,请按照其网站上的说明进行操作。
设置基本的Play 2 / Scala项目
在此示例中,我创建了一个新的Play 2项目。 这样做很容易:
jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ
_ _
_ __ | | __ _ _ _| |
| '_ \| |/ _' | || |_|
| __/|_|\____|\__ (_)
|_| |__/
play! 2.0-RC2, http://www.playframework.org
The new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQ
What is the application name?
> PlayAndRabbitMQ
Which template do you want to use for this new application?
1 - Create a simple Scala application
2 - Create a simple Java application
3 - Create an empty project
> 1
OK, application PlayAndRabbitMQ is created.
Have fun!
我曾经使用scala-ide插件在Eclipse上工作,所以我执行play eclipsify并将项目导入Eclipse。
我们需要做的下一步是建立正确的依赖关系。 Play为此使用sbt,并允许您从项目目录中的build.scala文件配置依赖项。 我们将添加的唯一依赖关系是RabbitMQ的Java客户端库。 即使Lift提供了一个基于Scala的AMQP库,但我发现直接使用RabbitMQ也是一样容易。 添加依赖项后,我的build.scala如下所示:
import sbt._
import Keys._
import PlayProject._
object ApplicationBuild extends Build {
val appName = "PlayAndRabbitMQ"
val appVersion = "1.0-SNAPSHOT"
val appDependencies = Seq(
"com.rabbitmq" % "amqp-client" % "2.8.1"
)
val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings(
)
}
将rabbitMQ配置添加到配置文件
对于我们的示例,我们可以配置一些东西。 将消息发送到的队列,要使用的交换以及运行RabbitMQ的主机。 在实际情况下,我们将需要设置更多的配置选项,但是在这种情况下,我们只有这三个。 将以下内容添加到您的application.conf中,以便我们可以从我们的应用程序中引用它。
#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1
现在,我们可以使用ConfigFactory访问这些配置文件。 为了便于访问,请创建以下对象:
object Config {
val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");
val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");
val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}
初始化与RabbitMQ的连接
在研究如何使用RabbitMQ发送和接收消息之前,我们还有一个要定义的对象。 要使用RabbitMQ,我们需要一个连接。 我们可以使用ConnectionFactory获得与服务器的连接。 查看javadocs以获取有关如何配置连接的更多信息。
object RabbitMQConnection {
private val connection: Connection = null;
/**
* Return a connection if one doesn't exist. Else create
* a new one
*/
def getConnection(): Connection = {
connection match {
case null => {
val factory = new ConnectionFactory();
factory.setHost(Config.RABBITMQ_HOST);
factory.newConnection();
}
case _ => connection
}
}
}
在应用程序启动时启动侦听器
在查看RabbitMQ代码之前,我们还需要做一件事。 我们需要确保在应用程序启动时注册了消息侦听器,并且发件人开始发送。 播放2提供了
为此的GlobalSettings对象,您可以在应用程序启动时扩展该对象以执行代码。 对于我们的示例,我们将使用以下对象(请记住,该对象需要存储在默认名称空间中:
import play.api.mvc._
import play.api._
import rabbitmq.Sender
object Global extends GlobalSettings {
override def onStart(app: Application) {
Sender.startSending
}
}
我们将在下面的部分中查看此Sender.startSending操作,该操作将初始化所有发送者和接收者。
设置发送和接收方案
让我们看一下Sender.startSending代码,该代码将设置一个将msg发送到特定队列的发送方。 为此,我们使用以下代码:
object Sender {
def startSending = {
// create the connection
val connection = RabbitMQConnection.getConnection();
// create the channel we use to send
val sendingChannel = connection.createChannel();
// make sure the queue exists we want to send to
sendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);
Akka.system.scheduler.schedule(2 seconds, 1 seconds
, Akka.system.actorOf(Props(
new SendingActor(channel = sendingChannel,
queue = Config.RABBITMQ_QUEUE)))
, "MSG to Queue");
}
}
class SendingActor(channel: Channel, queue: String) extends Actor {
def receive = {
case some: String => {
val msg = (some + " : " + System.currentTimeMillis());
channel.basicPublish("", queue, null, msg.getBytes());
Logger.info(msg);
}
case _ => {}
}
}
在此代码中,我们采取以下步骤:
- 使用工厂检索到RabbitMQ的连接
- 在此连接上创建一个通道,用于与RabbitMQ通信
- 使用通道创建队列(如果尚不存在)
- 安排Akka每秒向演员发送一条消息。
所有这些都应该非常简单。 唯一(有点)复杂的部分是调度部分。 此调度操作的作用是这样的。 我们告诉Akka安排要发送给演员的消息。 我们希望在发射之前有2秒的延迟,并且我们希望每秒重复一次此作业。 应该用于此的actor是SendingActor,您也可以在此清单中看到。 该参与者需要访问通道以发送消息,并且该参与者还需要知道接收消息的位置。 这是队列。
因此,该Actor每秒将收到一条消息,附加一个时间戳,并使用提供的通道将此消息发送到队列:channel.basicPublish(“”,queue,null,msg.getBytes());。 现在我们每秒发送一条消息,在此队列上有可以接收消息的侦听器将是很好的。 为了接收消息,我们还创建了一个Actor,可以在特定队列上无限期地进行监听。
class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {
// called on the initial run
def receive = {
case _ => startReceving
}
def startReceving = {
val consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, true, consumer);
while (true) {
// wait for the message
val delivery = consumer.nextDelivery();
val msg = new String(delivery.getBody());
// send the message to the provided callback function
// and execute this in a subactor
context.actorOf(Props(new Actor {
def receive = {
case some: String => f(some);
}
})) ! msg
}
}
}
这个actor比我们以前发送的actor要复杂一些。 当该参与者收到消息(消息的种类无关紧要)时,它将开始侦听创建该消息的队列。 它通过使用提供的通道创建使用者来实现此目的,并告诉使用者开始在指定队列上侦听。 Consumer.nextDelivery()方法将阻塞,直到消息在配置的队列中等待。 收到消息后,将创建一个新的Actor,将消息发送到该Actor。 这个新的参与者将消息传递给提供的方法,您可以在其中放置业务逻辑。
要使用此侦听器,我们需要提供以下参数:
- 频道:允许访问RabbitMQ
- 队列:监听消息的队列
- f:收到消息后我们将执行的功能。
第一个示例的最后一步是将所有内容粘合在一起。 为此,我们向Sender.startSending方法添加了几个方法调用。
def startSending = {
...
val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);
setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);
// create an actor that starts listening on the specified queue and passes the
// received message to the provided callback
val callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);
// setup the listener that sends to a specific queue using the SendingActor
setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);
...
}
private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {
Akka.system.scheduler.scheduleOnce(2 seconds,
Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");
}
在此代码中,您可以看到我们定义了一个回调函数,并使用此回调函数以及队列和通道来创建ListeningActor。 我们使用scheduleOnce方法在单独的线程中启动此侦听器。 现在,使用此代码,我们可以运行应用程序(播放运行),打开localhost:9000来启动应用程序,我们应该看到类似以下输出的内容。
[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822
在这里,您可以清楚地看到消息的循环处理方式。
设置发布和订阅方案
一旦我们运行了上面的代码,添加发布/订阅功能就变得非常简单。 现在,我们使用PublishingActor代替SendingActor:
class PublishingActor(channel: Channel, exchange: String) extends Actor {
/**
* When we receive a message we sent it using the configured channel
*/
def receive = {
case some: String => {
val msg = (some + " : " + System.currentTimeMillis());
channel.basicPublish(exchange, "", null, msg.getBytes());
Logger.info(msg);
}
case _ => {}
}
}
RabbitMQ使用交换来允许多个收件人接收相同的消息(以及许多其他高级功能)。 来自其他参与者的代码唯一的变化是,这次我们将消息发送到交换而不是队列。 侦听器代码完全相同,我们唯一需要做的就是将队列连接到特定的交换机。 这样,该队列上的侦听器就可以接收发送到交换机的消息。 我们再次根据之前使用的设置方法执行此操作。
...
// create a new sending channel on which we declare the exchange
val sendingChannel2 = connection.createChannel();
sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");
// define the two callbacks for our listeners
val callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);
val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);
// create a channel for the listener and setup the first listener
val listenChannel1 = connection.createChannel();
setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(),
Config.RABBITMQ_EXCHANGEE, callback3);
// create another channel for a listener and setup the second listener
val listenChannel2 = connection.createChannel();
setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(),
Config.RABBITMQ_EXCHANGEE, callback4);
// create an actor that is invoked every two seconds after a delay of
// two seconds with the message "msg"
Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(
new PublishingActor(channel = sendingChannel2
, exchange = Config.RABBITMQ_EXCHANGEE))),
"MSG to Exchange");
...
我们还为setupListener创建了一个重载方法,该方法作为一个附加参数,也接受要使用的交换的名称。
private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {
channel.queueBind(queueName, exchange, "");
Akka.system.scheduler.scheduleOnce(2 seconds,
Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");
}
在这小段代码中,您可以看到我们将提供的队列(在我们的示例中是一个随机名称)绑定到指定的交易所。 之后,我们将创建一个新的监听器,如我们之前所见。
现在运行此代码将产生以下输出:
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006
如您所见,在这种情况下,两个侦听器都收到相同的消息。 这几乎涵盖了本文的全部内容。 如您所见,为RabbitMQ使用基于Java的客户端api绰绰有余,并且可以从Scala轻松使用。 请注意,尽管该示例尚未准备好投入生产,但您应注意关闭连接,并很好地关闭侦听器和参与者。 这里没有显示所有关闭代码。
参考:通过我们的JCG合作伙伴 Jos Dirksen在Smart Java博客上使用Scala,Play和Akka连接到RabbitMQ(AMQP) 。
翻译自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html