当前位置: 首页 > 知识库问答 >
问题:

配置的组ID在spring-cloud-streams中被忽略

危飞跃
2023-03-14

我必须为kafka流消费者设置一个组id,它符合严格的命名约定。

在深入跟踪留档后,我找不到有效的方法。因为我仍然相信我可能有误解,我更喜欢在这里打开一个问题以供同行审查,然后再打开一个bug问题。

一年前就已经问过一个类似的问题,但这个问题不是很夸张,还没有回答,我希望我能在这里对这个问题有更多的见解。

从官方文档的几个来源来看,我看到在我的应用程序的 application.yaml 中配置这应该很容易。

文件中指出,我可以:

  • 对所有活页夹使用默认值,使用Spring.云.流.kafka.default.group=

如果我直接在spring.kafka consumer中设置kafka泛型字段group id。组id该参数被显式忽略,我得到以下警告

2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for 'group.id'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id

所以我也在spring.cloud.stream.default.group和spring.cloud.stream.binding两个部分进行了尝试

编辑:根据@Olegzhurakousky的评论,这只是错误消息中的一个错别字。我在使用和不使用s的情况下进行了测试,但没有成功。

我已经快速查看了流代码,这个属性似乎确实是必须设置的属性,就像他们在测试中所做的那样,我们可以看到他们使用了例如:--spring.cloud.stream.binding.uppercase-in-0.group=inputGroup

在测试了上述所有配置后,组 ID 似乎总是被忽略。该组始终设置为默认值,即 groupId=process-applicationId

例如在如下日志中:

2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update

这就像根本没有使用组的应用程序.yaml一样。另一方面,spring.cloud.stream.bindings.process-in-0.destination=my-custom-topic 字段设置了 destination:my-custom-topic 被理解并正确遵循了主题(请参阅上面的日志)。

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>3.2.4</version>
        </dependency>

package my.custom.stuff;


import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class myKafkaStreamConsumer {

    private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);

    @Bean
    public static Consumer<KStream<String, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    logger.debug("from STREAM: Key= {} , value = {}", key, value);
                    // ...
                    // my message handling business logic
                    // ...
                });
    }
}

我在这里放了application.yaml的版本,IMHO应该是最符合文档的,但仍然没有工作,请注意< code>destination使用正确,因此至少它使用了正确的通道。

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      bindings:
        process-in-0:
          group: myCustomGroupId
          destination: "my-custom-topic"

我尝试过以多种方式注入组id,包括:

    < li >我能在任何官方文档或示例中找到的所有可能的组合 < li >将其添加到< code>consumer子部分,例如< code > spring . cloud . stream . bindings . process-in-0 . consumer . group 或< code > spring . cloud . stream . bindings . process-in-0 . consumer . group-id < li >将官方记录的密钥作为环境变量注入

它似乎总是被忽视。

>

  • https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.4.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_summary_of_function_based_programming_styles_for_kafka_streams

    https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties

  • 共有1个答案

    华浩壤
    2023-03-14

    声明一下,我对Spring有点生疏了,但是因为过去几个月我一直在和Kafka一起工作,所以我也想玩这个。我通过做两件事让它工作起来:

    >

  • 在应用程序属性中使用Application ationId而不是group

    spring:
      kafka:
        bootstrap-servers: localhost:29092
        consumer:
          auto-offset-reset: earliest
      cloud:
        stream:
          kafka:
            binder:
              functions:
                process:
                  applicationId: MyGroupIdUsingApplicationId
          bindings:
            process-in-0:
              bindings:
                process-in-0:
                  destination: my-custom-topic
    
    

    显式声明一个KafkaBinderConfigurationPropertiesbean

    我在这里创建了一个工作示例,供您克隆和测试,如果您需要:https://github.com/T-TK-Wan/SO-Spring_Cloud_Streams_Kafka_GroupId

    编辑:

    我只想补充一点,我只是关注GroupId是否可以设置,它是否可以正确注册,使用< code>applicationId属性是否正确,以及有什么副作用,我还没有深入研究。

  •  类似资料:
    • 我已经编写了许多通过RESTAPI调用进行通信的服务。这些服务可以配置为使用HTTP或HTTPS。任何给定的客户端都具有定义到服务器的连接的安全配置。“默认”配置属性由应用程序中的值设置。yml在这一点上效果很好。 然而,我逐渐意识到,这在更现实的情况下并不适用。问题是,我试图设置特定的参数,例如启动客户端时的服务器主机/端口,而我设置的值被忽略。 例如: 服务A(客户端)将出于某种目的与服务B(

    • 正在尝试配置Spring-Cloud-Stream以从Azure事件中心使用。使用下面的yml将身份验证从SAS密钥切换到服务原则 但是错误就像 原因:java。lang.IllegalArgumentException:未为组织的JAAS配置中的键“service\u principle\u azure”指定值。阿帕奇。Kafka。常见的安全JaasConfig。org上的parseAppCon

    • null 检索存储在Spring Cloud Config中的配置,包括Vault令牌。 使用Vault令牌连接到Spring Cloud Vault,然后检索存储在Spring Cloud Vault中的配置。 bootstrap.properties: Spring Cloud配置服务器: 如果是,我们是否需要设置属性检索的顺序?即。为Spring Cloud Vault设置。Spring C

    • 在使用Kafka的Spring Boot中,我可以如下设置ConsumerFactory的属性: } 使用Kafka Streams,我可以在代码中设置属性,如下所示: 使用Spring Cloud Streams和Kafka Streams时,所有属性似乎仅通过应用程序输入。属性或应用程序。资源文件夹中的yml文件,如 在将Spring Cloud Streams与Kafka Streams一起

    • 我正在使用开发一个网关项目。我可以看到重试过滤器已经存在于项目的源代码中。 但是没有关于如何通过yml Congigurations来配置它的文档。任何一个与它一起工作或与您的项目集成。谁能提供任何线索。

    • 因此,我认为我自己陷入了困惑,因为我知道SpringCloudStreams有两种不同的Kafka活页夹: 《春云流》Kafka活页夹 我正在寻找正确的YAML设置,以便在spring cloud streams的常规kafka活页夹中定义序列化器和反序列化器: 我可以使用以下逻辑调整默认值: 即: 我想我应该能够在每个主题的基础上做到这一点: 我遇到了设置: 但这似乎不起作用。我可以设置一个简单