当前位置: 首页 > 知识库问答 >
问题:

Spark Structured Streaming——有状态流处理中带有窗口操作的事件处理

包唯
2023-03-14

我是Spark结构化流处理的新手,目前正在处理一个用例,其中结构化流应用程序将从Azure IoT中心-事件中心(例如每20秒)获取事件。

任务是使用这些事件并实时处理。为此,我在下面用Spark Java编写了Spark结构化流媒体程序。

以下是要点

  1. 目前我已经应用了10分钟间隔和5分钟滑动窗口的窗口操作。
  2. 水印被应用在以10分钟间隔的eventDate参数上。
  3. 目前,我没有执行任何其他操作,只是将其存储在指定的位置,拼花格式。
  4. 程序将一个事件存储在一个文件中。

问题:

  1. 是否可以根据窗口时间以拼花格式将多个事件存储在一个文件中
  2. 在这种情况下,窗口操作是如何工作的
  3. 此外,我还想与之前的事件一起检查事件状态,并根据一些计算(例如,5分钟内未收到事件),我想更新状态

...

public class EventSubscriber {

   public static void main(String args[]) throws InterruptedException, StreamingQueryException {

    String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";

    String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();

    EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
            .setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
            .setReceiverTimeout(java.time.Duration.ofMinutes(10));

    SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");

    SparkSession spSession = SparkSession.builder()
            .appName("IoT Spark Streaming")
            .config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
            .getOrCreate();

    Dataset<Row> inputStreamDF = spSession.readStream()
            .format("eventhubs")
            .options(eventHubsConf.toMap())
            .load();

    Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");

    StructType jsonStruct = new StructType()
            .add("eventType", DataTypes.StringType)
            .add("payload", DataTypes.StringType);

    Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
        String valStr = value.getString(0).toString();

        String payload = valStr;

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);

        JsonElement methodName = jsonObj.get("method");

        String eventType = null;
        if(methodName != null) {
            eventType = "OTHER_EVENT";
        } else {
            eventType = "DEVICE_EVENT";
        }

        Row jsonRow = RowFactory.create(eventType, payload);
        return jsonRow;

    }, RowEncoder.apply(jsonStruct));

    messageRow.printSchema();

    Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");

    deviceEventRowDS.printSchema();

    Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {

        String jsonString = value.getString(1).toString();

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
        DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
        return deviceEvent;

    }, Encoders.bean(DeviceEvent.class));

    deviceEventDS.printSchema();

    Dataset<Row> messageDataset = deviceEventDS.select(
            functions.col("eventType"), 
            functions.col("deviceID"),
            functions.col("description"),
            functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
            functions.col("deviceModel"),
            functions.col("pingRate"))
            .select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");

    messageDataset.printSchema();

    Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
            .groupBy(functions.col("deviceID"),
                    functions.window(
                            functions.col("eventDate"), "10 minutes", "5 minutes"))
            .count();

    devWindowDataset.printSchema();

    StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
            .format("parquet")
            .option("truncate", "false")
            .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
            .start();

    query.awaitTermination();
}}

...

任何关于这方面的帮助或指导都是有用的。

感谢和问候,

阿维纳什·德什穆克

共有1个答案

蓟辰沛
2023-03-14

是否可以根据窗口时间以拼花格式将多个事件存储在一个文件中?

是的。

在这种情况下,窗口操作是如何工作的?

以下代码是Spark结构化流媒体应用程序的主要部分:

Dataset<Row> devWindowDataset = messageDataset
  .withWatermark("eventDate", "10 minutes")
  .groupBy(
    functions.col("deviceID"),
    functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
  .count();

也就是说,底层状态存储应该按照deviceIDeventDate将状态保持10分钟,对于延迟事件,额外的10分钟(按照带水印)。换句话说,在流式查询开始20分钟后,当一个事件eventDate20分钟后,您应该会看到结果出来。

with Watermark用于后期事件,因此即使group by将产生结果,结果也不会仅在超过水印阈值之前发出。

同样的程序每10分钟应用一次(10分钟水印),用5分钟的窗口幻灯片。

将带有窗口的groupBy操作符视为多列聚合。

此外,我还想与之前的事件一起检查事件状态,并根据一些计算(例如,5分钟内未收到事件),我想更新状态。

这听起来像是KeyValueGroupedDataset的一个用例。flatMapGroupsWithState运算符(又称任意有状态流聚合)。查阅任意有状态操作。

也可能只需要众多聚合标准函数中的一个或用户定义的聚合函数(UDAF)。

 类似资料:
  • 我正在尝试理解。 据我所知,在这种类型的流处理器中,它使用来维护某种状态。 我开始知道,实现的方法之一是使用。假设以下(并且只有一个处理器是) A- 假设sp只监听一个Kafka主题,比如带有10个分区的。 我观察到,当应用程序启动时(在不同的物理机器上有2个实例,并且=5),然后对于,它会创建目录结构,其内容如下: 0_0,0_1,0_2......0_9(每台机器有5个分区)。 我正在浏览一些

  • 我有一个包含项目列表的大文件。 我想创建一批项目,用这个批次做一个HTTP请求(所有的项目都需要作为HTTP请求中的参数)。我可以用循环很容易地做到这一点,但是作为Java8爱好者,我想尝试用Java8的Stream框架来编写这个(并获得延迟处理的好处)。 例子: 我想做一些事情沿着< code>lazyFileStream.group(500)线。映射(processBatch)。collect

  • 本文向大家介绍Python 处理带有 \u 的字符串操作,包括了Python 处理带有 \u 的字符串操作的使用技巧和注意事项,需要的朋友参考一下 最近遇到一个头疼的问题,用socket接收到一个字符串 格式如下: {“trade_status”: {“desc”: “\u30106\u3011 - \u8d22\u52a1\u7ed3\u7b97\u5df2\u5b8c\u6210 “}}/en

  • 我想制作一个,传递事件和一些参数。问题是函数没有得到元素。下面是一个例子: 必须在匿名函数之外定义。如何获取传递的元素以在匿名函数中使用?有办法做到这一点吗? 那么呢?我似乎根本无法通过传递事件,是吗? 使现代化 我似乎用“这个”解决了这个问题 其中包含我可以在函数中访问的。 addEventListener 但是我想知道:

  • 我想根据具有相同标识符的两个事件来检测两个事件是否在定义的时间范围内发生。例如,如下所示: 下面示例中的My DoorEvent java类具有相同的结构。 我想检测id为1的门在打开后5分钟内关闭。为此,我尝试使用Apache flink CEP库。传入流包含来自20扇门的所有打开和关闭消息。 如何在中将门1的状态保存为打开,以便在步骤中我知道门1是关闭的门,而不是其他门?

  • 我有一个流系统,在这里我可以获得点击流数据。 数据格式: 我怎样才能做到这一点呢?基本上,我必须维护窗口中所有事件的状态,然后,一旦我获得事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有事件的状态。我也有一些自定义的Reduce操作。 在:我将2个事件数据加入到列表中。