当前位置: 首页 > 知识库问答 >
问题:

如何延迟AWS运动信息的处理?

南宫博简
2023-03-14

我想将来自AWS Kinesis流的消息的处理延迟一个小时。我已将KCL消费者配置为每四分钟读取一批记录,检查每条记录的时间戳,如果任何记录不到一个小时,则停止处理该批次,无需检查点。我希望同一个消费者实例每四分钟重读一次相同的消息,直到整个批次足够旧可以处理,然后检查点消费者。但是,在实践中,消费者只读取一次消息,这意味着它们被忽略,并且在准备好处理时永远不会再次读取。有没有办法将消费者配置为每次都重新阅读来自最后一个检查点的所有消息?

共有1个答案

贝成业
2023-03-14

我很喜欢AWS Kinesis Stream的开箱即用功能(一种可能延迟事件交付的配置)。如果不这样做,有一种方法可以延迟事件的处理,代价是浪费计算。

使用SQS(或FIFO SQS,如果您关心事件排序)而不是Kinesis,或在Kinesis Stream上使用AWS Lambda将事件传输到SQS。SQS支持将消息的传递延迟至15分钟。由于您需要延迟为60分钟,因此您可以运行另一个Lambda(或您自己的SQS消费者)来处理消息。在第一次将消息传递给Lambda(或您的SQS消费者)时,不要处理消息,只需将消息的可见性超时设置为45分钟(总计您所需的60分钟延迟)。仅在第二次收到SQS消息后才处理它。您可以检查消息之前已传递了多少次,以决定是要处理还是跳过处理消息。

 类似资料:
  • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

  • 我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。 下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码: 对于金融主题也是如此:

  • 今天,我使用Spring Cloud Streams和RabbitMQ,根据本文档编写了以下代码: 我的接口: 和我的属性文件:

  • 我们正在使用Rabbitmq-server_3.5.7及其相应的延迟消息交换插件 到目前为止,我们一直在使用直接交换并控制客户端应用程序上的消息生成器的延迟(并跟踪当前队列中的消息数量)。 我们已经开始成功使用延迟交换,但我们想知道如何检查当前延迟的消息数量(这是等待路由到队列)。 兔子是否提供了一种知道这一点的方法?是否有任何其他方法可以访问此信息? 谢啦!

  • 问题内容: 这是我的情况: 我有一个包含用户列表的页面。我通过Web界面创建一个新用户,并将其保存到服务器。服务器在elasticsearch中为文档建立索引并成功返回。然后,我被重定向到不包含新用户的列表页面,因为它可能需要1秒钟的时间才能使文档在Elasticsearch中可供搜索 elasticsearch中的近实时搜索。 elasticsearch指南说您可以手动刷新索引,但说在生产中不要

  • 如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?