我使用以下JSON结构处理来自Kafka的消息:
{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}
我想打印出我收到的内容。这是我已经完成的代码片段:
JavaSparkContext sc = createJavaSparkContext();
JavaStreamingContext streamingContext =
new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));
SparkSession sparkSession = SparkSession
.builder()
.config(new SparkConf())
.getOrCreate();
Dataset<Row> df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)
.option("subscribe", KAFKA_TOPIC)
.load();
StreamingQuery query = df.selectExpr("CAST(value AS STRING)")
.select(from_json(new Column("value"), getSchema())).as("data").
select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() {
@Override
public void process(Row value) {
System.out.println(value);
}
@Override
public void close(Throwable errorOrNull) {
}
@Override
public boolean open(long partitionId, long version) {
return true;
}
})
.start();
query.awaitTermination();
模式方法:
private static StructType getSchema() {
return new StructType(new StructField[]{
new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),
new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),
new StructField(IP, DataTypes.StringType, false, Metadata.empty()),
new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),
});
}
问题是,我在从Spark编写时不断收到错误:
线程“main”组织中出现异常。阿帕奇。火花sql。AnalysisException:无法解析数据。category\u id'给定的输入列:[JSONTStruct(value)];;'项目['data.category\u id]-子QueryAlias数据-项目[JSONTStruct(StructField(unix\u时间,TimestampType,false),StructField(category\u id,IntegerType,false),StructField(ip,StringType,false),StructField(type,StringType,false),value#15)作为JSONTStruct(value)#18]
如何克服这个问题?有什么建议吗?
这部分异常告诉您在哪里寻找答案:
无法解析“”数据。“category\u id”给定的输入列:[JSONTStruct(值)]
换句话说,没有列<代码>数据。category\u id在可用列中,只有一列是jsontostruct(value)。
这意味着流式查询中的onlyselect
不起作用。原因很简单(我可以称之为输入错误)——在列和数据集类型上,前面有太多的右括号作为(“数据”)。
总之,替换查询的以下部分:
.select(from_json(new Column("value"), getSchema())).as("data")
以下内容:
.select(from_json(new Column("value"), getSchema()).as("data"))
请注意,我将一个闭合支架移到了末端。
我在这里尝试使用光标,我想知道如何访问选择列中的光标字段? 我有一个如下的实现, 我想明白,这个实现有什么问题。 谢谢你。
问题内容: 我有一个正在处理的查询,我想增加一个字段并在键值不同时重新启动计数器。 我知道这段代码行不通。以编程方式,这就是我想要的… …最终结果看起来像这样: 是的,我坚持使用SQL2k。否则,该row_number()将起作用。 问题答案: 假设一个表: 在Microsoft SQL Server 2000中获得此权限的一种方法是使用子查询对具有相同ID和较低顺序的行进行计数。 提示: 现在是
问题内容: 我已经使用预备语句尝试了几次,但是它返回SQL异常。这是我的代码: 运行此程序时,出现以下SQL异常: 有什么建议可以使这项工作吗?任何代码都值得赞赏。 问题答案: 您需要使用: 代替 当您将字符串传递给 该 查询时,将按字面意义执行查询,因此将其发送到数据库,然后数据库会产生错误。通过传递查询字符串,您不会执行传递值的“已缓存”准备语句。
我试图使用JPA标准编写以下查询,但我无法选择子查询中的多列。 我陷入了下面的实现过程中,无法找到如何在子查询中选择多个列。请看我在代码中的评论(第三行)。 请帮我解决这个问题。
问题内容: 我有一个查询,看起来像这样: 这是一个非常基本的查询。除了提取Item的值之外,我还想将其他值添加到混合中,然后将其返回给我。在原始SQL中,我会这样做: 如何通过sqlalchemy手动添加该值? 问题答案: 您需要使用,看起来有点像这样: 注意,该参数无需任何转换即可插入查询;如果您从应用程序外部接受text参数的值,则可能使您暴露于SQL Injection漏洞。如果这是您需要的
问题内容: 在MS Access中,我想在选择查询的返回结果中插入新列。新列的每一行都具有相同的值。例如,我的选择返回列A,B,而我希望C成为选择查询创建的新列: 问题答案: