标准输入输出:
运行输入指令:telnet master 8888
运行接受指令:nc -lk 8888
运行输入指令:telnet master 8888
运行接受指令:nc -lk 8888
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c conf -f (运行文件名) --name (脚本内自定义名) -Dflume.root.logger=INFO,console
telnet master 8888
CTRL+]键,这时会强制退到telnet命令界面下,再用quit退出就行了
输入方式:向 spoolDir 目录 (test_spooling_dir) 中放入文件,spooldir source 会自动读取(不是 telnet)
运行接受指令:flume-ng agent -c conf -f spoolingdir_to_logger.conf --name a1 -Dflume.root.logger=INFO,console
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/hadoop/apache-flume-1.8.0-bin/resources/test_spooling_dir
a1.sources.s1.fileHeader = true
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行输入指令:telnet master 8888
运行接受指令:flume-ng agent -c conf -f netcat_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = netcat
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /from_flume/%Y-%m-%d-%H-%M
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 获取的是本地时间戳,实时(事件)时间戳需要加 timestamp 拦截器
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行指令:flume-ng agent -c conf -f windows_spooldir_to_avro.conf --name a1 -Dflume.root.logger=INFO,console
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = E:/apache-flume-1.8.0-bin/resources/spoolingdir
a1.sources.s1.fileHeader = true
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 8888
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行指令:flume-ng agent -c conf -f linux_avro_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /from_flume_window/%Y-%m-%d-%H-%M
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.sinks.k1.type=logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
package com.zhiyou.bd23;
import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
public class FlumeClientUitls {
    // NOTE(review): class name misspells "Utils" but is kept unchanged because
    // SelectorTest and MockPhoneNums in this project reference it.

    /** Host of the Flume Avro/RPC source this client connects to. */
    private static final String HOST_NAME = "master";
    /** Port of the Flume Avro/RPC source. */
    private static final int PORT = 8888;

    /**
     * Creates a default Flume RPC client bound to {@code master:8888}.
     *
     * @return a ready-to-use {@link RpcClient}
     */
    public static RpcClient getClient(){
        return RpcClientFactory.getDefaultInstance(HOST_NAME, PORT);
    }

    /**
     * Closes the given client and releases its connection resources.
     *
     * @param client the client to shut down
     */
    public static void clientClose(RpcClient client){
        client.close();
    }

    /**
     * Wraps {@code msg} in a UTF-8 event and appends it via the client.
     * Delivery failures are logged to the console instead of being rethrown.
     *
     * @param client the RPC client to send through
     * @param msg    the message body to send
     */
    public static void sendData(RpcClient client, String msg){
        Event payload = EventBuilder.withBody(msg, Charset.forName("UTF-8"));
        try {
            client.append(payload);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
            System.out.println("发送消息["+msg+"]失败");
        }
    }

    /** Demo entry point: sends 20 numbered messages, then closes the client. */
    public static void main(String[] args){
        RpcClient client = getClient();
        int n = 0;
        while (n < 20) {
            sendData(client, "msg" + n);
            n++;
        }
        clientClose(client);
    }
}
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.sinks.k1.type=logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
package com.zhiyou.bd23;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
public class FailOverClientTest {
    // Servers: master and slaver1, both on port 8888.
    // Active node: master; standby: slaver1 — if master dies, slaver1 takes over.

    /**
     * Demonstrates a failover Flume RPC client configured from
     * {@code failover_client.properties}: 100 events are sent to the active
     * host, with a 2s pause between sends so a failover can be observed.
     *
     * Fix: the properties {@code InputStream} was never closed — now managed
     * with try-with-resources; the RPC client is closed in a finally block so
     * it is released even if an append fails.
     *
     * @param args unused
     * @throws Exception on I/O, delivery, or interruption errors
     */
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        try (InputStream inputStream =
                new FileInputStream("src/main/resources/failover_client.properties")) {
            properties.load(inputStream);
        }
        RpcClient client = RpcClientFactory.getInstance(properties);
        try {
            for (int i = 0; i < 100; i++) {
                Event event = EventBuilder.withBody("ags" + i, Charset.forName("UTF-8"));
                client.append(event);
                Thread.sleep(2000); // pace sends so failover behavior is visible
            }
        } finally {
            client.close();
        }
    }
}
file文件名:failover_client.properties
client.type = default_failover
hosts = h1 h2
hosts.h1 = master:8888
hosts.h2 = slaver1:8888
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.sinks.k1.type=logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
package com.zhiyou.bd23;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
public class LoadBalanceClientTest {
    /**
     * Demonstrates a load-balancing Flume RPC client configured from
     * {@code loadbalance_client.propeties} (round-robin between master and
     * slaver1): 200 events are sent through the balanced client.
     *
     * Fix: the {@code FileInputStream} passed to {@code Properties.load} was
     * never closed — now managed with try-with-resources; the RPC client is
     * closed in a finally block so it is released even if an append fails.
     *
     * @param args unused
     * @throws Exception on I/O or delivery errors
     */
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // NOTE(review): "propeties" is a typo, but it matches the actual file
        // name documented for this demo, so it is kept as-is.
        try (FileInputStream in =
                new FileInputStream("src/main/resources/loadbalance_client.propeties")) {
            properties.load(in);
        }
        RpcClient client = RpcClientFactory.getInstance(properties);
        try {
            for (int i = 0; i < 200; i++) {
                Event event = EventBuilder.withBody("asg" + i, Charset.forName("UTF-8"));
                client.append(event);
            }
        } finally {
            client.close();
        }
    }
}
file文件名:loadbalance_client.propeties
client.type = default_loadbalance
hosts = h1 h2
hosts.h1 = master:8888
hosts.h2 = slaver1:8888
host-selector = round_robin
a1.sources = s1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3
a1.sources.s1.type = avro
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/s_henan
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /flume/s_shandong
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k3.type = hdfs
a1.sinks.k3.hdfs.path = /flume/s_qita
a1.sinks.k3.hdfs.rollInterval = 0
a1.sinks.k3.hdfs.rollSize = 0
a1.sinks.k3.hdfs.rollCount = 0
a1.sinks.k3.hdfs.fileType = DataStream
a1.sinks.k3.hdfs.writeFormat = Text
a1.sinks.k3.hdfs.useLocalTimeStamp = true
a1.sources.s1.channels = c1 c2 c3
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = province
a1.sources.s1.selector.mapping.henan = c1
a1.sources.s1.selector.mapping.shandong = c2
a1.sources.s1.selector.default = c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
package com.zhiyou.bd23;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;
public class SelectorTest {
    // Generates random region-tagged events. The "province" header is one of
    // henan / shandong / shanghai / beijing; the agent's multiplexing channel
    // selector then routes:
    //   henan    -> HDFS /flume/s_henan
    //   shandong -> HDFS /flume/s_shandong
    //   others   -> HDFS /flume/s_qita
    public static void main(String[] args) throws Exception{
        String[] provinces = {"henan","shandong","shanghai","beijing"};
        Random random = new Random();
        RpcClient client = FlumeClientUitls.getClient();
        for(int n = 0; n < 1000; n++){
            String picked = provinces[random.nextInt(provinces.length)];
            String body = "msg,"+picked+","+n;
            Event event = EventBuilder.withBody(body, Charset.forName("UTF-8"));
            // The selector reads this header to choose the channel.
            Map<String,String> headers = new HashMap<String,String>();
            headers.put("province", picked);
            event.setHeaders(headers);
            System.out.println(body);
            client.append(event);
        }
        client.close();
    }
}
应用了正则表达式:(\\d{3})\\d{4}(\\d{4})
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = master
a1.sources.s1.port = 8888
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = search_replace
a1.sources.s1.interceptors.i1.searchPattern = (\\d{3})\\d{4}(\\d{4})
a1.sources.s1.interceptors.i1.replaceString = $1xxxx$2
a1.channels.c1.type = memory
a1.sinks.k1.type=logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
package com.zhiyou.bd23;
import java.nio.charset.Charset;
import java.util.Random;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;
public class MockPhoneNums {
    /**
     * Generates 100 fake 11-digit Chinese mobile numbers (a known 3-digit
     * carrier prefix plus 8 random digits) and sends each to the Flume agent
     * as a UTF-8 event via the shared RPC client.
     *
     * Fix: the 8 random digits were appended with repeated String
     * concatenation inside the loop — replaced with StringBuilder.
     *
     * @param args unused
     * @throws Exception on delivery errors
     */
    public static void main(String[] args) throws Exception{
        String[] no_frags = new String[]{"135","136","138","158","139","152","130","131"};
        Random random = new Random();
        RpcClient client = FlumeClientUitls.getClient();
        for(int i=0;i<100;i++){
            // Build the phone number efficiently: prefix + 8 random digits.
            StringBuilder sb = new StringBuilder(no_frags[random.nextInt(no_frags.length)]);
            for(int j=0;j<8;j++){
                sb.append(random.nextInt(10));
            }
            String pre = sb.toString();
            System.out.println(pre);
            // NOTE(review): the body reads "pn:<num>id:<i>" — a "," between the
            // number and "id:" looks missing, but the text is kept byte-for-byte
            // in case the downstream search_replace interceptor relies on it.
            Event event = EventBuilder.withBody("pn:"+pre+"id:"+i+",name:user_"+i,Charset.forName("utf-8"));
            client.append(event);
        }
        client.close();
    }
}