# 使用kafka-log4j-appender实现日志采集
### 背景
kafka内核实现了log4j.AppenderSkeleton接口,通过配置log4j即可将日志生产到topic
### 配置方法
**step0:** 配置pom.xml
```pom
org.apache.kafka
kafka-clients
2.4.0
compile
org.apache.kafka
kafka-log4j-appender
2.4.0
compile
```
**step1:** 配置代码,样例如下:
```java
package com.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestKafkaLog4gAppender {
// 初始化logger
private static final Logger logger = LoggerFactory.getLogger(TestKafkaLog4gAppender.class);
public static void main(String[] args) throws IOException {
int start = 0;
while (true) {
// 使用logger来生产数据
logger.warn("string-" + start);
start++;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
```
**step2:** 配置log4j.properties,样例如下
> 注意:log4j.properties文件必须放在src/resource目录下
- 安全场景(通过21007端口进行数据生产)
```python
log4j.rootLogger=debug,Console
# appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# 数据发送的topic
log4j.appender.kafka.topic=test
log4j.appender.kafka.syncSend=false
# bootstrap-server配置,多个节点间用逗号","隔开
log4j.appender.kafka.brokerList=xxx.xxx.xxx.xxx:21007
log4j.appender.kafka.saslKerberosServiceName = kafka
log4j.appender.kafka.securityProtocol = SASL_PLAINTEXT
# 配置krb5.config所在路径,安全场景下必须配置
log4j.appender.kafka.kerb5ConfPath = D:\\work\\test\\kafka\\src\\resource\\conf\\krb5.conf
# 配置jaas.conf路径,安全场景下必须配置
log4j.appender.kafka.clientJaasConfPath = C:\\xxx\\xxx\\xxx\\xxx\\xxx\\xxx\\jaas.conf
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
#输出日志到控制台
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Threshold=all
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n
# kafka
# 指明向kafka生产数据的类的入口
log4j.logger.com.test=warn,kafka
```
- 非安全场景(通过21005端口进行数据生产)
```log
log4j.rootLogger=debug,Console
# appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# 数据发送的topic
log4j.appender.kafka.topic=test
log4j.appender.kafka.syncSend=false
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=xxx.xxx.xxx.xxx:21005
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
#输出日志到控制台
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Threshold=all
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n
#kafka
log4j.logger.com.test=warn,kafka
```
**step3:** 运行代码。可以idea本地调试,也可以打包到集群上运行,这里以idea本地调试为例
- 生产数据
![Send.PNG](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202007/07/112056xlot6iabrevvctxd.png)
- 消费数据
![Consumer.PNG](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202007/07/112134q9x0zzeyckiqxvui.png)