Play 2.6 Body Parser

厍书
2023-12-01

Body parsers

一个http请求包含了一个请求头和一个请求体,头部信息比较短,可以安全的保存在内存中,在Play中头部信息使用RequestHeader类进行建模。请求体的内容可能较大,使用流stream的形式进行建模,Play提供了一个BodyParser抽象用于将流中的信息转换为内存对象。

由于Play是一个异步框架,对流的处理不实用Java中的InputStream,因为该方法在读取时会阻塞整个线程直到数据读取完毕。Play中使用异步的Akka Stream进行处理,Akka Stream是Reactive Streams的一个实现。

使用内建的body parsers

默认解析器

在不指定解析器的情况下,play会根据请求头的Content-Type字段来相应的选择解析器,application/json会被解析为JsonNodeapplication/x-www-form-urlencoded会被解析为Map<String, String[]>

请求体可以通过Request.body()方法获取,该方法返回一个RequestBody对象,并提供了便捷的方法获取各种类型的内容:

public Result index() {
    JsonNode json = request().body().asJson();
    return ok("Got name: " + json.get("name").asText());
}

以下是一些请求内容与获取方法的映射:
- text/plain: String, accessible via asText()
- application/json: com.fasterxml.jackson.databind.JsonNode, accessible via asJson()
- application/xml, text/xml or application/XXX+xml: org.w3c.Document, accessible via asXml()
- application/x-www-form-urlencoded: Map

指定解析器

可以通过@BodyParser.Of注解来指定解析器

@BodyParser.Of(BodyParser.Text.class)
public Result index() {
    RequestBody body = request().body();
    return ok("Got text: " + body.asText());
}

解析器的类型
- Default: The default body parser.
- AnyContent: Like the default body parser, but will parse bodies of GET, HEAD and DELETE requests.
- Json: Parses the body as JSON.
- TolerantJson: Like Json, but does not validate that the Content-Type header is JSON.
- Xml: Parses the body as XML.
- TolerantXml: Like Xml, but does not validate that the Content-Type header is XML.
- Text: Parses the body as a String.
- TolerantText: Like Text, but does not validate that the Content-Type is text/plain.
- Bytes: Parses the body as a ByteString.
- Raw: Parses the body as a RawBuffer. This will attempt to store the body in memory, up to Play’s configured memory buffer size, but fallback to writing it out to a File if that’s exceeded.
- FormUrlEncoded: Parses the body as a form.
- MultipartFormData: Parses the body as a multipart form, storing file parts to files.
- Empty: Does not parse the body, rather it ignores it

长度限制

内建的解析器在内存和硬盘中缓冲请求体,如果缓冲区的长度没有限制,可能会带来隐藏的隐患。在application.conf中的play.http.parser.maxMemoryBufferplay.http.parser.maxDiskBuffer可以设置缓冲区大小,默认大小分别为256K和10M。

自定义解析器

可以通过实现BodyParser类来实现自己的解析器,该接口有一个抽象方法

public abstract Accumulator<ByteString, F.Either<Result, A>> apply(RequestHeader request);

Java8中接口可以有默认方法。

该方法的参数为一个请求头RequestHeader,通过该参数可以检查相关信息,主要是Content-type

返回类型为Accumulator,An accumulator is a thin layer around an Akka Streams Sink(相当于数据使用方)。accumulator异步计算流中的数据并保存到result中,accumulator可以通过akka Stream Source(数据提供方)运行,并返回一个CompletionStage对象,该对象会在计算结束时返回结果。Accumulator与Sink<E, CompletionStage<A>>并无本质上的区别,实际上就像是一个包装类型,最大的不同是Accumulator提供了诸如mapmapFuturerecover这些方便的接口。

apply方法返回的Accumulator包含了一个ByteString元素,该对象类似一个byte数组,但是不可变,并且大多数操作都可以在常量时间内完成。

accumulator返回的类型为F.Either<Result, A>,返回类型result往往用于一个错误,例如Content-type与解析器不匹配或者内存溢出。当解析器返回一个reuslt是,会缩短action的处理流程,结果会直接返回而不会调用acion方法。

组合已有的解析器
package models;


import akka.util.ByteString;
import com.fasterxml.jackson.databind.JsonNode;
import play.libs.F;
import play.libs.streams.Accumulator;
import play.mvc.BodyParser;
import play.mvc.Http;
import play.mvc.Result;
import play.mvc.Results;

