当前位置: 首页 > 知识库问答 >
问题:

多个Apache Flink windows验证

施锋
2023-03-14

我刚刚开始使用Apache Flink进行流处理,我收到的Json流如下所示:

{

  token_id: “tok_afgtryuo”,

  ip_address: “128.123.45.1“,

  device_fingerprint: “abcghift”,

  card_hash: “hgtyuigash”,

  “bin_number”: “424242”,

  “last4”: “4242”,

  “name”: “Seu Jorge”

}

并被问到我是否可以履行以下业务规则:

>

  • 如果在过去10秒内此IP的令牌数>5,则拒绝

        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //This DataStream Would be  Converting the Json to a Token Object
            DataStream<Token> baseStream =
                    env.addSource(new SocketTextStreamFunction("localhost",
                            9999,
                            "\n",
                            1))
                            .map(new MapTokens());
    
    
            // 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
           DataStreamSink<String> response1 =  new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
                   5, "seconds").print();
    
            //2 -Decline if number of tokens > 15 for this IP in last minute
            DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
                    62, "minutes").print();
    
            //3- Decline if number of tokens > 60 for this IP in last hour
            DataStreamSink<String> response3  = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
                    60, "Hours").print();
    
            env.execute("Job2");
        }
    

    在另一个类中,我将对规则进行所有的逻辑操作,我将计算IP地址出现的次数,如果它超过了时间窗口中允许的次数,我将返回一条包含一些信息的消息:

    ruleMaker.java

    public class RuleMaker {
    
    
        public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                    String tokenProp,
                                                    Time time, 
                                                    Integer maxPetitions, 
                                                    String ruleType){
    
            return
                   stream
                    .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                        @Override
                        public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
    
                             String tokenSelection = "";
                            switch (tokenProp)
                            {
                                case "ip":
                                    tokenSelection = token.getIpAddress();
                                    break;
                                case "device":
                                    tokenSelection = token.getDeviceFingerprint();
                                    break;
                                case "cardHash":
                                    tokenSelection = token.getCardHash();
                                    break;
                            }
                            collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                        }
                    })
                    .keyBy(0)
                    .timeWindow(time)
                    .process(new MyProcessWindowFunction(maxPetitions, ruleType));
        }
    
        //Class to process the elements from the window
        private class MyProcessWindowFunction extends ProcessWindowFunction<
                Tuple3<String, Integer, String>,
                String,
                Tuple,
                TimeWindow
                > {
    
            private Integer _maxPetitions;
            private String  _ruleType;
    
    
            public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
                this._maxPetitions = maxPetitions;
                this._ruleType = ruleType;
            }
    
            @Override
            public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {
    
                Integer counter = 0;
                for (Tuple3<String, Integer, String> element : iterable) {
                    counter += element.f1++;
                    if(counter > _maxPetitions){
                        out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " +  _ruleType + " token: " + element.f0 );
                        counter = 0;
                    }
                }
            }
        }
    }
    

    到目前为止,我认为这段代码是有效的,但我对Apache Flink是一个乞丐,如果您能告诉我,我尝试使用这段代码的方式是否有问题,并指出正确的方向,我将非常感激。

    多谢.

  • 共有1个答案

    司马渝
    2023-03-14

    通用方法看起来很好,尽管我认为表API功能强大,可以帮助您(更简洁),它可以开箱即用地支持Json。

    如果您希望坚持使用DataStream API,那么在GetStreamKeyCount中,围绕TokenProp的切换应该被替换为向GetStreamKeyCount传递密钥提取器,以便只有一个地方可以添加新规则。

    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                KeySelector<Token, String> keyExtractor,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){
    
        return stream
             .map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
                .keyBy(0)
                .timeWindow(time)
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));
    }
    

    调用变为

    DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream, 
        Token::getIpAddress, Time.minutes(1), 62, "minutes");
    
     类似资料:
    • 我需要一个用户管理服务为我的Spring启动项目。我一般了解DTO(数据传输对象)在Spring的使用。但是当我考虑设计服务时,我只对一个“用户”模型使用多个DTO,如UserDTO、注册用户DTO、更新用户DTO、管理用户DTO...UserDTO就像一个只读数据(带有用户名、电子邮件、姓名的输出数据),用于显示用户信息。但是注册用户DTO就像一个输入数据(带密码,确认密码为新用户创建密码),用

    • 因此,我有一个包含许多的表单,它们都需要填写。我研究了文本框验证,但我只能找到验证单个文本框的说明。下面是单数文本框验证的代码。我只是想知道是否有可能同时打击所有的人,而不是每个人都这样。任何帮助都将不胜感激!

    • 我如何验证一个列表的值跨字段,其中至少一个单一的值必须设置(不是零) 我需要验证至少有一个字段被输入(例如总数不是零) 我遇到的问题是,当任何一个字段发生更改时,validator::total_cost不会重新评估所有正在验证的字段。 在“任意”输入中键入正确的值需要告诉“所有”其他输入,以便根据新的计算字段重新估价! 任何帮助都将不胜感激。 (我的电视机大得多) 我正在使用的标记 AnyVal

    • 当我尝试更新IsApproved=true以批准该属性时,出现以下错误。一个或多个实体的验证失败。有关更多详细信息,请参阅“EntityValidationErrors”属性。验证错误是:请上传图片。;请选择一些属性功能。请帮帮我。提前准备好。我的模型如下: [Table(“AddProperty”)]公共类AddProperty{[Key,DatabaseGenerated(DatabaseGe

    • 看起来,当我这样使用thenReturn时: 当(abc.call())。然后返回(a)。然后返回(b), 我期望: 验证(abc,次(2))。调用() 相反,该方法似乎只被调用了一次,我有点困惑(我的测试工作如预期的那样,mock似乎返回了我预期的值),但对于调用次数,我不知道我是否得到了错误的结果,或者这是Mockito的预期行为?

    • 我正在使用JSR303,并编写了大量注释,因此我熟悉使用自定义约束的过程。 我刚刚遇到了一个问题,我不确定我能否优雅地解决。这里的对象是为了说明!所以我有一个祖父母对象,它有一个孩子的列表。每个孩子都有自己的孩子列表(显然是祖父母的孙子)。此列表的大小可以用@size约束。但我需要限制祖父母的(孙子)总数,这样,如果我验证祖父母实例,他们在所有子女中的孙子数量限制为50个。举个奇怪的例子,我知道: