PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
sinvaladt.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "access/transam.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
 
struct  SISeg
 

Macros

#define MAXNUMMESSAGES   4096
 
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
 
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
 
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
 
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
 
#define WRITE_QUANTUM   64
 

Typedefs

typedef struct ProcState ProcState
 
typedef struct SISeg SISeg
 

Functions

static void CleanupInvalidationState (int status, Datum arg)
 
Size SInvalShmemSize (void)
 
void CreateSharedInvalidationState (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
PGPROCBackendIdGetProc (int backendID)
 
void BackendIdGetTransactionIds (int backendID, TransactionId *xid, TransactionId *xmin)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Variables

static SISegshmInvalBuffer
 
static LocalTransactionId nextLocalTransactionId
 

Macro Definition Documentation

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 133 of file sinvaladt.c.

Referenced by CreateSharedInvalidationState(), and SICleanupQueue().

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 134 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define MAXNUMMESSAGES   4096

Definition at line 131 of file sinvaladt.c.

Referenced by SICleanupQueue(), SIGetDataEntries(), and SIInsertDataEntries().

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 132 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 135 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define WRITE_QUANTUM   64

Definition at line 136 of file sinvaladt.c.

Referenced by SIInsertDataEntries().

Typedef Documentation

Function Documentation

PGPROC* BackendIdGetProc ( int  backendID)

Definition at line 377 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and shmInvalBuffer.

Referenced by do_autovacuum(), and VirtualXactLock().

378 {
379  PGPROC *result = NULL;
380  SISeg *segP = shmInvalBuffer;
381 
382  /* Need to lock out additions/removals of backends */
383  LWLockAcquire(SInvalWriteLock, LW_SHARED);
384 
385  if (backendID > 0 && backendID <= segP->lastBackend)
386  {
387  ProcState *stateP = &segP->procState[backendID - 1];
388 
389  result = stateP->proc;
390  }
391 
392  LWLockRelease(SInvalWriteLock);
393 
394  return result;
395 }
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
Definition: proc.h:95
void BackendIdGetTransactionIds ( int  backendID,
TransactionId xid,
TransactionId xmin 
)

Definition at line 404 of file sinvaladt.c.

References PROC_HDR::allPgXact, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), PGPROC::pgprocno, ProcState::proc, ProcGlobal, SISeg::procState, shmInvalBuffer, PGXACT::xid, and PGXACT::xmin.

Referenced by pgstat_read_current_status().

405 {
406  SISeg *segP = shmInvalBuffer;
407 
408  *xid = InvalidTransactionId;
409  *xmin = InvalidTransactionId;
410 
411  /* Need to lock out additions/removals of backends */
412  LWLockAcquire(SInvalWriteLock, LW_SHARED);
413 
414  if (backendID > 0 && backendID <= segP->lastBackend)
415  {
416  ProcState *stateP = &segP->procState[backendID - 1];
417  PGPROC *proc = stateP->proc;
418 
419  if (proc != NULL)
420  {
421  PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno];
422 
423  *xid = xact->xid;
424  *xmin = xact->xmin;
425  }
426  }
427 
428  LWLockRelease(SInvalWriteLock);
429 }
Definition: proc.h:219
TransactionId xmin
Definition: proc.h:225
PGXACT * allPgXact
Definition: proc.h:246
TransactionId xid
Definition: proc.h:221
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
PROC_HDR * ProcGlobal
Definition: proc.c:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define InvalidTransactionId
Definition: transam.h:31
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int pgprocno
Definition: proc.h:110
Definition: proc.h:95
static void CleanupInvalidationState ( int  status,
Datum  arg 
)
static

Definition at line 336 of file sinvaladt.c.

References Assert, DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, and ProcState::signaled.

Referenced by SharedInvalBackendInit().

337 {
338  SISeg *segP = (SISeg *) DatumGetPointer(arg);
339  ProcState *stateP;
340  int i;
341 
342  Assert(PointerIsValid(segP));
343 
344  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
345 
346  stateP = &segP->procState[MyBackendId - 1];
347 
348  /* Update next local transaction ID for next holder of this backendID */
350 
351  /* Mark myself inactive */
352  stateP->procPid = 0;
353  stateP->proc = NULL;
354  stateP->nextMsgNum = 0;
355  stateP->resetState = false;
356  stateP->signaled = false;
357 
358  /* Recompute index of last active backend */
359  for (i = segP->lastBackend; i > 0; i--)
360  {
361  if (segP->procState[i - 1].procPid != 0)
362  break;
363  }
364  segP->lastBackend = i;
365 
366  LWLockRelease(SInvalWriteLock);
367 }
int lastBackend
Definition: sinvaladt.c:176
BackendId MyBackendId
Definition: globals.c:73
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
bool resetState
Definition: sinvaladt.c:146
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
LocalTransactionId nextLXID
Definition: sinvaladt.c:164
#define Assert(condition)
Definition: c.h:681
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:195
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
#define DatumGetPointer(X)
Definition: postgres.h:555
int i
void * arg
pid_t procPid
Definition: sinvaladt.c:142
#define PointerIsValid(pointer)
Definition: c.h:520
int nextMsgNum
Definition: sinvaladt.c:145
void CreateSharedInvalidationState ( void  )

Definition at line 219 of file sinvaladt.c.

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::lastBackend, MaxBackends, SISeg::maxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, SInvalShmemSize(), and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

220 {
221  int i;
222  bool found;
223 
224  /* Allocate space in shared memory */
225  shmInvalBuffer = (SISeg *)
226  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
227  if (found)
228  return;
229 
230  /* Clear message counters, save size of procState array, init spinlock */
237 
238  /* The buffer[] array is initially all unused, so we need not fill it */
239 
240  /* Mark all backends inactive, and initialize nextLXID */
241  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
242  {
243  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
244  shmInvalBuffer->procState[i].proc = NULL;
245  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
250  }
251 }
Size SInvalShmemSize(void)
Definition: sinvaladt.c:204
#define CLEANUP_MIN
Definition: sinvaladt.c:133
int lastBackend
Definition: sinvaladt.c:176
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:143
slock_t msgnumLock
Definition: sinvaladt.c:179
bool resetState
Definition: sinvaladt.c:146
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:126
int maxMsgNum
Definition: sinvaladt.c:174
LocalTransactionId nextLXID
Definition: sinvaladt.c:164
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int minMsgNum
Definition: sinvaladt.c:173
int maxBackends
Definition: sinvaladt.c:177
#define InvalidLocalTransactionId
Definition: lock.h:69
int i
int nextThreshold
Definition: sinvaladt.c:175
bool hasMessages
Definition: sinvaladt.c:148
pid_t procPid
Definition: sinvaladt.c:142
int nextMsgNum
Definition: sinvaladt.c:145
LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 769 of file sinvaladt.c.

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

770 {
771  LocalTransactionId result;
772 
773  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
774  do
775  {
776  result = nextLocalTransactionId++;
777  } while (!LocalTransactionIdIsValid(result));
778 
779  return result;
780 }
uint32 LocalTransactionId
Definition: c.h:393
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:195
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:70
void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 258 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, InvalidBackendId, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

259 {
260  int index;
261  ProcState *stateP = NULL;
262  SISeg *segP = shmInvalBuffer;
263 
264  /*
265  * This can run in parallel with read operations, but not with write
266  * operations, since SIInsertDataEntries relies on lastBackend to set
267  * hasMessages appropriately.
268  */
269  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
270 
271  /* Look for a free entry in the procState array */
272  for (index = 0; index < segP->lastBackend; index++)
273  {
274  if (segP->procState[index].procPid == 0) /* inactive slot? */
275  {
276  stateP = &segP->procState[index];
277  break;
278  }
279  }
280 
281  if (stateP == NULL)
282  {
283  if (segP->lastBackend < segP->maxBackends)
284  {
285  stateP = &segP->procState[segP->lastBackend];
286  Assert(stateP->procPid == 0);
287  segP->lastBackend++;
288  }
289  else
290  {
291  /*
292  * out of procState slots: MaxBackends exceeded -- report normally
293  */
295  LWLockRelease(SInvalWriteLock);
296  ereport(FATAL,
297  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
298  errmsg("sorry, too many clients already")));
299  }
300  }
301 
302  MyBackendId = (stateP - &segP->procState[0]) + 1;
303 
304  /* Advertise assigned backend ID in MyProc */
306 
307  /* Fetch next local transaction ID into local memory */
309 
310  /* mark myself active, with all extant messages already read */
311  stateP->procPid = MyProcPid;
312  stateP->proc = MyProc;
313  stateP->nextMsgNum = segP->maxMsgNum;
314  stateP->resetState = false;
315  stateP->signaled = false;
316  stateP->hasMessages = false;
317  stateP->sendOnly = sendOnly;
318 
319  LWLockRelease(SInvalWriteLock);
320 
321  /* register exit routine to mark my entry inactive at exit */
323 
324  elog(DEBUG4, "my backend ID is %d", MyBackendId);
325 }
int lastBackend
Definition: sinvaladt.c:176
int MyProcPid
Definition: globals.c:39
BackendId MyBackendId
Definition: globals.c:73
bool signaled
Definition: sinvaladt.c:147
BackendId backendId
Definition: proc.h:113
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:562
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
PGPROC * proc
Definition: sinvaladt.c:143
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:336
int errcode(int sqlerrcode)
Definition: elog.c:575
#define DEBUG4
Definition: elog.h:22
bool resetState
Definition: sinvaladt.c:146
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define FATAL
Definition: elog.h:52
bool sendOnly
Definition: sinvaladt.c:156
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define ereport(elevel, rest)
Definition: elog.h:122
int maxMsgNum
Definition: sinvaladt.c:174
#define InvalidBackendId
Definition: backendid.h:23
LocalTransactionId nextLXID
Definition: sinvaladt.c:164
#define Assert(condition)
Definition: c.h:681
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:195
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int errmsg(const char *fmt,...)
Definition: elog.c:797
int maxBackends
Definition: sinvaladt.c:177
bool hasMessages
Definition: sinvaladt.c:148
#define elog
Definition: elog.h:219
pid_t procPid
Definition: sinvaladt.c:142
int nextMsgNum
Definition: sinvaladt.c:145
void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 643 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, signaled, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

644 {
645  SISeg *segP = shmInvalBuffer;
646  int min,
647  minsig,
648  lowbound,
649  numMsgs,
650  i;
651  ProcState *needSig = NULL;
652 
653  /* Lock out all writers and readers */
654  if (!callerHasWriteLock)
655  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
656  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
657 
658  /*
659  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
660  * furthest-back backend that needs signaling (if any), and reset any
661  * backends that are too far back. Note that because we ignore sendOnly
662  * backends here it is possible for them to keep sending messages without
663  * a problem even when they are the only active backend.
664  */
665  min = segP->maxMsgNum;
666  minsig = min - SIG_THRESHOLD;
667  lowbound = min - MAXNUMMESSAGES + minFree;
668 
669  for (i = 0; i < segP->lastBackend; i++)
670  {
671  ProcState *stateP = &segP->procState[i];
672  int n = stateP->nextMsgNum;
673 
674  /* Ignore if inactive or already in reset state */
675  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
676  continue;
677 
678  /*
679  * If we must free some space and this backend is preventing it, force
680  * him into reset state and then ignore until he catches up.
681  */
682  if (n < lowbound)
683  {
684  stateP->resetState = true;
685  /* no point in signaling him ... */
686  continue;
687  }
688 
689  /* Track the global minimum nextMsgNum */
690  if (n < min)
691  min = n;
692 
693  /* Also see who's furthest back of the unsignaled backends */
694  if (n < minsig && !stateP->signaled)
695  {
696  minsig = n;
697  needSig = stateP;
698  }
699  }
700  segP->minMsgNum = min;
701 
702  /*
703  * When minMsgNum gets really large, decrement all message counters so as
704  * to forestall overflow of the counters. This happens seldom enough that
705  * folding it into the previous loop would be a loser.
706  */
707  if (min >= MSGNUMWRAPAROUND)
708  {
709  segP->minMsgNum -= MSGNUMWRAPAROUND;
710  segP->maxMsgNum -= MSGNUMWRAPAROUND;
711  for (i = 0; i < segP->lastBackend; i++)
712  {
713  /* we don't bother skipping inactive entries here */
715  }
716  }
717 
718  /*
719  * Determine how many messages are still in the queue, and set the
720  * threshold at which we should repeat SICleanupQueue().
721  */
722  numMsgs = segP->maxMsgNum - segP->minMsgNum;
723  if (numMsgs < CLEANUP_MIN)
724  segP->nextThreshold = CLEANUP_MIN;
725  else
726  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
727 
728  /*
729  * Lastly, signal anyone who needs a catchup interrupt. Since
730  * SendProcSignal() might not be fast, we don't want to hold locks while
731  * executing it.
732  */
733  if (needSig)
734  {
735  pid_t his_pid = needSig->procPid;
736  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
737 
738  needSig->signaled = true;
739  LWLockRelease(SInvalReadLock);
740  LWLockRelease(SInvalWriteLock);
741  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
742  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
743  if (callerHasWriteLock)
744  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
745  }
746  else
747  {
748  LWLockRelease(SInvalReadLock);
749  if (!callerHasWriteLock)
750  LWLockRelease(SInvalWriteLock);
751  }
752 }
#define CLEANUP_MIN
Definition: sinvaladt.c:133
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:134
int lastBackend
Definition: sinvaladt.c:176
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:135
bool resetState
Definition: sinvaladt.c:146
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
bool sendOnly
Definition: sinvaladt.c:156
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
int BackendId
Definition: backendid.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int minMsgNum
Definition: sinvaladt.c:173
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:132
int i
int nextThreshold
Definition: sinvaladt.c:175
#define elog
Definition: elog.h:219
pid_t procPid
Definition: sinvaladt.c:142
static volatile sig_atomic_t signaled
Definition: pg_standby.c:53
int nextMsgNum
Definition: sinvaladt.c:145
int SIGetDataEntries ( SharedInvalidationMessage data,
int  datasize 
)

Definition at line 539 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

540 {
541  SISeg *segP;
542  ProcState *stateP;
543  int max;
544  int n;
545 
546  segP = shmInvalBuffer;
547  stateP = &segP->procState[MyBackendId - 1];
548 
549  /*
550  * Before starting to take locks, do a quick, unlocked test to see whether
551  * there can possibly be anything to read. On a multiprocessor system,
552  * it's possible that this load could migrate backwards and occur before
553  * we actually enter this function, so we might miss a sinval message that
554  * was just added by some other processor. But they can't migrate
555  * backwards over a preceding lock acquisition, so it should be OK. If we
556  * haven't acquired a lock preventing against further relevant
557  * invalidations, any such occurrence is not much different than if the
558  * invalidation had arrived slightly later in the first place.
559  */
560  if (!stateP->hasMessages)
561  return 0;
562 
563  LWLockAcquire(SInvalReadLock, LW_SHARED);
564 
565  /*
566  * We must reset hasMessages before determining how many messages we're
567  * going to read. That way, if new messages arrive after we have
568  * determined how many we're reading, the flag will get reset and we'll
569  * notice those messages part-way through.
570  *
571  * Note that, if we don't end up reading all of the messages, we had
572  * better be certain to reset this flag before exiting!
573  */
574  stateP->hasMessages = false;
575 
576  /* Fetch current value of maxMsgNum using spinlock */
577  SpinLockAcquire(&segP->msgnumLock);
578  max = segP->maxMsgNum;
579  SpinLockRelease(&segP->msgnumLock);
580 
581  if (stateP->resetState)
582  {
583  /*
584  * Force reset. We can say we have dealt with any messages added
585  * since the reset, as well; and that means we should clear the
586  * signaled flag, too.
587  */
588  stateP->nextMsgNum = max;
589  stateP->resetState = false;
590  stateP->signaled = false;
591  LWLockRelease(SInvalReadLock);
592  return -1;
593  }
594 
595  /*
596  * Retrieve messages and advance backend's counter, until data array is
597  * full or there are no more messages.
598  *
599  * There may be other backends that haven't read the message(s), so we
600  * cannot delete them here. SICleanupQueue() will eventually remove them
601  * from the queue.
602  */
603  n = 0;
604  while (n < datasize && stateP->nextMsgNum < max)
605  {
606  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
607  stateP->nextMsgNum++;
608  }
609 
610  /*
611  * If we have caught up completely, reset our "signaled" flag so that
612  * we'll get another signal if we fall behind again.
613  *
614  * If we haven't caught up completely, reset the hasMessages flag so that
615  * we see the remaining messages next time.
616  */
617  if (stateP->nextMsgNum >= max)
618  stateP->signaled = false;
619  else
620  stateP->hasMessages = true;
621 
622  LWLockRelease(SInvalReadLock);
623  return n;
624 }
BackendId MyBackendId
Definition: globals.c:73
bool signaled
Definition: sinvaladt.c:147
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
slock_t msgnumLock
Definition: sinvaladt.c:179
bool resetState
Definition: sinvaladt.c:146
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:184
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
bool hasMessages
Definition: sinvaladt.c:148
int nextMsgNum
Definition: sinvaladt.c:145
void SIInsertDataEntries ( const SharedInvalidationMessage data,
int  n 
)

Definition at line 436 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

437 {
438  SISeg *segP = shmInvalBuffer;
439 
440  /*
441  * N can be arbitrarily large. We divide the work into groups of no more
442  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
443  * an unreasonably long time. (This is not so much because we care about
444  * letting in other writers, as that some just-caught-up backend might be
445  * trying to do SICleanupQueue to pass on its signal, and we don't want it
446  * to have to wait a long time.) Also, we need to consider calling
447  * SICleanupQueue every so often.
448  */
449  while (n > 0)
450  {
451  int nthistime = Min(n, WRITE_QUANTUM);
452  int numMsgs;
453  int max;
454  int i;
455 
456  n -= nthistime;
457 
458  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
459 
460  /*
461  * If the buffer is full, we *must* acquire some space. Clean the
462  * queue and reset anyone who is preventing space from being freed.
463  * Otherwise, clean the queue only when it's exceeded the next
464  * fullness threshold. We have to loop and recheck the buffer state
465  * after any call of SICleanupQueue.
466  */
467  for (;;)
468  {
469  numMsgs = segP->maxMsgNum - segP->minMsgNum;
470  if (numMsgs + nthistime > MAXNUMMESSAGES ||
471  numMsgs >= segP->nextThreshold)
472  SICleanupQueue(true, nthistime);
473  else
474  break;
475  }
476 
477  /*
478  * Insert new message(s) into proper slot of circular buffer
479  */
480  max = segP->maxMsgNum;
481  while (nthistime-- > 0)
482  {
483  segP->buffer[max % MAXNUMMESSAGES] = *data++;
484  max++;
485  }
486 
487  /* Update current value of maxMsgNum using spinlock */
488  SpinLockAcquire(&segP->msgnumLock);
489  segP->maxMsgNum = max;
490  SpinLockRelease(&segP->msgnumLock);
491 
492  /*
493  * Now that the maxMsgNum change is globally visible, we give everyone
494  * a swift kick to make sure they read the newly added messages.
495  * Releasing SInvalWriteLock will enforce a full memory barrier, so
496  * these (unlocked) changes will be committed to memory before we exit
497  * the function.
498  */
499  for (i = 0; i < segP->lastBackend; i++)
500  {
501  ProcState *stateP = &segP->procState[i];
502 
503  stateP->hasMessages = true;
504  }
505 
506  LWLockRelease(SInvalWriteLock);
507  }
508 }
int lastBackend
Definition: sinvaladt.c:176
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
slock_t msgnumLock
Definition: sinvaladt.c:179
#define Min(x, y)
Definition: c.h:812
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define SpinLockAcquire(lock)
Definition: spin.h:62
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:643
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:184
#define WRITE_QUANTUM
Definition: sinvaladt.c:136
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
#define SpinLockRelease(lock)
Definition: spin.h:64
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
int minMsgNum
Definition: sinvaladt.c:173
int i
int nextThreshold
Definition: sinvaladt.c:175
bool hasMessages
Definition: sinvaladt.c:148
Size SInvalShmemSize ( void  )

Definition at line 204 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedInvalidationState(), and CreateSharedMemoryAndSemaphores().

205 {
206  Size size;
207 
208  size = offsetof(SISeg, procState);
209  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
210 
211  return size;
212 }
int MaxBackends
Definition: globals.c:126
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:350
#define offsetof(type, field)
Definition: c.h:549

Variable Documentation

LocalTransactionId nextLocalTransactionId
static