当前位置: 首页 > 工具软件 > mrjob > 使用案例 >

Mrjob实现Hadoop结构化数据预处理

邵亦
2023-12-01

前言

Hadoop为Java外的其他语言,提供了一个友好的实现mapreduce的框架,即Hadoop-Streaming。Hadoop-Streaming只需遵循从标准输入stdin读入,写出到标准输出stdout即可,并且它还提供了丰富的参数控制,来实现许多mapreduce的复杂操作。Mrjob对Hadoop-Streaming进行了封装,是一个实现mapreduce任务的python开源框架。

下面我们尝试用Mrjob来实现数据清洗、时间拉链等结构化数据预处理功能,从而介绍一些mrjob中比较隐蔽的功能。

一、环境

Hadoop版本:hadoop2.7.4
Python版本:python3.6
Pip安装mrjob、lxml、hashlib等库

二、Mrjob基本框架

在mrjob.job中封装了MRJob类,我们只需将其导入并继承它,就可以开始编写我们的MR程序了。我们可以编写其中的mapper、combiner、reducer方法来实现单步的MR任务,每个方法的输入与yield输出都是以key,value键值对的形式。在每个方法的前后,都有对应的*_init() 和 *_final()方法。

想要对方法改名或是实现多步任务,需要导入mrjob.step中的MRStep,并在我们的类中编写steps方法来说明执行的顺序。

基本的介绍各个论坛和博客已有很多,在这里就不赘述了。

三、实验基本步骤

对于关系型数据库来说,外部数据文件通常不一定能直接满足加载的要求,因此对数据文件进行清洗是十分重要的。同时在数据仓库中,经常会用历史数据和时间维度做数据分析。而在做ETL入仓的时候,保存历史数据最常见的方案是使用拉链表进行存储。在这里我们如果用HDFS来保存清洗后的文件,以便于比较出增量和物理删除的数据,也需要对文件中的数据进行时间拉链的实现。

实际的工业中步骤较为复杂,这里我们简化一下。我们将输入的文件都放在/user/tmpinput下(包括当日数据和历史全量数据),处理完的数据输出到/user/tmpoutput下。数据表模型相关的信息,我们以xml的形式记录在WB_USER_INFO.fmt中,这是一张微博用户信息表,数据原始文件input.txt是爬虫爬下来的财经微博top100的用户基本信息。模型表结构信息如下:

字段名字段中文名字段类型字段长度字段小数位长度PK值清洗标志
User_Id用户IDVARCHAR100Y
User_NickName昵称VARCHAR500A
User_Gender性别VARCHAR20
Province所在省VARCHAR60
City所在城市VARCHAR60
Signature个性签名VARCHAR5000B
Birthday生日VARCHAR400
Num_Tweets微博数int110
Num_Follows关注数int110
Num_Fans粉丝数int110
Sex_Orientation性取向VARCHAR120
Marriage婚姻状况VARCHAR120
URL首页链接VARCHAR5000

接下来的介绍分三个部分,一是作为主程序的PreProcessMain,它负责解析存储表结构的fmt文件,并调用mapreduce任务;二是实现mapreduce任务的PreProcessMaster,它负责在map中实现数据清洗,在reduce中实现时间拉链制作以及不同数据的多路输出;三是简单介绍dataClean函数中的一个值得注意的清洗方式。

(1)PreProcessMain

首先我们需要一个主类PreProcessMain,它负责去解析存储表信息的xml文件,并根据信息的不同调用相应的MR任务。

这里我们取出xml中的表中文名、英文名、字段个数、表类型(微博用户信息表为时间拉链表)直接作为主类的属性,把字段信息存成字典作为主类的属性。我这里将主键和清洗类型各组成一个字典,以字段序号为key;将字段名、中文名、长度、精度放进一个嵌套的字典中,最外层的key同样为字段序号。存成字典能很方便的和json格式进行转换,便于传递。

