Question:

Flume: multiple sources appending logs to a single sink

封瑞
2023-03-14

I am trying to collect logs from different directories on a single machine into a file on the local filesystem, or into HDFS.

I have registered two sources, r1 and r2. Both sources point to a single channel, c1, and one sink, k1, is attached to that channel.

The configuration file is below:

# Name the components on this agent
a1.sources = r1
a1.sources = r2
a1.sinks = k1
a1.channels = c1


a1.sources.r2.type = exec
a1.sources.r2.command = tail -f /PATH/bper-peg-pt-rest.log

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /PATH/bper-peg-ejb.log


# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/vbsc/Desktop/flume_project_logging/logs_aggregated
a1.sinks.k1.sink.rollInterval = 0

# Use file channel
a1.channels.c1.type = file

# Bind the source and sink to the channel
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1
a1.sources.r1.channels = c1

But when I start Flume with agent a1, only one source (r2) starts. Flume agent startup log:

16/06/14 14:38:09 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/06/14 14:38:09 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/vbsc/Desktop/flume_project_logging/flume_tailSource.conf
16/06/14 14:38:09 INFO conf.FlumeConfiguration: Processing:k1
16/06/14 14:38:09 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
16/06/14 14:38:09 INFO conf.FlumeConfiguration: Processing:k1
16/06/14 14:38:09 INFO conf.FlumeConfiguration: Processing:k1
16/06/14 14:38:09 INFO conf.FlumeConfiguration: Processing:k1
16/06/14 14:38:09 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
16/06/14 14:38:09 INFO node.AbstractConfigurationProvider: Creating channels
16/06/14 14:38:09 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type file
16/06/14 14:38:10 INFO node.AbstractConfigurationProvider: Created channel c1
16/06/14 14:38:10 INFO source.DefaultSourceFactory: Creating instance of source r2, type exec
16/06/14 14:38:10 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: file_roll
16/06/14 14:38:10 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r2, k1]
16/06/14 14:38:10 INFO node.Application: Starting new configuration:{ sourceRunners:{r2=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r2,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4ad9cb27 counterGroup:{ name:null counters:{} } }} channels:{c1=FileChannel c1 { dataDirs: [/root/.flume/file-channel/data] }} }
16/06/14 14:38:10 INFO node.Application: Starting Channel c1
16/06/14 14:38:10 INFO file.FileChannel: Starting FileChannel c1 { dataDirs: [/root/.flume/file-channel/data] }...
16/06/14 14:38:10 INFO file.Log: Encryption is not enabled
16/06/14 14:38:10 INFO file.Log: Replay started
16/06/14 14:38:10 INFO file.Log: Found NextFileID 13, from [/root/.flume/file-channel/data/log-9, /root/.flume/file-channel/data/log-11, /root/.flume/file-channel/data/log-13, /root/.flume/file-channel/data/log-12, /root/.flume/file-channel/data/log-10]
16/06/14 14:38:10 INFO file.EventQueueBackingStoreFileV3: Starting up with /root/.flume/file-channel/checkpoint/checkpoint and /root/.flume/file-channel/checkpoint/checkpoint.meta
16/06/14 14:38:10 INFO file.EventQueueBackingStoreFileV3: Reading checkpoint metadata from /root/.flume/file-channel/checkpoint/checkpoint.meta
16/06/14 14:38:10 INFO file.FlumeEventQueue: QueueSet population inserting 0 took 0
16/06/14 14:38:10 INFO file.Log: Last Checkpoint Tue Jun 14 14:37:49 CEST 2016, queue depth = 0
16/06/14 14:38:10 INFO file.Log: Replaying logs with v2 replay logic
16/06/14 14:38:10 INFO file.ReplayHandler: Starting replay of [/root/.flume/file-channel/data/log-9, /root/.flume/file-channel/data/log-10, /root/.flume/file-channel/data/log-11, /root/.flume/file-channel/data/log-12, /root/.flume/file-channel/data/log-13]
16/06/14 14:38:10 INFO file.ReplayHandler: Replaying /root/.flume/file-channel/data/log-9
16/06/14 14:38:10 INFO tools.DirectMemoryUtils: Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)
16/06/14 14:38:10 INFO tools.DirectMemoryUtils: Direct Memory Allocation:  Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 20316160, Remaining = 20316160
16/06/14 14:38:10 INFO file.LogFile: fast-forward to checkpoint position: 58602
16/06/14 14:38:10 INFO file.LogFile: Encountered EOF at 58602 in /root/.flume/file-channel/data/log-9
16/06/14 14:38:10 INFO file.ReplayHandler: Replaying /root/.flume/file-channel/data/log-10
16/06/14 14:38:10 INFO file.LogFile: fast-forward to checkpoint position: 20798
16/06/14 14:38:10 INFO file.LogFile: Encountered EOF at 20798 in /root/.flume/file-channel/data/log-10
16/06/14 14:38:10 INFO file.ReplayHandler: Replaying /root/.flume/file-channel/data/log-11
16/06/14 14:38:10 INFO file.LogFile: fast-forward to checkpoint position: 3178
16/06/14 14:38:10 INFO file.LogFile: Encountered EOF at 3178 in /root/.flume/file-channel/data/log-11
16/06/14 14:38:10 INFO file.ReplayHandler: Replaying /root/.flume/file-channel/data/log-12
16/06/14 14:38:10 INFO file.LogFile: fast-forward to checkpoint position: 3264
16/06/14 14:38:10 INFO file.LogFile: Encountered EOF at 3264 in /root/.flume/file-channel/data/log-12
16/06/14 14:38:10 INFO file.ReplayHandler: Replaying /root/.flume/file-channel/data/log-13
16/06/14 14:38:10 INFO file.LogFile: fast-forward to checkpoint position: 3264
16/06/14 14:38:10 INFO file.LogFile: Encountered EOF at 3264 in /root/.flume/file-channel/data/log-13
16/06/14 14:38:10 INFO file.ReplayHandler: read: 0, put: 0, take: 0, rollback: 0, commit: 0, skip: 0, eventCount:0
16/06/14 14:38:10 INFO file.FlumeEventQueue: Search Count = 0, Search Time = 0, Copy Count = 0, Copy Time = 0
16/06/14 14:38:10 INFO file.Log: Rolling /root/.flume/file-channel/data
16/06/14 14:38:10 INFO file.Log: Roll start /root/.flume/file-channel/data
16/06/14 14:38:10 INFO file.LogFile: Opened /root/.flume/file-channel/data/log-14
16/06/14 14:38:10 INFO file.Log: Roll end
16/06/14 14:38:10 INFO file.EventQueueBackingStoreFile: Start checkpoint for /root/.flume/file-channel/checkpoint/checkpoint, elements to sync = 0
16/06/14 14:38:10 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1465907890431, queueSize: 0, queueHead: 373
16/06/14 14:38:10 INFO file.Log: Updated checkpoint for file: /root/.flume/file-channel/data/log-14 position: 0 logWriteOrderID: 1465907890431
16/06/14 14:38:10 INFO file.FileChannel: Queue Size after replay: 0 [channel=c1]
16/06/14 14:38:11 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
16/06/14 14:38:11 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
16/06/14 14:38:11 INFO node.Application: Starting Sink k1
16/06/14 14:38:11 INFO node.Application: Starting Source r2
16/06/14 14:38:11 INFO source.ExecSource: Exec source starting with command:tail -f /PATH/bper-peg-pt-rest.log
16/06/14 14:38:11 INFO sink.RollingFileSink: Starting org.apache.flume.sink.RollingFileSink{name:k1, channel:c1}...
16/06/14 14:38:11 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
16/06/14 14:38:11 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
16/06/14 14:38:11 INFO sink.RollingFileSink: RollInterval is not valid, file rolling will not happen.
16/06/14 14:38:11 INFO sink.RollingFileSink: RollingFileSink k1 started.
16/06/14 14:38:11 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
16/06/14 14:38:11 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r2 started
16/06/14 14:38:11 INFO source.ExecSource: Command [tail -f /PATH/bper-peg-pt-rest.log] exited with 1

Thanks

1 answer

仲阳朔
2023-03-14

The two sources need to be declared together on a single line:

a1.sources = r1 r2

Earlier, I had written:

a1.sources = r1
a1.sources = r2

so the second assignment overwrote the first, and only one source (r2) was registered.
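Applying that fix to the asker's full agent file, a corrected configuration would look roughly like this (same paths, types, and component names as in the question; only the `a1.sources` line changes):

```properties
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Sources: tail both log files
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /PATH/bper-peg-ejb.log
a1.sources.r2.type = exec
a1.sources.r2.command = tail -f /PATH/bper-peg-pt-rest.log

# Sink: write aggregated events to a local directory
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/vbsc/Desktop/flume_project_logging/logs_aggregated
a1.sinks.k1.sink.rollInterval = 0

# Durable file channel
a1.channels.c1.type = file

# Bind both sources and the sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
```

Two things worth noting from the startup log: the exec command exited with status 1, which typically means the file at the (placeholder) /PATH does not exist on that machine; and `rollInterval = 0` disables file rolling, as the "RollInterval is not valid, file rolling will not happen" message indicates.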
