当前位置: 首页 > 知识库问答 >
问题:

是否可以在apache flink CEP中处理多个流?

羊舌航
2023-03-14

我的问题是,如果我们有两个原始事件流,即烟雾和温度,并且我们想通过将运算符应用于原始流来找出复杂事件(即火灾)是否发生,我们可以在Flink中做到这一点吗?

我问这个问题是因为到目前为止,我所看到的Flink CEP的所有示例都只包括一个输入流。如果我错了,请纠正我。

共有2个答案

俞学
2023-03-14

我想知道是否可以进行严格的链接(如果可以使用next,则不必遵循),因为在给定的流中,对于特定的时间戳可能会有许多事件。假设时间t1-:a,b,c-,这三个事件发生,时间t2-:a2,b2,c2发生在flink引擎上。所以,我想知道我们如何得到事件(a)。下一个(a2),因为它可能永远不会是这样,因为序列类似于-:a b c a2 b2 c2

然而,如果CEP模块处理事件时将一个时间戳视为单个事件,那么这是有意义的。

吕高寒
2023-03-14

简短回答-是的,您可以根据不同流源中的事件类型读取和处理多个流和火灾规则。

长答案-我有一个有点类似的要求,我的答案是基于假设你正在阅读来自不同Kafka主题的不同流。

阅读在单个源中传输不同事件的不同主题:

FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
        new StringSerializerToEvent(),
        props);

kafkaSource.assignTimestampsAndWatermarks(new 
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
        .filter(Objects::nonNull);

序列化程序读取数据并将其解析为具有通用格式的-例如。

@Data
public class BAMEvent {
 private String keyid;  //If key based partitioning is needed
 private String eventName; // For different types of events
 private String eventId;  // Any other field you need
 private long timestamp; // For event time based processing 

 public String toString(){
   return eventName + " " + timestamp + " " + eventId + " " + correlationID;
 }

}

在这之后,事情变得非常简单,根据事件名称定义规则,并比较事件名称来定义规则(您也可以定义复杂的规则,如下所示):

Pattern.<BAMEvent>begin("first")
        .where(new SimpleCondition<BAMEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(BAMEvent event) throws Exception {
            return event.getEventName().equals("event1");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<BAMEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

            if (!secondEvent.getEventName().equals("event2")) {
              return false;
            }

            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getEventId = firstEvent.getEventId()) {
                return true;
              }
            }
            return false;
          }
        })
        .within(withinTimeRule);

我希望这能为您提供将一个或多个不同流集成在一起的想法。

 类似资料:
  • 问题内容: 我知道linux确实做了嵌套的中断,其中一个中断可以“抢占”另一个中断,但是其他任务呢。 我只是想了解linux如何处理中断。它们可以被其他用户任务/内核任务抢占吗? 问题答案: 简单答案:中断只能被更高优先级的中断所中断。 因此, 如果 中断的优先级低于内核调度程序中断优先级或用户任务中断优先级,则中断可以被内核或用户任务中断。 注意,“用户任务”是指 用户定义的中断 。

  • 嗨,我是新春批。 我有如下Spring批次的情况: 我需要运行所有促销的批处理[促销列表] > 在这里,我想再次从batch中读取上面的动态查询,因为它返回的结果至少为5万条记录。 以下是我所期待的过程,这在Spring批次中是否可行? 阅读促销【读者逐一阅读促销】 创建查询并将其放在上下文中 传递给下一个读者 读取器逐个读取事务 处理交易并计算积分 我这里的问题是不能写嵌套块[一个用于读取提升,

  • 我对keycloak不熟悉,对它没有深入的了解。我们必须在Spring Boot中为一个应用程序实现身份验证,其中有团队,用户可以是不同团队的一部分。他/用户可以为每个团队拥有不同的角色/权限。例如,用户A可以是team1的经理,同时,他可以是team2的管理员,等等。 我以一种为每个团队提供一个钥匙斗篷客户端的方式实现了它。客户端角色将作为团队权限分配给用户。 现在,我想以这样一种方式限制API

  • 问题内容: 我正在使用python 2.7,并尝试在自己的进程中运行一些CPU繁重的任务。我希望能够将消息发送回父流程,以使其随时了解流程的当前状态。为此,多处理队列似乎很完美,但我不知道如何使它工作。 因此,这是我的基本工作示例,减去了Queue的使用。 我尝试以几种方式传递队列,它们收到错误消息“ RuntimeError:队列对象仅应通过继承在进程之间共享”。这是我根据之前发现的答案尝试的一

  • 问题陈述:我必须在JSP上显示来自两个独立模型的数据。 例如,假设有两种模型:account和Student。 学生模型的属性是可编辑的,因此,我可以使用spring表单中的modelattribute并将数据从jsp映射到java对象。 account的属性只能读取,但正如我在modelattribute中提到的,在jsp表单中,学生模型是可以读取的,每个jsp表单只能有一个modelattri

  • 我读过spring batch中的分区,我发现了一个演示分区的示例。该示例从CSV文件中读取人员,进行一些处理,并将数据插入数据库。在本例中,1 partitioning=1 file,因此partitioner实现如下所示: 但如果我有一个10TB的文件呢?spring批处理是否允许以某种方式对其进行分区? 我尝试了以下方法来实现我的目标: 分为两步——第一步将文件分成若干部分,第二步处理第一步