from lxml import objectify 

def loadXmlCfg(self,fmtpath):
    #解析fmt文件拿到表结构
    doc=objectify.parse(open(fmtpath,'r',encoding='UTF-8'))
    root=doc.getroot()
    for child in root.getchildren():
        if child.tag=="gDecl" or child.tag=="DataBase":
            for field in child.getchildren():
                if field.tag=="ExtrFileName":
                    self.ExtrFileName=field.pyval   #表英文名:系统名_表名
                elif field.tag=="ExtrFileDesc":
                    self.ExtrFileDesc=field.pyval   #表中文名
                elif field.tag=="Tables":
                    self.Tables=field.pyval         #表名
        elif child.tag=="Table":
            for field in child.getchildren():
                #循环取出表中每个字段信息
                if field.tag=="ColNum":
                    self.ColNum=field.pyval         #字段个数
                elif field.tag=="FilterCond":
                    self.FilterCond=field.pyval     #表类型
                elif field.tag[0:5]=="Field":
                    index=int(field.tag[5:])
                    tabfield={}
                    for tableField in field.getchildren():
                        if tableField.tag=="Name" or tableField.tag=="Desc" or tableField.tag=="Len" or tableField.tag=="DecimalLen":
                            tabfield.update({tableField.tag:tableField.pyval})
                        elif tableField.tag=="Type":
                            self.CleanType.update({index:tableField.pyval})   #清洗标志及字段类型字典
                        elif tableField.tag=="PrimaryKey":
                            self.PrimaryKey.update({index:tableField.pyval})  #主键字典
                    self.FieldsStr.update({index:tabfield})                   #字段名、中文、长度、精度字典

在主类PreProcessMain中调用mrjob的程序,需要为我们写的PreProcessMaster(PreProcessMaster为继承MRJob的类)创建一个runner,并在PreProcessMaster类中设置需要传递的参数。设置参数使用configure_args方法,具体如下:

def configure_args(self):
    #设定接收的参数
    super(PreProcessMaster,self).configure_args()
    self.add_passthru_arg('--ColNum')
    self.add_passthru_arg('--tables')
    self.add_passthru_arg('--recorrddate')
    self.add_passthru_arg('--PrimaryKey')
    self.add_passthru_arg('--CleanType')
    self.add_passthru_arg('--FieldsStr')
    self.add_passthru_arg('--olddir')

在主类中创建runner并传参,方法如下(其中的olddir用于识别历史全量,值为”/user/tmpinput/part”;recorrddate为数据文件日期):

from PreProcessMaster import PreProcessMaster
import json 
if self.FilterCond==100:
    #调拉链表MR
    prepromaster=PreProcessMaster(args=['-r','hadoop','hdfs:///user/tmpinput','-o','hdfs:///user/tmpoutput','--ColNum='+str(self.ColNum),'--tables='+self.tables,'--recorrddate='+self.recorrddate,'--PrimaryKey='+json.dumps(self.PrimaryKey),'--CleanType='+json.dumps(self.CleanType),'--FieldsStr='+json.dumps(self.FieldsStr),'--olddir='+self.olddir])
    with prepromaster.make_runner() as runner:
        runner.run()

(2)PreProcessMaster

首先我们需要输出的内容是清洗后的每行数据,而mrjob默认输出的是键值对,因此我们需要先修改PreProcessMaster类的输出协议。输入协议默认为读入文件的每一行,不需要修改。

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import RawValueProtocol
import  hashlib
import json
import os
import datetime
class PreProcessMaster(MRJob):

    #设置输出模式为值,而非键值对
    OUTPUT_PROTOCOL = RawValueProtocol

接着在进入mapper之前,我们需要对传入的参数进行接收。传入的参数会储存在self.option中,我们需要将json格式转回python的字典。

