当前位置: 首页 > 知识库问答 >
问题:

使用Flink同步处理2个流

申屠裕
2023-03-14

我有两个流A和B。

我开始同时吃A和B。

流A仅在每分钟的第59秒获得记录。

流B在每分钟的任何一秒都有记录。

我希望处理使两个流同步。

示例:在10:01:59之后从流A中,我将在10:02:59收到一条记录,直到10:02:59,我也不想从流B中读取任何内容。

这可以在Flink中实现吗?

共有2个答案

邢项禹
2023-03-14

Flink使用基于推的模型(当源和汇被重构为基于拉的模型时,这种模型应该很快就会改变)来处理下游的元素。这意味着您不能“等到事件到达后再拉入更多数据”,同时您必须在某些操作符状态下缓冲这些数据。Flink提供各种状态后端供您使用。

为了对Kkrugler的答案进行可视化,给定两个流,我们将以逻辑方式连接它们,然后在另一个元素到达时使用ListState对其中一个进行检索:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._ 

object Test {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val streamA = env.fromCollection(List(1, 2, 3))
    val streamB = env.fromCollection(List("a", "b", "c"))

    streamA
      .connect(streamB)
      .process {
        new CoProcessFunction[Int, String, (Int, String)] {
          var myStateA: ListState[Int] = _

          override def open(parameters: Configuration): Unit = {
            myStateA = getRuntimeContext.getListState[Int](
              new ListStateDescriptor[Int]("my_state", classOf[Int])
            )
          }

          override def processElement1(
              value: Int,
              ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
              out: Collector[(Int, String)]
          ): Unit = {
            myStateA.add(value)
          }

          override def processElement2(
              value: String,
              ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
              out: Collector[(Int, String)]
          ): Unit = {
            val list = myStateA.get().iterator().asScala.toList
            val intFromState = list.headOption
            intFromState match {
              case Some(myInt) =>
                out.collect((myInt, value))
              case None => ()
            }

            myStateA.update(list.tail.asJava)
          }
        }
      }
  }
}

注意:简化了此实现。这里无法保证元素的到达顺序,您需要将其添加到您的状态和实现中。您还可以使用计时器,从而为每个进入流的事件注册一个计时器,作为新数据何时到达的指示。

商昆琦
2023-03-14

您不能在Flink中从流中读取记录,但可以从流中删除(或保存)记录。因此,您可以连接这两个流,并使用共平面图进行处理。当您从流a中获取记录时,请将其保存在状态。当您从流B获得记录时,根据流a的状态决定如何处理它。

 类似资料:
  • 我需要多次调用webservice并传递使用来自多个表的数据创建的数据 UI->控制器->服务->(获取数据(表1,表2)并运行一些验证)1。如果验证失败-返回错误消息并停止。2.如果验证通过-调用JobLauncher并返回“任务启动”消息。 在异步作业中,我想到遵循2个步骤。 > bulkinsertstep > 我需要调用DB查询2个更多的表(Table3、table4)并创建一个大的数据集

  • 我想在一个操作符中接收和处理三个流。例如,Storm中实现的代码如下: <代码>生成器。setBolt(“C\u螺栓”,C\u螺栓(),parallelism\u提示)。字段分组(“A\u bolt”,“TRAINING”,新字段(“word”))。字段分组(“B\U螺栓”,“分析”,新字段(“word”))。所有分组(“A\U螺栓”、“总和”) 在Flink中,实现了和的处理: 但我不知道如何添

  • golang是一门语言级别支持并发的程序语言。golang中使用go语句来开启一个新的协程。 goroutine是非常轻量的,除了给它分配栈空间,它所占用的内存空间是微乎其微的。 但当多个goroutine同时进行处理的时候,就会遇到比如同时抢占一个资源,某个goroutine等待另一个goroutine处理完某一个步骤之后才能继续的需求。 在golang的官方文档上,作者明确指出,golang并

  • 我有两组帐号,基于加元和美元货币。基于这些列表,我需要通过传递请求参数来调用相同的方法,一个用于CAD,另一个用于USD。 请指导我如何使用。 提前感谢!

  • null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?

  • 为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的和坐标以及记录位置的时间组成。我的目标是用特定粒子的速度来注释这个数据。所以小溪看起来像这样。 现在无法保证事件会按顺序到达,即可能会在之前到达,即。 为了简单起见,可以假设任何迟来的数据将在早数据的内到达。 我承认,我是流处理和闪烁的新手,所以这可能是一个愚蠢的问题,提出一个明显的答案,但我目前被难倒了,如何去实现我的