当前位置: 首页 > 知识库问答 >
问题:

Flink:在DataStream和“一组规则”之间实现“连接”

何高歌
2023-03-14

以下用例的最佳实践建议是什么?我们需要将流与一组“规则”相匹配,这些规则本质上是Flink数据集的概念。可以对此“规则集”进行更新,但并不频繁。必须根据“规则集”中的所有记录检查每个流事件,并且每个匹配将生成一个或多个事件到接收器数据流中。规则集中的记录数在6位数范围内。

目前,我们只是将规则加载到本地规则列表中,并在传入的数据流上使用flatMap。在flatMap中,我们只是在一个列表上迭代,将每个事件与每个规则进行比较。

为了加快迭代,我们还可以将列表拆分为几个批次,本质上是创建一个列表列表,并创建一个单独的线程来迭代每个子列表(在Java或Scala中使用Futures)。

问题:

  1. 有没有更好的方法来进行这种连接
  2. 如果不是,那么在Flink已经在做的事情的基础上,通过在每个flatMap操作中创建新线程来添加额外的并行性是否安全

编辑:以下是所要求的示例代码:

package wikiedits

import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object WikipediaEditEventProcessor {

  def main(args: Array[String])= {
    val see = StreamExecutionEnvironment.getExecutionEnvironment
    val edits = see.addSource(new WikipediaEditsSource())

    val ruleSets = Map[Int, List[String]](
      (1, List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")),
      (2, List("k", "l", "m", "n", "o", "p", "q", "r", "s", "t")),
      (3, List("u", "v", "w", "x", "y", "z", "0", "1", "2", "3"))
    )

    val result = edits.flatMap { edit =>
      ruleSets.map { ruleSet =>
        applyRuleSet(edit, ruleSet._2, ruleSet._1)
      }
    }
    see.execute
  }

  def applyRuleSet(event: WikipediaEditEvent, ruleSet: List[String], ruleSetId: Int): Future[List[String]] = {
    val title = event.getTitle
    Future(
      ruleSet.map {
        case rule if title.contains(rule) =>
          val result = s"Ruleset $ruleSetId: $rule -> exists in: $title"
          println(result) // this would be creating an output event instead
          result
        case rule =>
          val result = s"Ruleset $ruleSetId: $rule -> NO MATCH in: $title"
          println(result)
          result
      }
    )
  }
}

共有1个答案

吕征
2023-03-14

每个流事件都必须对照“规则集”中的所有记录进行检查,并且每个匹配都会将一个或多个事件生成到接收器数据流中。规则集中的记录数在6位数范围内

假设您有K个规则。如果输入速率快于处理单个事件的K个规则所需的时间,您的方法就很好。否则,您需要一些可以并行处理这些K个规则的方法。

把它们想象成K收费站。把它们一个接一个地放好,而不是放在一个大房间里。这将简化流媒体引擎的工作。

换句话说,使用simple for循环迭代所有规则,并为每个规则创建一个单独的平面图。因此,它们彼此独立,因此可以并行处理。最后,您将有K个平面图用于执行。引擎将使用您为执行提供的任何配置的最大并行性。这种方法将最大可能的并行性限制为K。但是,对于大量规则来说,这已经足够了。

通过在每个flatMap操作中创建新线程来增加并行性

完全不推荐。让并行性闪烁。您在平面图中定义要执行的工作单元。

 类似资料:
  • 以下用例的最佳实践建议是什么?我们需要将一个流与一组“规则”进行匹配,这些“规则”本质上是一个Flink数据集的概念。对此“规则集”的更新是可能的,但不是频繁的。每个流事件必须与“规则集”中的所有记录进行检查,每次匹配都会在接收器数据流中产生一个或多个事件。规则集中的记录数在6位数范围内。 目前,我们只是将规则加载到本地规则列表中,并在传入的数据流上使用flatMap。在flatMap中,我们只是

  • 我想加入一个大表,不可能包含在TM内存和流(kakfa)中。我在测试中成功加入了这两个表,将table-api与datastream api混合在一起。我做了以下操作: 它正在工作,但我从未见过这种类型的实现。可以吗?缺点是什么?

  • 在JUnit 4.10及更低版本中,可以将规则注释为@规则和@ClassRur。这意味着规则会在类之前/之后以及每次测试之前/之后被调用。这样做的一个可能原因是设置一个昂贵的外部资源(通过@ClassRur调用),然后廉价地重置它(通过@Rur调用)。 从JUnit 4.11开始,@规则字段必须是非静态的,@ClassRule字段必须是静态的,因此上述操作不再可行。 有明确的变通方法(例如,明确地

  • Makefile有很多灵活的写法,可以写得更简洁,同时减少出错的可能。本节我们来看看这样一个例子还有哪些改进的余地。 一个目标依赖的所有条件不一定非得写在一条规则中,也可以拆开写,例如: main.o: main.h stack.h maze.h main.o: main.c gcc-c main.c 就相当于: main.o: main.c main.h stack