我很感激你在这方面的帮助。
我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息:
13/08/30 18:00:58 INFO producer.SyncProducer: Connected to xx.xx.xx.xx:6667:false for producing
13/08/30 18:00:58 INFO producer.SyncProducer: Disconnecting from xx.xx.xx.xx:6667:false
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager- 1377910855898] Stopping leader finder thread
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager- 1377910855898] Stopping all fetchers
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager- 1377910855898] All connections stopped
我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。
我的代码是示例消费者示例的精确副本:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例
更新:
[2013-09-03 00:57:30,146] INFO Starting ZkClient event thread.
(org.I0Itec.zkclient.ZkEventThread)
[2013-09-03 00:57:30,146] INFO Opening socket connection to server /xx.xx.xx.xx:2181 (org.apache.zookeeper.ClientCnxn)
[2013-09-03 00:57:30,235] INFO Connected to xx.xx.xx:6667 for producing (kafka.producer.SyncProducer)
[2013-09-03 00:57:30,299] INFO Socket connection established to 10.224.62.212/10.224.62.212:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-09-03 00:57:30,399] INFO Disconnecting from xx.xx.xx.net:6667 (kafka.producer.SyncProducer)
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] All connections stopped (kafka.consumer.ConsumerFetcherManager)
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx-1378195030443-cce6fc51], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx.-1378195030443-cce6fc51], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], Committing all offsets after clearing the fetcher queues (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,401] ERROR [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], zk client is null. Cannot commit offsets (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,401] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], Releasing partition ownership (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,401] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], exception during rebalance (kafka.consumer.ZookeeperConsumerConnector)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:185)
at scala.None$.get(Option.scala:183)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:434)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:429)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:429)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:681)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:715)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
你能提供你的生产商代码样本吗?
您是否签出了最新的0.8版本?似乎在当前版本中已修补并修复了consumerFetched死锁的一些已知问题
您可以尝试使用管理控制台脚本来使用消息,确保您的制作者工作正常。
如果可能的话,发布更多的日志和代码片段,应该有助于进一步调试(似乎我需要更多的声誉来发表评论,所以不得不回答)
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
我有一个JMS生产者和一个消费者,代理是ActiveMQ,参考下面的代码: 寄件人代码 接收码 问题是 ActiveMQ 队列无法接收来自发送方的消息(请参阅屏幕截图): 当我从 Web 控制台发送消息时,该消息在队列中收到,但来自创建者的消息不会进入队列。 另一个有趣的行为是(如队列接收器代码中所示,接收器在收到第一条消息后退出),同样,当我启动接收器时,它会收到相同的消息,并继续执行,直到我关
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github