消息传递队列在包括财务系统,医疗保健和旅游行业在内的一系列软件体系结构和域中很常见。 但是,面向消息的中间件(MOM)(分布式系统的主要消息传递范例)要求特别安装和维护排队系统。 本月,我将介绍基于云的替代劳动密集型消息的方法:亚马逊的简单队列服务(SQS)。
正如它通常是有意义的主机上的谷歌应用程序引擎或亚马逊弹性魔豆的Web应用程序(请参阅相关信息 ),所以是否有意义充分利用云的邮件系统。 无论哪种方式,您都可以花费更多的时间来编写应用程序,而不是安装和维护其基础结构。
在本文中,您将学习Amazon SQS如何减轻安装和维护消息队列系统的负担。 您还将有机会练习创建SQS消息队列,然后在它们上删除和检索消息。 最后,我将向您展示在向Magnus添加消息传递时发生的情况,Magnus是我上个月对Amazon Elastic Beanstalk进行介绍时使用的移动Web应用程序。
面向消息的中间件 (MOM)是一个术语,它描述通过消息队列进行通信的松耦合系统。 它们不是在系统中紧密耦合(例如,通过编译时相关性),而是在网络中分布。 消息队列是通信的媒介,这种分布式效应使消息传递系统得以扩展。
传统上,架构师决定在面向消息的系统中哪些组件将相互通信。 尽管所有通信都是通过消息传递进行的,但消息本身通常采用通用的跨平台格式。 消息可以是简单的字符串,甚至可以是使用XML或JSON编码的文档。
因为MOM体系结构将组件解耦并允许它们之间的跨平台通信,所以各个单元可能是异构的。 也就是说,分布式体系结构中的组件可以用不同的语言编写,例如Java语言,C#和Ruby。 组件也可以存在于不同的平台上,例如UNIX®和Windows®。 此外,MOM使系统集成更加容易。 作为中间件,MOM可以连接旧系统和更新系统。 这是因为组件之间的API只是一条消息,可以是从XML文档到序列化对象再到简单String
。
MOM系统中的消息队列是Web的管道:它们连接各种系统组件,以使消息能够在它们之间自由流动。 事实证明,GAE是面向消息的中间件系统的绝佳示例。
与任何优秀的MOM一样,Google App Engine使用消息队列来解耦系统进程。 特别是,GAE队列使从Web请求中卸载长时间运行的进程成为可能。 使用GAE,您可以将指向servlet或JSP的URL转储到消息队列中,然后由GAE服务进行拾取和处理。 Servlet是相对于Web应用程序的主要逻辑序列异步调用的。 (请参阅相关主题 ,以了解更多关于GAE)。
为了管理主流程的持续时间而对运行时间更长的流程进行排队并不是GAE的事情。 其他PaaS实现(例如Heroku)提供了类似MOM的功能。 但是,借助Amazon SQS,无论平台如何,都可以在任何 Web应用程序中轻松实现。
如果您在JMS中使用了消息队列,Amazon SQS提供了许多应熟悉的功能。
亚马逊SQS:
SQS入门与AWS中的其他一切一样简单。 如果您还没有AWS账户,请首先创建一个 。 接下来,启用Amazon SQS。 最后,使用AWS接口Java SDK来发布和阅读基于云的消息! (有关在下面实际编写它们的更多信息。)
与Amazon SQS名称保持一致,读取和写入队列的逻辑本身就是简单性。 首先,使用有效的访问密钥和秘密建立与AWS的连接,如清单1所示:
AmazonSQS sqs = new AmazonSQSClient(new BasicAWSCredentials(AWS_KEY, AWS_SECRET));
接下来,您需要一个队列。 在AWS API中,清单2中显示的对createQueue
的调用不一定每次都创建一个新队列。 如果队列已经存在,则返回其句柄。 在SQS中,队列只是URL; 因此,队列句柄也只是URL。 请注意,在AWS开发工具包API中, Queue
URL是String
类型,而不是Java URL
类型。
String url = sqs.createQueue(new CreateQueueRequest("a_queue")).getQueueUrl();
一旦有了队列,就可以向其中写入消息。 SQS的消息格式类似于SimpleDB的消息格式(请参阅参考资料 ),因为消息是String
。 但是请记住,通过使String
的格式有效为JSON或XML,可以轻松地构造String
,从而轻松地对其进行解析。
sqs.sendMessage(new SendMessageRequest(url, "It's a wonderful life!"));
消息长度是有界的。 默认情况下,邮件不能超过8KB。 如果您需要使用更长的消息,则可以始终将它们切碎,并使用序列ID识别各个片段。 然后可以在接收方重新组合消息。
就是这样–仅用这三行代码就可以在SQS队列中放置一条消息。
你可能会注意到在AWS SDK熟悉的模式,特别是如果你读过我的介绍SimpleDB的(参见相关主题 )。 由于AWS中的所有内容都是Web服务,因此所有通信都通过HTTP进行。 因此,API通过类似SendMessageRequest
或CreateQueueRequest
类似于Request
的对象来模拟逻辑请求。 在这两种情况下,名称都描述了对象的意图。
另外需要注意的是,放置在SQS上的邮件是持久的:它们一直存在,直到您将其删除为止。 (如果您不删除消息,消息最终会消失;自动过期的默认值为四天。)当读取消息以进行读取时,Amazon SQS采用简单的锁定策略—对于读取事件,消息不会被删除。在一段时间内可用于其他并发阅读过程,称为消息的可见性超时 。 尽管您可以随意更改持续时间,但默认情况下该值设置为30秒。
生活在亚马逊基础设施中的消息的持久性令人放心。 像SimpleDB甚至S3一样,AWS世界中的组件是大量冗余的。 如果您的阅读器进程在消息处理过程中意外终止,则很有可能消息仍然存在。 而且,如果AWS网络中的某些资产也决定发挥作用,您可以打赌关键任务消息将不会丢失-它们仍将存在于许多其他计算机上。 最后,与所有其他AWS产品一样,您可以按地区(美国,欧盟等)设置消息基础架构的物理位置。
将消息写入SQS队列需要三行代码。 读一条消息只需要更多。 实际上,假设您需要连接到AWS和同一个队列的句柄,所以前两行是相同的。 Amazon SQS不提供任何回调功能或消息到达的主动通知。 您必须定期轮询SQS队列以查看是否有任何要传递的内容。 因此,读取SQS队列需要一些额外的代码行。
实施轮询策略有一点警告:在尝试处理一条消息之前,必须检查以确保您确实收到了一条有效消息。 如果不这样做,您肯定会看到讨厌的NullPointerException
。
例如,假设我具有与AWS的有效连接以及包含消息的队列的句柄,那么我可以检索消息,如清单4所示:
while (true) {
List<Message> msgs = sqs.receiveMessage(
new ReceiveMessageRequest(url).withMaxNumberOfMessages(1)).getMessages();
if (msgs.size() > 0) {
Message message = msgs.get(0);
System.out.println("The message is " + message.getBody());
sqs.deleteMessage(new DeleteMessageRequest(url, message.getReceiptHandle()));
} else {
System.out.println("nothing found, trying again in 30 seconds");
Thread.sleep(3000);
}
}
在清单4中,对sqs
的引用是AmazonSQS
类型,如清单1所示 。 该目的提供了一种receiveMessage
它接受一个方法ReceiveMessageRequest
。 可以将ReceiveMessageRequest
配置为在队列中请求一定数量的消息。 就我而言,我将其配置为一次只抓取一条消息。 无论我请求多少消息, receiveMessage
方法都将返回“ Message
类型” List
。
如前所述,SQS读取采用轮询方式进行; 而且, receiveMessage
方法是非阻塞的。 因此,我必须检查相应的List
( msgs
)实际上是否包含任何内容。 如果没有从队列中检索任何内容,则对ReceiveMessageRequest
上的getMessages
的调用将返回一个空的List
,而不是null
。
如果我已检索到有效消息,则可以通过getBody
调用获取其有效负载或主体。 请记住,一旦您拥有有效消息的句柄,SQS就会将其锁定。 默认情况下,我有30秒的时间来处理该消息。 如果我希望将其从处理中永久删除,则必须删除该消息。 因此,我发出一个deleteMessage
调用,该调用接受一个DeleteMessageRequest
。
Message
实例通过其接收句柄 (如id
)来区分。 句柄与消息不直接相关,而与消息被读取的事件有关。 读取多次(例如,未删除,或者读取过程失败)的消息可能具有多个但不同的回执句柄。 因此,当您希望删除一条消息时,必须通过getReceiptHandle
调用提供它的接收句柄。
我没有连续检查队列中是否有消息,而是提供了一个睡眠功能,如果未检索到消息,它会等待30秒。 显然,在某些情况下,睡眠可能不是一个好主意,或者可能需要更长的暂停时间。
通过这些几行代码,我几乎涵盖了Amazon SQS。 尽管AWS开发工具包提供了许多其他功能,但是到目前为止,您仅需编写代码即可将消息读写到SQS队列中。
现在,让我们看看实际使用它时会发生什么。
上个月,我创建了一个名为马格努斯一个简单的移动web应用程序,这是我用来演示一些亚马逊弹性魔豆的功能(请参阅相关的主题 )。 Magnus具有存储从帐户持有人的移动设备接收到的位置信息的巧妙功能-正是许多人想要提供的信息,以及许多其他人想要消费的信息。
捕捉某人的下落是好事,但人们真正喜欢的是图表(那个和带有圆角的闪亮按钮)。 当您要移动大量数据时,图形和分析的处理成本可能很高。 (Hadoop,有人吗?) 提取,转换和加载或ETL的久经考验的技术是管理此问题的一种方法。 ETL是一个相当大的术语,包含许多内容。 (人们用这个首字母缩写来建立职业,公司来建立业务!)在这种情况下,ETL只是暗示我将分析一些MongoDB数据并基于该数据创建新文档。
当涉及到数据分析时,对于数据的要求和我们可以提供的答案存在无数的可能性。 Magnus网络应用程序只发挥了很小的潜力:它提取并显示与地理坐标,时间和用户帐户有关的数据。 从技术上讲,Magnus对位置的纬度和经度,用户帐户ID,时间戳以及这些特定数据之间的关系感兴趣。
Magnus可以给出此数据的图形表示,按地理区域显示用户帐户(也许在任何给定时间带有标记的地图都可以找到一个帐户持有者)。 或者它可以显示帐户持有者/用户如何在给定区域(另一张地图)上移动。 提供此类信息涉及离线发生的类似ETL的过程。 从处理的角度来看,实时生成的数据(生成时)可能过于昂贵。 因此,可以将这些分析视为近实时的 。
为了在Magnus中使用Amazon SQS,我需要进行一些初步设置。 首先,我需要一种获取AWS凭证的方法。 我喜欢Play(请参阅参考资料 ),因此将其用作我的应用程序开发框架。 要获取凭据,我可以使用Play的application.conf
文件,这是一个自动读取的属性文件。
#AWS configuration
aws_access_key_id=1S..........MR2
aws_secret_access_key=S3.........ZM
定义属性后,可以通过调用Play的Play
对象轻松获得它们,如清单6所示:
public class Application extends Controller {
private static final String AWS_KEY =
Play.configuration.get("aws_access_key_id").toString();
private static final String AWS_SECRET =
Play.configuration.get("aws_secret_access_key").toString();
//....
}
定义好管道之后,我就可以开始做生意了。 清单7中的代码与我上个月在Amazon Elastic Beanstalk的介绍中使用的代码段相似。 在这种情况下,我只是用一些代码更新了saveLocation
,以将一个简单的JSON文档放置到名为“ locations_queue
”的队列中。 JSON基本上是这样的: {"id":"4d6baeb52a54f1000001"}
。 提供了已保存位置的ID,以供邮件的收件人查找和分析。
public static void saveLocation(String id, JsonObject body) throws Exception {
String eventname = body.getAsJsonPrimitive("name").getAsString();
double latitude = body.getAsJsonPrimitive("latitude").getAsDouble();
double longitude = body.getAsJsonPrimitive("longitude").getAsDouble();
String when = body.getAsJsonPrimitive("timestamp").getAsString();
SimpleDateFormat formatter =
new SimpleDateFormat("dd-MM-yyyy HH:mm");
Date dt = formatter.parse(when);
ObjectId oid = new Location(id, dt, latitude, longitude).save();
AmazonSQS sqs = new AmazonSQSClient(new BasicAWSCredentials(AWS_KEY, AWS_SECRET));
Map mp = new HashMap<String, String>();
mp.put("id", oid.toString());
String url = sqs.createQueue(new CreateQueueRequest("locations_queue")).getQueueUrl();
sqs.sendMessage(new SendMessageRequest(url, new Gson().toJson(mp)));
renderJSON(getSuccessMessage());
}
现在将消息放置在SQS队列上,我需要将它们从队列中弹出并进行一些处理。 记得,MOM的优点之一是它允许异构体系结构。 为此,可以使用Java语言以外的其他语言编写SQS阅读器端,甚至可以在其他平台上运行!
因为我基本上可以用自己喜欢的任何方式进行分析处理,所以我将使用Ruby进行处理-以赢得一些酷孩子的认可。
在清单8中,我获得了right_aws
Ruby gem的帮助来协助我使用SQS。 在许多方面,您都可以将gem视为jar文件。 right_aws
库与Amazon Java版的SDK非常相似,尽管它不那么冗长,但使用起来却更加简单。
require "right_aws"
#...
sqs = RightAws::SqsGen2.new(aws_access_key_id, aws_secret_access_key)
queue = sqs.queue('locations_queue')
如您所见,清单8中的两行相关代码建立了与AWS的连接,并抓住了名为'locations_queue'
队列的句柄。
接下来,我将使用轮询机制,如清单9所示。对@queue
的引用与清单8中的queue
变量相同。但是,在这种情况下,它已被定义为类的一部分。 因此,在清单9中,我直接使用Ruby的@
语法引用一个实例变量。
def process_messages()
while true
msg = @queue.pop
if !msg.nil?
handle_message(msg) # impl of which does neat stuff
msg.delete
else
sleep 10
end
end
end
将消息传递给handle_message
方法后,可以删除它。 如果未找到消息,则主线程Hibernate10秒钟。 该行!msg.nil?
与Java代码中的msg != null
相同。 但是,在Ruby中,即使null
也是对象。 询问对象是否为nil
类型(通过nil?
方法调用)将返回布尔值。
由于AWS是一种Web服务产品,因此许多平台库都可以访问和利用它。 在Magnus中,您可以看到由此带来的灵活性:我能够使用Java代码将消息推送到SQS队列中,然后使用一个小的Ruby程序将其弹出。 采用队列的体系结构的优点之一是组件的隐式解耦。
正如在GAE或Amazon的Elastic Beanstalk上托管Web应用程序通常很有意义,利用云消息传递系统也很有意义。 亚马逊的SQS减轻了安装和维护排队系统的负担。 您只需创建一个队列,然后在其上拖放并检索消息。 让亚马逊担心其余的事情。
翻译自: https://www.ibm.com/developerworks/java/library/j-javadev2-17/index.html