Flume-ng

尉迟兴修
2023-12-01

Event tracking (埋点):

  1. Tracking-point analysis is a common data-collection method in web analytics. Tracking can be implemented at a basic, intermediate, or advanced level, and it is well suited to privately deployed (on-premise) data collection.
  2. How does tracking collect data, and what are its strengths and weaknesses?
    The three levels of tracking are:
    Basic: embed tracking code at the key conversion points of a product or service and use a unique ID to keep records from being collected twice (e.g. the click-through rate of a purchase button);
    Intermediate: embed several pieces of code to follow a user's sequence of actions on each screen, with the events independent of one another (e.g. open the product detail page -> choose a model -> add to cart -> place the order -> complete the purchase);
    Advanced: work with the engineering and ETL teams to collect and analyze the user's complete behavior, build user profiles, and reconstruct behavior models as the basis for product analysis and optimization.
    Tracking collects accurate data, which lets a company separate signal from noise and iterate quickly on its products and services in a privately deployed setup.
    However, manual tracking is a huge amount of work and easy to get wrong, which makes it a pain point for many engineers. Its development cycle is long and costly, and many smaller companies simply cannot build their own tracking, so "codeless" (no-instrumentation) tracking has become the market favorite. Which of the two approaches ultimately wins remains to be seen.

Data collection:

  1. event tracking (埋点): listens in the client browser
  2. spooldir: watches files in a directory
  3. log4j: generates session logs, which are then fed into spooldir (a minimal sketch follows below)
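    A minimal sketch of item 3, assuming log4j 1.x is on the classpath and an illustrative session-log format. Because the spooldir source expects files to be complete once they appear in the watched folder, the usual pattern is to let log4j roll a log file and then move the closed file into the spooldir:

    package com.zhiyou.bd23;

    import org.apache.log4j.Logger;

    // Writes session log lines with log4j. A log4j.properties file (assumed, not
    // shown here) routes this logger to a rolling file appender; once a file has
    // been rolled and closed, it can be moved into the spooldir-watched directory.
    public class SessionLogDemo {
    	private static final Logger LOG = Logger.getLogger(SessionLogDemo.class);

    	public static void main(String[] args) {
    		LOG.info("session_id=abc123,page=/product/42,action=view");
    		LOG.info("session_id=abc123,page=/cart,action=add_to_cart");
    	}
    }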



Concepts:

  • Flume is a distributed tool for collecting, aggregating, and moving large volumes of log data
  1. distributed
  2. reliable: fault tolerance, coordination, and error-recovery mechanisms
  3. efficient
  • data collection is carried out by data-flow components
  • the ng (next generation) version
  1. removes the cluster roles of the original Flume OG (master, agent, collector)
  2. forms distributed data collection in a dynamic way
  3. has only one role: the agent (sources, channels, sinks)
  4. data moves through the agent's three components in the form of Events
    1. Event is a type
    2. every event is an object
    3. every event holds two parts (see the Java sketch after this list):
      1. header: a set of kv pairs, usually metadata
      2. body: the actual collected content (in binary form)
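  • a minimal Java sketch of the Event structure described above, using the flume-ng SDK's EventBuilder (the same API the client code later in these notes uses); the header keys shown are only examples:
    package com.zhiyou.bd23;

    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class EventDemo {
    	public static void main(String[] args) {
    		// body: the collected content, held as bytes
    		Event event = EventBuilder.withBody("one collected record", Charset.forName("UTF-8"));

    		// header: key-value metadata attached to the event (illustrative keys)
    		Map<String, String> headers = new HashMap<String, String>();
    		headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
    		headers.put("host", "master");
    		event.setHeaders(headers);

    		System.out.println(new String(event.getBody(), Charset.forName("UTF-8")));
    		System.out.println(event.getHeaders());
    	}
    }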

Common components:

  • source:
  1. netcat: receives data from a network port (for testing/debugging)
  2. avro source: receives avro RPC events, e.g. from an avro sink or an RPC client (used to chain agents)
  3. jms: pulls data from a message queue into Flume
  4. spooling directory:
    1. watches a local folder and collects its existing and newly added files into Flume
    2. if a file is written to after it has been placed in the spooldir, Flume raises an error
    3. once a file named aaa has been placed in the spooldir, another file with the same name aaa may not be placed there again
  5. taildir: tails files and collects newly appended content in real time (see the config sketch after this list)
  6. kafka: reads streaming data from Kafka
  7. Sequence Generator Source: automatically generates sequence data for testing
  8. Syslog Source: collects system logs
  9. HTTP Source: receives data sent in HTTP requests
  • sink:
  1. logger: writes the data out as log output (for testing/debugging)
  2. hdfs:
  3. hive:
  4. hbase:
  5. avro sink:
  6. ElasticSearchSink:
  7. Kafka Sink:
  8. HTTP:
  • channel
  • interceptor:
  1. intercepts every event coming from a source and transforms or filters it
  2. types (both appear in the config sketch after this list):
    1. timestamp interceptor: adds a kv pair to each event's header; the key is set by a parameter, the value is a timestamp
    2. host interceptor: adds a kv pair to each event's header; the key names the host, the value is the IP address or hostname

  • selector
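  • a minimal config sketch combining the taildir source and the two interceptor types above (the agent and component names, file paths, and filename pattern are illustrative assumptions):
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1

    # taildir: follow appends to every file that matches the group's filename pattern
    a1.sources.s1.type = TAILDIR
    a1.sources.s1.positionFile = /opt/flume/taildir_position.json
    a1.sources.s1.filegroups = f1
    a1.sources.s1.filegroups.f1 = /var/log/app/.*log

    # timestamp interceptor: adds header key "timestamp" with the event time
    # host interceptor: adds header key "host" with this agent's IP address
    a1.sources.s1.interceptors = i1 i2
    a1.sources.s1.interceptors.i1.type = timestamp
    a1.sources.s1.interceptors.i2.type = host
    a1.sources.s1.interceptors.i2.useIP = true

    a1.channels.c1.type = memory
    a1.sinks.k1.type = logger

    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1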

Terminology:

  • the free command (shows memory usage)
  • JMS: Java Message Service, the Java API for talking to message queues (only some MQs implement the Java interface)
  • JDBC: Java DataBase Connectivity
  • ORM: Object Relational Mapping
  • MQ: Message Queue


Flume configuration:

  • netstat -alnp|grep 8888   (check which process is listening on port 8888)
  • netstat -an|grep LISTEN   (list listening ports)
  • kill a process: kill -9 (process id)
  • In apache-flume-1.8.0-bin/
  1. yum install nc
  2. yum install telnet
  3. install both on every node
  4. test between two nodes:
    start the receiver first: nc -lk 8888
    then connect from the sender: telnet master 8888
  • In apache-flume-1.8.0-bin/, create a folder named resources
  1. To collect messages sent between two local Linux nodes, create the config file nc_to_logger.conf there (the Flume agent takes the place of the nc receiver; send data to it with telnet master 8888):
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

  2. Run it:
    flume-ng agent -c conf -f (config file name) --name (agent name defined in the config) -Dflume.root.logger=INFO,console
  3. Connect to the running agent:
    telnet master 8888
  4. To exit the telnet session:
    press CTRL+], which drops you back to the telnet prompt, then type quit
  • Watch a local folder on Linux and collect its files into Flume:
  1. choose the directory to watch
  2. after starting the agent, drop files into the watched folder (no telnet sender is needed here; the spooldir source picks the files up by itself)
    start the agent: flume-ng agent -c conf -f spoolingdir_to_logger.conf --name a1 -Dflume.root.logger=INFO,console
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    
    a1.sources.s1.type = spooldir
    a1.sources.s1.spoolDir = /opt/hadoop/apache-flume-1.8.0-bin/resources/test_spooling_dir
    a1.sources.s1.fileHeader = true
    
    a1.channels.c1.type = memory
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
  • Upload the collected data to HDFS:
  1. set the target HDFS directory
  2. once the agent is running, events are written under a time-stamped directory (the %Y-%m-%d-%H-%M pattern below creates a new one each minute)
    send data with: telnet master 8888
    start the agent: flume-ng agent -c conf -f netcat_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /from_flume/%Y-%m-%d-%H-%M
    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    # useLocalTimeStamp uses the sink host's local time; to use each event's own timestamp, add a timestamp interceptor instead (see the variant sketch after this config)
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
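    A possible variant of the config above (a sketch based on the note about interceptors, not part of the original walkthrough): put a timestamp interceptor on the source so each event carries a "timestamp" header, and drop useLocalTimeStamp so the HDFS path escapes are resolved from that header instead of the sink host's clock. Only the changed lines are shown:

    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = timestamp
    # ...and remove (or set to false) the line:
    # a1.sinks.k1.hdfs.useLocalTimeStamp = true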
  • Watch a local folder on Windows, collect the files into Flume, then upload them to HDFS:
  1. choose the Windows folder to watch
  2. start the services: start the receiving end first, then the sending end
  3. drop files into the watched folder
    1. the sending end runs on Windows: unpack the Flume archive first
    2. put the config file under the bin directory
    3. run it from a DOS command prompt opened in the bin directory; if it errors, try removing "-c conf"
      run: flume-ng agent -c conf -f windows_spooldir_to_avro.conf --name a1 -Dflume.root.logger=INFO,console
      
      a1.sources = s1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.s1.type = spooldir
      a1.sources.s1.spoolDir = E:/apache-flume-1.8.0-bin/resources/spoolingdir
      a1.sources.s1.fileHeader = true
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = master
      a1.sinks.k1.port = 8888
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
  4. the receiving end runs on Linux:
    1. choose the HDFS storage folder
    2. data is stored under time-stamped directories
      run: flume-ng agent -c conf -f linux_avro_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
      a1.sources = s1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.s1.type = avro
      a1.sources.s1.bind = master
      a1.sources.s1.port = 8888
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = /from_flume_window/%Y-%m-%d-%H-%M
      a1.sinks.k1.hdfs.rollInterval = 0
      a1.sinks.k1.hdfs.rollSize = 0
      a1.sinks.k1.hdfs.rollCount = 0
      a1.sinks.k1.hdfs.fileType = DataStream
      a1.sinks.k1.hdfs.writeFormat = Text
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
  • Capture log data in real time (a Java client pushes events to an avro source):
  1. Flume config:
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.s1.type = avro
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
  2. Java code:
    package com.zhiyou.bd23;
    import java.nio.charset.Charset;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    public class FlumeClientUitls {
    	private static final String HOST_NAME = "master";
    	private static final int PORT = 8888;
    	
    	public static RpcClient getClient(){
    		RpcClient client = RpcClientFactory.getDefaultInstance(HOST_NAME, PORT);
    		return client;
    	}
    	public static void clientClose(RpcClient client){
    		client.close();
    	}
    	public static void sendData(RpcClient client, String msg){
    		Event event = EventBuilder.withBody(msg, Charset.forName("UTF-8"));
    		try {
    			client.append(event);
    		} catch (EventDeliveryException e) {
    			e.printStackTrace();
    			System.out.println("failed to send message ["+msg+"]");
    		}
    	}
    	public static void main(String[] args){
    		RpcClient client = FlumeClientUitls.getClient();
    		for(int i=0;i<20;i++){
    			sendData(client, "msg"+i);
    		}
    		clientClose(client);
    	}
    }
    
  • High-availability failover (fault tolerance, failure takeover): when one node goes down, the other takes over:
  1. Flume config (run an agent like this on both master and slaver1, each binding to its own hostname):
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.s1.type = avro
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
  2. Java code:
    package com.zhiyou.bd23;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.Charset;
    import java.util.Properties;
    import org.apache.flume.Event;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    public class FailOverClientTest {
    	// servers: master, slaver1
    	// port: 8888
    	// active node: master, standby node: slaver1; if master goes down, slaver1 takes over
    	public static void main(String[] args) throws Exception {
    		Properties properties = new Properties();
    		InputStream inputStream = new FileInputStream("src/main/resources/failover_client.properties");
    		properties.load(inputStream);
    		RpcClient client = RpcClientFactory.getInstance(properties);
    		for (int i = 0; i < 100; i++) {
    			Event event = EventBuilder.withBody("ags"+i,Charset.forName("UTF-8"));
    			client.append(event);
    			Thread.sleep(2000);
    		}
    		client.close();
    	}
    }
  3. client properties file (hosts and ports):
    file name: failover_client.properties
    
    client.type = default_failover
    hosts = h1 h2
    
    hosts.h1 = master:8888
    hosts.h2 = slaver1:8888
  • Load balancing (balance): two agents split the incoming data evenly between them:
  1. Flume config (as with failover, run an agent like this on each of master and slaver1):
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.s1.type = avro
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
  2. Java example code:
    package com.zhiyou.bd23;
    
    import java.io.FileInputStream;
    import java.nio.charset.Charset;
    import java.util.Properties;
    import org.apache.flume.Event;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    
    public class LoadBalanceClientTest {
    	public static void main(String[] args) throws Exception {
    		Properties properties = new Properties();
    		properties.load(new FileInputStream("src/main/resources/loadbalance_client.properties"));
    		RpcClient client = RpcClientFactory.getInstance(properties);
    		for (int i = 0; i <200; i++) {
    			Event event = EventBuilder.withBody("asg"+i,Charset.forName("UTF-8"));
    			client.append(event);
    		}
    		client.close();
    	}
    }
  3. client properties file (hosts and selection policy):
    file name: loadbalance_client.properties
    client.type = default_loadbalance
    hosts = h1 h2
    hosts.h1 = master:8888
    hosts.h2 = slaver1:8888
    host-selector = round_robin
  • Selector: route events into different HDFS directories based on a header value (here, province):
  1. Flume config:
    a1.sources = s1
    a1.channels = c1 c2 c3
    a1.sinks = k1 k2 k3
    
    a1.sources.s1.type = avro
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory
    
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/s_henan
    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    a1.sinks.k2.type = hdfs
    a1.sinks.k2.hdfs.path = /flume/s_shandong
    a1.sinks.k2.hdfs.rollInterval = 0
    a1.sinks.k2.hdfs.rollSize = 0
    a1.sinks.k2.hdfs.rollCount = 0
    a1.sinks.k2.hdfs.fileType = DataStream
    a1.sinks.k2.hdfs.writeFormat = Text
    a1.sinks.k2.hdfs.useLocalTimeStamp = true
    
    a1.sinks.k3.type = hdfs
    a1.sinks.k3.hdfs.path = /flume/s_qita
    a1.sinks.k3.hdfs.rollInterval = 0
    a1.sinks.k3.hdfs.rollSize = 0
    a1.sinks.k3.hdfs.rollCount = 0
    a1.sinks.k3.hdfs.fileType = DataStream
    a1.sinks.k3.hdfs.writeFormat = Text
    a1.sinks.k3.hdfs.useLocalTimeStamp = true
    
    
    a1.sources.s1.channels = c1 c2 c3
    a1.sources.s1.selector.type = multiplexing
    a1.sources.s1.selector.header = province
    a1.sources.s1.selector.mapping.henan = c1
    a1.sources.s1.selector.mapping.shandong = c2
    a1.sources.s1.selector.default = c3
    
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3
  2. Java example code:
    package com.zhiyou.bd23;
    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import org.apache.flume.Event;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.event.EventBuilder;
    public class SelectorTest {
    	// randomly generate data tagged with a province: henan, shandong, shanghai, or beijing
    	// henan data should go to the hdfs directory /flume/s_henan
    	// shandong data should go to the hdfs directory /flume/s_shandong
    	// all other data should go to the hdfs directory /flume/s_qita
    	public static void main(String[] args) throws Exception{
    		Random random = new Random();
    		String[] provinces = new String[]{"henan","shandong","shanghai","beijing"};
    		RpcClient client = FlumeClientUitls.getClient();
    		for(int i=0;i<1000;i++){
    			String province = provinces[random.nextInt(provinces.length)];
    			Event event = EventBuilder.withBody("msg,"+province+","+i, Charset.forName("UTF-8"));
    			Map<String,String> header = new HashMap<String,String>();
    			header.put("province", province);
    			event.setHeaders(header);
    			System.out.println("msg,"+province+","+i);
    			client.append(event);
    		}
    		client.close();
    	}
    }
    
  • Search and Replace Interceptor:
  1. Flume config:
    uses the regular expression (\\d{3})\\d{4}(\\d{4}) to keep the first three and last four digits of a phone number and mask the middle four with xxxx
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = avro
    a1.sources.s1.bind = master
    a1.sources.s1.port = 8888
    
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = search_replace
    a1.sources.s1.interceptors.i1.searchPattern = (\\d{3})\\d{4}(\\d{4})
    a1.sources.s1.interceptors.i1.replaceString = $1xxxx$2
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type=logger
    
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
  2. Java example code:
    package com.zhiyou.bd23;
    import java.nio.charset.Charset;
    import java.util.Random;
    import org.apache.flume.Event;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.event.EventBuilder;
    public class MockPhoneNums {
    	public static void main(String[] args) throws Exception{
    		String[] no_frags = new String[]{"135","136","138","158","139","152","130","131"};
    		Random random = new Random();
    		RpcClient client = FlumeClientUitls.getClient();
    		for(int i=0;i<100;i++){
    			String pre = no_frags[random.nextInt(no_frags.length)];
    			for(int j=0;j<8;j++){
    				pre += random.nextInt(10);
    			}
    			System.out.println(pre);
    			Event event = EventBuilder.withBody("pn:"+pre+",id:"+i+",name:user_"+i, Charset.forName("UTF-8"));
    			client.append(event);
    		}
    		client.close();
    	}
    }
    


























