当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka流中添加冷却/速率限制?

金毅
2023-03-14

我所尝试的:

使用aggreate作为有状态转换,但聚合状态依赖于时间。然而,即使得到的KTable在聚合值中没有变化,但是KTable(作为changelog)将继续向下发送元素,因此无法实现对流进行“速率限制”的预期效果

val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
  .groupBy((key, string) => string)
  .aggregate(() => "constant",
    (aggKey: String, value: String, aggregate: String) => aggregate,
    stringSerde,
    "name")
  .print

提供以下输出:

[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)

可能的重复涉及到记录缓存如何在聚合决定在下游发布元素时引起一些混乱的问题。然而,主要的问题是如何在DSL中实现“速率限制”。正如@Miguno所指出的那样,必须返回到较低级别的处理器API。下面我贴了一个相当冗长的方法:

  val logConfig = new util.HashMap[String, String]();
  // override min.insync.replicas
  logConfig.put("min.insyc.replicas", "1")

  case class StateRecord(alert: Alert, time: Long)

  val countStore = Stores.create("Limiter")
    .withKeys(integerSerde)
    .withValues(new JsonSerde[StateRecord])
    .persistent()
    .enableLogging(logConfig)
    .build();
  builder.addStateStore(countStore)

  class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
    var context: ProcessorContext = null;
    var store: KeyValueStore[Integer, StateRecord] = null;

    override def init(context: ProcessorContext) = {
      this.context = context
      this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
    }

    override def transform(key: Integer, value: Alert) = {
      val current = System.currentTimeMillis()
      val newRecord = StateRecord(value._1, value._2, current)
      store.get(key) match {
        case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
          store.put(key, newRecord)
          (key, value)
        }
        case StateRecord(_, _) => null
        case null => {
          store.put(key, newRecord)
          (key, value)
        }
      }
    }
  }

共有1个答案

万俟穆冉
2023-03-14

假设我有一个(User,Alert)元组流。我想要的是对每个用户的流进行速率限制。即。我想要一个只为用户输出一次警报的流。在接下来的60分钟内,用户的任何传入警报都应该被吞没。在这60分钟之后,一个传入警报将再次触发。

当使用Kafka流的DSL时,这目前是不可能的。相反,您可以(并且需要)使用较低级别的处理器API手动实现这样的行为。

注意:我们一直在Kafka社区讨论是否要在DSL中添加这样的功能(通常称为“触发器”)。到目前为止,决定是暂时不具备这种功能。

 类似资料:
  • 我的计算图的一个阶段是类型的流。显然,这个阶段应该为每个请求分配一个响应,并在所有请求都被解决后发出seq。 现在,底层API有一个苛刻的速率限制策略,所以我每秒只能激发一个请求。如果我有一个的单个,我可以使用每秒发出单个元素的来这个流(如何限制Akka流每秒只执行和发送一个消息一次?),但在这种情况下我没有看到类似的解决方案。 有什么好的表达方式吗?我想到的想法是使用低层图DSL并在那里使用一秒

  • 我正试图将mkv文件(见下面的属性)发送到Kinesis视频流。我想有10-15帧每秒的FPS。

  • 速率限制配置参考 filter.http.RateLimit filter.http.RateLimit proto { "domain": "...", "stage": "...", "request_type": "...", "timeout": "{...}" } domain (string, REQUIRED) 需要调用速率限制服务时的域。 stage (uint3

  • 速率限制配置参考。 filter.network.RateLimit filter.network.RateLimit proto { "stat_prefix": "...", "domain": "...", "descriptors": [], "timeout": "{...}" } stat_prefix (string, REQUIRED) 发布统计信息时使用的前缀。

  • 速率限制配置概述。 { "name": "rate_limit", "config": { "domain": "...", "stage": "...", "request_type": "...", "timeout_ms": "..." } } domain (required, string) 调用速率限制服务时使用的域。 stage (opt

  • 速率限制配置参考。 { "name": "ratelimit", "config": { "stat_prefix": "...", "domain": "...", "descriptors": [], "timeout_ms": "..." } } stat_prefix (required, string) 发布统计信息时使用的前缀。 domai