PostgreSQL Source Code  git master
sinvaladt.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  * POSTGRES shared cache invalidation data manager.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/storage/ipc/sinvaladt.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <signal.h>
18 #include <unistd.h>
19 
20 #include "access/transam.h"
21 #include "miscadmin.h"
22 #include "storage/backendid.h"
23 #include "storage/ipc.h"
24 #include "storage/proc.h"
25 #include "storage/procsignal.h"
26 #include "storage/shmem.h"
27 #include "storage/sinvaladt.h"
28 #include "storage/spin.h"
29 
30 /*
31  * Conceptually, the shared cache invalidation messages are stored in an
32  * infinite array, where maxMsgNum is the next array subscript to store a
33  * submitted message in, minMsgNum is the smallest array subscript containing
34  * a message not yet read by all backends, and we always have maxMsgNum >=
35  * minMsgNum. (They are equal when there are no messages pending.) For each
36  * active backend, there is a nextMsgNum pointer indicating the next message it
37  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
38  * backend.
39  *
40  * (In the current implementation, minMsgNum is a lower bound for the
41  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
42  * smallest nextMsgNum --- it may lag behind. We only update it when
43  * SICleanupQueue is called, and we try not to do that often.)
44  *
45  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
46  * entries. We translate MsgNum values into circular-buffer indexes by
47  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
48  * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
49  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
50  * in the buffer. If the buffer does overflow, we recover by setting the
51  * "reset" flag for each backend that has fallen too far behind. A backend
52  * that is in "reset" state is ignored while determining minMsgNum. When
53  * it does finally attempt to receive inval messages, it must discard all
54  * its invalidatable state, since it won't know what it missed.
55  *
56  * To reduce the probability of needing resets, we send a "catchup" interrupt
57  * to any backend that seems to be falling unreasonably far behind. The
58  * normal behavior is that at most one such interrupt is in flight at a time;
59  * when a backend completes processing a catchup interrupt, it executes
60  * SICleanupQueue, which will signal the next-furthest-behind backend if
61  * needed. This avoids undue contention from multiple backends all trying
62  * to catch up at once. However, the furthest-back backend might be stuck
63  * in a state where it can't catch up. Eventually it will get reset, so it
64  * won't cause any more problems for anyone but itself. But we don't want
65  * to find that a bunch of other backends are now too close to the reset
66  * threshold to be saved. So SICleanupQueue is designed to occasionally
67  * send extra catchup interrupts as the queue gets fuller, to backends that
68  * are far behind and haven't gotten one yet. As long as there aren't a lot
69  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
70  * that aren't stuck will propagate their interrupts to the next guy.
71  *
72  * We would have problems if the MsgNum values overflow an integer, so
73  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
74  * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
75  * large so that we don't need to do this often. It must be a multiple of
76  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
77  * to be moved when we do it.
78  *
79  * Access to the shared sinval array is protected by two locks, SInvalReadLock
80  * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
81  * authorizes them to modify their own ProcState but not to modify or even
82  * look at anyone else's. When we need to perform array-wide updates,
83  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
84  * lock out all readers. Writers take SInvalWriteLock (always in exclusive
85  * mode) to serialize adding messages to the queue. Note that a writer
86  * can operate in parallel with one or more readers, because the writer
87  * has no need to touch anyone's ProcState, except in the infrequent cases
88  * when SICleanupQueue is needed. The only point of overlap is that
89  * the writer wants to change maxMsgNum while readers need to read it.
90  * We deal with that by having a spinlock that readers must take for just
91  * long enough to read maxMsgNum, while writers take it for just long enough
92  * to write maxMsgNum. (The exact rule is that you need the spinlock to
93  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
94  * spinlock to write maxMsgNum unless you are holding both locks.)
95  *
96  * Note: since maxMsgNum is an int and hence presumably atomically readable/
97  * writable, the spinlock might seem unnecessary. The reason it is needed
98  * is to provide a memory barrier: we need to be sure that messages written
99  * to the array are actually there before maxMsgNum is increased, and that
100  * readers will see that data after fetching maxMsgNum. Multiprocessors
101  * that have weak memory-ordering guarantees can fail without the memory
102  * barrier instructions that are included in the spinlock sequences.
103  */
104 
105 
106 /*
107  * Configurable parameters.
108  *
109  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
110  * Must be a power of 2 for speed.
111  *
112  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
113  * Must be a multiple of MAXNUMMESSAGES. Should be large.
114  *
115  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
116  * before we bother to call SICleanupQueue.
117  *
118  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
119  * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
120  *
121  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
122  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
123  *
124  * WRITE_QUANTUM: the max number of messages to push into the buffer per
125  * iteration of SIInsertDataEntries. Noncritical but should be less than
126  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
127  * per iteration.
128  */
129 
130 #define MAXNUMMESSAGES 4096 /* 2^12 slots in the circular buffer */
131 #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) /* 4096 * 2^18 = 2^30, a multiple of MAXNUMMESSAGES */
132 #define CLEANUP_MIN (MAXNUMMESSAGES / 2) /* evaluates to 2048 */
133 #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) /* evaluates to 256 */
134 #define SIG_THRESHOLD (MAXNUMMESSAGES / 2) /* evaluates to 2048 */
135 #define WRITE_QUANTUM 64 /* smaller than CLEANUP_QUANTUM (256) */
136 
137 /* Per-backend state in shared invalidation structure */
138 typedef struct ProcState
139 {
140  /* procPid is zero in an inactive ProcState array entry. */
141  pid_t procPid; /* PID of backend, for signaling */
142  PGPROC *proc; /* PGPROC of backend */
143  /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
144  int nextMsgNum; /* next message number to read */
145  bool resetState; /* backend needs to reset its state */
146  bool signaled; /* backend has been sent catchup signal */
147  bool hasMessages; /* backend has unread messages */
148 
149  /*
150  * Backend only sends invalidations, never receives them. This only makes
151  * sense for Startup process during recovery because it doesn't maintain a
152  * relcache, yet it fires inval messages to allow query backends to see
153  * schema changes.
154  */
155  bool sendOnly; /* backend only sends, never receives */
156 
157  /*
158  * Next LocalTransactionId to use for each idle backend slot. We keep
159  * this here because it is indexed by BackendId and it is convenient to
160  * copy the value to and from local memory when MyBackendId is set. It's
161  * meaningless in an active ProcState entry.
162  */
164 } ProcState;
165 
166 /* Shared cache invalidation memory segment */
167 typedef struct SISeg
168 {
169  /*
170  * General state information
171  */
172  int minMsgNum; /* oldest message still needed */
173  int maxMsgNum; /* next message number to be assigned */
174  int nextThreshold; /* # of messages to call SICleanupQueue */
175  int lastBackend; /* index of last active procState entry, +1 */
176  int maxBackends; /* size of procState array */
177 
178  slock_t msgnumLock; /* spinlock protecting maxMsgNum */
179 
180  /*
181  * Circular buffer holding shared-inval messages
182  */
184 
185  /*
186  * Per-backend invalidation state info (has MaxBackends entries).
187  */
188  ProcState procState[FLEXIBLE_ARRAY_MEMBER];
189 } SISeg;
190 
191 static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
192 
193 
195 
196 static void CleanupInvalidationState(int status, Datum arg);
197 
198 
199 /*
200  * SInvalShmemSize --- return shared-memory space needed
201  */
202 Size
204 {
205  Size size;
206 
207  size = offsetof(SISeg, procState);
208  size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
209 
210  return size;
211 }
212 
213 /*
214  * CreateSharedInvalidationState
215  * Create and initialize the SI message buffer
216  */
217 void
219 {
220  int i;
221  bool found;
222 
223  /* Allocate space in shared memory */
224  shmInvalBuffer = (SISeg *)
225  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
226  if (found)
227  return;
228 
229  /* Clear message counters, save size of procState array, init spinlock */
230  shmInvalBuffer->minMsgNum = 0;
231  shmInvalBuffer->maxMsgNum = 0;
232  shmInvalBuffer->nextThreshold = CLEANUP_MIN;
233  shmInvalBuffer->lastBackend = 0;
234  shmInvalBuffer->maxBackends = MaxBackends;
235  SpinLockInit(&shmInvalBuffer->msgnumLock);
236 
237  /* The buffer[] array is initially all unused, so we need not fill it */
238 
239  /* Mark all backends inactive, and initialize nextLXID */
240  for (i = 0; i < shmInvalBuffer->maxBackends; i++)
241  {
242  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
243  shmInvalBuffer->procState[i].proc = NULL;
244  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
245  shmInvalBuffer->procState[i].resetState = false;
246  shmInvalBuffer->procState[i].signaled = false;
247  shmInvalBuffer->procState[i].hasMessages = false;
248  shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
249  }
250 }
251 
252 /*
253  * SharedInvalBackendInit
254  * Initialize a new backend to operate on the sinval buffer
255  */
256 void
258 {
259  int index;
260  ProcState *stateP = NULL;
261  SISeg *segP = shmInvalBuffer;
262 
263  /*
264  * This can run in parallel with read operations, but not with write
265  * operations, since SIInsertDataEntries relies on lastBackend to set
266  * hasMessages appropriately.
267  */
268  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
269 
270  /* Look for a free entry in the procState array */
271  for (index = 0; index < segP->lastBackend; index++)
272  {
273  if (segP->procState[index].procPid == 0) /* inactive slot? */
274  {
275  stateP = &segP->procState[index];
276  break;
277  }
278  }
279 
280  if (stateP == NULL)
281  {
282  if (segP->lastBackend < segP->maxBackends)
283  {
284  stateP = &segP->procState[segP->lastBackend];
285  Assert(stateP->procPid == 0);
286  segP->lastBackend++;
287  }
288  else
289  {
290  /*
291  * out of procState slots: MaxBackends exceeded -- report normally
292  */
294  LWLockRelease(SInvalWriteLock);
295  ereport(FATAL,
296  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
297  errmsg("sorry, too many clients already")));
298  }
299  }
300 
301  MyBackendId = (stateP - &segP->procState[0]) + 1;
302 
303  /* Advertise assigned backend ID in MyProc */
305 
306  /* Fetch next local transaction ID into local memory */
308 
309  /* mark myself active, with all extant messages already read */
310  stateP->procPid = MyProcPid;
311  stateP->proc = MyProc;
312  stateP->nextMsgNum = segP->maxMsgNum;
313  stateP->resetState = false;
314  stateP->signaled = false;
315  stateP->hasMessages = false;
316  stateP->sendOnly = sendOnly;
317 
318  LWLockRelease(SInvalWriteLock);
319 
320  /* register exit routine to mark my entry inactive at exit */
322 
323  elog(DEBUG4, "my backend ID is %d", MyBackendId);
324 }
325 
326 /*
327  * CleanupInvalidationState
328  * Mark the current backend as no longer active.
329  *
330  * This function is called via on_shmem_exit() during backend shutdown.
331  *
332  * arg is really of type "SISeg*".
333  */
334 static void
336 {
337  SISeg *segP = (SISeg *) DatumGetPointer(arg);
338  ProcState *stateP;
339  int i;
340 
341  Assert(PointerIsValid(segP));
342 
343  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
344 
345  stateP = &segP->procState[MyBackendId - 1];
346 
347  /* Update next local transaction ID for next holder of this backendID */
349 
350  /* Mark myself inactive */
351  stateP->procPid = 0;
352  stateP->proc = NULL;
353  stateP->nextMsgNum = 0;
354  stateP->resetState = false;
355  stateP->signaled = false;
356 
357  /* Recompute index of last active backend */
358  for (i = segP->lastBackend; i > 0; i--)
359  {
360  if (segP->procState[i - 1].procPid != 0)
361  break;
362  }
363  segP->lastBackend = i;
364 
365  LWLockRelease(SInvalWriteLock);
366 }
367 
368 /*
369  * BackendIdGetProc
370  * Get the PGPROC structure for a backend, given the backend ID.
371  * The result may be out of date arbitrarily quickly, so the caller
372  * must be careful about how this information is used. NULL is
373  * returned if the backend is not active.
374  */
375 PGPROC *
376 BackendIdGetProc(int backendID)
377 {
378  PGPROC *result = NULL;
379  SISeg *segP = shmInvalBuffer;
380 
381  /* Need to lock out additions/removals of backends */
382  LWLockAcquire(SInvalWriteLock, LW_SHARED);
383 
384  if (backendID > 0 && backendID <= segP->lastBackend)
385  {
386  ProcState *stateP = &segP->procState[backendID - 1];
387 
388  result = stateP->proc;
389  }
390 
391  LWLockRelease(SInvalWriteLock);
392 
393  return result;
394 }
395 
396 /*
397  * BackendIdGetTransactionIds
398  * Get the xid and xmin of the backend. The result may be out of date
399  * arbitrarily quickly, so the caller must be careful about how this
400  * information is used.
401  */
402 void
404 {
405  SISeg *segP = shmInvalBuffer;
406 
407  *xid = InvalidTransactionId;
408  *xmin = InvalidTransactionId;
409 
410  /* Need to lock out additions/removals of backends */
411  LWLockAcquire(SInvalWriteLock, LW_SHARED);
412 
413  if (backendID > 0 && backendID <= segP->lastBackend)
414  {
415  ProcState *stateP = &segP->procState[backendID - 1];
416  PGPROC *proc = stateP->proc;
417 
418  if (proc != NULL)
419  {
420  PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno];
421 
422  *xid = xact->xid;
423  *xmin = xact->xmin;
424  }
425  }
426 
427  LWLockRelease(SInvalWriteLock);
428 }
429 
430 /*
431  * SIInsertDataEntries
432  * Add new invalidation message(s) to the buffer.
433  */
434 void
436 {
437  SISeg *segP = shmInvalBuffer;
438 
439  /*
440  * N can be arbitrarily large. We divide the work into groups of no more
441  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
442  * an unreasonably long time. (This is not so much because we care about
443  * letting in other writers, as that some just-caught-up backend might be
444  * trying to do SICleanupQueue to pass on its signal, and we don't want it
445  * to have to wait a long time.) Also, we need to consider calling
446  * SICleanupQueue every so often.
447  */
448  while (n > 0)
449  {
450  int nthistime = Min(n, WRITE_QUANTUM);
451  int numMsgs;
452  int max;
453  int i;
454 
455  n -= nthistime;
456 
457  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
458 
459  /*
460  * If the buffer is full, we *must* acquire some space. Clean the
461  * queue and reset anyone who is preventing space from being freed.
462  * Otherwise, clean the queue only when it's exceeded the next
463  * fullness threshold. We have to loop and recheck the buffer state
464  * after any call of SICleanupQueue.
465  */
466  for (;;)
467  {
468  numMsgs = segP->maxMsgNum - segP->minMsgNum;
469  if (numMsgs + nthistime > MAXNUMMESSAGES ||
470  numMsgs >= segP->nextThreshold)
471  SICleanupQueue(true, nthistime);
472  else
473  break;
474  }
475 
476  /*
477  * Insert new message(s) into proper slot of circular buffer
478  */
479  max = segP->maxMsgNum;
480  while (nthistime-- > 0)
481  {
482  segP->buffer[max % MAXNUMMESSAGES] = *data++;
483  max++;
484  }
485 
486  /* Update current value of maxMsgNum using spinlock */
487  SpinLockAcquire(&segP->msgnumLock);
488  segP->maxMsgNum = max;
489  SpinLockRelease(&segP->msgnumLock);
490 
491  /*
492  * Now that the maxMsgNum change is globally visible, we give everyone
493  * a swift kick to make sure they read the newly added messages.
494  * Releasing SInvalWriteLock will enforce a full memory barrier, so
495  * these (unlocked) changes will be committed to memory before we exit
496  * the function.
497  */
498  for (i = 0; i < segP->lastBackend; i++)
499  {
500  ProcState *stateP = &segP->procState[i];
501 
502  stateP->hasMessages = true;
503  }
504 
505  LWLockRelease(SInvalWriteLock);
506  }
507 }
508 
509 /*
510  * SIGetDataEntries
511  * get next SI message(s) for current backend, if there are any
512  *
513  * Possible return values:
514  * 0: no SI message available
515  * n>0: next n SI messages have been extracted into data[]
516  * -1: SI reset message extracted
517  *
518  * If the return value is less than the array size "datasize", the caller
519  * can assume that there are no more SI messages after the one(s) returned.
520  * Otherwise, another call is needed to collect more messages.
521  *
522  * NB: this can run in parallel with other instances of SIGetDataEntries
523  * executing on behalf of other backends, since each instance will modify only
524  * fields of its own backend's ProcState, and no instance will look at fields
525  * of other backends' ProcStates. We express this by grabbing SInvalReadLock
526  * in shared mode. Note that this is not exactly the normal (read-only)
527  * interpretation of a shared lock! Look closely at the interactions before
528  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
529  *
530  * NB: this can also run in parallel with SIInsertDataEntries. It is not
531  * guaranteed that we will return any messages added after the routine is
532  * entered.
533  *
534  * Note: we assume that "datasize" is not so large that it might be important
535  * to break our hold on SInvalReadLock into segments.
536  */
537 int
539 {
540  SISeg *segP;
541  ProcState *stateP;
542  int max;
543  int n;
544 
545  segP = shmInvalBuffer;
546  stateP = &segP->procState[MyBackendId - 1];
547 
548  /*
549  * Before starting to take locks, do a quick, unlocked test to see whether
550  * there can possibly be anything to read. On a multiprocessor system,
551  * it's possible that this load could migrate backwards and occur before
552  * we actually enter this function, so we might miss a sinval message that
553  * was just added by some other processor. But they can't migrate
554  * backwards over a preceding lock acquisition, so it should be OK. If we
555  * haven't acquired a lock preventing against further relevant
556  * invalidations, any such occurrence is not much different than if the
557  * invalidation had arrived slightly later in the first place.
558  */
559  if (!stateP->hasMessages)
560  return 0;
561 
562  LWLockAcquire(SInvalReadLock, LW_SHARED);
563 
564  /*
565  * We must reset hasMessages before determining how many messages we're
566  * going to read. That way, if new messages arrive after we have
567  * determined how many we're reading, the flag will get reset and we'll
568  * notice those messages part-way through.
569  *
570  * Note that, if we don't end up reading all of the messages, we had
571  * better be certain to reset this flag before exiting!
572  */
573  stateP->hasMessages = false;
574 
575  /* Fetch current value of maxMsgNum using spinlock */
576  SpinLockAcquire(&segP->msgnumLock);
577  max = segP->maxMsgNum;
578  SpinLockRelease(&segP->msgnumLock);
579 
580  if (stateP->resetState)
581  {
582  /*
583  * Force reset. We can say we have dealt with any messages added
584  * since the reset, as well; and that means we should clear the
585  * signaled flag, too.
586  */
587  stateP->nextMsgNum = max;
588  stateP->resetState = false;
589  stateP->signaled = false;
590  LWLockRelease(SInvalReadLock);
591  return -1;
592  }
593 
594  /*
595  * Retrieve messages and advance backend's counter, until data array is
596  * full or there are no more messages.
597  *
598  * There may be other backends that haven't read the message(s), so we
599  * cannot delete them here. SICleanupQueue() will eventually remove them
600  * from the queue.
601  */
602  n = 0;
603  while (n < datasize && stateP->nextMsgNum < max)
604  {
605  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
606  stateP->nextMsgNum++;
607  }
608 
609  /*
610  * If we have caught up completely, reset our "signaled" flag so that
611  * we'll get another signal if we fall behind again.
612  *
613  * If we haven't caught up completely, reset the hasMessages flag so that
614  * we see the remaining messages next time.
615  */
616  if (stateP->nextMsgNum >= max)
617  stateP->signaled = false;
618  else
619  stateP->hasMessages = true;
620 
621  LWLockRelease(SInvalReadLock);
622  return n;
623 }
624 
625 /*
626  * SICleanupQueue
627  * Remove messages that have been consumed by all active backends
628  *
629  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
630  * minFree is the minimum number of message slots to make free.
631  *
632  * Possible side effects of this routine include marking one or more
633  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
634  * to some backend that seems to be getting too far behind. We signal at
635  * most one backend at a time, for reasons explained at the top of the file.
636  *
637  * Caution: because we transiently release write lock when we have to signal
638  * some other backend, it is NOT guaranteed that there are still minFree
639  * free message slots at exit. Caller must recheck and perhaps retry.
640  */
641 void
642 SICleanupQueue(bool callerHasWriteLock, int minFree)
643 {
644  SISeg *segP = shmInvalBuffer;
645  int min,
646  minsig,
647  lowbound,
648  numMsgs,
649  i;
650  ProcState *needSig = NULL;
651 
652  /* Lock out all writers and readers */
653  if (!callerHasWriteLock)
654  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
655  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
656 
657  /*
658  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
659  * furthest-back backend that needs signaling (if any), and reset any
660  * backends that are too far back. Note that because we ignore sendOnly
661  * backends here it is possible for them to keep sending messages without
662  * a problem even when they are the only active backend.
663  */
664  min = segP->maxMsgNum;
665  minsig = min - SIG_THRESHOLD;
666  lowbound = min - MAXNUMMESSAGES + minFree;
667 
668  for (i = 0; i < segP->lastBackend; i++)
669  {
670  ProcState *stateP = &segP->procState[i];
671  int n = stateP->nextMsgNum;
672 
673  /* Ignore if inactive or already in reset state */
674  if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
675  continue;
676 
677  /*
678  * If we must free some space and this backend is preventing it, force
679  * him into reset state and then ignore until he catches up.
680  */
681  if (n < lowbound)
682  {
683  stateP->resetState = true;
684  /* no point in signaling him ... */
685  continue;
686  }
687 
688  /* Track the global minimum nextMsgNum */
689  if (n < min)
690  min = n;
691 
692  /* Also see who's furthest back of the unsignaled backends */
693  if (n < minsig && !stateP->signaled)
694  {
695  minsig = n;
696  needSig = stateP;
697  }
698  }
699  segP->minMsgNum = min;
700 
701  /*
702  * When minMsgNum gets really large, decrement all message counters so as
703  * to forestall overflow of the counters. This happens seldom enough that
704  * folding it into the previous loop would be a loser.
705  */
706  if (min >= MSGNUMWRAPAROUND)
707  {
708  segP->minMsgNum -= MSGNUMWRAPAROUND;
709  segP->maxMsgNum -= MSGNUMWRAPAROUND;
710  for (i = 0; i < segP->lastBackend; i++)
711  {
712  /* we don't bother skipping inactive entries here */
714  }
715  }
716 
717  /*
718  * Determine how many messages are still in the queue, and set the
719  * threshold at which we should repeat SICleanupQueue().
720  */
721  numMsgs = segP->maxMsgNum - segP->minMsgNum;
722  if (numMsgs < CLEANUP_MIN)
723  segP->nextThreshold = CLEANUP_MIN;
724  else
725  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
726 
727  /*
728  * Lastly, signal anyone who needs a catchup interrupt. Since
729  * SendProcSignal() might not be fast, we don't want to hold locks while
730  * executing it.
731  */
732  if (needSig)
733  {
734  pid_t his_pid = needSig->procPid;
735  BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
736 
737  needSig->signaled = true;
738  LWLockRelease(SInvalReadLock);
739  LWLockRelease(SInvalWriteLock);
740  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
741  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
742  if (callerHasWriteLock)
743  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
744  }
745  else
746  {
747  LWLockRelease(SInvalReadLock);
748  if (!callerHasWriteLock)
749  LWLockRelease(SInvalWriteLock);
750  }
751 }
752 
753 
754 /*
755  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
756  *
757  * We split VirtualTransactionIds into two parts so that it is possible
758  * to allocate a new one without any contention for shared memory, except
759  * for a bit of additional overhead during backend startup/shutdown.
760  * The high-order part of a VirtualTransactionId is a BackendId, and the
761  * low-order part is a LocalTransactionId, which we assign from a local
762  * counter. To avoid the risk of a VirtualTransactionId being reused
763  * within a short interval, successive procs occupying the same backend ID
764  * slot should use a consecutive sequence of local IDs, which is implemented
765  * by copying nextLocalTransactionId as seen above.
766  */
769 {
770  LocalTransactionId result;
771 
772  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
773  do
774  {
775  result = nextLocalTransactionId++;
776  } while (!LocalTransactionIdIsValid(result));
777 
778  return result;
779 }
int slock_t
Definition: s_lock.h:934
Size SInvalShmemSize(void)
Definition: sinvaladt.c:203
#define CLEANUP_MIN
Definition: sinvaladt.c:132
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
int lastBackend
Definition: sinvaladt.c:175
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
bool signaled
Definition: sinvaladt.c:146
BackendId backendId
Definition: proc.h:113
uint32 TransactionId
Definition: c.h:514
Definition: proc.h:222
void SharedInvalBackendInit(bool sendOnly)
Definition: sinvaladt.c:257
TransactionId xmin
Definition: proc.h:228
PGXACT * allPgXact
Definition: proc.h:249
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:556
TransactionId xid
Definition: proc.h:224
void CreateSharedInvalidationState(void)
Definition: sinvaladt.c:218
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:188
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * proc
Definition: sinvaladt.c:142
slock_t msgnumLock
Definition: sinvaladt.c:178
#define Min(x, y)
Definition: c.h:911
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:335
int errcode(int sqlerrcode)
Definition: elog.c:608
PROC_HDR * ProcGlobal
Definition: proc.c:80
#define DEBUG4
Definition: elog.h:22
#define SIG_THRESHOLD
Definition: sinvaladt.c:134
bool resetState
Definition: sinvaladt.c:145
Definition: type.h:89
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:179
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define FATAL
Definition: elog.h:52
int SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
Definition: sinvaladt.c:538
LocalTransactionId GetNextLocalTransactionId(void)
Definition: sinvaladt.c:768
int MaxBackends
Definition: globals.c:135
bool sendOnly
Definition: sinvaladt.c:155
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define InvalidTransactionId
Definition: transam.h:31
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:642
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:183
uint32 LocalTransactionId
Definition: c.h:516
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
#define ereport(elevel, rest)
Definition: elog.h:141
int maxMsgNum
Definition: sinvaladt.c:173
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define SpinLockRelease(lock)
Definition: spin.h:64
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
uintptr_t Datum
Definition: postgres.h:367
void BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
Definition: sinvaladt.c:403
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int BackendId
Definition: backendid.h:21
LocalTransactionId nextLXID
Definition: sinvaladt.c:163
#define Assert(condition)
Definition: c.h:739
struct ProcState ProcState
size_t Size
Definition: c.h:467
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:194
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:191
struct SISeg SISeg
#define DatumGetPointer(X)
Definition: postgres.h:549
int minMsgNum
Definition: sinvaladt.c:172
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
int pgprocno
Definition: proc.h:110
int errmsg(const char *fmt,...)
Definition: elog.c:822
int maxBackends
Definition: sinvaladt.c:176
#define elog(elevel,...)
Definition: elog.h:228
#define InvalidLocalTransactionId
Definition: lock.h:68
int i
int nextThreshold
Definition: sinvaladt.c:174
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:69
void * arg
void SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
Definition: sinvaladt.c:435
bool hasMessages
Definition: sinvaladt.c:147
pid_t procPid
Definition: sinvaladt.c:141
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:226
Definition: proc.h:95
#define PointerIsValid(pointer)
Definition: c.h:633
#define offsetof(type, field)
Definition: c.h:662
int nextMsgNum
Definition: sinvaladt.c:144
PGPROC * BackendIdGetProc(int backendID)
Definition: sinvaladt.c:376