当前位置: 首页 > 知识库问答 >
问题:

Kafka remote consumer无法获取消息

宓弘壮
2023-03-14

我设置了3个节点汇流/Kafka都指向同一个动物园管理员

所有3台服务器都已播发。Listener=带有明文的公共ipv4

broker.id.generation.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://PUBLIC-IPv4:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
zookeeper.connect=10.114.16.19:2181
bootstrap.servers=10.114.16.19:9092,10.114.16.21:9092,10.114.16.20:9092
producer = KafkaProducer(
    bootstrap_servers='PUBLIC-IP:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('slim', {'topic': 'kafka'})
print('Making connection.')
consumer = KafkaConsumer(bootstrap_servers='PUBLIC-IP:9092')

print('Assigning Topic.')
consumer.assign([TopicPartition('slim', 2)])

print('Getting message.')
for message in consumer:
    print("OFFSET: " + str(message[0])+ "\t MSG: " + str(message))

当我运行消费者py客户机时,它只是保持打开状态,没有得到任何消息来澄清上面我在网上找到的测试代码,我没有编写它,因为我还在学习Kafka API

共有1个答案

牟华翰
2023-03-14

首先,请确保您能够pingpublic-ippublic-ip,并且您没有陷入DNS问题。如果您在Linux平台上,则安装kafkacat,并通过以下命令测试它是否可以使用数据:

kafkacat -b <BROKER_IP> -t <TOPIC_NAME>
 类似资料:
  • 问题内容: 我一直在尝试制作Angular2的快速入门指南。我按照快速指南中的说明进行了操作。但是,当我运行它时,它显示以下消息“无法获取”。有人知道为什么会这样吗? boot.js文件 app.component.js文件 索引文件 最后,package.json文件 我运行“ npm start”行,该行打开浏览器并显示“无法获取” 问题答案: 我弄清楚了问题所在。我的html文件中有多余的空

  • 我们使用nextjs/reactjs作为FE,并且我们有一个server.js文件,它允许我们在上传映像,但是由于某种原因,每当我们运行服务器时,都会出现错误 下面是我们在server.js上的代码 这些是我们package.json中包含的脚本 希望得到一些答案和建议。这些代码在本地运行,没有任何问题

  • 问题内容: 我正在尝试使一些Go对象实现io.Writer,但是写入字符串而不是文件或类似文件的对象。自实施以来,我以为会奏效。但是,当我尝试这样做: 我收到以下错误: 我很困惑,因为它清楚地实现了接口。如何解决此错误? 问题答案: 将指针传递给缓冲区,而不是缓冲区本身:

  • 我是Storm世界的新手。在我的拓扑中,我使用Kafka的数据,并使用。 通过一些测试,我得到了以下警告消息: 2015-10-01 23:31:51.753 s.k.KafkaUtils[警告]获取了偏移量超出范围的获取请求:[85970]2015-10-01 23:31:51.755 s.k.PartitionManager[警告]使用新偏移量:0 我的\\\\\\\\\\\\\\\\\\\\

  • 我有一个简单的登录模型类,几乎没有数据注释验证 View是一个部分View,如下所示: 我相信我已经做了验证消息显示所需的所有事情,但它没有,客户端验证也没有。在母版页的头部区域,我还包括了以下脚本 是什么导致了这个问题?解决办法是什么?

  • 我从Spring Boot应用程序向Kafka发送消息 application.properties 配置 在日志中,我可以看到如下消息: SUCCESS: SendResult[producerRecords=产品记录(主题=uniqTopic123,分区=null,标头=RecordHeaders(标头 = [], isReadOnly=true),key=testKey,value=Test