当前位置: 首页 > 知识库问答 >
问题:

如何限制AWS运动未处理记录的数量?

叶鸿
2023-03-14

例如,RabbitMQ有一种设置队列限制的方法。如果达到这一限制,来自出版商的新消息将被拒绝,从而从消费者向生产者施加某种背压。(因为队列中的消息意味着不由使用者处理)。

有没有一种方法可以确保像Kinesis这样的代理的这种行为,在这种行为中,允许消费者拉取消息,而不是像RabbitMQ那样的代理推给他们。

在动觉方面,与Kafka类似,消费者的状态、消费抵消等都保存在另一个实体中,动觉的DynamoDB,我知道这可能会更加棘手,因为有一些未处理的记录限制。

是否有人知道您是否可以使用某些设置,可能是通过使用KCL/KPL客户端库或其他方式?

共有1个答案

邢良才
2023-03-14

没有。遗憾的是,AWS动觉系统没有提供您想要的功能。如果消费者在处理过程中赶不上进度,就无法阻止制作人向动觉流中写入内容。

事实上,这是使用Kinesis的优点之一,它允许在配置的保留时间内免费对数据进行无限制的缓冲。它提供背压的唯一时间是,由于Amazon Kinesis API的限制,生产者写入太多数据的速度太快:https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html

如果你想要一个有限大小的“队列”,也许你想看看AWS SQS,它的下限是12000条机上消息?

如果您确实想使用Kinesis,您可能希望构建一个自定义解决方案将消费者延迟反馈给生产者。例如,在生产者中实现自定义逻辑以使用AWS Cloudwatch(请参阅https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)监控消费者延迟(“MillisBehindLatest”)并在消费者落后时停止。

 类似资料:
  • 我想将来自AWS Kinesis流的消息的处理延迟一个小时。我已将KCL消费者配置为每四分钟读取一批记录,检查每条记录的时间戳,如果任何记录不到一个小时,则停止处理该批次,无需检查点。我希望同一个消费者实例每四分钟重读一次相同的消息,直到整个批次足够旧可以处理,然后检查点消费者。但是,在实践中,消费者只读取一次消息,这意味着它们被忽略,并且在准备好处理时永远不会再次读取。有没有办法将消费者配置为每

  • 问题内容: 有没有一种方法可以限制使用spark sql 2.2.0从jdbc源获取的记录数? 我正在处理将一个大于200M的记录从一个MS Sql Server表移动(和转换)到另一个MS Sql表的任务: 在工作的同时,很明显,它首先要从数据库中加载所有200M条记录,首先要花18分钟的时间,然后将我希望用于测试和开发目的的有限数量的记录返回给我。 在take(…)和load()之间切换会产生

  • 我是AWS的新手,希望得到一些指导。 我想处理最古老的未处理记录,但似乎无法正确获取参数。 当前架构 对于碎片迭代器: 我试过TRIM_HORIZON从一开始就给了我所有的记录。 我也试过LATEST,它只给了我一张最新的唱片。 不确定这些额外的细节是否有帮助,但。。。 我通过Lambda将自己的记录放在AWS控制台上 提前感谢!

  • 问题内容: 我有一个数据库,有两个表和。表中有一个主键,表中有一个外键。 我想在表上创建一个约束,表中最多可以存储5个约束。 我需要知道这种约束的类型,以及如何通过SQL Server中的查询来完成此约束。 问题答案: 没有约束可以强制执行该规则,但是可以使用如下所示的触发器来做到这一点:

  • 我们有一个带有三个分片的运动流,我们的运动应用程序有三个实例。我们可以看到记录被发布到我们的所有三个分片,但我们的运动应用程序只能处理来自一个分片的记录。监听其他两个分片的工人经常会睡着。 知道是什么原因吗?

  • 问题内容: 我要在此处实现的条件是,sqlite数据库仅保存最近的1000条记录。每个记录都有时间戳记。立即生效的低效逻辑之一就是检查记录总数。如果它们超过1000,则只需删除掉到外围的那些。 但是,我将必须对每个INSERT进行此检查,这会使事情效率很低。 问题答案: 您可以为此使用一个隐式的“ rowid”列。 假设您没有以其他方式手动删除行: 您可以使用API函数或 如果您不需要 正好有 1