我正在努力为Kafka-Streams正确配置Spring Cloud Stream,以便使用带有信任存储和密钥存储的SSL。
在我的应用程序中,我有多个流正在运行,所有流的SSL配置应该是相同的。
stream2:Topic2>Topic4 Topic3
stream3:Topic4>Topic5
我使用最新的Spring-Cloud Stream框架和Kafka-Streams以及Avro模型。我可以配置模式注册表。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
spring.application.name: processingapp
spring.cloud:
function.definition: stream1;stream2;stream3
stream:
bindings:
stream1-in-0:
destination: topic1
stream1-out-0:
destination: topic2
stream2-in-0:
destination: topic2
stream2-in-1:
destination: topic3
stream2-out-0:
destination: topic4
stream3-in-0:
destination: topic4
stream3-out-0:
destination: topic5
kafka:
binder:
brokers: kafkabrokerurl.com:9092
configuration: # not recognized at all
security.protocol: SSL
ssl.truststore.location: /mnt/truststore.jks
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/keystore.jks
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
bindings:
default:
consumer:
resetOffsets: false
startOffset: latest
stream1-in-0:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream1-out-0:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream2-in-0:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream2-in-1:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
materializedAs: sbinfo-outage-mapping-store
stream2-out-0:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream3-in-0:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream3-out-0:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
streams:
binder:
configuration:
schema.registry.url: https://schemaregistryurl.com # this works
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [kafkabrokerurl.com:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
我在这里发现了一个老问题,并尝试了这种方法,但结果是相同的:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129
configuration: # not recognized at all
"[security.protocol]": SSL
"[ssl.truststore.location]": /mnt/truststore.jks
"[ssl.truststore.type]": JKS
"[ssl.keystore.location]": /mnt/keystore.jks
"[ssl.keystore.type]": JKS
"[ssl.enabled.protocols]": TLSv1.2
我了解到,当使用多个绑定器时,配置不起作用,所以我也尝试了定义绑定器名称的方法,但它抱怨说它不能识别它。
spring.application.name: processingapp
spring.cloud:
function.definition: stream1;stream2;stream3
stream:
bindings:
stream1-in-0:
destination: topic1
binder: ssl
stream1-out-0:
destination: topic2
binder: ssl
stream2-in-0:
destination: topic2
binder: ssl
stream2-in-1:
destination: topic3
binder: ssl
stream2-out-0:
destination: topic4
binder: ssl
stream3-in-0:
destination: topic4
binder: ssl
stream3-out-0:
destination: topic5
binder: ssl
binders:
ssl:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers:
configuration:
security.protocol: SSL
ssl.truststore.location: /mnt/secrets/truststore.jks
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/secrets/keystore.jks
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
错误:
2021-07-15 17:11:14.634 ERROR 5216 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream
属性名称中缺少streams
元素-您正在配置Kafka MessageChannel绑定器。
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
security:
protocol: SSL
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的
我想使用一个camel组件,它提供了使用和生成RESTful资源的能力。 对于这个例子,我想使用camel restlet组件。restlet组件一切正常,我已经使用REST DSL成功地实现了restlet consumer。然而,我有几个问题: 问题 1) 将restlet启用为异步是否安全?我读过restlet async可能会导致一些问题。这仍然正确吗?如何提高服务绩效?我应该改用码头吗?
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职