PostgreSQL Source Code  git master
sinvaladt.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
 
struct  SISeg
 

Macros

#define MAXNUMMESSAGES   4096
 
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
 
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
 
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
 
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
 
#define WRITE_QUANTUM   64
 

Typedefs

typedef struct ProcState ProcState
 
typedef struct SISeg SISeg
 

Functions

static void CleanupInvalidationState (int status, Datum arg)
 
Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROC * BackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Variables

static SISeg * shmInvalBuffer
 
static LocalTransactionId nextLocalTransactionId
 

Macro Definition Documentation

◆ CLEANUP_MIN

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 132 of file sinvaladt.c.

◆ CLEANUP_QUANTUM

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 133 of file sinvaladt.c.

◆ MAXNUMMESSAGES

#define MAXNUMMESSAGES   4096

Definition at line 130 of file sinvaladt.c.

◆ MSGNUMWRAPAROUND

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 131 of file sinvaladt.c.

◆ SIG_THRESHOLD

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 134 of file sinvaladt.c.

◆ WRITE_QUANTUM

#define WRITE_QUANTUM   64

Definition at line 135 of file sinvaladt.c.

Typedef Documentation

◆ ProcState

typedef struct ProcState ProcState

◆ SISeg

typedef struct SISeg SISeg

Function Documentation

◆ BackendIdGetProc()

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 385 of file sinvaladt.c.

386 {
387  PGPROC *result = NULL;
388  SISeg *segP = shmInvalBuffer;
389 
390  /* Need to lock out additions/removals of backends */
391  LWLockAcquire(SInvalWriteLock, LW_SHARED);
392 
393  if (backendID > 0 && backendID <= segP->lastBackend)
394  {
395  ProcState *stateP = &segP->procState[backendID - 1];
396 
397  result = stateP->proc;
398  }
399 
400  LWLockRelease(SInvalWriteLock);
401 
402  return result;
403 }
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
@ LW_SHARED
Definition: lwlock.h:105
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
Definition: proc.h:160
PGPROC * proc
Definition: sinvaladt.c:142
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by checkTempNamespaceStatus(), LogRecoveryConflict(), VirtualXactLock(), WaitForLockersMultiple(), and WaitForOlderSnapshots().

◆ BackendIdGetTransactionIds()

void BackendIdGetTransactionIds ( int  backendID,
TransactionId * xid,
TransactionId * xmin 
)

Definition at line 412 of file sinvaladt.c.

413 {
414  SISeg *segP = shmInvalBuffer;
415 
416  *xid = InvalidTransactionId;
417  *xmin = InvalidTransactionId;
418 
419  /* Need to lock out additions/removals of backends */
420  LWLockAcquire(SInvalWriteLock, LW_SHARED);
421 
422  if (backendID > 0 && backendID <= segP->lastBackend)
423  {
424  ProcState *stateP = &segP->procState[backendID - 1];
425  PGPROC *proc = stateP->proc;
426 
427  if (proc != NULL)
428  {
429  *xid = proc->xid;
430  *xmin = proc->xmin;
431  }
432  }
433 
434  LWLockRelease(SInvalWriteLock);
435 }
TransactionId xmin
Definition: proc.h:176
TransactionId xid
Definition: proc.h:171
#define InvalidTransactionId
Definition: transam.h:31

References InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, shmInvalBuffer, PGPROC::xid, and PGPROC::xmin.

Referenced by pgstat_read_current_status().

◆ CleanupInvalidationState()

static void CleanupInvalidationState ( int  status,
Datum  arg 
)
static

Definition at line 344 of file sinvaladt.c.

345 {
346  SISeg *segP = (SISeg *) DatumGetPointer(arg);
347  ProcState *stateP;
348  int i;
349 
350  Assert(PointerIsValid(segP));
351 
352  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
353 
354  stateP = &segP->procState[MyBackendId - 1];
355 
356  /* Update next local transaction ID for next holder of this backendID */
357  stateP->nextLXID = nextLocalTransactionId;
358 
359  /* Mark myself inactive */
360  stateP->procPid = 0;
361  stateP->proc = NULL;
362  stateP->nextMsgNum = 0;
363  stateP->resetState = false;
364  stateP->signaled = false;
365 
366  /* Recompute index of last active backend */
367  for (i = segP->lastBackend; i > 0; i--)
368  {
369  if (segP->procState[i - 1].procPid != 0)
370  break;
371  }
372  segP->lastBackend = i;
373 
374  LWLockRelease(SInvalWriteLock);
375 }
#define PointerIsValid(pointer)
Definition: c.h:698
BackendId MyBackendId
Definition: globals.c:85
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
@ LW_EXCLUSIVE
Definition: lwlock.h:104
void * arg
#define DatumGetPointer(X)
Definition: postgres.h:593
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
int nextMsgNum
Definition: sinvaladt.c:144
bool signaled
Definition: sinvaladt.c:146
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
pid_t procPid
Definition: sinvaladt.c:141
bool resetState
Definition: sinvaladt.c:145
int lastBackend
Definition: sinvaladt.c:175

References arg, Assert(), DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, and ProcState::signaled.

Referenced by SharedInvalBackendInit().

◆ CreateSharedInvalidationState()

void CreateSharedInvalidationState ( void  )

Definition at line 227 of file sinvaladt.c.

228 {
229  int i;
230  bool found;
231 
232  /* Allocate space in shared memory */
233  shmInvalBuffer = (SISeg *)
234  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
235  if (found)
236  return;
237 
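238  /* Clear message counters, save size of procState array, init spinlock */
239  shmInvalBuffer->minMsgNum = 0;
240  shmInvalBuffer->maxMsgNum = 0;
241  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
242  shmInvalBuffer->lastBackend = 0;
243  shmInvalBuffer->maxBackends = MaxBackends;
244  SpinLockInit(&shmInvalBuffer->msgnumLock);
245 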
246  /* The buffer[] array is initially all unused, so we need not fill it */
247 
248  /* Mark all backends inactive, and initialize nextLXID */
249  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
250  {
251  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
252  shmInvalBuffer->procState[i].proc = NULL;
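253  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
254  shmInvalBuffer->procState[i].resetState = false;
255  shmInvalBuffer->procState[i].signaled = false;
256  shmInvalBuffer->procState[i].hasMessages = false;
257  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
258  }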
259 }
int MaxBackends
Definition: globals.c:140
#define InvalidLocalTransactionId
Definition: lock.h:70
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define SpinLockInit(lock)
Definition: spin.h:60
bool hasMessages
Definition: sinvaladt.c:147
int minMsgNum
Definition: sinvaladt.c:172
int maxMsgNum
Definition: sinvaladt.c:173
slock_t msgnumLock
Definition: sinvaladt.c:178
int nextThreshold
Definition: sinvaladt.c:174
int maxBackends
Definition: sinvaladt.c:176

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), shmInvalBuffer, ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 775 of file sinvaladt.c.

776 {
777  LocalTransactionId result;
778 
779  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
780  do
781  {
782  result = nextLocalTransactionId++;
783  } while (!LocalTransactionIdIsValid(result));
784 
785  return result;
786 }
uint32 LocalTransactionId
Definition: c.h:589
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:71

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 266 of file sinvaladt.c.

267 {
268  int index;
269  ProcState *stateP = NULL;
270  SISeg *segP = shmInvalBuffer;
271 
272  /*
273  * This can run in parallel with read operations, but not with write
274  * operations, since SIInsertDataEntries relies on lastBackend to set
275  * hasMessages appropriately.
276  */
277  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
278 
279  /* Look for a free entry in the procState array */
280  for (index = 0; index < segP->lastBackend; index++)
281  {
282  if (segP->procState[index].procPid == 0) /* inactive slot? */
283  {
284  stateP = &segP->procState[index];
285  break;
286  }
287  }
288 
289  if (stateP == NULL)
290  {
291  if (segP->lastBackend < segP->maxBackends)
292  {
293  stateP = &segP->procState[segP->lastBackend];
294  Assert(stateP->procPid == 0);
295  segP->lastBackend++;
296  }
297  else
298  {
299  /*
300  * out of procState slots: MaxBackends exceeded -- report normally
301  */
302  MyBackendId = InvalidBackendId;
303  LWLockRelease(SInvalWriteLock);
304  ereport(FATAL,
305  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
306  errmsg("sorry, too many clients already")));
307  }
308  }
309 
310  MyBackendId = (stateP - &segP->procState[0]) + 1;
311 
312  /* Advertise assigned backend ID in MyProc */
313  MyProc->backendId = MyBackendId;
314 
315  /* Fetch next local transaction ID into local memory */
316  nextLocalTransactionId = stateP->nextLXID;
317 
318  /* mark myself active, with all extant messages already read */
319  stateP->procPid = MyProcPid;
320  stateP->proc = MyProc;
321  stateP->nextMsgNum = segP->maxMsgNum;
322  stateP->resetState = false;
323  stateP->signaled = false;
324  stateP->hasMessages = false;
325  stateP->sendOnly = sendOnly;
326 
327  LWLockRelease(SInvalWriteLock);
328 
329  /* register exit routine to mark my entry inactive at exit */
330  on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
331 
332  elog(DEBUG4, "my backend ID is %d", MyBackendId);
333 }
#define InvalidBackendId
Definition: backendid.h:23
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define FATAL
Definition: elog.h:35
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
#define DEBUG4
Definition: elog.h:21
int MyProcPid
Definition: globals.c:44
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define PointerGetDatum(X)
Definition: postgres.h:600
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:344
PGPROC * MyProc
Definition: proc.c:68
BackendId backendId
Definition: proc.h:191
bool sendOnly
Definition: sinvaladt.c:155
Definition: type.h:90

References Assert(), PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 649 of file sinvaladt.c.

650 {
651  SISeg *segP = shmInvalBuffer;
652  int min,
653  minsig,
654  lowbound,
655  numMsgs,
656  i;
657  ProcState *needSig = NULL;
658 
659  /* Lock out all writers and readers */
660  if (!callerHasWriteLock)
661  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
662  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
663 
664  /*
665  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
666  * furthest-back backend that needs signaling (if any), and reset any
667  * backends that are too far back. Note that because we ignore sendOnly
668  * backends here it is possible for them to keep sending messages without
669  * a problem even when they are the only active backend.
670  */
671  min = segP->maxMsgNum;
672  minsig = min - SIG_THRESHOLD;
673  lowbound = min - MAXNUMMESSAGES + minFree;
674 
675  for (i = 0; i < segP->lastBackend; i++)
676  {
677  ProcState *stateP = &segP->procState[i];
678  int n = stateP->nextMsgNum;
679 
680  /* Ignore if inactive or already in reset state */
681  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
682  continue;
683 
684  /*
685  * If we must free some space and this backend is preventing it, force
686  * him into reset state and then ignore until he catches up.
687  */
688  if (n < lowbound)
689  {
690  stateP->resetState = true;
691  /* no point in signaling him ... */
692  continue;
693  }
694 
695  /* Track the global minimum nextMsgNum */
696  if (n < min)
697  min = n;
698 
699  /* Also see who's furthest back of the unsignaled backends */
700  if (n < minsig && !stateP->signaled)
701  {
702  minsig = n;
703  needSig = stateP;
704  }
705  }
706  segP->minMsgNum = min;
707 
708  /*
709  * When minMsgNum gets really large, decrement all message counters so as
710  * to forestall overflow of the counters. This happens seldom enough that
711  * folding it into the previous loop would be a loser.
712  */
713  if (min >= MSGNUMWRAPAROUND)
714  {
715  segP->minMsgNum -= MSGNUMWRAPAROUND;
716  segP->maxMsgNum -= MSGNUMWRAPAROUND;
717  for (i = 0; i < segP->lastBackend; i++)
718  {
719  /* we don't bother skipping inactive entries here */
720  segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
721  }
722  }
723 
724  /*
725  * Determine how many messages are still in the queue, and set the
726  * threshold at which we should repeat SICleanupQueue().
727  */
728  numMsgs = segP->maxMsgNum - segP->minMsgNum;
729  if (numMsgs < CLEANUP_MIN)
730  segP->nextThreshold = CLEANUP_MIN;
731  else
732  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
733 
734  /*
735  * Lastly, signal anyone who needs a catchup interrupt. Since
736  * SendProcSignal() might not be fast, we don't want to hold locks while
737  * executing it.
738  */
739  if (needSig)
740  {
741  pid_t his_pid = needSig->procPid;
742  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
743 
744  needSig->signaled = true;
745  LWLockRelease(SInvalReadLock);
746  LWLockRelease(SInvalWriteLock);
747  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
748  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
749  if (callerHasWriteLock)
750  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
751  }
752  else
753  {
754  LWLockRelease(SInvalReadLock);
755  if (!callerHasWriteLock)
756  LWLockRelease(SInvalWriteLock);
757  }
758 }
int BackendId
Definition: backendid.h:21
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
@ PROCSIG_CATCHUP_INTERRUPT
Definition: procsignal.h:32
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
#define SIG_THRESHOLD
Definition: sinvaladt.c:134

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage * data,
int  datasize 
)

Definition at line 545 of file sinvaladt.c.

546 {
547  SISeg *segP;
548  ProcState *stateP;
549  int max;
550  int n;
551 
552  segP = shmInvalBuffer;
553  stateP = &segP->procState[MyBackendId - 1];
554 
555  /*
556  * Before starting to take locks, do a quick, unlocked test to see whether
557  * there can possibly be anything to read. On a multiprocessor system,
558  * it's possible that this load could migrate backwards and occur before
559  * we actually enter this function, so we might miss a sinval message that
560  * was just added by some other processor. But they can't migrate
561  * backwards over a preceding lock acquisition, so it should be OK. If we
562  * haven't acquired a lock preventing against further relevant
563  * invalidations, any such occurrence is not much different than if the
564  * invalidation had arrived slightly later in the first place.
565  */
566  if (!stateP->hasMessages)
567  return 0;
568 
569  LWLockAcquire(SInvalReadLock, LW_SHARED);
570 
571  /*
572  * We must reset hasMessages before determining how many messages we're
573  * going to read. That way, if new messages arrive after we have
574  * determined how many we're reading, the flag will get reset and we'll
575  * notice those messages part-way through.
576  *
577  * Note that, if we don't end up reading all of the messages, we had
578  * better be certain to reset this flag before exiting!
579  */
580  stateP->hasMessages = false;
581 
582  /* Fetch current value of maxMsgNum using spinlock */
583  SpinLockAcquire(&segP->msgnumLock);
584  max = segP->maxMsgNum;
585  SpinLockRelease(&segP->msgnumLock);
586 
587  if (stateP->resetState)
588  {
589  /*
590  * Force reset. We can say we have dealt with any messages added
591  * since the reset, as well; and that means we should clear the
592  * signaled flag, too.
593  */
594  stateP->nextMsgNum = max;
595  stateP->resetState = false;
596  stateP->signaled = false;
597  LWLockRelease(SInvalReadLock);
598  return -1;
599  }
600 
601  /*
602  * Retrieve messages and advance backend's counter, until data array is
603  * full or there are no more messages.
604  *
605  * There may be other backends that haven't read the message(s), so we
606  * cannot delete them here. SICleanupQueue() will eventually remove them
607  * from the queue.
608  */
609  n = 0;
610  while (n < datasize && stateP->nextMsgNum < max)
611  {
612  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
613  stateP->nextMsgNum++;
614  }
615 
616  /*
617  * If we have caught up completely, reset our "signaled" flag so that
618  * we'll get another signal if we fall behind again.
619  *
620  * If we haven't caught up completely, reset the hasMessages flag so that
621  * we see the remaining messages next time.
622  */
623  if (stateP->nextMsgNum >= max)
624  stateP->signaled = false;
625  else
626  stateP->hasMessages = true;
627 
628  LWLockRelease(SInvalReadLock);
629  return n;
630 }
const void * data
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183

References SISeg::buffer, data, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage * data,
int  n 
)

Definition at line 442 of file sinvaladt.c.

443 {
444  SISeg *segP = shmInvalBuffer;
445 
446  /*
447  * N can be arbitrarily large. We divide the work into groups of no more
448  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
449  * an unreasonably long time. (This is not so much because we care about
450  * letting in other writers, as that some just-caught-up backend might be
451  * trying to do SICleanupQueue to pass on its signal, and we don't want it
452  * to have to wait a long time.) Also, we need to consider calling
453  * SICleanupQueue every so often.
454  */
455  while (n > 0)
456  {
457  int nthistime = Min(n, WRITE_QUANTUM);
458  int numMsgs;
459  int max;
460  int i;
461 
462  n -= nthistime;
463 
464  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
465 
466  /*
467  * If the buffer is full, we *must* acquire some space. Clean the
468  * queue and reset anyone who is preventing space from being freed.
469  * Otherwise, clean the queue only when it's exceeded the next
470  * fullness threshold. We have to loop and recheck the buffer state
471  * after any call of SICleanupQueue.
472  */
473  for (;;)
474  {
475  numMsgs = segP->maxMsgNum - segP->minMsgNum;
476  if (numMsgs + nthistime > MAXNUMMESSAGES ||
477  numMsgs >= segP->nextThreshold)
478  SICleanupQueue(true, nthistime);
479  else
480  break;
481  }
482 
483  /*
484  * Insert new message(s) into proper slot of circular buffer
485  */
486  max = segP->maxMsgNum;
487  while (nthistime-- > 0)
488  {
489  segP->buffer[max % MAXNUMMESSAGES] = *data++;
490  max++;
491  }
492 
493  /* Update current value of maxMsgNum using spinlock */
494  SpinLockAcquire(&segP->msgnumLock);
495  segP->maxMsgNum = max;
496  SpinLockRelease(&segP->msgnumLock);
497 
498  /*
499  * Now that the maxMsgNum change is globally visible, we give everyone
500  * a swift kick to make sure they read the newly added messages.
501  * Releasing SInvalWriteLock will enforce a full memory barrier, so
502  * these (unlocked) changes will be committed to memory before we exit
503  * the function.
504  */
505  for (i = 0; i < segP->lastBackend; i++)
506  {
507  ProcState *stateP = &segP->procState[i];
508 
509  stateP->hasMessages = true;
510  }
511 
512  LWLockRelease(SInvalWriteLock);
513  }
514 }
#define Min(x, y)
Definition: c.h:986
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:649

References SISeg::buffer, data, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

◆ SInvalShmemSize()

Size SInvalShmemSize ( void  )

Definition at line 203 of file sinvaladt.c.

204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208 
209  /*
210  * In Hot Standby mode, the startup process requests a procState array
211  * slot using InitRecoveryTransactionEnvironment(). Even though
212  * MaxBackends doesn't account for the startup process, it is guaranteed
213  * to get a free slot. This is because the autovacuum launcher and worker
214  * processes, which are included in MaxBackends, are not started in Hot
215  * Standby mode.
216  */
217  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
218 
219  return size;
220 }
#define offsetof(type, field)
Definition: c.h:727
size_t Size
Definition: c.h:540
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CalculateShmemSize(), and CreateSharedInvalidationState().

Variable Documentation

◆ nextLocalTransactionId

LocalTransactionId nextLocalTransactionId
static

◆ shmInvalBuffer