PostgreSQL Source Code  git master
sinval.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * sinval.c
4  * POSTGRES shared cache invalidation communication code.
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/storage/ipc/sinval.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/xact.h"
18 #include "miscadmin.h"
19 #include "storage/latch.h"
20 #include "storage/sinvaladt.h"
21 #include "utils/inval.h"
22 
23 
25 
26 
/*
 * Because backends sitting idle will not be reading sinval events, we
 * need a way to give an idle backend a swift kick in the rear and make
 * it catch up before the sinval queue overflows and forces it to go
 * through a cache reset exercise.  This is done by sending
 * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
 *
 * The signal handler will set an interrupt pending flag and will set the
 * process's latch.  Whenever starting to read from the client, or when
 * interrupted while doing so, ProcessClientReadInterrupt() will call
 * ProcessCatchupInterrupt().
 *
 * The flag is written from signal-handler context, hence the
 * "volatile sig_atomic_t" type.
 */
volatile sig_atomic_t catchupInterruptPending = false;
40 
41 
42 /*
43  * SendSharedInvalidMessages
44  * Add shared-cache-invalidation message(s) to the global SI message queue.
45  */
46 void
48 {
49  SIInsertDataEntries(msgs, n);
50 }
51 
52 /*
53  * ReceiveSharedInvalidMessages
54  * Process shared-cache-invalidation messages waiting for this backend
55  *
56  * We guarantee to process all messages that had been queued before the
57  * routine was entered. It is of course possible for more messages to get
58  * queued right after our last SIGetDataEntries call.
59  *
60  * NOTE: it is entirely possible for this routine to be invoked recursively
61  * as a consequence of processing inside the invalFunction or resetFunction.
62  * Furthermore, such a recursive call must guarantee that all outstanding
63  * inval messages have been processed before it exits. This is the reason
64  * for the strange-looking choice to use a statically allocated buffer array
65  * and counters; it's so that a recursive call can process messages already
66  * sucked out of sinvaladt.c.
67  */
68 void
70  void (*resetFunction) (void))
71 {
72 #define MAXINVALMSGS 32
73  static SharedInvalidationMessage messages[MAXINVALMSGS];
74 
75  /*
76  * We use volatile here to prevent bugs if a compiler doesn't realize that
77  * recursion is a possibility ...
78  */
79  static volatile int nextmsg = 0;
80  static volatile int nummsgs = 0;
81 
82  /* Deal with any messages still pending from an outer recursion */
83  while (nextmsg < nummsgs)
84  {
85  SharedInvalidationMessage msg = messages[nextmsg++];
86 
88  invalFunction(&msg);
89  }
90 
91  do
92  {
93  int getResult;
94 
95  nextmsg = nummsgs = 0;
96 
97  /* Try to get some more messages */
98  getResult = SIGetDataEntries(messages, MAXINVALMSGS);
99 
100  if (getResult < 0)
101  {
102  /* got a reset message */
103  elog(DEBUG4, "cache state reset");
105  resetFunction();
106  break; /* nothing more to do */
107  }
108 
109  /* Process them, being wary that a recursive call might eat some */
110  nextmsg = 0;
111  nummsgs = getResult;
112 
113  while (nextmsg < nummsgs)
114  {
115  SharedInvalidationMessage msg = messages[nextmsg++];
116 
118  invalFunction(&msg);
119  }
120 
121  /*
122  * We only need to loop if the last SIGetDataEntries call (which might
123  * have been within a recursive call) returned a full buffer.
124  */
125  } while (nummsgs == MAXINVALMSGS);
126 
127  /*
128  * We are now caught up. If we received a catchup signal, reset that
129  * flag, and call SICleanupQueue(). This is not so much because we need
130  * to flush dead messages right now, as that we want to pass on the
131  * catchup signal to the next slowest backend. "Daisy chaining" the
132  * catchup signal this way avoids creating spikes in system load for what
133  * should be just a background maintenance activity.
134  */
136  {
137  catchupInterruptPending = false;
138  elog(DEBUG4, "sinval catchup complete, cleaning queue");
139  SICleanupQueue(false, 0);
140  }
141 }
142 
143 
144 /*
145  * HandleCatchupInterrupt
146  *
147  * This is called when PROCSIG_CATCHUP_INTERRUPT is received.
148  *
149  * We used to directly call ProcessCatchupEvent directly when idle. These days
150  * we just set a flag to do it later and notify the process of that fact by
151  * setting the process's latch.
152  */
153 void
155 {
156  /*
157  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
158  * you do here.
159  */
160 
162 
163  /* make sure the event is processed in due course */
164  SetLatch(MyLatch);
165 }
166 
167 /*
168  * ProcessCatchupInterrupt
169  *
170  * The portion of catchup interrupt handling that runs outside of the signal
171  * handler, which allows it to actually process pending invalidations.
172  */
173 void
175 {
177  {
178  /*
179  * What we need to do here is cause ReceiveSharedInvalidMessages() to
180  * run, which will do the necessary work and also reset the
181  * catchupInterruptPending flag. If we are inside a transaction we
182  * can just call AcceptInvalidationMessages() to do this. If we
183  * aren't, we start and immediately end a transaction; the call to
184  * AcceptInvalidationMessages() happens down inside transaction start.
185  *
186  * It is awfully tempting to just call AcceptInvalidationMessages()
187  * without the rest of the xact start/stop overhead, and I think that
188  * would actually work in the normal case; but I am not sure that
189  * things would clean up nicely if we got an error partway through.
190  */
192  {
193  elog(DEBUG4, "ProcessCatchupEvent inside transaction");
195  }
196  else
197  {
198  elog(DEBUG4, "ProcessCatchupEvent outside transaction");
201  }
202  }
203 }
#define elog(elevel,...)
Definition: elog.h:224
#define DEBUG4
Definition: elog.h:27
struct Latch * MyLatch
Definition: globals.c:60
void AcceptInvalidationMessages(void)
Definition: inval.c:806
void SetLatch(Latch *latch)
Definition: latch.c:632
void HandleCatchupInterrupt(void)
Definition: sinval.c:154
void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
Definition: sinval.c:47
void ReceiveSharedInvalidMessages(void(*invalFunction)(SharedInvalidationMessage *msg), void(*resetFunction)(void))
Definition: sinval.c:69
void ProcessCatchupInterrupt(void)
Definition: sinval.c:174
volatile sig_atomic_t catchupInterruptPending
Definition: sinval.c:39
uint64 SharedInvalidMessageCounter
Definition: sinval.c:24
#define MAXINVALMSGS
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:577
int SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
Definition: sinvaladt.c:473
void SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
Definition: sinvaladt.c:370
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4910
void StartTransactionCommand(void)
Definition: xact.c:2955
void CommitTransactionCommand(void)
Definition: xact.c:3053