我很难让apache beam管道触发基于事件时间的触发器,但似乎能够随着处理时间触发窗口触发。
我的管道相当基本:
>
我提取二级时间戳
我打开数据窗口进行处理
我按秒对数据进行分组,以便以后按秒对流数据进行分类。
我最终在分类的秒数上使用滑动窗口,有条件地每秒向pubsub发出两条消息中的一条。
我的问题似乎在步骤3中。
我试图在第3阶段使用与第5阶段相同的窗口策略,在html" target="_blank">分类秒数上运行滑动平均计算。
我尝试过使用withTimestampCombiner(TimestampCombiner.EARLIEST)选项,但似乎无法解决这个问题。
我已经读到了。WithEarlyFireings方法用于事件时间,但这似乎会模仿我现有的工作方法。理想情况下,我能够依靠通过窗口末端的水印
// De-Batching The Pubsub Message
static public class UnpackDataPoints extends DoFn<String,String>{
@ProcessElement
public void processElement(@Element String c, OutputReceiver<String> out) {
JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
for (JsonElement acDataPoint: packedData){
String hereData = acDataPoint.toString();
DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
Instant eventTimeStamp = date.toInstant();
out.outputWithTimestamp(hereData,eventTimeStamp);
}
}
}
// Extracting The Second
static public class ExtractTimeStamp extends DoFn<String,KV<String,String>> {
@ProcessElement
public void processElement(ProcessContext ctx ,@Element String c, OutputReceiver<KV<String,String>> out) {
JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
String milliString = accDataObject.get("Timestamp").getAsString();
String secondString = StringUtils.left(milliString,24);
accDataObject.addProperty("noMiliTimeStamp", secondString);
String updatedAccData = accDataObject.toString();
KV<String,String> outputKV = KV.of(secondString,updatedAccData);
out.output(outputKV);
}
}
// The Pipeline & Windowing
Pipeline pipeline = Pipeline.create(options);
PCollection<String> dataPoints = pipeline
.apply("Read from Pubsub", PubsubIO.readStrings()
.fromTopic("projects/????/topics/???")
.withTimestampAttribute("messageTimestamp"))
.apply("Extract Individual Data Points",ParDo.of(new UnpackDataPoints()));
/// This is the event time window that doesn't fire for some reason
/*
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
// .triggering(AfterWatermark.pastEndOfWindow())
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
//.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
*/
///// Temporary Work Around, this does fire but data is out of order
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
.triggering(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
PCollection<KV<String, String>> TimeStamped = windowedDataPoints
.apply( "Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));
PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped.apply("Group By Key",GroupByKey.create());
PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped.apply("testingIsh", ParDo.of(new LogKVIterable()));
当我使用第一个窗口策略时,它被注释掉,我的管道无限期地运行,接收数据
流转时长触发与事件时间触发不同。在流转时长中,没有延迟数据。在事件时间中,处理延迟数据是真正的挑战。事件时间处理中的延迟数据是通过使用水印和触发器来处理的。关于这方面的一个很好的指南,我建议查看谷歌泰勒·阿基多的这两篇文章: a, b。
由于在处理时间窗口中,不存在延迟数据这类情况,因此,Apache Beam管道的处理时间不会出现任何问题。
同时,在事件时间窗口中,可能会出现延迟数据,您的窗口和触发应该通过良好的设计来处理这些场景。
很可能是由于错误的配置,您的事件时间处理管道代码没有触发!我不可能复制您的问题,因为您的水印(对于发布/订阅源)是试探性确定的。尽管我建议您通过以下方式调试代码:首先,增加允许性。例如:至1小时。如果这可行,那太好了!如果没有,请参见第二条。第二,用EarlyFirens注释掉。如果这可行,那太好了!如果没有,请取消注释并查看三个三用固定时间窗口,而不是滑动时间窗口。
继续调试,直到您能够隔离问题
:):)
这看起来确实像是您添加到数据中的时间戳可能错误/损坏。我鼓励您验证以下内容:
>
正在正确添加元素中的时间戳。在转换之前/之后添加一些日志记录,并广泛测试该代码。
管道中的数据新鲜度和系统滞后指标正在按照您的预期进行。如果数据新鲜度没有按预期移动,则强烈表明您的时间戳没有正确设置。
当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有
窗口正在进行无限循环或其他操作,我正在处理后打印数据流,但看起来根本没有达到那个点。 下面是我的伪代码。 我在reduce函数中添加了要打印的日志。正在从reduce函数打印日志。但是这条流没有被打印出来。 并且流源数据是历史数据,即。。超过2个月的旧数据。如果是历史数据流,还需要专门设置其他内容吗? 任何输入都会大有裨益。。
假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我
我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。