我正在构建一个由亚马逊服务提供支持的警报系统。
我每天将一个文件放到S3上,它生成一个lambda函数(我们称之为生成器函数)来处理该文件。
Generator基于此文件构建警报并将多条消息发布到SNS主题(让我们称之为发件箱)-由Generator计算的每个收件人一条消息。
我在发件箱中订阅了第二个lambda函数(我们称之为Courier),它应该接收每条消息并对其进行处理。
发电机代码:
// 'Generator' function
exports.handler = function (event, context) {
console.log('Reading options from event:\n', util.inspect(event, {depth: 5}));
var users = {};
var userSubscriptions = {};
var alerts = {};
var artists = {};
var tracks = {};
async.waterfall([
function downloadSubscribersFile (next) {
// Do stuff
},
function downloadAndFormatActionsFile (next) {
// Download data file and analyse
},
function publishAlerts (next) {
// Now we have alerts built, we need to mail them out
var recipients = Object.keys(alerts);
async.each(recipients, function (recipient, callback) {
var recipientAlert = alerts[recipient];
console.log(util.inspect(recipientAlert, { depth: 10 }));
if (alerts[recipient].actions.artists.length < 1) {
return callback();
}
var params = {
TopicArn: SNS_TOPIC_ARN,
Subject: recipient,
Message: JSON.stringify(recipientAlert)
};
sns.publish(params, function (err, data) {
if (err) {
console.log(err);
return callback(err);
} else {
console.log('PUBLISHED MESSAGE: \n', util.inspect(data, { depth: 10 }));
console.log('MESSAGE WAS: \n', util.inspect(params, { depth: 10 }));
}
return callback();
})
}, function (err) {
if (err) return next(err);
next();
})
}
], function (err) {
if (err) {
console.log('Error: ', err);
} else {
console.log('Process successful');
}
context.done();
})
}
以及其他功能:
// 'Courier' function
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
console.log('From SNS:', event.Records[0].Sns.Message);
context.succeed();
};
当我的Generator函数被调用时,我可以看到12条消息应该发布到SNS主题。发布这些消息时没有记录错误,但Courier函数只触发一次。
我想知道是否有人对此有任何类似的问题,以及我在这里是否遗漏了什么。可能是我在AWS中没有正确配置某些内容,但我非常有信心一切都按应有的方式设置。
更新:
在查看了我试图发送的消息后,SNS接收到的消息似乎是负载最小的消息。我想知道SNS是否能够处理大量针对某个主题的小而频繁的消息。。。?
是的,SNS可以处理单个主题的高吞吐量。但是,邮件的最大大小为256KB,因此,如果您的邮件大于256KB,则可能是原因。
我看到您的Generator函数正在记录消息,您是否看到12条带有消息ID的记录消息?我看到您有一个变量警报
,您希望从中获取多个收件人,但我不知道您在哪里设置它。
我的建议是:在发送消息之前添加更多日志记录,以验证您认为应该发生的事情是否确实发生了。
是否有一种方法可以使用AWS CLI列出特定SQS队列订阅的所有AWS SNS主题? 我已经知道如何看到一个SNS主题上的订阅者列表,但这只有在我知道SNS主题的名称时才起作用。在这种情况下,我只知道SQS队列的名称。 我在网上搜索什么也找不到。
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到
我有几个。Net 5.0微服务,RabbitMQ作为消息代理。现在我正在切换到AWS SQS。很少有服务在侦听相同的消息(这是通过RabbitMQ中的Exchange完成的)。在AWS中,这可以通过将SQS队列订阅到SNS主题来实现。我创建了SNS fifo topic和SQS fifo队列,将这些队列订阅到topic。当我将消息直接发布到队列时,一切都会立即工作,但当我将消息发布到SNS主题时,
我有一个将消息发布到AWS SNS主题字符串的NodeJS应用程序和一个AWS SQS订阅。在SQS控制台上,我可以看到发布的消息。但是,我不清楚SQS队列的访问策略。 问题 当将消息传递到SQS队列时,作为订阅的结果,哪个用户是有效的?与发表该主题的人相同? 只有在使用时,才能使消息流入队列。那么,我应该如何定义一个限制性策略,使消息只能作为订阅的结果写入队列? 创建具有权限的队列的AWS SQ
在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的
我想实现一个Amazon SNS主题,它首先将消息传递给一个SQS队列,该队列是该主题的订阅者,然后执行一个AWS Lambda函数,该函数也是同一主题的订阅者。然后Lambda函数可以从SQS队列读取消息,并并行处理其中的几条消息(数百条)。 我的问题是,是否有任何方法可以保证发送到SNS主题的消息会首先被传递到SQS队列,然后才被传递到Lambda函数? 这样做的目的是扩展到大量消息,而不必为