当前位置: 首页 > 工具软件 > S3QL > 使用案例 >

[转载] AWS之EMR数据ES通过数据仓库HIVE同步S3

管玉堂
2023-12-01

参考链接: AWS创建S3存储桶

本篇主要讲解利用EMR将ES中PB级数据利用HIVE数据仓库同步到S3,从而利用Athena对数据进行分析计算; 

EMR搭建 

1 软件配置(如图)  注意:hive和spark元数据可以不选,我们这块需要使用,所以勾选了 存储模式可以选在S3或者HDFS,我们这块既然Aws的服务,所以这块就选择了S3存储 2 后面step就按照aws-emr创建集群的step操作就可以了 

 注:如果之前有emr集群,这块也可以使用克隆。 3 集群配置 前面我们把emr集群搭建好了,但是能不能从es同步到s3能,答案是不能的,因为这块还没有继承elasticsearch-hadoop-hive,我们下载这个包放到**/usr/lib/hive/auxlib**下面,然后重启hive服务。 如何重启hive呢? 首先使用 initctl list查询服务,如图  然后先停掉hive服务:sudo stop hive-server2 最后重启:sudo start hive-server2 4 打开hue,配置账户等信息 如图 目前为止,此集群就可以用了,是不小有成就。 

数据处理与同步 

数据处理 

1 创建es外部表 CREATE EXTERNAL TABLE if not exists test_es( meta_id string COMMENT ‘from deserializer’, brand string COMMENT ‘from deserializer’, channel string COMMENT ‘from deserializer’, countrycode string COMMENT ‘from deserializer’, createtime STRING, curclientversioncode string COMMENT ‘from deserializer’, curclientversionname string COMMENT ‘from deserializer’, curpage string COMMENT ‘from deserializer’, curpagetype string COMMENT ‘from deserializer’, imei string COMMENT ‘from deserializer’, imsi string COMMENT ‘from deserializer’, isbackgroundurl string COMMENT ‘from deserializer’, lastpage string COMMENT ‘from deserializer’,lastpagetype string COMMENT ‘from deserializer’, nettype string COMMENT ‘from deserializer’, platform string COMMENT ‘from deserializer’, sharechannel string COMMENT ‘from deserializer’, ua string COMMENT ‘from deserializer’, url string COMMENT ‘from deserializer’, time TIMESTAMP) ROW FORMAT SERDE ‘org.elasticsearch.hadoop.hive.EsSerDe’ STORED BY ’org.elasticsearch.hadoop.hive.EsStorageHandler’ WITH SERDEPROPERTIES ( ‘serialization.format’=‘1’) TBLPROPERTIES ( ‘COLUMN_STATS_ACCURATE’=‘false’, ‘es.index.auto.create’=‘false’, ‘es.mapping.names’=‘meta_id:_metadata._id,brand:brand,channel:channel,countryCode:countryCode,createTime:createTime,curClientVersionCode:curClientVersionCode,curClientVersionName:curClientVersionName,curPage:curPage,curPageType:curPageType,imei:imei,imsi:imsi,isBackgroundUrl:isBackgroundUrl,lastPage:lastPage,lastPageType:lastPageType,netType:netType,platform:platform,shareChannel:shareChannel,time:time,ua:ua,url:url,time:time’, ‘es.nodes’=‘xxxx:9200’, ‘es.read.metadata’=‘true’, ‘es.date.format’ = ‘yyyy-MM-dd HH:mm:ss’, ‘es.index.read.missing.as.empty’=‘true’, ‘es.ser.reader.value.class’ = ‘com.bigdata.service.hadoop.EsValueReader’, ‘es.resource’=‘test_es’); 

此时就可以写sql查询数据,如:select * from test_es;但是这样是通过外部表的形式去查询es的数据,查询还是相当慢的,不过到这个地方还是不错的,说明es的数据可以通过hvie数据仓库可以查询了,也可以对它进行数据分析了,但是离我们的目标还是有点距离。 

2 建立S3外部表 CREATE EXTERNAL TABLE test_s3(  meta_id string COMMENT ‘from deserializer’,  brand string COMMENT ‘from deserializer’,  channel string COMMENT ‘from deserializer’,  countrycode string COMMENT ‘from deserializer’,  createtime string COMMENT ‘from deserializer’,  curclientversioncode string COMMENT ‘from deserializer’,  curclientversionname string COMMENT ‘from deserializer’,  curpage string COMMENT ‘from deserializer’,  curpagetype string COMMENT ‘from deserializer’,  imei string COMMENT ‘from deserializer’,  imsi string COMMENT ‘from deserializer’,  isbackgroundurl string COMMENT ‘from deserializer’,  lastpage string COMMENT ‘from deserializer’,  lastpagetype string COMMENT ‘from deserializer’,  nettype string COMMENT ‘from deserializer’,  platform string COMMENT ‘from deserializer’,  sharechannel string COMMENT ‘from deserializer’,  ua string COMMENT ‘from deserializer’,  url string COMMENT ‘from deserializer’,  time string COMMENT ‘from deserializer’)  PARTITIONED BY (  dt string)  ROW FORMAT SERDE  ‘org.apache.hadoop.hive.serde2.OpenCSVSerde’  WITH SERDEPROPERTIES (  ‘escapeChar’=’\’,  ‘quoteChar’=’’’,  ‘separatorChar’=’\t’)  STORED AS INPUTFORMAT  ‘org.apache.hadoop.mapred.TextInputFormat’  OUTPUTFORMAT  ‘org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat’  LOCATION  ‘s3a://桶/目录’ 

此时我们可以hive数据仓库可以通过sql对s3的数据进行查询,如select * from test_s3; 

到这一步我们实现了hive数据仓库对es和s3数据的查询和分析,离目标更近了一步。 

3 es数据写入到s3 insert into test_s3 partition (dt=‘2019.08.01’) select * from test_es;分区建议按时间,下图是这边同步es同步到s3的数据 到这个地方,恭喜,已经完成了目标,但是如果我们数据非常庞大,不可能一个个去处理,浪费人力和时间,有没有更好的方法让数据按照我们的方式去自动实现同步呢,答案在下面 

利用OOZIE 

1 创建workflow 两步,一步是创建es外部表,而是将es的外部表数据查出来insert到s3外部表中,同时我这个workflow里面有两个参数,因为我的表示按天创建。 

2 创建schedule schedule其实就是一个定时调度器,里面可以配置workflow 大家看到schedule里面有两个参数,这个两个参数是传给workflow之sys_es_to_s3_job的值, 随便解释下 ${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, ‘DAY’), ‘yyyy_MM_dd’)} 它的作用是获取当前时间的前一天,并按照给定的格式。 创建好之后运行就可以了,如图  现在每天都会自动处理数据,但是发现这个sys_es_to_s3_job运行的时候报错了如图: 解决办法:修改/etc/oozie/conf/oozie-site.xml  oozie.action.max.output.data 2048000  Max size in characters for output data.   我修改oozie-default.xml重启oozie不起作用,最后我修改了oozie-site.xml,重启后就正常了。 重启oozie一样的,先sudo stop oozie;然后在sudo start oozie; 

到目前为止我们遇到的一些问题也一一解决了,这个集群现在也是正常运行。

 类似资料: