当前位置: 首页 > 知识库问答 >
问题:

如何用Python从Kafka解码/反序列化Avro

阎冠玉
2023-03-14

我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子:

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'

如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign/python-confluent-schemaRegistry,但它只支持Python2.7。理想情况下,我希望使用Python3.5+和MongoDB处理数据并将其存储为我当前的基础设施。

共有1个答案

窦华晖
2023-03-14

我原以为Avro库只是用来读取Avro文件,但它实际上解决了Kafka消息的解码问题,如下所示:我首先导入库并将模式文件作为参数,然后创建一个函数将消息解码到字典中,我可以在消费者循环中使用。

import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
 类似资料:
  • 我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。 我的代码在Apache Storm上运行。首先,我创建一个读卡器: 然后,我尝试反序列化消息,但不声明架构: 但当消息到达时,我会收到一个错误: 我看到的所有答案都是关于

  • 我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。 我也尝试过在Scala中的spark-shell(没有ju

  • 问题内容: 我很好奇序列化和反序列化的方式。我使用关键字“ json”和“ tuple”进行搜索,但找不到所需的内容。 问题答案: 我通过和Json.net进行测试,测试代码如下。结果显示可序列化和可反序列化。因此,我可以在应用程序中使用它们。 测试代码 注释 在将序列化到字符串{“项目1”:“一”,“项目2”:“嘻嘻”,“项目3”:真正},并且它可以被反序列化回类型。

  • 问题内容: 我正在使用JSON中的Google Geocode响应。 JSON格式如下: 我正在尝试使用Java创建序列化和反序列化它们。我尝试了GSON,但是因为它无法在更深层次上反序列化对象,所以GSON将不是一个选择。 我只是想知道是否有人对此主题有经验?也许您尝试过可以解决此问题的库?一些示例代码会很棒。 我真的不想为此编写自己的API … 问题答案: 使用杰克逊