当我尝试使用Kafka producer and consumer(0.9.0)脚本从一个主题推/拉消息时,我得到以下错误。
[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> [2016-01-13 02:47:18,620] WARN
> [console-consumer-90116_f89a0b380f19-1452653212738-9f857257-leader-finder-thread],
> Failed to find leader for Set([test,0])
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker
> [ArrayBuffer(BrokerEndPoint(0,192.168.99.100,9092))] failed at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.EOFException at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
> at
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:119) at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> ... 3 more
为什么我会得到这个错误,我如何解决它?
在Mac上运行Docker容器中的所有组件。ZooKeeper和Kafka分别运行在Docker容器中。
Docker计算机(boot2docker)IP地址:192.168.99.100
ZooKeeper端口:2181
Kafka端口:9092
host.name=localhost
broker.id=0
port=9092
advertised.host.name=192.168.99.100
advertised.port=9092
我从kafka server Docker容器中运行以下命令。我已经创建了一个带有一个分区和复制因子为1的主题。
请注意,领导指定为0,这可能是问题的一部分。
root@f89a0b380f19:/opt/kafka/dist# ./bin/kafka-topics.sh --zookeeper 192.168.99.100:2181 --topic test --describe
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
然后我执行以下操作来发送一些消息:
root@f89a0b380f19:/opt/kafka/dist# ./bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic test
one message
two message
three message
four message
[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-01-13 02:50:40,080] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-01-13 02:51:40,081] ERROR Error when sending message to topic test with key: null, value: 13 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-01-13 02:52:40,083] ERROR Error when sending message to topic test with key: null, value: 12 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
root@f89a0b380f19:/opt/kafka/dist# ./bin/kafka-console-consumer.sh --zookeeper 192.168.99.100:2181 --topic test --from-beginning
我已确认端口2181
和9092
是打开的,可以从Kafka Docker容器中访问:
root@f89a0b380f19:/# nc -z 192.168.99.100 2181; echo $?;
0
root@f89a0b380f19:/# nc -z 192.168.99.100 9092; echo $?;
0
解决办法完全不是我所期望的。错误消息与实际发生的情况不符。
主要问题是将Docker中的日志目录挂载到本地文件系统。我的Docker run
命令使用卷挂载将容器中的Kafkalog.dir
文件夹挂载到实际挂载到我的Mac的主机VM上的本地目录。后一点才是问题所在。
例如,
docker run --name kafka -v /Users/<me>/kafka/logs:/var/opt/kafka:rw -p 9092:9092 -d kafka
我正在尝试编写一个简单的java kafka consumer,使用与中类似的代码读取数据https://github.com/bkimminich/apache-kafka-book-examples/blob/master/src/test/kafka/consumer/SimpleHLConsumer.java. 看起来我的应用程序可以连接,但它无法获取任何数据。请建议。 下面是我在ecli
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi
我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。
然而,当在我的环境中测试此示例时,我得到了一个异常。
假设我有一个名为“MyTopic”的主题,它有3个分区P0、P1和P2。这些分区中的每一个都有一个leader,并且本主题的数据(消息)分布在这些分区中。 1.Producer将始终根据代理上的负载以循环方式写到分区的领导者。对吗? 2.制作人如何认识隔断的首领?