当前位置: 首页 > 知识库问答 >
问题:

Kafka broker可能不可用例外情况

乐正宜人
2023-03-14

我和我的Kafka制作人遇到了一个奇怪的问题。我使用Kafka-0.11服务器/客户端版本。我有一个zookeper和一个kafka经纪人节点。此外,我还创建了带有3个分区的“事件”主题:

Topic:events    PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: events   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: events   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: events   Partition: 2    Leader: 0       Replicas: 0     Isr: 0

在我的Java代码中,我创建了具有以下属性的producer:

Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(MAX_BLOCK_MS_CONFIG, 30000);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(PARTITIONER_CLASS_CONFIG, KafkaCustomPartitioner.class);
this.producer = new KafkaProducer<>(props);

此外,我还向Producer#send()方法添加了一个回调,该方法将失败的消息添加到队列中,该队列由另一个“重新发送”线程在循环中迭代:

this.producer.send(producerRecord, new ProducerCallback(producerRecord.value(), topic));

private class ProducerCallback implements Callback {
  private final String message;
  private final String topic;

  public ProducerCallback(String message, String topic) {
    this.message = message;
    this.topic = topic;
  }

  @Override
  public void onCompletion(RecordMetadata metadata, Exception ex) {
    if (ex != null) {
        logger.error("Kafka producer error. Topic: " + topic +
                ".Message will be added into failed messages queue.", ex);
        failedMessagesQueue.enqueue(SerializationUtils.serialize(new FailedMessage(topic, message)));
    }
  }
}

private class ResenderThread extends Thread {
    private volatile boolean running = true;

    public void stopGracefully() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                byte[] val = failedMessagesQueue.peek();
                if (val != null) {
                    FailedMessage failedMessage = SerializationUtils.deserialize(val);
                    ProducerRecord<String, String> record;
                    if (topic.equals(failedMessage.getTopic())) {
                        String messageKey = generateMessageKey(failedMessage.getMessage());
                        record = createProducerRecordWithKey(failedMessage.getMessage(), messageKey, failedMessage.getTopic());
                    } else {
                        record = new ProducerRecord<>(failedMessage.getTopic(), failedMessage.getMessage());
                    }
                    try {
                        this.producer.send(record).get();
                        failedMessagesQueue.dequeue();
                    } catch (Exception e) {
                        logger.debug("Kafka message resending attempt was failed. Topic " + failedMessage.getTopic() +
                                " Partition. " + record.partition() + ". " + e.getMessage());
                    }
                }

                Thread.sleep(200);
            } catch (Exception e) {
                logger.error("Error resending an event", e);
                break;
            }
        }
    }
}

一切正常,直到我决定测试Kafka经纪人杀/重启场景:

我已经杀死了我的Kafka经纪人节点,并使用我的Kafka生产者发送了5条消息。以下消息是我的生产者应用程序记录的:

....the application works fine....
// kafka broker was killed
2017-11-10 09:20:44,594 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,646 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,700 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,759 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,802 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
// sent 5 message using producer. message were put to the failedMessagesQueue and "re-sender" thread started resending 
2017-11-10 09:20:44,905 ERROR [com.inq.kafka.KafkaETLService] - <Kafka producer error. Topic: events.Message will be added into failed messages queue.>
....
2017-11-10 09:20:45,070 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,129 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,170 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,217 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>

// kafka broker was restarted, some strange errors were logged
2017-11-10 09:20:51,103 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 29 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,205 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 31 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,308 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 32 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,114 WARN [org.apache.kafka.clients.producer.internals.Sender] - <Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it>
2017-11-10 09:20:51,114 ERROR [com.inq.kafka.KafkaETLService] - <Kafka message resending attempt was failed. Topic events. org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.>
2017-11-10 09:20:52,485 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 33 : {events=INVALID_REPLICATION_FACTOR}>
// messages were succesfully re-sent and received by consumer..

如何处理这些日志(当Kafka broker关闭时每100ms记录一次):

