当前位置: 首页 > 知识库问答 >
问题:

flink数据不由timewindow运算符中的process函数处理

滕学义
2023-03-14

我有一个时间窗口,我尝试确定我是否在一段时间内获得一个新的密钥。我正在通过kafka推送数据,当我调试它时,我看到数据到达keyby方法,但它没有到达process方法,并且没有被收集器收集。我正在使用BoundedouToFordernesTimeStampExtractor来分配水印:

    case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest

    class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
      override def extractTimestamp(element: Src): Long = element.ts
    }

    class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
      private lazy val stateDescriptor = new ValueStateDescriptor("refFilter",  createTypeInformation[String])

      override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
        println(s"RefIpFilter processing $key")//data is not getting here 
        if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
          println(s"new key found $key") //data is not getting here also 
          context.windowState.getState(stateDescriptor).update(key)
          out.collect(elements.head)
        }
      }
    }

lazy val env: StreamExecutionEnvironment =
    setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))

 lazy val src: DataStream[FooRequest] = env.addSource(consumer)

 lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src 
        .flatMap(new FlatMapFunction[FooRequest,Src ]{
          override def flatMap(value: FooRequest, out: Collector[Src]): Unit =   value match {
            case r: Src =>
              out.collect(r)
            case invalid =>
              log.warn(s"filtered unexpected request $invalid")
          }
        })
        .assignTimestampsAndWatermarks(new TsExtractor)
        .keyBy(r => r.ref)
        .timeWindow(Time.seconds(120))
        .allowedLateness(Time.seconds(360))
        .process(new RefFilter)

uniqueRef(src).addSink(sink)
env.execute()

如有任何协助,我将不胜感激

共有1个答案

郎曜文
2023-03-14

BoundedouToFordernesTimeStampExtractor跟踪到目前为止看到的最高时间戳,并根据配置的延迟(在本例中为3小时)生成水印。这些水印是定期产生的,默认情况下每200毫秒产生一次。所以只有一个单一的事件,水印将落后于这个事件3小时,窗口永远不会被触发。此外,在输入有限的情况下,作业在处理完所有事件后将停止运行。

context.windowstate是每个窗口的状态,具有有限的生存期。每个2分钟窗口将有自己的实例,一旦窗口的允许迟到时间过期,它就会被清除。如果您希望具有全局作用域的键控窗口状态具有不确定的生存期,请改用context.globalstate

 类似资料:
  • 我想在Apache Flink中制作一个流数据的时间窗口。我的数据看起来有点像这样: 但显然,Flink并不是将我的数据作为列表来阅读。它将其作为字符串读取,因此,我得到以下异常: 如何对字符串数据执行时间窗口,或者如何将此数据转换为元组?

  • 上下文:我正在研究的项目处理定期(1分钟)生成的时间戳文件,并将其实时摄取到一系列级联窗口操作符中。文件的时间戳指示事件时间,因此我不需要依赖文件创建时间。每个窗口的处理结果被发送到一个接收器,该接收器将数据存储在多个表中。 我正在尝试想出一个解决方案来处理实时进程可能出现的停机时间。输入文件是独立生成的,因此在Flink解决方案严重停机的情况下,我想摄取和处理丢失的文件,就好像它们是由同一进程摄

  • 问题: 在运行时,引擎会为每个数据流创建一个线程吗?还是每个操作员一个线程? 是否可以在作业启动时在运行时动态创建数据流?(即,如果作业启动时从文件中读取N,并且需要创建相应的N个流) 当创建大量流(N~10000个)时,与在单个流中创建N个分区相比,是否有任何特定的性能影响?

  • 我似乎混淆了逻辑和物理数据分区。

  • 下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。

  • 我有一个火花应用程序。我的用例是允许用户定义一个类似< code>Record =的任意函数 以下是代码: 以下是“Record”、“RecordMetadata”和“ScalaExpression”类的定义: 上面的代码引发了一个神秘的异常: 但是,如果规则直接在代码中定义,则代码运行良好: