序
本文主要研究下spring cloud gateway的streaming-media-types属性
配置
配置说明
{
"sourceType": "org.springframework.cloud.gateway.config.GatewayProperties",
"name": "spring.cloud.gateway.streaming-media-types",
"type": "java.util.List<org.springframework.http.MediaType>"
}
GatewayProperties
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayProperties.java
@ConfigurationProperties("spring.cloud.gateway")
@Validated
public class GatewayProperties {
/**
* List of Routes
*/
@NotNull
@Valid
private List<RouteDefinition> routes = new ArrayList<>();
/**
* List of filter definitions that are applied to every route.
*/
private List<FilterDefinition> defaultFilters = new ArrayList<>();
private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
MediaType.APPLICATION_STREAM_JSON);
public List<RouteDefinition> getRoutes() {
return routes;
}
public void setRoutes(List<RouteDefinition> routes) {
this.routes = routes;
}
public List<FilterDefinition> getDefaultFilters() {
return defaultFilters;
}
public void setDefaultFilters(List<FilterDefinition> defaultFilters) {
this.defaultFilters = defaultFilters;
}
public List<MediaType> getStreamingMediaTypes() {
return streamingMediaTypes;
}
public void setStreamingMediaTypes(List<MediaType> streamingMediaTypes) {
this.streamingMediaTypes = streamingMediaTypes;
}
@Override
public String toString() {
return "GatewayProperties{" +
"routes=" + routes +
", defaultFilters=" + defaultFilters +
", streamingMediaTypes=" + streamingMediaTypes +
'}';
}
}
可以看到默认是MediaType.TEXT_EVENT_STREAM(text/event-stream
)、MediaType.APPLICATION_STREAM_JSON(application/stream+json
)
使用
GatewayAutoConfiguration
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java
@Configuration
@ConditionalOnClass(HttpClient.class)
protected static class NettyConfiguration {
@Bean
@ConditionalOnMissingBean
public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
return HttpClient.create(options);
}
//......
@Bean
public HttpClientProperties httpClientProperties() {
return new HttpClientProperties();
}
@Bean
public NettyRoutingFilter routingFilter(HttpClient httpClient,
ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
return new NettyRoutingFilter(httpClient, headersFilters);
}
@Bean
public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {
return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());
}
@Bean
public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
return new ReactorNettyWebSocketClient(options);
}
}
这里的NettyWriteResponseFilter使用到了properties.getStreamingMediaTypes()
NettyWriteResponseFilter
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.javac
public class NettyWriteResponseFilter implements GlobalFilter, Ordered {
private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);
public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
private final List<MediaType> streamingMediaTypes;
public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
this.streamingMediaTypes = streamingMediaTypes;
}
@Override
public int getOrder() {
return WRITE_RESPONSE_FILTER_ORDER;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
// until the WebHandler is run
return chain.filter(exchange).then(Mono.defer(() -> {
HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
if (clientResponse == null) {
return Mono.empty();
}
log.trace("NettyWriteResponseFilter start");
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
//TODO: what if it's not netty
final Flux<NettyDataBuffer> body = clientResponse.receive()
.retain() //TODO: needed?
.map(factory::wrap);
MediaType contentType = response.getHeaders().getContentType();
return (isStreamingMediaType(contentType) ?
response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
}));
}
//TODO: use framework if possible
//TODO: port to WebClientWriteResponseFilter
private boolean isStreamingMediaType(@Nullable MediaType contentType) {
return (contentType != null && this.streamingMediaTypes.stream()
.anyMatch(contentType::isCompatibleWith));
}
}
可以看到这里根据isStreamingMediaType方法判断是否是stream类型,如果是则采用writeAndFlushWith方法,不是则采用writeWith方法
ReactiveHttpOutputMessage
spring-web-5.0.6.RELEASE-sources.jar!/org/springframework/http/ReactiveHttpOutputMessage.java
/**
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
*
* <p>Typically implemented by an HTTP request on the client-side or an
* HTTP response on the server-side.
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 5.0
*/
public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Return a {@link DataBufferFactory} that can be used to create the body.
* @return a buffer factory
* @see #writeWith(Publisher)
*/
DataBufferFactory bufferFactory();
/**
* Register an action to apply just before the HttpOutputMessage is committed.
* <p><strong>Note:</strong> the supplied action must be properly deferred,
* e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
* executed in the right order, relative to other actions.
* @param action the action to apply
*/
void beforeCommit(Supplier<? extends Mono<Void>> action);
/**
* Whether the HttpOutputMessage is committed.
*/
boolean isCommitted();
/**
* Use the given {@link Publisher} to write the body of the message to the
* underlying HTTP layer.
* @param body the body content publisher
* @return a {@link Mono} that indicates completion or error
*/
Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
/**
* Use the given {@link Publisher} of {@code Publishers} to write the body
* of the HttpOutputMessage to the underlying HTTP layer, flushing after
* each {@code Publisher<DataBuffer>}.
* @param body the body content publisher
* @return a {@link Mono} that indicates completion or error
*/
Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);
/**
* Indicate that message handling is complete, allowing for any cleanup or
* end-of-processing tasks to be performed such as applying header changes
* made via {@link #getHeaders()} to the underlying HTTP message (if not
* applied already).
* <p>This method should be automatically invoked at the end of message
* processing so typically applications should not have to invoke it.
* If invoked multiple times it should have no side effects.
* @return a {@link Mono} that indicates completion or error
*/
Mono<Void> setComplete();
}
从接口的注释可以看到,writeWith与writeAndFlushWith的参数泛型不同,一个是Publisher<? extends DataBuffer>,一个是Publisher<? extends Publisher<? extends DataBuffer>>。而writeAndFlushWith则是在每个Publisher<DataBuffer>写入之后就flush。
小结
NettyWriteResponseFilter根据spring.cloud.gateway.streaming-media-types配置的类型来判断是writeAndFlushWith还是writeWith,如果是指定类型则选择用writeAndFlushWith写入response。默认该配置指定了MediaType.TEXT_EVENT_STREAM(text/event-stream
)、MediaType.APPLICATION_STREAM_JSON(application/stream+json
)这两种类型。