当前位置: 首页 > 知识库问答 >
问题:

Spring阀瓣通量反应式阀瓣座防止连接关闭

富涛
2023-03-14

我正在为我的应用程序开发简单的聊天模块,使用Spring WebFlux,后端为ReactiveMongoRepository,前面为Angular 4。我能够通过WebSocket会话接收数据,但从数据库流式传输所有消息后,我想保持连接,以便我可以更新消息列表。谁能告诉我如何实现这一目标,或者也许我在遵循错误的假设?

Java后端负责WebSocket,我的订户只记录当前状态,与此无关:

WebFluxConfiguration:

@Configuration
@EnableWebFlux
public class WebSocketConfig {

private final WebSocketHandler webSocketHandler;

@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
    this.webSocketHandler = webSocketHandler;
}

@Bean
@Primary
public HandlerMapping webSocketMapping() {
    Map<String, Object> map = new HashMap<>();
    map.put("/websocket-messages", webSocketHandler);

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setOrder(10);
    mapping.setUrlMap(map);
    return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
    return new WebSocketHandlerAdapter();
}


}

WebSocketHandler实现

@Component
public class MessageWebSocketHandler implements WebSocketHandler {

private final MessageRepository messageRepository;
private ObjectMapper mapper = new ObjectMapper();
private MessageSubscriber subscriber = new MessageSubscriber();

@Autowired
public MessageWebSocketHandler(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
}

@Override
    public Mono<Void> handle(WebSocketSession session) {
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toMessage)
            .subscribe(subscriber::onNext, subscriber:: onError, subscriber::onComplete);
    return session.send(
            messageRepository.findAll()
                    .map(this::toJSON)
                    .map(session::textMessage));
}

private String toJSON(Message message) {
    try {
        return mapper.writeValueAsString(message);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

private Message toMessage(String json) {
    try {
        return mapper.readValue(json, Message.class);
    } catch (IOException e) {
        throw new RuntimeException("Invalid JSON:" + json, e);
    }
}
}

和蒙哥雷布

@Repository
public interface MessageRepository extends 
ReactiveMongoRepository<Message,String> {
}

网页前端工程师处理:

@Injectable()
export class WebSocketService {
  private subject: Rx.Subject<MessageEvent>;

  constructor() {
  }

  public connect(url): Rx.Subject<MessageEvent> {
    if (!this.subject) {
      this.subject = this.create(url);
      console.log('Successfully connected: ' + url);
    }
    return this.subject;
  }

  private create(url): Rx.Subject<MessageEvent> {
    const ws = new WebSocket(url);
    const observable = Rx.Observable.create(
      (obs: Rx.Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      });
    const observer = {
      next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify(data));
        }
      }
    };
    return Rx.Subject.create(observer, observable);
  }
}

在其他服务中,我将从响应映射到我的类型

  constructor(private wsService: WebSocketService) {
    this.messages = <Subject<MessageEntity>>this.wsService
      .connect('ws://localhost:8081/websocket-messages')
      .map((response: MessageEvent): MessageEntity => {
        const data = JSON.parse(response.data);
        return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links);
      });
  }

最后,由于连接关闭,我无法使用send函数订阅:

  ngOnInit() {
    this.messages = [];
    this._ws_subscription = this.chatService.messages.subscribe(
      (message: MessageEntity) => {
        this.messages.push(message);
      },
      error2 => {
        console.log(error2.json());
      },
      () => {
        console.log('Closed');
      }
    );
  }

  sendTestMessage() {
    this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null));
  }

共有1个答案

刘和正
2023-03-14

假设您的聊天消息在接收时被持久化到数据存储中,您可以使用SpringDataMongoDBreactive中的可定制游标功能(请参阅参考文档)。

因此,您可以在存储库中创建一个新方法,例如:

public interface MessageRepository extends ReactiveSortingRepository< Message, String> {

    @Tailable
    Flux<Message> findWithTailableCursor();
}

请注意,可定制游标有一些限制:您需要对mongo集合进行封顶,并按插入顺序对条目进行流式处理。

SpringWebFlux websocket支持还不支持STOMP或消息代理,但对于这样的用例,这可能是一个更好的选择。

 类似资料:
  • 当上面的”阀门“打开时,输出下面的数值 用法 Your browser does not support the video tag. 案例:数字标签 功能:显示通过的数字。 工作原理 上面的输入接收为YES/NO;下面的输入接收为数值。如果上面的输入是YES,那么下面的输入将被发送到节点的输出;否则,节点将输出NO。

  • 描述 Makeblock电磁阀DC 12V / 0520E拥有迷你机身,广泛用于工业设备和DIY项目。 规格 额定电压:DC 12V 载入:Air 电流(有负载):小于240mA 模式:两个位置,三向 总大小:34 x 21mm 最大压力:超过300mmHg 绝缘等级:A 尺寸图(mm)

  • 概述 MAKEBLOCK电磁阀DC12V/0520E是一种微型阀体,广泛应用于工业装置和DIY工程中。 参数 电压:DC12V 模式:两位三通 绝缘等级:A 尺寸图纸

  • 概述 Makeblock两位三通电磁阀是一个单作用气动执行器。二位三通电磁阀为双线圈控制,一个线圈瞬间通电后关闭电源、阀打开,另一个线圈瞬间通电后关闭电源、阀关闭。 参数 电压:DC12V 尺寸图纸

  • 如何在骆驼的所有路线上添加油门 那可以得到不止一个Restresource_path,我怎么能在我的所有主要路线中插入油门。我知道我可以在.from(“直接:转移路由”)和.from(“直接:帐户路由”)中的每个路由的开始之后插入,但我想一般地插入我的所有资源。我可以在骆驼身上做到这一点,或者也许使用Spring更安全?

  • RxJava中是否有一个操作员、一个外部库或一种我不知道的方法来创建一个可流动/可观察的,接收控制数据排放的函数,比如阀门? 我有一个巨大的json文件需要处理,但我必须得到文件的一部分,一个实体列表,处理它,然后得到另一部分,我尝试使用windows(),buffer(),但我传递给Flowable的双函数。generate()在收到第一个列表但尚未完成处理后继续执行。我也试过流动变形金刚。我是