我有一个简单的模式,如下所示
Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
Long time = System.nanoTime();
// here we are setting the time when this event is detected
event.setEdtl(time);
return event.getSensor_id() == 1 && event.getValue() > 150;
}
}).followedBy("s2")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
Long time = System.nanoTime();
// here we are setting the time when this event is detected
event.setEdtl(time);
return event.getSensor_id() == 2 && event.getValue() > 15;
}
}).followedBy("s3")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
Long time = System.nanoTime();
// here we are setting the time when this event is detected
event.setEdtl(time);
return event.getSensor_id() == 3 && event.getValue() > 35;
}
})
.within(Time.milliseconds(WindowLength_join__ms));
为了找到CEP检测时间的延迟,添加了在如上所示的模式中选择每个事件的时间。每个事件类都有一个参数Edtl(事件检测时间本地),该参数最初设置为0,然后再设置为系统。nanoTime()
我在执行时遇到以下错误,但问题是该错误是在程序运行一段时间后出现的
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.cep.operator.KeyedCEPPatternOperator.emitMatchedSequences(KeyedCEPPatternOperator.java:77)
at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:58)
at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:236)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
... 18 more
Caused by: java.io.IOException: Failed to send message 'patient_id=1, egtl_raw=null, edtg=null
' to socket server at localhost:6020. Connection re-tries are not enabled.
at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:154)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
... 26 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:143)
at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:146)
我想我设置这个模式是因为我在模式中同时进行读取和写入操作。如果是这样,那么我应该如何在Flink CEP中找到平均复杂事件延迟。
问题 你希望能够在特定的情况下检测出在数组中的每个元素。 解决方案 使用 Array.every (ECMAScript 5): evens = (x for x in [0..10] by 2) evens.every (x)-> x % 2 == 0 # => true Array.every 被加入到 Mozilla 的 Javascript 1.6 ,ECMAScript 5 标准。如果
问题内容: 我有两个数组,我想检查是否每个元素都在中。如果元素的值在中重复,则该元素的值必须相等。最好的方法是什么? 问题答案: 一种选择是对两个数组进行排序,然后遍历两个数组,然后比较元素。如果在超级袋中未找到子袋候选中的元素,则前者不是子袋。排序通常为O(n *log(n)),比较为O(max(s,t)),其中 s 和_t_是数组大小,总时间复杂度为O(m * log(m)) ,其中m =ma
主要内容:JavaTuples 元组检查元素的方法,JavaTuples 元组检查元素的示例JavaTuples 元组检查元素的方法 每个元组都提供实用方法来以与集合类似的方式检查其元素。 contains(element) : 检查元素是否存在。 containsAll(collection) : 检查元素是否存在。 indexOf(element) : 如果存在,则返回第一个元素的索引,否则返回 -1。 lastIndexOf(element) : 如果存在,则返回最后一个元素的索引
问题内容: 我想比较两个双打数组。使用香草JUnit,我可以执行以下操作: 我想知道如何使用Hamcrest做到这一点,最好不要创建自定义Matchers(如果可能)。类似于对数组中的每个元素使用“关闭”匹配器。 问题答案: 如果更改为a,则可以使用以下辅助方法: 您也可以使用原始数组来完成此操作,但是您将需要一个自定义匹配器。
RDB的时间:latest_fork_usec:936 上次导出rdb快照,持久化花费,微秒。 检查是否有人使用了SAVE。
查看info里面的total_connections_received,如果该值不断升高,则需要修改应用,采用连接池方式进行,因为频繁关闭再创建连接redis的开销很大。