PostgreSQL Source Code git master
/*-------------------------------------------------------------------------
 *
 * async.c
 *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/commands/async.c
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * Async Notification Model as of v19:
 *
 * 1. Multiple backends on same machine. Multiple backends may be listening
 *    on each of several channels.
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *    with actively-used pages mapped into shared memory by the slru.c module.
 *    All notification messages are placed in the queue and later read out
 *    by listening backends. The single queue allows us to guarantee that
 *    notifications are received in commit order.
 *
 *    Although there is only one queue, notifications are treated as being
 *    database-local; this is done by including the sender's database OID
 *    in each notification message. Listening backends ignore messages
 *    that don't match their database OID. This is important because it
 *    ensures senders and receivers have the same database encoding and won't
 *    misinterpret non-ASCII text in the channel name or payload string.
 *
 *    Since notifications are not expected to survive database crashes,
 *    we can simply clean out the pg_notify data at any reboot, and there
 *    is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *    entering its PID into the array in AsyncQueueControl. It then scans all
 *    incoming notifications in the central queue and first compares the
 *    database OID of the notification with its own database OID and then
 *    compares the notified channel with the list of channels that it listens
 *    to. If there is a match, it delivers the notification event to its
 *    frontend. Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *    a backend-local list which will not be processed until transaction end.
 *
 *    Duplicate notifications from the same transaction are sent out as one
 *    notification only. This is done to save work when, for example, a trigger
 *    on a 2-million-row table fires a notification for each row that has been
 *    changed. If the application needs to receive every single notification
 *    that has been sent, it can easily add some unique string into the extra
 *    payload parameter.
 *
 *    When the transaction is ready to commit, PreCommit_Notify() adds the
 *    pending notifications to the head of the queue. (The head pointer of the
 *    queue always points to the next free position; a position is just a
 *    page number and the offset in that page.) This is done before marking the
 *    transaction as committed in clog, so if we run into problems writing the
 *    notifications, we can still call elog(ERROR, ...) and the transaction
 *    will roll back safely.
 *
 *    Once we have put all of the notifications into the queue, we return to
 *    CommitTransaction() which will then do the actual transaction commit.
 *
 *    After commit we are called another time (AtCommit_Notify()). Here we
 *    make any required updates to the effective listen state (see below).
 *    Then we signal any backends that may be interested in our messages
 *    (including our own backend, if listening). This is done by
 *    SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
 *    each relevant backend, as described below.
 *
 *    Finally, after we are out of the transaction altogether and about to go
 *    idle, we scan the queue for messages that need to be sent to our
 *    frontend (which might be notifies from other backends, or self-notifies
 *    from our own). This step is not part of the CommitTransaction sequence
 *    for two important reasons. First, we could get errors while sending
 *    data to our frontend, and it's really bad for errors to happen in
 *    post-commit cleanup. Second, in cases where a procedure issues commits
 *    within a single frontend command, we don't want to send notifies to our
 *    frontend until the command is done; but notifies to other backends
 *    should go out immediately after each commit.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *    sets the process's latch, which triggers the event to be processed
 *    immediately if this backend is idle (i.e., it is waiting for a frontend
 *    command and is not within a transaction block; cf.
 *    ProcessClientReadInterrupt()). Otherwise the handler may only set a
 *    flag, which will cause the processing to occur just before we next go
 *    idle.
 *
 *    Inbound-notify processing consists of reading all of the notifications
 *    that have arrived since the last scan. We read every notification
 *    until we reach either a notification from an uncommitted transaction or
 *    the head pointer's position.
 *
 * 6. To limit disk space consumption, the tail pointer needs to be advanced
 *    so that old pages can be truncated. This is relatively expensive
 *    (notably, it requires an exclusive lock), so we don't want to do it
 *    often. We make sending backends do this work if they advanced the queue
 *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
 *
 * 7. So far we have not discussed how backends change their listening state,
 *    nor how notification senders know which backends to awaken. To handle
 *    the latter, we maintain a global channel table (implemented as a dynamic
 *    shared hash table, or dshash) that maps (database OID, channel name)
 *    pairs to the set of backends listening on each channel. This table is
 *    created lazily on the first LISTEN command and grows dynamically as
 *    needed. There is also a local channel table (a plain dynahash table) in
 *    each listening backend, tracking which channels that backend is
 *    listening to. The local table serves to reduce the number of accesses
 *    needed to the shared table.
 *
 *    If the current transaction has executed any LISTEN/UNLISTEN actions,
 *    PreCommit_Notify() prepares to commit those. For LISTEN, it
 *    pre-allocates entries in both the per-backend localChannelTable and the
 *    shared globalChannelTable, marking new shared entries removeOnAbort.
 *    It also records the final per-channel intent in pendingListenActions,
 *    so post-commit/abort processing can apply that in a single step.
 *    Since all these allocations happen before committing to clog, we can
 *    safely abort the transaction on failure.
 *
 *    After commit, AtCommit_Notify() runs through pendingListenActions and
 *    applies the final per-channel listen/unlisten state. This happens
 *    before sending signals.
 *
 *    SignalBackends() consults the shared global channel table to identify
 *    listeners for the channels that the current transaction sent
 *    notification(s) to. Each selected backend is marked as having a wakeup
 *    pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
 *    signal is sent to it.
 *
 * 8. While writing notifications, PreCommit_Notify() records the queue head
 *    position both before and after the write. Because all writers serialize
 *    on a cluster-wide heavyweight lock, no other backend can insert entries
 *    between these two points. SignalBackends() uses this fact to directly
 *    advance the queue pointer for any backend that is still positioned at
 *    the old head, or within the range written, but is not interested in any
 *    of our notifications. This avoids unnecessary wakeups for idle
 *    listeners that have nothing to read. Backends that are not interested
 *    in our notifications, but cannot be directly advanced, are signaled only
 *    if they are far behind the current queue head; that is to ensure that
 *    we can advance the queue tail without undue delay.
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.) The above design guarantees that ignoring
 * self-notifies will never cause notifies from other backends to be missed.
 *
 * The amount of shared memory used for notify management (notify_buffers)
 * can be varied without affecting anything but performance. The maximum
 * amount of notification data that can be queued at one time is determined
 * by the max_notify_queue_pages GUC.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>
#include <unistd.h>
#include <signal.h>

#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "storage/subsystems.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"


/*
 * Maximum size of a NOTIFY payload, including the terminating null byte. This
 * must be kept small enough so that a notification message fits on one
 * SLRU page. The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
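As a worked check of the limit above, the sketch below assumes the default build settings BLCKSZ = 8192 and NAMEDATALEN = 64 (both can be changed when PostgreSQL is built), which make NOTIFY_PAYLOAD_MAX_LENGTH equal to 8192 - 64 - 128 = 8000. Because Async_Notify() rejects a payload whose length is greater than or equal to this value, the longest accepted payload under these assumptions is 7999 bytes. The helper notify_args_ok() is hypothetical; it only mirrors the checks visible in Async_Notify() and is not part of PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Assumed default build settings; a real build may differ. */
#define BLCKSZ 8192
#define NAMEDATALEN 64
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)

static_assert(NOTIFY_PAYLOAD_MAX_LENGTH == 8000, "8192 - 64 - 128");

/*
 * Hypothetical helper mirroring the length checks in Async_Notify():
 * the channel must be non-empty and shorter than NAMEDATALEN, and the
 * payload must be shorter than NOTIFY_PAYLOAD_MAX_LENGTH. Lengths are
 * strlen() values, so they exclude the terminating null byte.
 */
static bool
notify_args_ok(const char *channel, const char *payload)
{
	size_t		channel_len = channel ? strlen(channel) : 0;
	size_t		payload_len = payload ? strlen(payload) : 0;

	if (channel_len == 0 || channel_len >= NAMEDATALEN)
		return false;
	return payload_len < NOTIFY_PAYLOAD_MAX_LENGTH;
}
```

Treating a NULL payload as empty matches how Async_Notify() computes payload_len.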

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
	int length; /* total allocated length of entry */
	Oid dboid; /* sender's database OID */
	TransactionId xid; /* sender's XID */
	int32 srcPid; /* sender's PID */
	char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len) INTALIGN(len)

#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
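To make the size arithmetic concrete, this sketch computes the queue space one notification needs in the way the comment above describes: the fixed header up to data, plus the channel and payload strings with their two terminating nulls (the "+ 2" in AsyncQueueEntryEmptySize), rounded up to int alignment. DemoQueueEntry, DEMO_QUEUEALIGN and demo_entry_length() are hypothetical stand-ins, and the sketch assumes 4-byte int, Oid and TransactionId, so the header is 16 bytes and the empty size is 18 bytes; the real rounding comes from INTALIGN and the build's ALIGNOF_INT.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical mirror of AsyncQueueEntry (Oid and TransactionId are 32-bit). */
typedef struct DemoQueueEntry
{
	int			length;
	uint32_t	dboid;
	uint32_t	xid;
	int32_t		srcPid;
	char		data[64 + 8000];	/* assumes default NAMEDATALEN and payload max */
} DemoQueueEntry;

static_assert(offsetof(DemoQueueEntry, data) == 16, "assumes 4-byte header fields");

/* Round len up to a multiple of int alignment, standing in for INTALIGN(). */
#define DEMO_QUEUEALIGN(len) \
	(((size_t) (len) + _Alignof(int) - 1) & ~((size_t) _Alignof(int) - 1))

/* Header plus two terminating nulls, as in AsyncQueueEntryEmptySize. */
#define DEMO_EMPTY_SIZE (offsetof(DemoQueueEntry, data) + 2)

/* Bytes one notification occupies in the queue, padding included. */
static size_t
demo_entry_length(const char *channel, const char *payload)
{
	size_t		len = DEMO_EMPTY_SIZE + strlen(channel) + strlen(payload);

	return DEMO_QUEUEALIGN(len);
}
```

For example, channel "foo" with payload "bar" needs 18 + 3 + 3 = 24 bytes, already a multiple of 4, while "abc" with "d" needs 22 bytes and is padded to 24.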

/*
 * Struct describing a queue position, and assorted macros for working with it
 */
typedef struct QueuePosition
{
	int64 page; /* SLRU page number */
	int offset; /* byte offset within page */
} QueuePosition;

#define QUEUE_POS_PAGE(x) ((x).page)
#define QUEUE_POS_OFFSET(x) ((x).offset)

#define SET_QUEUE_POS(x,y,z) \
	do { \
		(x).page = (y); \
		(x).offset = (z); \
	} while (0)

#define QUEUE_POS_EQUAL(x,y) \
	((x).page == (y).page && (x).offset == (y).offset)

#define QUEUE_POS_IS_ZERO(x) \
	((x).page == 0 && (x).offset == 0)

/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
	 (x).page != (y).page ? (y) : \
	 (x).offset < (y).offset ? (x) : (y))

/* choose logically larger QueuePosition */
#define QUEUE_POS_MAX(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
	 (x).page != (y).page ? (x) : \
	 (x).offset > (y).offset ? (x) : (y))

/* returns true if x comes before y in queue order */
#define QUEUE_POS_PRECEDES(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) || \
	 ((x).page == (y).page && (x).offset < (y).offset))
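The ordering macros above compare page numbers first and fall back to the byte offset only within the same page. One use is computing a safe tail, which must be no later than the position of any listening backend. The sketch below re-creates QUEUE_POS_PRECEDES and QUEUE_POS_MIN on a stand-in struct and folds them into a hypothetical demo_compute_tail(); it assumes non-wrapping 64-bit page numbers, as asyncQueuePagePrecedes() does in this version.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct DemoQueuePosition
{
	int64_t		page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} DemoQueuePosition;

/* Page numbers do not wrap, so a plain comparison is enough. */
static inline bool
demo_page_precedes(int64_t p, int64_t q)
{
	return p < q;
}

#define DEMO_POS_PRECEDES(x,y) \
	(demo_page_precedes((x).page, (y).page) || \
	 ((x).page == (y).page && (x).offset < (y).offset))

#define DEMO_POS_MIN(x,y) \
	(demo_page_precedes((x).page, (y).page) ? (x) : \
	 (x).page != (y).page ? (y) : \
	 (x).offset < (y).offset ? (x) : (y))

/*
 * Hypothetical helper: start from the head and take the minimum with each
 * listener's read position, so the result never passes any listener.
 */
static DemoQueuePosition
demo_compute_tail(DemoQueuePosition head, const DemoQueuePosition *pos, int n)
{
	DemoQueuePosition tail = head;

	assert(n >= 0);
	for (int i = 0; i < n; i++)
		tail = DEMO_POS_MIN(tail, pos[i]);
	return tail;
}
```

With no listeners the tail simply catches up to the head, which is what allows all queue pages to be truncated.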

/*
 * Parameter determining how often we try to advance the tail pointer:
 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
 * also the distance by which a backend that's not interested in our
 * notifications needs to be behind before we'll decide we need to wake it
 * up so it can advance its pointer.
 *
 * Resist the temptation to make this really large. While that would save
 * work in some places, it would add cost in others. In particular, this
 * should likely be less than notify_buffers, to ensure that backends
 * catch up before the pages they'll need to read fall out of SLRU cache.
 */
#define QUEUE_CLEANUP_DELAY 4
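The comment above gives QUEUE_CLEANUP_DELAY two jobs. A minimal sketch of both decisions follows; demo_write_triggers_cleanup() and demo_should_wake_uninterested() are hypothetical names, and the sketch assumes, following the tryAdvanceTail comment further down, that a writer takes on tail cleanup whenever its write moves the head onto a page whose number is a multiple of QUEUE_CLEANUP_DELAY. It checks every page the head moved onto, so a write that spans several pages is not missed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_CLEANUP_DELAY 4

/*
 * Did moving the head from old_head_page to new_head_page land on at least
 * one page that is a multiple of QUEUE_CLEANUP_DELAY?
 */
static bool
demo_write_triggers_cleanup(int64_t old_head_page, int64_t new_head_page)
{
	assert(new_head_page >= old_head_page);
	for (int64_t p = old_head_page + 1; p <= new_head_page; p++)
		if (p % QUEUE_CLEANUP_DELAY == 0)
			return true;
	return false;
}

/*
 * A listener with nothing to read is still worth waking once it lags the
 * head by at least QUEUE_CLEANUP_DELAY pages, so it can move forward and
 * stop holding back the tail.
 */
static bool
demo_should_wake_uninterested(int64_t head_page, int64_t listener_page)
{
	return head_page - listener_page >= QUEUE_CLEANUP_DELAY;
}
```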

/*
 * Struct describing a listening backend's status
 */
typedef struct QueueBackendStatus
{
	int32 pid; /* either a PID or InvalidPid */
	Oid dboid; /* backend's database OID, or InvalidOid */
	ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
	QueuePosition pos; /* backend has read queue up to here */
	bool wakeupPending; /* signal sent to backend, not yet processed */
	bool isAdvancing; /* backend is advancing its position */
} QueueBackendStatus;

/*
 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
 *
 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
 * NotifyQueueTailLock.
 *
 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
 * their own entries as well as the head and tail pointers. Consequently we
 * can allow a backend to update its own record while holding only SHARED lock
 * (since no other backend will inspect it).
 *
 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
 * entries of other backends and also change the head pointer. They can
 * also advance other backends' queue positions, unless the other backend
 * has isAdvancing set (i.e., is in the process of doing that itself).
 *
 * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
 * mode, backends can change the tail pointers.
 *
 * The SLRU buffer pool is divided into banks, and the per-bank SLRU lock is
 * used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, then the SLRU bank lock, and
 * lastly globalChannelTable partition locks.
 *
 * Each backend uses the backend[] array entry with index equal to its
 * ProcNumber. We rely on this to make SendProcSignal fast.
 *
 * The backend[] array entries for actively-listening backends are threaded
 * together using firstListener and the nextListener links, so that we can
 * scan them without having to iterate over inactive entries. We keep this
 * list in order by ProcNumber so that the scan is cache-friendly when there
 * are many active entries.
 */
typedef struct AsyncQueueControl
{
	QueuePosition head; /* head points to the next free location */
	QueuePosition tail; /* tail must be <= the queue position of every listening backend */
	int64 stopPage; /* oldest unrecycled page; must be <= tail.page */
	ProcNumber firstListener; /* id of first listener, or INVALID_PROC_NUMBER */
	TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
	dsa_handle globalChannelTableDSA; /* global channel table's DSA handle */
	dshash_table_handle globalChannelTableDSH; /* and its dshash handle */
	/* Array with room for MaxBackends entries: */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;

static AsyncQueueControl *asyncQueueControl;

static void AsyncShmemRequest(void *arg);
static void AsyncShmemInit(void *arg);


#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
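Point 8 of the header comment describes how SignalBackends() avoids waking listeners that have nothing to read. The sketch below models only that per-listener decision; DemoPos, DemoSignalDecision and demo_classify_listener() are hypothetical, and the real code additionally consults wakeupPending and isAdvancing and runs while holding NotifyQueueLock. Here before and after are the head positions recorded around our own write, and after is taken to be the current head.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_CLEANUP_DELAY 4

typedef struct { int64_t page; int offset; } DemoPos;

typedef enum { DEMO_LEAVE, DEMO_ADVANCE, DEMO_SIGNAL } DemoSignalDecision;

static bool
demo_pos_precedes(DemoPos x, DemoPos y)
{
	return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/* Decide what a sender does for one listener currently positioned at pos. */
static DemoSignalDecision
demo_classify_listener(DemoPos pos, DemoPos before, DemoPos after,
					   bool interested)
{
	assert(!demo_pos_precedes(after, before));

	if (interested)
		return DEMO_SIGNAL;		/* it has something to read */

	/* In [before, after): only our own, uninteresting entries lie ahead. */
	if (!demo_pos_precedes(pos, before) && demo_pos_precedes(pos, after))
		return DEMO_ADVANCE;

	/* Behind before: other entries may matter, so wake it only when it lags
	 * far enough to hold back the tail. */
	if (demo_pos_precedes(pos, before) &&
		after.page - pos.page >= QUEUE_CLEANUP_DELAY)
		return DEMO_SIGNAL;

	return DEMO_LEAVE;			/* already caught up, or only slightly behind */
}
```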

/*
 * The SLRU buffer area through which we access the notification queue
 */
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static int asyncQueueErrdetailForIoError(const void *opaque_data);


#define NotifyCtl (&NotifySlruDesc)
#define QUEUE_PAGESIZE BLCKSZ

#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */

/*
 * Global channel table definitions
 *
 * This hash table maps (database OID, channel name) keys to arrays of
 * ProcNumbers representing the backends listening or about to listen
 * on each channel. The removeOnAbort flags allow us to create hash table
 * entries pre-commit and not have to assume that creating them post-commit
 * will succeed.
 */
#define INITIAL_LISTENERS_ARRAY_SIZE 4


typedef struct ListenerEntry
{
	ProcNumber procNo; /* listener's ProcNumber */
	bool removeOnAbort; /* remove entry if current xact aborts */
} ListenerEntry;

typedef struct GlobalChannelEntry
{
	GlobalChannelKey key; /* hash key */
	dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
	int numListeners; /* Number of listeners currently stored */
	int allocatedListeners; /* Allocated size of array */
} GlobalChannelEntry;


/*
 * localChannelTable caches the channel names this backend is listening on
 * (including those we have staged to be listened on, but not yet committed).
 * Used by IsListeningOn() for fast lookups when reading notifications.
 */
static HTAB *localChannelTable = NULL;

/* We test this condition to detect that we're not listening at all */
#define LocalChannelTableIsEmpty() \
	(localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction. As explained above,
 * we don't actually change listen state until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */

typedef struct
{
	ListenActionKind action;
	char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;

typedef struct ActionList
{
	int nestingLevel; /* current transaction nesting depth */
	List *actions; /* list of ListenAction structs */
	struct ActionList *upper; /* details for upper transaction levels */
} ActionList;

static ActionList *pendingActions = NULL;

/*
 * Hash table recording the final listen/unlisten intent per channel for
 * the current transaction. Key is channel name, value is PENDING_LISTEN or
 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
 * per channel instead of replaying every action. This is built from the
 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
 * AtAbort_Notify.
 */

typedef struct PendingListenEntry
{
	char channel[NAMEDATALEN]; /* hash key */
	PendingListenAction action; /* which action should we perform? */
} PendingListenEntry;

static HTAB *pendingListenActions = NULL;

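The net effect that the per-channel intent must capture can be modeled by replaying the ordered action list against the committed listen set, in order, so that an UNLISTEN ALL cancels earlier LISTENs but not later ones. The sketch below is a simplified model only: DemoActionKind, DemoAction, DemoChannelSet and demo_replay() are hypothetical, a fixed-size array stands in for dynahash and dshash, and subtransactions are ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define DEMO_MAX_CHANNELS 8
#define DEMO_NAMEDATALEN 64		/* stand-in for NAMEDATALEN */

typedef enum { DEMO_LISTEN, DEMO_UNLISTEN, DEMO_UNLISTEN_ALL } DemoActionKind;

typedef struct DemoAction
{
	DemoActionKind kind;
	const char *channel;		/* ignored for DEMO_UNLISTEN_ALL */
} DemoAction;

typedef struct DemoChannelSet
{
	char		names[DEMO_MAX_CHANNELS][DEMO_NAMEDATALEN];
	int			n;
} DemoChannelSet;

static int
demo_set_find(const DemoChannelSet *s, const char *ch)
{
	for (int i = 0; i < s->n; i++)
		if (strcmp(s->names[i], ch) == 0)
			return i;
	return -1;
}

static void
demo_set_add(DemoChannelSet *s, const char *ch)
{
	if (demo_set_find(s, ch) >= 0)
		return;					/* already listening */
	assert(s->n < DEMO_MAX_CHANNELS && strlen(ch) < DEMO_NAMEDATALEN);
	strcpy(s->names[s->n++], ch);
}

static void
demo_set_remove(DemoChannelSet *s, const char *ch)
{
	int			i = demo_set_find(s, ch);

	if (i < 0)
		return;
	/* fill the hole with the last entry; order does not matter here */
	s->n--;
	if (i != s->n)
		memcpy(s->names[i], s->names[s->n], DEMO_NAMEDATALEN);
}

/* Replay the transaction's actions, in order, against the committed state. */
static DemoChannelSet
demo_replay(const DemoChannelSet *committed, const DemoAction *acts, int nacts)
{
	DemoChannelSet s = *committed;

	for (int i = 0; i < nacts; i++)
	{
		switch (acts[i].kind)
		{
			case DEMO_LISTEN:
				demo_set_add(&s, acts[i].channel);
				break;
			case DEMO_UNLISTEN:
				demo_set_remove(&s, acts[i].channel);
				break;
			case DEMO_UNLISTEN_ALL:
				s.n = 0;
				break;
		}
	}
	return s;
}
```

Comparing the replayed set with the committed one yields one LISTEN or UNLISTEN decision per affected channel, which is the shape of information a table like pendingListenActions can hold.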
/*
 * State for outbound notifies consists of a list of all channels+payloads
 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
 * until and unless the transaction commits. pendingNotifies is NULL if no
 * NOTIFYs have been done in the current (sub) transaction.
 *
 * We discard duplicate notify events issued in the same transaction.
 * Hence, in addition to the list proper (which we need to track the order
 * of the events, since we guarantee to deliver them in order), we build a
 * hash table which we can probe to detect duplicates. Since building the
 * hash table is somewhat expensive, we do so only once we have at least
 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
 * before that we just scan the events linearly.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions add their entries to their parent's list.
 * Failed subtransactions simply discard their lists. Since these lists
 * are independent, there may be notify events in a subtransaction's list
 * that duplicate events in some ancestor (sub) transaction; we get rid of
 * the dups when merging the subtransaction's list into its parent's.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit. This is a bit odd
 * but is consistent with our historical behavior.
 */
typedef struct Notification
{
	uint16 channel_len; /* length of channel-name string */
	uint16 payload_len; /* length of payload string */
	/* null-terminated channel name, then null-terminated payload follow */
	char data[FLEXIBLE_ARRAY_MEMBER];
} Notification;

typedef struct NotificationList
{
	int nestingLevel; /* current transaction nesting depth */
	List *events; /* list of Notification structs */
	HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
	List *uniqueChannelNames; /* unique channel names being notified */
	HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
	struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;

#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
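Below MIN_HASHABLE_NOTIFIES events, duplicates are found by scanning the list. The sketch below shows that linear path under the assumption that a duplicate means the same channel and the same payload, which is consistent with the header comment's advice to make every notification distinct by adding a unique string to the payload. DemoNotification and demo_add_event() are hypothetical, and the hash-table path used above the threshold is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef struct DemoNotification
{
	const char *channel;
	const char *payload;
} DemoNotification;

/* Linear duplicate check: same channel and same payload. */
static bool
demo_is_duplicate(const DemoNotification *events, int n,
				  const DemoNotification *cand)
{
	for (int i = 0; i < n; i++)
		if (strcmp(events[i].channel, cand->channel) == 0 &&
			strcmp(events[i].payload, cand->payload) == 0)
			return true;
	return false;
}

/* Append cand unless it duplicates an earlier event; returns the new count. */
static int
demo_add_event(DemoNotification *events, int n, int cap, DemoNotification cand)
{
	if (demo_is_duplicate(events, n, &cand))
		return n;				/* first occurrence keeps its place in the order */
	assert(n < cap);
	events[n] = cand;
	return n + 1;
}
```

Each insertion costs O(n) here, which is why building a hash table becomes worthwhile once a transaction has queued many events.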

struct NotificationHash
{
	Notification *event; /* => the actual Notification struct */
};

static NotificationList *pendingNotifies = NULL;

/*
 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
 * (both just carry the channel name, with no payload).
 */
typedef struct ChannelName
{
	char channel[NAMEDATALEN]; /* hash key */
} ChannelName;

/*
 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
 * called from inside a signal handler. That just sets the
 * notifyInterruptPending flag and sets the process latch.
 * ProcessNotifyInterrupt() will then be called whenever it's safe to
 * actually deal with the interrupt.
 */
volatile sig_atomic_t notifyInterruptPending = false;

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;

/*
 * Queue head positions for direct advancement.
 * These are captured during PreCommit_Notify while holding the heavyweight
 * lock on database 0, ensuring no other backend can insert notifications
 * between them. SignalBackends uses these to advance idle backends.
 */

/*
 * Workspace arrays for SignalBackends. These are preallocated in
 * PreCommit_Notify to avoid needing memory allocation after committing to
 * clog.
 */

/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameters */
bool Trace_notify = false;

/* For 8 KB pages this gives 8 GB of disk space */
int max_notify_queue_pages = 1048576;

/* local function prototypes */
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
										const char *channel);
static dshash_hash globalChannelTableHash(const void *key, size_t size,
										  void *arg);
static void initGlobalChannelTable(void);
static void initLocalChannelTable(void);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void BecomeRegisteredListener(void);
static void PrepareTableEntriesForListen(const char *channel);
static void PrepareTableEntriesForUnlisten(const char *channel);
static void PrepareTableEntriesForUnlistenAll(void);
										   int idx);
static void ApplyPendingListenActions(bool isCommit);
static void CleanupListenersOnExit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
										 QueuePosition stop,
										 Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

static int
asyncQueueErrdetailForIoError(const void *opaque_data)
{
	const QueuePosition *position = opaque_data;

	return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
					 position->page, position->offset);
}

/*
 * Compute the difference between two queue page numbers.
 * Previously this function accounted for a wraparound.
 */
static inline int64
asyncQueuePageDiff(int64 p, int64 q)
{
	return p - q;
}

/*
 * Determines whether p precedes q.
 * Previously this function accounted for a wraparound.
 */
static inline bool
asyncQueuePagePrecedes(int64 p, int64 q)
{
	return p < q;
}

/*
 * GlobalChannelKeyInit
 *    Prepare a global channel table key for hashing.
 */
static inline void
GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
{
	memset(key, 0, sizeof(GlobalChannelKey));
	key->dboid = dboid;
	strlcpy(key->channel, channel, NAMEDATALEN);
}

/*
 * globalChannelTableHash
 *    Hash function for global channel table keys.
 */
static dshash_hash
globalChannelTableHash(const void *key, size_t size, void *arg)
{
	const GlobalChannelKey *k = (const GlobalChannelKey *) key;
	dshash_hash h;

	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,

	return h;
}

/* parameters for the global channel table */

/*
 * initGlobalChannelTable
 *    Lazy initialization of the global channel table.
 */
static void
initGlobalChannelTable(void)
{
	MemoryContext oldcontext;

	/* Quick exit if we already did this */
		return;

	/* Otherwise, use a lock to ensure only one process creates the table */

	/* Be sure any local memory allocated by DSA routines is persistent */

	{
		/* Initialize dynamic shared hash table for global channels */
								 NULL);

		/* Store handles in shared memory for other backends to use */
	}
	else if (!globalChannelTable)
	{
		/* Attach to existing dynamic shared hash table */
								 NULL);
	}

	MemoryContextSwitchTo(oldcontext);
}

/*
 * initLocalChannelTable
 *    Lazy initialization of the local channel table.
 *    Once created, this table lasts for the life of the session.
 */
static void
initLocalChannelTable(void)
{
	HASHCTL hash_ctl;

	/* Quick exit if we already did this */
	if (localChannelTable != NULL)
		return;

	/* Initialize local hash table for this backend's listened channels */
	hash_ctl.entrysize = sizeof(ChannelName);

	localChannelTable =
		hash_create("Local Listen Channels",
					64,
					&hash_ctl,
}

/*
 * initPendingListenActions
 *    Lazy initialization of the pending listen actions hash table.
 *    This is allocated in CurTransactionContext during PreCommit_Notify,
 *    and destroyed at transaction end.
 */
static void
initPendingListenActions(void)
{
	HASHCTL hash_ctl;

	if (pendingListenActions != NULL)
		return;

	hash_ctl.entrysize = sizeof(PendingListenEntry);

	pendingListenActions =
		hash_create("Pending Listen Actions",
					&hash_ctl,
}

/*
 * Register our shared memory needs
 */
static void
AsyncShmemRequest(void *arg)
{
	Size size;

	size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
	size = add_size(size, offsetof(AsyncQueueControl, backend));

	ShmemRequestStruct(.name = "Async Queue Control",
					   .size = size,
					   .ptr = (void **) &asyncQueueControl,
	);

		.name = "notify",
		.Dir = "pg_notify",

		/* long segment names are used in order to avoid wraparound */
		.long_segment_names = true,

		.nslots = notify_buffers,

		.sync_handler = SYNC_HANDLER_NONE,
		.PagePrecedes = asyncQueuePagePrecedes,
		.errdetail_for_io_error = asyncQueueErrdetailForIoError,

		.buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
		.bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
	);
}

static void


/*
 * pg_notify -
 *    SQL function to send a notification event
 */
Datum
pg_notify(PG_FUNCTION_ARGS)
{
	const char *channel;
	const char *payload;

	if (PG_ARGISNULL(0))
		channel = "";
	else
		channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

	if (PG_ARGISNULL(1))
		payload = "";
	else
		payload = text_to_cstring(PG_GETARG_TEXT_PP(1));

	/* For NOTIFY as a statement, this is checked in ProcessUtility */
	PreventCommandDuringRecovery("NOTIFY");

	Async_Notify(channel, payload);

	PG_RETURN_VOID();
}
883
884/*
885 * Async_Notify
886 *
887 * This is executed by the SQL notify command.
888 *
889 * Adds the message to the list of pending notifies.
890 * Actual notification happens during transaction commit.
891 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
892 */
893void
894Async_Notify(const char *channel, const char *payload)
895{
896 int my_level = GetCurrentTransactionNestLevel();
897 size_t channel_len;
898 size_t payload_len;
899 Notification *n;
900 MemoryContext oldcontext;
901
902 if (IsParallelWorker())
903 elog(ERROR, "cannot send notifications from a parallel worker");
904
905 if (Trace_notify)
906 elog(DEBUG1, "Async_Notify(%s)", channel);
907
908 channel_len = channel ? strlen(channel) : 0;
909 payload_len = payload ? strlen(payload) : 0;
910
911 /* a channel name must be specified */
912 if (channel_len == 0)
915 errmsg("channel name cannot be empty")));
916
917 /* enforce length limits */
918 if (channel_len >= NAMEDATALEN)
921 errmsg("channel name too long")));
922
923 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
926 errmsg("payload string too long")));
927
928 /*
929 * We must construct the Notification entry, even if we end up not using
930 * it, in order to compare it cheaply to existing list entries.
931 *
932 * The notification list needs to live until end of transaction, so store
933 * it in the transaction context.
934 */
936
938 channel_len + payload_len + 2);
939 n->channel_len = channel_len;
940 n->payload_len = payload_len;
941 strcpy(n->data, channel);
942 if (payload)
943 strcpy(n->data + channel_len + 1, payload);
944 else
945 n->data[channel_len + 1] = '\0';
946
947 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
948 {
950
951 /*
952 * First notify event in current (sub)xact. Note that we allocate the
953 * NotificationList in TopTransactionContext; the nestingLevel might
954 * get changed later by AtSubCommit_Notify.
955 */
958 sizeof(NotificationList));
959 notifies->nestingLevel = my_level;
960 notifies->events = list_make1(n);
961 /* We certainly don't need a hashtable yet */
962 notifies->hashtab = NULL;
963 /* We won't build uniqueChannelNames/Hash till later, either */
964 notifies->uniqueChannelNames = NIL;
965 notifies->uniqueChannelHash = NULL;
966 notifies->upper = pendingNotifies;
968 }
969 else
970 {
971 /* Now check for duplicates */
973 {
974 /* It's a dup, so forget it */
975 pfree(n);
976 MemoryContextSwitchTo(oldcontext);
977 return;
978 }
979
980 /* Append more events to existing list */
982 }
983
984 MemoryContextSwitchTo(oldcontext);
985}
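Async_Notify stores the channel name and payload back to back in one allocation, each NUL-terminated. That layout lets a duplicate check reduce to two length comparisons and one memcmp. The following is a minimal standalone sketch of the layout, not the server's code. `DemoNotification`, `demo_make_notification` and `demo_notifications_equal` are hypothetical names, the two limits are illustrative stand-ins for NAMEDATALEN and NOTIFY_PAYLOAD_MAX_LENGTH, and malloc takes the place of palloc in TopTransactionContext.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Illustrative limits; the server takes these from its own headers. */
#define DEMO_NAMEDATALEN 64
#define DEMO_PAYLOAD_MAX 8000

typedef struct DemoNotification
{
    size_t channel_len;   /* length of channel name, excluding NUL */
    size_t payload_len;   /* length of payload, excluding NUL */
    char   data[];        /* "channel\0payload\0" */
} DemoNotification;

/* Returns NULL if the channel is empty, a limit is exceeded, or OOM. */
static DemoNotification *
demo_make_notification(const char *channel, const char *payload)
{
    size_t channel_len = channel ? strlen(channel) : 0;
    size_t payload_len = payload ? strlen(payload) : 0;
    DemoNotification *n;

    if (channel_len == 0 || channel_len >= DEMO_NAMEDATALEN ||
        payload_len >= DEMO_PAYLOAD_MAX)
        return NULL;

    /* +2 for the two terminating NULs */
    n = malloc(sizeof(DemoNotification) + channel_len + payload_len + 2);
    if (n == NULL)
        return NULL;
    n->channel_len = channel_len;
    n->payload_len = payload_len;
    memcpy(n->data, channel, channel_len + 1);
    if (payload)
        memcpy(n->data + channel_len + 1, payload, payload_len + 1);
    else
        n->data[channel_len + 1] = '\0';   /* missing payload = empty string */
    return n;
}

/* Duplicates have identical lengths and identical bytes, NULs included. */
static bool
demo_notifications_equal(const DemoNotification *a, const DemoNotification *b)
{
    return a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data, a->channel_len + a->payload_len + 2) == 0;
}
```

Because a NULL payload is stored as an empty string, `NOTIFY jobs` and `NOTIFY jobs, ''` compare equal in this sketch.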
986
987/*
988 * queue_listen
989 * Common code for listen, unlisten, unlisten all commands.
990 *
991 * Adds the request to the list of pending actions.
992 * Actual update of localChannelTable and globalChannelTable happens during
993 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
994 */
995static void
996queue_listen(ListenActionKind action, const char *channel)
997{
998 MemoryContext oldcontext;
1000 int my_level = GetCurrentTransactionNestLevel();
1001
1002 /*
1003 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1004 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1005 * final per-channel intent is computed during PreCommit_Notify.
1006 */
1008
1009 /* space for terminating null is included in sizeof(ListenAction) */
1011 strlen(channel) + 1);
1012 actrec->action = action;
1013 strcpy(actrec->channel, channel);
1014
1015 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1016 {
1017 ActionList *actions;
1018
1019 /*
1020 * First action in current (sub)xact. Note that we allocate the
1021 * ActionList in TopTransactionContext; the nestingLevel might get
1022 * changed later by AtSubCommit_Notify.
1023 */
1024 actions = (ActionList *)
1026 actions->nestingLevel = my_level;
1027 actions->actions = list_make1(actrec);
1028 actions->upper = pendingActions;
1029 pendingActions = actions;
1030 }
1031 else
1033
1034 MemoryContextSwitchTo(oldcontext);
1035}
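Async_Notify and queue_listen both keep a stack of per-(sub)transaction lists. A new list is pushed only when the current nesting level is deeper than the level at the top of the stack; otherwise the item is appended to the top list. Unlike Async_Notify, queue_listen keeps duplicates. The following is a simplified sketch of that rule, assuming a fixed-capacity array instead of a PostgreSQL List. `DemoActionList` and `demo_queue_action` are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>

#define DEMO_MAX_ACTIONS 16   /* fixed capacity, for the sketch only */

typedef struct DemoActionList
{
    int         nestingLevel;   /* (sub)transaction level owning this list */
    int         nactions;
    const char *actions[DEMO_MAX_ACTIONS];   /* channel names, in order */
    struct DemoActionList *upper;            /* list of an outer level */
} DemoActionList;

/* Queue one action at nesting level my_level; returns the new stack top. */
static DemoActionList *
demo_queue_action(DemoActionList *top, int my_level, const char *channel)
{
    if (top == NULL || my_level > top->nestingLevel)
    {
        /* First action in this (sub)xact: push a new list. */
        DemoActionList *l = calloc(1, sizeof(DemoActionList));

        if (l == NULL)
            abort();            /* the server would raise an ERROR instead */
        l->nestingLevel = my_level;
        l->upper = top;
        top = l;
    }

    /* No duplicate elimination: order matters for UNLISTEN *. */
    assert(top->nactions < DEMO_MAX_ACTIONS);
    top->actions[top->nactions++] = channel;
    return top;
}
```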
1036
1037/*
1038 * Async_Listen
1039 *
1040 * This is executed by the SQL listen command.
1041 */
1042void
1043Async_Listen(const char *channel)
1044{
1045 if (Trace_notify)
1046 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1047
1048 queue_listen(LISTEN_LISTEN, channel);
1049}
1050
1051/*
1052 * Async_Unlisten
1053 *
1054 * This is executed by the SQL unlisten command.
1055 */
1056void
1057Async_Unlisten(const char *channel)
1058{
1059 if (Trace_notify)
1060 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1061
1062 /* If we couldn't possibly be listening, no need to queue anything */
1064 return;
1065
1066 queue_listen(LISTEN_UNLISTEN, channel);
1067}
1068
1069/*
1070 * Async_UnlistenAll
1071 *
1072 * This is invoked by UNLISTEN * command, and also at backend exit.
1073 */
1074void
1076{
1077 if (Trace_notify)
1078 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1079
1080 /* If we couldn't possibly be listening, no need to queue anything */
1082 return;
1083
1085}
1086
1087/*
1088 * SQL function: return a set of the channel names this backend is actively
1089 * listening to.
1090 *
1091 * Note: this coding relies on the fact that the localChannelTable cannot
1092 * change within a transaction.
1093 */
1094Datum
1096{
1098 HASH_SEQ_STATUS *status;
1099
1100 /* stuff done only on the first call of the function */
1101 if (SRF_IS_FIRSTCALL())
1102 {
1103 /* create a function context for cross-call persistence */
1105
1106 /* Initialize hash table iteration if we have any channels */
1107 if (localChannelTable != NULL)
1108 {
1109 MemoryContext oldcontext;
1110
1111 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1112 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1114 funcctx->user_fctx = status;
1115 MemoryContextSwitchTo(oldcontext);
1116 }
1117 else
1118 {
1119 funcctx->user_fctx = NULL;
1120 }
1121 }
1122
1123 /* stuff done on every call of the function */
1125 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1126
1127 if (status != NULL)
1128 {
1129 ChannelName *entry;
1130
1131 entry = (ChannelName *) hash_seq_search(status);
1132 if (entry != NULL)
1134 }
1135
1137}
1138
1139/*
1140 * Async_UnlistenOnExit
1141 *
1142 * This is executed at backend exit if we have done any LISTENs in this
1143 * backend. It might not be necessary anymore, if the user UNLISTENed
1144 * everything, but we don't try to detect that case.
1145 */
1146static void
1152
1153/*
1154 * AtPrepare_Notify
1155 *
1156 * This is called at the prepare phase of a two-phase
1157 * transaction. Save the state for possible commit later.
1158 */
1159void
1161{
1162 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1164 ereport(ERROR,
1166 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1167}
1168
1169/*
1170 * PreCommit_Notify
1171 *
1172 * This is called at transaction commit, before actually committing to
1173 * clog.
1174 *
1175 * If there are pending LISTEN actions, make sure we are listed in the
1176 * shared-memory listener array. This must happen before commit to
1177 * ensure we don't miss any notifies from transactions that commit
1178 * just after ours.
1179 *
1180 * If there are outbound notify requests in the pendingNotifies list,
1181 * add them to the global queue. We do that before commit so that
1182 * we can still throw an error if we run out of queue space.
1183 */
1184void
1186{
1187 ListCell *p;
1188
1190 return; /* no relevant statements in this xact */
1191
1192 if (Trace_notify)
1193 elog(DEBUG1, "PreCommit_Notify");
1194
1195 /* Preflight for any pending listen/unlisten actions */
1197
1198 if (pendingActions != NULL)
1199 {
1200 /* Ensure we have a local channel table */
1202 /* Create pendingListenActions hash table for this transaction */
1204
1205 /* Stage all the actions this transaction wants to perform */
1206 foreach(p, pendingActions->actions)
1207 {
1209
1210 switch (actrec->action)
1211 {
1212 case LISTEN_LISTEN:
1215 break;
1216 case LISTEN_UNLISTEN:
1218 break;
1221 break;
1222 }
1223 }
1224 }
1225
1226 /* Queue any pending notifies (must happen after the above) */
1227 if (pendingNotifies)
1228 {
1230 bool firstIteration = true;
1231
1232 /*
1233 * Build list of unique channel names being notified for use by
1234 * SignalBackends().
1235 *
1236 * If uniqueChannelHash is available, use it to efficiently get the
1237 * unique channels. Otherwise, fall back to the O(N^2) approach.
1238 */
1241 {
1242 HASH_SEQ_STATUS status;
1244
1246 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1249 channelEntry->channel);
1250 }
1251 else
1252 {
1253 /* O(N^2) approach is better for small number of notifications */
1255 {
1256 char *channel = n->data;
1257 bool found = false;
1258
1259 /* Name present in list? */
1261 {
1262 if (strcmp(oldchan, channel) == 0)
1263 {
1264 found = true;
1265 break;
1266 }
1267 }
1268 /* Add if not already in list */
1269 if (!found)
1272 channel);
1273 }
1274 }
1275
1276 /* Preallocate workspace that will be needed by SignalBackends() */
1277 if (signalPids == NULL)
1279 MaxBackends * sizeof(int32));
1280
1281 if (signalProcnos == NULL)
1283 MaxBackends * sizeof(ProcNumber));
1284
1285 /*
1286 * Make sure that we have an XID assigned to the current transaction.
1287 * GetCurrentTransactionId is cheap if we already have an XID, but not
1288 * so cheap if we don't, and we'd prefer not to do that work while
1289 * holding NotifyQueueLock.
1290 */
1292
1293 /*
1294 * Serialize writers by acquiring a special lock that we hold till
1295 * after commit. This ensures that queue entries appear in commit
1296 * order, and in particular that there are never uncommitted queue
1297 * entries ahead of committed ones, so an uncommitted transaction
1298 * can't block delivery of deliverable notifications.
1299 *
1300 * We use a heavyweight lock so that it'll automatically be released
1301 * after either commit or abort. This also allows deadlocks to be
1302 * detected, though really a deadlock shouldn't be possible here.
1303 *
1304 * The lock is on "database 0", which is pretty ugly but it doesn't
1305 * seem worth inventing a special locktag category just for this.
1306 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1307 * used by the flatfiles mechanism.)
1308 */
1311
1312 /*
1313 * For the direct advancement optimization in SignalBackends(), we
1314 * need to ensure that no other backend can insert queue entries
1315 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1316 * heavyweight lock above provides this guarantee, since it serializes
1317 * all writers.
1318 *
1319 * Note: if the heavyweight lock were ever removed for scalability
1320 * reasons, we could achieve the same guarantee by holding
1321 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1322 * than releasing and reacquiring it for each page as we do below.
1323 */
1324
1325 /* Initialize values to a safe default in case list is empty */
1328
1329 /* Now push the notifications into the queue */
1331 while (nextNotify != NULL)
1332 {
1333 /*
1334 * Add the pending notifications to the queue. We acquire and
1335 * release NotifyQueueLock once per page, which might be overkill
1336 * but it does allow readers to get in while we're doing this.
1337 *
1338 * A full queue is very uncommon and should really not happen,
1339 * given that we have so much space available in the SLRU pages.
1340 * Nevertheless we need to deal with this possibility. Note that
1341 * when we get here we are in the process of committing our
1342 * transaction, but we have not yet committed to clog, so at this
1343 * point in time we can still roll the transaction back.
1344 */
1346 if (firstIteration)
1347 {
1349 firstIteration = false;
1350 }
1352 if (asyncQueueIsFull())
1353 ereport(ERROR,
1355 errmsg("too many notifications in the NOTIFY queue")));
1359 }
1360
1361 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1362 }
1363}
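When no uniqueChannelHash is available, PreCommit_Notify collects the distinct channel names with a nested linear scan. This is cheap for the handful of notifications a typical transaction sends. The following is a standalone sketch of that fallback over a plain array of C strings. `demo_unique_channels` is a hypothetical helper, and it stores pointers to the input strings rather than copies.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Store a pointer to each distinct name from channels[0..n-1] in out[],
 * preserving first-seen order.  out must have room for n entries.
 * Returns the number of unique names; O(n^2) comparisons.
 */
static size_t
demo_unique_channels(const char *const *channels, size_t n, const char **out)
{
    size_t nunique = 0;

    for (size_t i = 0; i < n; i++)
    {
        bool found = false;

        /* Name present in list? */
        for (size_t j = 0; j < nunique; j++)
        {
            if (strcmp(out[j], channels[i]) == 0)
            {
                found = true;
                break;
            }
        }
        /* Add if not already in list */
        if (!found)
            out[nunique++] = channels[i];
    }
    return nunique;
}
```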
1364
1365/*
1366 * AtCommit_Notify
1367 *
1368 * This is called at transaction commit, after committing to clog.
1369 *
1370 * Apply pending listen/unlisten changes and clear transaction-local state.
1371 *
1372 * If we issued any notifications in the transaction, send signals to
1373 * listening backends (possibly including ourselves) to process them.
1374 * Also, if we filled enough queue pages with new notifies, try to
1375 * advance the queue tail pointer.
1376 */
1377void
1379{
1380 /*
1381 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1382 * return as soon as possible
1383 */
1385 return;
1386
1387 if (Trace_notify)
1388 elog(DEBUG1, "AtCommit_Notify");
1389
1390 /* Apply staged listen/unlisten changes */
1392
1393 /* If no longer listening to anything, get out of listener array */
1396
1397 /*
1398 * Send signals to listening backends. We need to do this only if there are
1399 * pending notifies, which were previously added to the shared queue by
1400 * PreCommit_Notify().
1401 */
1402 if (pendingNotifies != NULL)
1404
1405 /*
1406 * If it's time to try to advance the global tail pointer, do that.
1407 *
1408 * (It might seem odd to do this in the sender, when more than likely the
1409 * listeners won't yet have read the messages we just sent. However,
1410 * there's less contention if only the sender does it, and there is little
1411 * need for urgency in advancing the global tail. So this typically will
1412 * be clearing out messages that were sent some time ago.)
1413 */
1414 if (tryAdvanceTail)
1415 {
1416 tryAdvanceTail = false;
1418 }
1419
1420 /* And clean up */
1422}
1423
1424/*
1425 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1426 *
1427 * This function must make sure we are ready to catch any incoming messages.
1428 */
1429static void
1431{
1432 QueuePosition head;
1433 QueuePosition max;
1435
1436 /*
1437 * Nothing to do if we are already listening to something, nor if we
1438 * already ran this routine in this transaction.
1439 */
1441 return;
1442
1443 if (Trace_notify)
1444 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1445
1446 /*
1447 * Before registering, make sure we will unlisten before dying. (Note:
1448 * this action does not get undone if we abort later.)
1449 */
1451 {
1454 }
1455
1456 /*
1457 * This is our first LISTEN, so establish our pointer.
1458 *
1459 * We set our pointer to the global tail pointer and then move it forward
1460 * over already-committed notifications. This ensures we cannot miss any
1461 * not-yet-committed notifications. We might get a few more but that
1462 * doesn't hurt.
1463 *
1464 * In some scenarios there might be a lot of committed notifications that
1465 * have not yet been pruned away (because some backend is being lazy about
1466 * reading them). To reduce our startup time, we can look at other
1467 * backends and adopt the maximum "pos" pointer of any backend that's in
1468 * our database; any notifications it's already advanced over are surely
1469 * committed and need not be re-examined by us. (We must consider only
1470 * backends connected to our DB, because others will not have bothered to
1471 * check committed-ness of notifications in our DB.)
1472 *
1473 * We need exclusive lock here so we can look at other backends' entries
1474 * and manipulate the list links.
1475 */
1477 head = QUEUE_HEAD;
1478 max = QUEUE_TAIL;
1481 {
1483 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1484 /* Also find last listening backend before this one */
1485 if (i < MyProcNumber)
1486 prevListener = i;
1487 }
1493 /* Insert backend into list of listeners at correct position */
1495 {
1498 }
1499 else
1500 {
1503 }
1505
1506 /* Now we are listed in the global array, so remember we're listening */
1507 amRegisteredListener = true;
1508
1509 /*
1510 * Try to move our pointer forward as far as possible. This will skip
1511 * over already-committed notifications, which we want to do because they
1512 * might be quite stale. Note that we are not yet listening on anything,
1513 * so we won't deliver such notifications to our frontend. Also, although
1514 * our transaction might have executed NOTIFY, those message(s) aren't
1515 * queued yet so we won't skip them here.
1516 */
1517 if (!QUEUE_POS_EQUAL(max, head))
1519}
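Under the exclusive lock, BecomeRegisteredListener does two things:

* It adopts the furthest queue position already reached by any listener in the same database.
* It links itself into the listener list after the last listening backend with a smaller ProcNumber, which keeps the list ordered.

The following is a simplified sketch of both steps, assuming a plain array of per-backend slots with a `next` index and a (page, offset) position. `DemoSlot`, `DemoPos`, `demo_register` and `DEMO_INVALID` are hypothetical stand-ins for the QUEUE_* macros and INVALID_PROC_NUMBER.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DEMO_INVALID (-1)
#define DEMO_NSLOTS  8

typedef struct DemoPos
{
    int64_t page;
    int     offset;
} DemoPos;

typedef struct DemoSlot
{
    bool     listening;
    unsigned dboid;
    DemoPos  pos;
    int      next;     /* next listening slot, or DEMO_INVALID */
} DemoSlot;

static bool
demo_pos_precedes(DemoPos a, DemoPos b)
{
    return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

/*
 * Register slot "me" in database "db".  *first is the first listening slot.
 * Start from "tail", but skip ahead to the furthest position any listener in
 * the same database has already reached.
 */
static void
demo_register(DemoSlot *slots, int *first, int me, unsigned db, DemoPos tail)
{
    DemoPos max = tail;
    int     prev = DEMO_INVALID;

    for (int i = *first; i != DEMO_INVALID; i = slots[i].next)
    {
        if (slots[i].dboid == db && demo_pos_precedes(max, slots[i].pos))
            max = slots[i].pos;
        if (i < me)
            prev = i;   /* list is sorted, so this ends as the last i < me */
    }

    slots[me].listening = true;
    slots[me].dboid = db;
    slots[me].pos = max;
    if (prev != DEMO_INVALID)
    {
        slots[me].next = slots[prev].next;
        slots[prev].next = me;
    }
    else
    {
        slots[me].next = *first;
        *first = me;
    }
}
```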
1520
1521/*
1522 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1523 *
1524 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1525 * an entry in localChannelTable, and pre-allocating an entry in the shared
1526 * globalChannelTable with removeOnAbort set. AtCommit_Notify will clear
1527 * removeOnAbort; abort processing will remove entries still marked so.
1528 */
1529static void
1531{
1532 GlobalChannelKey key;
1533 GlobalChannelEntry *entry;
1534 bool found;
1536 PendingListenEntry *pending;
1537
1538 /*
1539 * Record in local pending hash that we want to LISTEN, overwriting any
1540 * earlier attempt to UNLISTEN.
1541 */
1542 pending = (PendingListenEntry *)
1544 pending->action = PENDING_LISTEN;
1545
1546 /*
1547 * Ensure that there is an entry for the channel in localChannelTable.
1548 * (Should this fail, we can just roll back.) If the transaction fails
1549 * after this point, we will remove the entry if appropriate during
1550 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1551 * to return TRUE; we assume nothing is going to consult that before
1552 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1553 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1554 * present to ensure they do the right things; see
1555 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1556 */
1558
1559 /* Pre-allocate entry in shared globalChannelTable */
1560 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1561 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1562
1563 if (!found)
1564 {
1565 /* New channel entry, so initialize it to a safe state */
1567 entry->numListeners = 0;
1568 entry->allocatedListeners = 0;
1569 }
1570
1571 /*
1572 * Create listenersArray if entry doesn't have one. It's tempting to fold
1573 * this into the !found case, but this coding allows us to cope in case
1574 * dsa_allocate() failed in an earlier attempt.
1575 */
1576 if (!DsaPointerIsValid(entry->listenersArray))
1577 {
1581 }
1582
1585
1586 /*
1587 * Check if we already have a ListenerEntry (possibly from earlier in this
1588 * transaction)
1589 */
1590 for (int i = 0; i < entry->numListeners; i++)
1591 {
1592 if (listeners[i].procNo == MyProcNumber)
1593 {
1594 /* Already have an entry; leave removeOnAbort as-is */
1596 return;
1597 }
1598 }
1599
1600 /* Need to add a new entry; grow array if necessary */
1601 if (entry->numListeners >= entry->allocatedListeners)
1602 {
1603 int new_size = entry->allocatedListeners * 2;
1606 sizeof(ListenerEntry) * new_size);
1608
1610 entry->listenersArray = new_array;
1614 }
1615
1616 listeners[entry->numListeners].procNo = MyProcNumber;
1617 listeners[entry->numListeners].removeOnAbort = true;
1618 entry->numListeners++;
1619
1621}
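For each channel, PrepareTableEntriesForListen keeps an array of (procNo, removeOnAbort) pairs that doubles in size when full. A new entry starts with removeOnAbort set so that an abort can undo it. An entry left over from an earlier LISTEN is not modified. The following is a standalone sketch that uses malloc/realloc in place of dsa_allocate. `DemoChannel`, `demo_add_listener` and the initial capacity of 4 are assumptions made for the illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct DemoListener
{
    int  procNo;
    bool removeOnAbort;   /* true until the LISTEN commits */
} DemoListener;

typedef struct DemoChannel
{
    DemoListener *listeners;
    int           numListeners;
    int           allocatedListeners;
} DemoChannel;

#define DEMO_INITIAL_LISTENERS 4   /* assumed starting capacity */

/* Returns false only on allocation failure; the channel is then unchanged. */
static bool
demo_add_listener(DemoChannel *ch, int procNo)
{
    if (ch->listeners == NULL)
    {
        ch->listeners = malloc(sizeof(DemoListener) * DEMO_INITIAL_LISTENERS);
        if (ch->listeners == NULL)
            return false;
        ch->allocatedListeners = DEMO_INITIAL_LISTENERS;
        ch->numListeners = 0;
    }

    /* Already present (e.g. from earlier in this transaction)? Leave it. */
    for (int i = 0; i < ch->numListeners; i++)
        if (ch->listeners[i].procNo == procNo)
            return true;

    /* Grow the array if necessary */
    if (ch->numListeners >= ch->allocatedListeners)
    {
        int           new_size = ch->allocatedListeners * 2;
        DemoListener *grown = realloc(ch->listeners,
                                      sizeof(DemoListener) * new_size);

        if (grown == NULL)
            return false;
        ch->listeners = grown;
        ch->allocatedListeners = new_size;
    }

    ch->listeners[ch->numListeners].procNo = procNo;
    ch->listeners[ch->numListeners].removeOnAbort = true;
    ch->numListeners++;
    return true;
}
```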
1622
1623/*
1624 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1625 *
1626 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1627 * we're currently listening (committed or staged). We don't touch
1628 * globalChannelTable yet - the listener keeps receiving signals until
1629 * commit, when the entry is removed.
1630 */
1631static void
1633{
1634 PendingListenEntry *pending;
1635
1636 /*
1637 * If the channel name is not in localChannelTable, then we are neither
1638 * listening on it nor preparing to listen on it, so we don't need to
1639 * record an UNLISTEN action.
1640 */
1642 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1643 return;
1644
1645 /*
1646 * Record in local pending hash that we want to UNLISTEN, overwriting any
1647 * earlier attempt to LISTEN. Don't touch localChannelTable or
1648 * globalChannelTable yet - we keep receiving signals until commit.
1649 */
1650 pending = (PendingListenEntry *)
1652 pending->action = PENDING_UNLISTEN;
1653}
1654
1655/*
1656 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1657 *
1658 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1659 * about-to-be-listened channels in pendingListenActions.
1660 */
1661static void
1663{
1666 PendingListenEntry *pending;
1667
1668 /*
1669 * Scan localChannelTable, which will have the names of all channels that
1670 * we are listening on or have prepared to listen on. Record an UNLISTEN
1671 * action for each one, overwriting any earlier attempt to LISTEN.
1672 */
1674 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1675 {
1676 pending = (PendingListenEntry *)
1678 pending->action = PENDING_UNLISTEN;
1679 }
1680}
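Together, queue_listen and the PrepareTableEntriesFor* routines reduce an ordered sequence of LISTEN, UNLISTEN and UNLISTEN * actions to one pending action per channel:

* A later action overwrites an earlier one.
* UNLISTEN is recorded only for channels already in localChannelTable.
* UNLISTEN * expands to every channel in that table.

The following is a sketch of that reduction over small fixed arrays. As PrepareTableEntriesForListen does, a LISTEN also adds its channel to the local table. All names and sizes here are hypothetical. In the server, localChannelTable also holds channels from earlier committed LISTENs, so to model that you would pre-seed `local`.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define DEMO_MAX_CHANNELS 8

typedef enum { DEMO_LISTEN, DEMO_UNLISTEN, DEMO_UNLISTEN_ALL } DemoActionKind;
typedef enum { DEMO_NONE, DEMO_PENDING_LISTEN, DEMO_PENDING_UNLISTEN } DemoIntent;

typedef struct DemoState
{
    int         nlocal;                          /* local channel table */
    const char *local[DEMO_MAX_CHANNELS];
    int         npending;                        /* staged intents */
    const char *pendingName[DEMO_MAX_CHANNELS];
    DemoIntent  pending[DEMO_MAX_CHANNELS];
} DemoState;

static int
demo_find(const char *const *names, int n, const char *name)
{
    for (int i = 0; i < n; i++)
        if (strcmp(names[i], name) == 0)
            return i;
    return -1;
}

/* Record (or overwrite) the staged intent for one channel. */
static void
demo_set_pending(DemoState *st, const char *name, DemoIntent intent)
{
    int i = demo_find(st->pendingName, st->npending, name);

    if (i < 0)
    {
        assert(st->npending < DEMO_MAX_CHANNELS);
        i = st->npending++;
        st->pendingName[i] = name;
    }
    st->pending[i] = intent;
}

static void
demo_stage(DemoState *st, DemoActionKind kind, const char *name)
{
    switch (kind)
    {
        case DEMO_LISTEN:
            demo_set_pending(st, name, DEMO_PENDING_LISTEN);
            if (demo_find(st->local, st->nlocal, name) < 0)
            {
                assert(st->nlocal < DEMO_MAX_CHANNELS);
                st->local[st->nlocal++] = name;
            }
            break;
        case DEMO_UNLISTEN:
            /* only matters if we are listening or about to listen */
            if (demo_find(st->local, st->nlocal, name) >= 0)
                demo_set_pending(st, name, DEMO_PENDING_UNLISTEN);
            break;
        case DEMO_UNLISTEN_ALL:
            for (int i = 0; i < st->nlocal; i++)
                demo_set_pending(st, st->local[i], DEMO_PENDING_UNLISTEN);
            break;
    }
}

static DemoIntent
demo_intent(const DemoState *st, const char *name)
{
    int i = demo_find(st->pendingName, st->npending, name);

    return i < 0 ? DEMO_NONE : st->pending[i];
}
```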
1681
1682/*
1683 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1684 *
1685 * Decrements numListeners, compacts the array, and frees the entry if empty.
1686 * Sets *entry_ptr to NULL if the entry was deleted.
1687 *
1688 * We could get the listeners pointer from the entry, but all callers
1689 * already have it at hand.
1690 */
1691static void
1694 int idx)
1695{
1696 GlobalChannelEntry *entry = *entry_ptr;
1697
1698 entry->numListeners--;
1699 if (idx < entry->numListeners)
1701 sizeof(ListenerEntry) * (entry->numListeners - idx));
1702
1703 if (entry->numListeners == 0)
1704 {
1707 /* tells caller not to release the entry's lock: */
1708 *entry_ptr = NULL;
1709 }
1710}
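To fill the gap left by a removed listener, RemoveListenerFromChannel slides the rest of the array down by one slot. The surviving entries stay dense and keep their order. The following is a standalone sketch of that compaction step on an int array. `demo_remove_at` is a hypothetical helper; it returns the new count and leaves freeing an emptied array to the caller.

```c
#include <assert.h>
#include <string.h>

/* Remove element idx from arr[0..n-1], preserving order; returns new count. */
static int
demo_remove_at(int *arr, int n, int idx)
{
    assert(idx >= 0 && idx < n);
    n--;
    if (idx < n)   /* nothing to move when removing the last element */
        memmove(&arr[idx], &arr[idx + 1], sizeof(int) * (n - idx));
    return n;
}
```

memmove, not memcpy, is required here because the source and destination ranges overlap.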
1711
1712/*
1713 * ApplyPendingListenActions
1714 *
1715 * Apply, or revert, staged listen/unlisten changes to the local and global
1716 * hash tables.
1717 */
1718static void
1720{
1722 PendingListenEntry *pending;
1723
1724 /* Quick exit if nothing to do */
1726 return;
1727
1728 /* We made a globalChannelTable before building pendingListenActions */
1729 if (globalChannelTable == NULL)
1730 elog(PANIC, "global channel table missing post-commit/abort");
1731
1732 /* For each staged action ... */
1734 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1735 {
1736 GlobalChannelKey key;
1737 GlobalChannelEntry *entry;
1738 bool removeLocal = true;
1739 bool foundListener = false;
1740
1741 /*
1742 * Find the global entry for this channel. If isCommit, it had better
1743 * exist (it was created in PreCommit). In an abort, it might not
1744 * exist, in which case we are not listening and should discard any
1745 * local entry that PreCommit may have managed to create.
1746 */
1747 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1748 entry = dshash_find(globalChannelTable, &key, true);
1749 if (entry != NULL)
1750 {
1751 /* Scan entry to find the ListenerEntry for this backend */
1753
1756
1757 for (int i = 0; i < entry->numListeners; i++)
1758 {
1759 if (listeners[i].procNo != MyProcNumber)
1760 continue;
1761 foundListener = true;
1762 if (isCommit)
1763 {
1764 if (pending->action == PENDING_LISTEN)
1765 {
1766 /*
1767 * LISTEN being committed: entry is now permanent.
1768 * localChannelTable entry was created during
1769 * PreCommit and should be kept.
1770 */
1771 listeners[i].removeOnAbort = false;
1772 removeLocal = false;
1773 }
1774 else
1775 {
1776 /*
1777 * UNLISTEN being committed: remove pre-allocated
1778 * entries from both tables.
1779 */
1781 }
1782 }
1783 else
1784 {
1785 /*
1786 * Note: this part is reachable only if the transaction
1787 * aborts after PreCommit_Notify() has made some
1788 * pendingListenActions entries, so it's pretty hard to
1789 * test.
1790 */
1791 if (listeners[i].removeOnAbort)
1792 {
1793 /*
1794 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1795 * so remove pre-allocated entries from both tables.
1796 */
1798 }
1799 else
1800 {
1801 /*
1802 * Entry predates this transaction, so keep the
1803 * localChannelTable entry.
1804 */
1805 removeLocal = false;
1806 }
1807 }
1808 break; /* there shouldn't be another match */
1809 }
1810
1811 /* We might have already released the entry by removing it */
1812 if (entry != NULL)
1814 }
1815
1816 /*
1817 * If we're committing a LISTEN action, we should have found a
1818 * matching ListenerEntry, but otherwise it's okay if we didn't.
1819 */
1820 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1821 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1822 pending->channel, MyProcNumber);
1823
1824 /*
1825 * If we did not find a globalChannelTable entry for our backend, or
1826 * if we are unlistening, remove any localChannelTable entry that may
1827 * exist. (Note in particular that this cleans up if we created a
1828 * localChannelTable entry and then failed while trying to create a
1829 * globalChannelTable entry.)
1830 */
1833 HASH_REMOVE, NULL);
1834 }
1835}
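For each channel, the outcome in ApplyPendingListenActions depends on only a few inputs: whether we are committing, which action was staged, whether our ListenerEntry was found, and (if it was) its removeOnAbort flag. The following sketch condenses those branches into a pure function. It assumes that our global ListenerEntry is removed exactly when the code calls RemoveListenerFromChannel. `DemoOutcome` and `demo_apply_outcome` are hypothetical names. The real code PANICs when a committing LISTEN finds no entry; the sketch simply returns the default outcome for that case.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { DEMO_PENDING_LISTEN, DEMO_PENDING_UNLISTEN } DemoPending;

typedef struct DemoOutcome
{
    bool removeGlobal;   /* drop our ListenerEntry from the shared table */
    bool removeLocal;    /* drop the channel from the local table */
    bool clearFlag;      /* set removeOnAbort = false (LISTEN now permanent) */
} DemoOutcome;

static DemoOutcome
demo_apply_outcome(bool isCommit, DemoPending action,
                   bool foundListener, bool removeOnAbort)
{
    DemoOutcome o = {false, true, false};   /* default: discard local entry */

    if (!foundListener)
        return o;                 /* nothing in the shared table to touch */

    if (isCommit)
    {
        if (action == DEMO_PENDING_LISTEN)
        {
            o.clearFlag = true;   /* committed LISTEN: keep both entries */
            o.removeLocal = false;
        }
        else
            o.removeGlobal = true;   /* committed UNLISTEN: drop both */
    }
    else if (removeOnAbort)
        o.removeGlobal = true;    /* staged LISTEN being rolled back */
    else
        o.removeLocal = false;    /* entry predates this transaction */
    return o;
}
```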
1836
1837/*
1838 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1839 *
1840 * Remove this backend from all channels in the shared global table.
1841 */
1842static void
1844{
1845 dshash_seq_status status;
1846 GlobalChannelEntry *entry;
1847
1848 if (Trace_notify)
1849 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1850
1851 /* Clear our local cache (not really necessary, but be consistent) */
1852 if (localChannelTable != NULL)
1853 {
1856 }
1857
1858 /* Now remove our entries from the shared globalChannelTable */
1859 if (globalChannelTable == NULL)
1860 return;
1861
1862 dshash_seq_init(&status, globalChannelTable, true);
1863 while ((entry = dshash_seq_next(&status)) != NULL)
1864 {
1866
1867 if (entry->key.dboid != MyDatabaseId)
1868 continue; /* not relevant */
1869
1872
1873 for (int i = 0; i < entry->numListeners; i++)
1874 {
1875 if (listeners[i].procNo == MyProcNumber)
1876 {
1877 entry->numListeners--;
1878 if (i < entry->numListeners)
1879 memmove(&listeners[i], &listeners[i + 1],
1880 sizeof(ListenerEntry) * (entry->numListeners - i));
1881
1882 if (entry->numListeners == 0)
1883 {
1885 dshash_delete_current(&status);
1886 }
1887 break;
1888 }
1889 }
1890 }
1891 dshash_seq_term(&status);
1892}
1893
1894/*
1895 * Test whether we are actively listening on the given channel name.
1896 *
1897 * Note: this function is executed for every notification found in the queue.
1898 */
1899static bool
1900IsListeningOn(const char *channel)
1901{
1902 if (localChannelTable == NULL)
1903 return false;
1904
1905 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1906}
1907
1908/*
1909 * Remove our entry from the listeners array when we are no longer listening
1910 * on any channel. NB: must not fail if we're already not listening.
1911 */
1912static void
1914{
1915 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1916
1917 if (!amRegisteredListener) /* nothing to do */
1918 return;
1919
1920 /*
1921 * Need exclusive lock here to manipulate list links.
1922 */
1924 /* Mark our entry as invalid */
1929 /* and remove it from the list */
1932 else
1933 {
1935 {
1937 {
1939 break;
1940 }
1941 }
1942 }
1945
1946 /* mark ourselves as no longer listed in the global array */
1947 amRegisteredListener = false;
1948}
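asyncQueueUnregister removes our slot from the singly linked listener list. If we are the first listener, the head pointer moves forward. Otherwise the code walks the list to find our predecessor and points it past us. The following is a sketch using an array of next indexes. `demo_unlink` and `DEMO_INVALID` are hypothetical stand-ins for the QUEUE_* macros and INVALID_PROC_NUMBER.

```c
#include <assert.h>

#define DEMO_INVALID (-1)

/* Unlink "me" from the list starting at *first, linked through next[]. */
static void
demo_unlink(int *first, int *next, int me)
{
    if (*first == me)
        *first = next[me];
    else
    {
        for (int i = *first; i != DEMO_INVALID; i = next[i])
        {
            if (next[i] == me)
            {
                next[i] = next[me];   /* predecessor now skips us */
                break;
            }
        }
    }
    next[me] = DEMO_INVALID;
}
```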
1949
1950/*
1951 * Test whether there is room to insert more notification messages.
1952 *
1953 * Caller must hold at least shared NotifyQueueLock.
1954 */
1955static bool
1957{
1958 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1960 int64 occupied = headPage - tailPage;
1961
1963}
1964
1965/*
1966 * Advance the QueuePosition to the next entry, assuming that the current
1967 * entry is of length entryLength. If we jump to a new page the function
1968 * returns true, else false.
1969 */
1970static bool
1972{
1973 int64 pageno = QUEUE_POS_PAGE(*position);
1974 int offset = QUEUE_POS_OFFSET(*position);
1975 bool pageJump = false;
1976
1977 /*
1978 * Move to the next writing position: First jump over what we have just
1979 * written or read.
1980 */
1981 offset += entryLength;
1982 Assert(offset <= QUEUE_PAGESIZE);
1983
1984 /*
1985 * In a second step, check whether another entry can possibly be written to
1986 * the page. If so, stay here: we have reached the next position. If not, then
1987 * we need to move on to the next page.
1988 */
1990 {
1991 pageno++;
1992 offset = 0;
1993 pageJump = true;
1994 }
1995
1996 SET_QUEUE_POS(*position, pageno, offset);
1997 return pageJump;
1998}
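asyncQueueAdvance moves a (page, offset) position past an entry. It jumps to the start of the next page as soon as the space left on the current page could not hold even a minimal entry. The following is a sketch with illustrative constants: the 8192-byte page and the 20-byte minimum entry are assumptions standing in for QUEUE_PAGESIZE and the aligned empty-entry size, and `demo_advance` is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DEMO_PAGESIZE  8192   /* assumed page size */
#define DEMO_MIN_ENTRY 20     /* assumed size of the smallest possible entry */

typedef struct DemoPos
{
    int64_t page;
    int     offset;
} DemoPos;

/* Advance *pos past an entry of entryLength bytes; true if we changed page. */
static bool
demo_advance(DemoPos *pos, int entryLength)
{
    int offset = pos->offset + entryLength;   /* skip what was written/read */

    assert(offset <= DEMO_PAGESIZE);
    if (offset + DEMO_MIN_ENTRY > DEMO_PAGESIZE)
    {
        /* no room for another entry here: move to the next page */
        pos->page++;
        pos->offset = 0;
        return true;
    }
    pos->offset = offset;
    return false;
}
```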
1999
2000/*
2001 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
2002 */
2003static void
2005{
2006 size_t channellen = n->channel_len;
2007 size_t payloadlen = n->payload_len;
2008 int entryLength;
2009
2012
2013 /* The terminators are already included in AsyncQueueEntryEmptySize */
2016 qe->length = entryLength;
2017 qe->dboid = MyDatabaseId;
2018 qe->xid = GetCurrentTransactionId();
2019 qe->srcPid = MyProcPid;
2020 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2021}
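asyncQueueNotificationToEntry sizes each queue entry as a fixed header plus the channel and payload text. Both terminating NULs are already counted in the empty-entry size, and the total is then rounded up to an alignment boundary. The following sketch shows the arithmetic. It assumes a 16-byte header made of the four 4-byte fields set above (length, dboid, xid, srcPid) and 4-byte alignment. The real code uses AsyncQueueEntryEmptySize and QUEUEALIGN, whose definitions are not shown here, and `DemoQueueEntry` and `demo_entry_length` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified entry header mirroring the fields filled in above. */
typedef struct DemoQueueEntry
{
    int32_t  length;   /* total entry length, including padding */
    uint32_t dboid;    /* sender's database OID */
    uint32_t xid;      /* sender's transaction ID */
    int32_t  srcPid;   /* sender's PID */
    char     data[];   /* "channel\0payload\0" */
} DemoQueueEntry;

/* Assumed 4-byte alignment of queue entries. */
#define DEMO_ALIGN(len)  (((len) + 3) & ~(size_t) 3)

/* Header plus the two terminating NULs, as for an empty channel/payload. */
#define DEMO_EMPTY_SIZE  (offsetof(DemoQueueEntry, data) + 2)

static size_t
demo_entry_length(size_t channellen, size_t payloadlen)
{
    return DEMO_ALIGN(DEMO_EMPTY_SIZE + channellen + payloadlen);
}
```

For example, channel `jobs` with payload `42` needs 16 + 2 + 4 + 2 = 24 bytes, which is already aligned.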
2022
2023/*
2024 * Add pending notifications to the queue.
2025 *
2026 * We go page by page here, i.e. we stop once we have to go to a new page but
2027 * we will be called again and then fill that next page. If an entry does not
2028 * fit into the current page, we write a dummy entry with an InvalidOid as the
2029 * database OID in order to fill the page. So every page is always used up to
2030 * the last byte which simplifies reading the page later.
2031 *
2032 * We are passed the list cell (in pendingNotifies->events) containing the next
2033 * notification to write, and we return the first still-unwritten cell.
2034 * Eventually we will return NULL indicating all is done.
2035 *
2036 * The caller already holds NotifyQueueLock; we acquire the page-specific
2037 * SLRU bank lock locally in this function.
2038 */
2039static ListCell *
2041{
2044 int64 pageno;
2045 int offset;
2046 int slotno;
2048
2049 /*
2050 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2051 * memory upon exiting. The reason for this is that if we have to advance
2052 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2053 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2054 * subsequent insertions would try to put entries into a page that slru.c
2055 * thinks doesn't exist yet.) So, use a local position variable. Note
2056 * that if we do fail, any already-inserted queue entries are forgotten;
2057 * this is okay, since they'd be useless anyway after our transaction
2058 * rolls back.
2059 */
2061
2062 /*
2063 * If this is the first write since the postmaster started, we need to
2064 * initialize the first page of the async SLRU. Otherwise, the current
2065 * page should be initialized already, so just fetch it.
2066 */
2067 pageno = QUEUE_POS_PAGE(queue_head);
2069
2070 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2072
2075 else
2076 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2077
2078 /* Note we mark the page dirty before writing in it */
2079 NotifyCtl->shared->page_dirty[slotno] = true;
2080
2081 while (nextNotify != NULL)
2082 {
2084
2085 /* Construct a valid queue entry in local variable qe */
2087
2088 offset = QUEUE_POS_OFFSET(queue_head);
2089
2090 /* Check whether the entry really fits on the current page */
2091 if (offset + qe.length <= QUEUE_PAGESIZE)
2092 {
2093 /* OK, so advance nextNotify past this item */
2095 }
2096 else
2097 {
2098 /*
2099 * Write a dummy entry to fill up the page. Readers will actually
2100 * only check dboid, and since it won't match any reader's database
2101 * OID, they will ignore this entry and move on.
2102 */
2103 qe.length = QUEUE_PAGESIZE - offset;
2104 qe.dboid = InvalidOid;
2106 qe.data[0] = '\0'; /* empty channel */
2107 qe.data[1] = '\0'; /* empty payload */
2108 }
2109
2110 /* Now copy qe into the shared buffer page */
2111 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2112 &qe,
2113 qe.length);
2114
2115 /* Advance queue_head appropriately, and detect if page is full */
2116 if (asyncQueueAdvance(&(queue_head), qe.length))
2117 {
2118 LWLock *lock;
2119
2120 pageno = QUEUE_POS_PAGE(queue_head);
2121 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2122 if (lock != prevlock)
2123 {
2126 prevlock = lock;
2127 }
2128
2129 /*
2130 * Page is full, so we're done here, but first fill the next page
2131 * with zeroes. The reason to do this is to ensure that slru.c's
2132 * idea of the head page is always the same as ours, which avoids
2133 * boundary problems in SimpleLruTruncate. The test in
2134 * asyncQueueIsFull() ensured that there is room to create this
2135 * page without overrunning the queue.
2136 */
2138
2139 /*
2140 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2141 * set flag to remember that we should try to advance the tail
2142 * pointer (we don't want to actually do that right here).
2143 */
2145 tryAdvanceTail = true;
2146
2147 /* And exit the loop */
2148 break;
2149 }
2150 }
2151
2152 /* Success, so update the global QUEUE_HEAD */
2154
2156
2157 return nextNotify;
2158}
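asyncQueueAddEntries never splits an entry across pages. When the next entry does not fit, it writes a dummy entry that covers the rest of the page. It also returns as soon as it crosses onto a new page, and the caller invokes it again. The following sketch traces that loop over entry lengths only, with no real buffers. It works on a local copy of the head position and publishes it at the end, as the real function does. The tiny page size, the minimum-entry size and `demo_add_entries` itself are assumptions for the illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_PAGESIZE  256   /* deliberately tiny page for the example */
#define DEMO_MIN_ENTRY 20    /* assumed smallest possible entry */

typedef struct DemoPos
{
    int64_t page;
    int     offset;
} DemoPos;

/*
 * Write entries lens[next..n-1] starting at *head, stopping after the first
 * page change.  *padding receives the size of any dummy entry written.
 * Returns the index of the first entry not yet written.
 */
static size_t
demo_add_entries(const int *lens, size_t n, size_t next, DemoPos *head,
                 int *padding)
{
    DemoPos pos = *head;   /* local copy, written back at the end */

    *padding = 0;
    while (next < n)
    {
        int len = lens[next];

        assert(len <= DEMO_PAGESIZE);   /* every entry must fit on a page */
        if (pos.offset + len <= DEMO_PAGESIZE)
            next++;                            /* entry fits: consume it */
        else
        {
            len = DEMO_PAGESIZE - pos.offset;  /* dummy entry fills the page */
            *padding = len;
        }

        pos.offset += len;
        if (pos.offset + DEMO_MIN_ENTRY > DEMO_PAGESIZE)
        {
            pos.page++;
            pos.offset = 0;
            break;                             /* caller calls us again */
        }
    }
    *head = pos;
    return next;
}
```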
2159
2160/*
2161 * SQL function to return the fraction of the notification queue currently
2162 * occupied.
2163 */
2164Datum
2166{
2167 double usage;
2168
2169 /* Advance the queue tail so we don't report a too-large result */
2171
2175
2177}
2178
2179/*
2180 * Return the fraction of the queue that is currently occupied.
2181 *
2182 * The caller must hold NotifyQueueLock in (at least) shared mode.
2183 *
2184 * Note: we measure the distance to the logical tail page, not the physical
2185 * tail page. In some sense that's wrong, but the relative position of the
2186 * physical tail is affected by details such as SLRU segment boundaries,
2187 * so that a result based on that is unpleasantly unstable.
2188 */
2189static double
2191{
2192 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2194 int64 occupied = headPage - tailPage;
2195
2196 if (occupied == 0)
2197 return (double) 0; /* fast exit for common case */
2198
2199 return (double) occupied / (double) max_notify_queue_pages;
2200}
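The usage figure is plain page arithmetic: the number of pages between the logical tail and the head, divided by the configured maximum. A minimal sketch of that calculation follows. `sketch_queue_usage` and its parameters are hypothetical, with `max_pages` standing in for `max_notify_queue_pages`.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Fraction of the queue occupied, measured from the logical tail page to
 * the head page.  Mirrors the arithmetic described above; names are
 * illustrative only.
 */
static double
sketch_queue_usage(int64_t head_page, int64_t tail_page, int max_pages)
{
    int64_t occupied = head_page - tail_page;

    if (occupied == 0)
        return 0.0;             /* fast path: queue is empty */
    return (double) occupied / (double) max_pages;
}
```

A result of 0.5 or more is the point at which `asyncQueueFillWarning()` starts considering a warning.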
2201
2202/*
2203 * Check whether the queue is at least half full, and emit a warning if so.
2204 *
2205 * This is unlikely given the size of the queue, but possible.
2206 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2207 *
2208 * Caller must hold exclusive NotifyQueueLock.
2209 */
2210static void
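2211asyncQueueFillWarning(void)
2212{
2213 double fillDegree;
2214 TimestampTz t;
2215
2216 fillDegree = asyncQueueUsage();
2217 if (fillDegree < 0.5)
2218 return;
2219
2220 t = GetCurrentTimestamp();
2221
2222 if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
2223 t, QUEUE_FULL_WARN_INTERVAL))
2224 {
2225 QueuePosition min = QUEUE_HEAD;
2226 int32 minPid = InvalidPid;
2227
2228 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
2229 {
2230 Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2231 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2232 if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
2233 minPid = QUEUE_BACKEND_PID(i);
2234 }
2235
2236 ereport(WARNING,
2237 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2238 (minPid != InvalidPid ?
2239 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2240 : 0),
2241 (minPid != InvalidPid ?
2242 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2243 : 0)));
2244
2245 asyncQueueControl->lastQueueFillWarn = t;
2246 }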
2247}
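`asyncQueueFillWarning()` combines two ideas: it rate-limits the warning by elapsed time, and it names one of the listeners at the minimum queue position. The sketch below models both with hypothetical types and functions. It flattens each queue position to a single integer, uses -1 in place of `InvalidPid`, and takes the interval as a parameter because the value of `QUEUE_FULL_WARN_INTERVAL` is not shown in this excerpt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical listener record; pos is a flattened queue position. */
typedef struct
{
    int32_t pid;
    int64_t pos;
} SketchListener;

/*
 * Return true, and remember now_ms, if the queue is at least half full and
 * at least interval_ms has passed since the last warning.
 */
static bool
sketch_should_warn(double fill, int64_t now_ms, int64_t *last_warn_ms,
                   int64_t interval_ms)
{
    if (fill < 0.5)
        return false;
    if (now_ms - *last_warn_ms < interval_ms)
        return false;
    *last_warn_ms = now_ms;
    return true;
}

/*
 * Start from the head position and keep the PID of the last listener seen
 * at the running minimum; -1 means "no listener found".
 */
static int32_t
sketch_oldest_listener(const SketchListener *l, int n, int64_t head)
{
    int64_t min = head;
    int32_t min_pid = -1;

    for (int i = 0; i < n; i++)
    {
        if (l[i].pos <= min)
        {
            min = l[i].pos;
            min_pid = l[i].pid;
        }
    }
    return min_pid;
}
```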
2248
2249/*
2250 * Send signals to listening backends.
2251 *
2252 * Normally we signal only backends that are interested in the notifies that
2253 * we just sent. However, that will leave idle listeners falling further and
2254 * further behind. Waken them anyway if they're far enough behind, so they'll
2255 * advance their queue position pointers, allowing the global tail to advance.
2256 *
2257 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2258 *
2259 * This is called during CommitTransaction(), so it's important for it
2260 * to have very low probability of failure.
2261 */
2262static void
2263SignalBackends(void)
2264{
2265 int count;
2266
2267 /* Can't get here without PreCommit_Notify having made the global table */
2269
2270 /* It should have set up these arrays, too */
2272
2273 /*
2274 * Identify backends that we need to signal. We don't want to send
2275 * signals while holding the NotifyQueueLock, so this part just builds a
2276 * list of target PIDs in signalPids[] and signalProcnos[].
2277 */
2278 count = 0;
2279
2280 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2281
2282 /* Scan each channel name that we notified in this transaction */
2284 {
2285 GlobalChannelKey key;
2286 GlobalChannelEntry *entry;
2288
2289 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2290 entry = dshash_find(globalChannelTable, &key, false);
2291 if (entry == NULL)
2292 continue; /* nobody is listening */
2293
2295 entry->listenersArray);
2296
2297 /*
2298 * Identify listeners that now need waking, add them to arrays.
2299 *
2300 * Note that we signal listeners regardless of the state of their
2301 * removeOnAbort flags. Hence a new listener that reached PreCommit,
2302 * but then failed before AtCommit_Notify, can receive a signal even
2303 * though it was never really listening. This is okay because it will
2304 * not do anything in response to that signal. If we did not do it
2305 * like this then a new listener might miss some messages due to the
2306 * direct-advance logic below.
2307 */
2308 for (int j = 0; j < entry->numListeners; j++)
2309 {
2310 ProcNumber i = listeners[j].procNo;
2311 int32 pid;
2312 QueuePosition pos;
2313
2315 continue; /* already signaled, no need to repeat */
2316
2317 pid = QUEUE_BACKEND_PID(i);
2318 pos = QUEUE_BACKEND_POS(i);
2319
2320 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2321 continue; /* it's fully caught up already */
2322
2323 Assert(pid != InvalidPid);
2324
2326 signalPids[count] = pid;
2327 signalProcnos[count] = i;
2328 count++;
2329 }
2330
2332 }
2333
2334 /*
2335 * Scan all listeners. Any that are not already pending wakeup must not
2336 * be interested in our notifications (else we'd have set their wakeup
2337 * flags above). Check to see if we can directly advance their queue
2338 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2339 * them anyway so they will catch up.
2340 */
2342 {
2343 int32 pid;
2344 QueuePosition pos;
2345
2347 continue;
2348
2349 /* If it's currently advancing, we should not touch it */
2351 continue;
2352
2353 pid = QUEUE_BACKEND_PID(i);
2354 pos = QUEUE_BACKEND_POS(i);
2355
2356 /*
2357 * We can directly advance the other backend's queue pointer if it's
2358 * not currently advancing (else there are race conditions), and its
2359 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2360 * it miss some older messages), and we'd not be moving the pointer
2361 * backward.
2362 */
2365 {
2366 /* We can directly advance its pointer past what we wrote */
2368 }
2371 {
2372 /* It's idle and far behind, so wake it up */
2373 Assert(pid != InvalidPid);
2374
2376 signalPids[count] = pid;
2377 signalProcnos[count] = i;
2378 count++;
2379 }
2380 }
2381
2382 LWLockRelease(NotifyQueueLock);
2383
2384 /* Now send signals */
2385 for (int i = 0; i < count; i++)
2386 {
2387 int32 pid = signalPids[i];
2388
2389 /*
2390 * If we are signaling our own process, no need to involve the kernel;
2391 * just set the flag directly.
2392 */
2393 if (pid == MyProcPid)
2394 {
2395 notifyInterruptPending = true;
2396 continue;
2397 }
2398
2399 /*
2400 * Note: assuming things aren't broken, a signal failure here could
2401 * only occur if the target backend exited since we released
2402 * NotifyQueueLock; which is unlikely but certainly possible. So we
2403 * just log a low-level debug message if it happens.
2404 */
2405 if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
2406 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2407 }
2408}
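For each listener, the scan above ends in one of three outcomes: signal it, advance its pointer directly, or leave it alone. The following sketch reduces that choice to a pure decision function. Everything in it is an assumption made for illustration. Positions are flattened to integers, the caller precomputes how many pages behind the listener is, and the "far behind" test is modeled as `pages_behind >= cleanup_delay` because the exact condition is not visible in this listing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum
{
    SKETCH_SKIP,                /* leave the listener alone */
    SKETCH_SIGNAL,              /* send it a wakeup */
    SKETCH_DIRECT_ADVANCE       /* move its pointer past our messages */
} SketchAction;

/*
 * Decide what to do with one listener after our notifications were written
 * between head_before_write and head_after_write.
 */
static SketchAction
sketch_decide(bool interested, bool advancing, int64_t pos,
              int64_t head_before_write, int64_t head_after_write,
              int64_t pages_behind, int64_t cleanup_delay)
{
    if (interested)             /* wake it unless already caught up */
        return pos == head_after_write ? SKETCH_SKIP : SKETCH_SIGNAL;
    if (advancing)              /* it is reading right now: hands off */
        return SKETCH_SKIP;
    /* Not behind our first message, and the move would be forward. */
    if (pos >= head_before_write && pos < head_after_write)
        return SKETCH_DIRECT_ADVANCE;
    if (pages_behind >= cleanup_delay)
        return SKETCH_SIGNAL;   /* idle and far behind */
    return SKETCH_SKIP;
}
```

Keeping the decision separate from the locking makes each rule easy to test. In the real function these checks run while `NotifyQueueLock` is held, and the signals go out only after it is released.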
2409
2410/*
2411 * AtAbort_Notify
2412 *
2413 * This is called at transaction abort.
2414 *
2415 * Revert any staged listen/unlisten changes and clean up transaction state.
2416 * This only does anything if we abort after PreCommit_Notify has staged
2417 * some entries.
2418 */
2419void
2420AtAbort_Notify(void)
2421{
2422 /* Revert staged listen/unlisten changes */
2424
2425 /* If we're no longer listening on anything, unregister */
2428
2429 /* And clean up */
2430 ClearPendingActionsAndNotifies();
2431}
2432
2433/*
2434 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2435 *
2436 * Reassign all items in the pending lists to the parent transaction.
2437 */
2438void
2439AtSubCommit_Notify(void)
2440{
2441 int my_level = GetCurrentTransactionNestLevel();
2442
2443 /* If there are actions at our nesting level, we must reparent them. */
2444 if (pendingActions != NULL &&
2445 pendingActions->nestingLevel >= my_level)
2446 {
2447 if (pendingActions->upper == NULL ||
2448 pendingActions->upper->nestingLevel < my_level - 1)
2449 {
2450 /* nothing to merge; give the whole thing to the parent */
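2451 --pendingActions->nestingLevel;
2452 }
2453 else
2454 {
2455 ActionList *childPendingActions = pendingActions;
2456
2457 pendingActions = pendingActions->upper;
2458
2459 /*
2460 * Mustn't try to eliminate duplicates here --- see queue_listen()
2461 */
2462 pendingActions->actions =
2463 list_concat(pendingActions->actions,
2464 childPendingActions->actions);
2465 pfree(childPendingActions);
2466 }
2467 }
2468
2469 /* If there are notifies at our nesting level, we must reparent them. */
2470 if (pendingNotifies != NULL &&
2471 pendingNotifies->nestingLevel >= my_level)
2472 {
2473 Assert(pendingNotifies->nestingLevel == my_level);
2474
2475 if (pendingNotifies->upper == NULL ||
2476 pendingNotifies->upper->nestingLevel < my_level - 1)
2477 {
2478 /* nothing to merge; give the whole thing to the parent */
2479 --pendingNotifies->nestingLevel;
2480 }
2481 else
2482 {
2483 /*
2484 * Formerly, we didn't bother to eliminate duplicates here, but
2485 * now we must, else we fall foul of "Assert(!found)", either here
2486 * or during a later attempt to build the parent-level hashtable.
2487 */
2488 NotificationList *childPendingNotifies = pendingNotifies;
2489 ListCell *l;
2490
2491 pendingNotifies = pendingNotifies->upper;
2492 /* Insert all the subxact's events into parent, except for dups */
2493 foreach(l, childPendingNotifies->events)
2494 {
2495 Notification *childn = (Notification *) lfirst(l);
2496
2497 if (!AsyncExistsPendingNotify(childn))
2498 AddEventToPendingNotifies(childn);
2499 }
2500 pfree(childPendingNotifies);
2501 }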
2502 }
2503}
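Subtransaction commit hands the child level's pending items to its parent. If the parent has no node of its own, the child's node is simply relabeled one level up. Otherwise the two are merged and the child node is freed. The sketch below models that with a hypothetical `SketchLevel` stack that tracks only an item count. Unlike the real code, it does no duplicate elimination; `AtSubCommit_Notify()` performs that for notifications but deliberately not for actions.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical per-level node: nesting level plus a count of queued items. */
typedef struct SketchLevel
{
    int level;
    int nitems;
    struct SketchLevel *upper;
} SketchLevel;

/* Reparent the top node on subtransaction commit; return the new top. */
static SketchLevel *
sketch_subcommit(SketchLevel *top, int my_level)
{
    if (top == NULL || top->level < my_level)
        return top;             /* nothing recorded at this level */
    if (top->upper == NULL || top->upper->level < my_level - 1)
    {
        top->level--;           /* give the whole node to the parent */
        return top;
    }
    SketchLevel *parent = top->upper;

    parent->nitems += top->nitems;  /* stands in for list_concat() */
    free(top);
    return parent;
}
```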
2504
2505/*
2506 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2507 */
2508void
2509AtSubAbort_Notify(void)
2510{
2511 int my_level = GetCurrentTransactionNestLevel();
2512
2513 /*
2514 * All we have to do is pop the stack --- the actions/notifies made in
2515 * this subxact are no longer interesting, and the space will be freed
2516 * when CurTransactionContext is recycled. We still have to free the
2517 * ActionList and NotificationList objects themselves, though, because
2518 * those are allocated in TopTransactionContext.
2519 *
2520 * Note that there might be no entries at all, or no entries for the
2521 * current subtransaction level, either because none were ever created, or
2522 * because we reentered this routine due to trouble during subxact abort.
2523 */
2524 while (pendingActions != NULL &&
2525 pendingActions->nestingLevel >= my_level)
2526 {
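2527 ActionList *childPendingActions = pendingActions;
2528
2529 pendingActions = pendingActions->upper;
2530 pfree(childPendingActions);
2531 }
2532
2533 while (pendingNotifies != NULL &&
2534 pendingNotifies->nestingLevel >= my_level)
2535 {
2536 NotificationList *childPendingNotifies = pendingNotifies;
2537
2538 pendingNotifies = pendingNotifies->upper;
2539 pfree(childPendingNotifies);
2540 }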
2541}
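Subtransaction abort is the simpler case: pop every stack frame at or above the current nesting level. The loop stops as soon as the top frame belongs to an outer level, so running it a second time is harmless. That matches the comment about reentering the routine during a troubled abort. Here is a minimal sketch with a hypothetical `SketchFrame` stack:

```c
#include <assert.h>
#include <stdlib.h>

typedef struct SketchFrame
{
    int level;
    struct SketchFrame *upper;
} SketchFrame;

/* Pop and free every frame at or above my_level; return the new top. */
static SketchFrame *
sketch_subabort(SketchFrame *top, int my_level)
{
    while (top != NULL && top->level >= my_level)
    {
        SketchFrame *child = top;

        top = top->upper;
        free(child);
    }
    return top;
}

/* Helper for the example: push a frame at the given level. */
static SketchFrame *
sketch_push(SketchFrame *top, int level)
{
    SketchFrame *f = malloc(sizeof *f);

    assert(f != NULL);
    f->level = level;
    f->upper = top;
    return f;
}
```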
2542
2543/*
2544 * HandleNotifyInterrupt
2545 *
2546 * Signal handler portion of interrupt handling. Let the backend know
2547 * that there's a pending notify interrupt. If we're currently reading
2548 * from the client, this will interrupt the read and
2549 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2550 */
2551void
2552HandleNotifyInterrupt(void)
2553{
2554 /*
2555 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2556 * you do here.
2557 */
2558
2559 /* signal that work needs to be done */
2560 notifyInterruptPending = true;
2561
2562 /* latch will be set by procsignal_sigusr1_handler */
2563}
2564
2565/*
2566 * ProcessNotifyInterrupt
2567 *
2568 * This is called if we see notifyInterruptPending set, just before
2569 * transmitting ReadyForQuery at the end of a frontend command, and
2570 * also if a notify signal occurs while reading from the frontend.
2571 * HandleNotifyInterrupt() will cause the read to be interrupted
2572 * via the process's latch, and this routine will get called.
2573 * If we are truly idle (ie, *not* inside a transaction block),
2574 * process the incoming notifies.
2575 *
2576 * If "flush" is true, force any frontend messages out immediately.
2577 * This can be false when being called at the end of a frontend command,
2578 * since we'll flush after sending ReadyForQuery.
2579 */
2580void
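2581ProcessNotifyInterrupt(bool flush)
2582{
2583 if (IsTransactionOrTransactionBlock())
2584 return; /* not really idle */
2585
2586 /* Loop in case another signal arrives while sending messages */
2587 while (notifyInterruptPending)
2588 ProcessIncomingNotify(flush);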
2589}
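The split between `HandleNotifyInterrupt()` and `ProcessNotifyInterrupt()` follows the usual rule for signal handlers: the handler only sets a flag, and main-line code does the real work later. The sketch below shows that pattern on its own. It uses only ISO C `signal()` and `raise()` with `SIGINT`, so it needs no POSIX feature macros (PostgreSQL itself routes this through `SIGUSR1` and `procsignal_sigusr1_handler`). Names are hypothetical, and latches are not modeled.

```c
#include <assert.h>
#include <signal.h>

/* Written by the handler; volatile sig_atomic_t is safe to store there. */
static volatile sig_atomic_t sketch_pending = 0;
static int sketch_processed = 0;

/* Handler: record that work is pending and return immediately. */
static void
sketch_handler(int signo)
{
    (void) signo;
    sketch_pending = 1;
}

/* Main-line code: reset the flag before doing the work, then recheck it. */
static void
sketch_process(void)
{
    while (sketch_pending)
    {
        sketch_pending = 0;
        sketch_processed++;     /* stands in for reading the queue */
    }
}
```

Resetting the flag before the work, rather than after it, means a signal that arrives mid-processing triggers another pass instead of being lost.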
2590
2591
2592/*
2593 * Read all pending notifications from the queue, and deliver appropriate
2594 * ones to my frontend. Stop when we reach queue head or an uncommitted
2595 * notification.
2596 */
2597static void
2598asyncQueueReadAllNotifications(void)
2599{
2600 QueuePosition pos;
2601 QueuePosition head;
2602 Snapshot snapshot;
2603
2604 /*
2605 * Fetch current state, indicate to others that we have woken up, and that
2606 * we are in process of advancing our position.
2607 */
2609 /* Assert checks that we have a valid state entry */
2613 head = QUEUE_HEAD;
2614
2615 if (QUEUE_POS_EQUAL(pos, head))
2616 {
2617 /* Nothing to do, we have read all notifications already. */
2619 return;
2620 }
2621
2624
2625 /*----------
2626 * Get snapshot we'll use to decide which xacts are still in progress.
2627 * This is trickier than it might seem, because of race conditions.
2628 * Consider the following example:
2629 *
2630 * Backend 1: Backend 2:
2631 *
2632 * transaction starts
2633 * UPDATE foo SET ...;
2634 * NOTIFY foo;
2635 * commit starts
2636 * queue the notify message
2637 * transaction starts
2638 * LISTEN foo; -- first LISTEN in session
2639 * SELECT * FROM foo WHERE ...;
2640 * commit to clog
2641 * commit starts
2642 * add backend 2 to array of listeners
2643 * advance to queue head (this code)
2644 * commit to clog
2645 *
2646 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2647 * wasn't committed yet. Ideally we'd ensure that client 2 would
2648 * eventually get transaction 1's notify message, but there's no way
2649 * to do that; until we're in the listener array, there's no guarantee
2650 * that the notify message doesn't get removed from the queue.
2651 *
2652 * Therefore the coding technique transaction 2 is using is unsafe:
2653 * applications must commit a LISTEN before inspecting database state,
2654 * if they want to ensure they will see notifications about subsequent
2655 * changes to that state.
2656 *
2657 * What we do guarantee is that we'll see all notifications from
2658 * transactions committing after the snapshot we take here.
2659 * BecomeRegisteredListener has already added us to the listener array,
2660 * so no not-yet-committed messages can be removed from the queue
2661 * before we see them.
2662 *----------
2663 */
2664 snapshot = RegisterSnapshot(GetLatestSnapshot());
2665
2666 /*
2667 * It is possible that we fail while trying to send a message to our
2668 * frontend (for example, because of encoding conversion failure). If
2669 * that happens it is critical that we not try to send the same message
2670 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2671 * ERRORs to FATAL, causing the client connection to be closed on error.
2672 *
2673 * We used to only skip over the offending message and try to soldier on,
2674 * but it was somewhat questionable to lose a notification and give the
2675 client an ERROR instead. A client application is not prepared for
2676 * that and can't tell that a notification was missed. It was also not
2677 * very useful in practice because notifications are often processed while
2678 * a connection is idle and reading a message from the client, and in that
2679 * state, any error is upgraded to FATAL anyway. Closing the connection
2680 * is a clear signal to the application that it might have missed
2681 * notifications.
2682 */
2683 {
2685 bool reachedStop;
2686
2687 ExitOnAnyError = true;
2688
2689 do
2690 {
2691 /*
2692 * Process messages up to the stop position, end of page, or an
2693 * uncommitted message.
2694 *
2695 * Our stop position is what we found to be the head's position
2696 * when we entered this function. It might have changed already.
2697 * But if it has, we will receive (or have already received and
2698 * queued) another signal and come here again.
2699 *
2700 * We are not holding NotifyQueueLock here! The queue can only
2701 * extend beyond the head pointer (see above) and we leave our
2702 * backend's pointer where it is so nobody will truncate or
2703 * rewrite pages under us. Especially we don't want to hold a lock
2704 * while sending the notifications to the frontend.
2705 */
2706 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2707 } while (!reachedStop);
2708
2709 /* Update shared state */
2714
2716 }
2717
2718 /* Done with snapshot */
2719 UnregisterSnapshot(snapshot);
2720}
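The snapshot discussion above boils down to a per-entry rule. Ignore other databases, stop at a sender that is still in progress without consuming its entry, deliver entries from committed senders, and silently drop those from aborted ones. Here is a minimal sketch of that rule over an in-memory array. The transaction state is supplied directly rather than derived from a snapshot and the commit log, and all names are hypothetical. It also checks each entry before advancing, whereas the real code advances first and then backs up.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    SKETCH_IN_PROGRESS,         /* sender's xid is still in our snapshot */
    SKETCH_COMMITTED,
    SKETCH_ABORTED
} SketchXactState;

typedef struct
{
    unsigned dboid;
    SketchXactState state;
} SketchEntry;

/*
 * Scan e[*pos .. stop), counting deliverable entries into *delivered.
 * Returns true if blocked by an in-progress sender; *pos then points at
 * that entry so the next call re-examines it.
 */
static bool
sketch_scan(const SketchEntry *e, int *pos, int stop, unsigned my_db,
            int *delivered)
{
    while (*pos < stop)
    {
        const SketchEntry *cur = &e[*pos];

        if (cur->dboid == my_db && cur->state == SKETCH_IN_PROGRESS)
            return true;        /* do not advance past it */
        (*pos)++;
        if (cur->dboid != my_db)
            continue;           /* other databases are ignored */
        if (cur->state == SKETCH_COMMITTED)
            (*delivered)++;
        /* entries from aborted senders are skipped */
    }
    return false;
}
```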
2721
2722/*
2723 * Fetch notifications from the shared queue, beginning at position current,
2724 * and deliver relevant ones to my frontend.
2725 *
2726 * The function returns true once we have reached the stop position or an
2727 * uncommitted notification, and false if we have finished with the page.
2728 * In other words: once it returns true there is no need to look further.
2729 * The QueuePosition *current is advanced past all processed messages.
2730 */
2731static bool
2732asyncQueueProcessPageEntries(QueuePosition *current,
2733 QueuePosition stop,
2734 Snapshot snapshot)
2735{
2736 int64 curpage = QUEUE_POS_PAGE(*current);
2737 int slotno;
2738 char *page_buffer;
2739 bool reachedStop = false;
2740 bool reachedEndOfPage;
2741
2742 /*
2743 * We copy the entries into a local buffer to avoid holding the SLRU lock
2744 * while we transmit them to our frontend. The local buffer must be
2745 * adequately aligned.
2746 */
2748 char *local_buf_end = local_buf;
2749
2751 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2752
2753 do
2754 {
2755 QueuePosition thisentry = *current;
2756 AsyncQueueEntry *qe;
2757
2758 if (QUEUE_POS_EQUAL(thisentry, stop))
2759 break;
2760
2761 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2762
2763 /*
2764 * Advance *current over this message, possibly to the next page. As
2765 * noted in the comments for asyncQueueReadAllNotifications, we must
2766 * do this before possibly failing while processing the message.
2767 */
2768 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2769
2770 /* Ignore messages destined for other databases */
2771 if (qe->dboid == MyDatabaseId)
2772 {
2773 if (XidInMVCCSnapshot(qe->xid, snapshot))
2774 {
2775 /*
2776 * The source transaction is still in progress, so we can't
2777 * process this message yet. Break out of the loop, but first
2778 * back up *current so we will reprocess the message next
2779 * time. (Note: it is unlikely but not impossible for
2780 * TransactionIdDidCommit to fail, so we can't really avoid
2781 * this advance-then-back-up behavior when dealing with an
2782 * uncommitted message.)
2783 *
2784 * Note that we must test XidInMVCCSnapshot before we test
2785 * TransactionIdDidCommit, else we might return a message from
2786 * a transaction that is not yet visible to snapshots; compare
2787 * the comments at the head of heapam_visibility.c.
2788 *
2789 * Also, while our own xact won't be listed in the snapshot,
2790 * we need not check for TransactionIdIsCurrentTransactionId
2791 * because our transaction cannot (yet) have queued any
2792 * messages.
2793 */
2794 *current = thisentry;
2795 reachedStop = true;
2796 break;
2797 }
2798
2799 /*
2800 * Quick check for the case that we're not listening on any
2801 * channels, before calling TransactionIdDidCommit(). This makes
2802 * that case a little faster, but more importantly, it ensures
2803 * that if there's a bad entry in the queue for which
2804 * TransactionIdDidCommit() fails for some reason, we can skip
2805 * over it on the first LISTEN in a session, and not get stuck on
2806 * it indefinitely. (This is a little trickier than it looks: it
2807 * works because BecomeRegisteredListener runs this code before we
2808 * have made the first entry in localChannelTable.)
2809 */
2811 continue;
2812
2813 if (TransactionIdDidCommit(qe->xid))
2814 {
2815 memcpy(local_buf_end, qe, qe->length);
2816 local_buf_end += qe->length;
2817 }
2818 else
2819 {
2820 /*
2821 * The source transaction aborted or crashed, so we just
2822 * ignore its notifications.
2823 */
2824 }
2825 }
2826
2827 /* Loop back if we're not at end of page */
2828 } while (!reachedEndOfPage);
2829
2830 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2831 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
2832
2833 /*
2834 * Now that we have let go of the SLRU bank lock, send the notifications
2835 * to our frontend
2836 */
2838 for (char *p = local_buf; p < local_buf_end;)
2839 {
2840 AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
2841
2842 /* qe->data is the null-terminated channel name */
2843 char *channel = qe->data;
2844
2845 if (IsListeningOn(channel))
2846 {
2847 /* payload follows channel name */
2848 char *payload = qe->data + strlen(channel) + 1;
2849
2850 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2851 }
2852
2853 p += qe->length;
2854 }
2855
2856 if (QUEUE_POS_EQUAL(*current, stop))
2857 reachedStop = true;
2858
2859 return reachedStop;
2860}
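Delivery walks a local copy of the page. Each entry starts with its own total length, the channel name is NUL-terminated, and the payload begins immediately after that NUL. The sketch below builds and walks such a buffer. Its layout (`SketchQEntry`, an `int` length header, lengths rounded up to 8 bytes) is an assumption for illustration. The real `AsyncQueueEntry` also carries the database OID, XID, and sender PID.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical packed entry: total length, then "channel\0payload\0". */
typedef struct
{
    int length;
    char data[];
} SketchQEntry;

/* Round up to 8 bytes so the next entry's header stays aligned. */
#define SKETCH_ALIGN(len) (((len) + 7) & ~7)

/* Append one entry at *end (which must be aligned) and move *end past it. */
static void
sketch_append(char **end, const char *channel, const char *payload)
{
    SketchQEntry *qe = (SketchQEntry *) *end;
    size_t clen = strlen(channel) + 1;
    size_t plen = strlen(payload) + 1;

    memcpy(qe->data, channel, clen);
    memcpy(qe->data + clen, payload, plen);
    qe->length = SKETCH_ALIGN((int) (offsetof(SketchQEntry, data) + clen + plen));
    *end += qe->length;
}

/*
 * Count entries for want_channel and copy the last matching payload into
 * last_payload (capacity cap).
 */
static int
sketch_count_payloads(char *buf, char *end, const char *want_channel,
                      char *last_payload, size_t cap)
{
    int n = 0;

    for (char *p = buf; p < end; p += ((SketchQEntry *) p)->length)
    {
        SketchQEntry *qe = (SketchQEntry *) p;
        const char *channel = qe->data;

        if (strcmp(channel, want_channel) == 0)
        {
            /* payload follows the channel name's terminating NUL */
            snprintf(last_payload, cap, "%s", qe->data + strlen(channel) + 1);
            n++;
        }
    }
    return n;
}
```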
2861
2862/*
2863 * Advance the shared queue tail variable to the minimum of all the
2864 * per-backend tail pointers. Truncate pg_notify space if possible.
2865 *
2866 * This is (usually) called during CommitTransaction(), so it's important for
2867 * it to have very low probability of failure.
2868 */
2869static void
2870asyncQueueAdvanceTail(void)
2871{
2872 QueuePosition min;
2873 int64 oldtailpage;
2874 int64 newtailpage;
2875 int64 boundary;
2876
2877 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2878 LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2879
2880 /*
2881 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2882 * (ie, exactly match at least one backend's queue position), so it must
2883 * be updated atomically with the actual computation. Since v13, we could
2884 * get away with not doing it like that, but it seems prudent to keep it
2885 * so.
2886 *
2887 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2888 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2889 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2890 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2891 * there are pages we can truncate but haven't yet finished doing so.
2892 *
2893 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2894 * performing SimpleLruTruncate. This is OK because no backend will try
2895 * to access the pages we are in the midst of truncating.
2896 */
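2897 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2898 min = QUEUE_HEAD;
2899 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
2900 {
2901 Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2902 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2903 }
2904 QUEUE_TAIL = min;
2905 oldtailpage = QUEUE_STOP_PAGE;
2906 LWLockRelease(NotifyQueueLock);
2907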
2908 /*
2909 * We can truncate something if the global tail advanced across an SLRU
2910 * segment boundary.
2911 *
2912 * XXX it might be better to truncate only once every several segments, to
2913 * reduce the number of directory scans.
2914 */
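2915 newtailpage = QUEUE_POS_PAGE(min);
2916 boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2917 if (asyncQueuePagePrecedes(oldtailpage, boundary))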
2918 {
2919 /*
2920 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2921 * release the lock again.
2922 */
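2923 SimpleLruTruncate(NotifyCtl, newtailpage);
2924
2925 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2926 QUEUE_STOP_PAGE = newtailpage;
2927 LWLockRelease(NotifyQueueLock);
2928 }
2929
2930 LWLockRelease(NotifyQueueTailLock);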
2931}
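Tail advancement takes the minimum position over the head and all listeners. Truncation happens only once that new tail has crossed into a later SLRU segment than the old physical tail. The following sketch models both steps using page numbers only. It assumes 32 pages per segment (PostgreSQL's `SLRU_PAGES_PER_SEGMENT`), assumes that "precedes" is a plain less-than on 64-bit page numbers, and computes the boundary as the first page of the segment that contains the new tail. The function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_PAGES_PER_SEGMENT 32     /* assumed SLRU segment size */

/* New logical tail page: the minimum over the head and all listener pages. */
static int64_t
sketch_new_tail(int64_t head_page, const int64_t *listener_pages, int n)
{
    int64_t min = head_page;

    for (int i = 0; i < n; i++)
        if (listener_pages[i] < min)
            min = listener_pages[i];
    return min;
}

/*
 * Truncation is worthwhile only if the old physical tail lies before the
 * first page of the segment holding the new tail.
 */
static bool
sketch_can_truncate(int64_t old_stop_page, int64_t new_tail_page)
{
    int64_t boundary = new_tail_page - (new_tail_page % SKETCH_PAGES_PER_SEGMENT);

    return old_stop_page < boundary;
}
```

Truncating whole segments means only files that no listener can still need are removed, at the cost of retaining up to one segment of already-consumed pages.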
2932
2933/*
2934 * AsyncNotifyFreezeXids
2935 *
2936 * Prepare the async notification queue for CLOG truncation by freezing
2937 * transaction IDs that are about to become inaccessible.
2938 *
2939 * This function is called by VACUUM before advancing datfrozenxid. It scans
2940 * the notification queue and replaces XIDs that would become inaccessible
2941 * after CLOG truncation with special markers:
2942 * - Committed transactions are set to FrozenTransactionId
2943 * - Aborted/crashed transactions are set to InvalidTransactionId
2944 *
2945 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2946 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2947 * (or it would have held back newFrozenXid through ProcArray).
2948 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2949 * either aborted explicitly or crashed, and we can safely mark it invalid.
2950 */
2951void
2953{
2954 QueuePosition pos;
2955 QueuePosition head;
2956 int64 curpage = -1;
2957 int slotno = -1;
2958 char *page_buffer = NULL;
2959 bool page_dirty = false;
2960
2961 /*
2962 * Acquire locks in the correct order to avoid deadlocks. As per the
2963 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2964 * bank locks.
2965 *
2966 * We only need SHARED mode since we're just reading the head/tail
2967 * positions, not modifying them.
2968 */
2971
2972 pos = QUEUE_TAIL;
2973 head = QUEUE_HEAD;
2974
2975 /* Release NotifyQueueLock early, we only needed to read the positions */
2977
2978 /*
2979 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2980 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2981 * we're working.
2982 */
2983 while (!QUEUE_POS_EQUAL(pos, head))
2984 {
2985 AsyncQueueEntry *qe;
2986 TransactionId xid;
2987 int64 pageno = QUEUE_POS_PAGE(pos);
2988 int offset = QUEUE_POS_OFFSET(pos);
2989
2990 /* If we need a different page, release old lock and get new one */
2991 if (pageno != curpage)
2992 {
2993 LWLock *lock;
2994
2995 /* Release previous page if any */
2996 if (slotno >= 0)
2997 {
2998 if (page_dirty)
2999 {
3000 NotifyCtl->shared->page_dirty[slotno] = true;
3001 page_dirty = false;
3002 }
3004 }
3005
3006 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3007 LWLockAcquire(lock, LW_EXCLUSIVE);
3008 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3009 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3010 curpage = pageno;
3011 }
3012
3013 qe = (AsyncQueueEntry *) (page_buffer + offset);
3014 xid = qe->xid;
3015
3016 if (TransactionIdIsNormal(xid) &&
3018 {
3019 if (TransactionIdDidCommit(xid))
3020 {
3021 qe->xid = FrozenTransactionId;
3022 page_dirty = true;
3023 }
3024 else
3025 {
3026 qe->xid = InvalidTransactionId;
3027 page_dirty = true;
3028 }
3029 }
3030
3031 /* Advance to next entry */
3032 asyncQueueAdvance(&pos, qe->length);
3033 }
3034
3035 /* Release final page lock if we acquired one */
3036 if (slotno >= 0)
3037 {
3038 if (page_dirty)
3039 NotifyCtl->shared->page_dirty[slotno] = true;
3041 }
3042
3044}
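The freezing rule reduces to a small function of one XID. The sketch below uses PostgreSQL's reserved values (0 for `InvalidTransactionId`, 2 for `FrozenTransactionId`, 3 as the first normal XID) and a wraparound-aware comparison in the style of `TransactionIdPrecedes`. The commit check is a hypothetical callback standing in for `TransactionIdDidCommit()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchXid;

/* Reserved XIDs, using PostgreSQL's numeric values. */
#define SKETCH_INVALID_XID 0
#define SKETCH_FROZEN_XID 2
#define SKETCH_FIRST_NORMAL_XID 3

/* Circular comparison: does a precede b in 32-bit XID space? */
static bool
sketch_xid_precedes(SketchXid a, SketchXid b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Return the XID to store for one queue entry.  Normal XIDs older than
 * new_frozen become frozen (committed) or invalid (aborted/crashed).
 */
static SketchXid
sketch_freeze(SketchXid xid, SketchXid new_frozen,
              bool (*did_commit) (SketchXid))
{
    if (xid >= SKETCH_FIRST_NORMAL_XID && sketch_xid_precedes(xid, new_frozen))
        return did_commit(xid) ? SKETCH_FROZEN_XID : SKETCH_INVALID_XID;
    return xid;                 /* special or recent XIDs are left alone */
}

/* Example oracle for testing: treat even XIDs as committed. */
static bool
sketch_even_committed(SketchXid xid)
{
    return xid % 2 == 0;
}
```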
3045
3046/*
3047 * ProcessIncomingNotify
3048 *
3049 * Scan the queue for arriving notifications and report them to the front
3050 * end. The notifications might be from other sessions, or our own;
3051 * there's no need to distinguish here.
3052 *
3053 * If "flush" is true, force any frontend messages out immediately.
3054 *
3055 * NOTE: since we are outside any transaction, we must create our own.
3056 */
3057static void
3058ProcessIncomingNotify(bool flush)
3059{
3060 /* We *must* reset the flag */
3061 notifyInterruptPending = false;
3062
3063 /* Do nothing else if we aren't actively listening */
3065 return;
3066
3067 if (Trace_notify)
3068 elog(DEBUG1, "ProcessIncomingNotify");
3069
3070 set_ps_display("notify interrupt");
3071
3072 /*
3073 * We must run asyncQueueReadAllNotifications inside a transaction, else
3074 * bad things happen if it gets an error.
3075 */
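3076 StartTransactionCommand();
3077
3078 asyncQueueReadAllNotifications();
3079
3080 CommitTransactionCommand();
3081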
3082 /*
3083 * If this isn't an end-of-command case, we must flush the notify messages
3084 * to ensure frontend gets them promptly.
3085 */
3086 if (flush)
3087 pq_flush();
3088
3089 set_ps_display("idle");
3090
3091 if (Trace_notify)
3092 elog(DEBUG1, "ProcessIncomingNotify: done");
3093}
3094
3095/*
3096 * Send NOTIFY message to my front end.
3097 */
3098void
3099NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3100{
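3101 if (whereToSendOutput == DestRemote)
3102 {
3103 StringInfoData buf;
3104
3105 pq_beginmessage(&buf, PqMsg_NotificationResponse);
3106 pq_sendint32(&buf, srcPid);
3107 pq_sendstring(&buf, channel);
3108 pq_sendstring(&buf, payload);
3109 pq_endmessage(&buf);
3110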
3111 /*
3112 * NOTE: we do not do pq_flush() here. Some level of caller will
3113 * handle it later, allowing this message to be combined into a packet
3114 * with other ones.
3115 */
3116 }
3117 else
3118 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3119}
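`NotifyMyFrontEnd()` emits a NotificationResponse message. In protocol version 3 that message consists of the type byte `'A'`, an Int32 length that counts itself but not the type byte, the sender's Int32 PID, and then the channel and payload as NUL-terminated strings, with integers in network byte order. The sketch below encodes one by hand rather than through `StringInfo`. The function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Write a big-endian (network order) 32-bit integer; returns 4. */
static size_t
sketch_put_int32(unsigned char *out, uint32_t v)
{
    out[0] = (unsigned char) (v >> 24);
    out[1] = (unsigned char) (v >> 16);
    out[2] = (unsigned char) (v >> 8);
    out[3] = (unsigned char) v;
    return 4;
}

/*
 * Encode a NotificationResponse into out, which must be large enough.
 * Returns the total number of bytes written, including the type byte.
 */
static size_t
sketch_notification_response(unsigned char *out, int32_t pid,
                             const char *channel, const char *payload)
{
    size_t clen = strlen(channel) + 1;  /* include the NUL */
    size_t plen = strlen(payload) + 1;
    size_t n = 0;

    out[n++] = 'A';
    n += sketch_put_int32(out + n, (uint32_t) (4 + 4 + clen + plen));
    n += sketch_put_int32(out + n, (uint32_t) pid);
    memcpy(out + n, channel, clen);
    n += clen;
    memcpy(out + n, payload, plen);
    n += plen;
    return n;
}
```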
3120
3121/* Does pendingNotifies include a match for the given event? */
3122static bool
3123AsyncExistsPendingNotify(Notification *n)
3124{
3125 if (pendingNotifies == NULL)
3126 return false;
3127
3128 if (pendingNotifies->hashtab != NULL)
3129 {
3130 /* Use the hash table to probe for a match */
3131 if (hash_search(pendingNotifies->hashtab,
3132 &n,
3133 HASH_FIND,
3134 NULL))
3135 return true;
3136 }
3137 else
3138 {
3139 /* Must scan the event list */
3140 ListCell *l;
3141
3142 foreach(l, pendingNotifies->events)
3143 {
3144 Notification *oldn = (Notification *) lfirst(l);
3145
3146 if (n->channel_len == oldn->channel_len &&
3147 n->payload_len == oldn->payload_len &&
3148 memcmp(n->data, oldn->data,
3149 n->channel_len + n->payload_len + 2) == 0)
3150 return true;
3151 }
3152 }
3153
3154 return false;
3155}
3156
3157/*
3158 * Add a notification event to a pre-existing pendingNotifies list.
3159 *
3160 * Because pendingNotifies->events is already nonempty, this works
3161 * correctly no matter what CurrentMemoryContext is.
3162 */
3163static void
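3164AddEventToPendingNotifies(Notification *n)
3165{
3166 Assert(pendingNotifies->events != NIL);
3167
3168 /* Create the hash tables if it's time to */
3169 if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
3170 pendingNotifies->hashtab == NULL)
3171 {
3172 HASHCTL hash_ctl;
3173 ListCell *l;
3174
3175 /* Create the hash table */
3176 hash_ctl.keysize = sizeof(Notification *);
3177 hash_ctl.entrysize = sizeof(struct NotificationHash);
3178 hash_ctl.hash = notification_hash;
3179 hash_ctl.match = notification_match;
3180 hash_ctl.hcxt = CurTransactionContext;
3181 pendingNotifies->hashtab =
3182 hash_create("Pending Notifies",
3183 256L,
3184 &hash_ctl,
3185 HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
3186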
3187 /* Create the unique channel name table */
3189 hash_ctl.keysize = NAMEDATALEN;
3190 hash_ctl.entrysize = sizeof(ChannelName);
3193 hash_create("Pending Notify Channel Names",
3194 64L,
3195 &hash_ctl,
3197
3198 /* Insert all the already-existing events */
3199 foreach(l, pendingNotifies->events)
3200 {
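3201 Notification *oldn = (Notification *) lfirst(l);
3202 char *channel = oldn->data;
3203 bool found;
3204
3205 (void) hash_search(pendingNotifies->hashtab,
3206 &oldn,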
3207 HASH_ENTER,
3208 &found);
3209 Assert(!found);
3210
3211 /* Add channel name to uniqueChannelHash; might be there already */
3213 channel,
3214 HASH_ENTER,
3215 NULL);
3216 }
3217 }
3218
3219 /* Add new event to the list, in order */
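3220 pendingNotifies->events = lappend(pendingNotifies->events, n);
3221
3222 /* Add event to the hash tables if needed */
3223 if (pendingNotifies->hashtab != NULL)
3224 {
3225 char *channel = n->data;
3226 bool found;
3227
3228 (void) hash_search(pendingNotifies->hashtab,
3229 &n,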
3230 HASH_ENTER,
3231 &found);
3232 Assert(!found);
3233
3234 /* Add channel name to uniqueChannelHash; might be there already */
3236 channel,
3237 HASH_ENTER,
3238 NULL);
3239 }
3240}
3241
3242/*
3243 * notification_hash: hash function for notification hash table
3244 *
3245 * The hash "keys" are pointers to Notification structs.
3246 */
3247static uint32
3248notification_hash(const void *key, Size keysize)
3249{
3250 const Notification *k = *(const Notification *const *) key;
3251
3252 Assert(keysize == sizeof(Notification *));
3253 /* We don't bother to include the payload's trailing null in the hash */
3254 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3255 k->channel_len + k->payload_len + 1));
3256}
3257
3258/*
3259 * notification_match: match function to use with notification_hash
3260 */
3261static int
3262notification_match(const void *key1, const void *key2, Size keysize)
3263{
3264 const Notification *k1 = *(const Notification *const *) key1;
3265 const Notification *k2 = *(const Notification *const *) key2;
3266
3267 Assert(keysize == sizeof(Notification *));
3268 if (k1->channel_len == k2->channel_len &&
3269 k1->payload_len == k2->payload_len &&
3270 memcmp(k1->data, k2->data,
3271 k1->channel_len + k1->payload_len + 2) == 0)
3272 return 0; /* equal */
3273 return 1; /* not equal */
3274}
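Duplicate detection compares notifications byte for byte. Both lengths must match, and the channel, its NUL, the payload, and the payload's NUL must be identical. The hash covers one byte fewer, because the payload's trailing NUL adds no information. The sketch below mirrors that scheme with a hypothetical `SketchNotification`, using FNV-1a purely as a stand-in for `hash_any()`; it is not the hash PostgreSQL uses.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical mirror of Notification: lengths plus "channel\0payload\0". */
typedef struct
{
    uint16_t channel_len;
    uint16_t payload_len;
    char data[];
} SketchNotification;

static SketchNotification *
sketch_make(const char *channel, const char *payload)
{
    size_t clen = strlen(channel);
    size_t plen = strlen(payload);
    SketchNotification *n = malloc(sizeof(*n) + clen + plen + 2);

    assert(n != NULL);
    n->channel_len = (uint16_t) clen;
    n->payload_len = (uint16_t) plen;
    memcpy(n->data, channel, clen + 1);             /* with channel's NUL */
    memcpy(n->data + clen + 1, payload, plen + 1);  /* with payload's NUL */
    return n;
}

/* FNV-1a over channel, NUL, and payload; the payload's NUL is skipped. */
static uint32_t
sketch_hash(const SketchNotification *n)
{
    uint32_t h = 2166136261u;
    size_t len = (size_t) n->channel_len + n->payload_len + 1;

    for (size_t i = 0; i < len; i++)
    {
        h ^= (unsigned char) n->data[i];
        h *= 16777619u;
    }
    return h;
}

/* Equal when both lengths match and all bytes, both NULs included, match. */
static bool
sketch_equal(const SketchNotification *a, const SketchNotification *b)
{
    return a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data, (size_t) a->channel_len + a->payload_len + 2) == 0;
}
```

Comparing the two lengths first is a cheap filter that rejects most non-matches before any bytes are compared.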
3275
3276/* Clear the pendingActions and pendingNotifies lists. */
3277static void
3278ClearPendingActionsAndNotifies(void)
3279{
3280 /*
3281 * Everything's allocated in either TopTransactionContext or the context
3282 * for the subtransaction to which it corresponds. So, there's nothing to
3283 * do here except reset the pointers; the space will be reclaimed when the
3284 * contexts are deleted.
3285 */
3286 pendingActions = NULL;
3287 pendingNotifies = NULL;
3288 /* Also clear pendingListenActions, which is derived from pendingActions */
3290}
3291
3292/*
3293 * GUC check_hook for notify_buffers
3294 */
3295bool
3296check_notify_buffers(int *newval, void **extra, GucSource source)
3297{
3298 return check_slru_buffers("notify_buffers", newval);
3299}
Definition async.c:201
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:250
static void initGlobalChannelTable(void)
Definition async.c:699
#define NotifyCtl
Definition async.c:378
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:366
static HTAB * localChannelTable
Definition async.c:422
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:640
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:996
#define QUEUEALIGN(len)
Definition async.c:225
static bool amRegisteredListener
Definition async.c:558
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:266
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:364
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:363
void AtSubAbort_Notify(void)
Definition async.c:2509
void AtPrepare_Notify(void)
Definition async.c:1160
#define QUEUE_PAGESIZE
Definition async.c:379
void AtSubCommit_Notify(void)
Definition async.c:2439
static bool asyncQueueIsFull(void)
Definition async.c:1956
#define QUEUE_HEAD
Definition async.c:358
static void AsyncShmemInit(void *arg)
Definition async.c:829
static void initLocalChannelTable(void)
Definition async.c:750
PendingListenAction
Definition async.c:469
@ PENDING_UNLISTEN
Definition async.c:471
@ PENDING_LISTEN
Definition async.c:470
static dshash_table * globalChannelTable
Definition async.c:414
static void asyncQueueUnregister(void)
Definition async.c:1913
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2165
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:247
#define LocalChannelTableIsEmpty()
Definition async.c:425
static void initPendingListenActions(void)
Definition async.c:776
static QueuePosition queueHeadBeforeWrite
Definition async.c:566
static bool IsListeningOn(const char *channel)
Definition async.c:1900
void Async_Notify(const char *channel, const char *payload)
Definition async.c:894
volatile sig_atomic_t notifyInterruptPending
Definition async.c:552
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2952
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3296
#define QUEUE_STOP_PAGE
Definition async.c:360
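The QUEUE_POS_* macros, QUEUEALIGN, AsyncQueueEntryEmptySize, QUEUE_PAGESIZE and asyncQueueAdvance listed above work together to move a reader or writer through the notification queue one entry at a time. The following is a minimal standalone sketch of that movement, not the code in async.c. It assumes 8 kB queue pages, 4-byte alignment, and an 18-byte empty entry (four 4-byte header fields plus two NUL terminators). Every `Sketch*`/`sketch_*`/`SKETCH_*` name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed values for a default build; not read from PostgreSQL headers. */
#define SKETCH_PAGESIZE         8192
#define SKETCH_ALIGN(len)       (((len) + 3) & ~3)   /* stand-in for INTALIGN */
#define SKETCH_EMPTY_ENTRY_SIZE 18                   /* header + two NUL bytes */

/* Hypothetical mirror of QueuePosition: a page number plus a byte offset. */
typedef struct
{
    int64_t page;
    int     offset;
} SketchQueuePos;

/*
 * Advance *pos past an entry of entryLength bytes.  If what remains of the
 * page could not hold even an empty entry, jump to the start of the next
 * page.  Returns true when a page boundary was crossed.
 */
static bool
sketch_queue_advance(SketchQueuePos *pos, int entryLength)
{
    int  offset = pos->offset + entryLength;
    bool pageJump = false;

    assert(offset <= SKETCH_PAGESIZE);

    if (offset + SKETCH_ALIGN(SKETCH_EMPTY_ENTRY_SIZE) > SKETCH_PAGESIZE)
    {
        pos->page++;
        offset = 0;
        pageJump = true;
    }
    pos->offset = offset;
    return pageJump;
}

/* Positions are ordered by page first, then by offset within the page. */
static bool
sketch_pos_precedes(SketchQueuePos x, SketchQueuePos y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}
```

Jumping early guarantees that any offset a position lands on still has room for at least a minimal entry header. The ordering function shows the comparison that a macro such as QUEUE_POS_PRECEDES would need. Its exact definition is not reproduced here.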
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1789
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
int32_t int32
Definition c.h:620
uint16_t uint16
Definition c.h:623
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
dsm_handle dsa_handle
Definition dsa.h:136
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:611
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:562
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:579
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:659
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:394
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:371
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:274
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:768
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:678
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:210
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:593
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:778
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
dsa_pointer dshash_table_handle
Definition dshash.h:24
uint32 dshash_hash
Definition dshash.h:30
#define dshash_find_or_insert(hash_table, key, found)
Definition dshash.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
void hash_destroy(HTAB *hashp)
Definition dynahash.c:802
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1317
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1, 2)
#define DEBUG3
Definition elog.h:29
int errdetail(const char *fmt,...) pg_attribute_printf(1, 2)
#define WARNING
Definition elog.h:37
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define INFO
Definition elog.h:35
#define ereport(elevel,...)
Definition elog.h:152
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:49
ProcNumber MyProcNumber
Definition globals.c:92
int MaxBackends
Definition globals.c:149
bool ExitOnAnyError
Definition globals.c:125
int notify_buffers
Definition globals.c:167
Oid MyDatabaseId
Definition globals.c:96
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:91
@ HASH_FIND
Definition hsearch.h:108
@ HASH_REMOVE
Definition hsearch.h:110
@ HASH_ENTER
Definition hsearch.h:109
#define HASH_CONTEXT
Definition hsearch.h:97
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_COMPARE
Definition hsearch.h:94
#define HASH_FUNCTION
Definition hsearch.h:93
#define IsParallelWorker()
Definition parallel.h:62
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
#define pq_flush()
Definition libpq.h:49
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1235
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
MemoryContext TopTransactionContext
Definition mcxt.c:172
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
void * palloc(Size size)
Definition mcxt.c:1390
MemoryContext CurTransactionContext
Definition mcxt.c:173
#define InvalidPid
Definition miscadmin.h:32
int errmsg(const char *fmt,...) pg_attribute_printf(1, 2)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:375
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:97
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:296
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define ShmemRequestStruct(...)
Definition shmem.h:176
bool SlruScanDirectory(SlruDesc *ctl, SlruScanCallback callback, void *data)
Definition slru.c:1844
int SimpleLruReadPage_ReadOnly(SlruDesc *ctl, int64 pageno, const void *opaque_data)
Definition slru.c:654
void SimpleLruTruncate(SlruDesc *ctl, int64 cutoffPage)
Definition slru.c:1458
int SimpleLruZeroPage(SlruDesc *ctl, int64 pageno)
Definition slru.c:397
bool SlruScanDirCbDeleteAll(SlruDesc *ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1797
int SimpleLruReadPage(SlruDesc *ctl, int64 pageno, bool write_ok, const void *opaque_data)
Definition slru.c:550
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:377
#define SimpleLruRequest(...)
Definition slru.h:218
static LWLock * SimpleLruGetBankLock(SlruDesc *ctl, int64 pageno)
Definition slru.h:207
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
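asyncQueueFillWarning, QUEUE_FULL_WARN_INTERVAL and the shared lastQueueFillWarn field, together with GetCurrentTimestamp and TimestampDifferenceExceeds, suggest a rate-limited warning about a filling queue. Below is a sketch of that pattern under stated assumptions: a 50% fill threshold and a 5000 ms interval are assumed rather than taken from this page, timestamps are microseconds as with TimestampTz, and all `sketch_*`/`SKETCH_*` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef int64_t SketchTimestamp;         /* microseconds, like TimestampTz */

#define SKETCH_WARN_INTERVAL_MS 5000     /* assumed: at most one warning per 5 s */
#define SKETCH_WARN_THRESHOLD   0.5      /* assumed: warn once half full */

static SketchTimestamp last_warn = 0;    /* stand-in for lastQueueFillWarn */

/* Modelled on TimestampDifferenceExceeds(): true if stop - start >= msec. */
static bool
sketch_diff_exceeds(SketchTimestamp start, SketchTimestamp stop, int msec)
{
    return stop - start >= (int64_t) msec * 1000;
}

/* Emit a warning if the queue is full enough and the interval has elapsed. */
static bool
sketch_fill_warning(double fill, SketchTimestamp now)
{
    if (fill < SKETCH_WARN_THRESHOLD)
        return false;
    if (!sketch_diff_exceeds(last_warn, now, SKETCH_WARN_INTERVAL_MS))
        return false;
    fprintf(stderr, "WARNING: notification queue is %.0f%% full\n", fill * 100);
    last_warn = now;                     /* remember when we last warned */
    return true;
}
```

Keeping the last-warning timestamp in shared memory, as the lastQueueFillWarn field implies, means the rate limit applies across all backends rather than per session.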
List * actions
Definition async.c:454
int nestingLevel
Definition async.c:453
struct ActionList * upper
Definition async.c:455
ProcNumber firstListener
Definition async.c:338
QueuePosition tail
Definition async.c:334
QueuePosition head
Definition async.c:333
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:344
dshash_table_handle globalChannelTableDSH
Definition async.c:342
TimestampTz lastQueueFillWarn
Definition async.c:340
dsa_handle globalChannelTableDSA
Definition async.c:341
int32 srcPid
Definition async.c:220
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:221
TransactionId xid
Definition async.c:219
char channel[NAMEDATALEN]
Definition async.c:542
dsa_pointer listenersArray
Definition async.c:409
int allocatedListeners
Definition async.c:411
GlobalChannelKey key
Definition async.c:408
char channel[NAMEDATALEN]
Definition async.c:397
Size keysize
Definition hsearch.h:69
Definition pg_list.h:54
ListenActionKind action
Definition async.c:447
ProcNumber procNo
Definition async.c:402
bool removeOnAbort
Definition async.c:403
Notification * event
Definition async.c:531
List * uniqueChannelNames
Definition async.c:522
HTAB * uniqueChannelHash
Definition async.c:523
HTAB * hashtab
Definition async.c:521
List * events
Definition async.c:520
struct NotificationList * upper
Definition async.c:524
uint16 payload_len
Definition async.c:512
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:514
uint16 channel_len
Definition async.c:511
PendingListenAction action
Definition async.c:477
char channel[NAMEDATALEN]
Definition async.c:476
ProcNumber nextListener
Definition async.c:291
QueuePosition pos
Definition async.c:292
int64 page
Definition async.c:234
ShmemRequestCallback request_fn
Definition shmem.h:133
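The ActionList members above (nestingLevel, actions, upper), like the upper pointer in NotificationList, describe a stack of pending work keyed by subtransaction depth. AtSubCommit_Notify and AtSubAbort_Notify either fold the top node into its parent or discard it. The sketch below illustrates that stacking with plain `int` actions and a fixed capacity. It approximates the technique and is not the async.c code. All `Sketch*`/`sketch_*` names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_MAX_ACTIONS 16            /* fixed capacity keeps the sketch short */

/* Hypothetical stand-in for ActionList. */
typedef struct SketchLevel
{
    int nestingLevel;                    /* subtransaction depth owning this node */
    int nactions;
    int actions[SKETCH_MAX_ACTIONS];
    struct SketchLevel *upper;           /* enclosing level, or NULL */
} SketchLevel;

static SketchLevel *pending = NULL;

/* Record an action at 'level', pushing a new node if the top is shallower. */
static void
sketch_add(int level, int action)
{
    if (pending == NULL || pending->nestingLevel < level)
    {
        SketchLevel *node = calloc(1, sizeof(SketchLevel));

        assert(node != NULL);
        node->nestingLevel = level;
        node->upper = pending;
        pending = node;
    }
    assert(pending->nactions < SKETCH_MAX_ACTIONS);
    pending->actions[pending->nactions++] = action;
}

/* Subtransaction commit: hand this level's actions to the parent level. */
static void
sketch_subcommit(int my_level)
{
    if (pending == NULL || pending->nestingLevel < my_level)
        return;                          /* nothing recorded at this level */

    if (pending->upper == NULL || pending->upper->nestingLevel < my_level - 1)
    {
        pending->nestingLevel--;         /* parent has no node: just relabel */
        return;
    }

    SketchLevel *child = pending;        /* merge child into existing parent */

    pending = child->upper;
    assert(pending->nactions + child->nactions <= SKETCH_MAX_ACTIONS);
    memcpy(&pending->actions[pending->nactions], child->actions,
           (size_t) child->nactions * sizeof(int));
    pending->nactions += child->nactions;
    free(child);
}

/* Subtransaction abort: discard everything recorded at this level or deeper. */
static void
sketch_subabort(int my_level)
{
    while (pending != NULL && pending->nestingLevel >= my_level)
    {
        SketchLevel *dead = pending;

        pending = dead->upper;
        free(dead);
    }
}
```

A node is pushed only when a subtransaction records something. A RELEASE SAVEPOINT therefore either relabels the node (if the parent level has none) or concatenates into the parent's node, and a ROLLBACK TO SAVEPOINT simply pops it.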
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:446
char * text_to_cstring(const text *t)
Definition varlena.c:217
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040
int GetCurrentTransactionNestLevel(void)
Definition xact.c:931
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207
TransactionId GetCurrentTransactionId(void)
Definition xact.c:456
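notification_hash and notification_match, together with the Notification members channel_len, payload_len and data listed above, let AsyncExistsPendingNotify recognize a repeated NOTIFY with the same channel and payload inside one transaction. PostgreSQL delivers only one instance of such duplicates. The data buffer holds the channel and payload back to back, each NUL-terminated. The sketch below reproduces that layout but substitutes FNV-1a for hash_any() and a small fixed-size linear-probing table for the dynahash table. Both are swapped-in techniques, and all `Sketch*`/`sketch_*`/`SKETCH_*` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Mirrors the Notification layout: channel, NUL, payload, NUL in one buffer. */
typedef struct SketchNotification
{
    uint16_t channel_len;
    uint16_t payload_len;
    char     data[];                     /* flexible array member */
} SketchNotification;

static SketchNotification *
sketch_make(const char *channel, const char *payload)
{
    size_t clen = strlen(channel), plen = strlen(payload);
    SketchNotification *n = malloc(sizeof(*n) + clen + plen + 2);

    assert(n != NULL);
    n->channel_len = (uint16_t) clen;
    n->payload_len = (uint16_t) plen;
    memcpy(n->data, channel, clen + 1);             /* includes channel's NUL */
    memcpy(n->data + clen + 1, payload, plen + 1);  /* includes payload's NUL */
    return n;
}

/* FNV-1a over channel + NUL + payload (the trailing NUL is not hashed). */
static uint32_t
sketch_hash(const SketchNotification *n)
{
    uint32_t h = 2166136261u;
    size_t   len = (size_t) n->channel_len + n->payload_len + 1;

    for (size_t i = 0; i < len; i++)
        h = (h ^ (unsigned char) n->data[i]) * 16777619u;
    return h;
}

/* Equal only if both lengths match and the whole buffer is identical. */
static bool
sketch_match(const SketchNotification *a, const SketchNotification *b)
{
    return a->channel_len == b->channel_len &&
           a->payload_len == b->payload_len &&
           memcmp(a->data, b->data, (size_t) a->channel_len + a->payload_len + 2) == 0;
}

#define SKETCH_SLOTS 64
static SketchNotification *slots[SKETCH_SLOTS];

/* Store n and return true if it is new; return false if an equal one exists. */
static bool
sketch_add_unique(SketchNotification *n)
{
    uint32_t i = sketch_hash(n) % SKETCH_SLOTS;

    for (int probes = 0; probes < SKETCH_SLOTS; probes++, i = (i + 1) % SKETCH_SLOTS)
    {
        if (slots[i] == NULL)
        {
            slots[i] = n;
            return true;
        }
        if (sketch_match(slots[i], n))
            return false;
    }
    assert(!"sketch table full");
    return false;
}
```

Because the length fields are compared before the bytes, channel "jobs" with payload "42" and channel "jobs4" with payload "2" are treated as distinct even though their concatenated text is the same.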