final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.setParallelism(1); //tried with 1 & 4
.....
DataStream<LogEvent> inputLogEventStream = env
.readFile(format, FILEPATH, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
.map(new MapToLogEvents())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.seconds(0)) {
public long extractTimestamp(LogEvent element) {
return element.getTimeLong();
}
})
.keyBy(new KeySelector<LogEvent, String>() {
public String getKey(LogEvent le) throws Exception {
return le.getUser();
}
});
inputLogEventStream.print();
Pattern<LogEvent, ?> mflPattern = Pattern.<LogEvent> begin("mfl")
.subtype(LogEvent.class).where(
new SimpleCondition<LogEvent>() {
public boolean filter(LogEvent logEvent) {
if (logEvent.getResult().equalsIgnoreCase("failed")) { return true; }
return false;
}
})
.timesOrMore(3).within(Time.seconds(60));
PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);
DataStream<Threat> outputMflStream = mflPatternStream.select(
new PatternSelectFunction<LogEvent, Threat>() {
public Threat select(Map<String, List<LogEvent>> logEventsMap) {
return new Threat("MULTIPLE FAILED LOGINS detected!");
}
});
outputMflStream.print();
并行度=1(成功检测到模式)
04/03/2018 12:03:53 Source: Custom File Source(1/1) switched to RUNNING
04/03/2018 12:03:53 SelectCepOperator -> Sink: Unnamed(1/1) switched to RUNNING
04/03/2018 12:03:53 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(1/1) switched to RUNNING
04/03/2018 12:03:53 Sink: Unnamed(1/1) switched to RUNNING
LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
**THREAT** ==> MULTIPLE FAILED LOGINS detected!
并行度=4(无法检测到模式)
04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(3/4) switched to RUNNING
04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(2/4) switched to RUNNING
04/03/2018 12:05:33 Sink: Unnamed(2/4) switched to RUNNING
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(2/4) switched to RUNNING
04/03/2018 12:05:33 Sink: Unnamed(3/4) switched to RUNNING
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(3/4) switched to RUNNING
2> LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
3> LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
3> LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
3> LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
我认为,在使用.keyby()处理CEP时,不同的分区会得到这些事件,这是非常重要的。
您的代码
PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);
我相信应该
PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream.keyBy("eventCategory","user"), mflPattern);
我试图理解Flink中的并行是如何工作的。本文件https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html似乎表明水槽的平行度等于1。在我的例子中,我正在向我的接收器中的HBase写信——这是否意味着只有一个任务(线程?)哪个将写入HBase?它是否没有为应用程序设置全局并行
我有一个玩具Flink工作,从3个Kafka主题中读取,然后联合所有这3个流。仅此而已,没有额外的工作。 如果在我的Flink工作中使用parallelism 1,只要我更改parallelism,一切都会很好 为什么它适用于并行1,但不适用于并行 是否与Kafka服务器端设置有关?或者它与我的java代码中的comsumer设置有关(我的代码中还没有特殊的配置)? 我知道这里提供的信息可能不够充
问题内容: 我正在使用jsr166y ForkJoinPool在线程之间分配计算任务。但是我显然一定做错了。 如果创建并行度> 1(默认值为Runtime.availableProcessors();我一直在运行2-8个线程)的ForkJoinPool,我的任务就可以正常工作。但是,如果我创建并行度= 1的ForkJoinPool,则在无法预测的迭代次数后会看到死锁。 是的-设置并行度= 1是一种
问题内容: 这个问题已经在这里有了答案 : 8年前关闭。 可能重复: 如何使Eclipse在Play中看到更改!编译模板? 我正在玩Play的第一步!框架和我遇到了一些问题。我可以毫无问题地创建和遮盖一个项目。 现在,如果我添加一个新视图,例如“ sometest.scala.html”,并尝试在“应用程序”中使用它,则Eclipse将该文件标记为错误。 Eclipse标记为红色,即使它可以工作。
我在使用JProfiler时面临一个问题:问题是它无法检测正在运行的WebSphere的JVM。 我看到WebSphere作为本地系统帐户作为Windows服务运行。因此,我尝试了“show Services”选项,但它仍然无法检测到JVM。 解决这个问题真的花了我很多时间,有人能帮我吗?
附加信息@Saifur我创建了一个单独的基类,在其中我初始化了驱动程序实例。我在@BeForeClass中调用这个实例,在@afterClass中调用driver.quit()。通过在testng.xml中提供两个不同的类,我试图运行这个实例。