我有一个只有一个分区的Kafka主题。
在任何时间点,都可以有多个kafka客户端。所有客户端都使用相同的消费者组订阅。因此,在任何给定的时间点,只有1个客户端将接收消息。假设从t0到t10,consumer1正在接收消息,但一段时间后,它停止接收消息,consuper2被选为新的领导者(可能是因为consumer2中的GC暂停)。在我的consumer1中,我希望检测故障转移发生的时间,以便它可以刷新其本地状态。
kafka客户端代码有可能吗?
可以使用消费者反平衡侦听器
接口中提供的 on分区
调用回调方法。
从描述来看,
用户可以实现的回调方法,用于处理对自定义存储的偏移量提交。此方法将在重新平衡操作期间调用,此时使用者必须放弃一些分区。当消费者关闭或取消订阅时,也可以调用它。建议在此回调中将偏移量提交到 Kafka 或自定义偏移量存储,以防止重复数据。
示例实现:
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
例:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class TestConsumer {
KafkaConsumer<String, String> kafkaConsumer;
public static void main(String[] args) {
TestConsumer consumer = new TestConsumer();
consumer.pollMessages();
}
public TestConsumer() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
}
public void pollMessages() {
while(true) {
System.out.println("Polling");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
System.out.println(records.count());
}
}
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
}
出局:
Poll
Partitions assignment listener: [input-topic-0]
0
Poll
0
Poll
Partitions revoke listener: [input-topic-0]
Partitions assignment listener: [input-topic-0]
0
Poll
0
问题内容: 我们目前正在使用Redis 2.8.4和StackExchange.Redis(并喜欢它),但目前没有针对硬件故障等的任何保护措施。我正在尝试使解决方案起作用,从而使我们具有主/从属和哨兵监视功能,但无法完全到达目标位置,并且在搜索后无法找到任何实际的指针。 因此,目前我们已经做到了: 每个节点上都有3个Redis服务器和哨兵(由Linux专家设置):devredis01:6383(主
但是,即使我将节点显式设置到属性中,它们也不会尝试重新连接到有效的kafka节点。 如何使我的使用者在他们连接的kafka节点失败后重新连接到有效的kafka节点?
我们在每个节点上都有3个redis服务器和sentinel(由Linux人员设置):devredis01:6383(主)devredis02:6383(从)devredis03:6383(从)devredis01:26379(sentinel)devredis02:26379(sentinel)devredis03:26379(sentinel) 我能够将StackExchange客户机连接到re
我在端口7000、7001和7002上设置了三个服务器(一个主服务器和两个从服务器),在端口26379、26380和26381上设置了三个哨兵,它们都在同一台机器(ubuntu VM)上。 当我启动它们时,根据日志,一切看起来都很好,当我对哨兵运行信息命令时,看起来也很健康。但是当我放下主程序(通过Ctrl+C或redis-cli SLEEP命令使其停止工作)时,没有一个从程序实例被引入为新的主程
我们使用MQ作为传递消息的主要路径。这是我们的制度运作不可或缺的一部分。消息代理有时会失败,所有相关的队列也会随之失败。在camel中,有没有一种方法可以启动故障切换,并在其启动时恢复到主故障切换?