Flume-ng in Production (Part 1): Building Flume-ng for Production

宦博雅
2023-12-01
1. First, install JDK 1.6+ and Maven 3.x on the system:
$ java -version
java version "1.6.0_31"
Java(TM) SE Runtime Environment (build 1.6.0_31-b04-415-11M3635)
Java HotSpot(TM) 64-Bit Server VM (build 20.6-b01-415, mixed mode)
$ mvn -version
Apache Maven 3.0.3 (r1075438; 2011-02-28 09:31:09-0800)
Maven home: /usr/share/maven
Java version: 1.6.0_31, vendor: Apple Inc.
Java home: /System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
Default locale: en_US, platform encoding: MacRoman
OS name: "mac os x", version: "10.7.3", arch: "x86_64", family: "mac"

          
2. Download the source code:
$ svn co https://svn.apache.org/viewvc/flume/trunk/ flume

3. Build and package:
$ cd flume
$ mvn clean install -DskipTests

If the tests are not skipped, the build fails verification: the test run generates an event.txt file under the flume-ng-core project, and that file has to be deleted before the build can pass. For details, see:
https://issues.apache.org/jira/browse/FLUME-1372
$ cp flume-ng-dist/target/flume-ng-dist-1.3.0-SNAPSHOT-dist.tar.gz .
$ tar -zxvf apache-flume-1.3.0-SNAPSHOT-dist.tar.gz
$ cd flume-1.3.0-SNAPSHOT
$ cp conf/flume-conf.properties.template conf/flume.conf
$ cp conf/flume-env.sh.template conf/flume-env.sh
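If these copy steps end up in a deployment script that may be re-run, a plain cp will overwrite a flume.conf that has already been edited. A minimal Java sketch of an idempotent alternative; the ConfTemplates class and its copyIfAbsent method are hypothetical helpers, not part of Flume:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: copy conf/*.template to the working config names,
// but never overwrite a file that already exists (local edits survive a re-run).
class ConfTemplates {

    /** Copies template to target only if target is missing; returns true if a copy was made. */
    static boolean copyIfAbsent(Path template, Path target) {
        if (Files.exists(target)) {
            return false; // keep the existing, possibly edited, config
        }
        try {
            Files.copy(template, target);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path conf = Paths.get(args.length > 0 ? args[0] : "conf");
        copyIfAbsent(conf.resolve("flume-conf.properties.template"), conf.resolve("flume.conf"));
        copyIfAbsent(conf.resolve("flume-env.sh.template"), conf.resolve("flume-env.sh"));
    }
}
```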

After these files are configured, the flume-ng distributed log collection system can be started.
The build may run out of memory.
On Windows, edit mvn.bat and add this line:
set MAVEN_OPTS= -Xms512m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=512m
On Linux, add the following to /etc/profile and reload it:
export MAVEN_OPTS="-Xms2048m -Xmx3072m -XX:MaxPermSize=1024m"
source /etc/profile

If the build reports that scripts/saveVersion.sh under flume-ng-core cannot be executed (no execute permission), make the script executable:
chmod 755 scripts/saveVersion.sh
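The octal mode 755 means read/write/execute for the owner and read/execute for group and others. The same change can be made from Java with the standard NIO API; this is a sketch assuming a POSIX file system (Linux, macOS), and the MakeExecutable class name is only illustrative:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Illustrative NIO equivalent of `chmod 755 scripts/saveVersion.sh`.
// Only works on POSIX file systems; elsewhere setPosixFilePermissions throws
// UnsupportedOperationException.
class MakeExecutable {

    // 755 = owner rwx, group r-x, others r-x
    static final Set<PosixFilePermission> MODE_755 = PosixFilePermissions.fromString("rwxr-xr-x");

    static void chmod755(Path script) {
        try {
            Files.setPosixFilePermissions(script, MODE_755);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default path assumes the program is started inside flume-ng-core
        Path script = Paths.get(args.length > 0 ? args[0] : "scripts/saveVersion.sh");
        chmod755(script);
        System.out.println(PosixFilePermissions.toString(MODE_755)); // rwxr-xr-x
    }
}
```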

The build may also fail with:
Failed to execute goal org.apache.rat:apache-rat-plugin:0.7:check (test.rat) on project flume-ng-core: Too many unapproved licenses: 1 -> [Help 1]
Deleting src\test\resources\event.txt under flume-ng-core fixes it.
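When builds are automated, the stale file can be removed before each run. A minimal sketch using only the JDK, assuming it is started from the root of the flume checkout; CleanRatArtifact is a hypothetical helper name:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-build cleanup: remove the test artifact that makes
// apache-rat-plugin fail with "Too many unapproved licenses".
class CleanRatArtifact {

    /** Deletes flume-ng-core/src/test/resources/event.txt under flumeRoot; returns true if a file was removed. */
    static boolean removeEventTxt(Path flumeRoot) {
        Path eventTxt = flumeRoot.resolve(Paths.get("flume-ng-core", "src", "test", "resources", "event.txt"));
        try {
            return Files.deleteIfExists(eventTxt);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(removeEventTxt(root) ? "event.txt removed" : "event.txt not present");
    }
}
```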
4. Live testing:
     a. Single-machine test
The command to start a flume-ng agent is:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

Add the following configuration file:
example.conf
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444

# Describe sink1
agent1.sinks.sink1.type = logger

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
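Flume looks settings up by exact key name, so a misspelled key (for example transactionCapactiy instead of transactionCapacity) is most likely just not picked up by the memory channel rather than reported as an error. A small pre-flight check can catch this. The sketch below is a hypothetical linter, not a Flume tool, and its list of known memory-channel keys is an assumption that may be incomplete for a given Flume version:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical pre-flight check for a Flume properties file.
// ASSUMPTION: KNOWN_MEMORY_KEYS is a partial list of memory-channel settings.
class ConfLint {

    static final Set<String> KNOWN_MEMORY_KEYS =
            new HashSet<>(Arrays.asList("type", "capacity", "transactionCapacity", "keep-alive"));

    /** Returns keys of memory channels of {@code agent} whose last segment is not a known setting. */
    static List<String> unknownChannelKeys(Properties p, String agent) {
        List<String> unknown = new ArrayList<>();
        String channels = p.getProperty(agent + ".channels", "");
        for (String ch : channels.trim().split("\\s+")) {
            if (ch.isEmpty()) continue;
            String prefix = agent + ".channels." + ch + ".";
            if (!"memory".equals(p.getProperty(prefix + "type", "").trim())) continue; // only lint memory channels
            for (String key : p.stringPropertyNames()) {
                if (key.startsWith(prefix) && !KNOWN_MEMORY_KEYS.contains(key.substring(prefix.length()))) {
                    unknown.add(key);
                }
            }
        }
        return unknown;
    }

    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties p = parse("agent1.channels = channel1\n"
                + "agent1.channels.channel1.type = memory\n"
                + "agent1.channels.channel1.capacity = 1000\n"
                + "agent1.channels.channel1.transactionCapactiy = 100\n");
        System.out.println(unknownChannelKeys(p, "agent1")); // [agent1.channels.channel1.transactionCapactiy]
    }
}
```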
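The configuration above describes the following:
It defines one agent, agent1.
The agent has one source, one sink and one channel: source1, sink1 and channel1.
source1 is of type netcat; its type-specific parameters bind and port are set to localhost and 44444.
sink1 is of type logger, which writes events directly to the log.
channel1 is of type memory (events are buffered in memory); its capacity (maximum number of events held in the channel) and transactionCapacity (maximum number of events per transaction) are set to 1000 and 100.
Both source1 and sink1 are bound to channel1.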
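To make capacity and transactionCapacity more concrete, here is a toy model of the source, channel, sink wiring. It is not Flume's MemoryChannel (which has real transactions and timeouts); it only shows what the two limits bound, using an ArrayBlockingQueue as the assumed buffer. ToyMemoryChannel is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the source -> channel -> sink wiring in example.conf.
// NOT Flume's MemoryChannel: capacity bounds how many events are buffered,
// transactionCapacity bounds how many events one take "transaction" moves.
class ToyMemoryChannel {
    private final BlockingQueue<byte[]> queue;
    private final int transactionCapacity;

    ToyMemoryChannel(int capacity, int transactionCapacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.transactionCapacity = transactionCapacity;
    }

    /** Source side: returns false when the channel is full (Flume signals this differently). */
    boolean put(byte[] body) {
        return queue.offer(body);
    }

    /** Sink side: drains at most transactionCapacity events at once. */
    List<byte[]> takeBatch() {
        List<byte[]> batch = new ArrayList<>();
        queue.drainTo(batch, transactionCapacity);
        return batch;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyMemoryChannel ch = new ToyMemoryChannel(1000, 100);
        for (int i = 0; i < 1001; i++) {
            if (!ch.put(("event-" + i).getBytes())) {
                System.out.println("channel full at event " + i); // channel full at event 1000
            }
        }
        System.out.println("batch size: " + ch.takeBatch().size()); // batch size: 100
        System.out.println("remaining: " + ch.size());              // remaining: 900
    }
}
```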

Now let's start the agent and check that flume-ng works.
From the flume-ng home directory, run:
$ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.root.logger=INFO,console --conf conf

After startup, the log output is as follows:
2012-07-30 15:42:30,708 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)] Starting lifecycle supervisor 1
2012-07-30 15:42:30,713 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)] Flume node starting - agent1
2012-07-30 15:42:30,715 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:187)] Node manager starting
2012-07-30 15:42:30,716 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)] Configuration provider starting
2012-07-30 15:42:30,718 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)] Starting lifecycle supervisor 11
2012-07-30 15:42:30,720 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)] Reloading configuration file:example.conf
2012-07-30 15:42:30,726 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:988)] Processing:sink1
2012-07-30 15:42:30,726 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:988)] Processing:sink1
2012-07-30 15:42:30,726 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:902)] Added sinks: sink1 Agent: agent1
2012-07-30 15:42:30,747 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)] Post-validation flume configuration contains configuration  for agents: [agent1]
2012-07-30 15:42:30,747 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)] Creating channels
2012-07-30 15:42:30,808 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)] Monitoried counter group for type: CHANNEL, name: channel1, registered successfully.
2012-07-30 15:42:30,808 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)] created channel channel1
2012-07-30 15:42:30,828 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)] Creating instance of sink: sink1, type: logger
2012-07-30 15:42:30,832 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:92)] Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource@18f1d7e }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@d9660d counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel@bb0d0d} }
2012-07-30 15:42:30,832 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:99)] Starting Channel channel1
2012-07-30 15:42:30,835 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)] Component type: CHANNEL, name: channel1 started
2012-07-30 15:42:30,835 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:114)] Waiting for channel: channel1 to start. Sleeping for 500 ms
2012-07-30 15:42:31,338 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:127)] Starting Sink sink1
2012-07-30 15:42:31,338 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:138)] Starting Source source1
2012-07-30 15:42:31,339 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:147)] Source starting
2012-07-30 15:42:31,358 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:161)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
Open another console and run telnet localhost 44444:
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
haha
OK
In the log console we see the following:
2012-07-30 15:42:40,780 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 61 68 61 0D                                  haha. }
This shows that flume-ng is running properly, and we can move on to the next step.
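The telnet check can also be scripted, for example as a smoke test after deployment. A minimal Java sketch, assuming the agent from example.conf is listening on localhost:44444 and replies with one line per event as in the session above; NetcatProbe is a hypothetical name:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical scripted replacement for the interactive telnet test:
// sends one line to the netcat source and returns the reply line.
class NetcatProbe {

    static String sendLine(String host, int port, String line) {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(5000); // fail instead of hanging if no reply arrives
            OutputStream out = socket.getOutputStream();
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8)); // LF only, no CR
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendLine("localhost", 44444, "haha"));
    }
}
```

This also explains the trailing 0D in the logged body: telnet ends each line with CR LF, and the source appears to split on LF only, so the CR stays in the event. Because the sketch sends just LF, the resulting body should be 68 61 68 61 without the 0D.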
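     b. Part of the event body appears to be lost during flume-ng testing
Testing showed that when the body of a single event sent by the source is longer than 16 bytes, only 16 bytes show up in the output. Analysis of the source code shows that the body is truncated in the code that prints the event.
In LoggerSink.java: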
if (event != null) {
  if (logger.isInfoEnabled()) {
    logger.info("Event: " + EventHelper.dumpEvent(event));
  }
}
Now look at the dumpEvent method in EventHelper.java:
// HEXDUMP_OFFSET, EOL and LOGGER are other static fields of EventHelper (not shown);
// HexDump comes from Apache Commons IO.
private static final int DEFAULT_MAX_BYTES = 16;

public static String dumpEvent(Event event) {
  return dumpEvent(event, DEFAULT_MAX_BYTES);
}

public static String dumpEvent(Event event, int maxBytes) {
  StringBuilder buffer = new StringBuilder();
  if (event == null || event.getBody() == null) {
    buffer.append("null");
  } else if (event.getBody().length == 0) {
    // do nothing... in this case, HexDump.dump() will throw an exception
  } else {
    byte[] body = event.getBody();
    // Truncation happens here: only the first min(body.length, maxBytes) bytes
    // are copied for the dump; the event's own body array is left untouched.
    byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      HexDump.dump(data, 0, out, 0);
      String hexDump = new String(out.toByteArray());
      // remove offset since it's not relevant for such a small dataset
      if (hexDump.startsWith(HEXDUMP_OFFSET)) {
        hexDump = hexDump.substring(HEXDUMP_OFFSET.length());
      }
      buffer.append(hexDump);
    } catch (Exception e) {
      if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Exception while dumping event", e);
      }
      buffer.append("...Exception while dumping: ").append(e.getMessage());
    }
    // strip the trailing line separator written by the hex dump
    String result = buffer.toString();
    if (result.endsWith(EOL) && buffer.length() > EOL.length()) {
      buffer.delete(buffer.length() - EOL.length(), buffer.length());
    }
  }
  return "{ headers:" + event.getHeaders() + " body:" + buffer + " }";
}
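So the data is not actually lost in transit: dumpEvent(event) uses DEFAULT_MAX_BYTES = 16 and copies at most 16 bytes of the body into the log line. The event delivered to the logger sink still carries its full body; only the logged representation is truncated.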
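The effect can be reproduced without Flume. The sketch below is a simplified re-creation of the truncation step only; it formats hex by hand instead of using commons-io HexDump, so its layout differs from the real log line. DumpTruncation is a hypothetical class:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified re-creation of the truncation in EventHelper.dumpEvent(event, maxBytes).
// Hand-rolled hex formatting; not the commons-io HexDump layout.
class DumpTruncation {

    static final int DEFAULT_MAX_BYTES = 16; // same default as EventHelper

    /** Hex-dumps at most maxBytes bytes of body, like Arrays.copyOf(body, min(len, maxBytes)). */
    static String dump(byte[] body, int maxBytes) {
        byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
        StringBuilder hex = new StringBuilder();
        for (byte b : data) {
            hex.append(String.format("%02X ", b & 0xFF));
        }
        return hex.toString().trim() + " (" + data.length + " of " + body.length + " bytes)";
    }

    public static void main(String[] args) {
        byte[] body = "abcdefghijklmnopqrst".getBytes(StandardCharsets.US_ASCII); // 20 bytes
        System.out.println(dump(body, DEFAULT_MAX_BYTES)); // only the first 16 bytes are shown
        System.out.println(dump(body, body.length));       // a larger limit shows all 20 bytes
        System.out.println(new String(body, StandardCharsets.US_ASCII)); // the body itself is untouched
    }
}
```

If full bodies are needed while debugging, the two-argument dumpEvent(event, maxBytes) shown above accepts a larger limit; LoggerSink in this version calls the one-argument form and therefore always prints at most 16 bytes.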
     c. flume-ng avro2log demo: configuration explained (multiple machines)
The command to start a flume-ng agent is:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
Add the following configuration file:
avro2log.conf
# avro2log.conf: A single-node Flume configuration
# execute the following command in the flume-ng home dir
# bin/flume-ng agent --conf conf  --conf-file conf/avro2log.conf --name agent1 -Dflume.root.logger=INFO,console
# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = 192.168.1.101
agent1.sources.source1.port = 44444

# Describe sink1
agent1.sinks.sink1.type = logger

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
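Because the client runs on a different machine, it is worth checking first that 192.168.1.101:44444 is reachable from 192.168.1.102 (firewall, routing). The source only accepts remote connections when bind is set to an address reachable from the network rather than localhost. A minimal TCP-level sketch; it does not speak the Avro RPC protocol, and PortCheck is a hypothetical name:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical TCP reachability check for the avro source port.
// It only opens and closes a TCP connection; no Avro RPC is exchanged.
class PortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host unknown
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "192.168.1.101";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 44444;
        System.out.println(host + ":" + port + (isReachable(host, port, 3000) ? " reachable" : " NOT reachable"));
    }
}
```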
Now let's start the agent and check that flume-ng works.
From the flume-ng home directory, run:
$ bin/flume-ng agent --conf conf --conf-file conf/avro2log.conf --name agent1 -Dflume.root.logger=INFO,console
After startup, the log output is as follows:
2012-08-08 14:53:17,522 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)] Starting lifecycle supervisor 1
2012-08-08 14:53:17,527 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)] Flume node starting - agent1
2012-08-08 14:53:17,530 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:187)] Node manager starting
2012-08-08 14:53:17,533 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)] Starting lifecycle supervisor 10
2012-08-08 14:53:17,531 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)] Configuration provider starting
2012-08-08 14:53:17,534 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)] Reloading configuration file:conf/avro2log.conf
2012-08-08 14:53:17,540 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:988)] Processing:sink1
2012-08-08 14:53:17,541 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:988)] Processing:sink1
2012-08-08 14:53:17,541 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:902)] Added sinks: sink1 Agent: agent1
2012-08-08 14:53:17,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)] Post-validation flume configuration contains configuration  for agents: [agent1]
2012-08-08 14:53:17,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)] Creating channels
2012-08-08 14:53:17,622 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)] Monitoried counter group for type: CHANNEL, name: channel1, registered successfully.
2012-08-08 14:53:17,622 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)] created channel channel1
2012-08-08 14:53:17,634 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)] Monitoried counter group for type: SOURCE, name: source1, registered successfully.
2012-08-08 14:53:17,646 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)] Creating instance of sink: sink1, type: logger
2012-08-08 14:53:17,650 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:92)] Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:Avro source source1: { bindAddress: localhost, port: 44444 } }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@18f1d7e counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel@d9660d} }
2012-08-08 14:53:17,652 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:99)] Starting Channel channel1
2012-08-08 14:53:17,653 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)] Component type: CHANNEL, name: channel1 started
2012-08-08 14:53:17,653 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:114)] Waiting for channel: channel1 to start. Sleeping for 500 ms
2012-08-08 14:53:18,156 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:127)] Starting Sink sink1
2012-08-08 14:53:18,156 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:138)] Starting Source source1
2012-08-08 14:53:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:138)] Starting Avro source source1: { bindAddress: localhost, port: 44444 }...
2012-08-08 14:53:18,565 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)] Component type: SOURCE, name: source1 started
2012-08-08 14:53:18,565 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:155)] Avro source source1 started.
Open a console on another machine (192.168.1.102).
In the directory /data/flumetmp, create a file named a:
echo 11111111 >> a
Then, on the same machine (192.168.1.102), run:
bin/flume-ng avro-client --conf conf -H 192.168.1.101 -p 44444 -F /data/flumetmp/a
The output is:
+ exec /opt/soft/jdk/bin/java -Xmx20m -cp '/data/software/flumeng/conf:/data/software/flumeng/lib/*' -Djava.library.path= org.apache.flume.client.avro.AvroCLIClient -H 192.168.1.101 -p 44444 -F /data/flumetmp/a
In the log console we see the following:
2012-08-08 15:27:30,298 (New I/O server boss #1 ([id: 0x0050a649, /192.168.1.101:44444])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:141)] [id: 0x01c208b0, /192.168.1.102:55565 => /192.168.1.101:44444] OPEN
2012-08-08 15:27:30,304 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:141)] [id: 0x01c208b0, /192.168.1.102:55565 => /192.168.1.101:44444] BOUND: /192.168.1.101:44444
2012-08-08 15:27:30,304 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:141)] [id: 0x01c208b0, /192.168.1.102:55565 => /192.168.1.101:44444] CONNECTED: /192.168.1.102:55565
2012-08-08 15:27:30,749 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:141)] [id: 0x01c208b0, /192.168.1.102:55565 :> /192.168.1.101:44444] DISCONNECTED
2012-08-08 15:27:30,749 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:141)] [id: 0x01c208b0, /192.168.1.102:55565 :> /192.168.1.101:44444] UNBOUND
2012-08-08 15:27:30,749 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:141)] [id: 0x01c208b0, /192.168.1.102:55565 :> /192.168.1.101:44444] CLOSED
2012-08-08 15:27:33,227 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 31 31 31 31 31 31                               111111 }
The avro source works!
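A sketch of how the file passed with -F maps onto events, under the assumption that avro-client sends one event per line with the line terminator removed (consistent with the body above carrying no trailing 0A byte). This is not AvroCLIClient's code; FileToEvents is a hypothetical name:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Illustration only. ASSUMPTION: one event per line, terminator stripped.
class FileToEvents {

    /** Returns one body per line of the file, without line terminators. */
    static List<byte[]> bodies(Path file) {
        try {
            List<byte[]> events = new ArrayList<>();
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                events.add(line.getBytes(StandardCharsets.UTF_8));
            }
            return events;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = Paths.get(args.length > 0 ? args[0] : "/data/flumetmp/a");
        for (byte[] body : bodies(file)) {
            System.out.println(body.length + " bytes: " + new String(body, StandardCharsets.UTF_8));
        }
    }
}
```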