当前位置: 首页 > 知识库问答 >
问题:

Flink自定义触发器给出意外输出

长孙瑞
2023-03-14
val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } else {
            if((System.currentTimeMillis()-ctime) >= 5000){
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

    }

    @Override
    public TriggerResult onEventTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }


    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

}

因此,基本上,当标志时,即第一次,触发器应在20秒内触发,并将标志设置为。从下一次开始,它应该每隔5秒就会被发射一次。

我面临的问题是,每次触发器被激发时,输出中只有一条消息。也就是说,我在20秒后收到一条消息,每5秒收到一条消息。我希望在每次触发的输出中有20条消息。

如果我使用.timewindow(time.seconds(5))并创建一个五秒的时间窗口,则每5秒输出20条消息。请帮我把这个代码弄对。我是不是缺了什么?

共有1个答案

方德宇
2023-03-14

触发器实现有几个问题:

>

  • 不应将函数的状态存储在静态变量中。Flink不隔离JVM中的用户进程。相反,它使用每个TaskManager一个JVM并启动多个线程。因此,您的静态布尔标志在触发器的多个实例中共享。您应该存储Flink的ValueState接口,该接口可以从TriggerContext访问。闪现会注意检查你的状态,并在失败的情况下恢复它。

    trigger.onevent()仅在新事件到达时调用。因此它不能用于在特定时间触发窗口计算。相反,您应该注册事件时间计时器或处理时间计时器(同样通过TriggerContext)。计时器将分别调用Trigger.OnEventTime()Trigger.OnProcessingTime()。使用事件还是处理时间取决于您的用例。

  •  类似资料:
    • 问题内容: 我只是通过跟随有关Udemy的教学视频开始学习Go的,我尝试按如下方式打印当前时间 我得到一个很长的文本作为输出如下 我原本希望只在其后加上a ,这应该是结尾。预期的输出如下所示,也如本教程视频中所示。但是对我来说,结果的形式要长得多。 问题是,为什么同一个命令在教师的程序和我的程序之间返回不同的格式?为什么没有设置特定的格式,不应该返回一种格式? 问题答案: 问题是,为什么同一条命令

    • 我有一个gradle构建脚本,它检索了一些常见的依赖项,并将它们组合起来创建了一个“胖罐子”。

    • Berat有资格获得驾驶执照! Berat不能买酒! Berat的有效年龄:真 虽然,我优先考虑每个规则与突出关键字,“可以由酒精”规则仍然被解雇。它不应该被激发,因为在第一个规则中执行setValid(true),而在第二个规则中执行isValid()==false控件应该返回false,因此应该遗漏部分。

    • 当我的函数方法签名如下时,我有一个成功触发的v3 WebJob: 然而,当我添加一个输出blob时,BlobTrigger永远不会触发。 下面的文档如下:https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-storage-blob#output

    • 我用固定速率启动计时器。当用户更改系统时间时,此时任务将连续执行。它没有考虑时段。如何管理。 注:现在系统时间为当前时间。周期为30秒。现在用户将系统时间从当前时间改为10分钟后。当时计时器任务不考虑周期。在一秒钟内,它执行了20次任务。 当我使用 而不是 任务工作正常。如果我将系统时间更改为10分钟过去的时间,从现在起任务不会执行。。。 如何解决这个问题(

    • 我需要生成具有以下要求的CSV文件: 每个字段都被引号包围 分隔符(管道字符)用反斜杠转义 反斜杠用反斜杠转义 输入: 字段1 Field2 with\反斜杠"DoubleQuotes"And|管道 字段3 预期产出: 有可能获得这样的输出吗?