我有一个时间窗口,我尝试确定我是否在一段时间内获得一个新的密钥。我正在通过kafka推送数据,当我调试它时,我看到数据到达keyby
方法,但它没有到达process
方法,并且没有被收集器收集。我正在使用BoundedouToFordernesTimeStampExtractor
来分配水印:
case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest
class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
override def extractTimestamp(element: Src): Long = element.ts
}
class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])
override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
println(s"RefIpFilter processing $key")//data is not getting here
if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
println(s"new key found $key") //data is not getting here also
context.windowState.getState(stateDescriptor).update(key)
out.collect(elements.head)
}
}
}
lazy val env: StreamExecutionEnvironment =
setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))
lazy val src: DataStream[FooRequest] = env.addSource(consumer)
lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src
.flatMap(new FlatMapFunction[FooRequest,Src ]{
override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
case r: Src =>
out.collect(r)
case invalid =>
log.warn(s"filtered unexpected request $invalid")
}
})
.assignTimestampsAndWatermarks(new TsExtractor)
.keyBy(r => r.ref)
.timeWindow(Time.seconds(120))
.allowedLateness(Time.seconds(360))
.process(new RefFilter)
uniqueRef(src).addSink(sink)
env.execute()
如有任何协助,我将不胜感激
BoundedouToFordernesTimeStampExtractor
跟踪到目前为止看到的最高时间戳,并根据配置的延迟(在本例中为3小时)生成水印。这些水印是定期产生的,默认情况下每200毫秒产生一次。所以只有一个单一的事件,水印将落后于这个事件3小时,窗口永远不会被触发。此外,在输入有限的情况下,作业在处理完所有事件后将停止运行。
context.windowstate
是每个窗口的状态,具有有限的生存期。每个2分钟窗口将有自己的实例,一旦窗口的允许迟到时间过期,它就会被清除。如果您希望具有全局作用域的键控窗口状态具有不确定的生存期,请改用context.globalstate
。
我想在Apache Flink中制作一个流数据的时间窗口。我的数据看起来有点像这样: 但显然,Flink并不是将我的数据作为列表来阅读。它将其作为字符串读取,因此,我得到以下异常: 如何对字符串数据执行时间窗口,或者如何将此数据转换为元组?
上下文:我正在研究的项目处理定期(1分钟)生成的时间戳文件,并将其实时摄取到一系列级联窗口操作符中。文件的时间戳指示事件时间,因此我不需要依赖文件创建时间。每个窗口的处理结果被发送到一个接收器,该接收器将数据存储在多个表中。 我正在尝试想出一个解决方案来处理实时进程可能出现的停机时间。输入文件是独立生成的,因此在Flink解决方案严重停机的情况下,我想摄取和处理丢失的文件,就好像它们是由同一进程摄
问题: 在运行时,引擎会为每个数据流创建一个线程吗?还是每个操作员一个线程? 是否可以在作业启动时在运行时动态创建数据流?(即,如果作业启动时从文件中读取N,并且需要创建相应的N个流) 当创建大量流(N~10000个)时,与在单个流中创建N个分区相比,是否有任何特定的性能影响?
我似乎混淆了逻辑和物理数据分区。
下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。
我有一个火花应用程序。我的用例是允许用户定义一个类似< code>Record =的任意函数 以下是代码: 以下是“Record”、“RecordMetadata”和“ScalaExpression”类的定义: 上面的代码引发了一个神秘的异常: 但是,如果规则直接在代码中定义,则代码运行良好: