当前位置: 首页 > 知识库问答 >
问题:

使用水槽将文件从假脱机目录移动到HDFS

索正豪
2023-03-14

我正在为我公司的 POC 实施一个小型 hadoop 集群。我正在尝试使用Flume将文件导入HDFS。每个文件都包含如下 JSON 对象(每个文件 1 个“长”行):

{ "objectType" : [ { JSON Object } , { JSON Object }, ... ] }

“objectType”是数组中对象的类型(例如:事件、用户…)。

这些文件稍后将由多个任务根据“对象类型”进行处理。

我正在使用spoolDir源和HDFS接收器。

我的问题是:

>

  • flume写入HDFS时,是否可以保留源文件名(文件名是唯一的,因为文件名中包含时间戳和UUID)

    有没有办法将“deserializer.maxLineLength”设置为一个无限制的值(而不是设置一个很高的值)?

    我真的不想泄露数据。JDBC和File哪个通道最好?(我没有高吞吐量的流量)

    我的限制是,我必须尽可能多地使用开箱即用的水槽(没有自定义元素)。

    谢谢你的帮助!

  • 共有1个答案

    凌琦
    2023-03-14

    当flume写入HDFS时,可以保留源文件名吗(文件名是唯一的,因为文件名中包含时间戳和UUID)

    对对于spooldir源,请确保fileheader属性设置为true。这将包括记录的文件名。

    agent-1.sources.src-1.fileHeader = true
    

    然后,对于你的接收器,使用avro_event序列化程序来捕获avro flume事件记录头中的文件名。

    agent-1.sinks.snk-1.serializer = avro_event
    

    avro 记录符合此架构。https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/FlumeEventAvroEventSerializer.java#L30

    有没有办法将“deserializer.maxLineLength”设置为一个无限制的值(而不是设置一个很高的值)?

    deserializer.maxLineLength没有无限制的配置https://github . com/Apache/flume/blob/trunk/flume-ng-core/src/main/Java/org/Apache/flume/serialization/line deserializer . Java # L143

    我真的不想泄露数据。JDBC和File哪个通道最好?(我没有高吞吐量的流量)

    这可能取决于您的数据库或文件系统的弹性选项。如果您有带备份的冗余数据库,请选择JDBC。如果你有一个持久的有弹性的文件系统,那么就使用file。

     类似资料:
    • 我有我有水槽代理如下 假脱机目录中的文件将自动重命名为 。已完成,文件应重命名为 。在水槽代理将该文件写入 HDFS 后完成,但在我的情况下,它将文件重命名为 .在代理运行之前已完成。它还将文件重命名为 .即使我只是手动将文件复制到假脱机目录,也已完成。 还有一个问题是删除策略即使在文件复制到HDFS后也不会删除文件。 代理将假脱机目录文件随机写入HDFS。 它还在HDFS创建了大量的tmp文件。

    • 我正在尝试使用水槽假脱机目录摄取到 HDFS(SpoolDir 我正在使用Cloudera Hadoop 5.4.2。(Hadoop 2.6.0,Flume 1.5.0)。 它适用于较小的文件,但适用于较大的文件时失败。请在下面找到我的测试场景: < li >千字节到50-60兆字节的文件,处理时没有问题。 < li >大于50-60MB的文件,它将大约50MB写入HDFS,然后我发现flume代

    • 我在SQL Developer中连接到远程服务器上的Oracle数据库。我有一个PL/SQL脚本,它将数据存储在clob变量中。我希望将此变量的数据输出到本地计算机上的一个文件中。 我遇到了,但我想我没有正确使用它。 下面是我的脚本: 当我运行该脚本时,没有任何内容写入文件C:\home\output.txt。我希望clob变量中的数据被写入文件。 我该如何进行? 编辑:如果有人知道将clob变量

    • 问题内容: 好的,所以我对oracle完全满意。现在,这已经不复存在了; 我认为您可以了解我在下面尝试做的事情。对于找到的每个存储过程,将DDL输出到具有其名称的文件名。 问题是我不知道如何获取假脱机目标来拾取由游标设置的FileName的值。 关于我要去哪里错的任何想法吗?如果有人举这个例子,我将不胜感激。 我觉得我必须在周围跳舞,因为如果我最初创建一列, 我得到一个结果,我似乎无法动态地更改&

    • 我正在尝试实现一个Spring Integration类,它获取一个. xml文件并对其进行解析,如果有效,就将其移动到一个“存档”目录,如果无效,就将其移动到一个错误目录。 然而,每当调用< code>calback.execute()时,我都会得到这个错误,我不太明白为什么。 虽然我有一个消息处理程序,但我怀疑这个问题的原因是我没有重写handle方法。但我不知道该怎么做。

    • 我的项目有一个要求。我必须使用水槽收集日志数据,并且必须将数据输入到hive表中。 在这里,我需要将放置在文件夹中的文件收集到hdfs中,我正在使用Spooldir进行。在此之后,我需要处理这些文件并将输出放在hive文件夹中,以便立即查询数据。 我是否可以使用 sink 处理源文件,使放置在 hdfs 中的数据已经处理为所需的格式。? 谢了,萨希