当前位置: 首页 > 知识库问答 >
问题:

用Kafka连接Spring Cloud Stream SSL

翟渝
2023-03-14
spring:
  kafka:
    ssl:
      protocol: SASL_SSL
      keystore-location: classpath:kafka.p12 // created from a PEM with private key and certificate
      keystore-password: passw0rd
      key-store-type: PKCS12
  cloud:
    stream:
      bindings:
      ...
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: myhost.mydomain.com
listeners=SASL_SSL://myhost.mydomain.com:9092
advertised.listeners=SASL_SSL://myhost.mydomain.com:9092
2018-08-29 17:49:13.421 ERROR [alef-data-connector,,,] 83300 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Cannot initialize Binder

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2018-08-29 17:49:13.422 ERROR [alef-data-connector,,,] 83300 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:391) ~[spring-cloud-stream-binder-kafka-core-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:212) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:126) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:153) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:77) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:138) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:244) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:221) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:252) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:46) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_162]
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_162]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[na:1.8.0_162]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:47) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:29) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:157) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:121) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:885) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.finishRefresh(ReactiveWebServerApplicationContext.java:83) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at com.alefeducation.connector.ConnectorApplicationKt.main(ConnectorApplication.kt:13) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Spring Cloud Stream应用程序的配置是否正确。我们有Spring Cloud Stream文档解释与Kafka的SSL连接吗?

共有1个答案

马渊
2023-03-14

当您使用多绑定器支持(通过Environment属性)时,不会继承根引导属性。

您需要在环境下移动spring.kafka.ssl属性,或者,如果只有一个绑定器,则删除多绑定器配置,只在根声明绑定器属性。

编辑

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: so52080276
          binder: kafka1
          group: so52080276
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: myhost.mydomain.com
                      configuration:
                        security.protocol: SASL_SSL
                        ssl.keystore.location: classpath:kafka.p12 // created from a PEM with private key and certificate
                        ssl.keystore.password: passw0rd
                        ssl.key.store.type: PKCS12
 类似资料:
  • 我有一个kafka connect插件,部署在kafka集群中(在独立模式下,仅用于测试,目的是分布式完成)。这个Kafka连接插件使用curator连接到集群的zookeper,并从中提取一些信息,以决定如何处理这些消息。 代码如下: 在treeCache启动时超时,配置根路径存在于本地zookeeper中(已确认在zookeeper外壳中执行ls,对于我尝试使用的zkConnection字符串

  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li

  • 下面是/etc/kafka/connect-MongoDB-source.properties中的MongoDB配置 但是低于误差 以独立模式运行连接器。 我在debezium-debezium-连接器-mongob-1.0.0/debezium-connector-mongodb-1.0.0.Final.jar 类路径的设置如下 使用插件路径,我看到它能够注册和加载所有必需的插件。 但最后还是同

  • 我正在尝试使用CockroachDB (v2.0.6)作为我的一个Kafka主题的接收器。 我找不到任何专门用于CockroachDB的Kafka连接器,所以我决定使用Confluent的jdbc sink连接器,因为CockroachDB支持postgreSQL语法。 我在Kafka Connect上使用的连接字符串如下 这基本上是我在现有工作的Postgres接收器连接器上所做的唯一更改。 不

  • 我使用kafka connect从mongo读取数据并将其写入kafka主题。 我正在使用 mongo kafka 源连接器。 我收到以下错误: 罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压

  • 我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf

  • 我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。