     (3)消息被消费后不会立即删除,而是由后台定时清理任务根据日志保留时间和日志大小删除过期日志。默认保留7天,最大日志容量1GB。因此同一条消息可以被不同的consumer group重复消费。

















     LENGTH(4字节)为消息长度(不含LENGTH字段本身),MAGIC当前版本为1,ATTRIBUTE(1字节)的低2位(bit)标识消息是否压缩及所用压缩方式,CRC为payload的CRC32校验和,PAYLOAD为实际消息体。







public LogManager(ServerConfig config, // broker configuration
                      Scheduler scheduler, // scheduler that runs the periodic log-cleanup task
                      long logCleanupIntervalMs, // interval between log-cleanup runs (ms)
                      long logCleanupDefaultAgeMs, // default log retention age (ms)
                      boolean needRecovery) { // whether existing segments should be recovered on load
        // Constructor: copies broker settings into the fields used later by load()/startup().
        this.config = config;
        this.maxMessageSize = config.getMaxMessageSize();
        this.scheduler = scheduler;
        //        this.time = time;
        this.logCleanupIntervalMs = logCleanupIntervalMs;
        this.logCleanupDefaultAgeMs = logCleanupDefaultAgeMs;
        this.needRecovery = needRecovery;
        this.logDir = Utils.getCanonicalFile(new File(config.getLogDir()));
        this.numPartitions = config.getNumPartitions(); // default number of partitions per topic
        // NOTE(review): the original comment called this a periodic flush *time* interval, but Log.append's
        // maybeFlush comment treats flushInterval as an unflushed-message count — confirm the unit.
        this.flushInterval = config.getFlushInterval();  // flush threshold handed to every Log instance
        this.topicPartitionsMap = config.getTopicPartitionsMap();
        // The startup latch only exists when ZooKeeper is enabled; otherwise it stays null.
        this.startupLatch = config.getEnableZookeeper() ? new CountDownLatch(1) : null;
        this.logFlushIntervalMap = config.getFlushIntervalMap(); // per-topic flush-interval overrides
        this.logRetentionSize = config.getLogRetentionSize(); // log retention size (article states a 1GB default — confirm in ServerConfig)
        this.logRetentionMSMap = getLogRetentionMSMap(config.getLogRetentionHoursMap()); // per-topic retention time, converted from the hours map (presumably to ms)

  2.LogManager的加载:(1)扫描rootDir下的topic-partition目录,为每个分区加载Log并确定每个topic的分区数;(2)启动定时清理过期日志的任务;(3)将topic注册到zookeeper。

public void load() throws IOException {
        // Loads every <topic>-<partition> directory under logDir, schedules log cleanup,
        // and (when ZooKeeper is enabled) prepares topic registration.
        if (this.rollingStategy == null) { // default: roll segments by fixed log-file size
            this.rollingStategy = new FixedSizeRollingStrategy(config.getLogFileSize());
        if (!logDir.exists()) {
            // NOTE(review): as excerpted this branch only logs "creating"; the call that actually
            // creates the directory appears to be elided here — confirm against the full source.
            logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'");
        if (!logDir.isDirectory() || !logDir.canRead()) {
            throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.");
        File[] subDirs = logDir.listFiles(); // every entry directly under the root log directory
        if (subDirs != null) {
            for (File dir : subDirs) {
                if (!dir.isDirectory()) {
                    // Plain files at the root level are not logs; they are skipped with a warning.
                    logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
                } else {
                    logger.info("Loading log from " + dir.getAbsolutePath());
                    final String topicNameAndPartition = dir.getName(); // directory name format: topicName-partitionNum
                    if (-1 == topicNameAndPartition.indexOf('-')) { 
                        throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
                    final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                    final String topic = topicPartion.k;
                    final int partition = topicPartion.v;
                    Log log = new Log(dir, partition, this.rollingStategy, flushInterval, needRecovery, maxMessageSize); // open the Log for this partition directory
                    // Ensure a per-topic pool exists, then register this partition's Log in it.
                    logs.putIfNotExists(topic, new Pool<Integer, Log>());
                    Pool<Integer, Log> parts = logs.get(topic);

                    parts.put(partition, log);
                    int configPartition = getPartition(topic); // currently known partition count for this topic
                    if (configPartition <= partition) { // keep the largest partition count seen on disk
                        topicPartitionsMap.put(topic, partition + 1); // partition ids are 0-based, so count = id + 1

        /* Schedule the cleanup task to delete old logs */
        if (this.scheduler != null) {
            logger.debug("starting log cleaner every " + logCleanupIntervalMs + " ms");
            // Presumably scheduleWithRate(task, initialDelayMs, periodMs): first run after 60 s,
            // then every logCleanupIntervalMs — confirm the Scheduler signature.
            this.scheduler.scheduleWithRate(new Runnable() {

                public void run() {
                    try {
                        cleanupLogs(); // delete expired logs
                    } catch (IOException e) {
                        // Failures are logged so the periodic task keeps running.
                        logger.error("cleanup log failed.", e);

            }, 60 * 1000, logCleanupIntervalMs);
        // register topics in ZooKeeper
        if (config.getEnableZookeeper()) {
            this.serverRegister = new ServerRegister(config, this);
            TopicRegisterTask task = new TopicRegisterTask();


     * Register this broker in ZK for the first time.
    public void startup() {
        // With ZooKeeper enabled, submit a CREATE task for every topic already known locally.
        if (config.getEnableZookeeper()) {
            for (String topic : getAllTopics()) {
                serverRegister.processTask(new TopicTask(TaskType.CREATE, topic));
        logger.debug("Starting log flusher every {} ms with the following overrides {}", config.getFlushSchedulerThreadRate(), logFlushIntervalMap);
        // Periodic flusher: initial delay and period are both getFlushSchedulerThreadRate().
        logFlusherScheduler.scheduleWithRate(new Runnable() {

            public void run() {
                flushAllLogs(false); // periodic flush; force=false so per-topic flush intervals are respected
        }, config.getFlushSchedulerThreadRate(), config.getFlushSchedulerThreadRate());


     * flush all messages to disk
     * @param force flush anyway(ignore flush interval)
    public void flushAllLogs(final boolean force) {
        // Visits every Log. Without force, a Log is flushed only when the time since its last flush
        // reaches its topic's interval (logFlushIntervalMap) or the default getDefaultFlushIntervalMs().
        Iterator<Log> iter = getLogIterator();
        while (iter.hasNext()) {
            Log log = iter.next();
            try {
                boolean needFlush = force;
                if (!needFlush) { // compute whether this log is due for a flush
                    long timeSinceLastFlush = System.currentTimeMillis() - log.getLastFlushedTime();
                    Integer logFlushInterval = logFlushIntervalMap.get(log.getTopicName());
                    if (logFlushInterval == null) {
                        // No per-topic override: fall back to the broker-wide default.
                        logFlushInterval = config.getDefaultFlushIntervalMs();
                    final String flushLogFormat = "[%s] flush interval %d, last flushed %d, need flush? %s";
                    needFlush = timeSinceLastFlush >= logFlushInterval.intValue();
                    // NOTE(review): String.format runs on every call even when trace logging is off;
                    // consider guarding with logger.isTraceEnabled().
                    logger.trace(String.format(flushLogFormat, log.getTopicName(), logFlushInterval,
                            log.getLastFlushedTime(), needFlush));
                if (needFlush) {
                    log.flush(); // flush this log to disk
            } catch (IOException ioe) {
                // NOTE(review): the message says "Halting", but no halt/exit call is visible in this
                // excerpt — confirm whether the full source stops the process here.
                logger.error("Error flushing topic " + log.getTopicName(), ioe);
                logger.error("Halting due to unrecoverable I/O error while flushing logs: " + ioe.getMessage(), ioe);
            } catch (Exception e) {
                // Other failures are logged and the loop continues with the next log.
                logger.error("Error flushing topic " + log.getTopicName(), e);

     * Flush this log file to the physical disk
     * @throws IOException
    public void flush() throws IOException {
        // Fast path without taking the lock: nothing has been written since the last flush.
        if (unflushed.get() == 0) return;

        synchronized (lock) {
            // NOTE(review): guarded by isTraceEnabled() but logs at debug level — the levels disagree.
            if (logger.isTraceEnabled()) {
                logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime() + " current time: " + System
            // Appends only go to the last segment (see Log.append), so only that file needs flushing.
            segments.getLastView().getMessageSet().flush();

     * Commit all written data to the physical disk
     * @throws IOException
    public void flush() throws IOException {
        checkMutable(); // throws if this message set is not mutable (read-only)
        long startTime = System.currentTimeMillis();
        // FileChannel.force(true): write file content AND metadata to the storage device.
        channel.force(true);
        long elapsedTime = System.currentTimeMillis() - startTime;
        logger.debug("flush time " + elapsedTime);
        logger.debug("flush high water mark:" + highWaterMark());


     * Create the log if it does not exist or return back exist log
     * @param topic     the topic name
     * @param partition the partition id
     * @return read or create a log
     * @throws IOException any IOException
    public ILog getOrCreateLog(String topic, int partition) throws IOException {
        // Partition ids must be < the configured partition count (the message says "bigger", the check is >=).
        final int configPartitionNumber = getPartition(topic);
        if (partition >= configPartitionNumber) {
            throw new IOException("partition is bigger than the number of configuration: " + configPartitionNumber);
        boolean hasNewTopic = false;
        Pool<Integer, Log> parts = getLogPool(topic, partition);
        if (parts == null) {
            // putIfNotExists returning null is treated as "this call inserted the pool", i.e. a new topic.
            Pool<Integer, Log> found = logs.putIfNotExists(topic, new Pool<Integer, Log>());
            if (found == null) {
                hasNewTopic = true;
            parts = logs.get(topic);
        Log log = parts.get(partition);
        if (log == null) { // create the Log if it does not exist yet
            log = createLog(topic, partition);
            Log found = parts.putIfNotExists(partition, log);
            if (found != null) {
                // Another caller registered this partition first: discard ours and use theirs.
                Closer.closeQuietly(log, logger);
                log = found;
            } else {
                // First Log for this partition: eagerly create every configured partition of the topic.
                // The recursion ends because partitions that already exist are simply returned.
                logger.info(format("Created log for [%s-%d], now create other logs if necessary", topic, partition));
                final int configPartitions = getPartition(topic);
                for (int i = 0; i < configPartitions; i++) {
                    getOrCreateLog(topic, i);
        // A brand-new topic is queued for ZooKeeper registration when ZooKeeper is enabled.
        if (hasNewTopic && config.getEnableZookeeper()) {
            topicRegisterTasks.add(new TopicTask(TaskType.CREATE, topic));
        return log;
 private Log createLog(String topic, int partition) throws IOException {
        // Serialized on logCreationLock; the new Log lives in logDir/<topic>-<partition>
        // (presumably Log creates the directory if missing — confirm), opened with needRecovery=false.
        synchronized (logCreationLock) {
            File d = new File(logDir, topic + "-" + partition);
            return new Log(d, partition, this.rollingStategy, flushInterval, false, maxMessageSize);


private SegmentList loadSegments() throws IOException {
        // Builds the segment list for this partition directory: one LogSegment per segment file,
        // sorted by start offset, with the newest segment reopened in mutable mode.
        List<LogSegment> accum = new ArrayList<LogSegment>();
        // List every regular file ending in FileSuffix (".jafka" per the original comment).
        // NOTE(review): File.listFiles returns null if dir is not a readable directory; ls.length would then NPE.
        File[] ls = dir.listFiles(new FileFilter() {

            public boolean accept(File f) {
                return f.isFile() && f.getName().endsWith(FileSuffix);
        logger.info("loadSegments files from [" + dir.getAbsolutePath() + "]: " + ls.length);
        // NOTE(review): n is never incremented in this excerpt, so every log line prints index 0.
        int n = 0;
        for (File f : ls) {
            String filename = f.getName();
            long start = Long.parseLong(filename.substring(0, filename.length() - FileSuffix.length())); // file name = <start offset> + FileSuffix
            final String logFormat = "LOADING_LOG_FILE[%2d], start(offset)=%d, size=%d, path=%s";
            logger.info(String.format(logFormat, n, start, f.length(), f.getAbsolutePath()));
            FileMessageSet messageSet = new FileMessageSet(f, false); // one FileMessageSet per segment file (second arg presumably "mutable")
            accum.add(new LogSegment(f, messageSet, start));
        if (accum.size() == 0) {
            // no existing segments, create a new mutable segment
            // NOTE(review): this FileMessageSet is removed below and reopened, but never closed in this excerpt.
            File newFile = new File(dir, Log.nameFromOffset(0));
            FileMessageSet fileMessageSet = new FileMessageSet(newFile, true);
            accum.add(new LogSegment(newFile, fileMessageSet, 0));
        } else {
            // there is at least one existing segment, validate and recover them/it
            // sort segments into ascending order for fast searching
            Collections.sort(accum); // ascending order, so a segment can be located quickly by offset
            validateSegments(accum); // check each segment's messages are complete (body not shown)
        // The newest (last) segment is reopened read-write; the earlier ones stay read-only.
        LogSegment last = accum.remove(accum.size() - 1);
        logger.info("Loading the last segment " + last.getFile().getAbsolutePath() + " in mutable mode, recovery " + needRecovery);
        LogSegment mutable = new LogSegment(last.getFile(), new FileMessageSet(last.getFile(), true, new AtomicBoolean(
                needRecovery)), last.start());
        // NOTE(review): as excerpted, `mutable` is never added back to accum, so the newest segment would be
        // missing from the returned list; an accum.add(mutable) appears to be elided — confirm.
        return new SegmentList(name, accum);


public List<Long> append(ByteBufferMessageSet messages) {
        // Validates the incoming message set, trims it to its valid bytes, and appends it to the
        // last segment under `lock`, then possibly flushes and rolls.
        //validate the messages
        int numberOfMessages = 0;
        for (MessageAndOffset messageAndOffset : messages) {
            if (!messageAndOffset.message.isValid()) {
                throw new InvalidMessageException();
            numberOfMessages += 1;

        // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
        // duplicate() shares the bytes but has its own position/limit, so the caller's buffer is not modified.
        ByteBuffer validByteBuffer = messages.getBuffer().duplicate();
        long messageSetValidBytes = messages.getValidBytes();
        if (messageSetValidBytes > Integer.MAX_VALUE || messageSetValidBytes < 0) throw new InvalidMessageSizeException(
                "Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests");

        validByteBuffer.limit((int) messageSetValidBytes);
        ByteBufferMessageSet validMessages = new ByteBufferMessageSet(validByteBuffer);

        // they are valid, insert them in the log
        synchronized (lock) {
            try {
                LogSegment lastSegment = segments.getLastView(); // the newest (writable) segment
                long[] writtenAndOffset = lastSegment.getMessageSet().append(validMessages); // write into the segment file; returns {bytesWritten, offsetBeforeAppend}
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("[%s,%s] save %d messages, bytes %d", name, lastSegment.getName(),
                            numberOfMessages, writtenAndOffset[0]));
                maybeFlush(numberOfMessages); // flush synchronously once more than flushInterval messages are unflushed
                maybeRoll(lastSegment); // start a new segment if the rolling strategy says so

            } catch (IOException e) {
                // NOTE(review): the message says "Halting", but no halt call is visible in this excerpt;
                // as shown, the IOException is logged and swallowed.
                logger.error("Halting due to unrecoverable I/O error while handling producer request", e);
            } catch (RuntimeException re) {
                // Rethrown unchanged; this catch has no effect on behavior.
                throw re;
        // NOTE(review): always returns null in this excerpt; callers must not rely on the list.
        return (List<Long>) null;


     * Append this message to the message set
     * @return the written size and first offset
     * @throws IOException
    public long[] append(MessageSet messages) throws IOException {
        long written = 0L;
        // Keep asking the *incoming* set to write itself into this file's channel until all of its
        // bytes are written (a single write may be partial). The offset argument is always 0; this relies
        // on the source set's writeTo resuming where it stopped (ByteBufferMessageSet's buffer position advances).
        while (written < messages.getSizeInBytes())
            written += messages.writeTo(channel, 0, messages.getSizeInBytes());
        // getAndAdd returns the size before this append, i.e. the offset of the first appended byte.
        long beforeOffset = setSize.getAndAdd(written);
        return new long[]{written, beforeOffset};

public long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException {
        // Writes the remaining bytes of `buffer` into the channel; `offset` and `maxSize` are not used.
        // channel.write advances buffer's position, so a repeated call continues after a partial write.
        // This only hands bytes to the channel; nothing is forced to disk here (see flush()).
        int written = channel.write(buffer);
        return written;


     * read messages beginning from offset
     * @param offset next message offset
     * @param length the max package size
     * @return a MessageSet object with length data or empty
     * @see MessageSet#Empty
     * @throws IOException
    public MessageSet read(long offset, int length) throws IOException {
        List<LogSegment> views = segments.getView(); // current snapshot of the segment list
        LogSegment found = findRange(views, offset, views.size()); // locate the segment containing offset (described as a binary search; body not shown)
        if (found == null) {
            if (logger.isTraceEnabled()) {
                logger.trace(format("NOT FOUND MessageSet from Log[%s], offset=%d, length=%d", name, offset, length));
            return MessageSet.Empty;
        // Global offset minus the segment's start offset gives the position inside that segment file.
        return found.getMessageSet().read(offset - found.start(), length);

FileMessageSet的read方法:每次读取都新建一个共享同一文件channel的只读FileMessageSet视图并返回,不修改原对象的状态,因此多个consumer group可以并发、重复地消费消息。

     * read message from file
     * @param readOffset offset in this channel(file);not the message offset
     * @param size       max data size
     * @return messages sharding data with file log
     * @throws IOException reading file failed
    public MessageSet read(long readOffset, long size) throws IOException {
        // Returns a new, non-mutable FileMessageSet over the same channel, covering
        // [this.offset + readOffset, min(this.offset + readOffset + size, highWaterMark())).
        // `this` is not modified, so concurrent readers each get an independent view.
        return new FileMessageSet(channel, this.offset + readOffset, //
                Math.min(this.offset + readOffset + size, highWaterMark()), false, new AtomicBoolean(false));

public Iterator<MessageAndOffset> iterator() {
        // Walks the file as a sequence of [4-byte length][message] records starting at `offset`.
        return new IteratorTemplate<MessageAndOffset>() { // project iterator template: makeNext() yields the next item or allDone()

            long location = offset;

            protected MessageAndOffset makeNext() {
                try {

                    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
                    channel.read(sizeBuffer, location); // read the 4-byte message length
                    if (sizeBuffer.hasRemaining()) {
                        return allDone(); // fewer than 4 bytes left: end of this segment
                    // NOTE(review): after read(), sizeBuffer's position is 4, so a relative getInt() would throw
                    // BufferUnderflowException; a sizeBuffer.rewind() appears to be elided here — confirm.
                    int size = sizeBuffer.getInt();
                    // A length below the minimum header size means a corrupt or partially written entry: stop.
                    // NOTE(review): there is no upper bound on size, so a corrupt huge length triggers a huge allocation.
                    if (size < Message.MinHeaderSize) {
                        return allDone();
                    ByteBuffer buffer = ByteBuffer.allocate(size);
                    channel.read(buffer, location + 4);
                    if (buffer.hasRemaining()) { // fewer bytes on disk than the length prefix claims (partial write)
                        return allDone();
                    // Advance past the length prefix and the message body.
                    location += size + 4;

                    // NOTE(review): buffer's position is at its end here; a buffer.rewind() before wrapping it
                    // in Message appears to be elided — confirm. The offset returned is the position of the NEXT record.
                    return new MessageAndOffset(new Message(buffer), location);
                } catch (IOException e) {
                    throw new RuntimeException(e.getMessage(), e);


