当前位置: 首页 > 知识库问答 >
问题:

Flink:实现数据流和“规则集”之间的“连接”

诸超
2023-03-14

以下用例的最佳实践建议是什么?我们需要将一个流与一组“规则”进行匹配,这些“规则”本质上是一个Flink数据集的概念。对此“规则集”的更新是可能的,但不是频繁的。每个流事件必须与“规则集”中的所有记录进行检查,每次匹配都会在接收器数据流中产生一个或多个事件。规则集中的记录数在6位数范围内。

目前,我们只是将规则加载到本地规则列表中,并在传入的数据流上使用flatMap。在flatMap中,我们只是迭代一个列表,将每个事件与每个规则进行比较。

为了加快迭代速度,我们还可以将列表拆分为几个批,实质上创建一个列表列表,并创建一个单独的线程来迭代每个子列表(使用Java或Scala中的Futures)。

  1. 有没有更好的方法来执行这种连接?
  2. 如果不是,那么通过在flatMap操作中创建新线程,在Flink已经完成的操作的基础上添加额外的并行性是否安全?

编辑:下面是所需的示例代码

package wikiedits

import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object WikipediaEditEventProcessor {

  def main(args: Array[String])= {
    val see = StreamExecutionEnvironment.getExecutionEnvironment
    val edits = see.addSource(new WikipediaEditsSource())

    val ruleSets = Map[Int, List[String]](
      (1, List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")),
      (2, List("k", "l", "m", "n", "o", "p", "q", "r", "s", "t")),
      (3, List("u", "v", "w", "x", "y", "z", "0", "1", "2", "3"))
    )

    val result = edits.flatMap { edit =>
      ruleSets.map { ruleSet =>
        applyRuleSet(edit, ruleSet._2, ruleSet._1)
      }
    }
    see.execute
  }

  def applyRuleSet(event: WikipediaEditEvent, ruleSet: List[String], ruleSetId: Int): Future[List[String]] = {
    val title = event.getTitle
    Future(
      ruleSet.map {
        case rule if title.contains(rule) =>
          val result = s"Ruleset $ruleSetId: $rule -> exists in: $title"
          println(result) // this would be creating an output event instead
          result
        case rule =>
          val result = s"Ruleset $ruleSetId: $rule -> NO MATCH in: $title"
          println(result)
          result
      }
    )
  }
}

共有1个答案

颜文昌
2023-03-14

每个流事件必须对照“规则集”中的所有记录进行检查,每个匹配都将一个或多个事件产生到接收器数据流中。规则集中的记录数在6位数范围内

说你有K规则。如果输入速率快于处理单个事件的K条规则所用的时间,那么您的方法是很好的。另外,您需要一些方法,可以并行地处理这K条规则。

就当他们是K收费亭吧。把它们一个接一个地放起来,而不是把它们放在一个大房间里。这将简化流式引擎的事情。

 类似资料:
  • 以下用例的最佳实践建议是什么?我们需要将流与一组“规则”相匹配,这些规则本质上是Flink数据集的概念。可以对此“规则集”进行更新,但并不频繁。必须根据“规则集”中的所有记录检查每个流事件,并且每个匹配将生成一个或多个事件到接收器数据流中。规则集中的记录数在6位数范围内。 目前,我们只是将规则加载到本地规则列表中,并在传入的数据流上使用flatMap。在flatMap中,我们只是在一个列表上迭代,

  • 我使用Firebase,确切地说是一个实时数据库,我不知道应该设置什么规则。我制定了以下规则: 但现在每个人都可以写作了。当我设置这些: 使用Gmail的用户无法登录,因为数据库中的记录没有创建,但不是在所有设备上。当我在OnePlus上测试时,一切都很好,当我在三星上测试时,数据库中的记录没有创建。这是我负责创建用户的代码:

  • 我有一个数据库: 当用户()在应用程序中注册时,他会填充另一个用户uid(该用户uid具有属性)并将自己的uid添加到他的个人资料中(

  • 我有两条左右流。就在同一时间窗口 左侧流包含元素L1、L2(数字为键) 右流包含元素R1、R3 我想知道如何在Apache Flink中实现LEFT OUTER JOIN,以便处理此窗口时获得的结果如下: L1、R1通过键(1)匹配,L2、R3不匹配。L2包括在内,因为它位于左侧

  • 我试图从动态表和基于某些字段的流中派生新表。 有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。 书籍 ============================ BookId, Instruments, Quantity Book1, Goog,100 Book2, Vod,10 Book1, Appl,50 Book2, Goog,60 Book1, Vod,130 Book3,

  • 我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端