当前位置: 首页 > 知识库问答 >
问题:

spring cloud stream starter stream kafka消费者SSL配置

梁巴英
2023-03-14

Spring cloud stream starter kafka在连接消费者时没有加载配置。以下是我在调试模式下运行控制台时在控制台中看到的配置:

security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

我有以下引导yml文件的配置部分

    spring:
     cloud:
      stream:
       bindings:
         <binding configuration>
       kafka:
        binder:
         autoCreateTopics: false
         brokers: <list of kafka brokers>
         defaultBrokerPort: <default port>
        configuration:
         security:
          protocol: SSL
         ssl:
          truststore:
           location: <path to cliend truststore jks>
           password: <password>
           type: JKS
          keystore:
           location: <path to cliend keystore jks>
           password: <password>
           type: JKS
          key:
           password: <password>
          enabled:
           protocols: TLSv1.2,TLSv1.1,TLSv1

共有1个答案

臧令
2023-03-14

我认为在您提供的示例yaml中,不能将安全性和协议(例如)放在两个级别,因为Kafka正在寻找security.protocolssl.truststore.location等属性。因此,在创建yaml文件时,在层次结构的同一级别上提供所有与安全性相关的Kafka属性。否则,spring将它们作为键/值对。

spring:
     cloud:
      stream:
       bindings:
         <binding configuration>
       kafka:
        binder:
         autoCreateTopics: false
         brokers: <list of kafka brokers>
         defaultBrokerPort: <default port>
        configuration:
         security.protocol: SSL
         ssl.truststore.location: <path to cliend truststore jks>
         ssl.truststore.password: <password>
         ssl.truststore.type: JKS
...
 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我有两个Kafka监听器组件,每个组件监听不同的主题并期待不同的有效负载。我的问题是,我可以对两者使用相同的客户端id吗?还是必须使用不同的客户端id?如果客户端id必须不同,我想了解一个可以有效使用客户端id的用例。

  • 我正在努力为Kafka-Streams正确配置Spring Cloud Stream,以便使用带有信任存储和密钥存储的SSL。 在我的应用程序中,我有多个流正在运行,所有流的SSL配置应该是相同的。 stream2:Topic2>Topic4 Topic3 stream3:Topic4>Topic5 我使用最新的Spring-Cloud Stream框架和Kafka-Streams以及Avro模型

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者