当前位置: 首页 > 知识库问答 >
问题:

Flink-如何通过JMX Reporter导出Flink的Kafka连接器偏移量?

邢飞雨
2023-03-14

根据这里的说明,我启用了JMX Exporter,并能够连接到它以查看一些指标:jobmanager。状态,jobmanager。工作

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

但是,我没有看到KafkaConnector的度量值。我该怎么做?

我想从Flink那里得到那些偏移量来计算

消费者滞后=最新kafka分区偏移量-flink分区偏移量。

共有1个答案

唐烨煜
2023-03-14

我找到了答案。flink-conf.yaml中的JMX端口配置应该是一个范围(9250-9260),而不是一个端口号。Flink需要至少2个端口来同时导出JobManager和TaskManager的指标。

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 9250-9260

如果仅指定了1个端口(问题中为8789),则仅导出来自JobManager的指标。KafkaConnector的指标属于TaskManager的指标,因此不会出现。

 类似资料:
  • 我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。

  • 我正在使用Apache Flink,并尝试通过使用Apache Kafka协议从它接收消息来连接到Azure eventhub。我设法连接到Azure eventhub并接收消息,但我不能使用这里(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-star

  • 我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id

  • 简而言之,我想从一开始就对Kafka的数据重新运行Flink管道。 Flink0.10.2,Kafka0.8.2。 我在Kafka中有一个保留2小时的推文主题,以及Flink中的一个管道,该管道以每10秒5分钟的滑动窗口计算推文。 如果我中断管道并重新运行它,我希望它重新读取旧推文,从而发出价值5分钟的推文计数。相反,它似乎从新到达的推文重新开始,因此需要5分钟才能计数为“处于状态”。 我已经尝试

  • 我想重置AerospikeSink Kafka Connector偏移量,我首先删除连接器消费组()偏移量,然后重新创建它。当我使用策略重新创建时,它以正确的偏移量重新创建,但是然后,当任务状态从更改为任务时,它会从连接器的前一个实例到达的点继续处理,这会阻止从一开始就读取来自kafka的所有消息(我正在尝试再次读取来自kafka的所有消息)。 注意:使用新名称创建新连接器并不能解决问题。 使用任

  • 我有一个Kafka接收器任务,通过方法收听Kafka主题 但我不想自动提交偏移量,因为一旦从Kafka取出记录,我就有一些处理逻辑 从Kafka获取记录后,如果处理成功,则只有我想提交偏移量,否则它应该再次从同一偏移量读取。 我可以在Kafka consumer中看到方法,但在中找不到替代方法。