我在尝试hadoop(Java版本)中的mapreduce程序,从json文件中查找共同的朋友列表。json文件内容具有以下模式:
{"name":"abc","id":123} [{"name":"xyz","id":124},{"name":"def","id":125},{"name":"cxf","id":155}]
{"name":"cxf","id":155} [{"name":"xyz","id":124},{"name":"abc","id":123},{"name":"yyy","id":129}]
模式解释如下:
friend json选项卡,由相关friends json的数组分隔
因此abc将xyz、def和cxf作为朋友cxf将xyz abc和yyy作为朋友。
综上所述,abc和cxf的共同好友是xyz。
尝试通过创建自定义可写文件来实现相同的使用mapduce,映射器发出以下键值,键是一对朋友,值是键中第一个朋友的相关朋友(即一对朋友)
K->V
(abc,xyz) -> [xyz,def,cxf]
(abc,def) -> [xyz,def,cxf]
(abc,cxf) -> [xyz,def,cxf]
(cxf,xyz) -> [xyz,abc,yyy]
(cxf,abc) -> [xyz,abc,yyy]
(cxf,yyy) -> [xyz,abc,yyy]
这里的键实际上是一个自定义的可写的,创建了一个扩展了WritableComparable的类,我已经重写了compareTo方法,所以这两个对(a,b)和(b,a)是相同的。但我面临的问题是,compareTo方法并不是对所有对的组合都调用的,因此reducer逻辑失败了。
基于上述示例,映射器发射了6个K,V对。但是compareTo只被调用了5次key1。比较(键2),键2。比较(键3),键3。比较(键4),键4。比较(键5),键5。比较(键6)。
知道为什么会这样吗?
下面是根据f11ler建议的逻辑编写的代码
驾驶员等级:
package com.facebook.updated;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
public class FacebookMain extends Configured implements Tool
{
Logger logger = Logger.getLogger(FacebookMain.class);
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new FacebookMain(), args));
}
@Override
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
logger.info("Running======>");
Job job = Job.getInstance();
job.setJarByClass(FacebookMain.class);
job.setJobName("FBApp");
job.setMapOutputKeyClass(Friend.class);
job.setMapOutputValueClass(Friend.class);
job.setOutputKeyClass(FriendPair.class);
job.setOutputValueClass(Friend.class);
job.setMapperClass(FacebookMapper.class);
job.setReducerClass(FacebookReducer.class);
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean val = job.waitForCompletion(true);
return val ? 0 : 1;
}
}
可定制的Writables(用于表示朋友和朋友对)
package com.facebook.updated;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@Getter
@Setter
public class Friend implements WritableComparable<Friend> {
Logger logger = Logger.getLogger(Friend.class);
private IntWritable id;
private Text name;
public Friend() {
this.id = new IntWritable();
this.name = new Text();
}
@Override
public int compareTo(Friend arg0) {
int val = getId().compareTo(arg0.getId());
logger.info("compareTo Friend ======> " + arg0 + " and " + this + " compare is " + val);
return val;
}
@Override
public void readFields(DataInput in) throws IOException {
id.readFields(in);
name.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
id.write(out);
name.write(out);
}
@Override
public boolean equals(Object obj) {
Friend f2 = (Friend) obj;
boolean val = this.getId().equals(f2.getId());
//logger.info("equals Friend ======> " + obj + " and " + this);
return val;
}
@Override
public String toString() {
return id + ":" + name + " ";
}
}
package com.facebook.updated;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@Getter
@Setter
public class FriendPair implements WritableComparable<FriendPair> {
Logger logger = Logger.getLogger(FriendPair.class);
private Friend first;
private Friend second;
public FriendPair() {
this.first = new Friend();
this.second = new Friend();
}
public FriendPair(Friend f1, Friend f2) {
this.first = f1;
this.second = f2;
}
@Override
public int compareTo(FriendPair o) {
logger.info("compareTo FriendPair ======> " + o + " and " + this);
FriendPair pair2 = o;
int cmp = -1;
if (getFirst().compareTo(pair2.getFirst()) == 0 || getFirst().compareTo(pair2.getSecond()) == 0) {
cmp = 0;
}
if (cmp != 0) {
// logger.info("compareTo FriendPair ======> " + o + " and " + this
// + " comparison is " + cmp);
return cmp;
}
cmp = -1;
if (getSecond().compareTo(pair2.getFirst()) == 0 || getSecond().compareTo(pair2.getSecond()) == 0) {
cmp = 0;
}
// logger.info("compareTo FriendPair ======> " + o + " and " + this +
// " comparison is " + cmp);
// logger.info("getFirst() " + getFirst());
// logger.info("pair2.getFirst() " + pair2.getFirst());
// logger.info("getFirst().compareTo(pair2.getFirst()) " +
// getFirst().compareTo(pair2.getFirst()));
// logger.info("getFirst().compareTo(pair2.getSecond()) " +
// getFirst().compareTo(pair2.getSecond()));
// logger.info("getSecond().compareTo(pair2.getFirst()) " +
// getSecond().compareTo(pair2.getFirst()));
// logger.info("getSecond().compareTo(pair2.getSecond()) " +
// getSecond().compareTo(pair2.getSecond()));
// logger.info("pair2.getSecond() " + pair2.getSecond());
// logger.info("getSecond() " + getSecond());
// logger.info("pair2.getFirst() " + pair2.getFirst());
// logger.info("pair2.getSecond() " + pair2.getSecond());
return cmp;
}
@Override
public boolean equals(Object obj) {
FriendPair pair1 = this;
FriendPair pair2 = (FriendPair) obj;
boolean eq = false;
logger.info("equals FriendPair ======> " + obj + " and " + this);
if (pair1.getFirst().equals(pair2.getFirst()) || pair1.getFirst().equals(pair2.getSecond()))
eq = true;
if (!eq) {
// logger.info("equals FriendPair ======> " + obj + " and " + this +
// " equality is " + eq);
return false;
}
if (pair1.getSecond().equals(pair2.getFirst()) || pair1.getSecond().equals(pair2.getSecond()))
eq = true;
// logger.info("equals FriendPair ======> " + obj + " and " + this +
// " equality is " + eq);
return eq;
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public String toString() {
return "[" + first + ";" + second + "]";
}
@Override
public int hashCode() {
logger.info("hashCode FriendPair ======> " + this);
return first.getId().hashCode() + second.getId().hashCode();
}
}
映射器和还原器
package com.facebook.updated;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;
public class FacebookMapper extends Mapper<LongWritable, Text, Friend, Friend> {
Logger log = Logger.getLogger(FacebookMapper.class);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Friend, Friend>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer st = new StringTokenizer(line, "\t");
String person = st.nextToken();
String friends = st.nextToken();
BasicDBObject personObj = (BasicDBObject) JSON.parse(person);
BasicDBList friendsList = (BasicDBList) JSON.parse(friends);
List<Friend> frndJavaList = new ArrayList<>();
for (Object frndObj : friendsList) {
frndJavaList.add(getFriend((BasicDBObject) frndObj));
}
Friend frnd = getFriend(personObj);
Friend[] array = frndJavaList.toArray(new Friend[frndJavaList.size()]);
for (Friend f : array) {
log.info("Map output is " + f + " and " + frnd);
context.write(f, frnd);
}
}
private static Friend getFriend(BasicDBObject personObj) {
Friend frnd = new Friend();
frnd.setId(new IntWritable(personObj.getInt("id")));
frnd.setName(new Text(personObj.getString("name")));
frnd.setHomeTown(new Text(personObj.getString("homeTown")));
return frnd;
}
}
package com.facebook.updated;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
public class FacebookReducer extends Reducer<Friend, Friend, FriendPair, Friend> {
Logger log = Logger.getLogger(FacebookReducer.class);
@Override
protected void reduce(Friend friend, Iterable<Friend> vals,
Reducer<Friend, Friend, FriendPair, Friend>.Context context) throws IOException, InterruptedException {
List<Friend> friends = new ArrayList<>();
for (Friend frnd : vals) {
friends.add(frnd);
}
log.info("Reducer output is " + friend + " and values are " + friends);
if (friends.size() == 2) {
FriendPair key = new FriendPair(friends.get(0), friends.get(1));
context.write(key, friend);
} else {
//log.info("Size of friends is not 2 key is " + friend + " and values are " + friends);
}
}
}
输入包含2行的json文件
{"name":"abc","id":123} [{"name":"xyz","id":124},{"name":"def","id":125},{"name":"cxf","id":155}]
{"name":"cxf","id":155} [{"name":"xyz","id":124},{"name":"abc","id":123},{"name":"yyy","id":129}]
减速器输出(abc,abc)-
compareTo
排序需要方法,这个关系应该是传递的。这意味着如果
为什么要在mapper中生成这种记录?如果“成为朋友”是一种对称关系,那么您只需使用以下逻辑(伪代码)进行映射即可:
for(int i = 0; i < values.length; ++i)
for(int j = 0; j < values.length; ++j)
if (i ==j)
continue
emmit (values[i], values[j]), key
更新:如果这不是对称的(这意味着“xyz有朋友abc”不是“abc有朋友xyz”)那么我们需要反向记录:
映射器:
for(int i = 0; i < values.length; ++i)
emmit values[i], key
减速器(与之前的mapper相同):
for(int i = 0; i < values.length; ++i)
for(int j = 0; j < values.length; ++j)
if (i ==j)
continue
emmit (values[i], values[j]), key
更新2:
让我们看看这个算法是如何与您的示例一起工作的:
mapper的结果是:
xyz -> abc
def -> abc
cxf -> abc
xyz -> cxf
abc -> cxf
yyy -> cxf
Mapreduce将按键对这些值进行分组,因此reducer的输入:
xyz -> [abc,cxf]
def -> [abc]
cxf -> [abc]
abc -> [cxf]
yyy -> [cxf]
在减速机中,我们通过值进行嵌套循环,但跳过与自比较。结果:
(abc, cxf) -> xyz
这就是我们想要的。
问题内容: 我有两张桌子 用户表: user_relationships 并希望获得2个用户的共同朋友的名字。即: 用户1和2有共同的朋友3。我想在一个查询中得到他的名字“ sammy”。 我怎么做? 问题答案: 或一次连接:
我有一个这样的旋转器: 当用户点击时,我希望它保存状态: 它将位置保存在SharedPref中,但微调器将返回默认值。有人看到什么了吗?
当单元测试共享首选项时,值是否为get refresh every test?
Topics 共同的返回值 一些模块返回’facts’(例如 setup), 这些是通过一个’ansible_facts’作为key和内部一些自动收集的值直接作为当前主机的变量并且他们不需要注册这些数据 Status 每一个模块都必须返回一个status, 来表示这个模块是成功的,是否有任何改变或没有. 当因为用户的条件(when: )或在检查模式下运行时发现该模块不支持, Ansible自己将会
我的问题是树中有大量的节点和许多查询。是否有一种算法,它进行预处理,使查询能够在恒定的时间内得到答复。 我研究了使用RMQ的LCA,但我不能使用该技术,因为我不能对树中的这么多节点使用数组。 如果知道它是满二叉树,节点之间的关系如上所示,那么有人能给我一个高效的实现来快速回答许多查询。 但是当有很多查询时,这种算法非常耗时,因为在最坏的情况下,我可能必须遍历30的高度(树的最大高度)才能到达根(最
问题内容: 实际上,我有2个表Friends表和users表,我想要达到的目的是通过检查另一个用户的朋友来获取我的共同朋友,并从users表中获取这些共同朋友的数据 表的朋友是这样建立的 然后表格数据看起来像这样 然后假设我是ID为2的用户,那么在该表中我有3个朋友-1、3和4。我要检索的是与用户1相同的朋友,他们也有3个朋友-2、3和4。并从表用户中检索2个共同的共同朋友3和4的数据 问题答案: