/*-------------------------------------------------------------------------
 *
 * async.c
 *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/commands/async.c
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * Async Notification Model as of 9.0:
 *
 * 1. Multiple backends on same machine. Multiple backends listening on
 * several channels. (Channels are also called "conditions" in other
 * parts of the code.)
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 * with actively-used pages mapped into shared memory by the slru.c module.
 * All notification messages are placed in the queue and later read out
 * by listening backends.
 *
 * There is no central knowledge of which backend listens on which channel;
 * every backend has its own list of interesting channels.
 *
 * Although there is only one queue, notifications are treated as being
 * database-local; this is done by including the sender's database OID
 * in each notification message. Listening backends ignore messages
 * that don't match their database OID. This is important because it
 * ensures senders and receivers have the same database encoding and won't
 * misinterpret non-ASCII text in the channel name or payload string.
 *
 * Since notifications are not expected to survive database crashes,
 * we can simply clean out the pg_notify data at any reboot, and there
 * is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 * entering its PID into the array in AsyncQueueControl. It then scans all
 * incoming notifications in the central queue and first compares the
 * database OID of the notification with its own database OID and then
 * compares the notified channel with the list of channels that it listens
 * to. In case there is a match it delivers the notification event to its
 * frontend. Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 * a backend-local list which will not be processed until transaction end.
 *
 * Duplicate notifications from the same transaction are sent out as one
 * notification only. This is done to save work when for example a trigger
 * on a 2 million row table fires a notification for each row that has been
 * changed. If the application needs to receive every single notification
 * that has been sent, it can easily add some unique string into the extra
 * payload parameter.
 *
 * When the transaction is ready to commit, PreCommit_Notify() adds the
 * pending notifications to the head of the queue. The head pointer of the
 * queue always points to the next free position and a position is just a
 * page number and the offset in that page. This is done before marking the
 * transaction as committed in clog. If we run into problems writing the
 * notifications, we can still call elog(ERROR, ...) and the transaction
 * will roll back.
 *
 * Once we have put all of the notifications into the queue, we return to
 * CommitTransaction() which will then do the actual transaction commit.
 *
 * After commit we are called another time (AtCommit_Notify()). Here we
 * make any actual updates to the effective listen state (listenChannels).
 * Then we signal any backends that may be interested in our messages
 * (including our own backend, if listening). This is done by
 * SignalBackends(), which scans the list of listening backends and sends a
 * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
 * know which backend is listening on which channel so we must signal them
 * all). We can exclude backends that are already up to date, though, and
 * we can also exclude backends that are in other databases (unless they
 * are way behind and should be kicked to make them advance their
 * pointers).
 *
 * Finally, after we are out of the transaction altogether and about to go
 * idle, we scan the queue for messages that need to be sent to our
 * frontend (which might be notifies from other backends, or self-notifies
 * from our own). This step is not part of the CommitTransaction sequence
 * for two important reasons. First, we could get errors while sending
 * data to our frontend, and it's really bad for errors to happen in
 * post-commit cleanup. Second, in cases where a procedure issues commits
 * within a single frontend command, we don't want to send notifies to our
 * frontend until the command is done; but notifies to other backends
 * should go out immediately after each commit.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 * sets the process's latch, which triggers the event to be processed
 * immediately if this backend is idle (i.e., it is waiting for a frontend
 * command and is not within a transaction block. C.f.
 * ProcessClientReadInterrupt()). Otherwise the handler may only set a
 * flag, which will cause the processing to occur just before we next go
 * idle.
 *
 * Inbound-notify processing consists of reading all of the notifications
 * that have arrived since scanning last time. We read every notification
 * until we reach either a notification from an uncommitted transaction or
 * the head pointer's position.
 *
 * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
 * pointer needs to be advanced so that old pages can be truncated.
 * This is relatively expensive (notably, it requires an exclusive lock),
 * so we don't want to do it often. We make sending backends do this work
 * if they advanced the queue head into a new page, but only once every
 * QUEUE_CLEANUP_DELAY pages.
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.) The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.
 *
 * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
 * can be varied without affecting anything but performance. The maximum
 * amount of notification data that can be queued at one time is determined
 * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
 *-------------------------------------------------------------------------
 */
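
/*
 * Illustrative SQL sketch of the model above (assuming two ordinary
 * backends connected to the same database):
 *
 *		-- session A:
 *		LISTEN my_channel;
 *		-- session B:
 *		NOTIFY my_channel, 'hello';
 *
 * After B's transaction commits, A's backend is signaled; once idle, A
 * reads the queue and its frontend receives the event, which psql would
 * display roughly as:
 *		Asynchronous notification "my_channel" with payload "hello"
 *		received from server process with PID 12345.
 * The PID reported is B's backend PID (be_pid), which an application can
 * compare against its own backend PID to recognize self-notifies.
 */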

#include "postgres.h"

#include <limits.h>
#include <unistd.h>
#include <signal.h>

#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"


/*
 * Maximum size of a NOTIFY payload, including terminating NULL. This
 * must be kept small enough so that a notification message fits on one
 * SLRU page. The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
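
/*
 * With default build settings (BLCKSZ = 8192, NAMEDATALEN = 64) this works
 * out to 8192 - 64 - 128 = 8000 bytes of payload.
 */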

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
    int         length;         /* total allocated length of entry */
    Oid         dboid;          /* sender's database OID */
    TransactionId xid;          /* sender's XID */
    int32       srcPid;         /* sender's PID */
    char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len) INTALIGN(len)

#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)

/*
 * Struct describing a queue position, and assorted macros for working with it
 */
typedef struct QueuePosition
{
    int         page;           /* SLRU page number */
    int         offset;         /* byte offset within page */
} QueuePosition;

#define QUEUE_POS_PAGE(x)       ((x).page)
#define QUEUE_POS_OFFSET(x)     ((x).offset)

#define SET_QUEUE_POS(x,y,z) \
    do { \
        (x).page = (y); \
        (x).offset = (z); \
    } while (0)

#define QUEUE_POS_EQUAL(x,y) \
    ((x).page == (y).page && (x).offset == (y).offset)

#define QUEUE_POS_IS_ZERO(x) \
    ((x).page == 0 && (x).offset == 0)

/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     (x).page != (y).page ? (y) : \
     (x).offset < (y).offset ? (x) : (y))

/* choose logically larger QueuePosition */
#define QUEUE_POS_MAX(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     (x).page != (y).page ? (x) : \
     (x).offset > (y).offset ? (x) : (y))
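
/*
 * For example, given positions (page 5, offset 100) and (page 5, offset 200),
 * QUEUE_POS_MIN yields the former and QUEUE_POS_MAX the latter; when the
 * pages differ, asyncQueuePagePrecedes() decides, so wrapped-around page
 * numbers still order correctly.
 */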

/*
 * Parameter determining how often we try to advance the tail pointer:
 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
 * also the distance by which a backend in another database needs to be
 * behind before we'll decide we need to wake it up to advance its pointer.
 *
 * Resist the temptation to make this really large. While that would save
 * work in some places, it would add cost in others. In particular, this
 * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
 * catch up before the pages they'll need to read fall out of SLRU cache.
 */
#define QUEUE_CLEANUP_DELAY 4

/*
 * Struct describing a listening backend's status
 */
typedef struct QueueBackendStatus
{
    int32       pid;            /* either a PID or InvalidPid */
    Oid         dboid;          /* backend's database OID, or InvalidOid */
    BackendId   nextListener;   /* id of next listener, or InvalidBackendId */
    QueuePosition pos;          /* backend has read queue up to here */
} QueueBackendStatus;

/*
 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
 *
 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
 * NotifyQueueTailLock.
 *
 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
 * their own entries as well as the head and tail pointers. Consequently we
 * can allow a backend to update its own record while holding only SHARED lock
 * (since no other backend will inspect it).
 *
 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
 * entries of other backends and also change the head pointer. When holding
 * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
 * can change the tail pointers.
 *
 * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
 *
 * Each backend uses the backend[] array entry with index equal to its
 * BackendId (which can range from 1 to MaxBackends). We rely on this to make
 * SendProcSignal fast.
 *
 * The backend[] array entries for actively-listening backends are threaded
 * together using firstListener and the nextListener links, so that we can
 * scan them without having to iterate over inactive entries. We keep this
 * list in order by BackendId so that the scan is cache-friendly when there
 * are many active entries.
 */
typedef struct AsyncQueueControl
{
    QueuePosition head;         /* head points to the next free location */
    QueuePosition tail;         /* tail must be <= the queue position of every
                                 * listening backend */
    int         stopPage;       /* oldest unrecycled page; must be <=
                                 * tail.page */
    BackendId   firstListener;  /* id of first listener, or InvalidBackendId */
    TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
    QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
    /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;

static AsyncQueueControl *asyncQueueControl;

#define QUEUE_HEAD                  (asyncQueueControl->head)
#define QUEUE_TAIL                  (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)

/*
 * The SLRU buffer area through which we access the notification queue
 */
static SlruCtlData NotifyCtlData;

#define NotifyCtl                   (&NotifyCtlData)
#define QUEUE_PAGESIZE              BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */

/*
 * Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages
 * which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
 * We could use as many segments as SlruScanDirectory() allows, but this gives
 * us so much space already that it doesn't seem worth the trouble.
 *
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
 * pages, because more than that would confuse slru.c into thinking there
 * was a wraparound condition. With the default BLCKSZ this means there
 * can be up to 8GB of queued-and-not-read data.
 *
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
 */
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

/*
 * listenChannels identifies the channels we are actually listening to
 * (ie, have committed a LISTEN on). It is a simple list of channel names,
 * allocated in TopMemoryContext.
 */
static List *listenChannels = NIL;  /* list of C strings */

/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction. As explained above,
 * we don't actually change listenChannels until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
typedef enum
{
    LISTEN_LISTEN,
    LISTEN_UNLISTEN,
    LISTEN_UNLISTEN_ALL
} ListenActionKind;

typedef struct
{
    ListenActionKind action;
    char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;

typedef struct ActionList
{
    int         nestingLevel;   /* current transaction nesting depth */
    List       *actions;        /* list of ListenAction structs */
    struct ActionList *upper;   /* details for upper transaction levels */
} ActionList;

static ActionList *pendingActions = NULL;

/*
 * State for outbound notifies consists of a list of all channels+payloads
 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
 * until and unless the transaction commits. pendingNotifies is NULL if no
 * NOTIFYs have been done in the current (sub) transaction.
 *
 * We discard duplicate notify events issued in the same transaction.
 * Hence, in addition to the list proper (which we need to track the order
 * of the events, since we guarantee to deliver them in order), we build a
 * hash table which we can probe to detect duplicates. Since building the
 * hash table is somewhat expensive, we do so only once we have at least
 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
 * before that we just scan the events linearly.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions add their entries to their parent's list.
 * Failed subtransactions simply discard their lists. Since these lists
 * are independent, there may be notify events in a subtransaction's list
 * that duplicate events in some ancestor (sub) transaction; we get rid of
 * the dups when merging the subtransaction's list into its parent's.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit. This is a bit odd
 * but is consistent with our historical behavior.
 */
typedef struct Notification
{
    uint16      channel_len;    /* length of channel-name string */
    uint16      payload_len;    /* length of payload string */
    /* null-terminated channel name, then null-terminated payload follow */
    char        data[FLEXIBLE_ARRAY_MEMBER];
} Notification;

typedef struct NotificationList
{
    int         nestingLevel;   /* current transaction nesting depth */
    List       *events;         /* list of Notification structs */
    HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
    struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;

#define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */

typedef struct NotificationHash
{
    Notification *event;        /* => the actual Notification struct */
} NotificationHash;

static NotificationList *pendingNotifies = NULL;

/*
 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
 * called from inside a signal handler. That just sets the
 * notifyInterruptPending flag and sets the process
 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
 * actually deal with the interrupt.
 */
volatile sig_atomic_t notifyInterruptPending = false;

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;

/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameter */
bool        Trace_notify = false;

/* local function prototypes */
static int  asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
static void Exec_ListenCommit(const char *channel);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                         QueuePosition stop,
                                         char *page_buffer,
                                         Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static bool AsyncExistsPendingNotify(Notification *n);
static void AddEventToPendingNotifies(Notification *n);
static uint32 notification_hash(const void *key, Size keysize);
static int  notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

/*
 * Compute the difference between two queue page numbers (i.e., p - q),
 * accounting for wraparound.
 */
static int
asyncQueuePageDiff(int p, int q)
{
    int         diff;

    /*
     * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
     * in the range 0..QUEUE_MAX_PAGE.
     */
    Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
    Assert(q >= 0 && q <= QUEUE_MAX_PAGE);

    diff = p - q;
    if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
        diff -= QUEUE_MAX_PAGE + 1;
    else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
        diff += QUEUE_MAX_PAGE + 1;
    return diff;
}
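
/*
 * Worked example, assuming the default SLRU_PAGES_PER_SEGMENT of 32 (so
 * QUEUE_MAX_PAGE = 32 * 0x10000 - 1 = 2097151): asyncQueuePageDiff(10, 2097150)
 * starts with diff = -2097140, which is below -(QUEUE_MAX_PAGE + 1) / 2 =
 * -1048576, so QUEUE_MAX_PAGE + 1 is added and the result is 12, i.e. page 10
 * is 12 pages past page 2097150 once wraparound is taken into account.
 */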

/*
 * Is p < q, accounting for wraparound?
 *
 * Since asyncQueueIsFull() blocks creation of a page that could precede any
 * extant page, we need not assess entries within a page.
 */
static bool
asyncQueuePagePrecedes(int p, int q)
{
    return asyncQueuePageDiff(p, q) < 0;
}

/*
 * Report space needed for our shared memory area
 */
Size
AsyncShmemSize(void)
{
    Size        size;

    /* This had better match AsyncShmemInit */
    size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
    size = add_size(size, offsetof(AsyncQueueControl, backend));

    size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));

    return size;
}

/*
 * Initialize our shared memory area
 */
void
AsyncShmemInit(void)
{
    bool        found;
    Size        size;

    /*
     * Create or attach to the AsyncQueueControl structure.
     *
     * The used entries in the backend[] array run from 1 to MaxBackends; the
     * zero'th entry is unused but must be allocated.
     */
    size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
    size = add_size(size, offsetof(AsyncQueueControl, backend));

    asyncQueueControl = (AsyncQueueControl *)
        ShmemInitStruct("Async Queue Control", size, &found);

    if (!found)
    {
        /* First time through, so initialize it */
        SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
        SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
        QUEUE_STOP_PAGE = 0;
        QUEUE_FIRST_LISTENER = InvalidBackendId;
        asyncQueueControl->lastQueueFillWarn = 0;
        /* zero'th entry won't be used, but let's initialize it anyway */
        for (int i = 0; i <= MaxBackends; i++)
        {
            QUEUE_BACKEND_PID(i) = InvalidPid;
            QUEUE_BACKEND_DBOID(i) = InvalidOid;
            QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
            SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
        }
    }

    /*
     * Set up SLRU management of the pg_notify data.
     */
    NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
    SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
                  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
                  SYNC_HANDLER_NONE);

    if (!found)
    {
        /*
         * During start or reboot, clean out the pg_notify directory.
         */
        (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
    }
}


/*
 * pg_notify -
 *    SQL function to send a notification event
 */
Datum
pg_notify(PG_FUNCTION_ARGS)
{
    const char *channel;
    const char *payload;

    if (PG_ARGISNULL(0))
        channel = "";
    else
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (PG_ARGISNULL(1))
        payload = "";
    else
        payload = text_to_cstring(PG_GETARG_TEXT_PP(1));

    /* For NOTIFY as a statement, this is checked in ProcessUtility */
    PreventCommandDuringRecovery("NOTIFY");

    Async_Notify(channel, payload);

    PG_RETURN_VOID();
}
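
/*
 * Usage from SQL, for example:
 *
 *		SELECT pg_notify('my_channel', 'hello');
 *
 * This is equivalent to NOTIFY my_channel, 'hello', except that the channel
 * name and payload can be computed expressions rather than literals.
 */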


/*
 * Async_Notify
 *
 *      This is executed by the SQL notify command.
 *
 *      Adds the message to the list of pending notifies.
 *      Actual notification happens during transaction commit.
 *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 */
void
Async_Notify(const char *channel, const char *payload)
{
    int         my_level = GetCurrentTransactionNestLevel();
    size_t      channel_len;
    size_t      payload_len;
    Notification *n;
    MemoryContext oldcontext;

    if (IsParallelWorker())
        elog(ERROR, "cannot send notifications from a parallel worker");

    if (Trace_notify)
        elog(DEBUG1, "Async_Notify(%s)", channel);

    channel_len = channel ? strlen(channel) : 0;
    payload_len = payload ? strlen(payload) : 0;

    /* a channel name must be specified */
    if (channel_len == 0)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("channel name cannot be empty")));

    /* enforce length limits */
    if (channel_len >= NAMEDATALEN)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("channel name too long")));

    if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("payload string too long")));

    /*
     * We must construct the Notification entry, even if we end up not using
     * it, in order to compare it cheaply to existing list entries.
     *
     * The notification list needs to live until end of transaction, so store
     * it in the transaction context.
     */
    oldcontext = MemoryContextSwitchTo(CurTransactionContext);

    n = (Notification *) palloc(offsetof(Notification, data) +
                                channel_len + payload_len + 2);
    n->channel_len = channel_len;
    n->payload_len = payload_len;
    strcpy(n->data, channel);
    if (payload)
        strcpy(n->data + channel_len + 1, payload);
    else
        n->data[channel_len + 1] = '\0';

    if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
    {
        NotificationList *notifies;

        /*
         * First notify event in current (sub)xact. Note that we allocate the
         * NotificationList in TopTransactionContext; the nestingLevel might
         * get changed later by AtSubCommit_Notify.
         */
        notifies = (NotificationList *)
            MemoryContextAlloc(TopTransactionContext,
                               sizeof(NotificationList));
        notifies->nestingLevel = my_level;
        notifies->events = list_make1(n);
        /* We certainly don't need a hashtable yet */
        notifies->hashtab = NULL;
        notifies->upper = pendingNotifies;
        pendingNotifies = notifies;
    }
    else
    {
        /* Now check for duplicates */
        if (AsyncExistsPendingNotify(n))
        {
            /* It's a dup, so forget it */
            pfree(n);
            MemoryContextSwitchTo(oldcontext);
            return;
        }

        /* Append more events to existing list */
        AddEventToPendingNotifies(n);
    }

    MemoryContextSwitchTo(oldcontext);
}

/*
 * queue_listen
 *      Common code for listen, unlisten, unlisten all commands.
 *
 *      Adds the request to the list of pending actions.
 *      Actual update of the listenChannels list happens during transaction
 *      commit.
 */
static void
queue_listen(ListenActionKind action, const char *channel)
{
    MemoryContext oldcontext;
    ListenAction *actrec;
    int         my_level = GetCurrentTransactionNestLevel();

    /*
     * Unlike Async_Notify, we don't try to collapse out duplicates. It would
     * be too complicated to ensure we get the right interactions of
     * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
     * would be any performance benefit anyway in sane applications.
     */
    oldcontext = MemoryContextSwitchTo(CurTransactionContext);

    /* space for terminating null is included in sizeof(ListenAction) */
    actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
                                     strlen(channel) + 1);
    actrec->action = action;
    strcpy(actrec->channel, channel);

    if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
    {
        ActionList *actions;

        /*
         * First action in current sub(xact). Note that we allocate the
         * ActionList in TopTransactionContext; the nestingLevel might get
         * changed later by AtSubCommit_Notify.
         */
        actions = (ActionList *)
            MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
        actions->nestingLevel = my_level;
        actions->actions = list_make1(actrec);
        actions->upper = pendingActions;
        pendingActions = actions;
    }
    else
        pendingActions->actions = lappend(pendingActions->actions, actrec);

    MemoryContextSwitchTo(oldcontext);
}

/*
 * Async_Listen
 *
 *      This is executed by the SQL listen command.
 */
void
Async_Listen(const char *channel)
{
    if (Trace_notify)
        elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);

    queue_listen(LISTEN_LISTEN, channel);
}

/*
 * Async_Unlisten
 *
 *      This is executed by the SQL unlisten command.
 */
void
Async_Unlisten(const char *channel)
{
    if (Trace_notify)
        elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);

    /* If we couldn't possibly be listening, no need to queue anything */
    if (pendingActions == NULL && !unlistenExitRegistered)
        return;

    queue_listen(LISTEN_UNLISTEN, channel);
}

/*
 * Async_UnlistenAll
 *
 *      This is invoked by UNLISTEN * command, and also at backend exit.
 */
void
Async_UnlistenAll(void)
{
    if (Trace_notify)
        elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);

    /* If we couldn't possibly be listening, no need to queue anything */
    if (pendingActions == NULL && !unlistenExitRegistered)
        return;

    queue_listen(LISTEN_UNLISTEN_ALL, "");
}

/*
 * SQL function: return a set of the channel names this backend is actively
 * listening to.
 *
 * Note: this coding relies on the fact that the listenChannels list cannot
 * change within a transaction.
 */
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
    FuncCallContext *funcctx;

    /* stuff done only on the first call of the function */
    if (SRF_IS_FIRSTCALL())
    {
        /* create a function context for cross-call persistence */
        funcctx = SRF_FIRSTCALL_INIT();
    }

    /* stuff done on every call of the function */
    funcctx = SRF_PERCALL_SETUP();

    if (funcctx->call_cntr < list_length(listenChannels))
    {
        char       *channel = (char *) list_nth(listenChannels,
                                                funcctx->call_cntr);

        SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
    }

    SRF_RETURN_DONE(funcctx);
}
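
/*
 * Usage from SQL, for example:
 *
 *		LISTEN my_channel;
 *		SELECT * FROM pg_listening_channels();	-- returns "my_channel"
 */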

/*
 * Async_UnlistenOnExit
 *
 *      This is executed at backend exit if we have done any LISTENs in this
 *      backend. It might not be necessary anymore, if the user UNLISTENed
 *      everything, but we don't try to detect that case.
 */
static void
Async_UnlistenOnExit(int code, Datum arg)
{
    Exec_UnlistenAllCommit();
    asyncQueueUnregister();
}

/*
 * AtPrepare_Notify
 *
 *      This is called at the prepare phase of a two-phase
 *      transaction. Save the state for possible commit later.
 */
void
AtPrepare_Notify(void)
{
    /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
    if (pendingActions || pendingNotifies)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
}

/*
 * PreCommit_Notify
 *
 *      This is called at transaction commit, before actually committing to
 *      clog.
 *
 *      If there are pending LISTEN actions, make sure we are listed in the
 *      shared-memory listener array. This must happen before commit to
 *      ensure we don't miss any notifies from transactions that commit
 *      just after ours.
 *
 *      If there are outbound notify requests in the pendingNotifies list,
 *      add them to the global queue. We do that before commit so that
 *      we can still throw error if we run out of queue space.
 */
void
PreCommit_Notify(void)
{
    ListCell   *p;

    if (!pendingActions && !pendingNotifies)
        return;                 /* no relevant statements in this xact */

    if (Trace_notify)
        elog(DEBUG1, "PreCommit_Notify");

    /* Preflight for any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenPreCommit();
                    break;
                case LISTEN_UNLISTEN:
                    /* there is no Exec_UnlistenPreCommit() */
                    break;
                case LISTEN_UNLISTEN_ALL:
                    /* there is no Exec_UnlistenAllPreCommit() */
                    break;
            }
        }
    }

    /* Queue any pending notifies (must happen after the above) */
    if (pendingNotifies)
    {
        ListCell   *nextNotify;

        /*
         * Make sure that we have an XID assigned to the current transaction.
         * GetCurrentTransactionId is cheap if we already have an XID, but not
         * so cheap if we don't, and we'd prefer not to do that work while
         * holding NotifyQueueLock.
         */
        (void) GetCurrentTransactionId();

        /*
         * Serialize writers by acquiring a special lock that we hold till
         * after commit. This ensures that queue entries appear in commit
         * order, and in particular that there are never uncommitted queue
         * entries ahead of committed ones, so an uncommitted transaction
         * can't block delivery of deliverable notifications.
         *
         * We use a heavyweight lock so that it'll automatically be released
         * after either commit or abort. This also allows deadlocks to be
         * detected, though really a deadlock shouldn't be possible here.
         *
         * The lock is on "database 0", which is pretty ugly but it doesn't
         * seem worth inventing a special locktag category just for this.
         * (Historical note: before PG 9.0, a similar lock on "database 0" was
         * used by the flatfiles mechanism.)
         */
        LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                         AccessExclusiveLock);

        /* Now push the notifications into the queue */
        nextNotify = list_head(pendingNotifies->events);
        while (nextNotify != NULL)
        {
            /*
             * Add the pending notifications to the queue. We acquire and
             * release NotifyQueueLock once per page, which might be overkill
             * but it does allow readers to get in while we're doing this.
             *
             * A full queue is very uncommon and should really not happen,
             * given that we have so much space available in the SLRU pages.
             * Nevertheless we need to deal with this possibility. Note that
             * when we get here we are in the process of committing our
             * transaction, but we have not yet committed to clog, so at this
             * point in time we can still roll the transaction back.
             */
            LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
            asyncQueueFillWarning();
            if (asyncQueueIsFull())
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                         errmsg("too many notifications in the NOTIFY queue")));
            nextNotify = asyncQueueAddEntries(nextNotify);
            LWLockRelease(NotifyQueueLock);
        }

        /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
    }
}

/*
 * AtCommit_Notify
 *
 *      This is called at transaction commit, after committing to clog.
 *
 *      Update listenChannels and clear transaction-local state.
 *
 *      If we issued any notifications in the transaction, send signals to
 *      listening backends (possibly including ourselves) to process them.
 *      Also, if we filled enough queue pages with new notifies, try to
 *      advance the queue tail pointer.
 */
void
AtCommit_Notify(void)
{
    ListCell   *p;

    /*
     * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     * return as soon as possible
     */
    if (!pendingActions && !pendingNotifies)
        return;

    if (Trace_notify)
        elog(DEBUG1, "AtCommit_Notify");

    /* Perform any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN:
                    Exec_UnlistenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN_ALL:
                    Exec_UnlistenAllCommit();
                    break;
            }
        }
    }

    /* If no longer listening to anything, get out of listener array */
    if (amRegisteredListener && listenChannels == NIL)
        asyncQueueUnregister();

    /*
     * Send signals to listening backends. We need do this only if there are
     * pending notifies, which were previously added to the shared queue by
     * PreCommit_Notify().
     */
    if (pendingNotifies != NULL)
        SignalBackends();

    /*
     * If it's time to try to advance the global tail pointer, do that.
     *
     * (It might seem odd to do this in the sender, when more than likely the
     * listeners won't yet have read the messages we just sent. However,
     * there's less contention if only the sender does it, and there is little
     * need for urgency in advancing the global tail. So this typically will
     * be clearing out messages that were sent some time ago.)
     */
    if (tryAdvanceTail)
    {
        tryAdvanceTail = false;
        asyncQueueAdvanceTail();
    }

    /* And clean up */
    ClearPendingActionsAndNotifies();
}

/*
 * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
 *
 * This function must make sure we are ready to catch any incoming messages.
 */
static void
Exec_ListenPreCommit(void)
{
    QueuePosition head;
    QueuePosition max;
    BackendId   prevListener;

    /*
     * Nothing to do if we are already listening to something, nor if we
     * already ran this routine in this transaction.
     */
    if (amRegisteredListener)
        return;

    if (Trace_notify)
        elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);

    /*
     * Before registering, make sure we will unlisten before dying. (Note:
     * this action does not get undone if we abort later.)
     */
    if (!unlistenExitRegistered)
    {
        before_shmem_exit(Async_UnlistenOnExit, 0);
        unlistenExitRegistered = true;
    }

    /*
     * This is our first LISTEN, so establish our pointer.
     *
     * We set our pointer to the global tail pointer and then move it forward
     * over already-committed notifications. This ensures we cannot miss any
     * not-yet-committed notifications. We might get a few more but that
     * doesn't hurt.
     *
     * In some scenarios there might be a lot of committed notifications that
     * have not yet been pruned away (because some backend is being lazy about
     * reading them). To reduce our startup time, we can look at other
     * backends and adopt the maximum "pos" pointer of any backend that's in
     * our database; any notifications it's already advanced over are surely
     * committed and need not be re-examined by us. (We must consider only
     * backends connected to our DB, because others will not have bothered to
     * check committed-ness of notifications in our DB.)
     *
     * We need exclusive lock here so we can look at other backends' entries
     * and manipulate the list links.
     */
    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    head = QUEUE_HEAD;
    max = QUEUE_TAIL;
    prevListener = InvalidBackendId;
    for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    {
        if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
            max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
        /* Also find last listening backend before this one */
        if (i < MyBackendId)
            prevListener = i;
    }
    QUEUE_BACKEND_POS(MyBackendId) = max;
    QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
    QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
    /* Insert backend into list of listeners at correct position */
    if (prevListener > 0)
    {
        QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
        QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
    }
    else
    {
        QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
        QUEUE_FIRST_LISTENER = MyBackendId;
    }
    LWLockRelease(NotifyQueueLock);

    /* Now we are listed in the global array, so remember we're listening */
    amRegisteredListener = true;

    /*
     * Try to move our pointer forward as far as possible. This will skip
     * over already-committed notifications, which we want to do because they
     * might be quite stale. Note that we are not yet listening on anything,
     * so we won't deliver such notifications to our frontend. Also, although
     * our transaction might have executed NOTIFY, those message(s) aren't
     * queued yet so we won't skip them here.
     */
    if (!QUEUE_POS_EQUAL(max, head))
        asyncQueueReadAllNotifications();
}

/*
 * Exec_ListenCommit --- subroutine for AtCommit_Notify
 *
 * Add the channel to the list of channels we are listening on.
 */
static void
Exec_ListenCommit(const char *channel)
{
    MemoryContext oldcontext;

    /* Do nothing if we are already listening on this channel */
    if (IsListeningOn(channel))
        return;

    /*
     * Add the new channel name to listenChannels.
     *
     * XXX It is theoretically possible to get an out-of-memory failure here,
     * which would be bad because we already committed. For the moment it
     * doesn't seem worth trying to guard against that, but maybe improve this
     * later.
     */
    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    listenChannels = lappend(listenChannels, pstrdup(channel));
    MemoryContextSwitchTo(oldcontext);
}

/*
 * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
 *
 * Remove the specified channel name from listenChannels.
 */
static void
Exec_UnlistenCommit(const char *channel)
{
    ListCell   *q;

    if (Trace_notify)
        elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);

    foreach(q, listenChannels)
    {
        char       *lchan = (char *) lfirst(q);

        if (strcmp(lchan, channel) == 0)
        {
            listenChannels = foreach_delete_current(listenChannels, q);
            pfree(lchan);
            break;
        }
    }

    /*
     * We do not complain about unlistening something not being listened;
     * should we?
     */
}

/*
 * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
 *
 *      Unlisten on all channels for this backend.
 */
static void
Exec_UnlistenAllCommit(void)
{
    if (Trace_notify)
        elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);

    list_free_deep(listenChannels);
    listenChannels = NIL;
}

/*
 * Test whether we are actively listening on the given channel name.
 *
 * Note: this function is executed for every notification found in the queue.
 * Perhaps it is worth further optimization, eg convert the list to a sorted
 * array so we can binary-search it. In practice the list is likely to be
 * fairly short, though.
 */
static bool
IsListeningOn(const char *channel)
{
    ListCell   *p;

    foreach(p, listenChannels)
    {
        char       *lchan = (char *) lfirst(p);

        if (strcmp(lchan, channel) == 0)
            return true;
    }
    return false;
}

/*
 * Remove our entry from the listeners array when we are no longer listening
 * on any channel. NB: must not fail if we're already not listening.
 */
static void
asyncQueueUnregister(void)
{
    Assert(listenChannels == NIL);  /* else caller error */

    if (!amRegisteredListener) /* nothing to do */
        return;

    /*
     * Need exclusive lock here to manipulate list links.
     */
    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    /* Mark our entry as invalid */
    QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
    QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
    /* and remove it from the list */
    if (QUEUE_FIRST_LISTENER == MyBackendId)
        QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
    else
    {
        for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
        {
            if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
            {
                QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
                break;
            }
        }
    }
    QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
    LWLockRelease(NotifyQueueLock);

    /* mark ourselves as no longer listed in the global array */
    amRegisteredListener = false;
}

/*
 * Test whether there is room to insert more notification messages.
 *
 * Caller must hold at least shared NotifyQueueLock.
 */
static bool
asyncQueueIsFull(void)
{
    int         nexthead;
    int         boundary;

    /*
     * The queue is full if creating a new head page would create a page that
     * logically precedes the current global tail pointer, ie, the head
     * pointer would wrap around compared to the tail. We cannot create such
     * a head page for fear of confusing slru.c. For safety we round the tail
     * pointer back to a segment boundary (truncation logic in
     * asyncQueueAdvanceTail does not do this, so doing it here is optional).
     *
     * Note that this test is *not* dependent on how much space there is on
     * the current head page. This is necessary because asyncQueueAddEntries
     * might try to create the next head page in any case.
     */
    nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
    if (nexthead > QUEUE_MAX_PAGE)
        nexthead = 0;           /* wrap around */
    boundary = QUEUE_STOP_PAGE;
    boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
    return asyncQueuePagePrecedes(nexthead, boundary);
}

/*
 * Advance the QueuePosition to the next entry, assuming that the current
 * entry is of length entryLength. If we jump to a new page the function
 * returns true, else false.
 */
static bool
asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
{
    int         pageno = QUEUE_POS_PAGE(*position);
    int         offset = QUEUE_POS_OFFSET(*position);
    bool        pageJump = false;

    /*
     * Move to the next writing position: First jump over what we have just
     * written or read.
     */
    offset += entryLength;
    Assert(offset <= QUEUE_PAGESIZE);

    /*
     * In a second step check if another entry can possibly be written to the
     * page. If so, stay here, we have reached the next position. If not, then
     * we need to move on to the next page.
     */
    if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    {
        pageno++;
        if (pageno > QUEUE_MAX_PAGE)
            pageno = 0;         /* wrap around */
        offset = 0;
        pageJump = true;
    }

    SET_QUEUE_POS(*position, pageno, offset);
    return pageJump;
}

/*
 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
 */
static void
asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
{
    size_t      channellen = n->channel_len;
    size_t      payloadlen = n->payload_len;
    int         entryLength;

    Assert(channellen < NAMEDATALEN);
    Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);

    /* The terminators are already included in AsyncQueueEntryEmptySize */
    entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    entryLength = QUEUEALIGN(entryLength);
    qe->length = entryLength;
    qe->dboid = MyDatabaseId;
    qe->xid = GetCurrentTransactionId();
    qe->srcPid = MyProcPid;
    memcpy(qe->data, n->data, channellen + payloadlen + 2);
}
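
/*
 * Size arithmetic sketch, assuming 4-byte int, Oid, TransactionId, and int32
 * (so offsetof(AsyncQueueEntry, data) == 16 and AsyncQueueEntryEmptySize ==
 * 18): a notification with channel "foo" and payload "bar" needs
 * 18 + 3 + 3 = 24 bytes, and QUEUEALIGN(24) == 24, so qe->length is 24.
 */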

/*
 * Add pending notifications to the queue.
 *
 * We go page by page here, i.e. we stop once we have to go to a new page but
 * we will be called again and then fill that next page. If an entry does not
 * fit into the current page, we write a dummy entry with an InvalidOid as the
 * database OID in order to fill the page. So every page is always used up to
 * the last byte which simplifies reading the page later.
 *
 * We are passed the list cell (in pendingNotifies->events) containing the next
 * notification to write and return the first still-unwritten cell back.
 * Eventually we will return NULL indicating all is done.
 *
 * We are holding NotifyQueueLock already from the caller and grab
 * NotifySLRULock locally in this function.
 */
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
    AsyncQueueEntry qe;
    QueuePosition queue_head;
    int         pageno;
    int         offset;
    int         slotno;

    /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
    LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);

    /*
     * We work with a local copy of QUEUE_HEAD, which we write back to shared
     * memory upon exiting. The reason for this is that if we have to advance
     * to a new page, SimpleLruZeroPage might fail (out of disk space, for
     * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
     * subsequent insertions would try to put entries into a page that slru.c
     * thinks doesn't exist yet.) So, use a local position variable. Note
     * that if we do fail, any already-inserted queue entries are forgotten;
     * this is okay, since they'd be useless anyway after our transaction
     * rolls back.
     */
    queue_head = QUEUE_HEAD;

    /*
     * If this is the first write since the postmaster started, we need to
     * initialize the first page of the async SLRU. Otherwise, the current
     * page should be initialized already, so just fetch it.
     *
     * (We could also take the first path when the SLRU position has just
     * wrapped around, but re-zeroing the page is harmless in that case.)
     */
    pageno = QUEUE_POS_PAGE(queue_head);
    if (QUEUE_POS_IS_ZERO(queue_head))
        slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    else
        slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
                                   InvalidTransactionId);

    /* Note we mark the page dirty before writing in it */
    NotifyCtl->shared->page_dirty[slotno] = true;

    while (nextNotify != NULL)
    {
        Notification *n = (Notification *) lfirst(nextNotify);

        /* Construct a valid queue entry in local variable qe */
        asyncQueueNotificationToEntry(n, &qe);

        offset = QUEUE_POS_OFFSET(queue_head);

        /* Check whether the entry really fits on the current page */
        if (offset + qe.length <= QUEUE_PAGESIZE)
        {
            /* OK, so advance nextNotify past this item */
            nextNotify = lnext(pendingNotifies->events, nextNotify);
        }
        else
        {
            /*
             * Write a dummy entry to fill up the page. Actually readers will
             * only check dboid and since it won't match any reader's database
             * OID, they will ignore this entry and move on.
             */
            qe.length = QUEUE_PAGESIZE - offset;
            qe.dboid = InvalidOid;
            qe.data[0] = '\0';  /* empty channel */
            qe.data[1] = '\0';  /* empty payload */
        }

        /* Now copy qe into the shared buffer page */
        memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
               &qe,
               qe.length);

        /* Advance queue_head appropriately, and detect if page is full */
        if (asyncQueueAdvance(&(queue_head), qe.length))
        {
            /*
             * Page is full, so we're done here, but first fill the next page
             * with zeroes. The reason to do this is to ensure that slru.c's
             * idea of the head page is always the same as ours, which avoids
             * boundary problems in SimpleLruTruncate. The test in
             * asyncQueueIsFull() ensured that there is room to create this
             * page without overrunning the queue.
             */
            slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));

            /*
             * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
             * set flag to remember that we should try to advance the tail
             * pointer (we don't want to actually do that right here).
             */
            if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
                tryAdvanceTail = true;

            /* And exit the loop */
            break;
        }
    }

    /* Success, so update the global QUEUE_HEAD */
    QUEUE_HEAD = queue_head;

    LWLockRelease(NotifySLRULock);

    return nextNotify;
}

/*
 * SQL function to return the fraction of the notification queue currently
 * occupied.
 */
Datum
pg_notification_queue_usage(PG_FUNCTION_ARGS)
{
    double      usage;

    /* Advance the queue tail so we don't report a too-large result */
    asyncQueueAdvanceTail();

    LWLockAcquire(NotifyQueueLock, LW_SHARED);
    usage = asyncQueueUsage();
    LWLockRelease(NotifyQueueLock);

    PG_RETURN_FLOAT8(usage);
}

/*
 * Return the fraction of the queue that is currently occupied.
 *
 * The caller must hold NotifyQueueLock in (at least) shared mode.
 *
 * Note: we measure the distance to the logical tail page, not the physical
 * tail page. In some sense that's wrong, but the relative position of the
 * physical tail is affected by details such as SLRU segment boundaries,
 * so that a result based on that is unpleasantly unstable.
 */
static double
asyncQueueUsage(void)
{
    int         headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    int         tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    int         occupied;

    occupied = headPage - tailPage;

    if (occupied == 0)
        return (double) 0;      /* fast exit for common case */

    if (occupied < 0)
    {
        /* head has wrapped around, tail not yet */
        occupied += QUEUE_MAX_PAGE + 1;
    }

    return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
}
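
/*
 * For example, assuming the default QUEUE_MAX_PAGE of 2097151: with the head
 * on page 1000 and the tail on page 0, occupied is 1000 and the result is
 * 1000 / 1048576, i.e. roughly 0.1% of capacity.
 */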

/*
 * Check whether the queue is at least half full, and emit a warning if so.
 *
 * This is unlikely given the size of the queue, but possible.
 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
 *
 * Caller must hold exclusive NotifyQueueLock.
 */
static void
asyncQueueFillWarning(void)
{
    double      fillDegree;
    TimestampTz t;

    fillDegree = asyncQueueUsage();
    if (fillDegree < 0.5)
        return;

    t = GetCurrentTimestamp();

    if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
                                   t, QUEUE_FULL_WARN_INTERVAL))
    {
        QueuePosition min = QUEUE_HEAD;
        int32       minPid = InvalidPid;

        for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
        {
            Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
            min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
            if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
                minPid = QUEUE_BACKEND_PID(i);
        }

        ereport(WARNING,
                (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
                 (minPid != InvalidPid ?
                  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
                  : 0),
                 (minPid != InvalidPid ?
                  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
                  : 0)));

        asyncQueueControl->lastQueueFillWarn = t;
    }
}

/*
 * Send signals to listening backends.
 *
 * Normally we signal only backends in our own database, since only those
 * backends could be interested in notifies we send. However, if there's
 * notify traffic in our database but no traffic in another database that
 * does have listener(s), those listeners will fall further and further
 * behind. Waken them anyway if they're far enough behind, so that they'll
 * advance their queue position pointers, allowing the global tail to advance.
 *
 * Since we know the BackendId and the Pid the signaling is quite cheap.
 *
 * This is called during CommitTransaction(), so it's important for it
 * to have very low probability of failure.
 */
static void
SignalBackends(void)
{
    int32      *pids;
    BackendId  *ids;
    int         count;

    /*
     * Identify backends that we need to signal. We don't want to send
     * signals while holding the NotifyQueueLock, so this loop just builds a
     * list of target PIDs.
     *
     * XXX in principle these pallocs could fail, which would be bad. Maybe
     * preallocate the arrays? They're not that large, though.
     */
    pids = (int32 *) palloc(MaxBackends * sizeof(int32));
    ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
    count = 0;

    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    {
        int32       pid = QUEUE_BACKEND_PID(i);
        QueuePosition pos;

        Assert(pid != InvalidPid);
        pos = QUEUE_BACKEND_POS(i);
        if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
        {
            /*
             * Always signal listeners in our own database, unless they're
             * already caught up (unlikely, but possible).
             */
            if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
                continue;
        }
        else
        {
            /*
             * Listeners in other databases should be signaled only if they
             * are far behind.
             */
            if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
                                   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
                continue;
        }
        /* OK, need to signal this one */
        pids[count] = pid;
        ids[count] = i;
        count++;
    }
    LWLockRelease(NotifyQueueLock);

    /* Now send signals */
    for (int i = 0; i < count; i++)
    {
        int32       pid = pids[i];

        /*
         * If we are signaling our own process, no need to involve the kernel;
         * just set the flag directly.
         */
        if (pid == MyProcPid)
        {
            notifyInterruptPending = true;
            continue;
        }

        /*
         * Note: assuming things aren't broken, a signal failure here could
         * only occur if the target backend exited since we released
         * NotifyQueueLock; which is unlikely but certainly possible. So we
         * just log a low-level debug message if it happens.
         */
        if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
            elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    }

    pfree(pids);
    pfree(ids);
}
1710 
1711 /*
1712  * AtAbort_Notify
1713  *
1714  * This is called at transaction abort.
1715  *
1716  * Gets rid of pending actions and outbound notifies that we would have
1717  * executed if the transaction got committed.
1718  */
1719 void
1720 AtAbort_Notify(void)
1721 {
1722  /*
1723  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1724  * we have registered as a listener but have not made any entry in
1725  * listenChannels. In that case, deregister again.
1726  */
1727  if (amRegisteredListener && listenChannels == NIL)
1728  asyncQueueUnregister();
1729 
1730  /* And clean up */
1731  ClearPendingActionsAndNotifies();
1732 }
1733 
1734 /*
1735  * AtSubCommit_Notify() --- Take care of subtransaction commit.
1736  *
1737  * Reassign all items in the pending lists to the parent transaction.
1738  */
1739 void
1740 AtSubCommit_Notify(void)
1741 {
1742  int my_level = GetCurrentTransactionNestLevel();
1743 
1744  /* If there are actions at our nesting level, we must reparent them. */
1745  if (pendingActions != NULL &&
1746  pendingActions->nestingLevel >= my_level)
1747  {
1748  if (pendingActions->upper == NULL ||
1749  pendingActions->upper->nestingLevel < my_level - 1)
1750  {
1751  /* nothing to merge; give the whole thing to the parent */
1752  --pendingActions->nestingLevel;
1753  }
1754  else
1755  {
1756  ActionList *childPendingActions = pendingActions;
1757 
1758  pendingActions = pendingActions->upper;
1759 
1760  /*
1761  * Mustn't try to eliminate duplicates here --- see queue_listen()
1762  */
1763  pendingActions->actions =
1764  list_concat(pendingActions->actions,
1765  childPendingActions->actions);
1766  pfree(childPendingActions);
1767  }
1768  }
1769 
1770  /* If there are notifies at our nesting level, we must reparent them. */
1771  if (pendingNotifies != NULL &&
1772  pendingNotifies->nestingLevel >= my_level)
1773  {
1774  Assert(pendingNotifies->nestingLevel == my_level);
1775 
1776  if (pendingNotifies->upper == NULL ||
1777  pendingNotifies->upper->nestingLevel < my_level - 1)
1778  {
1779  /* nothing to merge; give the whole thing to the parent */
1780  --pendingNotifies->nestingLevel;
1781  }
1782  else
1783  {
1784  /*
1785  * Formerly, we didn't bother to eliminate duplicates here, but
1786  * now we must, else we fall foul of "Assert(!found)", either here
1787  * or during a later attempt to build the parent-level hashtable.
1788  */
1789  NotificationList *childPendingNotifies = pendingNotifies;
1790  ListCell *l;
1791 
1792  pendingNotifies = pendingNotifies->upper;
1793  /* Insert all the subxact's events into parent, except for dups */
1794  foreach(l, childPendingNotifies->events)
1795  {
1796  Notification *childn = (Notification *) lfirst(l);
1797 
1798  if (!AsyncExistsPendingNotify(childn))
1799  AddEventToPendingNotifies(childn);
1800  }
1801  pfree(childPendingNotifies);
1802  }
1803  }
1804 }
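
Both branches above implement the same reparenting idea: at subcommit, the topmost per-subxact list is either relabeled for the parent (the cheap path) or merged into the parent's existing list. A minimal sketch of the cheap path over a simplified stack type; SubList and reparent_on_subcommit are illustrative names, not part of async.c:

typedef struct SubList
{
    int             nestingLevel;   /* subxact depth that owns this list */
    struct SubList *upper;          /* list of the next-outer level, if any */
} SubList;

static void
reparent_on_subcommit(SubList *top, int my_level)
{
    if (top == NULL || top->nestingLevel < my_level)
        return;                     /* nothing recorded at this level */

    if (top->upper == NULL || top->upper->nestingLevel < my_level - 1)
        --top->nestingLevel;        /* relabel: the parent adopts the list */
    /* else: concatenate into top->upper and free top, as the code above does */
}
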
1805 
1806 /*
1807  * AtSubAbort_Notify() --- Take care of subtransaction abort.
1808  */
1809 void
1810 AtSubAbort_Notify(void)
1811 {
1812  int my_level = GetCurrentTransactionNestLevel();
1813 
1814  /*
1815  * All we have to do is pop the stack --- the actions/notifies made in
1816  * this subxact are no longer interesting, and the space will be freed
1817  * when CurTransactionContext is recycled. We still have to free the
1818  * ActionList and NotificationList objects themselves, though, because
1819  * those are allocated in TopTransactionContext.
1820  *
1821  * Note that there might be no entries at all, or no entries for the
1822  * current subtransaction level, either because none were ever created, or
1823  * because we reentered this routine due to trouble during subxact abort.
1824  */
1825  while (pendingActions != NULL &&
1826  pendingActions->nestingLevel >= my_level)
1827  {
1828  ActionList *childPendingActions = pendingActions;
1829 
1830  pendingActions = pendingActions->upper;
1831  pfree(childPendingActions);
1832  }
1833 
1834  while (pendingNotifies != NULL &&
1835  pendingNotifies->nestingLevel >= my_level)
1836  {
1837  NotificationList *childPendingNotifies = pendingNotifies;
1838 
1839  pendingNotifies = pendingNotifies->upper;
1840  pfree(childPendingNotifies);
1841  }
1842 }
1843 
1844 /*
1845  * HandleNotifyInterrupt
1846  *
1847  * Signal handler portion of interrupt handling. Let the backend know
1848  * that there's a pending notify interrupt. If we're currently reading
1849  * from the client, this will interrupt the read and
1850  * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
1851  */
1852 void
1853 HandleNotifyInterrupt(void)
1854 {
1855  /*
1856  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1857  * you do here.
1858  */
1859 
1860  /* signal that work needs to be done */
1861  notifyInterruptPending = true;
1862 
1863  /* make sure the event is processed in due course */
1864  SetLatch(MyLatch);
1865 }
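
This handler follows the canonical async-signal-safe pattern: do nothing in the handler beyond setting a sig_atomic_t flag and issuing a wakeup, deferring all real work to normal execution context. A minimal POSIX sketch of the same pattern, using a self-pipe in place of the latch (all names illustrative):

#include <signal.h>
#include <unistd.h>

static volatile sig_atomic_t pending = 0;
static int wakeup_pipe[2];          /* created with pipe() at startup */

static void
on_signal(int signo)
{
    pending = 1;                            /* flag only: no locks, no stdio */
    (void) write(wakeup_pipe[1], "x", 1);   /* async-signal-safe wakeup */
}

static void
service_pending(void)
{
    /* loop in case another signal arrives while we are working */
    while (pending)
    {
        pending = 0;
        /* ... read and deliver notifications here, in normal context ... */
    }
}
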
1866 
1867 /*
1868  * ProcessNotifyInterrupt
1869  *
1870  * This is called if we see notifyInterruptPending set, just before
1871  * transmitting ReadyForQuery at the end of a frontend command, and
1872  * also if a notify signal occurs while reading from the frontend.
1873  * HandleNotifyInterrupt() will cause the read to be interrupted
1874  * via the process's latch, and this routine will get called.
1875  * If we are truly idle (ie, *not* inside a transaction block),
1876  * process the incoming notifies.
1877  *
1878  * If "flush" is true, force any frontend messages out immediately.
1879  * This can be false when being called at the end of a frontend command,
1880  * since we'll flush after sending ReadyForQuery.
1881  */
1882 void
1883 ProcessNotifyInterrupt(bool flush)
1884 {
1885  if (IsTransactionOrTransactionBlock())
1886  return; /* not really idle */
1887 
1888  /* Loop in case another signal arrives while sending messages */
1889  while (notifyInterruptPending)
1890  ProcessIncomingNotify(flush);
1891 }
1892 
1893 
1894 /*
1895  * Read all pending notifications from the queue, and deliver appropriate
1896  * ones to my frontend. Stop when we reach queue head or an uncommitted
1897  * notification.
1898  */
1899 static void
1900 asyncQueueReadAllNotifications(void)
1901 {
1902  volatile QueuePosition pos;
1903  QueuePosition head;
1904  Snapshot snapshot;
1905 
1906  /* page_buffer must be adequately aligned, so use a union */
1907  union
1908  {
1909  char buf[QUEUE_PAGESIZE];
1910  AsyncQueueEntry align;
1911  } page_buffer;
1912 
1913  /* Fetch current state */
1914  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1915  /* Assert checks that we have a valid state entry */
1916  Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
1917  pos = QUEUE_BACKEND_POS(MyBackendId);
1918  head = QUEUE_HEAD;
1919  LWLockRelease(NotifyQueueLock);
1920 
1921  if (QUEUE_POS_EQUAL(pos, head))
1922  {
1923  /* Nothing to do, we have read all notifications already. */
1924  return;
1925  }
1926 
1927  /*----------
1928  * Get snapshot we'll use to decide which xacts are still in progress.
1929  * This is trickier than it might seem, because of race conditions.
1930  * Consider the following example:
1931  *
1932  * Backend 1: Backend 2:
1933  *
1934  * transaction starts
1935  * UPDATE foo SET ...;
1936  * NOTIFY foo;
1937  * commit starts
1938  * queue the notify message
1939  * transaction starts
1940  * LISTEN foo; -- first LISTEN in session
1941  * SELECT * FROM foo WHERE ...;
1942  * commit to clog
1943  * commit starts
1944  * add backend 2 to array of listeners
1945  * advance to queue head (this code)
1946  * commit to clog
1947  *
1948  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1949  * wasn't committed yet. Ideally we'd ensure that client 2 would
1950  * eventually get transaction 1's notify message, but there's no way
1951  * to do that; until we're in the listener array, there's no guarantee
1952  * that the notify message doesn't get removed from the queue.
1953  *
1954  * Therefore the coding technique transaction 2 is using is unsafe:
1955  * applications must commit a LISTEN before inspecting database state,
1956  * if they want to ensure they will see notifications about subsequent
1957  * changes to that state.
1958  *
1959  * What we do guarantee is that we'll see all notifications from
1960  * transactions committing after the snapshot we take here.
1961  * Exec_ListenPreCommit has already added us to the listener array,
1962  * so no not-yet-committed messages can be removed from the queue
1963  * before we see them.
1964  *----------
1965  */
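  /*
   * Illustrative aside (not part of async.c): in SQL, the safe client
   * ordering described above is
   *
   *     LISTEN foo;
   *     COMMIT;                  -- make the LISTEN effective first
   *     SELECT ... FROM foo;     -- then inspect state; changes committed
   *                              -- afterward will also arrive as NOTIFYs
   */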
1966  snapshot = RegisterSnapshot(GetLatestSnapshot());
1967 
1968  /*
1969  * It is possible that we fail while trying to send a message to our
1970  * frontend (for example, because of encoding conversion failure). If
1971  * that happens it is critical that we not try to send the same message
1972  * over and over again. Therefore, we place a PG_TRY block here that will
1973  * forcibly advance our queue position before we lose control to an error.
1974  * (We could alternatively retake NotifyQueueLock and move the position
1975  * before handling each individual message, but that seems like too much
1976  * lock traffic.)
1977  */
1978  PG_TRY();
1979  {
1980  bool reachedStop;
1981 
1982  do
1983  {
1984  int curpage = QUEUE_POS_PAGE(pos);
1985  int curoffset = QUEUE_POS_OFFSET(pos);
1986  int slotno;
1987  int copysize;
1988 
1989  /*
1990  * We copy the data from SLRU into a local buffer, so as to avoid
1991  * holding the NotifySLRULock while we are examining the entries
1992  * and possibly transmitting them to our frontend. Copy only the
1993  * part of the page we will actually inspect.
1994  */
1995  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1996  InvalidTransactionId);
1997  if (curpage == QUEUE_POS_PAGE(head))
1998  {
1999  /* we only want to read as far as head */
2000  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2001  if (copysize < 0)
2002  copysize = 0; /* just for safety */
2003  }
2004  else
2005  {
2006  /* fetch all the rest of the page */
2007  copysize = QUEUE_PAGESIZE - curoffset;
2008  }
2009  memcpy(page_buffer.buf + curoffset,
2010  NotifyCtl->shared->page_buffer[slotno] + curoffset,
2011  copysize);
2012  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2013  LWLockRelease(NotifySLRULock);
2014 
2015  /*
2016  * Process messages up to the stop position, end of page, or an
2017  * uncommitted message.
2018  *
2019  * Our stop position is what we found to be the head's position
2020  * when we entered this function. It might have changed already.
2021  * But if it has, we will receive (or have already received and
2022  * queued) another signal and come here again.
2023  *
2024  * We are not holding NotifyQueueLock here! The queue can only
2025  * extend beyond the head pointer (see above) and we leave our
2026  * backend's pointer where it is so nobody will truncate or
2027  * rewrite pages under us. Especially we don't want to hold a lock
2028  * while sending the notifications to the frontend.
2029  */
2030  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2031  page_buffer.buf,
2032  snapshot);
2033  } while (!reachedStop);
2034  }
2035  PG_FINALLY();
2036  {
2037  /* Update shared state */
2038  LWLockAcquire(NotifyQueueLock, LW_SHARED);
2039  QUEUE_BACKEND_POS(MyBackendId) = pos;
2040  LWLockRelease(NotifyQueueLock);
2041  }
2042  PG_END_TRY();
2043 
2044  /* Done with snapshot */
2045  UnregisterSnapshot(snapshot);
2046 }
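
The page_buffer union above is a portable alignment idiom: overlaying the raw byte array with an AsyncQueueEntry forces the array to be suitably aligned for the entries that will be read out of it. A minimal sketch of the idiom, with Entry standing in for AsyncQueueEntry:

#include <stdint.h>
#include <stdio.h>

typedef struct { int64_t length; int32_t payload; } Entry;  /* stand-in */

static union
{
    char  buf[8192];    /* raw page bytes */
    Entry align;        /* forces _Alignof(Entry) on the whole union */
} page_buffer;

int
main(void)
{
    printf("aligned: %d\n",
           (int) ((uintptr_t) page_buffer.buf % _Alignof(Entry) == 0));
    return 0;
}
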
2047 
2048 /*
2049  * Fetch notifications from the shared queue, beginning at position current,
2050  * and deliver relevant ones to my frontend.
2051  *
2052  * The current page must have been fetched into page_buffer from shared
2053  * memory. (We could access the page right in shared memory, but that
2054  * would imply holding the NotifySLRULock throughout this routine.)
2055  *
2056  * We stop if we reach the "stop" position, or reach a notification from an
2057  * uncommitted transaction, or reach the end of the page.
2058  *
2059  * The function returns true once we have reached the stop position or an
2060  * uncommitted notification, and false if we have finished with the page.
2061  * In other words: once it returns true there is no need to look further.
2062  * The QueuePosition *current is advanced past all processed messages.
2063  */
2064 static bool
2065 asyncQueueProcessPageEntries(volatile QueuePosition *current,
2066  QueuePosition stop,
2067  char *page_buffer,
2068  Snapshot snapshot)
2069 {
2070  bool reachedStop = false;
2071  bool reachedEndOfPage;
2072  AsyncQueueEntry *qe;
2073 
2074  do
2075  {
2076  QueuePosition thisentry = *current;
2077 
2078  if (QUEUE_POS_EQUAL(thisentry, stop))
2079  break;
2080 
2081  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2082 
2083  /*
2084  * Advance *current over this message, possibly to the next page. As
2085  * noted in the comments for asyncQueueReadAllNotifications, we must
2086  * do this before possibly failing while processing the message.
2087  */
2088  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2089 
2090  /* Ignore messages destined for other databases */
2091  if (qe->dboid == MyDatabaseId)
2092  {
2093  if (XidInMVCCSnapshot(qe->xid, snapshot))
2094  {
2095  /*
2096  * The source transaction is still in progress, so we can't
2097  * process this message yet. Break out of the loop, but first
2098  * back up *current so we will reprocess the message next
2099  * time. (Note: it is unlikely but not impossible for
2100  * TransactionIdDidCommit to fail, so we can't really avoid
2101  * this advance-then-back-up behavior when dealing with an
2102  * uncommitted message.)
2103  *
2104  * Note that we must test XidInMVCCSnapshot before we test
2105  * TransactionIdDidCommit, else we might return a message from
2106  * a transaction that is not yet visible to snapshots; compare
2107  * the comments at the head of heapam_visibility.c.
2108  *
2109  * Also, while our own xact won't be listed in the snapshot,
2110  * we need not check for TransactionIdIsCurrentTransactionId
2111  * because our transaction cannot (yet) have queued any
2112  * messages.
2113  */
2114  *current = thisentry;
2115  reachedStop = true;
2116  break;
2117  }
2118  else if (TransactionIdDidCommit(qe->xid))
2119  {
2120  /* qe->data is the null-terminated channel name */
2121  char *channel = qe->data;
2122 
2123  if (IsListeningOn(channel))
2124  {
2125  /* payload follows channel name */
2126  char *payload = qe->data + strlen(channel) + 1;
2127 
2128  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2129  }
2130  }
2131  else
2132  {
2133  /*
2134  * The source transaction aborted or crashed, so we just
2135  * ignore its notifications.
2136  */
2137  }
2138  }
2139 
2140  /* Loop back if we're not at end of page */
2141  } while (!reachedEndOfPage);
2142 
2143  if (QUEUE_POS_EQUAL(*current, stop))
2144  reachedStop = true;
2145 
2146  return reachedStop;
2147 }
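
One detail above deserves isolating: *current is advanced past the entry before anything that can fail, and restored only in the single still-in-progress case, so an error while delivering a message can never cause the same message to be re-sent forever. A minimal skeleton of that advance-then-back-up pattern (Pos and step_one_entry are illustrative names):

#include <stdbool.h>

typedef struct { int page; int offset; } Pos;

static void
step_one_entry(Pos *current, int entry_len, bool xact_in_progress)
{
    Pos saved = *current;

    current->offset += entry_len;   /* advance before any failure point */

    if (xact_in_progress)
        *current = saved;           /* not consumable yet: retry next time */
    /* else deliver the entry; an error now still leaves *current advanced */
}
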
2148 
2149 /*
2150  * Advance the shared queue tail variable to the minimum of all the
2151  * per-backend tail pointers. Truncate pg_notify space if possible.
2152  *
2153  * This is (usually) called during CommitTransaction(), so it's important for
2154  * it to have very low probability of failure.
2155  */
2156 static void
2157 asyncQueueAdvanceTail(void)
2158 {
2159  QueuePosition min;
2160  int oldtailpage;
2161  int newtailpage;
2162  int boundary;
2163 
2164  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2165  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2166 
2167  /*
2168  * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2169  * (ie, exactly match at least one backend's queue position), so it must
2170  * be updated atomically with the actual computation. Since v13, we could
2171  * get away with not doing it like that, but it seems prudent to keep it
2172  * so.
2173  *
2174  * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2175  * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2176  * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2177  * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2178  * there are pages we can truncate but haven't yet finished doing so.
2179  *
2180  * For concurrency's sake, we don't want to hold NotifyQueueLock while
2181  * performing SimpleLruTruncate. This is OK because no backend will try
2182  * to access the pages we are in the midst of truncating.
2183  */
2184  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2185  min = QUEUE_HEAD;
2186  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2187  {
2188  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2189  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2190  }
2191  QUEUE_TAIL = min;
2192  oldtailpage = QUEUE_STOP_PAGE;
2193  LWLockRelease(NotifyQueueLock);
2194 
2195  /*
2196  * We can truncate something if the global tail advanced across an SLRU
2197  * segment boundary.
2198  *
2199  * XXX it might be better to truncate only once every several segments, to
2200  * reduce the number of directory scans.
2201  */
2202  newtailpage = QUEUE_POS_PAGE(min);
2203  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2204  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2205  {
2206  /*
2207  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2208  * release the lock again.
2209  */
2210  SimpleLruTruncate(NotifyCtl, newtailpage);
2211 
2212  /*
2213  * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
2214  * for the segment immediately prior to the old tail, allowing fresh
2215  * data into that segment.
2216  */
2217  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2218  QUEUE_STOP_PAGE = newtailpage;
2219  LWLockRelease(NotifyQueueLock);
2220  }
2221 
2222  LWLockRelease(NotifyQueueTailLock);
2223 }
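
The boundary arithmetic above rounds the new tail page down to the start of its SLRU segment, and truncation fires only when the old physical tail lies before that boundary. A worked example, assuming the usual SLRU_PAGES_PER_SEGMENT of 32 and ignoring the wraparound that asyncQueuePagePrecedes handles:

#include <stdio.h>

#define PAGES_PER_SEGMENT 32    /* illustrative value */

int
main(void)
{
    int oldtailpage = 40;       /* oldest un-truncated page */
    int newtailpage = 70;       /* page of the new global tail */
    int boundary = newtailpage - (newtailpage % PAGES_PER_SEGMENT);

    printf("boundary = %d\n", boundary);    /* prints 64 */
    if (oldtailpage < boundary)             /* simplified precedes-test */
        printf("segments before page %d can be truncated\n", boundary);
    return 0;
}
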
2224 
2225 /*
2226  * ProcessIncomingNotify
2227  *
2228  * Scan the queue for arriving notifications and report them to the front
2229  * end. The notifications might be from other sessions, or our own;
2230  * there's no need to distinguish here.
2231  *
2232  * If "flush" is true, force any frontend messages out immediately.
2233  *
2234  * NOTE: since we are outside any transaction, we must create our own.
2235  */
2236 static void
2237 ProcessIncomingNotify(bool flush)
2238 {
2239  /* We *must* reset the flag */
2240  notifyInterruptPending = false;
2241 
2242  /* Do nothing else if we aren't actively listening */
2243  if (listenChannels == NIL)
2244  return;
2245 
2246  if (Trace_notify)
2247  elog(DEBUG1, "ProcessIncomingNotify");
2248 
2249  set_ps_display("notify interrupt");
2250 
2251  /*
2252  * We must run asyncQueueReadAllNotifications inside a transaction, else
2253  * bad things happen if it gets an error.
2254  */
2255  StartTransactionCommand();
2256 
2257  asyncQueueReadAllNotifications();
2258 
2259  CommitTransactionCommand();
2260 
2261  /*
2262  * If this isn't an end-of-command case, we must flush the notify messages
2263  * to ensure the frontend gets them promptly.
2264  */
2265  if (flush)
2266  pq_flush();
2267 
2268  set_ps_display("idle");
2269 
2270  if (Trace_notify)
2271  elog(DEBUG1, "ProcessIncomingNotify: done");
2272 }
2273 
2274 /*
2275  * Send NOTIFY message to my front end.
2276  */
2277 void
2278 NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
2279 {
2280  if (whereToSendOutput == DestRemote)
2281  {
2282  StringInfoData buf;
2283 
2284  pq_beginmessage(&buf, PqMsg_NotificationResponse);
2285  pq_sendint32(&buf, srcPid);
2286  pq_sendstring(&buf, channel);
2287  pq_sendstring(&buf, payload);
2288  pq_endmessage(&buf);
2289 
2290  /*
2291  * NOTE: we do not do pq_flush() here. Some level of caller will
2292  * handle it later, allowing this message to be combined into a packet
2293  * with other ones.
2294  */
2295  }
2296  else
2297  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2298 }
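
The pq_* calls above emit a v3-protocol NotificationResponse ('A') message: one type byte, a self-inclusive int32 length, the notifying backend's PID, then the channel and payload as NUL-terminated strings. A minimal sketch serializing the same layout by hand; build_notification_response is a hypothetical helper and assumes out is large enough:

#include <arpa/inet.h>          /* htonl */
#include <stddef.h>
#include <stdint.h>
#include <string.h>

static size_t
build_notification_response(char *out, int32_t src_pid,
                            const char *channel, const char *payload)
{
    size_t   clen = strlen(channel) + 1;    /* include the trailing NUL */
    size_t   plen = strlen(payload) + 1;
    uint32_t len  = htonl((uint32_t) (4 + 4 + clen + plen));
    uint32_t pid  = htonl((uint32_t) src_pid);
    size_t   off  = 0;

    out[off++] = 'A';                       /* message type */
    memcpy(out + off, &len, 4);             /* length, counting itself */
    off += 4;
    memcpy(out + off, &pid, 4);             /* srcPid */
    off += 4;
    memcpy(out + off, channel, clen);       /* channel name */
    off += clen;
    memcpy(out + off, payload, plen);       /* payload (may be empty) */
    off += plen;
    return off;
}
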
2299 
2300 /* Does pendingNotifies include a match for the given event? */
2301 static bool
2302 AsyncExistsPendingNotify(Notification *n)
2303 {
2304  if (pendingNotifies == NULL)
2305  return false;
2306 
2307  if (pendingNotifies->hashtab != NULL)
2308  {
2309  /* Use the hash table to probe for a match */
2310  if (hash_search(pendingNotifies->hashtab,
2311  &n,
2312  HASH_FIND,
2313  NULL))
2314  return true;
2315  }
2316  else
2317  {
2318  /* Must scan the event list */
2319  ListCell *l;
2320 
2321  foreach(l, pendingNotifies->events)
2322  {
2323  Notification *oldn = (Notification *) lfirst(l);
2324 
2325  if (n->channel_len == oldn->channel_len &&
2326  n->payload_len == oldn->payload_len &&
2327  memcmp(n->data, oldn->data,
2328  n->channel_len + n->payload_len + 2) == 0)
2329  return true;
2330  }
2331  }
2332 
2333  return false;
2334 }
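
The +2 in the memcmp length above reflects Notification's packed layout: channel and payload live back-to-back in one data[] array, each keeping its trailing NUL, so the comparable region is channel_len + payload_len + 2 bytes. A minimal sketch of an equality test over that layout; MiniNotification uses a fixed array where the real struct has a flexible array member:

#include <stdint.h>
#include <string.h>

typedef struct
{
    uint16_t channel_len;       /* strlen(channel) */
    uint16_t payload_len;       /* strlen(payload) */
    char     data[64];          /* "channel\0payload\0" packed together */
} MiniNotification;

static int
mini_equal(const MiniNotification *a, const MiniNotification *b)
{
    /* channel_len + payload_len + 2 covers both strings and both NULs */
    return a->channel_len == b->channel_len &&
           a->payload_len == b->payload_len &&
           memcmp(a->data, b->data,
                  (size_t) a->channel_len + a->payload_len + 2) == 0;
}
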
2335 
2336 /*
2337  * Add a notification event to a pre-existing pendingNotifies list.
2338  *
2339  * Because pendingNotifies->events is already nonempty, this works
2340  * correctly no matter what CurrentMemoryContext is.
2341  */
2342 static void
2343 AddEventToPendingNotifies(Notification *n)
2344 {
2345  Assert(pendingNotifies->events != NIL);
2346 
2347  /* Create the hash table if it's time to */
2348  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2349  pendingNotifies->hashtab == NULL)
2350  {
2351  HASHCTL hash_ctl;
2352  ListCell *l;
2353 
2354  /* Create the hash table */
2355  hash_ctl.keysize = sizeof(Notification *);
2356  hash_ctl.entrysize = sizeof(NotificationHash);
2357  hash_ctl.hash = notification_hash;
2358  hash_ctl.match = notification_match;
2359  hash_ctl.hcxt = CurTransactionContext;
2360  pendingNotifies->hashtab =
2361  hash_create("Pending Notifies",
2362  256L,
2363  &hash_ctl,
2364  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2365 
2366  /* Insert all the already-existing events */
2367  foreach(l, pendingNotifies->events)
2368  {
2369  Notification *oldn = (Notification *) lfirst(l);
2370  NotificationHash *hentry;
2371  bool found;
2372 
2373  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2374  &oldn,
2375  HASH_ENTER,
2376  &found);
2377  Assert(!found);
2378  hentry->event = oldn;
2379  }
2380  }
2381 
2382  /* Add new event to the list, in order */
2383  pendingNotifies->events = lappend(pendingNotifies->events, n);
2384 
2385  /* Add event to the hash table if needed */
2386  if (pendingNotifies->hashtab != NULL)
2387  {
2388  NotificationHash *hentry;
2389  bool found;
2390 
2391  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2392  &n,
2393  HASH_ENTER,
2394  &found);
2395  Assert(!found);
2396  hentry->event = n;
2397  }
2398 }
2399 
2400 /*
2401  * notification_hash: hash function for notification hash table
2402  *
2403  * The hash "keys" are pointers to Notification structs.
2404  */
2405 static uint32
2406 notification_hash(const void *key, Size keysize)
2407 {
2408  const Notification *k = *(const Notification *const *) key;
2409 
2410  Assert(keysize == sizeof(Notification *));
2411  /* We don't bother to include the payload's trailing null in the hash */
2412  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2413  k->channel_len + k->payload_len + 1));
2414 }
2415 
2416 /*
2417  * notification_match: match function to use with notification_hash
2418  */
2419 static int
2420 notification_match(const void *key1, const void *key2, Size keysize)
2421 {
2422  const Notification *k1 = *(const Notification *const *) key1;
2423  const Notification *k2 = *(const Notification *const *) key2;
2424 
2425  Assert(keysize == sizeof(Notification *));
2426  if (k1->channel_len == k2->channel_len &&
2427  k1->payload_len == k2->payload_len &&
2428  memcmp(k1->data, k2->data,
2429  k1->channel_len + k1->payload_len + 2) == 0)
2430  return 0; /* equal */
2431  return 1; /* not equal */
2432 }
2433 
2434 /* Clear the pendingActions and pendingNotifies lists. */
2435 static void
2436 ClearPendingActionsAndNotifies(void)
2437 {
2438  /*
2439  * Everything's allocated in either TopTransactionContext or the context
2440  * for the subtransaction to which it corresponds. So, there's nothing to
2441  * do here except reset the pointers; the space will be reclaimed when the
2442  * contexts are deleted.
2443  */
2444  pendingActions = NULL;
2445  pendingNotifies = NULL;
2446 }
Definition: xact.c:445