当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者组使用者故障转移检测

金钧
2023-03-14

我有一个只有一个分区的Kafka主题。

在任何时间点,都可以有多个kafka客户端。所有客户端都使用相同的消费者组订阅。因此,在任何给定的时间点,只有1个客户端将接收消息。假设从t0到t10,consumer1正在接收消息,但一段时间后,它停止接收消息,consuper2被选为新的领导者(可能是因为consumer2中的GC暂停)。在我的consumer1中,我希望检测故障转移发生的时间,以便它可以刷新其本地状态。

kafka客户端代码有可能吗?

共有1个答案

羊柏
2023-03-14

可以使用消费者反平衡侦听器接口中提供的 on分区调用回调方法。

从描述来看,

用户可以实现的回调方法,用于处理对自定义存储的偏移量提交。此方法将在重新平衡操作期间调用,此时使用者必须放弃一些分区。当消费者关闭或取消订阅时,也可以调用它。建议在此回调中将偏移量提交到 Kafka 或自定义偏移量存储,以防止重复数据。

示例实现:

private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
        // Add your changes here to flush
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
    }
}

例:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TestConsumer {
    
    KafkaConsumer<String, String> kafkaConsumer;
    
    public static void main(String[] args) {
        TestConsumer consumer = new TestConsumer();
        consumer.pollMessages();
    }

    public TestConsumer() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        
        kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
    }
    
    public void pollMessages() {
        while(true) {
            System.out.println("Polling");
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
            System.out.println(records.count());
        }
    }
    
    private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
            // Add your changes here to flush
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
        }
    }
}

出局:

Poll
Partitions assignment listener: [input-topic-0]
0
Poll
0
Poll
Partitions revoke listener: [input-topic-0]
Partitions assignment listener: [input-topic-0]
0
Poll
0
 类似资料:
  • 问题内容: 我们目前正在使用Redis 2.8.4和StackExchange.Redis(并喜欢它),但目前没有针对硬件故障等的任何保护措施。我正在尝试使解决方案起作用,从而使我们具有主/从属和哨兵监视功能,但无法完全到达目标位置,并且在搜索后无法找到任何实际的指针。 因此,目前我们已经做到了: 每个节点上都有3个Redis服务器和哨兵(由Linux专家设置):devredis01:6383(主

  • 但是,即使我将节点显式设置到属性中,它们也不会尝试重新连接到有效的kafka节点。 如何使我的使用者在他们连接的kafka节点失败后重新连接到有效的kafka节点?

  • 我们在每个节点上都有3个redis服务器和sentinel(由Linux人员设置):devredis01:6383(主)devredis02:6383(从)devredis03:6383(从)devredis01:26379(sentinel)devredis02:26379(sentinel)devredis03:26379(sentinel) 我能够将StackExchange客户机连接到re

  • 我在端口7000、7001和7002上设置了三个服务器(一个主服务器和两个从服务器),在端口26379、26380和26381上设置了三个哨兵,它们都在同一台机器(ubuntu VM)上。 当我启动它们时,根据日志,一切看起来都很好,当我对哨兵运行信息命令时,看起来也很健康。但是当我放下主程序(通过Ctrl+C或redis-cli SLEEP命令使其停止工作)时,没有一个从程序实例被引入为新的主程

  • 我们使用MQ作为传递消息的主要路径。这是我们的制度运作不可或缺的一部分。消息代理有时会失败,所有相关的队列也会随之失败。在camel中,有没有一种方法可以启动故障切换,并在其启动时恢复到主故障切换?