PostgreSQL Source Code  git master
sinvaladt.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
 
struct  SISeg
 

Macros

#define MAXNUMMESSAGES   4096
 
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
 
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
 
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
 
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
 
#define WRITE_QUANTUM   64
 

Typedefs

typedef struct ProcState ProcState
 
typedef struct SISeg SISeg
 

Functions

static void CleanupInvalidationState (int status, Datum arg)
 
Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROC * BackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Variables

static SISeg * shmInvalBuffer
 
static LocalTransactionId nextLocalTransactionId
 

Macro Definition Documentation

◆ CLEANUP_MIN

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 132 of file sinvaladt.c.

Referenced by CreateSharedInvalidationState(), and SICleanupQueue().

◆ CLEANUP_QUANTUM

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 133 of file sinvaladt.c.

Referenced by SICleanupQueue().

◆ MAXNUMMESSAGES

#define MAXNUMMESSAGES   4096

Definition at line 130 of file sinvaladt.c.

Referenced by SICleanupQueue(), SIGetDataEntries(), and SIInsertDataEntries().

◆ MSGNUMWRAPAROUND

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 131 of file sinvaladt.c.

Referenced by SICleanupQueue().

◆ SIG_THRESHOLD

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 134 of file sinvaladt.c.

Referenced by SICleanupQueue().

◆ WRITE_QUANTUM

#define WRITE_QUANTUM   64

Definition at line 135 of file sinvaladt.c.

Referenced by SIInsertDataEntries().

Typedef Documentation

◆ ProcState

typedef struct ProcState ProcState

◆ SISeg

typedef struct SISeg SISeg

Function Documentation

◆ BackendIdGetProc()

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 376 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by checkTempNamespaceStatus(), VirtualXactLock(), WaitForLockersMultiple(), and WaitForOlderSnapshots().

377 {
378  PGPROC *result = NULL;
379  SISeg *segP = shmInvalBuffer;
380 
381  /* Need to lock out additions/removals of backends */
382  LWLockAcquire(SInvalWriteLock, LW_SHARED);
383 
384  if (backendID > 0 && backendID <= segP->lastBackend)
385  {
386  ProcState *stateP = &segP->procState[backendID - 1];
387 
388  result = stateP->proc;
389  }
390 
391  LWLockRelease(SInvalWriteLock);
392 
393  return result;
394 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
Definition: proc.h:112

◆ BackendIdGetTransactionIds()

void BackendIdGetTransactionIds ( int  backendID,
TransactionId * xid,
TransactionId * xmin 
)

Definition at line 403 of file sinvaladt.c.

References InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, shmInvalBuffer, PGPROC::xid, and PGPROC::xmin.

Referenced by pgstat_read_current_status().

404 {
405  SISeg *segP = shmInvalBuffer;
406 
407  *xid = InvalidTransactionId;
408  *xmin = InvalidTransactionId;
409 
410  /* Need to lock out additions/removals of backends */
411  LWLockAcquire(SInvalWriteLock, LW_SHARED);
412 
413  if (backendID > 0 && backendID <= segP->lastBackend)
414  {
415  ProcState *stateP = &segP->procState[backendID - 1];
416  PGPROC *proc = stateP->proc;
417 
418  if (proc != NULL)
419  {
420  *xid = proc->xid;
421  *xmin = proc->xmin;
422  }
423  }
424 
425  LWLockRelease(SInvalWriteLock);
426 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
TransactionId xmin
Definition: proc.h:129
#define InvalidTransactionId
Definition: transam.h:31
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
TransactionId xid
Definition: proc.h:124
Definition: proc.h:112

◆ CleanupInvalidationState()

static void CleanupInvalidationState ( int  status,
Datum  arg 
)
static

Definition at line 335 of file sinvaladt.c.

References Assert, DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, and ProcState::signaled.

Referenced by SharedInvalBackendInit().

336 {
337  SISeg *segP = (SISeg *) DatumGetPointer(arg);
338  ProcState *stateP;
339  int i;
340 
341  Assert(PointerIsValid(segP));
342 
343  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
344 
345  stateP = &segP->procState[MyBackendId - 1];
346 
347  /* Update next local transaction ID for next holder of this backendID */
348  stateP->nextLXID = nextLocalTransactionId;
349 
350  /* Mark myself inactive */
351  stateP->procPid = 0;
352  stateP->proc = NULL;
353  stateP->nextMsgNum = 0;
354  stateP->resetState = false;
355  stateP->signaled = false;
356 
357  /* Recompute index of last active backend */
358  for (i = segP->lastBackend; i > 0; i--)
359  {
360  if (segP->procState[i - 1].procPid != 0)
361  break;
362  }
363  segP->lastBackend = i;
364 
365  LWLockRelease(SInvalWriteLock);
366 }
int lastBackend
Definition: sinvaladt.c:175
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
#define Assert(condition)
Definition: c.h:745
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define DatumGetPointer(X)
Definition: postgres.h:549
int i
void * arg
pid_t procPid
Definition: sinvaladt.c:141
#define PointerIsValid(pointer)
Definition: c.h:639
int nextMsgNum
Definition: sinvaladt.c:144

◆ CreateSharedInvalidationState()

void CreateSharedInvalidationState ( void  )

Definition at line 218 of file sinvaladt.c.

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, MaxBackends, SISeg::maxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

219 {
220  int i;
221  bool found;
222 
223  /* Allocate space in shared memory */
224  shmInvalBuffer = (SISeg *)
225  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
226  if (found)
227  return;
228 
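229  /* Clear message counters, save size of procState array, init spinlock */
230  shmInvalBuffer->minMsgNum = 0;
231  shmInvalBuffer->maxMsgNum = 0;
232  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
233  shmInvalBuffer->lastBackend = 0;
234  shmInvalBuffer->maxBackends = MaxBackends;
235  SpinLockInit(&shmInvalBuffer->msgnumLock);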
236 
237  /* The buffer[] array is initially all unused, so we need not fill it */
238 
239  /* Mark all backends inactive, and initialize nextLXID */
240  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
241  {
242  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
243  shmInvalBuffer->procState[i].proc = NULL;
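244  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
245  shmInvalBuffer->procState[i].resetState = false;
246  shmInvalBuffer->procState[i].signaled = false;
247  shmInvalBuffer->procState[i].hasMessages = false;
248  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;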
249  }
250 }
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
int lastBackend
Definition: sinvaladt.c:175
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:142
slock_t msgnumLock
Definition: sinvaladt.c:178
bool resetState
Definition: sinvaladt.c:145
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
int MaxBackends
Definition: globals.c:136
int maxMsgNum
Definition: sinvaladt.c:173
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
int maxBackends
Definition: sinvaladt.c:176
#define InvalidLocalTransactionId
Definition: lock.h:68
int i
int nextThreshold
Definition: sinvaladt.c:174
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
int nextMsgNum
Definition: sinvaladt.c:144

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 766 of file sinvaladt.c.

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

767 {
768  LocalTransactionId result;
769 
770  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
771  do
772  {
773  result = nextLocalTransactionId++;
774  } while (!LocalTransactionIdIsValid(result));
775 
776  return result;
777 }
uint32 LocalTransactionId
Definition: c.h:522
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:69

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 257 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

258 {
259  int index;
260  ProcState *stateP = NULL;
261  SISeg *segP = shmInvalBuffer;
262 
263  /*
264  * This can run in parallel with read operations, but not with write
265  * operations, since SIInsertDataEntries relies on lastBackend to set
266  * hasMessages appropriately.
267  */
268  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
269 
270  /* Look for a free entry in the procState array */
271  for (index = 0; index < segP->lastBackend; index++)
272  {
273  if (segP->procState[index].procPid == 0) /* inactive slot? */
274  {
275  stateP = &segP->procState[index];
276  break;
277  }
278  }
279 
280  if (stateP == NULL)
281  {
282  if (segP->lastBackend < segP->maxBackends)
283  {
284  stateP = &segP->procState[segP->lastBackend];
285  Assert(stateP->procPid == 0);
286  segP->lastBackend++;
287  }
288  else
289  {
290  /*
291  * out of procState slots: MaxBackends exceeded -- report normally
292  */
293  MyBackendId = InvalidBackendId;
294  LWLockRelease(SInvalWriteLock);
295  ereport(FATAL,
296  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
297  errmsg("sorry, too many clients already")));
298  }
299  }
300 
301  MyBackendId = (stateP - &segP->procState[0]) + 1;
302 
303  /* Advertise assigned backend ID in MyProc */
304  MyProc->backendId = MyBackendId;
305 
306  /* Fetch next local transaction ID into local memory */
307  nextLocalTransactionId = stateP->nextLXID;
308 
309  /* mark myself active, with all extant messages already read */
310  stateP->procPid = MyProcPid;
311  stateP->proc = MyProc;
312  stateP->nextMsgNum = segP->maxMsgNum;
313  stateP->resetState = false;
314  stateP->signaled = false;
315  stateP->hasMessages = false;
316  stateP->sendOnly = sendOnly;
317 
318  LWLockRelease(SInvalWriteLock);
319 
320  /* register exit routine to mark my entry inactive at exit */
321  on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
322 
323  elog(DEBUG4, "my backend ID is %d", MyBackendId);
324 }
int lastBackend
Definition: sinvaladt.c:175
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
BackendId backendId
Definition: proc.h:144
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:556
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:335
int errcode(int sqlerrcode)
Definition: elog.c:610
#define DEBUG4
Definition: elog.h:22
bool resetState
Definition: sinvaladt.c:145
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define FATAL
Definition: elog.h:52
bool sendOnly
Definition: sinvaladt.c:155
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int maxMsgNum
Definition: sinvaladt.c:173
#define InvalidBackendId
Definition: backendid.h:23
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int errmsg(const char *fmt,...)
Definition: elog.c:824
int maxBackends
Definition: sinvaladt.c:176
#define elog(elevel,...)
Definition: elog.h:214
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
int nextMsgNum
Definition: sinvaladt.c:144

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 640 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

641 {
642  SISeg *segP = shmInvalBuffer;
643  int min,
644  minsig,
645  lowbound,
646  numMsgs,
647  i;
648  ProcState *needSig = NULL;
649 
650  /* Lock out all writers and readers */
651  if (!callerHasWriteLock)
652  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
653  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
654 
655  /*
656  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
657  * furthest-back backend that needs signaling (if any), and reset any
658  * backends that are too far back. Note that because we ignore sendOnly
659  * backends here it is possible for them to keep sending messages without
660  * a problem even when they are the only active backend.
661  */
662  min = segP->maxMsgNum;
663  minsig = min - SIG_THRESHOLD;
664  lowbound = min - MAXNUMMESSAGES + minFree;
665 
666  for (i = 0; i < segP->lastBackend; i++)
667  {
668  ProcState *stateP = &segP->procState[i];
669  int n = stateP->nextMsgNum;
670 
671  /* Ignore if inactive or already in reset state */
672  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
673  continue;
674 
675  /*
676  * If we must free some space and this backend is preventing it, force
677  * him into reset state and then ignore until he catches up.
678  */
679  if (n < lowbound)
680  {
681  stateP->resetState = true;
682  /* no point in signaling him ... */
683  continue;
684  }
685 
686  /* Track the global minimum nextMsgNum */
687  if (n < min)
688  min = n;
689 
690  /* Also see who's furthest back of the unsignaled backends */
691  if (n < minsig && !stateP->signaled)
692  {
693  minsig = n;
694  needSig = stateP;
695  }
696  }
697  segP->minMsgNum = min;
698 
699  /*
700  * When minMsgNum gets really large, decrement all message counters so as
701  * to forestall overflow of the counters. This happens seldom enough that
702  * folding it into the previous loop would be a loser.
703  */
704  if (min >= MSGNUMWRAPAROUND)
705  {
706  segP->minMsgNum -= MSGNUMWRAPAROUND;
707  segP->maxMsgNum -= MSGNUMWRAPAROUND;
708  for (i = 0; i < segP->lastBackend; i++)
709  {
710  /* we don't bother skipping inactive entries here */
711  segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
712  }
713  }
714 
715  /*
716  * Determine how many messages are still in the queue, and set the
717  * threshold at which we should repeat SICleanupQueue().
718  */
719  numMsgs = segP->maxMsgNum - segP->minMsgNum;
720  if (numMsgs < CLEANUP_MIN)
721  segP->nextThreshold = CLEANUP_MIN;
722  else
723  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
724 
725  /*
726  * Lastly, signal anyone who needs a catchup interrupt. Since
727  * SendProcSignal() might not be fast, we don't want to hold locks while
728  * executing it.
729  */
730  if (needSig)
731  {
732  pid_t his_pid = needSig->procPid;
733  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
734 
735  needSig->signaled = true;
736  LWLockRelease(SInvalReadLock);
737  LWLockRelease(SInvalWriteLock);
738  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
739  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
740  if (callerHasWriteLock)
741  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
742  }
743  else
744  {
745  LWLockRelease(SInvalReadLock);
746  if (!callerHasWriteLock)
747  LWLockRelease(SInvalWriteLock);
748  }
749 }
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
int lastBackend
Definition: sinvaladt.c:175
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:134
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
bool sendOnly
Definition: sinvaladt.c:155
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
int BackendId
Definition: backendid.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
#define elog(elevel,...)
Definition: elog.h:214
int i
int nextThreshold
Definition: sinvaladt.c:174
pid_t procPid
Definition: sinvaladt.c:141
static volatile sig_atomic_t signaled
Definition: pg_standby.c:52
int nextMsgNum
Definition: sinvaladt.c:144

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage * data,
int  datasize 
)

Definition at line 536 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

537 {
538  SISeg *segP;
539  ProcState *stateP;
540  int max;
541  int n;
542 
543  segP = shmInvalBuffer;
544  stateP = &segP->procState[MyBackendId - 1];
545 
546  /*
547  * Before starting to take locks, do a quick, unlocked test to see whether
548  * there can possibly be anything to read. On a multiprocessor system,
549  * it's possible that this load could migrate backwards and occur before
550  * we actually enter this function, so we might miss a sinval message that
551  * was just added by some other processor. But they can't migrate
552  * backwards over a preceding lock acquisition, so it should be OK. If we
553  * haven't acquired a lock preventing against further relevant
554  * invalidations, any such occurrence is not much different than if the
555  * invalidation had arrived slightly later in the first place.
556  */
557  if (!stateP->hasMessages)
558  return 0;
559 
560  LWLockAcquire(SInvalReadLock, LW_SHARED);
561 
562  /*
563  * We must reset hasMessages before determining how many messages we're
564  * going to read. That way, if new messages arrive after we have
565  * determined how many we're reading, the flag will get reset and we'll
566  * notice those messages part-way through.
567  *
568  * Note that, if we don't end up reading all of the messages, we had
569  * better be certain to reset this flag before exiting!
570  */
571  stateP->hasMessages = false;
572 
573  /* Fetch current value of maxMsgNum using spinlock */
574  SpinLockAcquire(&segP->msgnumLock);
575  max = segP->maxMsgNum;
576  SpinLockRelease(&segP->msgnumLock);
577 
578  if (stateP->resetState)
579  {
580  /*
581  * Force reset. We can say we have dealt with any messages added
582  * since the reset, as well; and that means we should clear the
583  * signaled flag, too.
584  */
585  stateP->nextMsgNum = max;
586  stateP->resetState = false;
587  stateP->signaled = false;
588  LWLockRelease(SInvalReadLock);
589  return -1;
590  }
591 
592  /*
593  * Retrieve messages and advance backend's counter, until data array is
594  * full or there are no more messages.
595  *
596  * There may be other backends that haven't read the message(s), so we
597  * cannot delete them here. SICleanupQueue() will eventually remove them
598  * from the queue.
599  */
600  n = 0;
601  while (n < datasize && stateP->nextMsgNum < max)
602  {
603  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
604  stateP->nextMsgNum++;
605  }
606 
607  /*
608  * If we have caught up completely, reset our "signaled" flag so that
609  * we'll get another signal if we fall behind again.
610  *
611  * If we haven't caught up completely, reset the hasMessages flag so that
612  * we see the remaining messages next time.
613  */
614  if (stateP->nextMsgNum >= max)
615  stateP->signaled = false;
616  else
617  stateP->hasMessages = true;
618 
619  LWLockRelease(SInvalReadLock);
620  return n;
621 }
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
slock_t msgnumLock
Definition: sinvaladt.c:178
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
bool hasMessages
Definition: sinvaladt.c:147
int nextMsgNum
Definition: sinvaladt.c:144

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage * data,
int  n 
)

Definition at line 433 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

434 {
435  SISeg *segP = shmInvalBuffer;
436 
437  /*
438  * N can be arbitrarily large. We divide the work into groups of no more
439  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
440  * an unreasonably long time. (This is not so much because we care about
441  * letting in other writers, as that some just-caught-up backend might be
442  * trying to do SICleanupQueue to pass on its signal, and we don't want it
443  * to have to wait a long time.) Also, we need to consider calling
444  * SICleanupQueue every so often.
445  */
446  while (n > 0)
447  {
448  int nthistime = Min(n, WRITE_QUANTUM);
449  int numMsgs;
450  int max;
451  int i;
452 
453  n -= nthistime;
454 
455  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
456 
457  /*
458  * If the buffer is full, we *must* acquire some space. Clean the
459  * queue and reset anyone who is preventing space from being freed.
460  * Otherwise, clean the queue only when it's exceeded the next
461  * fullness threshold. We have to loop and recheck the buffer state
462  * after any call of SICleanupQueue.
463  */
464  for (;;)
465  {
466  numMsgs = segP->maxMsgNum - segP->minMsgNum;
467  if (numMsgs + nthistime > MAXNUMMESSAGES ||
468  numMsgs >= segP->nextThreshold)
469  SICleanupQueue(true, nthistime);
470  else
471  break;
472  }
473 
474  /*
475  * Insert new message(s) into proper slot of circular buffer
476  */
477  max = segP->maxMsgNum;
478  while (nthistime-- > 0)
479  {
480  segP->buffer[max % MAXNUMMESSAGES] = *data++;
481  max++;
482  }
483 
484  /* Update current value of maxMsgNum using spinlock */
485  SpinLockAcquire(&segP->msgnumLock);
486  segP->maxMsgNum = max;
487  SpinLockRelease(&segP->msgnumLock);
488 
489  /*
490  * Now that the maxMsgNum change is globally visible, we give everyone
491  * a swift kick to make sure they read the newly added messages.
492  * Releasing SInvalWriteLock will enforce a full memory barrier, so
493  * these (unlocked) changes will be committed to memory before we exit
494  * the function.
495  */
496  for (i = 0; i < segP->lastBackend; i++)
497  {
498  ProcState *stateP = &segP->procState[i];
499 
500  stateP->hasMessages = true;
501  }
502 
503  LWLockRelease(SInvalWriteLock);
504  }
505 }
int lastBackend
Definition: sinvaladt.c:175
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
slock_t msgnumLock
Definition: sinvaladt.c:178
#define Min(x, y)
Definition: c.h:927
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define SpinLockAcquire(lock)
Definition: spin.h:62
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:640
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
int i
int nextThreshold
Definition: sinvaladt.c:174
bool hasMessages
Definition: sinvaladt.c:147

◆ SInvalShmemSize()

Size SInvalShmemSize ( void  )

Definition at line 203 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedInvalidationState(), and CreateSharedMemoryAndSemaphores().

204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
209 
210  return size;
211 }
int MaxBackends
Definition: globals.c:136
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:473
#define offsetof(type, field)
Definition: c.h:668

Variable Documentation

◆ nextLocalTransactionId

LocalTransactionId nextLocalTransactionId
static

◆ shmInvalBuffer