我有一些能量计,将继续产生计数器值,这是一个累积指标。即不断增加,直到计数器复位。
Key Value
----------------------------------------------------------------------
Sensor1 {timestamp: "10-10-2019 10:20:30", Kwh: 10}
Sensor1 {timestamp: "10-10-2019 10:20:40", Kwh: 21}
Sensor1 {timestamp: "10-10-2019 10:20:55", Kwh: 25}
Sensor1 {timestamp: "10-10-2019 10:21:05", Kwh: 37}
Sensor1 {timestamp: "10-10-2019 10:21:08", Kwh: 43}
.
.
.
有一个实时ETL作业,它在事件时间的两个连续值之间进行减法。
例如。
10-10-2019 10:20:30 = 21 - 10 = 11
10-10-2019 10:20:40 = 25 - 21 = 4
10-10-2019 10:20:55 = 37 - 25 = 12
.
.
.
此外,有时事件可能没有按顺序接收。
如何使用Apache Flink流式API实现?最好使用Java中的示例。
一般来说,当面临按顺序处理乱序流的要求时,最简单(和高性能)的处理方法是使用FlinkSQL,并依靠它来进行排序。请注意,它将依靠Watermark Strategy
来确定何时可以安全地认为事件已准备好发出,并且会丢弃任何延迟事件。如果您必须了解延迟事件,那么我建议使用CEP而不是SQLMATCH_RECOGNIZE(如下所示)。
有关使用水印进行排序的详细信息,请参阅Flink docs中关于水印的教程。
下面是一个如何使用Flink SQL实现用例的示例:
public class SortAndDiff {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
.map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
@Override
public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
}
}).assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
.withTimestampAssigner((event, timestamp) -> event.f1));
Table events = tableEnv.fromDataStream(input,
$("sensorId"),
$("ts").rowtime(),
$("kwh"));
Table results = tableEnv.sqlQuery(
"SELECT E.* " +
"FROM " + events + " " +
"MATCH_RECOGNIZE ( " +
"PARTITION BY sensorId " +
"ORDER BY ts " +
"MEASURES " +
"this_step.ts AS ts, " +
"next_step.kwh - this_step.kwh AS diff " +
"AFTER MATCH SKIP TO NEXT ROW " +
"PATTERN (this_step next_step) " +
"DEFINE " +
"this_step AS TRUE, " +
"next_step AS TRUE " +
") AS E"
);
tableEnv
.toAppendStream(results, Row.class)
.print();
env.execute();
}
}
问题内容: 我有一条流经多个系统的消息,每个系统都会记录消息的进入和退出以及时间戳和uuid messageId。我通过以下方式提取所有日志: 结果,我现在有以下事件: 我想生成一个报告(最好是堆积的条或列),用于每个系统的时间: 做这个的最好方式是什么?Logstash过滤器?kibana计算字段? 问题答案: 您只能使用Logstash 过滤器来实现此目的,但是,您必须实质性地重新实现该过滤器
问题内容: 是否有一个跟踪用户某些事件的表。 总是有一个动作,之后可能会有一个动作。 现在,我想查询这两个动作之间的时间差,以获取用户和之间的time_diff 。 现在,您可以假定没有多个条目(例如,至少一个,最大另一个)。 我想要这样的结果: 问题答案: 您可以使用以下查询: 该子句过滤掉仅包含一个动作的组,例如OP中的with记录。 演示在这里
问题内容: 我有一个熊猫数据框如下 上面的数据帧有83000行。我想获取两个连续行之间的时间差,并将其保存在单独的列中。理想的结果是 我已经尝试过但出现错误,如下所示 如何解决这个问题 问题答案: 问题是功能需要s或s ,因此首先要转换为,然后得到并除以: 如果需要或每分钟:
我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想
问题内容: 我有一个带有StartDate列的表,我想计算两个连续记录之间的时间差。 谢谢。 @ Mark Byers和@ Yahia,我将请求表作为requestId,startdate 我想知道requestid 1和2、2和3、3和4等之间的时差是多少。我知道我需要在表上进行自我连接,但是我在子句上没有得到正确的支持。 问题答案: 要实现您的要求,请尝试以下操作(从OP编辑后进行更新): 如
问题内容: 我在一个MySQL数据库表上工作,该表的列包含我对其他主机执行ping操作时的时间戳记(例如2014-09-16 09:08:05)。我的问题是如何在几分钟内计算出针对特定主机的第一次ping和最后一次ping之间的差异?另外,如何为上述差异的开始和结束指定不同的时间戳(而不是第一次和最后一次ping)。这是表格的示例: 我希望我已经对自己的解释足够清楚。 问题答案: 您可以使用本机的