当前位置: 首页 > 知识库问答 >
问题:

试图为每次迭代生成Kafka主题的消息,但看起来我最终没有向消费者发送消息

邓欣可
2023-03-14

当使用循环调用kakfa生产类时,无法将消息写入kafka主题(生产者)。

我对Python和Kafka很陌生。我正在尝试编写一个python程序,将消息写入Kafka主题并生成,以便Kafka消费者可以订阅该主题以发布消息。

我不确定我的程序中缺少了什么,它限制了我将信息写入主题。

请注意:我正在读取一个JSON文件,并使用for循环来准备键值。然后将其分配给一个变量,并使用Kafka product with arg为msg引用该变量。

附件是Kafka制片人节目。

输入:Json_smpl。json

文件内容:

{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}

程序:

from confluent_kafka import Producer
import json

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {0}: {1}"
              .format(msg.value(), err.str()))
    else:
        print("Message produced: {0}".format(msg.value()))

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j)
        get_data = data.get("transactions")
    print(get_data)
    for i in get_data:
        a = list(get_data.items()[0])
        p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
    pass
p.flush(1)

预期结果:消息(JSON键

实际结果:主题中没有消息。因此消费者没有收到任何消息。

共有1个答案

濮佑运
2023-03-14

您的文件没有事务键,也没有循环,因此您的JSON没有被解析,您也没有捕捉到KeyError或ValueError

从这个开始

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j).get("transaction")
        tosend = json.dumps(data)
        print("Ready to send : {}".format(tosend))
        p.produce(topic='mytopic12', tosend, callback=acked)
except:
    print("There was some error")
 类似资料:
  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那