当前位置: 首页 > 知识库问答 >
问题:

当processElement依赖于广播的数据时,如何在flink中单元测试BroadcastProcessFunction

潘辰龙
2023-03-14

我用BroadcastProcessFunction实现了一个flink流。从processBroadcastElement获取模型,并将其应用于processElement中的事件。

我没有找到对流进行单元测试的方法,因为我没有找到确保在第一个事件之前调度模型的解决方案。我想说有两种方法可以实现这一点:
1。找到一个解决方案,首先在流中推送模型。在流的执行之前,使用模型填充广播状态,以便恢复流

我可能错过了什么,但我没有找到一个简单的方法来做到这一点。

下面是我的问题的一个简单单元测试:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}

import scala.collection.mutable


class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {

  import BroadCastProcessor._

  override def processElement(value: Int,
                              ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)

    if (broadcastState.contains(value)) {
      out.collect(broadcastState.get(value))
    }
  }

  override def processBroadcastElement(value: (Int, String),
                                       ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
                                       out: Collector[String]): Unit = {
    ctx.getBroadcastState(broadcastStateDescriptor).put(value._1, value._2)
  }
}

object BroadCastProcessor {
  val broadcastStateDescriptor: MapStateDescriptor[Int, String] = new MapStateDescriptor[Int, String]("int_to_string", classOf[Int], classOf[String])
}

class CollectSink extends SinkFunction[String] {

  import CollectSink._

  override def invoke(value: String): Unit = {
    values += value
  }
}

object CollectSink { // must be static
  val values: mutable.MutableList[String] = mutable.MutableList[String]()
}

class BroadCastProcessTest extends FunSuite with BeforeAndAfter {

  before {
    CollectSink.values.clear()
  }

  test("add_elem_to_broadcast_and_process_should_apply_broadcast_rule") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataToProcessStream = env.fromElements(1)

    val ruleToBroadcastStream = env.fromElements(1 -> "1", 2 -> "2", 3 -> "3")

    val broadcastStream = ruleToBroadcastStream.broadcast(BroadCastProcessor.broadcastStateDescriptor)

    dataToProcessStream
      .connect(broadcastStream)
      .process(new BroadCastProcessor)
      .addSink(new CollectSink())

    // execute
    env.execute()

    CollectSink.values should contain("1")
  }
}

由于David Anderson的更新,我选择了缓冲区解决方案。我为同步定义了一个进程函数:

class SynchronizeModelAndEvent(modelNumberToWaitFor: Int) extends CoProcessFunction[Int, (Int, String), Int] {
  val eventBuffer: mutable.MutableList[Int] = mutable.MutableList[Int]()
  var modelEventsNumber = 0

  override def processElement1(value: Int, ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
    if (modelEventsNumber < modelNumberToWaitFor) {
      eventBuffer += value
      return
    }
    out.collect(value)
  }

  override def processElement2(value: (Int, String), ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
    modelEventsNumber += 1

    if (modelEventsNumber >= modelNumberToWaitFor) {
      eventBuffer.foreach(event => out.collect(event))
    }
  }
}

因此,我需要将其添加到我的流中:

dataToProcessStream
  .connect(ruleToBroadcastStream)
  .process(new SynchronizeModelAndEvent(3))
  .connect(broadcastStream)
  .process(new BroadCastProcessor)
  .addSink(new CollectSink())

谢谢

共有2个答案

柴嘉年
2023-03-14

感谢David Anderson和Matthieu,我编写了这个通用的CoFlatMap函数,可以在事件流上产生请求的延迟:

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

class SynchronizeEventsWithRules[A,B](rulesToWait: Int) extends CoProcessFunction[A, B, A] {
  val eventBuffer: mutable.MutableList[A] = mutable.MutableList[A]()
  var processedRules = 0
  override def processElement1(value: A, ctx: CoProcessFunction[A, B, A]#Context, out: Collector[A]): Unit = {
    if (processedRules < rulesToWait) {
      println("1 item buffered")
      println(rulesToWait+"--"+processedRules)
      eventBuffer += value
      return
    }
    eventBuffer.clear()
    println("send input to output without buffering:")
    out.collect(value)
  }

  override def processElement2(value: B, ctx: CoProcessFunction[A, B, A]#Context, out: Collector[A]): Unit = {
    processedRules += 1
    println("1 rule processed processedRules:"+processedRules)
    if (processedRules >= rulesToWait && eventBuffer.length>0) {
      println("send buffered data to output")
      eventBuffer.foreach(event => out.collect(event))
      eventBuffer.clear()
    }
  }
}

但不幸的是,它对我的情况一点帮助都没有,因为测试的主题是一个KeyedBroadcastProcessFunction,它使事件数据的延迟变得无关紧要,因为我试图应用一个平面图,使规则流n倍大,n是CPU的数量。所以我将确保生成的事件流将始终与规则流同步,并且将在那之后到达,但它也没有帮助。

毕竟,我找到了这个简单的解决方案,当然它不是确定性的,但由于并行性和并发性的性质,问题本身也不是确定性的。如果我们把delayMilis设置得足够大(

val delayMilis = 100
val synchronizedInput = inputEventStream.map(x=>{
      Thread.sleep(delayMilis)
      x
    }).keyBy(_.someKey)

您还可以将映射函数更改为this以仅在第一个元素上应用延迟:

package util

import org.apache.flink.api.common.functions.MapFunction

class DelayEvents[T](delayMilis: Int) extends MapFunction[T,T] {
  var delayed = false
  override def map(value: T): T = {
        if (!delayed) {
          delayed=true
          Thread.sleep(delayMilis)
        }
        value
  }
} 
val delayMilis = 100
val synchronizedInput = inputEventStream.map(new DelayEvents(100)).keyBy(_.someKey)
云飞翮
2023-03-14

要做到这一点,没有简单的方法。您可以让processElement缓冲其所有输入,直到processBroadcastElement接收到模型为止。或者在没有事件流量的情况下运行一次作业,并在模型广播后获取一个保存点。然后将该保存点恢复到同一作业中,但其事件输入已连接。

顺便说一下,在Flink社区中,您正在寻找的功能通常被称为“辅助输入”。

 类似资料:
  • 我正在使用一个Flink流式Java应用程序,输入源为Kafka。在我的应用程序中总共使用了4个流。一个是主数据流,另一个3个用于广播流。 我加入了使用任何一种类型的三个广播流。我已经作为流B广播,并且能够在广播过程函数上下文状态(即在processBroadcastElement())中接收。 我的问题是, > 是否可以在广播状态下存储大数据? 注意:根据我的理解,Flink广播状态在运行时保存

  • 问题内容: 我有一段代码,我不知道如何进行单元测试!该模块使用urllib2从外部XML提要(twitter,flickr,youtube等)中提取内容。这是一些伪代码: 我的第一个想法是腌制响应并加载它以进行测试,但是显然urllib的响应对象是不可序列化的(它引发了异常)。 仅从响应主体保存XML是不理想的,因为我的代码也使用标头信息。它旨在作用于响应对象。 当然,在单元测试中依赖外部数据源是

  • 单元测试的正确方法是什么?理想情况下,我只想验证某个程序的(模拟)实例调用了主方法中的run()方法。我知道我可以创建一个setter来设置某个程序的实例,但是它看起来像一种代码味道,因为它除了启用更容易的测试之外没有做任何事情?与run()方法上的公共访问器相同。为了测试的目的,它是自由的。如果它是私人的,我如何测试它?我正在寻找类的变化,也许应用一些设计模式来缓解这个问题。

  • 上面的匕首2.0当量是多少? 您可以在GitHub上看到我的项目及其单元测试。