RPC框架protobuf-rpc-pro 阻塞和非阻塞实例

孟永望
2023-12-01

接着上一节,我们来先实现阻塞RPC的使用。

上一节我们生成了Message.java,其中包含RpcService和ReplyService类,其中BlockingInterface为阻塞接口,Interface为非阻塞接口。下面我们来实现一下这两个接口。

RpcService阻塞接口实现,用于RPC的调用。

BlockRpcService.java

package cn.slimsmart.protoc.demo.rpc.rpc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.slimsmart.protoc.demo.rpc.Message.Msg;
import cn.slimsmart.protoc.demo.rpc.Message.ReplyService;
import cn.slimsmart.protoc.demo.rpc.Message.Request;
import cn.slimsmart.protoc.demo.rpc.Message.Response;
import cn.slimsmart.protoc.demo.rpc.Message.RpcService;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.googlecode.protobuf.pro.duplex.ClientRpcController;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.execute.ServerRpcController;

/**
 * 阻塞接口实现
 */
public  class BlockRpcService implements RpcService.BlockingInterface{
	
	private Logger log = LoggerFactory.getLogger(getClass());

	@Override
	public Response call(RpcController controller, Request request) throws ServiceException {
		if ( controller.isCanceled() ) {
			return null;
		}
		log.info("接收到数据:");
		log.info("serviceName : "+request.getServiceName());
		log.info("methodName : "+request.getMethodName());
		log.info("params : "+request.getParams());
		
		RpcClientChannel channel = ServerRpcController.getRpcChannel(controller);
		ReplyService.BlockingInterface clientService = ReplyService.newBlockingStub(channel);
		ClientRpcController clientController = channel.newRpcController();
		clientController.setTimeoutMs(3000);
		//调用过程反馈消息
		Msg msg = Msg.newBuilder().setContent("success.").build();
		clientService.call(clientController, msg);
		Response response = Response.newBuilder().setCode(0).setMsg("处理完成").setData("server hello").build();
		return response;
	}
	
}

ReplyService阻塞接口,用于调用过程中消息反馈。

BlockReplyService.java

package cn.slimsmart.protoc.demo.rpc.rpc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.slimsmart.protoc.demo.rpc.Message.Msg;
import cn.slimsmart.protoc.demo.rpc.Message.ReplyService;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * 阻塞反馈服务实现
 */
public class BlockReplyService implements ReplyService.BlockingInterface{
	
	private Logger log = LoggerFactory.getLogger(getClass());

	@Override
	public Msg call(RpcController controller, Msg request) throws ServiceException {
		log.debug("接收反馈消息:"+request.getContent());
		if ( controller.isCanceled() ) {
			return null;
		}
		return Msg.newBuilder().setContent("收到反馈成功.").build();
	}
}
服务端注册RPC服务,并等待监听。

BlockServer.java

package cn.slimsmart.protoc.demo.rpc.rpc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.util.List;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.slimsmart.protoc.demo.rpc.Message;
import cn.slimsmart.protoc.demo.rpc.Message.RpcService;

import com.google.protobuf.BlockingService;
import com.google.protobuf.ExtensionRegistry;
import com.googlecode.protobuf.pro.duplex.CleanShutdownHandler;
import com.googlecode.protobuf.pro.duplex.PeerInfo;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.RpcConnectionEventNotifier;
import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor;
import com.googlecode.protobuf.pro.duplex.execute.ThreadPoolCallExecutor;
import com.googlecode.protobuf.pro.duplex.listener.RpcConnectionEventListener;
import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger;
import com.googlecode.protobuf.pro.duplex.server.DuplexTcpServerPipelineFactory;
import com.googlecode.protobuf.pro.duplex.util.RenamingThreadFactoryProxy;

public class BlockServer {
	private static Logger log = LoggerFactory.getLogger(BlockServer.class);

	public static void main(String[] args) {
		PeerInfo serverInfo = new PeerInfo("127.0.0.1", 12345);

		// RPC payloads are uncompressed when logged - so reduce logging
		CategoryPerServiceLogger logger = new CategoryPerServiceLogger();
		logger.setLogRequestProto(false);
		logger.setLogResponseProto(false);

		// Configure the server.
		DuplexTcpServerPipelineFactory serverFactory = new DuplexTcpServerPipelineFactory(serverInfo);

		//扩展
		ExtensionRegistry r = ExtensionRegistry.newInstance();
		Message.registerAllExtensions(r);
		serverFactory.setExtensionRegistry(r);

		RpcServerCallExecutor rpcExecutor = new ThreadPoolCallExecutor(10, 10);
		serverFactory.setRpcServerCallExecutor(rpcExecutor);
		serverFactory.setLogger(logger);

		// setup a RPC event listener - it just logs what happens
		RpcConnectionEventNotifier rpcEventNotifier = new RpcConnectionEventNotifier();
		
		RpcConnectionEventListener listener = new RpcConnectionEventListener() {
			@Override
			public void connectionReestablished(RpcClientChannel clientChannel) {
				log.info("connectionReestablished " + clientChannel);
			}

			@Override
			public void connectionOpened(RpcClientChannel clientChannel) {
				log.info("connectionOpened " + clientChannel);
			}

			@Override
			public void connectionLost(RpcClientChannel clientChannel) {
				log.info("connectionLost " + clientChannel);
			}

			@Override
			public void connectionChanged(RpcClientChannel clientChannel) {
				log.info("connectionChanged " + clientChannel);
			}
		};
		rpcEventNotifier.setEventListener(listener);
		serverFactory.registerConnectionEventListener(rpcEventNotifier);
		
       //注册服务 阻塞RPC服务
  		BlockingService blockingService = RpcService.newReflectiveBlockingService(new BlockRpcService());
  	    serverFactory.getRpcServiceRegistry().registerService(true, blockingService);
      	    
	    ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(2,new RenamingThreadFactoryProxy("boss", Executors.defaultThreadFactory()));
        EventLoopGroup workers = new NioEventLoopGroup(2,new RenamingThreadFactoryProxy("worker", Executors.defaultThreadFactory()));
        bootstrap.group(boss,workers);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
        bootstrap.option(ChannelOption.SO_RCVBUF, 1048576);
        bootstrap.childOption(ChannelOption.SO_RCVBUF, 1048576);
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 1048576);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.childHandler(serverFactory);
        bootstrap.localAddress(serverInfo.getPort());

		CleanShutdownHandler shutdownHandler = new CleanShutdownHandler();
        shutdownHandler.addResource(boss);
        shutdownHandler.addResource(workers);
        shutdownHandler.addResource(rpcExecutor);
        
    	// Bind and start to accept incoming connections.
        bootstrap.bind();
        log.info("Serving " + bootstrap);
        while ( true ) {
            List<RpcClientChannel> clients = serverFactory.getRpcClientRegistry().getAllClients();
            log.info("Number of clients="+ clients.size());
        	try {
				Thread.sleep(50000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
        }
	}
}
客户端注册反馈服务,并调用服务端RPC

BlockClient.java

package cn.slimsmart.protoc.demo.rpc.rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.slimsmart.protoc.demo.rpc.Message;
import cn.slimsmart.protoc.demo.rpc.Message.Params;
import cn.slimsmart.protoc.demo.rpc.Message.ReplyService;
import cn.slimsmart.protoc.demo.rpc.Message.Request;
import cn.slimsmart.protoc.demo.rpc.Message.RpcService;

import com.google.protobuf.BlockingService;
import com.google.protobuf.ExtensionRegistry;
import com.googlecode.protobuf.pro.duplex.CleanShutdownHandler;
import com.googlecode.protobuf.pro.duplex.ClientRpcController;
import com.googlecode.protobuf.pro.duplex.PeerInfo;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.RpcConnectionEventNotifier;
import com.googlecode.protobuf.pro.duplex.client.DuplexTcpClientPipelineFactory;
import com.googlecode.protobuf.pro.duplex.client.RpcClientConnectionWatchdog;
import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor;
import com.googlecode.protobuf.pro.duplex.execute.ThreadPoolCallExecutor;
import com.googlecode.protobuf.pro.duplex.listener.RpcConnectionEventListener;
import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger;
import com.googlecode.protobuf.pro.duplex.util.RenamingThreadFactoryProxy;

public class BlockClient {
	private static RpcClientChannel channel = null;

	private static Logger log = LoggerFactory.getLogger(BlockClient.class);

	public static void main(String[] args) throws Exception {
		PeerInfo client = new PeerInfo("127.0.0.1", 54321);
		PeerInfo server = new PeerInfo("127.0.0.1", 12345);

		DuplexTcpClientPipelineFactory clientFactory = new DuplexTcpClientPipelineFactory();
		// force the use of a local port
		// - normally you don't need this
		clientFactory.setClientInfo(client);

		ExtensionRegistry r = ExtensionRegistry.newInstance();
		Message.registerAllExtensions(r);
		clientFactory.setExtensionRegistry(r);

		clientFactory.setConnectResponseTimeoutMillis(10000);
		RpcServerCallExecutor rpcExecutor = new ThreadPoolCallExecutor(3, 10);
		clientFactory.setRpcServerCallExecutor(rpcExecutor);

		// RPC payloads are uncompressed when logged - so reduce logging
		CategoryPerServiceLogger logger = new CategoryPerServiceLogger();
		logger.setLogRequestProto(false);
		logger.setLogResponseProto(false);
		clientFactory.setRpcLogger(logger);

		// Set up the event pipeline factory.
		// setup a RPC event listener - it just logs what happens
		RpcConnectionEventNotifier rpcEventNotifier = new RpcConnectionEventNotifier();

		final RpcConnectionEventListener listener = new RpcConnectionEventListener() {

			@Override
			public void connectionReestablished(RpcClientChannel clientChannel) {
				log.info("connectionReestablished " + clientChannel);
				channel = clientChannel;
			}

			@Override
			public void connectionOpened(RpcClientChannel clientChannel) {
				log.info("connectionOpened " + clientChannel);
				channel = clientChannel;
			}

			@Override
			public void connectionLost(RpcClientChannel clientChannel) {
				log.info("connectionLost " + clientChannel);
			}

			@Override
			public void connectionChanged(RpcClientChannel clientChannel) {
				log.info("connectionChanged " + clientChannel);
				channel = clientChannel;
			}
		};
		rpcEventNotifier.addEventListener(listener);
		clientFactory.registerConnectionEventListener(rpcEventNotifier);
		//注册服务 reply阻塞服务,用于反馈
		BlockingService blockingReplyService = ReplyService.newReflectiveBlockingService(new BlockReplyService());
		clientFactory.getRpcServiceRegistry().registerService(blockingReplyService);	

		Bootstrap bootstrap = new Bootstrap();
		EventLoopGroup workers = new NioEventLoopGroup(16, new RenamingThreadFactoryProxy("workers", Executors.defaultThreadFactory()));

		bootstrap.group(workers);
		bootstrap.handler(clientFactory);
		bootstrap.channel(NioSocketChannel.class);
		bootstrap.option(ChannelOption.TCP_NODELAY, true);
		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
		bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
		bootstrap.option(ChannelOption.SO_RCVBUF, 1048576);

		RpcClientConnectionWatchdog watchdog = new RpcClientConnectionWatchdog(clientFactory, bootstrap);
		rpcEventNotifier.addEventListener(watchdog);
		watchdog.start();

		CleanShutdownHandler shutdownHandler = new CleanShutdownHandler();
		shutdownHandler.addResource(workers);
		shutdownHandler.addResource(rpcExecutor);

		clientFactory.peerWith(server, bootstrap);

		while (true && channel != null) {
			RpcService.BlockingInterface blockingService = RpcService.newBlockingStub(channel);
			final ClientRpcController controller = channel.newRpcController();
			controller.setTimeoutMs(0);

			Params params = Params.newBuilder().setKey("name").setValue("jack").build();
			Request request = Request.newBuilder().setServiceName("UserService").setMethodName("insert").setParams(params).build();
			//阻塞调用
			blockingService.call(controller, request);
			Thread.sleep(100000);
		}
	}

}
运行服务端和客户端测试一下吧。

至于非阻塞很简单,主要使用异步回调RpcCallback实现,使用异步接口注册服务类,如下:

//注册服务非 阻塞RPC服务
	    Service nbService = RpcService.newReflectiveService(new NonBlockRpcService());
        serverFactory.getRpcServiceRegistry().registerService(true, nbService);
这里我就不贴代码了,详情下面实例代码查看。

实例代码:http://download.csdn.net/detail/tianwei7518/8476361

 类似资料: