当前位置: 首页 > 知识库问答 >
问题:

Flink:将使用keyBy运算符创建的值的映射广播到另一个流,冷启动问题

方茂
2023-03-14

我有一个高频率的流发布事件,每个事件包含一些关于汽车的信息。我需要处理这个事件流,但排除具有特定城市和车牌号组合的事件。这些列入黑名单的城市和车牌号码组合的信息来自一个每天更新的S3文件。

[
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Berlin"
    },
    {
        "name": "Car2",
        "plate": "XYZ1234",
        "city": "Amsterdam"
    },
    {
        "name": "Car3",
        "plate": "ASD 123",
        "city": "Kuala Lumpur"
    },
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Moscow"
    },
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Barcelona"
    }
]

S3文件如下所示:例如。假设它被称为excludedcars

[
    {
        "plate": "XYZ123",
        "city": "Berlin"
    },
    {
        "plate": "ABC1231",
        "city": "Berlin"
    },
    {
        "plate": "AWS121",
        "city": "Berlin"
    },
    {
        "plate": "XYZ1234",
        "city": "Amsterdam"
    },
    {
        "plate": "AMC3421",
        "city": "Amsterdam"
    },
    {
        "plate": "ASD 123",
        "city": "Kuala Lumpur"
    },
    {
        "plate": "XYZ123",
        "city": "Moscow"
    },
    {
        "plate": "XYZ123",
        "city": "Barcelona"
    }
]

方法:

  1. 将S3文件excludedcars用作流源。
  2. 转换事件以产生以下结构:
{
    "Berlin": ["XYZ123", "ABC1231", "AWS121"],
    "Amsterdam": ["XYZ1234", "AMC3421"],
    "Kuala Lumpur":["ASD 123"],
    "Moscow":["XYZ123"],
    "Barcelona":["XYZ123"]
}

object Cars {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val excludedCarStream: DataStream[Array[ExcludedCarDetail]] = getExcludedCarsStream(env)
    val excludedCarDetails = excludedCarStream.flatMap(item => item) // Array of Excluded Car objects
    excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0) // As per my understanding, this should result into a map of city to array of plate number maps 
    excludedCarDetails.print() // This just prints the simple tuples without any grouping by city
    env.execute("Scala SocketTextStreamWordCount Example")
  }

  private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[Array[ExcludedCarDetail]] = {
    val path: String = "file:///Users/name/flinkTest/excluded"
    val textInputFormat = new TextInputFormat(new Path(path))
    env
      .readFile(
        textInputFormat,
        path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000
      )
      .map(jsonString => {
        val excludedCars: Array[ExcludedCarDetail] = (new Gson).fromJson(jsonString, classOf[Array[ExcludedCarDetail]])
        excludedCars
      })
  }
}

case class ExcludedCarDetail(
  @(SerializedName @scala.annotation.meta.field)("city") cityId: String,
  @(SerializedName @scala.annotation.meta.field)("plate") plateNumber: String
)

广播状态解决方案:

object Cars {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val excludedCarsState: MapStateDescriptor[Int, List[String]] = new MapStateDescriptor("excludedCars", classOf[Int], classOf[List[String]])
    val excludedCarDetails: DataStream[ExcludedCarDetail] = getExcludedCarsStream(env)
    val excludedCarBroadcast: BroadcastStream[ExcludedCarDetail] = excludedCarDetails.broadcast(excludedCarsState)


    val carsStream: DataStream[CarDetail] = getMainCarsStream(env)

    val bs = carsStream
      .keyBy(_.cityId)
      .connect(excludedCarBroadcast)
      .process(new CarsStateLogic(excludedCarsState))

    bs.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

  private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[ExcludedCarDetail] = {
    val cars: ListBuffer[ExcludedCarDetail] = ListBuffer()
    for(i <- 0 until 3) {
      val cityId = i+1
      val plateNumber = "Plate"+(i+1)
      cars += ExcludedCarDetail(cityId, plateNumber) // Basically exclude cars with plate1 in city1, plate2 in city2, plate3 in city3
    }
    env.fromCollection(cars.toList)
  }

  private def getMainCarsStream(env: StreamExecutionEnvironment): DataStream[CarDetail] = {
    val cars: ListBuffer[CarDetail] = ListBuffer()
    for(i <- 0 until 10) {
      val cityId = i+1
      val plateNumber = "Plate"+(i+1)
      val name = "Name"+(i+1)
      cars += CarDetail(cityId, plateNumber, name)
    }
    env.fromCollection(cars.toList)
  }
}

case class ExcludedCarDetail(cityId: Int, plateNumber: String)
case class CarDetail(cityId: Int, plateNumber: String, name: String)

class CarsStateLogic(excludedCarsState: MapStateDescriptor[Int, List[String]]) extends KeyedBroadcastProcessFunction[String, CarDetail, ExcludedCarDetail, CarDetail] {
  override def processElement(car: CarDetail, ctx: KeyedBroadcastProcessFunction[String, CarDetail, ExcludedCarDetail, CarDetail]#ReadOnlyContext, out: Collector[CarDetail]): Unit = {
    val state = ctx.getBroadcastState(excludedCarsState)

    if(state.contains(car.cityId)) {
      val cityState = state.get(car.cityId)
      if(cityState.indexOf(car.plateNumber) < 0) { // not excluded
        out.collect(car)
      }
    } else {
      out.collect(car)
    }
  }

  override def processBroadcastElement(value: ExcludedCarDetail, ctx: KeyedBroadcastProcessFunction[String, CarDetail, ExcludedCarDetail, CarDetail]#Context, out: Collector[CarDetail]): Unit = {
    val state = ctx.getBroadcastState(excludedCarsState)
    val newStateForKey = if(state.contains(value.cityId)) {
      value.plateNumber :: state.get(value.cityId)
    } else {
      List(value.plateNumber)
    }
    ctx.getBroadcastState(excludedCarsState).put(value.cityId, newStateForKey)
    println("BroadCast element: CityId:"+ value.cityId+ ", State: "+state.get(value.cityId))
  }
}

但我现在碰到了冷启动问题。在处理主数据之前,确保广播状态可用的可靠方法是什么。

共有1个答案

柳和怡
2023-03-14

如果排除的cars数据集很小,那么您可以按原样广播它(不按城市分组)。如果它很大,那么您可以按城市键(与car流相同),并将这两个流连接起来,以便每个子任务只获得所有被排除的car和常规car数据的分区集。

注意,您存在冷启动问题,即您希望在处理任何常规汽车数据之前首先处理当前所有排除的汽车数据,这样,在接收排除的汽车数据之前,您就不会从正在处理的汽车数据中得到误报。

 类似资料:
  • Flink源函数引入水印,这些水印向下传递给下游操作符,根据这些操作符可以执行不同的基于时间的操作。对于使用多个流的操作员,将传入水印的最小值视为此时操作员的水印。 将源流拆分为多个逻辑流,然后将这些逻辑流传递给下游操作员(例如处理函数)。 Eg. 假设Process函数有4个子任务(例如),并且有100个关键组(假设),每个子任务处理25个关键组,即,等等。 如果从下午5点开始DriverStr

  • 我需要将Java转换为的实例(包括映射内容) 我应该用什么来代替来使此代码可编译?

  • 问题内容: 我正在寻找有关如何在Python中将一个范围值转换为另一范围值的想法。我正在从事硬件项目,正在从可返回一定范围值的传感器读取数据,然后使用该数据来驱动需要不同范围值的执行器。 例如,假设传感器返回的值在1到512的范围内,并且执行器由5到10的值驱动。我想要一个函数,我可以传递一个值和两个范围并取回该值映射到第二个范围。如果这样的函数被命名,则可以这样使用: 在此示例中,我希望输出为,

  • 我是Java中Spring框架的新手。并且有一些问题… 我的应用程序使用Spring、Spring seq的安全性、Hibernate框架。 我正在编写一些简单的应用程序。 我有一个页面,可以通过Hibernate将用户添加到db。 用户类: 角色类: 保存在控制器中: 在UserDAO中保存 我在页面上有一个选择块,我可以在其中选择用户角色作为字符串。 所以在保存用户时,我有 所以据我所知,我应

  • 我有两台服务器,具有独立的流/可观察对象,如何使这两台服务器使用相同的链,以便我可以使用,等操作符控制它们? 例如,如果我从服务器获取,而getStream需要一段时间,并且在getFastStream完成后仍在进行中,我希望通过切换映射阻止第一个getStream的覆盖。我怎么能这么做?我是rxjs的新手,所以我尝试的可能不是它应该如何使用。 可观察的 然后,我必须将流推到图像$观察与运算符之一

  • 下面是Main中的随机客户端列表 第二类:产品(字符串名称、枚举类别、BigDecimal价格) 目标->预期结果:在类Main中,创建类Shopping的实例,其中属性是具有 的映射,并用随机数据//integer-购买了多少产品 null null