消费者会定期向GroupCoordinator发送HeartbeatRequest来确认彼此在线:一方面告诉GroupCoordinator“我还活着”,另一方面也借此判断GroupCoordinator是否还活着。
HeartbeatRequest的组成:它由groupId、generationId和memberId三个字段组成。
HeartbeatResponse的组成:它只有一个errorCode字段。
HeartbeatThread是专门处理Heartbeat的一个线程类
源码分析:
一 Heartbeat
// Session timeout: the session counts as expired once more than this much time has passed
// since the later of lastSessionReset and lastHeartbeatReceive (see sessionTimeoutExpired).
private final long sessionTimeout;
// Interval between heartbeat sends.
private final long heartbeatInterval;
// Maximum allowed gap between poll() calls; exceeding it makes pollTimeoutExpired() return true.
private final long maxPollInterval;
// Backoff between retries (presumably milliseconds, per the name — confirm).
private final long retryBackoffMs;
// Time the last heartbeat was sent; set by sentHeartbeat(long).
private volatile long lastHeartbeatSend; // volatile since it is read by metrics
// Time recorded by the last receiveHeartbeat(long) call.
private long lastHeartbeatReceive;
// Time of the last session reset; together with lastHeartbeatReceive it anchors the
// session-timeout check in sessionTimeoutExpired. Where it is assigned is not shown here.
private long lastSessionReset;
// Time of the last poll; set by poll(long) and checked by pollTimeoutExpired.
private long lastPoll;
// True when the most recent heartbeat FAILED: set by failHeartbeat(), cleared by sentHeartbeat().
private boolean heartbeatFailed;
// 更新lastPoll时间 public void poll(long now) { this.lastPoll = now; } // 更新上一次心跳发送时间 public void sentHeartbeat(long now) { this.lastHeartbeatSend = now; this.heartbeatFailed = false; } // 更新心跳状态为失败 public void failHeartbeat() { this.heartbeatFailed = true; } // 更新上次接收心跳时间 public void receiveHeartbeat(long now) { this.lastHeartbeatReceive = now; } // 更新上一次心跳发送时间 public boolean shouldHeartbeat(long now) { return timeToNextHeartbeat(now) == 0; }
// 判断session是否过期 public boolean sessionTimeoutExpired(long now) { return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout; }
// 判断poll是否过期 public boolean pollTimeoutExpired(long now) { return now - lastPoll > maxPollInterval; }
二 HeartbeatThread
/**
 * Main loop of the heartbeat thread.
 *
 * <p>Each iteration holds the AbstractCoordinator monitor and does exactly one of the following:
 * <ul>
 *   <li>returns if closed;</li>
 *   <li>waits until notified if heartbeating is disabled;</li>
 *   <li>disables itself if the member is not STABLE;</li>
 *   <li>looks up the coordinator, or backs off while a lookup is pending;</li>
 *   <li>marks the coordinator dead when the session timeout has expired;</li>
 *   <li>leaves the group when the poll timeout has expired;</li>
 *   <li>backs off when no heartbeat is due yet;</li>
 *   <li>otherwise sends a heartbeat.</li>
 * </ul>
 * An InterruptedException (wrapped in a RuntimeException) or a RuntimeException ends the loop.
 * In either case the cause is logged and stored in {@code failed}.
 */
