当前位置: 首页 > 知识库问答 >
问题:

Kafka与avro记录

贺跃
2023-03-14

我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event)

kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据

而不是:

模式数据数据

我猜这是因为flume需要一个带有{header} {body}的记录类型,而来自kafka的数据将只是{body}我知道有一种方法可以将写入主题的avro数据包装在avroFlumeEvent中,但这似乎不再是一个真正的avro记录,也许spark消费者或storm会更喜欢真正的avro中的数据。有没有一种方法可以处理这个主题,以便每次flume将数据滚动到hdfs时,数据都不用多个模式就可以写入?

共有2个答案

公羊灿
2023-03-14

一旦你将数据登陆Kafka,你是否考虑过使用LinkedIn的加缪。它将运行mapreduce作业,但您应该获得所需的模式数据数据布局。您还应该查看Confluent的kafka堆栈,特别是它提供的模式注册表和它提供的rest api。

羊舌承天
2023-03-14

我们实际上最终得到了这个工作。我们在C#生产者中使用microsoft .NET avro库而不是apache avro库。这意味着 avro 记录已正确序列化。我还需要将水槽接收器更改为使用“org.apache.flume.sink.hdfs.AvroEventSerializer$Builder”作为接收器序列化程序,而不是“avro_event”。我还需要包含一个连接到 kafka 源的 flume 拦截器,它将变量 “flume.avro.schema.url” 推送到水槽标头中,以便稍后由 hdfs sink 序列化器使用。

我看了一下加缪,但对于我们试图实现的东西来说,它似乎有点矫枉过正,一个连接到 kafka 主题的基本水槽通道,它将 avro 数据下沉到 hdfs。

我刚刚从构建水槽配置的java应用程序中撕下了拦截器位,希望它可以帮助遇到此问题的其他人:

                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId +".interceptors",_interceptorId);           
                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".type","static");
                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".key","flume.avro.schema.url");
                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".value",_avroProdSchemaLocation +_databaseName + "/" + _topic + "/record/" + _schemaVersion + "/" + _topicName + ".avsc");
 类似资料:
  • 我为记录创建了一个Avro模式,我们将其发布到Kafka主题中。我们实际的Kafka记录模式更复杂,但为了简洁起见,我只附上了相关部分。我们在记录中有多个嵌套子类,但由于某些原因,我在尝试发布记录时遇到以下异常(包名称已被掩盖): 这是我定义的Avro模式的当前子集。 我们的对象(KafkaRecord)的JSON表示如下: 我似乎不明白为什么Avro不喜欢这个嵌套的记录,我更喜欢不剥离这些嵌套的

  • 我用Flink的table API创建了一个表。 当运行SQL以查看记录时,我得到: 我知道有一些坏的avro记录被推送到Kafka主题中。在JSON格式中,有一个选项可以通过设置来跳过/过滤这些记录。当从合流avro格式读取时,我们可以跳过这些记录吗? 这并不理想,但不幸的是,尽管有一个模式注册表,但我无法控制要推送到Kafka的内容。

  • 我是流媒体代理(如Kafka)的新手,来自排队消息系统(如JMS、Rabbit MQ)。 我从Kafka文档中读到,消息作为记录存储在Kafka分区的偏移量中。消费者从偏移量读取。 消息和记录有什么区别[多个/部分消息是否构成记录?] 当消费者从偏移量读取时,消费者是否有可能读取部分消息?消费者是否需要基于某种逻辑将这些对等消息串起来? 或 1条消息=1条记录=1个偏移量 之所以会出现这个问题,是

  • 我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问

  • 我试图解析一条Kafka消息,它是以某种加密的AVRO格式。我有以下AvroSchema。avsc avro架构文件: 现在,我编写了以下代码来获取JSON格式的数据: 请帮我解密这封信。 加密字节消息属于以下类型:<代码>080-21-0001:�哦�@@��A.�ǐ�U:�哦�@@��A 我按照建议进行了更改,现在我有以下代码: 但我仍然得到错误为“不是数据文件”。

  • 我正试图了解更多关于我们在Kafka主题中使用的Avro模式的信息,我对这一点相对来说比较陌生。 我想知道是否有一种方法可以在特定情况下发展模式。我们用一个不能为null的新字段或任何默认值来更新模式,因为这些新字段是标识符。解决这个问题的方法是创建新主题,但是有没有更好的方法来改进现有模式?