当前位置: 首页 > 知识库问答 >
问题:

如何在Java中读取由kafka创建的_confluent-Metrics主题

谭曦
2023-03-14

我想获取一个接收器连接器的sink-record-active-count值,这里给出https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics

我有所有容器运行在docker桌面使用docker compose文件docker ps docker ps

我使用了汇合度量报告器来完成该任务。

同样参考https://docs.confluent.io/5.4.0/kafka/metrics-reporter.html和https://neo4j.com/docs/labs/neo4j-streams/current/examples/#_confluent_with_docker,我将env变量添加到kafka容器中,如下所示

 kafka-service:
    image: confluentinc/cp-enterprise-kafka:5.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    links:
      - zookeeper
    ports:
      - 9092:9092
    expose:
      - "29092"
    environment:
      METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_REPORTER_TOPIC_CREATE: 'true'
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: http://kafka-service:29092
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-service:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    command:
      - bash
      - -c 
      - |
        echo '127.0.0.1 kafka-service' >> /etc/hosts
        /etc/confluent/docker/run
        sleep infinity  

在Kafka的日志中我得到了这个信息

INFO Created metrics reporter主题_Confluent-度量(IO.Confluent.metrics.reporter.ConfluentMetricsReporter)

我不知道我怎么能从Java的主题阅读。这个主题是否具有所需的度量标准?与接收器连接器有关?

第三,在此页面https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics中给出了MBean。我也不知道如何使用它。如果它也需要JMX,我尝试在以下https://rmoff.net/2018/09/17/accessing-kafka-docker-containser-jmx-from-host/之后放置KAFKA_JMX_HOSTNAME=localhost和kafka_jmx_port:9010,但我不确定如何继续。

共有1个答案

施选
2023-03-14

据我所知,ConfluentMetricsReporter是专有的二进制格式,如果没有作为Control Center一部分的反序列化库,就无法读取。

我建议使用Prometheus JMX Exporter+Grafana在Kafka消费者应用程序或控制中心之外可视化这些数据(这样的设置已经由Confluent Helm Charts提供了)

旁白:请不要编辑任何/etc/hosts文件

 类似资料:
  • 问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:

  • 问题内容: 在搜索如何通过API创建Kafka主题时,我在Scala中找到了以下示例: 最后一个arg 显然是Scala对象。我不清楚如何使该示例在Java中工作。 这篇文章如何在Clojure中创建Scala对象的问题在Clojure中提出了相同的问题,答案是: 我认为Java中的翻译成: 但是,当我尝试使用该方法(或其他任何数量的变体)时,它们都无法编译。 编译错误是: 我正在使用kafka_

  • 在阅读Kafka主题时,我得到了奇怪的ArrayIndexOutOfBoundsException。花了很多时间却搞不清问题所在。有人能在这方面提供帮助/建议吗。这是我的日志。

  • 我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头

  • 使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。