I want to monitor Kafka metrics, but unfortunately there is nothing Kafka-related under the /actuator/prometheus endpoint. Is something missing from my setup?
management:
  server:
    port: 8081
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
spring:
  jmx:
    enabled: true
  kafka:
    bootstrap-servers: ...
    consumer:
      group-id: my-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual
    ssl:
      key-store-location: ...
      key-store-password: ...
    security:
      protocol: SSL
My receiver looks like this:
@Bean
fun someEventReceiver(): SomeEventReceiver =
    KafkaReceiver.create(
        ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
            .withValueDeserializer(SomeEventDeserializer())
            .subscription(listOf(serviceProperties.kafka.topics.someevent))
    )
And the listener:
@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    someEventReceiver
        .receive()
        .groupBy { it.receiverOffset().topicPartition() }
        .publishOn(Schedulers.boundedElastic())
        .flatMap { someEvent ->
            someEvent
                .publishOn(Schedulers.boundedElastic())
                .delayUntil(::handleEvent)
                .doOnNext { it.receiverOffset().acknowledge() }
                .retryWhen(Retry.backoff(10, Duration.ofMillis(100)))
        }
        .retryWhen(Retry.indefinitely())
        .subscribe()
}
Following up on @gary russell's suggestion (thanks again for your help!), I took a slightly different approach to building the listener in order to reduce the amount of code, since my project has many consumers.
class KafkaReceiverWithMetrics<K, V>(
    private val receiver: KafkaReceiver<K, V>,
    private val consumerId: String,
    private val metricsListener: MicrometerConsumerListener<K, V>,
) : KafkaReceiver<K, V> by receiver {

    override fun receive(): Flux<ReceiverRecord<K, V>> =
        receiver.receive()
            .doOnSubscribe {
                receiver
                    .doOnConsumer { consumer -> metricsListener.consumerAdded(consumerId, consumer) }
                    .subscribe()
            }
}
@Bean
fun someEventReceiver(): SomeEventReceiver =
    KafkaReceiverWithMetrics(
        KafkaReceiver.create(
            ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                .withValueDeserializer(SomeEventDeserializer())
                .subscription(listOf(topics.someEvent))
        ),
        topics.someEvent,
        MicrometerConsumerListener(meterRegistry)
    )
Unlike spring-kafka, reactor-kafka currently has no Micrometer integration. If spring-kafka is also on the classpath, you can leverage its MicrometerConsumerListener to bind a KafkaClientMetrics to the meter registry (or you can do the registry binding yourself). Here is an example using the Spring listener:
@SpringBootApplication
public class So66706766Application {

    public static void main(String[] args) {
        SpringApplication.run(So66706766Application.class, args);
    }

    @Bean
    ApplicationRunner runner(MicrometerConsumerListener<String, String> consumerListener) {
        return args -> {
            ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                    Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ConsumerConfig.GROUP_ID_CONFIG, "so66706766"))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .subscription(Collections.singletonList("so66706766"));
            KafkaReceiver<String, String> receiver = KafkaReceiver.create(ro);
            receiver.receive()
                .doOnNext(rec -> {
                    System.out.println(rec.value());
                    rec.receiverOffset().acknowledge();
                })
                .subscribe();
            receiver.doOnConsumer(consumer -> {
                consumerListener.consumerAdded("myConsumer", consumer);
                return Mono.empty();
            }).subscribe();
        };
    }

    @Bean
    MicrometerConsumerListener<String, String> consumerListener(MeterRegistry registry) {
        return new MicrometerConsumerListener<>(registry);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so66706766").partitions(1).replicas(1).build();
    }
}
and the resulting /actuator/prometheus output:
# HELP kafka_consumer_successful_authentication_total The total number of connections with successful authentication
# TYPE kafka_consumer_successful_authentication_total counter
kafka_consumer_successful_authentication_total{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.0
# HELP jvm_gc_live_data_size_bytes Size of long-lived heap memory pool after reclamation
# TYPE jvm_gc_live_data_size_bytes gauge
jvm_gc_live_data_size_bytes 0.0
# HELP kafka_consumer_connection_creation_rate The number of new connections established per second
# TYPE kafka_consumer_connection_creation_rate gauge
kafka_consumer_connection_creation_rate{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.07456936193482637
...
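If spring-kafka is not on the classpath, here is a minimal sketch of the "do the binding yourself" route mentioned above, using Micrometer's own KafkaClientMetrics binder from micrometer-core. The receiver and registry are assumed to come from your own configuration; the class and method names here are illustrative only:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;

public class ManualMetricsBinding {

    // Illustrative helper: 'receiver' and 'registry' are supplied by your own setup.
    static void bindConsumerMetrics(KafkaReceiver<String, String> receiver, MeterRegistry registry) {
        receiver.doOnConsumer(consumer -> {
            // KafkaClientMetrics reads the client's own metrics() map and
            // registers the corresponding meters in the registry.
            new KafkaClientMetrics(consumer).bindTo(registry);
            return Mono.empty();
        }).subscribe();
    }
}
```

Note that this skips the lifecycle handling spring-kafka's listener gives you (for example, closing the KafkaClientMetrics when the consumer is removed), so in production code you would want to keep a reference to the binder and close it when the receiver shuts down.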
I have opened an issue for this: https://github.com/reactor/reactor-kafka/issues/206