当前位置: 首页 > 知识库问答 >
问题:

每分钟发送35000条jms消息

濮阳功
2023-03-14

我们有一个Spring Boot应用程序,用于在另一个组件上执行负载测试。我们每分钟最多需要发送35000条JMS消息,因此我使用调度器每分钟运行一次任务。

问题是当我保持低强度时,它会设法在指定的时间间隔(一分钟)内发送消息。但是当强度很高时,发送消息块需要超过1分钟。对以下实现有任何建议吗?

调度程序

@Component
public class MessageScheduler {

private final Logger log = LoggerFactory.getLogger(getClass());
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);

@Autowired
JmsSender sender;

    public void startScheduler() {
       Runnable runnableTask = sender::sendMessagesChunk;
       executorService.scheduleAtFixedRate(runnableTask, 0, TIME_PERIOD, 
       TimeUnit.MILLISECONDS);
    }
}

用于发送消息的类

@Component
public class JmsSender {

@Autowired
TrackingManager manager;

private final Logger log = LoggerFactory.getLogger(getClass());
private final static int TOTAL_MESSAGES = ConfigFactory.getConfig().getInt("total.tracking.messages").orElse(10);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
private static int failedPerPeriod=0;
private static int totalFailed=0;
private static int totalMessageCounter=0;

public void sendMessagesChunk() {
    log.info("Started  at: {}", Instant.now());
    log.info("Sending messages with intensity {} messages/minute", TOTAL_MESSAGES);
    for (int i=0; i<TOTAL_MESSAGES; i++) {
        try {
            long start = System.currentTimeMillis();
            MessageDTO msg = manager.createMessage();
            send(msg);
            long stop = System.currentTimeMillis();
            if (timeOfDelay(stop-start)>=0L) {
                Thread.sleep(timeOfDelay(stop-start));
            }
        } catch (Exception e) {
            log.info("Error :  " + e.getMessage());
            failedPerPeriod++;
        }
    }
    totalMessageCounter += TOTAL_MESSAGES;
    totalFailed += failedPerPeriod;
    log.info("Finished  at: {}", Instant.now());
    log.info("Success rate(of last minute): {} %, Succeeded: {}, Failed: {}, Success rate(in total): {} %, Succeeded: {}, Failed: {}"
            ,getSuccessRatePerPeriod(), getSuccededPerPeriod(), failedPerPeriod,
            getTotalSuccessRate(), getTotalSucceded(), totalFailed);
    failedPerPeriod =0;
}

private long timeOfDelay(Long elapsedTime){
    return (TIME_PERIOD / TOTAL_MESSAGES) - elapsedTime;
}
private int getSuccededPerPeriod(){
    return TOTAL_MESSAGES - failedPerPeriod;
}

private int getTotalSucceded(){
    return totalMessageCounter - totalFailed;
}

private double getSuccessRatePerPeriod(){
    return getSuccededPerPeriod()*100D / TOTAL_MESSAGES;
}

private double getTotalSuccessRate(){
    return getTotalSucceded()*100D / totalMessageCounter;
}

private void send(MessageDTO messageDTO) throws Exception {
    requestContextInitializator();
    JmsClient client = JmsClientBuilder.newClient(UriScheme.JmsType.AMQ);
    client.target(new URI("activemq:queue:" + messageDTO.getDestination()))
            .msgTypeVersion(messageDTO.getMsgType(), messageDTO.getVersion())
            .header(Header.MSG_VERSION, messageDTO.getVersion())
            .header(Header.MSG_TYPE, messageDTO.getMsgType())
            .header(Header.TRACKING_ID, UUID.randomUUID().toString())
            .header(Header.CLIENT_ID, "TrackingJmsClient")
            .post(messageDTO.getPayload());
}

共有2个答案

万俟亦
2023-03-14

35000 msg/min是低于600 msg/sec的一个缺口。这不被认为是“很多”,应该是相对容易清除的目标。主要思想是“重用”所有重量级JMS对象:连接、会话和目标。单线程应该足够了。

ConnectionFactory connFactory = ....    // initialize connection factory
@Cleanup Connection conn = connFactory.createConnection();
@Cleanup Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue q = session.createQueue("example_destiation");
@Cleanup MessageProducer producer = session.createProducer(q);

for (String payload: messagesToSend) {
    TextMessage message = session.createTextMessage(payload);
    producer.send(msg);
    session.commit();
}

通过以下方式可以实现额外的加速:

  • 提交第n条消息
  • 通过使用更快的ACKNOWLEDGE模式
  • 通过使用非持久消息
  • 通过使用在会话外部创建的目标对象
  • 异步发送消息

非\u持久、ACKOWLEDGE、异步传递示例

@Cleanup Connection conn = connFactory.createConnection();
@Cleanup Session session = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue q = session.createQueue("example_destiation");
@Cleanup MessageProducer producer = session.createProducer(q);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setAsync(new ExmpleSendListener());

for (String payload: messagesToSend) {
    TextMessage message = session.createTextMessage(payload);
    producer.send(msg);
}
滕夜洛
2023-03-14

您应该解决两个问题:

  1. 总发送操作时间必须低于最大时间

显然,如果您的send方法太慢,将超过最大时间。

发送消息的更快方法是使用某种批量操作。如果您的MQ API不支持批量操作,则无需担心,您无法使用它!由于第二个限制(“一致”)。

您可以异步发送消息,但如果您的MQ API为此创建线程而不是“非阻塞”异步,您可能会遇到内存问题。

使用javax。jms。MessageProducer。发送可以异步发送消息,但会为每个消息创建一个新的线程(将创建大量内存和服务器线程)。

另一种加速方法是只创建一个JMS客户端(您的send方法)。

要实现第二个要求,您应该修复您的timeOfDelay函数,它是错误的。真的,您应该考虑send函数的概率分布来估计正确的值,但是,您可以简单地做:

    long accTime = 0L;
    for (int i=0; i<TOTAL_MESSAGES; i++) {
        try {
            long start = System.currentTimeMillis();
            MessageDTO msg = manager.createMessage();
            send(msg);
            long stop = System.currentTimeMillis();
            accTime += stop - start;
            if(accTime < TIME_PERIOD)
                Thread.sleep((TIME_PERIOD - accTime) / (TOTAL_MESSAGES - i));
        } catch (Exception e) {
            log.info("Error :  " + e.getMessage());
            failedPerPeriod++;
        }
    }
 类似资料:
  • 我想使用SpringBoot向ActiveMQ队列发送消息。应用程序应在发送后终止,但仍保持活动状态。 这是我的申请代码: 在没有任何父节点的情况下使用以下依赖项(Maven): 和一行

  • 我试图在不和谐的情况下每隔x秒发送一条消息。js机器人。我知道如何做到这一点,但我遇到的问题是,即使我启用了slowmode,它也会发送垃圾邮件。我怎样才能解决这个问题?

  • 我用的是Kafka0.8.2。正如文件所说: batch.num.messages指定: 使用异步模式时要在一批中发送的消息数。生产者将等待该数量的消息准备好发送或排队。缓冲器已达到最大毫秒。 和请求。必修的。acks控制代理对请求的确认。 我想知道Kafka经纪人如何发送这个确认,它是否发送批次确认字符,还是每个单独的消息?

  • 如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?

  • null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。

  • 我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。 我从redislabs开始学习本教程 守则: 我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正