// Note: Hama's Vertex API is easier to use than Hadoop's MapReduce.
package org.May25.bjfcd201203;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
public class Speed {
/*
* Experimental results (vertex ID, then the computed speed):
* bc6b34d6c90f4ffe8be3d10804aaa15e 0.0
* 15ae99e97f224f4e9a299a386aac0228 2.6
* 83c8aa0d1c7b4f36b1d02ce77883674d 0.0
* 0544a8a8a8744a56b22e88f44db84c5f 3.73
* b454e650cb1a4fa4a2f2bd6899fbfa73 0.0
* f6d3964061bf4681b688f157f9e132d0 0.0
* 7b93cee2622f41b19e9926a379457f6b 0.0
* 45ce1e1e6c4342f28b057c79e410e16c 0.62
* b8a62c795b02407dac4e72e04e30c480 0.11
* 9f1bb4d8f9484db68c3669a1ac7d7d10 0.0
* a5974484e71b40408234c9af61b3237e 0.0
*/
/**
 * Vertex that derives a speed (distance divided by time) from each of its
 * edges during superstep 0 and prints {@code vertexID<TAB>speed} to
 * standard output.
 *
 * <p>Each edge is read as one measurement: the edge's destination vertex ID
 * is parsed as the distance and the edge value is used as the time.
 */
public static class SpeedVertex extends Vertex<Text, DoubleWritable, Text> {
    /** Number of decimal places kept in the printed speed. */
    private static final int RESULT_SCALE = 2;

    /** Unrounded speed of the most recently processed edge; 0 when that edge's time is 0. */
    public double distanceTime = 0d;
    /** Speed of the most recently processed edge, rounded to two decimal places. */
    public double result = 0d;

    /**
     * Computes and prints the speed for every edge in the first superstep.
     *
     * @param messages incoming messages; not read by this vertex
     * @throws IOException declared by the overridden method
     * @throws NumberFormatException if an edge's destination ID is not a parsable double
     */
    @Override
    public void compute(Iterable<Text> messages) throws IOException {
        if (this.getSuperstepCount() == 0L) {
            for (Edge<Text, DoubleWritable> edge : this.getEdges()) {
                double distance = Double.parseDouble(edge.getDestinationVertexID().toString());
                double time = edge.getValue().get();
                // Recompute for every edge: a zero time yields 0 instead of
                // silently reusing the speed left over from a previous edge.
                distanceTime = (time != 0d) ? distance / time : 0d;
                result = roundSpeed(distanceTime);
                System.out.println(getVertexID() + "\t" + result);
            }
        }
        // This vertex sends no messages and has no work after superstep 0,
        // so it declares itself inactive instead of staying active.
        voteToHalt();
    }

    /**
     * Rounds a speed to {@link #RESULT_SCALE} decimal places, half-up.
     * NaN and infinite values are returned unchanged because
     * {@code new BigDecimal(double)} rejects them with a NumberFormatException.
     */
    private static double roundSpeed(double speed) {
        if (Double.isNaN(speed) || Double.isInfinite(speed)) {
            return speed;
        }
        return new BigDecimal(speed).setScale(RESULT_SCALE, RoundingMode.HALF_UP).doubleValue();
    }
}
/**
 * Input reader that turns one tab-separated text line into a vertex.
 *
 * <p>The first field becomes the vertex ID. Every following field must have
 * the form {@code left:right}; {@code left} becomes the edge's destination
 * vertex ID and {@code right} is parsed as a double and becomes the edge
 * value. {@code SpeedVertex} in this file reads those two parts as distance
 * and time respectively.
 */
public static class SpeedVertexInReader extends
        VertexInputReader<LongWritable, Text, Text, DoubleWritable, Text> {
    /*
     * Sample input data (the code below splits the fields on a tab):
     * b454e650cb1a4fa4a2f2bd6899fbfa73 0.0:0.006388888888888889
     * 15ae99e97f224f4e9a299a386aac0228 8.958467335869804:3.4391666666666665
     * 45ce1e1e6c4342f28b057c79e410e16c 5.816872008170536:9.358888888888888
     * b8a62c795b02407dac4e72e04e30c480 0.49172655211785005:4.615
     * f6d3964061bf4681b688f157f9e132d0 0.0:0.8272222222222222
     * 0544a8a8a8744a56b22e88f44db84c5f 10.999974259619822:2.9458333333333333
     * 9f1bb4d8f9484db68c3669a1ac7d7d10 0.0:1.1472222222222221
     * a5974484e71b40408234c9af61b3237e 0.0:0.5069444444444444
     * 83c8aa0d1c7b4f36b1d02ce77883674d 0.0:0.021666666666666667
     * bc6b34d6c90f4ffe8be3d10804aaa15e 0.0:0.023055555555555555
     * 7b93cee2622f41b19e9926a379457f6b 0.0:0.9072222222222223
     */

    /**
     * Fills {@code vertex} from a single input line.
     *
     * @param key    input key of the record; not used
     * @param value  the text line to parse
     * @param vertex the vertex to populate with an ID and edges
     * @return always {@code true}
     * @throws IllegalArgumentException if the line has no vertex ID field or
     *         an edge field lacks the {@code :} separator
     * @throws NumberFormatException if the part after {@code :} is not a parsable double
     */
    @Override
    public boolean parseVertex(LongWritable key, Text value, Vertex<Text, DoubleWritable, Text> vertex)
            throws Exception {
        String line = value.toString();
        String[] fields = line.split("\t");
        // A line consisting only of tabs splits into an empty array; fail
        // with a clear message rather than emit a vertex that has no ID.
        if (fields.length == 0) {
            throw new IllegalArgumentException("Input line has no vertex ID: '" + line + "'");
        }
        vertex.setVertexID(new Text(fields[0]));

        for (int i = 1; i < fields.length; i++) {
            String[] pair = fields[i].split(":");
            if (pair.length < 2) {
                // Report the offending field instead of surfacing a bare
                // ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException("Malformed edge field '" + fields[i]
                        + "' (expected left:right) in line: '" + line + "'");
            }
            vertex.addEdge(new Edge<Text, DoubleWritable>(new Text(pair[0]),
                    new DoubleWritable(Double.parseDouble(pair[1]))));
        }
        return true;
    }
}
/**
 * Builds the graph job that runs {@code SpeedVertex} over a text input.
 *
 * @param inPath   path of the text input
 * @param outPath  path the job output is written to
 * @param hamaConf configuration the job is created from
 * @return the configured, not yet submitted, job
 * @throws IOException if the job cannot be created or configured
 */
public static GraphJob setJob(Path inPath, Path outPath, HamaConfiguration hamaConf)
        throws IOException {
    GraphJob graphJob = new GraphJob(hamaConf, Speed.class);
    // Integer.SIZE evaluates to 32; it is used here as the iteration cap.
    graphJob.setMaxIteration(Integer.SIZE);
    graphJob.setPartitioner(HashPartitioner.class);

    // Input: text lines parsed by SpeedVertexInReader.
    graphJob.setInputFormat(TextInputFormat.class);
    graphJob.setInputKeyClass(LongWritable.class);
    graphJob.setInputValueClass(Text.class);
    graphJob.setInputPath(inPath);
    graphJob.setVertexInputReaderClass(SpeedVertexInReader.class);

    // Graph types; these mirror the type arguments of SpeedVertex.
    graphJob.setVertexClass(SpeedVertex.class);
    graphJob.setVertexIDClass(Text.class);
    graphJob.setVertexValueClass(Text.class);
    graphJob.setEdgeValueClass(DoubleWritable.class);
    graphJob.setMessageClass(Text.class);

    // Output: declare the key as Text so it agrees with the vertex ID class
    // configured above (it was previously declared as LongWritable).
    graphJob.setOutputFormat(TextOutputFormat.class);
    graphJob.setOutputKeyClass(Text.class);
    graphJob.setOutputValueClass(Text.class);
    graphJob.setOutputPath(outPath);
    return graphJob;
}
/**
 * Runs the speed job and prints the elapsed wall-clock time on success.
 *
 * @param args optional overrides: {@code args[0]} is the input path and
 *             {@code args[1]} the output path; when absent, the original
 *             hard-coded locations are used
 * @throws IOException            if the job cannot be configured or run
 * @throws ClassNotFoundException propagated from job submission
 * @throws InterruptedException   if waiting for the job is interrupted
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    final String defaultInput = "/home/zhuhaichuan/Documents/test01.json";
    final String defaultOutput = "/home/zhuhaichuan/Documents";

    HamaConfiguration hamaConf = new HamaConfiguration(new Configuration());
    // Paths may be supplied on the command line; the defaults keep the
    // behavior of an argument-less invocation unchanged.
    Path inPath = new Path(args.length > 0 ? args[0] : defaultInput);
    Path outPath = new Path(args.length > 1 ? args[1] : defaultOutput);

    GraphJob graphJob = setJob(inPath, outPath, hamaConf);
    long startTime = System.currentTimeMillis();
    if (graphJob.waitForCompletion(true)) {
        System.out.println("Job Finished In: " + (System.currentTimeMillis() - startTime) / 1000.0 + "seconds");
    } else {
        // Surface the failure instead of exiting silently with status 0.
        System.err.println("Job failed");
        System.exit(1);
    }
}
}