当前位置: 首页 > 知识库问答 >
问题:

如何计算Spring网络流量和千分尺中的Kafka消费者处理时间

劳法
2023-03-14

我是反应式编程的新手,并在Spring Webflux中编写了一个kafka消费者来消费事件,对其进行处理并处理成功和失败的场景。我想了解如何使用千分尺计算处理每个事件(成功和失败)所花费的时间指标。我知道我们可以使用千分尺的定时器接口来计算这样的处理时间-

Timer timer = Timer.builder("kafka.consumer.time")
                    .tag("eventType", "Event A")
                    .register(meterRegistry);
    timer.record(Duration.ofMillis(System.currentTimeMillis() - inTime));

其中inTime-事件处理开始的时间。

但是我不知道我应该如何在反应式编程场景中使用“inTime ”,因为它是一个事件流,而且它将一次处理多个事件。

 @EventListener(ApplicationReadyEvent.class)
public void consume() {
    kafkaReceiver
            .receive()
            .concatMap(res -> kafkaHelper.process(res)
                    .doOnError(error -> {
                        log.error("Error occurred);
                    }).retryWhen(Retry.backoff(3, Duration.ofSeconds(9)).jitter(0.5))
                    .onErrorResume(error -> {
                        log.error("Retry exhausted);
                        return Mono.empty();
                    })
                    .doOnSuccess(val -> {
                        res.receiverOffset().acknowledge();
                    })).subscribe();

}

请帮忙。提前致谢。

共有1个答案

相德宇
2023-03-14

围绕这个问题空间,你可以研究三件事:

    < li>Reactor支持千分尺发布,因此您可以命名您的< code>Flux并请求记录如下指标:
receive()
    .name("kafka.events") 
    .metrics() 
    .doOnNext(System.out::println)
    .subscribe();
 类似资料:
  • 我想测量使用WebFlux进行的一些异步调用的长度。我已经阅读了各种来源,因为我了解到注释与AspectJ一起工作,基本上只是在方法调用之前启动计时器,然后停止。这显然不适用于异步方法。 是否有任何针对WebFlux的解决方案,或者我唯一能做的就是传递执行时间戳,使我的应用程序逻辑混乱?

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我正在使用spring-kafka“2.2.7.RELEASE”来创建一个批处理消费者,并且我正在尝试了解当我的记录处理时间超过 max.poll.interval.ms 时消费者重新平衡是如何工作的。 这是我的配置。 这是我的出厂设置。 我添加了自定义消费者监听器,如下所示。 现在我期望消费者群体能够重新平衡,因为处理时间超过 max.poll.interval.ms 但我没有看到任何这样的行为

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我想测量处理某些数据需要多长时间:我的应用程序以固定速率从给定源读取该数据。在每个圆圈之前,我存储。我读取数据,将单个时间戳添加到每个条目中。数据被存储,转换,就在我通过WebSockets发送它之前,我想测量和初始时间戳之间的持续时间。 我试过了 但是可视化这一点只允许我使用<code>processingDuration_seconds_count、<code>_max和<code>_ sum

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费