当前位置: 首页 > 知识库问答 >
问题:

使用apache flink进行有状态复杂事件处理

呼延鹏云
2023-03-14

我想根据具有相同标识符的两个事件来检测两个事件是否在定义的时间范围内发生。例如,DoorEvent如下所示:

<doorevent>
  <door>
    <id>1</id>
    <status>open</status>
  </door>
  <timestamp>12345679</timestamp>
</doorevent> 

<doorevent>
  <door>
    <id>1</id>
    <status>close</status>
  </door>
  <timestamp>23456790</timestamp>
</doorevent>

下面示例中的My DoorEvent java类具有相同的结构。

我想检测id为1的门在打开后5分钟内关闭。为此,我尝试使用Apache flink CEP库。传入流包含来自20扇门的所有打开和关闭消息。

Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
    new SimpleCondition<String>() {
        private static final long serialVersionUID = 1L;
        public boolean filter(String doorevent) {
            DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
            if (event.getDoor().getStatus().equals("open")){
                // save state of door as open
                return true;
            }
            return false;                           
        }
    }
)
.followedByAny("door_close").where(
    new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;
            public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("close")){
                    // check if close is of previously opened door
                    return true;
                }
                return false;
            }
        }
)
.within(Time.minutes(5));

如何在door_open中将门1的状态保存为打开,以便在door_close步骤中我知道门1是关闭的门,而不是其他门?

共有1个答案

仲孙文乐
2023-03-14

如果你有Flink 1.3.0及以上版本,那么你想做什么就很简单了

您的模式如下所示:

Pattern.<DoorEvent>begin("first")
        .where(new SimpleCondition<DoorEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(DoorEvent event) throws Exception {
            return event.getDoor().getStatus().equals("open");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<DoorEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {

            if (!secondEvent.getDoor().getStatus().equals("close")) {
              return false;
            }

            for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getDoor().getEventID().equals(firstEvent.getDoor().getEventId())) {
                return true;
              }
            }
            return false;
          }
        })
        .within(Time.minutes(5));

因此,基本上,您可以使用IterativeConditions,获取匹配的第一个模式的上下文,并在该列表上迭代,同时比较所需的模式,并根据需要继续。

迭代条件(IterativeConditions)代价高昂,应该相应地进行处理

有关条件的更多信息,请查看此处的Flink-conditions

 类似资料:
  • 我有一个离散事件流进入我的系统,我需要根据每个事件的内容应用规则。另外,我想对这些流事件应用复杂的事件处理。 约束1.这些规则是用户提供的,并将动态更改。2.每当应用规则时,我不想重新启动我的系统。3.HA 4.只有成熟的开源解决方案 可能的方式...1.在Storm螺栓内运行Esper CEP 2。让口水流到Storm螺栓里 > 这会处理单事件规则和复杂事件吗?规则更改是否需要我的Storm重新

  • 我有一个flink cep代码,可以从套接字读取数据并检测模式。假设模式(单词)为“警报”。如果单词alert出现五次或五次以上,则应创建一个警报。但我得到了一个输入不匹配错误。Flink版本为1.3.0。提前谢谢!!

  • 我正在尝试了解Apache Flink CEP程序,该程序用于监控数据中心中的机架温度,如Flink官方文档所述。但是当我按照步骤使用mvn clean package创建jar并尝试使用命令执行包时 但我有以下错误, 我尝试了给出这里描述的类路径的不同变体,但得到了相同的错误。有人能指出我在运行程序时的错误吗?

  • 我一直在寻找如何将apache storm用作CEP的方法,但似乎有两个概念(流处理和复杂事件处理),在CEP中,您可以编写类似sql的查询,并在数据流上执行它们,如ESPER,但我在apache storm中找不到类似的东西,这是否意味着apache storm是一个数据流处理器而不是CEP?

  • 例如,我有简单的DF: 我是否可以使用熊猫的方法和习惯用法,从“A”中选择“B”对应值大于50的值,以及“C”对应值不等于900的值?

  • 我有一个状态机,有三种可能的状态:状态1,状态2,状态3。 每当一个对象进入状态_3时,我想触发一个事件event1。 在我state_machine,我有 但只要状态从state1更改为state3或从state2更改为state3,就会正确触发事件。但当状态从state3转换到state3时,不会触发event1。我怎样才能做到这一点? 我知道从3号州到3号州不是一个州的过渡。我可以使用类似于