当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream能否将消息发送到与StreamBridge位于同一应用程序中的其他messageChannel?

锺离晗昱
2023-03-14

目的:

>

  • 我想测试文档https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_spring_cloud_stream_sendto_destination上的spring.cloud.stream.sendto.destination功能

    然后我不想用函数Bean的例子来测试它,因为我想用http发送消息,但函数Bean需要外部代理公开的显式目的地。

    我知道我可以通过“StreamBridge.send('dynamicDestinationName')”来实现它。但我真的想测试“sendto.destination”是否能达到同样的效果。

    正在尝试游行:

    1. 控制器超文本传输协议endpoint就绪。
        @GetMapping("/test/dynamic/destination/{destination}")
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void dynamicDestination(@PathVariable("destination") String destination) {
            log.info("route to dynamic[{}]", destination);
            // First send it to the Binding of Stream inside the stream first, and then send it to the Broker end Destination according to the Headers' consumer
            streamBridge.send("dynamicDestFunc-in-0", destination);
        }
    
        @Bean
        public Function<String, Message<String>> dynamicDestFunc() {
            return value -> MessageBuilder.withPayload(value + "_payload").setHeader("spring.cloud.stream.sendto.destination", value).build();
        }
    
    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: xx.xx.xx.xx:9092
          bindings:
            functionRouter-in-0:
              destination: O2_TEST_TOPIC
              content-type: application/json
        function:
          definition: functionRouter;dynamicDestFunc
    

    StreamBridge#resolveDestination时引发NullPointerException

    最后,Spring Cloud Stream可以将消息发送到与StreamBridge在同一应用程序中的其他消息通道吗?

    消息通道可以在没有绑定器/外部代理的情况下连接到其他消息通道吗?

    像channelA输出绑定-

    对于另一个问题:对使用KafkaExtendedBindingProperties和KafkaBinderConfigurationProperties感到困惑

    这是我的申请表。yml代表我的想法

    
    spring:
      cloud:
        stream:
          kafka:
    #        default:
    #          consumer:
    #            startOffset: latest
    #            resetOffsets: true
    #            autoCommitOffset: true
            binder:
              brokers: xx.xx.xx.xx:9092
            bindings:
              testAutoCommit-in-0:
                consumer:
                  startOffset: latest
                  resetOffsets: true
                  autoCommitOffset: true
                  configuration: { request.timeout.ms: "600000" }
          bindings:
            testAutoCommit-in-0:
              destination: O2_TEST_TOPIC
              group: testAutoCommit_GROUP
        function:
          definition: testAutoCommit
    
  • 共有2个答案

    后焕
    2023-03-14

    @Oleg zhurakousky

    首先,下面提到的SCS是指“Spring云流”。

    感谢您的回答,我想描述一下我对您的回答的理解,以帮助其他人快速理解绑定和活页夹属性。

    正如作者"Oleg zhurakousky"所说,绑定和粘合剂是SCS中两个不同的组成部分。

    活页夹可以帮助我们像“路由器”一样连接到互联网。

    绑定可以帮助我们像“浏览器”一样连接到互联网中的网站

    所以绑定器配置定义了代理地址和端口(路由器表的ip地址)。

    此外,绑定配置定义了更具体的地址(在SCS中称为目标),就像(浏览器上的超文本传输协议url)

    但是当我努力描述kafka配置和SCS中的配置(Binder/Bding)之间的清晰映射时,我仍然感到困惑。

    我尝试如下描述映射:

    1. 引导。服务器-
    final Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    // build KafkaProducer
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties)
    

    所以我认为映射是生产者属性参数-

    主题-

    enable.auto.commit-

    但另一种解释也是正确的。它是KafkaExtendedBindingProperties#绑定中的绑定属性

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              testAutoCommit-in-0:
                consumer:
                  startOffset: latest
                  resetOffsets: true
                  autoCommitOffset: true
                  configuration:
                    request.timeout.ms: 60000
    

    为了理解活页夹和bindingProperties之间的关系,我甚至列出了以下列表:

    >

  • 绑定属性

    扩展绑定属性(获取扩展消费者属性(绑定名称),获取扩展生产者属性(绑定名称))

    抽象扩展绑定属性

    BinderSpecificPropertiesProvider(getProducer()和getConsumer())=

    粘结剂属性

    产品属性

    消费者财产

    活页夹Kafka核心:(Kafka活页夹)实现包下的配置类,定义了Kafka专属配置)

    • KafkaBinderConfigurationProperties("spring.cloud.stream.kafka.binder")
    • Kafka消费者属性
    • Kafka生产者属性
    • KafkaTopicProperties
    • Kafka绑定属性
    • KafkaExtendedBindingProperties("spring.cloud.stream.kafka")

    但我最终还是放弃了!

    我想我只需要找出SCS中为我们提供的configurationProperties,而不需要找出复杂的properties类关系。。。。

    如下所示:

    简单地说,从使用的角度来看,我只需要在尝试配置或更新kafka属性时找出属性。SCS配置如下:

    • BindingServiceProperties("spring.cloud.stream")
    • 地图

    我这样做是为了帮助我的同事,我可以快速配置我们需要的属性,而无需了解源代码背后的复杂细节。

    但真正了解绑定器和绑定之间的关系将有助于您快速配置属性,因为您可以尽快找到所需的正确SCS配置属性。

    我工作过

  • 姚阳德
    2023-03-14

    绑定和绑定是Spring Cloud Stream中两个不同的概念。

    绑定-是将Binder组件(例如,消息侦听器)与您的应用程序(例如,Function)连接起来的机制。

    Binder-是Spring Cloud Stream连接到目标系统(例如Kafka、Rabbit等)的机制。

    一个很好的类比是Binder是将您连接到网络的电缆,而绑定是您连接到的网络中可用的Internet站点。作为两个不同的概念,每个概念都有自己的一组配置。

     类似资料:
    • 我不确定我遇到的问题是概念问题还是技术问题,因为配置错误。 目标是在应用程序容器中托管的Java EE应用程序(特别是WildFly)和独立运行的Java SE应用程序之间发送双向消息,都使用标准JMS协议和主题。我认为这应该是可能的,只要他们都使用相同的经纪人和相同的主题。 因此,我有一个外部 Artemis 实例作为消息代理运行,并在 Java EE 应用程序的单独 Bean 中成功设置了生产

    • 我在discord.py做了一个机器人,希望我的机器人在网络钩子在特定频道发送消息时定位一个角色。有办法这么做吗?现在所有我有的是通道ID和我很确定这是一个客户端事件

    • 我已经完成了关于如何使用Django(3.1.7版)中的频道和redis制作即时聊天应用程序的youtube教程(github代码链接)。我确实改变了教程中的一些内容,因为他使用的是不同的Django版本。我在使用他的代码时遇到的错误在另一篇stackoverflow文章中有解释。 我已将邮件保存到数据库,并已成功加载页面加载上的10条最新邮件。但是,当我单击“发送”按钮时,我没有看到消息显示在消

    • 让我的应用程序收到5个远程通知。每当我点击单个通知时,所有5个通知都消失了,我们需要完整地显示其他4个通知,直到它们点击。但主要问题是所有其他通知都是为了其他目的,而不是为了相同的目的。因此,我们需要显示所有通知,直到从通知托盘中单击它们。 我们还需要根据此更新徽章计数。有什么方法来处理远程消息推送时,应用程序不运行(被杀)?您能否建议使用目标c处理远程推送通知的更好方法? 谢啦

    • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?

    • 我有客户端代码运行在javascript试图发送html内容到自定义处理程序。 客户端代码如下所示: 处理程序代码为: 问题在于,使用Chrome浏览器时,处理程序获取的消息(txt_内容)不完整。我得到的字符串的最大长度是:524288 当我在资源管理器中运行时,我会得到完整的消息(长度=567130)。 我在这里看到了一个类似的问题,但没有得到回答(设置maxAllowedContentLen