当前位置: 首页 > 知识库问答 >
问题:

如何在流查询中使用from_json标准函数(在选择中)?

谢铭
2023-03-14

我使用以下JSON结构处理来自Kafka的消息:

{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}

我想打印出我收到的内容。这是我已经完成的代码片段:

JavaSparkContext sc = createJavaSparkContext();
JavaStreamingContext streamingContext =
                new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));

SparkSession sparkSession = SparkSession
        .builder()
        .config(new SparkConf())
        .getOrCreate();

Dataset<Row> df = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)
        .option("subscribe", KAFKA_TOPIC)
        .load();

StreamingQuery query = df.selectExpr("CAST(value AS STRING)")
            .select(from_json(new Column("value"), getSchema())).as("data").
                    select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() {
                @Override
                public void process(Row value) {
                    System.out.println(value);
                }

                @Override
                public void close(Throwable errorOrNull) {

                }

                @Override
                public boolean open(long partitionId, long version) {
                    return true;
                }
            })
            .start();

    query.awaitTermination();

模式方法:

private static StructType getSchema() {
    return new StructType(new StructField[]{
            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),
            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),
            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),
            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),
    });
}

问题是,我在从Spark编写时不断收到错误:

线程“main”组织中出现异常。阿帕奇。火花sql。AnalysisException:无法解析数据。category\u id'给定的输入列:[JSONTStruct(value)];;'项目['data.category\u id]-子QueryAlias数据-项目[JSONTStruct(StructField(unix\u时间,TimestampType,false),StructField(category\u id,IntegerType,false),StructField(ip,StringType,false),StructField(type,StringType,false),value#15)作为JSONTStruct(value)#18]

如何克服这个问题?有什么建议吗?

共有1个答案

华子航
2023-03-14

这部分异常告诉您在哪里寻找答案:

无法解析“”数据。“category\u id”给定的输入列:[JSONTStruct(值)]

换句话说,没有列<代码>数据。category\u id在可用列中,只有一列是jsontostruct(value)。

这意味着流式查询中的onlyselect不起作用。原因很简单(我可以称之为输入错误)——在列和数据集类型上,前面有太多的右括号作为(“数据”)。

总之,替换查询的以下部分:

.select(from_json(new Column("value"), getSchema())).as("data")

以下内容:

.select(from_json(new Column("value"), getSchema()).as("data"))

请注意,我将一个闭合支架移到了末端。

 类似资料:
  • 我在这里尝试使用光标,我想知道如何访问选择列中的光标字段? 我有一个如下的实现, 我想明白,这个实现有什么问题。 谢谢你。

  • 问题内容: 我有一个正在处理的查询,我想增加一个字段并在键值不同时重新启动计数器。 我知道这段代码行不通。以编程方式,这就是我想要的… …最终结果看起来像这样: 是的,我坚持使用SQL2k。否则,该row_number()将起作用。 问题答案: 假设一个表: 在Microsoft SQL Server 2000中获得此权限的一种方法是使用子查询对具有相同ID和较低顺序的行进行计数。 提示: 现在是

  • 问题内容: 我已经使用预备语句尝试了几次,但是它返回SQL异常。这是我的代码: 运行此程序时,出现以下SQL异常: 有什么建议可以使这项工作吗?任何代码都值得赞赏。 问题答案: 您需要使用: 代替 当您将字符串传递给 该 查询时,将按字面意义执行查询,因此将其发送到数据库,然后数据库会产生错误。通过传递查询字符串,您不会执行传递值的“已缓存”准备语句。

  • 我试图使用JPA标准编写以下查询,但我无法选择子查询中的多列。 我陷入了下面的实现过程中,无法找到如何在子查询中选择多个列。请看我在代码中的评论(第三行)。 请帮我解决这个问题。

  • 问题内容: 我有一个查询,看起来像这样: 这是一个非常基本的查询。除了提取Item的值之外,我还想将其他值添加到混合中,然后将其返回给我。在原始SQL中,我会这样做: 如何通过sqlalchemy手动添加该值? 问题答案: 您需要使用,看起来有点像这样: 注意,该参数无需任何转换即可插入查询;如果您从应用程序外部接受text参数的值,则可能使您暴露于SQL Injection漏洞。如果这是您需要的

  • 问题内容: 在MS Access中,我想在选择查询的返回结果中插入新列。新列的每一行都具有相同的值。例如,我的选择返回列A,B,而我希望C成为选择查询创建的新列: 问题答案: