1. Log generation
1.1 Log generator development: producing the url and ip fields
vim generate_log.py
import random

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list"
]

# valid IPv4 octets used to assemble random addresses
ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 98, 168]

def sample_url():
    # pick one url path at random
    return random.sample(url_paths, 1)[0]

def sample_ip():
    # pick four octets and join them into an address
    octets = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in octets])

def generate_log(count=10):
    while count >= 1:
        query_log = "{url}\t{ip}".format(url=sample_url(), ip=sample_ip())
        print(query_log)
        count = count - 1

if __name__ == '__main__':
    generate_log()
1.2 Log generator development: producing the referer and status-code fields
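No code was captured for this step. Below is a minimal sketch of how generate_log.py might be extended; the http_referers, search_keyword and status_codes lists and the sample_referer / sample_status_code helper names are illustrative assumptions, not taken from the original notes.

import random  # already imported at the top of generate_log.py

# hypothetical referer templates and search keywords for simulated traffic
http_referers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}"
]
search_keyword = ["Hadoop", "Spark SQL", "Spark Streaming", "Storm"]
status_codes = ["200", "404", "500"]

def sample_referer():
    # return "-" for a share of the records to simulate direct visits
    if random.uniform(0, 1) > 0.8:
        return "-"
    refer_str = random.sample(http_referers, 1)[0]
    query_str = random.sample(search_keyword, 1)[0]
    return refer_str.format(query=query_str)

def sample_status_code():
    return random.sample(status_codes, 1)[0]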
1.3 Log generator development: producing the log timestamp
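Again, no code was captured here. A sketch of a timestamp helper, assuming a "YYYY-MM-DD HH:MM:SS" format (the exact format string is an assumption):

import time

def sample_time():
    # current local time, e.g. 2024-01-01 10:00:00
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())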
1.4 Copy the generator to the server, test it, and write the logs to a file
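A sketch of a generate_log() that writes to the access.log file tailed by the Flume exec source below, combining the fields from the helpers above (sample_ip, sample_time, sample_url, sample_status_code, sample_referer defined earlier in generate_log.py); the exact record layout is an assumption:

def generate_log(count=10):
    # overwrite the log file with a fresh batch of records
    with open("/Users/liujingmao/data/access.log", "w+") as f:
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status}\t{referer}".format(
                ip=sample_ip(), local_time=sample_time(), url=sample_url(),
                status=sample_status_code(), referer=sample_referer())
            f.write(query_log + "\n")
            count = count - 1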
1.5 Use the Linux cron scheduler to produce a batch of data every minute
Command to run the generator:
python /Users/liujingmao/data/generate_log.py
Crontab entry, run once a minute (log_generator.sh is assumed to be a small shell wrapper around the command above):
*/1 * * * * /Users/liujingmao/data/log_generator.sh
2. Collecting log data in real time with Flume
streaming_project.conf
access.log ==> console output
exec source -> memory channel -> logger sink
#name the components of the Flume agent
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel
#config source
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /Users/liujingmao/data/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c
#config channel
exec-memory-logger.channels.memory-channel.type=memory
#config sink
exec-memory-logger.sinks.logger-sink.type=logger
#bind source and sink to the channel
exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.logger-sink.channel=memory-channel
When copying this command, do not copy any spaces after the trailing backslash (\), otherwise Flume will report an error.
flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file /Users/liujingmao/data/streaming_project.conf \
-Dflume.root.logger=INFO,console
Single-line form:
bin/flume-ng agent --name exec-memory-logger --conf $FLUME_HOME/conf --conf-file /Users/liujingmao/data/streaming_project.conf -Dflume.root.logger=INFO,console
The same command with the short options -n/-c/-f:
bin/flume-ng agent -n exec-memory-logger -c $FLUME_HOME/conf -f /Users/liujingmao/data/streaming_project.conf -Dflume.root.logger=INFO,console
Other invocation examples (a different agent, a1, configured in avro.conf):
./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
./flume-ng agent -c . -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
If the agent name option (-n/--name) is left out, the agent fails to start:
ERROR node.Application: A fatal error occurred while running. Exception follows.
org.apache.commons.cli.MissingOptionException: Missing required option: n
at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
at org.apache.commons.cli.Parser.parse(Parser.java:231)
at org.apache.commons.cli.Parser.parse(Parser.java:85)
at org.apache.flume.node.Application.main(Application.java:265)
3. Logs ==> Flume ==> Kafka
Start ZooKeeper: ./zkServer.sh start
Start Kafka (either form):
kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Create the topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
streaming_project2.conf
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /Users/liujingmao/data/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start Flume:
flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /Users/liujingmao/data/streaming_project2.conf \
-Dflume.root.logger=INFO,console
Start a Kafka console consumer to verify that messages are coming through:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic streamingtopic
Developing the Spark Streaming job that consumes the Kafka data
Note: in Spark 2.4 this KafkaUtils class (the Kafka 0.8 receiver-based API) has been deprecated, so it will report errors!!
4. Data cleaning
For reference, the signature and body of the receiver-based KafkaUtils.createStream:
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)] = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000")
  createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, storageLevel)
}
In the application code, the comma-separated topics string is turned into the Map[String, Int] that createStream expects:
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap