当前位置: 首页 > 知识库问答 >
问题:

如何读取Kinesis数据流中最旧的未处理记录

缑智敏
2023-03-14

我是AWS的新手,希望得到一些指导。

我想处理最古老的未处理记录,但似乎无法正确获取参数。

当前架构

对于碎片迭代器:

  • 我试过TRIM_HORIZON从一开始就给了我所有的记录。
  • 我也试过LATEST,它只给了我一张最新的唱片。

不确定这些额外的细节是否有帮助,但。。。

  • 我通过Lambda将自己的记录放在AWS控制台上

提前感谢!

共有1个答案

訾凯歌
2023-03-14

没有“最旧的未处理记录”,因为Kinesis不知道你处理了什么(例如,你可能已经获取了记录,但没有对它们做任何事情)。

如果您使用的是Kinesis,我强烈建议您使用Kinesis客户端库,它具有检查点的概念-这些基本上是一个很好的包装器,位于ShardIterator AFTER\u SEQUENCE\u NUMBER之上,它可以翻译为“最旧的未选中记录”-或者尽可能接近“最旧的未处理记录”。

(您可以自己实现这个逻辑,但为什么不重用Amazon已经为您完成的工作呢?)

 类似资料:
  • 我想从Amazon Kinesis流中获取最新记录。我打算从中提取时间戳,并将其与消费者应用程序检查指向的最后一条记录的时间戳进行比较,以检查消费者是否落后。 我不能使用最新的shard迭代器类型。这是因为LATEST指向最近的记录之后,因此它不能用于访问最近的记录。 有没有简单的方法可以获得最新记录? 我正在考虑的一种方法是获取消费者最近处理的记录序列号的碎片迭代器,使用该碎片迭代器发出GetR

  • 我试图使用类似于https://github.com/aws-sample/amazon-kinesis-learning的Kinesis客户端库来使用Kinesis数据流。但在这个例子中,他们计划了这个过程。我想消费没有调度器传入的记录。 我不想使用DynamoDB,CloudWatch。期望一个简单的使用者使用流中的记录 有没有什么方法可以在没有调度程序的情况下使用java处理记录

  • 我有一个kinesis流,有一个碎片和一个用Python编写的lambda函数。我添加了kinesis流作为批量大小为5的事件源。我在kinesis中添加了数百条记录,lambda函数得到了正确的调用和执行。但是对于最后3条记录,lambda函数被无限地调用,即使函数返回是成功的。

  • 我们有一个带有三个分片的运动流,我们的运动应用程序有三个实例。我们可以看到记录被发布到我们的所有三个分片,但我们的运动应用程序只能处理来自一个分片的记录。监听其他两个分片的工人经常会睡着。 知道是什么原因吗?

  • 我正在探索AWS Kinesis的数据处理要求,该要求用基于流的方法取代旧的批处理ETL处理。 该项目的关键要求之一是在以下情况下重新处理数据的能力: 发现并修复错误,然后重新部署应用程序。数据需要从一开始就重新处理 这里很好地记录了Kafka-https://cwiki.apache.org/confluence/display/KAFKA/Kafka流数据(重新)处理场景的场景。 我在Kine

  • 用例: 每个用户都有一个项目列表。当在UI中创建一个新项目时,它会被添加到DynamoDB,然后再次获取整个列表,以显示新的结果。 问题是,在某些情况下,新添加的项不会出现在该列表中,这是基于索引(基于该用户ID的索引)的最终一致读取。 处理这种情况的正确方法是什么? “item”表将“itemId”作为索引的主分区键,“userId”作为索引的主分区键(在读取列表时查询)。