接着上一节,我们来先实现阻塞RPC的使用。
上一节我们生成了Message.java,其中包含RpcService和ReplyService类,其中BlockingInterface为阻塞接口,Interface为非阻塞接口。下面我们来实现一下这两个接口。
RpcService阻塞接口实现,用于RPC的调用。
BlockRpcService.java
package cn.slimsmart.protoc.demo.rpc.rpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.slimsmart.protoc.demo.rpc.Message.Msg;
import cn.slimsmart.protoc.demo.rpc.Message.ReplyService;
import cn.slimsmart.protoc.demo.rpc.Message.Request;
import cn.slimsmart.protoc.demo.rpc.Message.Response;
import cn.slimsmart.protoc.demo.rpc.Message.RpcService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.googlecode.protobuf.pro.duplex.ClientRpcController;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.execute.ServerRpcController;
/**
* 阻塞接口实现
*/
/**
 * Blocking implementation of the RpcService interface; invoked on the server
 * side for each incoming RPC call.
 */
public class BlockRpcService implements RpcService.BlockingInterface {
	private static final Logger log = LoggerFactory.getLogger(BlockRpcService.class);

	/**
	 * Handles a blocking RPC call: logs the request, pushes a progress Msg back
	 * to the calling client over the duplex reverse channel, then returns a
	 * Response.
	 *
	 * @param controller RPC controller for this call; checked for cancellation
	 * @param request    the incoming request (serviceName/methodName/params)
	 * @return the response, or null if the call was cancelled
	 * @throws ServiceException if the reverse reply call to the client fails
	 */
	@Override
	public Response call(RpcController controller, Request request) throws ServiceException {
		if (controller.isCanceled()) {
			// Caller gave up; a cancelled protobuf RPC may legitimately return null.
			return null;
		}
		// Parameterized logging avoids string concatenation when INFO is disabled.
		log.info("接收到数据:");
		log.info("serviceName : {}", request.getServiceName());
		log.info("methodName : {}", request.getMethodName());
		log.info("params : {}", request.getParams());
		// The duplex channel lets the server call back into the client that made this request.
		RpcClientChannel channel = ServerRpcController.getRpcChannel(controller);
		ReplyService.BlockingInterface clientService = ReplyService.newBlockingStub(channel);
		ClientRpcController clientController = channel.newRpcController();
		clientController.setTimeoutMs(3000);
		// Send a progress/feedback message to the client while processing.
		Msg msg = Msg.newBuilder().setContent("success.").build();
		// Previously the client's acknowledgement was silently dropped; log it.
		Msg ack = clientService.call(clientController, msg);
		log.info("client ack : {}", ack == null ? null : ack.getContent());
		Response response = Response.newBuilder().setCode(0).setMsg("处理完成").setData("server hello").build();
		return response;
	}
}
ReplyService阻塞接口,用于调用过程中消息反馈。
BlockReplyService.java
package cn.slimsmart.protoc.demo.rpc.rpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.slimsmart.protoc.demo.rpc.Message.Msg;
import cn.slimsmart.protoc.demo.rpc.Message.ReplyService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* 阻塞反馈服务实现
*/
/**
 * Blocking implementation of ReplyService, registered on the client so the
 * server can push feedback messages during an in-flight RPC call.
 */
public class BlockReplyService implements ReplyService.BlockingInterface {
	private static final Logger log = LoggerFactory.getLogger(BlockReplyService.class);

	/**
	 * Receives a feedback Msg from the server and acknowledges it.
	 *
	 * @param controller RPC controller; checked for cancellation first
	 * @param request    the feedback message pushed by the server
	 * @return an acknowledgement Msg, or null if the call was cancelled
	 * @throws ServiceException never thrown by this implementation
	 */
	@Override
	public Msg call(RpcController controller, Msg request) throws ServiceException {
		// Check cancellation before doing any work for an abandoned call.
		if (controller.isCanceled()) {
			return null;
		}
		log.debug("接收反馈消息:{}", request.getContent());
		return Msg.newBuilder().setContent("收到反馈成功.").build();
	}
}
服务端注册RPC服务,并等待监听。
BlockServer.java
package cn.slimsmart.protoc.demo.rpc.rpc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.List;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.slimsmart.protoc.demo.rpc.Message;
import cn.slimsmart.protoc.demo.rpc.Message.RpcService;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ExtensionRegistry;
import com.googlecode.protobuf.pro.duplex.CleanShutdownHandler;
import com.googlecode.protobuf.pro.duplex.PeerInfo;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.RpcConnectionEventNotifier;
import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor;
import com.googlecode.protobuf.pro.duplex.execute.ThreadPoolCallExecutor;
import com.googlecode.protobuf.pro.duplex.listener.RpcConnectionEventListener;
import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger;
import com.googlecode.protobuf.pro.duplex.server.DuplexTcpServerPipelineFactory;
import com.googlecode.protobuf.pro.duplex.util.RenamingThreadFactoryProxy;
/**
 * Standalone server: registers the blocking RpcService on a duplex TCP
 * pipeline, binds on 127.0.0.1:12345, then periodically logs the number of
 * connected clients until interrupted.
 */
public class BlockServer {
	private static final Logger log = LoggerFactory.getLogger(BlockServer.class);

	/** Interval between periodic client-count status lines (ms). */
	private static final long STATUS_INTERVAL_MS = 50000L;

	public static void main(String[] args) {
		PeerInfo serverInfo = new PeerInfo("127.0.0.1", 12345);
		// RPC payloads are uncompressed when logged - so reduce logging.
		CategoryPerServiceLogger logger = new CategoryPerServiceLogger();
		logger.setLogRequestProto(false);
		logger.setLogResponseProto(false);
		// Configure the server pipeline.
		DuplexTcpServerPipelineFactory serverFactory = new DuplexTcpServerPipelineFactory(serverInfo);
		// Register protobuf extensions so extended messages decode correctly.
		ExtensionRegistry r = ExtensionRegistry.newInstance();
		Message.registerAllExtensions(r);
		serverFactory.setExtensionRegistry(r);
		RpcServerCallExecutor rpcExecutor = new ThreadPoolCallExecutor(10, 10);
		serverFactory.setRpcServerCallExecutor(rpcExecutor);
		serverFactory.setLogger(logger);
		// Setup a RPC connection event listener - it just logs what happens.
		RpcConnectionEventNotifier rpcEventNotifier = new RpcConnectionEventNotifier();
		RpcConnectionEventListener listener = new RpcConnectionEventListener() {
			@Override
			public void connectionReestablished(RpcClientChannel clientChannel) {
				log.info("connectionReestablished {}", clientChannel);
			}
			@Override
			public void connectionOpened(RpcClientChannel clientChannel) {
				log.info("connectionOpened {}", clientChannel);
			}
			@Override
			public void connectionLost(RpcClientChannel clientChannel) {
				log.info("connectionLost {}", clientChannel);
			}
			@Override
			public void connectionChanged(RpcClientChannel clientChannel) {
				log.info("connectionChanged {}", clientChannel);
			}
		};
		rpcEventNotifier.setEventListener(listener);
		serverFactory.registerConnectionEventListener(rpcEventNotifier);
		// Register the blocking RPC service implementation.
		BlockingService blockingService = RpcService.newReflectiveBlockingService(new BlockRpcService());
		serverFactory.getRpcServiceRegistry().registerService(true, blockingService);
		ServerBootstrap bootstrap = new ServerBootstrap();
		EventLoopGroup boss = new NioEventLoopGroup(2, new RenamingThreadFactoryProxy("boss", Executors.defaultThreadFactory()));
		EventLoopGroup workers = new NioEventLoopGroup(2, new RenamingThreadFactoryProxy("worker", Executors.defaultThreadFactory()));
		bootstrap.group(boss, workers);
		bootstrap.channel(NioServerSocketChannel.class);
		bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
		bootstrap.option(ChannelOption.SO_RCVBUF, 1048576);
		bootstrap.childOption(ChannelOption.SO_RCVBUF, 1048576);
		bootstrap.childOption(ChannelOption.SO_SNDBUF, 1048576);
		bootstrap.option(ChannelOption.TCP_NODELAY, true);
		bootstrap.childHandler(serverFactory);
		bootstrap.localAddress(serverInfo.getPort());
		// Release the event loops and executor cleanly on JVM shutdown.
		CleanShutdownHandler shutdownHandler = new CleanShutdownHandler();
		shutdownHandler.addResource(boss);
		shutdownHandler.addResource(workers);
		shutdownHandler.addResource(rpcExecutor);
		// Bind and start to accept incoming connections.
		bootstrap.bind();
		log.info("Serving " + bootstrap);
		while (true) {
			List<RpcClientChannel> clients = serverFactory.getRpcClientRegistry().getAllClients();
			log.info("Number of clients={}", clients.size());
			try {
				Thread.sleep(STATUS_INTERVAL_MS);
			} catch (InterruptedException e) {
				// Restore the interrupt flag and stop looping instead of swallowing it.
				Thread.currentThread().interrupt();
				break;
			}
		}
	}
}
客户端注册反馈服务,并调用服务端RPC。
BlockClient.java
package cn.slimsmart.protoc.demo.rpc.rpc;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.slimsmart.protoc.demo.rpc.Message;
import cn.slimsmart.protoc.demo.rpc.Message.Params;
import cn.slimsmart.protoc.demo.rpc.Message.ReplyService;
import cn.slimsmart.protoc.demo.rpc.Message.Request;
import cn.slimsmart.protoc.demo.rpc.Message.RpcService;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ExtensionRegistry;
import com.googlecode.protobuf.pro.duplex.CleanShutdownHandler;
import com.googlecode.protobuf.pro.duplex.ClientRpcController;
import com.googlecode.protobuf.pro.duplex.PeerInfo;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.RpcConnectionEventNotifier;
import com.googlecode.protobuf.pro.duplex.client.DuplexTcpClientPipelineFactory;
import com.googlecode.protobuf.pro.duplex.client.RpcClientConnectionWatchdog;
import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor;
import com.googlecode.protobuf.pro.duplex.execute.ThreadPoolCallExecutor;
import com.googlecode.protobuf.pro.duplex.listener.RpcConnectionEventListener;
import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger;
import com.googlecode.protobuf.pro.duplex.util.RenamingThreadFactoryProxy;
/**
 * Standalone client: registers the blocking ReplyService (so the server can
 * push feedback over the duplex channel), connects to 127.0.0.1:12345 and then
 * repeatedly issues a blocking RpcService.call.
 */
public class BlockClient {
	// Written by connection-event callbacks (Netty I/O threads) and read by the
	// main thread: volatile ensures main sees the newly opened channel.
	private static volatile RpcClientChannel channel = null;
	private static final Logger log = LoggerFactory.getLogger(BlockClient.class);

	public static void main(String[] args) throws Exception {
		PeerInfo client = new PeerInfo("127.0.0.1", 54321);
		PeerInfo server = new PeerInfo("127.0.0.1", 12345);
		DuplexTcpClientPipelineFactory clientFactory = new DuplexTcpClientPipelineFactory();
		// Force the use of a local port - normally you don't need this.
		clientFactory.setClientInfo(client);
		// Register protobuf extensions so extended messages decode correctly.
		ExtensionRegistry r = ExtensionRegistry.newInstance();
		Message.registerAllExtensions(r);
		clientFactory.setExtensionRegistry(r);
		clientFactory.setConnectResponseTimeoutMillis(10000);
		RpcServerCallExecutor rpcExecutor = new ThreadPoolCallExecutor(3, 10);
		clientFactory.setRpcServerCallExecutor(rpcExecutor);
		// RPC payloads are uncompressed when logged - so reduce logging.
		CategoryPerServiceLogger logger = new CategoryPerServiceLogger();
		logger.setLogRequestProto(false);
		logger.setLogResponseProto(false);
		clientFactory.setRpcLogger(logger);
		// Setup a RPC connection event listener - it records the active channel.
		RpcConnectionEventNotifier rpcEventNotifier = new RpcConnectionEventNotifier();
		final RpcConnectionEventListener listener = new RpcConnectionEventListener() {
			@Override
			public void connectionReestablished(RpcClientChannel clientChannel) {
				log.info("connectionReestablished {}", clientChannel);
				channel = clientChannel;
			}
			@Override
			public void connectionOpened(RpcClientChannel clientChannel) {
				log.info("connectionOpened {}", clientChannel);
				channel = clientChannel;
			}
			@Override
			public void connectionLost(RpcClientChannel clientChannel) {
				log.info("connectionLost {}", clientChannel);
			}
			@Override
			public void connectionChanged(RpcClientChannel clientChannel) {
				log.info("connectionChanged {}", clientChannel);
				channel = clientChannel;
			}
		};
		rpcEventNotifier.addEventListener(listener);
		clientFactory.registerConnectionEventListener(rpcEventNotifier);
		// Register the blocking ReplyService used by the server for feedback.
		BlockingService blockingReplyService = ReplyService.newReflectiveBlockingService(new BlockReplyService());
		clientFactory.getRpcServiceRegistry().registerService(blockingReplyService);
		Bootstrap bootstrap = new Bootstrap();
		EventLoopGroup workers = new NioEventLoopGroup(16, new RenamingThreadFactoryProxy("workers", Executors.defaultThreadFactory()));
		bootstrap.group(workers);
		bootstrap.handler(clientFactory);
		bootstrap.channel(NioSocketChannel.class);
		bootstrap.option(ChannelOption.TCP_NODELAY, true);
		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
		bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
		bootstrap.option(ChannelOption.SO_RCVBUF, 1048576);
		// The watchdog reconnects automatically when the connection drops.
		RpcClientConnectionWatchdog watchdog = new RpcClientConnectionWatchdog(clientFactory, bootstrap);
		rpcEventNotifier.addEventListener(watchdog);
		watchdog.start();
		CleanShutdownHandler shutdownHandler = new CleanShutdownHandler();
		shutdownHandler.addResource(workers);
		shutdownHandler.addResource(rpcExecutor);
		clientFactory.peerWith(server, bootstrap);
		// Redundant "true &&" removed; the loop exits when no channel is available.
		while (channel != null) {
			RpcService.BlockingInterface blockingService = RpcService.newBlockingStub(channel);
			final ClientRpcController controller = channel.newRpcController();
			controller.setTimeoutMs(0); // 0 = no client-side timeout
			Params params = Params.newBuilder().setKey("name").setValue("jack").build();
			Request request = Request.newBuilder().setServiceName("UserService").setMethodName("insert").setParams(params).build();
			// Blocking call; log the server's response instead of discarding it.
			log.info("server response : {}", blockingService.call(controller, request));
			Thread.sleep(100000);
		}
	}
}
运行服务端和客户端测试一下吧。
至于非阻塞很简单,主要使用异步回调RpcCallback实现,使用异步接口注册服务类,如下:
//注册非阻塞RPC服务
Service nbService = RpcService.newReflectiveService(new NonBlockRpcService());
serverFactory.getRpcServiceRegistry().registerService(true, nbService);
这里我就不贴完整代码了,详情请查看下面的实例代码。