PostgreSQL数据库SI Message——Cache同步

郑哲彦
2023-12-01

在PostgreSQL中,每个进程都有属于自己的Cache。换句话说,同一个系统表在不同的进程中都有对应的Cache来缓存它的元组(对于RelCache来说缓存的是一个RelationData结构)。同一个系统表的元组可能同时被多个进程的Cache所缓存,当其中某个Cache中的一个元组被删除或更新时,需要通知其他进程对其Cache进行同步。在PostgreSQL的实现中,会记录下已被删除的无效元组,并通过SI Message方式(即共享消息队列方式)在进程之间传递这一消息。收到无效消息的进程将同步地把无效元组(或RelationData结构)从自己的Cache中删除。
为了实现SI Message这一功能,PostgreSQL在共享内存中开辟了shmInvalBuffer,用来记录系统中所发出的所有无效消息以及所有进程处理无效消息的进度。shmInvalBuffer是一个全局变量,其数据类型为SISeg。

/*
 * Shared cache invalidation memory segment.
 *
 * One instance is placed in shared memory and reached through
 * shmInvalBuffer. It holds a circular queue of SI messages (buffer[]) and one
 * ProcState per backend slot recording which message number that backend
 * will read next.
 *
 * minMsgNum and maxMsgNum are message numbers, not array indexes.
 * Messages numbered in [minMsgNum, maxMsgNum) are still needed by at least
 * one backend; numbers >= maxMsgNum have not been assigned yet.
 * NOTE(review): the buffer slot for message n is presumably
 * buffer[n % MAXNUMMESSAGES]. Confirm against SIInsertDataEntries (not shown).
 */
typedef struct SISeg {
	/* General state information */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */
	int			lastBackend;	/* index of last active procState entry, +1 */
	int			maxBackends;	/* size of procState array */
	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */
	/* Circular buffer holding shared-inval messages */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];
	/*
	 * Per-backend invalidation state info (has MaxBackends entries).
	 * Flexible array member: its length is fixed when the segment is
	 * allocated, so the struct size excludes it (SInvalShmemSize adds it).
	 */
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */

在shmInvalBuffer中,无效消息存储在由buffer字段指定的定长数组中(其长度MAXNUMMESSAGES预定义为4096)。该数组中每个元素存储一个无效消息,也可以称该数组为无效消息队列。无效消息队列实际是一个环状结构:最初数组为空时,新来的无效消息从前向后依次存放在数组的元素中;当数组被放满之后,新的无效消息将回到buffer数组的头部开始插入。minMsgNum字段记录buffer中还未被所有进程处理的无效消息编号中的最小值,maxMsgNum字段记录下一个将要分配给新无效消息的编号。注意,消息编号是单调递增的计数值,而不是数组下标:编号为n的消息存放在buffer[n % MAXNUMMESSAGES]中。实际上,minMsgNum指出了buffer中还没有被所有进程处理的无效消息的下界,而maxMsgNum则指出了上界。也就是说,编号比minMsgNum小的无效消息已经被所有进程处理完,编号大于等于maxMsgNum的无效消息还没有产生,而两者之间的无效消息则至少还有一个进程没有对其进行处理。因此在无效消息队列构成的环中,除了minMsgNum和maxMsgNum之间的位置之外,其他位置都可以用来存放新增加的无效消息。

/*
 * A single shared-invalidation message, as stored in SISeg.buffer[].
 *
 * `id` is the type tag and must stay the first member.
 * NOTE(review): presumably every member struct also begins with an int8 id,
 * so reading `id` tells the receiver which member is valid. Confirm against
 * the Shared*Msg definitions, which are not shown here.
 */
typedef union
{
	int8		id;				/* type field --- must be first */
	SharedInvalCatcacheMsg cc;	/* catcache invalidation */
	SharedInvalCatalogMsg cat;	/* whole-catalog invalidation */
	SharedInvalRelcacheMsg rc;	/* relcache invalidation */
	SharedInvalSmgrMsg sm;		/* smgr invalidation */
	SharedInvalRelmapMsg rm;	/* relation-map invalidation */
	SharedInvalSnapshotMsg sn;	/* snapshot invalidation */
} SharedInvalidationMessage;

PostgreSQL在shmInvalBuffer中用一个ProcState数组(procState字段)来存储正在读取无效消息的进程的读取进度。该数组的大小为MaxBackends,即系统允许的最大后端进程数。MaxBackends由max_connections(默认为100,可在postgresql.conf中修改)以及autovacuum_max_workers、max_worker_processes等参数共同决定,因此数组的实际大小会大于max_connections。ProcState记录了PID为procPid的进程读取无效消息的状态,其中nextMsgNum的值介于shmInvalBuffer的minMsgNum值和maxMsgNum值之间。

/*
 * Per-backend state in shared invalidation structure.
 *
 * One entry per slot of SISeg.procState[]. A slot with procPid == 0 is free.
 * A backend that claims a slot uses (slot index + 1) as its backend ID, and
 * nextMsgNum tracks how far it has consumed the shared message queue.
 */
typedef struct ProcState {
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	PGPROC	   *proc;			/* PGPROC of backend */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */
	bool		hasMessages;	/* backend has unread messages */
	/* Backend only sends invalidations, never receives them. This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes. */
	bool		sendOnly;		/* backend only sends, never receives */
	/* Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by BackendId and it is convenient to
	 * copy the value to and from local memory when MyBackendId is set. It's
	 * meaningless in an active ProcState entry. */
	LocalTransactionId nextLXID;
} ProcState;

初始化

SInvalShmemSize函数计算SInvalShmem所需的共享内存大小:即SISeg中柔性数组成员procState之前的固定部分大小(offsetof(SISeg, procState)),加上MaxBackends个ProcState结构的大小。

/*
 * SInvalShmemSize
 *		Return the shared-memory space needed by the SI segment.
 *
 * That is the fixed part of SISeg (everything before the procState flexible
 * array member) plus one ProcState per possible backend (MaxBackends).
 * The arithmetic goes through the mul_size()/add_size() helpers.
 */
Size
SInvalShmemSize(void)
{
	Size		fixedPart = offsetof(SISeg, procState);
	Size		perBackendPart = mul_size(sizeof(ProcState), MaxBackends);

	return add_size(fixedPart, perBackendPart);
}
/* CreateSharedInvalidationState Create and initialize the SI message buffer */
void CreateSharedInvalidationState(void) {
	int			i;
	bool		found;
	/* Allocate space in shared memory */
	shmInvalBuffer = (SISeg *) ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
	if (found) return;
	/* Clear message counters, save size of procState array, init spinlock */
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	shmInvalBuffer->lastBackend = 0;
	shmInvalBuffer->maxBackends = MaxBackends;
	SpinLockInit(&shmInvalBuffer->msgnumLock);
	/* The buffer[] array is initially all unused, so we need not fill it */
	/* Mark all backends inactive, and initialize nextLXID */
	for (i = 0; i < shmInvalBuffer->maxBackends; i++){
		shmInvalBuffer->procState[i].procPid = 0;	/* inactive */
		shmInvalBuffer->procState[i].proc = NULL;
		shmInvalBuffer->procState[i].nextMsgNum = 0;	/* meaningless */
		shmInvalBuffer->procState[i].resetState = false;
		shmInvalBuffer->procState[i].signaled = false;
		shmInvalBuffer->procState[i].hasMessages = false;
		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
	}
}
void SharedInvalBackendInit(bool sendOnly) {
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;
	/* This can run in parallel with read operations, but not with write operations, since SIInsertDataEntries relies on lastBackend to set hasMessages appropriately. */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	/* Look for a free entry in the procState array */
	for (index = 0; index < segP->lastBackend; index++){
		if (segP->procState[index].procPid == 0)	/* inactive slot? */{
			stateP = &segP->procState[index];
			break;
		}
	}
	if (stateP == NULL){
		if (segP->lastBackend < segP->maxBackends){
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}else{
			/* out of procState slots: MaxBackends exceeded -- report normally */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,(errcode(ERRCODE_TOO_MANY_CONNECTIONS),errmsg("sorry, too many clients already")));
		}
	}
	MyBackendId = (stateP - &segP->procState[0]) + 1;
	/* Advertise assigned backend ID in MyProc */
	MyProc->backendId = MyBackendId;
	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;
	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;
	LWLockRelease(SInvalWriteLock);
	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

 类似资料: