当前位置: 首页 > 知识库问答 >
问题:

带有MultiLangDaemon的AWS KCL:检查指向“最新”?

西门飞翮
2023-03-14

我有一个Kinesis消费者,他的工作是跟踪系统中的“当前活跃用户”。用户每分钟都会向一个Kinesis流发送一个心跳信号,这个系统只会保存一个它所看到的所有唯一用户GUID的列表,以及他们上次从该GUID接收心跳信号的时间。如果2分钟内没有检测到心跳信号,我们假设该用户不再处于活动状态,并将其从“当前活动用户”列表中逐出。非常直截了当。

因为这个系统只关注当前活跃的用户,所以我们不需要回处理旧消息。如果我们将此消费者关闭2小时,然后再将其打开,我们希望从“最新”消息开始处理,而不是从我们停止的地方开始。

最后,根据Amazon Kinesis Client NodeJS示例,这已实现为NodeJS应用程序,使用MultiLangDaemon与Kinesis客户端库进行html" target="_blank">通信。

在正常使用下,我发现始终从“LATEST”恢复的最佳方法是永远不要使用KCL的检查点功能。例如,在我的进程记录方法的底部,我有以下内容:

    // We don't checkpoint with kinrta, because if we crash for some reason we
    // want to immediately catch back up to live instead of wasting time
    // processing expired heartbeats
    // processRecordsInput.checkpointer.checkpoint(sequenceNumber,
      // function(err, checkpointedSequenceNumber) {

        completeCallback();

      // }
    // );

这样,每当我杀死消费者并重新启动它时,它都会查看<代码>*。属性文件,并看到“initialPositionInStream”是“最新的”,然后从那里开始处理。

然而

当我重新分片流(分割分片或合并分片)时,我遇到了一个问题。当我重新分片时,新分片上的检查点没有设置为“最新”,而是设置为“TRIM_HORIZON”。因为我从来没有重新检查点,这意味着如果我的消费者关闭并重新启动,我最终不得不处理24小时的数据。

我可以通过编辑KCL用于管理检查点的Dynamo表来手动修复此问题,但这显然不是一个可扩展的解决方案。我尝试使用检查指针并传递字符串“LATEST”而不是序列号,但这会引发序列号无效的错误。

我如何告诉KCL,当我重新切分时,我想在新切分上将检查点设置为“最新”?

作为一个简单的解决方案,我考虑过只使用DynamoDBSDK并修复初始化方法中的检查点。它很难看,但我认为它会起作用(假设Amazon不改变他们管理KCL表的方式)

根据所描述的“hack-y解决方案”,我编写了以下小助手方法:

/**
 * Assumes the current shardId (available in the initialize method's
 * `initializeInput.shardId`) is stored in the global "state" object,
 * accessible via the "state" import
 */

import { Kinesis, DynamoDB } from "aws-sdk";
import state from "../state";
import logger from "./logger";
 
const kinesis = new Kinesis();
const ddb = new DynamoDB.DocumentClient();

const log = logger().getLogger("recordProcessor");
const appName = process.env.APP_NAME;

export default async function (startingCheckpoint: string) { 
    // We can't update any Dynamo tables if we don't know which table to update
    if (!appName) return;

    // Compute the name of the shard JUST BEFORE ours
    // Because Kinesis uses an "exclusive" start ID...
    const shardIdNum = parseInt(state.shardId.split("-")[1]) - 1;
    const startShardId = "shardId-" + ("000000000000" + shardIdNum).substr(-12);

    // Pull data about our current shard
    const kinesisResp = await kinesis.listShards({
        StreamName: process.env.KINESIS_STREAM_NAME,
        MaxResults: 1,
        ExclusiveStartShardId: startShardId
    }).promise();
    const oldestSeqNumber = kinesisResp.Shards[0].SequenceNumberRange.StartingSequenceNumber;

    // Pull data about our current checkpoint
    const dynamoResp = await ddb.get({
        TableName: appName,
        Key: {
            leaseKey: state.shardId
        }
    }).promise();
    const prevCheckpoint = dynamoResp.Item.checkpoint;

    log.debug(`Oldest sequence number in Kinesis shard: ${oldestSeqNumber} vs checkpoint: ${prevCheckpoint}`);

    // Determine if we need to "fix" anything
    if (startingCheckpoint === "TRIM_HORIZON") {

        // If our checkpoint is before the oldest sequence number, reset it to
        // "TRIM_HORIZON" so we pull the oldest sequence number
        if (prevCheckpoint < oldestSeqNumber) {
            log.info("Updating checkpoint to TRIM_HORIZON");

            await ddb.update({
                TableName: appName,
                Key: {
                    leaseKey: state.shardId
                },
                UpdateExpression: "SET #checkpoint = :value",
                ExpressionAttributeNames: {
                    "#checkpoint": "checkpoint"
                },
                ExpressionAttributeValues: {
                    ":value": "TRIM_HORIZON"
                }
            }).promise();
        }

    } else if (startingCheckpoint === "LATEST") {

        if (prevCheckpoint !== "LATEST") {
            log.info("Updating checkpoint to LATEST");

            await ddb.update({
                TableName: appName,
                Key: {
                    leaseKey: state.shardId
                },
                UpdateExpression: "SET #checkpoint = :value",
                ExpressionAttributeNames: {
                    "#checkpoint": "checkpoint"
                },
                ExpressionAttributeValues: {
                    ":value": "LATEST"
                }
            }).promise();
        }

    } else {
        log.warn("We can't 'fix' checkpoints that aren't TRIM_HORIZON or LATEST");
    }
}

