java请求并行方案_让 Yar Java Client 支持执行并行请求,ExecutorService 的使用

汪阿苏
2023-12-01

官方 PHP 客户端文档如下:

Yar_Concurrent_Client {

/* 属性 */

static $_callstack ;

static $_callback ;

static $_error_callback ;

/* 方法 */

public static int call ( string $uri , string $method , array $parameters [, callable $callback ] )

public static boolean loop ([ callable $callback [, callable $error_callback ]] )

public static boolean reset ( void )

}

也就是说,call 方法实际上是在注册并行的服务调用,loop 是统一发送,reset 是清空调用任务集。下面我也需要实现与上面类似的功能。

首先学习下 ExecutorService 的使用:

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

/**
 * Demo of running several tasks in parallel on a shared {@link ExecutorService}.
 *
 * <p>The pool is created once in a static initializer, so every instance of
 * this class submits its tasks to the same pool.
 *
 * <p>Created by zhoumengkang on 16/12/15.
 */
public class YarConcurrentClient {

    /**
     * Number of demo tasks submitted by each {@link #call()}.
     * NOTE(review): the loop bound was lost in the published listing; 4 is
     * inferred from the sample output (sequence numbers 0..3) - confirm.
     */
    private static final int TASK_COUNT = 4;

    private static ExecutorService executorService;

    static {
        poolInit();
    }

    /** Creates the cached thread pool shared by all instances. */
    private static void poolInit() {
        executorService = Executors.newCachedThreadPool();
    }

    /**
     * Submits {@code TASK_COUNT} tasks to the shared pool, then prints each
     * task's result in submission order.
     *
     * @throws ExecutionException if one of the tasks threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void call() throws ExecutionException, InterruptedException {
        List<Future<String>> result = new ArrayList<Future<String>>();

        // Submit everything first so the tasks run concurrently.
        for (int i = 0; i < TASK_COUNT; i++) {
            Future<String> future = executorService.submit(new YarClientCallable(i));
            result.add(future);
        }

        // Future.get() blocks, so results are printed in submission order
        // regardless of which task finishes first.
        for (Future<String> future : result) {
            System.out.println("返回值:" + future.get());
        }
    }

    /** Demo task: prints the worker thread name, sleeps three seconds, returns a label. */
    public class YarClientCallable implements Callable<String> {

        private final int seq;

        /**
         * @param seq sequence number echoed in the task's output and result
         */
        public YarClientCallable(int seq) {
            this.seq = seq;
        }

        /**
         * @return the string {@code "完成"} followed by the sequence number
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(3000);
            System.out.println("Weak up" + seq);
            return "完成" + seq;
        }
    }
}

测试下 call 方法:

import junit.framework.TestCase;

/**

* Created by zhoumengkang on 16/12/15.

*/

public class YarConcurrentClientTest extends TestCase {

public void testName() throws Exception {

new YarConcurrentClient().call();

new YarConcurrentClient().call();

new YarConcurrentClient().call();

}

}

测试结果为:

pool-1-thread-1

pool-1-thread-2

pool-1-thread-3

pool-1-thread-4

Weak up1

Weak up3

Weak up2

Weak up0

返回值:完成0

返回值:完成1

返回值:完成2

返回值:完成3

pool-1-thread-1

pool-1-thread-2

pool-1-thread-3

pool-1-thread-4

Weak up1

Weak up2

Weak up3

Weak up0

返回值:完成0

返回值:完成1

返回值:完成2

返回值:完成3

pool-1-thread-1

pool-1-thread-4

pool-1-thread-3

pool-1-thread-2

Weak up0

返回值:完成0

Weak up3

Weak up2

Weak up1

返回值:完成1

返回值:完成2

返回值:完成3

从 Thread.currentThread().getName() 可知,只生成了一个线程池,并且该池里的 4 个线程也被重复利用了。

YarConcurrentClient 雏形:

package yar.concurrent.client;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import yar.YarConfig;

import yar.protocol.YarRequest;

import yar.protocol.YarResponse;

import yar.transport.YarTransport;

import yar.transport.YarTransportFactory;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

/**

* Created by zhoumengkang on 2/12/15.

*/

public class YarConcurrentClient {

protected final static Logger logger = LoggerFactory.getLogger(YarConcurrentClient.class);

private static ExecutorService executorService;

private static List yarConcurrentTasks;

static{

init();

}

private static void init(){

yarConcurrentTasks = new ArrayList();

executorService = Executors.newCachedThreadPool();

}

public static void call(YarConcurrentTask yarConcurrentTask) {

yarConcurrentTasks.add(yarConcurrentTask);

}

public static void loop() {

List> result =new ArrayList>();

try{

for (YarConcurrentTask task : yarConcurrentTasks){

Future future = executorService.submit(new YarClientCallable(task));

result.add(future);

}

}catch(Exception e){

}

for(Future future:result){

try {

logger.info("返回值"+future.get().toString());

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

}

public static void reset(){

yarConcurrentTasks = null;

yarConcurrentTasks = new ArrayList();

}

public static class YarClientCallable implements Callable {

private YarConcurrentTask yarConcurrentTask;

public YarClientCallable(YarConcurrentTask yarConcurrentTask) {

this.yarConcurrentTask = yarConcurrentTask;

}

public Object call() throws Exception {

logger.debug("开始处理任务" + yarConcurrentTask.getId());

YarResponse yarResponse = null;

YarRequest yarRequest = new YarRequest();

yarRequest.setId(yarConcurrentTask.getId());

yarRequest.setMethod(yarConcurrentTask.getMethod());

yarRequest.setParameters(yarConcurrentTask.getParams());

yarRequest.setPackagerName(YarConfig.getString("yar.packager"));

YarTransport yarTransport = YarTransportFactory.get(YarConfig.getString("yar.transport"));

yarTransport.open("http://10.211.55.4/yar/server/RewardScoreService.class.php");

try {

yarResponse = yarTransport.exec(yarRequest);

} catch (IOException e) {

e.printStackTrace();

}

assert yarResponse != null;

return yarResponse.getRetVal();

}

}

}

 类似资料: