我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。
我能够编写一个自定义触发器,将数据分区到Windows中。代码如下:
case class Trade(key: Int, millis: Long, time: LocalDateTime, price: Double, size: Int)
class VolumeTrigger(triggerVolume: Int, config: ExecutionConfig) extends Trigger[Trade, Window] {
val LOG: Logger = LoggerFactory.getLogger(classOf[VolumeTrigger])
val stateDesc = new ValueStateDescriptor[Double]("volume", createTypeInformation[Double].createSerializer(config))
override def onElement(event: Trade, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
val volume = ctx.getPartitionedState(stateDesc)
if (volume.value == null) {
volume.update(event.size)
return TriggerResult.CONTINUE
}
volume.update(volume.value + event.size)
if (volume.value < triggerVolume) {
TriggerResult.CONTINUE
}
else {
volume.update(volume.value - triggerVolume)
TriggerResult.FIRE_AND_PURGE
}
}
override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
TriggerResult.FIRE_AND_PURGE
}
override def onProcessingTime(time: Long, window:Window, ctx: TriggerContext): TriggerResult = {
throw new UnsupportedOperationException("Not a processing time trigger")
}
override def clear(window: Window, ctx: TriggerContext): Unit = {
val volume = ctx.getPartitionedState(stateDesc)
ctx.getPartitionedState(stateDesc).clear()
}
}
def main(args: Array[String]) : Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val trades = env
.readTextFile("/tmp/trades.csv")
.map {line =>
val cells = line.split(",")
val time = LocalDateTime.parse(cells(0), DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss.SSSSSSSSS"))
val millis = time.toInstant(ZoneOffset.UTC).toEpochMilli
Trade(0, millis, time, cells(1).toDouble, cells(2).toInt)
}
val aggregated = trades
.assignAscendingTimestamps(_.millis)
.keyBy("key")
.window(GlobalWindows.create)
.trigger(new VolumeTrigger(500, env.getConfig))
.sum(4)
aggregated.writeAsText("/tmp/trades_agg.csv")
env.execute("volume agg")
}
0180102 04:00:29.715706404,169.10,100
20180102 04:00:29.715715627,169.10,100
20180102 05:08:29.025299624,169.12,100
20180102 05:08:29.025906589,169.10,214
20180102 05:08:29.327113252,169.10,200
20180102 05:09:08.350939314,169.00,100
20180102 05:09:11.532817015,169.00,474
20180102 06:06:55.373584329,169.34,200
20180102 06:07:06.993081961,169.34,100
20180102 06:07:08.153291898,169.34,100
20180102 06:07:20.081524768,169.34,364
20180102 06:07:22.838656715,169.34,200
20180102 06:07:24.561360031,169.34,100
20180102 06:07:37.774385969,169.34,100
20180102 06:07:39.305219107,169.34,200
上面的代码可以将其划分为大致相同大小的窗口:
Trade(0,1514865629715,2018-01-02T04:00:29.715706404,169.1,514)
Trade(0,1514869709327,2018-01-02T05:08:29.327113252,169.1,774)
Trade(0,1514873215373,2018-01-02T06:06:55.373584329,169.34,300)
Trade(0,1514873228153,2018-01-02T06:07:08.153291898,169.34,464)
Trade(0,1514873242838,2018-01-02T06:07:22.838656715,169.34,600)
Trade(0,1514873294898,2018-01-02T06:08:14.898397117,169.34,500)
Trade(0,1514873299492,2018-01-02T06:08:19.492589659,169.34,400)
Trade(0,1514873332251,2018-01-02T06:08:52.251339070,169.34,500)
Trade(0,1514873337928,2018-01-02T06:08:57.928680090,169.34,1000)
Trade(0,1514873338078,2018-01-02T06:08:58.078221995,169.34,1000)
现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。
那可以用一些自定义聚合函数来处理吗?它需要知道前一个窗口的结果,但我无法找到如何做到这一点。
我希望从Spark Structured Streaming改为Flink是一个不错的选择,因为我以后还要处理更复杂的情况。
由于您的密钥对于所有记录都是相同的,因此在这种情况下您可能不需要窗口。请参阅Flink的文档https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-keyed-state。它有一个CountWindowAverage类,其中使用状态变量对来自流中每个记录的值进行聚合。您可以实现这一点,并在状态变量到达您的触发器卷时发送输出,并用剩余的卷重置状态变量的值。
The BrowserWindow module is the foundation of your Electron application, and it exposes many APIs that can change the look and behavior of your browser windows. In this tutorial, we will be going over
pre { white-space: pre-wrap; } 默认情况下,窗口(window)有四个工具:collapsible、minimizable、maximizable 和 closable。比如我们定义以下窗口(window): <div id="win" title="My Window"> window content </div> 如需自定义工具
我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?
尝试合并多个 Kafka 流,聚合
本文向大家介绍javascript+html5+css3自定义提示窗口,包括了javascript+html5+css3自定义提示窗口的使用技巧和注意事项,需要的朋友参考一下 javascript自定义提示窗口效果图: 源码: 1.demo.jsp 2.myAlert.js 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。
我正在搜索JavaFX中弹出窗口的示例。我有JavaFX应用程序,有一次我需要一个弹出窗口出现。这个弹出窗口需要一些复杂的输入,我需要处理和检查并返回主应用程序/窗口。 现在的问题是,我在任何地方都找不到一个示例,说明如何在一个JavaFX控制器类中调用Now JavaFX弹出窗口?我只找到了examle如何创建对话框弹出窗口,但找不到基于JavaFX的新弹出窗口示例(我看到了一个解决方案,其中并