当前位置: 首页 > 知识库问答 >
问题:

Spring网袋跺脚。发送给特定会话的消息(非用户)

梁学真
2023-03-14

我正在尝试使用我在这里找到的方法在Spring框架上设置基本的MessageBroker

作者声称它工作得很好,但我无法在客户端接收消息,尽管没有发现可见的错误。

目标:

我尝试做的基本上是相同的-客户端连接到服务器并请求一些异步操作。操作完成后,客户端应收到一个事件。重要提示:客户机未通过Spring的身份验证,但来自MessageBroker异步后端部分的事件包含他的登录名,因此我假设它足以存储登录SessionId对的并发映射,以便将消息直接发送到特定会话。

客户代码:

//app.js

var stompClient = null;
var subscription = '/user/queue/response';

//invoked after I hit "connect" button
function connect() {
//reading from input text form
var agentId = $("#agentId").val();

var socket = new SockJS('localhost:5555/cti');
stompClient = Stomp.over(socket);
stompClient.connect({'Login':agentId}, function (frame) {
    setConnected(true);
    console.log('Connected to subscription');
    stompClient.subscribe(subscription, function (response) {
        console.log(response);
    });
});

}

//invoked after I hit "send" button
function send() {

var cmd_str = $("#cmd").val();
var cmd = {
    'command':cmd_str
};
console.log("sending message...");
stompClient.send("/app/request", {}, JSON.stringify(cmd));
console.log("message sent");
}

这是我的配置。

//message broker configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{


@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    /** queue prefix for SUBSCRIPTION (FROM server to CLIENT)  */
    config.enableSimpleBroker("/topic");
    /** queue prefix for SENDING messages (FROM client TO server) */
    config.setApplicationDestinationPrefixes("/app");
}


@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {

    registry
            .addEndpoint("/cti")
            .setAllowedOrigins("*")
            .withSockJS();
}

}

现在,在基本配置之后,我应该实现一个应用程序事件处理程序,以提供有关客户端连接的会话相关信息。

//application listener

@Service
public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> {

@Autowired
//this is basically a concurrent map for storing pairs "sessionId - login"
WebAgentSessionRegistry webAgentSessionRegistry;

@Override
public void onApplicationEvent(SessionConnectEvent event) {
    StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());

    String agentId = sha.getNativeHeader("Login").get(0);
    String sessionId = sha.getSessionId();

    /** add new session to registry */
    webAgentSessionRegistry.addSession(agentId,sessionId);

    //debug: show connected to stdout
    webAgentSessionRegistry.show();

}
}

到目前为止一切都很好。在IDE中运行spring webapp并从两个浏览器选项卡连接“客户端”后,我在IDE控制台中获得了以下信息:

session_id / agent_id
-----------------------------
|kecpp1vt|user1|
|10g5e10n|user2|
-----------------------------

好的,现在让我们试着实现消息机制。

//STOMPController


@Controller
public class STOMPController {

@Autowired
//our registry we have already set up earlier
WebAgentSessionRegistry webAgentSessionRegistry;
@Autowired
//a helper service which I will post below
MessageSender sender;

@MessageMapping("/request")
public void handleRequestMessage() throws InterruptedException {

    Map<String,String> params = new HashMap(1);
    params.put("test","test");
    //a custom object for event, not really relevant
    EventMessage msg = new EventMessage("TEST",params);

    //send to user2 (just for the sake of it)
    String s_id = webAgentSessionRegistry.getSessionId("user2");
    System.out.println("Sending message to user2. Target session: "+s_id);
    sender.sendEventToClient(msg,s_id);
    System.out.println("Message sent");

}
}

从应用程序的任何部分发送消息的服务:

//MessageSender

@Service
public class MessageSender implements IMessageSender{

@Autowired
WebAgentSessionRegistry webAgentSessionRegistry;
@Autowired
SimpMessageSendingOperations messageTemplate;

private String qName = "/queue/response";

private MessageHeaders createHeaders(String sessionId) {
    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    headerAccessor.setSessionId(sessionId);
    headerAccessor.setLeaveMutable(true);
    return headerAccessor.getMessageHeaders();
}

@Override
public void sendEventToClient(EventMessage event,String sessionId) {
    messageTemplate.convertAndSendToUser(sessionId,qName,event,createHeaders(sessionId));
}
}

