我正在使用KCL(v2)将Kafka消费者转换为AWS动觉消费者。在Kafka中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的Kafka应用程序死机,它将使用重新启动时停止的偏移量。
然而,这在Kinesis中是不一样的。我可以设置动力学ClientLibConfiguration.withfiralPositionInStream(...)
,但唯一的参数是TRIM_HORIZON
、LATEST
或AT_TIMESTAMP
。如果我的Kinesis应用程序死机,它将不知道重新启动时从哪里恢复消费。
我的KCL消费者非常简单。方法如下所示:
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
"testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Worker worker = new Worker.Builder()
.recordProcessorFactory(new KCLRecordProcessorFactory())
.config(config)
.build();
而RecordProcessor是一个简单的实现:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
}
如果我检查相应的DynamoDB表,则检查点的值设置为TRIM\u HORIZON,并且不会随着记录的使用而使用SequenceID进行更新。
这里有什么解决方案来确保我消费每条消息?
正如@kdgregory所指出的,KCL要求用户设置自己的检查点。工作代码:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
try {
processRecordsInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
LOGGER.error("Unable to checkpoint");
}
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
LOGGER.error("Unable to checkpoint");
}
}
我有一份flink的工作,它使用Kafka的数据,制作一些无状态平面图,并向Kafka生成数据,这是一份工作量非常小的工作。 例如,在作业需要从检查点还原之前,它通常会无问题地获取检查点,而它只是无法使用下面的堆栈跟踪还原状态。 状态非常小,我相信它只是Kafka偏移量,它至少运行了一次语义。 所有操作员都有。uid()集,我完全没有主意了。 这是尝试从检查点重新启动时的错误: 任务管理器在正常操
我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。 以下是我的详细进度: 1:创建一个Kafka表 2:启动我的 Flink 作业,如下所示配置 3:执行我的sql 如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗
1)以上假设是否正确。2)当发生故障时,滚动窗口有状态是否有意义,我们从最后一个kafka分区提交的偏移量开始。3)当滚动窗口有状态时,这个状态什么时候可以被flink使用。4)为什么检查点和保存点的状态大小不同。5)当发生故障时,flink总是从sorce运算符开始。对吗?
null
我有两个活动,分别是家庭活动和沙拉菜单活动。主页活动包含七个片段,其中一个名为菜单类别的片段有四个图像,第一个图像下面的文本是沙拉菜单,当用户单击沙拉菜单时,将启动名为沙拉菜单活动的新活动。 我想,当用户点击后退按钮时,home活动必须开始,并且应该从启动salad菜单活动的相同片段开始,即home活动中的菜单类别片段。我希望每个活动都会发生这种情况,例如,如果用户从您的订单片段开始一个新的活动,
问题内容: 有没有一种方法来获取数字的最后一位数字。我试图找到以“ 1”结尾的变量,例如1,11,21,31,41等。 如果我使用文本变量,我可以简单地输入 但是它适用于带有文本的变量(例如“ hello”),而不适用于数字。对于数字,我会遇到此错误: 我试图看看是否有更好的方法来处理数字。我知道一个解决方案是将其转换为字符串,然后执行上面的命令,但是我正在尝试查看是否还有另一种错过的方式。 提前