当前位置: 首页 > 知识库问答 >
问题:

使用Apache Flink进行动态模式评估

符修杰
2023-03-14

我是Apache Flink的新手,我正在尝试使用Flink CEP动态评估流中的模式。我正在尝试查找执行以下操作的用户登录、addtocart和注销,并且能够检测到模式,但是如果我定义了多个模式,例如登录,注销,则无法检测到模式

下面是我的代码

动作类

public class Action {

    public int userID;
    public String action;

    public Action() {
    }

    public Action(int userID, String action) {
        this.userID = userID;
        this.action = action;
    }

    public int getUserID() {
        return userID;
    }

    public void setUserID(int userID) {
        this.userID = userID;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    @Override
    public String toString() {
        return "Action [userID=" + userID + ", action=" + action + "]";
    }

}

模式类

public class Pattern {

    public String firstAction;
    public String secondAction;
    public String thirdAction;

    public Pattern() {

    }

    public Pattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public Pattern(String firstAction, String secondAction, String thirdAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
        this.thirdAction = thirdAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }

    public String getThirdAction() {
        return thirdAction;
    }

    public void setThirdAction(String thirdAction) {
        this.thirdAction = thirdAction;
    }

    @Override
    public String toString() {
        return "Pattern [firstAction=" + firstAction + ", secondAction=" + secondAction + ", thirdAction=" + thirdAction
                + "]";
    }



}

主类

public class CEPBroadcast {

    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<Integer, Action, Pattern, Tuple2<Integer, Pattern>> {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        ValueState<String> prevActionState;

        MapStateDescriptor<Void, Pattern> patternDesc;

        @Override
        public void open(Configuration conf) throws IOException {
            prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
            patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {

            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
            bcState.put(null, pattern);
            ;

        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {
            Pattern pattern = ctx.getBroadcastState(this.patternDesc).get(null);
            String prevAction = prevActionState.value();

            if (pattern != null && prevAction != null) {

                if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(prevAction)
                        && pattern.thirdAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                } else if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }

            prevActionState.update(action.action);

        }

    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

        DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

        KeyedStream<Action, Integer> actionByUser = actions
                .keyBy((KeySelector<Action, Integer>) action -> action.userID);

        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID,
                Types.POJO(Pattern.class));

        BroadcastStream<Pattern> bcedPattern = pattern.broadcast(bcStateDescriptor);

        DataStream<Tuple2<Integer, Pattern>> matches = actionByUser.connect(bcedPattern)
                .process(new PatternEvaluator());

        matches.flatMap(new FlatMapFunction<Tuple2<Integer, Pattern>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(Tuple2<Integer, Pattern> value, Collector<String> out) throws Exception {

                if (value.f1.thirdAction != null) {
                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction + "," + value.f1.thirdAction);
                } else {

                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction);

                }

            }

        }).print();

        env.execute("CEPBroadcast");

    }

}

如果我给出一个模式来评估它的输出,如下所示

DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

Output: User ID: 1001,Pattern matched:login,logout

如果我试图给多个模式进行评估,如下面所示,它不是评估第二个模式,而是建议我如何评估多个模式,提前感谢。

DataStream<Pattern> pattern = env.fromElements(new Pattern ("login","addtocart","logout"),
                new Pattern("login", "logout"));

Output:  User ID: 1003,Pattern matched:login,addtocart,logout

共有1个答案

车辰龙
2023-03-14

这不起作用有几个原因:

(1) 每当您有一个具有多个输入流的Flink操作符时,例如应用程序中的PatternEvaluator,您都无法控制该操作符将如何读取其输入。在您的情况下,它可能会在读取模式之前完全消耗动作流中的事件,反之亦然,或者它可能会交错两个流。从某种意义上说,你很幸运,它与任何东西都匹配。

解决这个问题并不容易。如果您在编译时知道所有模式(换句话说,如果它们实际上不是动态的),那么您可以使用Flink CEP,或者从FlinkSQLMATCH_RECOGNIZE。

如果您确实需要动态模式,那么您必须找到一种方法来阻止动作流,直到模式被读取为止。本主题(“侧边输入”)之前已在其他问题中介绍过。例如,请参阅当processElement依赖于广播的数据时,如何在flink中单元测试BroadcastProcessFunction。(或者,您可以调整预期,并满足于只有在存储模式后处理的操作才能与该模式匹配。)

(2)在存储模式时使用null作为键

bcState.put(null, pattern);

当第一个模式到达时,您正在用第二个模式覆盖它。永远不会有两个模式都可用于匹配的时候。

要根据两种不同的模式匹配输入,您需要修改模式计算器,以处理两种模式的同时匹配。这将需要以广播状态存储这两种模式,在processElement中考虑这两种模式,并为这两种模式提供prevActionState实例。您可能希望提供模式ID,将这些ID用作广播状态下的键,并将MapState用于再次由模式ID设置键的prevActionState。

更新时间:

请记住,当您使用DataStream API编写流作业时,您并没有像在典型的过程应用程序中那样定义执行顺序。相反,您正在描述数据流图的拓扑,以及嵌入在该图中的将执行作业(将并行执行)的运算符的行为。

 类似资料:
  • 问题内容: 给定两个数据框 我想使用对一列或多列进行算术运算pd.eval。具体来说,我想移植以下代码: …使用进行编码eval。使用的原因eval是我想自动执行许多工作流程,因此动态创建它们对我很有用。 我试图更好地理解和参数,以确定如何最好地解决我的问题。我已经浏览了文档,但是对我而言,区别并不明显。 应该使用什么参数来确保我的代码以最高性能工作? 有没有一种方法可以将表达式的结果赋值给df2

  • 最近我尝试使用ApacheFlink进行快速批处理。我有一个表,它有一个列:value和一个不相关的索引列 基本上我想计算每5行值的平均值和范围。然后我将根据我刚才计算的平均值计算平均值和标准偏差。所以我想最好的方法是使用窗口。 看起来是这样的 但是我不知道用。我试过,但它说没有这样的输入。我只希望它在从源代码读取时按顺序分组。但是它必须是一个时间属性,所以我不能使用索引列作为排序。 我是否必须添

  • 问题内容: 我有一个关于使用JdbcTemplate进行动态查询的问题。 我的代码如下: 现在,我的问题是,我想要与插入查询中的自动生成问号相同数量的“值”。 现在,值变量考虑为一个字符串,因此,如果我有2个或更多问号,则在值变量中只有一个用逗号分隔的完整字符串,因此它不起作用。 见下面我的查询: 我想要如下: 问题答案: }

  • 问题内容: 我想知道是否有一种方法可以将“ java.exe”作为后台进程执行(静默模式执行) 例如:java -cp。MyClass arg1 我想将以上语句作为后台进程运行,而无需打开命令窗口 问题答案: 在Windows下,使用代替。请参阅此处的链接,相关位复制在此处: 该命令与相同,除了没有关联的控制台窗口。使用时,你不想要一个命令提示符窗口出现。该发射器将,但是,如果启动由于某种原因失败

  • 我正在为spring批处理使用java配置(spring boot)。我有一个员工Id列表,对于每个Id,我需要运行一个查询(如下所示),然后处理数据。 我知道我们可以使用阅读器。setPreparedStatementSetter动态设置上述SQL中的参数。但是,我不确定如何对列表中的每个员工id重复批处理过程。即使我将reader()标记为@StepScope,也只会调用一次reader。(即

  • 在这篇文章我们看到了如何使用iNalyzer对iOS应用进行静态分析。本文我们将看看如何用iNalyer对iOS应用进行运行时分析。我们能够在运行时调用方法,能够在应用的某个特殊时间找出特定实例变量的值,基本上能做我们用Cycript做的所有事情。 在这篇文章当中,我们成功的用Doxygen生成了html文件,并且打开它看到了关于这个应用的类信息和其他信息。我们将使用Firefox浏览器进行运行时