当前位置: 首页 > 知识库问答 >
问题:

如何根据状态更改事件以分布式方式计算flink处于一个状态的“客户端”数量?我需要有状态的对象

阳俊德
2023-03-14

我正在使用kafka在java中进行poc项目-

在Kafka上,将产生数量不可预测的事件,从0到数千个事件/秒,例如关于特定主题的事件。

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."} 

Flink将消耗此事件,并应每秒陷入弹性搜索每个状态中的事件数ex:

{"stateA":54, "stateB":100, ... "stateJ":34}

我有10个状态:<代码>[创建,…,删除] 平均生命周期为15分钟。状态每秒可以更改两次。理论上可以增加新的州。

为了每时每刻都让溪流下沉,我想用Flink的时间窗https://flink.apache.org/news/2015/12/04/Introducing-windows.html

问题是我需要有状态对象,其中包含有关guid的信息-

我找到了一份关于有状态蒸汽处理https://cwiki.apache.org/confluence/display/FLINK/Stateful流处理的文件草稿

我是flink和流处理的新手,我还没有深入研究flink有状态流处理。对于第一个阶段,我想使用静态对象来实现这一点,但当启动几个flink实例时,这种方法将不起作用。

我想问你:

  1. 你认为这种方法怎么样

此外,我还希望得到一些用于窗口有状态流解决方案(或其他解决方案)的代码片段。

谢谢

共有1个答案

欧阳骏俊
2023-03-14

像下面这样的怎么样?

它使用15分钟的窗口,之后窗口状态将被清理。它还使用每秒评估窗口的自定义触发器。就窗口操作而言,有一个RedueFunction,它只是为每个guid保留最新状态,还有一个WindowFunction,它发出一个(state,1)元组。然后我们通过这个状态进行键入并求和。我认为这应该会给你你想要的结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val stream = env.addSource(new FlinkKafkaProducer(...))

val results = stream
  .keyBy(_.guid)
  .timeWindow(Time.minutes(15))
  .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
  .apply(
    (e1, e2) => e2,
    (k, w, i, c: Collector[(String, Long)]) => {
      if (i.head != null) c.collect((i.head.state, 1))
    }
  )
  .keyBy(0)
  .timeWindow(Time.seconds(1))
  .sum(1)
  .addSink(new ElasticsearchSink<>(...))

env.execute("Count States")

ProcessingTimeTrigger with PeriodicFirings定义如下:

object ProcessingTimeTriggerWithPeriodicFirings {
  def apply(intervalMs: Long) = {
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
  }
}

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
  extends Trigger[Event, TimeWindow] {

  private val startTimeDesc =
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)

  override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    val startTime = ctx.getPartitionedState(startTimeDesc)
    if (startTime.value == 0) {
      startTime.update(window.getStart)
      ctx.registerProcessingTimeTimer(window.getEnd)
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    if (time == window.getEnd) {
      TriggerResult.PURGE
    }
    else {
      ctx.registerProcessingTimeTimer(time + intervalMs)
      TriggerResult.FIRE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}
 类似资料:
  • 我有10个状态:,平均生命周期为15分钟。状态每秒可以改变两次。理论上可以增加新的国家。 为了每秒接收数据流,我正在考虑使用Flink的时间窗口https://flink.apache.org/news/2015/12/04/induction-windows.html 问题是,我需要具有关于和信息的有状态对象,以便能够在新事件发生时增加/减少计数。 我对flink和流处理是新手,我还没有深入到f

  • 我试图建立一个Ingenico POS终端(iWL220)的模拟器。主屏幕上有一个组合框。一旦用户输入id和密码,组合框加载6个菜单。如果用户点击btn1,则组合框清除菜单并添加另一组菜单。如果用户点击新加载菜单的< code>btn1,则再次清除组合框并加载另一组菜单,依此类推。 我的问题是,每次点击按钮(btn1、btn2、btn3、btn4、btn5),我都要编写大量的if-else语句。实

  • 在广播模式的文档中,提到没有RocksDB状态后端: 如果应用程序使用rocksdb作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存点期间不存储,因此不会恢复?

  • 我不理解无状态会话bean。文档说,instances变量可能包含特定于客户机的状态,然后说,当方法完成时,不应该保留状态。

  • 问题内容: 从MDN for NodeList: 在某些情况下,NodeList是一个实时集合,这意味着DOM中的更改会反映在集合中。例如,Node.childNodes处于活动状态: 在其他情况下,NodeList是静态集合,这意味着DOM中的任何后续更改都不会影响集合的内容。document.querySelectorAll返回一个静态NodeList。 所以....有点烦!是否有任何关于哪些

  • 8. 状态(State) Intent 允许对象在内部状态改变时改变它的行为,对象看起来好像修改了它所属的类。 Class Diagram Implementation 糖果销售机有多种状态,每种状态下销售机有不同的行为,状态可以发生转移,使得销售机的行为也发生改变。 // java public interface State { /** * 投入 25 分钱 */