当前位置: 首页 > 知识库问答 >
问题:

warnings.print()以相反的顺序打印事件(最后一个事件优先),但Apache Flink CEP中的第一个事件除外

濮阳浩穰
2023-03-14

我正在尝试过滤所有临时事件

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
                .subtype(TemperatureEvent.class)
                .where(new FilterFunction<TemperatureEvent>() {
                    @Override
                    public boolean filter(TemperatureEvent temperatureEvent) throws Exception {
                        return temperatureEvent.getTemperature() > 50;
                    }
                });

输入是一个文本文件,由输入函数解析为流,输入文件的内容包括:-

1,98
2,33
3,44
4,55
5,66
6,88
7,99
8,76

第一个值是Rack_id第二个是温度

我在input stream和WarnigsStream上都发布了print(),如下所示

inputEventStream.print();
warnings.print();

现在,问题来了,Flink CEP的输出如下所示

08/10/2017 23:43:15 Job execution switched to status RUNNING.
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to RUNNING 
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
Rack id = 1 and temprature = 98.0)
Rack id = 2 and temprature = 33.0)
Rack id = 3 and temprature = 44.0)
Rack id = 4 and temprature = 55.0)
Rack id = 5 and temprature = 66.0)
Rack id = 6 and temprature = 88.0)
Rack id = 7 and temprature = 99.0)
Rack id = 8 and temprature = 76.0)
08/10/2017 23:43:16 Source: Custom Source -> Sink: Unnamed(1/1) switched to FINISHED 
Rack id = 1 and temprature = 98.0)
Rack id = 8 and temprature = 76.0)
Rack id = 7 and temprature = 99.0)
Rack id = 6 and temprature = 88.0)
Rack id = 5 and temprature = 66.0)
Rack id = 4 and temprature = 55.0)
08/10/2017 23:43:16 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to FINISHED 
08/10/2017 23:43:16 Job execution switched to status FINISHED.

Process finished with exit code 0

正如我们所看到的,第一个复杂事件(机架id=1和温度=98.0))以相同的顺序打印,但在此之后,所有其他复杂事件都具有温度

My questions are :-

1. Any idea why events are getting printed in reverse order?
2. Is there a custom way to print values{w/o using warnings.print()} of 
   warning stream, like can I print only temperature, rather than rack-id ?

提前感谢

共有1个答案

顾斌
2023-03-14

通过将时间戳和水印分配给inputStream解决了此问题,如下所示

// Input stream of monitoring events
        DataStream<MonitoringEvent> inputEventStream = env
                .addSource(new InputStreamAGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

生成的输出如下所示

 08/11/2017 00:45:09    Job execution switched to status RUNNING.
    08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to SCHEDULED 
    08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to DEPLOYING 
    08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
    08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
    08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to RUNNING 
    08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to RUNNING 
    Rack id = 1 and temprature = 98.0)
    Rack id = 4 and temprature = 55.0)
    Rack id = 5 and temprature = 66.0)
    Rack id = 6 and temprature = 88.0)
    Rack id = 7 and temprature = 99.0)
    Rack id = 8 and temprature = 76.0)
    08/11/2017 00:45:10 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to FINISHED 
    08/11/2017 00:45:10 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to FINISHED 
    08/11/2017 00:45:10 Job execution switched to status FINISHED.
 类似资料:
  • 问题内容: JavaScript处理事件的优先顺序是什么? 以下是按字母顺序排列的事件… onabort-图像加载中断 onblur-元素失去焦点 onchange-用户更改字段的内容 onclick-鼠标单击对象 ondblclick-鼠标双击一个对象 onerror-加载文档或图像时发生错误 onfocus-元素获得焦点 onkeydown-按下键盘键 onkeypress-按下或按住键盘键

  • 实际上,我用@KafkaListener来阅读一个主题中的事件,我想阅读100个事件,然后放一个线程。在一定时间内睡觉。我的问题是,当线程返回时,侦听器继续执行我读到的最后一个事件,但我想在线程睡眠时放弃这些事件,继续执行主题中的最后一个事件。 比如: 1-100-捕获 睡线 101-500 线程返回 501-601-捕获 101-500事件可以被丢弃 代码: kafka配置:

  • 我正在尝试在我的应用程序中实现一个抽屉布局。我遵循android开发者网站上的教程,一切顺利。我只有一个“小”问题:我用午餐打开应用程序,打开抽屉布局,然后点击列表视图中的一个元素,一切都很顺利,但当我尝试打开另一个抽屉并点击他的一个元素时,我注意到我第一次点击的元素仍然被点击,而我无法点击其他任何元素。我的代码是:public class Eventi extends Activity{priv

  • 大部分的事件触发依赖于用户与浏览器的交互,但用户的行为是不可控的,许多交互设计上的缺陷与无法考虑到的因素会导致事件的频繁触发。 当事件处理器内部包含大量的操作,又不需要如此快速的响应事件时,就需要采用一些手段来限制事件处理器的执行。 事件的优化主要有两个目的: 减少不必要的 HTTP 请求 减少本机性能的消耗 1. 交互设计 通过交互的设计来优化事件是最常用到的方式。 如用户点击删除后将按钮禁止。

  • 我有一个用例:stepA- 但是我还能做什么呢?谢谢

  • 我想取消定义服务的AlarmManager,在此服务中可以启动新的AlarmManager或取消以前定义的报警。我知道alarmManager里的params Pendingtent。取消(挂起内容),必须相同。与filterEquals(Intent other)相比,它仍然不起作用。取消失败。这是我的密码 GetRoundSilence Service和GerRoundNormalServic