当前位置: 首页 > 知识库问答 >
问题:

AWS SQS/Lambda->一次处理数据1记录,消息发送或同步之间有延迟

能业
2023-03-14

我正在寻找一个使用SQS(或任何其他AWS服务)实现以下功能的解决方案:

用例二:如果我有X(每秒1-10条消息)从lambda函数发送到SQS队列的消息量。我想从SQS队列同步地将消息1乘1发送到另一个lambda函数。例如,我的lambda函数向SQS队列发送消息;它每0.1秒发送1条消息。所以每秒发送10条消息。我希望我的SQS队列一次发送1条消息到我的目标函数进行处理,并且只有在前一条消息处理完毕时才发送下一条消息。所以在这种情况下,第一条消息被直接处理,第二条消息在队列中,直到第一条消息完成,一旦完成,第二条消息被发送,依此类推。

用例二:如果我有X(每秒1-10条消息),那么从lambda函数发送到SQS队列的消息量。我想从SQS队列向另一个lambda函数发送消息,每个消息发送之间有一个延迟。例如,消息1被发送到我的目的地,消息2被发送3秒后,消息3被发送3秒后,等等。我在这里选择3秒,因为这是我的目的地进程运行所需的最长时间。

TL;DR I需要一种方法,一次只从队列发送一条记录;每条消息之间的延迟时间为3秒,或者直到前一条消息完成处理。

到目前为止我做了什么?

我以为创建一个3秒延迟的先进先出队列会在这里起作用?

  GSSAREventQueue:
    Type: AWS::SQS::Queue
    Properties: 
      ContentBasedDeduplication: false
      DeduplicationScope: messageGroup
      FifoThroughputLimit: perMessageGroupId 
      DelaySeconds: 3
      FifoQueue: true
      ReceiveMessageWaitTimeSeconds: 3
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt MyDeadLetterQueue.Arn
        maxReceiveCount: 10

  MyDeadLetterQueue: 
    Type: AWS::SQS::Queue
    Properties: 
      FifoQueue: true
 

但当我查看数据发送到的lambda的日志时:

2021-09-30T14:49:28.816+01:00   END RequestId: 5f9e784c-bba1-56f5-84e7-2183793ac607

2021-09-30T14:49:28.816+01:00   REPORT RequestId: 5f9e784c-bba1-56f5-84e7-2183793ac607 Duration: 217.39 ms Billed Duration: 218 ms Memory Size: 128 MB Max Memory Used: 101 MB XRAY TraceId: 1-6155c068-6565d6de237fd0170fb290d0 SegmentId: 11f9660438a56763 Sampled: true

2021-09-30T14:49:30.808 01:00启动请求ID:7d98c954-47c6-5b7f-8665-909b855be17b版本:$LATEST

没有字面上的延迟。我确保在我的Lambda上以1为批次发送数据

      Events:
        UserEvent:
          Type: SQS
          Properties:
            BatchSize: 1
            Queue: !GetAtt GSSAREventQueue.Arn

我可能在这里做错了什么;我的用例可能吗?

我认为可能发生的事情:

SQS在每批消息之间延迟3秒发送数据?也许它一次发送10条消息,因为我在lambda上设置了Batchsize: 1,所以它一次通过1发送数据?

--

需要这样做的主要原因是我正在从DynamoDB表中的同一条记录读取/写入数据;如果lambda同时收到10条消息,而10条lambda分别处理,那么这些数字就错了。

共有1个答案

汪晟睿
2023-03-14

您可以使用lambda的保留并发功能。https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html

对于目标lambda,可以将reserved concurrency的值设置为1,将batch size设置为1。这将确保一次只运行一个目标lambda实例,并且一次从SQS接收一条消息。

 类似资料:
  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。

  • 在阅读artemis时,docs理解-artemis在内存中存储整个当前活动消息,并可以根据设置将消息卸载到给定队列/主题的分页区域&artemis日志只追加。 关于这一点 代理如何以及何时从日记同步消息(仅在重新启动期间?) 它如何标识要从日记中删除的消息(例如:如果日记是仅追加模式,如果持久消息的使用者访问消息,那么代理如何从日记中删除一条消息而不保留索引)。 将每个活动消息保存在内存中,甚至

  • 主要内容:1 invokeOneway单向发送,1.1 invokeOnewayImpl单向调用,2 sendMessageSync同步发送,2.1 invokeSync同步调用,3 sendMessageAsync异步发送消息,3.1 invokeAsync异步调用,3.2 onExceptionImpl异常处理,4 NettyClientHandler处理服务端消息,4.1 processResponseCommand处理响应,基于RocketMQ release-4.9.3,深入的介绍了P

  • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

  • 我有一个AWS Lambda函数,它读取CSV文件并将一组记录保存到SQS队列。将有数千条记录,因此它们不能放在一条消息中。 然后,该队列会触发另一个Lambda函数来处理每条记录。处理每条记录大约需要一秒钟的时间。 处理完所有记录后,我需要发送一封电子邮件。 最好的方法是什么? 我有两个想法: > 将其设为FIFO队列,并向最后一条记录添加一个属性。当我读到那张记录时发送电子邮件。 使用记录总数

  • 如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?