from kafka import KafkaConsumer
consumer = KafkaConsumer('SOME-TOPIC',
other connection parameters,
auto_offset_reset= 'earliest')
# value_deserializer=lambda m: json.loads(m.decode('utf-8')))
# value_deserializer=lambda m: decode(m))
for msg in consumer:
print (msg)
我使用以下库获得了它:
pip install confluent-avro
pip install kafka-python
和代码:
from kafka import KafkaConsumer
from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth
KAFKA_TOPIC = "SOME-TOPIC"
registry_client = SchemaRegistry(
"https://...",
HTTPBasicAuth("USER", "PASSWORD"),
headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)
consumer = KafkaConsumer(KAFKA_TOPIC,
other connection parameters,
auto_offset_reset= 'earliest')
for msg in consumer:
v = avroSerde.value.deserialize(msg.value)
k = avroSerde.key.deserialize(msg.key)
print(msg.offset, msg.partition, k, v)
break
参考:https://pypi.org/project/confluent_avro/
主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示
我目前无法在KSTREAM应用程序中反序列化avro原始密钥 使用avro模式编码的密钥(在模式注册表中注册), 当我使用kafka-avro控制台消费者时,我可以看到密钥是正确的反序列化 但不可能让它在KSTREAM应用程序中工作 密钥的avro模式是一个基元: 我已经遵循了confluent的文档 它对值运行良好,但键将是一个字符串,其中包含来自模式注册表的字节,而不仅仅是“卷轴”键 http
我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子: 如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign
我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在
我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是: