现在,我试图做相同的架构在Streread上。
val jsonSchema = StructType([ StructField("associatedEntities", struct<driver:StringType,truck:StringType>, True),
StructField("heading", StringType, True),
StructField("location", struct<accuracyType:StringType,captureDateTime:StringType,cityStateCode:StringType,description:StringType,latitude:DoubleType,longitude:DoubleType,quality:StringType,transmitDateTime:StringType>, True),
StructField("measurements", array<struct<type:StringType,uom:StringType,value:StringType>>, True),
StructField("source", struct<entityType:StringType,key:StringType,vendor:StringType>, True),
StructField("speed", DoubleType, True)])
val df = spark
.readStream
.format("eventhubs")
//.schema(jsonSchema)
.options(ehConf.toMap)
.load()
当我在笔记本中运行此单元格时:15:错误:简单表达式val jsonSchema=StructType([StructField(“associatedEntities”,struct,True),”的非法开始
编辑:目标是将数据放入数据框中。我可以从event hub消息的主体中获取json字符串,但如果无法使模式正常工作,我不确定该怎么做。
由于您的架构定义,您会收到错误消息。模式定义应该如下所示:
import org.apache.spark.sql.types._
val jsonSchema = StructType(
Seq(StructField("associatedEntities",
StructType(Seq(
StructField("driver", StringType),
StructField ("truck", StringType)
))),
StructField("heading", StringType),
StructField("measurements", ArrayType(StructType(Seq(StructField ("type", StringType), StructField ("uom", StringType), StructField("value", StringType)))))
)
)
您可以通过以下方式对模式进行双重检查:
jsonSchema.printTreeString
返回模式:
root
|-- associatedEntities: struct (nullable = true)
| |-- driver: string (nullable = true)
| |-- truck: string (nullable = true)
|-- heading: string (nullable = true)
|-- measurements: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- uom: string (nullable = true)
| | |-- value: string (nullable = true)
正如在评论中提到的,你会得到二进制数据。因此,首先您将获得原始数据帧:
val rawData = spark.readStream
.format("eventhubs")
.option(...)
.load()
你必须:
用解析的数据定义数据帧:
val parsedData = rawData
.selectExpr("cast (Body as string) as json")
.select(from_json($"json", jsonSchema).as("data"))
.select("data.*")
本文向大家介绍nodejs 中模拟实现 emmiter 自定义事件,包括了nodejs 中模拟实现 emmiter 自定义事件的使用技巧和注意事项,需要的朋友参考一下 nodejs 中模拟实现 emmiter 自定义事件
在App开发中,经常会遇到页面间传值的需求,比如从新闻列表页进入详情页,需要将新闻id传递过去; Html5Plus规范设计了evalJS方法来解决该问题; 但evalJS方法仅接收字符串参数,涉及多个参数时,需要开发人员手动拼字符串; 为简化开发,mui框架在evalJS方法的基础上,封装了自定义事件,通过自定义事件,用户可以轻松实现多webview间数据传递。 仅能在5+ App及流应用中使用
自定义事件主要会被用于框架、组件设计与实现中。 自定义的事件有许多的创建方式,但实际的业务场景中几乎不会被用到,网络上的文献记载其具体的使用场景也相对较少。 1. 使用 Event 构造函数 使用 Event 构造函数就可以创建一个自定义事件。 案例演示 预览 复制 复制成功! <style> .btn { border: 1px solid #4caf50; padding: 8px 12p
本文向大家介绍js事件模型与自定义事件实例解析,包括了js事件模型与自定义事件实例解析的使用技巧和注意事项,需要的朋友参考一下 JavaScript 一个最简单的事件模型,需要有事件绑定与触发,还有事件删除。 其中主要实现了bind(绑定事件)、unbind(删除事件)与 trigger (触发事件)。对同一事件名称,可以绑定多个事件处理函数;并按照绑定的顺序依次触发。 args.splice(0
我使用CloudWatchLogs作为源,使用lambda函数调用事件作为目标。 在当前的方法中,每当创建日志流get时,lambda就会激发。但我想要的是,在Cloudwatch规则中编写一个自定义事件模式,这样,只有当日志消息包含异常或错误时,它才会触发lambda。这可能吗? 因为目前我从lambda中的CloudwatchLogs获取的事件不包含有关日志消息的任何详细信息。因此,我如何创建
关于在物联网场景中使用EventHub的Azure功能,我有几个问题。 EventHub有分区。通常来自特定设备的消息会发送到同一个分区。Azure Function的实例如何分布在EventHub分区中?它是基于性能的吗?如果Azure Function的一个实例设法处理来自所有分区的事件,那么这就足够了,否则每个EventHub分区可能会有一个Azure Function的实例? 读取偏移量呢