当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud StreamBridge性能低?

咸浩初
2023-03-14

我正在使用Spring Cloud StreamBridge将消息发布到RabbitMQ交换机。使用本机RabbitMQ完美测试,我可以使用单个生产者轻松获得100kmsgs/s(1个通道)。如果我使用发送StreamBrige(也是1个通道)启动带有时循环的线程,我只获得~20kmsgs/s的类似设置(没有持久性,没有手动打包或确认,相同的Docker容器...)。我使用的是Spring Cloud Stream和Rabbit Binder 3.2.2。

我的yml看起来像这样:

spring:
  rabbitmq:
    host: localhost
    port: 5672

  cloud:
    function:
      definition: producer1;

    stream:
      bindings:
        producer1-out-0:
          destination: messageQueue
          #requiredGroups: consumerGroup1,
      rabbit:
        bindings:
          producer1-out-0:
            producer:
              deliveryMode: NON_PERSISTENT
              exchangeType: direct
              bindingRoutingKey: default_message
              routingKeyExpression: '''default_message'''
              #maxLength: 1
      output-bindings: producer1;

我的发送循环RabbitMQ PerfTest工具是用Java编写的,看起来很相似:

        @Autowired
        public StreamBridge streamBridge;

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        @PostConstruct
        public void launchProducer() {
            Runnable task = () -> {
                while (true){
                    streamBridge.send("producer1-out-0", "msg");
                }
            };
            executorService.submit(task);
        }

同样在我的控制台中,我得到一个奇怪的msg频道'unknown.channel.name'在启动时有1个订阅者,我不知道为什么。

使用StreamBridge的慢速发送速率是一个自然的Spring限制还是我有什么配置错误?感谢您的帮助:)

共有1个答案

宫铭
2023-03-14

在本机API上使用抽象时,总会有一些开销;然而,5x听起来并不正确。

我使用-x 1-y 1-a作为参数,意味着只有一个生产者使用自动消费者确认发布消息

这可能解释了这一点;自动ack意味着没有ack——当消息发送给消费者时,代理会立即对消息进行打包(冒着消息丢失的风险)。Spring中的等价物是确认模式。NONE;默认情况下容器会单独打包每条消息。

看https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode

https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize

而且

https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount

Spring AMQP默认将其设置为250,但SCSt的默认值为1,这要慢得多。

编辑

有趣的与Spring集成相比,SCSt似乎确实增加了一些显著的开销。

下面测试来自本机Java客户端的各种场景,并在顶部添加越来越多的Spring抽象,最后使用StreamBridge;可能应该对其进行分析,以查看成本在哪里以及是否可以降低成本。

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct

logging.level.root=warn
@SpringBootApplication
public class So71414000Application {

    public static void main(String[] args) {
        SpringApplication.run(So71414000Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
        return args -> {
            /*
             * Native java API
             */
            Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");
            Channel channel = conn.createChannel();
            byte[] msg = "msg".getBytes();
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
            int count = 1000000;
            StopWatch watch = watch("native");
            IntStream.range(0, count).forEach(i -> {
                try {
                    channel.basicPublish("foo", "", props, msg);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            });
            perf(count, watch);
            channel.close();
            conn.close();
        };
    }

    @Bean
    ApplicationRunner runner2(RabbitTemplate template) {
        return args -> {
            /*
             * Single ChannelProxy, no cache, no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("nocache");
            int count = 1000000;
            template.invoke(t -> {
                IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
                return null;
            });
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner3(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("cached channel");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner4(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), conversion
             */
            StopWatch watch = watch("message conversion");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner5(RabbitTemplate template) {
        return args -> {
            /*
             * Spring Integration
             */
            AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
            outbound.setExchangeName("foo");
            outbound.setRoutingKey("");
            DirectChannel channel = new DirectChannel();
            EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
            consumer.start();
            GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
            StopWatch watch = watch("Spring Integration");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> channel.send(msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner6(StreamBridge bridge) {
        return args -> {
            /*
             * Stream bridge
             */
            StopWatch watch = watch("Stream Bridge");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
            perf(count, watch);
        };
    }


    private StopWatch watch(String name) {
        StopWatch watch = new StopWatch();
        watch.start(name);
        return watch;
    }

    private void perf(int count, StopWatch watch) {
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

}

在我的MacBook Air(2018 1.6GHz I5)和一个裸机代理上有了这些结果:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
10949129530  100%  native

91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
14175481691  100%  nocache

70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
16300449457  100%  cached channel

61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
18206111556  100%  message conversion

54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
26654581638  100%  Spring Integration

37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
102734493141  100%  Stream Bridge

9k/s
 类似资料:
  • 目录 考虑到性能和架构, Redux “可扩展性” 如何? 每个 action 都调用 “所有的 reducer” 会不会很慢? 在 reducer 中必须对 state 进行深拷贝吗?拷贝 state 不会很慢吗? 怎样减少 store 更新事件的数量? 仅有 “一个 state 树” 会引发内存问题吗?分发多个 action 会占用内存空间吗? 缓存远端数据会造成内存问题吗? 性能 考虑到性能

  • Sketch 的性能可以轻松的支持相当复杂的设计,但如果你创作出了一个很大的文件,你可能会想知道有哪些因素影响着 sketch 的性能。 模糊 模糊是非常消耗系统资源的效果。Sketch 需要先将图层渲染成一个位图(这已经很消耗资源了),然后再在上面添加一个模糊(这将更消耗资源),模糊半径越大,消耗的资源也就越大。 一个半径为 1px的模糊,Sketch 需要检查每一个像素周围的每一个像素,也就是

  • 用户希望他们使用的图形界面具有交互性和流畅性,而这正是你需要越来越多地集中时间和精力的地方。 界面不仅要加载快,还要运行良好, 滚动应该很快,动画和交互应该如丝般流畅。 要编写高性能应用程序,你需要了解 LCUI 如何渲染界面,并确保你编写的代码以及第三方代码尽可能高效地运行。 像素管道 “渲染”就是将组件数据转变为像素数据,这个转变如同一条包含很多区域的单向管道,组件数据经过管道中的每个区域的处

  • 性能工具 Reporting: WEIGHTOF.IT Web Page Test GTmetrix Speed Curve [$] Chrome Devtools Timeline sitespeed.io JS tools: ImageOptim-CLI imagemin Budgeting: performancebudget.io

  • jd.onMemoryWarning(function callback) 监听内存不足告警事件,当 iOS/Android 向小程序进程发出内存警告时,触发该事件。 参数 function callback 内存不足告警事件的回调函数 参数 属性 类型 说明 level number 内存告警等级,只有 Android 才有,对应系统宏定义 level 的合法值 值 说明 5 TRIM_MEMO

  • TPS(Transactions Per Second)又称“系统的吞吐量”,即“系统每秒钟能够处理的交易数量”。 当前区块链最大的问题是性能问题。下图为部分公链的单日最大交易量(2019/10/1前的数据)数据来自https://tokenview.com/: 名字 数据日期 交易量 TPS 备注 BitCoin 2017/12/14 490644 5.67875 理论极限为7 Ethereum

  • 如何分析一个由SQLAlchemy支持的应用程序? 查询分析 代码性能测试 执行缓慢 结果获取慢 - 核心 结果获取慢度-ORM 我用ORM插入了400000行,速度非常慢! 如何分析一个由SQLAlchemy支持的应用程序? 寻找性能问题通常涉及两种策略。一个是查询分析,另一个是代码分析。 查询分析 有时只是简单的SQL日志记录(通过python的日志记录模块或通过 echo=True 争论 c