当前位置: 首页 > 知识库问答 >
问题:

从Kafka到HDFS的avro事件

弓胜泫
2023-03-14

我有 kafka 集群,它从生产者那里接收 avro 事件。

我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中

水槽可以吗?

有没有人有一个配置文件的例子来演示如何做?

Yosi

共有2个答案

闻人举
2023-03-14

考虑一下这个场景。对于来自kafka的avro事件(只有二进制数据,没有模式),下面是为我工作的代理。

架构将使用以下代理在接收器端添加。

#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.useFlumeEventFormat = false
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers =${BOOTSTRAP_SERVERS}
MY_AGENT.sources.my-source.kafka.topics = my-topic
MY_AGENT.sources.my-source.kafka.consumer.group.id = my-topic_grp
MY_AGENT.sources.my-source.kafka.consumer.client.id = my-topic_clnt
MY_AGENT.sources.my-source.kafka.compressed.topics = my-topic
MY_AGENT.sources.my-source.kafka.auto.commit.enable = false
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.max.partition.fetch.bytes=704857
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=latest

#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false

#Sink
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.kerberosPrincipal =${user}
MY_AGENT.sinks.my-sink.hdfs.kerberosKeytab =${keytab}
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://nameservice1/my_hdfs/my_table1/timestamp=%Y%m%d
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro

MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs://nameservice1/my_hdfs/avro_schemas/${AVSC_FILE}

我想强调的几件事。

< code > MY _ agent . sinks . MY-sink . HDFS . write format = Text ..有助于仅转储来自flume事件的数据(忽略Flume事件标题....)

< code > MY _ agent . sinks . MY-sink . serializer . schema URL = HDFS://name service 1/MY _ HDFS/avro _ schemas/$ { AVSC文件}..需要传递适当的模式(将被添加到avro文件中的二进制数据)。hdfs中的最终输出文件将包含模式数据。

在HDFS中存储数据后,使用适当的avro模式创建了hive表,我能够按预期访问数据。

邰英毅
2023-03-14

这确实是可能的。

如果你想从Kafka消费,那么你需要设置一个Kafka源和一个将使用Avro的HDFS接收器。

以下是指向 Kafka 源代码的配置选项的链接:http://flume.apache.org/FlumeUserGuide.html#kafka-source

设置源配置非常简单。您当然需要对此进行测试,以验证您选择的设置是否与您的系统配合良好。

要使用Avro设置HDFS,您需要设置HDFS接收器,并且您很幸运,此站点描述了如何执行此操作:http://thisdataguy.com/2014/07/28/avro-end-to-end-in-hdfs-part-2-flume-setup/

最后,您需要配置一个通道。我有使用默认设置的 Flume 内存通道的经验(我相信......现在无法检查),效果很好。

我建议您花时间使用Flume留档:http://flume.apache.org/FlumeUserGuide.html,因为所有这些信息都包含在那里。在设置Flume代理来处理数据之前,了解您正在使用的系统非常重要。

 类似资料:
  • 我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。 并得到如下所示的异常: 这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的

  • 我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一

  • 我让用户编写AVRO文件,我想使用Flume将所有这些文件移动到使用Flume的HDFS中。因此,我以后可以使用Hive或Pig来查询/分析数据。 在客户端上,我安装了水槽,并有一个SpoolDir源和AVRO接收器,如下所示: 在hadoop集群上,我有一个AVRO源和HDFS接收器: 问题是HDFS上的文件不是有效的AVRO文件!我正在使用色调UI检查文件是否是有效的AVRO文件。如果我将我在

  • 我正在使用Kafka连接分布。命令是:bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties 工作人员配置为: Kafka连接重新开始没有错误! java代码如下: 奇怪的事情发生了。我从kafka-logs中获取数据,但在hdfs中没有数据(没有主题目录)。我尝试connector命令: 出什

  • {“type”:“record”、“name”:“twitter_schema”、“namespace”:“com.miguno.avro”、“fields”:[{“name”:“username”、“type”:“string”、“doc”:“Twitter.com上的用户帐户名称”}、{“name”:“tweet”、“type”:“string”、“doc”:“用户的Twitter消息内容”}

  • 我正在尝试通过水槽从kafka将数据放入hdfs中。kafka_producer每10秒发送一条消息。我想在hdfs上的一个文件中收集所有消息。这是我使用的水槽配置,但它在hdfs上存储了许多文件(一个用于消息): 附言我从一个文件开始.csv。kafka 生产者获取文件并选择一些感兴趣的字段,然后每 10 秒发送一个条目。Flume将条目存储在Hadoophdfs上,但存储在许多文件中(1个条目