当前位置: 首页 > 知识库问答 >
问题:

水槽中的文件名和变量

纪畅
2023-03-14

现在我正在一个项目中工作,我们试图使用 flume 读取 tomcat 访问日志并在 Spark 中处理这些数据并以正确的格式将它们转储到数据库中。但问题是tomcat访问日志文件是每日滚动文件,文件名每天都会更改。像...

localhost_access_log.2017-09-19.txt
localhost_access_log.2017-09-18.txt
localhost_access_log.2017-09-17.txt

源代码部分的flume-conf文件如下

# Describe/configure the source
flumePullAgent.sources.nc1.type = exec
flumePullAgent.sources.nc1.command = tail -F /tomcatLog/localhost_access_log.2017-09-17.txt
#flumePullAgent.sources.nc1.selector.type = replicating

它在一个固定的文件名上运行tail命令(我使用了固定的文件名,只是为了测试)。如何在flume conf文件中将文件名作为参数传递?

事实上,如果我能够将文件名作为参数传递,那么这也不是一个实际的解决方案。例如,我今天用某个文件名(例如:“localhost_access_log.2017-09-19.txt”)启动flume,明天我将更改文件名(localhost_access_log2017-09-19.txt为localhost\access_og.2017-09-20.txt)时,必须有人停止flume并用新的文件名重新启动。在这种情况下,它将不是一个持续的过程,我必须使用cron作业或类似的东西来停止/启动水槽。另一个问题是,在处理期间,我每天都会丢失一些数据(我们现在使用的服务器是高吞吐量服务器,几乎700-800 TPS)。(我的意思是生成新文件名所需的时间停止水槽时间开始水槽时间)

任何一个人,有想法如何在正式生产环境中使用滚动文件名运行水槽吗?任何帮助都将受到高度赞赏…

共有1个答案

谭志用
2023-03-14

exec 源不适合您的任务,您可以改用假脱机目录源。来自水槽用户指南:

此源允许您通过将要摄取的文件放入磁盘上的“假脱机”目录中来摄取数据。此源将监视指定目录中的新文件,并在新文件中出现事件时对其进行解析

然后,在配置文件中,您将像这样提及日志目录:

agent.sources.spooling_src.spoolDir = /tomcatLog

 类似资料:
  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 我尝试每5分钟用其他事件刷新. tmp文件,我的源代码很慢,需要30分钟才能在我的hdfs接收器中获取128MB文件。 flume hdfs接收器中是否有任何属性,我可以在将.tmp文件滚动到hdfs之前控制该文件的刷新率。 我需要它使用. tmp文件中的hive表查看HDFS中的数据。 目前我正在查看来自的数据。tmp文件,但是。由于卷大小为128MB,tmp文件长时间不刷新。

  • 我试图建立flume,这样每个代理可以有多个接收器,最终有多个通道和源(现在只看多个通道)。我有一个类似这样的配置文件和一个ruby模板。我不知道如何将功能添加到模板文件中,以便每个代理可以将一个事件发送到多个通道

  • 我已经为我的应用程序配置了水槽代理,其中源是Spooldir,接收器是HDFS 我能够在hdfs中收集文件。 代理配置为: 我有以下格式的hdfs文件: /flume/events/file1.txt。1411543838171/水槽/事件/文件2.txt.1411544272696 我想知道我可以删除时间戳(1411543838171) /唯一号码,这是自动生成的每个事件的文件名?

  • 我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化:

  • 我试图配置水槽与HDFS作为汇。 这是我的flume.conf文件: 我的hadoop版本是: 水槽版本是: 我已将这两个jar文件放在flume/lib目录中 我将hadoop common jar放在那里,因为在启动flume代理时出现以下错误: 现在代理开始了。这是启动日志: 但是当一些事件发生时,下面的错误出现在水槽日志中,并且没有任何东西被写入hdfs。 我缺少一些配置或jar文件?