当前位置: 首页 > 知识库问答 >
问题:

AWS运动-如何从最后一个检查点恢复消费

厍兴腾
2023-03-14

我正在使用KCL(v2)将Kafka消费者转换为AWS动觉消费者。在Kafka中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的Kafka应用程序死机,它将使用重新启动时停止的偏移量。

然而,这在Kinesis中是不一样的。我可以设置动力学ClientLibConfiguration.withfiralPositionInStream(...),但唯一的参数是TRIM_HORIZONLATESTAT_TIMESTAMP。如果我的Kinesis应用程序死机,它将不知道重新启动时从哪里恢复消费。

我的KCL消费者非常简单。方法如下所示:

KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
            "testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

Worker worker = new Worker.Builder()
            .recordProcessorFactory(new KCLRecordProcessorFactory())
            .config(config)
            .build();

而RecordProcessor是一个简单的实现:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", records.size());
    records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
    LOGGER.info("Shutting down input");
}

如果我检查相应的DynamoDB表,则检查点的值设置为TRIM\u HORIZON,并且不会随着记录的使用而使用SequenceID进行更新。

这里有什么解决方案来确保我消费每条消息?

共有1个答案

水品
2023-03-14

正如@kdgregory所指出的,KCL要求用户设置自己的检查点。工作代码:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", records.size());
    records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
            r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
    try {
        processRecordsInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
        LOGGER.error("Unable to checkpoint");
    }
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
    LOGGER.info("Shutting down input");
    try {
        shutdownInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
        LOGGER.error("Unable to checkpoint");
    }
}
 类似资料:
  • 我有一份flink的工作,它使用Kafka的数据,制作一些无状态平面图,并向Kafka生成数据,这是一份工作量非常小的工作。 例如,在作业需要从检查点还原之前,它通常会无问题地获取检查点,而它只是无法使用下面的堆栈跟踪还原状态。 状态非常小,我相信它只是Kafka偏移量,它至少运行了一次语义。 所有操作员都有。uid()集,我完全没有主意了。 这是尝试从检查点重新启动时的错误: 任务管理器在正常操

  • 我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。 以下是我的详细进度: 1:创建一个Kafka表 2:启动我的 Flink 作业,如下所示配置 3:执行我的sql 如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗

  • 1)以上假设是否正确。2)当发生故障时,滚动窗口有状态是否有意义,我们从最后一个kafka分区提交的偏移量开始。3)当滚动窗口有状态时,这个状态什么时候可以被flink使用。4)为什么检查点和保存点的状态大小不同。5)当发生故障时,flink总是从sorce运算符开始。对吗?

  • 我有两个活动,分别是家庭活动和沙拉菜单活动。主页活动包含七个片段,其中一个名为菜单类别的片段有四个图像,第一个图像下面的文本是沙拉菜单,当用户单击沙拉菜单时,将启动名为沙拉菜单活动的新活动。 我想,当用户点击后退按钮时,home活动必须开始,并且应该从启动salad菜单活动的相同片段开始,即home活动中的菜单类别片段。我希望每个活动都会发生这种情况,例如,如果用户从您的订单片段开始一个新的活动,

  • 问题内容: 有没有一种方法来获取数字的最后一位数字。我试图找到以“ 1”结尾的变量,例如1,11,21,31,41等。 如果我使用文本变量,我可以简单地输入 但是它适用于带有文本的变量(例如“ hello”),而不适用于数字。对于数字,我会遇到此错误: 我试图看看是否有更好的方法来处理数字。我知道一个解决方案是将其转换为字符串,然后执行上面的命令,但是我正在尝试查看是否还有另一种错过的方式。 提前