当前位置: 首页 > 知识库问答 >
问题:

在AWS Kinesis中,如果我们使用过时/过期的SequenceNumber调用GetShardIterator,会发生什么?

长孙雅志
2023-03-14

通常,我们使用上次读取记录的序列号调用GetShardIterator(如果我们之前的ShardIterator已过期)。

假设序列号属于保留期内(即默认24小时)的有效记录。

但是如果它在Kinesis保留期之外(即25小时前)怎么办?那么该记录/序列号将从流中删除。

GetShardIterator会抛出异常吗?什么样的异常?还是不返回记录?

共有1个答案

商华藏
2023-03-14

这对我来说很有趣,我试过了。

TL;DR:它的工作原理和我预期的一样:从一个经过修剪视界的序列号开始,相当于从修剪视界开始。

为了测试,昨天早上我在一条专用流上发布了一条记录:

aws kinesis put-record --stream-name test-expiration --partition-key irrelevant --data "this is a test"
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49616057638370363251266361760650016619879524195517857794"
}

然后我等了将近24个小时(幸好我今天早上没有决定睡觉),然后运行了一个我编写的实用程序来验证记录是否仍在流中:

> kinesis_reader.py test-expiration TRIM_HORIZON 1
{"SequenceNumber": "49616057638370363251266361760650016619879524195517857794", "ApproximateArrivalTimestamp": "2021-03-04T11:33:13.254000+00:00", "Data": "this is a test", "PartitionKey": "irrelevant"}

最后,我从该实用程序中获取代码,将其放入Jupyter Notebook,并在记录在流中超过24小时后执行它:

>

client = boto3.client('kinesis')

stream_name = "test-expiration"
shard_id = "shardId-000000000000"
sequence_number ="49616057638370363251266361760650016619879524195517857794"

resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', StartingSequenceNumber=sequence_number)
shard_itx = resp['ShardIterator']

这返回了一个迭代器(我将省略它,因为它有很多不透明的文本)。它想知道是否会抛出,但没有与过时迭代器相对应的记录异常。

使用此迭代器检索记录:

client.get_records(ShardIterator=shard_itx)
{'Records': [],
 'NextShardIterator': 'AAAAAAAAAAE8Pi3/Ykdggje538B61BxObso1tCZAK4MJIGMc//IGiqJlNdUz2PgTGXhMAW3GLJIFSsaSmWW72Y2qBuwk8+WvKse0Al8DhjBNUmCdB5T/FbUa/67NeUjgSsktcke3ZiCs+rnHXFkAv08rR8egQsJCDmcHkELeEKTaa5pnlMB9kUDB+NT+yFCO7oFNaDdz4OUSH094IN0+Y/w6n5K+XTLsVvhPmM6pYdTv2xllzJJnTA==',
 'MillisBehindLatest': 44741000,
 'ResponseMetadata': {'RequestId': 'fd58bcf1-6596-0186-a5e4-a7359063274d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'fd58bcf1-6596-0186-a5e4-a7359063274d',
   'x-amz-id-2': 'jK9tGfx5eSyi5ysHhnANVn0IvJrwWwYzbxRGTRyFnk1OgjfQ+D2KtzqfF3FXVg5wwBH0m/QBoXdwJ+cEQSeBCktkKgFWOUx5',
   'date': 'Fri, 05 Mar 2021 11:44:04 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '315'},
  'RetryAttempts': 0}}

如您所见,响应中没有记录。

令人惊讶的是,它只表明我比我今天早上添加的最新记录落后了44741000毫秒。我本以为有一天会接近8640000毫秒。

作为最后一个实验,我写了一个循环,计算我必须阅读流多少次才能找到今天早上放在流上的记录(到现在为止,已经有半个小时了):

count = 0

while True:
    count += 1
    resp = client.get_records(ShardIterator=shard_itx)
    print(f"{count}: {resp['MillisBehindLatest']} millis behind latest")
    if resp['Records']:
        print(resp)
        break
    shard_itx = resp['NextShardIterator']

答案是:99读取,碎片迭代器每次前进约500秒。

我将把这个流保留一段时间:我想看看Kinesis是否会更新其内部指针,以便后续请求返回更接近当前时间的碎片html" target="_blank">迭代器。

更新

我再次运行了这段代码,大约比第一次尝试晚了一个小时。当我使用迭代器检索记录时,它错误地告诉我我比最晚的时间晚了0毫秒。随后的检索(使用第一个迭代器)报告了49915000。

寓意:除非你一直在积极处理记录,否则不要依赖MillisBehindLatest。

 类似资料:
  • 问题内容: 如果我提交的表单的操作字段为空,则它提交到当前页面-ajax请求就是这种情况吗? 问题答案: 没错,它会提交到当前页面。 参考,jQuery文档: url (字符串) 默认值:当前页面 一个字符串,其中包含将请求发送到的URL。 资源

  • 在mclients.get(0).send(msg1)行中,我正在使索引脱离界限异常。mClients是附加到此服务并在绑定过程中存储的客户端数组。 代码与链接远程信使服务示例部分http://developer.android.com/reference/android/app/service.html完全相同,只是我在服务中添加了一个onStartCommand

  • 问题内容: 假设我编写了一些执行AJAX调用的JavaScript,并将其作为一种回调方法在AJAX成功时执行。 假设异步调用时在我的页面上调用了其他一些JavaScript方法。 一个操作是否优先于另一个操作?它们都同时运行吗?怎么了? 问题答案: 假设异步调用时在我的页面上调用了其他一些JavaScript方法。 一个操作是否优先于另一个操作?它们都同时运行吗?怎么了? 浏览器上的JavaSc

  • 在django模型中使用“on_delete=models.CASCADE”时会发生什么

  • 问题内容: 以下代码导致死锁(在我的电脑上): 但是,如果我将reducelambda参数替换为匿名类,则不会导致死锁: 你能解释一下这种情况吗? P.S. 我发现该代码(与之前的代码有些不同): 工作不稳定。在大多数情况下,它挂起了,但是有时它成功完成了: 在此处输入图片说明 我真的不明白为什么这种行为不稳定。实际上,我重新测试了第一个代码段,并且行为相同。因此,最新的代码等于第一个。 为了了解

  • 问题内容: 想象一下,一个需要花费很长时间才能运行的python脚本,如果我在运行时对其进行修改,会发生什么?结果会有所不同吗? 问题答案: 没什么,因为Python将您的脚本预编译为PYC文件并启动它。 但是,如果发生某种异常,您可能会得到有点误导的解释,因为 X 行的代码可能与启动脚本之前的代码不同。