以下是更改前的构建。gradle.kts
:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.2.2.RELEASE"
id("io.spring.dependency-management") version "1.0.9.RELEASE"
kotlin("jvm") version "1.3.72"
kotlin("plugin.spring") version "1.3.72"
}
group = "ourGroup"
version = "0.0.1"
java.sourceCompatibility = JavaVersion.VERSION_1_8
repositories {
mavenCentral()
}
extra["springCloudVersion"] = "Hoxton.RELEASE"
dependencyManagement {
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
}
}
dependencies {
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("io.micrometer:micrometer-registry-prometheus")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation("org.springframework.boot:spring-boot-starter-test") {
exclude(group = "org.junit.vintage", module = "junit-vintage-engine")
}
testImplementation("io.projectreactor:reactor-test")
testImplementation("org.springframework.security:spring-security-test")
}
tasks.withType<Test> {
useJUnitPlatform()
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "1.8"
}
}
现在,如果我们只是用如下所示的新的Spring Boot版本更新build.gradle.kts
,我们的kafka度量标准就会消失:
id("org.springframework.boot") version "2.3.0.RELEASE"
提前感谢您的帮助!如果你需要任何其他细节让我知道!
它适用于普通的spring-kafka消费者
@SpringBootApplication
public class So62292889Application {
public static void main(String[] args) {
SpringApplication.run(So62292889Application.class, args);
}
@KafkaListener(id = "so62292889", topics = "so62292889")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62292889").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(MeterRegistry registry) {
return args -> {
registry.getMeters().forEach(meter -> System.out.println(meter.getId()));
};
}
}
MeterId{name='kafka.consumer.outgoing.byte.total'...
我看你用的是春云流。问题是这个项目创建了自己的生产者/消费者工厂,并且没有添加千分尺监听器。
我打开了一个针对活页夹的问题。
嗨,我正在将kafka升级到.9,并将kafka consumer升级到与.9一起发布的新java consumer。在升级时,我使用的是现有的主题,步骤只是停止.8 kafka并开始指向相同log.dirs的.9 kafka。在消费者端,我使用的是相同的组名和主题名,但是新的消费者从主题中的起始位置再次使用消息。我已经把他们交了。我正在添加auto.offset.reset=最早。 任何想法为什
我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答
我在kafka文档中读到:kafka还有一个命令行使用者,它将把消息转储到标准输出。 bin/kafka-console-consumer.sh--zookeeper localhost:2181--topic test--从头开始 我想知道如果我想要使用消息并将它们推送到另一个输出,应该向上面的命令添加哪些选项。kafka-console-consumer没有--help选项,我找不到任何参数,
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka