当前位置: 首页 > 面试题库 >

如何解码List的byte [] 到数据集 在火花?

劳灵均
2023-03-14
问题内容

我在我的项目中将spark-sql-2.3.1v和kafka与java8一起使用。我正在尝试将主题接收的byte []转换为kafka使用者方面的数据集。

这是详细信息

我有

class Company{
    String companyName;
    Integer companyId;
}

我定义为

public static final StructType companySchema = new StructType(
              .add("companyName", DataTypes.StringType)
              .add("companyId", DataTypes.IntegerType);

但是消息定义为

class Message{
    private List<Company> companyList;
    private String messageId;
}

我试图定义为

StructType messageSchema = new StructType()
            .add("companyList", DataTypes.createArrayType(companySchema , false),false)
            .add("messageId", DataTypes.StringType);

我使用序列化将消息作为byte []发送到kafka主题。

我在Consumer上成功接收到消息字节[]。我正在尝试将其转换为数据集?? 怎么做 ?

   Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");

  messagesDs.printSchema();

  root
         |-- companyList: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- companyName: string (nullable = true)
         |    |    |-- companyId: integer (nullable = true)
         |-- messageId: string (nullable = true)

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
 |-- col: struct (nullable = true)
 |    |-- companyName: string (nullable = true)
 |    |-- companyId: integer (nullable = true)



Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

出现错误:

线程“主”中的异常org.apache.spark.sql.AnalysisException:无法解析companyName给定的输入列:[col];

如何获取数据集记录,如何获取?


问题答案:

爆炸时,您的结构以“ col”命名。

由于您的Bean类没有“ col”属性,因此失败,并提到了错误。

线程“主要” org.apache.spark.sql.AnalysisException中的异常:在给定输入列的情况下,无法解析“
companyName”:[col];

您可以执行以下选择以使相关列作为普通列:诸如此类:

    Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));

我还没有测试语法,但是一旦从struct的每一行中获取普通列,都必须立即进行下一步。



 类似资料:
  • 我希望将传入的Netty的消息转换为我的类的实例。为此,我使用以下: 现在,由于消息是引用计数的,我们必须在处理完它之后释放它。这是由我们正在扩展的自动完成的。 现在,既然ButeBuf已经发布,这难道不意味着支持阵列处于危险之中吗?它将被回收,我不确定我会在我的MyBuffer的数组中看到什么。 这个解码器可以安全使用吗?这是正确的编写方式吗?

  • 我在spark中有一个数据集,只有一列,这列是一个Map[String,Any]。我想逐行映射数据集,然后逐键映射映射映射列,计算每个键的值,并使用新数据生成与前一个相同类型的新数据集。 例如: 我想在每个值的末尾加上“”,结果将是一个数据类型的数据集,如下所示: 谢谢Nir

  • 我想将包含字符串记录的RDD转换为Spark数据帧,如下所示。 模式行不在同一个中,而是在另一个变量中: 所以现在我的问题是,我如何使用上面两个,在Spark中创建一个数据帧?我使用的是Spark 2.2版。 我确实搜索并看到了一篇帖子:我可以使用spack-csv将表示为字符串的CSV读取到Apache Spark中吗?然而,这并不是我所需要的,我也无法找到一种方法来修改这段代码以在我的情况下工

  • 从Spark 1.6迁移到Spark 2.2*会在尝试对查询拼花地板表返回的数据集应用方法时出现错误“错误:无法为“数据集”中存储的类型找到编码器。基本类型(Int、String等)。为了证明同样的错误,我对代码进行了过度简化。代码查询拼花地板文件以返回以下数据类型:“org”。阿帕奇。火花sql。Dataset[org.apache.spark.sql.Row]“我应用一个函数来提取字符串和整数

  • 本文向大家介绍如何去除Java中List集合中的重复数据,包括了如何去除Java中List集合中的重复数据的使用技巧和注意事项,需要的朋友参考一下 1.循环list中的所有元素然后删除重复 总结: 两层循环,外层循环从第一个元素向最后一个元素循环,内层循环是从最后一个元素向外层循环元素的当前元素循环。比较两个元素是否相等,如果相等,移除靠后的元素来进行去重。这种方法时间复杂度大于O(n),小于O(

  • 在Flutter中解析我的时出现了一个问题。这是我的JSON。 我已经为这个数据生成了一个模型,它工作得很完美。 因此,在每个索引中都有另一个列表,我需要将其保存在InsurancePolicyType对象中。这就是我试图获取的数据。如何在代码中获取?我需要去拿那张名单,但我没能去。