Question:

Flume NG / Avro source, memory channel and HDFS sink - too many small files

司马高昂
2023-03-14

I'm facing a strange problem. I'm sinking a large volume of messages from Flume to HDFS. I applied the recommended configuration to avoid too many small files, but it doesn't work. Here is my configuration file.

# single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5458
a1.sources.r1.threads = 20

# Describe the HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://myhost:myport/user/myuser/flume/events/%{senderType}/%{senderName}/%{senderEnv}/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.fileSuffix = .jsonlog
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# never roll based on time
a1.sinks.k1.hdfs.rollInterval=0
##10MB=10485760, 128MB=134217728, 256MB=268435456
a1.sinks.kl.hdfs.rollSize=10485760
## never roll based on number of events
a1.sinks.kl.hdfs.rollCount=0
a1.sinks.kl.hdfs.round=false
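# (with rollInterval=0 and rollCount=0 the intent is that files roll on
# size alone, i.e. roughly every 10 MB written)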

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 1000
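# note: transactionCapacity must stay >= the sink's hdfs.batchSize (100 here),
# since the sink takes up to one batch from the channel per transaction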

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This configuration works and I can see my files, but the files average only about 1.5 KB each. The Flume console output shows messages like these:

16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating  hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Renaming hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp to hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog
16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Renaming hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp to hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog
16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484509.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484509.jsonlog.tmp
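For reference, the file sizes can be checked with a plain HDFS du over one of the bucket paths from the log above:

hdfs dfs -du -h hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948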

Does anyone know what the problem is?

Here is some additional information about Flume's behavior.

The command is: flume-ng agent -n a1 -c /path/to/flume/conf --conf-file sample-flume.conf -Dflume.root.logger=TRACE,console -Xms8192m -Xmx16384m

Note: the logger directive doesn't take effect. I don't understand why, but I...

The Flume startup output is:

16/08/03 15:32:55 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/08/03 15:32:55 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:sample-flume.conf
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
16/08/03 15:32:55 INFO node.AbstractConfigurationProvider: Creating channels
16/08/03 15:32:55 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
16/08/03 15:32:55 INFO node.AbstractConfigurationProvider: Created channel c1
16/08/03 15:32:55 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro
16/08/03 15:32:55 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hdfs
16/08/03 15:32:56 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
16/08/03 15:32:56 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
16/08/03 15:32:56 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 5458 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@466ab18a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
16/08/03 15:32:56 INFO node.Application: Starting Channel c1
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
16/08/03 15:32:56 INFO node.Application: Starting Sink k1
16/08/03 15:32:56 INFO node.Application: Starting Source r1
16/08/03 15:32:56 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5458 }...
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
16/08/03 15:32:56 INFO source.AvroSource: Avro source r1 started.

Since I can't get more verbose output, I have to assume that messages like

[...]
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
[...]

indicate that the sink is configured correctly.

P.S. I've already seen the following answers, but none of them worked for me (I must be missing something...):

Flume HDFS sink generates lots of tiny files on HDFS

Too many small files HDFS sink Flume

Flume: tiering data flows using the Avro source and sink

Flume HDFS sink keeps rolling small files

1 answer

柯阳曦
2023-03-14

Increase the batch size according to your requirements:

a1.sinks.k1.hdfs.batchSize =
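For illustration, a sketch with assumed values (tune them to your actual event rate; any increase must be mirrored in the channel, since transactionCapacity has to stay >= the sink's batch size):

# hypothetical values, not a recommendation
a1.sinks.k1.hdfs.batchSize = 10000
a1.channels.c1.transactionCapacity = 10000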