[org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>

为什么我会在Kafka broker启动后收到以下错误(我没有更改任何服务器道具,也没有更改主题)。在我看来,这些错误是在broker启动期间zookeeper和kafka之间的某种同步化过程的结果,因为一段时间后procuder成功地反感我的消息。我错了吗?:

[org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 29 : {events=INVALID_REPLICATION_FACTOR}>
Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it. 

共有1个答案

宋烨烁
2023-03-14
 bin/kafka-console-consumer.sh --bootstrap-server tt01.my.tech:9092,tt02.my.tech:9092,tt03.my.tech:9092 --topic wallet-test-topic1 --from-beginning
new message from topic1
hello
hello world
123
hello again
123
what do i publish ?
[2020-02-09 16:57:21,142] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 2 (tt02.my.tech/192.168.35.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-02-09 16:57:25,999] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 2 (tt02.my.tech/192.168.35.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-02-09 16:57:58,902] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 2 (tt02.my.tech/192.168.35.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-02-09 16:57:59,024] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 3 (tt03.my.tech/192.168.35.126:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 7 messages

在消费者端,如果轮询后没有读取消息,则抛出此警告。

基本上,对.poll的调用引用了

HandleTimeDoutRequests(响应,updatedNow);

如果在此轮询中未读取任何消息,并且出现超时,则processDisconnection将抛出警告。

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
    for (String nodeId : nodeIds) {
        // close connection to the node
        this.selector.close(nodeId);
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
    }

    // we disconnected, so we should probably refresh our metadata
    if (!nodeIds.isEmpty())
        metadataUpdater.requestUpdate();
}

processDisconnection中的完全大小写匹配会引发以下警告:

        case NOT_CONNECTED:
        log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());

简而言之,从生产者和消费者的角度来看,一切都将运转良好。您应该将该消息视为任何其他警告

 类似资料:
  • 我在Windows子系统Linux上安装了kafka,并开始使用命令服务启动,所有服务都已启动。现在,当我尝试从Windows运行我的kafka-spring应用程序时,它显示以下错误:- 无法建立与节点-1(localhost/127.0.0.1:9092)的连接。经纪人可能不可用。 我的服务器属性是:- 我哪里出错了???

  • 问题内容: 我不知道我是否是唯一知道这一点的人,但是枚举的值不是隐式最终的,可以修改。 这些值通常是在实例创建()时初始化的,但是除了我自己,我从未见过有人使用final关键字来表示应为不变的枚举变量。这不是问题的重点,只是想知道我是否是唯一意识到这一点的人。 我想知道的是,是否存在用于创建可变枚举的用例? 而且我还想知道我们可以使用枚举(无论是否使用良好实践)的局限性。我还没有测试过,但是可以用

  • 我有下面的代码片段,我想用optionals重写 使用optionals,我可以想出以下方法。 我被困在无法根据用户名和电子邮件引发特定异常的部分。如果其中一个已经存在于db中,我可以返回null,这将导致orElseThrow工作,但异常类型相同。我想要两个不同情况的例外。我该怎么办?

  • 我面临着GCM设备注册的问题,但它似乎仅限于运行Android 5.0版(Lollipop)的设备。 我已经能够在除了运行Lollipop的设备之外的所有其他设备上成功地测试推送通知。我总是收到GCM服务不可用的错误。 我正在尝试运行

  • 我正在尝试在 Kotlin 中生成一个单例,并且遇到了问题,因为我无法。 这似乎是制作单例的一种非常标准的方法。为什么它不让我,我该如何解决它?

  • 问题内容: 我想描述一下AOP有效参与应用程序设计的可能情况。到目前为止,我所遇到的是: 伐木相关 安全检查 交易管理 调整旧版应用程序 还要别的吗? (不一定是基于Spring的基于代理的AOP,而是JBoss AOP。) 问题答案: 我可以举两个使用它的示例: 在JMX中自动注册对象以进行远程管理。如果使用我们的注释对一个类进行注释,则我们可以通过一个方面来监视该类的新实例,并将其自动注册到J