当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka Producer中的精确一次语义学

阳宾实
2023-03-14

一、 我正试图用Kafka信源和信宿测试Flink一次语义:

  1. 运行flink应用程序,只需将消息从一个主题传输到另一个主题,并行度=1,检查点间隔20秒
  2. 每2秒使用Python脚本生成具有递增整数的消息。
  3. 使用read_committed隔离级别的控制台使用者读取输出主题。
  4. 手动杀死TaskManager

我希望在输出主题中看到单调递增的整数,而不考虑TaskManager的终止和恢复。

但实际上,a在控制台使用者输出中看到了一些意想不到的东西:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

看起来像是在输出主题中重播的检查点之间的所有消息。这应该是正确的行为还是我做错了什么?

恢复了一个快照:Flink UI

我的Flink代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);
        env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

        Properties producerProperty = new Properties();
        producerProperty.setProperty("bootstrap.servers", ...);
        producerProperty.setProperty("zookeeper.connect", ...);
        producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
        producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
        producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        Properties consumerProperty = new Properties();
        consumerProperty.setProperty("bootstrap.servers", ...);
        consumerProperty.setProperty("zookeeper.connect", ...);
        consumerProperty.setProperty("group.id", "test2");

        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
        consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

        FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test",  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        producer1.ignoreFailuresAfterTransactionTimeout();
        DataStreamSource<String> s1 = env.addSource(consumer1);
        s1.addSink(producer1);
        env.execute("Test");
    }

共有2个答案

卢景澄
2023-03-14

Flink以规则的、可配置的间隔生成检查点。当恢复检查点时,Flink会将状态回滚到输入流中最后一次检查点的位置(不一定与上次处理/消费的位置相同)。有不同的方法来确保精确一次语义学。您可以使用支持精确一次语义学的生产者(接收器),请参阅:Flink接收器中的容错保证。

或者,您可以在消费者中支持精确一次语义学。假设唯一整数与多个工作者保持一致(并行性

>

  • 假设当前检查点id为Ckpt N。将所有已处理的整数(大事件情况下已处理事件的指纹)存储在Ckpt N状态。您可以通过让消费者实现ListCheckped接口来实现这一点,以将状态(指纹,或您情况下的整数)存储在Ckpt N中。

    一旦Flink移动到下一个检查点(Ckpt N 1),过滤掉存储在Ckpt N状态中的所有整数,以确保只进行一次处理。以Ckpt N 1的状态(即丢弃Ckpt N的状态)存储未过滤的已处理整数(或已处理事件的指纹)。

    您只需要存储发生在两个检查点之间的已处理事件(或您的情况下的整数)的指纹,然后在持久化新检查点时丢弃。

  • 袁山
    2023-03-14

    除了为精确一次语义学设置生产者之外,您还需要将消费者配置为仅读取来自kafka的已提交消息。默认情况下,消费者将读取已提交和未提交的消息。将此设置添加到您的消费者应该会让您更接近您想要的行为。

    consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    
     类似资料:
    • 我已经设置了一个Flink 1.2独立集群,其中包含2个JobManager和3个TaskManager,我正在使用JMeter通过生成Kafka消息/事件对其进行负载测试,然后处理这些消息/事件。处理作业在TaskManager上运行,通常需要大约15K个事件/秒。 作业已设置EXACTLY_ONCE检查点,并将状态和检查点持久化到Amazon S3。如果我关闭运行作业的TaskManager需

    • Apache Flink通过从检查点恢复作业,确保故障和恢复时只进行一次处理,检查点是分布式数据流和操作员状态的一致快照(分布式快照的Chandy Lamport算法)。这保证了故障切换时只需一次。 在正常集群操作的情况下,Flink如何保证只进行一次处理,例如给定一个从外部源读取的Flink源(例如Kafka),Flink如何保证从源读取一次事件?事件源和Flink源之间是否有任何类型的应用程序

    • 我正在寻找Python的第n个根函数/算法,但在发布之前:没有整数根,见鬼 我从哪里至少可以获得一个指南,指导如何编程生成精确的/ 对于(第一个参数是数字,第二个参数是根深度(或其他内容))不返回或的函数。 编辑:所以,你给了我这个解决方案:,当我问这个问题时,我就知道了,但它不适用于,例如,。你不能用有理数来表示,因此给出了不正确的结果

    • 我正在浏览文档,我知道通过启用 幂等性:幂等生成函数对一个主题对一个生成函数只启用一次。基本上,每一条消息发送都有更高的保证,并且在出现错误时不会重复 那么,如果我们已经有幂等性,那么为什么我们需要在Kafka Stream中另一个恰好一次的属性呢?幂等性和恰好一次之间有什么区别 为什么在普通Kafka制作人中不提供一次房产?

    • 我正在使用elasticsearch从json字段进行精确短语匹配。我尝试过多种语法,比如multi_match、query_string query_string我正在使用的语法; 我也尝试了过滤器而不是查询,但是过滤器在json上没有给出任何结果。我用于过滤器的语法是; 现在的问题是; 是否可以使用elasticsearch对json执行精确匹配操作?

    • 更准确地说,函数接口定义为具有一个抽象方法的任何接口。 然后他继续介绍示例,其中一个是接口: 我能够测试是否可以使用lambda函数代替比较器参数,并且它能够工作(例如)。