PostgreSQL Source Code git master
async.c
/*-------------------------------------------------------------------------
 *
 * async.c
 *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/commands/async.c
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * Async Notification Model as of v19:
 *
 * 1. Multiple backends on the same machine.  Multiple backends may be
 *    listening on each of several channels.
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *    with actively-used pages mapped into shared memory by the slru.c module.
 *    All notification messages are placed in the queue and later read out
 *    by listening backends.  The single queue allows us to guarantee that
 *    notifications are received in commit order.
 *
 *    Although there is only one queue, notifications are treated as being
 *    database-local; this is done by including the sender's database OID
 *    in each notification message.  Listening backends ignore messages
 *    that don't match their database OID.  This is important because it
 *    ensures senders and receivers have the same database encoding and won't
 *    misinterpret non-ASCII text in the channel name or payload string.
 *
 *    Since notifications are not expected to survive database crashes,
 *    we can simply clean out the pg_notify data at any reboot, and there
 *    is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *    entering its PID into the array in AsyncQueueControl.  It then scans all
 *    incoming notifications in the central queue and first compares the
 *    database OID of the notification with its own database OID and then
 *    compares the notified channel with the list of channels that it listens
 *    to.  If there is a match, it delivers the notification event to its
 *    frontend.  Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *    a backend-local list which will not be processed until transaction end.
 *
 *    Duplicate notifications from the same transaction are sent out as one
 *    notification only.  This is done to save work when for example a trigger
 *    on a 2 million row table fires a notification for each row that has been
 *    changed.  If the application needs to receive every single notification
 *    that has been sent, it can easily add a unique string to the optional
 *    payload parameter.
 *
 *    When the transaction is ready to commit, PreCommit_Notify() adds the
 *    pending notifications to the head of the queue.  The head pointer of the
 *    queue always points to the next free position and a position is just a
 *    page number and the offset in that page.  This is done before marking the
 *    transaction as committed in clog.  If we run into problems writing the
 *    notifications, we can still call elog(ERROR, ...) and the transaction
 *    will roll back safely.
 *
 *    Once we have put all of the notifications into the queue, we return to
 *    CommitTransaction() which will then do the actual transaction commit.
 *
 *    After commit we are called another time (AtCommit_Notify()).  Here we
 *    make any required updates to the effective listen state (see below).
 *    Then we signal any backends that may be interested in our messages
 *    (including our own backend, if listening).  This is done by
 *    SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
 *    each relevant backend, as described below.
 *
 *    Finally, after we are out of the transaction altogether and about to go
 *    idle, we scan the queue for messages that need to be sent to our
 *    frontend (which might be notifies from other backends, or self-notifies
 *    from our own).  This step is not part of the CommitTransaction sequence
 *    for two important reasons.  First, we could get errors while sending
 *    data to our frontend, and it's really bad for errors to happen in
 *    post-commit cleanup.  Second, in cases where a procedure issues commits
 *    within a single frontend command, we don't want to send notifies to our
 *    frontend until the command is done; but notifies to other backends
 *    should go out immediately after each commit.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *    sets the process's latch, which triggers the event to be processed
 *    immediately if this backend is idle (i.e., it is waiting for a frontend
 *    command and is not within a transaction block; cf.
 *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
 *    flag, which will cause the processing to occur just before we next go
 *    idle.
 *
 *    Inbound-notify processing consists of reading all of the notifications
 *    that have arrived since scanning last time.  We read every notification
 *    until we reach either a notification from an uncommitted transaction or
 *    the head pointer's position.
 *
 * 6. To limit disk space consumption, the tail pointer needs to be advanced
 *    so that old pages can be truncated.  This is relatively expensive
 *    (notably, it requires an exclusive lock), so we don't want to do it
 *    often.  We make sending backends do this work if they advanced the queue
 *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
 *
 * 7. So far we have not discussed how backends change their listening state,
 *    nor how notification senders know which backends to awaken.  To handle
 *    the latter, we maintain a global channel table (implemented as a dynamic
 *    shared hash table, or dshash) that maps channel names to the set of
 *    backends listening on each channel.  This table is created lazily on the
 *    first LISTEN command and grows dynamically as needed.  There is also a
 *    local channel table (a plain dynahash table) in each listening backend,
 *    tracking which channels that backend is listening to.  The local table
 *    serves to reduce the number of accesses needed to the shared table.
 *
 *    If the current transaction has executed any LISTEN/UNLISTEN actions,
 *    PreCommit_Notify() prepares to commit those.  For LISTEN, it
 *    pre-allocates entries in both the per-backend localChannelTable and the
 *    shared globalChannelTable (with listening=false so that these entries
 *    are no-ops for the moment).  It also records the final per-channel
 *    intent in pendingListenActions, so post-commit/abort processing can
 *    apply that in a single step.  Since all these allocations happen before
 *    committing to clog, we can safely abort the transaction on failure.
 *
 *    After commit, AtCommit_Notify() runs through pendingListenActions and
 *    updates the backend's per-channel listening flags to activate or
 *    deactivate listening.  This happens before sending signals.
 *
 *    SignalBackends() consults the shared global channel table to identify
 *    listeners for the channels that the current transaction sent
 *    notification(s) to.  Each selected backend is marked as having a wakeup
 *    pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
 *    signal is sent to it.
 *
 * 8. While writing notifications, PreCommit_Notify() records the queue head
 *    position both before and after the write.  Because all writers serialize
 *    on a cluster-wide heavyweight lock, no other backend can insert entries
 *    between these two points.  SignalBackends() uses this fact to directly
 *    advance the queue pointer for any backend that is still positioned at
 *    the old head, or within the range written, but is not interested in any
 *    of our notifications.  This avoids unnecessary wakeups for idle
 *    listeners that have nothing to read.  Backends that are not interested
 *    in our notifications, but cannot be directly advanced, are signaled only
 *    if they are far behind the current queue head; that is to ensure that
 *    we can advance the queue tail without undue delay.
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.
 *
 * The amount of shared memory used for notify management (notify_buffers)
 * can be varied without affecting anything but performance.  The maximum
 * amount of notification data that can be queued at one time is determined
 * by the max_notify_queue_pages GUC.
 *-------------------------------------------------------------------------
 */
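Points 2 and 3 describe how a listener filters the shared queue: database OID first, then channel name. The sketch below models only that filtering step, under simplifying assumptions. `SketchEntry`, `sketch_is_listening_on` and `sketch_count_deliverable` are hypothetical names, the queue is a plain array rather than SLRU pages, and the commit-status check that stops the real scan at uncommitted entries is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef unsigned int SketchOid;     /* stand-in for Oid */

/* Hypothetical, simplified queue entry: no XID, no length, no paging. */
typedef struct SketchEntry
{
    SketchOid   dboid;              /* sender's database */
    const char *channel;
    const char *payload;
} SketchEntry;

/* Returns 1 if 'channel' appears in this listener's channel list. */
static int
sketch_is_listening_on(const char *const *channels, size_t nchannels,
                       const char *channel)
{
    for (size_t i = 0; i < nchannels; i++)
        if (strcmp(channels[i], channel) == 0)
            return 1;
    return 0;
}

/*
 * Scan entries in queue order: compare the database OID first, then the
 * channel name.  Matching entries would be sent to the frontend; this
 * sketch only counts them.  Everything else is skipped.
 */
size_t
sketch_count_deliverable(const SketchEntry *entries, size_t nentries,
                         SketchOid mydb,
                         const char *const *channels, size_t nchannels)
{
    size_t      delivered = 0;

    for (size_t i = 0; i < nentries; i++)
    {
        if (entries[i].dboid != mydb)
            continue;           /* other database: ignore */
        if (!sketch_is_listening_on(channels, nchannels, entries[i].channel))
            continue;           /* not a channel we listen on */
        delivered++;
    }
    return delivered;
}
```

Checking the OID first is cheap and also guarantees that only senders using the same database encoding reach the string comparison.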

#include "postgres.h"

#include <limits.h>
#include <unistd.h>
#include <signal.h>

#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"


/*
 * Maximum size of a NOTIFY payload, including terminating NULL.  This
 * must be kept small enough so that a notification message fits on one
 * SLRU page.  The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
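With a stock build the formula is easy to evaluate. The sketch assumes the default compile-time values BLCKSZ = 8192 and NAMEDATALEN = 64 (both can be changed when PostgreSQL is built) and mirrors the `payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH` rejection in Async_Notify() further down. The `SKETCH_*` names and `sketch_payload_ok` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed defaults for a stock build; both are compile-time settings. */
#define SKETCH_BLCKSZ       8192
#define SKETCH_NAMEDATALEN  64

/* Same formula as NOTIFY_PAYLOAD_MAX_LENGTH. */
#define SKETCH_PAYLOAD_MAX  (SKETCH_BLCKSZ - SKETCH_NAMEDATALEN - 128)

/*
 * The limit includes the terminating NUL, so strlen(payload) must be
 * strictly less than it (as in Async_Notify()).
 */
int
sketch_payload_ok(size_t payload_len)
{
    return payload_len < (size_t) SKETCH_PAYLOAD_MAX;
}
```

Under those defaults the limit works out to 8000 bytes including the terminator, so the longest payload string accepted is 7999 bytes.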

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
    int         length;         /* total allocated length of entry */
    Oid         dboid;          /* sender's database OID */
    TransactionId xid;          /* sender's XID */
    int32       srcPid;         /* sender's PID */
    char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len)     INTALIGN(len)

#define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
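The comments imply that a real entry is sized as the fixed header plus both strings and their two terminators, rounded up with QUEUEALIGN. The sketch below computes that under stated assumptions. `SketchQueueEntry` is a hypothetical mirror of AsyncQueueEntry that uses `uint32_t` stand-ins for `Oid` and `TransactionId`, `int` is assumed to be 4-byte aligned, and the actual sizing code in async.c is not part of this excerpt.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical mirror of AsyncQueueEntry's fixed header. */
typedef struct SketchQueueEntry
{
    int         length;         /* total allocated length of entry */
    uint32_t    dboid;          /* stand-in for Oid */
    uint32_t    xid;            /* stand-in for TransactionId */
    int32_t     srcPid;
    char        data[];         /* channel\0payload\0 */
} SketchQueueEntry;

/* Round len up to a multiple of int alignment, as INTALIGN does. */
#define SKETCH_INTALIGN(len) \
    (((size_t) (len) + (_Alignof(int) - 1)) & ~((size_t) (_Alignof(int) - 1)))

/* Header plus the two terminating NULs of an empty channel and payload. */
#define SKETCH_EMPTY_SIZE   (offsetof(SketchQueueEntry, data) + 2)

/* Aligned length for an entry holding the given channel and payload. */
size_t
sketch_entry_length(size_t channel_len, size_t payload_len)
{
    return SKETCH_INTALIGN(SKETCH_EMPTY_SIZE + channel_len + payload_len);
}
```

For example, a 3-byte channel with an empty payload needs 21 bytes, which rounds up to 24; so the next entry in the page always starts on an int boundary.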

/*
 * Struct describing a queue position, and assorted macros for working with it
 */
typedef struct QueuePosition
{
    int64       page;           /* SLRU page number */
    int         offset;         /* byte offset within page */
} QueuePosition;

#define QUEUE_POS_PAGE(x)       ((x).page)
#define QUEUE_POS_OFFSET(x)     ((x).offset)

#define SET_QUEUE_POS(x,y,z) \
    do { \
        (x).page = (y); \
        (x).offset = (z); \
    } while (0)

#define QUEUE_POS_EQUAL(x,y) \
    ((x).page == (y).page && (x).offset == (y).offset)

#define QUEUE_POS_IS_ZERO(x) \
    ((x).page == 0 && (x).offset == 0)

/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     (x).page != (y).page ? (y) : \
     (x).offset < (y).offset ? (x) : (y))

/* choose logically larger QueuePosition */
#define QUEUE_POS_MAX(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     (x).page != (y).page ? (x) : \
     (x).offset > (y).offset ? (x) : (y))

/* returns true if x comes before y in queue order */
#define QUEUE_POS_PRECEDES(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) || \
     ((x).page == (y).page && (x).offset < (y).offset))
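asyncQueuePagePrecedes() is now a plain `p < q`, as its definition later in this file shows. As a result, the position macros reduce to a lexicographic comparison on (page, offset). The function-based sketch below, with hypothetical names, shows the same logic. Unlike the macros, these functions evaluate each argument only once.

```c
#include <assert.h>
#include <stdint.h>

typedef struct SketchPos
{
    int64_t     page;           /* SLRU page number */
    int         offset;         /* byte offset within page */
} SketchPos;

/* Page numbers are 64-bit and no longer wrap, so '<' is enough. */
static int
sketch_page_precedes(int64_t p, int64_t q)
{
    return p < q;
}

/* Same logic as QUEUE_POS_PRECEDES: compare pages, then offsets. */
int
sketch_pos_precedes(SketchPos x, SketchPos y)
{
    return sketch_page_precedes(x.page, y.page) ||
        (x.page == y.page && x.offset < y.offset);
}

/* Same logic as QUEUE_POS_MIN. */
SketchPos
sketch_pos_min(SketchPos x, SketchPos y)
{
    if (sketch_page_precedes(x.page, y.page))
        return x;
    if (x.page != y.page)
        return y;
    return x.offset < y.offset ? x : y;
}
```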

/*
 * Parameter determining how often we try to advance the tail pointer:
 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
 * also the distance by which a backend that's not interested in our
 * notifications needs to be behind before we'll decide we need to wake it
 * up so it can advance its pointer.
 *
 * Resist the temptation to make this really large.  While that would save
 * work in some places, it would add cost in others.  In particular, this
 * should likely be less than notify_buffers, to ensure that backends
 * catch up before the pages they'll need to read fall out of SLRU cache.
 */
#define QUEUE_CLEANUP_DELAY 4
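QUEUE_CLEANUP_DELAY has two roles in the comment above, and the hypothetical helpers below illustrate both. The first is the tail-advance trigger, which fires when the head reaches a page number that is a multiple of the delay (the `tryAdvanceTail` comment later in this file describes that condition). The second is the lag test for listeners that are not interested in the new messages. Whether the lag test uses `>=` or `>` is an assumption.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_CLEANUP_DELAY 4  /* same value as QUEUE_CLEANUP_DELAY */

/* Called when the writer moves the head onto new_page: try tail cleanup? */
int
sketch_should_try_advance_tail(int64_t new_page)
{
    return new_page % SKETCH_CLEANUP_DELAY == 0;
}

/* An uninterested listener is woken only if it lags this far behind. */
int
sketch_is_far_behind(int64_t head_page, int64_t listener_page)
{
    return head_page - listener_page >= SKETCH_CLEANUP_DELAY;
}
```

Writing twelve consecutive pages would therefore attempt tail cleanup three times (at pages 4, 8 and 12) rather than twelve.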

/*
 * Struct describing a listening backend's status
 */
typedef struct QueueBackendStatus
{
    int32       pid;            /* either a PID or InvalidPid */
    Oid         dboid;          /* backend's database OID, or InvalidOid */
    ProcNumber  nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
    QueuePosition pos;          /* backend has read queue up to here */
    bool        wakeupPending;  /* signal sent to backend, not yet processed */
    bool        isAdvancing;    /* backend is advancing its position */
} QueueBackendStatus;

/*
 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
 *
 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
 * NotifyQueueTailLock.
 *
 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
 * their own entries as well as the head and tail pointers.  Consequently we
 * can allow a backend to update its own record while holding only SHARED lock
 * (since no other backend will inspect it).
 *
 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
 * entries of other backends and also change the head pointer.  They can
 * also advance other backends' queue positions, unless the other backend
 * has isAdvancing set (i.e., is in the process of doing that itself).
 *
 * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
 * mode, backends can change the tail pointers.
 *
 * The SLRU buffer pool is divided into banks, and the per-bank SLRU lock is
 * used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
 * globalChannelTable partition locks.
 *
 * Each backend uses the backend[] array entry with index equal to its
 * ProcNumber.  We rely on this to make SendProcSignal fast.
 *
 * The backend[] array entries for actively-listening backends are threaded
 * together using firstListener and the nextListener links, so that we can
 * scan them without having to iterate over inactive entries.  We keep this
 * list in order by ProcNumber so that the scan is cache-friendly when there
 * are many active entries.
 */
typedef struct AsyncQueueControl
{
    QueuePosition head;         /* head points to the next free location */
    QueuePosition tail;         /* tail must be <= the queue position of every
                                 * listening backend */
    int64       stopPage;       /* oldest unrecycled page; must be <=
                                 * tail.page */
    ProcNumber  firstListener;  /* id of first listener, or
                                 * INVALID_PROC_NUMBER */
    TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
    dsa_handle  globalChannelTableDSA;  /* global channel table's DSA handle */
    dshash_table_handle globalChannelTableDSH;  /* and its dshash handle */
    /* Array with room for MaxBackends entries: */
    QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;

static AsyncQueueControl *asyncQueueControl;

#define QUEUE_HEAD                  (asyncQueueControl->head)
#define QUEUE_TAIL                  (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
#define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
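The firstListener/nextListener threading described in the AsyncQueueControl comment is an index-linked list kept sorted by ProcNumber. Below is a minimal sketch, assuming a fixed array of eight slots and `-1` as a stand-in for INVALID_PROC_NUMBER. All names are hypothetical, every other field is omitted, and so is the locking the real code needs around this update (NotifyQueueLock, per the comment above).

```c
#include <assert.h>

#define SKETCH_INVALID      (-1)    /* stand-in for INVALID_PROC_NUMBER */
#define SKETCH_MAX_BACKENDS 8       /* stand-in for MaxBackends */

typedef struct SketchSlot
{
    int         nextListener;   /* only the link field matters here */
} SketchSlot;

typedef struct SketchControl
{
    int         firstListener;
    SketchSlot  backend[SKETCH_MAX_BACKENDS];
} SketchControl;

void
sketch_init(SketchControl *ctl)
{
    ctl->firstListener = SKETCH_INVALID;
    for (int i = 0; i < SKETCH_MAX_BACKENDS; i++)
        ctl->backend[i].nextListener = SKETCH_INVALID;
}

/* Link slot 'me' into the list, keeping it sorted by slot index. */
void
sketch_register(SketchControl *ctl, int me)
{
    int         prev = SKETCH_INVALID;

    /* Find the last listed slot with an index below ours. */
    for (int i = ctl->firstListener; i != SKETCH_INVALID && i < me;
         i = ctl->backend[i].nextListener)
        prev = i;

    if (prev == SKETCH_INVALID)
    {
        ctl->backend[me].nextListener = ctl->firstListener;
        ctl->firstListener = me;
    }
    else
    {
        ctl->backend[me].nextListener = ctl->backend[prev].nextListener;
        ctl->backend[prev].nextListener = me;
    }
}
```

Walking the list then visits active slots in ascending index order and skips inactive ones. This is the cache-friendly scan the comment describes.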

/*
 * The SLRU buffer area through which we access the notification queue
 */
static SlruCtlData NotifyCtlData;

#define NotifyCtl           (&NotifyCtlData)
#define QUEUE_PAGESIZE      BLCKSZ

#define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */

/*
 * Global channel table definitions
 *
 * This hash table maps (database OID, channel name) keys to arrays of
 * ProcNumbers representing the backends listening or about to listen
 * on each channel.  The "listening" flags allow us to create hash table
 * entries pre-commit and not have to assume that creating them post-commit
 * will succeed.
 */
#define INITIAL_LISTENERS_ARRAY_SIZE 4

/* ... */

typedef struct ListenerEntry
{
    ProcNumber  procNo;         /* listener's ProcNumber */
    bool        listening;      /* true if committed listener */
} ListenerEntry;

typedef struct GlobalChannelEntry
{
    GlobalChannelKey key;       /* hash key */
    dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
    int         numListeners;   /* Number of listeners currently stored */
    int         allocatedListeners; /* Allocated size of array */
} GlobalChannelEntry;
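The `listening` flag in ListenerEntry allows the fallible part of LISTEN, allocating an array slot, to happen before commit, leaving only a flag flip for after commit. The sketch below shows that split for a single channel. It relies on several assumptions: `malloc`/`realloc` stand in for DSA allocation, the doubling growth policy starting at INITIAL_LISTENERS_ARRAY_SIZE is assumed rather than taken from the code, and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#define SKETCH_INITIAL_LISTENERS 4  /* same as INITIAL_LISTENERS_ARRAY_SIZE */

typedef struct SketchListener
{
    int         procNo;
    bool        listening;      /* false until the LISTEN commits */
} SketchListener;

typedef struct SketchChannel
{
    SketchListener *listeners;  /* malloc'd here; DSA memory in PostgreSQL */
    int         numListeners;
    int         allocatedListeners;
} SketchChannel;

/* Pre-commit: reserve a slot with listening = false.  Returns 0 on OOM. */
int
sketch_stage_listener(SketchChannel *ch, int procNo)
{
    if (ch->numListeners == ch->allocatedListeners)
    {
        int         newsize = ch->allocatedListeners == 0 ?
            SKETCH_INITIAL_LISTENERS : ch->allocatedListeners * 2;
        SketchListener *arr = realloc(ch->listeners,
                                      newsize * sizeof(SketchListener));

        if (arr == NULL)
            return 0;           /* still safe: the transaction can abort */
        ch->listeners = arr;
        ch->allocatedListeners = newsize;
    }
    ch->listeners[ch->numListeners].procNo = procNo;
    ch->listeners[ch->numListeners].listening = false;
    ch->numListeners++;
    return 1;
}

/* Post-commit: cannot fail, because the slot already exists. */
void
sketch_commit_listener(SketchChannel *ch, int procNo)
{
    for (int i = 0; i < ch->numListeners; i++)
        if (ch->listeners[i].procNo == procNo)
            ch->listeners[i].listening = true;
}

/* Only committed listeners are wakeup targets. */
int
sketch_count_targets(const SketchChannel *ch)
{
    int         n = 0;

    for (int i = 0; i < ch->numListeners; i++)
        n += ch->listeners[i].listening;
    return n;
}
```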
/* ... */

/*
 * localChannelTable caches the channel names this backend is listening on
 * (including those we have staged to be listened on, but not yet committed).
 * Used by IsListeningOn() for fast lookups when reading notifications.
 */
static HTAB *localChannelTable = NULL;

/* We test this condition to detect that we're not listening at all */
#define LocalChannelTableIsEmpty() \
    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction.  As explained above,
 * we don't actually change listen state until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
/* ... */

typedef struct
{
    ListenActionKind action;
    char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;

typedef struct ActionList
{
    int         nestingLevel;   /* current transaction nesting depth */
    List       *actions;        /* list of ListenAction structs */
    struct ActionList *upper;   /* details for upper transaction levels */
} ActionList;

static ActionList *pendingActions = NULL;

/*
 * Hash table recording the final listen/unlisten intent per channel for
 * the current transaction.  Key is channel name, value is PENDING_LISTEN or
 * PENDING_UNLISTEN.  This keeps critical commit/abort processing to one step
 * per channel instead of replaying every action.  This is built from the
 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
 * AtAbort_Notify.
 */
/* ... */

typedef struct PendingListenEntry
{
    char        channel[NAMEDATALEN];   /* hash key */
    PendingListenAction action; /* which action should we perform? */
} PendingListenEntry;

static HTAB *pendingListenActions = NULL;
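The comment above says the ordered action list is collapsed into one final intent per channel. Below is a minimal sketch of that replay, built on two assumptions: later actions override earlier ones, and UNLISTEN ALL turns every channel already listened on, plus every channel staged earlier in the transaction, into an unlisten. The linear-scan table and all names are hypothetical; the comment indicates the real structure is a hash table keyed by channel name.

```c
#include <assert.h>
#include <string.h>

#define SKETCH_MAX_CHANNELS 16
#define SKETCH_NAMELEN      64

typedef enum { SK_LISTEN, SK_UNLISTEN, SK_UNLISTEN_ALL } SketchActionKind;
typedef enum { SK_PENDING_LISTEN, SK_PENDING_UNLISTEN } SketchIntent;

typedef struct { SketchActionKind kind; const char *channel; } SketchAction;

typedef struct
{
    struct
    {
        char        channel[SKETCH_NAMELEN];
        SketchIntent intent;
    }           e[SKETCH_MAX_CHANNELS];
    int         n;
} SketchIntentTable;

/* Insert or overwrite one channel's intent: the last action wins. */
static void
sketch_set(SketchIntentTable *t, const char *channel, SketchIntent intent)
{
    for (int i = 0; i < t->n; i++)
        if (strcmp(t->e[i].channel, channel) == 0)
        {
            t->e[i].intent = intent;
            return;
        }
    if (t->n < SKETCH_MAX_CHANNELS)     /* sketch: table silently capped */
    {
        strncpy(t->e[t->n].channel, channel, SKETCH_NAMELEN - 1);
        t->e[t->n].channel[SKETCH_NAMELEN - 1] = '\0';
        t->e[t->n].intent = intent;
        t->n++;
    }
}

/* Replay actions in order; 'current' lists channels already listened on. */
void
sketch_compute_intent(SketchIntentTable *t,
                      const char *const *current, int ncurrent,
                      const SketchAction *acts, int nacts)
{
    t->n = 0;
    for (int a = 0; a < nacts; a++)
    {
        if (acts[a].kind == SK_LISTEN)
            sketch_set(t, acts[a].channel, SK_PENDING_LISTEN);
        else if (acts[a].kind == SK_UNLISTEN)
            sketch_set(t, acts[a].channel, SK_PENDING_UNLISTEN);
        else                    /* SK_UNLISTEN_ALL */
        {
            for (int i = 0; i < ncurrent; i++)
                sketch_set(t, current[i], SK_PENDING_UNLISTEN);
            for (int i = 0; i < t->n; i++)
                t->e[i].intent = SK_PENDING_UNLISTEN;
        }
    }
}

/* Final intent for a channel, or -1 if no action touched it. */
int
sketch_intent_of(const SketchIntentTable *t, const char *channel)
{
    for (int i = 0; i < t->n; i++)
        if (strcmp(t->e[i].channel, channel) == 0)
            return (int) t->e[i].intent;
    return -1;
}
```

Keeping the ordered list until PreCommit_Notify is what makes `LISTEN a; UNLISTEN *; LISTEN b` resolve correctly. In that sequence the UNLISTEN ALL cancels the earlier LISTEN but not the later one.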
/*
 * State for outbound notifies consists of a list of all channels+payloads
 * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
 * until and unless the transaction commits.  pendingNotifies is NULL if no
 * NOTIFYs have been done in the current (sub) transaction.
 *
 * We discard duplicate notify events issued in the same transaction.
 * Hence, in addition to the list proper (which we need to track the order
 * of the events, since we guarantee to deliver them in order), we build a
 * hash table which we can probe to detect duplicates.  Since building the
 * hash table is somewhat expensive, we do so only once we have at least
 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
 * before that we just scan the events linearly.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions add their entries to their parent's list.
 * Failed subtransactions simply discard their lists.  Since these lists
 * are independent, there may be notify events in a subtransaction's list
 * that duplicate events in some ancestor (sub) transaction; we get rid of
 * the dups when merging the subtransaction's list into its parent's.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit.  This is a bit odd
 * but is consistent with our historical behavior.
 */
typedef struct Notification
{
    uint16      channel_len;    /* length of channel-name string */
    uint16      payload_len;    /* length of payload string */
    /* null-terminated channel name, then null-terminated payload follow */
    char        data[FLEXIBLE_ARRAY_MEMBER];
} Notification;

typedef struct NotificationList
{
    int         nestingLevel;   /* current transaction nesting depth */
    List       *events;         /* list of Notification structs */
    HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
    List       *uniqueChannelNames; /* unique channel names being notified */
    HTAB       *uniqueChannelHash;  /* hash of unique channel names, or NULL */
    struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;

#define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
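Duplicate detection treats two events as equal when both the channel and the payload match. It keeps the first occurrence, so delivery order is preserved. The sketch below shows the linear scan that applies below MIN_HASHABLE_NOTIFIES; the hash-table path is only indicated by `sketch_uses_hash`. The names and the fixed-capacity array are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define SKETCH_MIN_HASHABLE 16  /* same value as MIN_HASHABLE_NOTIFIES */

typedef struct { const char *channel; const char *payload; } SketchNotify;

/* A duplicate means same channel AND same payload. */
static bool
sketch_same(const SketchNotify *a, const SketchNotify *b)
{
    return strcmp(a->channel, b->channel) == 0 &&
        strcmp(a->payload, b->payload) == 0;
}

/* Append n unless an identical event is already listed (linear scan). */
bool
sketch_add_notify(SketchNotify *list, int *len, int cap, SketchNotify n)
{
    for (int i = 0; i < *len; i++)
        if (sketch_same(&list[i], &n))
            return false;       /* duplicate: dropped */
    if (*len >= cap)
        return false;           /* sketch only: fixed capacity */
    list[(*len)++] = n;
    return true;
}

/* Would a list of this length use a hash probe instead of a scan? */
bool
sketch_uses_hash(int len)
{
    return len >= SKETCH_MIN_HASHABLE;
}
```

The threshold is a trade-off. Scanning a handful of entries costs less than building a hash table, but a scan over thousands of trigger-fired events would be quadratic.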

struct NotificationHash
{
    Notification *event;        /* => the actual Notification struct */
};

static NotificationList *pendingNotifies = NULL;

/*
 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
 * (both just carry the channel name, with no payload).
 */
typedef struct ChannelName
{
    char        channel[NAMEDATALEN];   /* hash key */
} ChannelName;

/*
 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
 * called from inside a signal handler.  That just sets the
 * notifyInterruptPending flag and sets the process latch.
 * ProcessNotifyInterrupt() will then be called whenever it's safe to
 * actually deal with the interrupt.
 */
/* ... */

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;

/*
 * Queue head positions for direct advancement.
 * These are captured during PreCommit_Notify while holding the heavyweight
 * lock on database 0, ensuring no other backend can insert notifications
 * between them.  SignalBackends uses these to advance idle backends.
 */
/* ... */
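Point 8 of the header comment explains how SignalBackends() uses the before/after head positions. The sketch below shows the decision for a single listener, simplified in several ways. The wakeupPending and isAdvancing checks are left out, the lag threshold reuses QUEUE_CLEANUP_DELAY with `>=` (an assumption), and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_CLEANUP_DELAY 4

typedef struct { int64_t page; int offset; } SketchPos;

typedef enum { SK_NOTHING, SK_ADVANCED, SK_SIGNAL } SketchDecision;

static bool
sketch_precedes(SketchPos x, SketchPos y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/*
 * 'before' and 'after' are the queue head before and after our write.
 * No other writer can insert between them, so skipping that range
 * cannot hide anyone else's notifications.
 */
SketchDecision
sketch_decide(SketchPos *pos, bool interested, SketchPos before,
              SketchPos after)
{
    if (interested)
        return SK_SIGNAL;       /* it has something to read */

    /* At the old head or inside our range: jump past our entries. */
    if (!sketch_precedes(*pos, before) && !sketch_precedes(after, *pos))
    {
        *pos = after;
        return SK_ADVANCED;
    }

    /* Otherwise wake it only if it is far behind, so the tail can move. */
    if (after.page - pos->page >= SKETCH_CLEANUP_DELAY)
        return SK_SIGNAL;
    return SK_NOTHING;
}
```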
/*
 * Workspace arrays for SignalBackends.  These are preallocated in
 * PreCommit_Notify to avoid needing memory allocation after committing to
 * clog.
 */
/* ... */

/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameters */
bool        Trace_notify = false;

/* For 8 KB pages this gives 8 GB of disk space */
/* ... */

/* local function prototypes */
static int  asyncQueueErrdetailForIoError(const void *opaque_data);
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
                                        const char *channel);
static dshash_hash globalChannelTableHash(const void *key, size_t size,
                                          void *arg);
static void initGlobalChannelTable(void);
static void initLocalChannelTable(void);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void BecomeRegisteredListener(void);
static void PrepareTableEntriesForListen(const char *channel);
static void PrepareTableEntriesForUnlisten(const char *channel);
static void PrepareTableEntriesForUnlistenAll(void);
/* ... */
                                           int idx);
static void ApplyPendingListenActions(bool isCommit);
static void CleanupListenersOnExit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
/* ... */
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
/* ... */
                                         QueuePosition stop,
                                         Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
/* ... */
static uint32 notification_hash(const void *key, Size keysize);
static int  notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

static int
asyncQueueErrdetailForIoError(const void *opaque_data)
{
    const QueuePosition *position = opaque_data;

    return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
                     position->page, position->offset);
}

/*
 * Compute the difference between two queue page numbers.
 * Previously this function accounted for a wraparound.
 */
static inline int64
asyncQueuePageDiff(int64 p, int64 q)
{
    return p - q;
}

/*
 * Determines whether p precedes q.
 * Previously this function accounted for a wraparound.
 */
static inline bool
asyncQueuePagePrecedes(int64 p, int64 q)
{
    return p < q;
}

/*
 * GlobalChannelKeyInit
 *      Prepare a global channel table key for hashing.
 */
static inline void
GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
{
    memset(key, 0, sizeof(GlobalChannelKey));
    key->dboid = dboid;
    strlcpy(key->channel, channel, NAMEDATALEN);
}

/*
 * globalChannelTableHash
 *      Hash function for global channel table keys.
 */
static dshash_hash
globalChannelTableHash(const void *key, size_t size, void *arg)
{
    const GlobalChannelKey *k = (const GlobalChannelKey *) key;
    dshash_hash h;

    /* ... */
    h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
    /* ... */

    return h;
}
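GlobalChannelKeyInit() zeroes the whole key before filling it because strlcpy() does not pad the rest of the buffer. With every byte deterministic, two keys built from the same inputs are byte-for-byte identical and hash identically. The sketch below demonstrates that property. The lines of globalChannelTableHash() that compute the initial value are not visible in this excerpt, so the mixing here is a stand-in: FNV-1a replaces `hash_any()`, and a multiplicative hash stands in for however the OID is hashed. All names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_NAMEDATALEN 64

typedef struct SketchChannelKey
{
    uint32_t    dboid;                          /* stand-in for Oid */
    char        channel[SKETCH_NAMEDATALEN];
} SketchChannelKey;

/* Zero first, so bytes after the NUL are deterministic. */
void
sketch_key_init(SketchChannelKey *key, uint32_t dboid, const char *channel)
{
    memset(key, 0, sizeof(*key));
    key->dboid = dboid;
    strncpy(key->channel, channel, SKETCH_NAMEDATALEN - 1);
}

/* FNV-1a: a stand-in for PostgreSQL's hash_any(), not the same function. */
static uint32_t
sketch_fnv1a(const char *s)
{
    uint32_t    h = 2166136261u;

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h;
}

/* Combine a hash of the database OID with a hash of the channel name. */
uint32_t
sketch_key_hash(const SketchChannelKey *k)
{
    uint32_t    h = k->dboid * 2654435761u; /* odd multiplier: 1-to-1 */

    h ^= sketch_fnv1a(k->channel);
    return h;
}
```

Including the OID in the key keeps identically named channels in different databases apart, which matches the database-local treatment of notifications described at the top of the file.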

/* parameters for the global channel table */
/* ... */

/*
 * initGlobalChannelTable
 *      Lazy initialization of the global channel table.
 */
static void
initGlobalChannelTable(void)
{
    MemoryContext oldcontext;

    /* Quick exit if we already did this */
    /* ... */
        return;

    /* Otherwise, use a lock to ensure only one process creates the table */
    /* ... */

    /* Be sure any local memory allocated by DSA routines is persistent */
    /* ... */

    {
        /* Initialize dynamic shared hash table for global channels */
        /* ... */
                                            NULL);

        /* Store handles in shared memory for other backends to use */
        /* ... */
    }
    else if (!globalChannelTable)
    {
        /* Attach to existing dynamic shared hash table */
        /* ... */
                                            NULL);
    }

    MemoryContextSwitchTo(oldcontext);
    /* ... */
}

/*
 * initLocalChannelTable
 *      Lazy initialization of the local channel table.
 *      Once created, this table lasts for the life of the session.
 */
static void
initLocalChannelTable(void)
{
    /* ... */

    /* Quick exit if we already did this */
    if (localChannelTable != NULL)
        return;

    /* Initialize local hash table for this backend's listened channels */
    /* ... */
    hash_ctl.entrysize = sizeof(ChannelName);

    /* ... */
        hash_create("Local Listen Channels",
                    64,
                    &hash_ctl,
    /* ... */
}

/*
 * initPendingListenActions
 *      Lazy initialization of the pending listen actions hash table.
 *      This is allocated in CurTransactionContext during PreCommit_Notify,
 *      and destroyed at transaction end.
 */
static void
initPendingListenActions(void)
{
    /* ... */

    /* ... */
        return;

    /* ... */
    hash_ctl.entrysize = sizeof(PendingListenEntry);
    /* ... */

    /* ... */
        hash_create("Pending Listen Actions",
    /* ... */
                    &hash_ctl,
    /* ... */
}

/*
 * Report space needed for our shared memory area
 */
Size
/* ... */
{
    Size        size;

    /* This had better match AsyncShmemInit */
    size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
    size = add_size(size, offsetof(AsyncQueueControl, backend));

    /* ... */

    return size;
}
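The size calculation uses `offsetof(AsyncQueueControl, backend)` rather than `sizeof`. This avoids counting any trailing struct padding, which `sizeof` may include, on top of the flexible array. Below is a standalone sketch with hypothetical types. Unlike PostgreSQL's add_size() and mul_size(), which raise an error when the result would overflow, the plain `size_t` arithmetic here does no overflow checking.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct SketchBackendStatus
{
    int32_t     pid;
    int32_t     nextListener;
    int64_t     page;
    int         offset;
} SketchBackendStatus;

typedef struct SketchControl
{
    int64_t     headPage;
    int64_t     tailPage;
    SketchBackendStatus backend[];  /* one slot per backend */
} SketchControl;

/* Header up to the array, plus one slot per backend (no overflow check). */
size_t
sketch_shmem_size(size_t max_backends)
{
    return offsetof(SketchControl, backend) +
        max_backends * sizeof(SketchBackendStatus);
}
```

Because the same expression is needed both to request the space and to initialize it, the two must stay identical. This is what the "had better match AsyncShmemInit" comment guards.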
798
799/*
800 * Initialize our shared memory area
801 */
802void
804{
805 bool found;
806 Size size;
807
808 /*
809 * Create or attach to the AsyncQueueControl structure.
810 */
811 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
812 size = add_size(size, offsetof(AsyncQueueControl, backend));
813
815 ShmemInitStruct("Async Queue Control", size, &found);
816
817 if (!found)
818 {
819 /* First time through, so initialize it */
822 QUEUE_STOP_PAGE = 0;
827 for (int i = 0; i < MaxBackends; i++)
828 {
835 }
836 }
837
838 /*
839 * Set up SLRU management of the pg_notify data. Note that long segment
840 * names are used in order to avoid wraparound.
841 */
842 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
843 NotifyCtl->errdetail_for_io_error = asyncQueueErrdetailForIoError;
846 SYNC_HANDLER_NONE, true);
847
848 if (!found)
849 {
850 /*
851 * During start or reboot, clean out the pg_notify directory.
852 */
854 }
855}
856
857
858/*
859 * pg_notify -
860 * SQL function to send a notification event
861 */
862Datum
864{
865 const char *channel;
866 const char *payload;
867
868 if (PG_ARGISNULL(0))
869 channel = "";
870 else
872
873 if (PG_ARGISNULL(1))
874 payload = "";
875 else
877
878 /* For NOTIFY as a statement, this is checked in ProcessUtility */
880
881 Async_Notify(channel, payload);
882
884}
885
886
887/*
888 * Async_Notify
889 *
890 * This is executed by the SQL notify command.
891 *
892 * Adds the message to the list of pending notifies.
893 * Actual notification happens during transaction commit.
894 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
895 */
896void
897Async_Notify(const char *channel, const char *payload)
898{
899 int my_level = GetCurrentTransactionNestLevel();
900 size_t channel_len;
901 size_t payload_len;
902 Notification *n;
903 MemoryContext oldcontext;
904
905 if (IsParallelWorker())
906 elog(ERROR, "cannot send notifications from a parallel worker");
907
908 if (Trace_notify)
909 elog(DEBUG1, "Async_Notify(%s)", channel);
910
911 channel_len = channel ? strlen(channel) : 0;
912 payload_len = payload ? strlen(payload) : 0;
913
914 /* a channel name must be specified */
915 if (channel_len == 0)
918 errmsg("channel name cannot be empty")));
919
920 /* enforce length limits */
921 if (channel_len >= NAMEDATALEN)
924 errmsg("channel name too long")));
925
926 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
929 errmsg("payload string too long")));
930
931 /*
932 * We must construct the Notification entry, even if we end up not using
933 * it, in order to compare it cheaply to existing list entries.
934 *
935 * The notification list needs to live until end of transaction, so store
936 * it in the transaction context.
937 */
939
941 channel_len + payload_len + 2);
942 n->channel_len = channel_len;
943 n->payload_len = payload_len;
944 strcpy(n->data, channel);
945 if (payload)
946 strcpy(n->data + channel_len + 1, payload);
947 else
948 n->data[channel_len + 1] = '\0';
949
950 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
951 {
953
954 /*
955 * First notify event in current (sub)xact. Note that we allocate the
956 * NotificationList in TopTransactionContext; the nestingLevel might
957 * get changed later by AtSubCommit_Notify.
958 */
961 sizeof(NotificationList));
962 notifies->nestingLevel = my_level;
963 notifies->events = list_make1(n);
964 /* We certainly don't need a hashtable yet */
965 notifies->hashtab = NULL;
966 /* We won't build uniqueChannelNames/Hash till later, either */
967 notifies->uniqueChannelNames = NIL;
968 notifies->uniqueChannelHash = NULL;
969 notifies->upper = pendingNotifies;
971 }
972 else
973 {
974 /* Now check for duplicates */
976 {
977 /* It's a dup, so forget it */
978 pfree(n);
979 MemoryContextSwitchTo(oldcontext);
980 return;
981 }
982
983 /* Append more events to existing list */
985 }
986
987 MemoryContextSwitchTo(oldcontext);
988}
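The packed layout built above can be sketched in plain C. This is a minimal, hypothetical analog: `SketchNotification`, `sketch_make_notification` and `sketch_notification_equal` are not PostgreSQL names. It makes several simplifying assumptions:

- It uses `malloc` instead of allocating in `CurTransactionContext`.
- It assumes a default build where `NAMEDATALEN` is 64 and `NOTIFY_PAYLOAD_MAX_LENGTH` is 8000.
- It returns NULL where the real code raises an error.

Because `pg_notify` maps a NULL argument to an empty string, a NULL channel ends up rejected by the same empty-name check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_NAMEDATALEN 64     /* assumed default NAMEDATALEN */
#define SKETCH_PAYLOAD_MAX 8000   /* assumed default NOTIFY_PAYLOAD_MAX_LENGTH */

typedef struct SketchNotification
{
    size_t channel_len;
    size_t payload_len;
    char   data[];          /* "channel\0payload\0" packed together */
} SketchNotification;

/* Build a packed entry; NULL means "would have raised an error" (or OOM). */
static SketchNotification *
sketch_make_notification(const char *channel, const char *payload)
{
    size_t channel_len = channel ? strlen(channel) : 0;
    size_t payload_len = payload ? strlen(payload) : 0;
    SketchNotification *n;

    if (channel_len == 0 ||                    /* channel name cannot be empty */
        channel_len >= SKETCH_NAMEDATALEN ||   /* channel name too long */
        payload_len >= SKETCH_PAYLOAD_MAX)     /* payload string too long */
        return NULL;

    n = malloc(offsetof(SketchNotification, data) + channel_len + payload_len + 2);
    if (n == NULL)
        return NULL;
    n->channel_len = channel_len;
    n->payload_len = payload_len;
    memcpy(n->data, channel, channel_len + 1);  /* copies the terminator too */
    if (payload)
        memcpy(n->data + channel_len + 1, payload, payload_len + 1);
    else
        n->data[channel_len + 1] = '\0';
    return n;
}

/* Cheap duplicate test: compare lengths first, then the packed bytes. */
static bool
sketch_notification_equal(const SketchNotification *a,
                          const SketchNotification *b)
{
    return a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data, a->channel_len + a->payload_len + 2) == 0;
}
```

Comparing the two stored lengths first can reject many non-duplicates without touching the strings. That is the payoff of building the entry before the duplicate check, even when the entry is then discarded.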
989
990/*
991 * queue_listen
992 * Common code for listen, unlisten, unlisten all commands.
993 *
994 * Adds the request to the list of pending actions.
995 * Actual update of localChannelTable and globalChannelTable happens during
996 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
997 */
998static void
999queue_listen(ListenActionKind action, const char *channel)
1000{
1001 MemoryContext oldcontext;
1003 int my_level = GetCurrentTransactionNestLevel();
1004
1005 /*
1006 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1007 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1008 * final per-channel intent is computed during PreCommit_Notify.
1009 */
1011
1012 /* space for terminating null is included in sizeof(ListenAction) */
1014 strlen(channel) + 1);
1015 actrec->action = action;
1016 strcpy(actrec->channel, channel);
1017
1018 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1019 {
1020 ActionList *actions;
1021
1022 /*
1023 * First action in current (sub)xact. Note that we allocate the
1024 * ActionList in TopTransactionContext; the nestingLevel might get
1025 * changed later by AtSubCommit_Notify.
1026 */
1027 actions = (ActionList *)
1029 actions->nestingLevel = my_level;
1030 actions->actions = list_make1(actrec);
1031 actions->upper = pendingActions;
1032 pendingActions = actions;
1033 }
1034 else
1036
1037 MemoryContextSwitchTo(oldcontext);
1038}
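The per-(sub)transaction bookkeeping above is a stack of lists keyed by nesting level:

- A deeper level pushes a new list whose `upper` link points at the enclosing one.
- The same level appends to the current list.

A minimal single-process sketch follows. The `Sketch*` types and `sketch_*` functions are hypothetical. It uses `calloc` and a hand-rolled linked list rather than `List` and `TopTransactionContext`, and it does not model what `AtSubCommit_Notify` does later.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef enum { SK_LISTEN, SK_UNLISTEN, SK_UNLISTEN_ALL } SketchActionKind;

typedef struct SketchAction
{
    SketchActionKind action;
    char channel[64];              /* assumed to fit, like NAMEDATALEN */
    struct SketchAction *next;
} SketchAction;

typedef struct SketchActionList
{
    int nestingLevel;
    SketchAction *head;
    SketchAction *tail;
    struct SketchActionList *upper;   /* list of the enclosing (sub)xact */
} SketchActionList;

static SketchActionList *sketchPending = NULL;

static void
sketch_queue_listen(SketchActionKind action, const char *channel, int my_level)
{
    SketchAction *rec = calloc(1, sizeof(SketchAction));

    if (rec == NULL)
        abort();
    rec->action = action;
    /* calloc zeroed the buffer, so the last byte stays a terminator */
    strncpy(rec->channel, channel, sizeof(rec->channel) - 1);

    if (sketchPending == NULL || my_level > sketchPending->nestingLevel)
    {
        /* first action at this nesting level: push a new list */
        SketchActionList *l = calloc(1, sizeof(SketchActionList));

        if (l == NULL)
            abort();
        l->nestingLevel = my_level;
        l->head = l->tail = rec;
        l->upper = sketchPending;
        sketchPending = l;
    }
    else
    {
        /* same level: keep every action, in order (no de-duplication) */
        sketchPending->tail->next = rec;
        sketchPending->tail = rec;
    }
}
```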
1039
1040/*
1041 * Async_Listen
1042 *
1043 * This is executed by the SQL listen command.
1044 */
1045void
1046Async_Listen(const char *channel)
1047{
1048 if (Trace_notify)
1049 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1050
1051 queue_listen(LISTEN_LISTEN, channel);
1052}
1053
1054/*
1055 * Async_Unlisten
1056 *
1057 * This is executed by the SQL unlisten command.
1058 */
1059void
1060Async_Unlisten(const char *channel)
1061{
1062 if (Trace_notify)
1063 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1064
1065 /* If we couldn't possibly be listening, no need to queue anything */
1067 return;
1068
1069 queue_listen(LISTEN_UNLISTEN, channel);
1070}
1071
1072/*
1073 * Async_UnlistenAll
1074 *
1075 * This is invoked by UNLISTEN * command, and also at backend exit.
1076 */
1077void
1079{
1080 if (Trace_notify)
1081 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1082
1083 /* If we couldn't possibly be listening, no need to queue anything */
1085 return;
1086
1088}
1089
1090/*
1091 * SQL function: return a set of the channel names this backend is actively
1092 * listening to.
1093 *
1094 * Note: this coding relies on the fact that the localChannelTable cannot
1095 * change within a transaction.
1096 */
1097Datum
1099{
1101 HASH_SEQ_STATUS *status;
1102
1103 /* stuff done only on the first call of the function */
1104 if (SRF_IS_FIRSTCALL())
1105 {
1106 /* create a function context for cross-call persistence */
1108
1109 /* Initialize hash table iteration if we have any channels */
1110 if (localChannelTable != NULL)
1111 {
1112 MemoryContext oldcontext;
1113
1114 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1115 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1117 funcctx->user_fctx = status;
1118 MemoryContextSwitchTo(oldcontext);
1119 }
1120 else
1121 {
1122 funcctx->user_fctx = NULL;
1123 }
1124 }
1125
1126 /* stuff done on every call of the function */
1128 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1129
1130 if (status != NULL)
1131 {
1132 ChannelName *entry;
1133
1134 entry = (ChannelName *) hash_seq_search(status);
1135 if (entry != NULL)
1137 }
1138
1140}
1141
1142/*
1143 * Async_UnlistenOnExit
1144 *
1145 * This is executed at backend exit if we have done any LISTENs in this
1146 * backend. It might not be necessary anymore, if the user UNLISTENed
1147 * everything, but we don't try to detect that case.
1148 */
1149static void
1155
1156/*
1157 * AtPrepare_Notify
1158 *
1159 * This is called at the prepare phase of a two-phase
1160 * transaction. Save the state for possible commit later.
1161 */
1162void
1164{
1165 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1167 ereport(ERROR,
1169 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1170}
1171
1172/*
1173 * PreCommit_Notify
1174 *
1175 * This is called at transaction commit, before actually committing to
1176 * clog.
1177 *
1178 * If there are pending LISTEN actions, make sure we are listed in the
1179 * shared-memory listener array. This must happen before commit to
1180 * ensure we don't miss any notifies from transactions that commit
1181 * just after ours.
1182 *
1183 * If there are outbound notify requests in the pendingNotifies list,
1184 * add them to the global queue. We do that before commit so that
1185 * we can still throw error if we run out of queue space.
1186 */
1187void
1189{
1190 ListCell *p;
1191
1193 return; /* no relevant statements in this xact */
1194
1195 if (Trace_notify)
1196 elog(DEBUG1, "PreCommit_Notify");
1197
1198 /* Preflight for any pending listen/unlisten actions */
1200
1201 if (pendingActions != NULL)
1202 {
1203 /* Ensure we have a local channel table */
1205 /* Create pendingListenActions hash table for this transaction */
1207
1208 /* Stage all the actions this transaction wants to perform */
1209 foreach(p, pendingActions->actions)
1210 {
1212
1213 switch (actrec->action)
1214 {
1215 case LISTEN_LISTEN:
1218 break;
1219 case LISTEN_UNLISTEN:
1221 break;
1224 break;
1225 }
1226 }
1227 }
1228
1229 /* Queue any pending notifies (must happen after the above) */
1230 if (pendingNotifies)
1231 {
1233 bool firstIteration = true;
1234
1235 /*
1236 * Build list of unique channel names being notified for use by
1237 * SignalBackends().
1238 *
1239 * If uniqueChannelHash is available, use it to efficiently get the
1240 * unique channels. Otherwise, fall back to the O(N^2) approach.
1241 */
1244 {
1245 HASH_SEQ_STATUS status;
1247
1249 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1252 channelEntry->channel);
1253 }
1254 else
1255 {
1256 /* O(N^2) approach is better for small number of notifications */
1258 {
1259 char *channel = n->data;
1260 bool found = false;
1261
1262 /* Name present in list? */
1264 {
1265 if (strcmp(oldchan, channel) == 0)
1266 {
1267 found = true;
1268 break;
1269 }
1270 }
1271 /* Add if not already in list */
1272 if (!found)
1275 channel);
1276 }
1277 }
1278
1279 /* Preallocate workspace that will be needed by SignalBackends() */
1280 if (signalPids == NULL)
1282 MaxBackends * sizeof(int32));
1283
1284 if (signalProcnos == NULL)
1286 MaxBackends * sizeof(ProcNumber));
1287
1288 /*
1289 * Make sure that we have an XID assigned to the current transaction.
1290 * GetCurrentTransactionId is cheap if we already have an XID, but not
1291 * so cheap if we don't, and we'd prefer not to do that work while
1292 * holding NotifyQueueLock.
1293 */
1295
1296 /*
1297 * Serialize writers by acquiring a special lock that we hold till
1298 * after commit. This ensures that queue entries appear in commit
1299 * order, and in particular that there are never uncommitted queue
1300 * entries ahead of committed ones, so an uncommitted transaction
1301 * can't block delivery of deliverable notifications.
1302 *
1303 * We use a heavyweight lock so that it'll automatically be released
1304 * after either commit or abort. This also allows deadlocks to be
1305 * detected, though really a deadlock shouldn't be possible here.
1306 *
1307 * The lock is on "database 0", which is pretty ugly but it doesn't
1308 * seem worth inventing a special locktag category just for this.
1309 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1310 * used by the flatfiles mechanism.)
1311 */
1314
1315 /*
1316 * For the direct advancement optimization in SignalBackends(), we
1317 * need to ensure that no other backend can insert queue entries
1318 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1319 * heavyweight lock above provides this guarantee, since it serializes
1320 * all writers.
1321 *
1322 * Note: if the heavyweight lock were ever removed for scalability
1323 * reasons, we could achieve the same guarantee by holding
1324 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1325 * than releasing and reacquiring it for each page as we do below.
1326 */
1327
1328 /* Initialize values to a safe default in case list is empty */
1331
1332 /* Now push the notifications into the queue */
1334 while (nextNotify != NULL)
1335 {
1336 /*
1337 * Add the pending notifications to the queue. We acquire and
1338 * release NotifyQueueLock once per page, which might be overkill
1339 * but it does allow readers to get in while we're doing this.
1340 *
1341 * A full queue is very uncommon and should really not happen,
1342 * given that we have so much space available in the SLRU pages.
1343 * Nevertheless we need to deal with this possibility. Note that
1344 * when we get here we are in the process of committing our
1345 * transaction, but we have not yet committed to clog, so at this
1346 * point in time we can still roll the transaction back.
1347 */
1349 if (firstIteration)
1350 {
1352 firstIteration = false;
1353 }
1355 if (asyncQueueIsFull())
1356 ereport(ERROR,
1358 errmsg("too many notifications in the NOTIFY queue")));
1362 }
1363
1364 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1365 }
1366}
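The O(N^2) fallback used above to build the list of distinct channel names amounts to a nested scan. A minimal sketch over plain C strings follows. The function name is hypothetical, and PostgreSQL appends to a `List` instead of filling an array. The sketch keeps first-seen order, whereas the hash-table path yields names in hash iteration order.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/*
 * Copy the distinct names from channels[0..n-1] into out[], preserving the
 * order of first appearance; out must have room for n pointers.
 * Returns the number of distinct names.
 */
static int
sketch_unique_channels(const char *const *channels, int n, const char **out)
{
    int nunique = 0;

    for (int i = 0; i < n; i++)
    {
        bool found = false;

        /* Name present in list? */
        for (int j = 0; j < nunique; j++)
        {
            if (strcmp(out[j], channels[i]) == 0)
            {
                found = true;
                break;
            }
        }
        /* Add if not already in list */
        if (!found)
            out[nunique++] = channels[i];
    }
    return nunique;
}
```

For a handful of notifications this loop avoids the cost of building a hash table. Its cost grows quadratically with the number of notifications, which is why a hashed path exists for larger batches.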
1367
1368/*
1369 * AtCommit_Notify
1370 *
1371 * This is called at transaction commit, after committing to clog.
1372 *
1373 * Apply pending listen/unlisten changes and clear transaction-local state.
1374 *
1375 * If we issued any notifications in the transaction, send signals to
1376 * listening backends (possibly including ourselves) to process them.
1377 * Also, if we filled enough queue pages with new notifies, try to
1378 * advance the queue tail pointer.
1379 */
1380void
1382{
1383 /*
1384 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1385 * return as soon as possible
1386 */
1388 return;
1389
1390 if (Trace_notify)
1391 elog(DEBUG1, "AtCommit_Notify");
1392
1393 /* Apply staged listen/unlisten changes */
1395
1396 /* If no longer listening to anything, get out of listener array */
1399
1400 /*
1401 * Send signals to listening backends. We need to do this only if there are
1402 * pending notifies, which were previously added to the shared queue by
1403 * PreCommit_Notify().
1404 */
1405 if (pendingNotifies != NULL)
1407
1408 /*
1409 * If it's time to try to advance the global tail pointer, do that.
1410 *
1411 * (It might seem odd to do this in the sender, when more than likely the
1412 * listeners won't yet have read the messages we just sent. However,
1413 * there's less contention if only the sender does it, and there is little
1414 * need for urgency in advancing the global tail. So this typically will
1415 * be clearing out messages that were sent some time ago.)
1416 */
1417 if (tryAdvanceTail)
1418 {
1419 tryAdvanceTail = false;
1421 }
1422
1423 /* And clean up */
1425}
1426
1427/*
1428 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1429 *
1430 * This function must make sure we are ready to catch any incoming messages.
1431 */
1432static void
1434{
1435 QueuePosition head;
1436 QueuePosition max;
1438
1439 /*
1440 * Nothing to do if we are already listening to something, nor if we
1441 * already ran this routine in this transaction.
1442 */
1444 return;
1445
1446 if (Trace_notify)
1447 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1448
1449 /*
1450 * Before registering, make sure we will unlisten before dying. (Note:
1451 * this action does not get undone if we abort later.)
1452 */
1454 {
1457 }
1458
1459 /*
1460 * This is our first LISTEN, so establish our pointer.
1461 *
1462 * We set our pointer to the global tail pointer and then move it forward
1463 * over already-committed notifications. This ensures we cannot miss any
1464 * not-yet-committed notifications. We might get a few more but that
1465 * doesn't hurt.
1466 *
1467 * In some scenarios there might be a lot of committed notifications that
1468 * have not yet been pruned away (because some backend is being lazy about
1469 * reading them). To reduce our startup time, we can look at other
1470 * backends and adopt the maximum "pos" pointer of any backend that's in
1471 * our database; any notifications it's already advanced over are surely
1472 * committed and need not be re-examined by us. (We must consider only
1473 * backends connected to our DB, because others will not have bothered to
1474 * check committed-ness of notifications in our DB.)
1475 *
1476 * We need exclusive lock here so we can look at other backends' entries
1477 * and manipulate the list links.
1478 */
1480 head = QUEUE_HEAD;
1481 max = QUEUE_TAIL;
1484 {
1486 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1487 /* Also find last listening backend before this one */
1488 if (i < MyProcNumber)
1489 prevListener = i;
1490 }
1496 /* Insert backend into list of listeners at correct position */
1498 {
1501 }
1502 else
1503 {
1506 }
1508
1509 /* Now we are listed in the global array, so remember we're listening */
1510 amRegisteredListener = true;
1511
1512 /*
1513 * Try to move our pointer forward as far as possible. This will skip
1514 * over already-committed notifications, which we want to do because they
1515 * might be quite stale. Note that we are not yet listening on anything,
1516 * so we won't deliver such notifications to our frontend. Also, although
1517 * our transaction might have executed NOTIFY, those message(s) aren't
1518 * queued yet so we won't skip them here.
1519 */
1520 if (!QUEUE_POS_EQUAL(max, head))
1522}
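The registration step can be pictured with a plain array standing in for the shared listener slots. This is a single-threaded sketch: all `Sketch*` and `sketch_*` names are hypothetical, `NotifyQueueLock` is omitted, and positions are treated as (page, offset) pairs ordered page-first. The new listener does three things:

1. It starts from the tail.
2. It adopts the furthest position of any listener in the same database.
3. It is spliced into the listener chain so the chain stays ordered by slot number.

```c
#include <assert.h>
#include <stdint.h>

#define SK_MAX_BACKENDS 8
#define SK_INVALID (-1)

typedef struct { int64_t page; int offset; } SketchPos;

typedef struct
{
    int       pid;
    unsigned  dboid;
    SketchPos pos;      /* how far this backend has read */
    int       next;     /* next listener slot, in slot-number order */
} SketchBackend;

static SketchBackend sketchBackends[SK_MAX_BACKENDS];
static int sketchFirstListener = SK_INVALID;

/* Later of two positions: compare pages first, then offsets. */
static SketchPos
sketch_pos_max(SketchPos a, SketchPos b)
{
    if (a.page != b.page)
        return a.page > b.page ? a : b;
    return a.offset >= b.offset ? a : b;
}

/* Register slot `me`; returns the starting read position it adopted. */
static SketchPos
sketch_register(int me, int pid, unsigned dboid, SketchPos tail)
{
    SketchPos max = tail;
    int prev = SK_INVALID;

    for (int i = sketchFirstListener; i != SK_INVALID; i = sketchBackends[i].next)
    {
        /* only same-database listeners have checked our DB's entries */
        if (sketchBackends[i].dboid == dboid)
            max = sketch_pos_max(max, sketchBackends[i].pos);
        if (i < me)
            prev = i;           /* last listener before us in slot order */
    }
    sketchBackends[me].pid = pid;
    sketchBackends[me].dboid = dboid;
    sketchBackends[me].pos = max;
    if (prev != SK_INVALID)
    {
        sketchBackends[me].next = sketchBackends[prev].next;
        sketchBackends[prev].next = me;
    }
    else
    {
        sketchBackends[me].next = sketchFirstListener;
        sketchFirstListener = me;
    }
    return max;
}
```

The real function then reads forward from that position if it differs from the head, skipping already-committed messages. The sketch stops after choosing the position.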
1523
1524/*
1525 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1526 *
1527 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1528 * an entry in localChannelTable, and pre-allocating an entry in the shared
1529 * globalChannelTable with listening=false. The listening flag will be set
1530 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1531 * will be removed.
1532 */
1533static void
1535{
1536 GlobalChannelKey key;
1537 GlobalChannelEntry *entry;
1538 bool found;
1540 PendingListenEntry *pending;
1541
1542 /*
1543 * Record in local pending hash that we want to LISTEN, overwriting any
1544 * earlier attempt to UNLISTEN.
1545 */
1546 pending = (PendingListenEntry *)
1548 pending->action = PENDING_LISTEN;
1549
1550 /*
1551 * Ensure that there is an entry for the channel in localChannelTable.
1552 * (Should this fail, we can just roll back.) If the transaction fails
1553 * after this point, we will remove the entry if appropriate during
1554 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1555 * to return TRUE; we assume nothing is going to consult that before
1556 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1557 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1558 * present to ensure they do the right things; see
1559 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1560 */
1562
1563 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1564 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1565 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1566
1567 if (!found)
1568 {
1569 /* New channel entry, so initialize it to a safe state */
1571 entry->numListeners = 0;
1572 entry->allocatedListeners = 0;
1573 }
1574
1575 /*
1576 * Create listenersArray if entry doesn't have one. It's tempting to fold
1577 * this into the !found case, but this coding allows us to cope in case
1578 * dsa_allocate() failed in an earlier attempt.
1579 */
1580 if (!DsaPointerIsValid(entry->listenersArray))
1581 {
1585 }
1586
1589
1590 /*
1591 * Check if we already have a ListenerEntry (possibly from earlier in this
1592 * transaction)
1593 */
1594 for (int i = 0; i < entry->numListeners; i++)
1595 {
1596 if (listeners[i].procNo == MyProcNumber)
1597 {
1598 /* Already have an entry; listening flag stays as-is until commit */
1600 return;
1601 }
1602 }
1603
1604 /* Need to add a new entry; grow array if necessary */
1605 if (entry->numListeners >= entry->allocatedListeners)
1606 {
1607 int new_size = entry->allocatedListeners * 2;
1610 sizeof(ListenerEntry) * new_size);
1612
1614 entry->listenersArray = new_array;
1618 }
1619
1620 listeners[entry->numListeners].procNo = MyProcNumber;
1621 listeners[entry->numListeners].listening = false; /* staged, not yet
1622 * committed */
1623 entry->numListeners++;
1624
1626}
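Staging a LISTEN in a channel's listener array is an idempotent append to a growable array that doubles when full. The sketch below is hypothetical:

- It uses `malloc`/`realloc` in place of DSA memory.
- The `Sketch*` names are made up.
- The initial capacity of 4 is an assumed value, because the listing elides the real initial allocation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct { int procNo; bool listening; } SketchListener;

typedef struct
{
    SketchListener *listeners;
    int numListeners;
    int allocatedListeners;
} SketchChannel;

#define SK_INITIAL_LISTENERS 4   /* hypothetical initial capacity */

/* Stage a LISTEN for procNo; returns false only on allocation failure. */
static bool
sketch_stage_listen(SketchChannel *ch, int procNo)
{
    if (ch->listeners == NULL)
    {
        ch->listeners = malloc(sizeof(SketchListener) * SK_INITIAL_LISTENERS);
        if (ch->listeners == NULL)
            return false;
        ch->allocatedListeners = SK_INITIAL_LISTENERS;
    }

    /* Already present (e.g. earlier in this transaction)? Leave it alone. */
    for (int i = 0; i < ch->numListeners; i++)
        if (ch->listeners[i].procNo == procNo)
            return true;

    /* Grow by doubling when the array is full */
    if (ch->numListeners >= ch->allocatedListeners)
    {
        int new_size = ch->allocatedListeners * 2;
        SketchListener *grown = realloc(ch->listeners,
                                        sizeof(SketchListener) * new_size);

        if (grown == NULL)
            return false;
        ch->listeners = grown;
        ch->allocatedListeners = new_size;
    }

    ch->listeners[ch->numListeners].procNo = procNo;
    ch->listeners[ch->numListeners].listening = false;  /* staged only */
    ch->numListeners++;
    return true;
}
```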
1627
1628/*
1629 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1630 *
1631 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1632 * we're currently listening (committed or staged). We don't touch
1633 * globalChannelTable yet - the listener keeps receiving signals until
1634 * commit, when the entry is removed.
1635 */
1636static void
1638{
1639 PendingListenEntry *pending;
1640
1641 /*
1642 * If the channel name is not in localChannelTable, then we are neither
1643 * listening on it nor preparing to listen on it, so we don't need to
1644 * record an UNLISTEN action.
1645 */
1647 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1648 return;
1649
1650 /*
1651 * Record in local pending hash that we want to UNLISTEN, overwriting any
1652 * earlier attempt to LISTEN. Don't touch localChannelTable or
1653 * globalChannelTable yet - we keep receiving signals until commit.
1654 */
1655 pending = (PendingListenEntry *)
1657 pending->action = PENDING_UNLISTEN;
1658}
1659
1660/*
1661 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1662 *
1663 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1664 * about-to-be-listened channels in pendingListenActions.
1665 */
1666static void
1668{
1671 PendingListenEntry *pending;
1672
1673 /*
1674 * Scan localChannelTable, which will have the names of all channels that
1675 * we are listening on or have prepared to listen on. Record an UNLISTEN
1676 * action for each one, overwriting any earlier attempt to LISTEN.
1677 */
1679 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1680 {
1681 pending = (PendingListenEntry *)
1683 pending->action = PENDING_UNLISTEN;
1684 }
1685}
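The staging rules for LISTEN, UNLISTEN, and UNLISTEN * interact through "last action wins per channel". The sketch below shows how an ordered action list collapses to a final per-channel intent. It models `localChannelTable` and `pendingListenActions` as small fixed arrays. The `Sketch*`/`sketch_*` names, the array sizes, and the name-length limit are all hypothetical.

```c
#include <assert.h>
#include <string.h>

#define SK_MAX 16
#define SK_NAMELEN 64

typedef enum { SK_PENDING_LISTEN, SK_PENDING_UNLISTEN } SketchPending;

typedef struct
{
    char local[SK_MAX][SK_NAMELEN];     /* stands in for localChannelTable */
    int nlocal;
    char pendName[SK_MAX][SK_NAMELEN];  /* stands in for pendingListenActions */
    SketchPending pendAction[SK_MAX];
    int npending;
} SketchState;

static int
sketch_index(char names[][SK_NAMELEN], int n, const char *name)
{
    for (int i = 0; i < n; i++)
        if (strcmp(names[i], name) == 0)
            return i;
    return -1;
}

/* Insert or overwrite the pending action for a channel */
static void
sketch_set_pending(SketchState *s, const char *name, SketchPending action)
{
    int i = sketch_index(s->pendName, s->npending, name);

    if (i < 0)
    {
        assert(s->npending < SK_MAX && strlen(name) < SK_NAMELEN);
        i = s->npending++;
        strcpy(s->pendName[i], name);
    }
    s->pendAction[i] = action;
}

static void
sketch_listen(SketchState *s, const char *name)
{
    sketch_set_pending(s, name, SK_PENDING_LISTEN);
    if (sketch_index(s->local, s->nlocal, name) < 0)
    {
        assert(s->nlocal < SK_MAX && strlen(name) < SK_NAMELEN);
        strcpy(s->local[s->nlocal++], name);
    }
}

/* Only record UNLISTEN if we are (or are about to be) listening */
static void
sketch_unlisten(SketchState *s, const char *name)
{
    if (sketch_index(s->local, s->nlocal, name) >= 0)
        sketch_set_pending(s, name, SK_PENDING_UNLISTEN);
}

static void
sketch_unlisten_all(SketchState *s)
{
    for (int i = 0; i < s->nlocal; i++)
        sketch_set_pending(s, s->local[i], SK_PENDING_UNLISTEN);
}
```

For example, with "old" already listened on, the sequence LISTEN a; UNLISTEN *; LISTEN b; UNLISTEN zzz leaves "old" and "a" marked for UNLISTEN and "b" for LISTEN. Nothing is recorded for "zzz".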
1686
1687/*
1688 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1689 *
1690 * Decrements numListeners, compacts the array, and frees the entry if empty.
1691 * Sets *entry_ptr to NULL if the entry was deleted.
1692 *
1693 * We could get the listeners pointer from the entry, but all callers
1694 * already have it at hand.
1695 */
1696static void
1699 int idx)
1700{
1701 GlobalChannelEntry *entry = *entry_ptr;
1702
1703 entry->numListeners--;
1704 if (idx < entry->numListeners)
1706 sizeof(ListenerEntry) * (entry->numListeners - idx));
1707
1708 if (entry->numListeners == 0)
1709 {
1712 /* tells caller not to release the entry's lock: */
1713 *entry_ptr = NULL;
1714 }
1715}
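Removing the idx'th listener keeps the array dense and in order by shifting the tail down one slot with `memmove`. The source and destination overlap, so `memcpy` would not be safe here. Below is a hypothetical stand-alone version that reports emptiness to its caller instead of deleting a dshash entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef struct { int procNo; bool listening; } SketchListener;

/* Remove element idx, preserving order; returns true if now empty. */
static bool
sketch_remove_listener(SketchListener *listeners, int *numListeners, int idx)
{
    (*numListeners)--;
    if (idx < *numListeners)
        memmove(&listeners[idx], &listeners[idx + 1],
                sizeof(SketchListener) * (*numListeners - idx));
    return *numListeners == 0;
}
```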
1716
1717/*
1718 * ApplyPendingListenActions
1719 *
1720 * Apply, or revert, staged listen/unlisten changes to the local and global
1721 * hash tables.
1722 */
1723static void
1725{
1727 PendingListenEntry *pending;
1728
1729 /* Quick exit if nothing to do */
1731 return;
1732
1733 /* We made a globalChannelTable before building pendingListenActions */
1734 if (globalChannelTable == NULL)
1735 elog(PANIC, "global channel table missing post-commit/abort");
1736
1737 /* For each staged action ... */
1739 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1740 {
1741 GlobalChannelKey key;
1742 GlobalChannelEntry *entry;
1743 bool removeLocal = true;
1744 bool foundListener = false;
1745
1746 /*
1747 * Find the global entry for this channel. If isCommit, it had better
1748 * exist (it was created in PreCommit). In an abort, it might not
1749 * exist, in which case we are not listening and should discard any
1750 * local entry that PreCommit may have managed to create.
1751 */
1752 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1753 entry = dshash_find(globalChannelTable, &key, true);
1754 if (entry != NULL)
1755 {
1756 /* Scan entry to find the ListenerEntry for this backend */
1758
1761
1762 for (int i = 0; i < entry->numListeners; i++)
1763 {
1764 if (listeners[i].procNo != MyProcNumber)
1765 continue;
1766 foundListener = true;
1767 if (isCommit)
1768 {
1769 if (pending->action == PENDING_LISTEN)
1770 {
1771 /*
1772 * LISTEN being committed: set listening=true.
1773 * localChannelTable entry was created during
1774 * PreCommit and should be kept.
1775 */
1776 listeners[i].listening = true;
1777 removeLocal = false;
1778 }
1779 else
1780 {
1781 /*
1782 * UNLISTEN being committed: remove pre-allocated
1783 * entries from both tables.
1784 */
1786 }
1787 }
1788 else
1789 {
1790 /*
1791 * Note: this part is reachable only if the transaction
1792 * aborts after PreCommit_Notify() has made some
1793 * pendingListenActions entries, so it's pretty hard to
1794 * test.
1795 */
1796 if (!listeners[i].listening)
1797 {
1798 /*
1799 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1800 * and we weren't listening before, so remove
1801 * pre-allocated entries from both tables.
1802 */
1804 }
1805 else
1806 {
1807 /*
1808 * We're aborting, but the previous state was that
1809 * we're listening, so keep localChannelTable entry.
1810 */
1811 removeLocal = false;
1812 }
1813 }
1814 break; /* there shouldn't be another match */
1815 }
1816
1817 /* We might have already released the entry by removing it */
1818 if (entry != NULL)
1820 }
1821
1822 /*
1823 * If we're committing a LISTEN action, we should have found a
1824 * matching ListenerEntry, but otherwise it's okay if we didn't.
1825 */
1826 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1827 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1828 pending->channel, MyProcNumber);
1829
1830 /*
1831 * If we did not find a globalChannelTable entry for our backend, or
1832 * if we are unlistening, remove any localChannelTable entry that may
1833 * exist. (Note in particular that this cleans up if we created a
1834 * localChannelTable entry and then failed while trying to create a
1835 * globalChannelTable entry.)
1836 */
1839 HASH_REMOVE, NULL);
1840 }
1841}
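The commit and abort cases in the loop above reduce to a small decision table over four inputs. The hypothetical pure function below (not part of PostgreSQL) restates them so that each combination can be checked. It leaves out the PANIC raised when a committed LISTEN finds no listener entry.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { SK_PENDING_LISTEN, SK_PENDING_UNLISTEN } SketchPending;

typedef struct
{
    bool setListening;     /* flip listening=true in our ListenerEntry */
    bool removeGlobal;     /* drop our ListenerEntry from the channel */
    bool removeLocal;      /* drop the localChannelTable entry */
} SketchOutcome;

static SketchOutcome
sketch_apply(bool isCommit, SketchPending action,
             bool foundListener, bool wasListening)
{
    SketchOutcome o = {false, false, true};

    if (!foundListener)
        return o;                     /* no global entry: just clean up local */

    if (isCommit)
    {
        if (action == SK_PENDING_LISTEN)
        {
            o.setListening = true;    /* LISTEN committed: keep local entry */
            o.removeLocal = false;
        }
        else
            o.removeGlobal = true;    /* UNLISTEN committed: drop both */
    }
    else if (!wasListening)
        o.removeGlobal = true;        /* abort of a staged LISTEN: drop both */
    else
        o.removeLocal = false;        /* abort, previously listening: keep */
    return o;
}
```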
1842
1843/*
1844 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1845 *
1846 * Remove this backend from all channels in the shared global table.
1847 */
1848static void
1850{
1851 dshash_seq_status status;
1852 GlobalChannelEntry *entry;
1853
1854 if (Trace_notify)
1855 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1856
1857 /* Clear our local cache (not really necessary, but be consistent) */
1858 if (localChannelTable != NULL)
1859 {
1862 }
1863
1864 /* Now remove our entries from the shared globalChannelTable */
1865 if (globalChannelTable == NULL)
1866 return;
1867
1868 dshash_seq_init(&status, globalChannelTable, true);
1869 while ((entry = dshash_seq_next(&status)) != NULL)
1870 {
1872
1873 if (entry->key.dboid != MyDatabaseId)
1874 continue; /* not relevant */
1875
1878
1879 for (int i = 0; i < entry->numListeners; i++)
1880 {
1881 if (listeners[i].procNo == MyProcNumber)
1882 {
1883 entry->numListeners--;
1884 if (i < entry->numListeners)
1885 memmove(&listeners[i], &listeners[i + 1],
1886 sizeof(ListenerEntry) * (entry->numListeners - i));
1887
1888 if (entry->numListeners == 0)
1889 {
1891 dshash_delete_current(&status);
1892 }
1893 break;
1894 }
1895 }
1896 }
1897 dshash_seq_term(&status);
1898}
1899
1900/*
1901 * Test whether we are actively listening on the given channel name.
1902 *
1903 * Note: this function is executed for every notification found in the queue.
1904 */
1905static bool
1906IsListeningOn(const char *channel)
1907{
1908 if (localChannelTable == NULL)
1909 return false;
1910
1911 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1912}
1913
1914/*
1915 * Remove our entry from the listeners array when we are no longer listening
1916 * on any channel. NB: must not fail if we're already not listening.
1917 */
1918static void
1920{
1921 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1922
1923 if (!amRegisteredListener) /* nothing to do */
1924 return;
1925
1926 /*
1927 * Need exclusive lock here to manipulate list links.
1928 */
1930 /* Mark our entry as invalid */
1935 /* and remove it from the list */
1938 else
1939 {
1941 {
1943 {
1945 break;
1946 }
1947 }
1948 }
1951
1952 /* mark ourselves as no longer listed in the global array */
1953 amRegisteredListener = false;
1954}
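Unlinking a backend from the singly linked listener chain either advances the head or patches the predecessor's link. The sketch below makes these simplifications:

- A hypothetical `sketchNext[]` array and `SK_INVALID` sentinel stand in for the shared list links.
- Locking is omitted.
- Invalidating the backend's PID and database fields is omitted.

```c
#include <assert.h>

#define SK_MAX_BACKENDS 8
#define SK_INVALID (-1)

static int sketchFirst = SK_INVALID;
static int sketchNext[SK_MAX_BACKENDS];

static void
sketch_unregister(int me)
{
    if (sketchFirst == me)
        sketchFirst = sketchNext[me];       /* we were the head */
    else
    {
        for (int i = sketchFirst; i != SK_INVALID; i = sketchNext[i])
        {
            if (sketchNext[i] == me)
            {
                sketchNext[i] = sketchNext[me];  /* bypass our slot */
                break;
            }
        }
    }
    sketchNext[me] = SK_INVALID;            /* sketch clears our own link */
}
```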
1955
1956/*
1957 * Test whether there is room to insert more notification messages.
1958 *
1959 * Caller must hold at least shared NotifyQueueLock.
1960 */
1961static bool
1963{
1964 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1966 int64 occupied = headPage - tailPage;
1967
1969}
1970
1971/*
1972 * Advance the QueuePosition to the next entry, assuming that the current
1973 * entry is of length entryLength. If we jump to a new page the function
1974 * returns true, else false.
1975 */
1976static bool
1978{
1979 int64 pageno = QUEUE_POS_PAGE(*position);
1980 int offset = QUEUE_POS_OFFSET(*position);
1981 bool pageJump = false;
1982
1983 /*
1984 * Move to the next writing position: First jump over what we have just
1985 * written or read.
1986 */
1987 offset += entryLength;
1988 Assert(offset <= QUEUE_PAGESIZE);
1989
1990 /*
1991 * In a second step check if another entry can possibly be written to the
1992 * page. If so, stay here, we have reached the next position. If not, then
1993 * we need to move on to the next page.
1994 */
1996 {
1997 pageno++;
1998 offset = 0;
1999 pageJump = true;
2000 }
2001
2002 SET_QUEUE_POS(*position, pageno, offset);
2003 return pageJump;
2004}
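The page-jump rule can be tried out on plain integers. In this hypothetical sketch, `SK_PAGESIZE` and `SK_MIN_ENTRY` are assumed values. The test "can another entry possibly be written to the page?" is expressed as "would the smallest possible entry overflow the page?", following the comment above, because the listing elides the exact expression.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SK_PAGESIZE 8192     /* assumed page size */
#define SK_MIN_ENTRY 32      /* hypothetical size of the smallest entry */

typedef struct { int64_t page; int offset; } SketchPos;

/* Advance past an entry of entryLength bytes; true if we moved to a new page. */
static bool
sketch_advance(SketchPos *pos, int entryLength)
{
    int64_t page = pos->page;
    int offset = pos->offset + entryLength;
    bool pageJump = false;

    assert(offset <= SK_PAGESIZE);

    /* No room for even the smallest entry: continue on the next page */
    if (offset + SK_MIN_ENTRY > SK_PAGESIZE)
    {
        page++;
        offset = 0;
        pageJump = true;
    }
    pos->page = page;
    pos->offset = offset;
    return pageJump;
}
```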
2005
2006/*
2007 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
2008 */
2009static void
2011{
2012 size_t channellen = n->channel_len;
2013 size_t payloadlen = n->payload_len;
2014 int entryLength;
2015
2018
2019 /* The terminators are already included in AsyncQueueEntryEmptySize */
2022 qe->length = entryLength;
2023 qe->dboid = MyDatabaseId;
2024 qe->xid = GetCurrentTransactionId();
2025 qe->srcPid = MyProcPid;
2026 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2027}
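The entry length works out to header plus channel plus payload plus the two terminators, rounded up for alignment. The sketch below uses a hypothetical header struct and assumes 4-byte alignment. The listing elides both the real header definition and the rounding macro, so treat the exact figures as illustrative.

```c
#include <assert.h>
#include <stddef.h>

#define SK_ALIGN 4   /* assumed alignment unit */
#define SK_ALIGN_UP(len) (((len) + (SK_ALIGN - 1)) & ~((size_t) (SK_ALIGN - 1)))

/* Hypothetical stand-in for the queue entry header */
typedef struct SketchEntry
{
    int      length;   /* total entry length, including padding */
    unsigned dboid;
    unsigned xid;
    int      srcPid;
    char     data[];   /* "channel\0payload\0" */
} SketchEntry;

/* The empty size already counts both NUL terminators */
#define SK_ENTRY_EMPTY_SIZE (offsetof(SketchEntry, data) + 2)

static size_t
sketch_entry_length(size_t channellen, size_t payloadlen)
{
    return SK_ALIGN_UP(SK_ENTRY_EMPTY_SIZE + channellen + payloadlen);
}
```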
2028
2029/*
2030 * Add pending notifications to the queue.
2031 *
2032 * We go page by page here, i.e. we stop once we have to go to a new page but
2033 * we will be called again and then fill that next page. If an entry does not
2034 * fit into the current page, we write a dummy entry with an InvalidOid as the
2035 * database OID in order to fill the page. So every page is always used up to
2036 * the last byte which simplifies reading the page later.
2037 *
2038 * We are passed the list cell (in pendingNotifies->events) containing the next
2039 * notification to write and return the first still-unwritten cell back.
2040 * Eventually we will return NULL indicating all is done.
2041 *
2042 * We are holding NotifyQueueLock already from the caller and grab
2043 * page-specific SLRU bank lock locally in this function.
2044 */
2045static ListCell *
2047{
2050 int64 pageno;
2051 int offset;
2052 int slotno;
2054
2055 /*
2056 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2057 * memory upon exiting. The reason for this is that if we have to advance
2058 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2059 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2060 * subsequent insertions would try to put entries into a page that slru.c
2061 * thinks doesn't exist yet.) So, use a local position variable. Note
2062 * that if we do fail, any already-inserted queue entries are forgotten;
2063 * this is okay, since they'd be useless anyway after our transaction
2064 * rolls back.
2065 */
2067
2068 /*
2069 * If this is the first write since the postmaster started, we need to
2070 * initialize the first page of the async SLRU. Otherwise, the current
2071 * page should be initialized already, so just fetch it.
2072 */
2073 pageno = QUEUE_POS_PAGE(queue_head);
2075
2076 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2078
2081 else
2082 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2083
2084 /* Note we mark the page dirty before writing in it */
2085 NotifyCtl->shared->page_dirty[slotno] = true;
2086
2087 while (nextNotify != NULL)
2088 {
2090
2091 /* Construct a valid queue entry in local variable qe */
2093
2094 offset = QUEUE_POS_OFFSET(queue_head);
2095
2096 /* Check whether the entry really fits on the current page */
2097 if (offset + qe.length <= QUEUE_PAGESIZE)
2098 {
2099 /* OK, so advance nextNotify past this item */
2101 }
2102 else
2103 {
2104 /*
2105 * Write a dummy entry to fill up the page. Actually readers will
2106 * only check dboid and since it won't match any reader's database
2107 * OID, they will ignore this entry and move on.
2108 */
2109 qe.length = QUEUE_PAGESIZE - offset;
2110 qe.dboid = InvalidOid;
2112 qe.data[0] = '\0'; /* empty channel */
2113 qe.data[1] = '\0'; /* empty payload */
2114 }
2115
2116 /* Now copy qe into the shared buffer page */
2117 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2118 &qe,
2119 qe.length);
2120
2121 /* Advance queue_head appropriately, and detect if page is full */
2122 if (asyncQueueAdvance(&(queue_head), qe.length))
2123 {
2124 LWLock *lock;
2125
2126 pageno = QUEUE_POS_PAGE(queue_head);
2127 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2128 if (lock != prevlock)
2129 {
2132 prevlock = lock;
2133 }
2134
2135 /*
2136 * Page is full, so we're done here, but first fill the next page
2137 * with zeroes. The reason to do this is to ensure that slru.c's
2138 * idea of the head page is always the same as ours, which avoids
2139 * boundary problems in SimpleLruTruncate. The test in
2140 * asyncQueueIsFull() ensured that there is room to create this
2141 * page without overrunning the queue.
2142 */
2144
2145 /*
2146 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2147 * set flag to remember that we should try to advance the tail
2148 * pointer (we don't want to actually do that right here).
2149 */
2151 tryAdvanceTail = true;
2152
2153 /* And exit the loop */
2154 break;
2155 }
2156 }
2157
2158 /* Success, so update the global QUEUE_HEAD */
2160
2162
2163 return nextNotify;
2164}
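The padding scheme can be simulated using entry lengths alone. The hypothetical sketch below assumes a page size and a minimum entry size. Unlike the real function, it keeps looping across page boundaries instead of returning to its caller after each page.

```c
#include <assert.h>

#define SK_PAGESIZE 8192     /* assumed page size */
#define SK_MIN_ENTRY 32      /* hypothetical smallest possible entry */

/*
 * Lay out entries of the given lengths; returns the number of pages used
 * and stores the number of dummy (padding) entries in *npadding.
 */
static int
sketch_fill_pages(const int *lengths, int n, int *npadding)
{
    int page = 0;
    int offset = 0;
    int i = 0;

    *npadding = 0;
    while (i < n)
    {
        int len = lengths[i];

        assert(len <= SK_PAGESIZE);
        if (offset + len <= SK_PAGESIZE)
            i++;                        /* entry fits: consume it */
        else
        {
            len = SK_PAGESIZE - offset; /* dummy entry fills the page */
            (*npadding)++;
        }
        offset += len;
        if (offset + SK_MIN_ENTRY > SK_PAGESIZE)
        {
            page++;                     /* page exhausted: move on */
            offset = 0;
        }
    }
    return page + (offset > 0 ? 1 : 0);
}
```

Filling every page to the last byte means a reader never has to special-case a partially used page. It simply skips entries whose database OID does not match its own.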
2165
2166/*
2167 * SQL function to return the fraction of the notification queue currently
2168 * occupied.
2169 */
2170Datum
2172{
2173 double usage;
2174
2175 /* Advance the queue tail so we don't report a too-large result */
2177
2181
2183}
2184
2185/*
2186 * Return the fraction of the queue that is currently occupied.
2187 *
2188 * The caller must hold NotifyQueueLock in (at least) shared mode.
2189 *
2190 * Note: we measure the distance to the logical tail page, not the physical
2191 * tail page. In some sense that's wrong, but the relative position of the
2192 * physical tail is affected by details such as SLRU segment boundaries,
2193 * so that a result based on that is unpleasantly unstable.
2194 */
2195static double
2196asyncQueueUsage(void)
2197{
2198 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2200 int64 occupied = headPage - tailPage;
2201
2202 if (occupied == 0)
2203 return (double) 0; /* fast exit for common case */
2204
2205 return (double) occupied / (double) max_notify_queue_pages;
2206}
2207
2208/*
2209 * Check whether the queue is at least half full, and emit a warning if so.
2210 *
2211 * This is unlikely given the size of the queue, but possible.
2212 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2213 *
2214 * Caller must hold exclusive NotifyQueueLock.
2215 */
2216static void
2217asyncQueueFillWarning(void)
2218{
2219 double fillDegree;
2220 TimestampTz t;
2221
2223 if (fillDegree < 0.5)
2224 return;
2225
2226 t = GetCurrentTimestamp();
2227
2230 {
2233
2235 {
2237 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2240 }
2241
2243 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2244 (minPid != InvalidPid ?
2245 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2246 : 0),
2247 (minPid != InvalidPid ?
2248 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2249 : 0)));
2250
2252 }
2253}
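The two checks in the functions above can be modelled as follows:

- usage is measured in pages from the logical tail;
- a warning is emitted only when the queue is at least half full, and no more often than once per interval.

This is a hedged sketch. toy_queue_usage() and toy_should_warn() are hypothetical, and timestamps are plain milliseconds instead of TimestampTz. The 5000 ms interval is an assumption about QUEUE_FULL_WARN_INTERVAL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed to mirror QUEUE_FULL_WARN_INTERVAL, in milliseconds. */
#define TOY_WARN_INTERVAL_MS 5000

/* Fraction of the queue in use, counted in pages from the logical tail. */
static double
toy_queue_usage(int64_t head_page, int64_t tail_page, int max_pages)
{
    int64_t occupied = head_page - tail_page;

    assert(max_pages > 0 && occupied >= 0);
    if (occupied == 0)
        return 0.0;             /* fast path for the common case */
    return (double) occupied / (double) max_pages;
}

/*
 * Decide whether to emit the "queue is N% full" warning: only when at
 * least half full, and at most once per TOY_WARN_INTERVAL_MS.
 */
static bool
toy_should_warn(double fill, int64_t now_ms, int64_t *last_warn_ms)
{
    if (fill < 0.5)
        return false;
    if (now_ms - *last_warn_ms < TOY_WARN_INTERVAL_MS)
        return false;           /* warned too recently */
    *last_warn_ms = now_ms;
    return true;
}
```

The real function also scans the listeners for the oldest queue position, so that its errdetail can name the PID holding the queue back. The sketch leaves that out.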
2254
2255/*
2256 * Send signals to listening backends.
2257 *
2258 * Normally we signal only backends that are interested in the notifies that
2259 * we just sent. However, that will leave idle listeners falling further and
2260 * further behind. Waken them anyway if they're far enough behind, so they'll
2261 * advance their queue position pointers, allowing the global tail to advance.
2262 *
2263 * Since we know the ProcNumber and the Pid, the signaling is quite cheap.
2264 *
2265 * This is called during CommitTransaction(), so it's important for it
2266 * to have very low probability of failure.
2267 */
2268static void
2269SignalBackends(void)
2270{
2271 int count;
2272
2273 /* Can't get here without PreCommit_Notify having made the global table */
2275
2276 /* It should have set up these arrays, too */
2278
2279 /*
2280 * Identify backends that we need to signal. We don't want to send
2281 * signals while holding the NotifyQueueLock, so this part just builds a
2282 * list of target PIDs in signalPids[] and signalProcnos[].
2283 */
2284 count = 0;
2285
2287
2288 /* Scan each channel name that we notified in this transaction */
2290 {
2291 GlobalChannelKey key;
2292 GlobalChannelEntry *entry;
2294
2295 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2296 entry = dshash_find(globalChannelTable, &key, false);
2297 if (entry == NULL)
2298 continue; /* nobody is listening */
2299
2301 entry->listenersArray);
2302
2303 /* Identify listeners that now need waking, add them to arrays */
2304 for (int j = 0; j < entry->numListeners; j++)
2305 {
2306 ProcNumber i;
2307 int32 pid;
2308 QueuePosition pos;
2309
2310 if (!listeners[j].listening)
2311 continue; /* ignore not-yet-committed listeners */
2312
2313 i = listeners[j].procNo;
2314
2316 continue; /* already signaled, no need to repeat */
2317
2318 pid = QUEUE_BACKEND_PID(i);
2319 pos = QUEUE_BACKEND_POS(i);
2320
2321 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2322 continue; /* it's fully caught up already */
2323
2324 Assert(pid != InvalidPid);
2325
2327 signalPids[count] = pid;
2328 signalProcnos[count] = i;
2329 count++;
2330 }
2331
2333 }
2334
2335 /*
2336 * Scan all listeners. Any that are not already pending wakeup must not
2337 * be interested in our notifications (else we'd have set their wakeup
2338 * flags above). Check to see if we can directly advance their queue
2339 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2340 * them anyway so they will catch up.
2341 */
2343 {
2344 int32 pid;
2345 QueuePosition pos;
2346
2348 continue;
2349
2350 /* If it's currently advancing, we should not touch it */
2352 continue;
2353
2354 pid = QUEUE_BACKEND_PID(i);
2355 pos = QUEUE_BACKEND_POS(i);
2356
2357 /*
2358 * We can directly advance the other backend's queue pointer if it's
2359 * not currently advancing (else there are race conditions), and its
2360 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2361 * it miss some older messages), and we'd not be moving the pointer
2362 * backward.
2363 */
2366 {
2367 /* We can directly advance its pointer past what we wrote */
2369 }
2372 {
2373 /* It's idle and far behind, so wake it up */
2374 Assert(pid != InvalidPid);
2375
2377 signalPids[count] = pid;
2378 signalProcnos[count] = i;
2379 count++;
2380 }
2381 }
2382
2384
2385 /* Now send signals */
2386 for (int i = 0; i < count; i++)
2387 {
2388 int32 pid = signalPids[i];
2389
2390 /*
2391 * If we are signaling our own process, no need to involve the kernel;
2392 * just set the flag directly.
2393 */
2394 if (pid == MyProcPid)
2395 {
2397 continue;
2398 }
2399
2400 /*
2401 * Note: assuming things aren't broken, a signal failure here could
2402 * only occur if the target backend exited since we released
2403 * NotifyQueueLock; which is unlikely but certainly possible. So we
2404 * just log a low-level debug message if it happens.
2405 */
2407 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2408 }
2409}
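The second pass of SignalBackends() handles each listener that was not woken for our channels. It either advances the listener's pointer directly, wakes it because it is far behind, or leaves it alone. The exact conditions sit on lines missing from this listing, so the following sketch follows the comments instead.

Assumptions:

- positions are simplified to linear byte offsets;
- lag_limit is a hypothetical stand-in for the "far enough behind" threshold;
- all names are illustrative.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum ToyListenerAction
{
    TOY_LEAVE_ALONE,            /* nothing to do for this listener */
    TOY_ADVANCE_DIRECTLY,       /* move its pointer past what we wrote */
    TOY_SIGNAL                  /* wake it so it catches up by itself */
} ToyListenerAction;

/*
 * Decide what to do with a listener that was NOT woken for the
 * notifications we just wrote.  Positions are simplified to linear byte
 * offsets (an assumption); lag_limit is a hypothetical threshold.
 */
static ToyListenerAction
toy_uninterested_listener_action(bool wakeup_pending, bool is_advancing,
                                 int64_t pos, int64_t head_before_write,
                                 int64_t head_after_write, int64_t lag_limit)
{
    assert(head_before_write <= head_after_write);

    if (wakeup_pending || is_advancing)
        return TOY_LEAVE_ALONE;         /* already woken, or busy reading */
    if (pos >= head_before_write && pos < head_after_write)
        return TOY_ADVANCE_DIRECTLY;    /* only our own messages lie ahead */
    if (head_after_write - pos >= lag_limit)
        return TOY_SIGNAL;              /* idle and far behind */
    return TOY_LEAVE_ALONE;
}
```

A listener sitting exactly at the old head is moved to the new head without a signal. Everything between the two positions is our own traffic, which it is not interested in.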
2410
2411/*
2412 * AtAbort_Notify
2413 *
2414 * This is called at transaction abort.
2415 *
2416 * Revert any staged listen/unlisten changes and clean up transaction state.
2417 * This only does anything if we abort after PreCommit_Notify has staged
2418 * some entries.
2419 */
2420void
2421AtAbort_Notify(void)
2422{
2423 /* Revert staged listen/unlisten changes */
2425
2426 /* If we're no longer listening on anything, unregister */
2429
2430 /* And clean up */
2432}
2433
2434/*
2435 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2436 *
2437 * Reassign all items in the pending lists to the parent transaction.
2438 */
2439void
2440AtSubCommit_Notify(void)
2441{
2442 int my_level = GetCurrentTransactionNestLevel();
2443
2444 /* If there are actions at our nesting level, we must reparent them. */
2445 if (pendingActions != NULL &&
2446 pendingActions->nestingLevel >= my_level)
2447 {
2448 if (pendingActions->upper == NULL ||
2449 pendingActions->upper->nestingLevel < my_level - 1)
2450 {
2451 /* nothing to merge; give the whole thing to the parent */
2453 }
2454 else
2455 {
2457
2459
2460 /*
2461 * Mustn't try to eliminate duplicates here --- see queue_listen()
2462 */
2465 childPendingActions->actions);
2467 }
2468 }
2469
2470 /* If there are notifies at our nesting level, we must reparent them. */
2471 if (pendingNotifies != NULL &&
2472 pendingNotifies->nestingLevel >= my_level)
2473 {
2474 Assert(pendingNotifies->nestingLevel == my_level);
2475
2476 if (pendingNotifies->upper == NULL ||
2477 pendingNotifies->upper->nestingLevel < my_level - 1)
2478 {
2479 /* nothing to merge; give the whole thing to the parent */
2481 }
2482 else
2483 {
2484 /*
2485 * Formerly, we didn't bother to eliminate duplicates here, but
2486 * now we must, else we fall foul of "Assert(!found)", either here
2487 * or during a later attempt to build the parent-level hashtable.
2488 */
2490 ListCell *l;
2491
2493 /* Insert all the subxact's events into parent, except for dups */
2494 foreach(l, childPendingNotifies->events)
2495 {
2497
2500 }
2502 }
2503 }
2504}
2505
2506/*
2507 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2508 */
2509void
2510AtSubAbort_Notify(void)
2511{
2512 int my_level = GetCurrentTransactionNestLevel();
2513
2514 /*
2515 * All we have to do is pop the stack --- the actions/notifies made in
2516 * this subxact are no longer interesting, and the space will be freed
2517 * when CurTransactionContext is recycled. We still have to free the
2518 * ActionList and NotificationList objects themselves, though, because
2519 * those are allocated in TopTransactionContext.
2520 *
2521 * Note that there might be no entries at all, or no entries for the
2522 * current subtransaction level, either because none were ever created, or
2523 * because we reentered this routine due to trouble during subxact abort.
2524 */
2525 while (pendingActions != NULL &&
2526 pendingActions->nestingLevel >= my_level)
2527 {
2529
2532 }
2533
2534 while (pendingNotifies != NULL &&
2535 pendingNotifies->nestingLevel >= my_level)
2536 {
2538
2541 }
2542}
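AtSubCommit_Notify() and AtSubAbort_Notify() treat pendingActions and pendingNotifies as a stack of frames, one per nesting level. Below is a minimal model of that stack. It assumes a hypothetical ToyFrame whose item count stands in for the real List. It therefore does not model the duplicate elimination that the real subcommit performs for notifications.

```c
#include <assert.h>
#include <stdlib.h>

/* One frame per (sub)transaction level that has pending items; nitems
 * is a stand-in for the List of actions or notification events. */
typedef struct ToyFrame
{
    int              nesting_level;
    int              nitems;
    struct ToyFrame *upper;
} ToyFrame;

static ToyFrame *
toy_push(ToyFrame *top, int level, int nitems)
{
    ToyFrame *f = malloc(sizeof(ToyFrame));

    assert(top == NULL || top->nesting_level < level);
    if (f == NULL)
        abort();
    f->nesting_level = level;
    f->nitems = nitems;
    f->upper = top;
    return f;
}

/* Subtransaction commit: hand our frame to the parent level. */
static ToyFrame *
toy_subcommit(ToyFrame *top, int my_level)
{
    ToyFrame *parent;

    if (top == NULL || top->nesting_level < my_level)
        return top;                 /* nothing at this level */
    if (top->upper == NULL || top->upper->nesting_level < my_level - 1)
    {
        top->nesting_level--;       /* parent has no frame: relabel ours */
        return top;
    }
    parent = top->upper;            /* parent has a frame: merge into it */
    parent->nitems += top->nitems;
    free(top);
    return parent;
}

/* Subtransaction abort: discard every frame at or above our level. */
static ToyFrame *
toy_subabort(ToyFrame *top, int my_level)
{
    while (top != NULL && top->nesting_level >= my_level)
    {
        ToyFrame *upper = top->upper;

        free(top);
        top = upper;
    }
    return top;
}
```

The two commit cases behave differently. Committing a level-3 subtransaction whose parent has no frame simply relabels the frame as level 2. Committing into a parent that already has a frame merges the two.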
2543
2544/*
2545 * HandleNotifyInterrupt
2546 *
2547 * Signal handler portion of interrupt handling. Let the backend know
2548 * that there's a pending notify interrupt. If we're currently reading
2549 * from the client, this will interrupt the read and
2550 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2551 */
2552void
2553HandleNotifyInterrupt(void)
2554{
2555 /*
2556 * Note: this is called by a SIGNAL HANDLER. You must be very wary of what
2557 * you do here.
2558 */
2559
2560 /* signal that work needs to be done */
2562
2563 /* make sure the event is processed in due course */
2565}
2566
2567/*
2568 * ProcessNotifyInterrupt
2569 *
2570 * This is called if we see notifyInterruptPending set, just before
2571 * transmitting ReadyForQuery at the end of a frontend command, and
2572 * also if a notify signal occurs while reading from the frontend.
2573 * HandleNotifyInterrupt() will cause the read to be interrupted
2574 * via the process's latch, and this routine will get called.
2575 * If we are truly idle (ie, *not* inside a transaction block),
2576 * process the incoming notifies.
2577 *
2578 * If "flush" is true, force any frontend messages out immediately.
2579 * This can be false when being called at the end of a frontend command,
2580 * since we'll flush after sending ReadyForQuery.
2581 */
2582void
2583ProcessNotifyInterrupt(bool flush)
2584{
2586 return; /* not really idle */
2587
2588 /* Loop in case another signal arrives while sending messages */
2590 ProcessIncomingNotify(flush);
2591}
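The split between HandleNotifyInterrupt() and ProcessNotifyInterrupt() follows the usual async-signal-safe pattern. The handler only sets a flag (and, in the real code, the process latch). The main loop does the work later and re-checks the flag in case another signal arrives meanwhile.

Below is a minimal standalone sketch in standard C. raise(SIGINT) is only a portable stand-in for PostgreSQL's procsignal delivery, and every name is hypothetical.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

static volatile sig_atomic_t toy_interrupt_pending = 0;
static int toy_processed = 0;   /* counts processing passes, for illustration */

/* Signal handler: only touch a sig_atomic_t flag here. */
static void
toy_handler(int signo)
{
    (void) signo;
    toy_interrupt_pending = 1;
}

/* Stand-in for ProcessIncomingNotify(): reset the flag first, then work. */
static void
toy_process_incoming(void)
{
    assert(toy_interrupt_pending);
    toy_interrupt_pending = 0;
    toy_processed++;            /* read the queue and notify the client here */
}

/* Stand-in for ProcessNotifyInterrupt(). */
static void
toy_process_interrupt(bool in_transaction_block)
{
    if (in_transaction_block)
        return;                 /* not really idle; the flag stays set */
    while (toy_interrupt_pending)
        toy_process_incoming();
}
```

toy_process_incoming() resets the flag at its start, as ProcessIncomingNotify() does. A signal that arrives while messages are being sent therefore sets the flag again and triggers another pass of the loop.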
2592
2593
2594/*
2595 * Read all pending notifications from the queue, and deliver appropriate
2596 * ones to my frontend. Stop when we reach queue head or an uncommitted
2597 * notification.
2598 */
2599static void
2600asyncQueueReadAllNotifications(void)
2601{
2602 QueuePosition pos;
2603 QueuePosition head;
2604 Snapshot snapshot;
2605
2606 /*
2607 * Fetch current state, indicate to others that we have woken up, and that
2608 * we are in process of advancing our position.
2609 */
2611 /* Assert checks that we have a valid state entry */
2615 head = QUEUE_HEAD;
2616
2617 if (QUEUE_POS_EQUAL(pos, head))
2618 {
2619 /* Nothing to do, we have read all notifications already. */
2621 return;
2622 }
2623
2626
2627 /*----------
2628 * Get snapshot we'll use to decide which xacts are still in progress.
2629 * This is trickier than it might seem, because of race conditions.
2630 * Consider the following example:
2631 *
2632 * Backend 1: Backend 2:
2633 *
2634 * transaction starts
2635 * UPDATE foo SET ...;
2636 * NOTIFY foo;
2637 * commit starts
2638 * queue the notify message
2639 * transaction starts
2640 * LISTEN foo; -- first LISTEN in session
2641 * SELECT * FROM foo WHERE ...;
2642 * commit to clog
2643 * commit starts
2644 * add backend 2 to array of listeners
2645 * advance to queue head (this code)
2646 * commit to clog
2647 *
2648 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2649 * wasn't committed yet. Ideally we'd ensure that client 2 would
2650 * eventually get transaction 1's notify message, but there's no way
2651 * to do that; until we're in the listener array, there's no guarantee
2652 * that the notify message doesn't get removed from the queue.
2653 *
2654 * Therefore the coding technique transaction 2 is using is unsafe:
2655 * applications must commit a LISTEN before inspecting database state,
2656 * if they want to ensure they will see notifications about subsequent
2657 * changes to that state.
2658 *
2659 * What we do guarantee is that we'll see all notifications from
2660 * transactions committing after the snapshot we take here.
2661 * BecomeRegisteredListener has already added us to the listener array,
2662 * so no not-yet-committed messages can be removed from the queue
2663 * before we see them.
2664 *----------
2665 */
2666 snapshot = RegisterSnapshot(GetLatestSnapshot());
2667
2668 /*
2669 * It is possible that we fail while trying to send a message to our
2670 * frontend (for example, because of encoding conversion failure). If
2671 * that happens it is critical that we not try to send the same message
2672 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2673 * ERRORs to FATAL, causing the client connection to be closed on error.
2674 *
2675 * We used to only skip over the offending message and try to soldier on,
2676 * but it was somewhat questionable to lose a notification and give the
2677 * client an ERROR instead. A client application is not prepared for
2678 * that and can't tell that a notification was missed. It was also not
2679 * very useful in practice because notifications are often processed while
2680 * a connection is idle and reading a message from the client, and in that
2681 * state, any error is upgraded to FATAL anyway. Closing the connection
2682 * is a clear signal to the application that it might have missed
2683 * notifications.
2684 */
2685 {
2687 bool reachedStop;
2688
2689 ExitOnAnyError = true;
2690
2691 do
2692 {
2693 /*
2694 * Process messages up to the stop position, end of page, or an
2695 * uncommitted message.
2696 *
2697 * Our stop position is what we found to be the head's position
2698 * when we entered this function. It might have changed already.
2699 * But if it has, we will receive (or have already received and
2700 * queued) another signal and come here again.
2701 *
2702 * We are not holding NotifyQueueLock here! The queue can only
2703 * extend beyond the head pointer (see above) and we leave our
2704 * backend's pointer where it is so nobody will truncate or
2705 * rewrite pages under us. Especially we don't want to hold a lock
2706 * while sending the notifications to the frontend.
2707 */
2708 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2709 } while (!reachedStop);
2710
2711 /* Update shared state */
2716
2718 }
2719
2720 /* Done with snapshot */
2721 UnregisterSnapshot(snapshot);
2722}
2723
2724/*
2725 * Fetch notifications from the shared queue, beginning at position current,
2726 * and deliver relevant ones to my frontend.
2727 *
2728 * The function returns true once we have reached the stop position or an
2729 * uncommitted notification, and false if we have finished with the page.
2730 * In other words: once it returns true there is no need to look further.
2731 * The QueuePosition *current is advanced past all processed messages.
2732 */
2733static bool
2734asyncQueueProcessPageEntries(QueuePosition *current,
2735 QueuePosition stop,
2736 Snapshot snapshot)
2737{
2738 int64 curpage = QUEUE_POS_PAGE(*current);
2739 int slotno;
2740 char *page_buffer;
2741 bool reachedStop = false;
2742 bool reachedEndOfPage;
2743
2744 /*
2745 * We copy the entries into a local buffer to avoid holding the SLRU lock
2746 * while we transmit them to our frontend. The local buffer must be
2747 * adequately aligned.
2748 */
2750 char *local_buf_end = local_buf;
2751
2753 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2754
2755 do
2756 {
2757 QueuePosition thisentry = *current;
2759
2760 if (QUEUE_POS_EQUAL(thisentry, stop))
2761 break;
2762
2763 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2764
2765 /*
2766 * Advance *current over this message, possibly to the next page. As
2767 * noted in the comments for asyncQueueReadAllNotifications, we must
2768 * do this before possibly failing while processing the message.
2769 */
2770 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2771
2772 /* Ignore messages destined for other databases */
2773 if (qe->dboid == MyDatabaseId)
2774 {
2775 if (XidInMVCCSnapshot(qe->xid, snapshot))
2776 {
2777 /*
2778 * The source transaction is still in progress, so we can't
2779 * process this message yet. Break out of the loop, but first
2780 * back up *current so we will reprocess the message next
2781 * time. (Note: it is unlikely but not impossible for
2782 * TransactionIdDidCommit to fail, so we can't really avoid
2783 * this advance-then-back-up behavior when dealing with an
2784 * uncommitted message.)
2785 *
2786 * Note that we must test XidInMVCCSnapshot before we test
2787 * TransactionIdDidCommit, else we might return a message from
2788 * a transaction that is not yet visible to snapshots; compare
2789 * the comments at the head of heapam_visibility.c.
2790 *
2791 * Also, while our own xact won't be listed in the snapshot,
2792 * we need not check for TransactionIdIsCurrentTransactionId
2793 * because our transaction cannot (yet) have queued any
2794 * messages.
2795 */
2796 *current = thisentry;
2797 reachedStop = true;
2798 break;
2799 }
2800
2801 /*
2802 * Quick check for the case that we're not listening on any
2803 * channels, before calling TransactionIdDidCommit(). This makes
2804 * that case a little faster, but more importantly, it ensures
2805 * that if there's a bad entry in the queue for which
2806 * TransactionIdDidCommit() fails for some reason, we can skip
2807 * over it on the first LISTEN in a session, and not get stuck on
2808 * it indefinitely. (This is a little trickier than it looks: it
2809 * works because BecomeRegisteredListener runs this code before we
2810 * have made the first entry in localChannelTable.)
2811 */
2813 continue;
2814
2815 if (TransactionIdDidCommit(qe->xid))
2816 {
2817 memcpy(local_buf_end, qe, qe->length);
2818 local_buf_end += qe->length;
2819 }
2820 else
2821 {
2822 /*
2823 * The source transaction aborted or crashed, so we just
2824 * ignore its notifications.
2825 */
2826 }
2827 }
2828
2829 /* Loop back if we're not at end of page */
2830 } while (!reachedEndOfPage);
2831
2832 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2834
2835 /*
2836 * Now that we have let go of the SLRU bank lock, send the notifications
2837 * to our backend
2838 */
2840 for (char *p = local_buf; p < local_buf_end;)
2841 {
2843
2844 /* qe->data is the null-terminated channel name */
2845 char *channel = qe->data;
2846
2847 if (IsListeningOn(channel))
2848 {
2849 /* payload follows channel name */
2850 char *payload = qe->data + strlen(channel) + 1;
2851
2852 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2853 }
2854
2855 p += qe->length;
2856 }
2857
2858 if (QUEUE_POS_EQUAL(*current, stop))
2859 reachedStop = true;
2860
2861 return reachedStop;
2862}
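The per-entry decisions in asyncQueueProcessPageEntries() can be modelled without shared memory. This sketch assumes a toy page of four entries and a precomputed status per entry. In the real code, that status comes from XidInMVCCSnapshot(), checked first, and then from TransactionIdDidCommit(). ToyEntry and toy_process_page() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_ENTRIES_PER_PAGE 4  /* toy page capacity, for illustration */

typedef enum ToyXactStatus
{
    TOY_COMMITTED,
    TOY_ABORTED,
    TOY_IN_PROGRESS             /* still running according to our snapshot */
} ToyXactStatus;

typedef struct ToyEntry
{
    uint32_t      dboid;
    ToyXactStatus status;
} ToyEntry;

/*
 * Process entries from *cur up to the end of the current page or to stop,
 * collecting deliverable entry indexes in out[].  Returns true if we hit
 * stop or an in-progress entry, false if we merely ran off the page.
 */
static bool
toy_process_page(const ToyEntry *entries, int *cur, int stop,
                 uint32_t my_db, int *out, int *nout)
{
    int page_end = (*cur / TOY_ENTRIES_PER_PAGE + 1) * TOY_ENTRIES_PER_PAGE;

    assert(*cur <= stop);
    *nout = 0;
    while (*cur < page_end && *cur != stop)
    {
        int             this_entry = *cur;
        const ToyEntry *e = &entries[this_entry];

        (*cur)++;                       /* advance before examining it */
        if (e->dboid != my_db)
            continue;                   /* another database's message */
        if (e->status == TOY_IN_PROGRESS)
        {
            *cur = this_entry;          /* back up; retry it next time */
            return true;
        }
        if (e->status == TOY_COMMITTED)
            out[(*nout)++] = this_entry;
        /* messages from aborted senders are silently skipped */
    }
    return *cur == stop;
}
```

As in the real function, deliverable entries are only collected during the scan, here as indexes in out[]. The channel check and delivery happen afterwards, once no page lock would be held.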
2863
2864/*
2865 * Advance the shared queue tail variable to the minimum of all the
2866 * per-backend tail pointers. Truncate pg_notify space if possible.
2867 *
2868 * This is (usually) called during CommitTransaction(), so it's important for
2869 * it to have very low probability of failure.
2870 */
2871static void
2872asyncQueueAdvanceTail(void)
2873{
2874 QueuePosition min;
2877 int64 boundary;
2878
2879 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2881
2882 /*
2883 * Compute the new tail. Before v13, it was essential that QUEUE_TAIL be
2884 * exact (ie, exactly match at least one backend's queue position), so it
2885 * had to be updated atomically with the actual computation. Since v13, we
2886 * could get away with not doing it like that, but it seems prudent to keep
2887 * it so.
2888 *
2889 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2890 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2891 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2892 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2893 * there are pages we can truncate but haven't yet finished doing so.
2894 *
2895 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2896 * performing SimpleLruTruncate. This is OK because no backend will try
2897 * to access the pages we are in the midst of truncating.
2898 */
2900 min = QUEUE_HEAD;
2902 {
2904 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2905 }
2906 QUEUE_TAIL = min;
2909
2910 /*
2911 * We can truncate something if the global tail advanced across an SLRU
2912 * segment boundary.
2913 *
2914 * XXX it might be better to truncate only once every several segments, to
2915 * reduce the number of directory scans.
2916 */
2919 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2920 {
2921 /*
2922 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2923 * release the lock again.
2924 */
2926
2930 }
2931
2933}
2934
2935/*
2936 * AsyncNotifyFreezeXids
2937 *
2938 * Prepare the async notification queue for CLOG truncation by freezing
2939 * transaction IDs that are about to become inaccessible.
2940 *
2941 * This function is called by VACUUM before advancing datfrozenxid. It scans
2942 * the notification queue and replaces XIDs that would become inaccessible
2943 * after CLOG truncation with special markers:
2944 * - Committed transactions are set to FrozenTransactionId
2945 * - Aborted/crashed transactions are set to InvalidTransactionId
2946 *
2947 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2948 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2949 * (or it would have held back newFrozenXid through ProcArray).
2950 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2951 * either aborted explicitly or crashed, and we can safely mark it invalid.
2952 */
2953void
2954AsyncNotifyFreezeXids(TransactionId newFrozenXid)
2955{
2956 QueuePosition pos;
2957 QueuePosition head;
2958 int64 curpage = -1;
2959 int slotno = -1;
2960 char *page_buffer = NULL;
2961 bool page_dirty = false;
2962
2963 /*
2964 * Acquire locks in the correct order to avoid deadlocks. As per the
2965 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2966 * bank locks.
2967 *
2968 * We only need SHARED mode since we're just reading the head/tail
2969 * positions, not modifying them.
2970 */
2973
2974 pos = QUEUE_TAIL;
2975 head = QUEUE_HEAD;
2976
2977 /* Release NotifyQueueLock early, we only needed to read the positions */
2979
2980 /*
2981 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2982 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2983 * we're working.
2984 */
2985 while (!QUEUE_POS_EQUAL(pos, head))
2986 {
2988 TransactionId xid;
2989 int64 pageno = QUEUE_POS_PAGE(pos);
2990 int offset = QUEUE_POS_OFFSET(pos);
2991
2992 /* If we need a different page, release old lock and get new one */
2993 if (pageno != curpage)
2994 {
2995 LWLock *lock;
2996
2997 /* Release previous page if any */
2998 if (slotno >= 0)
2999 {
3000 if (page_dirty)
3001 {
3002 NotifyCtl->shared->page_dirty[slotno] = true;
3003 page_dirty = false;
3004 }
3006 }
3007
3008 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3010 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3011 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3012 curpage = pageno;
3013 }
3014
3015 qe = (AsyncQueueEntry *) (page_buffer + offset);
3016 xid = qe->xid;
3017
3018 if (TransactionIdIsNormal(xid) &&
3020 {
3021 if (TransactionIdDidCommit(xid))
3022 {
3023 qe->xid = FrozenTransactionId;
3024 page_dirty = true;
3025 }
3026 else
3027 {
3028 qe->xid = InvalidTransactionId;
3029 page_dirty = true;
3030 }
3031 }
3032
3033 /* Advance to next entry */
3034 asyncQueueAdvance(&pos, qe->length);
3035 }
3036
3037 /* Release final page lock if we acquired one */
3038 if (slotno >= 0)
3039 {
3040 if (page_dirty)
3041 NotifyCtl->shared->page_dirty[slotno] = true;
3043 }
3044
3046}
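The freezing rule in the header comment of AsyncNotifyFreezeXids() can be expressed in a few lines. This sketch makes these assumptions:

- PostgreSQL's special XID values: 0 is invalid, 2 is frozen, 3 is the first normal XID;
- the modulo-2^32 comparison used by TransactionIdPrecedes();
- the caller passes in the commit status, standing in for TransactionIdDidCommit().

All names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t ToyXid;

/* Special XID values, matching PostgreSQL's definitions. */
#define TOY_INVALID_XID       ((ToyXid) 0)  /* InvalidTransactionId */
#define TOY_FROZEN_XID        ((ToyXid) 2)  /* FrozenTransactionId */
#define TOY_FIRST_NORMAL_XID  ((ToyXid) 3)  /* FirstNormalTransactionId */

static bool
toy_xid_is_normal(ToyXid xid)
{
    return xid >= TOY_FIRST_NORMAL_XID;
}

/* Modulo-2^32 "a is older than b", in the style of TransactionIdPrecedes(). */
static bool
toy_xid_precedes(ToyXid a, ToyXid b)
{
    if (!toy_xid_is_normal(a) || !toy_xid_is_normal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/* Must this queue entry's xid be rewritten before CLOG truncation? */
static bool
toy_needs_freeze(ToyXid xid, ToyXid new_frozen_xid)
{
    assert(toy_xid_is_normal(new_frozen_xid));
    return toy_xid_is_normal(xid) && toy_xid_precedes(xid, new_frozen_xid);
}

/* Replacement value, given the sending transaction's commit status. */
static ToyXid
toy_frozen_replacement(bool did_commit)
{
    return did_commit ? TOY_FROZEN_XID : TOY_INVALID_XID;
}
```

Because the comparison is circular, an XID such as 0xFFFFFF00 still counts as older than 100 after wraparound, so it gets frozen.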
3047
3048/*
3049 * ProcessIncomingNotify
3050 *
3051 * Scan the queue for arriving notifications and report them to the front
3052 * end. The notifications might be from other sessions, or our own;
3053 * there's no need to distinguish here.
3054 *
3055 * If "flush" is true, force any frontend messages out immediately.
3056 *
3057 * NOTE: since we are outside any transaction, we must create our own.
3058 */
3059static void
3060ProcessIncomingNotify(bool flush)
3061{
3062 /* We *must* reset the flag */
3063 notifyInterruptPending = false;
3064
3065 /* Do nothing else if we aren't actively listening */
3067 return;
3068
3069 if (Trace_notify)
3070 elog(DEBUG1, "ProcessIncomingNotify");
3071
3072 set_ps_display("notify interrupt");
3073
3074 /*
3075 * We must run asyncQueueReadAllNotifications inside a transaction, else
3076 * bad things happen if it gets an error.
3077 */
3079
3081
3083
3084 /*
3085 * If this isn't an end-of-command case, we must flush the notify messages
3086 * to ensure frontend gets them promptly.
3087 */
3088 if (flush)
3089 pq_flush();
3090
3091 set_ps_display("idle");
3092
3093 if (Trace_notify)
3094 elog(DEBUG1, "ProcessIncomingNotify: done");
3095}
3096
3097/*
3098 * Send NOTIFY message to my front end.
3099 */
3100void
3101NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3102{
3104 {
3106
3108 pq_sendint32(&buf, srcPid);
3109 pq_sendstring(&buf, channel);
3110 pq_sendstring(&buf, payload);
3112
3113 /*
3114 * NOTE: we do not do pq_flush() here. Some level of caller will
3115 * handle it later, allowing this message to be combined into a packet
3116 * with other ones.
3117 */
3118 }
3119 else
3120 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3121}
3122
3123/* Does pendingNotifies include a match for the given event? */
3124static bool
3125AsyncExistsPendingNotify(Notification *n)
3126{
3127 if (pendingNotifies == NULL)
3128 return false;
3129
3131 {
3132 /* Use the hash table to probe for a match */
3134 &n,
3135 HASH_FIND,
3136 NULL))
3137 return true;
3138 }
3139 else
3140 {
3141 /* Must scan the event list */
3142 ListCell *l;
3143
3144 foreach(l, pendingNotifies->events)
3145 {
3147
3148 if (n->channel_len == oldn->channel_len &&
3149 n->payload_len == oldn->payload_len &&
3150 memcmp(n->data, oldn->data,
3151 n->channel_len + n->payload_len + 2) == 0)
3152 return true;
3153 }
3154 }
3155
3156 return false;
3157}
3158
3159/*
3160 * Add a notification event to a pre-existing pendingNotifies list.
3161 *
3162 * Because pendingNotifies->events is already nonempty, this works
3163 * correctly no matter what CurrentMemoryContext is.
3164 */
3165static void
3166AddEventToPendingNotifies(Notification *n)
3167{
3169
3170 /* Create the hash tables if it's time to */
3173 {
3175 ListCell *l;
3176
3177 /* Create the hash table */
3178 hash_ctl.keysize = sizeof(Notification *);
3179 hash_ctl.entrysize = sizeof(struct NotificationHash);
3184 hash_create("Pending Notifies",
3185 256L,
3186 &hash_ctl,
3188
3189 /* Create the unique channel name table */
3191 hash_ctl.keysize = NAMEDATALEN;
3192 hash_ctl.entrysize = sizeof(ChannelName);
3195 hash_create("Pending Notify Channel Names",
3196 64L,
3197 &hash_ctl,
3199
3200 /* Insert all the already-existing events */
3201 foreach(l, pendingNotifies->events)
3202 {
3204 char *channel = oldn->data;
3205 bool found;
3206
3208 &oldn,
3209 HASH_ENTER,
3210 &found);
3211 Assert(!found);
3212
3213 /* Add channel name to uniqueChannelHash; might be there already */
3215 channel,
3216 HASH_ENTER,
3217 NULL);
3218 }
3219 }
3220
3221 /* Add new event to the list, in order */
3223
3224 /* Add event to the hash tables if needed */
3226 {
3227 char *channel = n->data;
3228 bool found;
3229
3231 &n,
3232 HASH_ENTER,
3233 &found);
3234 Assert(!found);
3235
3236 /* Add channel name to uniqueChannelHash; might be there already */
3238 channel,
3239 HASH_ENTER,
3240 NULL);
3241 }
3242}
3243
3244/*
3245 * notification_hash: hash function for notification hash table
3246 *
3247 * The hash "keys" are pointers to Notification structs.
3248 */
3249static uint32
3250notification_hash(const void *key, Size keysize)
3251{
3252 const Notification *k = *(const Notification *const *) key;
3253
3254 Assert(keysize == sizeof(Notification *));
3255 /* We don't bother to include the payload's trailing null in the hash */
3256 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3257 k->channel_len + k->payload_len + 1));
3258}
3259
3260/*
3261 * notification_match: match function to use with notification_hash
3262 */
3263static int
3264notification_match(const void *key1, const void *key2, Size keysize)
3265{
3266 const Notification *k1 = *(const Notification *const *) key1;
3267 const Notification *k2 = *(const Notification *const *) key2;
3268
3269 Assert(keysize == sizeof(Notification *));
3270 if (k1->channel_len == k2->channel_len &&
3271 k1->payload_len == k2->payload_len &&
3272 memcmp(k1->data, k2->data,
3273 k1->channel_len + k1->payload_len + 2) == 0)
3274 return 0; /* equal */
3275 return 1; /* not equal */
3276}
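notification_hash() and notification_match() treat two notifications as duplicates when both the channel and the payload are identical. The sketch below models that with a hypothetical ToyNotification, laid out like the real struct: channel, NUL, payload, NUL.

FNV-1a is swapped in for hash_any() purely to keep the example self-contained. Like the real hash, it covers the channel's terminating NUL but not the payload's.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct ToyNotification
{
    uint16_t channel_len;       /* channel length, excluding its NUL */
    uint16_t payload_len;       /* payload length, excluding its NUL */
    char     data[];            /* "channel\0payload\0" */
} ToyNotification;

static ToyNotification *
toy_make(const char *channel, const char *payload)
{
    size_t clen = strlen(channel), plen = strlen(payload);
    ToyNotification *n;

    assert(clen <= UINT16_MAX && plen <= UINT16_MAX);
    n = malloc(sizeof(ToyNotification) + clen + plen + 2);
    if (n == NULL)
        abort();
    n->channel_len = (uint16_t) clen;
    n->payload_len = (uint16_t) plen;
    memcpy(n->data, channel, clen + 1);            /* channel plus NUL */
    memcpy(n->data + clen + 1, payload, plen + 1); /* payload plus NUL */
    return n;
}

/* FNV-1a over channel, its NUL, and payload (not the payload's NUL). */
static uint32_t
toy_hash(const ToyNotification *n)
{
    uint32_t h = 2166136261u;
    size_t   len = (size_t) n->channel_len + n->payload_len + 1;

    for (size_t i = 0; i < len; i++)
    {
        h ^= (unsigned char) n->data[i];
        h *= 16777619u;
    }
    return h;
}

/* Returns 0 for equal, 1 otherwise, like notification_match(). */
static int
toy_match(const ToyNotification *a, const ToyNotification *b)
{
    if (a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data,
               (size_t) a->channel_len + a->payload_len + 2) == 0)
        return 0;
    return 1;
}
```

Consider the pairs ("ab", "c") and ("a", "bc"). Because the channel's NUL is hashed, they hash different byte strings. The length check in toy_match() keeps them distinct regardless.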
3277
3278/* Clear the pendingActions and pendingNotifies lists. */
3279static void
3280ClearPendingActionsAndNotifies(void)
3281{
3282 /*
3283 * Everything's allocated in either TopTransactionContext or the context
3284 * for the subtransaction to which it corresponds. So, there's nothing to
3285 * do here except reset the pointers; the space will be reclaimed when the
3286 * contexts are deleted.
3287 */
3290 /* Also clear pendingListenActions, which is derived from pendingActions */
3292}
3293
3294/*
3295 * GUC check_hook for notify_buffers
3296 */
3297bool
3298check_notify_buffers(int *newval, void **extra, GucSource source)
3299{
3300 return check_slru_buffers("notify_buffers", newval);
3301}
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:200
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:249
static void initGlobalChannelTable(void)
Definition async.c:687
#define NotifyCtl
Definition async.c:364
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:356
static HTAB * localChannelTable
Definition async.c:408
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:628
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:999
#define QUEUEALIGN(len)
Definition async.c:224
static bool amRegisteredListener
Definition async.c:544
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:265
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:354
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:353
void AtSubAbort_Notify(void)
Definition async.c:2510
void AtPrepare_Notify(void)
Definition async.c:1163
#define QUEUE_PAGESIZE
Definition async.c:365
void AtSubCommit_Notify(void)
Definition async.c:2440
static bool asyncQueueIsFull(void)
Definition async.c:1962
#define QUEUE_HEAD
Definition async.c:348
void AsyncShmemInit(void)
Definition async.c:803
static void initLocalChannelTable(void)
Definition async.c:738
PendingListenAction
Definition async.c:455
@ PENDING_UNLISTEN
Definition async.c:457
@ PENDING_LISTEN
Definition async.c:456
static dshash_table * globalChannelTable
Definition async.c:400
static void asyncQueueUnregister(void)
Definition async.c:1919
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2171
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:246
#define LocalChannelTableIsEmpty()
Definition async.c:411
static void initPendingListenActions(void)
Definition async.c:764
static QueuePosition queueHeadBeforeWrite
Definition async.c:552
static bool IsListeningOn(const char *channel)
Definition async.c:1906
void Async_Notify(const char *channel, const char *payload)
Definition async.c:897
volatile sig_atomic_t notifyInterruptPending
Definition async.c:538
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2954
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3298
#define QUEUE_STOP_PAGE
Definition async.c:350