当前位置: 首页 > 知识库问答 >
问题:

基于负载的Spring云流内容路由

卫弘义
2023-03-14

我们正在使用Spring Cloud Stream v2。2带有Kafka和Avro(本机编码器/解码器)。我们正在尝试根据负载的条件使用基于内容的路由。据我所知,根据Spring Cloud Stream文档,基于内容的路由只能在标头上实现,因为负载到达条件时没有经过类型转换过程。因此,除非条件基于字节格式,否则它将无法按预期工作。但是,我知道,当在本机模式下使用Avro时,会跳过消息头,并且不会处理任何类型协商。因此,我不确定基于内容的路由是否能像预期的那样在有效负载上工作。

@StreamListener(target = Channels.INPUT, condition =
      "payload.context['type']=='one' or"
          + " payload.context['type']=='two'")
  public void doStuff(TypeOneAndTwoData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}


@StreamListener(target = Channels.INPUT, condition =
      "payload.context['type']=='three' or"
          + " payload.context['type']=='four'")
  public void doOtherStuff(TypeThreeAndFourData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}

根据我的日志记录,我可以看到偶尔会触发doStuff,有时会触发doOtherStuff。然而,似乎大多数情况下,它们都没有被触发,消息被跳过。根据输入数据,我确信上下文。type只能有4个值“one”、“two”、“three”和“four”,因此根据输入,不可能期望有其他值,但我经常可以在日志中看到以下条目:

Cannot find a @StreamListener matching for message with id: null

我有几个问题:

  • 有效负载的条件在消息以本机Avro格式反序列化为相应的POJO类后是否适用?
  • 为什么有时条件有效,有时则无效?id: null是否意味着什么?
  • 从线程的角度来看,基于内容的路由是如何工作的?当我们有两个具有单独条件的StreamListner时,多线程运行还是它们工作在单线程?在这种情况下如何管理至少一次保证消息传递?条件应该是相互排斥的吗?

共有1个答案

姚永年
2023-03-14

>

id:是不幸的;与Kafka一起分享信息。默认情况下,标题['id']为空-它没有任何意义,因此此日志消息没有多大用处。

问题是没有一个条件与转换后的有效载荷匹配

您可以在DispatchingStreamListenerMessageHandler中设置断点。HandlerRequestMessage()来找出问题所在。

编辑

我没有回答你的第三个问题。

调用是单线程的;否,一条消息可以匹配多个条件;只有在没有条件匹配的情况下才能获得该日志。请参阅我上面引用的方法。

如果多个条件匹配,侦听器引发的任何异常都将停止处理(并调用重试/DLQ处理等)。

 类似资料:
  • 我正在尝试在最新版本的 Spring Cloud 流中使用基于内容的路由。根据这份文件 - 这是我用StreamListener编写的代码 通过使用该条件,可以将消息路由到两个不同的函数。 我正试图用如下的功能接口方法来消费消息。 如何在函数中实现类似的基于内容的路由?蒂亚。 其他细节- Spring引导版本 - 2.3.12.发布 Spring云版 - Hoxton.SR11

  • 我需要创建一个反向代理,接收传入的请求,并基于请求正文的内容,将请求路由到特定的URI。 这是一个路由微服务,它类似于反向代理,根据来自每个请求主体的一些信息进行路由。这意味着对于每个请求,我需要解析请求正文并获得“username”字段,然后建立JDBC连接以从数据库中获取附加信息。根据数据库中的信息,它最终将请求重定向到正确的URI。 从我现在所拥有的,我有2个阻止方法。第一个是请求主体的解析

  • 我使用的是confluent,所以我已经根据confluent文档使用Connect.Properties中的confluent-hub安装了dibezium连接器 我需要使用io.debezium.transforms.contentbasedrouter https://debezium.io/documentation/reference/1.3/configuration/content-

  • 我有一个解压缩和文件的要求,并处理它的内容。在zip文件中,可以有两种类型的文件个人或公司。可以通过文件名区分的。在处理完所有文件后,它应该调用另一个程序模块,并将处理后的文件存档在不同的位置。希望使用Spring集成相同。我试图通过下面的代码来实现这一点,但它在基于文件名的路由时产生了问题。我使用的是JDK 8,Spring 5 例外 下面是整个代码段

  • 我们期待着迁移到Spring云负载平衡器以取代Ribbon。我们使用Eureka进行服务发现和注册。 它看起来像是从Spring云Netflix 3开始的。x、 Eureka客户端模块中的EurekaribbonClient配置已被删除。 我们使用deploymentContextBasedVipAddress配置将内部主机名映射到Eureka中的注册vip地址。 配置与此类似: 我们这样做是因为