当前位置: 首页 > 知识库问答 >
问题:

Flink流-在windows中应用函数

施永贞
2023-03-14
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputStream = env.readTextFile("dataset.txt")
      .map(transformStream(_))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))

def transformStream(input: String): EventStream = {...}

case class EventStream(val eventTime: Long, val id: String, actualEvent: String)

我想做的是对每个窗口批处理的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我已经在DataStream API中看到了该方法的应用,但我不理解它是如何工作的。在Flink API中,它说它的用法与Scala中的用法相同:

inputStream.apply { WindowFunction }

有人能解释一下apply方法是做什么的或者是如何使用的吗?Scala中的示例会更好。apply方法是否符合我的要求?

共有1个答案

罗韬
2023-03-14

所以基本上有两个可能的方向,根据你想要做的计算类型。可以使用:fold/reduce/aggrege或更多您已经提到的泛型-apply。所有这些应用于windows的密钥。

对于apply,它是应用计算的一种非常通用的方法。最基本的版本(在Scala中)是:

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R] 

其中函数采用4个参数:

    null
val stream: DataStream[(String,Int)] =   ...

stream.keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
      .apply((e1, e2) => (e1._1, e1._2 + e2._2),
             (key, window, in, out: Collector[(String, Long, Long, Int)]) => {
                out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
      })
 类似资料:
  • 有人尝试在Apache Flink中使用DynamoDB流吗? Flink有一个Kinesis消费者。但是我正在寻找如何直接使用Dynamo流。 我试了很多次,但什么也没找到。然而,Flink Jira董事会发现一个未决请求。所以我想这个选项还不可用?我有什么选择? 允许FlinkKinesisConsumer适应AWS DynamoDB流

  • 如果我想在Flink中分裂一个流,那么最好的方法是什么?

  • 在Flink中,我有一个键控流,我正在对它应用一个进程函数。 我的键选择器看起来像这样... FooBarProcessFunction看起来像这样... 在FooBarProcessFunction中,我希望获得由MyKeySelector的getKey方法创建的键。那可行吗? 目前,我正在使用一种变通方法,其中我基本上在processElement函数中重新创建键。但如果我能避免这样做,那就太

  • 我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端