PostgreSQL Source Code  git master
sinvaladt.h File Reference
#include "storage/lock.h"
#include "storage/sinval.h"
Include dependency graph for sinvaladt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROCBackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Function Documentation

◆ BackendIdGetProc()

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 376 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by checkTempNamespaceStatus(), VirtualXactLock(), WaitForLockersMultiple(), and WaitForOlderSnapshots().

377 {
378  PGPROC *result = NULL;
379  SISeg *segP = shmInvalBuffer;
380 
381  /* Need to lock out additions/removals of backends */
382  LWLockAcquire(SInvalWriteLock, LW_SHARED);
383 
384  if (backendID > 0 && backendID <= segP->lastBackend)
385  {
386  ProcState *stateP = &segP->procState[backendID - 1];
387 
388  result = stateP->proc;
389  }
390 
391  LWLockRelease(SInvalWriteLock);
392 
393  return result;
394 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
Definition: proc.h:112

◆ BackendIdGetTransactionIds()

void BackendIdGetTransactionIds ( int  backendID,
TransactionId xid,
TransactionId xmin 
)

Definition at line 403 of file sinvaladt.c.

References InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, shmInvalBuffer, PGPROC::xid, and PGPROC::xmin.

Referenced by pgstat_read_current_status().

404 {
405  SISeg *segP = shmInvalBuffer;
406 
407  *xid = InvalidTransactionId;
408  *xmin = InvalidTransactionId;
409 
410  /* Need to lock out additions/removals of backends */
411  LWLockAcquire(SInvalWriteLock, LW_SHARED);
412 
413  if (backendID > 0 && backendID <= segP->lastBackend)
414  {
415  ProcState *stateP = &segP->procState[backendID - 1];
416  PGPROC *proc = stateP->proc;
417 
418  if (proc != NULL)
419  {
420  *xid = proc->xid;
421  *xmin = proc->xmin;
422  }
423  }
424 
425  LWLockRelease(SInvalWriteLock);
426 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
TransactionId xmin
Definition: proc.h:129
#define InvalidTransactionId
Definition: transam.h:31
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
TransactionId xid
Definition: proc.h:124
Definition: proc.h:112

◆ CreateSharedInvalidationState()

void CreateSharedInvalidationState ( void  )

Definition at line 218 of file sinvaladt.c.

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, MaxBackends, SISeg::maxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

219 {
220  int i;
221  bool found;
222 
223  /* Allocate space in shared memory */
224  shmInvalBuffer = (SISeg *)
225  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
226  if (found)
227  return;
228 
229  /* Clear message counters, save size of procState array, init spinlock */
236 
237  /* The buffer[] array is initially all unused, so we need not fill it */
238 
239  /* Mark all backends inactive, and initialize nextLXID */
240  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
241  {
242  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
243  shmInvalBuffer->procState[i].proc = NULL;
244  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
249  }
250 }
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
int lastBackend
Definition: sinvaladt.c:175
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:142
slock_t msgnumLock
Definition: sinvaladt.c:178
bool resetState
Definition: sinvaladt.c:145
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
int MaxBackends
Definition: globals.c:136
int maxMsgNum
Definition: sinvaladt.c:173
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
int maxBackends
Definition: sinvaladt.c:176
#define InvalidLocalTransactionId
Definition: lock.h:68
int i
int nextThreshold
Definition: sinvaladt.c:174
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
int nextMsgNum
Definition: sinvaladt.c:144

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 766 of file sinvaladt.c.

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

767 {
768  LocalTransactionId result;
769 
770  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
771  do
772  {
773  result = nextLocalTransactionId++;
774  } while (!LocalTransactionIdIsValid(result));
775 
776  return result;
777 }
uint32 LocalTransactionId
Definition: c.h:522
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:69

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 257 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

258 {
259  int index;
260  ProcState *stateP = NULL;
261  SISeg *segP = shmInvalBuffer;
262 
263  /*
264  * This can run in parallel with read operations, but not with write
265  * operations, since SIInsertDataEntries relies on lastBackend to set
266  * hasMessages appropriately.
267  */
268  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
269 
270  /* Look for a free entry in the procState array */
271  for (index = 0; index < segP->lastBackend; index++)
272  {
273  if (segP->procState[index].procPid == 0) /* inactive slot? */
274  {
275  stateP = &segP->procState[index];
276  break;
277  }
278  }
279 
280  if (stateP == NULL)
281  {
282  if (segP->lastBackend < segP->maxBackends)
283  {
284  stateP = &segP->procState[segP->lastBackend];
285  Assert(stateP->procPid == 0);
286  segP->lastBackend++;
287  }
288  else
289  {
290  /*
291  * out of procState slots: MaxBackends exceeded -- report normally
292  */
294  LWLockRelease(SInvalWriteLock);
295  ereport(FATAL,
296  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
297  errmsg("sorry, too many clients already")));
298  }
299  }
300 
301  MyBackendId = (stateP - &segP->procState[0]) + 1;
302 
303  /* Advertise assigned backend ID in MyProc */
305 
306  /* Fetch next local transaction ID into local memory */
308 
309  /* mark myself active, with all extant messages already read */
310  stateP->procPid = MyProcPid;
311  stateP->proc = MyProc;
312  stateP->nextMsgNum = segP->maxMsgNum;
313  stateP->resetState = false;
314  stateP->signaled = false;
315  stateP->hasMessages = false;
316  stateP->sendOnly = sendOnly;
317 
318  LWLockRelease(SInvalWriteLock);
319 
320  /* register exit routine to mark my entry inactive at exit */
322 
323  elog(DEBUG4, "my backend ID is %d", MyBackendId);
324 }
int lastBackend
Definition: sinvaladt.c:175
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
BackendId backendId
Definition: proc.h:144
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:556
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:335
int errcode(int sqlerrcode)
Definition: elog.c:610
#define DEBUG4
Definition: elog.h:22
bool resetState
Definition: sinvaladt.c:145
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define FATAL
Definition: elog.h:52
bool sendOnly
Definition: sinvaladt.c:155
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int maxMsgNum
Definition: sinvaladt.c:173
#define InvalidBackendId
Definition: backendid.h:23
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int errmsg(const char *fmt,...)
Definition: elog.c:824
int maxBackends
Definition: sinvaladt.c:176
#define elog(elevel,...)
Definition: elog.h:214
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
int nextMsgNum
Definition: sinvaladt.c:144

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 640 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

641 {
642  SISeg *segP = shmInvalBuffer;
643  int min,
644  minsig,
645  lowbound,
646  numMsgs,
647  i;
648  ProcState *needSig = NULL;
649 
650  /* Lock out all writers and readers */
651  if (!callerHasWriteLock)
652  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
653  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
654 
655  /*
656  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
657  * furthest-back backend that needs signaling (if any), and reset any
658  * backends that are too far back. Note that because we ignore sendOnly
659  * backends here it is possible for them to keep sending messages without
660  * a problem even when they are the only active backend.
661  */
662  min = segP->maxMsgNum;
663  minsig = min - SIG_THRESHOLD;
664  lowbound = min - MAXNUMMESSAGES + minFree;
665 
666  for (i = 0; i < segP->lastBackend; i++)
667  {
668  ProcState *stateP = &segP->procState[i];
669  int n = stateP->nextMsgNum;
670 
671  /* Ignore if inactive or already in reset state */
672  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
673  continue;
674 
675  /*
676  * If we must free some space and this backend is preventing it, force
677  * him into reset state and then ignore until he catches up.
678  */
679  if (n < lowbound)
680  {
681  stateP->resetState = true;
682  /* no point in signaling him ... */
683  continue;
684  }
685 
686  /* Track the global minimum nextMsgNum */
687  if (n < min)
688  min = n;
689 
690  /* Also see who's furthest back of the unsignaled backends */
691  if (n < minsig && !stateP->signaled)
692  {
693  minsig = n;
694  needSig = stateP;
695  }
696  }
697  segP->minMsgNum = min;
698 
699  /*
700  * When minMsgNum gets really large, decrement all message counters so as
701  * to forestall overflow of the counters. This happens seldom enough that
702  * folding it into the previous loop would be a loser.
703  */
704  if (min >= MSGNUMWRAPAROUND)
705  {
706  segP->minMsgNum -= MSGNUMWRAPAROUND;
707  segP->maxMsgNum -= MSGNUMWRAPAROUND;
708  for (i = 0; i < segP->lastBackend; i++)
709  {
710  /* we don't bother skipping inactive entries here */
712  }
713  }
714 
715  /*
716  * Determine how many messages are still in the queue, and set the
717  * threshold at which we should repeat SICleanupQueue().
718  */
719  numMsgs = segP->maxMsgNum - segP->minMsgNum;
720  if (numMsgs < CLEANUP_MIN)
721  segP->nextThreshold = CLEANUP_MIN;
722  else
723  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
724 
725  /*
726  * Lastly, signal anyone who needs a catchup interrupt. Since
727  * SendProcSignal() might not be fast, we don't want to hold locks while
728  * executing it.
729  */
730  if (needSig)
731  {
732  pid_t his_pid = needSig->procPid;
733  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
734 
735  needSig->signaled = true;
736  LWLockRelease(SInvalReadLock);
737  LWLockRelease(SInvalWriteLock);
738  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
739  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
740  if (callerHasWriteLock)
741  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
742  }
743  else
744  {
745  LWLockRelease(SInvalReadLock);
746  if (!callerHasWriteLock)
747  LWLockRelease(SInvalWriteLock);
748  }
749 }
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
int lastBackend
Definition: sinvaladt.c:175
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:134
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
bool sendOnly
Definition: sinvaladt.c:155
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
int BackendId
Definition: backendid.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
#define elog(elevel,...)
Definition: elog.h:214
int i
int nextThreshold
Definition: sinvaladt.c:174
pid_t procPid
Definition: sinvaladt.c:141
static volatile sig_atomic_t signaled
Definition: pg_standby.c:52
int nextMsgNum
Definition: sinvaladt.c:144

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage data,
int  datasize 
)

Definition at line 536 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

537 {
538  SISeg *segP;
539  ProcState *stateP;
540  int max;
541  int n;
542 
543  segP = shmInvalBuffer;
544  stateP = &segP->procState[MyBackendId - 1];
545 
546  /*
547  * Before starting to take locks, do a quick, unlocked test to see whether
548  * there can possibly be anything to read. On a multiprocessor system,
549  * it's possible that this load could migrate backwards and occur before
550  * we actually enter this function, so we might miss a sinval message that
551  * was just added by some other processor. But they can't migrate
552  * backwards over a preceding lock acquisition, so it should be OK. If we
553  * haven't acquired a lock preventing against further relevant
554  * invalidations, any such occurrence is not much different than if the
555  * invalidation had arrived slightly later in the first place.
556  */
557  if (!stateP->hasMessages)
558  return 0;
559 
560  LWLockAcquire(SInvalReadLock, LW_SHARED);
561 
562  /*
563  * We must reset hasMessages before determining how many messages we're
564  * going to read. That way, if new messages arrive after we have
565  * determined how many we're reading, the flag will get reset and we'll
566  * notice those messages part-way through.
567  *
568  * Note that, if we don't end up reading all of the messages, we had
569  * better be certain to reset this flag before exiting!
570  */
571  stateP->hasMessages = false;
572 
573  /* Fetch current value of maxMsgNum using spinlock */
574  SpinLockAcquire(&segP->msgnumLock);
575  max = segP->maxMsgNum;
576  SpinLockRelease(&segP->msgnumLock);
577 
578  if (stateP->resetState)
579  {
580  /*
581  * Force reset. We can say we have dealt with any messages added
582  * since the reset, as well; and that means we should clear the
583  * signaled flag, too.
584  */
585  stateP->nextMsgNum = max;
586  stateP->resetState = false;
587  stateP->signaled = false;
588  LWLockRelease(SInvalReadLock);
589  return -1;
590  }
591 
592  /*
593  * Retrieve messages and advance backend's counter, until data array is
594  * full or there are no more messages.
595  *
596  * There may be other backends that haven't read the message(s), so we
597  * cannot delete them here. SICleanupQueue() will eventually remove them
598  * from the queue.
599  */
600  n = 0;
601  while (n < datasize && stateP->nextMsgNum < max)
602  {
603  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
604  stateP->nextMsgNum++;
605  }
606 
607  /*
608  * If we have caught up completely, reset our "signaled" flag so that
609  * we'll get another signal if we fall behind again.
610  *
611  * If we haven't caught up completely, reset the hasMessages flag so that
612  * we see the remaining messages next time.
613  */
614  if (stateP->nextMsgNum >= max)
615  stateP->signaled = false;
616  else
617  stateP->hasMessages = true;
618 
619  LWLockRelease(SInvalReadLock);
620  return n;
621 }
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
slock_t msgnumLock
Definition: sinvaladt.c:178
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
bool hasMessages
Definition: sinvaladt.c:147
int nextMsgNum
Definition: sinvaladt.c:144

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage data,
int  n 
)

Definition at line 433 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

434 {
435  SISeg *segP = shmInvalBuffer;
436 
437  /*
438  * N can be arbitrarily large. We divide the work into groups of no more
439  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
440  * an unreasonably long time. (This is not so much because we care about
441  * letting in other writers, as that some just-caught-up backend might be
442  * trying to do SICleanupQueue to pass on its signal, and we don't want it
443  * to have to wait a long time.) Also, we need to consider calling
444  * SICleanupQueue every so often.
445  */
446  while (n > 0)
447  {
448  int nthistime = Min(n, WRITE_QUANTUM);
449  int numMsgs;
450  int max;
451  int i;
452 
453  n -= nthistime;
454 
455  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
456 
457  /*
458  * If the buffer is full, we *must* acquire some space. Clean the
459  * queue and reset anyone who is preventing space from being freed.
460  * Otherwise, clean the queue only when it's exceeded the next
461  * fullness threshold. We have to loop and recheck the buffer state
462  * after any call of SICleanupQueue.
463  */
464  for (;;)
465  {
466  numMsgs = segP->maxMsgNum - segP->minMsgNum;
467  if (numMsgs + nthistime > MAXNUMMESSAGES ||
468  numMsgs >= segP->nextThreshold)
469  SICleanupQueue(true, nthistime);
470  else
471  break;
472  }
473 
474  /*
475  * Insert new message(s) into proper slot of circular buffer
476  */
477  max = segP->maxMsgNum;
478  while (nthistime-- > 0)
479  {
480  segP->buffer[max % MAXNUMMESSAGES] = *data++;
481  max++;
482  }
483 
484  /* Update current value of maxMsgNum using spinlock */
485  SpinLockAcquire(&segP->msgnumLock);
486  segP->maxMsgNum = max;
487  SpinLockRelease(&segP->msgnumLock);
488 
489  /*
490  * Now that the maxMsgNum change is globally visible, we give everyone
491  * a swift kick to make sure they read the newly added messages.
492  * Releasing SInvalWriteLock will enforce a full memory barrier, so
493  * these (unlocked) changes will be committed to memory before we exit
494  * the function.
495  */
496  for (i = 0; i < segP->lastBackend; i++)
497  {
498  ProcState *stateP = &segP->procState[i];
499 
500  stateP->hasMessages = true;
501  }
502 
503  LWLockRelease(SInvalWriteLock);
504  }
505 }
int lastBackend
Definition: sinvaladt.c:175
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
slock_t msgnumLock
Definition: sinvaladt.c:178
#define Min(x, y)
Definition: c.h:927
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define SpinLockAcquire(lock)
Definition: spin.h:62
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:640
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
int i
int nextThreshold
Definition: sinvaladt.c:174
bool hasMessages
Definition: sinvaladt.c:147

◆ SInvalShmemSize()

Size SInvalShmemSize ( void  )

Definition at line 203 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedInvalidationState(), and CreateSharedMemoryAndSemaphores().

204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
209 
210  return size;
211 }
int MaxBackends
Definition: globals.c:136
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:473
#define offsetof(type, field)
Definition: c.h:668