当前位置: 首页 > 知识库问答 >
问题:

为什么不能用Kafka正确使用消费者?

施永宁
2023-03-14
192.168.0.1  kafka1
192.168.0.2  kafka2
192.168.0.3  kafka3

/usr/local/kafka2.12-2.6.0/config/server.properties

#
broker.id=1
listeners=PLAINTEXT://kafka1:9092
advertised.listeners=PLAINTEXT://kafka1:9092
#
broker.id=2
listeners=PLAINTEXT://kafka2:9092
advertised.listeners=PLAINTEXT://kafka2:9092
#
broker.id=3
listeners=PLAINTEXT://kafka3:9092
advertised.listeners=PLAINTEXT://kafka3:9092

在开始动物园管理员和Kafka之后,创建一个新的主题

[kafka@kafka1 ~]$ bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 1 --partitions 6 --topic topic1 --config cleanup.policy=delete --config delete.retention.ms=60000

检查所有三个节点上的集群状态

[kafka@kafka1 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
Topic: topic1   PartitionCount: 6   ReplicationFactor: 1    Configs: cleanup.policy=delete,delete.retention.ms=60000
    Topic: topic1   Partition: 0    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic1   Partition: 1    Leader: 3   Replicas: 3 Isr: 3
    Topic: topic1   Partition: 2    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic1   Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic1   Partition: 4    Leader: 3   Replicas: 3 Isr: 3
    Topic: topic1   Partition: 5    Leader: 1   Replicas: 1 Isr: 1
[kafka@kafka2 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
Topic: topic1   PartitionCount: 6   ReplicationFactor: 1    Configs: cleanup.policy=delete,delete.retention.ms=60000
    Topic: topic1   Partition: 0    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic1   Partition: 1    Leader: 3   Replicas: 3 Isr: 3
    Topic: topic1   Partition: 2    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic1   Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic1   Partition: 4    Leader: 3   Replicas: 3 Isr: 3
    Topic: topic1   Partition: 5    Leader: 1   Replicas: 1 Isr: 1
[kafka@kafka3 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
Topic: topic1   PartitionCount: 6   ReplicationFactor: 1    Configs: cleanup.policy=delete,delete.retention.ms=60000
    Topic: topic1   Partition: 0    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic1   Partition: 1    Leader: 3   Replicas: 3 Isr: 3
    Topic: topic1   Partition: 2    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic1   Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic1   Partition: 4    Leader: 3   Replicas: 3 Isr: 3
    Topic: topic1   Partition: 5    Leader: 1   Replicas: 1 Isr: 1
[kafka@kafka1 ~]$ bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic topic1
>
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic1
# kafka1
[2020-08-21 02:09:57,299] WARN [Consumer clientId=consumer-console-consumer-39789-1, groupId=console-consumer-39789] Connection to node 2147483645 (kafka2/192.168.0.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
# kafka2
[2020-08-21 03:05:00,573] WARN [Consumer clientId=consumer-console-consumer-71891-1, groupId=console-consumer-71891] Connection to node -1 (kafka1/192.168.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
# kafka3
[2020-08-21 03:05:14,331] WARN [Consumer clientId=consumer-console-consumer-55574-1, groupId=console-consumer-55574] Connection to node -1 (kafka1/192.168.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092

共有1个答案

聂永怡
2023-03-14

“侦听器”应该表示生产者和消费者可以访问的节点FQDN和端口。当生产者/消费者与Kafka联系起来时,就像你在帖子中所说的:

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic1

它打开到Kafka1:9092的连接,然后从Kafka中获取集群的映射,其中包含所有代理、它们的FQDN、端口和每个代理所领导的主题分区。

您设法创建了主题,这是使用zookeeper完成的,并成功了。

根据您的警告日志,“Broker1,2,3”被解析为192.168.0.1/2/3,因此IP解析是正常的。

尝试检查端口9092上的网络连接:从broker1运行:“Telnet broker2 9092”

或者运行producer命令访问远程代理,即:

[kafka@**kafka1** ~]$ bin/kafka-console-producer.sh --broker-list **kafka2**:9092 --topic topic1
 类似资料:
  • 我已经做了一些Kafka流应用程序和Kafka消费者应用程序。最后,Kafka流不是什么,而是消费来自Kafka的实时事件的消费者。所以我不知道什么时候使用Kafka流,或者为什么我们应该使用Kafka流,因为我们可以在消费者端执行所有转换。

  • 我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.

  • 本文向大家介绍什么是kafka消费者组?相关面试题,主要包含被问及什么是kafka消费者组?时的应答技巧和注意事项,需要的朋友参考一下 答:消费者组的概念是Apache Kafka独有的。基本上,每个Kafka消费群体都由一个或多个共同消费一组订阅主题的消费者组成。

  • 我正在使用vscode,eslint正在运行,但它在同一项目中的其他文件中找不到某些文件中的错误。 有什么方法可以调试这个吗?像一个更详细输出的选项,它会告诉我在vscode中运行时每个文件的配置在哪里?

  • 本文向大家介绍为什么要使用 kafka,为什么要使用消息队列?相关面试题,主要包含被问及为什么要使用 kafka,为什么要使用消息队列?时的应答技巧和注意事项,需要的朋友参考一下 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。 解耦和扩展性:项目开始的

  • 我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没