当前位置: 首页 > 知识库问答 >
问题:

Flink复杂事件处理

芮博厚
2023-03-14

我有一个flink cep代码,可以从套接字读取数据并检测模式。假设模式(单词)为“警报”。如果单词alert出现五次或五次以上,则应创建一个警报。但我得到了一个输入不匹配错误。Flink版本为1.3.0。提前谢谢!!

package pattern;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

    public class cep {

         public static void main(String[] args) throws Exception {


             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() {
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception {
                                return word.equals("alert");
                            }
                        })
                        .times(5);


                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .flatSelect((Map<String,List<String>> in, Collector<String> out) -> {

                            String first = in.get("first").get(0);

                            for (int i = 0; i < 6; i++ ) {

                                out.collect(first);

                            }


                        });

                alerts.print();

                env.execute();

            }

    }

共有2个答案

唐繁
2023-03-14

所以我有了代码来工作。这是可行的解决方案,

    package pattern;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.List;
    import java.util.Map;

    public class cep {

         public static void main(String[] args) throws Exception {


             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() {
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception {
                                return word.equals("alert");
                            }
                        })
                        .times(5);

                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .select(new PatternSelectFunction<String, String>() {
                            @Override
                            public String select(Map<String, List<String>> in) throws Exception {

                                String first = in.get("first").get(0);

                                if(first.equals("alert")){

                                    return ("5 or more alerts");
                                }
                                else{

                                    return (" ");
                                }
                            }
                        });

                alerts.print();

                env.execute();

            }

    }
席俊驰
2023-03-14

只是对最初的问题做了一些澄清。在1.3.0中,有一个bug使得使用lambdas作为参数来选择/扁平选择变得不可能。

它在1.3.1中已修复,因此您的第一个版本的代码将与1.3.1一起使用。

此外,我认为你误解了时间量词。它与精确的次数匹配。所以,在您的情况下,只有当事件精确匹配3次时,它才会返回,而不是3次或更多。

 类似资料:
  • 我正在尝试了解Apache Flink CEP程序,该程序用于监控数据中心中的机架温度,如Flink官方文档所述。但是当我按照步骤使用mvn clean package创建jar并尝试使用命令执行包时 但我有以下错误, 我尝试了给出这里描述的类路径的不同变体,但得到了相同的错误。有人能指出我在运行程序时的错误吗?

  • 我有一个离散事件流进入我的系统,我需要根据每个事件的内容应用规则。另外,我想对这些流事件应用复杂的事件处理。 约束1.这些规则是用户提供的,并将动态更改。2.每当应用规则时,我不想重新启动我的系统。3.HA 4.只有成熟的开源解决方案 可能的方式...1.在Storm螺栓内运行Esper CEP 2。让口水流到Storm螺栓里 > 这会处理单事件规则和复杂事件吗?规则更改是否需要我的Storm重新

  • 我想根据具有相同标识符的两个事件来检测两个事件是否在定义的时间范围内发生。例如,如下所示: 下面示例中的My DoorEvent java类具有相同的结构。 我想检测id为1的门在打开后5分钟内关闭。为此,我尝试使用Apache flink CEP库。传入流包含来自20扇门的所有打开和关闭消息。 如何在中将门1的状态保存为打开,以便在步骤中我知道门1是关闭的门,而不是其他门?

  • 我一直在寻找如何将apache storm用作CEP的方法,但似乎有两个概念(流处理和复杂事件处理),在CEP中,您可以编写类似sql的查询,并在数据流上执行它们,如ESPER,但我在apache storm中找不到类似的东西,这是否意味着apache storm是一个数据流处理器而不是CEP?

  • 我需要延迟处理一些事件。 我有三件事(发表在Kafka上): A(id: 1, retry At: now) B(id: 2, retry At: 10分钟后) C(id: 3, retry At: now) 我需要立即处理记录A和C,而记录B需要在十分钟后处理。这在Apache Flink中实现可行吗? 到目前为止,无论我研究了什么,“触发器”似乎都有助于在Flink中实现它,但还没有能够正确实