5.3.3.3 Building a Log Stream Processing System with Spark Streaming, Kafka, and Flume
Edited by 小牛编辑
2023-12-01
1. Log generator (log4j)
1.1 Configure dependencies
<!-- log4j -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>
1.2 Configure log4j.properties
Note: place this file in the project's resources directory so that it is on the classpath.
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.1.149
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
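With UnsafeMode = true the Flume appender does not throw an exception when it fails to send an event, so a wrong host or port can go unnoticed. A minimal sketch of a connectivity check that could be run before starting the application is shown here. FlumePortCheck and isReachable are hypothetical names, not part of Flume or log4j; the host and port are the ones configured above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Flume or log4j.
final class FlumePortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Values copied from log4j.appender.flume.Hostname and log4j.appender.flume.Port.
        String host = "192.168.1.149";
        int port = 41414;
        if (isReachable(host, port, 2000)) {
            System.out.println("Reachable: " + host + ":" + port);
        } else {
            System.out.println("Not reachable: " + host + ":" + port
                    + " (with UnsafeMode = true, log events may be lost without an exception)");
        }
    }
}
```

A successful TCP connection only shows that something is listening on that port; it does not prove that the listener is the Flume Avro source.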
1.3 Code
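import org.apache.log4j.Logger;

// Emits one log line per second; the "flume" appender configured in
// log4j.properties forwards each event to the Flume agent.
public class LogGenerator {
    private static final Logger logger = Logger.getLogger(LogGenerator.class);
    private static int index = 0;

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(1000);
            // Produces "value:0", "value:1", ...
            logger.info("value:" + index++);
        }
    }
}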
2. Flume
2.1 Flume configuration
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

# define channel
agent1.channels.logger-channel.type=memory

# define sink
# Alternative for debugging: a logger sink (it would also have to be listed in agent1.sinks)
#agent1.sinks.log-sink.type=logger
#agent1.sinks.log-sink.channel=logger-channel
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
# The topic must be created in advance
agent1.sinks.kafka-sink.kafka.topic = mytopic
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.1.149:9092
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20

# bind the source and the sink to the channel
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel = logger-channel
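The agent configuration uses the Java properties file format, in which `#` starts a comment only at the beginning of a line. A note written after a value on the same line becomes part of that value, which matters for settings such as the topic name. The sketch below illustrates this with java.util.Properties; InlineCommentCheck and valueOf are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flume.
final class InlineCommentCheck {

    // Parses text in Java properties format and returns the value stored under key.
    static String valueOf(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "agent1.sinks.kafka-sink.kafka.topic";

        // Comment on the same line as the value: it is read as part of the value.
        String inline = key + " = mytopic # created in advance\n";
        // Comment on its own line: it is ignored.
        String ownLine = "# created in advance\n" + key + " = mytopic\n";

        System.out.println("[" + valueOf(inline, key) + "]");  // prints [mytopic # created in advance]
        System.out.println("[" + valueOf(ownLine, key) + "]"); // prints [mytopic]
    }
}
```

For this reason, comments in the agent configuration are safest on their own lines.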
2.2 Start command
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming.conf --name agent1 -Dflume.root.logger=INFO,console
3. Spark Streaming
3.1 Core concepts
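Core concept | Description |
---|---|
StreamingContext | Entry point of a Spark Streaming application; it is created first, with a batch interval. |
DStream | Discretized stream: a continuous sequence of RDDs, one per batch interval. |
Input DStream | A DStream that receives data from a source such as Kafka. |
Transformations | Operations that derive a new DStream from an existing one, e.g. map, filter, reduceByKey. |
Output Operations | Operations that push each batch's results to an external system or the console, e.g. print, foreachRDD; they trigger the actual computation. |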
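How these concepts relate can be sketched with a toy model in plain Java. This is not Spark code and none of the names below are Spark APIs; MicroBatchModel and parseValues are hypothetical. It assumes the records are the "value:N" messages produced by LogGenerator, grouped into one list per batch interval.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java toy model of micro-batching; none of these names are Spark APIs.
final class MicroBatchModel {

    // "Transformation": turn raw lines such as "value:7" into integers,
    // skipping lines that do not have that shape.
    static List<Integer> parseValues(List<String> batch) {
        return batch.stream()
                .filter(line -> line.matches("value:\\d+"))
                .map(line -> Integer.parseInt(line.substring("value:".length())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "Input DStream": records grouped into one list per batch interval.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("value:0", "value:1", "value:2"),
                Arrays.asList("value:3", "not a value line"),
                Arrays.asList("value:4", "value:5"));

        for (List<String> batch : batches) {
            List<Integer> values = parseValues(batch);
            // "Output operation": act on each batch's result, here by printing it.
            System.out.println(values);
        }
        // Prints [0, 1, 2], then [3], then [4, 5].
    }
}
```

In Spark Streaming the batches arrive continuously and each one is an RDD distributed across the cluster; the loop above only mirrors the order of the steps.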
3.2 Operators
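Function | Purpose |
---|---|
updateStateByKey | Maintains state per key and accumulates results across batches. |
transform | Applies an RDD-to-RDD function to each RDD in the stream and returns a new DStream. |
foreachRDD | Applies a function to each RDD in the stream; it is an output operation and returns nothing. |
Window functions | Operate over a sliding window of batches, e.g. window, reduceByKeyAndWindow. |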
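The accumulation performed by updateStateByKey can be sketched in plain Java. This is a model of the semantics only, not Spark's implementation or API: StateByKeyModel, applyBatch and runningSum are hypothetical names, and java.util.Optional stands in for the Optional type used by Spark's Java API. For every key seen so far, an update function receives the new values of the current batch and the previous state, and returns the new state. Returning an empty Optional drops the key.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Plain-Java model of per-key state across batches; not Spark code.
final class StateByKeyModel {

    // Applies one batch to the running state and returns the new state.
    // Keys without new values are still updated, with an empty list.
    static Map<String, Integer> applyBatch(
            Map<String, Integer> state,
            Map<String, List<Integer>> batch,
            BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> update) {
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(batch.keySet());
        Map<String, Integer> next = new TreeMap<>();
        for (String key : keys) {
            List<Integer> newValues = batch.getOrDefault(key, Collections.emptyList());
            Optional<Integer> updated = update.apply(newValues, Optional.ofNullable(state.get(key)));
            updated.ifPresent(value -> next.put(key, value));
        }
        return next;
    }

    // Example update function: a running sum per key.
    static Optional<Integer> runningSum(List<Integer> newValues, Optional<Integer> previous) {
        int sum = previous.orElse(0);
        for (int v : newValues) {
            sum += v;
        }
        return Optional.of(sum);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> batch1 = new TreeMap<>();
        batch1.put("INFO", Arrays.asList(1, 1));
        batch1.put("WARN", Arrays.asList(1));
        Map<String, List<Integer>> batch2 = new TreeMap<>();
        batch2.put("INFO", Arrays.asList(1));

        Map<String, Integer> state = new TreeMap<>();
        state = applyBatch(state, batch1, StateByKeyModel::runningSum);
        System.out.println(state); // prints {INFO=2, WARN=1}
        state = applyBatch(state, batch2, StateByKeyModel::runningSum);
        System.out.println(state); // prints {INFO=3, WARN=1}
    }
}
```

In Spark itself, updateStateByKey also requires a checkpoint directory to be set on the StreamingContext, because the state has to survive from one batch to the next.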