当前位置: 首页 > 知识库问答 >
问题:

Flink检查点不重放保存点/检查点期间正在处理的kafka事件

祝宾白
2023-03-14

我想在flink中测试一次端到端的处理。我的工作是:

Kafka资料来源-

我在mapper1中放了一个< code > thread . sleep(100000),然后运行了这个作业。我在停止作业时获取了保存点,然后从mapper1中删除了< code > thread . sleep(100000),我希望该事件应该会被重放,因为它没有下沉。但这并没有发生,乔布斯正在等待新的事件。

我的Kafka资料来源:

KafkaSource.<String>builder()
                .setBootstrapServers(consumerConfig.getBrokers())
                .setTopics(consumerConfig.getTopic())
                .setGroupId(consumerConfig.getGroupId())
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

我的Kafka水槽:

KafkaSink.<String>builder()
                .setBootstrapServers(producerConfig.getBootstrapServers())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(producerConfig.getTopic())
                        .setValueSerializationSchema(new SimpleStringSchema()).build())
                .build();

我的flink作业环境设置:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.enableCheckpointing(2000);
        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        environment.getCheckpointConfig().setCheckpointTimeout(60000);
        environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        environment.getCheckpointConfig().setCheckpointTimeout(1000);
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        environment.getCheckpointConfig().enableUnalignedCheckpoints();
        environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        environment.configure(configuration);

我到底做错了什么?我希望在取消/停止作业期间正在进行的任何事件都应该重新开始。

编辑1:我注意到我的kafka对我的flink的kafka-source消费者群体显示出了抵消滞后。我假设这意味着我的检查点运行正常,对吗?

我还观察到,当我从检查点重新启动作业时,它并未开始从剩余的偏移量中消耗,而我将使用者偏移量设置为FIRST。我必须发送更多事件来触发kafka源端的消耗,然后它消耗了所有事件。

共有1个答案

戚晨
2023-03-14

对于“恰好一次”,您必须在针对同一Kafka集群运行的所有应用程序之间提供唯一的< code > TransactionalIdPrefix (与传统的< code>FlinkKafkaConsumer相比,这是一个变化):

KafkaSink<T> sink =
        KafkaSink.<T>builder()
                .setBootstrapServers(...)
                .setKafkaProducerConfig(...)
                .setRecordSerializer(...)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("unique-id-for-your-app")
                .build();

当从检查点恢复时,Flink总是使用存储在检查点中的偏移量,而不是在代码中配置或存储在代理中的偏移量。

 类似资料:
  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 我知道stackoverflow上也有类似的问题,但在调查了其中几个之后,我知道 > 他们正在使用不同的存储格式 但这些并不是令人困惑的地方,我不知道什么时候该用一个,什么时候该用另一个。 考虑以下两种情况: 如果由于某种原因(例如错误修复或意外崩溃)需要关闭或重新启动整个应用程序,那么我必须使用保存点来恢复整个应用程序

  • 如果Flink应用程序在发生故障或更新后正在启动备份,那么不明确属于KeyedState或OperatorState的类变量是否会持久化? 例如,Flink的留档中描述的BoundedOutOfOrdernessGenerator有一个电流最大时间戳变量。如果更新了Flink应用程序,电流最大时间戳中的值是否会丢失,或者是否会写入在应用程序更新之前创建的保存点? 这样做的真正原因是我想实现一个自定

  • 我们正在接收来自多个独立数据源的事件,因此,到达我们Flink拓扑(通过Kafka)的数据将是无序的。 我们正在Flink拓扑中创建1分钟的事件时间窗口,并在源操作符处生成事件时间水印(当前事件时间-某些阈值(30秒))。 如果一些事件在设置的阈值之后到达,那么这些事件将被忽略(在我们的例子中这是可以的,因为属于该分钟的大多数事件都已经到达并在相应的窗口中得到处理)。 现在的问题是,如果程序崩溃(

  • 简而言之,我想从一开始就对Kafka的数据重新运行Flink管道。 Flink0.10.2,Kafka0.8.2。 我在Kafka中有一个保留2小时的推文主题,以及Flink中的一个管道,该管道以每10秒5分钟的滑动窗口计算推文。 如果我中断管道并重新运行它,我希望它重新读取旧推文,从而发出价值5分钟的推文计数。相反,它似乎从新到达的推文重新开始,因此需要5分钟才能计数为“处于状态”。 我已经尝试

  • 我们正在尝试使用RocksDB后端设置Flink有状态作业。我们使用会话窗口,有30分钟的间隔。我们使用aggregateFunction,所以不使用任何Flink状态变量。通过采样,我们的事件数不到20k次/秒,新会话数不到20-30次/秒。我们的会议基本上收集了所有的事件。会话累加器的大小会随着时间而增大。我们总共使用了10G内存和Flink1.9,128个容器。以下是设置: 从我们对给定时间