192.168.0.1 kafka1
192.168.0.2 kafka2
192.168.0.3 kafka3
/usr/local/kafka2.12-2.6.0/config/server.properties
#
broker.id=1
listeners=PLAINTEXT://kafka1:9092
advertised.listeners=PLAINTEXT://kafka1:9092
#
broker.id=2
listeners=PLAINTEXT://kafka2:9092
advertised.listeners=PLAINTEXT://kafka2:9092
#
broker.id=3
listeners=PLAINTEXT://kafka3:9092
advertised.listeners=PLAINTEXT://kafka3:9092
在开始动物园管理员和Kafka之后,创建一个新的主题
[kafka@kafka1 ~]$ bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 1 --partitions 6 --topic topic1 --config cleanup.policy=delete --config delete.retention.ms=60000
检查所有三个节点上的集群状态
[kafka@kafka1 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
Topic: topic1 PartitionCount: 6 ReplicationFactor: 1 Configs: cleanup.policy=delete,delete.retention.ms=60000
Topic: topic1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: topic1 Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: topic1 Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: topic1 Partition: 4 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 5 Leader: 1 Replicas: 1 Isr: 1
[kafka@kafka2 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
Topic: topic1 PartitionCount: 6 ReplicationFactor: 1 Configs: cleanup.policy=delete,delete.retention.ms=60000
Topic: topic1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: topic1 Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: topic1 Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: topic1 Partition: 4 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 5 Leader: 1 Replicas: 1 Isr: 1
[kafka@kafka3 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
Topic: topic1 PartitionCount: 6 ReplicationFactor: 1 Configs: cleanup.policy=delete,delete.retention.ms=60000
Topic: topic1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: topic1 Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: topic1 Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: topic1 Partition: 4 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 5 Leader: 1 Replicas: 1 Isr: 1
[kafka@kafka1 ~]$ bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic topic1
>
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic1
# kafka1
[2020-08-21 02:09:57,299] WARN [Consumer clientId=consumer-console-consumer-39789-1, groupId=console-consumer-39789] Connection to node 2147483645 (kafka2/192.168.0.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
# kafka2
[2020-08-21 03:05:00,573] WARN [Consumer clientId=consumer-console-consumer-71891-1, groupId=console-consumer-71891] Connection to node -1 (kafka1/192.168.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
# kafka3
[2020-08-21 03:05:14,331] WARN [Consumer clientId=consumer-console-consumer-55574-1, groupId=console-consumer-55574] Connection to node -1 (kafka1/192.168.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
“侦听器”应该表示生产者和消费者可以访问的节点FQDN和端口。当生产者/消费者与Kafka联系起来时,就像你在帖子中所说的:
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic1
它打开到Kafka1:9092的连接,然后从Kafka中获取集群的映射,其中包含所有代理、它们的FQDN、端口和每个代理所领导的主题分区。
您设法创建了主题,这是使用zookeeper完成的,并成功了。
根据您的警告日志,“Broker1,2,3”被解析为192.168.0.1/2/3,因此IP解析是正常的。
尝试检查端口9092上的网络连接:从broker1运行:“Telnet broker2 9092”
或者运行producer命令访问远程代理,即:
[kafka@**kafka1** ~]$ bin/kafka-console-producer.sh --broker-list **kafka2**:9092 --topic topic1
我已经做了一些Kafka流应用程序和Kafka消费者应用程序。最后,Kafka流不是什么,而是消费来自Kafka的实时事件的消费者。所以我不知道什么时候使用Kafka流,或者为什么我们应该使用Kafka流,因为我们可以在消费者端执行所有转换。
我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.
本文向大家介绍什么是kafka消费者组?相关面试题,主要包含被问及什么是kafka消费者组?时的应答技巧和注意事项,需要的朋友参考一下 答:消费者组的概念是Apache Kafka独有的。基本上,每个Kafka消费群体都由一个或多个共同消费一组订阅主题的消费者组成。
我正在使用vscode,eslint正在运行,但它在同一项目中的其他文件中找不到某些文件中的错误。 有什么方法可以调试这个吗?像一个更详细输出的选项,它会告诉我在vscode中运行时每个文件的配置在哪里?
本文向大家介绍为什么要使用 kafka,为什么要使用消息队列?相关面试题,主要包含被问及为什么要使用 kafka,为什么要使用消息队列?时的应答技巧和注意事项,需要的朋友参考一下 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。 解耦和扩展性:项目开始的
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没