当前位置: 首页 > 知识库问答 >
问题:

当我重新运行Flink consumer时,Kafka再次使用最新消息

戚研
2023-03-14

我用Scala编写的Apache Flink API创建了一个Kafka消费者。每当我传递某个话题的信息时,它就会适时地接收它们。但是,当我重新启动消费者时,它不会接收新的或未使用的消息,而是使用发送到该主题的最新消息。

以下是我正在做的:

>

  • 运行生产者:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    

    运行消费者:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    

    传递一些信息

  • 共有1个答案

    鄢博简
    2023-03-14

    您正在运行Kafka消费者,检查点间隔为5秒。因此,每5秒钟,Flink就会创建一份操作员状态(偏移量)的副本以进行恢复。

    一旦检查点完成,它会让操作员知道检查点已经完成。在该通知中,Kafka消费者将补偿提交给Zookeeper。所以大约每5秒,我们将最后一个检查点的偏移量写入ZK。

    当你再次开始Flink作业时,它会在ZK中找到偏移量,并从那里继续。根据时间的不同,提交到ZK之后收到的所有消息都将再次发送。

    您不能避免这种行为,因为. print()操作符不是检查点的一部分。它的意思是调试实用程序。然而,参与检查点的数据接收器(例如滚动文件接收器)将确保没有重复写入文件系统。

     类似资料:
    • 我们正在使用spring kafka 1.2.2。释放 我们想要的 1.一旦消息被消费并成功处理,就会在spring-kafka中提交偏移量。 我正在使用Manaul Commit/Ac认收它,它工作正常。 2.在任何异常的情况下,我们希望spring-kafka重新发送相同的消息。 我们对任何系统误差抛出RunTime异常,它由spring-kafka记录并且从未提交。 这很好,因为我们不希望它

    • 我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动Kaf

    • 我有一个Kafka Streams应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题。 每小时消耗/产生几百万条记录。每当我关闭一个代理时,应用程序就进入重新平衡状态,在重新平衡多次之后,它开始使用非常旧的消息。 注意:当Kafka Streams应用程序运行良好时,它的消费者滞后几乎为0。但再平衡之后,它的滞后从0到1000万。 这会不会是因为偏移.保留.分钟。 在这方面的任何帮助都将

    • 问题内容: 下面显示的以下代码会将3个JLabel添加到JFrame中,然后删除3个JLabel。2秒后,它将重新绘制3个JLabel。 我面临的一个小问题是,它正在重新绘制到新的frame(frameTwo)而不是旧的frame(frameOne)。 如何使它重新绘制现有框架而不是重新绘制到新框架? 问题答案: Simpy使用CardLayout来交换视图。 请使用Swing计时器,而不要使用当

    • 直到几周前,我让我的量角器e2e测试与Chrome浏览器一起工作。然而,自从上次Chrome更新69.0.3497.100 (64位)以来,在非无头模式下运行测试不再工作,我得到了一个错误。如果我在无头模式下运行测试(通过量角器配置文件“--head less”中的),所有工作正常。 我在windows命令提示符中遇到的错误是: WebDriverError:未知错误:Chrome无法启动:崩溃(

    • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置: