原生API集成
pom.xml依赖
<!-- Prerequisite: an AVRO Source must be set up in advance -->
<!-- Flume SDK artifact (flume-ng-sdk), version 1.9.0 -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.9.0</version>
</dependency>
<!-- JUnit 4.12, limited to the test scope -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
单机连接
/**
 * Sends one event to a single Flume agent through the default RPC client.
 *
 * <p>The test talks to a live agent at {@code CentOS:44444}; it has no assertions and
 * passes when the append call completes without throwing.
 */
public class RpcClientTests {
    private RpcClient client;

    /** Creates the RPC client for the single target agent before each test. */
    @Before
    public void before() {
        client = RpcClientFactory.getDefaultInstance("CentOS", 44444);
    }

    /**
     * Builds an event carrying a {@code from} header and appends it through the client.
     *
     * @throws EventDeliveryException if the client cannot deliver the event
     */
    @Test
    public void testSend() throws EventDeliveryException {
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "Dora");
        // Encode the body explicitly as UTF-8 instead of relying on the platform
        // default charset, so the bytes are the same on every machine.
        Event event = EventBuilder.withBody(
                "this is body", java.nio.charset.StandardCharsets.UTF_8, header);
        client.append(event);
    }

    /** Releases the client after each test. */
    @After
    public void after() {
        // before() may have failed before assigning the client; guard so that the
        // original failure is not accompanied by a NullPointerException here.
        if (client != null) {
            client.close();
        }
    }
}
集群连接
故障转移
// Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
/**
 * Sends one event through an RPC client configured with {@code client.type=default_failover}
 * over three host aliases.
 *
 * <p>The test talks to live agents; it has no assertions and passes when the append call
 * completes without throwing.
 */
public class RpcClientTests02_FailoverClient {
    private RpcClient client;

    /** Builds the failover client configuration and opens the client before each test. */
    @Before
    public void before() {
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");
        // List of hosts (space-separated list of user-chosen host aliases)
        props.setProperty("hosts", "h1 h2 h3");
        // host/port pair for each host alias
        props.setProperty("hosts.h1", "CentOSA:44444");
        props.setProperty("hosts.h2", "CentOSB:44444");
        props.setProperty("hosts.h3", "CentOSC:44444");
        client = RpcClientFactory.getInstance(props);
    }

    /**
     * Builds an event carrying a {@code from} header and appends it through the client.
     *
     * @throws EventDeliveryException if the client cannot deliver the event
     */
    @Test
    public void testSend() throws EventDeliveryException {
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "zhangsan");
        // Encode the body explicitly as UTF-8 instead of relying on the platform
        // default charset, so the bytes are the same on every machine.
        Event event = EventBuilder.withBody(
                "this is body", java.nio.charset.StandardCharsets.UTF_8, header);
        client.append(event);
    }

    /** Releases the client after each test. */
    @After
    public void after() {
        // before() may have failed before assigning the client; guard so that the
        // original failure is not accompanied by a NullPointerException here.
        if (client != null) {
            client.close();
        }
    }
}
负载均衡
// Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
/**
 * Sends one event through an RPC client configured with {@code client.type=default_loadbalance}
 * over three host aliases, using random host selection with backoff enabled.
 *
 * <p>The test talks to live agents; it has no assertions and passes when the append call
 * completes without throwing.
 */
public class RpcClientTests02_LoadBalancing {
    private RpcClient client;

    /** Builds the load-balancing client configuration and opens the client before each test. */
    @Before
    public void before() {
        Properties props = new Properties();
        props.setProperty("client.type", "default_loadbalance");
        // List of hosts (space-separated list of user-chosen host aliases)
        props.setProperty("hosts", "h1 h2 h3");
        // host/port pair for each host alias
        props.setProperty("hosts.h1", "CentOSA:44444");
        props.setProperty("hosts.h2", "CentOSB:44444");
        props.setProperty("hosts.h3", "CentOSC:44444");
        // For random host selection; use "round_robin" instead for round-robin host selection.
        props.setProperty("host-selector", "random");
        // Disabled by default.
        props.setProperty("backoff", "true");
        // Defaults 0, which effectively becomes 30000 ms
        props.setProperty("maxBackoff", "10000");
        client = RpcClientFactory.getInstance(props);
    }

    /**
     * Builds an event carrying a {@code from} header and appends it through the client.
     *
     * @throws EventDeliveryException if the client cannot deliver the event
     */
    @Test
    public void testSend() throws EventDeliveryException {
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "lisi");
        // Encode the body explicitly as UTF-8 instead of relying on the platform
        // default charset, so the bytes are the same on every machine.
        Event event = EventBuilder.withBody(
                "this is body", java.nio.charset.StandardCharsets.UTF_8, header);
        client.append(event);
    }

    /** Releases the client after each test. */
    @After
    public void after() {
        // before() may have failed before assigning the client; guard so that the
        // original failure is not accompanied by a NullPointerException here.
        if (client != null) {
            client.close();
        }
    }
}
log4j集成(传统)
pom依赖
<!-- Flume SDK artifact (flume-ng-sdk), version 1.9.0 -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.9.0</version>
</dependency>
<!-- Flume log4j appender artifact; the log4j.properties in this section references a class
     under org.apache.flume.clients.log4jappender -->
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.9.0</version>
</dependency>
<!-- SLF4J binding for log4j 1.2 -->
<!-- NOTE(review): the sample class in this section imports org.apache.commons.logging, which is
     not declared here; presumably it arrives transitively - confirm against the resolved classpath -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<!-- JUnit 4.12, limited to the test scope -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
log4j.properties
# Appender named "flume", backed by Flume's LoadBalancingLog4jAppender class
log4j.appender.flume= org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
# Target agents as host:port entries, separated by spaces
log4j.appender.flume.Hosts = CentOSA:44444 CentOSB:44444 CentOSC:44444
# Host selection strategy
log4j.appender.flume.Selector = RANDOM
# Loggers under com.Dora log at DEBUG and above to the "flume" appender
log4j.logger.com.Dora = DEBUG,flume
# Message layout: level, date, logger name, message, line separator
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%p %d %c %m %n
测试案例
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Minimal driver that emits one message at each of the DEBUG, INFO, WARN and ERROR levels
 * through commons-logging, so the logging configuration can be checked end to end.
 */
public class TestLog {
    // The logger is never reassigned, so it is declared final and named as a constant.
    private static final Log LOG = LogFactory.getLog(TestLog.class);

    /**
     * Logs four fixed messages, one per level.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        LOG.debug("你好!_debug");
        LOG.info("你好!_info");
        LOG.warn("你好!_warn");
        LOG.error("你好!_error");
    }
}
SpringBoot 集成
pom依赖
<!-- Inherit dependency and plugin management from Spring Boot 2.1.5.RELEASE -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- JUnit testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Flume SDK artifact (flume-ng-sdk), version 1.9.0 -->
<!-- NOTE(review): the logging configuration in this section uses
     com.gilt.logback.flume.FlumeLogstashV1Appender, and none of the dependencies listed here
     has a matching group; confirm which artifact (or copied source) provides that class -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration with two appenders: STDOUT (console) and flume.
     The root logger writes ERROR and above to the console only; the com.Dora.service logger
     writes DEBUG and above to both the console and the flume appender. -->
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender" >
<encoder>
<pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- NOTE(review): flumeAgents below lists CentOS:44444 three times; presumably placeholders
     for three distinct agents - confirm the intended addresses. -->
<!-- NOTE(review): the application value "smapleapp" looks like a typo for "sampleapp";
     it is read by the appender at runtime, so confirm before changing it. -->
<appender name="flume" class="com.gilt.logback.flume.FlumeLogstashV1Appender">
<flumeAgents>
CentOS:44444,
CentOS:44444,
CentOS:44444
</flumeAgents>
<flumeProperties>
connect-timeout=4000;
request-timeout=8000
</flumeProperties>
<batchSize>1</batchSize>
<reportingWindow>1</reportingWindow>
<additionalAvroHeaders>
myHeader=myValue
</additionalAvroHeaders>
<application>smapleapp</application>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
</layout>
</appender>
<!-- Console output log level -->
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
<!-- additivity="false" keeps these events from also being handled by the root logger,
     which would otherwise print them to STDOUT a second time -->
<logger name="com.Dora.service" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="flume" />
</logger>
</configuration>
实现类
import com.Dora.service.IUserSerivice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
 * Spring-managed implementation of {@link IUserSerivice} that greets a caller and records
 * the greeting at INFO level.
 */
@Service
public class UserService implements IUserSerivice {
    private static final Logger LOG = LoggerFactory.getLogger(UserService.class);

    /**
     * Builds the greeting for {@code name}, logs it, and hands it back.
     *
     * @param name value appended after {@code "hello "}
     * @return the same text that was logged
     */
    @Override
    public String sayHello(String name) {
        // Build the text once so the logged and returned values cannot drift apart.
        final String greeting = "hello " + name;
        LOG.info(greeting);
        return greeting;
    }
}
spring启动类
/** Spring Boot entry point for the Flume logging sample. */
@SpringBootApplication
public class FlumeAplication {

    /**
     * Boots the Spring application context with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(FlumeAplication.class, args);
    }
}
测试类
// NOTE(review): this test boots KafkaSpringBootApplication and exercises KafkaTemplate and
// IOrderService, none of which appear in the Flume sample shown with it (which defines
// FlumeAplication and IUserSerivice). It looks like it was pasted from a Kafka example -
// confirm whether a test that calls the user service's sayHello was intended here.
@SpringBootTest(classes = {KafkaSpringBootApplication.class})
@RunWith(SpringRunner.class)
public class KafkaTempolateTests {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private IOrderService orderService;
// Calls saveOrder with a fixed id and payload; there are no assertions, so the test only
// fails if the call throws.
@Test
public void testOrderService(){
orderService.saveOrder("002","Doras xxxxx ");
}
// Sends one record (topic "topic01", key "002") through executeInTransaction on the template.
@Test
public void testKafkaTemplate(){
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
@Override
public Object doInOperations(KafkaOperations kafkaOperations) {
return kafkaOperations.send(new ProducerRecord("topic01","002","this is a demo"));
}
});
}
}