我刚刚开始使用Apache Flink进行流处理,我收到的Json流如下所示:
{
token_id: “tok_afgtryuo”,
ip_address: “128.123.45.1“,
device_fingerprint: “abcghift”,
card_hash: “hgtyuigash”,
“bin_number”: “424242”,
“last4”: “4242”,
“name”: “Seu Jorge”
}
并被问到我是否可以履行以下业务规则:
>
如果在过去10秒内此IP的令牌数>5,则拒绝
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//This DataStream Would be Converting the Json to a Token Object
DataStream<Token> baseStream =
env.addSource(new SocketTextStreamFunction("localhost",
9999,
"\n",
1))
.map(new MapTokens());
// 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
DataStreamSink<String> response1 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
5, "seconds").print();
//2 -Decline if number of tokens > 15 for this IP in last minute
DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
62, "minutes").print();
//3- Decline if number of tokens > 60 for this IP in last hour
DataStreamSink<String> response3 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
60, "Hours").print();
env.execute("Job2");
}
在另一个类中,我将对规则进行所有的逻辑操作,我将计算IP地址出现的次数,如果它超过了时间窗口中允许的次数,我将返回一条包含一些信息的消息:
ruleMaker.java
public class RuleMaker {
public DataStream<String> getStreamKeyCount(DataStream<Token> stream,
String tokenProp,
Time time,
Integer maxPetitions,
String ruleType){
return
stream
.flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
@Override
public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
String tokenSelection = "";
switch (tokenProp)
{
case "ip":
tokenSelection = token.getIpAddress();
break;
case "device":
tokenSelection = token.getDeviceFingerprint();
break;
case "cardHash":
tokenSelection = token.getCardHash();
break;
}
collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
}
})
.keyBy(0)
.timeWindow(time)
.process(new MyProcessWindowFunction(maxPetitions, ruleType));
}
//Class to process the elements from the window
private class MyProcessWindowFunction extends ProcessWindowFunction<
Tuple3<String, Integer, String>,
String,
Tuple,
TimeWindow
> {
private Integer _maxPetitions;
private String _ruleType;
public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
this._maxPetitions = maxPetitions;
this._ruleType = ruleType;
}
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {
Integer counter = 0;
for (Tuple3<String, Integer, String> element : iterable) {
counter += element.f1++;
if(counter > _maxPetitions){
out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " + _ruleType + " token: " + element.f0 );
counter = 0;
}
}
}
}
}
到目前为止,我认为这段代码是有效的,但我对Apache Flink是一个乞丐,如果您能告诉我,我尝试使用这段代码的方式是否有问题,并指出正确的方向,我将非常感激。
多谢.
通用方法看起来很好,尽管我认为表API功能强大,可以帮助您(更简洁),它可以开箱即用地支持Json。
如果您希望坚持使用DataStream API,那么在GetStreamKeyCount
中,围绕TokenProp
的切换应该被替换为向GetStreamKeyCount
传递密钥提取器,以便只有一个地方可以添加新规则。
public DataStream<String> getStreamKeyCount(DataStream<Token> stream,
KeySelector<Token, String> keyExtractor,
Time time,
Integer maxPetitions,
String ruleType){
return stream
.map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
.keyBy(0)
.timeWindow(time)
.process(new MyProcessWindowFunction(maxPetitions, ruleType));
}
则调用变为
DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream,
Token::getIpAddress, Time.minutes(1), 62, "minutes");
我需要一个用户管理服务为我的Spring启动项目。我一般了解DTO(数据传输对象)在Spring的使用。但是当我考虑设计服务时,我只对一个“用户”模型使用多个DTO,如UserDTO、注册用户DTO、更新用户DTO、管理用户DTO...UserDTO就像一个只读数据(带有用户名、电子邮件、姓名的输出数据),用于显示用户信息。但是注册用户DTO就像一个输入数据(带密码,确认密码为新用户创建密码),用
因此,我有一个包含许多的表单,它们都需要填写。我研究了文本框验证,但我只能找到验证单个文本框的说明。下面是单数文本框验证的代码。我只是想知道是否有可能同时打击所有的人,而不是每个人都这样。任何帮助都将不胜感激!
我如何验证一个列表的值跨字段,其中至少一个单一的值必须设置(不是零) 我需要验证至少有一个字段被输入(例如总数不是零) 我遇到的问题是,当任何一个字段发生更改时,validator::total_cost不会重新评估所有正在验证的字段。 在“任意”输入中键入正确的值需要告诉“所有”其他输入,以便根据新的计算字段重新估价! 任何帮助都将不胜感激。 (我的电视机大得多) 我正在使用的标记 AnyVal
当我尝试更新IsApproved=true以批准该属性时,出现以下错误。一个或多个实体的验证失败。有关更多详细信息,请参阅“EntityValidationErrors”属性。验证错误是:请上传图片。;请选择一些属性功能。请帮帮我。提前准备好。我的模型如下: [Table(“AddProperty”)]公共类AddProperty{[Key,DatabaseGenerated(DatabaseGe
看起来,当我这样使用thenReturn时: 当(abc.call())。然后返回(a)。然后返回(b), 我期望: 验证(abc,次(2))。调用() 相反,该方法似乎只被调用了一次,我有点困惑(我的测试工作如预期的那样,mock似乎返回了我预期的值),但对于调用次数,我不知道我是否得到了错误的结果,或者这是Mockito的预期行为?
我正在使用JSR303,并编写了大量注释,因此我熟悉使用自定义约束的过程。 我刚刚遇到了一个问题,我不确定我能否优雅地解决。这里的对象是为了说明!所以我有一个祖父母对象,它有一个孩子的列表。每个孩子都有自己的孩子列表(显然是祖父母的孙子)。此列表的大小可以用@size约束。但我需要限制祖父母的(孙子)总数,这样,如果我验证祖父母实例,他们在所有子女中的孙子数量限制为50个。举个奇怪的例子,我知道: