当前位置: 首页 > 知识库问答 >
问题:

FlinkKafkaConsumer082汽车。抵消重置设置不起作用?

姬朗
2023-03-14

我有一个Flink流媒体程序,可以读取Kafka主题的数据。在程序中,自动。抵消重置设置为“最小”。在IDE/Intellij IDEA中进行测试时,程序始终可以从主题的开头读取数据。然后,我建立了一个flink/kafka集群,并将一些数据生成kafka主题。我第一次运行流媒体作业时,它可以读取主题开头的数据。但在那之后,我停止了流式处理作业并再次运行它,它将不会读取主题开头的数据。如何使程序始终从主题的开头读取数据?

    Properties properties = new Properties();
    properties.put("bootstrap.servers", kafkaServers);
    properties.put("zookeeper.connect", zkConStr);
    properties.put("group.id", group);
    properties.put("topic", topics);
    properties.put("auto.offset.reset", offset);

    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer082<String>(topics, new SimpleStringSchema(), properties));

共有1个答案

叶文博
2023-03-14

如果您想始终从头开始读取,则需要在流上下文中禁用检查点。

还可以在消费者属性级别禁用它:

使可能汽车提交=false或auto。犯罪enable=false(取决于Kafka版本)

另一种方法:您可以保持ckeckpointing以进行故障切换,但可以生成新组。需要从头开始阅读时的id(有时只需清理zookeeper)

 类似资料:
  • 问题内容: 我正在尝试使用Spring Boot和MySQL开发应用程序。正如文档所述,首先,我使用Intelij Idea使用Spring initializr创建了项目,配置了文件,并编写了文件和文件。运行项目后,我发现MySQL数据库中没有表或数据。我的配置有什么问题?请帮忙。 application.properties文件, pom.xml文件中的依赖项, schema-mysql.sq

  • 我有一个php版本为7.0的Linux apache2 Web服务器。22.看起来设置根本不起作用:( 这个ini文件位于以下位置:根据。当我查看文件夹时,我看到三个文件(设置告诉我正在使用的文件),和。 所以我查找了,它是,我将它改为,然后。我对所有的人都这样做了。ini文件保存了它们,重新启动apache2并刷新了phpinfo()页面,但什么都没有发生。我试图在所有窗口中更改其他设置。ini

  • 我尝试将ini文件解析为可以在ant脚本中使用的属性。我有以下几点: 我试图做的是解析所有的name=value对,并将它们放入属性中,如:section。名称=值; 不知何故,“echoMsg”目标中没有记住该部分。我想记住部门名称。 所以 应该成为: 这是我的ant脚本的输出: 如您所见,未设置最后一个“${prevSection}”。我希望它是“全球性的”。 我试着用它来代替财产,但没有区别

  • 我有一个来自3的Vue项目。十、 我在

  • 积分抵现 积分抵现包括积分抵现比率、是否开启积分抵现和积分说明。 其中,积分抵现比率为 1积分可抵多少元现金