当前位置: 首页 > 知识库问答 >
问题:

使用Javascript SDK时,如何在Amazon Kinesis consumer中获得最新记录?

蒋泰
2023-03-14

我正在使用AWS Javascript SDK从Kinesis数据流中消费。我想获取碎片中的最新记录。

当我将ShardIterator Type设置为“最新”时,我不会取回任何记录。但是,当我使用“TRIM_HORIZON”时,我会取回所有记录。

kinesis.describeStream(describeParams, function(err, data) {
            if (err) {
                console.log(err, err.stack);    // an error occurred
            }
            else {
                var getParams = {
                    ShardId: data.StreamDescription.Shards[0].ShardId,
                    ShardIteratorType: "TRIM_HORIZON",     //get oldest package
                    StreamName: streamName,
                };

                if(shardIteratorType){
                    console.log("you have passed some shardIteratorType:" + shardIteratorType);
                    getParams.ShardIteratorType = shardIteratorType;
                }

                kinesis.getShardIterator(getParams, function(err, result) {
                    if (err) {
                        console.log("Error in getShardIterator()");
                        console.log(err);
                    } else {
                        console.log("calling getRecord with shard iterator");
                        // Get records from the Kinesis stream
                        getRecord(result.ShardIterator);
                    }
                });
            }
        });

function getRecord(shard_iterator) {
    console.log("getRecord was called.");
    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {

            if(result.Records.length > 0) {
                    // Loop through all the packages
               for(var i = 0; i < result.Records.length; i++) {
                   if(result.Records[i] != undefined) {
                        var getData = JSON.parse( decodeURIComponent
(escape(result.Records[i].Data)));
                        console.log(getData);
                        var table = document.getElementById("myTable");
                        var row = table.insertRow(0);
                        var j = i + 1 ;
                        var cell1 = row.insertCell(0);


                        cell1.innerHTML = getData ;

                    }
                }
            }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }
        }
    });
}

对于shard迭代器类型,我希望使用“latest”而不是“TRIM_HORIZON”来获得最新的记录,而不是所有的历史记录。我做错了什么?

共有1个答案

东郭翰音
2023-03-14

来自文件https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

请求中,您可以指定分片迭代器类型AT_TIMESTAMP从任意时间点读取记录,TRIM_HORIZON使ShardIterator指向系统分片中最后一条未修剪的记录(分片中最旧的数据记录),或LATEST,以便您始终读取分片中的最新数据。

这里的最新意味着“现在”跳过最近检查点和现在之间的所有记录。

使用LATEST作为shard迭代器类型,可以认为它读取了在使用getShardIterator函数返回的shard迭代器之后立即出现的记录。

您可以在下面参考它如何与“最新”一起工作的示例:

kinesis.describeStream(describeParams, (err, streamData) => {
    // Skipping error handling

    kinesis.getShardIterator(getShardIteratorParams, (err, shardIterData) => {
        let shardIterator = shardIterData.ShardIterator;

        // Keep reading records from the stream
        while (true) {
            let getRecParams = {
                ShardIterator: shardIterator
            };
            kinesis.getRecords(getRecParams, (err, recData) => {
                // Skipping error handling

                if (recData.Records.length > 0) {
                   // Do something

                   shardIterator = recData.NextShardIterator;
                }
            });
            // Break if you need
        }
    });
});
 类似资料:
  • 问题内容: 我需要从数据库获取最新记录。我正在使用sqlalchemy。此刻,我正在那样做: 但是查询太繁琐了。我怎样才能更好地获得最近的记录? 问题答案: 看一看。如果你在右列上指定排序,则第一个将是你的最后一个。一个示例可能如下所示:

  • 问题内容: 在日志中,我得到: “来自LAHETYS”发生了什么事?使用HQL或/和SQL处理此问题的最佳实践是什么? 另一个问题: 我得到一个例外: 所以我不能将对象投射到我的Lahetys对象上,很奇怪吗? 谢谢!佐美 问题答案: 您的HQL查询无效。LIMIT不是有效的HQL子句。要在Hibernate中做到这一点,只需

  • 您可能知道,在正常模式下,当状态更新时,我们使用更新依赖项来获得通知,如下所示: 但在我的例子中,我的状态中有一个数组,我正在尝试在useEffect的循环中更新它,如下所示: 在本例中,每次forEach循环运行时,我都会得到初始val(我知道,因为val不是useffect的依赖项),但如果我将其作为依赖项,它将更新两次。解决这个问题的办法是什么? 编辑:基本上,我的意思是,当我在useffe

  • 问题内容: 我有一个类似于 我想获取每个“ deviceId”的最新“数据”(使用message.ts作为时间戳)。 到目前为止,我已经设法使用查询按时间戳记的顺序取回了数据, 但是我不知道如何删除重复的设备记录。 这可以在CosmosDB SQL引擎内完成吗? 问题答案: 感谢Mark Brown的评论,我发现以下内容似乎是此问题的正确解决方案。不仅仅只是一次性使用一些SQL那样优雅,但这确实是

  • 我想从Amazon Kinesis流中获取最新记录。我打算从中提取时间戳,并将其与消费者应用程序检查指向的最后一条记录的时间戳进行比较,以检查消费者是否落后。 我不能使用最新的shard迭代器类型。这是因为LATEST指向最近的记录之后,因此它不能用于访问最近的记录。 有没有简单的方法可以获得最新记录? 我正在考虑的一种方法是获取消费者最近处理的记录序列号的碎片迭代器,使用该碎片迭代器发出GetR

  • 我们都知道Nexus3还没有REST API,这对我来说很奇怪。我只能使用wget或curl手动下载工件。但是当我使用Maven3时,所有的快照工件都使用如下所示的时间戳命名: 谢了。