当前位置: 首页 > 知识库问答 >
问题:

如何使用Flink SQL按事件时间对流进行排序

冀越
2023-03-14

我有一个无序的DataStream

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();

    env.execute();
}

我得到这个错误:

线程"main"中的异常org.apache.flink.table.api.SqlParserException:SQL解析失败。在第1行,第8列遇到"时间戳From"。

似乎我没有以正确的方式指定事件时间属性,但不清楚出了什么问题。


共有1个答案

竺和洽
2023-03-14

问题是在我的事件类中使用时间戳作为字段名。将其更改为eventTime(事件时间)就足以让一切正常工作:

public class Sort {
    public static final int OUT_OF_ORDERNESS = 1000;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

        Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
        tableEnv.registerTable("events", events);
        Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
        DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

        sortedEventStream.print();

        env.execute();
    }

    public static class Event {
        public Long eventTime;

        Event() {
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }
    }

    private static class OutOfOrderEventSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while(running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }

        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}
 类似资料:
  • 这个问题涵盖了如何使用FlinkSQL对乱序流进行排序,但我更愿意使用DataStream API。一种解决方案是使用ProcessFunction来执行此操作,该ProcessFunction使用PriorityQueue来缓冲事件,直到水印指示它们不再乱序,但这在RocksDB状态后端中表现不佳(问题是每次访问PriorityQueue都需要整个PriorityQueue的ser/de)。无论

  • 我有一个数据库表,我想对其进行筛选,然后按相反(降序)顺序进行排序。我如何在类似于以下内容的速度流中表达: 我希望我的SQL查询能够通过速度优化,因此我不能使用匿名lambda。

  • 问题内容: 我有一个包含从上午8:00到下午4:00的时间列表。 当我在输出中显示它时,它似乎没有排序,而当我使用它时,它的排序时间是从1:00 pm到8:00 am。 我如何从8:00 am到4:00 pm排序我的列表? 问题答案: 不要重新发明轮子,而是使用collection(如果允许使用java8,则使用Lambdas)How ??:将列表保留为字符串,但使用Anonymous 比较器 ,

  • 问题内容: 我是Java 8的新手,不确定如何使用流及其排序方法。如果我的地图如下,如何使用Java 8按值对地图进行排序以仅获取前10个条目。 我知道在Java 8之前,我们可以按以下链接进行排序:http://codingdict.com/questions/116310 问题答案: 您可以随时开始阅读文档和一些 教程。 参考 http://www.leveluplunch.com/java/

  • 问题内容: 我的清单包含大小等的集合。我尝试这样做,但似乎不起作用。 我想要的最终结果是。 我可以尝试添加在所有的元素和那种出来再做出新的的。但是,有某种班轮吗? 更新: 这可行,但是可以简化吗? 问题答案: @Eugene的回答很甜蜜,因为番石榴很甜。但是,如果您碰巧在类路径中没有番石榴,这是另一种方式: 首先,我将所有集合映射到一个流中,然后对所有元素进行排序,最后,将整个排序后的流收集到集合

  • 我正在使用java lambda对列表进行排序。 我怎样才能反向排序呢?