当前位置: 首页 > 知识库问答 >
问题:

事件中心没有输出

冷正青
2023-03-14

我正在尝试从事件中心读取数据,但结果它只返回空值。

我将一个数据帧转换为json以发送到eventhub

dfSource_pandas = df.toPandas()
type(dfSource_pandas)
payload = dfSource_pandas.to_json(orient="records")
print(payload)

这是数据的模式

root
|-- alertId: string (nullable = true)
|-- score: double (nullable = true)
|-- severity: double (nullable = true)

我正在尝试使用从eventhub读取数据

from pyspark.sql import functions as F

从 pyspark.sql.类型导入数组类型, 双类型, 结构类型, 结构字段,
字符串类型, 长类型, 布尔类型

schema = StructType([
StructField("alertId", StringType(), True),
StructField("score", DoubleType(), True),
StructField("severity", DoubleType(), True),
])


from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import *
df = spark.readStream.format("eventhubs").options(**ehConf).load()
stream_data_body=
df.select(from_json(df.body.cast('string'),schema).\
alias('body')).select("body.*")

共有1个答案

牟稳
2023-03-14

看起来您的有效负载中有一个记录列表,而不是单个记录。在这种情况下,您可以尝试以下模式:

schema = StructType([
  StructField("alerts", ArrayType(
    StructType([
      StructField("alertId", StringType(), True),
      StructField("score", DoubleType(), True),
      StructField("severity", DoubleType(), True),
    ])))])

然后为了得到单独的记录,你需要分解你的有效载荷——就像这样:

from pyspark.sql.functions import from_json, explode

stream_data = df.select(from_json(df.body.cast('string'),schema).alias('body')) \
  .select("body.*") \
  .select(explode("alerts").alias("alert")) \
  .select("alert.*")
 类似资料:
  • 我正在通过HTTP POST将消息发送到我的Azure事件中心(用于IoT中心和Blob存储)。 我试过: 不同的SAS密钥和所有者 不同的标题以及没有标题 找到这个问题后,我添加了标题。我还确保检查了权限,就像那里建议的那样,但是它们已经设置到了必要的级别。 我的问题: 1.是否需要手动添加同名服务总线终结点?我假设当你创建一个事件中心时,它会自动为你创建服务总线。 2。有没有可能我看到的请求和

  • 我是Apache Flink CEP的新手,我正在努力检测一个简单的事件缺失。 我试图检测的是,具有特定ID的CurrencyEvent类型的事件在一定时间内不会发生。我想每次在3000ms之后事件没有发生时检测没有这样的事件。 我的模式代码如下所示: 所以现在我的想法是使用超时函数来检测超时事件: 我的测试源使用事件时间戳和水印,如下所示: 我在用TimeCharacteristics.Even

  • 本节描述了SQLAlchemy核心中提供的事件接口。有关事件侦听API的介绍,请参阅 事件 . ORM事件在 ORM事件 . Object Name Description Events 为特定目标类型定义事件侦听函数。 class sqlalchemy.event.base.Events Object Name Description PoolEvents 的可用事件 Pool . class

  • 我已经实现了下面链接中的代码,用于从事件中心接收事件。但是假设有10个事件,每5个事件检查一次。现在程序在读取第7个事件时异常退出,如果我再次重启事件处理器主机,那么事件(1,2,3,4,6)将被重新读取。请建议我如何再次避免重读和阅读第7次事件?任何例子都值得欣赏。谢了。 https://github.com/Azure/azure-event-hubs/blob/master/samples/

  • 因此,我将替换为,就像在这个文档示例中一样(具有更高的maxOutOfOrderness延迟),以便处理乱序事件,但我仍然无法获得任何输出。这是为什么?

  • 问题内容: 这是我的代码。我正在尝试使用箭头键使球移动。当我运行上述程序时 ,不显示球 ( 如果我将坐标更改为显示0,30球), 并且事件未触发,球既不移动也不跳跃 ? 问题答案: