当前位置: 首页 > 知识库问答 >
问题:

是否可以使用 Spring-Cloud-Streams Kafka-Streams 创建一个多绑定器绑定,以便从集群 A 流式传输并生成到集群 B

公冶宏深
2023-03-14

我想创建一个带有Spring-Cloud-Streams的Kafka-Streams应用程序,该应用程序集成了2个不同的Kafka集群/设置。我尝试使用文档中提到的多绑定器配置来实现它,类似于以下示例:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples

给定如下简单函数:

    @Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}

在配置中,我试图将这些绑定到不同的活页夹。

spring.cloud:
  stream:
    bindings:
      analyticsEventProcessor-in-0:
        destination: analytics-events
        binder: cluster1-kstream
      analyticsEventProcessor-out-0:
        destination: update-events
        binder: cluster2-kstream

binders:
  cluster1-kstream:
    type: kstream
    environment:
      spring:
        cloud:
          stream:
            kafka:
              binder:
                brokers: <url cluster1>:9093
                configuration:
                  security.protocol: SSL
                  schema.registry.url: <schema-registry-url-cluster1>
                  schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                  schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                  schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                  schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                  ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                  ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                  ssl.truststore.type: JKS
                  ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                  ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                  ssl.keystore.type: JKS
                  ssl.enabled.protocols: TLSv1.2
              streams:
                binder:
                  brokers: <url cluster1>:9093
                  configuration:
                    security.protocol: SSL
                    schema.registry.url: <schema-registry-url-cluster1>
                    schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                    schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                    schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                    schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD} 
                    ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                    ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                    ssl.truststore.type: JKS
                    ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                    ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                    ssl.keystore.type: JKS
                    ssl.enabled.protocols: TLSv1.2
  cluster2-kstream:
    type: kstream
    environment:
      spring:
        cloud:
          stream:
            kafka:
              binder:
                brokers: <url cluster2>:9093
                configuration:
                  security.protocol: SSL
                  schema.registry.url: <schema-registry-url-cluster2>
                  schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                  schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                  schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                  schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                  ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                  ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                  ssl.truststore.type: JKS
                  ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                  ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                  ssl.keystore.type: JKS
                  ssl.enabled.protocols: TLSv1.2
              streams:
                binder:
                  brokers: <url cluster2>:9093
                  configuration:
                    security.protocol: SSL
                    schema.registry.url: <schema-registry-url-cluster2>
                    schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                    schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                    schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                    schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD} 
                    ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                    ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                    ssl.truststore.type: JKS
                    ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                    ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                    ssl.keystore.type: JKS
                    ssl.enabled.protocols: TLSv1.2

我首先尝试在单个集群中完全运行该应用程序,效果很好。当我运行这个时,我总是收到一个错误:

2022-08-10 15:28:42.892  WARN 1 --- [-StreamThread-2] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=<clientid>-StreamThread-2-consumer, groupId=<group-id>] Error while fetching metadata with correlation id 2 : {analytics-events=TOPIC_AUTHORIZATION_FAILED}
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata        : [Consumer clientId=<client-id>, groupId=<group-id>] Topic authorization failed for topics [analytics-events]
2022-08-10 15:28:42.893  INFO 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata        : [Consumer clientId=<client-id>, groupId=<group-id>] Cluster ID: <cluster-id>
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] c.s.a.a.e.UncaughtExceptionHandler       : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.streams.KafkaStreams    : stream-client [<client-id>] Replacing thread in the streams uncaught exception handler

org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642) ~[kafka-streams-3.1.1.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576) ~[kafka-streams-3.1.1.jar!/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]

我验证了kafka客户端证书,它们应该是正确的。我用keytool查看了它们,密码env也设置正确。consumerConfig还使用正确的代理URL。

是否可以在KStream函数中使用带有多绑定器的不同kafka集群作为流的输入和输出,这是可能的还是它仅适用于类型kafka绑定器?

共有1个答案

郦兴德
2023-03-14

在Kafka Streams中,您不能在一个应用程序中连接到两个不同的集群。这意味着当使用Spring Cloud Stream函数时,您不能在入站上从一个集群接收数据,并在出站上向另一个集群写入数据。详见本SO [thread][1]。

作为一种变通方法,您可能可以在Kafka Streams函数中从同一个集群接收数据并向其写入数据。然后,使用基于Kafka绑定器的常规函数,简单地将输出主题桥接到第二个集群。在常规函数(非Kafka Streams)中,它可以从多个集群消费并发布到多个集群。

@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}

此函数需要接收并写入同一个集群。然后你可以有另一个函数,如下所示。

@Bean
public Function<?, ?> bridgeFunction() {
    ....
}

对于此函数,输入为cluster-1,输出为cluster-2。

使用此解决方法时,请确保将常规Kafka绑定器也作为依赖项-sping-Cloud d-Stre-Binder-kafka

请记住,这种方法有缺点,例如增加额外的主题开销、延迟等。然而,这是此用例的潜在解决方法。有关更多选项,请参阅我上面提到的SO线程



  [1]: https://stackoverflow.com/questions/45847690/how-to-connect-to-multiple-clusters-in-a-single-kafka-streams-application
 类似资料:
  • 集群角色绑定定义了集群角色和服务账户的绑定关系。 集群角色绑定定义了集群角色和服务账户的绑定关系,从而控制服务账户的操作权限。多集群资源的集群角色绑定支持绑定到集群上,批量在集群中创建相同的集群角色绑定。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “容器/多集群资源/集群角色绑定” 菜单项,进入集群角色绑定页面。 新建集群角色绑定 该功能用于新建多集群的集群角色绑定,在新建多集

  • 集群角色绑定定义了集群角色和服务账户的绑定关系。 集群角色绑定定义了集群角色和服务账户的绑定关系,从而控制服务账户的操作权限。 集群角色绑定的详细介绍请参考Kubernetes官方文档-RBAC。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “容器/集群/集群角色绑定” 菜单项,进入集群角色绑定页面。 查看集群角色绑定 该功能用于基于集群、命名空间筛选集群角色绑定信息。 在集群角

  • 我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个集群(远程)- 所以假设WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的PC上。我也有一个Kafka-Instance在我的本地机器上运行。 现在我想让WordCount演示在包含应该计算单词的句子的Topic(“远程”)上运行。 然而,计数应该写入我本地系统上的Topic而不是“远程”T

  • 我想通过代理服务器连接到Azure Service Bus消息队列。我在Spring应用程序中使用流绑定库 波姆。xml: 应用yml 我试图通过命令行参数提供HTTP和SOCKS代理设置,但这似乎不起作用。是否可以为“examplehost.servicebus.windows.net”的连接提供SOCKS或HTTP代理?

  • 根据这份文件: 然而,与Redis(群集模式禁用)群集不同,当前,一旦创建了Redis(群集模式启用)群集,其结构就不能以任何方式改变;不能添加或删除节点或碎片。如果需要添加或删除节点,或更改节点类型,则必须重新创建集群。(来源) 然而,本文档似乎描述了向集群添加碎片的过程: 通过使用Amazon ElastiCache for Redis 3.2.10版本的在线重新划分和分片重新平衡,您可以动态

  • 我打算将StateRestoreListener与Spring Cloud Kafka Streams绑定器一起使用。我需要监视应用程序的容错状态存储的恢复进度。汇流中有一个例子https://docs.confluent.io/current/streams/monitoring.html#streams-监控运行时状态。 为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.