PostgreSQL Source Code  git master
sinval.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * sinval.c
4  * POSTGRES shared cache invalidation communication code.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/storage/ipc/sinval.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/xact.h"
18 #include "commands/async.h"
19 #include "miscadmin.h"
20 #include "storage/ipc.h"
21 #include "storage/proc.h"
22 #include "storage/sinvaladt.h"
23 #include "utils/inval.h"
24 
25 
27 
28 
29 /*
30  * Because backends sitting idle will not be reading sinval events, we
31  * need a way to give an idle backend a swift kick in the rear and make
32  * it catch up before the sinval queue overflows and forces it to go
33  * through a cache reset exercise. This is done by sending
34  * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
35  *
36  * The signal handler will set an interrupt pending flag and will set the
37  * processes latch. Whenever starting to read from the client, or when
38  * interrupted while doing so, ProcessClientReadInterrupt() will call
39  * ProcessCatchupEvent().
40  */
41 volatile sig_atomic_t catchupInterruptPending = false;
42 
43 
44 /*
45  * SendSharedInvalidMessages
46  * Add shared-cache-invalidation message(s) to the global SI message queue.
47  */
48 void
50 {
51  SIInsertDataEntries(msgs, n);
52 }
53 
54 /*
55  * ReceiveSharedInvalidMessages
56  * Process shared-cache-invalidation messages waiting for this backend
57  *
58  * We guarantee to process all messages that had been queued before the
59  * routine was entered. It is of course possible for more messages to get
60  * queued right after our last SIGetDataEntries call.
61  *
62  * NOTE: it is entirely possible for this routine to be invoked recursively
63  * as a consequence of processing inside the invalFunction or resetFunction.
64  * Furthermore, such a recursive call must guarantee that all outstanding
65  * inval messages have been processed before it exits. This is the reason
66  * for the strange-looking choice to use a statically allocated buffer array
67  * and counters; it's so that a recursive call can process messages already
68  * sucked out of sinvaladt.c.
69  */
70 void
72  void (*resetFunction) (void))
73 {
74 #define MAXINVALMSGS 32
75  static SharedInvalidationMessage messages[MAXINVALMSGS];
76 
77  /*
78  * We use volatile here to prevent bugs if a compiler doesn't realize that
79  * recursion is a possibility ...
80  */
81  static volatile int nextmsg = 0;
82  static volatile int nummsgs = 0;
83 
84  /* Deal with any messages still pending from an outer recursion */
85  while (nextmsg < nummsgs)
86  {
87  SharedInvalidationMessage msg = messages[nextmsg++];
88 
90  invalFunction(&msg);
91  }
92 
93  do
94  {
95  int getResult;
96 
97  nextmsg = nummsgs = 0;
98 
99  /* Try to get some more messages */
100  getResult = SIGetDataEntries(messages, MAXINVALMSGS);
101 
102  if (getResult < 0)
103  {
104  /* got a reset message */
105  elog(DEBUG4, "cache state reset");
107  resetFunction();
108  break; /* nothing more to do */
109  }
110 
111  /* Process them, being wary that a recursive call might eat some */
112  nextmsg = 0;
113  nummsgs = getResult;
114 
115  while (nextmsg < nummsgs)
116  {
117  SharedInvalidationMessage msg = messages[nextmsg++];
118 
120  invalFunction(&msg);
121  }
122 
123  /*
124  * We only need to loop if the last SIGetDataEntries call (which might
125  * have been within a recursive call) returned a full buffer.
126  */
127  } while (nummsgs == MAXINVALMSGS);
128 
129  /*
130  * We are now caught up. If we received a catchup signal, reset that
131  * flag, and call SICleanupQueue(). This is not so much because we need
132  * to flush dead messages right now, as that we want to pass on the
133  * catchup signal to the next slowest backend. "Daisy chaining" the
134  * catchup signal this way avoids creating spikes in system load for what
135  * should be just a background maintenance activity.
136  */
138  {
139  catchupInterruptPending = false;
140  elog(DEBUG4, "sinval catchup complete, cleaning queue");
141  SICleanupQueue(false, 0);
142  }
143 }
144 
145 
146 /*
147  * HandleCatchupInterrupt
148  *
149  * This is called when PROCSIG_CATCHUP_INTERRUPT is received.
150  *
151  * We used to directly call ProcessCatchupEvent directly when idle. These days
152  * we just set a flag to do it later and notify the process of that fact by
153  * setting the process's latch.
154  */
155 void
157 {
158  /*
159  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
160  * you do here.
161  */
162 
164 
165  /* make sure the event is processed in due course */
166  SetLatch(MyLatch);
167 }
168 
169 /*
170  * ProcessCatchupInterrupt
171  *
172  * The portion of catchup interrupt handling that runs outside of the signal
173  * handler, which allows it to actually process pending invalidations.
174  */
175 void
177 {
179  {
180  /*
181  * What we need to do here is cause ReceiveSharedInvalidMessages() to
182  * run, which will do the necessary work and also reset the
183  * catchupInterruptPending flag. If we are inside a transaction we
184  * can just call AcceptInvalidationMessages() to do this. If we
185  * aren't, we start and immediately end a transaction; the call to
186  * AcceptInvalidationMessages() happens down inside transaction start.
187  *
188  * It is awfully tempting to just call AcceptInvalidationMessages()
189  * without the rest of the xact start/stop overhead, and I think that
190  * would actually work in the normal case; but I am not sure that
191  * things would clean up nicely if we got an error partway through.
192  */
194  {
195  elog(DEBUG4, "ProcessCatchupEvent inside transaction");
197  }
198  else
199  {
200  elog(DEBUG4, "ProcessCatchupEvent outside transaction");
203  }
204  }
205 }
void ProcessCatchupInterrupt(void)
Definition: sinval.c:176
void AcceptInvalidationMessages(void)
Definition: inval.c:681
void CommitTransactionCommand(void)
Definition: xact.c:2898
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
void SetLatch(Latch *latch)
Definition: latch.c:436
#define DEBUG4
Definition: elog.h:22
int SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
Definition: sinvaladt.c:538
void ReceiveSharedInvalidMessages(void(*invalFunction)(SharedInvalidationMessage *msg), void(*resetFunction)(void))
Definition: sinval.c:71
#define MAXINVALMSGS
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:642
uint64 SharedInvalidMessageCounter
Definition: sinval.c:26
void StartTransactionCommand(void)
Definition: xact.c:2797
void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
Definition: sinval.c:49
#define elog(elevel,...)
Definition: elog.h:228
struct Latch * MyLatch
Definition: globals.c:54
void SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
Definition: sinvaladt.c:435
volatile sig_atomic_t catchupInterruptPending
Definition: sinval.c:41
void HandleCatchupInterrupt(void)
Definition: sinval.c:156