我试图从dataframe
获取列,并将其转换为rdd[Vector]
。
"col0.1","col1.2","col2.3","col3.4"
1,2,3,4
10,12,15,3
1,12,10,5
val df = spark.read.format("csv").options(Map("header" -> "true", "inferSchema" -> "true")).load("C:/Users/mhattabi/Desktop/donnee/test.txt")
val column=df.columns.map(c=>s"`${c}`")
val rows = new VectorAssembler().setInputCols(column).setOutputCol("vs")
.transform(df)
.select("vs")
.rdd
val data =rows.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
.map(org.apache.spark.mllib.linalg.Vectors.fromML)
val mat: RowMatrix = new RowMatrix(data)
//// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(mat.numCols().toInt, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
println(V)
如果问题是列名中的.(dot)
,则可以使用`(backticks)
将列名括起来。
df.select(“`col0.1`”)
我们正试图在spark中生成数据集的逐列统计数据。除了使用统计库中的summary函数之外。我们使用以下程序: > 我们确定具有字符串值的列 为整个数据集生成键值对,使用列号作为键,使用列的值作为值 生成新的格式映射 (K,V)- 然后我们使用reduceByKey来找到所有列中所有唯一值的总和。我们缓存这个输出以减少进一步的计算时间。 在下一步中,我们使用for循环遍历列,以查找所有列的统计信息
我读到过,太多的小分区会因为开销而损害性能,例如,向执行器发送大量任务。 使用最大的分区的缺点是什么?例如,为什么我会看到100s的MB范围内的建议? 如果丢失了一个分区,则需要进行大量的重新计算。对于许多较小的分区,您可能会更经常地丢失分区,但在运行时中的差异会更小。 如果在大分区上执行的少数任务中有一个任务的计算时间比其他任务长,这将使其他核心未被利用,但使用较小的分区,可以更好地在集群中分配
数据帧结构: 预期的数据帧结构: Code_1已尝试: 这也导致错误配对和重复。关于我应该调整什么以获得所需输出的任何建议。 我还尝试在第一条select语句中使用多次爆炸,这将引发错误。 Code_2尝试: 警告和错误: 是的,我问了同样的问题,这个问题被关闭为重复,指向另一个解决方案,这就是我在片段2中尝试的。它也不起作用。任何建议都会很有帮助。
首先,我使用的是scala 2.10.4,上面的例子是在Spark 1.6中运行的(尽管我怀疑Spark与此有关,但这只是一个序列化问题)。 所以我的问题是:假设我有一个trait,它由两个类实现,比如说和。现在,我有一个泛型特征,它由一组类扩展,其中一个类位于的子类型之上,例如(这里我保留了Spark对RDD的概念,但一旦序列化,它实际上可能是另一个类;不管实际情况如何,它都只是一个结果): 现
在配置spark应用程序时,我试图从集群中挤出每一点,但似乎我并没有完全正确地理解每一件事。因此,我正在AWS EMR集群上运行该应用程序,该集群具有1个主节点和2个m3类型的核心节点。xlarge(每个节点15G ram和4个vCPU)。这意味着,默认情况下,每个节点上为纱线调度的应用程序保留11.25 GB。因此,主节点仅由资源管理器(纱线)使用,这意味着剩余的2个核心节点将用于调度应用程序(
我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?