当前位置: 首页 > 知识库问答 >
问题:

基于java的Apache-Flink-CEP模式检测

曾嘉瑞
2023-03-14

我想做
从映射结构中包含的任何arraylist元素的CEP开始,然后继续我已经开始的其余arraylist元素
地图和图案结构:

final Map< Integer,ArrayList<String>> deger = new HashMap<Integer,ArrayList<String>>();
        deger.put(1,new ArrayList<String>(Arrays.asList("h:1","l:1","g:0")));
        deger.put(2,new ArrayList<String>(Arrays.asList("h:1","l:1","g:1")));
        deger.put(3,new ArrayList<String>(Arrays.asList("h:2","l:3","g:1")));
        deger.put(4,new ArrayList<String>(Arrays.asList("h:0","l:2","g:2")));

 for(int i=1;i<deger.size()+1;i++) {
            temp1.add(deger.get(i));
        }

Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                new SimpleCondition<String>() {
//                    @Override
                    public boolean filter(String value) throws Exception {

                        for (ArrayList<String> aa: temp1){
                            for (String dd : aa)
                                if(value.equals(dd)){ 
                                    return true;
                                }
                        }
                        return false;
                    }
                }
        ).followedBy("middle").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.equals(temp1.get(1));
                    }
                }
        ).followedBy("end").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.equals(temp1.get(2));
                    }
                }
        );

我的目的是使用map中的数组列表元素发出警告,但由于其中的流流,数组列表元素的顺序并不重要。我想继续处理这个数组的其余元素,当我从这里的任何数组开始时,我可以返回这个数组的信息。例如:

Incoming data = "l:1","h:1","g:0"
my pattern = "h:1","l:1","g:0" 
Start -> l:1 find
Middle -> g:0 or h:1 | h:1 find
End -> g:0 find -> alarm

共有2个答案

淳于坚壁
2023-03-14

因此,目前AFAIK Flink不支持开箱即用的非有序模式,因此基本上我看到了两种解决此问题的方法:

1) 您可以创建要搜索的所有可能的模式,并简单地合并所有生成的数据流。

2) 正如这篇帖子所暗示的那样,FlinkCEP:我可以引用之前的事件来定义后续匹配吗?您可以尝试使用IterativeCondition,这将允许您访问之前已经匹配的元素,因此基本上您必须定义匹配列表中所有可能元素的模式,然后只需检查最后一个条件是否所有三个元素都属于同一列表。如果是,则找到模式。

饶滨海
2023-03-14
 public static  Integer temp1;
    public static  Map<Integer,ArrayList<String>> temp2 = new HashMap<>();     
final Map< Integer,ArrayList<String>> deger = new HashMap<>();
            deger.put(1,new ArrayList<>(Arrays.asList("h:1","g:1","s:0")));
            deger.put(2,new ArrayList<>(Arrays.asList("h:1","g:1","g:0")));
            deger.put(3,new ArrayList<>(Arrays.asList("h:1","c:0","g:0")));
            deger.put(4,new ArrayList<>(Arrays.asList("h:1","s:1","g:0")));


            Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String value) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryStart : deger.entrySet()) {
                                if(entryStart.getValue().contains(value) && !temp2.containsKey(entryStart.getKey())){
                                        ArrayList<String> newList = new ArrayList<String>();
                                        newList.addAll(entryStart.getValue());
                                        newList.remove(value);
                                        temp2.put(entryStart.getKey(),newList);
                                        flag = true;
                                }
                            }
                            return flag;
                        }
                    }
            ).followedBy("middle").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String middle) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryMiddle : temp2.entrySet()) {
                                if(entryMiddle.getValue().contains(middle) && entryMiddle.getValue().size() == 2){
                                    ArrayList<String> newListMiddle = new ArrayList<String>();
                                    newListMiddle.addAll(entryMiddle.getValue());
                                    newListMiddle.remove(middle);
                                    temp2.put(entryMiddle.getKey(),newListMiddle);
                                    flag = true;
                                }
                            }
                            return flag;
                        }
                    }
            ).followedBy("end").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String end) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryEnd : temp2.entrySet()) {
                                if(entryEnd.getValue().contains(end) && entryEnd.getValue().size() == 1){
                                    flag = true;
                                    temp1 = entryEnd.getKey();
                                }
                            }
                            if (flag)
                                temp2.remove(temp1);
                            return flag;
                        }
                    }
            );

            PatternStream<String> patternStream = CEP.pattern(stream_itemset_ham,pattern);

            DataStream<String> result = patternStream.select(
                    new PatternSelectFunction<String, String>() {
                        @Override
                        public String select(Map<String, List<String>> map) throws Exception {
                            ArrayList<String> NewList= new ArrayList<>();
                            NewList.addAll(deger.get(temp1));
                            String found = "Found";
                            for (String list_element : NewList)
                                found += " " + list_element ;
                            return found;
                        }
                    }
            );
            result.print();

我从你的问题中了解到,可以提供这种解决方案。

 类似资料:
  • 请帮帮我,我有两个问题: 我从Apache Kafka json-messages读到(然后我有步骤:反序列化到POJO、筛选器、keyBy...) 使用哪个更好:KeyedProcessFunction(带有状态、计时器、if-else逻辑块)还是Flink CEP模式库? 我可以检查KeyedProcessFunction中的输入序列(检查state,if-else blocks,out.co

  • 目前我正在做一个学期项目,我必须认识三个事件的系列。像

  • 我有一个场景,如果第二个事件在x秒内没有跟随第一个事件,我必须更改状态。例如,对于用户没有在100分钟内注销,将他视为处于无效状态。如何使用当前模式操作设计这一点?

  • 我对Flink CEP库还是个新手,但我不了解模式检测行为。考虑到下面的示例,我有一个Flink应用程序,它使用来自kafka主题的数据,数据是定期生成的,我想使用Flink CEP模式来检测值何时大于给定阈值。代码如下: 当我运行作业时会发生什么,模式检测不会实时发生,它只在生成第二条记录后才输出当前记录检测到的模式的警告,似乎延迟了将警告打印到日志中,我真的不知道如何让它在检测到模式时输出警告

  • 并行度=1(成功检测到模式) 并行度=4(无法检测到模式)

  • 我正在尝试为Kafka输入流实现一个非常简单的Apache Flink CEP。Kafka生产者生成一个简单的Double值,并通过Kafka主题将它们作为字符串发送给消费者。目前,我正在用Flink编码一个CEP消费者。到目前为止,这是我编写的代码: 如果我正在尝试执行这段代码,这是一条错误消息: 编辑:我尝试了另一个例子,每次执行我都得到相同的错误。所以我觉得我的包裹有问题?