当前位置: 首页 > 知识库问答 >
问题:

不能将Kafka用于python

陶山
2023-03-14

Python:2.6.6

Kafka-蟒蛇:1.4.3

从Kafka.Consumer进口KafkaConsumer

文件“/usr/lib/python2.6/site-packages/kafka/consumer/init.py”,第5行

从kafka.consumer.group导入KafkaConsumer

文件“/usr/lib/python2.6/site-packages/kafka/record/init.py”,第1行

从kafka.record.memory_records导入

文件“/usr/lib/python2.6/site-packages/kafka/record/memory_records.py”,第27行

类DefaultRecordBatchBuilder(DefaultRecordBase,ABCRecordBatchBuilder):

DefaultRecordBatchBuilder中的文件“/usr/lib/python2.6/site-packages/kafka/record/default_records.py”,第378行

byte_like=(bytes、bytearray、memoryview),

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json

class Kafka_producer():

    def __init__(self, kafkahost,kafkaport, kafkatopic):
        self.kafkatopic = kafkatopic
        service_host = kafkahost+":"+kafkaport
        self.producer = KafkaProducer(bootstrap_servers=service_host)

    def sendjsondata(self, params):
        try:
            # parmas_message = json.dumps(params)
            producer = self.producer
            futur = producer.send(self.kafkatopic, params.encode('utf-8'))
            res = futur.get(timeout=60)
            producer.flush()
            producer.close()
        except KafkaError as e:
            print e

if __name__ == '__main__':
    # test = {
    #     "test":"testtets"
    # }
    # Kafka_producer("http://10.25.245.192","9092","nori-log").sendjsondata(test)
    producer = KafkaProducer(bootstrap_servers='10.25.245.192:9092')
    for _ in range(100):
        producer.send('nori-log', {"test":"test_content"})

我把多伦多的版本换成了2.2.1

共有1个答案

秦博达
2023-03-14

memoryview类型在Python 2.7中是新的。Python 2.6.6已经有8年的历史了,而且已经有5年没有得到任何支持或安全更新了。您需要升级您的Python安装。

 类似资料:
  • 问题内容: 我在将Chrome驱动程序用于Selenium时遇到问题。我已下载chromedriver,并将其保存到C:\ Chrome: 使用它给我以下错误: 任何帮助,将不胜感激。 问题答案: 您应该指定可执行文件路径,而不是包含可执行文件的目录路径。

  • 我在Windows子系统Linux上安装了kafka,并开始使用命令服务启动,所有服务都已启动。现在,当我尝试从Windows运行我的kafka-spring应用程序时,它显示以下错误:- 无法建立与节点-1(localhost/127.0.0.1:9092)的连接。经纪人可能不可用。 我的服务器属性是:- 我哪里出错了???

  • 我在我的项目分级文件中收到了这样的警告: 警告:(16,5)“生成类型”不能应用于“(groovy.lang.closure )” 我的buildTypes部分是: 我目前使用的是Android Studio 1.1.0、CompilesDKVersion22、BuildToolsVersion22.0.0和TargetSDKVersion22。我试着退让到21岁,但还是收到了警告。 是什么导致了

  • 我为一个用Scala编码的Kafka Streams应用程序创建了一个jar文件,使用IntelliJ IDEA本身,然后使用SBT。以下是我一直面临的问题,到目前为止我还没有成功: 首先,我使用IntelliJ idea制作了一个jar文件。运行Jar文件时,它给出了以下错误: java.lang.ClassNotFoundException:Main... null 之后,我尝试使用SBT创建

  • 我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.