jafka is a Java implementation of the Kafka message queue and largely follows Kafka's design. Messages are written to disk first and consumed from there. The write path has several important design points:
(1) Every write is a file append (sequential write), minimizing disk seek time.
(2) Writes do not go straight to disk; jafka combines time-based and count-based flushing. The timed flush is asynchronous, defaulting to 3 seconds; the count-based flush is synchronous: every append checks the number of unflushed messages and flushes once it exceeds the threshold, 500 by default (see the sketch after this list).
(3) Messages are not deleted right after being consumed; deletion is driven by a fixed time interval and by log size, with 7 days of retention and a maximum log size of 1GB by default. This lets different consumer groups consume the same messages repeatedly.
(4) A single topic can have multiple partitions, and consumption progress is controlled by the consumer, which increases consumption parallelism.
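A minimal, self-contained sketch of the hybrid flush policy from point (2), with illustrative names only (jafka's real code paths appear later in this article):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: combine a message-count threshold (checked
// synchronously on every append) with a timed background flush
// (asynchronous, e.g. every 3000 ms).
public class HybridFlushPolicy {

    private final AtomicInteger unflushed = new AtomicInteger(0);
    private final int flushMessageThreshold; // e.g. 500 messages
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public HybridFlushPolicy(int flushMessageThreshold, long flushIntervalMs) {
        this.flushMessageThreshold = flushMessageThreshold;
        timer.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void append(byte[] message) {
        // ... append the message to the file channel (not shown) ...
        if (unflushed.incrementAndGet() >= flushMessageThreshold) {
            flush(); // synchronous flush once the threshold is exceeded
        }
    }

    private synchronized void flush() {
        // the real implementation calls channel.force(true) here
        unflushed.set(0);
    }
}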
The design and implementation of jafka's log storage are roughly as follows:
1. jafka's log storage directory layout (a note on segment file naming follows the tree):
rootDir
|--topic1-partition1
|--offset1.jafka
|--offset2.jafka
|--topic1-partition2
|--offset1.jafka
|--offset2.jafka
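Each segment file is named after the offset of its first message. A sketch of the naming scheme (the 20-digit zero padding mirrors Kafka's convention and is an assumption here; jafka's own version is Log.nameFromOffset, used in the loading code below):

import java.text.NumberFormat;

// Sketch: segment file name = zero-padded start offset + ".jafka".
public class SegmentNaming {

    public static final String FILE_SUFFIX = ".jafka";

    public static String nameFromOffset(long offset) {
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMinimumIntegerDigits(20); // padding width is an assumption
        nf.setMaximumFractionDigits(0);
        nf.setGroupingUsed(false);
        return nf.format(offset) + FILE_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(nameFromOffset(0));      // 00000000000000000000.jafka
        System.out.println(nameFromOffset(170250)); // 00000000000000170250.jafka
    }
}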
2. Class organization
Message-->MessageSet-->LogSegment-->SegmentList-->Log-->LogManager
              |--->ByteBufferMessageSet
              |--->FileMessageSet
(1) Message is the most basic data structure and defines the message format: LENGTH (4 bytes), MAGIC (1 byte), ATTRIBUTE (1 byte), CRC (4 bytes), PAYLOAD.
LENGTH is the total message length, MAGIC defaults to 1, the low 2 bits of ATTRIBUTE indicate whether the message is compressed, CRC is the checksum, and PAYLOAD is the actual message body (see the serialization sketch after this list).
(2) MessageSet is a collection of Messages corresponding to one offset1.jafka file. It implements the Iterable<MessageAndOffset> interface and defines the two basic operations, read and write, plus some convenience utility methods. It has two implementations, FileMessageSet and ByteBufferMessageSet: ByteBufferMessageSet mainly carries messages in memory, while FileMessageSet handles reading and writing messages on disk.
(3) LogSegment can be viewed simply as the Java instance of one offset1.jafka file.
(4) SegmentList is the collection of LogSegments and provides append, delete, and view operations on that collection.
(5) Log corresponds to one topic-partition and defines the higher-level API: read, append, getOffsetsBefore.
(6) LogManager manages the whole rootDir: it loads topics, periodically deletes expired logs, periodically flushes to disk, and creates topic-partitions; depending on the enable.zookeeper property, the corresponding updates are also written to zookeeper.
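Based on the layout in (1), a hedged serialization sketch (assumptions: LENGTH counts the bytes that follow the length field, and CRC is a CRC32 over the payload):

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of the on-disk message layout:
// LENGTH(4) | MAGIC(1) | ATTRIBUTE(1) | CRC(4) | PAYLOAD
public class MessageCodec {

    public static ByteBuffer encode(byte[] payload, boolean compressed) {
        byte magic = 1;                               // default magic value
        byte attribute = (byte) (compressed ? 1 : 0); // low bits flag compression
        CRC32 crc = new CRC32();
        crc.update(payload);
        int length = 1 + 1 + 4 + payload.length;      // magic + attribute + crc + payload
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.put(magic);
        buf.put(attribute);
        buf.putInt((int) crc.getValue());
        buf.put(payload);
        buf.flip();
        return buf;
    }
}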
1. Instantiating LogManager
public LogManager(ServerConfig config,          // the broker's configuration
                  Scheduler scheduler,          // scheduler
                  long logCleanupIntervalMs,    // interval of the periodic log-cleanup task
                  long logCleanupDefaultAgeMs,  // default log retention time
                  boolean needRecovery) {       // whether recovery is needed
    super();
    this.config = config;
    this.maxMessageSize = config.getMaxMessageSize();
    this.scheduler = scheduler;
    // this.time = time;
    this.logCleanupIntervalMs = logCleanupIntervalMs;
    this.logCleanupDefaultAgeMs = logCleanupDefaultAgeMs;
    this.needRecovery = needRecovery;
    //
    this.logDir = Utils.getCanonicalFile(new File(config.getLogDir()));
    this.numPartitions = config.getNumPartitions(); // default number of partitions
    this.flushInterval = config.getFlushInterval(); // message-count threshold for the synchronous flush (see maybeFlush below)
    this.topicPartitionsMap = config.getTopicPartitionsMap();
    this.startupLatch = config.getEnableZookeeper() ? new CountDownLatch(1) : null;
    this.logFlushIntervalMap = config.getFlushIntervalMap();
    this.logRetentionSize = config.getLogRetentionSize(); // log retention size, 1GB by default
    this.logRetentionMSMap = getLogRetentionMSMap(config.getLogRetentionHoursMap()); // per-topic log retention time
}
2. Loading LogManager: (1) scan the log files under rootDir and work out each topic's partition count; (2) start the scheduled task that cleans up expired logs; (3) register with zookeeper.
public void load() throws IOException {
    if (this.rollingStategy == null) { // default: roll log files at a fixed size
        this.rollingStategy = new FixedSizeRollingStrategy(config.getLogFileSize());
    }
    if (!logDir.exists()) {
        logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'");
        logDir.mkdirs();
    }
    if (!logDir.isDirectory() || !logDir.canRead()) {
        throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.");
    }
    File[] subDirs = logDir.listFiles(); // all files under rootDir
    if (subDirs != null) {
        for (File dir : subDirs) {
            if (!dir.isDirectory()) {
                logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
            } else {
                logger.info("Loading log from " + dir.getAbsolutePath());
                final String topicNameAndPartition = dir.getName(); // topic directory format: topicName-partitionNum
                if (-1 == topicNameAndPartition.indexOf('-')) {
                    throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
                }
                final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                final String topic = topicPartion.k;
                final int partition = topicPartion.v;
                Log log = new Log(dir, partition, this.rollingStategy, flushInterval, needRecovery, maxMessageSize); // initialize the Log
                logs.putIfNotExists(topic, new Pool<Integer, Log>());
                Pool<Integer, Log> parts = logs.get(topic);
                parts.put(partition, log);
                int configPartition = getPartition(topic); // partition count recorded so far
                if (configPartition <= partition) { // keep the largest partition count seen
                    topicPartitionsMap.put(topic, partition + 1); // partitions start at 0, so the total count is partition + 1
                }
            }
        }
    }
    /* Schedule the cleanup task to delete old logs */
    if (this.scheduler != null) {
        logger.debug("starting log cleaner every " + logCleanupIntervalMs + " ms");
        this.scheduler.scheduleWithRate(new Runnable() {

            public void run() {
                try {
                    cleanupLogs(); // clean up expired logs
                } catch (IOException e) {
                    logger.error("cleanup log failed.", e);
                }
            }
        }, 60 * 1000, logCleanupIntervalMs);
    }
    // register topics with zookeeper
    if (config.getEnableZookeeper()) {
        this.serverRegister = new ServerRegister(config, this);
        serverRegister.startup();
        TopicRegisterTask task = new TopicRegisterTask();
        task.setName("jafka.topicregister");
        task.setDaemon(true);
        task.start();
    }
}
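Utils.getTopicPartition is not listed here; since topic names may themselves contain '-', it presumably splits the directory name at the last '-' (a reconstruction, not the actual jafka code):

// Hypothetical reconstruction: split "topicName-partitionNum" at the LAST '-'
// so that topic names containing '-' still parse correctly.
public class TopicDirParser {

    public static void main(String[] args) {
        String dirName = "demo-topic-0";
        int idx = dirName.lastIndexOf('-');
        if (idx < 0) {
            throw new IllegalArgumentException("error topic directory: " + dirName);
        }
        String topic = dirName.substring(0, idx);                     // "demo-topic"
        int partition = Integer.parseInt(dirName.substring(idx + 1)); // 0
        System.out.println(topic + " / " + partition);
    }
}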
3. Starting LogManager: (1) register topics; (2) start the scheduled flush task.
/**
 * Register this broker in ZK for the first time.
 */
public void startup() {
    if (config.getEnableZookeeper()) {
        serverRegister.registerBrokerInZk();
        for (String topic : getAllTopics()) {
            serverRegister.processTask(new TopicTask(TaskType.CREATE, topic));
        }
        startupLatch.countDown();
    }
    logger.debug("Starting log flusher every {} ms with the following overrides {}", config.getFlushSchedulerThreadRate(), logFlushIntervalMap);
    logFlusherScheduler.scheduleWithRate(new Runnable() {

        public void run() {
            flushAllLogs(false); // timed flush
        }
    }, config.getFlushSchedulerThreadRate(), config.getFlushSchedulerThreadRate());
}
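jafka's Scheduler is its own wrapper; the same fixed-rate behavior expressed with the plain JDK scheduler looks like this (illustrative only, with a stand-in flush task):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK equivalent of scheduleWithRate: run the flush task at a fixed rate.
public class FlushSchedulerSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start(long rateMs) { // rateMs ~ config.getFlushSchedulerThreadRate()
        scheduler.scheduleAtFixedRate(() -> flushAllLogs(false), rateMs, rateMs, TimeUnit.MILLISECONDS);
    }

    private void flushAllLogs(boolean force) {
        System.out.println("flushing, force=" + force); // stand-in for LogManager.flushAllLogs
    }
}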
When LogManager flushes, it first checks whether each log is due for a flush; if so, it calls the Log's flush method:
/**
 * flush all messages to disk
 *
 * @param force flush anyway (ignore flush interval)
 */
public void flushAllLogs(final boolean force) {
    Iterator<Log> iter = getLogIterator();
    while (iter.hasNext()) {
        Log log = iter.next();
        try {
            boolean needFlush = force;
            if (!needFlush) { // decide whether this log is due for a flush
                long timeSinceLastFlush = System.currentTimeMillis() - log.getLastFlushedTime();
                Integer logFlushInterval = logFlushIntervalMap.get(log.getTopicName());
                if (logFlushInterval == null) {
                    logFlushInterval = config.getDefaultFlushIntervalMs();
                }
                final String flushLogFormat = "[%s] flush interval %d, last flushed %d, need flush? %s";
                needFlush = timeSinceLastFlush >= logFlushInterval.intValue();
                logger.trace(String.format(flushLogFormat, log.getTopicName(), logFlushInterval,
                        log.getLastFlushedTime(), needFlush));
            }
            if (needFlush) {
                log.flush(); // flush to disk
            }
        } catch (IOException ioe) {
            logger.error("Error flushing topic " + log.getTopicName(), ioe);
            logger.error("Halting due to unrecoverable I/O error while flushing logs: " + ioe.getMessage(), ioe);
            Runtime.getRuntime().halt(1);
        } catch (Exception e) {
            logger.error("Error flushing topic " + log.getTopicName(), e);
        }
    }
}
Log's flush method is shown below; only the newest (mutable) segment needs flushing, because older segments are immutable and already on disk:
/**
 * Flush this log file to the physical disk
 *
 * @throws IOException
 */
public void flush() throws IOException {
    if (unflushed.get() == 0) return;
    synchronized (lock) {
        if (logger.isTraceEnabled()) {
            logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime() + " current time: " + System
                    .currentTimeMillis());
        }
        segments.getLastView().getMessageSet().flush(); // only the segment being written needs flushing
        unflushed.set(0);
        lastflushedTime.set(System.currentTimeMillis());
    }
}
FileMessageSet's flush method:
/**
 * Commit all written data to the physical disk
 *
 * @throws IOException
 */
public void flush() throws IOException {
    checkMutable(); // check that this file is mutable; throws if it is not
    long startTime = System.currentTimeMillis();
    channel.force(true); // ultimately delegates to FileChannel.force
    long elapsedTime = System.currentTimeMillis() - startTime;
    LogFlushStats.recordFlushRequest(elapsedTime);
    logger.debug("flush time " + elapsedTime);
    setHighWaterMark.set(getSizeInBytes());
    logger.debug("flush high water mark:" + highWaterMark());
}
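The durability guarantee ultimately rests on the JDK's FileChannel.force, which pushes buffered data from the OS page cache down to the device. A standalone illustration:

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Writes land in the OS page cache; force(true) makes them durable and
// also flushes file metadata (such as the file size), as jafka does.
public class ForceDemo {

    public static void main(String[] args) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile("demo.jafka", "rw");
             FileChannel channel = raf.getChannel()) {
            channel.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            channel.force(true);
        }
    }
}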
4. LogManager getting or creating a new partition
/**
 * Create the log if it does not exist, otherwise return the existing log
 *
 * @param topic the topic name
 * @param partition the partition id
 * @return read or create a log
 * @throws IOException any IOException
 */
public ILog getOrCreateLog(String topic, int partition) throws IOException {
    final int configPartitionNumber = getPartition(topic);
    if (partition >= configPartitionNumber) {
        throw new IOException("partition is bigger than the number of configuration: " + configPartitionNumber);
    }
    boolean hasNewTopic = false;
    Pool<Integer, Log> parts = getLogPool(topic, partition);
    if (parts == null) {
        Pool<Integer, Log> found = logs.putIfNotExists(topic, new Pool<Integer, Log>());
        if (found == null) {
            hasNewTopic = true;
        }
        parts = logs.get(topic);
    }
    //
    Log log = parts.get(partition);
    if (log == null) { // create the Log if it does not exist yet
        log = createLog(topic, partition);
        Log found = parts.putIfNotExists(partition, log);
        if (found != null) {
            // another thread created it concurrently: discard ours and use the existing one
            Closer.closeQuietly(log, logger);
            log = found;
        } else {
            logger.info(format("Created log for [%s-%d], now create other logs if necessary", topic, partition));
            final int configPartitions = getPartition(topic);
            for (int i = 0; i < configPartitions; i++) {
                getOrCreateLog(topic, i);
            }
        }
    }
    if (hasNewTopic && config.getEnableZookeeper()) {
        topicRegisterTasks.add(new TopicTask(TaskType.CREATE, topic));
    }
    return log;
}
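The putIfNotExists/get dance above is the standard lock-free get-or-create idiom. A minimal JDK equivalent (assuming jafka's Pool wraps ConcurrentHashMap, which is an assumption):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Build a candidate, try to publish it with putIfAbsent; if another thread
// won the race, discard ours (jafka closes the losing Log via
// Closer.closeQuietly) and use the published one.
public class GetOrCreateIdiom {

    private final ConcurrentMap<Integer, StringBuilder> parts = new ConcurrentHashMap<Integer, StringBuilder>();

    public StringBuilder getOrCreate(int partition) {
        StringBuilder fresh = new StringBuilder("log-" + partition); // stand-in for createLog(topic, partition)
        StringBuilder existing = parts.putIfAbsent(partition, fresh);
        return existing != null ? existing : fresh;
    }
}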
Creating a new partition (topic-partition):
private Log createLog(String topic, int partition) throws IOException {
    synchronized (logCreationLock) { // create the topic-partition directory
        File d = new File(logDir, topic + "-" + partition);
        d.mkdirs();
        return new Log(d, partition, this.rollingStategy, flushInterval, false, maxMessageSize);
    }
}
5. Loading a Log: segment files such as offset1.jafka are loaded when the Log is instantiated.
private SegmentList loadSegments() throws IOException {
    List<LogSegment> accum = new ArrayList<LogSegment>();
    File[] ls = dir.listFiles(new FileFilter() { // load all files with the .jafka suffix under the topic-partition directory

        public boolean accept(File f) {
            return f.isFile() && f.getName().endsWith(FileSuffix);
        }
    });
    logger.info("loadSegments files from [" + dir.getAbsolutePath() + "]: " + ls.length);
    int n = 0;
    for (File f : ls) {
        n++;
        String filename = f.getName();
        long start = Long.parseLong(filename.substring(0, filename.length() - FileSuffix.length())); // the file name is the start offset plus ".jafka"
        final String logFormat = "LOADING_LOG_FILE[%2d], start(offset)=%d, size=%d, path=%s";
        logger.info(String.format(logFormat, n, start, f.length(), f.getAbsolutePath()));
        FileMessageSet messageSet = new FileMessageSet(f, false); // a FileMessageSet corresponds to one segment file
        accum.add(new LogSegment(f, messageSet, start));
    }
    if (accum.size() == 0) {
        // no existing segments, create a new mutable segment
        File newFile = new File(dir, Log.nameFromOffset(0));
        FileMessageSet fileMessageSet = new FileMessageSet(newFile, true);
        accum.add(new LogSegment(newFile, fileMessageSet, 0));
    } else {
        // there is at least one existing segment, validate and recover them/it
        // sort segments into ascending order for fast searching
        Collections.sort(accum); // ascending order, so a message can be located quickly by offset
        validateSegments(accum); // verify that the messages in each file are intact
    }
    // the most recent file is opened for read-write; all others are read-only
    LogSegment last = accum.remove(accum.size() - 1);
    last.getMessageSet().close();
    logger.info("Loading the last segment " + last.getFile().getAbsolutePath() + " in mutable mode, recovery " + needRecovery);
    LogSegment mutable = new LogSegment(last.getFile(), new FileMessageSet(last.getFile(), true, new AtomicBoolean(
            needRecovery)), last.start());
    accum.add(mutable);
    return new SegmentList(name, accum);
}
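validateSegments is not listed; plausibly it re-uses the same CRC validity check seen later in append(), walking every message of each segment (a sketch under that assumption; the real version may also truncate trailing garbage):

// Sketch only: LogSegment and MessageAndOffset are the classes described in
// this article; a CRC mismatch indicates a corrupt or torn write.
private void validateSegments(List<LogSegment> segments) {
    for (LogSegment segment : segments) {
        for (MessageAndOffset mao : segment.getMessageSet()) {
            if (!mao.message.isValid()) {
                throw new InvalidMessageException();
            }
        }
    }
}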
6. Writing to the Log
public List<Long> append(ByteBufferMessageSet messages) {
    // validate the messages
    messages.verifyMessageSize(maxMessageSize);
    int numberOfMessages = 0;
    for (MessageAndOffset messageAndOffset : messages) {
        if (!messageAndOffset.message.isValid()) {
            throw new InvalidMessageException();
        }
        numberOfMessages += 1;
    }
    //
    BrokerTopicStat.getBrokerTopicStat(getTopicName()).recordMessagesIn(numberOfMessages);
    BrokerTopicStat.getBrokerAllTopicStat().recordMessagesIn(numberOfMessages);
    logStats.recordAppendedMessages(numberOfMessages);
    // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
    ByteBuffer validByteBuffer = messages.getBuffer().duplicate();
    long messageSetValidBytes = messages.getValidBytes();
    if (messageSetValidBytes > Integer.MAX_VALUE || messageSetValidBytes < 0) throw new InvalidMessageSizeException(
            "Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests");
    validByteBuffer.limit((int) messageSetValidBytes);
    ByteBufferMessageSet validMessages = new ByteBufferMessageSet(validByteBuffer);
    // they are valid, insert them in the log
    synchronized (lock) {
        try {
            LogSegment lastSegment = segments.getLastView(); // get the newest segment
            long[] writtenAndOffset = lastSegment.getMessageSet().append(validMessages); // write to the file
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[%s,%s] save %d messages, bytes %d", name, lastSegment.getName(),
                        numberOfMessages, writtenAndOffset[0]));
            }
            maybeFlush(numberOfMessages); // flush synchronously if more than flushInterval messages are unflushed
            maybeRoll(lastSegment); // create a new segment according to the rolling strategy
        } catch (IOException e) {
            logger.error("Halting due to unrecoverable I/O error while handling producer request", e);
            Runtime.getRuntime().halt(1);
        } catch (RuntimeException re) {
            throw re;
        }
    }
    return (List<Long>) null;
}
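maybeRoll is not listed either; under the FixedSizeRollingStrategy seen in load(), it plausibly freezes the active segment once it exceeds the configured file size and opens a new segment named after the next start offset. A self-contained sketch of that decision (the naming and checks are assumptions):

import java.io.File;

// Sketch of the size-based roll decision.
public class RollSketch {

    private long segmentStart = 0; // start offset of the active segment
    private long segmentSize = 0;  // bytes written to the active segment
    private final long maxSegmentSize;

    public RollSketch(long maxSegmentSize) {
        this.maxSegmentSize = maxSegmentSize;
    }

    public void afterAppend(long writtenBytes, File dir) {
        segmentSize += writtenBytes;
        if (segmentSize > maxSegmentSize) {
            long newStart = segmentStart + segmentSize;
            File newFile = new File(dir, String.format("%020d.jafka", newStart));
            // ... flush and close the old segment, open newFile as the mutable one ...
            segmentStart = newStart;
            segmentSize = 0;
        }
    }
}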
FileMessageSet's append method:
/**
 * Append the messages to this message set
 *
 * @return the written size and first offset
 * @throws IOException
 */
public long[] append(MessageSet messages) throws IOException {
    checkMutable();
    long written = 0L;
    while (written < messages.getSizeInBytes())
        written += messages.writeTo(channel, 0, messages.getSizeInBytes()); // FileMessageSet's disk write path
    long beforeOffset = setSize.getAndAdd(written);
    return new long[]{written, beforeOffset};
}
ByteBufferMessageSet's writeTo method:
public long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException {
    buffer.mark();
    int written = channel.write(buffer); // only writes into the channel; the data is not forced to disk yet
    buffer.reset();
    return written;
}
7. Reading from the Log. One point worth noting: if the given offset is out of range, an exception is thrown, and the client has to reposition its offset.
/**
 * read messages beginning from offset
 *
 * @param offset next message offset
 * @param length the max package size
 * @return a MessageSet object with length data or empty
 * @see MessageSet#Empty
 * @throws IOException
 */
public MessageSet read(long offset, int length) throws IOException {
    List<LogSegment> views = segments.getView(); // get a view of the segments
    LogSegment found = findRange(views, offset, views.size()); // binary search to locate the segment
    if (found == null) {
        if (logger.isTraceEnabled()) {
            logger.trace(format("NOT FOUND MessageSet from Log[%s], offset=%d, length=%d", name, offset, length));
        }
        return MessageSet.Empty;
    }
    return found.getMessageSet().read(offset - found.start(), length); // the log-wide offset minus the segment's start offset gives the position within the segment
}
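findRange is the binary search mentioned in the comment above: each segment covers the offset range [start, start + size), an out-of-range offset throws (OffsetOutOfRangeException in jafka), and a miss returns null so read() can answer MessageSet.Empty. A simplified, self-contained sketch:

import java.util.List;

// Simplified from jafka's generic Range-based version.
public class FindRangeSketch {

    public static class Segment {
        final long start, size;
        Segment(long start, long size) { this.start = start; this.size = size; }
    }

    public static Segment findRange(List<Segment> segments, long offset) {
        if (segments.isEmpty()) return null;
        Segment last = segments.get(segments.size() - 1);
        if (offset < segments.get(0).start || offset > last.start + last.size) {
            throw new IllegalStateException("offset out of range: " + offset);
        }
        int lo = 0, hi = segments.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            Segment s = segments.get(mid);
            if (offset < s.start) hi = mid - 1;
            else if (offset >= s.start + s.size) lo = mid + 1;
            else return s;
        }
        return null; // e.g. offset exactly at the end of the log
    }
}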
FileMessageSet's read method. It simply returns a new object over the same channel, so multiple consumer groups can re-consume the same messages concurrently.
/**
 * read message from file
 *
 * @param readOffset offset in this channel (file); not the message offset
 * @param size max data size
 * @return messages sharing data with this file log
 * @throws IOException reading file failed
 */
public MessageSet read(long readOffset, long size) throws IOException {
    return new FileMessageSet(channel, this.offset + readOffset, //
            Math.min(this.offset + readOffset + size, highWaterMark()), false, new AtomicBoolean(false));
}
FileMessageSet's iterator. Iteration yields Message/offset pairs; consuming is just iterating over them.
public Iterator<MessageAndOffset> iterator() {
    return new IteratorTemplate<MessageAndOffset>() { // custom iterator template

        long location = offset;

        @Override
        protected MessageAndOffset makeNext() {
            try {
                ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
                channel.read(sizeBuffer, location); // read the message length
                if (sizeBuffer.hasRemaining()) {
                    return allDone(); // nothing left to read: the segment is exhausted
                }
                sizeBuffer.rewind();
                int size = sizeBuffer.getInt();
                if (size < Message.MinHeaderSize) { // the length read is smaller than the minimum message header
                    return allDone();
                }
                ByteBuffer buffer = ByteBuffer.allocate(size);
                channel.read(buffer, location + 4);
                if (buffer.hasRemaining()) { // fewer bytes available than the length recorded in the header
                    return allDone();
                }
                buffer.rewind();
                location += size + 4;
                return new MessageAndOffset(new Message(buffer), location); // a complete message was read; return it
            } catch (IOException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    };
}
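Putting the read path together, a hypothetical consumer-side loop (Log, MessageSet, and MessageAndOffset are the article's classes; the 1 MB fetch size is arbitrary):

// Fetch a chunk, iterate the messages, and remember where to resume;
// progress is tracked by the consumer, not the broker.
static long consumeChunk(Log log, long consumedOffset) throws IOException {
    MessageSet chunk = log.read(consumedOffset, 1024 * 1024);
    long next = consumedOffset;
    for (MessageAndOffset mao : chunk) {
        // mao.message carries the payload; mao.offset already points past this message
        next = mao.offset;
    }
    return next; // pass this back as the offset for the next read
}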
The above covers the main features of jafka's log storage code. The actual file operations are performed by FileChannel's read, write, and force methods; jafka's own job is file management, flush control, and zookeeper registration.