我的目标是有一个Flink流程序,保留最后的N个id,其中id是从事件中提取的。接收器是一个Cassandra存储区,因此可以随时获取ID列表。重要的是,卡桑德拉在每一次事件发生时都要立即得到最新消息。
case class MyEvent(userId: Int, id: String)
env
.addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
.keyBy(_.userId)
.mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
val keepNIds = currentIds match {
case None => Seq(in.id)
case Some(cids) => (cids :+ in.id).takeRight(100)
}
((in.userId, keepNIds), Some(keepNIds))
}
.addSink { in: (Int, Seq[String]) =>
CassandraSink.appDatabase.idsTable.store(...)
}
生长状态是一个重要而正确的观察。如果你的键空间在移动,这肯定会发生。
Flink 1.2.0添加了processfunction
来解决这个问题。ProcessFunction
与FlatMapFunction
类似,但可以访问计时器服务。您可以注册计时器,当计时器过期时调用ontimer()
回调函数。回调可用于清理状态。
我正在阅读 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#interval-连接, 它有以下例子: 我有以下两个问题: > < li> 如果< code>o.order_time和< code>s.ship_time是正常时间列,而不是事件时间属性,那么所
我正在开发一个简单的聚合,它对给定资源上发生的事件总数进行汇总(请参阅:在flink中计算总数并定期发射)。在一些人的帮助下,我成功地完成了这项工作,但现在我遇到了另一个问题。 我试图计算资源生命周期的总数,但我正在从保留期为24小时的kinesis流中读取事件。因为这意味着我无法访问在此之前发生的事件,所以我需要从一个每天计算一次总数的遗留(批处理)系统引导我的状态。 基本上,我希望以某种方式从
我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。
2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数
我正在尝试对Flink中的KeyedStream执行映射操作: JsonToObjectMapper运算符的输出是类MessageObject的POJO,它有一个字符串字段'keyfield'。然后在该字段上键入流。 代码抛出NullPointer异常: 似乎其中一个KeyedStream的keyedState中的键为null,尽管我已经验证了'keyfield'始终是有效字符串。根据Flink文
我正在使用眨眼计划程序。这是我的 sql test_table是Kafka桌 我设置了表.exec.state.ttl=10000 并运行我的程序,然后我继续发送消息。 由于我将状态ttl和cep interval都设置为10s,当我启动它时,状态的大小在10秒后应该是一个固定的数字。 但事实是,该州至少持续增长15分钟。此外,jvm触发了两次完整的gc。 是否有我尚未配置的配置