{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."}
{"stateA":54, "stateB":100, ... "stateJ":34}
我有10个状态:[创建,...,删除]
,平均生命周期为15分钟。状态每秒可以改变两次。理论上可以增加新的国家。
为了每秒接收数据流,我正在考虑使用Flink的时间窗口https://flink.apache.org/news/2015/12/04/induction-windows.html
问题是,我需要具有关于guid->previous-state
和statex->count
信息的有状态对象,以便能够在新事件发生时增加/减少计数。
我对flink和流处理是新手,我还没有深入到flink有状态流处理。在第一阶段,我想使用静态对象来实现,但是当启动几个flink实例时,这种方法就不起作用了。
我想问你:
像下面这样的东西怎么样?
它使用15分钟窗口,之后窗口状态将被清理。它还使用一个自定义触发器,该触发器每秒计算窗口的值。就窗口操作而言,有一个ReduceFunction只保留每个guid的最新状态,还有一个WindowFunction发出一个(state,1)元组。然后我们通过这个状态来确定并求和。我想这会给你带来你想要的结果。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val stream = env.addSource(new FlinkKafkaProducer(...))
val results = stream
.keyBy(_.guid)
.timeWindow(Time.minutes(15))
.trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
.apply(
(e1, e2) => e2,
(k, w, i, c: Collector[(String, Long)]) => {
if (i.head != null) c.collect((i.head.state, 1))
}
)
.keyBy(0)
.timeWindow(Time.seconds(1))
.sum(1)
.addSink(new ElasticsearchSink<>(...))
env.execute("Count States")
ProcessingTimeTriggerWithPerioFirings定义如下:
object ProcessingTimeTriggerWithPeriodicFirings {
def apply(intervalMs: Long) = {
new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
}
}
class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
extends Trigger[Event, TimeWindow] {
private val startTimeDesc =
new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)
override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
val startTime = ctx.getPartitionedState(startTimeDesc)
if (startTime.value == 0) {
startTime.update(window.getStart)
ctx.registerProcessingTimeTimer(window.getEnd)
ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
}
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
if (time == window.getEnd) {
TriggerResult.PURGE
}
else {
ctx.registerProcessingTimeTimer(time + intervalMs)
TriggerResult.FIRE
}
}
override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
}
我正在使用kafka在java中进行poc项目- 在Kafka上,将产生数量不可预测的事件,从0到数千个事件/秒,例如关于特定主题的事件。 Flink将消耗此事件,并应每秒陷入弹性搜索每个状态中的事件数ex: 我有10个状态:<代码>[创建,…,删除] 平均生命周期为15分钟。状态每秒可以更改两次。理论上可以增加新的州。 为了每时每刻都让溪流下沉,我想用Flink的时间窗https://flink
我不理解无状态会话bean。文档说,instances变量可能包含特定于客户机的状态,然后说,当方法完成时,不应该保留状态。
在接口流中: 中间操作可分为有状态和无状态。它们影响并行流的结果。 只有两个终端操作是非确定性方法:findAny()和forEach(Consumer)。它们影响并行流的结果。 如果中间无状态操作执行惰性操作,它们可能会产生副作用。这会影响并行Stream的结果。 中间操作可分为以下几类: 有状态 不同的() 排序() 极限(长l) 跳过(长l) 无国籍 地图(功能f) 以下是我的两个问题: >
问题内容: 从MDN for NodeList: 在某些情况下,NodeList是一个实时集合,这意味着DOM中的更改会反映在集合中。例如,Node.childNodes处于活动状态: 在其他情况下,NodeList是静态集合,这意味着DOM中的任何后续更改都不会影响集合的内容。document.querySelectorAll返回一个静态NodeList。 所以....有点烦!是否有任何关于哪些
帮助用户快速部署有状态的应用。 有状态应用即Statefulset。Statefulset的详细介绍内容,请参考kubernets官方文档-Statefulsets。 Pod是Kubernetes的最小编排单位,有状态statefulset主要用于部署实例之间有不对等关系,以及实例对外部数据有依赖关系的有状态应用。基于statefulset部署的应用有以下特点: 基于statefulset部署的p
问题内容: 根据OCP的书,必须避免有状态操作,否则称为有状态lambda表达式。本书中提供的定义是“有状态的lambda表达式,其结果取决于在管道执行期间可能更改的任何状态”。 它们提供了一个示例,其中使用并行流使用函数将固定的数字集合添加到同步的ArrayList 。 arraylist中的顺序是完全随机的,这应该使人看到有状态的lambda表达式在运行时会产生不可预测的结果。因此,强烈建议在