当前位置: 首页 > 知识库问答 >
问题:

查找每个元组的Flink CEP检测延迟

曹浩波
2023-03-14

我有一个简单的模式,如下所示

 Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 1 && event.getValue() > 150;
                        }
                    }).followedBy("s2")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {
                            Long time = System.nanoTime();

                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 2 && event.getValue() > 15;
                        }
                    }).followedBy("s3")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);
                            return event.getSensor_id() == 3 && event.getValue() > 35;
                        }
                    })
                    .within(Time.milliseconds(WindowLength_join__ms));

为了找到CEP检测时间的延迟,添加了在如上所示的模式中选择每个事件的时间。每个事件类都有一个参数Edtl(事件检测时间本地),该参数最初设置为0,然后再设置为系统。nanoTime()

我在执行时遇到以下错误,但问题是该错误是在程序运行一段时间后出现的

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.emitMatchedSequences(KeyedCEPPatternOperator.java:77)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:58)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:236)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.io.IOException: Failed to send message 'patient_id=1, egtl_raw=null, edtg=null
' to socket server at localhost:6020. Connection re-tries are not enabled.
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:154)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 26 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:143)
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:146)

我想我设置这个模式是因为我在模式中同时进行读取和写入操作。如果是这样,那么我应该如何在Flink CEP中找到平均复杂事件延迟。

暂时还没有答案

 类似资料:
  • 问题 你希望能够在特定的情况下检测出在数组中的每个元素。 解决方案 使用 Array.every (ECMAScript 5): evens = (x for x in [0..10] by 2) evens.every (x)-> x % 2 == 0 # => true Array.every 被加入到 Mozilla 的 Javascript 1.6 ,ECMAScript 5 标准。如果

  • 问题内容: 我有两个数组,我想检查是否每个元素都在中。如果元素的值在中重复,则该元素的值必须相等。最好的方法是什么? 问题答案: 一种选择是对两个数组进行排序,然后遍历两个数组,然后比较元素。如果在超级袋中未找到子袋候选中的元素,则前者不是子袋。排序通常为O(n *log(n)),比较为O(max(s,t)),其中 s 和_t_是数组大小,总时间复杂度为O(m * log(m)) ,其中m =ma

  • 主要内容:JavaTuples 元组检查元素的方法,JavaTuples 元组检查元素的示例JavaTuples 元组检查元素的方法 每个元组都提供实用方法来以与集合类似的方式检查其元素。 contains(element) : 检查元素是否存在。 containsAll(collection) : 检查元素是否存在。 indexOf(element) : 如果存在,则返回第一个元素的索引,否则返回 -1。 lastIndexOf(element) : 如果存在,则返回最后一个元素的索引

  • 在HashMap中,我可以使用containsKey(i)或containsValue(i)来检查我是否被使用;对数组也可以这样做吗?我的意思是检查myarray i1中的值是否为z i1==myarray中每个数组的第一个元素的组 在我的例子中{1,3,0,2}

  • 问题内容: 我想比较两个双打数组。使用香草JUnit,我可以执行以下操作: 我想知道如何使用Hamcrest做到这一点,最好不要创建自定义Matchers(如果可能)。类似于对数组中的每个元素使用“关闭”匹配器。 问题答案: 如果更改为a,则可以使用以下辅助方法: 您也可以使用原始数组来完成此操作,但是您将需要一个自定义匹配器。

  • RDB的时间:latest_fork_usec:936 上次导出rdb快照,持久化花费,微秒。 检查是否有人使用了SAVE。