Connecting the Flume + Kafka + Spark Streaming Pipeline

宗政财
2023-12-01

1. Log Generation

1.1 Log generator: producing URL and IP information

vim generate_log.py

import random

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list"
]

# candidate octets for building fake IP addresses (each value must be 0-255)
ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 98, 168]


def sample_url():
    return random.sample(url_paths, 1)[0]


def sample_ip():
    # pick four octets at random and join them into an IPv4-style string
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])


def generate_log(count=10):
    while count >= 1:
        query_log = "{url}\t{ip}".format(url=sample_url(), ip=sample_ip())
        print(query_log)
        count = count - 1


if __name__ == '__main__':
    generate_log()

1.2 Log generator: producing referer and status-code information (a sketch covering 1.2-1.4 follows after the cron setup below)
1.3 Log generator: producing the log timestamp
1.4 Copy the generator to the server, test it, and write the logs to a file
1.5 Use the Linux cron scheduler to generate a batch of data every minute

python /Users/liujingmao/data/generate_log.py

Crontab entry to run the generator every minute (log_generator.sh is assumed to wrap the python command above and append its output to access.log):

*/1 * * * * /Users/liujingmao/data/log_generator.sh
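
A minimal sketch of how 1.2-1.4 might extend generate_log.py: it adds referers, HTTP status codes and a timestamp, and appends the lines to access.log instead of printing them. The referer URLs, search keywords, status codes, line layout and output path are illustrative assumptions rather than the exact format of the original notes; sample_url() and sample_ip() are the functions defined above.

import random
import time

# assumed referers, search keywords and status codes, for illustration only
http_referers = [
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "https://search.yahoo.com/search?p={query}"
]
search_keywords = ["Spark SQL", "Hadoop", "Storm", "Spark Streaming"]
status_codes = ["200", "404", "500"]


def sample_referer():
    # roughly half of the requests carry no referer
    if random.uniform(0, 1) > 0.5:
        return "-"
    refer_str = random.sample(http_referers, 1)[0]
    query_str = random.sample(search_keywords, 1)[0]
    return refer_str.format(query=query_str)


def sample_status_code():
    return random.sample(status_codes, 1)[0]


def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # assumed output path; 1.4 writes to a file instead of stdout
    with open("/Users/liujingmao/data/access.log", "a") as f:
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status}\t{referer}".format(
                ip=sample_ip(), local_time=time_str, url=sample_url(),
                status=sample_status_code(), referer=sample_referer())
            f.write(query_log + "\n")
            count = count - 1

With this in place, cron runs the generator once per minute and each batch of lines shares one timestamp.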

2. Collecting Log Data in Real Time with Flume

streaming_project.conf

access.log ==> console output
exec source ==> memory channel ==> logger sink
# name the components of Flume
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel

# config source
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /Users/liujingmao/data/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c

# config channel
exec-memory-logger.channels.memory-channel.type = memory

# config sink
exec-memory-logger.sinks.logger-sink.type = logger

# bind source and sink to the channel
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel

When copying this command, do not copy any trailing spaces after the backslashes, otherwise it will fail.

flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file /Users/liujingmao/data/streaming_project.conf \
-Dflume.root.logger=INFO,console


The same agent can also be started on a single line:

bin/flume-ng agent --name exec-memory-logger --conf $FLUME_HOME/conf --conf-file /Users/liujingmao/data/streaming_project.conf -Dflume.root.logger=INFO,console

or with the short options (-c conf directory, -f config file, -n agent name):

bin/flume-ng agent -c $FLUME_HOME/conf -f /Users/liujingmao/data/streaming_project.conf -n exec-memory-logger -Dflume.root.logger=INFO,console

Other short-option examples (from a separate avro.conf test):

./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

./flume-ng agent -c . -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

If the agent name option (-n/--name) is missing or the long and short options are mixed up, Flume fails with:

ERROR node.Application: A fatal error occurred while running. Exception follows.
org.apache.commons.cli.MissingOptionException: Missing required option: n
at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
at org.apache.commons.cli.Parser.parse(Parser.java:231)
at org.apache.commons.cli.Parser.parse(Parser.java:85)
at org.apache.flume.node.Application.main(Application.java:265)

3. Logs ==> Flume ==> Kafka

Start ZooKeeper: ./zkServer.sh start

Start Kafka:
kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

Create the topic:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streamingtopic

streaming_project2.conf

exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /Users/liujingmao/data/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start Flume:

flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /Users/liujingmao/data/streaming_project2.conf \
-Dflume.root.logger=INFO,console

Start a Kafka console consumer to check the data:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic streamingtopic

Developing the Spark Streaming integration with Kafka

Note: in Spark 2.4 the receiver-based KafkaUtils API (spark-streaming-kafka-0-8) is already deprecated, so using it will report errors!
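
A minimal PySpark sketch of wiring Spark Streaming to streamingtopic through the receiver-based createStream discussed in the next section, assuming the deprecated spark-streaming-kafka-0-8 package is still available on the classpath (e.g. via --jars or --packages); the application name, 60-second batch interval and consumer group id are assumptions.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # from spark-streaming-kafka-0-8

sc = SparkContext(appName="StreamingProject")   # app name is an assumption
ssc = StreamingContext(sc, 60)                  # 60s batch interval, an assumption

# receiver-based API: ZooKeeper quorum, consumer group, {topic: numThreads}
kafka_stream = KafkaUtils.createStream(
    ssc, "localhost:2181", "streaming-group", {"streamingtopic": 1})

# each element is a (key, value) pair; the value is one raw log line
kafka_stream.map(lambda kv: kv[1]).count().pprint()

ssc.start()
ssc.awaitTermination()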

4. Data Cleaning

Signature and implementation of the receiver-based createStream in spark-streaming-kafka-0-8:

def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)] = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000")
  createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, storageLevel)
}

On the caller side, the comma-separated topic list is turned into the Map[String, Int] that createStream expects:

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
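
A minimal sketch of the cleaning step, building on the kafka_stream from the PySpark sketch above. It assumes the tab-separated line layout from the generator sketch in section 1 (ip, time, request, status code, referer) and extracts a course id from /class/NNN.html paths; both the layout and the course-id rule are assumptions, not the course's exact cleaning logic.

def clean(line):
    """Parse one raw log line into (ip, time, course_id, status_code, referer).

    Assumed layout: ip \t time \t "GET /class/145.html HTTP/1.1" \t status \t referer
    """
    fields = line.split("\t")
    if len(fields) != 5:
        return None
    ip, log_time, request, status_code, referer = fields
    url = request.split(" ")[1]            # e.g. /class/145.html
    course_id = 0
    if url.startswith("/class/"):
        # keep only the numeric course id from /class/145.html
        course_id = int(url.split("/")[2].split(".")[0])
    return (ip, log_time, course_id, int(status_code), referer)

cleaned = kafka_stream.map(lambda kv: kv[1]) \
                      .map(clean) \
                      .filter(lambda rec: rec is not None and rec[2] != 0)
cleaned.pprint()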