golang 基于文件的消息队列 ---> diskqueue

金珂
2023-12-01

前言:

提到消息队列,首先想到的是 RabbitMQ、Kafka、Redis/Codis、ZeroMQ 这类分布式消息队列:它们基于内存缓存和服务发现算法,提供跨节点的消息发布/订阅机制。
有时候的需求可能比较简单,需要一个可以不溢出的本地消息队列,diskqueue就是这种设计目标,基于文件的消息队列。

源码目录:


total 2630
drwxr-xr-x 1 Administrator 197121       0  4月  8 16:31 diskqueue/
-rw-r--r-- 1 Administrator 197121      59  4月  7 17:42 go.mod
-rw-r--r-- 1 Administrator 197121     523  4月  7 17:42 go.sum
-rw-r--r-- 1 Administrator 197121    2264  4月  8 16:17 main.go
-rwxr-xr-x 1 Administrator 197121 2683392  4月  8 16:28 test.exe*
drwxr-xr-x 1 Administrator 197121       0  4月  8 16:31 tmp/

测试源码


```go
// main.go

package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"

	disk "test/diskqueue"

	"github.com/minio/cli"
)

// DqueConsumer starts a background goroutine that drains the "test" disk
// queue stored under "tmp" and prints every message it receives.
//
// The queue is closed and reopened whenever a nil message is received, and
// also on every tick of a (deliberately very long) ticker. The goroutine has
// no exit path. ctx is accepted but not used.
func DqueConsumer(ctx *cli.Context) {
	const (
		queueName = "test"
		queueDir  = "tmp"
	)

	reopenTicker := time.NewTicker(time.Second * 10000000)

	// printLog forwards every queue log line to stdout, prefixed by its level.
	printLog := func(lvl disk.LogLevel, f string, args ...interface{}) {
		fmt.Println(fmt.Sprintf(lvl.String()+": "+f, args...))
	}

	// openQueue (re)opens the queue; only the sync timeout differs between
	// the call sites.
	openQueue := func(syncTimeout time.Duration) disk.Interface {
		return disk.New(queueName, queueDir, 10, 4, 1<<10, 2500, syncTimeout, 1*time.Second, printLog)
	}

	dq := openQueue(2 * time.Second)

	go func() {
		for {
			select {
			case msg := <-dq.ReadChan():
				fmt.Println(">>>>>>>>>>>>>>>>>   " + string(msg))
				if msg != nil {
					continue
				}

				// A nil message triggers a close/reopen cycle.
				dq.Close()
				dq = openQueue(2 * time.Second)
				time.Sleep(time.Millisecond * 1000)
			case <-reopenTicker.C:
				dq.Close()
				time.Sleep(time.Millisecond * 2000)
				dq = openQueue(5 * time.Second)
			}
		}
	}()
}

// wg is the package-level WaitGroup that main blocks on (wg.Wait) and that
// the goroutines started by main register with via Add/Done.
var wg sync.WaitGroup

// main is a small end-to-end demo of the disk queue: a producer goroutine
// puts 1000 messages into the "test" queue under "tmp", a consumer goroutine
// prints them as they come back out of ReadChan, and the queue is closed once
// both goroutines have finished.
func main() {
	const total = 1000

	dqName := "test"
	tmpDir := "tmp"
	dq := disk.New(dqName, tmpDir, 10, 4, 1<<10, 2500, 2*time.Second, 1*time.Second,
		func(lvl disk.LogLevel, f string, args ...interface{}) {
			fmt.Println(fmt.Sprintf(lvl.String()+": "+f, args...))
		})

	// produced carries the number of successfully queued messages from the
	// producer to the consumer, so the consumer knows when it may stop.
	produced := make(chan int, 1)

	// Add must happen before the goroutine starts; calling it inside the
	// goroutine races with wg.Wait below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		queued := 0
		for i := 0; i < total; i++ {
			if err := dq.Put([]byte("hello worker," + strconv.Itoa(i) + "\n")); err != nil {
				fmt.Println("put failed: " + err.Error())
			} else {
				queued++
			}
			time.Sleep(time.Millisecond * 10)
		}
		produced <- queued
	}()
	time.Sleep(time.Millisecond * 1000)

	wg.Add(1)
	go func() {
		defer wg.Done()
		cnt := 0
		want := -1 // unknown until the producer reports its total
		for want < 0 || cnt < want {
			select {
			case ms := <-dq.ReadChan():
				cnt++
				fmt.Println("<<<<<<<<<<<<<<<<<<<<<< " + string(ms))
			case want = <-produced:
			}
		}
	}()

	wg.Wait()

	// Close stops the queue's ioLoop and persists its metadata.
	if err := dq.Close(); err != nil {
		fmt.Println("close failed: " + err.Error())
	}
	fmt.Print("end")
}

// diskqueue.go
package diskqueue

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"time"
)

// logging stuff copied from github.com/nsqio/nsq/internal/lg

// LogLevel is the severity passed to an AppLogFunc alongside each log line.
type LogLevel int

// Severity levels, numbered 1 (DEBUG) through 5 (FATAL).
const (
	DEBUG LogLevel = iota + 1
	INFO
	WARN
	ERROR
	FATAL
)

// AppLogFunc is the logging callback used by the queue: a level, a
// printf-style format string and its arguments.
type AppLogFunc func(lvl LogLevel, f string, args ...interface{})

// String returns the upper-case name of the level. It panics with
// "invalid LogLevel" for any value outside DEBUG..FATAL.
func (l LogLevel) String() string {
	switch l {
	case DEBUG:
		return "DEBUG"
	case INFO:
		return "INFO"
	case WARN:
		return "WARNING"
	case ERROR:
		return "ERROR"
	case FATAL:
		return "FATAL"
	default:
		panic("invalid LogLevel")
	}
}

// Interface is the queue API returned by New. The notes on each method
// describe the diskQueue implementation in this file.
type Interface interface {
	// Put appends one message to the queue.
	Put([]byte) error
	// ReadChan returns the channel on which queued messages are delivered.
	ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
	// Close stops the queue and persists its metadata.
	Close() error
	// Delete stops the queue without the final metadata sync done by Close.
	Delete() error
	// Depth reports the number of messages currently in the queue.
	Depth() int64
	// Empty discards all pending messages and removes their files.
	Empty() error
}

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms

	// run-time state (also persisted to disk in the metadata file)
	readPos      int64
	writePos     int64
	readFileNum  int64
	writeFileNum int64
	depth        int64 // number of queued messages: +1 per write, -1 per delivered read

	sync.RWMutex

	// instantiation time metadata
	name            string // prefix of the data and metadata file names
	dataPath        string // directory holding the data and metadata files
	maxBytesPerFile int64  // currently this cannot change once created. size limit at which data files roll over
	minMsgSize      int32
	maxMsgSize      int32
	syncEvery       int64         // number of reads/writes counted by ioLoop between forced syncs
	syncTimeout     time.Duration // period of the sync ticker in ioLoop (time-based sync)
	readTimeout     time.Duration // period of the read ticker in ioLoop (it only wakes the loop up)
	exitFlag        int32
	needSync        bool

	// keeps track of the position where we have read
	// (but not yet sent over readChan)
	nextReadPos     int64
	nextReadFileNum int64

	readFile  *os.File
	writeFile *os.File
	reader    *bufio.Reader
	writeBuf  bytes.Buffer

	// exposed via ReadChan()
	readChan chan []byte

	// internal channels
	writeChan         chan []byte
	writeResponseChan chan error
	emptyChan         chan int
	emptyResponseChan chan error
	exitChan          chan int
	exitSyncChan      chan int

	logf AppLogFunc // logging callback
}

// New builds a disk-backed queue named name whose files live in dataPath,
// restores its read/write positions from the metadata file when one exists,
// and starts the ioLoop goroutine that serves all queue operations.
//
// A metadata load failure other than "file does not exist" is reported
// through logf; the queue is still returned.
func New(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32,
	syncEvery int64, syncTimeout time.Duration, readTimeout time.Duration, logf AppLogFunc) Interface {
	q := &diskQueue{
		name:            name,
		dataPath:        dataPath,
		maxBytesPerFile: maxBytesPerFile,
		minMsgSize:      minMsgSize,
		maxMsgSize:      maxMsgSize,
		syncEvery:       syncEvery,
		syncTimeout:     syncTimeout,
		readTimeout:     readTimeout,
		logf:            logf,

		// every channel is unbuffered
		readChan:          make(chan []byte),
		writeChan:         make(chan []byte),
		writeResponseChan: make(chan error),
		emptyChan:         make(chan int),
		emptyResponseChan: make(chan error),
		exitChan:          make(chan int),
		exitSyncChan:      make(chan int),
	}

	// No locking needed: nothing else can reach this instance yet.
	if err := q.retrieveMetaData(); err != nil && !os.IsNotExist(err) {
		q.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", q.name, err)
	}

	go q.ioLoop()
	return q
}

// Depth reports how many messages are currently queued, read atomically
// from the depth counter.
func (d *diskQueue) Depth() int64 {
	pending := atomic.LoadInt64(&d.depth)
	return pending
}

// ReadChan exposes the queue's read channel as receive-only; ioLoop sends
// each message it has read from disk on it.
func (d *diskQueue) ReadChan() <-chan []byte {
	var out <-chan []byte = d.readChan
	return out
}

// Put hands data to ioLoop over the write channel and returns the result of
// the write that ioLoop performs. It returns an "exiting" error when the
// queue has already been closed or deleted.
func (d *diskQueue) Put(data []byte) error {
	d.RLock()
	defer d.RUnlock()

	if exiting := d.exitFlag == 1; exiting {
		return errors.New("exiting")
	}

	d.writeChan <- data
	writeErr := <-d.writeResponseChan
	return writeErr
}

// Close shuts the queue down (stopping ioLoop and closing the read and
// write files) and then persists the metadata file.
func (d *diskQueue) Close() error {
	if err := d.exit(false); err != nil {
		return err
	}
	return d.sync()
}

// Delete shuts the queue down through exit with the deleted flag set.
// Unlike Close it performs no final metadata sync.
func (d *diskQueue) Delete() error {
	const deleted = true
	return d.exit(deleted)
}

// exit marks the queue as exiting, stops ioLoop and closes the open read
// and write files. deleted only selects which message is logged. It always
// returns nil.
func (d *diskQueue) exit(deleted bool) error {
	d.Lock()
	defer d.Unlock()

	d.exitFlag = 1

	switch {
	case deleted:
		d.logf(INFO, "DISKQUEUE(%s): deleting", d.name)
	default:
		d.logf(INFO, "DISKQUEUE(%s): closing", d.name)
	}

	// Closing exitChan tells ioLoop to stop; the receive below waits until
	// it has actually exited.
	close(d.exitChan)
	<-d.exitSyncChan

	// Close the read file first, then the write file, clearing each handle.
	for _, handle := range []**os.File{&d.readFile, &d.writeFile} {
		if *handle == nil {
			continue
		}
		(*handle).Close()
		*handle = nil
	}

	return nil
}

// Empty destructively clears out any pending data in the queue by fast
// forwarding the read positions and removing the intermediate files. The
// work is done by ioLoop; Empty returns the error it reports, or an
// "exiting" error when the queue has already been closed or deleted.
func (d *diskQueue) Empty() error {
	d.RLock()
	defer d.RUnlock()

	if exiting := d.exitFlag == 1; exiting {
		return errors.New("exiting")
	}

	d.logf(INFO, "DISKQUEUE(%s): emptying", d.name)

	d.emptyChan <- 1
	emptyErr := <-d.emptyResponseChan
	return emptyErr
}

// deleteAllFiles removes every data file, resets the in-memory positions
// and then removes the metadata file. A failure to remove the metadata file
// (other than it not existing) takes precedence over a data-file error.
func (d *diskQueue) deleteAllFiles() error {
	skipErr := d.skipToNextRWFile()

	if rmErr := os.Remove(d.metaDataFileName()); rmErr != nil && !os.IsNotExist(rmErr) {
		d.logf(ERROR, "DISKQUEUE(%s) failed to remove metadata file - %s", d.name, rmErr)
		return rmErr
	}

	return skipErr
}

// skipToNextRWFile closes the open read and write files, removes every data
// file from readFileNum through writeFileNum, and points both reader and
// writer at the start of a fresh file number with depth reset to zero.
// It returns the last removal error encountered, if any.
func (d *diskQueue) skipToNextRWFile() error {
	if d.readFile != nil {
		d.readFile.Close()
		d.readFile = nil
	}
	if d.writeFile != nil {
		d.writeFile.Close()
		d.writeFile = nil
	}

	// Remove all data files; a missing file is not an error.
	var lastErr error
	for num := d.readFileNum; num <= d.writeFileNum; num++ {
		rmErr := os.Remove(d.fileName(num))
		if rmErr == nil || os.IsNotExist(rmErr) {
			continue
		}
		d.logf(ERROR, "DISKQUEUE(%s) failed to remove data file - %s", d.name, rmErr)
		lastErr = rmErr
	}

	// Reader and writer both restart at offset 0 of the next file number.
	next := d.writeFileNum + 1
	d.writeFileNum, d.writePos = next, 0
	d.readFileNum, d.readPos = next, 0
	d.nextReadFileNum, d.nextReadPos = next, 0
	atomic.StoreInt64(&d.depth, 0)

	return lastErr
}

// readOne performs a low level filesystem read for a single []byte
// while advancing read positions and rolling files, if necessary.
//
// The read file is opened lazily at readPos. Each record is a 4-byte
// big-endian length followed by that many payload bytes. On any read error,
// or when the length is outside [minMsgSize, maxMsgSize], the read file is
// closed and an error is returned without touching the next* positions.
//
// On success only nextReadPos/nextReadFileNum advance; readPos/readFileNum
// are advanced by moveForward once the message has been delivered.
func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32

	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum)
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		if err != nil {
			return nil, err
		}

		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s, time is:%+v", d.name, curFileName, time.Now())

		if d.readPos > 0 {
			_, err = d.readFile.Seek(d.readPos, 0)
			if err != nil {
				d.readFile.Close()
				d.readFile = nil
				return nil, err
			}
		}

		d.reader = bufio.NewReader(d.readFile)
	}

	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
		// this file is corrupt and we have no reasonable guarantee on
		// where a new message should begin
		d.readFile.Close()
		d.readFile = nil
		return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
	}

	readBuf := make([]byte, msgSize)
	_, err = io.ReadFull(d.reader, readBuf)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	// Widen before adding so the sum cannot overflow int32.
	totalBytes := 4 + int64(msgSize)

	// we only advance next* because we have not yet sent this to consumers
	// (where readFileNum, readPos will actually be advanced)
	d.nextReadPos = d.readPos + totalBytes
	d.nextReadFileNum = d.readFileNum

	// TODO: each data file should embed the maxBytesPerFile
	// as the first 8 bytes (at creation time) ensuring that
	// the value can change without affecting runtime
	//
	// The comparison must match writeOne, which rolls to a new file as soon
	// as writePos >= maxBytesPerFile. Using ">" here would leave the reader
	// on a finished file whose last record ends exactly at the limit.
	if d.nextReadPos >= d.maxBytesPerFile {
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}

		d.nextReadFileNum++
		d.nextReadPos = 0
	}

	return readBuf, nil
}

// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary.
//
// The message size is validated against [minMsgSize, maxMsgSize] before any
// file is opened, so a rejected message has no filesystem side effects.
// Each record is written as a 4-byte big-endian length followed by data, in
// a single Write call. When writePos reaches maxBytesPerFile the writer
// moves to the next file number, syncs, and closes the finished file; the
// error of that sync (if any) is what writeOne returns.
func (d *diskQueue) writeOne(data []byte) error {
	// Compare in int64 so an oversized slice cannot wrap around int32 and
	// slip past the bounds check.
	dataLen := int64(len(data))
	if dataLen < int64(d.minMsgSize) || dataLen > int64(d.maxMsgSize) {
		return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
	}

	var err error

	if d.writeFile == nil {
		curFileName := d.fileName(d.writeFileNum)
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			return err
		}

		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0)
			if err != nil {
				d.writeFile.Close()
				d.writeFile = nil
				return err
			}
		}
	}

	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, int32(dataLen))
	if err != nil {
		return err
	}

	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}

	// only write to the file once
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	if err != nil {
		d.writeFile.Close()
		d.writeFile = nil
		return err
	}

	d.writePos += 4 + dataLen
	atomic.AddInt64(&d.depth, 1)

	if d.writePos >= d.maxBytesPerFile {
		d.writeFileNum++
		d.writePos = 0

		// sync every time we start writing to a new file
		err = d.sync()
		if err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
		}

		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	return err
}

// sync fsyncs the current write file (when one is open) and then persists
// the metadata file. needSync is cleared only when both steps succeed; a
// failed fsync also closes and drops the write file.
func (d *diskQueue) sync() error {
	if wf := d.writeFile; wf != nil {
		if err := wf.Sync(); err != nil {
			wf.Close()
			d.writeFile = nil
			return err
		}
	}

	if err := d.persistMetaData(); err != nil {
		return err
	}

	d.needSync = false
	return nil
}

// retrieveMetaData loads depth and the read/write file numbers and offsets
// from the metadata file, and resets the next-read cursor to the restored
// read position. It returns the open or parse error unchanged.
func (d *diskQueue) retrieveMetaData() error {
	f, err := os.OpenFile(d.metaDataFileName(), os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	// Layout: "depth\nreadFileNum,readPos\nwriteFileNum,writePos\n".
	var depth int64
	if _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&depth,
		&d.readFileNum, &d.readPos,
		&d.writeFileNum, &d.writePos); err != nil {
		return err
	}

	atomic.StoreInt64(&d.depth, depth)
	d.nextReadFileNum, d.nextReadPos = d.readFileNum, d.readPos

	return nil
}

// persistMetaData atomically writes state to the filesystem: depth and the
// read/write file numbers and offsets are written to a randomly named
// temporary file, which is synced, closed and then renamed over the
// metadata file.
//
// The rename happens only after the write, sync and close have all
// succeeded, so a partially written file is never published. On any failure
// the temporary file is removed (best effort) and the first error returned.
func (d *diskQueue) persistMetaData() error {
	fileName := d.metaDataFileName()
	tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

	// write to tmp file; truncate in case a stale file of the same name exists
	f, err := os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}

	_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
		atomic.LoadInt64(&d.depth),
		d.readFileNum, d.readPos,
		d.writeFileNum, d.writePos)
	if err == nil {
		err = f.Sync()
	}
	// Always close, but keep the earliest error.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}

	if err == nil {
		// atomically rename
		err = os.Rename(tmpFileName, fileName)
	}
	if err != nil {
		// Best-effort cleanup; the original error is the one worth reporting.
		_ = os.Remove(tmpFileName)
		return err
	}
	return nil
}

// metaDataFileName returns the path of the metadata file:
// <dataPath>/<name>.diskqueue.meta.dat.
//
// dataPath is joined as a plain path element and never used as part of a
// format string, so a '%' in the directory name cannot corrupt the result.
func (d *diskQueue) metaDataFileName() string {
	return path.Join(d.dataPath, d.name+".diskqueue.meta.dat")
}

// fileName returns the path of data file number fileNum:
// <dataPath>/<name>.diskqueue.<fileNum, zero-padded to 6 digits>.dat.
//
// Only the base name is formatted; dataPath is joined afterwards so that a
// '%' in the directory name is not interpreted as a format verb.
func (d *diskQueue) fileName(fileNum int64) string {
	return path.Join(d.dataPath, fmt.Sprintf("%s.diskqueue.%06d.dat", d.name, fileNum))
}

// checkTailCorruption validates the queue state once the reader has caught
// up with the writer. depth is the counter value after the last read.
//
// While unread data remains it does nothing. At the tail, a non-zero depth
// is logged and forced back to 0, and a reader that has moved past the
// writer is logged and repaired by skipping both to a fresh file. Either
// repair schedules a sync.
func (d *diskQueue) checkTailCorruption(depth int64) {
	caughtUp := d.readFileNum >= d.writeFileNum && d.readPos >= d.writePos
	if !caughtUp {
		// still data left to read
		return
	}

	// we've reached the end of the diskqueue
	// if depth isn't 0 something went wrong
	if depth != 0 {
		switch {
		case depth < 0:
			d.logf(ERROR,
				"DISKQUEUE(%s) negative depth at tail (%d), metadata corruption, resetting 0...",
				d.name, depth)
		default:
			d.logf(ERROR,
				"DISKQUEUE(%s) positive depth at tail (%d), data loss, resetting 0...",
				d.name, depth)
		}
		// force depth back to 0
		atomic.StoreInt64(&d.depth, 0)
		d.needSync = true
	}

	if d.readFileNum == d.writeFileNum && d.readPos == d.writePos {
		return
	}

	if d.readFileNum > d.writeFileNum {
		d.logf(ERROR,
			"DISKQUEUE(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
			d.name, d.readFileNum, d.writeFileNum)
	}

	if d.readPos > d.writePos {
		d.logf(ERROR,
			"DISKQUEUE(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
			d.name, d.readPos, d.writePos)
	}

	d.skipToNextRWFile()
	d.needSync = true
}

// moveForward commits the read that was just delivered: the read cursor
// takes the next-read position and depth is decremented. When the cursor
// moved to a different file, the file it left is removed and a sync is
// scheduled. It finishes with the tail-corruption check.
func (d *diskQueue) moveForward() {
	prevFileNum := d.readFileNum
	d.readFileNum, d.readPos = d.nextReadFileNum, d.nextReadPos
	remaining := atomic.AddInt64(&d.depth, -1)

	// see if we need to clean up the old file
	if prevFileNum != d.nextReadFileNum {
		// sync every time we start reading from a new file
		d.needSync = true

		stale := d.fileName(prevFileNum)
		if err := os.Remove(stale); err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, stale, err)
		}
	}

	d.checkTailCorruption(remaining)
}

// handleReadError recovers from a failed read by renaming the current read
// file with a ".bad" suffix and moving the read cursor to the start of the
// next file. When that file is also the current write file, the writer is
// moved on to a new file first. A sync is scheduled afterwards.
func (d *diskQueue) handleReadError() {
	// jump to the next read file and rename the current (bad) file
	if d.readFileNum == d.writeFileNum {
		// if the current write file cannot be read properly, assume it is
		// broken and skip it for writing as well
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
		d.writeFileNum++
		d.writePos = 0
	}

	src := d.fileName(d.readFileNum)
	dst := src + ".bad"

	d.logf(WARN,
		"DISKQUEUE(%s) jump to next file and saving bad file as %s",
		d.name, dst)

	if err := os.Rename(src, dst); err != nil {
		d.logf(ERROR,
			"DISKQUEUE(%s) failed to rename bad diskqueue file %s to %s",
			d.name, src, dst)
	}

	d.readFileNum++
	d.readPos, d.nextReadPos = 0, 0
	d.nextReadFileNum = d.readFileNum

	// significant state change, schedule a sync on the next iteration
	d.needSync = true
}

// ioLoop provides the backend for exposing a go channel (via ReadChan())
// in support of multiple concurrent queue consumers
//
// it works by looping and branching based on whether or not the queue has data
// to read and blocking until data is either read or written over the appropriate
// go channels
//
// conveniently this also means that we're asynchronously reading from the filesystem
//
// Each iteration first syncs if needSync is set or syncEvery operations have
// been counted, then reads the next message ahead of time when unread data
// exists and none is pending, and finally waits for exactly one event:
// delivering the pending message, a read tick, an Empty request, a Put
// request, a sync tick, or the exit signal. On exit both tickers are stopped
// and exitSyncChan is signalled.
func (d *diskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var count int64
	var r chan []byte

	syncTicker := time.NewTicker(d.syncTimeout)
	readTicker := time.NewTicker(d.readTimeout)

	for {
		// dont sync all the time :) force a sync after syncEvery operations
		if count == d.syncEvery {
			d.needSync = true
		}

		if d.needSync {
			err = d.sync()
			if err != nil {
				d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
			}
			count = 0
		}

		// there are unread files, or the read position is behind the write position
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// Read ahead only when no message is pending delivery. The file
			// number must be compared too: a message that rolled the file
			// from position 0 leaves nextReadPos == readPos == 0 while it is
			// still pending, and would otherwise be re-read on every wake-up.
			if d.nextReadFileNum == d.readFileNum && d.nextReadPos == d.readPos {
				dataRead, err = d.readOne()
				if err != nil {
					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
						d.name, d.readPos, d.fileName(d.readFileNum), err)
					d.handleReadError()
					continue
				}
				// Report through the logging callback instead of stdout.
				d.logf(DEBUG, "DISKQUEUE(%s): read message %s", d.name, dataRead)
			}
			r = d.readChan // enable the send case below
		} else {
			r = nil
		}

		select {
		// the Go channel spec dictates that nil channel operations (read or write)
		// in a select are skipped, we set r to d.readChan only when there is data to read
		case r <- dataRead:
			count++
			// moveForward sets needSync flag if a file is removed
			d.moveForward()
		case <-readTicker.C:
			// Nothing to do: the tick only wakes the loop so the checks at
			// the top of the iteration run again.
		case <-d.emptyChan:
			d.emptyResponseChan <- d.deleteAllFiles()
			count = 0
		case dataWrite := <-d.writeChan:
			count++
			d.writeResponseChan <- d.writeOne(dataWrite)
		case <-syncTicker.C:
			if count == 0 {
				// avoid sync when there's no activity
				continue
			}
			d.needSync = true
		case <-d.exitChan:
			goto exit
		}
	}

exit:
	d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
	syncTicker.Stop()
	readTicker.Stop()
	d.exitSyncChan <- 1
}

 类似资料: