当前位置: 首页 > 知识库问答 >
问题:

如何编写数据集编码器以支持将函数映射到组织。阿帕奇。火花sql。Scala Spark中的数据集[字符串]

狄子真
2023-03-14

从Spark 1.6迁移到Spark 2.2*会在尝试对查询拼花地板表返回的数据集应用html" target="_blank">方法时出现错误“错误:无法为“数据集”中存储的类型找到编码器。基本类型(Int、String等)。为了证明同样的错误,我对代码进行了过度简化。代码查询拼花地板文件以返回以下数据类型:“org”。阿帕奇。火花sql。Dataset[org.apache.spark.sql.Row]“我应用一个函数来提取字符串和整数,并返回一个字符串。返回以下数据类型:Array[String]接下来,我需要执行大量操作,需要一个单独的函数。在这个测试函数中,我尝试附加一个字符串,产生与我的详细示例相同的错误。我尝试了一些编码器示例和“case”的使用,但没有找到可行的解决方案。如有任何建议/示例,将不胜感激

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x" 
(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

为了进一步提炼问题,我相信这个场景(尽管我还没有尝试所有可能的解决方案)可以进一步简化为以下代码:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }

共有2个答案

刘兴修
2023-03-14

第一个解决方案不适用于我的初始(生产)数据集,而是产生错误“org.apache.spark.SparkException:Task not serializable”(有趣的是,两者都存储为相同的数据类型(org.apache.spark.sql.Dataset[String]=[value:String]),我认为这是相关的。我为我的测试数据集提供了另一个解决方案,它消除了最初的编码器错误,如图所示,它实际上可以解决我的玩具问题,而不是生产数据集。我有点困惑,为什么我的应用程序在从1.6版到2.3版spark的过程中被搁置一边,因为我多年来都不需要对我的应用程序进行任何特殊调整,并且已经成功地运行了它,以进行最有可能数万亿次的计算。其他探索包括将我的方法包装为可序列化,探索@transient关键字,利用“org.apache.spark.serializer.KryoSerializer”,将我的方法编写为函数,并将所有变量更改为“vals”(见“stack”上的相关文章)。

scala>  import spark.implicits._
import spark.implicits._

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 |   val r = s + "hi"
 |   return r
 |   }
 f: (s: String)String

 scala> var d2 =  test.map{s => f(s)}(Encoders.STRING)
 d2: org.apache.spark.sql.Dataset[String] = [value: string]

 scala> d2.take(1)
 res0: Array[String] = Array(just some wordshi)

斯卡拉

仲绍晖
2023-03-14

我至少有一个简单问题的解决方案(见下文)。我会测试更多。。。。

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 | val r = s + "hi"
 | return r
 | }
f: (s: String)String

scala> var test2 = test.rdd.map{ s => f(s) }
test2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:43

scala> test2.take(1)
res9: Array[String] = Array(just some wordshi)
 类似资料:
  • 我在spark中有一个数据集,只有一列,这列是一个Map[String,Any]。我想逐行映射数据集,然后逐键映射映射映射列,计算每个键的值,并使用新数据生成与前一个相同类型的新数据集。 例如: 我想在每个值的末尾加上“”,结果将是一个数据类型的数据集,如下所示: 谢谢Nir

  • 问题内容: 我在我的项目中将spark-sql-2.3.1v和kafka与java8一起使用。我正在尝试将主题接收的byte []转换为kafka使用者方面的数据集。 这是详细信息 我有 我定义为 但是消息定义为 我试图定义为 我使用序列化将消息作为byte []发送到kafka主题。 我在Consumer上成功接收到消息字节[]。我正在尝试将其转换为数据集?? 怎么做 ? 出现错误: 线程“主”

  • scala的版本是2.11.8;jdk为1.8;spark是2.0.2 我试图在spark apache的官方网站上运行一个LDA模型的示例,我从以下句子中得到了错误消息: 错误按摩是: 错误:(49,25)读取的值不是组织的成员。阿帕奇。火花SparkContext val dataset=spark。阅读格式(“libsvm”)^ 我不知道怎么解决。

  • 问题内容: 当前,我有一堆实现接口的Java类,这意味着它们都具有方法。这个想法是,每个类都有几个(例如<10)成员,并且每个类都通过方法映射到该类中的方法,如下所示: 你明白了。 这对我来说很好,但是现在我需要一个从键到函数的运行时可访问的映射。并非每个函数 实际上都 返回一个String(有些返回void),并且我需要动态地访问每个具有键的类中每个函数的返回类型(使用反射)。我已经有一位经理,

  • 我有一组数字要核对。如果数字是偶数,程序将检查最大的奇数分频器相等。例如,如果数字是12,程序将返回类似[3,3,3,3]的数组,或者对于36,它将是[9,9,9,9],对于54,[27,27]和56,[7,7,7,7,7,7,7]等等。我正在用ruby写我的代码。我不知道如何写出正确的算法。任何帮助都将不胜感激。

  • 问题内容: 我想为我正在执行的映射操作在DataSet中为Row类型编写一个编码器。本质上,我不了解如何编写编码器。 以下是地图操作的示例: 我知道,编码器需要编写如下字符串,而不是字符串: 但是,我不了解编码器中的clsTag(),并且我试图找到一个可以演示类似内容的运行示例(即,用于行类型的编码器) 编辑- 这不是所提问题的副本:尝试将数据框行映射到更新行时出现编码器错误,因为答案涉及在Spa