def mapper_init(self):
    #接收参数并初始化为自身属性
    self.ColNum=int(self.options.ColNum)
    self.tables = self.options.tables
    self.recorrddate = self.options.recorrddate
    self.PrimaryKey = json.loads(self.options.PrimaryKey)
    self.CleanType = json.loads(self.options.CleanType)
    self.FieldsStr = json.loads(self.options.FieldsStr)
    self.olddir = self.options.olddir

接下来我们进入mapper,默认输入协议中value为文件的一行,没有key。我们将一行数据切分,并根据输入的路径判断数据是当日文件还是上日历史全量,并打上标记。由于不同版本的mrjob存储输入路径的变量名不同,这里我们将mapreduce_map_input_file和map_input_file都尝试一次。

def MasterMapper(self,_,line):
    #分割一行数据,若字段个数正确,则对每个字段清洗
    data=line.split("|!")
    rowAfterClean=[]
    PK=""
    newdata=True
    try:
        input_dir=os.environ['mapreduce_map_input_file']
    except KeyError:
        input_dir=os.environ['map_input_file']
    if self.olddir in input_dir:
        newdata=False

如果为今日数据,我们将数据和对应的清洗方式、长度信息等按字段传入清洗函数,返回到col并装入rowAfterClean列表,并将主键拼成一个字符串PK作为传入reducer的key。同时为清洗后数据用MD5加密,便于比较增量。最后加入时间拉链的日期字段(开始日期、结束日期、有效日期)。

若为历史全量中的数据,则不做其他处理,只拼接作为key的主键字符串PK。

最终传入reducer的key为字符串PK,value为由rowAfterClean列表和新旧数据标识组成的列表。

if newdata==True:

    if len(data)==self.ColNum:
        for i in range(self.ColNum):
            col=dataClean(data[i],self.CleanType[str(i+1)],self.FieldsStr[str(i+1)])
            rowAfterClean.append(col)
            if str(i+1) in self.PrimaryKey.keys():
                PK=PK+"|!"+col
        md=hashlib.md5()
        md.update("|!".join(rowAfterClean).encode())
        subPK=md.hexdigest()
        rowAfterClean.append(subPK)
        rowAfterClean.append(self.recorrddate)
        rowAfterClean.append("29991231")
        rowAfterClean.append(self.recorrddate)

else:

    for i in range(self.ColNum+4):
        rowAfterClean.append(data[i])
        if str(i + 1) in self.PrimaryKey.keys():
            PK = PK + "|!" + data[i]
yield PK,[rowAfterClean,newdata]

Reduce部分的程序会按key把value汇总成一个迭代器,我们先将其转换为列表再进行处理。这个列表中将包含同一主键数据的今日数据和历史全量数据,我们最终的目的是将其处理成以下五个部分:今日全量todaydata、今日增量everydaydata、历史全量alldata、物理删除数据physicaldelete和主键重复数据duplicatedata。

首先根据新旧标识将新旧数据分开,接着在旧数据中取出开链数据(结束日期为29991231),其余直接装入历史全量。

def MasterReducer(self,key,value):
    """将迭代器转为列表并分目录输出"""
    datalist=list(value)
    newrows=[]
    oldrows=[]
    oldrow=[]
    newrow=[]
    everydaydata=[]
    physicaldelete=[]
    alldata=[]
    duplicatedata=[]
    todaydata=[]
    for row in datalist:
        if row[1] == True:
            newrows.append(row[0])
        elif row[1]==False:
            oldrows.append(row[0])
    for R in oldrows:
        if R[int(self.options.ColNum)+2]=="29991231":
            oldrow=R
        else:
            alldata.append(R)

其次,判断新数据条数,若大于1则取最大那条,并同时把所有数据装入duplicatedata,若新数据非空先装入今日全量todaydata。然后根据取出的开链就数据oldrow和新数据newrow,比较其MD5做处理:

  1. 若无oldrow则为新增数据,装入everydaydata和alldata;
  2. 若无newrow而有oldrow则为物理删除数据,将oldrow闭链并装入physicaldelete和alldata;
  3. 若newrow等于oldrow则数据无变化,oldrow有效日期设为当天,装入alldata;
  4. 若newrow不等于oldrow则为修改数据,将oldrow闭链并装入alldata,newrow装入alldata和everydaydata。
