Question:

Getting an SSL exception when connecting to a Kafka cluster with Spring Boot and Apache Camel

单于翰飞
2023-03-14

My requirement is to connect to a Kafka topic over SSL using Spring Boot and Apache Camel. I wrote the code below for this, but I am getting an error caused by sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.

Can anyone please help me resolve this error?

// In this code I configured the SSL context
    @Configuration
    public class Testing {

        // Truststore-only SSL configuration exposed as a bean
        @Bean
        SSLContextParameters sslContextParameters() {
            KeyStoreParameters store = new KeyStoreParameters();
            store.setResource("kafka.client.truststore.jks");
            store.setPassword("123456");

            TrustManagersParameters trust = new TrustManagersParameters();
            trust.setKeyStore(store);

            SSLContextParameters parameters = new SSLContextParameters();
            parameters.setTrustManagers(trust);

            return parameters;
        }
    }

In the file below, I call the router with the sslContextParameters bean:

    @Autowired
    SSLContextParameters params;

    @Override
    public void configure() throws Exception {
        // Note: the sslContextParameters endpoint option expects a registry
        // reference, i.e. sslContextParameters=#params rather than =params.
        from("{{timerOnce}}").process(consumerCreate).to(
                "https://xx.xx.xx.xxx/consumers/group-id?sslContextParameters=params");
    }

I also tried another approach to connect to the Kafka cluster over SSL, but unfortunately it fails with the following exception: org.apache.camel.spring.boot.CamelSpringBootInitializationException: java.io.IOException: Invalid keystore format

public Endpoint setupSSLConext(CamelContext camelContext) throws Exception {

        KeyStoreParameters keyStoreParameters = new KeyStoreParameters();
        // Change this path to point to your truststore/keystore as jks files
        keyStoreParameters.setResource("kafka.client.truststore.jks");
        keyStoreParameters.setPassword("123456");

        KeyManagersParameters keyManagersParameters = new KeyManagersParameters();
        keyManagersParameters.setKeyStore(keyStoreParameters);
        keyManagersParameters.setKeyPassword("123456");

        TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
        trustManagersParameters.setKeyStore(keyStoreParameters);

        SSLContextParameters sslContextParameters = new SSLContextParameters();
        sslContextParameters.setKeyManagers(keyManagersParameters);
        sslContextParameters.setTrustManagers(trustManagersParameters);

        HttpComponent httpComponent = camelContext.getComponent("https4", HttpComponent.class);
        httpComponent.setSslContextParameters(sslContextParameters);


        // This is important to make your cert skip CN/Hostname checks
        httpComponent.setX509HostnameVerifier(new X509HostnameVerifier() {
            @Override
            public void verify(String s, SSLSocket sslSocket) throws IOException {

            }

            @Override
            public void verify(String s, X509Certificate x509Certificate) throws SSLException {

            }

            @Override
            public void verify(String s, String[] strings, String[] strings1) throws SSLException {

            }

            @Override
            public boolean verify(String s, SSLSession sslSession) {
                // I don't mind just return true for all or you can add your own logic
                return true;
            }

        });

        return httpComponent.createEndpoint("https://XX.XX.X.XXX/consumers/");
    }
    @Override
    public void configure() throws Exception {

        Endpoint createEndpoint = cdcHelper.setupSSLConext(context);

        from("{{timerOnce}}").process(consumerCreate)
                .to(createEndpoint);    // calling kafka consumer 

    }
}
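
Both stack traces point at the truststore file itself: "PKIX path building failed" usually means the broker's CA certificate is not inside kafka.client.truststore.jks, and "Invalid keystore format" usually means the file is not a real JKS keystore (for example a PEM file that was only renamed to .jks). A minimal check, reusing the file name and password from the code above, that loads the file the same way and lists what it contains:

    import java.io.FileInputStream;
    import java.security.KeyStore;
    import java.util.Collections;

    public class TruststoreCheck {

        public static void main(String[] args) throws Exception {
            // Load the file as a JKS keystore, exactly as the Camel SSL config will.
            // Adjust the path if the file lives on the classpath rather than in
            // the working directory. A non-JKS file fails here with
            // "Invalid keystore format".
            KeyStore trustStore = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream("kafka.client.truststore.jks")) {
                trustStore.load(in, "123456".toCharArray());
            }

            // The broker's CA certificate must show up among these entries,
            // otherwise the PKIX path cannot be built.
            for (String alias : Collections.list(trustStore.aliases())) {
                System.out.println(alias + " -> " + trustStore.getCertificate(alias).getType());
            }
        }
    }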

1 Answer

韦俊英
2023-03-14

You can set up a Kafka consumer with Apache Camel and Spring Boot as follows.

Add the following properties to application.properties:

# kafka configuration
kafka.topic=iot1
kafka.camelKafkaOptions.groupId=grp1
kafka.camelKafkaOptions.brokers=kafka.localtest:9093
kafka.camelKafkaOptions.consumersCount=10
kafka.camelKafkaOptions.autoOffsetReset=latest
kafka.camelKafkaOptions.autoCommitEnable=false
kafka.camelKafkaOptions.allowManualCommit=true
kafka.camelKafkaOptions.metadataMaxAgeMs=5000
kafka.camelKafkaOptions.securityProtocol=SSL
kafka.camelKafkaOptions.sslEndpointAlgorithm=HTTPS
kafka.camelKafkaOptions.sslKeyPassword=<ssl key password>
kafka.camelKafkaOptions.sslKeystoreLocation=<keystore path>
kafka.camelKafkaOptions.sslKeystorePassword=<sslkeystore password>
kafka.camelKafkaOptions.sslTruststoreLocation=<truststore path>
kafka.camelKafkaOptions.sslTruststorePassword=<password>
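
With these options, the endpoint URI that the utility method below should produce looks roughly like this (values taken from the properties above; shown wrapped over several lines for readability, it is a single line in practice):

    kafka:iot1?groupId=grp1&brokers=kafka.localtest:9093&consumersCount=10
        &autoOffsetReset=latest&autoCommitEnable=false&allowManualCommit=true
        &metadataMaxAgeMs=5000&securityProtocol=SSL&sslEndpointAlgorithm=HTTPS
        &sslKeyPassword=<ssl key password>&sslKeystoreLocation=<keystore path>
        &sslKeystorePassword=<sslkeystore password>&sslTruststoreLocation=<truststore path>
        &sslTruststorePassword=<password>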

Then create a utility method that builds the Kafka endpoint URI from these options:

@Component
public class KafkaUtility {

    public String getKafkaEndpoint(String topicName) {
        StringBuilder urlBuilder = new StringBuilder("kafka:" + topicName);

        if (!getCamelKafkaOptions().isEmpty()) {
            // the first option is separated from the topic by "?"; appendConfig()
            // adds the "&" between the following options
            urlBuilder.append("?");
            getCamelKafkaOptions().forEach(
                (key, value) -> {
                    if (StringUtils.isNotBlank(value)) {
                        appendConfig(urlBuilder, key, value);
                    }
                }
            );
        }
        // strip the trailing "&" symbol
        String kafkaURL = stripLastAnd(urlBuilder.toString());
        return kafkaURL;
    }

    // getCamelKafkaOptions(), appendConfig() and stripLastAnd() are not shown
    // in this answer; one possible implementation is sketched below.
}
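
A minimal sketch of the same KafkaUtility class with the missing pieces filled in; the @ConfigurationProperties binding and the helper bodies are assumptions, not part of the original answer:

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;

    @Component
    @ConfigurationProperties(prefix = "kafka")
    public class KafkaUtility {

        // Populated by Spring Boot from the kafka.camelKafkaOptions.* properties
        private Map<String, String> camelKafkaOptions = new HashMap<>();

        public Map<String, String> getCamelKafkaOptions() {
            return camelKafkaOptions;
        }

        public void setCamelKafkaOptions(Map<String, String> camelKafkaOptions) {
            this.camelKafkaOptions = camelKafkaOptions;
        }

        // Appends one "key=value&" pair to the URI being built
        private void appendConfig(StringBuilder urlBuilder, String key, String value) {
            urlBuilder.append(key).append("=").append(value).append("&");
        }

        // Removes the trailing "&" left behind by the last appendConfig() call
        private String stripLastAnd(String url) {
            return url.endsWith("&") ? url.substring(0, url.length() - 1) : url;
        }

        // getKafkaEndpoint(String topicName) from above goes here unchanged
    }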

In your route builder, wire it up like this:

    @Autowired
    private KafkaUtility kafkaUtility;

    @Value("${kafka.topic}")
    private String topic;

    // inside configure():
    from(kafkaUtility.getKafkaEndpoint(topic))
        .process(exchange -> {
            // your processing logic (or an injected org.apache.camel.Processor)
        })
        .to("tourl");