PostgreSQL Source Code  git master
sinvaladt.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  * POSTGRES shared cache invalidation data manager.
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/storage/ipc/sinvaladt.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <signal.h>
18 #include <unistd.h>
19 
20 #include "access/transam.h"
21 #include "miscadmin.h"
22 #include "storage/ipc.h"
23 #include "storage/proc.h"
24 #include "storage/procnumber.h"
25 #include "storage/procsignal.h"
26 #include "storage/shmem.h"
27 #include "storage/sinvaladt.h"
28 #include "storage/spin.h"
29 
30 /*
31  * Conceptually, the shared cache invalidation messages are stored in an
32  * infinite array, where maxMsgNum is the next array subscript to store a
33  * submitted message in, minMsgNum is the smallest array subscript containing
34  * a message not yet read by all backends, and we always have maxMsgNum >=
35  * minMsgNum. (They are equal when there are no messages pending.) For each
36  * active backend, there is a nextMsgNum pointer indicating the next message it
37  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
38  * backend.
39  *
40  * (In the current implementation, minMsgNum is a lower bound for the
41  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
42  * smallest nextMsgNum --- it may lag behind. We only update it when
43  * SICleanupQueue is called, and we try not to do that often.)
44  *
45  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
46  * entries. We translate MsgNum values into circular-buffer indexes by
47  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
48  * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
49  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
50  * in the buffer. If the buffer does overflow, we recover by setting the
51  * "reset" flag for each backend that has fallen too far behind. A backend
52  * that is in "reset" state is ignored while determining minMsgNum. When
53  * it does finally attempt to receive inval messages, it must discard all
54  * its invalidatable state, since it won't know what it missed.
55  *
56  * To reduce the probability of needing resets, we send a "catchup" interrupt
57  * to any backend that seems to be falling unreasonably far behind. The
58  * normal behavior is that at most one such interrupt is in flight at a time;
59  * when a backend completes processing a catchup interrupt, it executes
60  * SICleanupQueue, which will signal the next-furthest-behind backend if
61  * needed. This avoids undue contention from multiple backends all trying
62  * to catch up at once. However, the furthest-back backend might be stuck
63  * in a state where it can't catch up. Eventually it will get reset, so it
64  * won't cause any more problems for anyone but itself. But we don't want
65  * to find that a bunch of other backends are now too close to the reset
66  * threshold to be saved. So SICleanupQueue is designed to occasionally
67  * send extra catchup interrupts as the queue gets fuller, to backends that
68  * are far behind and haven't gotten one yet. As long as there aren't a lot
69  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
70  * that aren't stuck will propagate their interrupts to the next guy.
71  *
72  * We would have problems if the MsgNum values overflow an integer, so
73  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
74  * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
75  * large so that we don't need to do this often. It must be a multiple of
76  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
77  * to be moved when we do it.
78  *
79  * Access to the shared sinval array is protected by two locks, SInvalReadLock
80  * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
81  * authorizes them to modify their own ProcState but not to modify or even
82  * look at anyone else's. When we need to perform array-wide updates,
83  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
84  * lock out all readers. Writers take SInvalWriteLock (always in exclusive
85  * mode) to serialize adding messages to the queue. Note that a writer
86  * can operate in parallel with one or more readers, because the writer
87  * has no need to touch anyone's ProcState, except in the infrequent cases
88  * when SICleanupQueue is needed. The only point of overlap is that
89  * the writer wants to change maxMsgNum while readers need to read it.
90  * We deal with that by having a spinlock that readers must take for just
91  * long enough to read maxMsgNum, while writers take it for just long enough
92  * to write maxMsgNum. (The exact rule is that you need the spinlock to
93  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
94  * spinlock to write maxMsgNum unless you are holding both locks.)
95  *
96  * Note: since maxMsgNum is an int and hence presumably atomically readable/
97  * writable, the spinlock might seem unnecessary. The reason it is needed
98  * is to provide a memory barrier: we need to be sure that messages written
99  * to the array are actually there before maxMsgNum is increased, and that
100  * readers will see that data after fetching maxMsgNum. Multiprocessors
101  * that have weak memory-ordering guarantees can fail without the memory
102  * barrier instructions that are included in the spinlock sequences.
103  */
104 
105 
106 /*
107  * Configurable parameters.
108  *
109  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
110  * Must be a power of 2 for speed.
111  *
112  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
113  * Must be a multiple of MAXNUMMESSAGES. Should be large.
114  *
115  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
116  * before we bother to call SICleanupQueue.
117  *
118  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
119  * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
120  *
121  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
122  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
123  *
124  * WRITE_QUANTUM: the max number of messages to push into the buffer per
125  * iteration of SIInsertDataEntries. Noncritical but should be less than
126  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
127  * per iteration.
128  */
129 
130 #define MAXNUMMESSAGES 4096
131 #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
132 #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
133 #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
134 #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
135 #define WRITE_QUANTUM 64
136 
137 /* Per-backend state in shared invalidation structure */
138 typedef struct ProcState
139 {
140  /* procPid is zero in an inactive ProcState array entry. */
141  pid_t procPid; /* PID of backend, for signaling */
142  /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
143  int nextMsgNum; /* next message number to read */
144  bool resetState; /* backend needs to reset its state */
145  bool signaled; /* backend has been sent catchup signal */
146  bool hasMessages; /* backend has unread messages */
147 
148  /*
149  * Backend only sends invalidations, never receives them. This only makes
150  * sense for Startup process during recovery because it doesn't maintain a
151  * relcache, yet it fires inval messages to allow query backends to see
152  * schema changes.
153  */
154  bool sendOnly; /* backend only sends, never receives */
155 
156  /*
157  * Next LocalTransactionId to use for each idle backend slot. We keep
158  * this here because it is indexed by ProcNumber and it is convenient to
159  * copy the value to and from local memory when MyProcNumber is set. It's
160  * meaningless in an active ProcState entry.
161  */
164 
165 /* Shared cache invalidation memory segment */
166 typedef struct SISeg
167 {
168  /*
169  * General state information
170  */
171  int minMsgNum; /* oldest message still needed */
172  int maxMsgNum; /* next message number to be assigned */
173  int nextThreshold; /* # of messages to call SICleanupQueue */
174 
175  slock_t msgnumLock; /* spinlock protecting maxMsgNum */
176 
177  /*
178  * Circular buffer holding shared-inval messages
179  */
181 
182  /*
183  * Per-backend invalidation state info.
184  *
185  * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
186  * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
187  * a dense array of their indexes, to speed up scanning all in-use slots.
188  *
189  * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
190  * having our separate copy avoids contention on ProcArrayLock, and allows
191  * us to track only the processes that participate in shared cache
192  * invalidations.
193  */
194  int numProcs;
195  int *pgprocnos;
198 
199 /*
200  * We reserve a slot for each possible ProcNumber, plus one for each
201  * possible auxiliary process type. (This scheme assumes there is not
202  * more than one of any auxiliary process type at a time.)
203  */
204 #define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS)
205 
206 static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
207 
208 
210 
211 static void CleanupInvalidationState(int status, Datum arg);
212 
213 
214 /*
215  * SInvalShmemSize --- return shared-memory space needed
216  */
217 Size
219 {
220  Size size;
221 
222  size = offsetof(SISeg, procState);
223  size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
224  size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
225 
226  return size;
227 }
228 
229 /*
230  * CreateSharedInvalidationState
231  * Create and initialize the SI message buffer
232  */
233 void
235 {
236  int i;
237  bool found;
238 
239  /* Allocate space in shared memory */
240  shmInvalBuffer = (SISeg *)
241  ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
242  if (found)
243  return;
244 
245  /* Clear message counters, save size of procState array, init spinlock */
250 
251  /* The buffer[] array is initially all unused, so we need not fill it */
252 
253  /* Mark all backends inactive, and initialize nextLXID */
254  for (i = 0; i < NumProcStateSlots; i++)
255  {
256  shmInvalBuffer->procState[i].procPid = 0; /* inactive */
257  shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
262  }
265 }
266 
267 /*
268  * SharedInvalBackendInit
269  * Initialize a new backend to operate on the sinval buffer
270  */
271 void
273 {
274  ProcState *stateP;
275  pid_t oldPid;
276  SISeg *segP = shmInvalBuffer;
277 
278  if (MyProcNumber < 0)
279  elog(ERROR, "MyProcNumber not set");
281  elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
283  stateP = &segP->procState[MyProcNumber];
284 
285  /*
286  * This can run in parallel with read operations, but not with write
287  * operations, since SIInsertDataEntries relies on the pgprocnos array to
288  * set hasMessages appropriately.
289  */
290  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
291 
292  oldPid = stateP->procPid;
293  if (oldPid != 0)
294  {
295  LWLockRelease(SInvalWriteLock);
296  elog(ERROR, "sinval slot for backend %d is already in use by process %d",
297  MyProcNumber, (int) oldPid);
298  }
299 
301 
302  /* Fetch next local transaction ID into local memory */
304 
305  /* mark myself active, with all extant messages already read */
306  stateP->procPid = MyProcPid;
307  stateP->nextMsgNum = segP->maxMsgNum;
308  stateP->resetState = false;
309  stateP->signaled = false;
310  stateP->hasMessages = false;
311  stateP->sendOnly = sendOnly;
312 
313  LWLockRelease(SInvalWriteLock);
314 
315  /* register exit routine to mark my entry inactive at exit */
317 }
318 
319 /*
320  * CleanupInvalidationState
321  * Mark the current backend as no longer active.
322  *
323  * This function is called via on_shmem_exit() during backend shutdown.
324  *
325  * arg is really of type "SISeg*".
326  */
327 static void
329 {
330  SISeg *segP = (SISeg *) DatumGetPointer(arg);
331  ProcState *stateP;
332  int i;
333 
334  Assert(PointerIsValid(segP));
335 
336  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
337 
338  stateP = &segP->procState[MyProcNumber];
339 
340  /* Update next local transaction ID for next holder of this proc number */
342 
343  /* Mark myself inactive */
344  stateP->procPid = 0;
345  stateP->nextMsgNum = 0;
346  stateP->resetState = false;
347  stateP->signaled = false;
348 
349  for (i = segP->numProcs - 1; i >= 0; i--)
350  {
351  if (segP->pgprocnos[i] == MyProcNumber)
352  {
353  if (i != segP->numProcs - 1)
354  segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
355  break;
356  }
357  }
358  if (i < 0)
359  elog(PANIC, "could not find entry in sinval array");
360  segP->numProcs--;
361 
362  LWLockRelease(SInvalWriteLock);
363 }
364 
365 /*
366  * SIInsertDataEntries
367  * Add new invalidation message(s) to the buffer.
368  */
369 void
371 {
372  SISeg *segP = shmInvalBuffer;
373 
374  /*
375  * N can be arbitrarily large. We divide the work into groups of no more
376  * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
377  * an unreasonably long time. (This is not so much because we care about
378  * letting in other writers, as that some just-caught-up backend might be
379  * trying to do SICleanupQueue to pass on its signal, and we don't want it
380  * to have to wait a long time.) Also, we need to consider calling
381  * SICleanupQueue every so often.
382  */
383  while (n > 0)
384  {
385  int nthistime = Min(n, WRITE_QUANTUM);
386  int numMsgs;
387  int max;
388  int i;
389 
390  n -= nthistime;
391 
392  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
393 
394  /*
395  * If the buffer is full, we *must* acquire some space. Clean the
396  * queue and reset anyone who is preventing space from being freed.
397  * Otherwise, clean the queue only when it's exceeded the next
398  * fullness threshold. We have to loop and recheck the buffer state
399  * after any call of SICleanupQueue.
400  */
401  for (;;)
402  {
403  numMsgs = segP->maxMsgNum - segP->minMsgNum;
404  if (numMsgs + nthistime > MAXNUMMESSAGES ||
405  numMsgs >= segP->nextThreshold)
406  SICleanupQueue(true, nthistime);
407  else
408  break;
409  }
410 
411  /*
412  * Insert new message(s) into proper slot of circular buffer
413  */
414  max = segP->maxMsgNum;
415  while (nthistime-- > 0)
416  {
417  segP->buffer[max % MAXNUMMESSAGES] = *data++;
418  max++;
419  }
420 
421  /* Update current value of maxMsgNum using spinlock */
422  SpinLockAcquire(&segP->msgnumLock);
423  segP->maxMsgNum = max;
424  SpinLockRelease(&segP->msgnumLock);
425 
426  /*
427  * Now that the maxMsgNum change is globally visible, we give everyone
428  * a swift kick to make sure they read the newly added messages.
429  * Releasing SInvalWriteLock will enforce a full memory barrier, so
430  * these (unlocked) changes will be committed to memory before we exit
431  * the function.
432  */
433  for (i = 0; i < segP->numProcs; i++)
434  {
435  ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
436 
437  stateP->hasMessages = true;
438  }
439 
440  LWLockRelease(SInvalWriteLock);
441  }
442 }
443 
444 /*
445  * SIGetDataEntries
446  * get next SI message(s) for current backend, if there are any
447  *
448  * Possible return values:
449  * 0: no SI message available
450  * n>0: next n SI messages have been extracted into data[]
451  * -1: SI reset message extracted
452  *
453  * If the return value is less than the array size "datasize", the caller
454  * can assume that there are no more SI messages after the one(s) returned.
455  * Otherwise, another call is needed to collect more messages.
456  *
457  * NB: this can run in parallel with other instances of SIGetDataEntries
458  * executing on behalf of other backends, since each instance will modify only
459  * fields of its own backend's ProcState, and no instance will look at fields
460  * of other backends' ProcStates. We express this by grabbing SInvalReadLock
461  * in shared mode. Note that this is not exactly the normal (read-only)
462  * interpretation of a shared lock! Look closely at the interactions before
463  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
464  *
465  * NB: this can also run in parallel with SIInsertDataEntries. It is not
466  * guaranteed that we will return any messages added after the routine is
467  * entered.
468  *
469  * Note: we assume that "datasize" is not so large that it might be important
470  * to break our hold on SInvalReadLock into segments.
471  */
472 int
474 {
475  SISeg *segP;
476  ProcState *stateP;
477  int max;
478  int n;
479 
480  segP = shmInvalBuffer;
481  stateP = &segP->procState[MyProcNumber];
482 
483  /*
484  * Before starting to take locks, do a quick, unlocked test to see whether
485  * there can possibly be anything to read. On a multiprocessor system,
486  * it's possible that this load could migrate backwards and occur before
487  * we actually enter this function, so we might miss a sinval message that
488  * was just added by some other processor. But they can't migrate
489  * backwards over a preceding lock acquisition, so it should be OK. If we
490  * haven't acquired a lock preventing against further relevant
491  * invalidations, any such occurrence is not much different than if the
492  * invalidation had arrived slightly later in the first place.
493  */
494  if (!stateP->hasMessages)
495  return 0;
496 
497  LWLockAcquire(SInvalReadLock, LW_SHARED);
498 
499  /*
500  * We must reset hasMessages before determining how many messages we're
501  * going to read. That way, if new messages arrive after we have
502  * determined how many we're reading, the flag will get reset and we'll
503  * notice those messages part-way through.
504  *
505  * Note that, if we don't end up reading all of the messages, we had
506  * better be certain to reset this flag before exiting!
507  */
508  stateP->hasMessages = false;
509 
510  /* Fetch current value of maxMsgNum using spinlock */
511  SpinLockAcquire(&segP->msgnumLock);
512  max = segP->maxMsgNum;
513  SpinLockRelease(&segP->msgnumLock);
514 
515  if (stateP->resetState)
516  {
517  /*
518  * Force reset. We can say we have dealt with any messages added
519  * since the reset, as well; and that means we should clear the
520  * signaled flag, too.
521  */
522  stateP->nextMsgNum = max;
523  stateP->resetState = false;
524  stateP->signaled = false;
525  LWLockRelease(SInvalReadLock);
526  return -1;
527  }
528 
529  /*
530  * Retrieve messages and advance backend's counter, until data array is
531  * full or there are no more messages.
532  *
533  * There may be other backends that haven't read the message(s), so we
534  * cannot delete them here. SICleanupQueue() will eventually remove them
535  * from the queue.
536  */
537  n = 0;
538  while (n < datasize && stateP->nextMsgNum < max)
539  {
540  data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
541  stateP->nextMsgNum++;
542  }
543 
544  /*
545  * If we have caught up completely, reset our "signaled" flag so that
546  * we'll get another signal if we fall behind again.
547  *
548  * If we haven't caught up completely, reset the hasMessages flag so that
549  * we see the remaining messages next time.
550  */
551  if (stateP->nextMsgNum >= max)
552  stateP->signaled = false;
553  else
554  stateP->hasMessages = true;
555 
556  LWLockRelease(SInvalReadLock);
557  return n;
558 }
559 
560 /*
561  * SICleanupQueue
562  * Remove messages that have been consumed by all active backends
563  *
564  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
565  * minFree is the minimum number of message slots to make free.
566  *
567  * Possible side effects of this routine include marking one or more
568  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
569  * to some backend that seems to be getting too far behind. We signal at
570  * most one backend at a time, for reasons explained at the top of the file.
571  *
572  * Caution: because we transiently release write lock when we have to signal
573  * some other backend, it is NOT guaranteed that there are still minFree
574  * free message slots at exit. Caller must recheck and perhaps retry.
575  */
576 void
577 SICleanupQueue(bool callerHasWriteLock, int minFree)
578 {
579  SISeg *segP = shmInvalBuffer;
580  int min,
581  minsig,
582  lowbound,
583  numMsgs,
584  i;
585  ProcState *needSig = NULL;
586 
587  /* Lock out all writers and readers */
588  if (!callerHasWriteLock)
589  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
590  LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
591 
592  /*
593  * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
594  * furthest-back backend that needs signaling (if any), and reset any
595  * backends that are too far back. Note that because we ignore sendOnly
596  * backends here it is possible for them to keep sending messages without
597  * a problem even when they are the only active backend.
598  */
599  min = segP->maxMsgNum;
600  minsig = min - SIG_THRESHOLD;
601  lowbound = min - MAXNUMMESSAGES + minFree;
602 
603  for (i = 0; i < segP->numProcs; i++)
604  {
605  ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
606  int n = stateP->nextMsgNum;
607 
608  /* Ignore if already in reset state */
609  Assert(stateP->procPid != 0);
610  if (stateP->resetState || stateP->sendOnly)
611  continue;
612 
613  /*
614  * If we must free some space and this backend is preventing it, force
615  * him into reset state and then ignore until he catches up.
616  */
617  if (n < lowbound)
618  {
619  stateP->resetState = true;
620  /* no point in signaling him ... */
621  continue;
622  }
623 
624  /* Track the global minimum nextMsgNum */
625  if (n < min)
626  min = n;
627 
628  /* Also see who's furthest back of the unsignaled backends */
629  if (n < minsig && !stateP->signaled)
630  {
631  minsig = n;
632  needSig = stateP;
633  }
634  }
635  segP->minMsgNum = min;
636 
637  /*
638  * When minMsgNum gets really large, decrement all message counters so as
639  * to forestall overflow of the counters. This happens seldom enough that
640  * folding it into the previous loop would be a loser.
641  */
642  if (min >= MSGNUMWRAPAROUND)
643  {
644  segP->minMsgNum -= MSGNUMWRAPAROUND;
645  segP->maxMsgNum -= MSGNUMWRAPAROUND;
646  for (i = 0; i < segP->numProcs; i++)
648  }
649 
650  /*
651  * Determine how many messages are still in the queue, and set the
652  * threshold at which we should repeat SICleanupQueue().
653  */
654  numMsgs = segP->maxMsgNum - segP->minMsgNum;
655  if (numMsgs < CLEANUP_MIN)
656  segP->nextThreshold = CLEANUP_MIN;
657  else
658  segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
659 
660  /*
661  * Lastly, signal anyone who needs a catchup interrupt. Since
662  * SendProcSignal() might not be fast, we don't want to hold locks while
663  * executing it.
664  */
665  if (needSig)
666  {
667  pid_t his_pid = needSig->procPid;
668  ProcNumber his_procNumber = (needSig - &segP->procState[0]);
669 
670  needSig->signaled = true;
671  LWLockRelease(SInvalReadLock);
672  LWLockRelease(SInvalWriteLock);
673  elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
674  SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
675  if (callerHasWriteLock)
676  LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
677  }
678  else
679  {
680  LWLockRelease(SInvalReadLock);
681  if (!callerHasWriteLock)
682  LWLockRelease(SInvalWriteLock);
683  }
684 }
685 
686 
687 /*
688  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
689  *
690  * We split VirtualTransactionIds into two parts so that it is possible
691  * to allocate a new one without any contention for shared memory, except
692  * for a bit of additional overhead during backend startup/shutdown.
693  * The high-order part of a VirtualTransactionId is a ProcNumber, and the
694  * low-order part is a LocalTransactionId, which we assign from a local
695  * counter. To avoid the risk of a VirtualTransactionId being reused
696  * within a short interval, successive procs occupying the same PGPROC slot
697  * should use a consecutive sequence of local IDs, which is implemented
698  * by copying nextLocalTransactionId as seen above.
699  */
702 {
703  LocalTransactionId result;
704 
705  /* loop to avoid returning InvalidLocalTransactionId at wraparound */
706  do
707  {
708  result = nextLocalTransactionId++;
709  } while (!LocalTransactionIdIsValid(result));
710 
711  return result;
712 }
#define Min(x, y)
Definition: c.h:1004
#define Assert(condition)
Definition: c.h:858
#define PointerIsValid(pointer)
Definition: c.h:763
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:398
uint32 LocalTransactionId
Definition: c.h:654
size_t Size
Definition: c.h:605
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define DEBUG4
Definition: elog.h:27
int MyProcPid
Definition: globals.c:45
ProcNumber MyProcNumber
Definition: globals.c:87
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:73
#define InvalidLocalTransactionId
Definition: lock.h:65
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:66
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void * arg
const void * data
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
int ProcNumber
Definition: procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:257
@ PROCSIG_CATCHUP_INTERRUPT
Definition: procsignal.h:32
int slock_t
Definition: s_lock.h:735
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:133
struct ProcState ProcState
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:206
#define WRITE_QUANTUM
Definition: sinvaladt.c:135
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:577
#define NumProcStateSlots
Definition: sinvaladt.c:204
#define MAXNUMMESSAGES
Definition: sinvaladt.c:130
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:131
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:328
Size SInvalShmemSize(void)
Definition: sinvaladt.c:218
#define SIG_THRESHOLD
Definition: sinvaladt.c:134
void SharedInvalBackendInit(bool sendOnly)
Definition: sinvaladt.c:272
int SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
Definition: sinvaladt.c:473
struct SISeg SISeg
void CreateSharedInvalidationState(void)
Definition: sinvaladt.c:234
void SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
Definition: sinvaladt.c:370
LocalTransactionId GetNextLocalTransactionId(void)
Definition: sinvaladt.c:701
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:209
#define CLEANUP_MIN
Definition: sinvaladt.c:132
static pg_noinline void Size size
Definition: slab.c:607
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
int nextMsgNum
Definition: sinvaladt.c:143
bool signaled
Definition: sinvaladt.c:145
LocalTransactionId nextLXID
Definition: sinvaladt.c:162
pid_t procPid
Definition: sinvaladt.c:141
bool hasMessages
Definition: sinvaladt.c:146
bool sendOnly
Definition: sinvaladt.c:154
bool resetState
Definition: sinvaladt.c:144
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:180
int minMsgNum
Definition: sinvaladt.c:171
int maxMsgNum
Definition: sinvaladt.c:172
int * pgprocnos
Definition: sinvaladt.c:195
slock_t msgnumLock
Definition: sinvaladt.c:175
int nextThreshold
Definition: sinvaladt.c:173
int numProcs
Definition: sinvaladt.c:194
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:196