现在,让我们试着测试一下。我运行我的IDE,打开Chrome并创建了2个选项卡形式,我连接到服务器。用户1和用户2。结果控制台:

 session_id / agent_id
    -----------------------------
    |kecpp1vt|user1|
    |10g5e10n|user2|
    -----------------------------
Sending message to user2. Target session: 10g5e10n
Message sent

但是,正如我在开头提到的那样——用户2什么也没有得到,尽管他已经连接并订阅了“/用户/队列/响应”。也没有错误。

问题是,我到底错过了什么?我读过许多关于这个主题的文章,但没有用。SPR-11309说这是可能的,应该有效。也许,id-s不是真正的会话id-s?也许有人知道如何监控消息是否真的已经发送,而不是被内部Spring机制丢弃?

配置错误的位:

//WebSocketConfig.java:
....
 @Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    /** queue prefix for SUBSCRIPTION (FROM server to CLIENT)  */
    // + parameter "/queue"
    config.enableSimpleBroker("/topic","/queue");
    /** queue prefix for SENDING messages (FROM client TO server) */
    config.setApplicationDestinationPrefixes("/app");
}
....

我花了一天时间调试内部Spring力学,以找出它到底出了什么问题:

//AbstractBrokerMessageHandler.java: 
....
protected boolean checkDestinationPrefix(String destination) {
    if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
        return true;
    }
    for (String prefix : this.destinationPrefixes) {
        if (destination.startsWith(prefix)) {
//guess what? this.destinationPrefixes contains only "/topic". Surprise, surprise
            return true;
        }
    }
    return false;
}
....

尽管我不得不承认,我仍然认为文档中提到的用户个人队列没有被明确配置,因为它们“已经存在”。也许我只是弄错了。

共有1个答案

苏华荣
2023-03-14

总的来说看起来不错,但是你能从

config.enableSimpleBroker("/topic");

config.enableSimpleBroker("/queue");

... 看看这是否有效?希望这有帮助。

 类似资料:
  • 是否可以向特定会话发送消息? 我在客户端和一个spring servlet之间有一个未经认证的websocket。我需要在异步作业结束时向特定连接发送未经请求的消息。 正如您在这段代码中看到的,客户端可以启动一个异步作业,当它完成时,它需要end消息。很明显,我只需要给申请人发信息,而不是向所有人广播。如果有批注或方法就太好了。 更新 我试过这个: 但这将广播到所有会话,而不仅仅是指定的会话。 更

  • 问题内容: 是否可以向特定会话发送消息? 我在客户端和Spring Servlet之间有未经身份验证的WebSocket。异步作业结束时,我需要向特定的连接发送未经请求的消息。 如你在此代码中看到的,客户端可以启动异步作业,完成后,它需要结束消息。显然,我只需要向申请人发送消息,而不是向所有人广播。拥有注释或方法会很棒。 更新 我尝试了这个: 但这会广播到所有会话,而不仅仅是指定的会话。 更新2

  • 我已经按照Quetion1和Quetion2从堆栈溢出发送消息到特定的客户端,基于其会话ID,但找不到成功。 下面是我的示例RestController类 会话ID:当客户端发送create会话请求时,会生成新的Spring会话ID,并将其存储在MongoDB中。之后,当客户端发送Web套接字连接请求时,会收到与预期存储在mongoDb中的相同的会话ID。直到一切正常。 现在,我的工作是根据Ses

  • 我使用本教程中描述的spring设置了一个WebSocket:https://spring.io/guides/gs/messaging-stomp-websocket/。我需要的是我的服务器每5秒向特定用户发出一条消息。所以我首先做了这个: 而且起作用了。现在,为了只向特定用户发送消息,我更改了以下内容: 在WebSocketConfig.java中添加队列: 更改GreetingControl

  • 问题内容: 如何仅从服务器向特定用户发送websocket消息? 我的webapp具有spring安全设置,并使用websocket。我在尝试仅从服务器向特定用户发送消息时遇到棘手的问题。 通过阅读手册,我的理解是来自我们可以做的服务器 在客户端: 但是我永远无法调用订阅回调。我尝试了许多不同的方法,但是没有运气。 如果我将其发送到/ topic / reply,它可以工作,但所有其他已连接用户也

  • 我正在阅读《Spring in Action4》一书,以使用WebSocket上的STOMP消息传递。 假设,如下所示: 然后客户端使用以下JavaScript代码订阅目标: 则该消息将发送到目的地,如下所示: 那么,两个目的地和看起来是不同的,消息怎么可能到达客户端呢?