当前位置: 首页 > 知识库问答 >
问题:

Flink·Kafka:为什么我会丢失信息?

党祖鹤
2023-03-14

我编写了一个非常简单的Flink流媒体作业,它使用Flink Kafka消费者082从Kafka获取数据。

protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
    Properties result = new Properties();
    result.put("bootstrap.servers", getBrokerUrl());
    result.put("zookeeper.connect", getZookeeperUrl());
    result.put("group.id", getGroup());

        return env.addSource(
                new FlinkKafkaConsumer082<>(
                        topic,
                        new SimpleStringSchema(), result);
}

这工作得很好,每当我在Kafka上将某些内容放入主题时,它都会被我的Flink作业接收并处理。现在我试图看看如果我的Flink作业由于某种原因不在线会发生什么。所以我关闭了flink作业并继续向Kafka发送消息。然后我再次开始我的Flink作业,并期望它会处理同时发送的消息。

然而,我得到了以下信息:

No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]

因此,它基本上忽略了自上次关闭Flink作业以来出现的所有消息,只是在队列末尾开始读取。从我收集的FlinkKafkaConsumer082文档中,它自动负责与Kafka代理同步处理的偏移量。然而,情况似乎并非如此。

我使用单节点Kafka安装(Kafka发行版附带的安装)和单节点Zookeper安装(也是与Kafka发行版捆绑的安装)。

我怀疑这是某种错误配置或类似的东西,但我真的不知道从哪里开始寻找。有人有这个问题,也许解决了?

共有2个答案

笪烨
2023-03-14

https://kafka.apache.org/08/configuration.html

设置自动。抵消重置为最小值(默认为最大值)

auto.offset.reset:

当Zookeeper中没有初始偏移或偏移超出范围时,该怎么办:

最小:自动将偏移重置为最小偏移

最大:自动将偏移重置为最大偏移

其他:向消费者抛出异常。

如果将其设置为最大,则当代理上订阅的主题的分区数更改时,使用者可能会丢失一些消息。要防止在添加分区期间丢失数据,请设置auto。抵消重置为最小值

还要确保重启后getGroup()相同

令狐宏浚
2023-03-14

我找到了原因。您需要在StreamExecutionEnvironment中显式启用检查点,以使Kafka连接器将处理后的偏移量写入Zookeeper。如果不启用它,Kafka连接器将不会写入上次读取的偏移量,因此在重新启动收集作业时,它将无法从那里恢复。所以一定要写:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(); // <-- this is the important part

Anatoly关于更改初始偏移量的建议可能仍然是一个好主意,以防检查点由于某种原因而失败。

 类似资料:
  • 我有一个视频文件,与此视频流: 流#0:0:视频:h264(主)(h264/0x34363248),yuv420p(电视,bt709,渐进式),1920x1080[SAR 1:1 DAR 16:9],4204kb/s,59.94fps,59.94tbr,59.94tbn,59.94tbc 我可以通过运行获得PTS信息: 我一行一行地得到每帧的PTS: 现在我需要将视频编码到H264,并且之后能够获

  • 我有五门课: 、、、、。 是文本的持有者。 是空的抽象类。 扩展并存储一个String和一个ArrayList的。 是抽象类并存储

  • 使用Flume源syslogudp,我看到大约25%的数据丢失。 这是我的配置 a1.sources = r1 a1.sinks=k1 a1 .通道= c1 a1.sources.r1.type = syslogudp a1.sources.r1.bind = 172.24.1.78 a1.sources.r1.port = 65535 a1.sinks.k1.type=文件滚动 水槽。水槽。目录

  • 使用window.open打开同源新tab,新页面localstorage中token丢失。

  • 我试图以CSV格式保存py spark . SQL . data frame . data frame(也可以是其他格式,只要它易于阅读)。 到目前为止,我找到了几个示例来保存DataFrame。然而,每次我编写它时,它都会丢失信息。 数据集示例: 为了将这个文件保存为CSV,我首先尝试了这个解决方案: 不幸是,这导致了以下错误: 这就是我尝试另一种可能性的原因,将spark数据帧转换成panda