通常,我们使用上次读取记录的序列号调用GetShardIterator(如果我们之前的ShardIterator已过期)。
假设序列号属于保留期内(即默认24小时)的有效记录。
但是如果它在Kinesis保留期之外(即25小时前)怎么办?那么该记录/序列号将从流中删除。
GetShardIterator会抛出异常吗?什么样的异常?还是不返回记录?
这对我来说很有趣,我试过了。
TL;DR:它的工作原理和我预期的一样:从一个经过修剪视界的序列号开始,相当于从修剪视界开始。
为了测试,昨天早上我在一条专用流上发布了一条记录:
aws kinesis put-record --stream-name test-expiration --partition-key irrelevant --data "this is a test"
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49616057638370363251266361760650016619879524195517857794"
}
然后我等了将近24个小时(幸好我今天早上没有决定睡觉),然后运行了一个我编写的实用程序来验证记录是否仍在流中:
> kinesis_reader.py test-expiration TRIM_HORIZON 1
{"SequenceNumber": "49616057638370363251266361760650016619879524195517857794", "ApproximateArrivalTimestamp": "2021-03-04T11:33:13.254000+00:00", "Data": "this is a test", "PartitionKey": "irrelevant"}
最后,我从该实用程序中获取代码,将其放入Jupyter Notebook,并在记录在流中超过24小时后执行它:
>
client = boto3.client('kinesis')
stream_name = "test-expiration"
shard_id = "shardId-000000000000"
sequence_number ="49616057638370363251266361760650016619879524195517857794"
resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', StartingSequenceNumber=sequence_number)
shard_itx = resp['ShardIterator']
这返回了一个迭代器(我将省略它,因为它有很多不透明的文本)。它想知道是否会抛出,但没有与过时迭代器相对应的记录异常。
使用此迭代器检索记录:
client.get_records(ShardIterator=shard_itx)
{'Records': [],
'NextShardIterator': 'AAAAAAAAAAE8Pi3/Ykdggje538B61BxObso1tCZAK4MJIGMc//IGiqJlNdUz2PgTGXhMAW3GLJIFSsaSmWW72Y2qBuwk8+WvKse0Al8DhjBNUmCdB5T/FbUa/67NeUjgSsktcke3ZiCs+rnHXFkAv08rR8egQsJCDmcHkELeEKTaa5pnlMB9kUDB+NT+yFCO7oFNaDdz4OUSH094IN0+Y/w6n5K+XTLsVvhPmM6pYdTv2xllzJJnTA==',
'MillisBehindLatest': 44741000,
'ResponseMetadata': {'RequestId': 'fd58bcf1-6596-0186-a5e4-a7359063274d',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': 'fd58bcf1-6596-0186-a5e4-a7359063274d',
'x-amz-id-2': 'jK9tGfx5eSyi5ysHhnANVn0IvJrwWwYzbxRGTRyFnk1OgjfQ+D2KtzqfF3FXVg5wwBH0m/QBoXdwJ+cEQSeBCktkKgFWOUx5',
'date': 'Fri, 05 Mar 2021 11:44:04 GMT',
'content-type': 'application/x-amz-json-1.1',
'content-length': '315'},
'RetryAttempts': 0}}
如您所见,响应中没有记录。
令人惊讶的是,它只表明我比我今天早上添加的最新记录落后了44741000毫秒。我本以为有一天会接近8640000毫秒。
作为最后一个实验,我写了一个循环,计算我必须阅读流多少次才能找到今天早上放在流上的记录(到现在为止,已经有半个小时了):
count = 0
while True:
count += 1
resp = client.get_records(ShardIterator=shard_itx)
print(f"{count}: {resp['MillisBehindLatest']} millis behind latest")
if resp['Records']:
print(resp)
break
shard_itx = resp['NextShardIterator']
答案是:99读取,碎片迭代器每次前进约500秒。
我将把这个流保留一段时间:我想看看Kinesis是否会更新其内部指针,以便后续请求返回更接近当前时间的碎片html" target="_blank">迭代器。
更新
我再次运行了这段代码,大约比第一次尝试晚了一个小时。当我使用迭代器检索记录时,它错误地告诉我我比最晚的时间晚了0毫秒。随后的检索(使用第一个迭代器)报告了49915000。
寓意:除非你一直在积极处理记录,否则不要依赖MillisBehindLatest。
问题内容: 如果我提交的表单的操作字段为空,则它提交到当前页面-ajax请求就是这种情况吗? 问题答案: 没错,它会提交到当前页面。 参考,jQuery文档: url (字符串) 默认值:当前页面 一个字符串,其中包含将请求发送到的URL。 资源
在mclients.get(0).send(msg1)行中,我正在使索引脱离界限异常。mClients是附加到此服务并在绑定过程中存储的客户端数组。 代码与链接远程信使服务示例部分http://developer.android.com/reference/android/app/service.html完全相同,只是我在服务中添加了一个onStartCommand
问题内容: 假设我编写了一些执行AJAX调用的JavaScript,并将其作为一种回调方法在AJAX成功时执行。 假设异步调用时在我的页面上调用了其他一些JavaScript方法。 一个操作是否优先于另一个操作?它们都同时运行吗?怎么了? 问题答案: 假设异步调用时在我的页面上调用了其他一些JavaScript方法。 一个操作是否优先于另一个操作?它们都同时运行吗?怎么了? 浏览器上的JavaSc
在django模型中使用“on_delete=models.CASCADE”时会发生什么
问题内容: 以下代码导致死锁(在我的电脑上): 但是,如果我将reducelambda参数替换为匿名类,则不会导致死锁: 你能解释一下这种情况吗? P.S. 我发现该代码(与之前的代码有些不同): 工作不稳定。在大多数情况下,它挂起了,但是有时它成功完成了: 在此处输入图片说明 我真的不明白为什么这种行为不稳定。实际上,我重新测试了第一个代码段,并且行为相同。因此,最新的代码等于第一个。 为了了解
问题内容: 想象一下,一个需要花费很长时间才能运行的python脚本,如果我在运行时对其进行修改,会发生什么?结果会有所不同吗? 问题答案: 没什么,因为Python将您的脚本预编译为PYC文件并启动它。 但是,如果发生某种异常,您可能会得到有点误导的解释,因为 X 行的代码可能与启动脚本之前的代码不同。