PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
sinvaladt.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  * POSTGRES shared cache invalidation data manager.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/storage/ipc/sinvaladt.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <signal.h>
18 #include <unistd.h>
19 
20 #include "miscadmin.h"
21 #include "storage/backendid.h"
22 #include "storage/ipc.h"
23 #include "storage/proc.h"
24 #include "storage/procsignal.h"
25 #include "storage/shmem.h"
26 #include "storage/sinvaladt.h"
27 #include "storage/spin.h"
28 #include "access/transam.h"
29 
30 
31 /*
32  * Conceptually, the shared cache invalidation messages are stored in an
33  * infinite array, where maxMsgNum is the next array subscript to store a
34  * submitted message in, minMsgNum is the smallest array subscript containing
35  * a message not yet read by all backends, and we always have maxMsgNum >=
36  * minMsgNum. (They are equal when there are no messages pending.) For each
37  * active backend, there is a nextMsgNum pointer indicating the next message it
38  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
39  * backend.
40  *
41  * (In the current implementation, minMsgNum is a lower bound for the
42  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
43  * smallest nextMsgNum --- it may lag behind. We only update it when
44  * SICleanupQueue is called, and we try not to do that often.)
45  *
46  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
47  * entries. We translate MsgNum values into circular-buffer indexes by
48  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
49  * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
50  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
51  * in the buffer. If the buffer does overflow, we recover by setting the
52  * "reset" flag for each backend that has fallen too far behind. A backend
53  * that is in "reset" state is ignored while determining minMsgNum. When
54  * it does finally attempt to receive inval messages, it must discard all
55  * its invalidatable state, since it won't know what it missed.
56  *
57  * To reduce the probability of needing resets, we send a "catchup" interrupt
58  * to any backend that seems to be falling unreasonably far behind. The
59  * normal behavior is that at most one such interrupt is in flight at a time;
60  * when a backend completes processing a catchup interrupt, it executes
61  * SICleanupQueue, which will signal the next-furthest-behind backend if
62  * needed. This avoids undue contention from multiple backends all trying
63  * to catch up at once. However, the furthest-back backend might be stuck
64  * in a state where it can't catch up. Eventually it will get reset, so it
65  * won't cause any more problems for anyone but itself. But we don't want
66  * to find that a bunch of other backends are now too close to the reset
67  * threshold to be saved. So SICleanupQueue is designed to occasionally
68  * send extra catchup interrupts as the queue gets fuller, to backends that
69  * are far behind and haven't gotten one yet. As long as there aren't a lot
70  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
71  * that aren't stuck will propagate their interrupts to the next guy.
72  *
73  * We would have problems if the MsgNum values overflow an integer, so
74  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
75  * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
76  * large so that we don't need to do this often. It must be a multiple of
77  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
78  * to be moved when we do it.
79  *
80  * Access to the shared sinval array is protected by two locks, SInvalReadLock
81  * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
82  * authorizes them to modify their own ProcState but not to modify or even
83  * look at anyone else's. When we need to perform array-wide updates,
84  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
85  * lock out all readers. Writers take SInvalWriteLock (always in exclusive
86  * mode) to serialize adding messages to the queue. Note that a writer
87  * can operate in parallel with one or more readers, because the writer
88  * has no need to touch anyone's ProcState, except in the infrequent cases
89  * when SICleanupQueue is needed. The only point of overlap is that
90  * the writer wants to change maxMsgNum while readers need to read it.
91  * We deal with that by having a spinlock that readers must take for just
92  * long enough to read maxMsgNum, while writers take it for just long enough
93  * to write maxMsgNum. (The exact rule is that you need the spinlock to
94  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
95  * spinlock to write maxMsgNum unless you are holding both locks.)
96  *
97  * Note: since maxMsgNum is an int and hence presumably atomically readable/
98  * writable, the spinlock might seem unnecessary. The reason it is needed
99  * is to provide a memory barrier: we need to be sure that messages written
100  * to the array are actually there before maxMsgNum is increased, and that
101  * readers will see that data after fetching maxMsgNum. Multiprocessors
102  * that have weak memory-ordering guarantees can fail without the memory
103  * barrier instructions that are included in the spinlock sequences.
104  */
105 
106 
107 /*
108  * Configurable parameters.
109  *
110  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
111  * Must be a power of 2 for speed.
112  *
113  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
114  * Must be a multiple of MAXNUMMESSAGES. Should be large.
115  *
116  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
117  * before we bother to call SICleanupQueue.
118  *
119  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
120  * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
121  *
122  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
123  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
124  *
125  * WRITE_QUANTUM: the max number of messages to push into the buffer per
126  * iteration of SIInsertDataEntries. Noncritical but should be less than
127  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
128  * per iteration.
129  */
130 
#define MAXNUMMESSAGES 4096		/* ring-buffer capacity; must be a power of 2 */
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)	/* counter-fold interval; multiple of MAXNUMMESSAGES */
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)	/* min queued msgs before bothering with SICleanupQueue */
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)	/* cleanup repeat interval past CLEANUP_MIN; power of 2 */
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)	/* backlog that triggers a catchup interrupt */
#define WRITE_QUANTUM 64		/* max msgs inserted per lock acquisition; < CLEANUP_QUANTUM */
137 
138 /* Per-backend state in shared invalidation structure */
139 typedef struct ProcState
140 {
141  /* procPid is zero in an inactive ProcState array entry. */
142  pid_t procPid; /* PID of backend, for signaling */
143  PGPROC *proc; /* PGPROC of backend */
144  /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
145  int nextMsgNum; /* next message number to read */
146  bool resetState; /* backend needs to reset its state */
147  bool signaled; /* backend has been sent catchup signal */
148  bool hasMessages; /* backend has unread messages */
149 
150  /*
151  * Backend only sends invalidations, never receives them. This only makes
152  * sense for Startup process during recovery because it doesn't maintain a
153  * relcache, yet it fires inval messages to allow query backends to see
154  * schema changes.
155  */
156  bool sendOnly; /* backend only sends, never receives */
157 
158  /*
159  * Next LocalTransactionId to use for each idle backend slot. We keep
160  * this here because it is indexed by BackendId and it is convenient to
161  * copy the value to and from local memory when MyBackendId is set. It's
162  * meaningless in an active ProcState entry.
163  */
165 } ProcState;
166 
167 /* Shared cache invalidation memory segment */
168 typedef struct SISeg
169 {
170  /*
171  * General state information
172  */
173  int minMsgNum; /* oldest message still needed */
174  int maxMsgNum; /* next message number to be assigned */
175  int nextThreshold; /* # of messages to call SICleanupQueue */
176  int lastBackend; /* index of last active procState entry, +1 */
177  int maxBackends; /* size of procState array */
178 
179  slock_t msgnumLock; /* spinlock protecting maxMsgNum */
180 
181  /*
182  * Circular buffer holding shared-inval messages
183  */
185 
186  /*
187  * Per-backend invalidation state info (has MaxBackends entries).
188  */
189  ProcState procState[FLEXIBLE_ARRAY_MEMBER];
190 } SISeg;
191 
192 static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
193 
194 
196 
197 static void CleanupInvalidationState(int status, Datum arg);
198 
199 
200 /*
201  * SInvalShmemSize --- return shared-memory space needed
202  */
203 Size
205 {
206  Size size;
207 
208  size = offsetof(SISeg, procState);
209  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
210 
211  return size;
212 }
213 
214 /*
215  * CreateSharedInvalidationState
216  * Create and initialize the SI message buffer
217  */
218 void
220 {
221  int i;
222  bool found;
223 
224  /* Allocate space in shared memory */
225  shmInvalBuffer = (SISeg *)
226  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
227  if (found)
228  return;
229 
230  /* Clear message counters, save size of procState array, init spinlock */
231  shmInvalBuffer->minMsgNum = 0;
232  shmInvalBuffer->maxMsgNum = 0;
233  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
234  shmInvalBuffer->lastBackend = 0;
235  shmInvalBuffer->maxBackends = MaxBackends;
236  SpinLockInit(&shmInvalBuffer->msgnumLock);
237 
238  /* The buffer[] array is initially all unused, so we need not fill it */
239 
240  /* Mark all backends inactive, and initialize nextLXID */
241  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
242  {
243  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
244  shmInvalBuffer->procState[i].proc = NULL;
245  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
246  shmInvalBuffer->procState[i].resetState = false;
247  shmInvalBuffer->procState[i].signaled = false;
248  shmInvalBuffer->procState[i].hasMessages = false;
249  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
250  }
251 }
252 
253 /*
254  * SharedInvalBackendInit
255  * Initialize a new backend to operate on the sinval buffer
256  */
257 void
259 {
260  int index;
261  ProcState *stateP = NULL;
262  SISeg *segP = shmInvalBuffer;
263 
264  /*
265  * This can run in parallel with read operations, but not with write
266  * operations, since SIInsertDataEntries relies on lastBackend to set
267  * hasMessages appropriately.
268  */
269  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
270 
271  /* Look for a free entry in the procState array */
272  for (index = 0; index < segP->lastBackend; index++)
273  {
274  if (segP->procState[index].procPid == 0) /* inactive slot? */
275  {
276  stateP = &segP->procState[index];
277  break;
278  }
279  }
280 
281  if (stateP == NULL)
282  {
283  if (segP->lastBackend < segP->maxBackends)
284  {
285  stateP = &segP->procState[segP->lastBackend];
286  Assert(stateP->procPid == 0);
287  segP->lastBackend++;
288  }
289  else
290  {
291  /*
292  * out of procState slots: MaxBackends exceeded -- report normally
293  */
295  LWLockRelease(SInvalWriteLock);
296  ereport(FATAL,
297  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
298  errmsg("sorry, too many clients already")));
299  }
300  }
301 
302  MyBackendId = (stateP - &segP->procState[0]) + 1;
303 
304  /* Advertise assigned backend ID in MyProc */
306 
307  /* Fetch next local transaction ID into local memory */
309 
310  /* mark myself active, with all extant messages already read */
311  stateP->procPid = MyProcPid;
312  stateP->proc = MyProc;
313  stateP->nextMsgNum = segP->maxMsgNum;
314  stateP->resetState = false;
315  stateP->signaled = false;
316  stateP->hasMessages = false;
317  stateP->sendOnly = sendOnly;
318 
319  LWLockRelease(SInvalWriteLock);
320 
321  /* register exit routine to mark my entry inactive at exit */
323 
324  elog(DEBUG4, "my backend ID is %d", MyBackendId);
325 }
326 
327 /*
328  * CleanupInvalidationState
329  * Mark the current backend as no longer active.
330  *
331  * This function is called via on_shmem_exit() during backend shutdown.
332  *
333  * arg is really of type "SISeg*".
334  */
335 static void
337 {
338  SISeg *segP = (SISeg *) DatumGetPointer(arg);
339  ProcState *stateP;
340  int i;
341 
342  Assert(PointerIsValid(segP));
343 
344  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
345 
346  stateP = &segP->procState[MyBackendId - 1];
347 
348  /* Update next local transaction ID for next holder of this backendID */
350 
351  /* Mark myself inactive */
352  stateP->procPid = 0;
353  stateP->proc = NULL;
354  stateP->nextMsgNum = 0;
355  stateP->resetState = false;
356  stateP->signaled = false;
357 
358  /* Recompute index of last active backend */
359  for (i = segP->lastBackend; i > 0; i--)
360  {
361  if (segP->procState[i - 1].procPid != 0)
362  break;
363  }
364  segP->lastBackend = i;
365 
366  LWLockRelease(SInvalWriteLock);
367 }
368 
369 /*
370  * BackendIdGetProc
371  * Get the PGPROC structure for a backend, given the backend ID.
372  * The result may be out of date arbitrarily quickly, so the caller
373  * must be careful about how this information is used. NULL is
374  * returned if the backend is not active.
375  */
376 PGPROC *
377 BackendIdGetProc(int backendID)
378 {
379  PGPROC *result = NULL;
380  SISeg *segP = shmInvalBuffer;
381 
382  /* Need to lock out additions/removals of backends */
383  LWLockAcquire(SInvalWriteLock, LW_SHARED);
384 
385  if (backendID > 0 && backendID <= segP->lastBackend)
386  {
387  ProcState *stateP = &segP->procState[backendID - 1];
388 
389  result = stateP->proc;
390  }
391 
392  LWLockRelease(SInvalWriteLock);
393 
394  return result;
395 }
396 
397 /*
398  * BackendIdGetTransactionIds
399  * Get the xid and xmin of the backend. The result may be out of date
400  * arbitrarily quickly, so the caller must be careful about how this
401  * information is used.
402  */
403 void
405 {
406  SISeg *segP = shmInvalBuffer;
407 
408  *xid = InvalidTransactionId;
409  *xmin = InvalidTransactionId;
410 
411  /* Need to lock out additions/removals of backends */
412  LWLockAcquire(SInvalWriteLock, LW_SHARED);
413 
414  if (backendID > 0 && backendID <= segP->lastBackend)
415  {
416  ProcState *stateP = &segP->procState[backendID - 1];
417  PGPROC *proc = stateP->proc;
418 
419  if (proc != NULL)
420  {
421  PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno];
422 
423  *xid = xact->xid;
424  *xmin = xact->xmin;
425  }
426  }
427 
428  LWLockRelease(SInvalWriteLock);
429 }
430 
431 /*
432  * SIInsertDataEntries
433  * Add new invalidation message(s) to the buffer.
434  */
435 void
437 {
438  SISeg *segP = shmInvalBuffer;
439 
440  /*
441  * N can be arbitrarily large. We divide the work into groups of no more
442  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
443  * an unreasonably long time. (This is not so much because we care about
444  * letting in other writers, as that some just-caught-up backend might be
445  * trying to do SICleanupQueue to pass on its signal, and we don't want it
446  * to have to wait a long time.) Also, we need to consider calling
447  * SICleanupQueue every so often.
448  */
449  while (n > 0)
450  {
451  int nthistime = Min(n, WRITE_QUANTUM);
452  int numMsgs;
453  int max;
454  int i;
455 
456  n -= nthistime;
457 
458  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
459 
460  /*
461  * If the buffer is full, we *must* acquire some space. Clean the
462  * queue and reset anyone who is preventing space from being freed.
463  * Otherwise, clean the queue only when it's exceeded the next
464  * fullness threshold. We have to loop and recheck the buffer state
465  * after any call of SICleanupQueue.
466  */
467  for (;;)
468  {
469  numMsgs = segP->maxMsgNum - segP->minMsgNum;
470  if (numMsgs + nthistime > MAXNUMMESSAGES ||
471  numMsgs >= segP->nextThreshold)
472  SICleanupQueue(true, nthistime);
473  else
474  break;
475  }
476 
477  /*
478  * Insert new message(s) into proper slot of circular buffer
479  */
480  max = segP->maxMsgNum;
481  while (nthistime-- > 0)
482  {
483  segP->buffer[max % MAXNUMMESSAGES] = *data++;
484  max++;
485  }
486 
487  /* Update current value of maxMsgNum using spinlock */
488  SpinLockAcquire(&segP->msgnumLock);
489  segP->maxMsgNum = max;
490  SpinLockRelease(&segP->msgnumLock);
491 
492  /*
493  * Now that the maxMsgNum change is globally visible, we give everyone
494  * a swift kick to make sure they read the newly added messages.
495  * Releasing SInvalWriteLock will enforce a full memory barrier, so
496  * these (unlocked) changes will be committed to memory before we exit
497  * the function.
498  */
499  for (i = 0; i < segP->lastBackend; i++)
500  {
501  ProcState *stateP = &segP->procState[i];
502 
503  stateP->hasMessages = true;
504  }
505 
506  LWLockRelease(SInvalWriteLock);
507  }
508 }
509 
510 /*
511  * SIGetDataEntries
512  * get next SI message(s) for current backend, if there are any
513  *
514  * Possible return values:
515  * 0: no SI message available
516  * n>0: next n SI messages have been extracted into data[]
517  * -1: SI reset message extracted
518  *
519  * If the return value is less than the array size "datasize", the caller
520  * can assume that there are no more SI messages after the one(s) returned.
521  * Otherwise, another call is needed to collect more messages.
522  *
523  * NB: this can run in parallel with other instances of SIGetDataEntries
524  * executing on behalf of other backends, since each instance will modify only
525  * fields of its own backend's ProcState, and no instance will look at fields
526  * of other backends' ProcStates. We express this by grabbing SInvalReadLock
527  * in shared mode. Note that this is not exactly the normal (read-only)
528  * interpretation of a shared lock! Look closely at the interactions before
529  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
530  *
531  * NB: this can also run in parallel with SIInsertDataEntries. It is not
532  * guaranteed that we will return any messages added after the routine is
533  * entered.
534  *
535  * Note: we assume that "datasize" is not so large that it might be important
536  * to break our hold on SInvalReadLock into segments.
537  */
538 int
540 {
541  SISeg *segP;
542  ProcState *stateP;
543  int max;
544  int n;
545 
546  segP = shmInvalBuffer;
547  stateP = &segP->procState[MyBackendId - 1];
548 
549  /*
550  * Before starting to take locks, do a quick, unlocked test to see whether
551  * there can possibly be anything to read. On a multiprocessor system,
552  * it's possible that this load could migrate backwards and occur before
553  * we actually enter this function, so we might miss a sinval message that
554  * was just added by some other processor. But they can't migrate
555  * backwards over a preceding lock acquisition, so it should be OK. If we
556  * haven't acquired a lock preventing against further relevant
557  * invalidations, any such occurrence is not much different than if the
558  * invalidation had arrived slightly later in the first place.
559  */
560  if (!stateP->hasMessages)
561  return 0;
562 
563  LWLockAcquire(SInvalReadLock, LW_SHARED);
564 
565  /*
566  * We must reset hasMessages before determining how many messages we're
567  * going to read. That way, if new messages arrive after we have
568  * determined how many we're reading, the flag will get reset and we'll
569  * notice those messages part-way through.
570  *
571  * Note that, if we don't end up reading all of the messages, we had
572  * better be certain to reset this flag before exiting!
573  */
574  stateP->hasMessages = false;
575 
576  /* Fetch current value of maxMsgNum using spinlock */
577  SpinLockAcquire(&segP->msgnumLock);
578  max = segP->maxMsgNum;
579  SpinLockRelease(&segP->msgnumLock);
580 
581  if (stateP->resetState)
582  {
583  /*
584  * Force reset. We can say we have dealt with any messages added
585  * since the reset, as well; and that means we should clear the
586  * signaled flag, too.
587  */
588  stateP->nextMsgNum = max;
589  stateP->resetState = false;
590  stateP->signaled = false;
591  LWLockRelease(SInvalReadLock);
592  return -1;
593  }
594 
595  /*
596  * Retrieve messages and advance backend's counter, until data array is
597  * full or there are no more messages.
598  *
599  * There may be other backends that haven't read the message(s), so we
600  * cannot delete them here. SICleanupQueue() will eventually remove them
601  * from the queue.
602  */
603  n = 0;
604  while (n < datasize && stateP->nextMsgNum < max)
605  {
606  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
607  stateP->nextMsgNum++;
608  }
609 
610  /*
611  * If we have caught up completely, reset our "signaled" flag so that
612  * we'll get another signal if we fall behind again.
613  *
614  * If we haven't caught up completely, reset the hasMessages flag so that
615  * we see the remaining messages next time.
616  */
617  if (stateP->nextMsgNum >= max)
618  stateP->signaled = false;
619  else
620  stateP->hasMessages = true;
621 
622  LWLockRelease(SInvalReadLock);
623  return n;
624 }
625 
626 /*
627  * SICleanupQueue
628  * Remove messages that have been consumed by all active backends
629  *
630  * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
631  * minFree is the minimum number of message slots to make free.
632  *
633  * Possible side effects of this routine include marking one or more
634  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
635  * to some backend that seems to be getting too far behind. We signal at
636  * most one backend at a time, for reasons explained at the top of the file.
637  *
638  * Caution: because we transiently release write lock when we have to signal
639  * some other backend, it is NOT guaranteed that there are still minFree
640  * free message slots at exit. Caller must recheck and perhaps retry.
641  */
642 void
643 SICleanupQueue(bool callerHasWriteLock, int minFree)
644 {
645  SISeg *segP = shmInvalBuffer;
646  int min,
647  minsig,
648  lowbound,
649  numMsgs,
650  i;
651  ProcState *needSig = NULL;
652 
653  /* Lock out all writers and readers */
654  if (!callerHasWriteLock)
655  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
656  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
657 
658  /*
659  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
660  * furthest-back backend that needs signaling (if any), and reset any
661  * backends that are too far back. Note that because we ignore sendOnly
662  * backends here it is possible for them to keep sending messages without
663  * a problem even when they are the only active backend.
664  */
665  min = segP->maxMsgNum;
666  minsig = min - SIG_THRESHOLD;
667  lowbound = min - MAXNUMMESSAGES + minFree;
668 
669  for (i = 0; i < segP->lastBackend; i++)
670  {
671  ProcState *stateP = &segP->procState[i];
672  int n = stateP->nextMsgNum;
673 
674  /* Ignore if inactive or already in reset state */
675  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
676  continue;
677 
678  /*
679  * If we must free some space and this backend is preventing it, force
680  * him into reset state and then ignore until he catches up.
681  */
682  if (n < lowbound)
683  {
684  stateP->resetState = true;
685  /* no point in signaling him ... */
686  continue;
687  }
688 
689  /* Track the global minimum nextMsgNum */
690  if (n < min)
691  min = n;
692 
693  /* Also see who's furthest back of the unsignaled backends */
694  if (n < minsig && !stateP->signaled)
695  {
696  minsig = n;
697  needSig = stateP;
698  }
699  }
700  segP->minMsgNum = min;
701 
702  /*
703  * When minMsgNum gets really large, decrement all message counters so as
704  * to forestall overflow of the counters. This happens seldom enough that
705  * folding it into the previous loop would be a loser.
706  */
707  if (min >= MSGNUMWRAPAROUND)
708  {
709  segP->minMsgNum -= MSGNUMWRAPAROUND;
710  segP->maxMsgNum -= MSGNUMWRAPAROUND;
711  for (i = 0; i < segP->lastBackend; i++)
712  {
713  /* we don't bother skipping inactive entries here */
715  }
716  }
717 
718  /*
719  * Determine how many messages are still in the queue, and set the
720  * threshold at which we should repeat SICleanupQueue().
721  */
722  numMsgs = segP->maxMsgNum - segP->minMsgNum;
723  if (numMsgs < CLEANUP_MIN)
724  segP->nextThreshold = CLEANUP_MIN;
725  else
726  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
727 
728  /*
729  * Lastly, signal anyone who needs a catchup interrupt. Since
730  * SendProcSignal() might not be fast, we don't want to hold locks while
731  * executing it.
732  */
733  if (needSig)
734  {
735  pid_t his_pid = needSig->procPid;
736  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
737 
738  needSig->signaled = true;
739  LWLockRelease(SInvalReadLock);
740  LWLockRelease(SInvalWriteLock);
741  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
742  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
743  if (callerHasWriteLock)
744  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
745  }
746  else
747  {
748  LWLockRelease(SInvalReadLock);
749  if (!callerHasWriteLock)
750  LWLockRelease(SInvalWriteLock);
751  }
752 }
753 
754 
755 /*
756  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
757  *
758  * We split VirtualTransactionIds into two parts so that it is possible
759  * to allocate a new one without any contention for shared memory, except
760  * for a bit of additional overhead during backend startup/shutdown.
761  * The high-order part of a VirtualTransactionId is a BackendId, and the
762  * low-order part is a LocalTransactionId, which we assign from a local
763  * counter. To avoid the risk of a VirtualTransactionId being reused
764  * within a short interval, successive procs occupying the same backend ID
765  * slot should use a consecutive sequence of local IDs, which is implemented
766  * by copying nextLocalTransactionId as seen above.
767  */
770 {
771  LocalTransactionId result;
772 
773  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
774  do
775  {
776  result = nextLocalTransactionId++;
777  } while (!LocalTransactionIdIsValid(result));
778 
779  return result;
780 }
int slock_t
Definition: s_lock.h:888
Size SInvalShmemSize(void)
Definition: sinvaladt.c:204
#define CLEANUP_MIN
Definition: sinvaladt.c:133
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:134
int lastBackend
Definition: sinvaladt.c:176
int MyProcPid
Definition: globals.c:39
BackendId MyBackendId
Definition: globals.c:73
bool signaled
Definition: sinvaladt.c:147
BackendId backendId
Definition: proc.h:113
uint32 TransactionId
Definition: c.h:391
Definition: proc.h:219
void SharedInvalBackendInit(bool sendOnly)
Definition: sinvaladt.c:258
TransactionId xmin
Definition: proc.h:225
PGXACT * allPgXact
Definition: proc.h:246
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:562
TransactionId xid
Definition: proc.h:221
void CreateSharedInvalidationState(void)
Definition: sinvaladt.c:219
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:189
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:143
slock_t msgnumLock
Definition: sinvaladt.c:179
#define Min(x, y)
Definition: c.h:812
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:336
int errcode(int sqlerrcode)
Definition: elog.c:575
PROC_HDR * ProcGlobal
Definition: proc.c:80
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:135
bool resetState
Definition: sinvaladt.c:146
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define FATAL
Definition: elog.h:52
int SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
Definition: sinvaladt.c:539
LocalTransactionId GetNextLocalTransactionId(void)
Definition: sinvaladt.c:769
int MaxBackends
Definition: globals.c:126
bool sendOnly
Definition: sinvaladt.c:156
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define InvalidTransactionId
Definition: transam.h:31
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:643
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:184
uint32 LocalTransactionId
Definition: c.h:393
#define WRITE_QUANTUM
Definition: sinvaladt.c:136
#define ereport(elevel, rest)
Definition: elog.h:122
int maxMsgNum
Definition: sinvaladt.c:174
#define MAXNUMMESSAGES
Definition: sinvaladt.c:131
#define SpinLockRelease(lock)
Definition: spin.h:64
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
uintptr_t Datum
Definition: postgres.h:372
void BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
Definition: sinvaladt.c:404
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int BackendId
Definition: backendid.h:21
LocalTransactionId nextLXID
Definition: sinvaladt.c:164
#define Assert(condition)
Definition: c.h:681
struct ProcState ProcState
size_t Size
Definition: c.h:350
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:195
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:192
struct SISeg SISeg
#define DatumGetPointer(X)
Definition: postgres.h:555
int minMsgNum
Definition: sinvaladt.c:173
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:132
int pgprocno
Definition: proc.h:110
int errmsg(const char *fmt,...)
Definition: elog.c:797
int maxBackends
Definition: sinvaladt.c:177
#define InvalidLocalTransactionId
Definition: lock.h:69
int i
int nextThreshold
Definition: sinvaladt.c:175
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:70
void * arg
void SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
Definition: sinvaladt.c:436
bool hasMessages
Definition: sinvaladt.c:148
#define elog
Definition: elog.h:219
pid_t procPid
Definition: sinvaladt.c:142
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
Definition: proc.h:95
#define PointerIsValid(pointer)
Definition: c.h:520
#define offsetof(type, field)
Definition: c.h:549
static volatile sig_atomic_t signaled
Definition: pg_standby.c:53
int nextMsgNum
Definition: sinvaladt.c:145
PGPROC * BackendIdGetProc(int backendID)
Definition: sinvaladt.c:377