当前位置: 首页 > 知识库问答 >
问题:

订阅SNS主题的AWS Lambda不会处理该主题的所有消息

步胜
2023-03-14

我正在构建一个由亚马逊服务提供支持的警报系统。

我每天将一个文件放到S3上,它生成一个lambda函数(我们称之为生成器函数)来处理该文件。

Generator基于此文件构建警报并将多条消息发布到SNS主题(让我们称之为发件箱)-由Generator计算的每个收件人一条消息。

我在发件箱中订阅了第二个lambda函数(我们称之为Courier),它应该接收每条消息并对其进行处理。

发电机代码:

// 'Generator' function

exports.handler = function (event, context) {
  console.log('Reading options from event:\n', util.inspect(event, {depth: 5}));

  var users = {};
  var userSubscriptions = {};
  var alerts = {};
  var artists = {};
  var tracks = {};

  async.waterfall([
    function downloadSubscribersFile (next) {
      // Do stuff
    },
    function downloadAndFormatActionsFile (next) {
      // Download data file and analyse
    },
    function publishAlerts (next) {
      // Now we have alerts built, we need to mail them out
      var recipients = Object.keys(alerts);

      async.each(recipients, function (recipient, callback) {
        var recipientAlert = alerts[recipient];
        console.log(util.inspect(recipientAlert, { depth: 10 }));

        if (alerts[recipient].actions.artists.length < 1) {
          return callback();
        }

        var params = {
          TopicArn: SNS_TOPIC_ARN,
          Subject: recipient,
          Message: JSON.stringify(recipientAlert)
        };

        sns.publish(params, function (err, data) {
          if (err) {
            console.log(err);
            return callback(err);
          } else {
            console.log('PUBLISHED MESSAGE: \n', util.inspect(data, { depth: 10 }));
            console.log('MESSAGE WAS: \n', util.inspect(params, { depth: 10 }));
          }

          return callback();
        })
      }, function (err) {
        if (err) return next(err);
        next(); 
      })
    }
  ], function (err) {
    if (err) {
      console.log('Error: ', err);
    } else {
      console.log('Process successful');
    }
    context.done();
  })

}

以及其他功能:

// 'Courier' function

console.log('Loading function');

exports.handler = function(event, context) {
    console.log(JSON.stringify(event, null, 2));
    console.log('From SNS:', event.Records[0].Sns.Message);
    context.succeed();
};

当我的Generator函数被调用时,我可以看到12条消息应该发布到SNS主题。发布这些消息时没有记录错误,但Courier函数只触发一次。

我想知道是否有人对此有任何类似的问题,以及我在这里是否遗漏了什么。可能是我在AWS中没有正确配置某些内容,但我非常有信心一切都按应有的方式设置。

更新:

在查看了我试图发送的消息后,SNS接收到的消息似乎是负载最小的消息。我想知道SNS是否能够处理大量针对某个主题的小而频繁的消息。。。?

共有1个答案

空英逸
2023-03-14

是的,SNS可以处理单个主题的高吞吐量。但是,邮件的最大大小为256KB,因此,如果您的邮件大于256KB,则可能是原因。

我看到您的Generator函数正在记录消息,您是否看到12条带有消息ID的记录消息?我看到您有一个变量警报,您希望从中获取多个收件人,但我不知道您在哪里设置它。

我的建议是:在发送消息之前添加更多日志记录,以验证您认为应该发生的事情是否确实发生了。

 类似资料:
  • 是否有一种方法可以使用AWS CLI列出特定SQS队列订阅的所有AWS SNS主题? 我已经知道如何看到一个SNS主题上的订阅者列表,但这只有在我知道SNS主题的名称时才起作用。在这种情况下,我只知道SQS队列的名称。 我在网上搜索什么也找不到。

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有几个。Net 5.0微服务,RabbitMQ作为消息代理。现在我正在切换到AWS SQS。很少有服务在侦听相同的消息(这是通过RabbitMQ中的Exchange完成的)。在AWS中,这可以通过将SQS队列订阅到SNS主题来实现。我创建了SNS fifo topic和SQS fifo队列,将这些队列订阅到topic。当我将消息直接发布到队列时,一切都会立即工作,但当我将消息发布到SNS主题时,

  • 我有一个将消息发布到AWS SNS主题字符串的NodeJS应用程序和一个AWS SQS订阅。在SQS控制台上,我可以看到发布的消息。但是,我不清楚SQS队列的访问策略。 问题 当将消息传递到SQS队列时,作为订阅的结果,哪个用户是有效的?与发表该主题的人相同? 只有在使用时,才能使消息流入队列。那么,我应该如何定义一个限制性策略,使消息只能作为订阅的结果写入队列? 创建具有权限的队列的AWS SQ

  • 在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的

  • 我想实现一个Amazon SNS主题,它首先将消息传递给一个SQS队列,该队列是该主题的订阅者,然后执行一个AWS Lambda函数,该函数也是同一主题的订阅者。然后Lambda函数可以从SQS队列读取消息,并并行处理其中的几条消息(数百条)。 我的问题是,是否有任何方法可以保证发送到SNS主题的消息会首先被传递到SQS队列,然后才被传递到Lambda函数? 这样做的目的是扩展到大量消息,而不必为