当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka自定义健康检查不起作用

翟誉
2023-03-14

我在spring boot(消费者)应用程序中使用spring cloud stream kafka。即使应用程序无法连接到Kafka(Kafka代理已关闭),应用程序的运行状况也不准确。我读过关于Kafka健康检查的文章。在spring actuator health check中,kafka health check似乎已禁用。

因此,我成功地编写了以下代码来为我的应用程序启用Kafka健康检查。我想,我在应用程序配置和代码之间缺少了一些联系,而且我看不到Kafka的健康状况在起作用。

(1)我创建一个自定义健康指标bean如下:

      import java.util.HashMap;
      import java.util.Map;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.common.serialization.ByteArrayDeserializer;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
      import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
      import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
      import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.core.ConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.util.ObjectUtils;

      @Configuration
      @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
      public class KafkaBinderHealthIndicatorConfiguration {

        @Bean
        KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
            KafkaBinderConfigurationProperties configurationProperties) {
          Map<String, Object> props = new HashMap<>();
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
          Map<String, Object> mergedConfig = configurationProperties.getConsumerConfiguration();
          if (!ObjectUtils.isEmpty(mergedConfig)) {
            props.putAll(mergedConfig);
          }
          if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
          }
          ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
          KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
          indicator.setTimeout(configurationProperties.getHealthTimeout());
          return indicator;
        }
      }

(2) 已创建活页夹配置:

          import java.io.IOException;

          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
          import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
          import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
          import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
          import org.springframework.boot.context.properties.EnableConfigurationProperties;
          import org.springframework.cloud.stream.binder.Binder;
          import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
          import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
          import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
          import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
          import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Import;
          import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;

          @Configuration
          @ConditionalOnMissingBean(Binder.class)
          @Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
              KafkaBinderHealthIndicatorConfiguration.class })
          @EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
          public class KafkaBinderConfiguration {

            @Autowired
            private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;

          //  @Autowired
          //  private ProducerListener               producerListener;

            @Bean
            KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
              return new KafkaBinderConfigurationProperties();
            }

            @Bean
            KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
              return new KafkaTopicProvisioner(configurationProperties, new Kafka10AdminUtilsOperation());
            }

            @Bean
            KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
                KafkaTopicProvisioner provisioningProvider) {

              KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties,
                  provisioningProvider);
          //    kafkaMessageChannelBinder.setProducerListener(producerListener);
              kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
              return kafkaMessageChannelBinder;
            }

            @Bean
            public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
              return new KafkaJaasLoginModuleInitializer();
            }

          }

我添加的应用程序属性:

经营健康活页夹。启用=真,管理。健康Kafka。启用=真

=====================================当我在本地启动我的应用程序并点击/healthendpoint时,我看到Kafka的以下内容:

 "binders": {
     "status": "UNKNOWN",
     "kafka": {
     "status": "UNKNOWN"
     }
  },

共有1个答案

班高明
2023-03-14

通过使用最新版本的“spring cloud stream binder kafka”解决了这个问题。我最初使用的是一个旧版本(比1.3.0.RELEASE旧的版本),而Kafka的健康检查不起作用。正如@Sobychacko所建议的,我使用了最新版本2.0.0 REALEASE,kafka活页夹的健康检查很好:)没有定制健康指标bean。

“binders”:{“status”:“UP”,“kafka”:{“status”:“UP”,“healthIndicator”:{“status”:“UP”}},

该检查也适用于1.3.0版。释放

 类似资料:
  • 这将为应用程序添加几个有用的endpoint。其中之一是/健康。当您启动应用程序并导航到/healthendpoint时,您将看到它已经返回了一些数据。 如何在春靴健康中添加自定义健康检查?

  • SOFABoot 为 Spring Boot 的健康检查能力增加了 Readiness Check 的能力。如果你需要使用 SOFA 中间件,那么建议使用 SOFABoot 的健康检查能力的扩展,来更优雅的上线应用实例 引入健康检查扩展 要引入 SOFABoot 的健康检查能力的扩展,只需要引入以下的 Starter 即可: <dependency> <groupId>com.alipay

  • 健康检查配置概述。 filter.http.HealthCheck filter.http.HealthCheck proto { "pass_through_mode": "{...}", "endpoint": "...", "cache_time": "{...}" } pass_through_mode (BoolValue, REQUIRED) 指定过滤器是否在传递模式下运

  • 健康检查架构概述。如果为集群配置了健康检查,则会发出相应的统计信息。详见请参考统计相关文档。 HealthCheck HealthCheck.Payload HealthCheck.HttpHealthCheck HealthCheck.TcpHealthCheck HealthCheck.RedisHealthCheck HealthCheck HealthCheck proto { "ti

  • 健康检查架构概述。 如果为集群配置了健康检查,则会发出相应的统计信息。并且记录在这里。 { "type": "...", "timeout_ms": "...", "interval_ms": "...", "unhealthy_threshold": "...", "healthy_threshold": "...", "path": "...", "send": [

  • 健康检查配置概述。 { "name": "health_check", "config": { "pass_through_mode": "...", "endpoint": "...", "cache_time_ms": "..." } } pass_through_mode (required, boolean) 指定过滤器是否在通过模式下运行。 end