val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
因此,基本上,当标志
为假
时,即第一次,触发器
应在20秒内触发,并将标志
设置为真
。从下一次开始,它应该每隔5秒就会被发射一次。
我面临的问题是,每次触发器
被激发时,输出中只有一条消息。也就是说,我在20秒后收到一条消息,每5秒收到一条消息。我希望在每次触发的输出中有20条消息。
如果我使用.timewindow(time.seconds(5))
并创建一个五秒的时间窗口,则每5秒输出20条消息。请帮我把这个代码弄对。我是不是缺了什么?
触发器实现有几个问题:
>
不应将函数的状态存储在静态变量中。Flink不隔离JVM中的用户进程。相反,它使用每个TaskManager一个JVM并启动多个线程。因此,您的静态布尔标志在触发器的多个实例中共享。您应该存储Flink的ValueState
接口,该接口可以从TriggerContext
访问。闪现会注意检查你的状态,并在失败的情况下恢复它。
trigger.onevent()
仅在新事件到达时调用。因此它不能用于在特定时间触发窗口计算。相反,您应该注册事件时间计时器或处理时间计时器(同样通过TriggerContext
)。计时器将分别调用Trigger.OnEventTime()
或Trigger.OnProcessingTime()
。使用事件还是处理时间取决于您的用例。
问题内容: 我只是通过跟随有关Udemy的教学视频开始学习Go的,我尝试按如下方式打印当前时间 我得到一个很长的文本作为输出如下 我原本希望只在其后加上a ,这应该是结尾。预期的输出如下所示,也如本教程视频中所示。但是对我来说,结果的形式要长得多。 问题是,为什么同一个命令在教师的程序和我的程序之间返回不同的格式?为什么没有设置特定的格式,不应该返回一种格式? 问题答案: 问题是,为什么同一条命令
我有一个gradle构建脚本,它检索了一些常见的依赖项,并将它们组合起来创建了一个“胖罐子”。
Berat有资格获得驾驶执照! Berat不能买酒! Berat的有效年龄:真 虽然,我优先考虑每个规则与突出关键字,“可以由酒精”规则仍然被解雇。它不应该被激发,因为在第一个规则中执行setValid(true),而在第二个规则中执行isValid()==false控件应该返回false,因此应该遗漏部分。
当我的函数方法签名如下时,我有一个成功触发的v3 WebJob: 然而,当我添加一个输出blob时,BlobTrigger永远不会触发。 下面的文档如下:https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-storage-blob#output
我用固定速率启动计时器。当用户更改系统时间时,此时任务将连续执行。它没有考虑时段。如何管理。 注:现在系统时间为当前时间。周期为30秒。现在用户将系统时间从当前时间改为10分钟后。当时计时器任务不考虑周期。在一秒钟内,它执行了20次任务。 当我使用 而不是 任务工作正常。如果我将系统时间更改为10分钟过去的时间,从现在起任务不会执行。。。 如何解决这个问题(
我需要生成具有以下要求的CSV文件: 每个字段都被引号包围 分隔符(管道字符)用反斜杠转义 反斜杠用反斜杠转义 输入: 字段1 Field2 with\反斜杠"DoubleQuotes"And|管道 字段3 预期产出: 有可能获得这样的输出吗?