当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者指标从Spring Boot 2.2.2升级到2.3.0

涂羽
2023-03-14
    null
    null
  • 我们正在为我们的kafka broker(kafka Version2.1.1)使用Red Hat AMQ Streams
  • 我们在环境中唯一更改的是Spring Boot版本(以及自动拉入/更新的依赖项),以重新创建此问题

以下是更改前的构建。gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.2.2.RELEASE"
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
    kotlin("jvm") version "1.3.72"
    kotlin("plugin.spring") version "1.3.72"
}

group = "ourGroup"
version = "0.0.1"
java.sourceCompatibility = JavaVersion.VERSION_1_8

repositories {
    mavenCentral()
}

extra["springCloudVersion"] = "Hoxton.RELEASE"

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

dependencies {
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("io.micrometer:micrometer-registry-prometheus")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    testImplementation("org.springframework.boot:spring-boot-starter-test") {
        exclude(group = "org.junit.vintage", module = "junit-vintage-engine")
    }
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("org.springframework.security:spring-security-test")
}

tasks.withType<Test> {
    useJUnitPlatform()
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "1.8"
    }
}

现在,如果我们只是用如下所示的新的Spring Boot版本更新build.gradle.kts,我们的kafka度量标准就会消失:

    id("org.springframework.boot") version "2.3.0.RELEASE"

提前感谢您的帮助!如果你需要任何其他细节让我知道!

共有1个答案

端木皓君
2023-03-14

它适用于普通的spring-kafka消费者

@SpringBootApplication
public class So62292889Application {

    public static void main(String[] args) {
        SpringApplication.run(So62292889Application.class, args);
    }

    @KafkaListener(id = "so62292889", topics = "so62292889")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62292889").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(MeterRegistry registry) {
        return args -> {
            registry.getMeters().forEach(meter -> System.out.println(meter.getId()));
        };
    }

}
MeterId{name='kafka.consumer.outgoing.byte.total'...

我看你用的是春云流。问题是这个项目创建了自己的生产者/消费者工厂,并且没有添加千分尺监听器。

我打开了一个针对活页夹的问题。

 类似资料:
  • 嗨,我正在将kafka升级到.9,并将kafka consumer升级到与.9一起发布的新java consumer。在升级时,我使用的是现有的主题,步骤只是停止.8 kafka并开始指向相同log.dirs的.9 kafka。在消费者端,我使用的是相同的组名和主题名,但是新的消费者从主题中的起始位置再次使用消息。我已经把他们交了。我正在添加auto.offset.reset=最早。 任何想法为什

  • 我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答

  • 我在kafka文档中读到:kafka还有一个命令行使用者,它将把消息转储到标准输出。 bin/kafka-console-consumer.sh--zookeeper localhost:2181--topic test--从头开始 我想知道如果我想要使用消息并将它们推送到另一个输出,应该向上面的命令添加哪些选项。kafka-console-consumer没有--help选项,我找不到任何参数,

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka