PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
sinvaladt.h File Reference
#include "storage/lock.h"
#include "storage/sinval.h"
Include dependency graph for sinvaladt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROCBackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Function Documentation

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 377 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), NULL, ProcState::proc, SISeg::procState, result, and shmInvalBuffer.

Referenced by do_autovacuum(), and VirtualXactLock().

378 {
379  PGPROC *result = NULL;
380  SISeg *segP = shmInvalBuffer;
381 
382  /* Need to lock out additions/removals of backends */
383  LWLockAcquire(SInvalWriteLock, LW_SHARED);
384 
385  if (backendID > 0 && backendID <= segP->lastBackend)
386  {
387  ProcState *stateP = &segP->procState[backendID - 1];
388 
389  result = stateP->proc;
390  }
391 
392  LWLockRelease(SInvalWriteLock);
393 
394  return result;
395 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
return result
Definition: formatting.c:1633
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define NULL
Definition: c.h:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
Definition: proc.h:94
void BackendIdGetTransactionIds ( int  backendID,
TransactionId xid,
TransactionId xmin 
)

Definition at line 404 of file sinvaladt.c.

References PROC_HDR::allPgXact, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), NULL, PGPROC::pgprocno, ProcState::proc, ProcGlobal, SISeg::procState, shmInvalBuffer, PGXACT::xid, and PGXACT::xmin.

Referenced by pgstat_read_current_status().

405 {
406  SISeg *segP = shmInvalBuffer;
407 
408  *xid = InvalidTransactionId;
409  *xmin = InvalidTransactionId;
410 
411  /* Need to lock out additions/removals of backends */
412  LWLockAcquire(SInvalWriteLock, LW_SHARED);
413 
414  if (backendID > 0 && backendID <= segP->lastBackend)
415  {
416  ProcState *stateP = &segP->procState[backendID - 1];
417  PGPROC *proc = stateP->proc;
418 
419  if (proc != NULL)
420  {
421  PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno];
422 
423  *xid = xact->xid;
424  *xmin = xact->xmin;
425  }
426  }
427 
428  LWLockRelease(SInvalWriteLock);
429 }
Definition: proc.h:207
TransactionId xmin
Definition: proc.h:213
PGXACT * allPgXact
Definition: proc.h:234
TransactionId xid
Definition: proc.h:209
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
PROC_HDR * ProcGlobal
Definition: proc.c:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define InvalidTransactionId
Definition: transam.h:31
#define NULL
Definition: c.h:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int pgprocno
Definition: proc.h:109
Definition: proc.h:94
void CreateSharedInvalidationState ( void  )

Definition at line 219 of file sinvaladt.c.

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, MaxBackends, SISeg::maxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, NULL, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

220 {
221  int i;
222  bool found;
223 
224  /* Allocate space in shared memory */
225  shmInvalBuffer = (SISeg *)
226  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
227  if (found)
228  return;
229 
230  /* Clear message counters, save size of procState array, init spinlock */
237 
238  /* The buffer[] array is initially all unused, so we need not fill it */
239 
240  /* Mark all backends inactive, and initialize nextLXID */
241  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
242  {
243  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
245  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
250  }
251 }
Size SInvalShmemSize(void)
Definition: sinvaladt.c:204
#define CLEANUP_MIN
Definition: sinvaladt.c:133
int lastBackend
Definition: sinvaladt.c:176
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:143
slock_t msgnumLock
Definition: sinvaladt.c:179
bool resetState
Definition: sinvaladt.c:146
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:127
int maxMsgNum
Definition: sinvaladt.c:174
LocalTransactionId nextLXID
Definition: sinvaladt.c:164
#define NULL
Definition: c.h:229
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int minMsgNum
Definition: sinvaladt.c:173
int maxBackends
Definition: sinvaladt.c:177
#define InvalidLocalTransactionId
Definition: lock.h:69
int i
int nextThreshold
Definition: sinvaladt.c:175
bool hasMessages
Definition: sinvaladt.c:148
pid_t procPid
Definition: sinvaladt.c:142
int nextMsgNum
Definition: sinvaladt.c:145
LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 769 of file sinvaladt.c.

References LocalTransactionIdIsValid, nextLocalTransactionId, and result.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

770 {
772 
773  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
774  do
775  {
776  result = nextLocalTransactionId++;
777  } while (!LocalTransactionIdIsValid(result));
778 
779  return result;
780 }
return result
Definition: formatting.c:1633
uint32 LocalTransactionId
Definition: c.h:399
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:195
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:70
void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 258 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, NULL, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

259 {
260  int index;
261  ProcState *stateP = NULL;
262  SISeg *segP = shmInvalBuffer;
263 
264  /*
265  * This can run in parallel with read operations, but not with write
266  * operations, since SIInsertDataEntries relies on lastBackend to set
267  * hasMessages appropriately.
268  */
269  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
270 
271  /* Look for a free entry in the procState array */
272  for (index = 0; index < segP->lastBackend; index++)
273  {
274  if (segP->procState[index].procPid == 0) /* inactive slot? */
275  {
276  stateP = &segP->procState[index];
277  break;
278  }
279  }
280 
281  if (stateP == NULL)
282  {
283  if (segP->lastBackend < segP->maxBackends)
284  {
285  stateP = &segP->procState[segP->lastBackend];
286  Assert(stateP->procPid == 0);
287  segP->lastBackend++;
288  }
289  else
290  {
291  /*
292  * out of procState slots: MaxBackends exceeded -- report normally
293  */
295  LWLockRelease(SInvalWriteLock);
296  ereport(FATAL,
297  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
298  errmsg("sorry, too many clients already")));
299  }
300  }
301 
302  MyBackendId = (stateP - &segP->procState[0]) + 1;
303 
304  /* Advertise assigned backend ID in MyProc */
306 
307  /* Fetch next local transaction ID into local memory */
309 
310  /* mark myself active, with all extant messages already read */
311  stateP->procPid = MyProcPid;
312  stateP->proc = MyProc;
313  stateP->nextMsgNum = segP->maxMsgNum;
314  stateP->resetState = false;
315  stateP->signaled = false;
316  stateP->hasMessages = false;
317  stateP->sendOnly = sendOnly;
318 
319  LWLockRelease(SInvalWriteLock);
320 
321  /* register exit routine to mark my entry inactive at exit */
323 
324  elog(DEBUG4, "my backend ID is %d", MyBackendId);
325 }
int lastBackend
Definition: sinvaladt.c:176
int MyProcPid
Definition: globals.c:39
BackendId MyBackendId
Definition: globals.c:73
bool signaled
Definition: sinvaladt.c:147
BackendId backendId
Definition: proc.h:112
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:562
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:336
int errcode(int sqlerrcode)
Definition: elog.c:575
#define DEBUG4
Definition: elog.h:22
bool resetState
Definition: sinvaladt.c:146
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define FATAL
Definition: elog.h:52
bool sendOnly
Definition: sinvaladt.c:156
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define ereport(elevel, rest)
Definition: elog.h:122
int maxMsgNum
Definition: sinvaladt.c:174
#define InvalidBackendId
Definition: backendid.h:23
LocalTransactionId nextLXID
Definition: sinvaladt.c:164
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:195
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int errmsg(const char *fmt,...)
Definition: elog.c:797
int maxBackends
Definition: sinvaladt.c:177
bool hasMessages
Definition: sinvaladt.c:148
#define elog
Definition: elog.h:219
pid_t procPid
Definition: sinvaladt.c:142
int nextMsgNum
Definition: sinvaladt.c:145
void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 643 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, NULL, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, signaled, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

644 {
645  SISeg *segP = shmInvalBuffer;
646  int min,
647  minsig,
648  lowbound,
649  numMsgs,
650  i;
651  ProcState *needSig = NULL;
652 
653  /* Lock out all writers and readers */
654  if (!callerHasWriteLock)
655  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
656  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
657 
658  /*
659  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
660  * furthest-back backend that needs signaling (if any), and reset any
661  * backends that are too far back. Note that because we ignore sendOnly
662  * backends here it is possible for them to keep sending messages without
663  * a problem even when they are the only active backend.
664  */
665  min = segP->maxMsgNum;
666  minsig = min - SIG_THRESHOLD;
667  lowbound = min - MAXNUMMESSAGES + minFree;
668 
669  for (i = 0; i < segP->lastBackend; i++)
670  {
671  ProcState *stateP = &segP->procState[i];
672  int n = stateP->nextMsgNum;
673 
674  /* Ignore if inactive or already in reset state */
675  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
676  continue;
677 
678  /*
679  * If we must free some space and this backend is preventing it, force
680  * him into reset state and then ignore until he catches up.
681  */
682  if (n < lowbound)
683  {
684  stateP->resetState = true;
685  /* no point in signaling him ... */
686  continue;
687  }
688 
689  /* Track the global minimum nextMsgNum */
690  if (n < min)
691  min = n;
692 
693  /* Also see who's furthest back of the unsignaled backends */
694  if (n < minsig && !stateP->signaled)
695  {
696  minsig = n;
697  needSig = stateP;
698  }
699  }
700  segP->minMsgNum = min;
701 
702  /*
703  * When minMsgNum gets really large, decrement all message counters so as
704  * to forestall overflow of the counters. This happens seldom enough that
705  * folding it into the previous loop would be a loser.
706  */
707  if (min >= MSGNUMWRAPAROUND)
708  {
709  segP->minMsgNum -= MSGNUMWRAPAROUND;
710  segP->maxMsgNum -= MSGNUMWRAPAROUND;
711  for (i = 0; i < segP->lastBackend; i++)
712  {
713  /* we don't bother skipping inactive entries here */
715  }
716  }
717 
718  /*
719  * Determine how many messages are still in the queue, and set the
720  * threshold at which we should repeat SICleanupQueue().
721  */
722  numMsgs = segP->maxMsgNum - segP->minMsgNum;
723  if (numMsgs < CLEANUP_MIN)
724  segP->nextThreshold = CLEANUP_MIN;
725  else
726  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
727 
728  /*
729  * Lastly, signal anyone who needs a catchup interrupt. Since
730  * SendProcSignal() might not be fast, we don't want to hold locks while
731  * executing it.
732  */
733  if (needSig)
734  {
735  pid_t his_pid = needSig->procPid;
736  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
737 
738  needSig->signaled = true;
739  LWLockRelease(SInvalReadLock);
740  LWLockRelease(SInvalWriteLock);
741  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
742  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
743  if (callerHasWriteLock)
744  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
745  }
746  else
747  {
748  LWLockRelease(SInvalReadLock);
749  if (!callerHasWriteLock)
750  LWLockRelease(SInvalWriteLock);
751  }
752 }
#define CLEANUP_MIN
Definition: sinvaladt.c:133
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:134
int lastBackend
Definition: sinvaladt.c:176
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:135
bool resetState
Definition: sinvaladt.c:146
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
bool sendOnly
Definition: sinvaladt.c:156
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
int BackendId
Definition: backendid.h:21
#define NULL
Definition: c.h:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int minMsgNum
Definition: sinvaladt.c:173
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:132
int i
int nextThreshold
Definition: sinvaladt.c:175
#define elog
Definition: elog.h:219
pid_t procPid
Definition: sinvaladt.c:142
static volatile sig_atomic_t signaled
Definition: pg_standby.c:51
int nextMsgNum
Definition: sinvaladt.c:145
int SIGetDataEntries ( SharedInvalidationMessage data,
int  datasize 
)

Definition at line 539 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

540 {
541  SISeg *segP;
542  ProcState *stateP;
543  int max;
544  int n;
545 
546  segP = shmInvalBuffer;
547  stateP = &segP->procState[MyBackendId - 1];
548 
549  /*
550  * Before starting to take locks, do a quick, unlocked test to see whether
551  * there can possibly be anything to read. On a multiprocessor system,
552  * it's possible that this load could migrate backwards and occur before
553  * we actually enter this function, so we might miss a sinval message that
554  * was just added by some other processor. But they can't migrate
555  * backwards over a preceding lock acquisition, so it should be OK. If we
556  * haven't acquired a lock preventing against further relevant
557  * invalidations, any such occurrence is not much different than if the
558  * invalidation had arrived slightly later in the first place.
559  */
560  if (!stateP->hasMessages)
561  return 0;
562 
563  LWLockAcquire(SInvalReadLock, LW_SHARED);
564 
565  /*
566  * We must reset hasMessages before determining how many messages we're
567  * going to read. That way, if new messages arrive after we have
568  * determined how many we're reading, the flag will get reset and we'll
569  * notice those messages part-way through.
570  *
571  * Note that, if we don't end up reading all of the messages, we had
572  * better be certain to reset this flag before exiting!
573  */
574  stateP->hasMessages = false;
575 
576  /* Fetch current value of maxMsgNum using spinlock */
577  SpinLockAcquire(&segP->msgnumLock);
578  max = segP->maxMsgNum;
579  SpinLockRelease(&segP->msgnumLock);
580 
581  if (stateP->resetState)
582  {
583  /*
584  * Force reset. We can say we have dealt with any messages added
585  * since the reset, as well; and that means we should clear the
586  * signaled flag, too.
587  */
588  stateP->nextMsgNum = max;
589  stateP->resetState = false;
590  stateP->signaled = false;
591  LWLockRelease(SInvalReadLock);
592  return -1;
593  }
594 
595  /*
596  * Retrieve messages and advance backend's counter, until data array is
597  * full or there are no more messages.
598  *
599  * There may be other backends that haven't read the message(s), so we
600  * cannot delete them here. SICleanupQueue() will eventually remove them
601  * from the queue.
602  */
603  n = 0;
604  while (n < datasize && stateP->nextMsgNum < max)
605  {
606  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
607  stateP->nextMsgNum++;
608  }
609 
610  /*
611  * If we have caught up completely, reset our "signaled" flag so that
612  * we'll get another signal if we fall behind again.
613  *
614  * If we haven't caught up completely, reset the hasMessages flag so that
615  * we see the remaining messages next time.
616  */
617  if (stateP->nextMsgNum >= max)
618  stateP->signaled = false;
619  else
620  stateP->hasMessages = true;
621 
622  LWLockRelease(SInvalReadLock);
623  return n;
624 }
BackendId MyBackendId
Definition: globals.c:73
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
slock_t msgnumLock
Definition: sinvaladt.c:179
bool resetState
Definition: sinvaladt.c:146
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:184
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
bool hasMessages
Definition: sinvaladt.c:148
int nextMsgNum
Definition: sinvaladt.c:145
void SIInsertDataEntries ( const SharedInvalidationMessage data,
int  n 
)

Definition at line 436 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

437 {
438  SISeg *segP = shmInvalBuffer;
439 
440  /*
441  * N can be arbitrarily large. We divide the work into groups of no more
442  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
443  * an unreasonably long time. (This is not so much because we care about
444  * letting in other writers, as that some just-caught-up backend might be
445  * trying to do SICleanupQueue to pass on its signal, and we don't want it
446  * to have to wait a long time.) Also, we need to consider calling
447  * SICleanupQueue every so often.
448  */
449  while (n > 0)
450  {
451  int nthistime = Min(n, WRITE_QUANTUM);
452  int numMsgs;
453  int max;
454  int i;
455 
456  n -= nthistime;
457 
458  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
459 
460  /*
461  * If the buffer is full, we *must* acquire some space. Clean the
462  * queue and reset anyone who is preventing space from being freed.
463  * Otherwise, clean the queue only when it's exceeded the next
464  * fullness threshold. We have to loop and recheck the buffer state
465  * after any call of SICleanupQueue.
466  */
467  for (;;)
468  {
469  numMsgs = segP->maxMsgNum - segP->minMsgNum;
470  if (numMsgs + nthistime > MAXNUMMESSAGES ||
471  numMsgs >= segP->nextThreshold)
472  SICleanupQueue(true, nthistime);
473  else
474  break;
475  }
476 
477  /*
478  * Insert new message(s) into proper slot of circular buffer
479  */
480  max = segP->maxMsgNum;
481  while (nthistime-- > 0)
482  {
483  segP->buffer[max % MAXNUMMESSAGES] = *data++;
484  max++;
485  }
486 
487  /* Update current value of maxMsgNum using spinlock */
488  SpinLockAcquire(&segP->msgnumLock);
489  segP->maxMsgNum = max;
490  SpinLockRelease(&segP->msgnumLock);
491 
492  /*
493  * Now that the maxMsgNum change is globally visible, we give everyone
494  * a swift kick to make sure they read the newly added messages.
495  * Releasing SInvalWriteLock will enforce a full memory barrier, so
496  * these (unlocked) changes will be committed to memory before we exit
497  * the function.
498  */
499  for (i = 0; i < segP->lastBackend; i++)
500  {
501  ProcState *stateP = &segP->procState[i];
502 
503  stateP->hasMessages = true;
504  }
505 
506  LWLockRelease(SInvalWriteLock);
507  }
508 }
int lastBackend
Definition: sinvaladt.c:176
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
slock_t msgnumLock
Definition: sinvaladt.c:179
#define Min(x, y)
Definition: c.h:807
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:643
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:184
#define WRITE_QUANTUM
Definition: sinvaladt.c:136
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int minMsgNum
Definition: sinvaladt.c:173
int i
int nextThreshold
Definition: sinvaladt.c:175
bool hasMessages
Definition: sinvaladt.c:148
Size SInvalShmemSize ( void  )

Definition at line 204 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedInvalidationState(), and CreateSharedMemoryAndSemaphores().

205 {
206  Size size;
207 
208  size = offsetof(SISeg, procState);
209  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
210 
211  return size;
212 }
int MaxBackends
Definition: globals.c:127
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555