当前位置: 首页 > 知识库问答 >
问题:

如何在流数据的时间戳基础上处理FLINK窗口?

段兴为
2023-03-14

我有一些问题。

基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。

UserData类有一个时间戳变量。

class UserData{ 
      public Timestamp timestamp; 
      public String userId; 
    }

起初我试着用一个翻滚的窗户。

SingleOutputStreamOperator<UserData> validStream =
                stream.keyBy((KeySelector<UserData, String>) value -> value.userId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                        .process(new ValidProcessWindow());

public class ValidProcessWindow extends ProcessWindowFunction<UserData, UserData, String, TimeWindow> {

    private int validCount = 10;

    @Override
    public void process(String key, Context context, Iterable<UserData> elements, Collector<UserData> out) throws Exception {
        int count = -1;
        for (UserData element : elements) {
            count++; // start is 0

            if (count >= validCount) // valid click count
            {
                continue;
            }
            
            out.collect(element);
        }
    }
}

但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。

如何处理流上窗口UserData类的时间戳基?

谢谢。

附加信息

我使用这样的代码。

stream.assignTimestampsAndWatermarks(WatermarkStrategy.<UserData>forBoundedOutOfOrderness(Duration.ofSeconds(1))                
.withTimestampAssigner((event, timestamp) -> Timestamps.toMillis(event.timestamp))
.keyBy((KeySelector<UserData, String>) value -> value.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new ValidProcessWindow());

我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。结果是|1,2,3....59||60,61....119|。我等待最后30个数据。我期待|1,2,3....59||60,61....119||120...149|。

如何获得最后的其他数据?

自我回答

我找到了原因。因为我只使用了150个样本数据。

如果使用事件时间在Flink不能进展,如果没有要处理的元素。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html#idling-sources

因此,我测试了150个样本数据和虚拟数据。(每个数据的虚拟数据时间戳增加1秒)。

我收到了正确的数据| 1,2,3。。。。59| |60,61....119| |120...149|.

谢谢你的帮助。

共有1个答案

都飞跃
2023-03-14

所以就我理解你的问题,你应该使用不同的时间特征。处理时间是使用系统时间来计算窗口,您应该为您的应用程序使用事件时间。您可以在这里找到关于正确使用活动时间的更多信息。

编辑:这就是flink的工作原理,没有数据将水印推到150以上,所以窗口没有关闭,因此没有输出。您可以使用自定义触发器,即使没有生成水印也会关闭窗口,或者注入一些数据来移动水印。

 类似资料:
  • 我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期

  • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有

  • 如何在Flink中的迭代数据流循环中处理时间戳? null

  • 到目前为止,我了解到有3种方法可以处理Flink中的后期数据: > 删除延迟事件(这是事件时间窗口运算符的默认行为。(因此,延迟到达的元素不会创建新窗口。)( 重定向延迟事件(也可以使用side输出功能将延迟事件重定向到另一个数据流) 通过包含延迟事件更新结果(重新计算不完整的结果并发出更新) 我不太清楚非窗口操作符的延迟事件会发生什么,特别是当时间戳被分配到源时。这里我有一个FlinkKafka

  • 假设我有一个 inputStream,我对它执行了一些窗口操作。通过对事件执行某些窗口操作而创建的事件的时间戳是什么? 现在我想组合流countStream和maxStream,以找到最后一秒的countStream等于maxStream的所有时间戳。 注意:这并不是我试图解决的问题,但这是一个代表性的例子。解决这个问题将帮助我解决我需要解决的真正问题。