接着上一回的文章继续。这回看到了NettyRemotingClient这个类,这里只看它自身的代码,不分析netty内部是怎么干活的,以后再深入学一波netty框架。这个类主要用来创建与worker通信的client,这么看worker就是server端。
先把它的代码贴上来再说:
/**
 * Netty-based remoting client.
 *
 * <p>Keeps at most one cached {@link Channel} per {@link Host} and offers three ways
 * to send a {@link Command}: asynchronous with a callback ({@link #sendAsync}),
 * synchronous request/response ({@link #sendSync}) and blocking one-way ({@link #send}).
 * The client configures its bootstrap in the constructor; call {@link #close()} to
 * release channels, the event loop group and the helper executors.
 */
public class NettyRemotingClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);

    /**
     * client bootstrap
     */
    private final Bootstrap bootstrap = new Bootstrap();

    /**
     * encoder, a single instance added to every channel pipeline
     */
    private final NettyEncoder encoder = new NettyEncoder();

    /**
     * cached channels, keyed by remote host
     */
    private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);

    /**
     * started flag
     */
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    /**
     * worker group
     */
    private final EventLoopGroup workerGroup;

    /**
     * client config
     */
    private final NettyClientConfig clientConfig;

    /**
     * async semaphore: fair, 200 permits; bounds the number of in-flight async requests
     */
    private final Semaphore asyncSemaphore = new Semaphore(200, true);

    /**
     * callback thread executor
     */
    private final ExecutorService callbackExecutor;

    /**
     * client handler
     */
    private final NettyClientHandler clientHandler;

    /**
     * response future executor, periodically runs {@code ResponseFuture.scanFutureTable()}
     */
    private final ScheduledExecutorService responseFutureExecutor;

    /**
     * client init
     *
     * <p>Creates the event loop group (epoll when {@code NettyUtils.useEpoll()} says so,
     * NIO otherwise), the callback executor, the client handler and the response-future
     * scheduler, then configures the bootstrap via {@link #start()}.
     *
     * @param clientConfig client config
     */
    public NettyRemotingClient(final NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        // Only one of the two branches below runs, so a single factory (and a single
        // thread counter) is enough for both.
        final ThreadFactory workerThreadFactory = new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
            }
        };
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), workerThreadFactory);
        } else {
            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), workerThreadFactory);
        }
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
                new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, callbackExecutor);
        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
        this.start();
    }

    /**
     * start
     *
     * <p>Configures the bootstrap (socket options from {@code clientConfig}, pipeline of
     * decoder, client handler and encoder) and schedules
     * {@code ResponseFuture.scanFutureTable()} every 1000 ms after an initial 5000 ms delay.
     */
    private void start() {
        this.bootstrap
                .group(this.workerGroup)
                .channel(NettyUtils.getSocketChannelClass())
                .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
                .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
                .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
                .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // A new decoder per channel; handler and encoder instances are shared.
                        ch.pipeline().addLast(
                                new NettyDecoder(),
                                clientHandler,
                                encoder);
                    }
                });
        this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // NOTE(review): presumably expires timed-out futures - confirm in ResponseFuture.
                ResponseFuture.scanFutureTable();
            }
        }, 5000, 1000, TimeUnit.MILLISECONDS);
        isStarted.compareAndSet(false, true);
    }

    /**
     * async send
     *
     * <p>Waits up to {@code timeoutMillis} for a permit of the async semaphore, then writes
     * the command. If the write fails, the callback is executed immediately with a
     * {@code null} response and the permit is released.
     *
     * @param host           host
     * @param command        command
     * @param timeoutMillis  timeoutMillis
     * @param invokeCallback callback function
     * @throws InterruptedException if interrupted while waiting for a semaphore permit
     * @throws RemotingException    if no channel is available, the write call itself throws,
     *                              or no permit could be acquired within {@code timeoutMillis}
     */
    public void sendAsync(final Host host, final Command command,
                          final long timeoutMillis,
                          final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
        final Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException("network error");
        }
        // request unique identification
        final long opaque = command.getOpaque();
        // control concurrency number
        boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
            // response future
            final ResponseFuture responseFuture = new ResponseFuture(opaque,
                    timeoutMillis,
                    invokeCallback,
                    releaseSemaphore);
            try {
                channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            // Written successfully: nothing more to do in this listener.
                            responseFuture.setSendOk(true);
                            return;
                        } else {
                            responseFuture.setSendOk(false);
                        }
                        // Write failed: complete the future with no response and run the callback now.
                        responseFuture.setCause(future.cause());
                        responseFuture.putResponse(null);
                        try {
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable ex) {
                            logger.error("execute callback error", ex);
                        } finally {
                            responseFuture.release();
                        }
                    }
                });
            } catch (Throwable ex) {
                responseFuture.release();
                throw new RemotingException(String.format("send command to host: %s failed", host), ex);
            }
        } else {
            String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
                    timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
            throw new RemotingTooMuchRequestException(message);
        }
    }

    /**
     * sync send
     *
     * <p>Writes the command and blocks on {@code responseFuture.waitResponse()} for the reply.
     *
     * @param host          host
     * @param command       command
     * @param timeoutMillis timeoutMillis
     * @return the response command, never {@code null}
     * @throws InterruptedException declared for callers; NOTE(review): presumably raised by
     *                              {@code ResponseFuture.waitResponse()} - confirm
     * @throws RemotingException    if no channel is available, the write failed, or (as
     *                              {@code RemotingTimeoutException}) the write succeeded but
     *                              no response arrived
     */
    public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
        final Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        final long opaque = command.getOpaque();
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    responseFuture.setSendOk(true);
                    return;
                } else {
                    responseFuture.setSendOk(false);
                }
                responseFuture.setCause(future.cause());
                responseFuture.putResponse(null);
                // Pass the cause as the trailing argument so the failure reason is logged too.
                logger.error("send command {} to host {} failed", command, host, future.cause());
            }
        });
        // sync wait for result
        Command result = responseFuture.waitResponse();
        if (result == null) {
            if (responseFuture.isSendOK()) {
                throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingException(host.toString(), responseFuture.getCause());
            }
        }
        return result;
    }

    /**
     * send task
     *
     * <p>Writes the command and blocks until the write completes; no response is awaited.
     *
     * @param host    host
     * @param command command
     * @throws RemotingException if no channel is available, waiting for the write is
     *                           interrupted or throws, or the write completes unsuccessfully
     */
    public void send(final Host host, final Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        final ChannelFuture future;
        try {
            future = channel.writeAndFlush(command).await();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers; the checked type is hidden by the wrapper.
                Thread.currentThread().interrupt();
            }
            logger.error("Send command {} to address {} encounter error.", command, host.getAddress(), e);
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
        // Evaluated outside the try block so that the failure below is thrown once,
        // instead of being caught and wrapped a second time.
        if (future.isSuccess()) {
            logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            return;
        }
        String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
        logger.error(msg, future.cause());
        throw new RemotingException(msg, future.cause());
    }

    /**
     * register processor, without a dedicated executor ({@code null} is passed on)
     *
     * @param commandType command type
     * @param processor   processor
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }

    /**
     * register processor; delegates to the client handler
     *
     * @param commandType command type
     * @param processor   processor
     * @param executor    thread executor
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
        this.clientHandler.registerProcessor(commandType, processor, executor);
    }

    /**
     * get channel
     *
     * <p>Returns the cached channel for the host when it is still active; otherwise
     * connects synchronously and caches the new channel.
     *
     * @param host remote host
     * @return an active channel, or {@code null} if connecting failed
     */
    public Channel getChannel(Host host) {
        Channel channel = channels.get(host);
        if (channel != null && channel.isActive()) {
            return channel;
        }
        return createChannel(host, true);
    }

    /**
     * create channel
     *
     * <p>NOTE(review): with {@code isSync == false} the connect future is inspected right
     * away, so {@code null} is returned unless the connect has already succeeded.
     *
     * @param host   host
     * @param isSync sync flag; when {@code true} the call waits for the connect to finish
     * @return the connected channel (also stored in the channel cache), or {@code null}
     *         if the connection is not established
     */
    public Channel createChannel(Host host, boolean isSync) {
        ChannelFuture future;
        try {
            synchronized (bootstrap) {
                future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (isSync) {
                future.sync();
            }
            if (future.isSuccess()) {
                Channel channel = future.channel();
                channels.put(host, channel);
                return channel;
            }
        } catch (InterruptedException ex) {
            // Restore the flag: this method reports failure via null and cannot rethrow.
            Thread.currentThread().interrupt();
            logger.warn(String.format("connect to %s error", host), ex);
        } catch (Exception ex) {
            logger.warn(String.format("connect to %s error", host), ex);
        }
        return null;
    }

    /**
     * close
     *
     * <p>Runs at most once per started client: closes all cached channels, then shuts
     * down the event loop group and both executors. Failures are logged, not rethrown.
     */
    public void close() {
        if (isStarted.compareAndSet(true, false)) {
            try {
                closeChannels();
                if (workerGroup != null) {
                    this.workerGroup.shutdownGracefully();
                }
                if (callbackExecutor != null) {
                    this.callbackExecutor.shutdownNow();
                }
                if (this.responseFutureExecutor != null) {
                    this.responseFutureExecutor.shutdownNow();
                }
            } catch (Exception ex) {
                logger.error("netty client close exception", ex);
            }
            logger.info("netty client closed");
        }
    }

    /**
     * close channels: closes every cached channel and empties the cache
     */
    private void closeChannels() {
        for (Channel channel : this.channels.values()) {
            channel.close();
        }
        this.channels.clear();
    }

    /**
     * close channel: removes the host's channel from the cache and closes it if present
     *
     * @param host host
     */
    public void closeChannel(Host host) {
        Channel channel = this.channels.remove(host);
        if (channel != null) {
            channel.close();
        }
    }
}
上篇追到了调用当前类的send方法。send方法先通过Channel channel = getChannel(host);获取channel:如果之前保存过该host的channel且连接仍然可用就直接用,否则就根据host的ip和端口新建一个与worker的连接。然后通过ChannelFuture future = channel.writeAndFlush(command).await();把经过编码的command信息发送给worker端。对端收到数据后,会触发netty的ChannelInboundHandler的channelRead方法,这里特指netty server端的handler。
继续看一下该handler类下怎么处理信息的。
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
 * netty remote server
 */
private final NettyRemotingServer nettyRemotingServer;
/**
 * registered processors, keyed by command type; each entry pairs the processor
 * with the executor its work is submitted to
 */
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();
。。。。。
/**
 * Hands every inbound message to {@code processReceived}, together with the
 * channel it arrived on. The message is cast to {@code Command}.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    final Channel sourceChannel = ctx.channel();
    final Command received = (Command) msg;
    processReceived(sourceChannel, received);
}
/**
 * Dispatches a received command to the processor registered for its type.
 *
 * <p>The processor call is wrapped in a task and submitted to the executor stored
 * alongside the processor. Unknown command types and rejected submissions are
 * logged and the message is dropped.
 *
 * @param channel channel the command arrived on
 * @param msg     received command
 */
private void processReceived(final Channel channel, final Command msg) {
    final CommandType commandType = msg.getType();
    final Pair<NettyRequestProcessor, ExecutorService> registration = processors.get(commandType);

    // Nothing registered for this type: warn and stop.
    if (registration == null) {
        logger.warn("commandType {} not support", commandType);
        return;
    }

    final Runnable task = new Runnable() {
        @Override
        public void run() {
            try {
                registration.getLeft().process(channel, msg);
            } catch (Throwable ex) {
                // Keep processor failures from escaping into the executor.
                logger.error("process msg {} error", msg, ex);
            }
        }
    };

    try {
        registration.getRight().submit(task);
    } catch (RejectedExecutionException e) {
        logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
    }
}
}
其实它就是调用了processReceived方法:先获取command的类型,再到map里取出该类型对应的处理器来处理当前command。这些类型处理器都保存在ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>>对象内。在loggerserver跟masterserver启动的时候,各自对应的command类型都会被注册进去,当然还有其他的command类型。
//注册的方法如下(从用到的字段看,这段代码应位于NettyServerHandler类中,由NettyRemotingServer的registerProcessor调用)
/**
 * Registers a processor for a command type, unless one is already registered.
 *
 * @param commandType command type
 * @param processor   processor handling commands of that type
 * @param executor    executor the processor's work is submitted to; when {@code null},
 *                    the server's default executor is used instead
 */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
    final ExecutorService effectiveExecutor =
            executor != null ? executor : nettyRemotingServer.getDefaultExecutor();
    // putIfAbsent: the first registration for a command type is kept.
    this.processors.putIfAbsent(commandType, new Pair<>(processor, effectiveExecutor));
}
//在masterserver启动的时候对应的command类型被注册是如下的代码块
// Build the remoting server from serverConfig, then register one processor per
// command type the master handles: task execute response, task execute ack and
// task kill response.
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
// No executor is passed in these calls.
// NOTE(review): presumably the server's default executor is used - confirm against
// the two-argument registerProcessor overload.
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
//在LoggerServer下面被注册的代码块是如下:
// Configure the server to listen on Constants.RPC_PORT.
this.serverConfig = new NettyServerConfig();
this.serverConfig.setListenPort(Constants.RPC_PORT);
this.server = new NettyRemotingServer(serverConfig);
// One LoggerRequestProcessor instance serves all four log-related command types,
// each registered with the processor's own executor.
this.requestProcessor = new LoggerRequestProcessor();
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
// The terminating semicolon was missing on this statement.
this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
岔开了,继续跟下去看它是怎么处理的:它根据command类型拿到Pair左边对应的信息处理器,并构建一个实现Runnable接口的匿名类对象;然后再获取Pair右边对应的线程池,接着将该Runnable任务交给该线程池执行。
由于任务发送的command类型是CommandType.TASK_EXECUTE_REQUEST,对应的处理器就是通过如下代码注册的:this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
我们看一下worker端的它的process方法,到底干了啥。
/**
 * Handles a task-execute request: rebuilds the task execution context from the
 * command body, prepares the local working directory, remembers the requesting
 * channel, sends the ack and submits the task for execution.
 *
 * @param channel channel the request arrived on
 * @param command received command; its type must be {@code TASK_EXECUTE_REQUEST}
 */
@Override
public void process(Channel channel, Command command) {
    Preconditions.checkArgument(
            CommandType.TASK_EXECUTE_REQUEST == command.getType(),
            String.format("invalid command type : %s", command.getType()));

    // Two-step decoding: command body -> request command -> JSON context -> context object.
    final TaskExecuteRequestCommand request =
            FastJsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class);
    logger.info("收到任务执行命令 received command : {}", request);
    final TaskExecutionContext context =
            JSONObject.parseObject(request.getTaskExecutionContext(), TaskExecutionContext.class);

    // Stamp the context with this node's address, the start time and the log path.
    final String localAddress = OSUtils.getHost() + ":" + workerConfig.getListenPort();
    context.setHost(localAddress);
    context.setStartTime(new Date());
    context.setLogPath(getTaskLogPath(context));

    // local execute path
    final String workDir = getExecLocalPath(context);
    logger.info("task instance local execute path : {} ", workDir);
    try {
        FileUtils.createWorkDirAndUserIfAbsent(workDir, context.getTenantCode());
    } catch (Exception ex) {
        // Failure here is only logged; the task is still acked and submitted below.
        logger.error(String.format("create execLocalPath : %s", workDir), ex);
    }

    // Remember the requesting channel and request id under the task instance id.
    taskCallbackService.addRemoteChannel(
            context.getTaskInstanceId(),
            new NettyRemoteChannel(channel, command.getOpaque()));

    this.doAck(context);

    // submit task
    final TaskExecuteThread task = new TaskExecuteThread(context, taskCallbackService);
    workerExecService.submit(task);
}
就是将从netty客户端发过来的command对象进行反序列化,构建任务执行上下文,同时在当前服务器节点上创建执行当前任务的目录,并且将当前任务Id与来自客户端的channel保存到taskCallbackService下,然后调用doAck,最后把任务提交给workerExecService,由另外的线程处理当前任务。