PostgreSQL Source Code  git master
sinvaladt.h File Reference
#include "storage/lock.h"
#include "storage/sinval.h"
Include dependency graph for sinvaladt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROC * BackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin, int *nsubxid, bool *overflowed)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Function Documentation

◆ BackendIdGetProc()

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 385 of file sinvaladt.c.

386 {
387  PGPROC *result = NULL;
388  SISeg *segP = shmInvalBuffer;
389 
390  /* Need to lock out additions/removals of backends */
391  LWLockAcquire(SInvalWriteLock, LW_SHARED);
392 
393  if (backendID > 0 && backendID <= segP->lastBackend)
394  {
395  ProcState *stateP = &segP->procState[backendID - 1];
396 
397  result = stateP->proc;
398  }
399 
400  LWLockRelease(SInvalWriteLock);
401 
402  return result;
403 }
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
PGPROC
Definition: proc.h:162
PGPROC * proc
Definition: sinvaladt.c:142
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by checkTempNamespaceStatus(), LogRecoveryConflict(), VirtualXactLock(), WaitForLockersMultiple(), and WaitForOlderSnapshots().

◆ BackendIdGetTransactionIds()

void BackendIdGetTransactionIds ( int  backendID,
TransactionId *  xid,
TransactionId *  xmin,
int *  nsubxid,
bool *  overflowed 
)

Definition at line 412 of file sinvaladt.c.

414 {
415  SISeg *segP = shmInvalBuffer;
416 
417  *xid = InvalidTransactionId;
418  *xmin = InvalidTransactionId;
419  *nsubxid = 0;
420  *overflowed = false;
421 
422  /* Need to lock out additions/removals of backends */
423  LWLockAcquire(SInvalWriteLock, LW_SHARED);
424 
425  if (backendID > 0 && backendID <= segP->lastBackend)
426  {
427  ProcState *stateP = &segP->procState[backendID - 1];
428  PGPROC *proc = stateP->proc;
429 
430  if (proc != NULL)
431  {
432  *xid = proc->xid;
433  *xmin = proc->xmin;
434  *nsubxid = proc->subxidStatus.count;
435  *overflowed = proc->subxidStatus.overflowed;
436  }
437  }
438 
439  LWLockRelease(SInvalWriteLock);
440 }
TransactionId xmin
Definition: proc.h:178
XidCacheStatus subxidStatus
Definition: proc.h:254
TransactionId xid
Definition: proc.h:173
bool overflowed
Definition: proc.h:45
uint8 count
Definition: proc.h:43
#define InvalidTransactionId
Definition: transam.h:31

References XidCacheStatus::count, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), XidCacheStatus::overflowed, ProcState::proc, SISeg::procState, shmInvalBuffer, PGPROC::subxidStatus, PGPROC::xid, and PGPROC::xmin.

Referenced by pgstat_read_current_status().

◆ CreateSharedInvalidationState()

void CreateSharedInvalidationState ( void  )

Definition at line 227 of file sinvaladt.c.

228 {
229  int i;
230  bool found;
231 
232  /* Allocate space in shared memory */
233  shmInvalBuffer = (SISeg *)
234  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
235  if (found)
236  return;
237 
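238  /* Clear message counters, save size of procState array, init spinlock */
239  shmInvalBuffer->minMsgNum = 0;
240  shmInvalBuffer->maxMsgNum = 0;
241  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
242  shmInvalBuffer->lastBackend = 0;
243  shmInvalBuffer->maxBackends = MaxBackends;
244  SpinLockInit(&shmInvalBuffer->msgnumLock);
245 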
246  /* The buffer[] array is initially all unused, so we need not fill it */
247 
248  /* Mark all backends inactive, and initialize nextLXID */
249  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
250  {
251  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
252  shmInvalBuffer->procState[i].proc = NULL;
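253  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
254  shmInvalBuffer->procState[i].resetState = false;
255  shmInvalBuffer->procState[i].signaled = false;
256  shmInvalBuffer->procState[i].hasMessages = false;
257  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
258  }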
259 }
int MaxBackends
Definition: globals.c:140
int i
Definition: isn.c:73
#define InvalidLocalTransactionId
Definition: lock.h:65
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define SpinLockInit(lock)
Definition: spin.h:60
int nextMsgNum
Definition: sinvaladt.c:144
bool signaled
Definition: sinvaladt.c:146
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
pid_t procPid
Definition: sinvaladt.c:141
bool hasMessages
Definition: sinvaladt.c:147
bool resetState
Definition: sinvaladt.c:145
int minMsgNum
Definition: sinvaladt.c:172
int maxMsgNum
Definition: sinvaladt.c:173
int lastBackend
Definition: sinvaladt.c:175
slock_t msgnumLock
Definition: sinvaladt.c:178
int nextThreshold
Definition: sinvaladt.c:174
int maxBackends
Definition: sinvaladt.c:176

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), shmInvalBuffer, ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 780 of file sinvaladt.c.

781 {
782  LocalTransactionId result;
783 
784  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
785  do
786  {
787  result = nextLocalTransactionId++;
788  } while (!LocalTransactionIdIsValid(result));
789 
790  return result;
791 }
uint32 LocalTransactionId
Definition: c.h:638
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:66
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 266 of file sinvaladt.c.

267 {
268  int index;
269  ProcState *stateP = NULL;
270  SISeg *segP = shmInvalBuffer;
271 
272  /*
273  * This can run in parallel with read operations, but not with write
274  * operations, since SIInsertDataEntries relies on lastBackend to set
275  * hasMessages appropriately.
276  */
277  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
278 
279  /* Look for a free entry in the procState array */
280  for (index = 0; index < segP->lastBackend; index++)
281  {
282  if (segP->procState[index].procPid == 0) /* inactive slot? */
283  {
284  stateP = &segP->procState[index];
285  break;
286  }
287  }
288 
289  if (stateP == NULL)
290  {
291  if (segP->lastBackend < segP->maxBackends)
292  {
293  stateP = &segP->procState[segP->lastBackend];
294  Assert(stateP->procPid == 0);
295  segP->lastBackend++;
296  }
297  else
298  {
299  /*
300  * out of procState slots: MaxBackends exceeded -- report normally
301  */
302  MyBackendId = InvalidBackendId;
303  LWLockRelease(SInvalWriteLock);
304  ereport(FATAL,
305  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
306  errmsg("sorry, too many clients already")));
307  }
308  }
309 
310  MyBackendId = (stateP - &segP->procState[0]) + 1;
311 
312  /* Advertise assigned backend ID in MyProc */
313  MyProc->backendId = MyBackendId;
314 
315  /* Fetch next local transaction ID into local memory */
316  nextLocalTransactionId = stateP->nextLXID;
317 
318  /* mark myself active, with all extant messages already read */
319  stateP->procPid = MyProcPid;
320  stateP->proc = MyProc;
321  stateP->nextMsgNum = segP->maxMsgNum;
322  stateP->resetState = false;
323  stateP->signaled = false;
324  stateP->hasMessages = false;
325  stateP->sendOnly = sendOnly;
326 
327  LWLockRelease(SInvalWriteLock);
328 
329  /* register exit routine to mark my entry inactive at exit */
330  on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
331 
332  elog(DEBUG4, "my backend ID is %d", MyBackendId);
333 }
#define InvalidBackendId
Definition: backendid.h:23
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:149
#define DEBUG4
Definition: elog.h:27
int MyProcPid
Definition: globals.c:44
BackendId MyBackendId
Definition: globals.c:85
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
Assert(fmt[strlen(fmt) - 1] !='\n')
@ LW_EXCLUSIVE
Definition: lwlock.h:116
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:344
PGPROC * MyProc
Definition: proc.c:66
BackendId backendId
Definition: proc.h:197
bool sendOnly
Definition: sinvaladt.c:155
Definition: type.h:95

References Assert(), PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog(), ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum(), ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 654 of file sinvaladt.c.

655 {
656  SISeg *segP = shmInvalBuffer;
657  int min,
658  minsig,
659  lowbound,
660  numMsgs,
661  i;
662  ProcState *needSig = NULL;
663 
664  /* Lock out all writers and readers */
665  if (!callerHasWriteLock)
666  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
667  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
668 
669  /*
670  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
671  * furthest-back backend that needs signaling (if any), and reset any
672  * backends that are too far back. Note that because we ignore sendOnly
673  * backends here it is possible for them to keep sending messages without
674  * a problem even when they are the only active backend.
675  */
676  min = segP->maxMsgNum;
677  minsig = min - SIG_THRESHOLD;
678  lowbound = min - MAXNUMMESSAGES + minFree;
679 
680  for (i = 0; i < segP->lastBackend; i++)
681  {
682  ProcState *stateP = &segP->procState[i];
683  int n = stateP->nextMsgNum;
684 
685  /* Ignore if inactive or already in reset state */
686  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
687  continue;
688 
689  /*
690  * If we must free some space and this backend is preventing it, force
691  * him into reset state and then ignore until he catches up.
692  */
693  if (n < lowbound)
694  {
695  stateP->resetState = true;
696  /* no point in signaling him ... */
697  continue;
698  }
699 
700  /* Track the global minimum nextMsgNum */
701  if (n < min)
702  min = n;
703 
704  /* Also see who's furthest back of the unsignaled backends */
705  if (n < minsig && !stateP->signaled)
706  {
707  minsig = n;
708  needSig = stateP;
709  }
710  }
711  segP->minMsgNum = min;
712 
713  /*
714  * When minMsgNum gets really large, decrement all message counters so as
715  * to forestall overflow of the counters. This happens seldom enough that
716  * folding it into the previous loop would be a loser.
717  */
718  if (min >= MSGNUMWRAPAROUND)
719  {
720  segP->minMsgNum -= MSGNUMWRAPAROUND;
721  segP->maxMsgNum -= MSGNUMWRAPAROUND;
722  for (i = 0; i < segP->lastBackend; i++)
723  {
724  /* we don't bother skipping inactive entries here */
725  segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
726  }
727  }
728 
729  /*
730  * Determine how many messages are still in the queue, and set the
731  * threshold at which we should repeat SICleanupQueue().
732  */
733  numMsgs = segP->maxMsgNum - segP->minMsgNum;
734  if (numMsgs < CLEANUP_MIN)
735  segP->nextThreshold = CLEANUP_MIN;
736  else
737  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
738 
739  /*
740  * Lastly, signal anyone who needs a catchup interrupt. Since
741  * SendProcSignal() might not be fast, we don't want to hold locks while
742  * executing it.
743  */
744  if (needSig)
745  {
746  pid_t his_pid = needSig->procPid;
747  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
748 
749  needSig->signaled = true;
750  LWLockRelease(SInvalReadLock);
751  LWLockRelease(SInvalWriteLock);
752  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
753  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
754  if (callerHasWriteLock)
755  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
756  }
757  else
758  {
759  LWLockRelease(SInvalReadLock);
760  if (!callerHasWriteLock)
761  LWLockRelease(SInvalWriteLock);
762  }
763 }
int BackendId
Definition: backendid.h:21
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_CATCHUP_INTERRUPT
Definition: procsignal.h:32
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
#define SIG_THRESHOLD
Definition: sinvaladt.c:134

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog(), i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage *  data,
int  datasize 
)

Definition at line 550 of file sinvaladt.c.

551 {
552  SISeg *segP;
553  ProcState *stateP;
554  int max;
555  int n;
556 
557  segP = shmInvalBuffer;
558  stateP = &segP->procState[MyBackendId - 1];
559 
560  /*
561  * Before starting to take locks, do a quick, unlocked test to see whether
562  * there can possibly be anything to read. On a multiprocessor system,
563  * it's possible that this load could migrate backwards and occur before
564  * we actually enter this function, so we might miss a sinval message that
565  * was just added by some other processor. But they can't migrate
566  * backwards over a preceding lock acquisition, so it should be OK. If we
567  * haven't acquired a lock preventing against further relevant
568  * invalidations, any such occurrence is not much different than if the
569  * invalidation had arrived slightly later in the first place.
570  */
571  if (!stateP->hasMessages)
572  return 0;
573 
574  LWLockAcquire(SInvalReadLock, LW_SHARED);
575 
576  /*
577  * We must reset hasMessages before determining how many messages we're
578  * going to read. That way, if new messages arrive after we have
579  * determined how many we're reading, the flag will get reset and we'll
580  * notice those messages part-way through.
581  *
582  * Note that, if we don't end up reading all of the messages, we had
583  * better be certain to reset this flag before exiting!
584  */
585  stateP->hasMessages = false;
586 
587  /* Fetch current value of maxMsgNum using spinlock */
588  SpinLockAcquire(&segP->msgnumLock);
589  max = segP->maxMsgNum;
590  SpinLockRelease(&segP->msgnumLock);
591 
592  if (stateP->resetState)
593  {
594  /*
595  * Force reset. We can say we have dealt with any messages added
596  * since the reset, as well; and that means we should clear the
597  * signaled flag, too.
598  */
599  stateP->nextMsgNum = max;
600  stateP->resetState = false;
601  stateP->signaled = false;
602  LWLockRelease(SInvalReadLock);
603  return -1;
604  }
605 
606  /*
607  * Retrieve messages and advance backend's counter, until data array is
608  * full or there are no more messages.
609  *
610  * There may be other backends that haven't read the message(s), so we
611  * cannot delete them here. SICleanupQueue() will eventually remove them
612  * from the queue.
613  */
614  n = 0;
615  while (n < datasize && stateP->nextMsgNum < max)
616  {
617  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
618  stateP->nextMsgNum++;
619  }
620 
621  /*
622  * If we have caught up completely, reset our "signaled" flag so that
623  * we'll get another signal if we fall behind again.
624  *
625  * If we haven't caught up completely, reset the hasMessages flag so that
626  * we see the remaining messages next time.
627  */
628  if (stateP->nextMsgNum >= max)
629  stateP->signaled = false;
630  else
631  stateP->hasMessages = true;
632 
633  LWLockRelease(SInvalReadLock);
634  return n;
635 }
const void * data
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183

References SISeg::buffer, data, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage *  data,
int  n 
)

Definition at line 447 of file sinvaladt.c.

448 {
449  SISeg *segP = shmInvalBuffer;
450 
451  /*
452  * N can be arbitrarily large. We divide the work into groups of no more
453  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
454  * an unreasonably long time. (This is not so much because we care about
455  * letting in other writers, as that some just-caught-up backend might be
456  * trying to do SICleanupQueue to pass on its signal, and we don't want it
457  * to have to wait a long time.) Also, we need to consider calling
458  * SICleanupQueue every so often.
459  */
460  while (n > 0)
461  {
462  int nthistime = Min(n, WRITE_QUANTUM);
463  int numMsgs;
464  int max;
465  int i;
466 
467  n -= nthistime;
468 
469  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
470 
471  /*
472  * If the buffer is full, we *must* acquire some space. Clean the
473  * queue and reset anyone who is preventing space from being freed.
474  * Otherwise, clean the queue only when it's exceeded the next
475  * fullness threshold. We have to loop and recheck the buffer state
476  * after any call of SICleanupQueue.
477  */
478  for (;;)
479  {
480  numMsgs = segP->maxMsgNum - segP->minMsgNum;
481  if (numMsgs + nthistime > MAXNUMMESSAGES ||
482  numMsgs >= segP->nextThreshold)
483  SICleanupQueue(true, nthistime);
484  else
485  break;
486  }
487 
488  /*
489  * Insert new message(s) into proper slot of circular buffer
490  */
491  max = segP->maxMsgNum;
492  while (nthistime-- > 0)
493  {
494  segP->buffer[max % MAXNUMMESSAGES] = *data++;
495  max++;
496  }
497 
498  /* Update current value of maxMsgNum using spinlock */
499  SpinLockAcquire(&segP->msgnumLock);
500  segP->maxMsgNum = max;
501  SpinLockRelease(&segP->msgnumLock);
502 
503  /*
504  * Now that the maxMsgNum change is globally visible, we give everyone
505  * a swift kick to make sure they read the newly added messages.
506  * Releasing SInvalWriteLock will enforce a full memory barrier, so
507  * these (unlocked) changes will be committed to memory before we exit
508  * the function.
509  */
510  for (i = 0; i < segP->lastBackend; i++)
511  {
512  ProcState *stateP = &segP->procState[i];
513 
514  stateP->hasMessages = true;
515  }
516 
517  LWLockRelease(SInvalWriteLock);
518  }
519 }
#define Min(x, y)
Definition: c.h:988
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:654

References SISeg::buffer, data, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

◆ SInvalShmemSize()

Size SInvalShmemSize ( void  )

Definition at line 203 of file sinvaladt.c.

204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208 
209  /*
210  * In Hot Standby mode, the startup process requests a procState array
211  * slot using InitRecoveryTransactionEnvironment(). Even though
212  * MaxBackends doesn't account for the startup process, it is guaranteed
213  * to get a free slot. This is because the autovacuum launcher and worker
214  * processes, which are included in MaxBackends, are not started in Hot
215  * Standby mode.
216  */
217  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
218 
219  return size;
220 }
size_t Size
Definition: c.h:589
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), MaxBackends, and mul_size().

Referenced by CalculateShmemSize(), and CreateSharedInvalidationState().