Question:

Spring application not connecting to Kafka over SSL

蒋星雨
2023-03-14

I have a Spring Boot application with a very simple Kafka producer in it. If I connect to the Kafka cluster without encryption, everything works fine, but if I try to connect to the cluster over SSL, the connection times out. Is there additional configuration the producer needs, or some other property I have to define, for Spring to apply the SSL settings correctly?

I have the following properties set:

spring.kafka.producer.bootstrap-servers=broker1.kafka.poc.com:9093,broker3.kafka.poc.com:9093,broker4.kafka.poc.com:9093,broker5.kafka.poc.com:9093
spring.kafka.ssl.key-store-type=jks
spring.kafka.ssl.trust-store-location=file:/home/ec2-user/truststore.jks
spring.kafka.ssl.trust-store-password=test1234
spring.kafka.ssl.key-store-location=file:/home/ec2-user/keystore.jks
spring.kafka.ssl.key-store-password=test1234
logging.level.org.apache.kafka=debug
server.ssl.key-password=test1234
spring.kafka.ssl.key-password=test1234
spring.kafka.producer.client-id=sym
spring.kafka.admin.ssl.protocol=ssl

When the application starts, it prints the following ProducerConfig values:

o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [broker1.kafka.allypoc.com:9093, broker3.kafka.allypoc.com:9093, broker4.kafka.allypoc.com:9093, broker5.kafka.allypoc.com:9093]
buffer.memory = 33554432
client.dns.lookup = default
client.id = sym
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /home/ec2-user/keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = jks
ssl.protocol = ssl
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/ec2-user/truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

My producer is very simple:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Fire-and-forget send without a key; the partitioner picks the partition.
    void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }

    // Keyed send; records with the same key go to the same partition.
    void sendMessage(String topic, String key, String message) {
        this.kafkaTemplate.send(topic, key, message);
    }
}
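
For reference, a minimal way to exercise this producer on startup might look like the sketch below (the ProducerDemo class, the CommandLineRunner wiring, and the topic name sym are illustrative assumptions, not part of the original question; the class must sit in the same package as Producer because sendMessage is package-private):

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerDemo {

    // Sends a single record on startup so the SSL handshake and
    // metadata fetch are triggered immediately.
    @Bean
    CommandLineRunner demo(Producer producer) {
        return args -> producer.sendMessage("sym", "key1", "hello");
    }
}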

When connecting to Kafka over SSL, I get a TimeoutException saying the topic sym is not present in metadata after 60000 ms. If I turn on debug logging, I see the following messages repeatedly, cycling through all of the brokers.

2019-05-29 20:10:25.768 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.clients.NetworkClient   : [Producer clientId=sym] Completed connection to node -4. Fetching API versions.
2019-05-29 20:10:25.768 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.clients.NetworkClient   : [Producer clientId=sym] Initiating API versions fetch from node -4.
2019-05-29 20:10:25.768 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.clients.NetworkClient   : [Producer clientId=sym] Initialize connection to node 10.25.77.13:9093 (id: -3 rack: null) for sending metadata request
2019-05-29 20:10:25.768 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.clients.NetworkClient   : [Producer clientId=sym] Initiating connection to node 10.25.77.13:9093 (id: -3 rack: null) using address /10.25.77.13
2019-05-29 20:10:25.994 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--3.bytes-sent
2019-05-29 20:10:25.996 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--3.bytes-received
2019-05-29 20:10:25.997 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--3.latency
2019-05-29 20:10:25.998 DEBUG 1381 --- [rk-thread | sym] o.apache.kafka.common.network.Selector   : [Producer clientId=sym] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -3
2019-05-29 20:10:26.107 DEBUG 1381 --- [rk-thread | sym] o.apache.kafka.common.network.Selector   : [Producer clientId=sym] Connection with /10.25.75.151 disconnected

java.io.EOFException: null
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:311) ~[kafka-clients-2.1.1.jar!/:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235) ~[kafka-clients-2.1.1.jar!/:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]

2019-05-29 20:10:26.108 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.clients.NetworkClient   : [Producer clientId=sym] Node -1 disconnected.
2019-05-29 20:10:26.110 DEBUG 1381 --- [rk-thread | sym] org.apache.kafka.clients.NetworkClient   : [Producer clientId=sym] Completed connection to node -3. Fetching API versions.

1 answer

司空凌
2023-03-14

security.protocol should be set to SSL in the producer configuration. You can also try setting ssl.endpoint.identification.algorithm= (an empty value) to disable hostname verification of the certificates, in case that is the problem. Beyond that, it would be useful to look at the Kafka broker configuration as well.
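
A minimal sketch of that fix in application.properties, assuming Spring Boot's spring.kafka.properties.* pass-through of arbitrary keys to the Kafka client (keep the existing keystore/truststore properties from the question):

# Use the SSL listener instead of the PLAINTEXT default.
spring.kafka.properties.security.protocol=SSL

# Optional, only if hostname verification is the problem: an empty value
# disables certificate hostname checks. Avoid this in production.
spring.kafka.properties.ssl.endpoint.identification.algorithm=

With security.protocol left at its PLAINTEXT default (visible in the ProducerConfig dump above), the producer opens an unencrypted connection to the brokers' SSL port, the broker closes it, and the client logs exactly the EOFException/disconnect loop shown in the question.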
