我在操作后加入两个流以创建一个新流。代码如下:
DataStream<NewTableA> join1 =
oldTableADataStream
.keyBy(t -> t.getFa3())
.join(tableBDataStream)
.where(new oldTableAKeySelector())
.equalTo(new TableBKeySelector())
.window(EventTimeSessionWindows.withGap(Time.milliseconds(WIN_GAP_TIME)))
.allowedLateness(Time.milliseconds(allowedLateness))
.apply(new oldTableAJoinTableBFunc());
//.assignTimestampsAndWatermarks(new assignTSAndWMLastMax<>(maxOutOfOrderness));
join1.process(
new ProcessFunction<NewTableA, NewTableA>() {
@Override
public void processElement(NewTableA value, Context ctx, Collector<NewTableA> out)
throws Exception {
System.out.println(" NewTableA wmts:" + ctx.timerService().currentWatermark());
System.out.println(" NewTableA ts:" + ctx.timestamp() + " " + value);
}
});
oldTableAJoinTableBFunc的代码如下
public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {
@Override
public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {
//System.out.println("join1 on");
NewTableA newTableA = new NewTableA();
newTableA.setPA1(oldTableA.getPa1());
newTableA.setA2(oldTableA.getA2());
newTableA.setFA3(oldTableA.getFa3());
newTableA.setFA4(oldTableA.getFa4());
newTableA.setB2(tableB.getB2());
newTableA.setB3(tableB.getB3());
// importance!!!
newTableA.setTs(oldTableA.getTs());
return newTableA;
}
}
上面的示例是,在事件时间上,将OldTableAdatStream连接到TableDataStream。
我发现了一个有趣的现象。flink自动创建join1中事件的时间戳。
当我创建oldTableADataStream和tableBDataStream的测试数据时,我故意设置了所有的1000000010和1000000044.but后加入和应用html" target="_blank">函数,新流join1
中事件的时间戳由flink更改,打印如下:
NewTableA wmts:**1000000042**
NewTableA ts:**1000000143** NewTableA{PA1=10, A2='a20', FA3=21, B2='b21', B3='b31', FA4=39, C2='null', FC3=null, D2='null', D3='null', ts=**1000000011**}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:**1000000143** NewTableA{PA1=1, A2='a20', FA3=20, B2='b20', B3='b30', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=**1000000010**}
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=11, A2='a20', FA3=21, B2='b21', B3='b31', FA4=40, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000135 NewTableA{PA1=21, A2='a20', FA3=22, B2='b22', B3='b32', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=38, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=57, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA ts:1000000143 NewTableA{PA1=2, A2='a20', FA3=20, B2='b20', B3='b30', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=12, A2='a20', FA3=21, B2='b21', B3='b31', FA4=41, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000135 NewTableA{PA1=22, A2='a20', FA3=22, B2='b22', B3='b32', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=58, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=13, A2='a20', FA3=21, B2='b21', B3='b31', FA4=42, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=39, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=59, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA ts:1000000135 NewTableA{PA1=23, A2='a20', FA3=22, B2='b22', B3='b32', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000143 NewTableA{PA1=3, A2='a20', FA3=20, B2='b20', B3='b30', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=14, A2='a20', FA3=21, B2='b21', B3='b31', FA4=43, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=40, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000016}
NewTableA ts:1000000135 NewTableA{PA1=24, A2='a20', FA3=22, B2='b22', B3='b32', FA4=33, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
......
似乎没有规则创建新事件的时间戳,1000000143和1000000138和1000000135等是如何计算的?似乎与水印无关,因为时间戳水印是1000000042,它与同一时间事件的时间戳不同。
操作依靠什么规则来生成事件的新时间戳,我还没有找到官方说明,谁能给它们一个链接?
由时间窗口创建的输出事件的时间戳设置为该窗口内的最大时间戳。因此,它基本上是包含事件的会话的会话边界。
由于您已经键入了基于FA3的会话窗口,因此您将获得取决于FA3: 100000014320和21, 100000013522的值,以及1000000138的23和24的值。
用户和首选项由objectProperty链接。 objectProperty断言需要通过SWRL规则从其他用户方面推断。例如,如果用户有听力困难,则需要将设置为,因此: 这很好用。但是,由于我有其他SWRL规则,这些规则也为相同的用户推断,例如: 我需要的是一个规则,不知怎么的,只有在没有更高的级别已经断言的情况下才会断言偏好。在给定的示例中,即使为true,唯一断言的级别也应该是,因为是其他规
我是新的apache flink,并试图了解事件时间和窗口的概念是如何处理的flink。 下面是我的设想: > 我有一个程序,它以线程的形式运行,每秒创建一个包含3个字段的文件,其中第3个字段是时间戳。 虽然每隔5秒我会在创建的新文件中输入一个旧的时间戳(可以说是t-5),但还是有一些调整。 现在,我运行流处理作业,将上面的3个字段读入一个元组。 现在,我定义了以下用于水印和时间戳生成的代码: 然
我已经用java编写了一个RESTful服务,并希望用get方法生成json。由于jar版本或依赖关系的问题,我一直在与许多没有找到类定义的错误作斗争。 例如 ,杰克逊错误:java.lang.NoClassDefFoundError: com/fasterxml/Jackson/core/Versioned 引起者:java.lang.NoClassDefFoundError: com/aste
我有什么似乎是一个简单的问题,关于如何设置Jenkins和maven 3。 为了简单起见,假设我们有四个项目: 没有依赖关系的模型 Commons依赖于模型 服务器依赖于Common和模型 前端依赖于模型 我想要实现的是,一个成功的基于模型的构建触发所有项目的新构建,这些项目在其pom中依赖于模型(这里是公共的,服务器和前端) 如果Common失败,则无需构建服务器。 在上述情况下,我似乎可以通过
我的问题是: 在Drools规则中,有没有一种方法可以拦截“成功”事件?为了更好地解释。。。有一种方法可以在规则的所有条件都为真时调用侦听器? 注意:我不想设置一个全局对象(在会话上)来管理规则的“然后”子句中的这个条件。 我正在寻找一个已经在Drools上实现的解决方案 规则示例
每次更新ECS服务时(更新完成或达到所需状态后),我都希望收到一封电子邮件 我考虑过CloudWatch事件规则将SNS主题设置为目标(即确认的电子邮件地址)。然而,它不起作用。 这是我的自定义事件模式: 我也尝试过: TASKSET_STEADY_STATE CAPACITY_PROVIDER_STEADY_STATE SERVICE_DESIRED_COUNT_UPDATED 我正在通过中情局