当前位置: 首页 > 知识库问答 >
问题:

kafka-python消费者给出错误

谷飞星
2023-03-14

我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html

我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误:

回溯(最近一次调用):文件 “KafkaConsumer.py”,第 4 行,在 for consumer 中的消息中:文件 “/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py”,第 559 行,在下一个返回类型(self).next(self) 文件 “/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”,第 915 行,在下一个返回 next(self._iterator) 文件 “/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”, 第 876 行,_message_generator self._fetcher 中的 msg:文件 “/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py”,第 559 行,在下一个返回类型(self).next(self) 文件中 “/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 520 行,在下一个返回 next(self._iterator) 文件 “/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 477 行, 在 self._unpack_message_set(tp, messages) 中 msg 的_message_generator中:文件 “/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 372 行,在 _unpack_message_set inner_mset = msg.decompress() 文件 “/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py”,第 121 行,在解压缩断言 has_snappy(),'Snappy 解压缩不受支持' 断言错误:不支持快速解压缩

这是我一直在尝试运行的代码:

from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
    print("%s:%d%d: key=%s value=%s"  % (message.topic, message.partition, message.offset, message.key, message.value))

因为,我真的是Kafka的新手,我很难理解我做错了什么。

共有1个答案

岑和风
2023-03-14

您似乎缺少python-snappy,这是读取以snappy格式压缩的数据所必需的。

您需要< code>snappy和< code>snappy-devel,您可以使用yum、apt-get等安装它们。然后尝试< code > pip install python-snappy

 类似资料:
  • 我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我