当前位置: 首页 > 知识库问答 >
问题:

使用Resilience4j库具有断路器功能的spring kafka消费者

卫沈义
2023-03-14

我正在尝试实现Spring kafka消费者,它需要在处理事件时出现某个异常后暂停(例如:在将事件信息存储到DB时,DB已关闭)。

我们如何在spring boot-2.3.8(spring kafka)中使用Resilience4j断路器方法来处理这种情况

寻找一些消费者暂停和恢复的例子。

@Component
public class CircuitBreakerManager {

    private CircuitBreaker circuitBreaker;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public CircuitBreakerManager() {
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
                .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
                .enableAutomaticTransitionFromOpenToHalfOpen()
                .minimumNumberOfCalls(5)
                .permittedNumberOfCallsInHalfOpenState(3)
                .slidingWindowSize(10)
                .failureRateThreshold(50)
                .slowCallRateThreshold(60.0f)
                .slowCallDurationThreshold(Duration.ofSeconds(3))
                .build();
        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(circuitBreakerConfig);
        this.circuitBreaker = registry.circuitBreaker("serialization_exception");
        this.circuitBreaker.getEventPublisher().onStateTransition(this::onStateChange);
    }

    private void onStateChange(CircuitBreakerOnStateTransitionEvent circuitBreakerEvent) {
        CircuitBreaker.State toState = circuitBreakerEvent.getStateTransition()
                .getToState();
        System.out.println("Change in Circuit Breaker state " + toState);
        switch (toState) {
            case OPEN:
                    kafkaListenerEndpointRegistry.getListenerContainer("my_listener_id").stop();
                break;
            case CLOSED:
                break;
            case HALF_OPEN:
                kafkaListenerEndpointRegistry.getListenerContainer("my_listener_id").start();
                break;
        }
    }


}

在Kafka,listerner只是想捕捉解析错误。如果出现5个以上的解析错误,则需要停止侦听器。但我不确定断路器将如何触发。

@CircuitBreaker(name = RESILIENCE4J_INSTANCE_NAME)
    private Event getParsedEvent(ConsumerRecord consumerRecord) {
        Event event = getEvent(consumerRecord);
        
            if (StringUtils.isEmpty(event)) {
                
                throw new RuntimeException("Serialization Exception occurred");
            }
        }
        return event;
    }

共有1个答案

吕扬
2023-03-14

请参阅暂停和恢复侦听器容器

请注意,只有处理完当前轮询返回的所有记录(或者侦听器抛出异常,只要有默认的错误处理程序),暂停才会生效。

 类似资料:
  • 有没有一种方法实现一个断路器模式与SpringKafka为基础的消费者。我想知道,在实现我的Spring kafka consumer时,如果基于某个外部系统的数据处理失败并引发网络错误,是否可以停止使用记录。但是,如果解决了网络问题,消费者应该再次正常处理。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 是否可以通过注释在Spring Cloud Circuit Breaker上使用Resilience4j?我找不到任何关于它的留档,只有关于通过代码使用弹性4j的示例

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用