我进行了测试,这正确有效地更新了DynamoDB表,但它不会立即开始从新位置提取记录。看起来KCL在调用initialize方法之前读取了一次检查点,并且从不重新读取。

此时,我正在寻找一种方法来告诉KCL“开始使用新的检查点”,或者一种方法来优雅地重新启动消费者,以便它重新初始化所有内容。这两样我都没有,但我会继续研究的。也许我可以在MultiLangDaemon文档中找到一些我可以写入STDOUT的东西。。。

共有1个答案

暨高洁
2023-03-14

经过大量研究,我得出结论,亚马逊没有提供任何方法来要求优雅的关闭。您只需使消费者崩溃(进程.退出()),然后等待Docker重新启动它。

然而,在我的hack-y“checkpoint fixer”脚本(我在initialize()回调中运行)和这个hack-y“crash to restart”方法之间,我现在有了一个适当更新我的检查点的解决方案,所以现在动觉对我来说运行更加平稳。

 类似资料:
  • 我确信我在这里遗漏了一些显而易见的东西,但我不知道如何针对客户检查现有卡。 我在laravel应用程序中使用stripe connect api代表他人管理付款,基本流程如下: < Li > stripe < code >令牌通过< code>stripe.js创建,并与支付表单一起提交 < li >如果客户存在于本地数据库中,我获取他们的< code>stripe_id,否则使用令牌作为源/卡创

  • 问题内容: 我有以“键-键”格式而不是“键-值”格式组织的数据。这就像一个HashMap,但是我将需要在两个方向上进行O(1)查找。这种数据结构是否有名称,Java的标准库中是否包含类似的名称?(或者Apache Commons?) 我可以编写自己的类,该类基本上使用两个镜像的Map,但我不想重蹈覆辙(如果已经存在,但我只是没有在寻找正确的术语)。 问题答案: Java API中没有此类。您想要的

  • 问题内容: 这个问题已经在这里有了答案 : 从PDO准备好的语句中获取原始SQL查询字符串 (16个答案) 3年前关闭。 在PHP中,当使用带有参数化查询的PDO访问MySQL数据库时,如何检查最终查询(在替换了所有令牌之后)? 有没有办法检查数据库真正执行了什么? 问题答案: 因此,我想我将最终回答我自己的问题,以便为记录提供完整的解决方案。但是必须感谢本·詹姆斯和凯拉什·巴杜,他们为此提供了线

  • 问题内容: 我在Java中有以下搜索代码: 我希望按名称查找列并返回找到的第一个。 我了解在某些情况下什么也找不到,应该对其进行处理,但是如何处理呢? 这是这个咒骂所要的: ? 怎么修?如果没有发现,我希望返回。 更新 好吧,好吧,我只是没有意识到,那又回来了。 问题答案: 替换为。

  • 编译器显示下一个错误:在这里输入图像描述

  • 问题内容: 我的服务器上有一个文件夹,我有许多符号链接指向该文件夹。此后,我创建了一个新文件夹,并且想要更改所有这些符号链接以指向新文件夹。我曾考虑过用新链接的符号链接替换原始文件夹,但是如果我继续这种做法,它看起来很快就会变得非常混乱。 我一直在做的是手动更改符号链接以指向新文件夹,但是我可能错过了一些。 有没有一种方法可以检查是否有指向特定文件夹的符号链接? 问题答案: 我会使用find命令。