当前位置: 首页 > 知识库问答 >
问题:

无法在Flink新Kafka消费者api中的检查点向Kafka提交消费补偿(1.14)

能修谨
2023-03-14

我指的是Kafka源代码连接器的Flink 1.14版本,代码如下。

我期待以下要求。

  • 在最新的应用程序开始时,必须阅读Kafka主题的最新偏移量
  • 在检查点上,它必须将消耗的偏移量提交给Kafka
  • 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中选取,并且必须消耗使用者延迟,然后再使用新的事件提要

有了Flink新的KafkaConsumer API(KafkaSource),我面临以下问题

  • 能够完成上述要求,但不能在检查点(500ms)上提交消耗的偏移量。它宁愿在2s或3s后提交。

在2/3秒内手动关闭应用程序并重新启动时。由于上次使用的消息未提交,因此会读取两次(重复)。

为了交叉检查这个功能,我尝试了Flink Kafka的旧消费者应用编程接口(Flink Kafka消费者)。在那里,它是完美的工作。当消息被立即消费时,它被提交回Kafka。

接下来的步骤

  • 建立Kafka环境

如果我丢失了任何东西或需要添加任何财产,请提出建议。

 @Test
    public void test() throws Exception {

        System.out.println("FlinkKafkaStreamsTest started ..");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(500);
        env.setParallelism(4);

        Properties propertiesOld = new Properties();
        Properties properties = new Properties();
        String inputTopic = "input_topic";
        String bootStrapServers = "localhost:29092";
        String groupId_older = "older_test1";
        String groupId = "test1";

        propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
        propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);


        /******************** Old Kafka API **************/
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
                new KRecordDes(),
                propertiesOld);
        flinkKafkaConsumer.setStartFromGroupOffsets();
        env.addSource(flinkKafkaConsumer).print("old-api");


        /******************** New Kafka API **************/
        KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("enable.auto.commit", "false")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));

        KafkaSource<String> kafkaSource = sourceBuilder.build();

        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

        source.print("new-api");

        env.execute();
    }
    static class KRecordDes implements  KafkaDeserializationSchema<String>{
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            return new String(consumerRecord.value());
        }
    }

注意:我还有其他要求,我希望Flink Kafka有界源代码读取器在相同的代码中,这在新的API(KafkaSource)中可用。

共有1个答案

萧晓博
2023-03-14

Kafka的文件来源:

请注意,Kafka source不依赖提交的偏移量来实现容错。提交抵销仅用于公开消费者和消费群体的进度以进行监控。

当Flink作业从失败中恢复时,它将从最近成功的检查点恢复状态,并从存储在该检查点中的偏移恢复消耗,因此检查点之后的记录将被“重播”一点。由于您使用的是打印接收器,它不支持完全一次语义,因此您将看到重复的记录,这些记录实际上是最近一次成功检查点之后的记录。

对于您提到的偏移量提交的2-3秒延迟,这是因为实现了SourceReaderBase。简而言之,SplitFetcher管理一个任务队列,当一个offset commit任务被推到队列中时,它不会被执行,直到调用KafkaConsumer#poll()的正在运行的fetch任务超时。如果交通量很小,延误可能会更长。但请注意,这不会影响正确性:KafkaSource不使用提交的偏移量进行容错。

 类似资料:
  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我正在使用Kafka 0.8 最近,我们开始喂食和消耗一个行为怪异的新主题,消耗的偏移量突然被重置,它尊重我们设置的auto.offset.reset策略(实际上是最小的)但我无法理解为什么该主题会突然重置其偏移量。 我正在使用高级消费者。 这是我发现的一些错误日志: 我们有一堆这样的错误日志: 每次出现此问题时,我都会看到警告日志: 然后真正的问题发生了: 现在的问题是:有人已经经历过这种行为吗

  • 我设置了MirrorMaker2,用于在两个DC之间复制数据。 我的 mm2 属性, 看到下面的MM2创业。 我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。 已在源群集中启动使用者组。 消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。 我尝试使用来自目标集群的消息,如下所示。 由于我使用相同的使用者组,因此我希望我

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • 我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。

  • 我在Flink的工作中使用Kafka资料来源的信息流,一次阅读50个主题,如下所示: 然后有一些运算符,如:过滤器- 我能获得的最大吞吐量是每秒10k到20k条记录,考虑到源发布了数十万个事件,这相当低,我可以清楚地看到消费者落后于生产者。我甚至试着移除水槽和其他操作员,以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到Amazon Kinesis data analytics,并尝试了