当前位置: 首页 > 知识库问答 >
问题:

在Flume-ng中使用HDFS接收器和滚动间隔来批量处理90秒的日志信息

家经纶
2023-03-14

我正在尝试使用Flume-ng获取90秒的日志信息,并将其放入HDFS的一个文件中。我让flume通过exec和tail查看日志文件,但是它每5秒创建一个文件,而不是我试图配置为每90秒创建一个文件。

我的flume.conf如下:

# example.conf: A single-node Flume configuration                                                                                                                  
# Name the components on this agent                                                                                                                                
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1                                                                                                                                       
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log

# Describe sink1                                                                                                                                                   
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# this parameter seems to be getting overridden                                                                                                                    
agent1.sinks.sink1.hdfs.rollInterval=90
agent1.sinks.sink1.hdfs.rollSize=0
agent1.sinks.sink1.hdfs.hdfs.rollCount = 0

# Use a channel which buffers events in memory                                                                                                                     
agent1.channels.channel1.type = memory

# Bind the source and sink to the channel                                                                                                                          
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

我试图通过参数-agent1.sinks.sink1.hdfs.rollInterval=90来控制文件大小。

运行此配置会产生:

13/01/03 09:43:02 INFO properties.PropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume-ng/conf/flume.conf
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Added sinks: sink1 Agent: agent1
13/01/03 09:43:03 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration  for agents: [agent1]
13/01/03 09:43:03 INFO properties.PropertiesFileConfigurationProvider: Creating channels
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: channel1, registered successfully.
13/01/03 09:43:03 INFO properties.PropertiesFileConfigurationProvider: created channel channel1
13/01/03 09:43:03 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
13/01/03 09:43:03 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: sink1, registered successfully.
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:source1,state:IDLE} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1a50ca0c counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel channel1
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink sink1
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Source source1
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
13/01/03 09:43:03 INFO source.ExecSource: Exec source starting with command:tail -f /home/cloudera/LogCreator/fortune_log.log
13/01/03 09:43:07 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186506.tmp
13/01/03 09:43:08 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186506.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186506
13/01/03 09:43:08 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186507.tmp
13/01/03 09:43:12 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186507.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186507
13/01/03 09:43:12 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186508.tmp
13/01/03 09:43:12 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186508.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186508
13/01/03 09:43:12 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186509.tmp
13/01/03 09:43:18 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186509.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186509
13/01/03 09:43:18 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186510.tmp
13/01/03 09:43:18 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186510.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186510

从时间戳可以看出,它大约每5秒钟创建一个文件。这会创建许多小文件。

我希望能够在更大的时间间隔(90秒)上创建文件。

共有2个答案

汝臻
2023-03-14

根据org . Apache . flume . sink . HDFS . bucket writer的源代码:

 /**
 * Internal API intended for HDFSSink use.
 * This class does file rolling and handles file formats and serialization.
 * Only the public methods in this class are thread safe.
 */
class BucketWriter {
  ...
  /**
   * open() is called by append()
   * @throws IOException
   * @throws InterruptedException
   */
  private void open() throws IOException, InterruptedException {
    ...
    // if time-based rolling is enabled, schedule the roll
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch(Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }
    ...
  }
  ...
   /**
   * check if time to rotate the file
   */
  private boolean shouldRotate() {
    boolean doRotate = false;

    if (writer.isUnderReplicated()) {
      this.isUnderReplicated = true;
      doRotate = true;
    } else {
      this.isUnderReplicated = false;
    }

    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }

    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }

    return doRotate;
  }
...
}

和org . Apache . flume . sink . HDFS . abstracthdfswriter

public abstract class AbstractHDFSWriter implements HDFSWriter {
...
  @Override
  public boolean isUnderReplicated() {
    try {
      int numBlocks = getNumCurrentReplicas();
      if (numBlocks == -1) {
        return false;
      }
      int desiredBlocks;
      if (configuredMinReplicas != null) {
        desiredBlocks = configuredMinReplicas;
      } else {
        desiredBlocks = getFsDesiredReplication();
      }
      return numBlocks < desiredBlocks;
    } catch (IllegalAccessException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (InvocationTargetException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (IllegalArgumentException e) {
      logger.error("Unexpected error while checking replication factor", e);
    }
    return false;
  }
...
}

hdfs文件的滚动由4个条件控制:

  1. hdfs.rollSize
  2. hdfs.rollCount
  3. hdfs.minBlockReplicas(最高优先级,但通常不是导致滚动小文件的原因)
  4. hdfs.rollInterval

根据BucketWriter.class中的这些if段更改值

舒阳州
2023-03-14

对配置文件的重写指定了更完整的参数选择。这个例子将在10k记录或10分钟后写入,这是第一次。此外,我从内存通道更改为文件通道,以帮助数据流的可靠性。

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1                                                                                                                                                                                                                 
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log

# Describe sink1                                                                                                                                                                                                                             
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)                                                                                                                                              
agent1.sinks.sink1.hdfs.rollInterval = 600
# File size to trigger roll, in bytes (0: never roll based on file size)                                                                                                                                                                     
agent1.sinks.sink1.hdfs.rollSize = 0
#Number of events written to file before it rolled (0 = never roll based on number of events)                                                                                                                                                
agent1.sinks.sink1.hdfs.rollCount = 10000
# number of events written to file before it flushed to HDFS                                                                                                                                                                                 
agent1.sinks.sink1.hdfs.batchSize = 10000
agent1.sinks.sink1.hdfs.txnEventMax = 40000
# -- Compression codec. one of following : gzip, bzip2, lzo, snappy                                                                                                                                                                          
# hdfs.codeC = gzip                                                                                                                                                                                                                          
#format: currently SequenceFile, DataStream or CompressedStream                                                                                                                                                                              
#(1)DataStream will not compress output file and please don't set codeC                                                                                                                                                                      
#(2)CompressedStream requires set hdfs.codeC with an available codeC                                                                                                                                                                         
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.maxOpenFiles=50
# -- "Text" or "Writable"                                                                                                                                                                                                                    
#hdfs.writeFormat                                                                                                                                                                                                                            
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)                                                                                                                                                                        
agent1.sinks.sink1.hdfs.threadsPoolSize=100
# Number of threads per HDFS sink for scheduling timed file rolling                                                                                                                                                                          
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1
# hdfs.kerberosPrin--cipal Kerberos user principal for accessing secure HDFS                                                                                                                                                                 
# hdfs.kerberosKey--tab Kerberos keytab for accessing secure HDFS                                                                                                                                                                            
# hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)                                                                                                                         
# hdfs.roundValue1 Rounded down to the highest multiple of this (in the unit configured using                                                                                                                                                
# hdfs.roundUnit), less than current time.                                                                                                                                                                                                   
# hdfs.roundUnit second The unit of the round down value - second, minute or hour.                                                                                                                                                           
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.                                                                                 
# serializer.*                                                                                                                                                                                                                               


# Use a channel which buffers events to a file                                                                                                                                                                                               
# -- The component type name, needs to be FILE.                                                                                                                                                                                              
agent1.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored                                                                                                                                          
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored                                                                                                                                                           
# The maximum size of transaction supported by the channel                                                                                                                                                                                   
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints                                                                                                                                                                                             
agent1.channels.channel1.checkpointInterval 30000
# Max size (in bytes) of a single log file                                                                                                                                                                                                   
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel                                                                                                                                                                                                            
agent1.channels.channel1.capacity 10000000
#keep-alive 3 Amount of time (in sec) to wait for a put operation                                                                                                                                                                            
#write-timeout 3 Amount of time (in sec) to wait for a write operation                                                                                                                                                                       

# Bind the source and sink to the channel                                                                                                                                                                                                    
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
 类似资料:
  • 我有一个写日志到HDFS的Flume-ng。 我在一个节点中做了一个代理。 但是它没有运行。 这是我的配置。 #示例2.conf:单节点水槽配置 #命名这个代理上的组件 agent1.sources=源1 agent1.sinks=sink1 agent1.channels=channel1 agent1.sources.source1.type=avro agent1.sources.sourc

  • 我是Hadoop的新手,正在学习apache Flume。我在Virtualbox上安装了CDH 4.7。以下命令将输出顶部 cputime。如何使用 Apache flume 将以下命令的日志数据输出传输到我的 HDFS?如何创建水槽配置文件?

  • 在HDFS中写入日志文件的最佳方式是什么?我正在尝试配置Apache Flume,并尝试配置能够为我提供数据可靠性的源。我试图配置“exec”,后来也查看了“spooldir”,但flume.apache上的以下文档。org对我的意图表示怀疑- 执行来源: 最常请求的功能之一是像“tail -F file_name”这样的用例,其中应用程序写入磁盘上的日志文件,Flume 尾随文件,将每一行作为事

  • 我面临一个奇怪的问题。我正在寻找从水槽到HDFS的大量信息。我应用了推荐的配置,以避免过多的小文件,但它不起作用。这是我的配置文件。 这个配置有效,我看到了我的文件。但文件的平均重量为1.5kb。水槽控制台输出提供了此类信息。 有人知道这个问题吗? 以下是有关水槽行为的一些信息。 该命令是flumengagent-na1-c/path/to/flume/conf-conf文件示例flume。con

  • 问题内容: 是否可以在Sun JVM中滚动垃圾收集器日志? 目前,我使用以下命令生成日志: 但是我必须使用fifo队列和rotatelogs手动轮换它们以创建每天的新日志。我希望对此有更好的解决方案。 也许有一种方法可以从java内部访问此日志条目,以便将它们重定向到log4j? 编辑:使用fifo队列的解决方案还不够好,因为如果从该队列中读取的进程(例如rotatelogs)读取速度变慢,则会减

  • 我正在尝试设置 Flume-NG 以从一堆服务器(主要运行 Tomcat 实例和 Apache Httpd)收集各种日志,并将它们转储到 5 节点 Hadoop 集群上的 HDFS 中。设置如下所示: 每个应用程序服务器将相关日志尾随到一个Exec源中(每个日志类型对应一个:java、htpd、syslog),该源通过FileChannel将它们输出到Avro接收器。在每台服务器上,不同的源、通道