当前位置: 首页 > 知识库问答 >
问题:

带ssl的Spring启动kafka,发送消息时出错

章安易
2023-03-14

我正在测试Spring Kafka的示例代码。它适用于PLAINTEXT连接,但不适用于SSL连接。

我已通过成功运行控制台使用者来验证密钥和证书对 kafka 代理有效:

bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:50070
--topic myTopic --consumer.config client-ssl.properties --from-beginning

但是我不能使用Spring Boot(2.0.1.RELEASE)和Spring Kafka,使用相同的密钥和证书发送消息。

2018-04-25 15:20:39.126  INFO 9672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-04-25 15:20:39.126  INFO 9672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-04-25 15:20:39.131  INFO 9672 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2018-04-25 15:20:39.231  INFO 9672 --- [           main] c.c.m.k.e.producer.ProducerApplication   : Started ProducerApplication in 1.553 seconds (JVM running for 5.029)
2018-04-25 15:20:39.238  INFO 9672 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [xx.xx.xx.xx:50070]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2018-04-25 15:20:39.257  INFO 9672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-04-25 15:20:39.257  INFO 9672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-04-25 15:21:39.265 ERROR 9672 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='foo1' to topic myTopic:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2018-04-25 15:22:39.268 ERROR 9672 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='foo2' to topic myTopic:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2018-04-25 15:23:39.268 ERROR 9672 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='foo3' to topic myTopic:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2018-04-25 15:24:39.273  INFO 9672 --- [           main] c.c.m.k.e.producer.ProducerApplication   : All received
2018-04-25 15:29:39.482 ERROR 9672 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1362) ~[na:1.8.0_20]
    at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:529) ~[na:1.8.0_20]
    at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1194) ~[na:1.8.0_20]
    at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1166) ~[na:1.8.0_20]
    at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469) ~[na:1.8.0_20]
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:168) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:739) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.close(Selector.java:727) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.maybeCloseOldestConnection(Selector.java:630) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:427) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
Caused by: java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:90) ~[na:1.8.0_20]
    at sun.security.validator.Validator.getInstance(Validator.java:179) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.getValidator(X509TrustManagerImpl.java:312) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.checkTrustedInit(X509TrustManagerImpl.java:171) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:239) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) ~[na:1.8.0_20]
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1356) ~[na:1.8.0_20]
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:156) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:925) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:865) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:862) ~[na:1.8.0_20]
    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1302) ~[na:1.8.0_20]
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:389) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:469) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219) ~[kafka-clients-1.0.1.jar:na]
    ... 8 common frames omitted
Caused by: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at java.security.cert.PKIXParameters.setTrustAnchors(PKIXParameters.java:200) ~[na:1.8.0_20]
    at java.security.cert.PKIXParameters.<init>(PKIXParameters.java:120) ~[na:1.8.0_20]
    at java.security.cert.PKIXBuilderParameters.<init>(PKIXBuilderParameters.java:104) ~[na:1.8.0_20]
    at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:88) ~[na:1.8.0_20]
    ... 32 common frames omitted

2018-04-25 15:30:09.284 ERROR 9672 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread: 

java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1362) ~[na:1.8.0_20]
    at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:529) ~[na:1.8.0_20]
    at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1194) ~[na:1.8.0_20]
    at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1166) ~[na:1.8.0_20]
    at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469) ~[na:1.8.0_20]
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:168) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:739) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.close(Selector.java:727) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.maybeCloseOldestConnection(Selector.java:630) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:427) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
Caused by: java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:90) ~[na:1.8.0_20]
    at sun.security.validator.Validator.getInstance(Validator.java:179) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.getValidator(X509TrustManagerImpl.java:312) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.checkTrustedInit(X509TrustManagerImpl.java:171) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:239) ~[na:1.8.0_20]
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) ~[na:1.8.0_20]
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1356) ~[na:1.8.0_20]
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:156) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:925) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:865) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:862) ~[na:1.8.0_20]
    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_20]
    at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1302) ~[na:1.8.0_20]
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:389) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:469) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412) ~[kafka-clients-1.0.1.jar:na]
    ... 4 common frames omitted
Caused by: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at java.security.cert.PKIXParameters.setTrustAnchors(PKIXParameters.java:200) ~[na:1.8.0_20]
    at java.security.cert.PKIXParameters.<init>(PKIXParameters.java:120) ~[na:1.8.0_20]
    at java.security.cert.PKIXBuilderParameters.<init>(PKIXBuilderParameters.java:104) ~[na:1.8.0_20]
    at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:88) ~[na:1.8.0_20]
    ... 23 common frames omitted

应用程序.属性

spring.kafka.bootstrap-servers=xx.xx.xx.xx:50070
spring.kafka.consumer.group-id=aaa
spring.kafka.properties.security.protocol=SSL
spring.kafka.consumer.ssl.keystore-location=classpath:client.keystore.jks
spring.kafka.consumer.ssl.keystore-password=1234
spring.kafka.consumer.ssl.key-password=1234

有人成功用SSL配置Spring Boot 2.0 Spring Kafka吗?

共有1个答案

何玺
2023-03-14
ssl.keystore.location = null
ssl.keystore.password = null

您只为消费者配置SSL…

spring.kafka.consumer.ssl.keystore-location=classpath:client.keystore.jks
spring.kafka.consumer.ssl.keystore-password=1234
spring.kafka.consumer.ssl.key-password=1234

使用< code>spring.kafka.ssl。...适用于生产者和消费者共有的属性。如果没有消费者,使用< code>spring.kafka.producer.ssl。...。

 类似资料:
  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0

  • 我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • spring-boot消费者微服务无法在kafka重新启动后向主题发送消息。 > 消费者和生产者(spring boot微服务)位于同一覆盖网络“Net-Broker”上,因此它们使用服务名“kafka:9092”访问kafka。 一开始一切都很顺利。 然后kafka仅被重新启动,在此之后,消费者不能再从kafka主题发送消息。 由于docker-compose.yml中的一个小更改(例如max_