我尝试从Socket TCP获取数据以附加到数据帧我收到数据并将它们执行到Seq(),但当我使用foreach将它们附加到数据帧时出现问题这是我的代码:
object CustomReceiver {
def main(args: Array[String]): Unit = {
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("CustomReceiver")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
import spark.implicits._
/*formatdata line data from Socket: number1, 20210621090303, RadiusMessage, Stop, 84602496347, v241.66.85.130 */
val linesData1 = ssc.receiverStream(new CustomReceiver("localhost", 11000))
linesData1.flatMap(_.split(" ").map(_.trim))
linesData1.foreachRDD { rdd =>
rdd.foreach{ line => {
val arrraLine = line.split(",").toList
// oke arrayLine data : List(number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130)
val testRDD = Seq(arrraLine).map(x =>(x(0), x(1), x(2), x(3), x(4)))
// oke TestRDD : testRDD :List((number1,20210621090303,RadiusMessage,Stop,84602496347))
val testDF = testRDD.toDF("cot1","cot2","cot3","cot4","cot5")
// has an Problem
testDF.show()
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
这是我跑步时的问题
Java语言组织中的lang.NullPointerException。阿帕奇。火花sql。SQLImplicits。localSeqToDatasetHolder(SQLImplicits.scala:231),位于Cl.CustomReceiver$$anonfun$main$4(CustomerReceiver.scala:52),Cl.CustomReceiver$$anonfun$main$4$adapted(CustomerReceiver.scala:45)位于scala。收集迭代器。scala上的foreach(迭代器。scala:943)。收集迭代器。foreach$(Iterator.scala:943)位于org。阿帕奇。火花util。完成计算器。foreach(CompletionIterator.scala:25)位于org。阿帕奇。火花rdd。RDD$anonfun$foreach$2(RDD.scala:1012)位于org。阿帕奇。火花rdd。RDD$anonfun$foreach$2$adapted(RDD.scala:1012)位于org。阿帕奇。火花SparkContext$org上的anonfun$runJob$5(SparkContext.scala:2236)。阿帕奇。火花调度程序。结果任务。在组织上运行任务(ResultTask.scala:90)。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:131)。阿帕奇。火花执行人。执行者$TaskRunner$anonfun$run$3(Executor.scala:497)位于org。阿帕奇。火花util。Utils美元。tryWithSafeFinally(Utils.scala:1439)位于org。阿帕奇。火花执行人。执行者$TaskRunner。在java上运行(Executor.scala:500)。util。同时发生的线程池执行器。java上的runWorker(ThreadPoolExecutor.java:1149)。util。同时发生的ThreadPoolExecutor$工作者。在java上运行(ThreadPoolExecutor.java:624)。lang.Thread。运行(Thread.java:748)
你想做的事不太值得推荐。如果要使用数据帧,请使用Spark结构化流。您正在尝试在foreach RDD操作中创建DF。如果您使用旧的Spark流媒体版本,请使用RDD。
您可以在“foreachRDD”中创建一个DF,为每个小批次创建一个RDD,但这不是一个好主意。如果您进行测试,您将看到许多火花阶段来创建新的DF,并且对于每个小批次...使用结构化流,您可以直接创建数据帧。
问题内容: 如何获得WPF DataGrid将更改保存回数据库? 我已经将DataGrid控件数据绑定到一个DataTable对象,并使用一个非常简单的SELECT查询填充该表,该查询检索了一些基本信息。数据在控件中显示得很好。 但是,当我使用控件来编辑数据时,所做的更改不会被推回到数据库中。 有人知道我在想什么吗? 问题答案: 执行更新 当用户在DataGrid中编辑客户数据时,绑定的内存数据表
问题内容: 这是我的类,用于从数据库中获取数据 这是我的文件: 当我运行该程序时,出现异常后,请帮助我如何解决它。我是Hibernate的新手,尝试学习但被卡住了。 虽然我能够将数据存储在数据库中,但我有2个用于第一和第二类的数据获取数据,但在获取数据时遇到了问题PLZ帮助。 问题答案: 让我引述一下: 据我所知,您正在使用表名。 所以应该是这样的:
我正在尝试使用 考虑以下代码 每当我试图打印出的结果时,它总是返回 即使我返回一个json对象。 如何打印的结果?
我想获取日期字段,但下面的代码没有这样做。Toast显示null(字符串日期的值)。 公共字符串日期;
问题内容: 我正在基于CodeIgniter的应用程序。这里的代码: 控制器 : 型号 : 查看 : Javascript : 上面的代码运行良好。 现在,我想通过ajax请求将数据提取到。我已经从url创建了一个ajax请求,可以从这里,where 和进行说。 Ajax响应产生: 问题 如何使用Ajax数据源配置数据表,以及如何将数据显示到表中,所以我可以使用数据例如创建类似代码的链接 问题答案
问题内容: 我是Android编程的新手,我想找到如何将json中的数据存储在android的sqlite中。我没有在互联网上找到任何有用的示例,这就是为什么我在这里问。如果您向我展示一个简单的举个例子,我会很高兴。非常感谢! 问题答案: 创建一个具有JSON模型属性的模型类。 在您的模型中创建一个接受(搜索Android开发人员参考)的构造函数。 在构造函数中,使用方法和读取属性的值。 您也可以