我正在使用Spring Cloud StreamBridge将消息发布到RabbitMQ交换机。使用本机RabbitMQ完美测试,我可以使用单个生产者轻松获得100kmsgs/s(1个通道)。如果我使用发送StreamBrige(也是1个通道)启动带有时循环的线程,我只获得~20kmsgs/s的类似设置(没有持久性,没有手动打包或确认,相同的Docker容器...)。我使用的是Spring Cloud Stream和Rabbit Binder 3.2.2。
我的yml看起来像这样:
spring:
rabbitmq:
host: localhost
port: 5672
cloud:
function:
definition: producer1;
stream:
bindings:
producer1-out-0:
destination: messageQueue
#requiredGroups: consumerGroup1,
rabbit:
bindings:
producer1-out-0:
producer:
deliveryMode: NON_PERSISTENT
exchangeType: direct
bindingRoutingKey: default_message
routingKeyExpression: '''default_message'''
#maxLength: 1
output-bindings: producer1;
我的发送循环RabbitMQ PerfTest工具是用Java编写的,看起来很相似:
@Autowired
public StreamBridge streamBridge;
ExecutorService executorService = Executors.newFixedThreadPool(10);
@PostConstruct
public void launchProducer() {
Runnable task = () -> {
while (true){
streamBridge.send("producer1-out-0", "msg");
}
};
executorService.submit(task);
}
同样在我的控制台中,我得到一个奇怪的msg频道'unknown.channel.name'在启动时有1个订阅者
,我不知道为什么。
使用StreamBridge的慢速发送速率是一个自然的Spring限制还是我有什么配置错误?感谢您的帮助:)
在本机API上使用抽象时,总会有一些开销;然而,5x听起来并不正确。
我使用-x 1-y 1-a作为参数,意味着只有一个生产者使用自动消费者确认发布消息
这可能解释了这一点;自动ack意味着没有ack——当消息发送给消费者时,代理会立即对消息进行打包(冒着消息丢失的风险)。Spring中的等价物是确认模式。NONE
;默认情况下容器会单独打包每条消息。
看https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode
和
https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize
而且
https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount
Spring AMQP默认将其设置为250,但SCSt的默认值为1,这要慢得多。
编辑
有趣的与Spring集成相比,SCSt似乎确实增加了一些显著的开销。
下面测试来自本机Java客户端的各种场景,并在顶部添加越来越多的Spring抽象,最后使用StreamBridge
;可能应该对其进行分析,以查看成本在哪里以及是否可以降低成本。
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct
logging.level.root=warn
@SpringBootApplication
public class So71414000Application {
public static void main(String[] args) {
SpringApplication.run(So71414000Application.class, args).close();
}
@Bean
ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
return args -> {
/*
* Native java API
*/
Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");
Channel channel = conn.createChannel();
byte[] msg = "msg".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
int count = 1000000;
StopWatch watch = watch("native");
IntStream.range(0, count).forEach(i -> {
try {
channel.basicPublish("foo", "", props, msg);
}
catch (IOException e) {
e.printStackTrace();
}
});
perf(count, watch);
channel.close();
conn.close();
};
}
@Bean
ApplicationRunner runner2(RabbitTemplate template) {
return args -> {
/*
* Single ChannelProxy, no cache, no conversion
*/
Message msg = MessageBuilder.withBody("msg".getBytes())
.andProperties(MessagePropertiesBuilder.newInstance()
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
StopWatch watch = watch("nocache");
int count = 1000000;
template.invoke(t -> {
IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
return null;
});
perf(count, watch);
};
}
@Bean
ApplicationRunner runner3(RabbitTemplate template) {
return args -> {
/*
* ChannelProxy (cached), no conversion
*/
Message msg = MessageBuilder.withBody("msg".getBytes())
.andProperties(MessagePropertiesBuilder.newInstance()
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
StopWatch watch = watch("cached channel");
int count = 1000000;
IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
perf(count, watch);
};
}
@Bean
ApplicationRunner runner4(RabbitTemplate template) {
return args -> {
/*
* ChannelProxy (cached), conversion
*/
StopWatch watch = watch("message conversion");
int count = 1000000;
IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
perf(count, watch);
};
}
@Bean
ApplicationRunner runner5(RabbitTemplate template) {
return args -> {
/*
* Spring Integration
*/
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
outbound.setExchangeName("foo");
outbound.setRoutingKey("");
DirectChannel channel = new DirectChannel();
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
consumer.start();
GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
StopWatch watch = watch("Spring Integration");
int count = 1000000;
IntStream.range(0, count).forEach(i -> channel.send(msg));
perf(count, watch);
};
}
@Bean
ApplicationRunner runner6(StreamBridge bridge) {
return args -> {
/*
* Stream bridge
*/
StopWatch watch = watch("Stream Bridge");
int count = 1000000;
IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
perf(count, watch);
};
}
private StopWatch watch(String name) {
StopWatch watch = new StopWatch();
watch.start(name);
return watch;
}
private void perf(int count, StopWatch watch) {
watch.stop();
System.out.println(watch.prettyPrint());
System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
}
}
在我的MacBook Air(2018 1.6GHz I5)和一个裸机代理上有了这些结果:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.4)
StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns % Task name
---------------------------------------------
10949129530 100% native
91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns % Task name
---------------------------------------------
14175481691 100% nocache
70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns % Task name
---------------------------------------------
16300449457 100% cached channel
61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns % Task name
---------------------------------------------
18206111556 100% message conversion
54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns % Task name
---------------------------------------------
26654581638 100% Spring Integration
37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns % Task name
---------------------------------------------
102734493141 100% Stream Bridge
9k/s
目录 考虑到性能和架构, Redux “可扩展性” 如何? 每个 action 都调用 “所有的 reducer” 会不会很慢? 在 reducer 中必须对 state 进行深拷贝吗?拷贝 state 不会很慢吗? 怎样减少 store 更新事件的数量? 仅有 “一个 state 树” 会引发内存问题吗?分发多个 action 会占用内存空间吗? 缓存远端数据会造成内存问题吗? 性能 考虑到性能
Sketch 的性能可以轻松的支持相当复杂的设计,但如果你创作出了一个很大的文件,你可能会想知道有哪些因素影响着 sketch 的性能。 模糊 模糊是非常消耗系统资源的效果。Sketch 需要先将图层渲染成一个位图(这已经很消耗资源了),然后再在上面添加一个模糊(这将更消耗资源),模糊半径越大,消耗的资源也就越大。 一个半径为 1px的模糊,Sketch 需要检查每一个像素周围的每一个像素,也就是
用户希望他们使用的图形界面具有交互性和流畅性,而这正是你需要越来越多地集中时间和精力的地方。 界面不仅要加载快,还要运行良好, 滚动应该很快,动画和交互应该如丝般流畅。 要编写高性能应用程序,你需要了解 LCUI 如何渲染界面,并确保你编写的代码以及第三方代码尽可能高效地运行。 像素管道 “渲染”就是将组件数据转变为像素数据,这个转变如同一条包含很多区域的单向管道,组件数据经过管道中的每个区域的处
性能工具 Reporting: WEIGHTOF.IT Web Page Test GTmetrix Speed Curve [$] Chrome Devtools Timeline sitespeed.io JS tools: ImageOptim-CLI imagemin Budgeting: performancebudget.io
jd.onMemoryWarning(function callback) 监听内存不足告警事件,当 iOS/Android 向小程序进程发出内存警告时,触发该事件。 参数 function callback 内存不足告警事件的回调函数 参数 属性 类型 说明 level number 内存告警等级,只有 Android 才有,对应系统宏定义 level 的合法值 值 说明 5 TRIM_MEMO
TPS(Transactions Per Second)又称“系统的吞吐量”,即“系统每秒钟能够处理的交易数量”。 当前区块链最大的问题是性能问题。下图为部分公链的单日最大交易量(2019/10/1前的数据)数据来自https://tokenview.com/: 名字 数据日期 交易量 TPS 备注 BitCoin 2017/12/14 490644 5.67875 理论极限为7 Ethereum
如何分析一个由SQLAlchemy支持的应用程序? 查询分析 代码性能测试 执行缓慢 结果获取慢 - 核心 结果获取慢度-ORM 我用ORM插入了400000行,速度非常慢! 如何分析一个由SQLAlchemy支持的应用程序? 寻找性能问题通常涉及两种策略。一个是查询分析,另一个是代码分析。 查询分析 有时只是简单的SQL日志记录(通过python的日志记录模块或通过 echo=True 争论 c