当前位置: 首页 > 知识库问答 >
问题:

Apache Flink CEP中没有事件

庾远航
2023-03-14

我是Apache Flink CEP的新手,我正在努力检测一个简单的事件缺失。

我试图检测的是,具有特定ID的CurrencyEvent类型的事件在一定时间内不会发生。我想每次在3000ms之后事件没有发生时检测没有这样的事件。

我的模式代码如下所示:

Pattern<CurrencyEvent, ?> myPattern = Pattern.<Event>begin("CurrencyEvent")
        .subtype(CurrencyEvent.class)
        .where(new SimpleCondition<CurrencyEvent>() {
            @Override
            public boolean filter(CurrencyEvent currencyEvent) throws Exception {
                return currencyEvent.getId().equalsIgnoreCase("usd");
            }
        })
        .within(Time.milliseconds(3000L));

所以现在我的想法是使用超时函数来检测超时事件:

    DataStreamSource<Event> events = env.addSource(new TestSource(
            Arrays.asList(
                    basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D),
                    basicCurrencyWithMivLevelEvent("USD", 100L, Arrays.asList("1", "2"), 200D),
                    basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D)
            ),
            1636040364820L, // initial timestamp for the first element
            7000 // 7 seconds between each event
    ));

  PatternStream<Event> patternStream = CEP.pattern(
            events,
            (Pattern<Event, ?>) myPattern
    );

    OutputTag<Alarm> tag = new OutputTag<Alarm>("currency-timeout"){};

    PatternFlatTimeoutFunction<Event, Alarm> eventAlarmTimeoutPatternFunction = (patterns, timestamp, ctx) -> {
        System.out.println("New alarm, since after 3 seconds an event with id=usd is not detected");
        //TODO: call collect

    };


    PatternFlatSelectFunction<Event, Alarm> eventAlarmPatternSelectFunction = (patterns, ctx) -> {
        System.out.println("Select! (we can ignore it) " + patterns);
        // ignore matched events
    };

    return patternStream.flatSelect(
            tag,
            eventAlarmTimeoutPatternFunction,
            TypeInformation.of(Alarm.class),
            eventAlarmPatternSelectFunction
    );

我的测试源使用事件时间戳和水印,如下所示:

public class TestSource implements SourceFunction<Event> {

    private final List<Event> events;
    private final long initialTimestamp;
    private final long timeBetweenInMillis;

    public TestSource(List<Event> events, long initialTimestamp, long timeBetweenInMillis){
        this.events = events;
        this.initialTimestamp = initialTimestamp;
        this.timeBetweenInMillis = timeBetweenInMillis;


    }
    @Override
    public void run(SourceContext<Event> sourceContext) throws InterruptedException {
        long timestamp  = this.initialTimestamp;
        for(Event event: this.events){
            sourceContext.collectWithTimestamp(event, timestamp);
            sourceContext.emitWatermark(new Watermark(timestamp));
            timestamp+=this.timeBetweenInMillis;
        }
    }

    @Override
    public void cancel() {

    }

}

我在用TimeCharacteristics.EventTime.

由于窗口时间(3秒)低于每个事件之间的事件时间差(7秒),我希望得到一些超时事件,但我得到0。

共有1个答案

柯永福
2023-03-14

CEPPattern匹配一个或多个事件的序列;(间隔)子句中的添加了一个额外的约束,即序列中的所有事件都必须发生在指定的间隔内。当部分匹配超时时,这可以在TimedOutPARalMatchHandler中捕获。

在您的情况下,由于成功匹配的模式由单个事件组成,因此不可能存在部分匹配,并且匹配永远不会超时。(匹配序列的长度始终小于3秒。)

您可以做的是扩展模式定义以包括第二个事件,以便在3秒内必须有一个开始事件,然后是另一个事件。当第二个事件丢失时,您将有一个部分匹配超时。

为了实现涉及缺失事件的用例比CEP提供的更大的灵活性,您可以将KeyedProcessFunction与定时器一起使用。

 类似资料:
  • 我正在尝试从事件中心读取数据,但结果它只返回空值。 我将一个数据帧转换为json以发送到eventhub 这是数据的模式 我正在尝试使用从eventhub读取数据 从 pyspark.sql.类型导入数组类型, 双类型, 结构类型, 结构字段, 字符串类型, 长类型, 布尔类型

  • 问题内容: 这是我的代码。我正在尝试使用箭头键使球移动。当我运行上述程序时 ,不显示球 ( 如果我将坐标更改为显示0,30球), 并且事件未触发,球既不移动也不跳跃 ? 问题答案:

  • 我在库伯内特斯连夜运行了一项工作。当我早上检查它时,它已经失败了。通常,我会检查pod日志或事件来确定原因。然而,pod被删除了,没有事件。 下面是输出: 这是作业配置yaml。它有,但是它从来没有重启过。我也没有设置TTL,所以豆荚永远不会被清理。 我如何调试这个?

  • 因此,我遵循了一个关于创建Discord机器人的Python Discord教程。当我运行代码时,它说“AttributeError:module'discord.client'没有属性'event'”。当我在网上搜索时,大多数答案都令人困惑。我正在使用repl。现在就开始,在Pycharm上试一试。两者都以相同的错误结束 仅供参考:我删除了,这样就不会显示令牌。

  • 问题内容: 使用Spring(3.0.5),Hibernate(3.6.0)和Wicket(1.4.14)开发应用程序时遇到了一个奇怪的问题。问题是:我无法将任何对象保存或修改到数据库中。“不能”是指对象的所有更改或对EntityManager.persist(foo)的调用都被简单,无声地忽略。选择工作。 示例案例很简单-在某些检票页面上,我尝试将对象保存到数据库中,如下所示 这是comicDA

  • 我正在Spring3和Hibernate3中进行产品构建,我已经更新了这个Hibernate5和Spring5。现在我可以部署应用程序了,但是当我尝试与数据库连接时,会得到“javax.persistence.TransactionRequiredException:no transaction is in progress”