我是Apache Flink CEP的新手,我正在努力检测一个简单的事件缺失。
我试图检测的是,具有特定ID的CurrencyEvent类型的事件在一定时间内不会发生。我想每次在3000ms之后事件没有发生时检测没有这样的事件。
我的模式代码如下所示:
Pattern<CurrencyEvent, ?> myPattern = Pattern.<Event>begin("CurrencyEvent")
.subtype(CurrencyEvent.class)
.where(new SimpleCondition<CurrencyEvent>() {
@Override
public boolean filter(CurrencyEvent currencyEvent) throws Exception {
return currencyEvent.getId().equalsIgnoreCase("usd");
}
})
.within(Time.milliseconds(3000L));
所以现在我的想法是使用超时函数来检测超时事件:
DataStreamSource<Event> events = env.addSource(new TestSource(
Arrays.asList(
basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D),
basicCurrencyWithMivLevelEvent("USD", 100L, Arrays.asList("1", "2"), 200D),
basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D)
),
1636040364820L, // initial timestamp for the first element
7000 // 7 seconds between each event
));
PatternStream<Event> patternStream = CEP.pattern(
events,
(Pattern<Event, ?>) myPattern
);
OutputTag<Alarm> tag = new OutputTag<Alarm>("currency-timeout"){};
PatternFlatTimeoutFunction<Event, Alarm> eventAlarmTimeoutPatternFunction = (patterns, timestamp, ctx) -> {
System.out.println("New alarm, since after 3 seconds an event with id=usd is not detected");
//TODO: call collect
};
PatternFlatSelectFunction<Event, Alarm> eventAlarmPatternSelectFunction = (patterns, ctx) -> {
System.out.println("Select! (we can ignore it) " + patterns);
// ignore matched events
};
return patternStream.flatSelect(
tag,
eventAlarmTimeoutPatternFunction,
TypeInformation.of(Alarm.class),
eventAlarmPatternSelectFunction
);
我的测试源使用事件时间戳和水印,如下所示:
public class TestSource implements SourceFunction<Event> {
private final List<Event> events;
private final long initialTimestamp;
private final long timeBetweenInMillis;
public TestSource(List<Event> events, long initialTimestamp, long timeBetweenInMillis){
this.events = events;
this.initialTimestamp = initialTimestamp;
this.timeBetweenInMillis = timeBetweenInMillis;
}
@Override
public void run(SourceContext<Event> sourceContext) throws InterruptedException {
long timestamp = this.initialTimestamp;
for(Event event: this.events){
sourceContext.collectWithTimestamp(event, timestamp);
sourceContext.emitWatermark(new Watermark(timestamp));
timestamp+=this.timeBetweenInMillis;
}
}
@Override
public void cancel() {
}
}
我在用TimeCharacteristics.EventTime.
由于窗口时间(3秒)低于每个事件之间的事件时间差(7秒),我希望得到一些超时事件,但我得到0。
CEPPattern
匹配一个或多个事件的序列;(间隔)子句中的添加了一个额外的约束,即序列中的所有事件都必须发生在指定的间隔内。当部分匹配超时时,这可以在
TimedOutPARalMatchHandler
中捕获。
在您的情况下,由于成功匹配的模式由单个事件组成,因此不可能存在部分匹配,并且匹配永远不会超时。(匹配序列的长度始终小于3秒。)
您可以做的是扩展模式定义以包括第二个事件,以便在3秒内必须有一个开始事件,然后是另一个事件。当第二个事件丢失时,您将有一个部分匹配超时。
为了实现涉及缺失事件的用例比CEP提供的更大的灵活性,您可以将
KeyedProcessFunction
与定时器一起使用。
我正在尝试从事件中心读取数据,但结果它只返回空值。 我将一个数据帧转换为json以发送到eventhub 这是数据的模式 我正在尝试使用从eventhub读取数据 从 pyspark.sql.类型导入数组类型, 双类型, 结构类型, 结构字段, 字符串类型, 长类型, 布尔类型
问题内容: 这是我的代码。我正在尝试使用箭头键使球移动。当我运行上述程序时 ,不显示球 ( 如果我将坐标更改为显示0,30球), 并且事件未触发,球既不移动也不跳跃 ? 问题答案:
我在库伯内特斯连夜运行了一项工作。当我早上检查它时,它已经失败了。通常,我会检查pod日志或事件来确定原因。然而,pod被删除了,没有事件。 下面是输出: 这是作业配置yaml。它有,但是它从来没有重启过。我也没有设置TTL,所以豆荚永远不会被清理。 我如何调试这个?
因此,我遵循了一个关于创建Discord机器人的Python Discord教程。当我运行代码时,它说“AttributeError:module'discord.client'没有属性'event'”。当我在网上搜索时,大多数答案都令人困惑。我正在使用repl。现在就开始,在Pycharm上试一试。两者都以相同的错误结束 仅供参考:我删除了,这样就不会显示令牌。
问题内容: 使用Spring(3.0.5),Hibernate(3.6.0)和Wicket(1.4.14)开发应用程序时遇到了一个奇怪的问题。问题是:我无法将任何对象保存或修改到数据库中。“不能”是指对象的所有更改或对EntityManager.persist(foo)的调用都被简单,无声地忽略。选择工作。 示例案例很简单-在某些检票页面上,我尝试将对象保存到数据库中,如下所示 这是comicDA
我正在Spring3和Hibernate3中进行产品构建,我已经更新了这个Hibernate5和Spring5。现在我可以部署应用程序了,但是当我尝试与数据库连接时,会得到“javax.persistence.TransactionRequiredException:no transaction is in progress”