当前位置: 首页 > 知识库问答 >
问题:

一个清理闪烁流状态如何为非活动键?

车靖琪
2023-03-14

我的目标是有一个Flink流程序,保留最后的N个id,其中id是从事件中提取的。接收器是一个Cassandra存储区,因此可以随时获取ID列表。重要的是,卡桑德拉在每一次事件发生时都要立即得到最新消息。

case class MyEvent(userId: Int, id: String)

env
  .addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
  .keyBy(_.userId)
  .mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
    val keepNIds = currentIds match {
      case None => Seq(in.id)
      case Some(cids) => (cids :+ in.id).takeRight(100)
    }
    ((in.userId, keepNIds), Some(keepNIds))
  }
  .addSink { in: (Int, Seq[String]) =>
    CassandraSink.appDatabase.idsTable.store(...)
  }

共有1个答案

鱼宜
2023-03-14

生长状态是一个重要而正确的观察。如果你的键空间在移动,这肯定会发生。

Flink 1.2.0添加了processfunction来解决这个问题。ProcessFunctionFlatMapFunction类似,但可以访问计时器服务。您可以注册计时器,当计时器过期时调用ontimer()回调函数。回调可用于清理状态。

 类似资料:
  • 我正在阅读 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#interval-连接, 它有以下例子: 我有以下两个问题: > < li> 如果< code>o.order_time和< code>s.ship_time是正常时间列,而不是事件时间属性,那么所

  • 我正在开发一个简单的聚合,它对给定资源上发生的事件总数进行汇总(请参阅:在flink中计算总数并定期发射)。在一些人的帮助下,我成功地完成了这项工作,但现在我遇到了另一个问题。 我试图计算资源生命周期的总数,但我正在从保留期为24小时的kinesis流中读取事件。因为这意味着我无法访问在此之前发生的事件,所以我需要从一个每天计算一次总数的遗留(批处理)系统引导我的状态。 基本上,我希望以某种方式从

  • 我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。

  • 2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数

  • 我正在尝试对Flink中的KeyedStream执行映射操作: JsonToObjectMapper运算符的输出是类MessageObject的POJO,它有一个字符串字段'keyfield'。然后在该字段上键入流。 代码抛出NullPointer异常: 似乎其中一个KeyedStream的keyedState中的键为null,尽管我已经验证了'keyfield'始终是有效字符串。根据Flink文

  • 我正在使用眨眼计划程序。这是我的 sql test_table是Kafka桌 我设置了表.exec.state.ttl=10000 并运行我的程序,然后我继续发送消息。 由于我将状态ttl和cep interval都设置为10s,当我启动它时,状态的大小在10秒后应该是一个固定的数字。 但事实是,该州至少持续增长15分钟。此外,jvm触发了两次完整的gc。 是否有我尚未配置的配置