我有一个Flink流媒体程序,可以读取Kafka主题的数据。在程序中,自动。抵消重置设置为“最小”。在IDE/Intellij IDEA中进行测试时,程序始终可以从主题的开头读取数据。然后,我建立了一个flink/kafka集群,并将一些数据生成kafka主题。我第一次运行流媒体作业时,它可以读取主题开头的数据。但在那之后,我停止了流式处理作业并再次运行它,它将不会读取主题开头的数据。如何使程序始终从主题的开头读取数据?
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaServers);
properties.put("zookeeper.connect", zkConStr);
properties.put("group.id", group);
properties.put("topic", topics);
properties.put("auto.offset.reset", offset);
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer082<String>(topics, new SimpleStringSchema(), properties));
如果您想始终从头开始读取,则需要在流上下文中禁用检查点。
还可以在消费者属性级别禁用它:
使可能汽车提交=false或auto。犯罪enable=false(取决于Kafka版本)
另一种方法:您可以保持ckeckpointing以进行故障切换,但可以生成新组。需要从头开始阅读时的id(有时只需清理zookeeper)
问题内容: 我正在尝试使用Spring Boot和MySQL开发应用程序。正如文档所述,首先,我使用Intelij Idea使用Spring initializr创建了项目,配置了文件,并编写了文件和文件。运行项目后,我发现MySQL数据库中没有表或数据。我的配置有什么问题?请帮忙。 application.properties文件, pom.xml文件中的依赖项, schema-mysql.sq
我有一个php版本为7.0的Linux apache2 Web服务器。22.看起来设置根本不起作用:( 这个ini文件位于以下位置:根据。当我查看文件夹时,我看到三个文件(设置告诉我正在使用的文件),和。 所以我查找了,它是,我将它改为,然后。我对所有的人都这样做了。ini文件保存了它们,重新启动apache2并刷新了phpinfo()页面,但什么都没有发生。我试图在所有窗口中更改其他设置。ini
我尝试将ini文件解析为可以在ant脚本中使用的属性。我有以下几点: 我试图做的是解析所有的name=value对,并将它们放入属性中,如:section。名称=值; 不知何故,“echoMsg”目标中没有记住该部分。我想记住部门名称。 所以 应该成为: 这是我的ant脚本的输出: 如您所见,未设置最后一个“${prevSection}”。我希望它是“全球性的”。 我试着用它来代替财产,但没有区别
我有一个来自3的Vue项目。十、 我在
积分抵现 积分抵现包括积分抵现比率、是否开启积分抵现和积分说明。 其中,积分抵现比率为 1积分可抵多少元现金