public void run() {
    try {
        while (true) {
            synchronized (AbstractCoordinator.this) {
                if (closed)
                    return;

                // Heartbeating is disabled: block until notified, then re-check everything.
                if (!enabled) {
                    AbstractCoordinator.this.wait();
                    continue;
                }

                // The member is not STABLE (STABLE = joined the group and heartbeating). We may
                // have left the group, or the coordinator may have kicked us out, so disable
                // heartbeats and wait for the main thread to rejoin.
                if (state != MemberState.STABLE) {
                    disable();
                    continue;
                }

                client.pollNoWakeup();
                long now = time.milliseconds();

                if (coordinatorUnknown()) {
                    // No known coordinator: start a lookup if none is in flight,
                    // otherwise back off while the pending lookup completes.
                    if (findCoordinatorFuture == null)
                        lookupCoordinator();
                    else
                        AbstractCoordinator.this.wait(retryBackoffMs);
                } else if (heartbeat.sessionTimeoutExpired(now)) {
                    // Neither a heartbeat receipt nor a session reset within the session timeout:
                    // treat the coordinator as dead. coordinatorDead() presumably drops pending
                    // (unsent) requests and forgets the coordinator, so that a later iteration sees
                    // coordinatorUnknown() and looks it up again — confirm.
                    coordinatorDead();
                } else if (heartbeat.pollTimeoutExpired(now)) {
                    // the poll timeout has expired, which means that the foreground thread has stalled
                    // in between calls to poll(), so we explicitly leave the group.
                    maybeLeaveGroup();
                } else if (!heartbeat.shouldHeartbeat(now)) {
                    // No heartbeat is due yet.
                    // poll again after waiting for the retry backoff in case the heartbeat failed or the
                    // coordinator disconnected
                    AbstractCoordinator.this.wait(retryBackoffMs);
                } else {
                    // Record the send time and clear the heartbeat-failed flag.
                    heartbeat.sentHeartbeat(now);
                    // Send the HeartbeatRequest asynchronously and react to its outcome:
                    //  - success: record the receipt time;
                    //  - RebalanceInProgressException: still record the receipt time;
                    //  - any other failure: mark the heartbeat failed and wake a waiting thread.
                    sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                        @Override
                        public void onSuccess(Void value) {
                            synchronized (AbstractCoordinator.this) {
                                heartbeat.receiveHeartbeat(time.milliseconds());
                            }
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            synchronized (AbstractCoordinator.this) {
                                if (e instanceof RebalanceInProgressException) {
                                    // it is valid to continue heartbeating while the group is rebalancing. This
                                    // ensures that the coordinator keeps the member in the group for as long
                                    // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                    // however, then the session timeout may expire before we can rejoin.
                                    heartbeat.receiveHeartbeat(time.milliseconds());
                                } else {
                                    heartbeat.failHeartbeat();

                                    // wake up the thread if it's sleeping to reschedule the heartbeat
                                    AbstractCoordinator.this.notify();
                                }
                            }
                        }
                    });
                }
            }
        }
    } catch (InterruptedException e) {
        log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
        this.failed.set(new RuntimeException(e));
    } catch (RuntimeException e) {
        log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
        this.failed.set(e);
    }
}
} // closes the enclosing class (presumably HeartbeatThread); its header lies outside this excerpt
三 sendHeartbeatRequest
// 构造HeartbeatRequest对象,通过ConsumerClientNetwork添加到unsent队列, // 等待发送,结果HeartbeatResponseHandler处理后返回一个RequestFuture synchronized RequestFuture<Void> sendHeartbeatRequest() { HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId); return client.send(coordinator, ApiKeys.HEARTBEAT, req) .compose(new HeartbeatResponseHandler()); }
四 HeartbeatResponse的处理
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { // 将ClientResponse转换成HeartbeatResponse @Override public HeartbeatResponse parse(ClientResponse response) { return new HeartbeatResponse(response.responseBody()); } @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); Errors error = Errors.forCode(heartbeatResponse.errorCode()); if (error == Errors.NONE) {// 心跳正常,没有错误 log.debug("Received successful heartbeat response for group {}", groupId); future.complete(null); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {// 找不到服务器端的GroupCoordinator log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", groupId, coordinator()); coordinatorDead();// 清空unsent集合中请求与,并置空coordinator future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) {// 如果正在rebalance log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); requestRejoin();// 重新发送JoinGroupRequest future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) {//如果Generation不合法 log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); resetGeneration();//重新设置Generation future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) {// 如果member未知 log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); resetGeneration();//重新设置Generation future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } }