当前位置: 首页 > 知识库问答 >
问题:

Spring执行器+Kafka流-将Kafka流状态添加到健康检查endpoint

益清野
2023-03-14
management:
  health.db.enabled: false
  endpoints.web:
    base-path:
    path-mapping.health: /

当一个运行时异常被抛出并且我的流被停止时,日志显示,但是健康检查状态是向上的。

2019-09-17 13:16:31.522信息1---[Thread-5]org.apache.kafka.Streams.kafkastreams:streams-client[lpp-model-stream-7e6e8fea-fcad-4033-92a4-5ede50de6e17]Streams client停止completely

如何将kafka流状态绑定到健康检查endpoint?

  <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>
            <dependency>
                <groupId>data-wizards</groupId>
                <artifactId>lpp-common-avro</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-streams-avro-serde</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
            <dependency>
                <groupId>io.vavr</groupId>
                <artifactId>vavr</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

共有1个答案

杨和蔼
2023-03-14

KafkaStreams维护内存中的状态,该状态可以映射到执行器的健康状态。状态可以是以下状态之一:创建错误Not_runningpending_shutdown重新平衡running-它们是不言而喻的。有关状态转换,请参见文档https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kafkastreams.state.html

如果您正在寻找一个完整的示例,可以使用下面的示例并根据需要更新它(例如,您可能不将created算作状态向上)。确保在应用程序上下文中有kafkastreams类型的bean。

//Note that class name prefix before `HealthIndicator` will be camel-cased
//and used as a health component name, `kafkaStreams` here
@Component
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    //if you have multiple instances, inject as Map<String, KafkaStreams>
    //Spring will map KafkaStreams instances by bean names present in context
    //so you can provide status details for each stream by name
    @Autowired
    private KafkaStreams kafkaStreams; 

    @Override
    public Health health() {
        State kafkaStreamsState = kafkaStreams.state();

        // CREATED, RUNNING or REBALANCING
        if (kafkaStreamsState == State.CREATED || kafkaStreamsState.isRunning()) {
            //set details if you need one
            return Health.up().build();
        }

        // ERROR, NOT_RUNNING, PENDING_SHUTDOWN, 
        return Health.down().withDetail("state", kafkaStreamsState.name()).build();
    }
}

然后健康endpoint将显示如下:

{
    "status": "UP",
    "kafkaStreams": {
        "status": "DOWN",
        "details": {  //not included if "UP"
            "state": "NOT_RUNNING"
        }
    }
}

 类似资料:
  • 我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

  • 当我从Spring Boot应用程序访问/healthendpoint时,它返回的状态为UP: 但我想像这样定制我的状态: 如何自定义状态?

  • 对于我一直在开发的一个微服务,我创建了一个自定义健康检查类,扩展了AbstractHealthIndicator,并能够在中获得输出 但当我向领事注册服务时,健康检查状态为失败。 尝试将执行器url配置为领事健康检查为spring。云领事发现健康检查url=http://localhost:8080/actuator/health。但它仍然失败,出现错误http://localhost:8566/

  • 我需要改变频率来检查springboot执行器中的DB运行状况。默认DB运行状况检查查询每毫秒执行一次。我想让这个查询每1分钟后执行一次,而不是毫秒。有什么方法可以自定义它吗?

  • 因此,我将Spring引导执行器添加到我的应用程序中,并在应用程序中指定。属性管理。endpoint。健康隐藏物生存时间=120秒,以缓存健康检查结果。因此,当我调用执行器/健康时,结果被缓存,效果很好。 当我调用执行器/健康/就绪或自定义创建的组时,问题开始出现。该请求结果不会被缓存。我查阅了Spring文档,只找到了主要健康终点的信息,没有找到特定人群的信息。 所以我的问题是:我错过了什么吗?

  • 我想报告应用程序的健康状态作为一个衡量标准,我希望使用相同的健康指标作为Spring启动执行器,但是,我没有看到任何可导出的组件从Spring启动执行器的依赖关系,我可能能够在这里使用。 我想编写的代码: 当然,不是导出的bean。Spring启动执行器是否导出我可以以这种方式消费的bean?