Spring Cloud AWS(1.0.0.rc2)中SimpleMessageListenerContainer类的当前实现似乎会在消息处理程序完成对消息的处理并且方法调用返回之后自动删除消息。
在我们的应用程序中,在从SQS上游队列删除消息之前,我们需要能够处理消息并等待来自下游队列的异步确认。类似于
接收SQS消息->处理消息->将消息发布到RabbitMQ(线程在此完成)
删除SQS消息<-our app<-RabbitMQ消息成功确认(异步)
由于msg ack通过不同的线程异步返回,我们需要在检查成功ack后手动从SQS中删除msg的选项。
理想情况下,SimpleMessageListener应该可以配置它运行的模式(auto delete或manual delete)。
我们非常希望使用spring aws cloud lib(vs推出我们自己的cloud lib)与SQS集成,因为它已经负责监听器容器bean的生命周期管理。
请让我知道上述建议的功能是否被认为是可行的,如果是可行的,何时可以实现和发布。
多谢了。
我们可以再添加一个标志(除了已经存在的deleteMessageOnException标志之外)来完全禁用自动删除消息,即使在成功处理消息时也是如此。我看到的问题是,毒药消息不再被处理,可能会炸毁队列。我在这里为此制造了一个问题。
你的方法会有另一个问题。如果消息删除得不够快(基于可见性超时),它将再次出现在处理程序方法中。
接收SQS MSG1->处理MSG1->将msg1发布到RabbitMQ(线程在此完成)
接收SQS MSG2->处理MSG2->将msg2发布到RabbitMQ(线程在此完成)
接收SQS MSG1->处理MSG1->将msg1发布到RabbitMQ msg1再次出现,因为它未被删除
删除SQS msg1<-our app<-RabbitMQ msg1成功Ack(异步)
编辑
这个问题现在已经解决,可以直接使用@sqslistener
注释定义删除策略,并使用注入的确认对象。见关于这一问题的最后评论
我正在使用Spring Cloud Stream(Edgware.SR5)和Spring Boot(1.5.10.RELEASE)。我的@StreamListener正在处理收到的每条消息两次。 该示例的思想是在队列中发布消息并对其进行处理。 服务: 绑定: application.properties: 配置(用于在测试中注入代理服务): 测试: 我得到了以下输出: 我不知道我的配置有什么问题,
目前我正在使用SQS-Lambda集成 Lambda的并发可用。SQS批次设置为1记录,0延迟。 SQS的可见性超时为15分钟,Lambda最大执行时间为15分钟 我会注意到,有时SQS消息在飞行中被卡住,根本没有被任何Lambda处理(它们在15分钟后落入死信队列,CloudWatch显示没有Lambda被消息调用) 有人面临过同样的问题吗? 我在VPC内部运行Lambda,如果这很重要的话
我正在探索AWS SQS服务。当尝试使用java sdk从队列中删除消息时,我遇到了一些问题。 队列是在SQS中创建的,它有三条消息。该队列由AWS3存储支持,用于处理大型消息。 下面是通过多次轮询接收消息的方法。 日志消息: 我搞不清例外的原因。在上面的使用java SDK的代码片段中,我是否遗漏了什么? 提前感谢任何建议。
当消息到达SQS时,如何触发AWS Lambda函数(Python),并将消息传递给HTTPendpoint,在处理API中的数据后,从SQS队列中删除消息?如何在Python lambda中实现这一点?
我目前正在使用亚马逊的SQS,在尝试删除当前“正在运行”的队列消息时遇到问题。 下面是一些示例代码: 现在,在接收到句柄和消息体之后,我将接收句柄字符串存储到云存储中(例如DynamoDB)。随后,我从存储服务中加载该句柄,并使用类似于以下内容的方式调用delete: 但是,当运行该行时,我收到一条“输入收据句柄无效”的错误消息。 注意,我知道这条消息没有被重新接收,所以记录的接收句柄应该是最新的
我已经部署了一个AWS Lambda函数,它在SQS队列接收到消息时触发。该函数向Rest API发出请求,如果响应不是Ok,则需要再次处理SQS消息。 这就是为什么我需要将消息重新发送到队列,但我更愿意以编程方式删除SQS消息,尽管我找不到如何配置SQS。我尝试过消息保留,但似乎触发器事件会导致消息被删除。