我正在函数式编程中使用spring-cloud-stream-binder-kafka-streams:3.1.1
。我尝试了许多组合来设置GroupId,但是使用者总是将GroupId打印为spring.application.name。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nt.utility</groupId>
<artifactId>kafka-messages</artifactId>
<version>2.0.4</version>
<packaging>jar</packaging>
<name>kafka-messages</name>
<description>Kafka Messages</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<lombok-version>1.16.8</lombok-version>
<maven-version>2.2.4.RELEASE</maven-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${maven-version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
<shutdown>kill</shutdown>
</configuration>
</plugin>
</plugins>
</build>
</project>
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input;
}
}
spring:
application.name: kafka-messages
cloud:
stream:
function:
definition: process
bindings:
process-in-0:
destination: words
group: group-1
process-out-0:
destination: counts
kafka:
bindings:
process-out-0:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
process-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
value.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
streams:
binder:
brokers: localhost:9092
auto-create-topics: false
2021-02-26 23:29:03.677 INFO 42872 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Subscribed to topic(s): words
2021-02-26 23:29:03.809 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Discovered group coordinator 127.0.0.1:9092 (id: 2147483647 rack: null)
2021-02-26 23:29:03.811 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] (Re-)joining group
2021-02-26 23:29:06.705 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Setting offset for partition words-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 0 rack: null)], epoch=0}}
如您所见,groupId不是从yml文件设置的。我在这上面花了很多小时,但没有运气。请帮忙。
更新
当我不使用将pom依赖项修改为spring-cloud-stream-binder-kafka:3.1.1的streams API时,GroupId似乎可以正确应用
这与Spring无关;它是由KafkaStreams本身设置的。
https://kafka.apache.org/documentation/#streamsconfigs
application.id
流处理应用程序的标识符。在Kafka集群中必须是唯一的。它被用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)changelog主题前缀。
我们在Spring Boot的基础上开发了一个内部公司框架,我们希望通过Spring Cloud Stream支持Kafka Streams。我们需要自动向所有出站消息注入一些头。我们通过标准的Spring Cloud Stream Kafka Binder注册了一个定制的,实现了这一点,但这不适用于Kafka Streams,因为它们似乎遵循不同的路径。 对于Spring Cloud Strea
我有严重的问题处理Spring云流Kafka活页夹。Spring Cloud 3.0.2.Release的配置设置中存在许多模糊性和一致性问题。我一直试图为Kafka主题设置组ID和客户端ID,但是尽管尝试了各种不同的组合,我还是无法正确配置组ID。 文档声称,我们应该能够通过配置以下设置之一来设置组id和客户端id:https://cloud.spring.io/spring-cloud-sta
我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA
我正在使用Spring云流Kafka活页夹编写Kafka生产者和消费者。我想在生产者和消费者中访问以下信息 a) 主题 b)分区 b)偏移 我确实检查了文档,但并没有真正找到在哪里完成这项工作。我在文档中看到的只是指定一个作为生产者/消费者配置的一部分。有人能告诉我这到底是怎么做到的吗?
我们有一个Kafka过程,在这个过程中,我们消费来自一个主题的消息,然后进行一些充实,然后我们将消息发布到另一个主题。以下是事件 消费者-消费信息 我正在使用Spring Cloud kafka活页夹版本3.0.0-RELEASE,事情进展顺利。最近我们引入了幂等生产者,并包含了transactionIdPrefix属性,我们观察到我们开始出现性能问题。以下是统计数据。 在transactionI
我用的是Spring boot 1.5.9.RELEASE和Spring cloud Edgware。跨微服务发布。 我使用注释绑定了一个消费者。注释将完成我使用事件的其余部分。 出现了一些手动配置主题名称和其他一些配置属性的需求,我希望在应用程序启动时覆盖application.properties中定义的一些消费者属性。 有什么直接的方法吗?