RxJava with Spring MVC

小牛编辑
2023-12-01

Spring Cloud Netflix includes RxJava.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
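
To make "observable sequence" concrete without pulling in RxJava, here is a minimal sketch that uses the JDK's own `java.util.concurrent.Flow` API (Java 9+) as a stand-in. It is an analogy only, not RxJava's API: the class and method names (`ObservableSequenceSketch`, `publishAndCollect`) are hypothetical. A publisher pushes items to a subscriber asynchronously and then signals completion, which is the same push-based contract an `rx.Observable` follows.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; uses only the JDK, not RxJava.
public class ObservableSequenceSketch {

    // Pushes each item to a subscriber and returns what the subscriber
    // received once the sequence has signalled completion.
    static List<String> publishAndCollect(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CompletableFuture<List<String>> done = new CompletableFuture<>();

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Request everything up front; no backpressure in this sketch.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                }

                @Override
                public void onError(Throwable error) {
                    done.completeExceptionally(error);
                }

                @Override
                public void onComplete() {
                    done.complete(received);
                }
            });
            // Items are delivered to the subscriber on another thread.
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items

        // Wait (bounded) for the asynchronous delivery to finish.
        return done.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

Calling `ObservableSequenceSketch.publishAndCollect(List.of("a", "b", "c"))` returns the three items in submission order, collected only after the completion signal arrives.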

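Spring Cloud Netflix supports returning rx.Single objects from Spring MVC controllers. It also supports using rx.Observable objects for server-sent events (SSE). This can be very convenient if your internal APIs are already built with RxJava (see the Feign Hystrix support section for examples).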

Here are some examples of using rx.Single:

// A Single that emits one value.
@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
	return Single.just("single value");
}

// A ResponseEntity that wraps a Single and carries an explicit status.
@RequestMapping(method = RequestMethod.GET, value = "/singleWithResponse")
public ResponseEntity<Single<String>> singleWithResponse() {
	return new ResponseEntity<>(Single.just("single value"),
			HttpStatus.NOT_FOUND);
}

// A Single that emits a complete ResponseEntity, including its status.
@RequestMapping(method = RequestMethod.GET, value = "/singleCreatedWithResponse")
public Single<ResponseEntity<String>> singleOuterWithResponse() {
	return Single.just(new ResponseEntity<>("single value", HttpStatus.CREATED));
}

// A Single that terminates with an error instead of a value.
@RequestMapping(method = RequestMethod.GET, value = "/throw")
public Single<Object> error() {
	return Single.error(new RuntimeException("Unexpected"));
}

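If you have an Observable rather than a Single, you can use .toSingle() when it emits exactly one item, or .toList().toSingle() to collect all of its items into a single list. Here are some examples: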

// An Observable with exactly one item, converted directly to a Single.
@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
  return Observable.just("single value").toSingle();
}

// An Observable with several items, collected into one List first.
@RequestMapping(method = RequestMethod.GET, value = "/multiple")
public Single<List<String>> multiple() {
  return Observable.just("multiple", "values").toList().toSingle();
}

// A converted Observable wrapped in a ResponseEntity with headers and status.
// APPLICATION_JSON_UTF8 is a static import from org.springframework.http.MediaType.
@RequestMapping(method = RequestMethod.GET, value = "/responseWithObservable")
public ResponseEntity<Single<String>> responseWithObservable() {
  Observable<String> observable = Observable.just("single value");
  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(APPLICATION_JSON_UTF8);
  return new ResponseEntity<>(observable.toSingle(), headers, HttpStatus.CREATED);
}

// An Observable that emits its value only after one minute.
@RequestMapping(method = RequestMethod.GET, value = "/timeout")
public Observable<String> timeout() {
  return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
    @Override
    public String call(Long aLong) {
      return "single value";
    }
  });
}
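
The difference between the two conversions matters when the Observable can emit zero or several items. The sketch below is a plain-JDK model of that contract as documented for RxJava 1.x, not RxJava's implementation: `ToSingleContractSketch` and its methods are hypothetical names, and an `Iterable` stands in for the Observable. In RxJava itself these failures are delivered to the subscriber through `onError` rather than thrown from the call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of the conversion contract; uses only the JDK.
public class ToSingleContractSketch {

    // Models Observable.toSingle(): exactly one item is required.
    static <T> T toSingle(Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            // An empty source has nothing to emit as the single value.
            throw new NoSuchElementException("source emitted no items");
        }
        T value = it.next();
        if (it.hasNext()) {
            // A second item violates the single-item contract.
            throw new IllegalArgumentException("source emitted more than one item");
        }
        return value;
    }

    // Models Observable.toList().toSingle(): the items are first collected
    // into one List, so there is always exactly one value to emit.
    static <T> List<T> toListThenSingle(Iterable<T> source) {
        List<T> all = new ArrayList<>();
        source.forEach(all::add);
        return all;
    }
}
```

Under this model, `toSingle(List.of("multiple", "values"))` fails with an `IllegalArgumentException`, while `toListThenSingle` on the same input yields one list of two elements. That is why the `/multiple` endpoint goes through `.toList()` first.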

If you have a streaming endpoint and client, SSE could be an option. To convert an rx.Observable to a Spring SseEmitter, use RxResponse.sse(). Here are some examples:

// A stream that carries a single event.
@RequestMapping(method = RequestMethod.GET, value = "/sse")
public SseEmitter single() {
	return RxResponse.sse(Observable.just("single value"));
}

// A stream that carries three events.
@RequestMapping(method = RequestMethod.GET, value = "/messages")
public SseEmitter messages() {
	return RxResponse.sse(Observable.just("message 1", "message 2", "message 3"));
}

// A stream of objects sent with an explicit media type.
// EventDto and getDate are helpers that are not shown here.
@RequestMapping(method = RequestMethod.GET, value = "/events")
public SseEmitter event() {
	return RxResponse.sse(APPLICATION_JSON_UTF8,
			Observable.just(new EventDto("Spring io", getDate(2016, 5, 19)),
					new EventDto("SpringOnePlatform", getDate(2016, 8, 1))));
}
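
For readers unfamiliar with SSE, it may help to see roughly what a client receives on the wire. The following sketch formats messages according to the `text/event-stream` framing rules (one `data:` line per payload line, with a blank line ending each event). It is an illustration of the wire format only: `SseFrameSketch` and its methods are hypothetical, and the exact bytes written by Spring's SseEmitter may differ in details such as spacing after the colon.

```java
import java.util.List;

// Hypothetical formatter for the text/event-stream framing; uses only the JDK.
public class SseFrameSketch {

    // Formats one event: each payload line becomes a "data:" field,
    // and an empty line marks the end of the event.
    static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    // Formats a sequence of events back to back, as they would appear
    // in a response body with content type text/event-stream.
    static String stream(List<String> messages) {
        StringBuilder sb = new StringBuilder();
        for (String message : messages) {
            sb.append(frame(message));
        }
        return sb.toString();
    }
}
```

With this framing, the three messages from the `/messages` example would reach the client as three separate events, each consisting of one `data:` line followed by a blank line.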