下面分析get请求逻辑
//根据key获取相应的值,并使用解码器进行解码
public Object get(String key) {
return get(key, transcoder);
}
//异步获取future,并通过get方法设置超时时间,获取结果
public <T> T get(String key, Transcoder<T> tc) {
try {
return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for value", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for value: "
+ buildTimeoutMessage(operationTimeout, TimeUnit.MILLISECONDS), e);
}
}
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key,
executorService);
//读取到END/n/r的时候先后调用receivedStatus,complete
//获取到数据后调用gotData
//执行顺序是gotData,receivedStatus,complete
Operation op = opFact.get(key, new GetOperation.Callback() {
private Future<T> val;
@Override
public void receivedStatus(OperationStatus status) {
rv.set(val, status);
}
@Override
public void gotData(String k, int flags, byte[] data) {
assert key.equals(k) : "Wrong key returned";
val =
tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
}
@Override
public void complete() {
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op); ==》
return rv;
}
protected void addOperation(final String key, final Operation o) {
MemcachedNode placeIn = null;
//根据key路由,默认负载算法是根据key hash取模
MemcachedNode primary = locator.getPrimary(key);
//节点可用或者失败处理模式为重试机制
if (primary.isActive() || failureMode == FailureMode.Retry) {
placeIn = primary;
} else if (failureMode == FailureMode.Cancel) {
//节点不可用且重试模式为立即停止
o.cancel();
} else {
//如果primary不可用,且FailureMode为Redistribute则选择一个可以节点处理
Iterator<MemcachedNode> i = locator.getSequence(key);
while (placeIn == null && i.hasNext()) {
MemcachedNode n = i.next();
if (n.isActive()) {
placeIn = n;
}
}
if (placeIn == null) {
placeIn = primary;
this.getLogger().warn("Could not redistribute to another node, "
+ "retrying primary node for %s.", key);
}
}
assert o.isCancelled() || placeIn != null : "No node found for key " + key;
if (placeIn != null) {
//将op添加到该节点的inputQ中,同时也把op放入到addedQueue中
addOperation(placeIn, o);
} else {
assert o.isCancelled() : "No node found for " + key + " (and not "
+ "immediately cancelled)";
}
}
调用完asyncGet后获取到GetFuture,然后再调用GetFuture的get方法:
可以看出首先根据rc获取到Furure对象,然后通过Future get方法获取最终结果
public T get(long duration, TimeUnit units) throws InterruptedException,
TimeoutException, ExecutionException {
Future<T> v = rv.get(duration, units);
return v == null ? null : v.get();
}
再看rv.get(duration, units)方法:
public T get(long duration, TimeUnit units) throws InterruptedException,
TimeoutException, ExecutionException {
//阻塞,等待GetOperation.Callback()调用complete方法
if (!latch.await(duration, units)) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
//记录某个节点操作超时次数+1
MemcachedConnection.opTimedOut(op);
if (op != null) { // op can be null on a flush
//标识超时
op.timeOut();
}
throw new CheckedOperationTimeoutException(
"Timed out waiting for operation", op);
} else {
// continuous timeout counter will be reset
//重置某个节点操作超时次数为0
MemcachedConnection.opSucceeded(op);
}
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (isCancelled()) {
throw new ExecutionException(new CancellationException("Cancelled"));
}
if (op != null && op.isTimedOut()) {
throw new ExecutionException(new CheckedOperationTimeoutException(
"Operation timed out.", op));
}
/* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */
//获取GetOperation中Callbac中gotData方法中
//tcService.decode(tc, new CachedData(flags, data,tc.getMaxSize()))生成的Future
return objRef.get();
}