当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者停止消费,如果其中一个经纪人变得不可用

满俊楠
2023-03-14

我在一台Windows主机上安装了两个Kafka 2.1.0代理。默认复制因子设置为2。所有其他设置均为默认设置。

networkClient:[Consumer ClientID=Consumer-1,GroupID=SOUT]无法建立到节点-2(/192.168.0.1:19092)的连接。代理可能不可用。

消费者:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public final class PumpToConsoleSimple {

  private static final Duration pollTimeout = Duration.ofSeconds(10);

  public static void main(String[] args) {
    final Properties consumerProperties = new Properties();

    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "sout");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

    try (final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties)) {
      kafkaConsumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          System.out.println("Partitions were assigned");
          kafkaConsumer.seekToBeginning(partitions);
        }
      });
      while (true) {
        final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(pollTimeout);
        consumerRecords.forEach(r -> System.out.println(r.value()));
        kafkaConsumer.commitSync();
      }
    }
  }
}

一个制作人:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.locks.LockSupport;

public final class OnceInASecondProducerSimple {
  public static void main(String[] args) {
    final Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");

    long counter = 0;
    while (true) {
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
        producer.send(new ProducerRecord<>("test", "msg" + counter++));
      }
      LockSupport.parkNanos(Duration.ofSeconds(1).getNano());
    }
  }
}

共有1个答案

林魁
2023-03-14

通过kafka-topics脚本检查偏移量主题__consumer_offsets状态。失败的代理必须是组的协调器,并且__consumer_offsets的复制因子可能为1,因此使用者找不到协调器。即使您重新启动消费者,它仍然无法查找协调器。

在您的情况下,可以增加__consumer_offsets的复制因子,然后重试看看它是否按预期工作。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 知道为什么当server0关闭时,即使主题test1的复制因子设置为2,使用者仍然停止获取消息吗? 已经有一个类似的问题,但它没有完全回答Kafka0.10快速入门:当“主要”经纪人被扳倒时,消费者失败

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 我有两台机器localhost和192.168.1.110来运行两台独立的单机kafka。 kafka2.11-0.10.0.0 bin/kafka-console-producer.sh--broker-list 192.168.1.110:9092--topic test这是一条消息[2016-08-24 18:15:27,441]错误将消息发送到topic test时出错,关键字:null,

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。