当前位置: 首页 > 知识库问答 >
问题:

AWS Kinesis NextShardIterator从不为Null

公良英资
2023-03-14

上下文:我正在尝试使用API引用从Kinesis流中获取记录。我正在使用。网络核心(3.1版本)。

我正在使用API将数据写入Kinesis流。这个问题没有任何问题。但是我在阅读数据方面有一些问题。我将getRecord方法放入do-while循环中。while is nextShardIterator值是否为null?但是这个值永远不会为null。我不能打破这个循环。

一些答案包括这样的短语:“NextShardIterator切分中开始顺序读取数据记录的下一个位置。如果设置为null,则切分已关闭,请求的迭代器不会返回更多数据。”

我只有一个流和一个碎片。我放了两张唱片。然后,我执行读取方法。它正在获取这些记录。但在此之后,即使记录被消耗,nextShardIterator也永远不会得到null。

  • 列出流(我只有一个流)

putRecord代码如下:

public async Task<ResponseModel> PutRecord(string orderFlow, string documentId)
        {
            byte[] bytedata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderFlow));
            using MemoryStream memoryStream = new MemoryStream(bytedata);
            var putRecordRequest = new PutRecordRequest();
            putRecordRequest.StreamName = myStreamName;
            putRecordRequest.PartitionKey = "partition" + documentId;
            putRecordRequest.Data = memoryStream;
            try
            {
                var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
                return new ResponseModel
                {
                    Data = putRecordResponse,
                    Status = ResponseStatus.Success,
                    Message = "Successfully put record!"
                };
            }
            catch (Exception e)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Error,
                    Message = "PutRecord Error: " + e.Message
                };
            }
        }

但是我在阅读数据方面有一些问题。GetRecord代码如下:

public async Task<ResponseModel> GetRecords()
        {
            var listStreams = await ListStreams();
            if (listStreams.StreamNames.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Stream!"
                };
            }
            myStreamName = listStreams.StreamNames[0];

            var describeStreams = await DescribeStream(listStreams.StreamNames[0]);
            if (describeStreams.StreamDescription.StreamStatus != "ACTIVE")
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Stream status: " + describeStreams.StreamDescription.StreamStatus
                };
            }

            var shards = describeStreams.StreamDescription.Shards;
            if (shards.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Shard (or data)!"
                };
            }
            var shardId = shards[0].ShardId;

            var shardIterator = await GetShardIterator(shardId);
            if (string.IsNullOrWhiteSpace(shardIterator.ShardIterator))
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "ShardIterator is null or empty!"
                };
            }

            var getRecords = await GetRecords(shardIterator.ShardIterator);
            Console.WriteLine("First Iterator: " + shardIterator);
            var dataList = new List<Record>();

            do
            {
                if (getRecords.Records.Count == 0)
                {
                    Console.WriteLine("Records are empty!");
                    var nextShardIterator = GetShardLatest(shardId).Result.ShardIterator;
                    getRecords = await GetRecords(nextShardIterator);
                    Console.WriteLine("Latest Iterator: " + nextShardIterator);
                }
                else
                {
                    Console.WriteLine("We have records!");
                    foreach (var record in getRecords.Records)
                    {
                        dataList.Add(record);
                    }
                    var nextShardIterator = GetShardIteratorWithSequence(shardId, getRecords.Records[getRecords.Records.Count-1].SequenceNumber).Result.ShardIterator;
                    Console.WriteLine("AfterSequence Iterator: " + nextShardIterator);
                    getRecords = await GetRecords(nextShardIterator);
                }
            } while (getRecords.NextShardIterator != null);

            return new ResponseModel
            {
                Data = dataList,
                Status = ResponseStatus.Success,
                Message = "Successfull"
            };
        }

共有1个答案

郎嘉树
2023-03-14

Kinesis流是一个潜在的无限记录序列,可以由多个生产者随时添加到其中。因此,开放流的分片迭代器永远不会为空。

如果您想在到达流的“末尾”时中断循环,请查看Get唱片响应中的MillisBehindLatest字段。引用留档:

值为零表示记录处理已完成,此时没有要处理的新记录。

但是,请注意,新记录可能随时添加。如果您确实中断了循环,请务必保存您处理的最后一条记录中返回的Sequence Number,以便您可以从中断的地方继续。

 类似资料:
  • 问题内容: IPython Notebook带有,可以 将 笔记本 导出 为其他格式。但是,如何在相反方向转换文本?我之所以问,是因为我已经有了不同格式的资料和良好的工作流程,但是我想利用Notebook的交互式环境。 可能的解决方案:可以通过导入文件来创建笔记本,并且文档指出,当将笔记本导出为python脚本时,它将指令嵌入到可用于重新创建笔记本的注释中。但是这些信息带有关于此方法局限性的免责声

  • 问题内容: 我有那个代码 有时,强制转换为无效。当我看到。为什么会这样呢?我使用来自GitHub的SwiftSocket库。对不起我的英语不好。 当我的服务器发送大字符串时会发生这种情况。例如- 如果我收到一个消息对象(我的班级)-一切都会正常。但是,如果我收到4,5,6,…消息对象(我的班级),这有时会起作用。MAGIC :( 新版本的代码 问题答案: 注意:我不会讲Swift。以下代码可能无法

  • 问题内容: 我有以下几行代码 我认为第3行和第4行执行相同的任务,然后为什么编译器在第4行显示错误“类型不匹配:无法从long转换为int” 请帮忙。 问题答案: 这是因为复合赋值运算符会进行隐式转换。 从JLS复合分配运算符: 形式的复合赋值表达式等效于,其中是的类型,不同之处在于该表达式仅被评估一次。 对于二进制运算符,则必须显式进行强制转换。进行第四项作业: 它会工作。这就是您的复合赋值表达

  • 问题内容: 我正在使用ZXING库在JAVA中创建QR码生成器程序。该程序是 编译该程序时,出现类型不匹配错误, 在这条线 请帮忙!!! 问题答案: 我自己从未使用过该库,但是阅读错误消息时,我会假设您必须要以字节为单位存储字节的问题。问题将是一个字节由多个位组成,因此您不能仅通过一位表示一个字节。 将编码数据存储到ByteMatrix中,然后阅读以下内容: 使用zxing进行QR码编码和解码 完

  • macOS Sierra附带PHP5.6,我希望通过从源代码构建将其升级到最新的稳定版本PHP(7.1.4)。我所做的实际上是一个新的安装。首先,我从系统中删除了旧的PHP,然后从PHP下载源代码。net并构建它。除了Apache之外,一切都很好。 顺便说一下,在我构建PHP之前,我必须安装和。PHP 7不只是接受操作系统默认自带的下的相同包。 执行configtest时,我收到以下错误: htt

  • 问题内容: 我正在尝试将String转换为Clob以便存储在数据库中。我有以下代码: 当我运行此命令时,没有设置Clob,输出如下: 我到处都说要使用setString方法,但我不知道为什么这对我不起作用。 问题答案: 您不需要中间实例,只需 在上使用: