MapReduce使用DistributedCache

万俟铭
2023-12-01

会有这样一种场景:有个非常大的文件需要和一个很小的文件去关联,即:一张大表去关联一张小表。

如果是在Hive里可以用MapJoin的方式,但是这种方式不是很灵活虽然也支持不等值连接,例如:有这样一个需求,有一个很大的日志文件,这个文件中有个字段是用户的IP(这里的IP是长整形),有一个很小的文件记录了IP范围对应的地域信息,我需要知道这些用户的地域分布信息,如果用MapJoin的话需要用大于和小于去比较,这种方式经测试发现效率极低。

于是想到了DistributedCache的方式,DistributedCache翻译过来就是分布式缓存,把小文件载入到内存中,在MapReduce的时候直接和内存中的小文件进行关联即可。实现思路:

1、使用DistributedCache将保存IP地域信息的文件加载至内存;

2、在map里每处理一条日志就在内存中查找符合这个用户IP的地域(这里使用二分查找,效率还不错)。

Map类的代码如下:

public static class ConfigStatistic2Mapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {

	ArrayList<String> ipLocationList = new ArrayList<String>();

	@SuppressWarnings("deprecation")
	protected void setup(Context context) throws IOException,
			InterruptedException {
		Configuration conf = context.getConfiguration();
		Path[] catchFiles = DistributedCache.getLocalCacheFiles(conf);
		String ipLocation = catchFiles[0].toString();

		@SuppressWarnings("resource")
		//读取缓存文件,并将文件的内容保存在ipLocationList 这个List对象中
		BufferedReader bReader = new BufferedReader(new FileReader(
				ipLocation));
		String line = "";
		while ((line = bReader.readLine()) != null) {
			ipLocationList.add(line);
		}
	}

	/**
	 * 通过二分查找查询ip的地域
	 * 
	 * @param ip 长整形
	 * @param ipTable 记录其实IP、结束IP以及这个IP端对应的地域信息
	 * @return start_ip,end_ip,country,district,city,netservice,1isp 并用‘\001’分隔
	 */
	public String getLocation(Long ip, List<String> ipTable) {

		int begin = 0, end = 0;
		for (end = ipTable.size() - 1; begin != end;) {
			int midIndex = (end + begin) / 2;
			int mid = (end - begin);
			Long midValue = Long.parseLong(ipTable.get(midIndex).split(
					"\001")[0]);
			if (ip == midValue) {
				return ipTable.get(midIndex);
			}

			if (ip > midValue) {
				begin = midIndex;
			} else {
				end = midIndex;
			}

			if (mid <= 1) {
				break;
			}
		}

		return (ipTable.get(begin));
	}

	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		......
		//获取IP对应的地域信息
		String location = getLocation(Long.parseLong(ip),ipLocationList);
		
		......
	}
}

需要在configuration中设置需要分布式缓存的文件,代码如下:

public int run(String[] args) throws Exception {
	......
	Configuration conf = getConf();
	DistributedCache
			.addCacheFile(
					new Path(
							"hdfs://std4:8020/user/hive/warehouse/block.db/ip_location/part-m-00000")
							.toUri(), conf);
	......
}
就贴了主要用到DistributedCache的代码,其他的还是按照普通的Mapreduce编写即可,经测试日志文件有1.7亿多条记录,用时8分钟左右(7个节点的集群),性能还不错。



 类似资料: