我在spring boot(消费者)应用程序中使用spring cloud stream kafka。即使应用程序无法连接到Kafka(Kafka代理已关闭),应用程序的运行状况也不准确。我读过关于Kafka健康检查的文章。在spring actuator health check中,kafka health check似乎已禁用。
因此,我成功地编写了以下代码来为我的应用程序启用Kafka健康检查。我想,我在应用程序配置和代码之间缺少了一些联系,而且我看不到Kafka的健康状况在起作用。
(1)我创建一个自定义健康指标bean如下:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
@Configuration
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = configurationProperties.getConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
}
(2) 已创建活页夹配置:
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
// @Autowired
// private ProducerListener producerListener;
@Bean
KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
return new KafkaBinderConfigurationProperties();
}
@Bean
KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, new Kafka10AdminUtilsOperation());
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties,
provisioningProvider);
// kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
return kafkaMessageChannelBinder;
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
}
我添加的应用程序属性:
经营健康活页夹。启用=真,管理。健康Kafka。启用=真
=====================================当我在本地启动我的应用程序并点击/healthendpoint时,我看到Kafka的以下内容:
"binders": {
"status": "UNKNOWN",
"kafka": {
"status": "UNKNOWN"
}
},
通过使用最新版本的“spring cloud stream binder kafka”解决了这个问题。我最初使用的是一个旧版本(比1.3.0.RELEASE旧的版本),而Kafka的健康检查不起作用。正如@Sobychacko所建议的,我使用了最新版本2.0.0 REALEASE,kafka活页夹的健康检查很好:)没有定制健康指标bean。
“binders”:{“status”:“UP”,“kafka”:{“status”:“UP”,“healthIndicator”:{“status”:“UP”}},
该检查也适用于1.3.0版。释放
这将为应用程序添加几个有用的endpoint。其中之一是/健康。当您启动应用程序并导航到/healthendpoint时,您将看到它已经返回了一些数据。 如何在春靴健康中添加自定义健康检查?
SOFABoot 为 Spring Boot 的健康检查能力增加了 Readiness Check 的能力。如果你需要使用 SOFA 中间件,那么建议使用 SOFABoot 的健康检查能力的扩展,来更优雅的上线应用实例 引入健康检查扩展 要引入 SOFABoot 的健康检查能力的扩展,只需要引入以下的 Starter 即可: <dependency> <groupId>com.alipay
健康检查配置概述。 filter.http.HealthCheck filter.http.HealthCheck proto { "pass_through_mode": "{...}", "endpoint": "...", "cache_time": "{...}" } pass_through_mode (BoolValue, REQUIRED) 指定过滤器是否在传递模式下运
健康检查架构概述。如果为集群配置了健康检查,则会发出相应的统计信息。详见请参考统计相关文档。 HealthCheck HealthCheck.Payload HealthCheck.HttpHealthCheck HealthCheck.TcpHealthCheck HealthCheck.RedisHealthCheck HealthCheck HealthCheck proto { "ti
健康检查架构概述。 如果为集群配置了健康检查,则会发出相应的统计信息。并且记录在这里。 { "type": "...", "timeout_ms": "...", "interval_ms": "...", "unhealthy_threshold": "...", "healthy_threshold": "...", "path": "...", "send": [
健康检查配置概述。 { "name": "health_check", "config": { "pass_through_mode": "...", "endpoint": "...", "cache_time_ms": "..." } } pass_through_mode (required, boolean) 指定过滤器是否在通过模式下运行。 end