当前位置: 首页 > 知识库问答 >
问题:

春靴Kafka客户端有“断路器”吗?

夏才
2023-03-14

如果Kafka服务器(暂时)关闭,我的Spring Boot应用程序reactiveKafKaconsumerTemplate将继续尝试连接,但失败,从而导致不必要的通信量并扰乱日志文件:

2021-11-10 14:45:30.265  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected

我知道参数reconnect.backoff.ms,这就是我如何创建reactiveKafKaconsumerTemplatebean:

@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
    final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
    map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
    map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
    final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("com.example.myapplication");

    return new ReactiveKafkaConsumerTemplate<>(
            ReceiverOptions
                    .<String, MyEvent>create(map)
                    .withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
                    .withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
                    .subscription(List.of("MyTopic")));
}

消费者仍然试图每3秒连接一次。

共有1个答案

阚原
2023-03-14

参见https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms

试图重新连接到给定主机之前等待的基本时间。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试。

和https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms

重新连接到多次连接失败的代理时等待的最长时间(毫秒)。如果提供,则每台主机的退避量将随着连续的连接失败而呈指数增长,直至此最大值。计算退避增加后,增加20%的随机抖动以避免连接Storm。

 类似资料:
  • Netflix的创造了一个调用的库Hystrix实现了断路器图案。在微服务架构中,通常有多层服务调用。 图1.微服务图 较低级别的服务中的服务故障可能导致用户级联故障。当对特定服务的呼叫达到一定阈值时(Hystrix中的默认值为5秒内的20次故障),电路打开,不进行通话。在错误和开路的情况下,开发人员可以提供后备。 图2. Hystrix回退防止级联故障 开放式电路会停止级联故障,并允许不必要的或

  • 我想在我的spring boot项目中使用Kafka Streams实时处理。所以我需要Kafka Streams配置,或者我想使用KStreams或KTable,但我在互联网上找不到示例。 我做了制作人和消费者现在我想流实时。

  • 我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-

  • 我目前只有一个manager和一个worker节点。 我使用以下命令创建了一个覆盖网络: 在manager上,我使用以下名为“docker-compose.Consul.master.yml”的组合文件部署Consul: 我使用以下命令将此服务部署到堆栈: Spring Boot应用程序在启动时失败,出现以下错误: 注意,工作人员认为Consul正在运行的IP地址是10.0.2.110 作为附加信

  • 有没有一种方法实现一个断路器模式与SpringKafka为基础的消费者。我想知道,在实现我的Spring kafka consumer时,如果基于某个外部系统的数据处理失败并引发网络错误,是否可以停止使用记录。但是,如果解决了网络问题,消费者应该再次正常处理。

  • 断路器将处于闭合或半断开状态无限时间,直到达到最小的呼叫次数,对吗?有什么办法我可以设置什么时候没有调用在数量的时间,它将转为关闭状态?另外,在半开状态下,是否有可能使最小呼叫数大于允许的呼叫数?谢谢。