使用 httpclient 提供的流式 jar 包:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
</dependency>
public class FluentHttpClient {
final static PoolingHttpClientConnectionManager CONNMGR;
final static HttpClient CLIENT;
final static Executor executor;
static {
LayeredConnectionSocketFactory ssl = null;
// 注释中有bug
//there also seems to have been a regression between 4.5.10 and 4.5.11 which was fixed in 4.5.12
// Certificate for <域名> doesn't match any of the subject alternative names问题的解决
//try {
// ssl = SSLConnectionSocketFactory.getSystemSocketFactory();
//} catch (final SSLInitializationException ex) {
//4.5.12之前用下面
try {
ssl = new SSLConnectionSocketFactory(SSLContexts.custom()
.loadTrustMaterial(null,new TrustSelfSignedStrategy()).build(),
NoopHostnameVerifier.INSTANCE);
} catch (final SSLInitializationException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) {
final SSLContext sslcontext;
try {
sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
sslcontext.init(null, null, null);
ssl = new SSLConnectionSocketFactory(sslcontext);
} catch (final Exception e) {
throw new BizException(e.getMessage());
}
}
final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory())
.build();
CONNMGR = new PoolingHttpClientConnectionManager(sfr);
CONNMGR.setDefaultMaxPerRoute(FluentHttpClient.getPoolMaxPerRoute());
CONNMGR.setMaxTotal(FluentHttpClient.getPoolMaxTotal());
//校验连接
CONNMGR.setValidateAfterInactivity(FluentHttpClient.getValidateAfterInactivity());
RequestConfig.Builder configBuilder = RequestConfig.custom();
// 设置连接超时
configBuilder.setConnectTimeout(FluentHttpClient.getConnectTimeout());
// 设置读取超时
configBuilder.setSocketTimeout(FluentHttpClient.getSocketTimeout());
// 设置从连接池获取连接实例的超时
configBuilder.setConnectionRequestTimeout(FluentHttpClient.getConnectTimeout());
CLIENT = HttpClientBuilder.create()
.setDefaultRequestConfig(configBuilder.build())
.setConnectionManager(CONNMGR)
.build();
executor = Executor.newInstance(CLIENT);
}
/**
* Request body = Request.Post(uri)
* .socketTimeout(timeoutMills)
* .connectTimeout(timeoutMills)
* .body()
*
* @param request
* @return
* @throws IOException
*/
public static Response execute(Request request) throws IOException {
return executor.execute(request);
}
/**
* GET 请求 构造 URI
*
* @param url
* @param params
* @return null if error !!
*/
public static URI buildURI(String url, Map<String, String> params) {
return buildURI(url, params, false, null);
}
/**
* GET 请求构造 ENCODED URI
*
* @param url
* @param params
* @return null if error !!
*/
public static URI buildEncodeURI(String url, Map<String, String> params) {
return buildURI(url, params, true, "UTF-8");
}
public static URI buildURI(String url, Map<String, String> params, boolean encoded, String charset) {
try {
URIBuilder uriBuilder = null;
if (encoded) {
uriBuilder = new URIBuilder(url, Charset.forName(charset));
} else {
uriBuilder = new URIBuilder(url);
}
if (params != null) {
Set<Map.Entry<String, String>> entrySet = params.entrySet();
for (Map.Entry<String, String> entry : entrySet) {
uriBuilder.setParameter(entry.getKey(), entry.getValue());
}
}
return uriBuilder.build();
} catch (URISyntaxException e) {
log.error("#### buildURI error: {}-{}-{}", url, params, e);
return null;
}
}
/**
* POST encoded entity
*
* @param params
* @return
*/
public static UrlEncodedFormEntity buildFormEntity(Map<String, String> params) {
return new UrlEncodedFormEntity(paramToPair(params), StandardCharsets.UTF_8);
}
public static List<NameValuePair> paramToPair(Map<String, String> params) {
List<NameValuePair> result = Collections.emptyList();
if (MapUtil.isNotEmpty(params)) {
result = new ArrayList<>(params.size());
for (Map.Entry<String, String> entry : params.entrySet()) {
String value = entry.getValue();
if (value != null) {
String key = entry.getKey();
result.add(new BasicNameValuePair(key, value));
}
}
}
return result;
}
}
全局配置
public class FluentGlobalConfig {
/**
* 超时全局 毫秒
*/
protected static int connectTimeout = 6000;
protected static int socketTimeout = 6000;
/**
* 连接池全局信息
*/
protected static int poolMaxTotal = 200;
protected static int poolMaxPerRoute = 100;
protected static int validateAfterInactivity = 1000;
public static int getConnectTimeout() {
return connectTimeout;
}
/**
* 设置连接时间
*
* @param timeout
*/
public static void setConnectTimeout(int timeout) {
connectTimeout = timeout;
}
public static int getSocketTimeout() {
return socketTimeout;
}
/**
* 设置 socket 时间
*
* @param timeout
*/
public static void setSocketTimeout(int timeout) {
socketTimeout = timeout;
}
public static int getPoolMaxTotal() {
return poolMaxTotal;
}
/**
* 设置连接池
*
* @param poolMaxTotal
*/
public static void setPoolMaxTotal(int poolMaxTotal) {
FluentGlobalConfig.poolMaxTotal = poolMaxTotal;
}
public static int getPoolMaxPerRoute() {
return poolMaxPerRoute;
}
/**
* 设置并发
*
* @param poolMaxPerRoute
*/
public static void setPoolMaxPerRoute(int poolMaxPerRoute) {
FluentGlobalConfig.poolMaxPerRoute = poolMaxPerRoute;
}
public static int getValidateAfterInactivity() {
return validateAfterInactivity;
}
public static void setValidateAfterInactivity(int validateAfterInactivity) {
FluentGlobalConfig.validateAfterInactivity = validateAfterInactivity;
}
}
测试用例
public class FluentTest {
static {
FluentGlobalConfig.setSocketTimeout(20000);
}
@Test
public void get() throws IOException {
Map<String,String> params = new HashMap<>();
params.put("ts","21212");
params.put("sig","heelo");
Request get = Request.Get(FluentHttpClient.buildURI("http://localhost:8080/test/get/form",params));
String s = FluentHttpClient.execute(get).returnContent().asString();
System.out.println(s);
}
@Test
public void postForm() throws IOException {
Request request =
Request.Post("http://localhost:8080/test/post/form")
.bodyForm(Form.form().add("sig","232")
.add("ts","34333")
.add("unifiedString","hello").build());
String s = FluentHttpClient.execute(request).returnContent().asString();
System.out.println(s);
}
@Test
public void postJson() throws IOException {
QueryDTO query = new QueryDTO();
query.setSig("cccddd");
query.setUnifiedString("hello me 1223");
Request request = Request.Post("http://localhost:8080/test/post/json").bodyString(JSON.toJSONString(query),
ContentType.APPLICATION_JSON);
String s = FluentHttpClient.execute(request).returnContent().asString();
System.out.println(s);
}
}
测试 API:
@Slf4j
@RestController
public class TestController {
@GetMapping("/test/get/form")
public Mono<QueryDTO> getForm(QueryDTO query){
log.info("request: {}", JSON.toJSONString(query));
QueryDTO dto = new QueryDTO();
dto.setSig("aaa");
return Mono.just(dto);
}
@PostMapping("/test/post/form")
public Mono<QueryDTO> postForm(QueryDTO query){
log.info("request: {}", JSON.toJSONString(query));
QueryDTO dto = new QueryDTO();
dto.setSig("aaa");
return Mono.just(dto);
}
@PostMapping("/test/post/json")
public Mono<QueryDTO> postJson(@RequestBody QueryDTO query){
log.info("request: {}", JSON.toJSONString(query));
QueryDTO dto = new QueryDTO();
dto.setSig("bbb");
return Mono.just(dto);
}
}
测试结果:
request: {"sig":"heelo","ts":21212}
request: {"sig":"232","ts":34333,"unifiedString":"hello"}
request: {"sig":"cccddd","unifiedString":"hello me 1223"}