会有这样一种场景:有个非常大的文件需要和一个很小的文件去关联,即:一张大表去关联一张小表。
如果是在Hive里可以用MapJoin的方式,但是这种方式不是很灵活虽然也支持不等值连接,例如:有这样一个需求,有一个很大的日志文件,这个文件中有个字段是用户的IP(这里的IP是长整形),有一个很小的文件记录了IP范围对应的地域信息,我需要知道这些用户的地域分布信息,如果用MapJoin的话需要用大于和小于去比较,这种方式经测试发现效率极低。
于是想到了DistributedCache的方式,DistributedCache翻译过来就是分布式缓存,把小文件载入到内存中,在MapReduce的时候直接和内存中的小文件进行关联即可。实现思路:
1、使用DistributedCache将保存IP地域信息的文件加载至内存;
2、在map里每处理一条日志就在内存中查找符合这个用户IP的地域(这里使用二分查找,效率还不错)。
Map类的代码如下:
public static class ConfigStatistic2Mapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
ArrayList<String> ipLocationList = new ArrayList<String>();
@SuppressWarnings("deprecation")
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration conf = context.getConfiguration();
Path[] catchFiles = DistributedCache.getLocalCacheFiles(conf);
String ipLocation = catchFiles[0].toString();
@SuppressWarnings("resource")
//读取缓存文件,并将文件的内容保存在ipLocationList 这个List对象中
BufferedReader bReader = new BufferedReader(new FileReader(
ipLocation));
String line = "";
while ((line = bReader.readLine()) != null) {
ipLocationList.add(line);
}
}
/**
* 通过二分查找查询ip的地域
*
* @param ip 长整形
* @param ipTable 记录其实IP、结束IP以及这个IP端对应的地域信息
* @return start_ip,end_ip,country,district,city,netservice,1isp 并用‘\001’分隔
*/
public String getLocation(Long ip, List<String> ipTable) {
int begin = 0, end = 0;
for (end = ipTable.size() - 1; begin != end;) {
int midIndex = (end + begin) / 2;
int mid = (end - begin);
Long midValue = Long.parseLong(ipTable.get(midIndex).split(
"\001")[0]);
if (ip == midValue) {
return ipTable.get(midIndex);
}
if (ip > midValue) {
begin = midIndex;
} else {
end = midIndex;
}
if (mid <= 1) {
break;
}
}
return (ipTable.get(begin));
}
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
......
//获取IP对应的地域信息
String location = getLocation(Long.parseLong(ip),ipLocationList);
......
}
}
public int run(String[] args) throws Exception {
......
Configuration conf = getConf();
DistributedCache
.addCacheFile(
new Path(
"hdfs://std4:8020/user/hive/warehouse/block.db/ip_location/part-m-00000")
.toUri(), conf);
......
}
就贴了主要用到DistributedCache的代码,其他的还是按照普通的Mapreduce编写即可,经测试日志文件有1.7亿多条记录,用时8分钟左右(7个节点的集群),性能还不错。