当前位置: 首页 > 知识库问答 >
问题:

无法在Spring-Cloud-Stream-Binder-Kafka-Streams中设置GroupId:3.1.1

贺玉石
2023-03-14

我正在函数式编程中使用spring-cloud-stream-binder-kafka-streams:3.1.1。我尝试了许多组合来设置GroupId,但是使用者总是将GroupId打印为spring.application.name。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.nt.utility</groupId>
    <artifactId>kafka-messages</artifactId>
    <version>2.0.4</version>
    <packaging>jar</packaging>

    <name>kafka-messages</name>
    <description>Kafka Messages</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <lombok-version>1.16.8</lombok-version>
        <maven-version>2.2.4.RELEASE</maven-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>3.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok-version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>${kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${maven-version}</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M4</version>
                <configuration>
                    <testFailureIgnore>true</testFailureIgnore>
                    <shutdown>kill</shutdown>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
@SpringBootApplication
public class KafkaMessageApplication {
    public static void main(String args[]) {
        SpringApplication.run(KafkaMessageApplication.class, args);
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        return input -> input;
    }
}
spring:
  application.name: kafka-messages
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          destination: words
          group: group-1
        process-out-0:
          destination: counts
      kafka:
        bindings:
          process-out-0:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
          process-in-0:
            consumer:
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
        streams:
          binder:
            brokers: localhost:9092
            auto-create-topics: false
2021-02-26 23:29:03.677  INFO 42872 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Subscribed to topic(s): words
2021-02-26 23:29:03.809  INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Discovered group coordinator 127.0.0.1:9092 (id: 2147483647 rack: null)
2021-02-26 23:29:03.811  INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] (Re-)joining group
2021-02-26 23:29:06.705  INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Setting offset for partition words-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 0 rack: null)], epoch=0}}

如您所见,groupId不是从yml文件设置的。我在这上面花了很多小时,但没有运气。请帮忙。

更新

当我不使用将pom依赖项修改为spring-cloud-stream-binder-kafka:3.1.1的streams API时,GroupId似乎可以正确应用

共有1个答案

伯君浩
2023-03-14

这与Spring无关;它是由KafkaStreams本身设置的。

https://kafka.apache.org/documentation/#streamsconfigs

application.id

流处理应用程序的标识符。在Kafka集群中必须是唯一的。它被用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)changelog主题前缀。

 类似资料:
  • 我们在Spring Boot的基础上开发了一个内部公司框架,我们希望通过Spring Cloud Stream支持Kafka Streams。我们需要自动向所有出站消息注入一些头。我们通过标准的Spring Cloud Stream Kafka Binder注册了一个定制的,实现了这一点,但这不适用于Kafka Streams,因为它们似乎遵循不同的路径。 对于Spring Cloud Strea

  • 我有严重的问题处理Spring云流Kafka活页夹。Spring Cloud 3.0.2.Release的配置设置中存在许多模糊性和一致性问题。我一直试图为Kafka主题设置组ID和客户端ID,但是尽管尝试了各种不同的组合,我还是无法正确配置组ID。 文档声称,我们应该能够通过配置以下设置之一来设置组id和客户端id:https://cloud.spring.io/spring-cloud-sta

  • 我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA

  • 我正在使用Spring云流Kafka活页夹编写Kafka生产者和消费者。我想在生产者和消费者中访问以下信息 a) 主题 b)分区 b)偏移 我确实检查了文档,但并没有真正找到在哪里完成这项工作。我在文档中看到的只是指定一个作为生产者/消费者配置的一部分。有人能告诉我这到底是怎么做到的吗?

  • 我们有一个Kafka过程,在这个过程中,我们消费来自一个主题的消息,然后进行一些充实,然后我们将消息发布到另一个主题。以下是事件 消费者-消费信息 我正在使用Spring Cloud kafka活页夹版本3.0.0-RELEASE,事情进展顺利。最近我们引入了幂等生产者,并包含了transactionIdPrefix属性,我们观察到我们开始出现性能问题。以下是统计数据。 在transactionI

  • 我用的是Spring boot 1.5.9.RELEASE和Spring cloud Edgware。跨微服务发布。 我使用注释绑定了一个消费者。注释将完成我使用事件的其余部分。 出现了一些手动配置主题名称和其他一些配置属性的需求,我希望在应用程序启动时覆盖application.properties中定义的一些消费者属性。 有什么直接的方法吗?