当前位置: 首页 > 知识库问答 >
问题:

使用GROUP BY over not timestamp列对Flink SQL进行流式处理

梁祯
2023-03-14

在e2e FlinkSQL教程中,源表被定义为带有启用水印的时间戳列的Kafka源表

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

只要GROUP BY是由一个翻滚在ts上的字段生成的,这看起来很自然(因为Flink知道何时触发/弹出窗口),但在教程的中间我们看到了以下表达式

INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
    user_id
  FROM user_behavior)
GROUP BY date_str;

在这里,我们看到分组是在导数<code>date_str</code>字段上进行的,但是水印在这里是如何工作的呢?Flink如何决定何时“关闭”date_ str桶?由于<code>date_str

共有1个答案

闻人宝
2023-03-14

也许您可以参考下面的链接来了解水印的生成和交付,特别是“操作员如何处理水印”

在本例中,水印是从源运算符的ts生成的,而下游运算符将仅处理水印,这与date_str字段无关。

public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
......


    @Override
    public void open() throws Exception {
        super.open();

        timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
        watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                        : new NoWatermarksGenerator<>();

        wmOutput = new WatermarkEmitter(output);

        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        if (watermarkInterval > 0 && emitProgressiveWatermarks) {
            final long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(final StreamRecord<T> element) throws Exception {
        final T event = element.getValue();
        final long previousTimestamp =
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
        final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

        element.setTimestamp(newTimestamp);
        output.collect(element);
        watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
    }

......

    @Override
    public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark)
            throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE) {
            wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }
......
}

https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/

 类似资料:
  • 我有一个这样的方法: 此方法需要以如下字符串形式返回3个最昂贵项目的产品ID:“item1,item2,item3”。我应该只能使用溪流,我被困在这里了。我应该能够按值对项目进行排序,然后获得产品ID,但我似乎无法使其正常工作。 编辑: 产品ID位于入口类中

  • 我有一个BigDecimals(在本例中是)的集合,希望将其添加在一起。有没有可能使用流来实现这一点? 我注意到类有几个方法 每一个都有一个方便的方法。但是,正如我们所知,和算术几乎总是一个坏主意。 那么,有没有方便的方法来求和BigDecimals呢? 这是我目前掌握的代码。 为后人编辑后答案: 这两个答案都非常有用。我想补充一点:我的实际场景不涉及原始的集合,它们包装在发票中。但是,我能够修改

  • 我已经阅读了Spring Cloud stream binder参考文档,其中提到了使用@RabbitListener进行DLQ处理。https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#ra

  • 我有一个作者集合和一个书籍集合。一本书与作者之间的联系是作者的电子邮件地址。 Book类有一个作者电子邮件地址的:。 如何使用lambda表达式获取所有图书的所有作者列表,并按图书名称进行筛选?

  • 问题内容: 我的清单包含大小等的集合。我尝试这样做,但似乎不起作用。 我想要的最终结果是。 我可以尝试添加在所有的元素和那种出来再做出新的的。但是,有某种班轮吗? 更新: 这可行,但是可以简化吗? 问题答案: @Eugene的回答很甜蜜,因为番石榴很甜。但是,如果您碰巧在类路径中没有番石榴,这是另一种方式: 首先,我将所有集合映射到一个流中,然后对所有元素进行排序,最后,将整个排序后的流收集到集合

  • 我有以下课程 我正在尝试按贡献者名称和角色名称对ResourceContributor列表进行排序。到目前为止,我得到的是: 我已经尝试使用then比较,但还不知道如何使用它。