PostgreSQL Source Code  git master
sinvaladt.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
 
struct  SISeg
 

Macros

#define MAXNUMMESSAGES   4096
 
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
 
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
 
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
 
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
 
#define WRITE_QUANTUM   64
 

Typedefs

typedef struct ProcState ProcState
 
typedef struct SISeg SISeg
 

Functions

static void CleanupInvalidationState (int status, Datum arg)
 
Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROC * BackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin, int *nsubxid, bool *overflowed)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Variables

static SISeg * shmInvalBuffer
 
static LocalTransactionId nextLocalTransactionId
 

Macro Definition Documentation

◆ CLEANUP_MIN

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 132 of file sinvaladt.c.

◆ CLEANUP_QUANTUM

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 133 of file sinvaladt.c.

◆ MAXNUMMESSAGES

#define MAXNUMMESSAGES   4096

Definition at line 130 of file sinvaladt.c.

◆ MSGNUMWRAPAROUND

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 131 of file sinvaladt.c.

◆ SIG_THRESHOLD

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 134 of file sinvaladt.c.

◆ WRITE_QUANTUM

#define WRITE_QUANTUM   64

Definition at line 135 of file sinvaladt.c.

Typedef Documentation

◆ ProcState

typedef struct ProcState ProcState

◆ SISeg

typedef struct SISeg SISeg

Function Documentation

◆ BackendIdGetProc()

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 385 of file sinvaladt.c.

386 {
387  PGPROC *result = NULL;
388  SISeg *segP = shmInvalBuffer;
389 
390  /* Need to lock out additions/removals of backends */
391  LWLockAcquire(SInvalWriteLock, LW_SHARED);
392 
393  if (backendID > 0 && backendID <= segP->lastBackend)
394  {
395  ProcState *stateP = &segP->procState[backendID - 1];
396 
397  result = stateP->proc;
398  }
399 
400  LWLockRelease(SInvalWriteLock);
401 
402  return result;
403 }
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
Definition: proc.h:162
PGPROC * proc
Definition: sinvaladt.c:142
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by checkTempNamespaceStatus(), LogRecoveryConflict(), VirtualXactLock(), WaitForLockersMultiple(), and WaitForOlderSnapshots().

◆ BackendIdGetTransactionIds()

void BackendIdGetTransactionIds ( int  backendID,
TransactionId *  xid,
TransactionId *  xmin,
int *  nsubxid,
bool *  overflowed 
)

Definition at line 412 of file sinvaladt.c.

414 {
415  SISeg *segP = shmInvalBuffer;
416 
417  *xid = InvalidTransactionId;
418  *xmin = InvalidTransactionId;
419  *nsubxid = 0;
420  *overflowed = false;
421 
422  /* Need to lock out additions/removals of backends */
423  LWLockAcquire(SInvalWriteLock, LW_SHARED);
424 
425  if (backendID > 0 && backendID <= segP->lastBackend)
426  {
427  ProcState *stateP = &segP->procState[backendID - 1];
428  PGPROC *proc = stateP->proc;
429 
430  if (proc != NULL)
431  {
432  *xid = proc->xid;
433  *xmin = proc->xmin;
434  *nsubxid = proc->subxidStatus.count;
435  *overflowed = proc->subxidStatus.overflowed;
436  }
437  }
438 
439  LWLockRelease(SInvalWriteLock);
440 }
TransactionId xmin
Definition: proc.h:178
XidCacheStatus subxidStatus
Definition: proc.h:254
TransactionId xid
Definition: proc.h:173
bool overflowed
Definition: proc.h:45
uint8 count
Definition: proc.h:43
#define InvalidTransactionId
Definition: transam.h:31

References XidCacheStatus::count, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), XidCacheStatus::overflowed, ProcState::proc, SISeg::procState, shmInvalBuffer, PGPROC::subxidStatus, PGPROC::xid, and PGPROC::xmin.

Referenced by pgstat_read_current_status().

◆ CleanupInvalidationState()

static void CleanupInvalidationState ( int  status,
Datum  arg 
)
static

Definition at line 344 of file sinvaladt.c.

345 {
346  SISeg *segP = (SISeg *) DatumGetPointer(arg);
347  ProcState *stateP;
348  int i;
349 
350  Assert(PointerIsValid(segP));
351 
352  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
353 
354  stateP = &segP->procState[MyBackendId - 1];
355 
356  /* Update next local transaction ID for next holder of this backendID */
357  stateP->nextLXID = nextLocalTransactionId;
358 
359  /* Mark myself inactive */
360  stateP->procPid = 0;
361  stateP->proc = NULL;
362  stateP->nextMsgNum = 0;
363  stateP->resetState = false;
364  stateP->signaled = false;
365 
366  /* Recompute index of last active backend */
367  for (i = segP->lastBackend; i > 0; i--)
368  {
369  if (segP->procState[i - 1].procPid != 0)
370  break;
371  }
372  segP->lastBackend = i;
373 
374  LWLockRelease(SInvalWriteLock);
375 }
#define PointerIsValid(pointer)
Definition: c.h:747
BackendId MyBackendId
Definition: globals.c:85
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
int nextMsgNum
Definition: sinvaladt.c:144
bool signaled
Definition: sinvaladt.c:146
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
pid_t procPid
Definition: sinvaladt.c:141
bool resetState
Definition: sinvaladt.c:145
int lastBackend
Definition: sinvaladt.c:175

References arg, Assert(), DatumGetPointer(), i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, and ProcState::signaled.

Referenced by SharedInvalBackendInit().

◆ CreateSharedInvalidationState()

void CreateSharedInvalidationState ( void  )

Definition at line 227 of file sinvaladt.c.

228 {
229  int i;
230  bool found;
231 
232  /* Allocate space in shared memory */
233  shmInvalBuffer = (SISeg *)
234  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
235  if (found)
236  return;
237 
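238  /* Clear message counters, save size of procState array, init spinlock */
239  shmInvalBuffer->minMsgNum = 0;
240  shmInvalBuffer->maxMsgNum = 0;
241  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
242  shmInvalBuffer->lastBackend = 0;
243  shmInvalBuffer->maxBackends = MaxBackends;
244  SpinLockInit(&shmInvalBuffer->msgnumLock);
245 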
246  /* The buffer[] array is initially all unused, so we need not fill it */
247 
248  /* Mark all backends inactive, and initialize nextLXID */
249  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
250  {
251  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
252  shmInvalBuffer->procState[i].proc = NULL;
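253  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
254  shmInvalBuffer->procState[i].resetState = false;
255  shmInvalBuffer->procState[i].signaled = false;
256  shmInvalBuffer->procState[i].hasMessages = false;
257  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
258  }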
259 }
int MaxBackends
Definition: globals.c:140
#define InvalidLocalTransactionId
Definition: lock.h:65
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define SpinLockInit(lock)
Definition: spin.h:60
bool hasMessages
Definition: sinvaladt.c:147
int minMsgNum
Definition: sinvaladt.c:172
int maxMsgNum
Definition: sinvaladt.c:173
slock_t msgnumLock
Definition: sinvaladt.c:178
int nextThreshold
Definition: sinvaladt.c:174
int maxBackends
Definition: sinvaladt.c:176

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), shmInvalBuffer, ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 780 of file sinvaladt.c.

781 {
782  LocalTransactionId result;
783 
784  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
785  do
786  {
787  result = nextLocalTransactionId++;
788  } while (!LocalTransactionIdIsValid(result));
789 
790  return result;
791 }
uint32 LocalTransactionId
Definition: c.h:638
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:66

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 266 of file sinvaladt.c.

267 {
268  int index;
269  ProcState *stateP = NULL;
270  SISeg *segP = shmInvalBuffer;
271 
272  /*
273  * This can run in parallel with read operations, but not with write
274  * operations, since SIInsertDataEntries relies on lastBackend to set
275  * hasMessages appropriately.
276  */
277  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
278 
279  /* Look for a free entry in the procState array */
280  for (index = 0; index < segP->lastBackend; index++)
281  {
282  if (segP->procState[index].procPid == 0) /* inactive slot? */
283  {
284  stateP = &segP->procState[index];
285  break;
286  }
287  }
288 
289  if (stateP == NULL)
290  {
291  if (segP->lastBackend < segP->maxBackends)
292  {
293  stateP = &segP->procState[segP->lastBackend];
294  Assert(stateP->procPid == 0);
295  segP->lastBackend++;
296  }
297  else
298  {
299  /*
300  * out of procState slots: MaxBackends exceeded -- report normally
301  */
302  MyBackendId = InvalidBackendId;
303  LWLockRelease(SInvalWriteLock);
304  ereport(FATAL,
305  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
306  errmsg("sorry, too many clients already")));
307  }
308  }
309 
310  MyBackendId = (stateP - &segP->procState[0]) + 1;
311 
312  /* Advertise assigned backend ID in MyProc */
313  MyProc->backendId = MyBackendId;
314 
315  /* Fetch next local transaction ID into local memory */
316  nextLocalTransactionId = stateP->nextLXID;
317 
318  /* mark myself active, with all extant messages already read */
319  stateP->procPid = MyProcPid;
320  stateP->proc = MyProc;
321  stateP->nextMsgNum = segP->maxMsgNum;
322  stateP->resetState = false;
323  stateP->signaled = false;
324  stateP->hasMessages = false;
325  stateP->sendOnly = sendOnly;
326 
327  LWLockRelease(SInvalWriteLock);
328 
329  /* register exit routine to mark my entry inactive at exit */
330  on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
331 
332  elog(DEBUG4, "my backend ID is %d", MyBackendId);
333 }
#define InvalidBackendId
Definition: backendid.h:23
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:149
#define DEBUG4
Definition: elog.h:27
int MyProcPid
Definition: globals.c:44
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:344
PGPROC * MyProc
Definition: proc.c:66
BackendId backendId
Definition: proc.h:197
bool sendOnly
Definition: sinvaladt.c:155
Definition: type.h:95

References Assert(), PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog(), ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum(), ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 654 of file sinvaladt.c.

655 {
656  SISeg *segP = shmInvalBuffer;
657  int min,
658  minsig,
659  lowbound,
660  numMsgs,
661  i;
662  ProcState *needSig = NULL;
663 
664  /* Lock out all writers and readers */
665  if (!callerHasWriteLock)
666  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
667  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
668 
669  /*
670  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
671  * furthest-back backend that needs signaling (if any), and reset any
672  * backends that are too far back. Note that because we ignore sendOnly
673  * backends here it is possible for them to keep sending messages without
674  * a problem even when they are the only active backend.
675  */
676  min = segP->maxMsgNum;
677  minsig = min - SIG_THRESHOLD;
678  lowbound = min - MAXNUMMESSAGES + minFree;
679 
680  for (i = 0; i < segP->lastBackend; i++)
681  {
682  ProcState *stateP = &segP->procState[i];
683  int n = stateP->nextMsgNum;
684 
685  /* Ignore if inactive or already in reset state */
686  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
687  continue;
688 
689  /*
690  * If we must free some space and this backend is preventing it, force
691  * him into reset state and then ignore until he catches up.
692  */
693  if (n < lowbound)
694  {
695  stateP->resetState = true;
696  /* no point in signaling him ... */
697  continue;
698  }
699 
700  /* Track the global minimum nextMsgNum */
701  if (n < min)
702  min = n;
703 
704  /* Also see who's furthest back of the unsignaled backends */
705  if (n < minsig && !stateP->signaled)
706  {
707  minsig = n;
708  needSig = stateP;
709  }
710  }
711  segP->minMsgNum = min;
712 
713  /*
714  * When minMsgNum gets really large, decrement all message counters so as
715  * to forestall overflow of the counters. This happens seldom enough that
716  * folding it into the previous loop would be a loser.
717  */
718  if (min >= MSGNUMWRAPAROUND)
719  {
720  segP->minMsgNum -= MSGNUMWRAPAROUND;
721  segP->maxMsgNum -= MSGNUMWRAPAROUND;
722  for (i = 0; i < segP->lastBackend; i++)
723  {
724  /* we don't bother skipping inactive entries here */
725  segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
726  }
727  }
728 
729  /*
730  * Determine how many messages are still in the queue, and set the
731  * threshold at which we should repeat SICleanupQueue().
732  */
733  numMsgs = segP->maxMsgNum - segP->minMsgNum;
734  if (numMsgs < CLEANUP_MIN)
735  segP->nextThreshold = CLEANUP_MIN;
736  else
737  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
738 
739  /*
740  * Lastly, signal anyone who needs a catchup interrupt. Since
741  * SendProcSignal() might not be fast, we don't want to hold locks while
742  * executing it.
743  */
744  if (needSig)
745  {
746  pid_t his_pid = needSig->procPid;
747  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
748 
749  needSig->signaled = true;
750  LWLockRelease(SInvalReadLock);
751  LWLockRelease(SInvalWriteLock);
752  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
753  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
754  if (callerHasWriteLock)
755  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
756  }
757  else
758  {
759  LWLockRelease(SInvalReadLock);
760  if (!callerHasWriteLock)
761  LWLockRelease(SInvalWriteLock);
762  }
763 }
int BackendId
Definition: backendid.h:21
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_CATCHUP_INTERRUPT
Definition: procsignal.h:32
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
#define SIG_THRESHOLD
Definition: sinvaladt.c:134

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog(), i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage *  data,
int  datasize 
)

Definition at line 550 of file sinvaladt.c.

551 {
552  SISeg *segP;
553  ProcState *stateP;
554  int max;
555  int n;
556 
557  segP = shmInvalBuffer;
558  stateP = &segP->procState[MyBackendId - 1];
559 
560  /*
561  * Before starting to take locks, do a quick, unlocked test to see whether
562  * there can possibly be anything to read. On a multiprocessor system,
563  * it's possible that this load could migrate backwards and occur before
564  * we actually enter this function, so we might miss a sinval message that
565  * was just added by some other processor. But they can't migrate
566  * backwards over a preceding lock acquisition, so it should be OK. If we
567  * haven't acquired a lock preventing against further relevant
568  * invalidations, any such occurrence is not much different than if the
569  * invalidation had arrived slightly later in the first place.
570  */
571  if (!stateP->hasMessages)
572  return 0;
573 
574  LWLockAcquire(SInvalReadLock, LW_SHARED);
575 
576  /*
577  * We must reset hasMessages before determining how many messages we're
578  * going to read. That way, if new messages arrive after we have
579  * determined how many we're reading, the flag will get reset and we'll
580  * notice those messages part-way through.
581  *
582  * Note that, if we don't end up reading all of the messages, we had
583  * better be certain to reset this flag before exiting!
584  */
585  stateP->hasMessages = false;
586 
587  /* Fetch current value of maxMsgNum using spinlock */
588  SpinLockAcquire(&segP->msgnumLock);
589  max = segP->maxMsgNum;
590  SpinLockRelease(&segP->msgnumLock);
591 
592  if (stateP->resetState)
593  {
594  /*
595  * Force reset. We can say we have dealt with any messages added
596  * since the reset, as well; and that means we should clear the
597  * signaled flag, too.
598  */
599  stateP->nextMsgNum = max;
600  stateP->resetState = false;
601  stateP->signaled = false;
602  LWLockRelease(SInvalReadLock);
603  return -1;
604  }
605 
606  /*
607  * Retrieve messages and advance backend's counter, until data array is
608  * full or there are no more messages.
609  *
610  * There may be other backends that haven't read the message(s), so we
611  * cannot delete them here. SICleanupQueue() will eventually remove them
612  * from the queue.
613  */
614  n = 0;
615  while (n < datasize && stateP->nextMsgNum < max)
616  {
617  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
618  stateP->nextMsgNum++;
619  }
620 
621  /*
622  * If we have caught up completely, reset our "signaled" flag so that
623  * we'll get another signal if we fall behind again.
624  *
625  * If we haven't caught up completely, reset the hasMessages flag so that
626  * we see the remaining messages next time.
627  */
628  if (stateP->nextMsgNum >= max)
629  stateP->signaled = false;
630  else
631  stateP->hasMessages = true;
632 
633  LWLockRelease(SInvalReadLock);
634  return n;
635 }
const void * data
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183

References SISeg::buffer, data, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage *  data,
int  n 
)

Definition at line 447 of file sinvaladt.c.

448 {
449  SISeg *segP = shmInvalBuffer;
450 
451  /*
452  * N can be arbitrarily large. We divide the work into groups of no more
453  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
454  * an unreasonably long time. (This is not so much because we care about
455  * letting in other writers, as that some just-caught-up backend might be
456  * trying to do SICleanupQueue to pass on its signal, and we don't want it
457  * to have to wait a long time.) Also, we need to consider calling
458  * SICleanupQueue every so often.
459  */
460  while (n > 0)
461  {
462  int nthistime = Min(n, WRITE_QUANTUM);
463  int numMsgs;
464  int max;
465  int i;
466 
467  n -= nthistime;
468 
469  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
470 
471  /*
472  * If the buffer is full, we *must* acquire some space. Clean the
473  * queue and reset anyone who is preventing space from being freed.
474  * Otherwise, clean the queue only when it's exceeded the next
475  * fullness threshold. We have to loop and recheck the buffer state
476  * after any call of SICleanupQueue.
477  */
478  for (;;)
479  {
480  numMsgs = segP->maxMsgNum - segP->minMsgNum;
481  if (numMsgs + nthistime > MAXNUMMESSAGES ||
482  numMsgs >= segP->nextThreshold)
483  SICleanupQueue(true, nthistime);
484  else
485  break;
486  }
487 
488  /*
489  * Insert new message(s) into proper slot of circular buffer
490  */
491  max = segP->maxMsgNum;
492  while (nthistime-- > 0)
493  {
494  segP->buffer[max % MAXNUMMESSAGES] = *data++;
495  max++;
496  }
497 
498  /* Update current value of maxMsgNum using spinlock */
499  SpinLockAcquire(&segP->msgnumLock);
500  segP->maxMsgNum = max;
501  SpinLockRelease(&segP->msgnumLock);
502 
503  /*
504  * Now that the maxMsgNum change is globally visible, we give everyone
505  * a swift kick to make sure they read the newly added messages.
506  * Releasing SInvalWriteLock will enforce a full memory barrier, so
507  * these (unlocked) changes will be committed to memory before we exit
508  * the function.
509  */
510  for (i = 0; i < segP->lastBackend; i++)
511  {
512  ProcState *stateP = &segP->procState[i];
513 
514  stateP->hasMessages = true;
515  }
516 
517  LWLockRelease(SInvalWriteLock);
518  }
519 }
#define Min(x, y)
Definition: c.h:988
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:654

References SISeg::buffer, data, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

◆ SInvalShmemSize()

Size SInvalShmemSize ( void  )

Definition at line 203 of file sinvaladt.c.

204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208 
209  /*
210  * In Hot Standby mode, the startup process requests a procState array
211  * slot using InitRecoveryTransactionEnvironment(). Even though
212  * MaxBackends doesn't account for the startup process, it is guaranteed
213  * to get a free slot. This is because the autovacuum launcher and worker
214  * processes, which are included in MaxBackends, are not started in Hot
215  * Standby mode.
216  */
217  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
218 
219  return size;
220 }
size_t Size
Definition: c.h:589
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), MaxBackends, and mul_size().

Referenced by CalculateShmemSize(), and CreateSharedInvalidationState().

Variable Documentation

◆ nextLocalTransactionId

LocalTransactionId nextLocalTransactionId
static

◆ shmInvalBuffer