我想获取一个接收器连接器的sink-record-active-count值,这里给出https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics
我有所有容器运行在docker桌面使用docker compose文件docker ps docker ps
我使用了汇合度量报告器来完成该任务。
同样参考https://docs.confluent.io/5.4.0/kafka/metrics-reporter.html和https://neo4j.com/docs/labs/neo4j-streams/current/examples/#_confluent_with_docker,我将env变量添加到kafka容器中,如下所示
kafka-service:
image: confluentinc/cp-enterprise-kafka:5.4.0
container_name: kafka
depends_on:
- zookeeper
links:
- zookeeper
ports:
- 9092:9092
expose:
- "29092"
environment:
METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_REPORTER_TOPIC_CREATE: 'true'
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: http://kafka-service:29092
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-service:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
command:
- bash
- -c
- |
echo '127.0.0.1 kafka-service' >> /etc/hosts
/etc/confluent/docker/run
sleep infinity
在Kafka的日志中我得到了这个信息
INFO Created metrics reporter主题_Confluent-度量(IO.Confluent.metrics.reporter.ConfluentMetricsReporter)
我不知道我怎么能从Java的主题阅读。这个主题是否具有所需的度量标准?与接收器连接器有关?
第三,在此页面https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics中给出了MBean。我也不知道如何使用它。如果它也需要JMX,我尝试在以下https://rmoff.net/2018/09/17/accessing-kafka-docker-containser-jmx-from-host/之后放置KAFKA_JMX_HOSTNAME=localhost和kafka_jmx_port:9010,但我不确定如何继续。
据我所知,ConfluentMetricsReporter
是专有的二进制格式,如果没有作为Control Center一部分的反序列化库,就无法读取。
我建议使用Prometheus JMX Exporter+Grafana在Kafka消费者应用程序或控制中心之外可视化这些数据(这样的设置已经由Confluent Helm Charts提供了)
旁白:请不要编辑任何/etc/hosts
文件
问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:
问题内容: 在搜索如何通过API创建Kafka主题时,我在Scala中找到了以下示例: 最后一个arg 显然是Scala对象。我不清楚如何使该示例在Java中工作。 这篇文章如何在Clojure中创建Scala对象的问题在Clojure中提出了相同的问题,答案是: 我认为Java中的翻译成: 但是,当我尝试使用该方法(或其他任何数量的变体)时,它们都无法编译。 编译错误是: 我正在使用kafka_
在阅读Kafka主题时,我得到了奇怪的ArrayIndexOutOfBoundsException。花了很多时间却搞不清问题所在。有人能在这方面提供帮助/建议吗。这是我的日志。
我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头
使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。