当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream功能路由器输出尝试绑定到Kafka主题

田谦
2023-03-14

我正在尝试迁移到Spring Cloud Stream的新函数式编程模型,替换如下的条件StreamListener注释

@StreamListener("app-input", condition = "headers['eventName']=='Funded'")

用类似的东西

@Bean
fun router() = MessageRoutingCallback {
    when (it.headers["eventName"]) {
        "Funded" -> "funded"
        else -> "ignored"
    }
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
    ...
}

@Bean
fun ignored() = Consumer { message: Message<*> ->
    ...
}

具有将通道链接到主题的关联属性

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic

我需要这种间接级别,因为有多种Avro消息类型都到达MyTopic,需要反序列化并以不同的方式路由

这一切都很顺利,我可以按预期使用和路由消息。然而,以这种方式使用functionRouter会产生意想不到的副作用,即当没有可用的主题时,它也会尝试将functionRouter-out-0绑定到Kafka,因此应用程序每隔30秒就会尝试附加到代理上名为“functionRouter-out-0”的主题,并出现授权错误,正如您所料。

2021-05-06 12:57:55.654 WARN  [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN  [screening]                            org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening]                            org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]

因此,问题是a)如何阻止functionRouter-out-0通道尝试绑定到Kafka,或者b)如果不需要中间通道,如何实现这一点?

Spring Cloud Stream事件路由功能自动创建新主题类似,但从未收到答案。

共有1个答案

张通
2023-03-14

我相信这是一个错误。如果你想遵循它,我打开了一个问题:

https://github.com/spring-cloud/spring-cloud-stream/issues/2168

就像工作一样简单地指向同一个目的地

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839

因为代理都是消费者,所以我们永远不会发送任何东西。

 类似资料:
  • 可以使用路由绑定简化URL或者路由规则的定义,绑定支持如下方式: 绑定到模块/控制器/操作 把当前的URL绑定到模块/控制器/操作,最多支持绑定到操作级别,例如在路由配置文件中添加: // 绑定当前的URL到 index模块 Route::bind('index'); // 绑定当前的URL到 index模块的blog控制器 Route::bind('index/blog'); // 绑定当前的U

  • 我想创建OpenAPI UI规范,在静态编程语言上使用Spring WebFlux功能路由器。Spring Boot v.2.5.2,springdoc-openapi v.1.5.9。这是我的路由器类: 我想创建一个 GET 查询,如 其中1是我想要的用户ID。 但我的OpenAPI的UI仍然尝试使用GET参数创建查询 这显然会导致错误400。很高兴知道我的@RouterAction应该如何正确

  • 路由功能浅谈 在本章 选择恰当的分片数量和分片副本数量 一节中,已经提到使用路由功能可以只在一个分片上执行查询命令,作为提高系统吞吐量的一种解决方案。接下来作者将详细地介绍这一功能。 分片和分片中数据 通常情况下,ElasticSearch是如何把数据分发到各个分片中,哪个分片存储哪一类的文档等细节并不重要。因为查询时,将查询命令分发到每个分片就OK了。唯一的关键点在于算法,将数据均等地分配到各个

  • 问题内容: 我在bash脚本中具有简单的功能,我想将stdout作为输入传递给它。 我想以这种方式使用它。 当然,我使用了冗余函数echo和printf来简化问题,但是您明白了。现在,我收到一个“未找到”错误,我认为这意味着我的参数定界是错误的(“ $ 1”部分)。有什么建议么? 最初,jc_hms函数的用法如下: 但我想将结果存储在变量中,以便在将其发送到串行端口之前先进行进一步处理。 编辑:所

  • 我使用的反应路由器dom版本5.0.1一个简单的反应应用程序,我使用了汇总捆绑,这是我的路由器组件 问题是,它只在localhost:8000/处显示主路由,但当我尝试访问localhost:8000/hello或localhost:8000/登录时,会出现此错误 这是我的rollup.config };