当前位置: 首页 > 知识库问答 >
问题:

如何使用从Flink序列化到AVRO的Kafka事件?

舒博雅
2023-03-14

我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一个case类urresponse(status:int,domain:String,url:String,queue:String,html:String)和一个schemaval schema:schema=new schema。解析器()。解析(“{”type\“:\”record\“,\”name\“:\”urresponse\“,\”fields\“:[{”name\“:\”status\“,\”type\“:\”long\“,{”name\“:”domain\“,\”type\“:”string\“,{”name\“:”url\“,”type\”string\“,\”name\“:”queue\,”type\“:”string\“,{”name\”html“,”type“:\”string“}]}”)。我尝试了3种方法:

  1. val stream=env.addSource(new Flink Kafka消费者(kafkaTopic, AvroDeserializationSchema.for通用(模式),属性))运行时出错:
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
inferred type arguments [schemas.URLResponse] do not conform to method forSpecific's type parameter bounds [T <: org.apache.avro.specific.SpecificRecord]
      kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))
constructor AvroDeserializationSchema in class AvroDeserializationSchema cannot be accessed in object MyApp
    val des: AvroDeserializationSchema[URLResponse] = new AvroDeserializationSchema[URLResponse](classOf[URLResponse])

求求你,救命!首选的方法是什么?为什么不起作用?谢谢

共有1个答案

方斌
2023-03-14

似乎推荐第一种方法。提到的异常与avro反序列化scala实现的问题有关。如果我使用java实现(https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro),它会很好。我的解决方案:

    val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
      kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
      new GenericRecordAvroTypeInfo(schema))
    val stream = new DataStream[GenericRecord](javaStream)
 类似资料:
  • 我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子: 如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign

  • 我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。 我的代码在Apache Storm上运行。首先,我创建一个读卡器: 然后,我尝试反序列化消息,但不声明架构: 但当消息到达时,我会收到一个错误: 我看到的所有答案都是关于

  • 我有 kafka 集群,它从生产者那里接收 avro 事件。 我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中 水槽可以吗? 有没有人有一个配置文件的例子来演示如何做? Yosi

  • 我有Flume Avro水槽和SparkStreams程序来读取水槽。CDH 5.1、Flume 1.5.0、Spark 1.0,使用Scala作为Spark上的程序lang 我能够制作Spark示例并计算Flume Avro事件。 但是我无法将 Flume Avro 事件反序列化为字符串\文本,然后解析结构行。 有人能举例说明如何使用Scala做到这一点吗?

  • 我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。 我也尝试过在Scala中的spark-shell(没有ju

  • 我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