官方 PHP 客户端文档如下：
Yar_Concurrent_Client {
/* 属性 */
static $_callstack ;
static $_callback ;
static $_error_callback ;
/* 方法 */
public static int call ( string $uri , string $method , array $parameters [, callable $callback ] )
public static boolean loop ([ callable $callback [, callable $error_callback ]] )
public static boolean reset ( void )
}
也就是说,call 方法实际上是在注册并行的服务调用,loop 是统一发送,reset 是清空调用任务集。下面我也需要实现与上面类似的功能。
首先学习下 ExecutorService 的使用：
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* Created by zhoumengkang on 16/12/15.
*/
/**
 * Small experiment with {@link ExecutorService}: submits a fixed batch of
 * sleeping tasks to a shared cached thread pool and prints each result.
 *
 * <p>The pool is held in a static field, so every instance of this class
 * submits to the same pool. The pool is never shut down by this class.
 */
public class YarConcurrentClient {
    /**
     * Number of tasks submitted per {@link #call()}.
     * NOTE(review): the loop bound was lost in the published listing; 4 is
     * restored from the accompanying sample output (results 0..3) - confirm.
     */
    private static final int TASK_COUNT = 4;

    private static ExecutorService executorService;

    static {
        poolInit();
    }

    /** Creates the cached thread pool used by all instances. */
    private static void poolInit() {
        executorService = Executors.newCachedThreadPool();
    }

    /**
     * Submits {@code TASK_COUNT} tasks, then prints their results in
     * submission order, blocking on each future in turn.
     *
     * @throws ExecutionException   if any task threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void call() throws ExecutionException, InterruptedException {
        List<Future<String>> result = new ArrayList<Future<String>>();
        for (int i = 0; i < TASK_COUNT; i++) {
            Future<String> future = executorService.submit(new YarClientCallable(i));
            result.add(future);
        }
        // All tasks are already running; get() only waits for completion.
        for (Future<String> future : result) {
            System.out.println("返回值:" + future.get());
        }
    }

    /** Task that prints the worker thread name, sleeps three seconds and returns a label. */
    public class YarClientCallable implements Callable<String> {
        private int seq;

        /**
         * @param seq sequence number appended to the printed and returned strings
         */
        public YarClientCallable(int seq) {
            this.seq = seq;
        }

        /**
         * @return the completion label for this task's sequence number
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(3000);
            System.out.println("Weak up" + seq);
            return "完成" + seq;
        }
    }
}
测试下 call 方法：
import junit.framework.TestCase;
/**
* Created by zhoumengkang on 16/12/15.
*/
/**
 * Exercises {@code YarConcurrentClient.call()} several times in a row,
 * each time through a newly constructed client.
 */
public class YarConcurrentClientTest extends TestCase {
    /**
     * Runs three consecutive batches; each batch finishes before the next begins
     * because {@code call()} is invoked sequentially.
     *
     * @throws Exception if any batch fails
     */
    public void testName() throws Exception {
        for (int round = 0; round < 3; round++) {
            YarConcurrentClient client = new YarConcurrentClient();
            client.call();
        }
    }
}
测试结果为：
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
Weak up1
Weak up3
Weak up2
Weak up0
返回值:完成0
返回值:完成1
返回值:完成2
返回值:完成3
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
Weak up1
Weak up2
Weak up3
Weak up0
返回值:完成0
返回值:完成1
返回值:完成2
返回值:完成3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
Weak up0
返回值:完成0
Weak up3
Weak up2
Weak up1
返回值:完成1
返回值:完成2
返回值:完成3
从 Thread.currentThread().getName() 可知,只生成了一个线程池,并且该池里的 4 个线程也被重复利用了。
YarConcurrentClient 雏形：
package yar.concurrent.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import yar.YarConfig;
import yar.protocol.YarRequest;
import yar.protocol.YarResponse;
import yar.transport.YarTransport;
import yar.transport.YarTransportFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* Created by zhoumengkang on 2/12/15.
*/
public class YarConcurrentClient {
protected final static Logger logger = LoggerFactory.getLogger(YarConcurrentClient.class);
private static ExecutorService executorService;
private static List yarConcurrentTasks;
static{
init();
}
private static void init(){
yarConcurrentTasks = new ArrayList();
executorService = Executors.newCachedThreadPool();
}
public static void call(YarConcurrentTask yarConcurrentTask) {
yarConcurrentTasks.add(yarConcurrentTask);
}
public static void loop() {
List> result =new ArrayList>();
try{
for (YarConcurrentTask task : yarConcurrentTasks){
Future future = executorService.submit(new YarClientCallable(task));
result.add(future);
}
}catch(Exception e){
}
for(Future future:result){
try {
logger.info("返回值"+future.get().toString());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void reset(){
yarConcurrentTasks = null;
yarConcurrentTasks = new ArrayList();
}
public static class YarClientCallable implements Callable {
private YarConcurrentTask yarConcurrentTask;
public YarClientCallable(YarConcurrentTask yarConcurrentTask) {
this.yarConcurrentTask = yarConcurrentTask;
}
public Object call() throws Exception {
logger.debug("开始处理任务" + yarConcurrentTask.getId());
YarResponse yarResponse = null;
YarRequest yarRequest = new YarRequest();
yarRequest.setId(yarConcurrentTask.getId());
yarRequest.setMethod(yarConcurrentTask.getMethod());
yarRequest.setParameters(yarConcurrentTask.getParams());
yarRequest.setPackagerName(YarConfig.getString("yar.packager"));
YarTransport yarTransport = YarTransportFactory.get(YarConfig.getString("yar.transport"));
yarTransport.open("http://10.211.55.4/yar/server/RewardScoreService.class.php");
try {
yarResponse = yarTransport.exec(yarRequest);
} catch (IOException e) {
e.printStackTrace();
}
assert yarResponse != null;
return yarResponse.getRetVal();
}
}
}