我正在尝试使用Apache Flink CEP(复杂事件处理)库捕获模式。我从以下结构开始,我希望看到id[1,2]和[3,4]的2个匹配项。但是我没有看到任何结果。
public class StreamingJob {
private static Logger logger = LoggerFactory.getLogger(StreamingJob.class);
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
ArrayList <Event> strings = new ArrayList <>();
strings.add(new Event(0L, "room", 9));
strings.add(new Event(1L, "room", 10));
strings.add(new Event(2L, "garden", 11));
strings.add(new Event(3L, "room", 12));
strings.add(new Event(4L, "garden", 13));
strings.add(new Event(5L, "room", 14));
strings.add(new Event(6L, "room", 15));
KeyedStream <Event, String> source = env.fromCollection(strings).keyBy(Event::getName);
source.print("###-source");
Pattern <Event, ?> pattern = Pattern. <Event>begin("room")
.where(new SimpleCondition <Event>() {
@Override
public boolean filter(Event value) {
logger.info("### value: {}", value);
return value.getName().equals("room");
}
})
.next("garden")
.where(new SimpleCondition <Event>() {
@Override
public boolean filter(Event value) {
logger.info("### value: {}", value);
return value.getName().equals("garden");
}
});
PatternStream <Event> patternStream = CEP.pattern(source, pattern);
// process
DataStream <Alarm> result = patternStream.process(
new PatternProcessFunction <Event, Alarm>() {
@Override
public void processMatch(
Map <String, List <Event>> pattern,
Context ctx,
Collector <Alarm> out) throws Exception {
logger.info("### pattern: {}", pattern);
logger.info("### ctx: {}", ctx);
out.collect(new Alarm(pattern.toString()));
}
});
result.print("###");
// or select function
patternStream.select(new PatternSelectFunction <Event, Alarm>() {
@Override
public Alarm select(Map <String, List <Event>> pattern) throws Exception {
logger.info("###");
return new Alarm(pattern.toString());
}
}).print("###");
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
source.print()
方法正在打印源流,对于接收器,我尝试了进程
和选择
方法,但都无法打印结果。此外,我的过滤器或匹配方法的日志根本没有打印出来。我的印象是过滤器功能甚至没有使用。Event
和警报
对象是简单的pojos,如下所示:
public class Event implements Serializable {
Long id;
String name;
Integer temperature;
//...
}
public class Alarm implements Serializable {
String text;
// ...
}
我还尝试将其更改为过滤其他字段,例如,使用温度字段,我想捕获奇数
-偶数
数字序列,但仍然没有打印任何结果。
CEP依赖于能够按时间戳对事件流进行排序。这要求您要么提供Watermark Strategy
(如果您想使用事件时间戳),要么使用流转时长语义学指定您希望模式匹配按摄取顺序完成。
您可以通过进行以下小改动来实现后者:
PatternStream <Event> patternStream =
CEP.pattern(source, pattern).inProcessingTime();
我用java在netbeans中编程,一切都很好,直到突然测试不再运行。我怀疑这是一个编程错误,因为我正在使用netbeans生成所有测试,而且这个问题似乎在各个项目中持续存在。我甚至尝试过重新安装Netbeans。这是我右键单击并按test file时的输出。 我假设这是一个渐变问题,这里是默认渐变。在新的Netbeans项目上构建文件。 它快把我逼疯了。有变通办法吗?我可以对gradle.bu
下面的代码(代码A)产生正确的输出(匹配) 但是,下面的代码(代码B)产生错误的输出(不匹配)
问题内容: 我正在尝试运行python urllib2脚本并收到此错误: InsecurePlatformWarning:真正的SSLContext对象不可用。这会阻止urllib3正确配置SSL,并可能导致某些SSL连接失败。有关更多信息,请参见https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarn
问题内容: 从那时起,我一直在TutorialsPoint上查看代码,此后一直困扰着我……看一下这段代码: 此代码成功打印: 但是根据正则表达式,为什么它不返回其他可能的结果,例如: 要么 如果此代码不适合这样做,那么我该如何编写一个可以找到所有可能匹配项的代码? 问题答案: 这是因为的贪婪,随之而来的是回溯。 字串: 正则表达式: 我们都知道那是贪婪的,并且尽可能匹配所有字符。因此,第一个匹配所
一、模式匹配 Scala 支持模式匹配机制,可以代替 swith 语句、执行类型检查、以及支持析构表达式等。 1.1 更好的swith Scala 不支持 swith,可以使用模式匹配 match...case 语法代替。但是 match 语句与 Java 中的 switch 有以下三点不同: Scala 中的 case 语句支持任何类型;而 Java 中 case 语句仅支持整型、枚举和字符串常
主机权限和 内容脚本匹配 是基于匹配模式定义的一组 URL。匹配模式本质上是一个以允许的 schema(http,https,file 或ftp 开头)的URL,并且可以包含 “*” 字符。特殊模式 < all_urls > 匹配以允许的 schema 开头的任何 URL。 每个模式包含 3 个部分: schema - 例如,http 或file 或 * 注意:对文件 URL 的访问不是自动的。用