我想用以下代码匹配Flink 1.4.0 Streaming中的CEP模式:
DataStream<Event> input = inputFromSocket.map(new IncomingMessageProcessor()).filter(new FilterEmptyAndInvalidEvents());
DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(new ActionCondition("action1"))
.followedBy("middle").where(new ActionCondition("action2"))
.followedBy("end").where(new ActionCondition("action3"));
pattern = pattern.within(Time.seconds(30));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
事件
只是一个POJO
public class Event {
private UUID id;
private String action;
private String senderID;
private long occurrenceTimeStamp;
......
}
从我的自定义源(Google PubSub)中提取。第一个过滤器FilterEmptyAndInvalidEvents()只过滤格式不正确的事件等,但在这种情况下不会发生这种情况。由于日志输出,我可以验证这一点。因此,每个事件都会通过MyKeySelector运行。getKey()方法。
BoundedOutOfOrdneressGenerator
仅从一个字段中提取时间戳:
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
private static Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);
private final long maxOutOfOrderness = 5500; // 5.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Event element, long previousElementTimestamp) {
long timestamp = element.getOccurrenceTimeStamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return newWatermark;
}
}
仅从字段中提取字符串值:
public class MyKeySelector implements KeySelector<Event, String> {
private static Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);
@Override
public String getKey(Event value) throws Exception {
String senderID = value.getSenderID();
LOG.info("Partioning event {} by key {}", value, senderID);
return senderID;
}
}
这里的ActionCondition只是对事件中的一个字段进行比较,如下所示:
public class ActionCondition extends SimpleCondition<Event> {
private static Logger LOG = LoggerFactory.getLogger(ActionCondition.class);
private String filterForCommand = "";
public ActionCondition(String filterForCommand) {
this.filterForCommand = filterForCommand;
}
@Override
public boolean filter(Event value) throws Exception {
LOG.info("Filtering event for {} action: {}", filterForCommand, value);
if (value == null) {
return false;
}
if (value.getAction() == null) {
return false;
}
if (value.getAction().equals(filterForCommand)) {
LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
return true;
} else {
LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
return false;
}
}
}
不幸的是,当启动作业并发送应该由模式匹配的事件时,它们被正确地接收和分区,但CEP模式不匹配。
例如,我发送以下事件:
在Flink作业的日志输出中,我看到事件通过MyKeySelector正确运行。getKey()方法,因为我在那里添加了日志输出。因此,事件似乎正确地出现在流中,但不幸的是,它们与模式不匹配。
日志输出如下所示:
FilterEmptyAndInvalidEvents - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
TimeCharacteristic通过设置为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
事件包含正确的时间戳。
如果我现在发送另外3个事件与行动(但与新的时间戳等)
该模式与第一组事件相匹配。我知道它与第一组事件相匹配,因为出于调试目的,我用guid标记了每个事件,并为匹配的事件打印了它。
发送第三、第四。。。在这3个事件的集合中,始终会匹配上一组事件。因此,在模式检测中似乎存在某种“偏移”。但这似乎不是一个时间问题,因为如果我在发送后等待很长时间(并且看到事件被Flink分割),第一组事件也不匹配。
我的代码有什么问题吗?或者为什么flink总是将之前的事件集与模式匹配?
我确实解决了这个问题——我总是在流媒体源点进行搜索,但我的事件处理实际上完全没有问题。问题是,我的水印生成并没有持续发生。正如您在上面的代码中所看到的,我只在收到事件时生成了水印。
但是在发送了前3个事件后,我的设置中就没有更多的事件了。因此,没有再次生成新的水印。
并且因为没有创建时间戳大于序列最后接收事件时间戳的新水印,Flink从未处理过元素。原因可以在这里找到:Flink CEP-处理事件时间的延迟
重要的一句话是:
...当水印到达时,将处理此缓冲区中时间戳小于水印的所有元素。
因此,由于我在BoundedAutofordernessGenerator中以5.5秒的延迟生成水印,最新的水印总是在最后一个事件的时间戳之前5.5秒。因此,事件从未被处理。
因此,解决这个问题的一个方法是定期生成水印,假设事件发生的特定延迟。为了做到这一点,我们需要为ExecutionConfig设置setAutoWatermarkInterval:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
..
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(1000L);
这使Flink能够在给定的时间内(本例中为每秒)定期调用水印生成器,并提取新的水印。
此外,我们需要调整时间戳/水印生成器,以便即使没有新事件流入,它也会发出新的时间戳。为此,我操纵了BoundedAutofordernessTimestampExtractor。Flink附带的java:
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
private static final long serialVersionUID = 1L;
/** The current maximum timestamp seen so far. */
private long currentMaxTimestamp;
/** The timestamp of the last emitted watermark. */
private long lastEmittedWatermark = Long.MIN_VALUE;
/**
* The (fixed) interval between the maximum seen timestamp seen in the records
* and that of the watermark to be emitted.
*/
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessGenerator() {
Time maxOutOfOrderness = Time.seconds(5);
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness
+ ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
public long getMaxOutOfOrdernessInMillis() {
return maxOutOfOrderness;
}
/**
* Extracts the timestamp from the given element.
*
* @param element The element that the timestamp is extracted from.
* @return The new timestamp.
*/
public long extractTimestamp(Event element) {
long timestamp = element.getOccurrenceTimeStamp();
return timestamp;
}
@Override
public final Watermark getCurrentWatermark() {
Instant instant = Instant.now();
long nowTimestampMillis = instant.toEpochMilli();
long latenessTimestamp = nowTimestampMillis - maxOutOfOrderness;
if (latenessTimestamp >= currentMaxTimestamp) {
currentMaxTimestamp = latenessTimestamp;
}
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(Event element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
}
正如您在getCurrentWatermark()中所看到的,我取当前历元时间戳,减去我们期望的最大延迟,然后从该时间戳创建水印。
Flink现在每秒钟都会提取一个新的时间戳,水印总是“滞后”5秒。这允许在收到最后一个事件后最长5秒钟内,将事件与定义的模式进行匹配。
如果这适用于您的场景,则取决于您的场景,因为这也意味着在Flink接收到的时间点超过5秒(比水印少5秒)的事件将被丢弃并且不再处理。
有人知道在bash中如何在包含txt文件和子目录(我也必须搜索)的目录中搜索模式a,然后在匹配模式a的文件上打印匹配模式B的结果吗?
我有两个文件file1。txt和file2。txt。 文件1。文本 file2.txt 我想匹配文件1的第1、2和3列。带有文件2第1、4和5列的txt。txt。如果匹配,则将匹配行与以下行一起打印,直到,但不要打印。我用“awk”命令试过了
给定:
如何在流中获得第一个匹配条件的元素?我试过了,但不起作用 如果条件不起作用,则在Stop以外的其他类中调用filter方法。
本文向大家介绍Rust 与守卫的条件模式匹配,包括了Rust 与守卫的条件模式匹配的使用技巧和注意事项,需要的朋友参考一下 示例 可以根据独立于使用if防护匹配的值的值来匹配模式: 这将显示“不可用”。
问题内容: 我在尝试编译一个android项目时从Jenkins控制台获得了以下输出:请注意,我没有对主类(Jenkins控制台)进行任何更改: 这是我的pom: 我认为我的问题来自目标目录,因为jenkins提到了这一点。我在詹金斯(Jenkins)有3个目录:assets,res et src是否应该在这三个文件夹中添加更多内容?例如lib或target ?? 问题答案: 该 档案文物 后生成