http://ju.outofmemory.cn/entry/70913
http://www.jdon.com/concurrent/spray-akka.html
如何使用Java建立像Node.js那样非堵塞异步事件并发IO服务器呢?Spray是基于NIO2高并发框架,虽然Tomcat 8也是基于NIO2,但是Spary的线程数要低得到,降低CPU上下文切换的负载;Akka和其Mysql库包都是相同线程执行上下文( execution context),因为在非堵塞前提下,性能拼的就不是线程数目越多越好,正好相反,线程数目越低,越接近理论理论最佳点。
为了启动Spary和Akka,需要一个main启动函数,在main函数中,我们创建一个Actor系统:
ActorSystem system = ActorSystem.create("system");
ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");
listener是使用Actor用来处理Http请求。然后要设定监听Http的端口:
InetSocketAddress endpoint = new InetSocketAddress(3000);
int backlog = 100;
List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
Option<ServerSettings> settings = scala.Option.empty();
最后,绑定Actor到监听的Http端口:
Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
整个main函数主要代码如下:
public static final ActorSystem system = ActorSystem.create("system");
public static void main(String[] args) {
...
ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");
InetSocketAddress endpoint = new InetSocketAddress(3000);
int backlog = 100;
List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
Option<ServerSettings> settings = scala.Option.empty();
ServerSSLEngineProvider sslEngineProvider = null;
Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
...
}
设置好启动函数以后,下面是真正开始在Actor里处理进来Http请求了。
首先,我们因为使用的是原生Java代码,不是Scala,因此需要将Scala集成到Java中,可能比较丑陋,可以用专门类包装一下,引入Scala的Http协议:
HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
Http Actor为了响应Http请求做三件事,第一件是创建一个路由router,这样能够根据请求URL:http://xxx/path中不同的/path分别处理:
Router router = partitionAndCreateRouter();
第二件是处理新的连接,告诉Spray这个actor不仅接受Http连接,也处理实际http连接:
}).match(Tcp.Connected.class, r ->{
sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!
})
第三件事就是处理实际的Http连接,将http请求委托给另外一个actor处理。
.match(HttpRequest.class, r -> {
int id = Constants.ID.getAndIncrement();
String path = String.valueOf(r.uri().path());
if("/sell".equals(path)){
... //逻辑处理
}else if("/buy".equals(path)){
... //逻辑处理
}else{
handleUnexpected(r);
}
})
整个HttpActor代码如下,业务逻辑以买卖为模型:
private static class HttpActor extends AbstractActor {
private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
public HttpActor() {
final Router router = partitionAndCreateRouter();
receive(ReceiveBuilder
.match(HttpRequest.class, r -> {
int id = Constants.ID.getAndIncrement();
String path = String.valueOf(r.uri().path());
if("/sell".equals(path)){
String productId = r.uri().query().get("productId").get();
...
SalesOrder so = new SalesOrder(price, productId, quantity, id);
so.setSeller(new Seller(who));
router.route(so, self());
replyOK(id);
}else if("/buy".equals(path)){
...
}else{
handleUnexpected(r);
}
}).match(Tcp.Connected.class, r ->{
sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!
}).build());
}
该案例使用的驱动包有: Akka, Spray, and this Mysql async driver., 整个源码下载:GitHub