Dpark-AND-Spark
DPark: DPark is a clone of Spark implemented by Douban, a Chinese company.
DPark is a MapReduce-like cluster computing framework built on Mesos (an Apache cluster manager that provides efficient resource isolation and sharing across distributed applications and frameworks). It is a Python clone of Spark: a distributed computing framework implemented in Python that makes large-scale data processing and low-latency iterative computation straightforward. The framework resembles MapReduce but is more flexible; it lets you express distributed computations conveniently in Python and adds functionality aimed at iterative jobs. DPark is developed and maintained by Douban; according to the company, most of Douban's internal data analysis is done with DPark, and the project continues to mature.
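For readers new to DPark, its API mirrors Spark's RDD interface (textFile, map, filter, reduceByKey, ...). Below is a minimal word-count sketch; it assumes DPark is installed and running in its default local mode, that DPark provides flatMap as Spark does, and the input path is only a placeholder.

from dpark import DparkContext

dc = DparkContext()
# Read a text file as an RDD, split it into words, and count each word
word_counts = (dc.textFile('/tmp/words.txt')        # placeholder input path
               .flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
print(word_counts.take(10))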
DPark: a clone of Spark
Reference: http://suanfazu.com/t/dpark-de-chu-bu-shi-yong/444
This example is based on: https://blog.csdn.net/myjiayan/article/details/52463053?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
Spark official documentation: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html
DPark: https://github.com/douban/dpark
DPark official wiki: https://github.com/jackfengji/test_pro/wiki
Full dataset: http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html or (Baidu Pan: https://pan.baidu.com/s/1mi04sys  password: 3max)
Dataset used in this example (in the GitHub repo): ./DparkAndSpark/data/NASA_LOG_MIN.txt
Full code: see my GitHub repository.
#!/usr/bin/env python
# encoding: utf-8
"""
@version: v1.0
@author: W_H_J
@license: Apache Licence
@contact: 415900617@qq.com
@site:
@software: PyCharm
@file: Regularization.py
@time: 2018/6/7 11:30
@describe: Normalize the target log file --- parse each access-request line, e.g.
    s = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839'
"""
# Spark's Row is used to wrap the parsed result (one Row per log record).
# See http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row
from pyspark.sql import Row
import re
import datetime

# Month-name to month-number mapping used when parsing dates
month_map = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
             'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}

# Regular expression used to split/clean an Apache access-log line
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S*)\s*(\S*)\s*" (\d{3}) (\S+)'


# Parse the date/time field
def parse_apache_time(s):
    """Convert a date/time string in Apache time format into a Python datetime object.

    Args:
        s (str): date and time in Apache time format.
    Returns:
        datetime: datetime object (the time zone is ignored).
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))


# Parse one log line
def parseApacheLogLine(logline):
    """Parse one line in Apache Common Log Format.

    Args:
        logline (str): a line of text in Apache Common Log Format.
    Returns:
        tuple: (Row with the fields of the access log, 1) on success,
               or (the original, invalid log line, 0) on failure.
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = int(size_field)  # Python 3 int is unbounded; no Python 2 'long' needed
    # Row behaves like one database record; see the pyspark.sql.Row link above
    return (Row(
        host=match.group(1),
        client_identd=match.group(2),
        user_id=match.group(3),
        date_time=parse_apache_time(match.group(4)),
        method=match.group(5),
        endpoint=match.group(6),
        protocol=match.group(7),
        response_code=int(match.group(8)),
        content_size=size
    ), 1)
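A quick way to check the regex and the Row layout is to feed parseApacheLogLine() the sample line from the docstring. This is just a local sketch (no DPark involved), using the same import path as the later scripts:

from DparkAndSpark.Regularization import parseApacheLogLine

sample = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839'
row, ok = parseApacheLogLine(sample)
if ok == 1:
    # Expected: 127.0.0.1 GET /images/launch-logo.gif 200 1839
    print(row.host, row.method, row.endpoint, row.response_code, row.content_size)
else:
    print('Invalid logline: %s' % row)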
#!/usr/bin/env python
# encoding: utf-8
"""
@version: v1.0
@author: W_H_J
@license: Apache Licence
@contact: 415900617@qq.com
@site:
@software: PyCharm
@file: DparkAnalysis.py
@time: 2018/6/7 11:40
@describe: Initialize the RDD and parse every log record.
"""
from dpark import DparkContext
import os

# Import the log-line parser
from DparkAndSpark.Regularization import parseApacheLogLine

# Path of the log file to read
baseDir = os.path.join('./data/NASA_LOG_MIN.txt')

# Initialize DPark
dc = DparkContext()


# Parse the log file
def parseLogs():
    """Read and parse the log file."""
    # Parse every line
    parsed_logs = (dc.textFile(baseDir)
                   .map(parseApacheLogLine)
                   .cache())
    # Lines parsed successfully
    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())
    # Lines that failed to parse
    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    # Count the failures
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print('Number of invalid loglines: %d' % failed_logs_count)
        for line in failed_logs.take(20):
            print('Invalid logline: %s' % line)
    print('*** Read %d lines, successfully parsed %d lines, failed to parse %d lines ***'
          % (parsed_logs.count(), access_logs.count(), failed_logs_count))
    return parsed_logs, access_logs, failed_logs


parsed_logs, access_logs, failed_logs = parseLogs()
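DparkAnalysis.py runs parseLogs() at import time, so the resulting RDDs can be inspected straight away. A quick sketch (using the same import path as the scripts below) that prints a few successfully parsed records:

from DparkAndSpark.DparkAnalysis import access_logs, failed_logs

# Peek at a few parsed Row records
for row in access_logs.take(3):
    print(row.host, row.date_time, row.method, row.endpoint, row.response_code, row.content_size)
print('failed lines:', failed_logs.count())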
#!/usr/bin/env python3
# encoding: utf-8
"""
@version: v1.0
@author: W_H_J
@license: Apache Licence
@contact: 415900617@qq.com
@site:
@software: PyCharm
@file: StatisticAnalysis.py
@time: 2018/6/6 16:07
@describe: Statistical analysis
"""
from DparkAndSpark.DparkAnalysis import access_logs
import matplotlib.pyplot as plt


# 1 - Response content size: average size and the top-5 largest responses returned by the server
def viewCount():
    content_sizes = access_logs.map(lambda log: log.content_size).cache()
    print('1===>Content Size Avg: %i, Top 5: %s' % (
        content_sizes.reduce(lambda a, b: a + b) / content_sizes.count(),
        content_sizes.top(5),))


# 2 - Response-code analysis
def vieWCode():
    responseCodeToCount = (access_logs
                           .map(lambda log: (log.response_code, 1))
                           .reduceByKey(lambda a, b: a + b)
                           .cache())
    responseCodeToCountList = responseCodeToCount.take(100)
    # Number of distinct response codes
    print('2===>Found %d response codes' % len(responseCodeToCountList))
    # Count per response code
    print('3===>Response Code Counts: %s' % responseCodeToCountList)

    # Pie chart of the response-code distribution
    def pie_pct_format(value):
        """Determine the appropriate format string for the pie chart percentage label.
        Args:
            value: value of the pie slice
        Returns:
            str: formatted string label; if the slice is too small to fit, returns an empty string
        """
        return '' if value < 7 else '%.0f%%' % value

    labels = responseCodeToCount.map(lambda s: s[0]).collect()
    count = access_logs.count()
    fracs = responseCodeToCount.map(lambda s: (float(s[1]) / count)).collect()
    fig = plt.figure(figsize=(10, 10), facecolor='white', edgecolor='white')
    colors = ['yellowgreen', 'lightskyblue', 'gold', 'purple', 'lightcoral', 'yellow', 'black']
    explode = (0.05, 0.05, 0.1, 0, 0)
    patches, texts, autotexts = plt.pie(fracs, labels=labels, colors=colors,
                                        explode=explode, autopct=pie_pct_format,
                                        shadow=False, startangle=125)
    for text, autotext in zip(texts, autotexts):
        if autotext.get_text() == '':
            text.set_text('')  # If the slice is too small to fit, don't show a text label
    plt.legend(labels, loc=(0.80, -0.1), shadow=True)
    plt.savefig('./img/code.png')
    # Distinct response codes
    labels = responseCodeToCount.map(lambda s: s[0]).collect()
    print("4===>Codes:", labels)
    # Their ratios
    count = access_logs.count()
    fracs = responseCodeToCount.map(lambda s: (float(s[1]) / count)).collect()
    print("5===>Ratio:", list(fracs))
    # As percentages
    print("6===>percentage:", dict(zip(labels, ["{:.2%}".format(round(x, 3)) for x in list(fracs)])))


# 3 - Hosts that accessed the server more than 10 times
def viewCountSum():
    # Any host that has accessed the server more than 10 times
    hostCountPairTuple = access_logs.map(lambda log: (log.host, 1))
    hostSum = hostCountPairTuple.reduceByKey(lambda a, b: a + b)
    hostMoreThan10 = hostSum.filter(lambda s: s[1] > 10)
    hostsPick20 = (hostMoreThan10
                   .map(lambda s: s[0])
                   .take(20))
    print('7===>Any 20 hosts that have accessed more than 10 times: %s' % hostsPick20)
    endpoints = (access_logs
                 .map(lambda log: (log.endpoint, 1))
                 .reduceByKey(lambda a, b: a + b)
                 .cache())
    ends = endpoints.map(lambda s: s[0]).collect()
    counts = endpoints.map(lambda s: s[1]).collect()
    print(ends)
    print(counts)
    # Plot hits per endpoint
    fig = plt.figure(figsize=(8, 4.2), facecolor='white', edgecolor='white')
    plt.axis([0, len(ends), 0, max(counts)])
    plt.grid(b=True, which='major', axis='y')
    plt.xlabel('Endpoints')
    plt.ylabel('Number of Hits')
    plt.plot(counts)
    plt.savefig("./img/Endpoints.png")


# 4 - Host statistics
def viewHostsCount():
    # Number of distinct hosts
    hosts = access_logs.map(lambda log: (log.host, 1))
    uniqueHosts = hosts.reduceByKey(lambda a, b: a + b)
    uniqueHostCount = uniqueHosts.count()
    print('8===>Unique hosts: %d' % uniqueHostCount)
    # Number of distinct hosts per day
    dayToHostPairTuple = access_logs.map(lambda log: (log.date_time.day, log.host))
    dayGroupedHosts = dayToHostPairTuple.groupByKey()
    dayHostCount = dayGroupedHosts.map(lambda s: (s[0], len(set(s[1]))))
    # Spark would use sortByKey(); DPark has no such method, so sort by the key element instead
    # dailyHosts = dayHostCount.sortByKey().cache()
    dailyHosts = dayHostCount.sort(lambda s: s[0]).cache()
    dailyHostsList = dailyHosts.take(30)
    print('9===>Unique hosts per day: %s' % dailyHostsList)
    # Average number of requests per host per day
    dayAndHostTuple = access_logs.map(lambda log: (log.date_time.day, log.host))
    groupedByDay = dayAndHostTuple.groupByKey()
    # Spark would use sortByKey(); in DPark, sort by the key element instead
    sortedByDay = groupedByDay.sort(lambda s: s[0])
    avgDailyReqPerHost = sortedByDay.map(lambda s: (s[0], len(s[1]) / len(set(s[1])))).cache()
    avgDailyReqPerHostList = avgDailyReqPerHost.take(30)
    print('10===>Average number of daily requests per Hosts is %s' % avgDailyReqPerHostList)


# 5 - 404 statistics
def viewFind404():
    badRecords = (access_logs
                  .filter(lambda log: log.response_code == 404)
                  .cache())
    print('11===>Found %d 404 URLs' % badRecords.count())
    # 404 URLs
    badEndpoints = badRecords.map(lambda log: log.endpoint)
    # Spark would de-duplicate with distinct(), which DPark lacks:
    # badUniqueEndpoints = badEndpoints.distinct()
    # badUniqueEndpointsPick40 = badUniqueEndpoints.take(10)
    # so de-duplicate with a Python set instead
    badUniqueEndpointsPick40 = list(set(badEndpoints.take(10)))
    print('12===>404 URLS: %s' % badUniqueEndpointsPick40)
    # Top-20 404 endpoints
    badEndpointsCountPairTuple = badRecords.map(lambda log: (log.endpoint, 1))
    badEndpointsSum = badEndpointsCountPairTuple.reduceByKey(lambda a, b: a + b)
    '''
    # Descending sort, the Spark way:
    badEndpointsTop20 = badEndpointsSum.takeOrdered(20, lambda s: -1 * s[1])
    '''
    # DPark
    badEndpointsTop20 = badEndpointsSum.sort(lambda s: -1 * s[1]).take(20)
    print('13===>Top-20 404 URLs: %s' % badEndpointsTop20)
    # Top-25 hosts that generated 404s
    errHostsCountPairTuple = badRecords.map(lambda log: (log.host, 1))
    errHostsSum = errHostsCountPairTuple.reduceByKey(lambda a, b: a + b)
    errHostsTop25 = errHostsSum.sort(lambda s: -1 * s[1]).take(25)
    print('14===>Top-25 hosts that generated errors: %s' % errHostsTop25)
    # 404s per day
    errDateCountPairTuple = badRecords.map(lambda log: (log.date_time.day, 1))
    errDateSum = errDateCountPairTuple.reduceByKey(lambda a, b: a + b)
    errDateSorted = (errDateSum
                     .sort(lambda s: s[0])   # sort by day (sortByKey equivalent)
                     .cache())
    errByDate = errDateSorted.collect()
    print('15===>404 Errors by day: %s' % errByDate)
    topErrDate = errDateSorted.sort(lambda s: -1 * s[1]).take(5)
    print('16===>Top Five dates for 404 requests: %s' % topErrDate)
    # 404s per hour
    hourCountPairTuple = badRecords.map(lambda log: (log.date_time.hour, 1))
    hourRecordsSum = hourCountPairTuple.reduceByKey(lambda a, b: a + b)
    hourRecordsSorted = (hourRecordsSum
                         .sort(lambda s: s[0])   # sort by hour (sortByKey equivalent)
                         .cache())
    errHourList = hourRecordsSorted.collect()
    print('17===>Top hours for 404 requests: %s' % errHourList)
    # Plot 404s per hour
    hoursWithErrors404 = hourRecordsSorted.map(lambda s: s[0]).collect()
    errors404ByHours = hourRecordsSorted.map(lambda s: s[1]).collect()
    fig = plt.figure(figsize=(8, 4.2), facecolor='white', edgecolor='white')
    plt.axis([0, max(hoursWithErrors404), 0, max(errors404ByHours)])
    plt.grid(b=True, which='major', axis='y')
    plt.xlabel('Hour')
    plt.ylabel('404 Errors')
    plt.plot(hoursWithErrors404, errors404ByHours)
    plt.savefig("./img/hours404.png")


# Endpoints whose response code is not 200
def viewCountNot200():
    not200 = access_logs.filter(lambda x: x.response_code != 200)
    endpointCountPairTuple = not200.map(lambda x: (x.endpoint, 1))
    endpointSum = endpointCountPairTuple.reduceByKey(lambda a, b: a + b)
    topTenErrURLs = endpointSum.sort(lambda s: -1 * s[1]).take(10)
    print('18===>Top Ten failed URLs: %s' % topTenErrURLs)


# Count unique hosts
def viewHostOne():
    hosts = access_logs.map(lambda log: (log.host, 1))
    uniqueHosts = hosts.reduceByKey(lambda a, b: a + b)
    uniqueHostCount = uniqueHosts.count()
    print('19===>Unique hosts: %d' % uniqueHostCount)


# Unique hosts per day
def viewHostByDay():
    dayToHostPairTuple = access_logs.map(lambda log: (log.date_time.day, log.host))
    dayGroupedHosts = dayToHostPairTuple.groupByKey()
    dayHostCount = dayGroupedHosts.map(lambda s: (s[0], len(set(s[1]))))
    dailyHosts = dayHostCount.sort(lambda item: item[0]).cache()
    dailyHostsList = dailyHosts.take(30)
    print('20===>Unique hosts per day: %s' % dailyHostsList)


# Average requests per host per day
def viewHostByDayAvg():
    dayAndHostTuple = access_logs.map(lambda log: (log.date_time.day, log.host))
    groupedByDay = dayAndHostTuple.groupByKey().map(lambda s: (s[0], int(len(s[1]) * 1.0 / len(set(s[1])))))
    sortedByDay = groupedByDay.sort(lambda item: item[0])
    avgDailyReqPerHost = sortedByDay.cache()
    avgDailyReqPerHostList = avgDailyReqPerHost.take(30)
    print('21===>Average number of daily requests per Hosts is %s' % avgDailyReqPerHostList)


if __name__ == '__main__':
    viewCount()
    vieWCode()
    viewCountSum()
    viewHostsCount()
    viewFind404()
    viewCountNot200()
    viewHostOne()
    viewHostByDay()
    viewHostByDayAvg()
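DPark lacks several Spark RDD helpers used in the comments above (sortByKey, takeOrdered, distinct). The following is a minimal sketch of the substitutions this post relies on; it assumes, as the code above does, that DPark's sort() accepts a key function, and it reuses the access_logs RDD from DparkAnalysis.py.

from DparkAndSpark.DparkAnalysis import access_logs

# (response_code, count) pairs, as in vieWCode() above
codeCounts = (access_logs.map(lambda log: (log.response_code, 1))
              .reduceByKey(lambda a, b: a + b))

# Spark: codeCounts.sortByKey()               -> DPark: sort() with a key function
byCode = codeCounts.sort(lambda kv: kv[0]).collect()

# Spark: codeCounts.takeOrdered(5, key=lambda kv: -kv[1])
# DPark: sort descending by value, then take
top5 = codeCounts.sort(lambda kv: -1 * kv[1]).take(5)

# Spark: rdd.distinct()                       -> here: de-duplicate a small sample on the driver
sampleHosts = list(set(access_logs.map(lambda log: log.host).take(10)))

print(byCode, top5, sampleHosts)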
#!/usr/bin/env python
# encoding: utf-8
"""
@version: v1.0
@author: W_H_J
@license: Apache Licence
@contact: 415900617@qq.com
@site:
@software: PyCharm
@file: StatisticAnalysis404.py
@time: 2018/6/7 17:03
@describe: Explore the 404 response codes.
    Based on: https://blog.csdn.net/myjiayan/article/details/52463053?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
"""
from DparkAndSpark.DparkAnalysis import access_logs
import matplotlib.pyplot as plt


# Analyze 404 errors
def find404Error():
    # Count the 404 records
    badRecords = access_logs.filter(lambda log: log.response_code == 404).cache()
    print('1=====>Found %d 404 URLs' % badRecords.count())
    # List distinct endpoints that returned 404
    badEndpoints = badRecords.map(lambda log: (log.endpoint, 1))
    badUniqueEndpoints = badEndpoints.reduceByKey(lambda a, b: a + b).map(lambda item: item[0])
    badUniqueEndpointsPick40 = badUniqueEndpoints.take(40)
    print('2=====>404 URLS: %s' % badUniqueEndpointsPick40)
    # Top-20 endpoints with 404 errors
    badEndpointsCountPairTuple = badRecords.map(lambda log: (log.endpoint, 1))
    badEndpointsSum = badEndpointsCountPairTuple.reduceByKey(lambda a, b: a + b)
    badEndpointsTop20 = badEndpointsSum.sort(lambda item: -1 * item[1]).take(20)
    print('3=====>Top-20 404 URLs: %s' % badEndpointsTop20)
    # Top-20 hosts that generated 404 errors
    errHostsCountPairTuple = badRecords.map(lambda log: (log.host, 1))
    errHostsSum = errHostsCountPairTuple.reduceByKey(lambda a, b: a + b)
    errHostsTop20 = errHostsSum.sort(lambda item: -1 * item[1]).take(20)
    print('4=====>Top-20 hosts that generated errors: %s' % errHostsTop20)
    # 404 errors per day
    errDateCountPairTuple = badRecords.map(lambda log: (log.date_time.day, 1))
    errDateSum = errDateCountPairTuple.reduceByKey(lambda a, b: a + b)
    errDateSorted = errDateSum.sort(lambda item: item[0]).cache()
    errByDate = errDateSorted.collect()
    print('5=====>404 Errors by day: %s' % errByDate)
    # Top five days with the most 404s
    topErrDate = errDateSorted.sort(lambda item: item[1] * -1).take(5)
    print('6=====>Top Five dates for 404 requests: %s' % topErrDate)
    # 404 errors per hour
    hourCountPairTuple = badRecords.map(lambda log: (log.date_time.hour, 1))
    hourRecordsSum = hourCountPairTuple.reduceByKey(lambda a, b: a + b)
    hourRecordsSorted = hourRecordsSum.sort(lambda item: item[0]).cache()
    errHourList = hourRecordsSorted.collect()
    print('7=====>Top hours for 404 requests: %s' % errHourList)
    hoursWithErrors404 = hourRecordsSorted.map(lambda item: item[0]).collect()
    errors404ByHours = hourRecordsSorted.map(lambda item: item[1]).collect()
    # Plot 404 errors per hour
    fig = plt.figure(figsize=(8, 4.2), facecolor='white', edgecolor='white')
    plt.axis([0, max(hoursWithErrors404), 0, max(errors404ByHours)])
    plt.grid(b=True, which='major', axis='y')
    plt.xlabel('Hour')
    plt.ylabel('404 Errors')
    plt.plot(hoursWithErrors404, errors404ByHours)
    plt.savefig("./img/404DayHour.png")


if __name__ == '__main__':
    find404Error()
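As a closing sanity check, the overall 404 rate can be computed from the same RDD. A minimal sketch, assuming the access_logs RDD from DparkAnalysis.py as above:

from DparkAndSpark.DparkAnalysis import access_logs

total = access_logs.count()
num404 = access_logs.filter(lambda log: log.response_code == 404).count()
# Share of all successfully parsed requests that returned 404
print('404 rate: %.2f%% (%d of %d requests)' % (100.0 * num404 / total, num404, total))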