import javax.inject.Inject;
import java.util.concurrent.Executor;

public class Person {

    public Long id;

    public String name;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public static class PersonBodyParser implements BodyParser<Person>{

        private BodyParser.Json jsonParser;
        private Executor executor;

        @Inject
        public PersonBodyParser(Json jsonParser, Executor executor) {
            this.jsonParser = jsonParser;
            this.executor = executor;
        }

        @Override
        public Accumulator<ByteString, F.Either<Result, Person>> apply(Http.RequestHeader requestHeader) {
            Accumulator<ByteString,F.Either<Result,JsonNode>> jsonAccumulator=jsonParser.apply(requestHeader);
            return jsonAccumulator.map(resultOrJson ->{
               if (resultOrJson.left.isPresent()){
                   return F.Either.Left(resultOrJson.left.get());
               }else {
                   JsonNode json = resultOrJson.right.get();
                   try {
                       Person person =play.libs.Json.fromJson(json,Person.class);
                       return F.Either.Right(person);
                   }catch (Exception e){
                       return F.Either.Left(Results.badRequest("Unable to read Person from json: "+e.getMessage()))
                   }
               }
            },executor);
        }
    }

}

返回的结果会被封装在RequestBody中,可以通过as方法获取

    @BodyParser.Of(Person.PersonBodyParser.class)
    public Result save() {
        Http.RequestBody body = request().body();
        Person user = body.as(Person.class);

        return ok("Got: " + user.name);
    }
自定义的请求体长度限制
// Accept only 10KB of data.
public static class Text10Kb extends BodyParser.Text {
    @Inject
    public Text10Kb(HttpErrorHandler errorHandler) {
        super(10 * 1024, errorHandler);
    }
}

@BodyParser.Of(Text10Kb.class)
public Result index() {
    return ok("Got body: " + request().body().asText());
}
转发请求
public static class ForwardingBodyParser implements BodyParser<WSResponse> {
    private WSClient ws;
    private Executor executor;

    @Inject
    public ForwardingBodyParser(WSClient ws, Executor executor) {
        this.ws = ws;
        this.executor = executor;
    }

    String url = "http://example.com";

    public Accumulator<ByteString, F.Either<Result, WSResponse>> apply(RequestHeader request) {
        Accumulator<ByteString, Source<ByteString, ?>> forwarder = Accumulator.source();

        return forwarder.mapFuture(source -> {
            // TODO: when streaming upload has been implemented, pass the source as the body
            return ws.url(url)
                    .setMethod("POST")
                        // .setBody(source)
                    .execute().thenApply(F.Either::Right);
        }, executor);
    }
}
通过akka Streams定制解析器

对akka Stream的讨论超出本文的内容,最好去参考akka的相关文档,https://doc.akka.io/docs/akka/2.5/stream/index.html?language=java

以下例子是一个CSV文件的解析器,该实例基于
https://doc.akka.io/docs/akka/2.5/stream/stream-cookbook.html?language=java#Parsing_lines_from_a_stream_of_ByteStrings

package parser;

import akka.stream.javadsl.*;
import akka.util.ByteString;
import play.libs.F;
import play.libs.streams.Accumulator;
import play.mvc.BodyParser;
import play.mvc.Http;
import play.mvc.Result;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

public class CsvBodyParser implements BodyParser<List<List<String>>> {
    private Executor executor;

    @Inject
    public CsvBodyParser(Executor executor) {
        this.executor = executor;
    }

    @Override
    public Accumulator<ByteString, F.Either<Result, List<List<String>>>> apply(Http.RequestHeader requestHeader) {
        Sink<ByteString,CompletionStage<List<List<String>>>> sink= Flow.<ByteString>create()
                .via(Framing.delimiter(ByteString.fromString("\n"),1000, FramingTruncation.ALLOW))
                .map(bytes ->{
                    String[] values=bytes.utf8String().trim().split(",");
                    return Arrays.asList(values);
                })
                .toMat(Sink.<List<List<String>>,List<String>>fold(
                        new ArrayList<>(),(list,values) ->{
                            list.add(values);
                            return list;
                        }
                ), Keep.right());
        return Accumulator.fromSink(sink).map(F.Either::Right,executor);
    }
}
 类似资料:

相关阅读

相关文章

相关问答