5.3.3.3 Building a Log Stream Processing System with Spark Streaming, Kafka, and Flume


1. Log4j

1.1 Configure Dependencies

<!-- log4j -->
<dependency>
     <groupId>log4j</groupId>
     <artifactId>log4j</artifactId>
     <version>1.2.17</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
     <groupId>org.apache.flume</groupId>
     <artifactId>flume-ng-sdk</artifactId>
     <version>1.8.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
<dependency>
     <groupId>org.apache.flume.flume-ng-clients</groupId>
     <artifactId>flume-ng-log4jappender</artifactId>
     <version>1.8.0</version>
</dependency>
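
The Spark Streaming consumer in section 3 also needs the Spark artifacts. The coordinates below are an assumed pairing for a Spark 2.x / Scala 2.11 build, not taken from the source; match the versions to your cluster.

<!-- Spark Streaming core (assumed version) -->
<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming_2.11</artifactId>
     <version>2.2.0</version>
</dependency>

<!-- Kafka 0.10+ direct-stream integration (assumed version) -->
<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
     <version>2.2.0</version>
</dependency>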

1.2 Configure log4j.properties

Note: place this file under src/main/resources.

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.1.149
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

1.3 Log Generator Code

import org.apache.log4j.Logger;

// Emits one numbered log line per second; the flume appender configured in
// log4j.properties ships each line to the Flume avro source.
public class LogGenerator {
    private static final Logger logger = Logger.getLogger(LogGenerator.class);
    private static int index = 0;

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(1000);
            logger.info("value:" + index++);
        }
    }
}

2. Flume

2.1 Flume Configuration

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
#agent1.sinks.log-sink.type=logger
agent1.sinks.kafka-sink.channel = logger-channel
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
# the topic must be created in advance (see the command after this configuration);
# in a properties file an inline "#" would become part of the value
agent1.sinks.kafka-sink.kafka.topic = mytopic
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.1.149:9092
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20

agent1.sources.avro-source.channels=logger-channel
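
The topic named in the sink configuration must exist before the agent starts. A typical creation command for Kafka versions of this era (assuming ZooKeeper runs alongside the broker at 192.168.1.149; Kafka 2.2+ would use --bootstrap-server instead of --zookeeper):

kafka-topics.sh --create --zookeeper 192.168.1.149:2181 --replication-factor 1 --partitions 1 --topic mytopic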

2.2 Startup Command

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming.conf --name agent1 -Dflume.root.logger=INFO,console
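
Before wiring up Spark, you can verify that log events flow end to end by tailing the topic with a console consumer (host and topic as configured above):

kafka-console-consumer.sh --bootstrap-server 192.168.1.149:9092 --topic mytopic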

3. Spark Streaming

3.1 Core Concepts

StreamingContext: the entry point of a Spark Streaming application, initialized from a SparkConf and a batch interval.
DStream: a discretized stream, represented internally as a continuous series of RDDs, one per batch.
Input DStream: a DStream that receives data from an external source such as Kafka, Flume, or a socket.
Transformations: lazy operations (map, flatMap, reduceByKey, ...) that derive a new DStream from an existing one.
Output Operations: operations (print, saveAsTextFiles, foreachRDD, ...) that push results to external systems and trigger actual execution.
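
A minimal sketch tying these concepts together, assuming the spark-streaming-kafka-0-10 integration: it initializes a StreamingContext with a 5-second batch interval, creates an Input DStream over the topic produced above, and prints each batch (the master URL and group id are placeholders, not from the source):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class LogStreamingApp {
    public static void main(String[] args) throws InterruptedException {
        // StreamingContext: entry point, built from a SparkConf and a batch interval
        SparkConf conf = new SparkConf().setAppName("LogStreamingApp").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.1.149:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "log-streaming-group"); // placeholder group id
        kafkaParams.put("auto.offset.reset", "latest");

        // Input DStream: a direct stream over the Kafka topic fed by Flume
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("mytopic"), kafkaParams));

        // Transformation + Output Operation: extract values and print each batch
        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}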

3.2 Operators

updateStateByKey: maintains per-key state across batches for cumulative computation; requires a checkpoint directory.
transform: applies an arbitrary RDD-to-RDD function to each batch and returns a new DStream.
foreachRDD: applies a function to each underlying RDD; an output operation with no return value, typically used to write each batch to external storage.
Window functions: compute over a sliding window of batches (e.g. reduceByKeyAndWindow), parameterized by window length and slide interval.
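
As an illustration of updateStateByKey, here is a self-contained sketch, independent of the Kafka pipeline above: it counts words from a local socket so it can run standalone, and the checkpoint path is a placeholder.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class StatefulWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // updateStateByKey needs a checkpoint directory to persist state between batches
        jssc.checkpoint("/tmp/streaming-checkpoint");

        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        JavaPairDStream<String, Integer> pairs = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1));

        // Merge this batch's values into the running total carried in the state
        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFn =
                (values, state) -> {
                    int sum = state.orElse(0);
                    for (int v : values) sum += v;
                    return Optional.of(sum);
                };
        JavaPairDStream<String, Integer> totals = pairs.updateStateByKey(updateFn);

        totals.print();
        jssc.start();
        jssc.awaitTermination();
    }
}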

4. Storage
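
One common persistence pattern, sketched here only as a continuation of the totals stream from the updateStateByKey example above (the HDFS URI is a placeholder, not from the source), is to write each micro-batch out from foreachRDD:

// Continues the `totals` JavaPairDStream from the sketch in section 3.2.
// Writes each non-empty micro-batch to a time-stamped directory; URI is a placeholder.
totals.foreachRDD((rdd, time) -> {
    if (!rdd.isEmpty()) {
        rdd.saveAsTextFile("hdfs://192.168.1.149:9000/logs/batch-" + time.milliseconds());
    }
});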
