当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Flink处理无序事件?

黄兴业
2023-03-14

为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的xy坐标以及记录位置的时间t组成。我的目标是用特定粒子的速度来注释这个数据。所以小溪看起来像这样。

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

现在无法保证事件会按顺序到达,即1612103771213 P2-0.1-0.1可能会在1612103771212 p2 0.0 0.0之前到达,即10ms

为了简单起见,可以假设任何迟来的数据将在早数据的100ms内到达。

我承认,我是流处理和闪烁的新手,所以这可能是一个愚蠢的问题,提出一个明显的答案,但我目前被难倒了,如何去实现我的目标在这里。

编辑

根据David的回答,我尝试使用Flink表API对数据流进行排序,使用NC-LK9999对文本套接字流进行排序。问题是,在关闭文本套接字流之前,没有任何内容被打印到控制台。下面是我编写的scala代码-


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env, bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)

    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
  case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
                            var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      val parts = t.split("\\W+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }

}

共有1个答案

施轶
2023-03-14

通常,水印与事件时计时器相结合是对无序事件流带来的问题的解决方案。关于事件时间和水印的官方Flink培训部分解释了它是如何工作的。

在更高的级别上,有时使用Flink的CEP库或Flink SQL更容易,因为它们使按时间对流进行排序变得非常容易,从而消除了所有的无序性。例如,有关使用Flink SQL按事件时间对流进行排序的Flink数据流程序的示例,请参阅如何使用Flink SQL按事件时间对流进行排序。

在您的示例中,一个相当简单的MATCH_RECOGNIZE查询可以完成您所要查找的内容。可能看起来像这样,

SELECT *
    FROM event
    MATCH_RECOGNIZE (
        PARTITION BY particleId
        ORDER BY ts
        MEASURES 
            b.ts, 
            b.particleId, 
            velocity(a, b)
        AFTER MATCH SKIP TO NEXT ROW
        PATTERN (a b)
        DEFINE
            a AS TRUE,
            b AS TRUE
    )
 类似资料:
  • 本文向大家介绍如何使用JavaScript处理绑定事件?,包括了如何使用JavaScript处理绑定事件?的使用技巧和注意事项,需要的朋友参考一下 要使用JavaScript处理事件,请在任何元素上对鼠标单击事件使用click。 示例 您可以尝试运行以下代码,以了解如何使用JavaScript处理绑定事件:

  • 问题内容: 例如,我有10个从AJAX响应生成的标签: 我需要通过循环将onclick事件分配给每个事件: 这是行不通的,它仅将onclick分配给最后一个标签,并警告“ 11”。我该如何工作?我宁愿不使用jQuery。 问题答案: 您所有的处理程序都共享相同的变量。 您需要将每个处理程序放入一个单独的函数作为参数,以便每个处理程序都有自己的变量:

  • 我需要从OracleIdtyManager中的自定义后处理事件处理程序调用外部REST API? 如果有人有想法,请在这里发布。

  • 我一直在寻找如何将apache storm用作CEP的方法,但似乎有两个概念(流处理和复杂事件处理),在CEP中,您可以编写类似sql的查询,并在数据流上执行它们,如ESPER,但我在apache storm中找不到类似的东西,这是否意味着apache storm是一个数据流处理器而不是CEP?

  • 事件就是用户或浏览器自身执行的某种动作。诸如click、load 和mouseover,都是事件的名字。而响应某个事件的函数就叫做事件处理程序(或事件侦听器)。事件处理程序的名字以"on"开头,因此click 事件的事件处理程序就是onclick,load 事件的事件处理程序就是onload。为事件指定处理程序的方式有好几种。 13.2.1 HTML事件处理程序某个元素支持的每种事件,都可以使用一

  • 我已经创建了一个自定义钩子,它从服务器获取数据,向存储发送分派,并返回数据。如果我想在我的应用程序中列出所有评论,它是可用的,但是,我想在我需要获取所有评论回复的组件中重用它,而这应该只在单击某个按钮时发生。 这是下面的钩子。 在我的组件里面,我需要在点击按钮时获取回复 如果我把call放在处理程序中,我会得到一个错误,不能在那里调用hook,但是我只需要在单击按钮时调用它,所以我不知道是否有办法