我们有一个Spring Boot应用程序,用于在另一个组件上执行负载测试。我们每分钟最多需要发送35000条JMS消息,因此我使用调度器每分钟运行一次任务。
问题是当我保持低强度时,它会设法在指定的时间间隔(一分钟)内发送消息。但是当强度很高时,发送消息块需要超过1分钟。对以下实现有任何建议吗?
调度程序类
@Component
public class MessageScheduler {
private final Logger log = LoggerFactory.getLogger(getClass());
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
@Autowired
JmsSender sender;
public void startScheduler() {
Runnable runnableTask = sender::sendMessagesChunk;
executorService.scheduleAtFixedRate(runnableTask, 0, TIME_PERIOD,
TimeUnit.MILLISECONDS);
}
}
用于发送消息的类
@Component
public class JmsSender {
@Autowired
TrackingManager manager;
private final Logger log = LoggerFactory.getLogger(getClass());
private final static int TOTAL_MESSAGES = ConfigFactory.getConfig().getInt("total.tracking.messages").orElse(10);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
private static int failedPerPeriod=0;
private static int totalFailed=0;
private static int totalMessageCounter=0;
public void sendMessagesChunk() {
log.info("Started at: {}", Instant.now());
log.info("Sending messages with intensity {} messages/minute", TOTAL_MESSAGES);
for (int i=0; i<TOTAL_MESSAGES; i++) {
try {
long start = System.currentTimeMillis();
MessageDTO msg = manager.createMessage();
send(msg);
long stop = System.currentTimeMillis();
if (timeOfDelay(stop-start)>=0L) {
Thread.sleep(timeOfDelay(stop-start));
}
} catch (Exception e) {
log.info("Error : " + e.getMessage());
failedPerPeriod++;
}
}
totalMessageCounter += TOTAL_MESSAGES;
totalFailed += failedPerPeriod;
log.info("Finished at: {}", Instant.now());
log.info("Success rate(of last minute): {} %, Succeeded: {}, Failed: {}, Success rate(in total): {} %, Succeeded: {}, Failed: {}"
,getSuccessRatePerPeriod(), getSuccededPerPeriod(), failedPerPeriod,
getTotalSuccessRate(), getTotalSucceded(), totalFailed);
failedPerPeriod =0;
}
private long timeOfDelay(Long elapsedTime){
return (TIME_PERIOD / TOTAL_MESSAGES) - elapsedTime;
}
private int getSuccededPerPeriod(){
return TOTAL_MESSAGES - failedPerPeriod;
}
private int getTotalSucceded(){
return totalMessageCounter - totalFailed;
}
private double getSuccessRatePerPeriod(){
return getSuccededPerPeriod()*100D / TOTAL_MESSAGES;
}
private double getTotalSuccessRate(){
return getTotalSucceded()*100D / totalMessageCounter;
}
private void send(MessageDTO messageDTO) throws Exception {
requestContextInitializator();
JmsClient client = JmsClientBuilder.newClient(UriScheme.JmsType.AMQ);
client.target(new URI("activemq:queue:" + messageDTO.getDestination()))
.msgTypeVersion(messageDTO.getMsgType(), messageDTO.getVersion())
.header(Header.MSG_VERSION, messageDTO.getVersion())
.header(Header.MSG_TYPE, messageDTO.getMsgType())
.header(Header.TRACKING_ID, UUID.randomUUID().toString())
.header(Header.CLIENT_ID, "TrackingJmsClient")
.post(messageDTO.getPayload());
}
35000 msg/min是低于600 msg/sec的一个缺口。这不被认为是“很多”,应该是相对容易清除的目标。主要思想是“重用”所有重量级JMS对象:连接、会话和目标。单线程应该足够了。
ConnectionFactory connFactory = .... // initialize connection factory
@Cleanup Connection conn = connFactory.createConnection();
@Cleanup Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue q = session.createQueue("example_destiation");
@Cleanup MessageProducer producer = session.createProducer(q);
for (String payload: messagesToSend) {
TextMessage message = session.createTextMessage(payload);
producer.send(msg);
session.commit();
}
通过以下方式可以实现额外的加速:
非\u持久、ACKOWLEDGE、异步传递示例:
@Cleanup Connection conn = connFactory.createConnection();
@Cleanup Session session = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue q = session.createQueue("example_destiation");
@Cleanup MessageProducer producer = session.createProducer(q);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setAsync(new ExmpleSendListener());
for (String payload: messagesToSend) {
TextMessage message = session.createTextMessage(payload);
producer.send(msg);
}
您应该解决两个问题:
显然,如果您的send
方法太慢,将超过最大时间。
发送消息的更快方法是使用某种批量操作。如果您的MQ API不支持批量操作,则无需担心,您无法使用它!由于第二个限制(“一致”)。
您可以异步发送消息,但如果您的MQ API为此创建线程而不是“非阻塞”异步,您可能会遇到内存问题。
使用javax。jms。MessageProducer。发送可以异步发送消息,但会为每个消息创建一个新的线程(将创建大量内存和服务器线程)。
另一种加速方法是只创建一个JMS客户端(您的send方法)。
要实现第二个要求,您应该修复您的timeOfDelay
函数,它是错误的。真的,您应该考虑send
函数的概率分布来估计正确的值,但是,您可以简单地做:
long accTime = 0L;
for (int i=0; i<TOTAL_MESSAGES; i++) {
try {
long start = System.currentTimeMillis();
MessageDTO msg = manager.createMessage();
send(msg);
long stop = System.currentTimeMillis();
accTime += stop - start;
if(accTime < TIME_PERIOD)
Thread.sleep((TIME_PERIOD - accTime) / (TOTAL_MESSAGES - i));
} catch (Exception e) {
log.info("Error : " + e.getMessage());
failedPerPeriod++;
}
}
我想使用SpringBoot向ActiveMQ队列发送消息。应用程序应在发送后终止,但仍保持活动状态。 这是我的申请代码: 在没有任何父节点的情况下使用以下依赖项(Maven): 和一行
我试图在不和谐的情况下每隔x秒发送一条消息。js机器人。我知道如何做到这一点,但我遇到的问题是,即使我启用了slowmode,它也会发送垃圾邮件。我怎样才能解决这个问题?
我用的是Kafka0.8.2。正如文件所说: batch.num.messages指定: 使用异步模式时要在一批中发送的消息数。生产者将等待该数量的消息准备好发送或排队。缓冲器已达到最大毫秒。 和请求。必修的。acks控制代理对请求的确认。 我想知道Kafka经纪人如何发送这个确认,它是否发送批次确认字符,还是每个单独的消息?
如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?
null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。
我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。 我从redislabs开始学习本教程 守则: 我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正