我想创建一个带有Spring-Cloud-Streams的Kafka-Streams应用程序,该应用程序集成了2个不同的Kafka集群/设置。我尝试使用文档中提到的多绑定器配置来实现它,类似于以下示例:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples
给定如下简单函数:
@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
return input -> input
.filter(new AnalyticsPredicate())
.map(new AnalyticsToUpdateEventMapper());
}
在配置中,我试图将这些绑定到不同的活页夹。
spring.cloud:
stream:
bindings:
analyticsEventProcessor-in-0:
destination: analytics-events
binder: cluster1-kstream
analyticsEventProcessor-out-0:
destination: update-events
binder: cluster2-kstream
binders:
cluster1-kstream:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: <url cluster1>:9093
configuration:
security.protocol: SSL
schema.registry.url: <schema-registry-url-cluster1>
schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
streams:
binder:
brokers: <url cluster1>:9093
configuration:
security.protocol: SSL
schema.registry.url: <schema-registry-url-cluster1>
schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
cluster2-kstream:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: <url cluster2>:9093
configuration:
security.protocol: SSL
schema.registry.url: <schema-registry-url-cluster2>
schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
streams:
binder:
brokers: <url cluster2>:9093
configuration:
security.protocol: SSL
schema.registry.url: <schema-registry-url-cluster2>
schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
我首先尝试在单个集群中完全运行该应用程序,效果很好。当我运行这个时,我总是收到一个错误:
2022-08-10 15:28:42.892 WARN 1 --- [-StreamThread-2] org.apache.kafka.clients.NetworkClient : [Consumer clientId=<clientid>-StreamThread-2-consumer, groupId=<group-id>] Error while fetching metadata with correlation id 2 : {analytics-events=TOPIC_AUTHORIZATION_FAILED}
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata : [Consumer clientId=<client-id>, groupId=<group-id>] Topic authorization failed for topics [analytics-events]
2022-08-10 15:28:42.893 INFO 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata : [Consumer clientId=<client-id>, groupId=<group-id>] Cluster ID: <cluster-id>
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] c.s.a.a.e.UncaughtExceptionHandler : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.streams.KafkaStreams : stream-client [<client-id>] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642) ~[kafka-streams-3.1.1.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576) ~[kafka-streams-3.1.1.jar!/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
我验证了kafka客户端证书,它们应该是正确的。我用keytool查看了它们,密码env也设置正确。consumerConfig还使用正确的代理URL。
是否可以在KStream函数中使用带有多绑定器的不同kafka集群作为流的输入和输出,这是可能的还是它仅适用于类型kafka绑定器?
在Kafka Streams中,您不能在一个应用程序中连接到两个不同的集群。这意味着当使用Spring Cloud Stream函数时,您不能在入站上从一个集群接收数据,并在出站上向另一个集群写入数据。详见本SO [thread][1]。
作为一种变通方法,您可能可以在Kafka Streams函数中从同一个集群接收数据并向其写入数据。然后,使用基于Kafka绑定器的常规函数,简单地将输出主题桥接到第二个集群。在常规函数(非Kafka Streams)中,它可以从多个集群消费并发布到多个集群。
@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
return input -> input
.filter(new AnalyticsPredicate())
.map(new AnalyticsToUpdateEventMapper());
}
此函数需要接收并写入同一个集群。然后你可以有另一个函数,如下所示。
@Bean
public Function<?, ?> bridgeFunction() {
....
}
对于此函数,输入为cluster-1,输出为cluster-2。
使用此解决方法时,请确保将常规Kafka绑定器也作为依赖项-sping-Cloud d-Stre-Binder-kafka
。
请记住,这种方法有缺点,例如增加额外的主题开销、延迟等。然而,这是此用例的潜在解决方法。有关更多选项,请参阅我上面提到的SO线程。
[1]: https://stackoverflow.com/questions/45847690/how-to-connect-to-multiple-clusters-in-a-single-kafka-streams-application
集群角色绑定定义了集群角色和服务账户的绑定关系。 集群角色绑定定义了集群角色和服务账户的绑定关系,从而控制服务账户的操作权限。多集群资源的集群角色绑定支持绑定到集群上,批量在集群中创建相同的集群角色绑定。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “容器/多集群资源/集群角色绑定” 菜单项,进入集群角色绑定页面。 新建集群角色绑定 该功能用于新建多集群的集群角色绑定,在新建多集
集群角色绑定定义了集群角色和服务账户的绑定关系。 集群角色绑定定义了集群角色和服务账户的绑定关系,从而控制服务账户的操作权限。 集群角色绑定的详细介绍请参考Kubernetes官方文档-RBAC。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “容器/集群/集群角色绑定” 菜单项,进入集群角色绑定页面。 查看集群角色绑定 该功能用于基于集群、命名空间筛选集群角色绑定信息。 在集群角
我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个集群(远程)- 所以假设WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的PC上。我也有一个Kafka-Instance在我的本地机器上运行。 现在我想让WordCount演示在包含应该计算单词的句子的Topic(“远程”)上运行。 然而,计数应该写入我本地系统上的Topic而不是“远程”T
我想通过代理服务器连接到Azure Service Bus消息队列。我在Spring应用程序中使用流绑定库 波姆。xml: 应用yml 我试图通过命令行参数提供HTTP和SOCKS代理设置,但这似乎不起作用。是否可以为“examplehost.servicebus.windows.net”的连接提供SOCKS或HTTP代理?
根据这份文件: 然而,与Redis(群集模式禁用)群集不同,当前,一旦创建了Redis(群集模式启用)群集,其结构就不能以任何方式改变;不能添加或删除节点或碎片。如果需要添加或删除节点,或更改节点类型,则必须重新创建集群。(来源) 然而,本文档似乎描述了向集群添加碎片的过程: 通过使用Amazon ElastiCache for Redis 3.2.10版本的在线重新划分和分片重新平衡,您可以动态
我打算将StateRestoreListener与Spring Cloud Kafka Streams绑定器一起使用。我需要监视应用程序的容错状态存储的恢复进度。汇流中有一个例子https://docs.confluent.io/current/streams/monitoring.html#streams-监控运行时状态。 为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.