当前位置: 首页 > 知识库问答 >
问题:

Spark Structured Streaming:在翻滚窗口结束时输出结果,而不是批次

冯胤
2023-03-14

我希望火花流的输出在翻转窗口的末端发送到水槽,而不是在批处理间隔。

我从一个Kafka流读取并输出到另一个Kafka流。

查询和写入输出的代码如下:

Dataset<Row> sqlResult = session.sql("select window, user, sum(amount) as amount from users where type = 'A' group by window(timestamp, '1 minute', '1 minute'), user");
sqlResult = sqlResult.select(to_json(struct("window", "user", "amount")).as("value"));

StreamingQuery query = sqlResult.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "aggregated-topic")
    .option("checkpointLocation", "c:/tmp")
    .outputMode(OutputMode.Update())
    .start();

当我在一分钟的窗口内为一个特定用户发送多个记录时,我希望在一分钟结束时这些事件的总数。

但我在输出Kafka流上获得了多个输出,并在其中写入了间歇聚合。

如。

我将在一分钟内发送以下7条记录,但间隔一定时间。

json prettyprint-override">
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}

我得到的结果是:

{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":10.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":20.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":40.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":60.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}

可以看到,输出在同一个窗口中,但有多个输出。

我想要的是一分钟结束时的单个输出

{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}

我该如何实现呢?

共有1个答案

公西博实
2023-03-14

将流写入接收器时,需要设置处理触发器。

你用。具有适当触发器值的DataStreamWriter的触发器(trigger.ProcessingTime)。


StreamingQuery query = sqlResult.writeStream()
        .trigger(Trigger.ProcessingTime("1 minute")) //this

 类似资料:
  • 使用翻滚窗口的apache flink应用程序遇到问题。窗口大小是10秒,我希望每隔10秒有一个resultSet数据流。然而,当最新窗口的结果集总是延迟时,除非我将更多数据推送到源流。 例如,如果我在“01:33:40.0”和“01:34:00.0”之间将多条记录推送到源流,然后停止查看日志,则不会发生任何事情。 我在“01:37:XX”上再次推送一些数据,然后将在“01:33:40.0”和“0

  • Python新手在这里。我正在编写一个简单的客户端/服务器程序,要求用户输入姓名、用户名、电子邮件地址、密码。此信息被发送到服务器,服务器将检查此用户的文本文件中是否已经有条目。如果有,它应该发送一条消息回来说这个用户已经存在,要求用户再试一次。 我设置了一个名为标志的变量为False。我对照文本文件检查用户信息,如果在文件中没有找到匹配,我将标志设置为true。然后我有一个if语句,它说如果标志

  • 我有一个聚合函数,它计算WindowedStream中一系列事件的平均值。 这里的警告是,平均值需要在可能无序(或根本没有)到达的事件对上计算。 换句话说,我需要在计算之前对数据进行排序,因为序列很重要。 我可以用getResult API来实现这一点,但是这个函数在窗口中的每个事件上都被调用,这在性能方面没有意义。我也可以用flink cep来做这件事,但出于同样的原因,我想避免使用它。 理想情

  • 问题内容: 我是Java的新手,所以我编写了这段代码,以便将这整个五年都称为布尔值,并为所有布尔值生成答案。但是,它仅调用最后一个。我该怎么做呢? 问题答案: 您每年需要使用单独的对象,或者至少在创建该年份的对象后立即调用the年检查方法。 您所拥有的是对函数的一系列调用,该函数将值分配给同一对象的属性。因此,只有最后一条语句才起作用,因为先前的值将被覆盖。 另外请注意,您的代码似乎没有正确组织。

  • 我有一个使用flink应用程序的场景,该应用程序接收以下格式的数据流: {“event\u id”:“c1s2s34”,“event\u create\u timestamp”:“2019-03-07 11:11:23”,“amount”:“104.67”} 我使用下面的滚动窗口来查找过去60秒内输入流的总和、计数和平均值。 键值。时间窗口(时间秒(60)) 然而,我如何标记聚合结果,以便我可以说

  • 我正在窗口流上执行聚合,希望抑制早期聚合结果。我所说的早期结果是指在窗口结束前计算的结果,而不是那些在宽限期内发生的结果。因此,我想用时间戳抑制所有聚合结果 最小Kafka流拓扑示例: 因此,不是我的选择,因为我必须等到宽限期到期,这可能会很长。 根据KIP-328,使用