我在我的项目中将spark-sql-2.3.1v和kafka与java8一起使用。我正在尝试将主题接收的byte []转换为kafka使用者方面的数据集。
这是详细信息
我有
class Company{
String companyName;
Integer companyId;
}
我定义为
public static final StructType companySchema = new StructType(
.add("companyName", DataTypes.StringType)
.add("companyId", DataTypes.IntegerType);
但是消息定义为
class Message{
private List<Company> companyList;
private String messageId;
}
我试图定义为
StructType messageSchema = new StructType()
.add("companyList", DataTypes.createArrayType(companySchema , false),false)
.add("messageId", DataTypes.StringType);
我使用序列化将消息作为byte []发送到kafka主题。
我在Consumer上成功接收到消息字节[]。我正在尝试将其转换为数据集?? 怎么做 ?
Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");
messagesDs.printSchema();
root
|-- companyList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- companyId: integer (nullable = true)
|-- messageId: string (nullable = true)
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));
comapanyListDs.printSchema();
root
|-- col: struct (nullable = true)
| |-- companyName: string (nullable = true)
| |-- companyId: integer (nullable = true)
Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));
出现错误:
线程“主”中的异常org.apache.spark.sql.AnalysisException:无法解析companyName
给定的输入列:[col];
如何获取数据集记录,如何获取?
爆炸时,您的结构以“ col”命名。
由于您的Bean类没有“ col”属性,因此失败,并提到了错误。
线程“主要” org.apache.spark.sql.AnalysisException中的异常:在给定输入列的情况下,无法解析“
companyName”:[col];
您可以执行以下选择以使相关列作为普通列:诸如此类:
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));
我还没有测试语法,但是一旦从struct的每一行中获取普通列,都必须立即进行下一步。
我希望将传入的Netty的消息转换为我的类的实例。为此,我使用以下: 现在,由于消息是引用计数的,我们必须在处理完它之后释放它。这是由我们正在扩展的自动完成的。 现在,既然ButeBuf已经发布,这难道不意味着支持阵列处于危险之中吗?它将被回收,我不确定我会在我的MyBuffer的数组中看到什么。 这个解码器可以安全使用吗?这是正确的编写方式吗?
我在spark中有一个数据集,只有一列,这列是一个Map[String,Any]。我想逐行映射数据集,然后逐键映射映射映射列,计算每个键的值,并使用新数据生成与前一个相同类型的新数据集。 例如: 我想在每个值的末尾加上“”,结果将是一个数据类型的数据集,如下所示: 谢谢Nir
我想将包含字符串记录的RDD转换为Spark数据帧,如下所示。 模式行不在同一个中,而是在另一个变量中: 所以现在我的问题是,我如何使用上面两个,在Spark中创建一个数据帧?我使用的是Spark 2.2版。 我确实搜索并看到了一篇帖子:我可以使用spack-csv将表示为字符串的CSV读取到Apache Spark中吗?然而,这并不是我所需要的,我也无法找到一种方法来修改这段代码以在我的情况下工
从Spark 1.6迁移到Spark 2.2*会在尝试对查询拼花地板表返回的数据集应用方法时出现错误“错误:无法为“数据集”中存储的类型找到编码器。基本类型(Int、String等)。为了证明同样的错误,我对代码进行了过度简化。代码查询拼花地板文件以返回以下数据类型:“org”。阿帕奇。火花sql。Dataset[org.apache.spark.sql.Row]“我应用一个函数来提取字符串和整数
本文向大家介绍如何去除Java中List集合中的重复数据,包括了如何去除Java中List集合中的重复数据的使用技巧和注意事项,需要的朋友参考一下 1.循环list中的所有元素然后删除重复 总结: 两层循环,外层循环从第一个元素向最后一个元素循环,内层循环是从最后一个元素向外层循环元素的当前元素循环。比较两个元素是否相等,如果相等,移除靠后的元素来进行去重。这种方法时间复杂度大于O(n),小于O(
在Flutter中解析我的时出现了一个问题。这是我的JSON。 我已经为这个数据生成了一个模型,它工作得很完美。 因此,在每个索引中都有另一个列表,我需要将其保存在InsurancePolicyType对象中。这就是我试图获取的数据。如何在代码中获取?我需要去拿那张名单,但我没能去。