val vertex = graph.vertices
val edges = graph.edges.map(v=>(v.srcId, v.dstId)).toDF("key","value")
var FMvertex = vertex.map(v => (v._1, HLLCounter.encode(v._1)))
var encodedVertex = FMvertex.toDF("keyR", "valueR")
var Degvertex = vertex.map(v => (v._1, 0.toLong))
var lastRes = Degvertex
//calculate FM of the next step
breakable {
for (i <- 1 to MaxIter) {
var N_pre = FMvertex.map(v => (v._1, HLLCounter.decode(v._2)))
var adjacency = edges.join(
encodedVertex,//FMvertex.toDF("keyR", "valueR"),
$"value" === $"keyR"
).rdd.map(r => (r.getAs[VertexId]("key"), r.getAs[Array[Byte]]("valueR"))).reduceByKey((a,b)=>HLLCounter.Union(a,b))
FMvertex = FMvertex.union(adjacency).reduceByKey((a,b)=>HLLCounter.Union(a,b))
// update vetex encode
encodedVertex = FMvertex.toDF("keyR", "valueR")
var N_curr = FMvertex.map(v => (v._1, HLLCounter.decode(v._2)))
lastRes = N_curr
var middleAns = N_curr.union(N_pre).reduceByKey((a,b)=>Math.abs(a-b))//.mapValues(x => x._1 - x._2)
if (middleAns.values.sum() == 0){
println(i)
break
}
Degvertex = Degvertex.join(middleAns).mapValues(x => x._1 + i * x._2)//.map(identity)
}
}
val res = Degvertex.join(lastRes).mapValues(x => x._1.toDouble / x._2.toDouble)
return res
import net.agkn.hll.HLL;
import com.google.common.hash.*;
import com.google.common.hash.Hashing;
import java.io.Serializable;
public class HLLCounter implements Serializable {
private static int seed = 1234567;
private static HashFunction hs = Hashing.murmur3_128(seed);
private static int log2m = 15;
private static int regwidth = 5;
public static byte[] encode(Long id) {
HLL hll = new HLL(log2m, regwidth);
Hasher myhash = hs.newHasher();
hll.addRaw(myhash.putLong(id).hash().asLong());
return hll.toBytes();
}
public static byte[] Union(byte[] byteA, byte[] byteB) {
HLL hllA = HLL.fromBytes(byteA);
HLL hllB = HLL.fromBytes(byteB);
hllA.union(hllB);
return hllA.toBytes();
}
public static long decode(byte[] bytes) {
HLL hll = HLL.fromBytes(bytes);
return hll.cardinality();
}
}
当我在一个大约有一千万个顶点和一亿条边的图上运行时,代码工作得很好。然而,当我在一个有数以亿计的图和数十亿条边的图上运行它时,在集群上运行了几个小时后,它显示
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 91 in stage 29.1 failed 4 times, most recent failure: Lost task 91.3 in stage 29.1 (TID 17065, 9.10.135.216, executor 102): java.io.IOException: : No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
有人能帮我吗?我刚开始使用spark几天。谢谢你帮忙。
小天,你说“洗牌读和洗牌写大约是1TB。我不需要那些中间值或RDDs”。此声明确认您不熟悉Apache Spark,也可能不熟悉您正在运行的算法。请听我解释。
当三个数相加时,你必须对要相加的前两个数做出选择。例如(A+B)+C或A+(B+C)。一旦做出选择,括号内的数字就会有一个临时的中间值。在没有中间数的情况下,不可能在所有三个数之间继续计算。
RDD是一种空间高效的数据结构。每个“新”RDD表示整个数据集上的一组操作。一些RDDs表示单个操作,如“加五”,而另一些RDDs表示一系列操作,如“加五,然后乘六,再减七”。如果不放弃数学算法的某些部分,就不能放弃RDD。
Apache Spark的核心是一种散集算法。它将数据集分发到多个工作节点,其中该数据集是分发的单个RDD的一部分,以及所需的计算。此时,计算尚未执行。当从RDD的计算形式请求数据时,按需执行计算。
有时,如果不知道其他工人的一些中间值,就不可能在单个工人上完成计算。这种工作人员之间的交叉通信总是发生在头节点之间,头节点将数据分配给各个工作人员,并收集和聚合来自各个工作人员的数据;但是,根据算法的结构,它也可能发生在计算中期(特别是在分组或连接数据片的算法中)。
您有一个算法需要洗牌,以这样一种方式,单个节点不能从所有其他节点收集结果,因为单个节点没有足够的ram来保存从其他节点收集的中间值。
如果没有Apache Spark程序的细节,没有对数据集的访问,没有对Spark集群及其日志的访问,很难知道这些常见方法中哪一种对您的好处最大;所以我把它们都列了出来。
祝你好运!
问题内容: 当我尝试将某些文件保存到centos计算机上时,出现错误“设备上没有剩余空间” 我试过了 当我这样做 ->只有5G 似乎文件系统已满。我怎么能找到哪一个尺寸这么大? 问题答案: 和的输出之间的这种差异可能会在某些大文件已被删除但仍由某些进程打开的情况下发生。检查命令以查看哪些进程已打开描述符以删除文件。您可以重新启动该过程,空间将被释放。
我已经向EC2实例附加了一个新卷。卷已成功附加。位于命令输出下面。 DF-H
我在运行一些任务时在hadoop中收到以下异常。但是HDFS显示它有空间。任何有关此错误的信息都会有所帮助。
我有一个EMR作业,它读取大约1TB的数据,过滤它并对它进行重新分区(重新分区后有一些连接),但是我的作业在重新分区时失败,错误为“设备上没有空间”。我很想更改“spark.local.dir”,但没有用。我的工作只在D2.4xLarge实例上完成,但在具有类似内核和RAM的R3.4xLarge实例上失败。我找不到这个问题的根本原因。如有任何帮助,不胜感激。 谢谢你抽出时间。
我在ec2集群上运行一个pyspark作业,有4个工作人员。我得到这个错误: 尝试增加洗牌分区-同样的问题。我的数据在执行器中的分区看起来相当均匀。我想尝试将Null或None分配给dataframes的变通方法,问题是它是否真的会移除中间洗牌文件,以及linage是否会被保留。 例如,如果我的代码如下所示: 我会把
问题内容: 我正在将一个小文件(8.5 Mb)上传到烧瓶测试服务器。 文件上传完成后,服务器报告: 现在,服务器具有足够的可用空间-超过3Gb。 我查看了Werkzeug github存储库,以查找Werkzeug尝试写入的位置,但无法对其进行跟踪。 我还检查了tempfile.gettempdir(),该文件将/ var / tmp用作临时文件目录,但是该文件夹实际上是空的,因此我不认为这是造成