当前位置: 首页 > 知识库问答 >
问题:

Flink是否可以产生聚合/滚动/累积数据的每小时快照?

潘自强
2023-03-14

流处理的教科书示例是一个带有时间戳的单词计数程序。使用以下数据示例

mario 10:00
luigi 10:01
mario 11:00
mario 12:00
mario 3
luigi 1
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1

但是,我还没有找到一个在滚动时间窗口上的单词计数程序的例子,即我希望每小时产生一个从时间开始的每个单词的单词计数:

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1

对于Apache Flink或任何其他流处理库,这是可能的吗?谢谢!

编辑:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(EventTime);


    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                    return new Watermark(extractedTimestamp - 1);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                    }

                    long l = ((value.getTimestamp() / 10) + 1) * 10;
                    ctx.timerService().registerEventTimeTimer(l);

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static long fileCounter = 0;

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            out.collect(new TimestampedWord(value, fileCounter++));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
    }
}

马里奥,路易吉,马里奥,马里奥,维尔马,弗雷德,鲍勃,鲍勃,马里奥,丹,迪伦,迪伦,弗雷德,马里奥,马里奥,卡尔,班巴姆,萨默,安娜,安娜,埃杜,安娜,安娜,安娜,安娜,安娜

产生以下输出:

mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807

显然有些不对劲。虽然Anna这个词的第三个实例直到第22个位置才出现,但是Anna的计数是3。奇怪的是,edu只出现在最后一个快照中,尽管它出现在anna的第三个实例之前。即使没有消息到达(即应该生成相同的数据),我如何触发每10“单位时间”生成一次快照?

如果有人能给我指明正确的方向,我会非常感激的!

共有1个答案

茅炯
2023-03-14

是的,这不仅仅是用闪现可以做到的,而是很容易的。您可以使用KeyedProcessFunction来实现这一点,该函数将每个字/键在输入流中出现的次数保持在键化状态。然后使用计时器触发报告。

下面是一个使用处理时间计时器的示例。它每10秒打印出一份报告。

public class DSExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
            .keyBy(x -> x)
            .process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
                private transient ValueState<Integer> counter;

                @Override
                public void open(Configuration parameters) throws Exception {
                    counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
                }

                @Override
                public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
                    if (counter.value() == null) {
                        counter.update(0);
                        long now = context.timerService().currentProcessingTime();
                        context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    }
                    counter.update(counter.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
                    long now = context.timerService().currentProcessingTime();
                    context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
                }
            })
            .print();

        env.execute();
    }
}

更新:

使用事件时总是更好,但这确实增加了复杂性。增加的复杂性大部分来自于这样一个事实,即在实际应用程序中,您很可能必须处理无序事件--您在示例中已经避免了这一点,因此在本例中,我们可以通过一个相当简单的实现来摆脱这种情况。

如果你改变了两件事,你就会得到你所期望的结果。首先,将水印设置为extractedtimestamp-1是结果错误的原因(例如,这就是为什么Anna=3在20处)。如果您将水印设置为extractedtimestamp,那么这个问题就会消失。

说明:正是第三个安娜的到来创造了在时间20关闭窗口的水印。第三个anna的时间戳为21,因此在流中它后面跟着一个20的水印,这将关闭第二个窗口,并生成一个报告,说明anna=3。是的,第一个edu更早到达,但它是第一个edu,时间戳是20。在edu到达时,没有为edu设置定时器,并且创建的定时器被正确设置为在30发射,所以直到至少有30的水印到达,我们才会听到edu的消息

另一个问题是定时器逻辑。Flink为每个键创建一个单独的计时器,每次计时器触发时,您都需要创建一个新的计时器。否则,您将只得到关于在窗口期间到达的单词的报告。您应该将代码修改成更像这样:

@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    if (count.value() == null) {
        count.update(0L);
        setTimer(ctx.timerService(), value.getTimestamp());
    }

    count.update(count.value() + 1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    long currentWatermark = ctx.timerService().currentWatermark();
    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
    if (currentWatermark < Long.MAX_VALUE) {
        setTimer(ctx.timerService(), currentWatermark);
    }
}

private void setTimer(TimerService service, long t) {
    service.registerEventTimeTimer(((t / 10) + 1) * 10);
}

通过这些更改,我得到了这些结果:

mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807

现在,如果您需要实际处理无序事件,这将变得相当复杂。将有必要使水印滞后于时间戳一些反映流中存在的实际无序量的实际量,然后这将需要能够处理一次打开多个窗口。任何给定的事件/字都可能不属于下一个将关闭的窗口,因此不应该增加其计数器。例如,您可以将这些“早期”事件缓冲在另一个状态(例如,ListState)中,或者以某种方式维护多个计数器(可能在MapState中)。此外,某些事件可能会延迟,从而使先前的报告无效,您需要定义一些策略来处理这一点。

 类似资料:
  • 我有一个窗口化的每小时聚合的数据流。 Datastreamds=.....

  • 我可以在每次添加的地方实现简单的使用。但我无法实现需要减去现有价值而增加新价值的聚合。 余额1、2、3是邮件的序列 注释显示Jet执行的每个消息的聚合值。 我的目标是添加新的金额(如果第一次来了id)和减去金额,如果一个更新的余额来了I。E.Id和之前一样。

  • 问题内容: 我知道可以通过以下方法在Flask中设置请求大小的整体限制: 但是我想确保一个特定的路由将不接受特定大小的POST数据。 问题答案: 你需要检查一下特定路线本身;你可以随时测试内容长度;是一个或整数值: 在访问请求中的表单或文件数据之前,请执行此操作。 你可以将其变成装饰器以供查看: 然后将其用作: 本质上这就是Flask所做的;当你尝试访问请求数据时,在尝试解析请求正文之前,首先检查

  • 我正在实现一个用例,其中不同的物理设备正在发送事件,并且由于网络/电源问题,在flink source接收事件时可能会有延迟。flink作业中的一个操作符是模式操作符,并且有一些特定的模式是时间敏感的,所以我使用事件时间特性。但是,当来自特定设备的事件出现不可预测的延迟时,问题就会出现,这会导致这些事件被丢弃(因为我无法真正定义允许延迟的静态绑定)。 由于我使用的是基于源设备ID的KeyedStr

  • 当我们进行基于事件时间的聚合时,有一个实时的数据流--某个键的事件的水印会触发其他键的窗口操作吗? id为2的事件是否会触发id为1的12:00-12:10的时间窗口?或者只有在12:20下一个id为1的事件到达时才会发生?

  • 我目前正在使用Flink 1.0编写一个聚合用例,作为该用例的一部分,我需要获得过去10分钟内登录的api数量。 这我可以很容易地使用keyBy("api"),然后应用10分钟的窗口和doe sum(count)操作。 但问题是我的数据可能会出现混乱,所以我需要一些方法来获取10分钟窗口内的api计数。。 例如:如果相同的api日志出现在两个不同的窗口中,我应该得到一个全局计数,即2,而不是两个单