
Collecting logs into Kafka with kafka-log4j-appender

柯轶
2023-12-01

# Collecting Logs into Kafka with kafka-log4j-appender

### Background

Kafka ships a `KafkaLog4jAppender` that extends log4j's `AppenderSkeleton`. Once it is wired into the log4j configuration, every log record is produced directly to a Kafka topic.
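The same appender can also be attached programmatically instead of through log4j.properties. A minimal sketch follows; the broker address and topic name are placeholders, and the setter names mirror the bean-style properties that log4j maps from the configuration file:

```java
package com.test;

import org.apache.kafka.log4jappender.KafkaLog4jAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class ProgrammaticAppenderDemo {
    public static void main(String[] args) {
        // build the appender the same way log4j.properties would via reflection
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setBrokerList("localhost:9092"); // placeholder broker address
        appender.setTopic("test");
        appender.setSyncSend(false);              // don't block the logging thread
        appender.setLayout(new PatternLayout("%d [%-5p] %m%n"));
        appender.activateOptions();               // creates the underlying KafkaProducer

        Logger logger = Logger.getLogger("com.test");
        logger.addAppender(appender);
        logger.warn("hello from the programmatic appender");
    }
}
```

This is mostly useful for quick experiments; the property-file route below is the usual choice because it keeps Kafka details out of application code.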

### Configuration

**step0:** Configure pom.xml

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>2.4.0</version>
    <scope>compile</scope>
</dependency>
```

**step1:** Write the producer code; a sample follows:

```java
package com.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestKafkaLog4gAppender {

    // initialize the logger
    private static final Logger logger = LoggerFactory.getLogger(TestKafkaLog4gAppender.class);

    public static void main(String[] args) {
        int start = 0;
        while (true) {
            // each log call produces one record to the configured topic
            logger.warn("string-" + start);
            start++;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```

**step2:** Configure log4j.properties; samples follow.

> Note: the log4j.properties file must be placed under the src/resource directory so that it ends up on the classpath.

- Secure scenario (produce via port 21007)

```properties
log4j.rootLogger=debug,Console

# kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# target topic for the produced records
log4j.appender.kafka.topic=test
log4j.appender.kafka.syncSend=false
# bootstrap servers; separate multiple nodes with commas ","
log4j.appender.kafka.brokerList=xxx.xxx.xxx.xxx:21007
log4j.appender.kafka.saslKerberosServiceName=kafka
log4j.appender.kafka.securityProtocol=SASL_PLAINTEXT
# path to krb5.conf; required in the secure scenario
log4j.appender.kafka.kerb5ConfPath=D:\\work\\test\\kafka\\src\\resource\\conf\\krb5.conf
# path to jaas.conf; required in the secure scenario
log4j.appender.kafka.clientJaasConfPath=C:\\xxx\\xxx\\xxx\\xxx\\xxx\\xxx\\jaas.conf
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

# console appender
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Threshold=all
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n

# route the producing package's logs to the kafka appender
log4j.logger.com.test=warn,kafka
```
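The file pointed to by `clientJaasConfPath` typically contains a `KafkaClient` login section for Kerberos. A minimal sketch, where the keytab path and principal are placeholders for your environment:

```
KafkaClient {
  com.sun.security.auth.module.krb5.Krb5LoginModule required
  useKeyTab=true
  keyTab="/path/to/user.keytab"
  principal="kafka-user@HADOOP.COM"
  serviceName="kafka";
};
```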

- Non-secure scenario (produce via port 21005)

```properties
log4j.rootLogger=debug,Console

# kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# target topic for the produced records
log4j.appender.kafka.topic=test
log4j.appender.kafka.syncSend=false
# multiple brokers are separated by comma ","
log4j.appender.kafka.brokerList=xxx.xxx.xxx.xxx:21005
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

# console appender
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Threshold=all
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n

# route the producing package's logs to the kafka appender
log4j.logger.com.test=warn,kafka
```

**step3:** Run the code. You can debug locally in IDEA or package it and run it on the cluster; local IDEA debugging is used as the example here.

- Producing data

![Send.PNG](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202007/07/112056xlot6iabrevvctxd.png)

- Consuming data

![Consumer.PNG](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202007/07/112134q9x0zzeyckiqxvui.png)
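To verify the pipeline end to end without a console tool, a simple consumer can read the topic back. A minimal sketch assuming the non-secure 21005 listener; the broker address and group id are placeholders:

```java
package com.test;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class VerifyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx.xxx.xxx.xxx:21005"); // placeholder broker
        props.put("group.id", "log4j-appender-verify");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // poll a few times and print whatever the appender produced
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```

Each printed value should match the `string-N` messages emitted by the logger in step1.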
