当前位置: 首页 > 知识库问答 >
问题:

Apache Flink-计算两个连续事件与事件时间之间的值差异

齐财
2023-03-14

我有一些能量计,将继续产生计数器值,这是一个累积指标。即不断增加,直到计数器复位。

Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", Kwh: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", Kwh: 21}
Sensor1            {timestamp: "10-10-2019 10:20:55", Kwh: 25}
Sensor1            {timestamp: "10-10-2019 10:21:05", Kwh: 37}
Sensor1            {timestamp: "10-10-2019 10:21:08", Kwh: 43}
.
.
.

有一个实时ETL作业,它在事件时间的两个连续值之间进行减法。

例如。

10-10-2019 10:20:30  = 21 - 10 = 11
10-10-2019 10:20:40  = 25 - 21 = 4
10-10-2019 10:20:55  = 37 - 25 = 12
. 
.
.

此外,有时事件可能没有按顺序接收。

如何使用Apache Flink流式API实现?最好使用Java中的示例。

共有1个答案

诸正谊
2023-03-14

一般来说,当面临按顺序处理乱序流的要求时,最简单(和高性能)的处理方法是使用FlinkSQL,并依靠它来进行排序。请注意,它将依靠Watermark Strategy来确定何时可以安全地认为事件已准备好发出,并且会丢弃任何延迟事件。如果您必须了解延迟事件,那么我建议使用CEP而不是SQLMATCH_RECOGNIZE(如下所示)。

有关使用水印进行排序的详细信息,请参阅Flink docs中关于水印的教程。

下面是一个如何使用Flink SQL实现用例的示例:

public class SortAndDiff {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
                new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
                new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
                new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
        .map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
            @Override
            public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
                return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, timestamp) -> event.f1));

        Table events = tableEnv.fromDataStream(input,
                $("sensorId"),
                $("ts").rowtime(),
                $("kwh"));

        Table results = tableEnv.sqlQuery(
                "SELECT E.* " +
                    "FROM " + events + " " +
                    "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sensorId " +
                        "ORDER BY ts " +
                        "MEASURES " +
                            "this_step.ts AS ts, " +
                            "next_step.kwh - this_step.kwh AS diff " +
                        "AFTER MATCH SKIP TO NEXT ROW " +
                        "PATTERN (this_step next_step) " +
                        "DEFINE " +
                            "this_step AS TRUE, " +
                            "next_step AS TRUE " +
                    ") AS E"
        );


        tableEnv
                .toAppendStream(results, Row.class)
                .print();

        env.execute();
    }

}
 类似资料:
  • 问题内容: 我有一条流经多个系统的消息,每个系统都会记录消息的进入和退出以及时间戳和uuid messageId。我通过以下方式提取所有日志: 结果,我现在有以下事件: 我想生成一个报告(最好是堆积的条或列),用于每个系统的时间: 做这个的最好方式是什么?Logstash过滤器?kibana计算字段? 问题答案: 您只能使用Logstash 过滤器来实现此目的,但是,您必须实质性地重新实现该过滤器

  • 问题内容: 是否有一个跟踪用户某些事件的表。 总是有一个动作,之后可能会有一个动作。 现在,我想查询这两个动作之间的时间差,以获取用户和之间的time_diff 。 现在,您可以假定没有多个条目(例如,至少一个,最大另一个)。 我想要这样的结果: 问题答案: 您可以使用以下查询: 该子句过滤掉仅包含一个动作的组,例如OP中的with记录。 演示在这里

  • 问题内容: 我有一个熊猫数据框如下 上面的数据帧有83000行。我想获取两个连续行之间的时间差,并将其保存在单独的列中。理想的结果是 我已经尝试过但出现错误,如下所示 如何解决这个问题 问题答案: 问题是功能需要s或s ,因此首先要转换为,然后得到并除以: 如果需要或每分钟:

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

  • 问题内容: 我有一个带有StartDate列的表,我想计算两个连续记录之间的时间差。 谢谢。 @ Mark Byers和@ Yahia,我将请求表作为requestId,startdate 我想知道requestid 1和2、2和3、3和4等之间的时差是多少。我知道我需要在表上进行自我连接,但是我在子句上没有得到正确的支持。 问题答案: 要实现您的要求,请尝试以下操作(从OP编辑后进行更新): 如

  • 问题内容: 我在一个MySQL数据库表上工作,该表的列包含我对其他主机执行ping操作时的时间戳记(例如2014-09-16 09:08:05)。我的问题是如何在几分钟内计算出针对特定主机的第一次ping和最后一次ping之间的差异?另外,如何为上述差异的开始和结束指定不同的时间戳(而不是第一次和最后一次ping)。这是表格的示例: 我希望我已经对自己的解释足够清楚。 问题答案: 您可以使用本机的