if len(newrows)==1:
    newrow=newrows[0]
elif len(newrows)>1:
    newrow = max(newrows)
    duplicatedata=newrows
if newrow!=[]:
    todaydata=newrow[:-3]
if len(oldrow)==0:
    everydaydata=newrow[:-3]
    alldata.append(newrow)
elif len(newrow)==0 and len(oldrow)!=0:
    oldrow[int(self.options.ColNum) + 2] = (datetime.date(int(self.options.recorrddate[:4]),int(self.options.recorrddate[4:6]),int(self.options.recorrddate[6:])) - datetime.timedelta(days=1)).strftime("%Y%m%d")
    physicaldelete = oldrow[:-3]
    alldata.append(oldrow)
elif newrow[int(self.options.ColNum)]==oldrow[int(self.options.ColNum)]:
    oldrow[int(self.options.ColNum)+3]=self.options.recorrddate
    alldata.append(oldrow)
elif newrow[int(self.options.ColNum)]!=oldrow[int(self.options.ColNum)]:
    oldrow[int(self.options.ColNum) + 2] = (datetime.date(int(self.options.recorrddate[:4]),int(self.options.recorrddate[4:6]),int(self.options.recorrddate[6:])) - datetime.timedelta(days=1)).strftime("%Y%m%d")
    alldata.append(oldrow)
    alldata.append(newrow)
    everydaydata = newrow[:-3]

接下来要设置Hadoop的多路输出,我们需要用到一个mrjob自带的jar包(http://empiricalresults.github.io/nicknack/ )。下载后放入PreProcessMaster所在目录下,并进行设置:

class PreProcessMaster(MRJob):

    #设置输出模式为值,而非键值对,并设置多路输出jar包
    OUTPUT_PROTOCOL = RawValueProtocol
    LIBJARS = ['nicknack-1.0.1.jar']
    HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'

再回到reducer,输出的格式为:key部分填None;value部分用\t分开,前部分为输出目录的子目录名,后部分为清洗后的一行数据(数据中不能带有换行符,多行数据分多次yield)。

if todaydata!=[]:
    yield None,"todaydata"+"\t"+"|!".join(todaydata)
if everydaydata!=[]:
    yield None,"everydaydata"+"\t"+"|!".join(everydaydata)
if physicaldelete != []:
    yield None,"physicaldelete"+"\t"+"|!".join(physicaldelete)
if alldata != []:
    for R in alldata:
        yield None,"alldata"+"\t"+"|!".join(R)
if duplicatedata != []:
    for R in duplicatedata:
        yield None,"duplicatedata"+"\t"+"|!".join(R)

最后,用steps方法写明执行顺序。

def steps(self):
    #设置MR步骤
    return [MRStep(mapper_init=self.mapper_init,mapper=self.MasterMapper,reducer=self.MasterReducer),]

(3)dataClean()

dataClean函数根据自身业务需要编写,建议直接放在PreProcessMaster.py中,这样不用在各个节点中一式多份。

值得一提的是,涉及到字符串截断的清洗不能直接用python的截断方法。对于一个中文字,python计算其长度是按1来计算的,实际的一个中文显示时按两个字符计算。这时我们需要一个计算长度的函数,以下是该函数以及截断清洗的方式(cleantyp为传入的清洗方式的参数,colinfo为记录字段信息的字典):

def length(data):
    l=len(data.encode("utf-8"))
    return int((l-len(data))/2+len(data)

#按长度Len截断
if cleantyp=="A":
    l=len(data)
    while length(data)>colinfo["Len"]:
        l = l - 1
        data=data[:l]
    return data
 类似资料: