Hadoop为Java外的其他语言,提供了一个友好的实现mapreduce的框架,即Hadoop-Streaming。Hadoop-Streaming只需遵循从标准输入stdin读入,写出到标准输出stdout即可,并且它还提供了丰富的参数控制,来实现许多mapreduce的复杂操作。Mrjob对Hadoop-Streaming进行了封装,是一个实现mapreduce任务的python开源框架。
下面我们尝试用Mrjob来实现数据清洗、时间拉链等结构化数据预处理功能,从而介绍一些mrjob中比较隐蔽的功能。
Hadoop版本:hadoop2.7.4
Python版本:python3.6
Pip安装mrjob、lxml、hashlib等库
在mrjob.job中封装了MRJob类,我们只需将其导入并继承它,就可以开始编写我们的MR程序了。我们可以编写其中的mapper、combiner、reducer方法来实现单步的MR任务,每个方法的输入与yield输出都是以key,value键值对的形式。在每个方法的前后,都有对应的*_init() 和 *_final()方法。
想要对方法改名或是实现多步任务,需要导入mrjob.step中的MRStep,并在我们的类中编写steps方法来说明执行的顺序。
基本的介绍各个论坛和博客已有很多,在这里就不赘述了。
对于关系型数据库来说,外部数据文件通常不一定能直接满足加载的要求,因此对数据文件进行清洗是十分重要的。同时在数据仓库中,经常会用历史数据和时间维度做数据分析。而在做ETL入仓的时候,保存历史数据最常见的方案是使用拉链表进行存储。在这里我们如果用HDFS来保存清洗后的文件,以便于比较出增量和物理删除的数据,也需要对文件中的数据进行时间拉链的实现。
实际的工业中步骤较为复杂,这里我们简化一下。我们将输入的文件都放在/user/tmpinput下(包括当日数据和历史全量数据),处理完的数据输出到/user/tmpoutput下。数据表模型相关的信息,我们以xml的形式记录在WB_USER_INFO.fmt中,这是一张微博用户信息表,数据原始文件input.txt是爬虫爬下来的财经微博top100的用户基本信息。模型表结构信息如下:
字段名 | 字段中文名 | 字段类型 | 字段长度 | 字段小数位长度 | PK值 | 清洗标志 |
---|---|---|---|---|---|---|
User_Id | 用户ID | VARCHAR | 10 | 0 | Y | |
User_NickName | 昵称 | VARCHAR | 50 | 0 | A | |
User_Gender | 性别 | VARCHAR | 2 | 0 | ||
Province | 所在省 | VARCHAR | 6 | 0 | ||
City | 所在城市 | VARCHAR | 6 | 0 | ||
Signature | 个性签名 | VARCHAR | 500 | 0 | B | |
Birthday | 生日 | VARCHAR | 40 | 0 | ||
Num_Tweets | 微博数 | int | 11 | 0 | ||
Num_Follows | 关注数 | int | 11 | 0 | ||
Num_Fans | 粉丝数 | int | 11 | 0 | ||
Sex_Orientation | 性取向 | VARCHAR | 12 | 0 | ||
Marriage | 婚姻状况 | VARCHAR | 12 | 0 | ||
URL | 首页链接 | VARCHAR | 500 | 0 |
接下来的介绍分三个部分,一是作为主程序的PreProcessMain,它负责解析存储表结构的fmt文件,并调用mapreduce任务;二是实现mapreduce任务的PreProcessMaster,它负责在map中实现数据清洗,在reduce中实现时间拉链制作以及不同数据的多路输出;三是简单介绍dataClean函数中的一个值得注意的清洗方式。
首先我们需要一个主类PreProcessMain,它负责去解析存储表信息的xml文件,并根据信息的不同调用相应的MR任务。
这里我们取出xml中的表中文名、英文名、字段个数、表类型(微博用户信息表为时间拉链表)直接作为主类的属性,把字段信息存成字典作为主类的属性。我这里将主键和清洗类型各组成一个字典,以字段序号为key;将字段名、中文名、长度、精度放进一个嵌套的字典中,最外层的key同样为字段序号。存成字典能很方便的和json格式进行转换,便于传递。
from lxml import objectify
def loadXmlCfg(self,fmtpath):
#解析fmt文件拿到表结构
doc=objectify.parse(open(fmtpath,'r',encoding='UTF-8'))
root=doc.getroot()
for child in root.getchildren():
if child.tag=="gDecl" or child.tag=="DataBase":
for field in child.getchildren():
if field.tag=="ExtrFileName":
self.ExtrFileName=field.pyval #表英文名:系统名_表名
elif field.tag=="ExtrFileDesc":
self.ExtrFileDesc=field.pyval #表中文名
elif field.tag=="Tables":
self.Tables=field.pyval #表名
elif child.tag=="Table":
for field in child.getchildren():
#循环取出表中每个字段信息
if field.tag=="ColNum":
self.ColNum=field.pyval #字段个数
elif field.tag=="FilterCond":
self.FilterCond=field.pyval #表类型
elif field.tag[0:5]=="Field":
index=int(field.tag[5:])
tabfield={}
for tableField in field.getchildren():
if tableField.tag=="Name" or tableField.tag=="Desc" or tableField.tag=="Len" or tableField.tag=="DecimalLen":
tabfield.update({tableField.tag:tableField.pyval})
elif tableField.tag=="Type":
self.CleanType.update({index:tableField.pyval}) #清洗标志及字段类型字典
elif tableField.tag=="PrimaryKey":
self.PrimaryKey.update({index:tableField.pyval}) #主键字典
self.FieldsStr.update({index:tabfield}) #字段名、中文、长度、精度字典
在主类PreProcessMain中调用mrjob的程序,需要为我们写的PreProcessMaster(PreProcessMaster为继承MRJob的类)创建一个runner,并在PreProcessMaster类中设置需要传递的参数。设置参数使用configure_args方法,具体如下:
def configure_args(self):
#设定接收的参数
super(PreProcessMaster,self).configure_args()
self.add_passthru_arg('--ColNum')
self.add_passthru_arg('--tables')
self.add_passthru_arg('--recorrddate')
self.add_passthru_arg('--PrimaryKey')
self.add_passthru_arg('--CleanType')
self.add_passthru_arg('--FieldsStr')
self.add_passthru_arg('--olddir')
在主类中创建runner并传参,方法如下(其中的olddir用于识别历史全量,值为”/user/tmpinput/part”;recorrddate为数据文件日期):
from PreProcessMaster import PreProcessMaster
import json
if self.FilterCond==100:
#调拉链表MR
prepromaster=PreProcessMaster(args=['-r','hadoop','hdfs:///user/tmpinput','-o','hdfs:///user/tmpoutput','--ColNum='+str(self.ColNum),'--tables='+self.tables,'--recorrddate='+self.recorrddate,'--PrimaryKey='+json.dumps(self.PrimaryKey),'--CleanType='+json.dumps(self.CleanType),'--FieldsStr='+json.dumps(self.FieldsStr),'--olddir='+self.olddir])
with prepromaster.make_runner() as runner:
runner.run()
首先我们需要输出的内容是清洗后的每行数据,而mrjob默认输出的是键值对,因此我们需要先修改PreProcessMaster类的输出协议。输入协议默认为读入文件的每一行,不需要修改。
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import RawValueProtocol
import hashlib
import json
import os
import datetime
class PreProcessMaster(MRJob):
#设置输出模式为值,而非键值对
OUTPUT_PROTOCOL = RawValueProtocol
接着在进入mapper之前,我们需要对传入的参数进行接收。传入的参数会储存在self.option中,我们需要将json格式转回python的字典。
def mapper_init(self):
#接收参数并初始化为自身属性
self.ColNum=int(self.options.ColNum)
self.tables = self.options.tables
self.recorrddate = self.options.recorrddate
self.PrimaryKey = json.loads(self.options.PrimaryKey)
self.CleanType = json.loads(self.options.CleanType)
self.FieldsStr = json.loads(self.options.FieldsStr)
self.olddir = self.options.olddir
接下来我们进入mapper,默认输入协议中value为文件的一行,没有key。我们将一行数据切分,并根据输入的路径判断数据是当日文件还是上日历史全量,并打上标记。由于不同版本的mrjob存储输入路径的变量名不同,这里我们将mapreduce_map_input_file和map_input_file都尝试一次。
def MasterMapper(self,_,line):
#分割一行数据,若字段个数正确,则对每个字段清洗
data=line.split("|!")
rowAfterClean=[]
PK=""
newdata=True
try:
input_dir=os.environ['mapreduce_map_input_file']
except KeyError:
input_dir=os.environ['map_input_file']
if self.olddir in input_dir:
newdata=False
如果为今日数据,我们将数据和对应的清洗方式、长度信息等按字段传入清洗函数,返回到col并装入rowAfterClean列表,并将主键拼成一个字符串PK作为传入reducer的key。同时为清洗后数据用MD5加密,便于比较增量。最后加入时间拉链的日期字段(开始日期、结束日期、有效日期)。
若为历史全量中的数据,则不做其他处理,只拼接作为key的主键字符串PK。
最终传入reducer的key为字符串PK,value为由rowAfterClean列表和新旧数据标识组成的列表。
if newdata==True:
if len(data)==self.ColNum:
for i in range(self.ColNum):
col=dataClean(data[i],self.CleanType[str(i+1)],self.FieldsStr[str(i+1)])
rowAfterClean.append(col)
if str(i+1) in self.PrimaryKey.keys():
PK=PK+"|!"+col
md=hashlib.md5()
md.update("|!".join(rowAfterClean).encode())
subPK=md.hexdigest()
rowAfterClean.append(subPK)
rowAfterClean.append(self.recorrddate)
rowAfterClean.append("29991231")
rowAfterClean.append(self.recorrddate)
else:
for i in range(self.ColNum+4):
rowAfterClean.append(data[i])
if str(i + 1) in self.PrimaryKey.keys():
PK = PK + "|!" + data[i]
yield PK,[rowAfterClean,newdata]
Reduce部分的程序会按key把value汇总成一个迭代器,我们先将其转换为列表再进行处理。这个列表中将包含同一主键数据的今日数据和历史全量数据,我们最终的目的是将其处理成以下五个部分:今日全量todaydata、今日增量everydaydata、历史全量alldata、物理删除数据physicaldelete和主键重复数据duplicatedata。
首先根据新旧标识将新旧数据分开,接着在旧数据中取出开链数据(结束日期为29991231),其余直接装入历史全量。
def MasterReducer(self,key,value):
"""将迭代器转为列表并分目录输出"""
datalist=list(value)
newrows=[]
oldrows=[]
oldrow=[]
newrow=[]
everydaydata=[]
physicaldelete=[]
alldata=[]
duplicatedata=[]
todaydata=[]
for row in datalist:
if row[1] == True:
newrows.append(row[0])
elif row[1]==False:
oldrows.append(row[0])
for R in oldrows:
if R[int(self.options.ColNum)+2]=="29991231":
oldrow=R
else:
alldata.append(R)
其次,判断新数据条数,若大于1则取最大那条,并同时把所有数据装入duplicatedata,若新数据非空先装入今日全量todaydata。然后根据取出的开链就数据oldrow和新数据newrow,比较其MD5做处理:
if len(newrows)==1:
newrow=newrows[0]
elif len(newrows)>1:
newrow = max(newrows)
duplicatedata=newrows
if newrow!=[]:
todaydata=newrow[:-3]
if len(oldrow)==0:
everydaydata=newrow[:-3]
alldata.append(newrow)
elif len(newrow)==0 and len(oldrow)!=0:
oldrow[int(self.options.ColNum) + 2] = (datetime.date(int(self.options.recorrddate[:4]),int(self.options.recorrddate[4:6]),int(self.options.recorrddate[6:])) - datetime.timedelta(days=1)).strftime("%Y%m%d")
physicaldelete = oldrow[:-3]
alldata.append(oldrow)
elif newrow[int(self.options.ColNum)]==oldrow[int(self.options.ColNum)]:
oldrow[int(self.options.ColNum)+3]=self.options.recorrddate
alldata.append(oldrow)
elif newrow[int(self.options.ColNum)]!=oldrow[int(self.options.ColNum)]:
oldrow[int(self.options.ColNum) + 2] = (datetime.date(int(self.options.recorrddate[:4]),int(self.options.recorrddate[4:6]),int(self.options.recorrddate[6:])) - datetime.timedelta(days=1)).strftime("%Y%m%d")
alldata.append(oldrow)
alldata.append(newrow)
everydaydata = newrow[:-3]
接下来要设置Hadoop的多路输出,我们需要用到一个mrjob自带的jar包(http://empiricalresults.github.io/nicknack/ )。下载后放入PreProcessMaster所在目录下,并进行设置:
class PreProcessMaster(MRJob):
#设置输出模式为值,而非键值对,并设置多路输出jar包
OUTPUT_PROTOCOL = RawValueProtocol
LIBJARS = ['nicknack-1.0.1.jar']
HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'
再回到reducer,输出的格式为:key部分填None;value部分用\t分开,前部分为输出目录的子目录名,后部分为清洗后的一行数据(数据中不能带有换行符,多行数据分多次yield)。
if todaydata!=[]:
yield None,"todaydata"+"\t"+"|!".join(todaydata)
if everydaydata!=[]:
yield None,"everydaydata"+"\t"+"|!".join(everydaydata)
if physicaldelete != []:
yield None,"physicaldelete"+"\t"+"|!".join(physicaldelete)
if alldata != []:
for R in alldata:
yield None,"alldata"+"\t"+"|!".join(R)
if duplicatedata != []:
for R in duplicatedata:
yield None,"duplicatedata"+"\t"+"|!".join(R)
最后,用steps方法写明执行顺序。
def steps(self):
#设置MR步骤
return [MRStep(mapper_init=self.mapper_init,mapper=self.MasterMapper,reducer=self.MasterReducer),]
dataClean函数根据自身业务需要编写,建议直接放在PreProcessMaster.py中,这样不用在各个节点中一式多份。
值得一提的是,涉及到字符串截断的清洗不能直接用python的截断方法。对于一个中文字,python计算其长度是按1来计算的,实际的一个中文显示时按两个字符计算。这时我们需要一个计算长度的函数,以下是该函数以及截断清洗的方式(cleantyp为传入的清洗方式的参数,colinfo为记录字段信息的字典):
def length(data):
l=len(data.encode("utf-8"))
return int((l-len(data))/2+len(data)
#按长度Len截断
if cleantyp=="A":
l=len(data)
while length(data)>colinfo["Len"]:
l = l - 1
data=data[:l]
return data