ChaperoneClient
ChaperoneClient
是一个侵入式的组件,本身并不作为一个服务。
主要的是MessageTracker
的track()
方法,代码如下
public void track(double timestamp, int msgCount) {
long currentTimeMillis = 0;
timeBucketsRWLock.readLock().lock();
try {
TimeBucketMetadata timeBucket = getTimeBucket(timestamp);
//该时间桶内Message个数
timeBucket.msgCount.addAndGet(msgCount);
currentTimeMillis = System.currentTimeMillis();
timeBucket.lastMessageTimestampSeenInSec = timestamp;
double latency = currentTimeMillis - (timestamp * 1000);
for (int i = 0; i < msgCount; i++) {
timeBucket.latencyStats.addValue(latency);
}
} finally {
timeBucketsRWLock.readLock().unlock();
}
//时间桶个数达到一定数量或当前时间大于应该反馈的时间后进行反馈
if (timeBucketCount.get() >= reportFreqBucketCount || currentTimeMillis > nextReportTime.get()) {
report();
}
}
调用该方法,需要传入一个timestamp
。调用track
方法后,getTimeBucket()
方法会根据传入的timestamp
获得该数据对应的时间桶。这个时间桶会被维护在timeBucketsMap
内。当时间桶的数量大于reportFreqBucketCount
或者当前时间已经超过下一次需要反馈的时间,就会调用report()
方法。
public boolean report() {
...
//清空timeBucketCount
Map<Double, TimeBucketMetadata> tempTimeCountsMap;
timeBucketsRWLock.writeLock().lock();
try {
tempTimeCountsMap = timeBucketsMap;
timeBucketCount.set(0);
timeBucketsMap = new ConcurrentHashMap<>();
} finally {
timeBucketsRWLock.writeLock().unlock();
}
try {
//构建AuditMessage 并report
for (TimeBucketMetadata bucket : tempTimeCountsMap.values()) {
AuditMessage m = auditReporter.buildAuditMessage(topicName, bucket);
auditReporter.reportAuditMessage(m);
}
} catch (IOException ioe) {
logger.error("IOException when trying to send audit message: {}", ioe.toString());
}
nextReportTime.set(setNextReportingTime());
reportingInProgress.set(false);
return true;
}
report()
内会先记录timeBucketsMap
内所有时间桶的数据,并将timeBucketsMap
清空。 之后会使用KafkaAuditReporter
的buildAuditMessage
构建一个message
,buildAuditMessage
方法如下
public AuditMessage buildAuditMessage(String topicName, TimeBucketMetadata timeBucket) {
return new AuditMessage(topicName, timeBucket, hostMetadata);
}
构建得到message
后,会调用reportAuditMessage()
方法反馈该条记录,此方法内会向审计的Topic
发送该message
数据
public void reportAuditMessage(final AuditMessage message) throws IOException {
// Create a heatpipe-encoded message.
final JSONObject auditMsg = new JSONObject();
......
auditMsg.put(AuditMsgField.UUID.getName(), UUID.randomUUID().toString());
final byte[] outputBytes = auditMsg.toJSONString().getBytes();
// Send message via Kafka producer
final ProducerRecord<String, byte[]> data = new ProducerRecord<>(topicForAuditMsg, null, outputBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.warn("Could not send auditMsg over Kafka for topic {}", message.topicName, e);
} else {
messageReportRate.mark(message.timeBucketMetadata.msgCount.get());
bucketReportRate.mark();
}
}
});
}
消息全部反馈完成后,设置下次的反馈时间。ChaperoneClient
大体功能就是这样。
Client
发送的AuditMessage
类维护了一个时间桶,包含桶的开始、结束时间,时间桶内消息的个数等。