我使用rowid
列为下表生成一个数字序列,以便执行联接,但这将引发以下错误。我做错了什么?请帮帮我。
fListVec: org.apache.spark.sql.DataFrame = [features: vector]
+-----------------------------------------------------------------------------+
|features |
+-----------------------------------------------------------------------------+
|[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]|
|[0.9558040000000001,0.9843780000000002,0.545025,0.9979860000000002] |
+-----------------------------------------------------------------------------+
代码:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
val fListrdd = fListVec.rdd
.map{case Row(features: Vector) => features}
.zipWithIndex()
.toDF("features","rowId")
fListrdd.createOrReplaceTempView("featuresTable")
val f = spark.sql("SELECT features, rowId from featuresTable")
f.show(false)
输出:
import org.apache.spark.ml.linalg.vector import org.apache.spark.sql.Row org.apache.spark.sparkException:作业由于阶段失败而中止:阶段206.0中的任务0失败1次,最近的失败:阶段206.0中丢失的任务0.0(TID 1718,localhost,executor驱动程序):Scala.MatchError:[[2.5046410000000003,2.148714999999997,1.0884870000000002,3.5877090000000003]](属于类UNJOB$5。Apply(sparkcontext.scala:1944)在org.apache.spark.scheduler.resulttask.runtask.scala:87)在org.apache.spark.scheduler.task.run(task.scala:99)在org.apache.spark.executor.executor$taskrunner.run(executor.scala:282)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.1149)在scheduler.scala:1422)在org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1。apply(dagscheduler.scala:802)在org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1。apply(dagscheduler.scala:802)在scala.option.foreach(option.scala:257)在dd.zippedWithIndexRdd.(zippedWithIndexRdd.Scala:50)在org.apache.spark.rdd.rdd$$anonfun$zipwithIndex$1处。apply(rdd.scala:1293)在org.apache.spark.rdd.rdd$anonfun$zipwithIndex$1处。apply(Rdd.scala:1293)在org.apache.spark.rdd.rdd.rddoperationscope$.withscope(Rddoperationscope.scala:151)在ndexrdd.scala:52)在org.apache.spark.rdd.zippedWithindexrdd$$anonfun$2。apply(zippedWithindexrdd.scala:52)在org.apache.spark.sparkcontext$$anonfun$runjob$5。apply(sparkcontext.scala:1944)在org.apache.spark.sparkcontext$$anonfun$runjob$5。apply(sparkcontext.scala:1944)在
预期产出:
features | rowId
[2.5046410000000003,...] 0
[0.9558040000000001,...] 1
您就快到了--只需指定适当的向量类型densevector
:
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.Row
val fList = Seq(
(Seq(2.5046410000000003, 2.1487149999999997, 1.0884870000000002, 3.5877090000000003)),
(Seq(0.9558040000000001, 0.9843780000000002, 0.545025, 0.9979860000000002))
).toDF("features")
def seqToVec = udf(
(s: Seq[Double]) => new DenseVector(s.toArray)
)
val fListVec = fList.withColumn("features", seqToVec($"features"))
// fListVec: org.apache.spark.sql.DataFrame = [features: vector]
val fListrdd = fListVec.rdd.
map{ case Row(features: DenseVector) => features }.
zipWithIndex.
toDF("features", "rowId")
fListrdd.show
// +--------------------+-----+
// | features|rowId|
// +--------------------+-----+
// |[2.50464100000000...| 0|
// |[0.95580400000000...| 1|
// +--------------------+-----+
您必须在中间编写一个map
函数,以便为要创建的新DataFrame
定义数据类型
val fListrdd = fListVec.rdd
.map{case Row(features) => features}
.zipWithIndex()
.map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt))
.toDF("features","rowId")
其中.map(x=>(x._1.asinstanceof[DenseVector],x.2.toint))
行仅添加。
您可以进一步创建数据集
。我个人推荐dataset
,因为数据集是类型安全的,是数据流的优化形式。
为此,您需要一个案例类
case class features(features: DenseVector, rowId: Int)
并且只需在我的上述解决方案中添加features
word,这样您就可以调用.tods
api来创建一个类型安全的DataSet
。
val fListDS = fListVec.rdd
.map{case Row(features: DenseVector) => features}
.zipWithIndex()
.map(x => features(x._1.asInstanceOf[DenseVector], x._2.toInt))
.toDS
我有以下格式的数据。向量的第一个元素指的是标题,向量的第二个到底部指的是针对标题的值。我希望以表格/结构化格式(或带有标题和值的数据框)放置数据。
有没有办法将数据帧转换为向量?例如 预期产出
我使用Firebase,确切地说是一个实时数据库,我不知道应该设置什么规则。我制定了以下规则: 但现在每个人都可以写作了。当我设置这些: 使用Gmail的用户无法登录,因为数据库中的记录没有创建,但不是在所有设备上。当我在OnePlus上测试时,一切都很好,当我在三星上测试时,数据库中的记录没有创建。这是我负责创建用户的代码:
我有一个数据库: 当用户()在应用程序中注册时,他会填充另一个用户uid(该用户uid具有属性)并将自己的uid添加到他的个人资料中(
TraceId 生成规则 SOFATracer 通过 TraceId 来将一个请求在各个服务器上的调用日志串联起来,TraceId 一般由接收请求经过的第一个服务器产生,产生规则是: 服务器 IP + 产生 ID 时候的时间 + 自增序列 + 当前进程号 ,比如: 0ad1348f1403169275002100356696 前 8 位 0ad1348f 即产生 TraceId 的机器的 IP,
我尝试用规则返回用户聊天列表。所以我不知道对话的id。我尝试了几种方法,但都不起作用,因为你必须知道聊天id。 数据库: 规则: 但当我从react本机应用程序访问时。通过身份验证的用户无法访问聊天记录(uid:3Oi1atf8l2P4Vgsb8tZOGxpUg7q2) *读取失败:错误:权限被拒绝/Chats:客户端没有访问所需数据的权限。 规则: 查询: