当前位置: 首页 > 知识库问答 >
问题:

我可以使用 Flink 状态执行加入吗?

潘向明
2023-03-14

我正在评估Apache Flink的流处理,作为Apache Spark的替代品/补充。我们通常使用Spark解决的任务之一是数据扩充。

也就是说,我有来自物联网传感器的带有传感器ID的数据流,并且我有一组传感器元数据。我想将输入流转换为传感器测量传感器元数据流。

在星火中,我可以和RDD一起加入数据流。

case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
  spark.read.json(...).as[SensorMetadata]
 .map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] = 
  sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] => 
  rdd.join(staticMetadata)
     .map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}

我可以用Apache Flink做同样的技巧吗?我在这方面没有看到直接的API。我唯一的想法是使用有状态转换——我可以在单个流中合并元数据和传感器事件,并使用Flink状态存储来存储元数据(伪代码):

val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
  sensorInput.keyBy("sensorId")
 .connect(staticMetadata.keyBy("sensorId"))
 .flatMap {new RichCoFlatMapFunction() {
   private val ValueState<SensorMetadata> md = _;
   override def open = ??? // initiate value state
   def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) = 
      collector.collect(s, md.value) 
   def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) = 
   md.update(s)  
 }}

这是正确的方法吗?当元数据不适合一台机器时,我可以在更大的范围内使用它吗?

谢啦

共有1个答案

柴泰平
2023-03-14

使用CoFlatMapFunction连接是一种常见的方法。但是,它有一个明显的缺点。每当任何一个输入的元组到达时都会调用该函数,并且您无法控制首先使用哪个输入。因此,在开始时,您必须在元数据尚未完全读取时处理传感器事件。一种方法是缓冲一个输入的所有事件,直到另一个输入被消耗。另一方面,CoFlatMapFunction方法的好处是您可以动态更新元数据。在您的代码示例中,两个输入都被键入连接键。这意味着输入是分区的,每个任务槽都在处理不同的键集。因此,您的元数据可能比机器可以处理的要大(如果您配置RocksDB状态后端,状态可以持久化到磁盘,因此您甚至不受内存大小的限制)。

如果您要求作业开始时所有元数据都必须存在,并且如果元数据是静态的(它不会改变)并且足够小以适合一台机器,您还可以使用常规的FlatMapFunction并从文件中加载open()方法中的元数据。与您的方法相反,这将是一个广播连接,其中每个任务槽都在内存中拥有完整的元数据。除了在事件数据被消耗时所有元数据可用之外,该方法的好处是您不需要洗牌事件数据,因为它可以在任何机器上连接。

 类似资料:
  • 我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?

  • 问题内容: 我试图将状态动态添加到我的应用程序,并尝试使用ui-router。我尝试遵循此线程。 就我而言,已经有一些现存状态,我需要追加到该列表中,并从json读取动态状态 由于某种原因,当尝试用于deferIntercept()方法时,在$urlRouterProvider上出现注入器错误。就我而言,我使用的是angular1.3,而ui路由器的版本是0.2.10。我看到您可以协同创建状态。但

  • 我试图在Flink(版本1.4.2)上使用可查询状态,但不幸的是,我一直收到以下错误: 在客户端,我使用flink-queryable-state-client-java_2_11.jar可查询客户端的相关代码部分是 最后,在Flink上运行的作业配置了一个ListState,如下所示。请注意,数据在ListState上由String键控 在我看来,这似乎是一个序列化错误,但我不知道我需要做什么来

  • 问题内容: 我有一个singleThreadExecutor以便以串行顺序执行提交给它的任务,即一个任务接一个,没有并行执行。 我有可运行的东西,像这样 } 例如,当我向上述单线程执行器提交MyRunnable的三个实例时,我希望执行第一个任务,并且由于Thread.sleep在TIMED_WAITING中具有其执行线程(我可能对特定的线程有误州)。其他两个任务不应分配有执行它们的线程,至少直到第

  • 我正在学习React/redux-奇妙的框架,不知道JS Counl会这么酷!!! 我的问题... 我有一个父组件和一个子组件。可以通过以下方式隐藏/显示子项: 按下子对象上的按钮将其隐藏 1)可以通过使用this.state控制2)可以通过设置来自父母的道具来控制 我的问题是我不能用状态和道具来控制孩子,因为我认为它是状态或道具。 (这并不完全正确。我可以使用事件组件WillReceivePro

  • 我可以添加多个AsyncTask并同时执行吗?我可以从主activity开始执行多个Asynctask,如下所示。 公共类接收器扩展广播接收器{ } 在这段代码中,我得到了所有的日志,但在此之后,它将不会在AsyncTask的doInBackground()方法中运行。我在每个类方法doInBackground()中设置了Log,但没有一个在Log中被命中(意味着没有一个方法被执行)。 我的问题是