spark+sql

墨高杰
2023-12-01
# -*- coding: utf-8 -*-
# use SQLContext to load the hourly JSON logs from HDFS
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType, Row
from pyspark.sql import Window
from pyspark.sql import functions as F

import time
from datetime import datetime,timedelta
import sys

#-------------------------------------------------------- 1. Initialization -----------------------------------------------
#----------------- job parameters ----------------------
# required: app name
appname='storage_owner_buckets_perform'
# HDFS data set
data_set='storage'
# required: hours to look back (the job processes the previous day)
passhour=24
source_time=(datetime.strptime(sys.argv[1], "%Y%m%d%H%M%S") -timedelta(hours=passhour)).strftime("%Y%m%d")
# required: HDFS file path
hdfs_path='/data/s2log/'+source_time
this_month=(datetime.strptime(sys.argv[1], "%Y%m%d%H%M%S") -timedelta(hours=passhour)).strftime("%Y%m")
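# worked example (assuming the job is launched with sys.argv[1]='20231201030000'):
#   passhour=24 shifts the timestamp back one day, so source_time='20231130' and this_month='202311'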

#----------------- Spark initialization ------------------------
conf=SparkConf().setAppName(sys.argv[0])
sc=SparkContext(conf=conf)
hc=HiveContext(sc)
sqlContext=SQLContext(sc)
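# note: hc (HiveContext) is created for access to Hive tables, while sqlContext is what reads the JSON logs below;
# an illustrative query (hypothetical table name, not from the original) would be: hc.sql('SELECT * FROM storage_summary')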

# collect the 24 hourly HDFS paths for the day
hdfs_list=[]
for i in range(24):
	hour=str(i).zfill(2)
	hdfs='%s/%s'%(hdfs_path,hour)
	hdfs_list.append(hdfs)
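# with source_time='20231130' the list covers '/data/s2log/20231130/00' through '/data/s2log/20231130/23'
# a sketch (not in the original) of keeping only the hours that actually exist on HDFS,
# using the JVM Hadoop FileSystem API exposed through py4j:
#   Path=sc._jvm.org.apache.hadoop.fs.Path
#   fs=sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
#   hdfs_list=[p for p in hdfs_list if fs.exists(Path(p))]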

# abort if the HDFS path list is empty
if len(hdfs_list)==0:
	print 'HDFS has no data for %s'%(source_time)
	quit()
# load the 24 hours of data
else:
	# number of hours successfully loaded
	SuccessHours=0
	for i in range(len(hdfs_list)):
		# read this hour's logs
		try:
			rows_hour=sqlContext.read.json(hdfs_list[i])
			print 'Rows for %s:'%(hdfs_list[i])
			print 'The count is %s'%(rows_hour.count())
			print 'The head is '
			print rows_hour.head()
			SuccessHours = SuccessHours + 1
		except Exception,e:
			print Exception,":",e

		# if the read succeeded, clean the data
		if 'rows_hour' in locals().keys():
			if '_corrupt_record' in rows_hour.columns:
				print '_corrupt_record exists'
				try:
					rows_hour=rows_hour.\
						filter("_corrupt_record='' or not _corrupt_record")
					print 'Filter successes'
				except Exception,e:
					print Exception,":",e

				# if the filter succeeded, drop the _corrupt_record column
				try:
					rows_hour=rows_hour.drop('_corrupt_record')
					print 'Drop successes'
				except Exception,e:
					print Exception,":",e
				print rows_hour.count()
				# (the hourly dataframes are unioned below)
			else:
				print rows_hour.count()

			# make sure every expected field exists, then keep only the canonical columns
			lis=rows_hour.columns
			lis_refer=['Time','Bucket-Owner','Error-Code','Bucket','Server-Address','Real-File-Type','Operation','Real-File-Size','HTTP-status',
				'Total-Time','Turn-Around-Time','Request-Body-Size','Request-URI','Response-Size','Request-Size','date','owner','refer']
			for elem in lis_refer:
				if elem not in lis:
					# withColumn returns a new DataFrame, so assign it back; fill missing fields with an empty string
					rows_hour=rows_hour.withColumn(elem,F.lit(''))
			rows_hour=rows_hour.select(F.col('Time'),F.col('Bucket-Owner'),F.col('Error-Code'),F.col('Bucket'),F.col('Server-Address'),F.col('Real-File-Type'),
				F.col('Operation'),F.col('Real-File-Size'),F.col('HTTP-status'),F.col('Total-Time'),F.col('Turn-Around-Time'),
				F.col('Request-Body-Size'),F.col('Request-URI'),F.col('Response-Size'),F.col('Request-Size'),F.col('date'),F.col('owner'),F.col('refer'))

			if SuccessHours>1:

				try:
					rows=rows.unionAll(rows_hour)
					print 'Merged rows:'
					print 'The count is %s'%(rows.count())
					print 'The left log is'
					print rows.head()
					print 'The right log is'
					print rows_hour.head()

				except Exception,e:
					print Exception,":",e
					print rows_hour.head()
					print '%s failed to union with the others'%(hdfs_list[i])
			else:
				rows=rows_hour

			# release rows_hour before the next hour
			del rows_hour
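
# The original post stops once the hourly DataFrames have been merged into rows.
# A minimal sketch of a follow-up SQL step, assuming the goal is per-owner request
# counts (the temp table s2log_day and the aggregation are illustrative, not from the post):
if 'rows' in locals():
	rows.registerTempTable('s2log_day')
	daily=sqlContext.sql("SELECT `Bucket-Owner`, COUNT(*) AS requests FROM s2log_day GROUP BY `Bucket-Owner`")
	daily.show()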
