当前位置: 首页 > 知识库问答 >
问题:

Spark Structured Streaming Databricks事件中心模式定义问题

和斌
2023-03-14

现在,我试图做相同的架构在Streread上。

val jsonSchema = StructType([ StructField("associatedEntities", struct<driver:StringType,truck:StringType>, True), 
                          StructField("heading", StringType, True), 
                          StructField("location", struct<accuracyType:StringType,captureDateTime:StringType,cityStateCode:StringType,description:StringType,latitude:DoubleType,longitude:DoubleType,quality:StringType,transmitDateTime:StringType>, True), 
                          StructField("measurements", array<struct<type:StringType,uom:StringType,value:StringType>>, True), 
                          StructField("source", struct<entityType:StringType,key:StringType,vendor:StringType>, True), 
                          StructField("speed", DoubleType, True)])

val df = spark
 .readStream
 .format("eventhubs")
 //.schema(jsonSchema) 
 .options(ehConf.toMap)
 .load()

当我在笔记本中运行此单元格时:15:错误:简单表达式val jsonSchema=StructType([StructField(“associatedEntities”,struct,True),”的非法开始

编辑:目标是将数据放入数据框中。我可以从event hub消息的主体中获取json字符串,但如果无法使模式正常工作,我不确定该怎么做。

共有1个答案

许波涛
2023-03-14

由于您的架构定义,您会收到错误消息。模式定义应该如下所示:

import org.apache.spark.sql.types._

val jsonSchema = StructType(
                        Seq(StructField("associatedEntities", 
                                        StructType(Seq(
                                          StructField("driver", StringType), 
                                          StructField ("truck", StringType)
                                        ))),
                            StructField("heading", StringType),
                            StructField("measurements", ArrayType(StructType(Seq(StructField ("type", StringType), StructField ("uom", StringType), StructField("value", StringType)))))
                           )
                         )

您可以通过以下方式对模式进行双重检查:

jsonSchema.printTreeString

返回模式:

root
 |-- associatedEntities: struct (nullable = true)
 |    |-- driver: string (nullable = true)
 |    |-- truck: string (nullable = true)
 |-- heading: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- uom: string (nullable = true)
 |    |    |-- value: string (nullable = true)

正如在评论中提到的,你会得到二进制数据。因此,首先您将获得原始数据帧:

val rawData = spark.readStream
  .format("eventhubs")
  .option(...)
  .load()

你必须:

  • 将数据转换为字符串
  • 解析嵌套的json
  • 把它压平

用解析的数据定义数据帧:

val parsedData = rawData
   .selectExpr("cast (Body as string) as json")
   .select(from_json($"json", jsonSchema).as("data"))
   .select("data.*")
 类似资料:
  • 本文向大家介绍nodejs 中模拟实现 emmiter 自定义事件,包括了nodejs 中模拟实现 emmiter 自定义事件的使用技巧和注意事项,需要的朋友参考一下 nodejs 中模拟实现 emmiter 自定义事件

  • 在App开发中,经常会遇到页面间传值的需求,比如从新闻列表页进入详情页,需要将新闻id传递过去; Html5Plus规范设计了evalJS方法来解决该问题; 但evalJS方法仅接收字符串参数,涉及多个参数时,需要开发人员手动拼字符串; 为简化开发,mui框架在evalJS方法的基础上,封装了自定义事件,通过自定义事件,用户可以轻松实现多webview间数据传递。 仅能在5+ App及流应用中使用

  • 自定义事件主要会被用于框架、组件设计与实现中。 自定义的事件有许多的创建方式,但实际的业务场景中几乎不会被用到,网络上的文献记载其具体的使用场景也相对较少。 1. 使用 Event 构造函数 使用 Event 构造函数就可以创建一个自定义事件。 案例演示 预览 复制 复制成功! <style> .btn { border: 1px solid #4caf50; padding: 8px 12p

  • 本文向大家介绍js事件模型与自定义事件实例解析,包括了js事件模型与自定义事件实例解析的使用技巧和注意事项,需要的朋友参考一下 JavaScript 一个最简单的事件模型,需要有事件绑定与触发,还有事件删除。 其中主要实现了bind(绑定事件)、unbind(删除事件)与 trigger (触发事件)。对同一事件名称,可以绑定多个事件处理函数;并按照绑定的顺序依次触发。 args.splice(0

  • 我使用CloudWatchLogs作为源,使用lambda函数调用事件作为目标。 在当前的方法中,每当创建日志流get时,lambda就会激发。但我想要的是,在Cloudwatch规则中编写一个自定义事件模式,这样,只有当日志消息包含异常或错误时,它才会触发lambda。这可能吗? 因为目前我从lambda中的CloudwatchLogs获取的事件不包含有关日志消息的任何详细信息。因此,我如何创建

  • 关于在物联网场景中使用EventHub的Azure功能,我有几个问题。 EventHub有分区。通常来自特定设备的消息会发送到同一个分区。Azure Function的实例如何分布在EventHub分区中?它是基于性能的吗?如果Azure Function的一个实例设法处理来自所有分区的事件,那么这就足够了,否则每个EventHub分区可能会有一个Azure Function的实例? 读取偏移量呢