当前位置: 首页 > 知识库问答 >
问题:

Python用avro存储库反序列化kafka消息

常彭薄
2023-03-14
from kafka import KafkaConsumer
consumer = KafkaConsumer('SOME-TOPIC', 
                        other connection parameters,
                        auto_offset_reset= 'earliest')
                        # value_deserializer=lambda m: json.loads(m.decode('utf-8')))
                        # value_deserializer=lambda m: decode(m))

for msg in consumer:
    print (msg)
    null
    null

共有1个答案

平和雅
2023-03-14

我使用以下库获得了它:

pip install confluent-avro
pip install kafka-python

代码

from kafka import KafkaConsumer

from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth

KAFKA_TOPIC = "SOME-TOPIC"

registry_client = SchemaRegistry(
    "https://...",
    HTTPBasicAuth("USER", "PASSWORD"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

consumer = KafkaConsumer(KAFKA_TOPIC, 
                        other connection parameters,
                        auto_offset_reset= 'earliest')

for msg in consumer:
    v = avroSerde.value.deserialize(msg.value)
    k = avroSerde.key.deserialize(msg.key)
    print(msg.offset, msg.partition, k, v)
    break

参考:https://pypi.org/project/confluent_avro/

 类似资料:
  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 我目前无法在KSTREAM应用程序中反序列化avro原始密钥 使用avro模式编码的密钥(在模式注册表中注册), 当我使用kafka-avro控制台消费者时,我可以看到密钥是正确的反序列化 但不可能让它在KSTREAM应用程序中工作 密钥的avro模式是一个基元: 我已经遵循了confluent的文档 它对值运行良好,但键将是一个字符串,其中包含来自模式注册表的字节,而不仅仅是“卷轴”键 http

  • 我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子: 如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign

  • 我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在

  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是: