为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的x
和y
坐标以及记录位置的时间t
组成。我的目标是用特定粒子的速度来注释这个数据。所以小溪看起来像这样。
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
现在无法保证事件会按顺序到达,即1612103771213 P2-0.1-0.1
可能会在1612103771212 p2 0.0 0.0
之前到达,即10ms
。
为了简单起见,可以假设任何迟来的数据将在早数据的100ms
内到达。
我承认,我是流处理和闪烁的新手,所以这可能是一个愚蠢的问题,提出一个明显的答案,但我目前被难倒了,如何去实现我的目标在这里。
编辑
根据David的回答,我尝试使用Flink表API对数据流进行排序,使用NC-LK9999
对文本套接字流进行排序。问题是,在关闭文本套接字流之前,没有任何内容被打印到控制台。下面是我编写的scala代码-
package processor
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector
import java.time.Duration
object AnnotateJob {
val OUT_OF_ORDER_NESS = 100
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bSettings)
env.setParallelism(1)
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
val text = env.socketTextStream("localhost", 9999)
val objStream = text
.filter( _.nonEmpty )
.map(new ParticleMapFunction)
val posStream = objStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
.withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
})
)
val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
tableEnv.createTemporaryView("pos", tablePos)
val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")
val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)
// sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)
sortedPosStream.print()
// execute program
env.execute()
}
case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable
class ParticleMapFunction extends MapFunction[String, ParticlePos] {
override def map(t: String): ParticlePos = {
val parts = t.split("\\W+")
ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
}
}
}
通常,水印与事件时计时器相结合是对无序事件流带来的问题的解决方案。关于事件时间和水印的官方Flink培训部分解释了它是如何工作的。
在更高的级别上,有时使用Flink的CEP库或Flink SQL更容易,因为它们使按时间对流进行排序变得非常容易,从而消除了所有的无序性。例如,有关使用Flink SQL按事件时间对流进行排序的Flink数据流程序的示例,请参阅如何使用Flink SQL按事件时间对流进行排序。
在您的示例中,一个相当简单的MATCH_RECOGNIZE查询可以完成您所要查找的内容。可能看起来像这样,
SELECT *
FROM event
MATCH_RECOGNIZE (
PARTITION BY particleId
ORDER BY ts
MEASURES
b.ts,
b.particleId,
velocity(a, b)
AFTER MATCH SKIP TO NEXT ROW
PATTERN (a b)
DEFINE
a AS TRUE,
b AS TRUE
)
本文向大家介绍如何使用JavaScript处理绑定事件?,包括了如何使用JavaScript处理绑定事件?的使用技巧和注意事项,需要的朋友参考一下 要使用JavaScript处理事件,请在任何元素上对鼠标单击事件使用click。 示例 您可以尝试运行以下代码,以了解如何使用JavaScript处理绑定事件:
问题内容: 例如,我有10个从AJAX响应生成的标签: 我需要通过循环将onclick事件分配给每个事件: 这是行不通的,它仅将onclick分配给最后一个标签,并警告“ 11”。我该如何工作?我宁愿不使用jQuery。 问题答案: 您所有的处理程序都共享相同的变量。 您需要将每个处理程序放入一个单独的函数作为参数,以便每个处理程序都有自己的变量:
我需要从OracleIdtyManager中的自定义后处理事件处理程序调用外部REST API? 如果有人有想法,请在这里发布。
我一直在寻找如何将apache storm用作CEP的方法,但似乎有两个概念(流处理和复杂事件处理),在CEP中,您可以编写类似sql的查询,并在数据流上执行它们,如ESPER,但我在apache storm中找不到类似的东西,这是否意味着apache storm是一个数据流处理器而不是CEP?
事件就是用户或浏览器自身执行的某种动作。诸如click、load 和mouseover,都是事件的名字。而响应某个事件的函数就叫做事件处理程序(或事件侦听器)。事件处理程序的名字以"on"开头,因此click 事件的事件处理程序就是onclick,load 事件的事件处理程序就是onload。为事件指定处理程序的方式有好几种。 13.2.1 HTML事件处理程序某个元素支持的每种事件,都可以使用一
我已经创建了一个自定义钩子,它从服务器获取数据,向存储发送分派,并返回数据。如果我想在我的应用程序中列出所有评论,它是可用的,但是,我想在我需要获取所有评论回复的组件中重用它,而这应该只在单击某个按钮时发生。 这是下面的钩子。 在我的组件里面,我需要在点击按钮时获取回复 如果我把call放在处理程序中,我会得到一个错误,不能在那里调用hook,但是我只需要在单击按钮时调用它,所以我不知道是否有办法