当前位置: 首页 > 知识库问答 >
问题:

Flink KeyedProcessFunction订购

韩智敏
2023-03-14

我是Flink的新手,我试图理解Flink是如何在其KeyedProcessFunction的并行抽象中命令调用processElement()。考虑这个产生部分和的流的例子:

package sample

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object Playground {
  case class Record(groupId: String, score: Int) {}

  def main(args: Array[String]): Unit = {
    // 1. Create the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(10)

    // 2. Source
    val record1 = Record("groupX", 1)
    val record2 = Record("groupX", 2)
    val record3 = Record("groupX", 3)
    val records: DataStream[Record] = env.fromElements(record1, record2, record3, record1, record2, record3)

    // 3. Application Logic
    val partialSums: DataStream[Int] = records
      .keyBy(record => record.groupId)
      .process(new KeyedProcessFunction[String, Record, Int] {
        // Store partial sum of score for Records seen
        lazy val partialSum: ValueState[Int] = getRuntimeContext.getState(
          new ValueStateDescriptor[Int]("partialSum", classOf[Int]))

        // Ingest new record
        override
        def processElement(value: Record,
                           ctx: KeyedProcessFunction[String, Record, Int]#Context,
                           out: Collector[Int]): Unit =
        {
          val currentSum: Int = partialSum.value()
          partialSum.update(currentSum + value.score)
          out.collect(partialSum.value())
        }
      })

    // 4. Sink
    partialSums.print()

    // 5. Build JobGraph and execute
    env.execute("sample-job")
  }
}

我希望它的输出是流:1、3、6、7、9、12。事实上,就在这里。

是否可以安全地假设这种情况始终存在,尤其是在从具有大量并行性的源读取数据时?

共有1个答案

蒋嘉颖
2023-03-14

在您的示例中,顺序在每个键中得到保证。由于只有一个键,你将永远得到1, 3, 6, 7, 9, 12

当您从并行度大于1的源读取时,各种源实例将相互竞争。当来自两个或多个源的流被连接(例如,通过keyBy、union、rebalance等)时,结果是不确定的(但来自每个源的事件将保持其相对顺序)。

例如,如果你有

stream X: 1, 2, 3, 4
stream Y: a, b, c, d

然后把这两条溪流汇集在一起,你可能会

1,2,3,4, a, b, c, d,或a, b,1,2,3, c,4, d等。

 类似资料:
  • 我是新的数据流和发布子工具在GCP。 需要将prem过程中的电流迁移到GCP。 当前流程如下: 我们有两种类型的数据馈送 Full Feed–其adhoc作业–完整XML的大小约为100GB(单个XML–非常复杂的一个–完整的数据–ETL作业处理此XML并将其加载到约60个表中) 单独的ETL作业用于处理完整提要。ETL作业过程完全馈送并创建负载就绪文件,所有表将被截断并重新加载 源系统每30分钟

  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  • 问题内容: 我有一个与数据库对话的servlet,然后返回一个有序(按时间排序)对象的列表。在servlet部分,我有 从日志中,我可以看到数据库以正确的顺序返回了User对象。 在前端,我有 但是顺序改变了。 我只在返回的列表很大(超过130个用户)时才注意到这一点。 我尝试使用Firebug进行调试,Firebug中的“响应选项卡”显示列表的顺序与servlet中的日志不同。 我做错了什么吗?

  • 订阅指过滤表(table)的规则,Canal 客户端发送给客户端订阅规则,那么服务端将会推送符合规则的表数据过来,采用正则匹配。 允许所有表:.\*\\\\..\*

  • 统一下单 没错,什么 H5 支付,公众号支付,扫码支付,支付中签约,全部都是用这个接口下单。 {info} 参数 appid, mch_id, nonce_str, sign, sign_type 可不用传入 服务商模式下, 需使用 sub_openid, 并传入sub_mch_id 和sub_appid $result = $app->order->unify([ 'body' => '

  • 作用 接入方或者费控平台拉取企业支付订单或个人支付转个人垫付订单,做汇总统计之类 依赖 暂无依赖 注意 所有接口调用时需要严格遵守请求方式(GET/POST) 使用接口前需要仔细阅读每个接口的注意事项 接口报错时先阅读通用错误解决方案和当前接口文档下的接口错误解决方案