PostgreSQL Source Code  git master
sinvaladt.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
 
struct  SISeg
 

Macros

#define MAXNUMMESSAGES   4096
 
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
 
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
 
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
 
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
 
#define WRITE_QUANTUM   64
 

Typedefs

typedef struct ProcState ProcState
 
typedef struct SISeg SISeg
 

Functions

static void CleanupInvalidationState (int status, Datum arg)
 
Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROC * BackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Variables

static SISeg * shmInvalBuffer
 
static LocalTransactionId nextLocalTransactionId
 

Macro Definition Documentation

◆ CLEANUP_MIN

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 132 of file sinvaladt.c.

Referenced by CreateSharedInvalidationState(), and SICleanupQueue().

◆ CLEANUP_QUANTUM

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 133 of file sinvaladt.c.

Referenced by SICleanupQueue().

◆ MAXNUMMESSAGES

#define MAXNUMMESSAGES   4096

Definition at line 130 of file sinvaladt.c.

Referenced by SICleanupQueue(), SIGetDataEntries(), and SIInsertDataEntries().

◆ MSGNUMWRAPAROUND

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 131 of file sinvaladt.c.

Referenced by SICleanupQueue().

◆ SIG_THRESHOLD

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 134 of file sinvaladt.c.

Referenced by SICleanupQueue().

◆ WRITE_QUANTUM

#define WRITE_QUANTUM   64

Definition at line 135 of file sinvaladt.c.

Referenced by SIInsertDataEntries().

Typedef Documentation

◆ ProcState

typedef struct ProcState ProcState

◆ SISeg

typedef struct SISeg SISeg

Function Documentation

◆ BackendIdGetProc()

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 376 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by isTempNamespaceInUse(), VirtualXactLock(), WaitForLockersMultiple(), and WaitForOlderSnapshots().

377 {
378  PGPROC *result = NULL;
379  SISeg *segP = shmInvalBuffer;
380 
381  /* Need to lock out additions/removals of backends */
382  LWLockAcquire(SInvalWriteLock, LW_SHARED);
383 
384  if (backendID > 0 && backendID <= segP->lastBackend)
385  {
386  ProcState *stateP = &segP->procState[backendID - 1];
387 
388  result = stateP->proc;
389  }
390 
391  LWLockRelease(SInvalWriteLock);
392 
393  return result;
394 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
Definition: proc.h:95

◆ BackendIdGetTransactionIds()

void BackendIdGetTransactionIds ( int  backendID,
TransactionId * xid,
TransactionId * xmin 
)

Definition at line 403 of file sinvaladt.c.

References PROC_HDR::allPgXact, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), PGPROC::pgprocno, ProcState::proc, ProcGlobal, SISeg::procState, shmInvalBuffer, PGXACT::xid, and PGXACT::xmin.

Referenced by pgstat_read_current_status().

404 {
405  SISeg *segP = shmInvalBuffer;
406 
407  *xid = InvalidTransactionId;
408  *xmin = InvalidTransactionId;
409 
410  /* Need to lock out additions/removals of backends */
411  LWLockAcquire(SInvalWriteLock, LW_SHARED);
412 
413  if (backendID > 0 && backendID <= segP->lastBackend)
414  {
415  ProcState *stateP = &segP->procState[backendID - 1];
416  PGPROC *proc = stateP->proc;
417 
418  if (proc != NULL)
419  {
420  PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno];
421 
422  *xid = xact->xid;
423  *xmin = xact->xmin;
424  }
425  }
426 
427  LWLockRelease(SInvalWriteLock);
428 }
Definition: proc.h:222
TransactionId xmin
Definition: proc.h:228
PGXACT * allPgXact
Definition: proc.h:249
TransactionId xid
Definition: proc.h:224
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
PROC_HDR * ProcGlobal
Definition: proc.c:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define InvalidTransactionId
Definition: transam.h:31
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int pgprocno
Definition: proc.h:110
Definition: proc.h:95

◆ CleanupInvalidationState()

static void CleanupInvalidationState ( int  status,
Datum  arg 
)
static

Definition at line 335 of file sinvaladt.c.

References Assert, DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, and ProcState::signaled.

Referenced by SharedInvalBackendInit().

336 {
337  SISeg *segP = (SISeg *) DatumGetPointer(arg);
338  ProcState *stateP;
339  int i;
340 
341  Assert(PointerIsValid(segP));
342 
343  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
344 
345  stateP = &segP->procState[MyBackendId - 1];
346 
347  /* Update next local transaction ID for next holder of this backendID */
348  stateP->nextLXID = nextLocalTransactionId;
349 
350  /* Mark myself inactive */
351  stateP->procPid = 0;
352  stateP->proc = NULL;
353  stateP->nextMsgNum = 0;
354  stateP->resetState = false;
355  stateP->signaled = false;
356 
357  /* Recompute index of last active backend */
358  for (i = segP->lastBackend; i > 0; i--)
359  {
360  if (segP->procState[i - 1].procPid != 0)
361  break;
362  }
363  segP->lastBackend = i;
364 
365  LWLockRelease(SInvalWriteLock);
366 }
int lastBackend
Definition: sinvaladt.c:175
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
#define Assert(condition)
Definition: c.h:739
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define DatumGetPointer(X)
Definition: postgres.h:549
int i
void * arg
pid_t procPid
Definition: sinvaladt.c:141
#define PointerIsValid(pointer)
Definition: c.h:633
int nextMsgNum
Definition: sinvaladt.c:144

◆ CreateSharedInvalidationState()

void CreateSharedInvalidationState ( void  )

Definition at line 218 of file sinvaladt.c.

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, MaxBackends, SISeg::maxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

219 {
220  int i;
221  bool found;
222 
223  /* Allocate space in shared memory */
224  shmInvalBuffer = (SISeg *)
225  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
226  if (found)
227  return;
228 
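229  /* Clear message counters, save size of procState array, init spinlock */
230  shmInvalBuffer->minMsgNum = 0;
231  shmInvalBuffer->maxMsgNum = 0;
232  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
233  shmInvalBuffer->lastBackend = 0;
234  shmInvalBuffer->maxBackends = MaxBackends;
235  SpinLockInit(&shmInvalBuffer->msgnumLock);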
236 
237  /* The buffer[] array is initially all unused, so we need not fill it */
238 
239  /* Mark all backends inactive, and initialize nextLXID */
240  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
241  {
242  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
243  shmInvalBuffer->procState[i].proc = NULL;
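244  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
245  shmInvalBuffer->procState[i].resetState = false;
246  shmInvalBuffer->procState[i].signaled = false;
247  shmInvalBuffer->procState[i].hasMessages = false;
248  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;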
249  }
250 }
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
int lastBackend
Definition: sinvaladt.c:175
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:142
slock_t msgnumLock
Definition: sinvaladt.c:178
bool resetState
Definition: sinvaladt.c:145
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:135
int maxMsgNum
Definition: sinvaladt.c:173
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
int maxBackends
Definition: sinvaladt.c:176
#define InvalidLocalTransactionId
Definition: lock.h:68
int i
int nextThreshold
Definition: sinvaladt.c:174
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
int nextMsgNum
Definition: sinvaladt.c:144

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 768 of file sinvaladt.c.

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

769 {
770  LocalTransactionId result;
771 
772  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
773  do
774  {
775  result = nextLocalTransactionId++;
776  } while (!LocalTransactionIdIsValid(result));
777 
778  return result;
779 }
uint32 LocalTransactionId
Definition: c.h:516
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:69

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 257 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

258 {
259  int index;
260  ProcState *stateP = NULL;
261  SISeg *segP = shmInvalBuffer;
262 
263  /*
264  * This can run in parallel with read operations, but not with write
265  * operations, since SIInsertDataEntries relies on lastBackend to set
266  * hasMessages appropriately.
267  */
268  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
269 
270  /* Look for a free entry in the procState array */
271  for (index = 0; index < segP->lastBackend; index++)
272  {
273  if (segP->procState[index].procPid == 0) /* inactive slot? */
274  {
275  stateP = &segP->procState[index];
276  break;
277  }
278  }
279 
280  if (stateP == NULL)
281  {
282  if (segP->lastBackend < segP->maxBackends)
283  {
284  stateP = &segP->procState[segP->lastBackend];
285  Assert(stateP->procPid == 0);
286  segP->lastBackend++;
287  }
288  else
289  {
290  /*
291  * out of procState slots: MaxBackends exceeded -- report normally
292  */
293  MyBackendId = InvalidBackendId;
294  LWLockRelease(SInvalWriteLock);
295  ereport(FATAL,
296  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
297  errmsg("sorry, too many clients already")));
298  }
299  }
300 
301  MyBackendId = (stateP - &segP->procState[0]) + 1;
302 
303  /* Advertise assigned backend ID in MyProc */
304  MyProc->backendId = MyBackendId;
305 
306  /* Fetch next local transaction ID into local memory */
307  nextLocalTransactionId = stateP->nextLXID;
308 
309  /* mark myself active, with all extant messages already read */
310  stateP->procPid = MyProcPid;
311  stateP->proc = MyProc;
312  stateP->nextMsgNum = segP->maxMsgNum;
313  stateP->resetState = false;
314  stateP->signaled = false;
315  stateP->hasMessages = false;
316  stateP->sendOnly = sendOnly;
317 
318  LWLockRelease(SInvalWriteLock);
319 
320  /* register exit routine to mark my entry inactive at exit */
321  on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
322 
323  elog(DEBUG4, "my backend ID is %d", MyBackendId);
324 }
int lastBackend
Definition: sinvaladt.c:175
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
BackendId backendId
Definition: proc.h:113
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:556
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
PGPROC * proc
Definition: sinvaladt.c:142
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:335
int errcode(int sqlerrcode)
Definition: elog.c:608
#define DEBUG4
Definition: elog.h:22
bool resetState
Definition: sinvaladt.c:145
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define FATAL
Definition: elog.h:52
bool sendOnly
Definition: sinvaladt.c:155
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define ereport(elevel, rest)
Definition: elog.h:141
int maxMsgNum
Definition: sinvaladt.c:173
#define InvalidBackendId
Definition: backendid.h:23
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
#define Assert(condition)
Definition: c.h:739
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int errmsg(const char *fmt,...)
Definition: elog.c:822
int maxBackends
Definition: sinvaladt.c:176
#define elog(elevel,...)
Definition: elog.h:228
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
int nextMsgNum
Definition: sinvaladt.c:144

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 642 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

643 {
644  SISeg *segP = shmInvalBuffer;
645  int min,
646  minsig,
647  lowbound,
648  numMsgs,
649  i;
650  ProcState *needSig = NULL;
651 
652  /* Lock out all writers and readers */
653  if (!callerHasWriteLock)
654  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
655  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
656 
657  /*
658  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
659  * furthest-back backend that needs signaling (if any), and reset any
660  * backends that are too far back. Note that because we ignore sendOnly
661  * backends here it is possible for them to keep sending messages without
662  * a problem even when they are the only active backend.
663  */
664  min = segP->maxMsgNum;
665  minsig = min - SIG_THRESHOLD;
666  lowbound = min - MAXNUMMESSAGES + minFree;
667 
668  for (i = 0; i < segP->lastBackend; i++)
669  {
670  ProcState *stateP = &segP->procState[i];
671  int n = stateP->nextMsgNum;
672 
673  /* Ignore if inactive or already in reset state */
674  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
675  continue;
676 
677  /*
678  * If we must free some space and this backend is preventing it, force
679  * him into reset state and then ignore until he catches up.
680  */
681  if (n < lowbound)
682  {
683  stateP->resetState = true;
684  /* no point in signaling him ... */
685  continue;
686  }
687 
688  /* Track the global minimum nextMsgNum */
689  if (n < min)
690  min = n;
691 
692  /* Also see who's furthest back of the unsignaled backends */
693  if (n < minsig && !stateP->signaled)
694  {
695  minsig = n;
696  needSig = stateP;
697  }
698  }
699  segP->minMsgNum = min;
700 
701  /*
702  * When minMsgNum gets really large, decrement all message counters so as
703  * to forestall overflow of the counters. This happens seldom enough that
704  * folding it into the previous loop would be a loser.
705  */
706  if (min >= MSGNUMWRAPAROUND)
707  {
708  segP->minMsgNum -= MSGNUMWRAPAROUND;
709  segP->maxMsgNum -= MSGNUMWRAPAROUND;
710  for (i = 0; i < segP->lastBackend; i++)
711  {
712  /* we don't bother skipping inactive entries here */
713  segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
714  }
715  }
716 
717  /*
718  * Determine how many messages are still in the queue, and set the
719  * threshold at which we should repeat SICleanupQueue().
720  */
721  numMsgs = segP->maxMsgNum - segP->minMsgNum;
722  if (numMsgs < CLEANUP_MIN)
723  segP->nextThreshold = CLEANUP_MIN;
724  else
725  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
726 
727  /*
728  * Lastly, signal anyone who needs a catchup interrupt. Since
729  * SendProcSignal() might not be fast, we don't want to hold locks while
730  * executing it.
731  */
732  if (needSig)
733  {
734  pid_t his_pid = needSig->procPid;
735  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
736 
737  needSig->signaled = true;
738  LWLockRelease(SInvalReadLock);
739  LWLockRelease(SInvalWriteLock);
740  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
741  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
742  if (callerHasWriteLock)
743  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
744  }
745  else
746  {
747  LWLockRelease(SInvalReadLock);
748  if (!callerHasWriteLock)
749  LWLockRelease(SInvalWriteLock);
750  }
751 }
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
int lastBackend
Definition: sinvaladt.c:175
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:134
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:179
bool sendOnly
Definition: sinvaladt.c:155
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
int BackendId
Definition: backendid.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
#define elog(elevel,...)
Definition: elog.h:228
int i
int nextThreshold
Definition: sinvaladt.c:174
pid_t procPid
Definition: sinvaladt.c:141
static volatile sig_atomic_t signaled
Definition: pg_standby.c:52
int nextMsgNum
Definition: sinvaladt.c:144

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage * data,
int  datasize 
)

Definition at line 538 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

539 {
540  SISeg *segP;
541  ProcState *stateP;
542  int max;
543  int n;
544 
545  segP = shmInvalBuffer;
546  stateP = &segP->procState[MyBackendId - 1];
547 
548  /*
549  * Before starting to take locks, do a quick, unlocked test to see whether
550  * there can possibly be anything to read. On a multiprocessor system,
551  * it's possible that this load could migrate backwards and occur before
552  * we actually enter this function, so we might miss a sinval message that
553  * was just added by some other processor. But they can't migrate
554  * backwards over a preceding lock acquisition, so it should be OK. If we
555  * haven't acquired a lock preventing against further relevant
556  * invalidations, any such occurrence is not much different than if the
557  * invalidation had arrived slightly later in the first place.
558  */
559  if (!stateP->hasMessages)
560  return 0;
561 
562  LWLockAcquire(SInvalReadLock, LW_SHARED);
563 
564  /*
565  * We must reset hasMessages before determining how many messages we're
566  * going to read. That way, if new messages arrive after we have
567  * determined how many we're reading, the flag will get reset and we'll
568  * notice those messages part-way through.
569  *
570  * Note that, if we don't end up reading all of the messages, we had
571  * better be certain to reset this flag before exiting!
572  */
573  stateP->hasMessages = false;
574 
575  /* Fetch current value of maxMsgNum using spinlock */
576  SpinLockAcquire(&segP->msgnumLock);
577  max = segP->maxMsgNum;
578  SpinLockRelease(&segP->msgnumLock);
579 
580  if (stateP->resetState)
581  {
582  /*
583  * Force reset. We can say we have dealt with any messages added
584  * since the reset, as well; and that means we should clear the
585  * signaled flag, too.
586  */
587  stateP->nextMsgNum = max;
588  stateP->resetState = false;
589  stateP->signaled = false;
590  LWLockRelease(SInvalReadLock);
591  return -1;
592  }
593 
594  /*
595  * Retrieve messages and advance backend's counter, until data array is
596  * full or there are no more messages.
597  *
598  * There may be other backends that haven't read the message(s), so we
599  * cannot delete them here. SICleanupQueue() will eventually remove them
600  * from the queue.
601  */
602  n = 0;
603  while (n < datasize && stateP->nextMsgNum < max)
604  {
605  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
606  stateP->nextMsgNum++;
607  }
608 
609  /*
610  * If we have caught up completely, reset our "signaled" flag so that
611  * we'll get another signal if we fall behind again.
612  *
613  * If we haven't caught up completely, reset the hasMessages flag so that
614  * we see the remaining messages next time.
615  */
616  if (stateP->nextMsgNum >= max)
617  stateP->signaled = false;
618  else
619  stateP->hasMessages = true;
620 
621  LWLockRelease(SInvalReadLock);
622  return n;
623 }
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
slock_t msgnumLock
Definition: sinvaladt.c:178
bool resetState
Definition: sinvaladt.c:145
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
bool hasMessages
Definition: sinvaladt.c:147
int nextMsgNum
Definition: sinvaladt.c:144

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage * data,
int  n 
)

Definition at line 435 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

436 {
437  SISeg *segP = shmInvalBuffer;
438 
439  /*
440  * N can be arbitrarily large. We divide the work into groups of no more
441  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
442  * an unreasonably long time. (This is not so much because we care about
443  * letting in other writers, as that some just-caught-up backend might be
444  * trying to do SICleanupQueue to pass on its signal, and we don't want it
445  * to have to wait a long time.) Also, we need to consider calling
446  * SICleanupQueue every so often.
447  */
448  while (n > 0)
449  {
450  int nthistime = Min(n, WRITE_QUANTUM);
451  int numMsgs;
452  int max;
453  int i;
454 
455  n -= nthistime;
456 
457  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
458 
459  /*
460  * If the buffer is full, we *must* acquire some space. Clean the
461  * queue and reset anyone who is preventing space from being freed.
462  * Otherwise, clean the queue only when it's exceeded the next
463  * fullness threshold. We have to loop and recheck the buffer state
464  * after any call of SICleanupQueue.
465  */
466  for (;;)
467  {
468  numMsgs = segP->maxMsgNum - segP->minMsgNum;
469  if (numMsgs + nthistime > MAXNUMMESSAGES ||
470  numMsgs >= segP->nextThreshold)
471  SICleanupQueue(true, nthistime);
472  else
473  break;
474  }
475 
476  /*
477  * Insert new message(s) into proper slot of circular buffer
478  */
479  max = segP->maxMsgNum;
480  while (nthistime-- > 0)
481  {
482  segP->buffer[max % MAXNUMMESSAGES] = *data++;
483  max++;
484  }
485 
486  /* Update current value of maxMsgNum using spinlock */
487  SpinLockAcquire(&segP->msgnumLock);
488  segP->maxMsgNum = max;
489  SpinLockRelease(&segP->msgnumLock);
490 
491  /*
492  * Now that the maxMsgNum change is globally visible, we give everyone
493  * a swift kick to make sure they read the newly added messages.
494  * Releasing SInvalWriteLock will enforce a full memory barrier, so
495  * these (unlocked) changes will be committed to memory before we exit
496  * the function.
497  */
498  for (i = 0; i < segP->lastBackend; i++)
499  {
500  ProcState *stateP = &segP->procState[i];
501 
502  stateP->hasMessages = true;
503  }
504 
505  LWLockRelease(SInvalWriteLock);
506  }
507 }
int lastBackend
Definition: sinvaladt.c:175
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
slock_t msgnumLock
Definition: sinvaladt.c:178
#define Min(x, y)
Definition: c.h:911
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:642
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
int minMsgNum
Definition: sinvaladt.c:172
int i
int nextThreshold
Definition: sinvaladt.c:174
bool hasMessages
Definition: sinvaladt.c:147

◆ SInvalShmemSize()

Size SInvalShmemSize ( void  )

Definition at line 203 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedInvalidationState(), and CreateSharedMemoryAndSemaphores().

204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
209 
210  return size;
211 }
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:467
#define offsetof(type, field)
Definition: c.h:662

Variable Documentation

◆ nextLocalTransactionId

LocalTransactionId nextLocalTransactionId
static

◆ shmInvalBuffer