目前,当我打印CEP模式的结果时,我有一个非常奇怪的行为。
数据模型如下:
CEP部分如下所示:
val pattern = Pattern
.begin[VehicleRelated]("before")
.subtype(classOf[Position])
.next("recognize")
.subtype(classOf[Recognize])
.next("after")
.subtype(classOf[Position])
.within(Time.seconds(5))
val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
.select(pattern => {
val s = pattern("recognize").head.asInstanceOf[Recognize]
LOG.debug(s.toString)
s
})
recognitions.print("RECO")
日志的输出如下:
14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())
现在最大的问题是,为什么在我返回强制转换的对象后,vehicleId 属性为 null?有什么建议吗?
更新:我做了一些调查,发现PojoSerializer是问题所在。将调用复制函数,并在第151行中执行此操作。numFields
错误..计数仅包括识别类本身的属性计数,但不包括继承类,在本例中为事件和车辆相关..属性类型和时间戳也为空。。
问题是flink内部POJO序列化器无法正确解析多态性。
因此,我将Kyro序列化器设置为默认值:
val config = env.getConfig
config.enableForceKryo()
主要内容:1.模式Api (Partern Api),2.模式的检测,3匹配的事件的提取,4.超时事件的提取,5.CEP的状态机实现1.模式Api (Partern Api) Flink CEP 核心就是模式 1.1 个体模式 1.1.1 个体模式形式 每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个简单事件的匹配规则称为个体模式 这些都是个体模式。个体模式一般都会匹配接收一个事件, 以begin, next开头的。 1.1.2 个体模式中的量词 .oneOrMore()
主要内容:1.CEP概述,2.模式,3.CEP 的应用场景,4.快速上手代码1.CEP概述 复杂事件处理(Complex Event Processing) 总结起来,复杂事件处理(CEP)的流程可以分成三个步骤: 定义一个匹配规则 将匹配规则应用到事件流上,检测满足规则的复杂事件 对检测到的复杂事件进行处理,得到结果进行输出 输入的事件流在一个模式规则中输出得到的数据流 CEP 是针对流处理而言的,分析的是低延迟、频繁产生的事件流。 目的在于在无界流中检测出特定的数据组
Flink CEP如何管理间歇性状态?它将它们存储在哪里?它只是在内存中还是有一个快速的持久存储支持状态? 留档在任何地方都没有提到这一点。
我已经在Flink中实现了CEP模式,它按预期工作连接到本地Kafka代理。但是当我连接到基于集群的云kafka设置时,Flink CEP不会触发。 我正在使用AscendingTimestampExtractor, 我也收到警告消息, AscendingTimestampExtractor:140-违反时间戳单调性:1594017872227 而且我也尝试过使用Assignerwith周期水印和
我正在尝试创建一个与“至少”事件匹配的CEP模式。修改示例代码: 成 不解决我的问题,因为条件将不断失败,永远无法贡献计数。 我在中找到https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html确实有 有start.at存在吗?
我正在尝试为Kafka输入流实现一个非常简单的Apache Flink CEP。Kafka生产者生成一个简单的Double值,并通过Kafka主题将它们作为字符串发送给消费者。目前,我正在用Flink编码一个CEP消费者。到目前为止,这是我编写的代码: 如果我正在尝试执行这段代码,这是一条错误消息: 编辑:我尝试了另一个例子,每次执行我都得到相同的错误。所以我觉得我的包裹有问题?