Hama 是BSP(Bulk Synchronous Parallel整体同步并行)计算框架的一种实现,类似Google研发的Pregel系统。跟前一篇<<Mahout KMeans Clustering学习>>的流程一致,分四不走,在这里我不再多说了。
BSP有三种特性:1. 局部计算,2. 节点通信,3. 全局栅栏同步。
这使得BSP能够很好地实现迭代计算。
Hama版的KMeans比较简单,没有Mahout的复杂的代码架构,因为它只是Hama的这个例子的目的就是告诉大家Hama是能够迭代计算任务。
下面是在完成数据初始化和预处理后,org.apache.hama.ml.kmeans.KMeansBSP.bsp(BSPPeer<VectorWritable, NullWritable, IntWritable, VectorWritable, CenterMessage>)实现的KMeans算法。
@Override
public final void bsp(
BSPPeer<VectorWritable, NullWritable, IntWritable, VectorWritable, CenterMessage> peer)
throws IOException, InterruptedException, SyncException {
long converged;
while (true) {
//每个节点计算它们的簇中心,里面调用peer.send()方法把簇中心列表发送到各个peer节点上去。
assignCenters(peer);
peer.sync();
//接收各个机器同步过来的簇中心列表,并且与旧的中心对比。如果收敛,即新旧簇中心都是一样的。里面调用peer.getCurrentMessage()反复接收其他peer节点发过来的CurrentMessage数据
converged = updateCenters(peer);
//重新打开数据文件,重新读取
peer.reopenInput();
//判断是否收敛或者是否已经迭代规定次数
if (converged == 0)
break;
if (maxIterations > 0 && maxIterations < peer.getSuperstepCount())
break;
}
LOG.info("Finished! Writing the assignments...");
//根据已经分好的簇中心,把数据分到相应的簇里面去
recalculateAssignmentsAndWrite(peer);
LOG.info("Done.");
}
Hama的特点是不需要启动多个Job去运行,而是在同一个Job里即可以完成迭代任务。这个BSP的整体栅栏同步(Barrier Synchronous)机制保证了数据及状态的正确。