当前位置: 首页 > 知识库问答 >
问题:

simpMessagingTemplate convertAndSendToUser许多等待线程阻塞其他功能

鲁浩渺
2023-03-14

我们在应用程序中使用了Stomp、SpringBoot和WebSockets。服务器应用程序执行以下操作:1)生成要推送给用户的消息;2)接受WebSocket连接;3)将消息推送给ActiveMQ stomp Broker。线程转储显示了大量与simpMessagingTemplate convertAndSendToUser API调用相关联的等待线程

应用程序的两个实例在云中运行。该应用程序使用simpMessagingTemplate convertAndSendToUser API生成消息并推送到ActiveMQ stomp broker(单独运行)。

"ForkJoinPool-1-worker-440" #477 daemon prio=5 os_prio=0 tid=0x00007f0c541c2800 nid=0x2a47 sleeping[0x00007f08e6371000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at reactor.util.concurrent.WaitStrategy$Sleeping.waitFor(WaitStrategy.java:319)
	at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:211)
	at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:176)
	at org.springframework.messaging.tcp.reactor.AbstractMonoToListenableFutureAdapter.get(AbstractMonoToListenableFutureAdapter.java:73)
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:980)
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:549)
	at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:234)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:105)
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
	at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:227)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:229)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:218)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:204)
	at com.mypackage.PushMessageManager.lambda$sendMyMessage$2(PushMessageManager.java:77)
	at com.mypackage.PushMessageManager$$Lambda$923/1850582969.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at com.mypackage.PushMessageManager.sendMyMessage(PushMessageManager.java:74)
	at com.mypackage.PushMessageManager.lambda$processPushMessage$0(PushMessageManager.java:61)
	at com.mypackage.PushMessageManager$$Lambda$664/624459498.run(Unknown Source)
	at nl.talsmasoftware.context.functions.RunnableWithContext.run(RunnableWithContext.java:42)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at nl.talsmasoftware.context.executors.ContextAwareExecutorService$1.call(ContextAwareExecutorService.java:59)
	at nl.talsmasoftware.context.delegation.RunnableAdapter.run(RunnableAdapter.java:44)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
	- None

stompclient.connect({'username':$(“#username”).val()},function(frame){setConnected(true);subscription=stompclient.subscribe(“/user/queue/abc”,function(message){showData(json.parse(message.body));},headers={'loginusername':$(“#username”).val()};});

因此,每个用户只能接收到发送给他们的消息,而不是所有的消息。这就是我们在通过WebSocket连接的同时将用户连接到单个队列的原因,并且还使用convertAndSendToUser将消息推送到特定的会话中。后端JMS发布器确保消息以循环的方式发布给用户。

为了回答你关于确定瓶颈的问题,如果我们连接2000个用户,一切都很好。但是当我们添加更多的用户时,我们看到应用程序的JMS侦听器不能侦听后端Gatling JMS load Generator每分钟发送的20000条消息。ActiveMQ JMS队列深度因此而增加。

希望这能澄清你的一些问题。下面给出了更新代码段以显示SimpMessagingTemplate.ConvertAndSendTouser API的异步调用。这里,repositoryutil.executor()是我们自己的executor对象包装器。

    public CompletableFuture<Void> processPushMessage(String userName, String payload) {
    return ContextAwareCompletableFuture.runAsync(() -> {
        sendABCMessage(payload, userName);
    }, RepositoryUtil.executor());
}

public void sendABCMessage(@Payload String payload, String username) {
    ArrayList<UserProfiles> userProfiles = (ArrayList<UserProfiles>) cacheService.getValue(username);
    if (Objects.nonNull(userProfiles) && userProfiles.size() > 0) {
      userProfiles.parallelStream()
          .filter(userProfiles1 -> ("/user/queue/abc".equalsIgnoreCase(userProfiles1.getSubscribeMapping()) && username.equals(userProfiles1.getUserName())))
          .forEach(userProfiles1 -> {              simpMessagingTemplate.convertAndSendToUser(userProfiles1.getSessionId(), "/queue/abc", payload);
          });
    } else {
      LOGGER.info("sendABCMessage userProfiles is null. Payload: {}", payload);
    }
}

共有1个答案

章心水
2023-03-14

我们可以通过移动到/user/topic而不是/user/queue来解决这个问题。我们现在能够处理每分钟35K来自后端和8K web套接字用户连接的消息。

 类似资料:
  • 本文向大家介绍Java countDownLatch如何实现多线程任务阻塞等待,包括了Java countDownLatch如何实现多线程任务阻塞等待的使用技巧和注意事项,需要的朋友参考一下 我这里需要通过多线程去处理数据,然后在所有数据都处理完成后再往下执行。这里就用到了CountDownLatch。把countdownlatch作为参数传入到每个线程类里,在线程中处理完数据后执行countdo

  • 我的布局有表面视图。有时当我的应用程序从后台切换到前台时,我会得到ANR。我认为原因是主线程被lock方法阻塞了。 最重要的部分是:

  • 我有4-5个工作线程处理大型消息队列。我还有另一段代码,它使用2-3个worker运行。我想在处理大型消息队列时阻止所有其他工作者。 我正在使用JDK6和Jms 编辑: 队列进程工作者从未终止。当没有消息时,它们阻塞队列。这些工作者由执行器线程池管理,如果我使用读写锁,其中一个工作者也会被阻塞。此外,如果使用循环屏障,那么我必须终止线程,以便重新传递阻塞的第二个进程。由于工作者是由线程池管理的,所

  • 线程状态WAIT和线程状态BLOCKED有什么区别? 线。国家文件: Blocked 等待监视器锁而被阻塞的线程处于这种状态。 等待 无限期等待另一个线程执行特定操作的线程处于此状态 这不能向我解释差异。

  • 首先,我决定让我的类阻塞(让消费者更容易使用,但对我来说可能更乏味)。而不是让使用者定义异步回调。这是一个好的设计模式吗?这样,用户可以获得预期的行为,但如果他们对线程被阻塞的时间不满意,则可以实现自己的多线程。 我有一个构造函数,它根据异步回调的结果在类中设置最后一个字段: 这不起作用,所以我使用了原子引用,并实现了一个阻塞循环,直到返回结果,如下所示: 这是阻止/检索结果的好方法吗?

  • 我们在其中一个模块中使用了Hystrix-断路器模式[library]。usecase是:-我们正在从kafka轮询16个消息,并使用pararllel流处理它们,因此,对于工作流中的每条消息,它需要3个rest调用,这些调用由hystric命令保护。现在,问题是当我尝试运行单个实例时,CPU显示尖峰,线程转储显示许多线程处于等待状态,等待所有3个命令。如下所示:-