
Reading notes on Jafka's log storage source code

汪凌
2023-12-01

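  Jafka is a Java implementation of the Kafka message queue and largely follows Kafka's design. In Kafka, messages are written to disk before they are consumed, and the write path rests on a few key design decisions:

    (1) Every write is a file append (sequential write), which keeps disk seek time to a minimum.

    (2) A write does not go straight to the physical disk. Jafka combines timed flushing with count-based flushing. The timed flush is asynchronous and runs every 3 seconds by default. The count-based flush is synchronous: every write checks the number of unflushed messages and flushes once it exceeds the threshold, 500 by default.

     (3) Messages are not deleted as soon as they are consumed. Logs are removed on a fixed schedule according to age and size; by default they are retained for 7 days, with a maximum log size of 1GB. This lets different consumer groups consume the same messages repeatedly.

     (4) A topic can have multiple partitions, and consumption progress is tracked by the consumer, which increases consumption parallelism.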
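
    The combined flush rule in point (2) can be sketched roughly as follows. This is only an illustration and not Jafka's code: the class FlushPolicySketch and its method names are hypothetical, and treating "reaching the threshold" as flush-worthy is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper that decides when a log should be flushed. */
final class FlushPolicySketch {
    private final int maxUnflushedMessages; // count threshold (500 by default in the text)
    private final long maxUnflushedMillis;  // time threshold (3000 ms by default in the text)
    private final AtomicInteger unflushed = new AtomicInteger();
    private final AtomicLong lastFlushMillis;

    FlushPolicySketch(int maxUnflushedMessages, long maxUnflushedMillis, long nowMillis) {
        this.maxUnflushedMessages = maxUnflushedMessages;
        this.maxUnflushedMillis = maxUnflushedMillis;
        this.lastFlushMillis = new AtomicLong(nowMillis);
    }

    /** Write path: returns true if the writer should flush synchronously. */
    boolean recordAppend(int messages) {
        return unflushed.addAndGet(messages) >= maxUnflushedMessages;
    }

    /** Background timer: returns true if unflushed data is older than the time threshold. */
    boolean timedFlushDue(long nowMillis) {
        return unflushed.get() > 0 && nowMillis - lastFlushMillis.get() >= maxUnflushedMillis;
    }

    /** Called once the data has actually been forced to disk. */
    void markFlushed(long nowMillis) {
        unflushed.set(0);
        lastFlushMillis.set(nowMillis);
    }

    public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch(500, 3000L, System.currentTimeMillis());
        System.out.println("sync flush needed: " + policy.recordAppend(500));
    }
}
```

    Passing the clock value in as a parameter keeps the two checks easy to test without sleeping.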

    

    The design and implementation of Jafka's log storage are roughly as follows:

    1. Jafka's log storage directory structure:

    rootDir

        |--topic1-partition1

                 |--offset1.jafka

                 |--offset2.jafka

        |--topic1-partition2

                 |--offset1.jafka

                 |--offset2.jafka
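
    Each segment file is named after its start offset plus the .jafka suffix. A minimal sketch of the mapping between offsets and file names, assuming the offset is zero-padded to 20 digits so that file names sort in offset order (the padding width and the class name SegmentNames are assumptions, not taken from the Jafka source):

```java
/** Hypothetical helper for mapping between start offsets and segment file names. */
final class SegmentNames {
    static final String SUFFIX = ".jafka";

    /** Builds a file name from a start offset; the 20-digit zero padding is an assumption. */
    static String nameFromOffset(long offset) {
        return String.format("%020d", offset) + SUFFIX;
    }

    /** Recovers the start offset by stripping the suffix and parsing the rest as a long. */
    static long offsetFromName(String fileName) {
        if (!fileName.endsWith(SUFFIX)) {
            throw new IllegalArgumentException("not a segment file: " + fileName);
        }
        return Long.parseLong(fileName.substring(0, fileName.length() - SUFFIX.length()));
    }

    public static void main(String[] args) {
        String name = nameFromOffset(1048576L);
        System.out.println(name + " -> " + offsetFromName(name));
    }
}
```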


    2. Class organization

     Message-->MessageSet-->LogSegment-->SegmentList-->Log-->LogManager

                             |--->ByteBufferMessageSet

                             |--->FileMessageSet

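         (1) Message is the most basic data structure class and defines the message layout: LENGTH (4 bytes), MAGIC (1 byte), ATTRIBUTE (1 byte), CRC (4 bytes), PAYLOAD.

     LENGTH is the total length of the message, MAGIC defaults to 1, the lowest 2 bits of ATTRIBUTE indicate whether the message is compressed, CRC is the checksum, and PAYLOAD is the actual message body.

        (2) MessageSet is a collection of Messages and corresponds to one offset1.jafka file. It implements the Iterable<MessageAndOffset> interface and defines the two basic operations, read and write, plus some convenience methods. It has two implementations, FileMessageSet and ByteBufferMessageSet: ByteBufferMessageSet mainly carries messages in memory, while FileMessageSet handles reading and writing messages on disk.

        (3) LogSegment can be thought of simply as the Java object for one offset1.jafka file.

        (4) SegmentList is a collection of LogSegments and implements append, delete, and view operations on that collection.

        (5) Log corresponds to one topic-partition and defines the higher-level API: read, append, getOffsetsBefore.

        (6) LogManager manages the whole rootDir. It loads topics, periodically deletes expired logs, periodically flushes to disk, and creates topic-partitions; depending on the enable.zookeeper property, the corresponding updates are also applied to ZooKeeper.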
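
    The message layout from item (1) can be sketched with a ByteBuffer as below. This is a rough illustration under two assumptions that the notes do not state: LENGTH counts the bytes that follow the length field, and CRC is computed over the payload only. The class MessageLayoutSketch and the mask constant are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical encoder for the LENGTH | MAGIC | ATTRIBUTE | CRC | PAYLOAD layout. */
final class MessageLayoutSketch {
    static final byte MAGIC = 1;
    static final int COMPRESSION_MASK = 0x03; // lowest 2 bits of ATTRIBUTE

    static ByteBuffer encode(byte attribute, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);          // assumption: CRC covers the payload only
        int bodyLength = 1 + 1 + 4 + payload.length;     // MAGIC + ATTRIBUTE + CRC + PAYLOAD
        ByteBuffer buf = ByteBuffer.allocate(4 + bodyLength);
        buf.putInt(bodyLength);                          // assumption: LENGTH excludes its own 4 bytes
        buf.put(MAGIC);
        buf.put(attribute);
        buf.putInt((int) crc.getValue());
        buf.put(payload);
        buf.flip();                                      // make the buffer ready for reading
        return buf;
    }

    /** A non-zero value in the lowest 2 bits marks a compressed message. */
    static boolean isCompressed(byte attribute) {
        return (attribute & COMPRESSION_MASK) != 0;
    }

    public static void main(String[] args) {
        ByteBuffer message = encode((byte) 0, new byte[]{'h', 'i'});
        System.out.println("encoded bytes: " + message.remaining());
    }
}
```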

 1. Instantiating LogManager

public LogManager(ServerConfig config, // broker configuration
                      Scheduler scheduler, // scheduler
                      long logCleanupIntervalMs, // interval between periodic log cleanups
                      long logCleanupDefaultAgeMs, // default log retention time
                      boolean needRecovery) { // whether to run recovery
        super();
        this.config = config;
        this.maxMessageSize = config.getMaxMessageSize();
        this.scheduler = scheduler;
        //        this.time = time;
        this.logCleanupIntervalMs = logCleanupIntervalMs;
        this.logCleanupDefaultAgeMs = logCleanupDefaultAgeMs;
        this.needRecovery = needRecovery;
        //
        this.logDir = Utils.getCanonicalFile(new File(config.getLogDir()));
        this.numPartitions = config.getNumPartitions(); // default number of partitions
        this.flushInterval = config.getFlushInterval();  // flush threshold: passed to Log and compared against the number of unflushed messages
        this.topicPartitionsMap = config.getTopicPartitionsMap();
        this.startupLatch = config.getEnableZookeeper() ? new CountDownLatch(1) : null;
        this.logFlushIntervalMap = config.getFlushIntervalMap();
        this.logRetentionSize = config.getLogRetentionSize(); // log retention size, 1GB by default
        this.logRetentionMSMap = getLogRetentionMSMap(config.getLogRetentionHoursMap()); // per-topic log retention time
    }

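  2. Loading LogManager: (1) scan the log files under rootDir and determine the number of partitions of each topic; (2) start the scheduled task that cleans up expired logs; (3) register with ZooKeeper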

public void load() throws IOException {
        if (this.rollingStategy == null) { // default: roll log files at a fixed size
            this.rollingStategy = new FixedSizeRollingStrategy(config.getLogFileSize());
        }
        if (!logDir.exists()) {
            logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'");
            logDir.mkdirs();
        }
        if (!logDir.isDirectory() || !logDir.canRead()) {
            throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.");
        }
        File[] subDirs = logDir.listFiles(); // all files under rootDir
        if (subDirs != null) {
            for (File dir : subDirs) {
                if (!dir.isDirectory()) {
                    logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
                } else {
                    logger.info("Loading log from " + dir.getAbsolutePath());
                    final String topicNameAndPartition = dir.getName(); // topic directory name format: topicName-partitionNum
                    if (-1 == topicNameAndPartition.indexOf('-')) { 
                        throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
                    }
                    final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                    final String topic = topicPartion.k;
                    final int partition = topicPartion.v;
                    Log log = new Log(dir, partition, this.rollingStategy, flushInterval, needRecovery, maxMessageSize); // initialize the Log
                    
                    logs.putIfNotExists(topic, new Pool<Integer, Log>());
                    Pool<Integer, Log> parts = logs.get(topic);

                    parts.put(partition, log);
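                    int configPartition = getPartition(topic); // partition count currently known for the topic
                    if (configPartition <= partition) { // make sure the largest partition count is kept
                        topicPartitionsMap.put(topic, partition + 1); // partitions start at 0, so the total count is the index + 1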
                    }
                }
            }
        }

        /* Schedule the cleanup task to delete old logs */
        if (this.scheduler != null) {
            logger.debug("starting log cleaner every " + logCleanupIntervalMs + " ms");
            this.scheduler.scheduleWithRate(new Runnable() {

                public void run() {
                    try {
                        cleanupLogs(); // clean up expired logs
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }

            }, 60 * 1000, logCleanupIntervalMs);
        }
        // register topics with ZooKeeper
        if (config.getEnableZookeeper()) {
            this.serverRegister = new ServerRegister(config, this);
            serverRegister.startup();
            TopicRegisterTask task = new TopicRegisterTask();
            task.setName("jafka.topicregister");
            task.setDaemon(true);
            task.start();
        }
    }
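
The partition-count bookkeeping in load() boils down to "largest partition index seen, plus one". A small standalone sketch of that idea, working on directory names only. Splitting at the last '-' is an assumption, since the behavior of Utils.getTopicPartition is not shown here, and PartitionScanSketch is a hypothetical name:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that derives partition counts from topic-partition directory names. */
final class PartitionScanSketch {
    static Map<String, Integer> partitionCounts(List<String> dirNames) {
        Map<String, Integer> counts = new HashMap<>();
        for (String name : dirNames) {
            int dash = name.lastIndexOf('-'); // assumption: the partition follows the last '-'
            if (dash < 0) {
                throw new IllegalArgumentException("error topic directory: " + name);
            }
            String topic = name.substring(0, dash);
            int partition = Integer.parseInt(name.substring(dash + 1));
            // partitions start at 0, so the count is the largest index + 1
            counts.merge(topic, partition + 1, Math::max);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionCounts(Arrays.asList("demo-0", "demo-2", "my-topic-1")));
    }
}
```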

  3. Starting LogManager: (1) register topics; (2) start the scheduled flush task

/**
     * Register this broker in ZK for the first time.
     */
    public void startup() {
        if (config.getEnableZookeeper()) {
            serverRegister.registerBrokerInZk();
            for (String topic : getAllTopics()) {
                serverRegister.processTask(new TopicTask(TaskType.CREATE, topic));
            }
            startupLatch.countDown();
        }
        logger.debug("Starting log flusher every {} ms with the following overrides {}", config.getFlushSchedulerThreadRate(), logFlushIntervalMap);
        logFlusherScheduler.scheduleWithRate(new Runnable() {

            public void run() {
                flushAllLogs(false); // timed flush
            }
        }, config.getFlushSchedulerThreadRate(), config.getFlushSchedulerThreadRate());
    }

LogManager flushing: first decide whether a flush is due; if it is, call the Log's flush method

/**
     * flush all messages to disk
     *
     * @param force flush anyway(ignore flush interval)
     */
    public void flushAllLogs(final boolean force) {
        Iterator<Log> iter = getLogIterator();
        while (iter.hasNext()) {
            Log log = iter.next();
            try {
                boolean needFlush = force;
                if (!needFlush) { // compute and check whether a flush is due
                    long timeSinceLastFlush = System.currentTimeMillis() - log.getLastFlushedTime();
                    Integer logFlushInterval = logFlushIntervalMap.get(log.getTopicName());
                    if (logFlushInterval == null) {
                        logFlushInterval = config.getDefaultFlushIntervalMs();
                    }
                    final String flushLogFormat = "[%s] flush interval %d, last flushed %d, need flush? %s";
                    needFlush = timeSinceLastFlush >= logFlushInterval.intValue();
                    logger.trace(String.format(flushLogFormat, log.getTopicName(), logFlushInterval,
                            log.getLastFlushedTime(), needFlush));
                }
                if (needFlush) {
                    log.flush(); // flush to disk
                }
            } catch (IOException ioe) {
                logger.error("Error flushing topic " + log.getTopicName(), ioe);
                logger.error("Halting due to unrecoverable I/O error while flushing logs: " + ioe.getMessage(), ioe);
                Runtime.getRuntime().halt(1);
            } catch (Exception e) {
                logger.error("Error flushing topic " + log.getTopicName(), e);
            }
        }
    }
 The Log's flush method is as follows:

/**
     * Flush this log file to the physical disk
     * 
     * @throws IOException
     */
    public void flush() throws IOException {
        if (unflushed.get() == 0) return;

        synchronized (lock) {
            if (logger.isTraceEnabled()) {
                logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime() + " current time: " + System
                        .currentTimeMillis());
            }
            segments.getLastView().getMessageSet().flush(); // only the file currently being written needs flushing
            unflushed.set(0);
            lastflushedTime.set(System.currentTimeMillis());
        }
    }
FileMessageSet's flush method

/**
     * Commit all written data to the physical disk
     *
     * @throws IOException
     */
    public void flush() throws IOException {
        checkMutable(); // check that the file is mutable; throws an exception if it is not
        long startTime = System.currentTimeMillis();
        channel.force(true); // ultimately calls FileChannel's force method
        long elapsedTime = System.currentTimeMillis() - startTime;
        LogFlushStats.recordFlushRequest(elapsedTime);
        logger.debug("flush time " + elapsedTime);
        setHighWaterMark.set(getSizeInBytes());
        logger.debug("flush high water mark:" + highWaterMark());
    }
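
The flush above does two things: it forces the channel to disk and then advances the high water mark to the current size, so readers only ever see flushed bytes. A minimal sketch of that relationship, using a hypothetical class HighWaterMarkSketch built directly on FileChannel (this is not FileMessageSet itself, and the field names are assumptions):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical append-only file whose readable range only grows on flush. */
final class HighWaterMarkSketch implements AutoCloseable {
    private final FileChannel channel;
    private long size;          // bytes written so far
    private long highWaterMark; // bytes known to be flushed, and therefore readable

    HighWaterMarkSketch(Path file) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.size = channel.size();
        this.highWaterMark = size;
    }

    synchronized void append(byte[] data) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
            size += channel.write(buf, size); // written, but possibly still in the OS cache
        }
    }

    synchronized void flush() throws IOException {
        channel.force(true);  // force content and metadata to the storage device
        highWaterMark = size; // only now do the new bytes become readable
    }

    synchronized long readableBytes() {
        return highWaterMark;
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Appending five bytes leaves readableBytes() unchanged until flush() is called, which mirrors how read is later capped by highWaterMark().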

 4. LogManager gets an existing partition or creates a new one

/**
     * Create the log if it does not exist or return back exist log
     *
     * @param topic     the topic name
     * @param partition the partition id
     * @return read or create a log
     * @throws IOException any IOException
     */
    public ILog getOrCreateLog(String topic, int partition) throws IOException {
        final int configPartitionNumber = getPartition(topic);
        if (partition >= configPartitionNumber) {
            throw new IOException("partition is bigger than the number of configuration: " + configPartitionNumber);
        }
        boolean hasNewTopic = false;
        Pool<Integer, Log> parts = getLogPool(topic, partition);
        if (parts == null) {
            Pool<Integer, Log> found = logs.putIfNotExists(topic, new Pool<Integer, Log>());
            if (found == null) {
                hasNewTopic = true;
            }
            parts = logs.get(topic);
        }
        //
        Log log = parts.get(partition);
        if (log == null) { // create the Log if it does not exist
            log = createLog(topic, partition);
            Log found = parts.putIfNotExists(partition, log);
            if (found != null) {
                Closer.closeQuietly(log, logger);
                log = found;
            } else {
                logger.info(format("Created log for [%s-%d], now create other logs if necessary", topic, partition));
                final int configPartitions = getPartition(topic);
                for (int i = 0; i < configPartitions; i++) {
                    getOrCreateLog(topic, i);
                }
            }
        }
        if (hasNewTopic && config.getEnableZookeeper()) {
            topicRegisterTasks.add(new TopicTask(TaskType.CREATE, topic));
        }
        return log;
    }
Creating a new partition (topic-partition)
 private Log createLog(String topic, int partition) throws IOException {
        synchronized (logCreationLock) { // create the topic-partition directory
            File d = new File(logDir, topic + "-" + partition);
            d.mkdirs();
            return new Log(d, partition, this.rollingStategy, flushInterval, false, maxMessageSize);
        }
    }
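
getOrCreateLog follows a "create, try to publish, discard the loser" pattern: if another thread registered a Log first, the freshly created one is closed and the existing one is returned. A rough equivalent using ConcurrentMap.putIfAbsent, which is assumed here to behave like Pool.putIfNotExists (the class GetOrCreateSketch is hypothetical):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntFunction;

/** Hypothetical pool that creates at most one published value per partition. */
final class GetOrCreateSketch<V extends AutoCloseable> {
    private final ConcurrentMap<Integer, V> parts = new ConcurrentHashMap<>();

    V getOrCreate(int partition, IntFunction<V> factory) throws Exception {
        V existing = parts.get(partition);
        if (existing != null) {
            return existing;
        }
        V created = factory.apply(partition);
        V raced = parts.putIfAbsent(partition, created);
        if (raced != null) {
            created.close(); // another thread published first; release our copy
            return raced;
        }
        return created;
    }
}
```

Creating the value before publishing it means a losing thread does some wasted work, but no caller ever observes a half-initialized entry.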

5. Loading a Log: segment files such as offset1.jafka are loaded when the Log is instantiated

private SegmentList loadSegments() throws IOException {
        List<LogSegment> accum = new ArrayList<LogSegment>();
        File[] ls = dir.listFiles(new FileFilter() {  // load every file with the .jafka suffix under the topic-partition directory

            public boolean accept(File f) {
                return f.isFile() && f.getName().endsWith(FileSuffix);
            }
        });
        logger.info("loadSegments files from [" + dir.getAbsolutePath() + "]: " + ls.length);
        int n = 0;
        for (File f : ls) {
            n++;
            String filename = f.getName();
            long start = Long.parseLong(filename.substring(0, filename.length() - FileSuffix.length())); // the file name is the offset followed by .jafka
            final String logFormat = "LOADING_LOG_FILE[%2d], start(offset)=%d, size=%d, path=%s";
            logger.info(String.format(logFormat, n, start, f.length(), f.getAbsolutePath()));
            FileMessageSet messageSet = new FileMessageSet(f, false); // a FileMessageSet corresponds to one segment file
            accum.add(new LogSegment(f, messageSet, start));
        }
        if (accum.size() == 0) {
            // no existing segments, create a new mutable segment
            File newFile = new File(dir, Log.nameFromOffset(0));
            FileMessageSet fileMessageSet = new FileMessageSet(newFile, true);
            accum.add(new LogSegment(newFile, fileMessageSet, 0));
        } else {
            // there is at least one existing segment, validate and recover them/it
            // sort segments into ascending order for fast searching
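            Collections.sort(accum); // ascending order, so messages can be located quickly by offset
            validateSegments(accum); // verify that the messages in each file are complete
        }
        // the most recent file is opened read-write; all other files only need to be read-only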
        LogSegment last = accum.remove(accum.size() - 1);
        last.getMessageSet().close();
        logger.info("Loading the last segment " + last.getFile().getAbsolutePath() + " in mutable mode, recovery " + needRecovery);
        LogSegment mutable = new LogSegment(last.getFile(), new FileMessageSet(last.getFile(), true, new AtomicBoolean(
                needRecovery)), last.start());
        accum.add(mutable);
        return new SegmentList(name, accum);
    }

6. Writing to a Log

public List<Long> append(ByteBufferMessageSet messages) {
        //validate the messages
        messages.verifyMessageSize(maxMessageSize);
        int numberOfMessages = 0;
        for (MessageAndOffset messageAndOffset : messages) {
            if (!messageAndOffset.message.isValid()) {
                throw new InvalidMessageException();
            }
            numberOfMessages += 1;
        }
        //
        BrokerTopicStat.getBrokerTopicStat(getTopicName()).recordMessagesIn(numberOfMessages);
        BrokerTopicStat.getBrokerAllTopicStat().recordMessagesIn(numberOfMessages);
        logStats.recordAppendedMessages(numberOfMessages);

        // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
        ByteBuffer validByteBuffer = messages.getBuffer().duplicate();
        long messageSetValidBytes = messages.getValidBytes();
        if (messageSetValidBytes > Integer.MAX_VALUE || messageSetValidBytes < 0) throw new InvalidMessageSizeException(
                "Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests");

        validByteBuffer.limit((int) messageSetValidBytes);
        ByteBufferMessageSet validMessages = new ByteBufferMessageSet(validByteBuffer);

        // they are valid, insert them in the log
        synchronized (lock) {
            try {
                LogSegment lastSegment = segments.getLastView(); // get the latest segment
                long[] writtenAndOffset = lastSegment.getMessageSet().append(validMessages); // write to the file
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("[%s,%s] save %d messages, bytes %d", name, lastSegment.getName(),
                            numberOfMessages, writtenAndOffset[0]));
                }
                maybeFlush(numberOfMessages); // flush synchronously once more than flushInterval messages are unflushed
                maybeRoll(lastSegment); // create a new segment according to the rolling strategy

            } catch (IOException e) {
                logger.error("Halting due to unrecoverable I/O error while handling producer request", e);
                Runtime.getRuntime().halt(1);
            } catch (RuntimeException re) {
                throw re;
            }
        }
        return (List<Long>) null;
    }

FileMessageSet's append method

/**
     * Append this message to the message set
     *
     * @return the written size and first offset
     * @throws IOException
     */
    public long[] append(MessageSet messages) throws IOException {
        checkMutable();
        long written = 0L;
        while (written < messages.getSizeInBytes())
            written += messages.writeTo(channel, 0, messages.getSizeInBytes()); // write the messages into this FileMessageSet's channel
        long beforeOffset = setSize.getAndAdd(written);
        return new long[]{written, beforeOffset};
    }
ByteBufferMessageSet's writeTo method

public long writeTo(GatheringByteChannel channel, long offset, long maxSize) throws IOException {
        buffer.mark();
        int written = channel.write(buffer); // only writes into the channel; the data is not yet forced to disk
        buffer.reset();
        return written;
    }
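
The contract of WritableByteChannel.write allows a call to write fewer bytes than the buffer holds, which is why append loops until everything is written. A small standalone sketch of a write-until-done loop that, like the mark()/reset() pair above, leaves the caller's buffer position untouched. The names WriteFullySketch and writeFully are hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper that writes a whole buffer without disturbing the caller's view of it. */
final class WriteFullySketch {
    static long writeFully(WritableByteChannel channel, ByteBuffer source) throws IOException {
        ByteBuffer view = source.duplicate(); // independent position and limit, shared content
        long written = 0;
        while (view.hasRemaining()) {
            written += channel.write(view);   // each call advances the view past what was written
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long n = writeFully(Channels.newChannel(sink), ByteBuffer.wrap(new byte[10]));
        System.out.println("written: " + n);
    }
}
```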

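7. Reading from a Log. One thing to note: if the requested offset is out of range, an exception is thrown directly and the client has to reposition its offset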

/**
     * read messages beginning from offset
     * 
     * @param offset next message offset
     * @param length the max package size
     * @return a MessageSet object with length data or empty
     * @see MessageSet#Empty
     * @throws IOException
     */
    public MessageSet read(long offset, int length) throws IOException {
        List<LogSegment> views = segments.getView(); // get the segment view
        LogSegment found = findRange(views, offset, views.size()); // binary search to locate the segment
        if (found == null) {
            if (logger.isTraceEnabled()) {
                logger.trace(format("NOT FOUND MessageSet from Log[%s], offset=%d, length=%d", name, offset, length));
            }
            return MessageSet.Empty;
        }
        return found.getMessageSet().read(offset - found.start(), length); // the global offset minus the segment's start offset gives the position inside that segment
    }
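
The body of findRange is not shown in these notes. A sketch of what such a lookup might look like, assuming segments are contiguous and sorted by start offset, that an out-of-range offset raises an exception, and that an offset exactly at the end of the log yields "not found" (the analogue of the null that leads to MessageSet.Empty above). The class SegmentLookupSketch and the parallel-array representation are hypothetical:

```java
/** Hypothetical binary search over segments described by start offset and size. */
final class SegmentLookupSketch {
    /** Returns the index of the segment containing offset, or -1 if offset is exactly the log end. */
    static int findSegment(long[] starts, long[] sizes, long offset) {
        if (starts.length == 0) {
            return -1;
        }
        int last = starts.length - 1;
        long logEnd = starts[last] + sizes[last];
        if (offset < starts[0] || offset > logEnd) {
            throw new IndexOutOfBoundsException("offset " + offset + " is out of range");
        }
        if (offset == logEnd) {
            return -1; // nothing has been written at this offset yet
        }
        int low = 0;
        int high = last;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            if (offset < starts[mid]) {
                high = mid - 1;
            } else if (offset >= starts[mid] + sizes[mid]) {
                low = mid + 1;
            } else {
                return mid;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] starts = {0L, 100L, 250L};
        long[] sizes = {100L, 150L, 50L};
        int index = findSegment(starts, sizes, 120L);
        System.out.println("segment " + index + ", position in segment " + (120L - starts[index]));
    }
}
```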

FileMessageSet's read method. read simply returns a newly created object, so multiple consumer groups can consume the same messages repeatedly and concurrently

/**
     * read message from file
     *
     * @param readOffset offset in this channel(file);not the message offset
     * @param size       max data size
     * @return messages sharding data with file log
     * @throws IOException reading file failed
     */
    public MessageSet read(long readOffset, long size) throws IOException {
        return new FileMessageSet(channel, this.offset + readOffset, //
                Math.min(this.offset + readOffset + size, highWaterMark()), false, new AtomicBoolean(false));
    }
FileMessageSet's iterator method. Iteration returns Message and offset pairs; consuming means iterating

public Iterator<MessageAndOffset> iterator() {
        return new IteratorTemplate<MessageAndOffset>() { // custom iterator template

            long location = offset;

            @Override
            protected MessageAndOffset makeNext() {
                try {

                    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
                    channel.read(sizeBuffer, location); // read the message length
                    if (sizeBuffer.hasRemaining()) {
                        return allDone(); // could not read a full length field: the segment has been read to the end
                    }
                    sizeBuffer.rewind();
                    int size = sizeBuffer.getInt();
                    if (size < Message.MinHeaderSize) { // the length read is smaller than the minimum header size
                        return allDone();
                    }
                    ByteBuffer buffer = ByteBuffer.allocate(size);
                    channel.read(buffer, location + 4);
                    if (buffer.hasRemaining()) { // fewer bytes are available than the length field states
                        return allDone();
                    }
                    buffer.rewind();
                    location += size + 4;

                    return new MessageAndOffset(new Message(buffer), location); // a complete message was read; return it with the offset that follows it
                } catch (IOException e) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        };
    }
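
The iterator is a scan over length-prefixed frames that stops at the first incomplete or implausible one. The same loop can be sketched over an in-memory ByteBuffer. The minimum body size of 6 bytes (MAGIC + ATTRIBUTE + CRC) is an assumption derived from the layout described earlier, not the actual value of Message.MinHeaderSize, and FrameScanSketch is a hypothetical name:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical scanner over [4-byte length][body] frames. */
final class FrameScanSketch {
    static final int MIN_BODY_SIZE = 6; // assumption: MAGIC + ATTRIBUTE + CRC

    /** Returns, for each complete frame, the offset that follows it. */
    static List<Long> nextOffsets(ByteBuffer data, long startOffset) {
        List<Long> result = new ArrayList<>();
        ByteBuffer view = data.duplicate(); // do not disturb the caller's position
        long location = startOffset;
        while (view.remaining() >= 4) {
            int size = view.getInt();
            if (size < MIN_BODY_SIZE || size > view.remaining()) {
                break; // implausible length or truncated body: stop scanning
            }
            view.position(view.position() + size); // skip over the message body
            location += 4 + size;
            result.add(location);
        }
        return result;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putInt(6).put(new byte[6]);
        buf.flip();
        System.out.println(nextOffsets(buf, 0L));
    }
}
```

As in the source, the offset paired with each message is the position after it, which is the value a consumer would send in its next fetch.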


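     The above covers the main functionality of Jafka's log storage code. The actual file operations are carried out by FileChannel's read, write, and force methods; Jafka itself mainly handles file management, flush control, and ZooKeeper registration.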

  
