我必须为kafka流消费者设置一个组id,它符合严格的命名约定。
在深入跟踪留档后,我找不到有效的方法。因为我仍然相信我可能有误解,我更喜欢在这里打开一个问题以供同行审查,然后再打开一个bug问题。
一年前就已经问过一个类似的问题,但这个问题不是很夸张,还没有回答,我希望我能在这里对这个问题有更多的见解。
从官方文档的几个来源来看,我看到在我的应用程序的 application.yaml
中配置这应该很容易。
文件中指出,我可以:
kafka.default.group=
如果我直接在
spring.kafka consumer中设置kafka泛型字段
该参数被显式忽略,我得到以下group id
。组id警告
:
2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for 'group.id'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id
所以我也在
spring.cloud.stream.default.group和spring.cloud.stream.binding
两个部分进行了尝试。
编辑:根据@Olegzhurakousky的评论,这只是错误消息中的一个错别字。我在使用和不使用
s
的情况下进行了测试,但没有成功。
我已经快速查看了流代码,这个属性似乎确实是必须设置的属性,就像他们在测试中所做的那样,我们可以看到他们使用了例如:
--spring.cloud.stream.binding.uppercase-in-0.group=inputGroup
。
在测试了上述所有配置后,组 ID 似乎总是被忽略。该组始终设置为默认值,即
groupId=process-applicationId
。
例如在如下日志中:
2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
这就像根本没有使用组的应用程序
.yaml
一样。另一方面,spring.cloud.stream.bindings.process-in-0.destination=my-custom-topic
字段设置了 destination:my-custom-topic
被理解并正确遵循了主题(请参阅上面的日志)。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.2.4</version>
</dependency>
package my.custom.stuff;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class myKafkaStreamConsumer {
private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);
@Bean
public static Consumer<KStream<String, String>> process() {
return input ->
input.foreach((key, value) -> {
logger.debug("from STREAM: Key= {} , value = {}", key, value);
// ...
// my message handling business logic
// ...
});
}
}
我在这里放了application.yaml的版本,IMHO应该是最符合文档的,但仍然没有工作,请注意< code>destination
使用正确,因此至少它使用了正确的通道。
spring:
kafka:
bootstrap-servers: kafka:9092
consumer:
auto-offset-reset: earliest
cloud:
stream:
bindings:
process-in-0:
group: myCustomGroupId
destination: "my-custom-topic"
我尝试过以多种方式注入组id,包括:
< li >我能在任何官方文档或示例中找到的所有可能的组合 < li >将其添加到< code>consumer
子部分,例如< code > spring . cloud . stream . bindings . process-in-0 . consumer . group 或< code > spring . cloud . stream . bindings . process-in-0 . consumer . group-id < li >将官方记录的密钥作为环境变量注入
它似乎总是被忽视。
>
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.4.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_summary_of_function_based_programming_styles_for_kafka_streams
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties
声明一下,我对Spring有点生疏了,但是因为过去几个月我一直在和Kafka一起工作,所以我也想玩这个。我通过做两件事让它工作起来:
>
在应用程序属性中使用Application ationId
而不是group
spring:
kafka:
bootstrap-servers: localhost:29092
consumer:
auto-offset-reset: earliest
cloud:
stream:
kafka:
binder:
functions:
process:
applicationId: MyGroupIdUsingApplicationId
bindings:
process-in-0:
bindings:
process-in-0:
destination: my-custom-topic
显式声明一个KafkaBinderConfigurationProperties
bean
我在这里创建了一个工作示例,供您克隆和测试,如果您需要:https://github.com/T-TK-Wan/SO-Spring_Cloud_Streams_Kafka_GroupId
编辑:
我只想补充一点,我只是关注GroupId是否可以设置,它是否可以正确注册,使用< code>applicationId属性是否正确,以及有什么副作用,我还没有深入研究。
我已经编写了许多通过RESTAPI调用进行通信的服务。这些服务可以配置为使用HTTP或HTTPS。任何给定的客户端都具有定义到服务器的连接的安全配置。“默认”配置属性由应用程序中的值设置。yml在这一点上效果很好。 然而,我逐渐意识到,这在更现实的情况下并不适用。问题是,我试图设置特定的参数,例如启动客户端时的服务器主机/端口,而我设置的值被忽略。 例如: 服务A(客户端)将出于某种目的与服务B(
正在尝试配置Spring-Cloud-Stream以从Azure事件中心使用。使用下面的yml将身份验证从SAS密钥切换到服务原则 但是错误就像 原因:java。lang.IllegalArgumentException:未为组织的JAAS配置中的键“service\u principle\u azure”指定值。阿帕奇。Kafka。常见的安全JaasConfig。org上的parseAppCon
null 检索存储在Spring Cloud Config中的配置,包括Vault令牌。 使用Vault令牌连接到Spring Cloud Vault,然后检索存储在Spring Cloud Vault中的配置。 bootstrap.properties: Spring Cloud配置服务器: 如果是,我们是否需要设置属性检索的顺序?即。为Spring Cloud Vault设置。Spring C
在使用Kafka的Spring Boot中,我可以如下设置ConsumerFactory的属性: } 使用Kafka Streams,我可以在代码中设置属性,如下所示: 使用Spring Cloud Streams和Kafka Streams时,所有属性似乎仅通过应用程序输入。属性或应用程序。资源文件夹中的yml文件,如 在将Spring Cloud Streams与Kafka Streams一起
我正在使用开发一个网关项目。我可以看到重试过滤器已经存在于项目的源代码中。 但是没有关于如何通过yml Congigurations来配置它的文档。任何一个与它一起工作或与您的项目集成。谁能提供任何线索。
因此,我认为我自己陷入了困惑,因为我知道SpringCloudStreams有两种不同的Kafka活页夹: 《春云流》Kafka活页夹 我正在寻找正确的YAML设置,以便在spring cloud streams的常规kafka活页夹中定义序列化器和反序列化器: 我可以使用以下逻辑调整默认值: 即: 我想我应该能够在每个主题的基础上做到这一点: 我遇到了设置: 但这似乎不起作用。我可以设置一个简单