当前位置: 首页 > 知识库问答 >
问题:

Flume:目录到Avro->Avro到HDFS-传输后Avro无效

洪高阳
2023-03-14

我让用户编写AVRO文件,我想使用Flume将所有这些文件移动到使用Flume的HDFS中。因此,我以后可以使用Hive或Pig来查询/分析数据。

在客户端上,我安装了水槽,并有一个SpoolDir源和AVRO接收器,如下所示:

a1.sources = src1
a1.sinks = sink1
a1.channels = c1

a1.channels.c1.type = memory

a1.sources.src1.type = spooldir
a1.sources.src1.channels = c1
a1.sources.src1.spoolDir = {directory}
a1.sources.src1.fileHeader = true
a1.sources.src1.deserializer = avro

a1.sinks.sink1.type = avro
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hostname = {IP}
a1.sinks.sink1.port = 41414

在hadoop集群上,我有一个AVRO源和HDFS接收器:

a1.sources = avro1
a1.sinks = sink1
a1.channels = c1

a1.channels.c1.type = memory

a1.sources.avro1.type = avro
a1.sources.avro1.channels = c1
a1.sources.avro1.bind = 0.0.0.0
a1.sources.avro1.port = 41414

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = {hdfs dir}
a1.sinks.sink1.hdfs.fileSuffix = .avro
a1.sinks.sink1.hdfs.rollSize = 67108864
a1.sinks.sink1.hdfs.fileType = DataStream

问题是HDFS上的文件不是有效的AVRO文件!我正在使用色调UI检查文件是否是有效的AVRO文件。如果我将我在PC上生成的AVRO I文件上传到集群,我可以很好地看到其内容。但是来自Flume的文件不是有效的AVRO文件。

我尝试了Flume中包含的Flume avro客户端,但没有工作,因为它每行发送一个破坏avro文件的Flume事件,这是使用desializer=avrospooldir代码修复的。所以我认为在写入文件时问题出在HDFS接收器上。

使用hdfs。fileType=DataStream它写入avro字段中的值,而不是整个avro文件,从而丢失所有架构信息。如果我使用hdfs。fileType=SequenceFile由于某些原因,这些文件无效。

有什么想法吗?

谢谢

共有1个答案

卢英范
2023-03-14

您必须将其添加到hdfs接收器配置中(此属性的值默认为TEXT):

a1.sinks.sink1.serializer = avro_event

这应该写入有效的avro文件,但使用默认模式。

但是,由于您使用avro文件作为输入,您可能希望编写具有相同模式的avro文件。为此,您可以使用cloud dera的cdk中的AvroEventSerializer。假设您构建了代码并将jar放在Flume的lib目录中,您现在可以在属性文件中定义Serializer:

a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder

序列化程序假定avro模式以URL或文本的形式存在于每个事件的标题中。要使用后一种方法(效率较低,但可能更容易尝试),必须通过添加以下属性,告诉客户端的源向每个事件添加模式文本:

a1.sources.src1.deserializer.schemaType = LITERAL
 类似资料:
  • 我有 kafka 集群,它从生产者那里接收 avro 事件。 我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中 水槽可以吗? 有没有人有一个配置文件的例子来演示如何做? Yosi

  • 我是Logstash和Avro的初学者。我们正在建立一个系统,logstash作为Kafka队列的制作人。然而,我们遇到了这样一个问题:由Logstash生成的avro序列化事件无法由apache提供的avro工具jar(版本1.8.2)解码。此外,我们注意到Logstash和avro工具的序列化输出有所不同。 我们有以下设置: logstash 5.5版 logstash avro编解码器版本3

  • 在阅读了Apache Flume及其在处理客户端事件方面提供的好处之后,我决定是时候开始更详细地研究这个问题了。另一个很大的好处似乎是它可以处理Apache Avro对象:-)然而,我很难理解Avro模式是如何用来验证收到的Flume事件的。 为了帮助更详细地了解我的问题,我在下面提供了代码片段; 出于这篇文章的目的,我使用了一个示例模式,定义了一个包含2个字段的嵌套记录。 在我的Java项目中,

  • 我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event) kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据 而不是: 模式数据数据 我猜这是因为flume需要一个带有{header} {body}

  • Avro(读音类似于[ævrə])是Hadoop的一个子项目,由Hadoop的 创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)牵头开发。Avro是一个数据序列化系统,设计用于支持大 批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro提供的机制使动态语言可以方便地处理 Avro数据。

  • 我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。 并得到如下所示的异常: 这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的