PostgreSQL Source Code git master
async.c
/*-------------------------------------------------------------------------
 *
 * async.c
 *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/commands/async.c
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * Async Notification Model as of v19:
 *
 * 1. Multiple backends on the same machine. Multiple backends may be
 *    listening on each of several channels.
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *    with actively-used pages mapped into shared memory by the slru.c module.
 *    All notification messages are placed in the queue and later read out
 *    by listening backends. The single queue allows us to guarantee that
 *    notifications are received in commit order.
 *
 *    Although there is only one queue, notifications are treated as being
 *    database-local; this is done by including the sender's database OID
 *    in each notification message. Listening backends ignore messages
 *    that don't match their database OID. This is important because it
 *    ensures senders and receivers have the same database encoding and won't
 *    misinterpret non-ASCII text in the channel name or payload string.
 *
 *    Since notifications are not expected to survive database crashes,
 *    we can simply clean out the pg_notify data at any reboot, and there
 *    is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *    entering its PID into the array in AsyncQueueControl. It then scans all
 *    incoming notifications in the central queue, first comparing the
 *    database OID of each notification with its own database OID and then
 *    comparing the notified channel with the list of channels that it
 *    listens to. If there is a match, it delivers the notification event to
 *    its frontend. Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *    a backend-local list which will not be processed until transaction end.
 *
 *    Duplicate notifications from the same transaction are sent out as one
 *    notification only. This is done to save work when, for example, a
 *    trigger on a 2-million-row table fires a notification for each row that
 *    has been changed. If the application needs to receive every single
 *    notification that has been sent, it can easily add some unique string
 *    to the payload parameter.
 *
 *    When the transaction is ready to commit, PreCommit_Notify() adds the
 *    pending notifications to the head of the queue; this is done before
 *    marking the transaction as committed in clog. The head pointer of the
 *    queue always points to the next free position, and a position is just
 *    a page number plus an offset within that page. If we run into problems
 *    writing the notifications, we can still call elog(ERROR, ...) and the
 *    transaction will roll back safely.
 *
 *    Once we have put all of the notifications into the queue, we return to
 *    CommitTransaction() which will then do the actual transaction commit.
 *
 *    After commit we are called another time (AtCommit_Notify()). Here we
 *    make any required updates to the effective listen state (see below).
 *    Then we signal any backends that may be interested in our messages
 *    (including our own backend, if listening). This is done by
 *    SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
 *    each relevant backend, as described below.
 *
 *    Finally, after we are out of the transaction altogether and about to go
 *    idle, we scan the queue for messages that need to be sent to our
 *    frontend (which might be notifies from other backends, or self-notifies
 *    from our own). This step is not part of the CommitTransaction sequence
 *    for two important reasons. First, we could get errors while sending
 *    data to our frontend, and it's really bad for errors to happen in
 *    post-commit cleanup. Second, in cases where a procedure issues commits
 *    within a single frontend command, we don't want to send notifies to our
 *    frontend until the command is done; but notifies to other backends
 *    should go out immediately after each commit.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *    sets the process's latch, which triggers the event to be processed
 *    immediately if this backend is idle (i.e., it is waiting for a frontend
 *    command and is not within a transaction block; cf.
 *    ProcessClientReadInterrupt()). Otherwise the handler may only set a
 *    flag, which will cause the processing to occur just before we next go
 *    idle.
 *
 *    Inbound-notify processing consists of reading all of the notifications
 *    that have arrived since the last scan. We read every notification
 *    until we reach either a notification from an uncommitted transaction or
 *    the head pointer's position.
 *
 * 6. To limit disk space consumption, the tail pointer needs to be advanced
 *    so that old pages can be truncated. This is relatively expensive
 *    (notably, it requires an exclusive lock), so we don't want to do it
 *    often. We make sending backends do this work if they advanced the queue
 *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
 *
 * 7. So far we have not discussed how backends change their listening state,
 *    nor how notification senders know which backends to awaken. To handle
 *    the latter, we maintain a global channel table (implemented as a dynamic
 *    shared hash table, or dshash) that maps channel names to the set of
 *    backends listening on each channel. This table is created lazily on the
 *    first LISTEN command and grows dynamically as needed. There is also a
 *    local channel table (a plain dynahash table) in each listening backend,
 *    tracking which channels that backend is listening to. The local table
 *    serves to reduce the number of accesses needed to the shared table.
 *
 *    If the current transaction has executed any LISTEN/UNLISTEN actions,
 *    PreCommit_Notify() prepares to commit those. For LISTEN, it
 *    pre-allocates entries in both the per-backend localChannelTable and the
 *    shared globalChannelTable (with listening=false so that these entries
 *    are no-ops for the moment). It also records the final per-channel
 *    intent in pendingListenActions, so post-commit/abort processing can
 *    apply that in a single step. Since all these allocations happen before
 *    committing to clog, we can safely abort the transaction on failure.
 *
 *    After commit, AtCommit_Notify() runs through pendingListenActions and
 *    updates the backend's per-channel listening flags to activate or
 *    deactivate listening. This happens before sending signals.
 *
 *    SignalBackends() consults the shared global channel table to identify
 *    listeners for the channels that the current transaction sent
 *    notification(s) to. Each selected backend is marked as having a wakeup
 *    pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
 *    signal is sent to it.
 *
 * 8. While writing notifications, PreCommit_Notify() records the queue head
 *    position both before and after the write. Because all writers serialize
 *    on a cluster-wide heavyweight lock, no other backend can insert entries
 *    between these two points. SignalBackends() uses this fact to directly
 *    advance the queue pointer for any backend that is still positioned at
 *    the old head, or within the range written, but is not interested in any
 *    of our notifications. This avoids unnecessary wakeups for idle
 *    listeners that have nothing to read. Backends that are not interested
 *    in our notifications, but cannot be directly advanced, are signaled only
 *    if they are far behind the current queue head; that is to ensure that
 *    we can advance the queue tail without undue delay.
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.) The above design guarantees that ignoring
 * self-notifies will never cause notifies from other backends to be missed.
 *
 * The amount of shared memory used for notify management (notify_buffers)
 * can be varied without affecting anything but performance. The maximum
 * amount of notification data that can be queued at one time is determined
 * by the max_notify_queue_pages GUC.
 *-------------------------------------------------------------------------
 */
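/*
 * Illustrative sketch, not part of async.c: the point-8 decision that
 * SignalBackends() makes for a single listener, under simplifying
 * assumptions. SketchPos, SketchAction, sketch_decide() and
 * SKETCH_CLEANUP_DELAY are hypothetical names; the wakeupPending and
 * isAdvancing flags and all locking are deliberately ignored.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct SketchPos
{
    int64_t page;               /* SLRU page number */
    int     offset;             /* byte offset within page */
} SketchPos;

typedef enum SketchAction
{
    SKETCH_NOTHING,             /* leave the listener alone */
    SKETCH_ADVANCE,             /* move its pointer to newHead, no wakeup */
    SKETCH_SIGNAL               /* send it PROCSIG_NOTIFY_INTERRUPT */
} SketchAction;

#define SKETCH_CLEANUP_DELAY 4  /* stands in for QUEUE_CLEANUP_DELAY */

static bool
sketch_precedes(SketchPos x, SketchPos y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/*
 * interested: does the listener listen on any channel we notified?
 * oldHead/newHead: queue head before and after our serialized write.
 */
static SketchAction
sketch_decide(SketchPos pos, bool interested, SketchPos oldHead, SketchPos newHead)
{
    if (interested)
        return SKETCH_SIGNAL;

    /* In [oldHead, newHead): everything ahead of it is ours, so skip it. */
    if (!sketch_precedes(pos, oldHead) && sketch_precedes(pos, newHead))
        return SKETCH_ADVANCE;

    /* Stuck further back: wake it only if it is far behind the head. */
    if (newHead.page - pos.page >= SKETCH_CLEANUP_DELAY)
        return SKETCH_SIGNAL;

    return SKETCH_NOTHING;
}
```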

#include "postgres.h"

#include <limits.h>
#include <unistd.h>
#include <signal.h>

#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "storage/subsystems.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"


/*
 * Maximum size of a NOTIFY payload, including the terminating null byte. This
 * must be kept small enough so that a notification message fits on one
 * SLRU page. The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
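/*
 * Illustrative sketch, not part of async.c: with the default build settings
 * BLCKSZ = 8192 and NAMEDATALEN = 64 (assumed here; both are compile-time
 * options), NOTIFY_PAYLOAD_MAX_LENGTH is 8192 - 64 - 128 = 8000, so the
 * longest payload accepted by the length check in Async_Notify() is 7999
 * bytes. The SKETCH_* macros and sketch_payload_fits() are hypothetical.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define SKETCH_BLCKSZ 8192          /* assumed default BLCKSZ */
#define SKETCH_NAMEDATALEN 64       /* assumed default NAMEDATALEN */
#define SKETCH_PAYLOAD_MAX (SKETCH_BLCKSZ - SKETCH_NAMEDATALEN - 128)

/* Mirrors the test "payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH is an error". */
static bool
sketch_payload_fits(const char *payload)
{
    size_t len = payload ? strlen(payload) : 0;   /* NULL payload == "" */

    return len < SKETCH_PAYLOAD_MAX;
}
```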

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
    int length;                 /* total allocated length of entry */
    Oid dboid;                  /* sender's database OID */
    TransactionId xid;          /* sender's XID */
    int32 srcPid;               /* sender's PID */

/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len) INTALIGN(len)

#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
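/*
 * Illustrative sketch, not part of async.c: how an entry's "length" can be
 * derived from AsyncQueueEntryEmptySize and QUEUEALIGN for given channel and
 * payload lengths. SketchEntry is a hypothetical stand-in in which Oid and
 * TransactionId are assumed to be uint32, and int alignment is assumed to be
 * 4 bytes, as on common platforms.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct SketchEntry
{
    int      length;            /* total allocated length of entry */
    uint32_t dboid;             /* stands in for Oid */
    uint32_t xid;               /* stands in for TransactionId */
    int32_t  srcPid;
    char     data[1];           /* "channel\0payload\0" starts here */
} SketchEntry;

/* Round up to a multiple of 4, like INTALIGN with 4-byte int alignment. */
#define SKETCH_INTALIGN(len) (((size_t) (len) + 3) & ~(size_t) 3)

/* Header plus two terminators, as in AsyncQueueEntryEmptySize. */
#define SKETCH_EMPTY_SIZE (offsetof(SketchEntry, data) + 2)

static size_t
sketch_entry_length(size_t channel_len, size_t payload_len)
{
    /* both terminators are already counted in SKETCH_EMPTY_SIZE */
    return SKETCH_INTALIGN(SKETCH_EMPTY_SIZE + channel_len + payload_len);
}
```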

/*
 * Struct describing a queue position, and assorted macros for working with it
 */
typedef struct QueuePosition
{
    int64 page;                 /* SLRU page number */
    int offset;                 /* byte offset within page */

#define QUEUE_POS_PAGE(x) ((x).page)
#define QUEUE_POS_OFFSET(x) ((x).offset)

#define SET_QUEUE_POS(x,y,z) \
    do { \
        (x).page = (y); \
        (x).offset = (z); \
    } while (0)

#define QUEUE_POS_EQUAL(x,y) \
    ((x).page == (y).page && (x).offset == (y).offset)

#define QUEUE_POS_IS_ZERO(x) \
    ((x).page == 0 && (x).offset == 0)

/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     (x).page != (y).page ? (y) : \
     (x).offset < (y).offset ? (x) : (y))

/* choose logically larger QueuePosition */
#define QUEUE_POS_MAX(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     (x).page != (y).page ? (x) : \
     (x).offset > (y).offset ? (x) : (y))

/* returns true if x comes before y in queue order */
#define QUEUE_POS_PRECEDES(x,y) \
    (asyncQueuePagePrecedes((x).page, (y).page) || \
     ((x).page == (y).page && (x).offset < (y).offset))
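/*
 * Illustrative sketch, not part of async.c: the QUEUE_POS_* comparisons
 * written as functions, plus the rule that the tail may move to the minimum
 * of the head and every listener's position (the invariant stated for
 * AsyncQueueControl.tail). Page numbers are int64 and never wrap, so plain
 * "<" suffices, as in asyncQueuePagePrecedes(). All sketch names are
 * hypothetical.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct SketchQueuePosition
{
    int64_t page;
    int     offset;
} SketchQueuePosition;

static bool
pos_precedes(SketchQueuePosition x, SketchQueuePosition y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

static SketchQueuePosition
pos_min(SketchQueuePosition x, SketchQueuePosition y)
{
    return pos_precedes(y, x) ? y : x;
}

static SketchQueuePosition
pos_max(SketchQueuePosition x, SketchQueuePosition y)
{
    return pos_precedes(x, y) ? y : x;
}

/* New tail: the logically smallest of the head and all listener positions. */
static SketchQueuePosition
sketch_new_tail(SketchQueuePosition head, const SketchQueuePosition *listeners, int n)
{
    SketchQueuePosition min = head;

    for (int i = 0; i < n; i++)
        min = pos_min(min, listeners[i]);
    return min;
}
```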

/*
 * Parameter determining how often we try to advance the tail pointer:
 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
 * also the distance by which a backend that's not interested in our
 * notifications needs to be behind before we'll decide we need to wake it
 * up so it can advance its pointer.
 *
 * Resist the temptation to make this really large. While that would save
 * work in some places, it would add cost in others. In particular, this
 * should likely be less than notify_buffers, to ensure that backends
 * catch up before the pages they'll need to read fall out of SLRU cache.
 */
#define QUEUE_CLEANUP_DELAY 4
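/*
 * Illustrative sketch, not part of async.c: when a writer moves the queue
 * head onto a new page, it only tries to advance the tail if that page number
 * is a multiple of QUEUE_CLEANUP_DELAY (cf. tryAdvanceTail below). The
 * helper names here are hypothetical.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_QUEUE_CLEANUP_DELAY 4    /* stands in for QUEUE_CLEANUP_DELAY */

/* Called when a writer has just moved the head onto new_page. */
static bool
sketch_should_try_advance_tail(int64_t new_page)
{
    return new_page % SKETCH_QUEUE_CLEANUP_DELAY == 0;
}

/* Count tail-advance attempts while the head walks first_page -> last_page. */
static int
sketch_count_attempts(int64_t first_page, int64_t last_page)
{
    int attempts = 0;

    for (int64_t p = first_page + 1; p <= last_page; p++)
        if (sketch_should_try_advance_tail(p))
            attempts++;
    return attempts;
}
```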

/*
 * Struct describing a listening backend's status
 */
typedef struct QueueBackendStatus
{
    int32 pid;                  /* either a PID or InvalidPid */
    Oid dboid;                  /* backend's database OID, or InvalidOid */
    ProcNumber nextListener;    /* id of next listener, or INVALID_PROC_NUMBER */
    QueuePosition pos;          /* backend has read queue up to here */
    bool wakeupPending;         /* signal sent to backend, not yet processed */
    bool isAdvancing;           /* backend is advancing its position */

/*
 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
 *
 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
 * NotifyQueueTailLock.
 *
 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
 * their own entries as well as the head and tail pointers. Consequently we
 * can allow a backend to update its own record while holding only SHARED lock
 * (since no other backend will inspect it).
 *
 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
 * entries of other backends and also change the head pointer. They can
 * also advance other backends' queue positions, unless the other backend
 * has isAdvancing set (i.e., is in the process of doing that itself).
 *
 * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
 * mode, backends can change the tail pointers.
 *
 * The SLRU buffer pool is divided into banks, and the per-bank SLRU lock is
 * used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, then the SLRU bank lock, and
 * lastly the globalChannelTable partition locks.
 *
 * Each backend uses the backend[] array entry with index equal to its
 * ProcNumber. We rely on this to make SendProcSignal fast.
 *
 * The backend[] array entries for actively-listening backends are threaded
 * together using firstListener and the nextListener links, so that we can
 * scan them without having to iterate over inactive entries. We keep this
 * list in order by ProcNumber so that the scan is cache-friendly when there
 * are many active entries.
 */
typedef struct AsyncQueueControl
{
    QueuePosition head;         /* head points to the next free location */
    QueuePosition tail;         /* tail must be <= the queue position of every
                                 * listening backend */
    int64 stopPage;             /* oldest unrecycled page; must be <=
                                 * tail.page */
    ProcNumber firstListener;   /* id of first listener, or
                                 * INVALID_PROC_NUMBER */
    TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
    dsa_handle globalChannelTableDSA;   /* global channel table's DSA handle */
    dshash_table_handle globalChannelTableDSH;  /* and its dshash handle */
    /* Array with room for MaxBackends entries: */


static void AsyncShmemRequest(void *arg);
static void AsyncShmemInit(void *arg);



#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)

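/*
 * Illustrative sketch, not part of async.c: keeping the listener list that is
 * threaded through firstListener/nextListener ordered by ProcNumber. Plain
 * ints stand in for ProcNumber, -1 for INVALID_PROC_NUMBER and a fixed array
 * for the backend[] entries; all names are hypothetical and locking is
 * omitted.
 */
```c
#include <assert.h>

#define SKETCH_INVALID (-1)         /* stands in for INVALID_PROC_NUMBER */
#define SKETCH_MAX_BACKENDS 8       /* stands in for MaxBackends */

static int first_listener = SKETCH_INVALID;
static int next_listener[SKETCH_MAX_BACKENDS];

/* Insert procno (not already listed) keeping the list sorted ascending. */
static void
sketch_add_listener(int procno)
{
    int prev = SKETCH_INVALID;

    for (int i = first_listener; i != SKETCH_INVALID && i < procno; i = next_listener[i])
        prev = i;

    if (prev == SKETCH_INVALID)
    {
        next_listener[procno] = first_listener;     /* new list head */
        first_listener = procno;
    }
    else
    {
        next_listener[procno] = next_listener[prev];
        next_listener[prev] = procno;
    }
}

/* Unlink procno from the list, if present. */
static void
sketch_remove_listener(int procno)
{
    if (first_listener == procno)
        first_listener = next_listener[procno];
    else
    {
        for (int i = first_listener; i != SKETCH_INVALID; i = next_listener[i])
        {
            if (next_listener[i] == procno)
            {
                next_listener[i] = next_listener[procno];
                break;
            }
        }
    }
    next_listener[procno] = SKETCH_INVALID;
}
```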
/*
 * The SLRU buffer area through which we access the notification queue
 */
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static int asyncQueueErrdetailForIoError(const void *opaque_data);



#define NotifyCtl (&NotifySlruDesc)
#define QUEUE_PAGESIZE BLCKSZ

#define QUEUE_FULL_WARN_INTERVAL 5000   /* warn at most once every 5s */
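/*
 * Illustrative sketch, not part of async.c: rate-limiting the "queue is
 * filling up" warning to at most once per QUEUE_FULL_WARN_INTERVAL
 * milliseconds. The 50% usage threshold and the millisecond clock are
 * assumptions of this sketch, not taken from the code shown here.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_WARN_INTERVAL_MS 5000    /* stands in for QUEUE_FULL_WARN_INTERVAL */

/* usage: fraction of the queue in use; times are in milliseconds. */
static bool
sketch_should_warn(double usage, int64_t *last_warn_ms, int64_t now_ms)
{
    if (usage < 0.5)                    /* assumed: warn only once half full */
        return false;
    if (now_ms - *last_warn_ms < SKETCH_WARN_INTERVAL_MS)
        return false;                   /* warned too recently */
    *last_warn_ms = now_ms;             /* remember when we last warned */
    return true;
}
```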

/*
 * Global channel table definitions
 *
 * This hash table maps (database OID, channel name) keys to arrays of
 * ProcNumbers representing the backends listening or about to listen
 * on each channel. The "listening" flags allow us to create hash table
 * entries pre-commit and not have to assume that creating them post-commit
 * will succeed.
 */
#define INITIAL_LISTENERS_ARRAY_SIZE 4


typedef struct ListenerEntry
{
    ProcNumber procNo;          /* listener's ProcNumber */
    bool listening;             /* true if committed listener */

typedef struct GlobalChannelEntry
{
    GlobalChannelKey key;       /* hash key */
    dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
    int numListeners;           /* Number of listeners currently stored */
    int allocatedListeners;     /* Allocated size of array */


/*
 * localChannelTable caches the channel names this backend is listening on
 * (including those we have staged to be listened on, but not yet committed).
 * Used by IsListeningOn() for fast lookups when reading notifications.
 */

/* We test this condition to detect that we're not listening at all */
#define LocalChannelTableIsEmpty() \
    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction. As explained above,
 * we don't actually change listen state until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */

typedef struct
{
    char channel[FLEXIBLE_ARRAY_MEMBER];    /* nul-terminated string */

typedef struct ActionList
{
    int nestingLevel;           /* current transaction nesting depth */
    List *actions;              /* list of ListenAction structs */
    struct ActionList *upper;   /* details for upper transaction levels */


/*
 * Hash table recording the final listen/unlisten intent per channel for
 * the current transaction. Key is channel name, value is PENDING_LISTEN or
 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
 * per channel instead of replaying every action. This is built from the
 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
 * AtAbort_Notify.
 */

typedef struct PendingListenEntry
{
    char channel[NAMEDATALEN];  /* hash key */
    PendingListenAction action; /* which action should we perform? */

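/*
 * Illustrative sketch, not part of async.c: folding the ordered
 * LISTEN/UNLISTEN/UNLISTEN ALL list into one final intent per channel, in the
 * spirit of pendingListenActions. The rule used for UNLISTEN ALL (mark every
 * channel currently listened on, or already seen in this transaction, as
 * PENDING_UNLISTEN) is an assumption, and every Sk* name is hypothetical.
 */
```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef enum { SK_LISTEN, SK_UNLISTEN, SK_UNLISTEN_ALL } SkActionKind;
typedef enum { SK_PENDING_LISTEN, SK_PENDING_UNLISTEN } SkIntent;

typedef struct { SkActionKind kind; const char *channel; } SkAction;
typedef struct { char channel[64]; SkIntent intent; } SkEntry;

/* Find the entry for channel, creating it if absent; NULL if table is full. */
static SkEntry *
sk_lookup(SkEntry *tab, int *n, int max, const char *channel)
{
    for (int i = 0; i < *n; i++)
        if (strcmp(tab[i].channel, channel) == 0)
            return &tab[i];
    if (*n == max)
        return NULL;
    snprintf(tab[*n].channel, sizeof(tab[*n].channel), "%s", channel);
    return &tab[(*n)++];
}

/* Replay actions in order; the last action touching a channel wins. */
static int
sketch_final_intents(const SkAction *acts, int nacts,
                     const char *const *listening, int nlistening,
                     SkEntry *tab, int max)
{
    int n = 0;

    for (int a = 0; a < nacts; a++)
    {
        if (acts[a].kind == SK_UNLISTEN_ALL)
        {
            /* make sure every currently-listened channel has an entry */
            for (int i = 0; i < nlistening; i++)
                (void) sk_lookup(tab, &n, max, listening[i]);
            for (int i = 0; i < n; i++)
                tab[i].intent = SK_PENDING_UNLISTEN;
        }
        else
        {
            SkEntry *e = sk_lookup(tab, &n, max, acts[a].channel);

            if (e)
                e->intent = (acts[a].kind == SK_LISTEN) ?
                    SK_PENDING_LISTEN : SK_PENDING_UNLISTEN;
        }
    }
    return n;
}
```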
/*
 * State for outbound notifies consists of a list of all channels+payloads
 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
 * until and unless the transaction commits. pendingNotifies is NULL if no
 * NOTIFYs have been done in the current (sub) transaction.
 *
 * We discard duplicate notify events issued in the same transaction.
 * Hence, in addition to the list proper (which we need to track the order
 * of the events, since we guarantee to deliver them in order), we build a
 * hash table which we can probe to detect duplicates. Since building the
 * hash table is somewhat expensive, we do so only once we have at least
 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
 * before that we just scan the events linearly.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions add their entries to their parent's list.
 * Failed subtransactions simply discard their lists. Since these lists
 * are independent, there may be notify events in a subtransaction's list
 * that duplicate events in some ancestor (sub) transaction; we get rid of
 * the dups when merging the subtransaction's list into its parent's.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit. This is a bit odd
 * but is consistent with our historical behavior.
 */
typedef struct Notification
{
    uint16 channel_len;         /* length of channel-name string */
    uint16 payload_len;         /* length of payload string */
    /* null-terminated channel name, then null-terminated payload follow */

typedef struct NotificationList
{
    int nestingLevel;           /* current transaction nesting depth */
    List *events;               /* list of Notification structs */
    HTAB *hashtab;              /* hash of NotificationHash structs, or NULL */
    List *uniqueChannelNames;   /* unique channel names being notified */
    HTAB *uniqueChannelHash;    /* hash of unique channel names, or NULL */
    struct NotificationList *upper; /* details for upper transaction levels */

#define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
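/*
 * Illustrative sketch, not part of async.c: order-preserving duplicate
 * suppression that scans linearly until MIN_HASHABLE_NOTIFIES events exist
 * and then switches to a hash table over the same events. FNV-1a with linear
 * probing stands in for the dynahash table; the fixed capacities and all Sk*
 * names are assumptions of the sketch.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SK_MIN_HASHABLE 16      /* stands in for MIN_HASHABLE_NOTIFIES */
#define SK_MAX_EVENTS 256       /* fixed capacity for the sketch only */
#define SK_HASH_SLOTS 512       /* power of two, larger than SK_MAX_EVENTS */

typedef struct SkNotifyList
{
    const char *channel[SK_MAX_EVENTS];
    const char *payload[SK_MAX_EVENTS];
    int         nevents;        /* events kept, in arrival order */
    int         slots[SK_HASH_SLOTS];   /* event index + 1; 0 means empty */
    bool        hashed;         /* has the hash table been built? */
} SkNotifyList;

static uint32_t
sk_hash(const char *channel, const char *payload)
{
    uint32_t h = 2166136261u;   /* FNV-1a offset basis */

    for (const char *s = channel; *s; s++)
        h = (h ^ (unsigned char) *s) * 16777619u;
    h = (h ^ 0xFFu) * 16777619u;        /* mix in a separator between strings */
    for (const char *s = payload; *s; s++)
        h = (h ^ (unsigned char) *s) * 16777619u;
    return h;
}

static bool
sk_same(const SkNotifyList *l, int i, const char *channel, const char *payload)
{
    return strcmp(l->channel[i], channel) == 0 && strcmp(l->payload[i], payload) == 0;
}

/* Insert event i into the hash with linear probing (a free slot always exists). */
static void
sk_hash_insert(SkNotifyList *l, int i)
{
    uint32_t s = sk_hash(l->channel[i], l->payload[i]) & (SK_HASH_SLOTS - 1);

    while (l->slots[s] != 0)
        s = (s + 1) & (SK_HASH_SLOTS - 1);
    l->slots[s] = i + 1;
}

/* Returns true if the event was appended, false if it was a duplicate. */
static bool
sk_add_notify(SkNotifyList *l, const char *channel, const char *payload)
{
    if (l->hashed)
    {
        uint32_t s = sk_hash(channel, payload) & (SK_HASH_SLOTS - 1);

        for (; l->slots[s] != 0; s = (s + 1) & (SK_HASH_SLOTS - 1))
            if (sk_same(l, l->slots[s] - 1, channel, payload))
                return false;
    }
    else
    {
        for (int i = 0; i < l->nevents; i++)
            if (sk_same(l, i, channel, payload))
                return false;
    }

    assert(l->nevents < SK_MAX_EVENTS);    /* sketch-only capacity limit */
    l->channel[l->nevents] = channel;
    l->payload[l->nevents] = payload;
    l->nevents++;

    if (l->hashed)
        sk_hash_insert(l, l->nevents - 1);
    else if (l->nevents >= SK_MIN_HASHABLE)
    {
        /* threshold reached: index every event collected so far */
        for (int i = 0; i < l->nevents; i++)
            sk_hash_insert(l, i);
        l->hashed = true;
    }
    return true;
}
```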

{
    Notification *event;        /* => the actual Notification struct */
};


/*
 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
 * (both just carry the channel name, with no payload).
 */
typedef struct ChannelName
{
    char channel[NAMEDATALEN];  /* hash key */

/*
 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
 * called from inside a signal handler. That just sets the
 * notifyInterruptPending flag and sets the process latch.
 * ProcessNotifyInterrupt() will then be called whenever it's safe to
 * actually deal with the interrupt.
 */

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;

/*
 * Queue head positions for direct advancement.
 * These are captured during PreCommit_Notify while holding the heavyweight
 * lock on database 0, ensuring no other backend can insert notifications
 * between them. SignalBackends uses these to advance idle backends.
 */

/*
 * Workspace arrays for SignalBackends. These are preallocated in
 * PreCommit_Notify to avoid needing memory allocation after committing to
 * clog.
 */

/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameters */
bool Trace_notify = false;

/* For 8 KB pages this gives 8 GB of disk space */

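/*
 * Illustrative sketch, not part of async.c: the "8 GB" figure above follows
 * from 8192-byte pages times a max_notify_queue_pages setting of 1048576,
 * assumed here to be the default. Queue usage and fullness are modelled as
 * occupied pages (head page minus tail page) measured against that limit;
 * the sk_* helpers are hypothetical.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SK_PAGESIZE 8192            /* QUEUE_PAGESIZE == BLCKSZ, assumed 8 KB */
#define SK_MAX_QUEUE_PAGES 1048576  /* assumed default max_notify_queue_pages */

/* Fraction of the allowed queue currently occupied. */
static double
sk_queue_usage(int64_t head_page, int64_t tail_page)
{
    int64_t occupied = head_page - tail_page;   /* pages never wrap */

    return (double) occupied / (double) SK_MAX_QUEUE_PAGES;
}

static bool
sk_queue_is_full(int64_t head_page, int64_t tail_page)
{
    return head_page - tail_page >= SK_MAX_QUEUE_PAGES;
}
```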
/* local function prototypes */
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
                                        const char *channel);
static dshash_hash globalChannelTableHash(const void *key, size_t size,
                                          void *arg);
static void initGlobalChannelTable(void);
static void initLocalChannelTable(void);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void BecomeRegisteredListener(void);
static void PrepareTableEntriesForListen(const char *channel);
static void PrepareTableEntriesForUnlisten(const char *channel);
static void PrepareTableEntriesForUnlistenAll(void);
                                           int idx);
static void ApplyPendingListenActions(bool isCommit);
static void CleanupListenersOnExit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
                                         QueuePosition stop,
                                         Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

static int
{
    const QueuePosition *position = opaque_data;

    return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
                     position->page, position->offset);
}

/*
 * Compute the difference between two queue page numbers.
 * Previously this function accounted for a wraparound.
 */
static inline int64
{
    return p - q;
}

/*
 * Determines whether p precedes q.
 * Previously this function accounted for a wraparound.
 */
static inline bool
{
    return p < q;
}

/*
 * GlobalChannelKeyInit
 *    Prepare a global channel table key for hashing.
 */
static inline void
GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
{
    memset(key, 0, sizeof(GlobalChannelKey));
    key->dboid = dboid;
    strlcpy(key->channel, channel, NAMEDATALEN);
}
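/*
 * Illustrative sketch, not part of async.c: why a key initializer like
 * GlobalChannelKeyInit() zeroes the whole key before copying. If the table
 * compares keys bytewise (assumed here, since the table parameters are not
 * shown), every byte past the channel name must be deterministic. ISO C has
 * no strlcpy, so a bounded strncpy is used instead; all sk_* names are
 * hypothetical.
 */
```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define SK_NAMEDATALEN 64

typedef struct SkChannelKey
{
    uint32_t dboid;                 /* stands in for Oid */
    char     channel[SK_NAMEDATALEN];
} SkChannelKey;

static void
sk_key_init(SkChannelKey *key, uint32_t dboid, const char *channel)
{
    /* zero everything first so no stale bytes survive */
    memset(key, 0, sizeof(*key));
    key->dboid = dboid;
    /* copy at most NAMEDATALEN-1 bytes; the last byte stays '\0' */
    strncpy(key->channel, channel, SK_NAMEDATALEN - 1);
}
```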

/*
 * globalChannelTableHash
 *    Hash function for global channel table keys.
 */
static dshash_hash
globalChannelTableHash(const void *key, size_t size, void *arg)
{
    const GlobalChannelKey *k = (const GlobalChannelKey *) key;
    dshash_hash h;

    h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,

    return h;
}
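/*
 * Illustrative sketch, not part of async.c: combining a hash of the database
 * OID with a hash of the channel bytes by XOR, as the visible "h ^= ..." line
 * suggests. The murmur3 finalizer and FNV-1a below are stand-ins for
 * PostgreSQL's hash_uint32()/hash_any(), and hashing the OID first is an
 * assumption, since that line is not shown. Only bytes up to the first NUL
 * of the channel are hashed.
 */
```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define SK_NAMEDATALEN 64

typedef struct SkChannelKey
{
    uint32_t dboid;
    char     channel[SK_NAMEDATALEN];
} SkChannelKey;

/* Stand-in integer mixer (murmur3 fmix32), not hash_uint32(). */
static uint32_t
sk_hash_uint32(uint32_t x)
{
    x ^= x >> 16;
    x *= 0x85ebca6bu;
    x ^= x >> 13;
    x *= 0xc2b2ae35u;
    x ^= x >> 16;
    return x;
}

/* Stand-in byte hash (FNV-1a), not hash_any(). */
static uint32_t
sk_hash_bytes(const unsigned char *p, size_t len)
{
    uint32_t h = 2166136261u;

    for (size_t i = 0; i < len; i++)
        h = (h ^ p[i]) * 16777619u;
    return h;
}

static uint32_t
sk_channel_key_hash(const SkChannelKey *k)
{
    /* length of the channel name, bounded by the buffer size */
    const char *nul = memchr(k->channel, '\0', SK_NAMEDATALEN);
    size_t len = nul ? (size_t) (nul - k->channel) : SK_NAMEDATALEN;

    return sk_hash_uint32(k->dboid) ^
        sk_hash_bytes((const unsigned char *) k->channel, len);
}
```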

/* parameters for the global channel table */

/*
 * initGlobalChannelTable
 *    Lazy initialization of the global channel table.
 */
static void
{
    MemoryContext oldcontext;

    /* Quick exit if we already did this */
        return;

    /* Otherwise, use a lock to ensure only one process creates the table */

    /* Be sure any local memory allocated by DSA routines is persistent */

    {
        /* Initialize dynamic shared hash table for global channels */
                                                    NULL);

        /* Store handles in shared memory for other backends to use */
    }
    else if (!globalChannelTable)
    {
        /* Attach to existing dynamic shared hash table */
                                               NULL);
    }

    MemoryContextSwitchTo(oldcontext);
}

/*
 * initLocalChannelTable
 *    Lazy initialization of the local channel table.
 *    Once created, this table lasts for the life of the session.
 */
static void
{

    /* Quick exit if we already did this */
    if (localChannelTable != NULL)
        return;

    /* Initialize local hash table for this backend's listened channels */
    hash_ctl.entrysize = sizeof(ChannelName);

        hash_create("Local Listen Channels",
                    64,
                    &hash_ctl,
}

/*
 * initPendingListenActions
 *    Lazy initialization of the pending listen actions hash table.
 *    This is allocated in CurTransactionContext during PreCommit_Notify,
 *    and destroyed at transaction end.
 */
static void
{

        return;

    hash_ctl.entrysize = sizeof(PendingListenEntry);

        hash_create("Pending Listen Actions",
                    &hash_ctl,
}

/*
 * Register our shared memory needs
 */
static void
{
    Size size;

    size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
    size = add_size(size, offsetof(AsyncQueueControl, backend));

    ShmemRequestStruct(.name = "Async Queue Control",
                       .size = size,
                       .ptr = (void **) &asyncQueueControl,
        );

                    .name = "notify",
                    .Dir = "pg_notify",

                    /* long segment names are used in order to avoid wraparound */
                    .long_segment_names = true,

                    .nslots = notify_buffers,

                    .sync_handler = SYNC_HANDLER_NONE,
                    .PagePrecedes = asyncQueuePagePrecedes,
                    .errdetail_for_io_error = asyncQueueErrdetailForIoError,

                    .buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
                    .bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
        );
}
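/*
 * Illustrative sketch, not part of async.c: the overflow-checked size
 * arithmetic that mul_size()/add_size() provide in the shared memory request
 * function above. The real functions raise an error on overflow; this sketch
 * returns false instead, and the sk_* names and the sizes used in testing
 * are hypothetical.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

static bool
sk_add_size(size_t a, size_t b, size_t *out)
{
    if (a > SIZE_MAX - b)
        return false;           /* a + b would overflow */
    *out = a + b;
    return true;
}

static bool
sk_mul_size(size_t a, size_t b, size_t *out)
{
    if (a != 0 && b > SIZE_MAX / a)
        return false;           /* a * b would overflow */
    *out = a * b;
    return true;
}

/* header_size plays the role of offsetof(AsyncQueueControl, backend). */
static bool
sk_control_size(size_t max_backends, size_t per_backend, size_t header_size,
                size_t *out)
{
    size_t size;

    if (!sk_mul_size(max_backends, per_backend, &size))
        return false;
    return sk_add_size(size, header_size, out);
}
```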

static void



/*
 * pg_notify -
 *    SQL function to send a notification event
 */
Datum
{
    const char *channel;
    const char *payload;

    if (PG_ARGISNULL(0))
        channel = "";
    else

    if (PG_ARGISNULL(1))
        payload = "";
    else

    /* For NOTIFY as a statement, this is checked in ProcessUtility */

    Async_Notify(channel, payload);

}

883
884/*
885 * Async_Notify
886 *
887 * This is executed by the SQL notify command.
888 *
889 * Adds the message to the list of pending notifies.
890 * Actual notification happens during transaction commit.
891 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
892 */
893void
894Async_Notify(const char *channel, const char *payload)
895{
896 int my_level = GetCurrentTransactionNestLevel();
897 size_t channel_len;
898 size_t payload_len;
899 Notification *n;
900 MemoryContext oldcontext;
901
902 if (IsParallelWorker())
903 elog(ERROR, "cannot send notifications from a parallel worker");
904
905 if (Trace_notify)
906 elog(DEBUG1, "Async_Notify(%s)", channel);
907
908 channel_len = channel ? strlen(channel) : 0;
909 payload_len = payload ? strlen(payload) : 0;
910
911 /* a channel name must be specified */
912 if (channel_len == 0)
915 errmsg("channel name cannot be empty")));
916
917 /* enforce length limits */
918 if (channel_len >= NAMEDATALEN)
921 errmsg("channel name too long")));
922
923 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
926 errmsg("payload string too long")));
927
928 /*
929 * We must construct the Notification entry, even if we end up not using
930 * it, in order to compare it cheaply to existing list entries.
931 *
932 * The notification list needs to live until end of transaction, so store
933 * it in the transaction context.
934 */
936
938 channel_len + payload_len + 2);
939 n->channel_len = channel_len;
940 n->payload_len = payload_len;
941 strcpy(n->data, channel);
942 if (payload)
943 strcpy(n->data + channel_len + 1, payload);
944 else
945 n->data[channel_len + 1] = '\0';
946
947 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
948 {
950
951 /*
952 * First notify event in current (sub)xact. Note that we allocate the
953 * NotificationList in TopTransactionContext; the nestingLevel might
954 * get changed later by AtSubCommit_Notify.
955 */
956 notifies = (NotificationList *)
957 MemoryContextAlloc(TopTransactionContext,
958 sizeof(NotificationList));
959 notifies->nestingLevel = my_level;
960 notifies->events = list_make1(n);
961 /* We certainly don't need a hashtable yet */
962 notifies->hashtab = NULL;
963 /* We won't build uniqueChannelNames/Hash till later, either */
964 notifies->uniqueChannelNames = NIL;
965 notifies->uniqueChannelHash = NULL;
966 notifies->upper = pendingNotifies;
967 pendingNotifies = notifies;
968 }
969 else
970 {
971 /* Now check for duplicates */
972 if (AsyncExistsPendingNotify(n))
973 {
974 /* It's a dup, so forget it */
975 pfree(n);
976 MemoryContextSwitchTo(oldcontext);
977 return;
978 }
979
980 /* Append more events to existing list */
981 AddEventToPendingNotifies(n);
982 }
983
984 MemoryContextSwitchTo(oldcontext);
985}
986
987/*
988 * queue_listen
989 * Common code for listen, unlisten, unlisten all commands.
990 *
991 * Adds the request to the list of pending actions.
992 * Actual update of localChannelTable and globalChannelTable happens during
993 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
994 */
995static void
996queue_listen(ListenActionKind action, const char *channel)
997{
998 MemoryContext oldcontext;
999 ListenAction *actrec;
1000 int my_level = GetCurrentTransactionNestLevel();
1001
1002 /*
1003 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1004 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1005 * final per-channel intent is computed during PreCommit_Notify.
1006 */
1007 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
1008
1009 /* space for terminating null is included in sizeof(ListenAction) */
1010 actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
1011 strlen(channel) + 1);
1012 actrec->action = action;
1013 strcpy(actrec->channel, channel);
1014
1015 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1016 {
1017 ActionList *actions;
1018
1019 /*
1020 * First action in current (sub)xact. Note that we allocate the
1021 * ActionList in TopTransactionContext; the nestingLevel might get
1022 * changed later by AtSubCommit_Notify.
1023 */
1024 actions = (ActionList *)
1025 MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
1026 actions->nestingLevel = my_level;
1027 actions->actions = list_make1(actrec);
1028 actions->upper = pendingActions;
1029 pendingActions = actions;
1030 }
1031 else
1032 pendingActions->actions = lappend(pendingActions->actions, actrec);
1033
1034 MemoryContextSwitchTo(oldcontext);
1035}
1036
1037/*
1038 * Async_Listen
1039 *
1040 * This is executed by the SQL listen command.
1041 */
1042void
1043Async_Listen(const char *channel)
1044{
1045 if (Trace_notify)
1046 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1047
1048 queue_listen(LISTEN_LISTEN, channel);
1049}
1050
1051/*
1052 * Async_Unlisten
1053 *
1054 * This is executed by the SQL unlisten command.
1055 */
1056void
1057Async_Unlisten(const char *channel)
1058{
1059 if (Trace_notify)
1060 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1061
1062 /* If we couldn't possibly be listening, no need to queue anything */
1064 return;
1065
1066 queue_listen(LISTEN_UNLISTEN, channel);
1067}
1068
1069/*
1070 * Async_UnlistenAll
1071 *
1072 * This is invoked by UNLISTEN * command, and also at backend exit.
1073 */
1074void
1075Async_UnlistenAll(void)
1076{
1077 if (Trace_notify)
1078 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1079
1080 /* If we couldn't possibly be listening, no need to queue anything */
1082 return;
1083
1085}
1086
1087/*
1088 * SQL function: return a set of the channel names this backend is actively
1089 * listening to.
1090 *
1091 * Note: this coding relies on the fact that the localChannelTable cannot
1092 * change within a transaction.
1093 */
1094Datum
1095pg_listening_channels(PG_FUNCTION_ARGS)
1096{
1098 HASH_SEQ_STATUS *status;
1099
1100 /* stuff done only on the first call of the function */
1101 if (SRF_IS_FIRSTCALL())
1102 {
1103 /* create a function context for cross-call persistence */
1105
1106 /* Initialize hash table iteration if we have any channels */
1107 if (localChannelTable != NULL)
1108 {
1109 MemoryContext oldcontext;
1110
1111 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1112 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1114 funcctx->user_fctx = status;
1115 MemoryContextSwitchTo(oldcontext);
1116 }
1117 else
1118 {
1119 funcctx->user_fctx = NULL;
1120 }
1121 }
1122
1123 /* stuff done on every call of the function */
1125 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1126
1127 if (status != NULL)
1128 {
1129 ChannelName *entry;
1130
1131 entry = (ChannelName *) hash_seq_search(status);
1132 if (entry != NULL)
1134 }
1135
1137}
1138
1139/*
1140 * Async_UnlistenOnExit
1141 *
1142 * This is executed at backend exit if we have done any LISTENs in this
1143 * backend. It might not be necessary anymore, if the user UNLISTENed
1144 * everything, but we don't try to detect that case.
1145 */
1146static void
1152
1153/*
1154 * AtPrepare_Notify
1155 *
1156 * This is called at the prepare phase of a two-phase
1157 * transaction. Save the state for possible commit later.
1158 */
1159void
1160AtPrepare_Notify(void)
1161{
1162 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1163 if (pendingActions || pendingNotifies)
1164 ereport(ERROR,
1165 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1166 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1167}
1168
1169/*
1170 * PreCommit_Notify
1171 *
1172 * This is called at transaction commit, before actually committing to
1173 * clog.
1174 *
1175 * If there are pending LISTEN actions, make sure we are listed in the
1176 * shared-memory listener array. This must happen before commit to
1177 * ensure we don't miss any notifies from transactions that commit
1178 * just after ours.
1179 *
1180 * If there are outbound notify requests in the pendingNotifies list,
1181 * add them to the global queue. We do that before commit so that
1182 * we can still throw error if we run out of queue space.
1183 */
1184void
1185PreCommit_Notify(void)
1186{
1187 ListCell *p;
1188
1189 if (!pendingActions && !pendingNotifies)
1190 return; /* no relevant statements in this xact */
1191
1192 if (Trace_notify)
1193 elog(DEBUG1, "PreCommit_Notify");
1194
1195 /* Preflight for any pending listen/unlisten actions */
1197
1198 if (pendingActions != NULL)
1199 {
1200 /* Ensure we have a local channel table */
1202 /* Create pendingListenActions hash table for this transaction */
1204
1205 /* Stage all the actions this transaction wants to perform */
1206 foreach(p, pendingActions->actions)
1207 {
1209
1210 switch (actrec->action)
1211 {
1212 case LISTEN_LISTEN:
1215 break;
1216 case LISTEN_UNLISTEN:
1218 break;
1221 break;
1222 }
1223 }
1224 }
1225
1226 /* Queue any pending notifies (must happen after the above) */
1227 if (pendingNotifies)
1228 {
1230 bool firstIteration = true;
1231
1232 /*
1233 * Build list of unique channel names being notified for use by
1234 * SignalBackends().
1235 *
1236 * If uniqueChannelHash is available, use it to efficiently get the
1237 * unique channels. Otherwise, fall back to the O(N^2) approach.
1238 */
1241 {
1242 HASH_SEQ_STATUS status;
1244
1246 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1249 channelEntry->channel);
1250 }
1251 else
1252 {
1253 /* O(N^2) approach is better for small number of notifications */
1255 {
1256 char *channel = n->data;
1257 bool found = false;
1258
1259 /* Name present in list? */
1261 {
1262 if (strcmp(oldchan, channel) == 0)
1263 {
1264 found = true;
1265 break;
1266 }
1267 }
1268 /* Add if not already in list */
1269 if (!found)
1272 channel);
1273 }
1274 }
1275
1276 /* Preallocate workspace that will be needed by SignalBackends() */
1277 if (signalPids == NULL)
1279 MaxBackends * sizeof(int32));
1280
1281 if (signalProcnos == NULL)
1283 MaxBackends * sizeof(ProcNumber));
1284
1285 /*
1286 * Make sure that we have an XID assigned to the current transaction.
1287 * GetCurrentTransactionId is cheap if we already have an XID, but not
1288 * so cheap if we don't, and we'd prefer not to do that work while
1289 * holding NotifyQueueLock.
1290 */
1292
1293 /*
1294 * Serialize writers by acquiring a special lock that we hold till
1295 * after commit. This ensures that queue entries appear in commit
1296 * order, and in particular that there are never uncommitted queue
1297 * entries ahead of committed ones, so an uncommitted transaction
1298 * can't block delivery of deliverable notifications.
1299 *
1300 * We use a heavyweight lock so that it'll automatically be released
1301 * after either commit or abort. This also allows deadlocks to be
1302 * detected, though really a deadlock shouldn't be possible here.
1303 *
1304 * The lock is on "database 0", which is pretty ugly but it doesn't
1305 * seem worth inventing a special locktag category just for this.
1306 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1307 * used by the flatfiles mechanism.)
1308 */
1311
1312 /*
1313 * For the direct advancement optimization in SignalBackends(), we
1314 * need to ensure that no other backend can insert queue entries
1315 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1316 * heavyweight lock above provides this guarantee, since it serializes
1317 * all writers.
1318 *
1319 * Note: if the heavyweight lock were ever removed for scalability
1320 * reasons, we could achieve the same guarantee by holding
1321 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1322 * than releasing and reacquiring it for each page as we do below.
1323 */
1324
1325 /* Initialize values to a safe default in case list is empty */
1328
1329 /* Now push the notifications into the queue */
1331 while (nextNotify != NULL)
1332 {
1333 /*
1334 * Add the pending notifications to the queue. We acquire and
1335 * release NotifyQueueLock once per page, which might be overkill
1336 * but it does allow readers to get in while we're doing this.
1337 *
1338 * A full queue is very uncommon and should really not happen,
1339 * given that we have so much space available in the SLRU pages.
1340 * Nevertheless we need to deal with this possibility. Note that
1341 * when we get here we are in the process of committing our
1342 * transaction, but we have not yet committed to clog, so at this
1343 * point in time we can still roll the transaction back.
1344 */
1346 if (firstIteration)
1347 {
1349 firstIteration = false;
1350 }
1352 if (asyncQueueIsFull())
1353 ereport(ERROR,
1355 errmsg("too many notifications in the NOTIFY queue")));
1359 }
1360
1361 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1362 }
1363}
1364
1365/*
1366 * AtCommit_Notify
1367 *
1368 * This is called at transaction commit, after committing to clog.
1369 *
1370 * Apply pending listen/unlisten changes and clear transaction-local state.
1371 *
1372 * If we issued any notifications in the transaction, send signals to
1373 * listening backends (possibly including ourselves) to process them.
1374 * Also, if we filled enough queue pages with new notifies, try to
1375 * advance the queue tail pointer.
1376 */
1377void
1378AtCommit_Notify(void)
1379{
1380 /*
1381 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1382 * return as soon as possible
1383 */
1384 if (!pendingActions && !pendingNotifies)
1385 return;
1386
1387 if (Trace_notify)
1388 elog(DEBUG1, "AtCommit_Notify");
1389
1390 /* Apply staged listen/unlisten changes */
1392
1393 /* If no longer listening to anything, get out of listener array */
1396
1397 /*
1398 * Send signals to listening backends. We need to do this only if there are
1399 * pending notifies, which were previously added to the shared queue by
1400 * PreCommit_Notify().
1401 */
1402 if (pendingNotifies != NULL)
1403 SignalBackends();
1404
1405 /*
1406 * If it's time to try to advance the global tail pointer, do that.
1407 *
1408 * (It might seem odd to do this in the sender, when more than likely the
1409 * listeners won't yet have read the messages we just sent. However,
1410 * there's less contention if only the sender does it, and there is little
1411 * need for urgency in advancing the global tail. So this typically will
1412 * be clearing out messages that were sent some time ago.)
1413 */
1414 if (tryAdvanceTail)
1415 {
1416 tryAdvanceTail = false;
1417 asyncQueueAdvanceTail();
1418 }
1419
1420 /* And clean up */
1422}
1423
1424/*
1425 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1426 *
1427 * This function must make sure we are ready to catch any incoming messages.
1428 */
1429static void
1430BecomeRegisteredListener(void)
1431{
1432 QueuePosition head;
1433 QueuePosition max;
1435
1436 /*
1437 * Nothing to do if we are already listening to something, nor if we
1438 * already ran this routine in this transaction.
1439 */
1441 return;
1442
1443 if (Trace_notify)
1444 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1445
1446 /*
1447 * Before registering, make sure we will unlisten before dying. (Note:
1448 * this action does not get undone if we abort later.)
1449 */
1451 {
1454 }
1455
1456 /*
1457 * This is our first LISTEN, so establish our pointer.
1458 *
1459 * We set our pointer to the global tail pointer and then move it forward
1460 * over already-committed notifications. This ensures we cannot miss any
1461 * not-yet-committed notifications. We might get a few more but that
1462 * doesn't hurt.
1463 *
1464 * In some scenarios there might be a lot of committed notifications that
1465 * have not yet been pruned away (because some backend is being lazy about
1466 * reading them). To reduce our startup time, we can look at other
1467 * backends and adopt the maximum "pos" pointer of any backend that's in
1468 * our database; any notifications it's already advanced over are surely
1469 * committed and need not be re-examined by us. (We must consider only
1470 * backends connected to our DB, because others will not have bothered to
1471 * check committed-ness of notifications in our DB.)
1472 *
1473 * We need exclusive lock here so we can look at other backends' entries
1474 * and manipulate the list links.
1475 */
1477 head = QUEUE_HEAD;
1478 max = QUEUE_TAIL;
1481 {
1483 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1484 /* Also find last listening backend before this one */
1485 if (i < MyProcNumber)
1486 prevListener = i;
1487 }
1493 /* Insert backend into list of listeners at correct position */
1495 {
1498 }
1499 else
1500 {
1503 }
1505
1506 /* Now we are listed in the global array, so remember we're listening */
1507 amRegisteredListener = true;
1508
1509 /*
1510 * Try to move our pointer forward as far as possible. This will skip
1511 * over already-committed notifications, which we want to do because they
1512 * might be quite stale. Note that we are not yet listening on anything,
1513 * so we won't deliver such notifications to our frontend. Also, although
1514 * our transaction might have executed NOTIFY, those message(s) aren't
1515 * queued yet so we won't skip them here.
1516 */
1517 if (!QUEUE_POS_EQUAL(max, head))
1519}
1520
1521/*
1522 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1523 *
1524 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1525 * an entry in localChannelTable, and pre-allocating an entry in the shared
1526 * globalChannelTable with listening=false. The listening flag will be set
1527 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1528 * will be removed.
1529 */
1530static void
1531PrepareTableEntriesForListen(const char *channel)
1532{
1533 GlobalChannelKey key;
1534 GlobalChannelEntry *entry;
1535 bool found;
1537 PendingListenEntry *pending;
1538
1539 /*
1540 * Record in local pending hash that we want to LISTEN, overwriting any
1541 * earlier attempt to UNLISTEN.
1542 */
1543 pending = (PendingListenEntry *)
1545 pending->action = PENDING_LISTEN;
1546
1547 /*
1548 * Ensure that there is an entry for the channel in localChannelTable.
1549 * (Should this fail, we can just roll back.) If the transaction fails
1550 * after this point, we will remove the entry if appropriate during
1551 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1552 * to return TRUE; we assume nothing is going to consult that before
1553 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1554 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1555 * present to ensure they do the right things; see
1556 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1557 */
1559
1560 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1561 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1562 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1563
1564 if (!found)
1565 {
1566 /* New channel entry, so initialize it to a safe state */
1568 entry->numListeners = 0;
1569 entry->allocatedListeners = 0;
1570 }
1571
1572 /*
1573 * Create listenersArray if entry doesn't have one. It's tempting to fold
1574 * this into the !found case, but this coding allows us to cope in case
1575 * dsa_allocate() failed in an earlier attempt.
1576 */
1577 if (!DsaPointerIsValid(entry->listenersArray))
1578 {
1582 }
1583
1586
1587 /*
1588 * Check if we already have a ListenerEntry (possibly from earlier in this
1589 * transaction)
1590 */
1591 for (int i = 0; i < entry->numListeners; i++)
1592 {
1593 if (listeners[i].procNo == MyProcNumber)
1594 {
1595 /* Already have an entry; listening flag stays as-is until commit */
1597 return;
1598 }
1599 }
1600
1601 /* Need to add a new entry; grow array if necessary */
1602 if (entry->numListeners >= entry->allocatedListeners)
1603 {
1604 int new_size = entry->allocatedListeners * 2;
1607 sizeof(ListenerEntry) * new_size);
1609
1611 entry->listenersArray = new_array;
1615 }
1616
1617 listeners[entry->numListeners].procNo = MyProcNumber;
1618 listeners[entry->numListeners].listening = false; /* staged, not yet
1619 * committed */
1620 entry->numListeners++;
1621
1623}
1624
1625/*
1626 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1627 *
1628 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1629 * we're currently listening (committed or staged). We don't touch
1630 * globalChannelTable yet - the listener keeps receiving signals until
1631 * commit, when the entry is removed.
1632 */
1633static void
1634PrepareTableEntriesForUnlisten(const char *channel)
1635{
1636 PendingListenEntry *pending;
1637
1638 /*
1639 * If the channel name is not in localChannelTable, then we are neither
1640 * listening on it nor preparing to listen on it, so we don't need to
1641 * record an UNLISTEN action.
1642 */
1644 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1645 return;
1646
1647 /*
1648 * Record in local pending hash that we want to UNLISTEN, overwriting any
1649 * earlier attempt to LISTEN. Don't touch localChannelTable or
1650 * globalChannelTable yet - we keep receiving signals until commit.
1651 */
1652 pending = (PendingListenEntry *)
1654 pending->action = PENDING_UNLISTEN;
1655}
1656
1657/*
1658 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1659 *
1660 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1661 * about-to-be-listened channels in pendingListenActions.
1662 */
1663static void
1664PrepareTableEntriesForUnlistenAll(void)
1665{
1668 PendingListenEntry *pending;
1669
1670 /*
1671 * Scan localChannelTable, which will have the names of all channels that
1672 * we are listening on or have prepared to listen on. Record an UNLISTEN
1673 * action for each one, overwriting any earlier attempt to LISTEN.
1674 */
1676 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1677 {
1678 pending = (PendingListenEntry *)
1680 pending->action = PENDING_UNLISTEN;
1681 }
1682}
1683
1684/*
1685 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1686 *
1687 * Decrements numListeners, compacts the array, and frees the entry if empty.
1688 * Sets *entry_ptr to NULL if the entry was deleted.
1689 *
1690 * We could get the listeners pointer from the entry, but all callers
1691 * already have it at hand.
1692 */
1693static void
1696 int idx)
1697{
1698 GlobalChannelEntry *entry = *entry_ptr;
1699
1700 entry->numListeners--;
1701 if (idx < entry->numListeners)
1703 sizeof(ListenerEntry) * (entry->numListeners - idx));
1704
1705 if (entry->numListeners == 0)
1706 {
1709 /* tells caller not to release the entry's lock: */
1710 *entry_ptr = NULL;
1711 }
1712}
1713
1714/*
1715 * ApplyPendingListenActions
1716 *
1717 * Apply, or revert, staged listen/unlisten changes to the local and global
1718 * hash tables.
1719 */
1720static void
1721ApplyPendingListenActions(bool isCommit)
1722{
1724 PendingListenEntry *pending;
1725
1726 /* Quick exit if nothing to do */
1728 return;
1729
1730 /* We made a globalChannelTable before building pendingListenActions */
1731 if (globalChannelTable == NULL)
1732 elog(PANIC, "global channel table missing post-commit/abort");
1733
1734 /* For each staged action ... */
1736 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1737 {
1738 GlobalChannelKey key;
1739 GlobalChannelEntry *entry;
1740 bool removeLocal = true;
1741 bool foundListener = false;
1742
1743 /*
1744 * Find the global entry for this channel. If isCommit, it had better
1745 * exist (it was created in PreCommit). In an abort, it might not
1746 * exist, in which case we are not listening and should discard any
1747 * local entry that PreCommit may have managed to create.
1748 */
1749 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1750 entry = dshash_find(globalChannelTable, &key, true);
1751 if (entry != NULL)
1752 {
1753 /* Scan entry to find the ListenerEntry for this backend */
1755
1758
1759 for (int i = 0; i < entry->numListeners; i++)
1760 {
1761 if (listeners[i].procNo != MyProcNumber)
1762 continue;
1763 foundListener = true;
1764 if (isCommit)
1765 {
1766 if (pending->action == PENDING_LISTEN)
1767 {
1768 /*
1769 * LISTEN being committed: set listening=true.
1770 * localChannelTable entry was created during
1771 * PreCommit and should be kept.
1772 */
1773 listeners[i].listening = true;
1774 removeLocal = false;
1775 }
1776 else
1777 {
1778 /*
1779 * UNLISTEN being committed: remove pre-allocated
1780 * entries from both tables.
1781 */
1783 }
1784 }
1785 else
1786 {
1787 /*
1788 * Note: this part is reachable only if the transaction
1789 * aborts after PreCommit_Notify() has made some
1790 * pendingListenActions entries, so it's pretty hard to
1791 * test.
1792 */
1793 if (!listeners[i].listening)
1794 {
1795 /*
1796 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1797 * and we weren't listening before, so remove
1798 * pre-allocated entries from both tables.
1799 */
1801 }
1802 else
1803 {
1804 /*
1805 * We're aborting, but the previous state was that
1806 * we're listening, so keep localChannelTable entry.
1807 */
1808 removeLocal = false;
1809 }
1810 }
1811 break; /* there shouldn't be another match */
1812 }
1813
1814 /* We might have already released the entry by removing it */
1815 if (entry != NULL)
1817 }
1818
1819 /*
1820 * If we're committing a LISTEN action, we should have found a
1821 * matching ListenerEntry, but otherwise it's okay if we didn't.
1822 */
1823 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1824 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1825 pending->channel, MyProcNumber);
1826
1827 /*
1828 * If we did not find a globalChannelTable entry for our backend, or
1829 * if we are unlistening, remove any localChannelTable entry that may
1830 * exist. (Note in particular that this cleans up if we created a
1831 * localChannelTable entry and then failed while trying to create a
1832 * globalChannelTable entry.)
1833 */
1836 HASH_REMOVE, NULL);
1837 }
1838}
1839
1840/*
1841 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1842 *
1843 * Remove this backend from all channels in the shared global table.
1844 */
1845static void
1846CleanupListenersOnExit(void)
1847{
1848 dshash_seq_status status;
1849 GlobalChannelEntry *entry;
1850
1851 if (Trace_notify)
1852 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1853
1854 /* Clear our local cache (not really necessary, but be consistent) */
1855 if (localChannelTable != NULL)
1856 {
1859 }
1860
1861 /* Now remove our entries from the shared globalChannelTable */
1862 if (globalChannelTable == NULL)
1863 return;
1864
1865 dshash_seq_init(&status, globalChannelTable, true);
1866 while ((entry = dshash_seq_next(&status)) != NULL)
1867 {
1869
1870 if (entry->key.dboid != MyDatabaseId)
1871 continue; /* not relevant */
1872
1875
1876 for (int i = 0; i < entry->numListeners; i++)
1877 {
1878 if (listeners[i].procNo == MyProcNumber)
1879 {
1880 entry->numListeners--;
1881 if (i < entry->numListeners)
1882 memmove(&listeners[i], &listeners[i + 1],
1883 sizeof(ListenerEntry) * (entry->numListeners - i));
1884
1885 if (entry->numListeners == 0)
1886 {
1888 dshash_delete_current(&status);
1889 }
1890 break;
1891 }
1892 }
1893 }
1894 dshash_seq_term(&status);
1895}
1896
1897/*
1898 * Test whether we are actively listening on the given channel name.
1899 *
1900 * Note: this function is executed for every notification found in the queue.
1901 */
1902static bool
1903IsListeningOn(const char *channel)
1904{
1905 if (localChannelTable == NULL)
1906 return false;
1907
1908 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1909}
1910
1911/*
1912 * Remove our entry from the listeners array when we are no longer listening
1913 * on any channel. NB: must not fail if we're already not listening.
1914 */
1915static void
1917{
1918 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1919
1920 if (!amRegisteredListener) /* nothing to do */
1921 return;
1922
1923 /*
1924 * Need exclusive lock here to manipulate list links.
1925 */
1927 /* Mark our entry as invalid */
1932 /* and remove it from the list */
1935 else
1936 {
1938 {
1940 {
1942 break;
1943 }
1944 }
1945 }
1948
1949 /* mark ourselves as no longer listed in the global array */
1950 amRegisteredListener = false;
1951}
1952
1953/*
1954 * Test whether there is room to insert more notification messages.
1955 *
1956 * Caller must hold at least shared NotifyQueueLock.
1957 */
1958static bool
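1959asyncQueueIsFull(void)
1960{
1961 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1962 int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1963 int64 occupied = headPage - tailPage;
1964
1965 return occupied >= max_notify_queue_pages;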
1966}
1967
1968/*
1969 * Advance the QueuePosition to the next entry, assuming that the current
1970 * entry is of length entryLength. If we jump to a new page the function
1971 * returns true, else false.
1972 */
1973static bool
1974asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1975{
1976 int64 pageno = QUEUE_POS_PAGE(*position);
1977 int offset = QUEUE_POS_OFFSET(*position);
1978 bool pageJump = false;
1979
1980 /*
1981 * Move to the next writing position: First jump over what we have just
1982 * written or read.
1983 */
1984 offset += entryLength;
1985 Assert(offset <= QUEUE_PAGESIZE);
1986
1987 /*
1988 * In a second step check if another entry can possibly be written to the
1989 * page. If so, stay here, we have reached the next position. If not, then
1990 * we need to move on to the next page.
1991 */
1992 if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1993 {
1994 pageno++;
1995 offset = 0;
1996 pageJump = true;
1997 }
1998
1999 SET_QUEUE_POS(*position, pageno, offset);
2000 return pageJump;
2001}
2002
2003/*
2004 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
2005 */
2006static void
2007asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
2008{
2009 size_t channellen = n->channel_len;
2010 size_t payloadlen = n->payload_len;
2011 int entryLength;
2012
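2013 Assert(channellen < NAMEDATALEN);
2014 Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
2015
2016 /* The terminators are already included in AsyncQueueEntryEmptySize */
2017 entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
2018 entryLength = QUEUEALIGN(entryLength);
2019 qe->length = entryLength;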
2020 qe->dboid = MyDatabaseId;
2021 qe->xid = GetCurrentTransactionId();
2022 qe->srcPid = MyProcPid;
2023 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2024}
2025
2026/*
2027 * Add pending notifications to the queue.
2028 *
2029 * We go page by page here, i.e. we stop once we have to go to a new page but
2030 * we will be called again and then fill that next page. If an entry does not
2031 * fit into the current page, we write a dummy entry with an InvalidOid as the
2032 * database OID in order to fill the page. So every page is always used up to
2033 * the last byte which simplifies reading the page later.
2034 *
2035 * We are passed the list cell (in pendingNotifies->events) containing the next
2036 * notification to write and return the first still-unwritten cell back.
2037 * Eventually we will return NULL indicating all is done.
2038 *
2039 * We are holding NotifyQueueLock already from the caller and grab
2040 * page specific SLRU bank lock locally in this function.
2041 */
2042static ListCell *
2043asyncQueueAddEntries(ListCell *nextNotify)
2044{
2047 int64 pageno;
2048 int offset;
2049 int slotno;
2051
2052 /*
2053 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2054 * memory upon exiting. The reason for this is that if we have to advance
2055 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2056 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2057 * subsequent insertions would try to put entries into a page that slru.c
2058 * thinks doesn't exist yet.) So, use a local position variable. Note
2059 * that if we do fail, any already-inserted queue entries are forgotten;
2060 * this is okay, since they'd be useless anyway after our transaction
2061 * rolls back.
2062 */
2064
2065 /*
2066 * If this is the first write since the postmaster started, we need to
2067 * initialize the first page of the async SLRU. Otherwise, the current
2068 * page should be initialized already, so just fetch it.
2069 */
2070 pageno = QUEUE_POS_PAGE(queue_head);
2072
2073 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2075
2078 else
2079 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2080
2081 /* Note we mark the page dirty before writing in it */
2082 NotifyCtl->shared->page_dirty[slotno] = true;
2083
2084 while (nextNotify != NULL)
2085 {
2087
2088 /* Construct a valid queue entry in local variable qe */
2090
2091 offset = QUEUE_POS_OFFSET(queue_head);
2092
2093 /* Check whether the entry really fits on the current page */
2094 if (offset + qe.length <= QUEUE_PAGESIZE)
2095 {
2096 /* OK, so advance nextNotify past this item */
2098 }
2099 else
2100 {
2101 /*
2102 * Write a dummy entry to fill up the page. Actually readers will
2103 * only check dboid and since it won't match any reader's database
2104 * OID, they will ignore this entry and move on.
2105 */
2106 qe.length = QUEUE_PAGESIZE - offset;
2107 qe.dboid = InvalidOid;
2109 qe.data[0] = '\0'; /* empty channel */
2110 qe.data[1] = '\0'; /* empty payload */
2111 }
2112
2113 /* Now copy qe into the shared buffer page */
2114 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2115 &qe,
2116 qe.length);
2117
2118 /* Advance queue_head appropriately, and detect if page is full */
2119 if (asyncQueueAdvance(&(queue_head), qe.length))
2120 {
2121 LWLock *lock;
2122
2123 pageno = QUEUE_POS_PAGE(queue_head);
2124 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2125 if (lock != prevlock)
2126 {
2129 prevlock = lock;
2130 }
2131
2132 /*
2133 * Page is full, so we're done here, but first fill the next page
2134 * with zeroes. The reason to do this is to ensure that slru.c's
2135 * idea of the head page is always the same as ours, which avoids
2136 * boundary problems in SimpleLruTruncate. The test in
2137 * asyncQueueIsFull() ensured that there is room to create this
2138 * page without overrunning the queue.
2139 */
2141
2142 /*
2143 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2144 * set flag to remember that we should try to advance the tail
2145 * pointer (we don't want to actually do that right here).
2146 */
2148 tryAdvanceTail = true;
2149
2150 /* And exit the loop */
2151 break;
2152 }
2153 }
2154
2155 /* Success, so update the global QUEUE_HEAD */
2157
2159
2160 return nextNotify;
2161}
2162
2163/*
2164 * SQL function to return the fraction of the notification queue currently
2165 * occupied.
2166 */
2167Datum
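2168pg_notification_queue_usage(PG_FUNCTION_ARGS)
2169{
2170 double usage;
2171
2172 /* Advance the queue tail so we don't report a too-large result */
2173 asyncQueueAdvanceTail();
2174
2175 LWLockAcquire(NotifyQueueLock, LW_SHARED);
2176 usage = asyncQueueUsage();
2177 LWLockRelease(NotifyQueueLock);
2178
2179 PG_RETURN_FLOAT8(usage);
2180}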
2181
2182/*
2183 * Return the fraction of the queue that is currently occupied.
2184 *
2185 * The caller must hold NotifyQueueLock in (at least) shared mode.
2186 *
2187 * Note: we measure the distance to the logical tail page, not the physical
2188 * tail page. In some sense that's wrong, but the relative position of the
2189 * physical tail is affected by details such as SLRU segment boundaries,
2190 * so that a result based on that is unpleasantly unstable.
2191 */
2192static double
2194{
2195 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2197 int64 occupied = headPage - tailPage;
2198
2199 if (occupied == 0)
2200 return (double) 0; /* fast exit for common case */
2201
2202 return (double) occupied / (double) max_notify_queue_pages;
2203}
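asyncQueueUsage() reduces occupancy to the number of whole pages between the logical tail and the head, divided by `max_notify_queue_pages`. The same arithmetic as a small standalone sketch follows. The function name is hypothetical, and head and tail pages are passed in rather than read from shared memory.

```c
#include <assert.h>
#include <stdint.h>

/* Fraction of the queue in use, counted in whole pages (sketch). */
static double
sketch_queue_usage(int64_t headPage, int64_t tailPage, int maxQueuePages)
{
    int64_t occupied = headPage - tailPage;

    assert(maxQueuePages > 0 && occupied >= 0);
    if (occupied == 0)
        return 0.0;             /* fast exit for the common case */
    return (double) occupied / (double) maxQueuePages;
}
```

Because the result is page-granular, a queue whose head and tail share a page reports 0 no matter how many entries that page holds.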
2204
2205/*
2206 * Check whether the queue is at least half full, and emit a warning if so.
2207 *
2208 * This is unlikely given the size of the queue, but possible.
2209 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2210 *
2211 * Caller must hold exclusive NotifyQueueLock.
2212 */
2213static void
2215{
2216 double fillDegree;
2217 TimestampTz t;
2218
2220 if (fillDegree < 0.5)
2221 return;
2222
2223 t = GetCurrentTimestamp();
2224
2227 {
2230
2232 {
2234 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2237 }
2238
2240 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2241 (minPid != InvalidPid ?
2242 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2243 : 0),
2244 (minPid != InvalidPid ?
2245 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2246 : 0)));
2247
2249 }
2250}
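asyncQueueFillWarning() combines a 50% threshold with a rate limit, so the warning appears at most once per `QUEUE_FULL_WARN_INTERVAL`. The timestamp comparison is on lines missing from this listing. The sketch below therefore only assumes the usual shape of such a check: millisecond timestamps, plus a hypothetical 5000 ms interval standing in for the real constant.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical interval standing in for QUEUE_FULL_WARN_INTERVAL (milliseconds). */
#define SKETCH_WARN_INTERVAL_MS 5000

typedef struct SketchWarnState
{
    int64_t lastWarnMs;         /* time of the last warning, INT64_MIN if never */
} SketchWarnState;

/* Return true if a "queue is N% full" warning should be emitted at nowMs. */
static bool
sketch_should_warn(SketchWarnState *st, double fillDegree, int64_t nowMs)
{
    assert(fillDegree >= 0.0);
    if (fillDegree < 0.5)
        return false;           /* less than half full: stay quiet */
    if (st->lastWarnMs != INT64_MIN &&
        nowMs - st->lastWarnMs < SKETCH_WARN_INTERVAL_MS)
        return false;           /* warned too recently */
    st->lastWarnMs = nowMs;
    return true;
}
```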
2251
2252/*
2253 * Send signals to listening backends.
2254 *
2255 * Normally we signal only backends that are interested in the notifies that
2256 * we just sent. However, that will leave idle listeners falling further and
2257 * further behind. Waken them anyway if they're far enough behind, so they'll
2258 * advance their queue position pointers, allowing the global tail to advance.
2259 *
2260 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2261 *
2262 * This is called during CommitTransaction(), so it's important for it
2263 * to have very low probability of failure.
2264 */
2265static void
2267{
2268 int count;
2269
2270 /* Can't get here without PreCommit_Notify having made the global table */
2272
2273 /* It should have set up these arrays, too */
2275
2276 /*
2277 * Identify backends that we need to signal. We don't want to send
2278 * signals while holding the NotifyQueueLock, so this part just builds a
2279 * list of target PIDs in signalPids[] and signalProcnos[].
2280 */
2281 count = 0;
2282
2284
2285 /* Scan each channel name that we notified in this transaction */
2287 {
2288 GlobalChannelKey key;
2289 GlobalChannelEntry *entry;
2291
2292 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2293 entry = dshash_find(globalChannelTable, &key, false);
2294 if (entry == NULL)
2295 continue; /* nobody is listening */
2296
2298 entry->listenersArray);
2299
2300 /* Identify listeners that now need waking, add them to arrays */
2301 for (int j = 0; j < entry->numListeners; j++)
2302 {
2303 ProcNumber i;
2304 int32 pid;
2305 QueuePosition pos;
2306
2307 if (!listeners[j].listening)
2308 continue; /* ignore not-yet-committed listeners */
2309
2310 i = listeners[j].procNo;
2311
2313 continue; /* already signaled, no need to repeat */
2314
2315 pid = QUEUE_BACKEND_PID(i);
2316 pos = QUEUE_BACKEND_POS(i);
2317
2318 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2319 continue; /* it's fully caught up already */
2320
2321 Assert(pid != InvalidPid);
2322
2324 signalPids[count] = pid;
2325 signalProcnos[count] = i;
2326 count++;
2327 }
2328
2330 }
2331
2332 /*
2333 * Scan all listeners. Any that are not already pending wakeup must not
2334 * be interested in our notifications (else we'd have set their wakeup
2335 * flags above). Check to see if we can directly advance their queue
2336 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2337 * them anyway so they will catch up.
2338 */
2340 {
2341 int32 pid;
2342 QueuePosition pos;
2343
2345 continue;
2346
2347 /* If it's currently advancing, we should not touch it */
2349 continue;
2350
2351 pid = QUEUE_BACKEND_PID(i);
2352 pos = QUEUE_BACKEND_POS(i);
2353
2354 /*
2355 * We can directly advance the other backend's queue pointer if it's
2356 * not currently advancing (else there are race conditions), and its
2357 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2358 * it miss some older messages), and we'd not be moving the pointer
2359 * backward.
2360 */
2363 {
2364 /* We can directly advance its pointer past what we wrote */
2366 }
2369 {
2370 /* It's idle and far behind, so wake it up */
2371 Assert(pid != InvalidPid);
2372
2374 signalPids[count] = pid;
2375 signalProcnos[count] = i;
2376 count++;
2377 }
2378 }
2379
2381
2382 /* Now send signals */
2383 for (int i = 0; i < count; i++)
2384 {
2385 int32 pid = signalPids[i];
2386
2387 /*
2388 * If we are signaling our own process, no need to involve the kernel;
2389 * just set the flag directly.
2390 */
2391 if (pid == MyProcPid)
2392 {
2394 continue;
2395 }
2396
2397 /*
2398 * Note: assuming things aren't broken, a signal failure here could
2399 * only occur if the target backend exited since we released
2400 * NotifyQueueLock; which is unlikely but certainly possible. So we
2401 * just log a low-level debug message if it happens.
2402 */
2404 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2405 }
2406}
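For each listener that is not already pending wakeup, the second loop in SignalBackends() picks one of three outcomes. The exact conditions sit on lines missing from this listing, so the sketch encodes the rules as the comment above states them:

- skip a backend that is currently advancing;
- advance its pointer directly when it lies in [queueHeadBeforeWrite, queueHeadAfterWrite);
- otherwise, wake it if it is far behind.

The `farBehindPages` threshold and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct SketchPos
{
    int64_t page;
    int     offset;
} SketchPos;

typedef enum SketchAction
{
    SKETCH_SKIP,                /* leave this listener alone */
    SKETCH_ADVANCE,             /* move its pointer past our messages, no signal */
    SKETCH_WAKE                 /* signal it so it catches up by itself */
} SketchAction;

/* Negative, zero or positive as a is before, equal to or after b. */
static int
sketch_pos_cmp(SketchPos a, SketchPos b)
{
    if (a.page != b.page)
        return a.page < b.page ? -1 : 1;
    return (a.offset > b.offset) - (a.offset < b.offset);
}

/* Classify a listener that is not interested in the channels we notified. */
static SketchAction
sketch_classify_idle_listener(SketchPos pos, SketchPos headBeforeWrite,
                              SketchPos headAfterWrite, bool isAdvancing,
                              int64_t farBehindPages)
{
    assert(sketch_pos_cmp(headBeforeWrite, headAfterWrite) <= 0);
    if (isAdvancing)
        return SKETCH_SKIP;     /* would race with its own reader */
    if (sketch_pos_cmp(pos, headBeforeWrite) >= 0 &&
        sketch_pos_cmp(pos, headAfterWrite) < 0)
        return SKETCH_ADVANCE;  /* it has read everything older than our batch */
    if (headAfterWrite.page - pos.page >= farBehindPages)
        return SKETCH_WAKE;     /* idle and lagging: wake it to free queue space */
    return SKETCH_SKIP;
}
```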
2407
2408/*
2409 * AtAbort_Notify
2410 *
2411 * This is called at transaction abort.
2412 *
2413 * Revert any staged listen/unlisten changes and clean up transaction state.
2414 * This only does anything if we abort after PreCommit_Notify has staged
2415 * some entries.
2416 */
2417void
2419{
2420 /* Revert staged listen/unlisten changes */
2422
2423 /* If we're no longer listening on anything, unregister */
2426
2427 /* And clean up */
2429}
2430
2431/*
2432 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2433 *
2434 * Reassign all items in the pending lists to the parent transaction.
2435 */
2436void
2438{
2439 int my_level = GetCurrentTransactionNestLevel();
2440
2441 /* If there are actions at our nesting level, we must reparent them. */
2442 if (pendingActions != NULL &&
2443 pendingActions->nestingLevel >= my_level)
2444 {
2445 if (pendingActions->upper == NULL ||
2446 pendingActions->upper->nestingLevel < my_level - 1)
2447 {
2448 /* nothing to merge; give the whole thing to the parent */
2450 }
2451 else
2452 {
2454
2456
2457 /*
2458 * Mustn't try to eliminate duplicates here --- see queue_listen()
2459 */
2462 childPendingActions->actions);
2464 }
2465 }
2466
2467 /* If there are notifies at our nesting level, we must reparent them. */
2468 if (pendingNotifies != NULL &&
2469 pendingNotifies->nestingLevel >= my_level)
2470 {
2471 Assert(pendingNotifies->nestingLevel == my_level);
2472
2473 if (pendingNotifies->upper == NULL ||
2474 pendingNotifies->upper->nestingLevel < my_level - 1)
2475 {
2476 /* nothing to merge; give the whole thing to the parent */
2478 }
2479 else
2480 {
2481 /*
2482 * Formerly, we didn't bother to eliminate duplicates here, but
2483 * now we must, else we fall foul of "Assert(!found)", either here
2484 * or during a later attempt to build the parent-level hashtable.
2485 */
2487 ListCell *l;
2488
2490 /* Insert all the subxact's events into parent, except for dups */
2491 foreach(l, childPendingNotifies->events)
2492 {
2494
2497 }
2499 }
2500 }
2501}
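AtSubCommit_Notify() treats the pending lists as a stack keyed by nesting level. If the parent level has no entry of its own, the child's entry is simply relabeled; otherwise the child's items are merged into the parent's. Below is a minimal sketch of the notify path, using fixed-size integer arrays in place of Lists. The notify path removes duplicates; the actions path deliberately does not. All names are hypothetical, and the caller, rather than a memory context, owns the storage.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_ITEMS 16

/* One entry per subtransaction level that queued something (hypothetical). */
typedef struct SketchLevel
{
    int                 nestingLevel;
    int                 items[SKETCH_MAX_ITEMS];
    int                 nitems;
    struct SketchLevel *upper;  /* next-outer level with pending items */
} SketchLevel;

static bool
sketch_contains(const SketchLevel *lvl, int item)
{
    for (int i = 0; i < lvl->nitems; i++)
        if (lvl->items[i] == item)
            return true;
    return false;
}

/* Subtransaction commit at myLevel; returns the new top of the stack. */
static SketchLevel *
sketch_subcommit(SketchLevel *top, int myLevel)
{
    SketchLevel *parent;

    if (top == NULL || top->nestingLevel < myLevel)
        return top;             /* nothing queued at this level */

    if (top->upper == NULL || top->upper->nestingLevel < myLevel - 1)
    {
        top->nestingLevel = myLevel - 1;    /* parent had nothing: relabel */
        return top;
    }

    /* Parent already has an entry: merge into it, skipping duplicates. */
    parent = top->upper;
    for (int i = 0; i < top->nitems; i++)
    {
        if (sketch_contains(parent, top->items[i]))
            continue;
        assert(parent->nitems < SKETCH_MAX_ITEMS);
        parent->items[parent->nitems++] = top->items[i];
    }
    return parent;
}
```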
2502
2503/*
2504 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2505 */
2506void
2508{
2509 int my_level = GetCurrentTransactionNestLevel();
2510
2511 /*
2512 * All we have to do is pop the stack --- the actions/notifies made in
2513 * this subxact are no longer interesting, and the space will be freed
2514 * when CurTransactionContext is recycled. We still have to free the
2515 * ActionList and NotificationList objects themselves, though, because
2516 * those are allocated in TopTransactionContext.
2517 *
2518 * Note that there might be no entries at all, or no entries for the
2519 * current subtransaction level, either because none were ever created, or
2520 * because we reentered this routine due to trouble during subxact abort.
2521 */
2522 while (pendingActions != NULL &&
2523 pendingActions->nestingLevel >= my_level)
2524 {
2526
2529 }
2530
2531 while (pendingNotifies != NULL &&
2532 pendingNotifies->nestingLevel >= my_level)
2533 {
2535
2538 }
2539}
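AtSubAbort_Notify() only pops stack entries at the current level or deeper, which also makes it safe to run again after a failure partway through. Below is a sketch of that loop with a hypothetical singly linked stack, where `free()` stands in for releasing the list headers.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct SketchLevel
{
    int                 nestingLevel;
    struct SketchLevel *upper;
} SketchLevel;

/* Push an entry for nestingLevel; on allocation failure return NULL and leave top untouched. */
static SketchLevel *
sketch_push(SketchLevel *top, int nestingLevel)
{
    SketchLevel *lvl = malloc(sizeof(SketchLevel));

    if (lvl == NULL)
        return NULL;
    assert(top == NULL || top->nestingLevel < nestingLevel);
    lvl->nestingLevel = nestingLevel;
    lvl->upper = top;
    return lvl;
}

/* Subtransaction abort: discard every entry at myLevel or deeper. */
static SketchLevel *
sketch_subabort(SketchLevel *top, int myLevel)
{
    while (top != NULL && top->nestingLevel >= myLevel)
    {
        SketchLevel *upper = top->upper;

        free(top);
        top = upper;
    }
    return top;
}
```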
2540
2541/*
2542 * HandleNotifyInterrupt
2543 *
2544 * Signal handler portion of interrupt handling. Let the backend know
2545 * that there's a pending notify interrupt. If we're currently reading
2546 * from the client, this will interrupt the read and
2547 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2548 */
2549void
2551{
2552 /*
2553 * Note: this is called by a SIGNAL HANDLER. You must be very wary of
2554 * what you do here.
2555 */
2556
2557 /* signal that work needs to be done */
2559
2560 /* latch will be set by procsignal_sigusr1_handler */
2561}
2562
2563/*
2564 * ProcessNotifyInterrupt
2565 *
2566 * This is called if we see notifyInterruptPending set, just before
2567 * transmitting ReadyForQuery at the end of a frontend command, and
2568 * also if a notify signal occurs while reading from the frontend.
2569 * HandleNotifyInterrupt() will cause the read to be interrupted
2570 * via the process's latch, and this routine will get called.
2571 * If we are truly idle (ie, *not* inside a transaction block),
2572 * process the incoming notifies.
2573 *
2574 * If "flush" is true, force any frontend messages out immediately.
2575 * This can be false when being called at the end of a frontend command,
2576 * since we'll flush after sending ReadyForQuery.
2577 */
2578void
2580{
2582 return; /* not really idle */
2583
2584 /* Loop in case another signal arrives while sending messages */
2586 ProcessIncomingNotify(flush);
2587}
2588
2589
2590/*
2591 * Read all pending notifications from the queue, and deliver appropriate
2592 * ones to my frontend. Stop when we reach queue head or an uncommitted
2593 * notification.
2594 */
2595static void
2597{
2598 QueuePosition pos;
2599 QueuePosition head;
2600 Snapshot snapshot;
2601
2602 /*
2603 * Fetch current state, indicate to others that we have woken up, and that
2604 * we are in process of advancing our position.
2605 */
2607 /* Assert checks that we have a valid state entry */
2611 head = QUEUE_HEAD;
2612
2613 if (QUEUE_POS_EQUAL(pos, head))
2614 {
2615 /* Nothing to do, we have read all notifications already. */
2617 return;
2618 }
2619
2622
2623 /*----------
2624 * Get snapshot we'll use to decide which xacts are still in progress.
2625 * This is trickier than it might seem, because of race conditions.
2626 * Consider the following example:
2627 *
2628 * Backend 1: Backend 2:
2629 *
2630 * transaction starts
2631 * UPDATE foo SET ...;
2632 * NOTIFY foo;
2633 * commit starts
2634 * queue the notify message
2635 * transaction starts
2636 * LISTEN foo; -- first LISTEN in session
2637 * SELECT * FROM foo WHERE ...;
2638 * commit to clog
2639 * commit starts
2640 * add backend 2 to array of listeners
2641 * advance to queue head (this code)
2642 * commit to clog
2643 *
2644 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2645 * wasn't committed yet. Ideally we'd ensure that client 2 would
2646 * eventually get transaction 1's notify message, but there's no way
2647 * to do that; until we're in the listener array, there's no guarantee
2648 * that the notify message doesn't get removed from the queue.
2649 *
2650 * Therefore the coding technique transaction 2 is using is unsafe:
2651 * applications must commit a LISTEN before inspecting database state,
2652 * if they want to ensure they will see notifications about subsequent
2653 * changes to that state.
2654 *
2655 * What we do guarantee is that we'll see all notifications from
2656 * transactions committing after the snapshot we take here.
2657 * BecomeRegisteredListener has already added us to the listener array,
2658 * so no not-yet-committed messages can be removed from the queue
2659 * before we see them.
2660 *----------
2661 */
2662 snapshot = RegisterSnapshot(GetLatestSnapshot());
2663
2664 /*
2665 * It is possible that we fail while trying to send a message to our
2666 * frontend (for example, because of encoding conversion failure). If
2667 * that happens it is critical that we not try to send the same message
2668 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2669 * ERRORs to FATAL, causing the client connection to be closed on error.
2670 *
2671 * We used to only skip over the offending message and try to soldier on,
2672 * but it was somewhat questionable to lose a notification and give the
2673 * client an ERROR instead. A client application is not prepared for
2674 * that and can't tell that a notification was missed. It was also not
2675 * very useful in practice because notifications are often processed while
2676 * a connection is idle and reading a message from the client, and in that
2677 * state, any error is upgraded to FATAL anyway. Closing the connection
2678 * is a clear signal to the application that it might have missed
2679 * notifications.
2680 */
2681 {
2683 bool reachedStop;
2684
2685 ExitOnAnyError = true;
2686
2687 do
2688 {
2689 /*
2690 * Process messages up to the stop position, end of page, or an
2691 * uncommitted message.
2692 *
2693 * Our stop position is what we found to be the head's position
2694 * when we entered this function. It might have changed already.
2695 * But if it has, we will receive (or have already received and
2696 * queued) another signal and come here again.
2697 *
2698 * We are not holding NotifyQueueLock here! The queue can only
2699 * extend beyond the head pointer (see above) and we leave our
2700 * backend's pointer where it is so nobody will truncate or
2701 * rewrite pages under us. In particular, we don't want to hold a lock
2702 * while sending the notifications to the frontend.
2703 */
2704 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2705 } while (!reachedStop);
2706
2707 /* Update shared state */
2712
2714 }
2715
2716 /* Done with snapshot */
2717 UnregisterSnapshot(snapshot);
2718}
2719
2720/*
2721 * Fetch notifications from the shared queue, beginning at position current,
2722 * and deliver relevant ones to my frontend.
2723 *
2724 * The function returns true once we have reached the stop position or an
2725 * uncommitted notification, and false if we have finished with the page.
2726 * In other words: once it returns true there is no need to look further.
2727 * The QueuePosition *current is advanced past all processed messages.
2728 */
2729static bool
2731 QueuePosition stop,
2732 Snapshot snapshot)
2733{
2734 int64 curpage = QUEUE_POS_PAGE(*current);
2735 int slotno;
2736 char *page_buffer;
2737 bool reachedStop = false;
2738 bool reachedEndOfPage;
2739
2740 /*
2741 * We copy the entries into a local buffer to avoid holding the SLRU lock
2742 * while we transmit them to our frontend. The local buffer must be
2743 * adequately aligned.
2744 */
2746 char *local_buf_end = local_buf;
2747
2749 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2750
2751 do
2752 {
2753 QueuePosition thisentry = *current;
2755
2756 if (QUEUE_POS_EQUAL(thisentry, stop))
2757 break;
2758
2759 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2760
2761 /*
2762 * Advance *current over this message, possibly to the next page. As
2763 * noted in the comments for asyncQueueReadAllNotifications, we must
2764 * do this before possibly failing while processing the message.
2765 */
2766 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2767
2768 /* Ignore messages destined for other databases */
2769 if (qe->dboid == MyDatabaseId)
2770 {
2771 if (XidInMVCCSnapshot(qe->xid, snapshot))
2772 {
2773 /*
2774 * The source transaction is still in progress, so we can't
2775 * process this message yet. Break out of the loop, but first
2776 * back up *current so we will reprocess the message next
2777 * time. (Note: it is unlikely but not impossible for
2778 * TransactionIdDidCommit to fail, so we can't really avoid
2779 * this advance-then-back-up behavior when dealing with an
2780 * uncommitted message.)
2781 *
2782 * Note that we must test XidInMVCCSnapshot before we test
2783 * TransactionIdDidCommit, else we might return a message from
2784 * a transaction that is not yet visible to snapshots; compare
2785 * the comments at the head of heapam_visibility.c.
2786 *
2787 * Also, while our own xact won't be listed in the snapshot,
2788 * we need not check for TransactionIdIsCurrentTransactionId
2789 * because our transaction cannot (yet) have queued any
2790 * messages.
2791 */
2792 *current = thisentry;
2793 reachedStop = true;
2794 break;
2795 }
2796
2797 /*
2798 * Quick check for the case that we're not listening on any
2799 * channels, before calling TransactionIdDidCommit(). This makes
2800 * that case a little faster, but more importantly, it ensures
2801 * that if there's a bad entry in the queue for which
2802 * TransactionIdDidCommit() fails for some reason, we can skip
2803 * over it on the first LISTEN in a session, and not get stuck on
2804 * it indefinitely. (This is a little trickier than it looks: it
2805 * works because BecomeRegisteredListener runs this code before we
2806 * have made the first entry in localChannelTable.)
2807 */
2809 continue;
2810
2811 if (TransactionIdDidCommit(qe->xid))
2812 {
2813 memcpy(local_buf_end, qe, qe->length);
2814 local_buf_end += qe->length;
2815 }
2816 else
2817 {
2818 /*
2819 * The source transaction aborted or crashed, so we just
2820 * ignore its notifications.
2821 */
2822 }
2823 }
2824
2825 /* Loop back if we're not at end of page */
2826 } while (!reachedEndOfPage);
2827
2828 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2830
2831 /*
2832 * Now that we have let go of the SLRU bank lock, send the notifications
2833 * to our frontend
2834 */
2836 for (char *p = local_buf; p < local_buf_end;)
2837 {
2839
2840 /* qe->data is the null-terminated channel name */
2841 char *channel = qe->data;
2842
2843 if (IsListeningOn(channel))
2844 {
2845 /* payload follows channel name */
2846 char *payload = qe->data + strlen(channel) + 1;
2847
2848 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2849 }
2850
2851 p += qe->length;
2852 }
2853
2854 if (QUEUE_POS_EQUAL(*current, stop))
2855 reachedStop = true;
2856
2857 return reachedStop;
2858}
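asyncQueueProcessPageEntries() copies committed entries into an aligned local buffer and releases the SLRU bank lock. It then walks the buffer by each entry's `length`, reading the channel name and the payload that follows its terminating NUL. Below is a sketch of that walk under two simplifying assumptions. First, the hypothetical `SketchEntry` keeps only a length word and the data, whereas the real entry, as used above, also carries `dboid`, `xid` and `srcPid`. Second, `int` alignment stands in for the real alignment rule.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Round n up to a multiple of sizeof(int), like a simplified QUEUEALIGN. */
#define SKETCH_ALIGN(n) (((n) + sizeof(int) - 1) & ~(sizeof(int) - 1))

/* Hypothetical simplified entry: a length word, then "channel\0payload\0". */
typedef struct SketchEntry
{
    int  length;                /* total bytes in this entry, padding included */
    char data[];
} SketchEntry;

/* Append one entry at end (int-aligned, with enough room); returns the new end. */
static char *
sketch_append(char *end, const char *channel, const char *payload)
{
    size_t       clen = strlen(channel);
    size_t       plen = strlen(payload);
    SketchEntry *e = (SketchEntry *) end;

    e->length = (int) SKETCH_ALIGN(offsetof(SketchEntry, data) + clen + plen + 2);
    memcpy(e->data, channel, clen + 1);
    memcpy(e->data + clen + 1, payload, plen + 1);
    return end + e->length;
}

/* Walk [buf, end) and "deliver" entries whose channel matches listening. */
static int
sketch_deliver(const char *buf, const char *end, const char *listening,
               const char **lastPayload)
{
    int delivered = 0;

    for (const char *p = buf; p < end;)
    {
        const SketchEntry *e = (const SketchEntry *) p;
        const char *channel = e->data;          /* NUL-terminated */

        assert(e->length > 0);                  /* guards against looping forever */
        if (strcmp(channel, listening) == 0)
        {
            *lastPayload = channel + strlen(channel) + 1;   /* payload follows */
            delivered++;
        }
        p += e->length;
    }
    return delivered;
}
```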
2859
2860/*
2861 * Advance the shared queue tail variable to the minimum of all the
2862 * per-backend tail pointers. Truncate pg_notify space if possible.
2863 *
2864 * This is (usually) called during CommitTransaction(), so it's important for
2865 * it to have very low probability of failure.
2866 */
2867static void
2869{
2870 QueuePosition min;
2873 int64 boundary;
2874
2875 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2877
2878 /*
2879 * Compute the new tail. Before v13, it was essential that QUEUE_TAIL be
2880 * exact (ie, exactly match at least one backend's queue position), so it
2881 * had to be updated atomically with the actual computation. Since v13, we could
2882 * get away with not doing it like that, but it seems prudent to keep it
2883 * so.
2884 *
2885 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2886 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2887 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2888 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2889 * there are pages we can truncate but haven't yet finished doing so.
2890 *
2891 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2892 * performing SimpleLruTruncate. This is OK because no backend will try
2893 * to access the pages we are in the midst of truncating.
2894 */
2896 min = QUEUE_HEAD;
2898 {
2900 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2901 }
2902 QUEUE_TAIL = min;
2905
2906 /*
2907 * We can truncate something if the global tail advanced across an SLRU
2908 * segment boundary.
2909 *
2910 * XXX it might be better to truncate only once every several segments, to
2911 * reduce the number of directory scans.
2912 */
2915 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2916 {
2917 /*
2918 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2919 * release the lock again.
2920 */
2922
2926 }
2927
2929}
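asyncQueueAdvanceTail() sets the logical tail to the minimum of `QUEUE_HEAD` and every registered listener's position. It truncates only when the tail has crossed an SLRU segment boundary. The boundary computation itself is on lines missing from this listing, so the sketch makes three assumptions:

- it rounds the new tail page down to the start of its segment;
- there are 32 pages per segment;
- 64-bit page numbers never wrap, so a plain `<` is enough.

Names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_PAGES_PER_SEGMENT 32 /* assumed stand-in for SLRU_PAGES_PER_SEGMENT */

typedef struct SketchPos
{
    int64_t page;
    int     offset;
} SketchPos;

static bool
sketch_pos_precedes(SketchPos a, SketchPos b)
{
    return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

/* New logical tail: the minimum of the head and every listener's position. */
static SketchPos
sketch_compute_tail(SketchPos head, const SketchPos *listeners, int nlisteners)
{
    SketchPos min = head;

    for (int i = 0; i < nlisteners; i++)
        if (sketch_pos_precedes(listeners[i], min))
            min = listeners[i];
    return min;
}

/*
 * First page of the segment holding newTailPage.  Pages before it can be
 * removed, which is worthwhile only if the old physical tail lies in an
 * earlier segment.
 */
static int64_t
sketch_truncation_boundary(int64_t oldTailPage, int64_t newTailPage, bool *truncate)
{
    int64_t boundary = newTailPage - newTailPage % SKETCH_PAGES_PER_SEGMENT;

    assert(newTailPage >= 0 && oldTailPage <= newTailPage);
    *truncate = oldTailPage < boundary;
    return boundary;
}
```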
2930
2931/*
2932 * AsyncNotifyFreezeXids
2933 *
2934 * Prepare the async notification queue for CLOG truncation by freezing
2935 * transaction IDs that are about to become inaccessible.
2936 *
2937 * This function is called by VACUUM before advancing datfrozenxid. It scans
2938 * the notification queue and replaces XIDs that would become inaccessible
2939 * after CLOG truncation with special markers:
2940 * - Committed transactions are set to FrozenTransactionId
2941 * - Aborted/crashed transactions are set to InvalidTransactionId
2942 *
2943 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2944 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2945 * (or it would have held back newFrozenXid through ProcArray).
2946 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2947 * either aborted explicitly or crashed, and we can safely mark it invalid.
2948 */
2949void
2951{
2952 QueuePosition pos;
2953 QueuePosition head;
2954 int64 curpage = -1;
2955 int slotno = -1;
2956 char *page_buffer = NULL;
2957 bool page_dirty = false;
2958
2959 /*
2960 * Acquire locks in the correct order to avoid deadlocks. As per the
2961 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2962 * bank locks.
2963 *
2964 * We only need SHARED mode since we're just reading the head/tail
2965 * positions, not modifying them.
2966 */
2969
2970 pos = QUEUE_TAIL;
2971 head = QUEUE_HEAD;
2972
2973 /* Release NotifyQueueLock early, we only needed to read the positions */
2975
2976 /*
2977 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2978 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2979 * we're working.
2980 */
2981 while (!QUEUE_POS_EQUAL(pos, head))
2982 {
2984 TransactionId xid;
2985 int64 pageno = QUEUE_POS_PAGE(pos);
2986 int offset = QUEUE_POS_OFFSET(pos);
2987
2988 /* If we need a different page, release old lock and get new one */
2989 if (pageno != curpage)
2990 {
2991 LWLock *lock;
2992
2993 /* Release previous page if any */
2994 if (slotno >= 0)
2995 {
2996 if (page_dirty)
2997 {
2998 NotifyCtl->shared->page_dirty[slotno] = true;
2999 page_dirty = false;
3000 }
3002 }
3003
3004 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3006 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3007 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3008 curpage = pageno;
3009 }
3010
3011 qe = (AsyncQueueEntry *) (page_buffer + offset);
3012 xid = qe->xid;
3013
3014 if (TransactionIdIsNormal(xid) &&
3016 {
3017 if (TransactionIdDidCommit(xid))
3018 {
3019 qe->xid = FrozenTransactionId;
3020 page_dirty = true;
3021 }
3022 else
3023 {
3024 qe->xid = InvalidTransactionId;
3025 page_dirty = true;
3026 }
3027 }
3028
3029 /* Advance to next entry */
3030 asyncQueueAdvance(&pos, qe->length);
3031 }
3032
3033 /* Release final page lock if we acquired one */
3034 if (slotno >= 0)
3035 {
3036 if (page_dirty)
3037 NotifyCtl->shared->page_dirty[slotno] = true;
3039 }
3040
3042}
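The per-entry decision in AsyncNotifyFreezeXids() can be isolated as shown below. The sketch uses the standard PostgreSQL values for invalid, frozen and first-normal XIDs, plus the modulo-2^32 ordering used for normal XIDs. The comparison against `newFrozenXid` is on a line missing from this listing and is assumed to use that ordering. To keep the sketch self-contained, commit status is passed in as a boolean; the real loop consults `TransactionIdDidCommit()` only for XIDs that need freezing. Names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchXid;

#define SKETCH_INVALID_XID      ((SketchXid) 0)   /* InvalidTransactionId */
#define SKETCH_FROZEN_XID       ((SketchXid) 2)   /* FrozenTransactionId */
#define SKETCH_FIRST_NORMAL_XID ((SketchXid) 3)   /* FirstNormalTransactionId */

static bool
sketch_xid_is_normal(SketchXid xid)
{
    return xid >= SKETCH_FIRST_NORMAL_XID;
}

/* Circular (modulo 2^32) "a is older than b" for two normal XIDs. */
static bool
sketch_xid_precedes(SketchXid a, SketchXid b)
{
    /* relies on two's-complement conversion, as on mainstream compilers */
    return (int32_t) (a - b) < 0;
}

/*
 * Freeze one queue entry's XID if it is older than newFrozenXid.
 * Returns true if the entry changed, i.e. the page must be marked dirty.
 */
static bool
sketch_freeze_xid(SketchXid *xid, SketchXid newFrozenXid, bool committed)
{
    assert(sketch_xid_is_normal(newFrozenXid));
    if (!sketch_xid_is_normal(*xid) || !sketch_xid_precedes(*xid, newFrozenXid))
        return false;
    *xid = committed ? SKETCH_FROZEN_XID : SKETCH_INVALID_XID;
    return true;
}
```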
3043
3044/*
3045 * ProcessIncomingNotify
3046 *
3047 * Scan the queue for arriving notifications and report them to the front
3048 * end. The notifications might be from other sessions, or our own;
3049 * there's no need to distinguish here.
3050 *
3051 * If "flush" is true, force any frontend messages out immediately.
3052 *
3053 * NOTE: since we are outside any transaction, we must create our own.
3054 */
3055static void
3057{
3058 /* We *must* reset the flag */
3059 notifyInterruptPending = false;
3060
3061 /* Do nothing else if we aren't actively listening */
3063 return;
3064
3065 if (Trace_notify)
3066 elog(DEBUG1, "ProcessIncomingNotify");
3067
3068 set_ps_display("notify interrupt");
3069
3070 /*
3071 * We must run asyncQueueReadAllNotifications inside a transaction, else
3072 * bad things happen if it gets an error.
3073 */
3075
3077
3079
3080 /*
3081 * If this isn't an end-of-command case, we must flush the notify messages
3082 * to ensure frontend gets them promptly.
3083 */
3084 if (flush)
3085 pq_flush();
3086
3087 set_ps_display("idle");
3088
3089 if (Trace_notify)
3090 elog(DEBUG1, "ProcessIncomingNotify: done");
3091}
3092
3093/*
3094 * Send NOTIFY message to my front end.
3095 */
3096void
3097NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3098{
3100 {
3102
3104 pq_sendint32(&buf, srcPid);
3105 pq_sendstring(&buf, channel);
3106 pq_sendstring(&buf, payload);
3108
3109 /*
3110 * NOTE: we do not do pq_flush() here. Some level of caller will
3111 * handle it later, allowing this message to be combined into a packet
3112 * with other ones.
3113 */
3114 }
3115 else
3116 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3117}
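When the message goes to a remote client (the `if` condition is elided from this listing), NotifyMyFrontEnd() emits the protocol's NotificationResponse message, which consists of:

- type byte `'A'`;
- an Int32 length that counts itself but not the type byte;
- the sender's Int32 PID;
- channel and payload as NUL-terminated strings.

Below is a byte-level sketch of that layout, with hypothetical names and no dependency on the backend's `pq_*` routines.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Write v as a 4-byte big-endian (network order) integer. */
static void
sketch_put_int32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char) (v >> 24);
    p[1] = (unsigned char) (v >> 16);
    p[2] = (unsigned char) (v >> 8);
    p[3] = (unsigned char) v;
}

/*
 * Serialize a NotificationResponse into out.  Returns the number of bytes
 * written, or 0 if cap is too small.
 */
static size_t
sketch_build_notification(unsigned char *out, size_t cap, int32_t srcPid,
                          const char *channel, const char *payload)
{
    size_t clen = strlen(channel) + 1;      /* include terminating NUL */
    size_t plen = strlen(payload) + 1;
    size_t bodyLen = 4 + 4 + clen + plen;   /* length word + PID + two strings */

    assert(out != NULL);
    if (1 + bodyLen > cap)
        return 0;
    out[0] = 'A';
    sketch_put_int32(out + 1, (uint32_t) bodyLen);
    sketch_put_int32(out + 5, (uint32_t) srcPid);
    memcpy(out + 9, channel, clen);
    memcpy(out + 9 + clen, payload, plen);
    return 1 + bodyLen;
}
```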
3118
3119/* Does pendingNotifies include a match for the given event? */
3120static bool
3122{
3123 if (pendingNotifies == NULL)
3124 return false;
3125
3127 {
3128 /* Use the hash table to probe for a match */
3130 &n,
3131 HASH_FIND,
3132 NULL))
3133 return true;
3134 }
3135 else
3136 {
3137 /* Must scan the event list */
3138 ListCell *l;
3139
3140 foreach(l, pendingNotifies->events)
3141 {
3143
3144 if (n->channel_len == oldn->channel_len &&
3145 n->payload_len == oldn->payload_len &&
3146 memcmp(n->data, oldn->data,
3147 n->channel_len + n->payload_len + 2) == 0)
3148 return true;
3149 }
3150 }
3151
3152 return false;
3153}
3154
3155/*
3156 * Add a notification event to a pre-existing pendingNotifies list.
3157 *
3158 * Because pendingNotifies->events is already nonempty, this works
3159 * correctly no matter what CurrentMemoryContext is.
3160 */
3161static void
3163{
3165
3166 /* Create the hash tables if it's time to */
3169 {
3171 ListCell *l;
3172
3173 /* Create the hash table */
3174 hash_ctl.keysize = sizeof(Notification *);
3175 hash_ctl.entrysize = sizeof(struct NotificationHash);
3180 hash_create("Pending Notifies",
3181 256L,
3182 &hash_ctl,
3184
3185 /* Create the unique channel name table */
3187 hash_ctl.keysize = NAMEDATALEN;
3188 hash_ctl.entrysize = sizeof(ChannelName);
3191 hash_create("Pending Notify Channel Names",
3192 64L,
3193 &hash_ctl,
3195
3196 /* Insert all the already-existing events */
3197 foreach(l, pendingNotifies->events)
3198 {
3200 char *channel = oldn->data;
3201 bool found;
3202
3204 &oldn,
3205 HASH_ENTER,
3206 &found);
3207 Assert(!found);
3208
3209 /* Add channel name to uniqueChannelHash; might be there already */
3211 channel,
3212 HASH_ENTER,
3213 NULL);
3214 }
3215 }
3216
3217 /* Add new event to the list, in order */
3219
3220 /* Add event to the hash tables if needed */
3222 {
3223 char *channel = n->data;
3224 bool found;
3225
3227 &n,
3228 HASH_ENTER,
3229 &found);
3230 Assert(!found);
3231
3232 /* Add channel name to uniqueChannelHash; might be there already */
3234 channel,
3235 HASH_ENTER,
3236 NULL);
3237 }
3238}
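AddEventToPendingNotifies() starts with a plain list and builds a hash table only once the list is long enough, so AsyncExistsPendingNotify() can switch from a linear scan to a probe. The trigger condition is on a line missing from this listing, so the sketch assumes it means "the list already holds the threshold number of events". It uses:

- a hypothetical threshold of 16;
- a fixed-capacity open-addressing table;
- FNV-1a in place of `hash_any()`.

It omits the separate unique-channel table, and as the `Assert(!found)` above implies, the caller checks for a duplicate before adding.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_MIN_HASHABLE 16      /* hypothetical stand-in for MIN_HASHABLE_NOTIFIES */
#define SKETCH_MAX_EVENTS   256
#define SKETCH_HASH_SLOTS   512     /* power of two, at least 2 * SKETCH_MAX_EVENTS */

typedef struct SketchPending
{
    const char *events[SKETCH_MAX_EVENTS];  /* insertion order, like the List */
    int         nevents;
    bool        hashed;                     /* has the table been built? */
    const char *slots[SKETCH_HASH_SLOTS];   /* open addressing; NULL = empty */
} SketchPending;

/* 32-bit FNV-1a string hash. */
static uint32_t
sketch_hash(const char *s)
{
    uint32_t h = 2166136261u;

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h;
}

static void
sketch_hash_insert(SketchPending *p, const char *ev)
{
    uint32_t i = sketch_hash(ev) & (SKETCH_HASH_SLOTS - 1);

    while (p->slots[i] != NULL)
        i = (i + 1) & (SKETCH_HASH_SLOTS - 1);     /* linear probing */
    p->slots[i] = ev;
}

/* Duplicate check: linear scan for short lists, hash probe once built. */
static bool
sketch_exists(const SketchPending *p, const char *ev)
{
    if (p->hashed)
    {
        uint32_t i = sketch_hash(ev) & (SKETCH_HASH_SLOTS - 1);

        for (; p->slots[i] != NULL; i = (i + 1) & (SKETCH_HASH_SLOTS - 1))
            if (strcmp(p->slots[i], ev) == 0)
                return true;
        return false;
    }
    for (int i = 0; i < p->nevents; i++)
        if (strcmp(p->events[i], ev) == 0)
            return true;
    return false;
}

/* Append ev; the caller has already checked that it is not a duplicate. */
static void
sketch_add(SketchPending *p, const char *ev)
{
    assert(p->nevents < SKETCH_MAX_EVENTS);
    if (!p->hashed && p->nevents >= SKETCH_MIN_HASHABLE)
    {
        /* Time to build the table: insert everything queued so far. */
        for (int i = 0; i < p->nevents; i++)
            sketch_hash_insert(p, p->events[i]);
        p->hashed = true;
    }
    p->events[p->nevents++] = ev;
    if (p->hashed)
        sketch_hash_insert(p, ev);
}
```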
3239
3240/*
3241 * notification_hash: hash function for notification hash table
3242 *
3243 * The hash "keys" are pointers to Notification structs.
3244 */
3245static uint32
3246notification_hash(const void *key, Size keysize)
3247{
3248 const Notification *k = *(const Notification *const *) key;
3249
3250 Assert(keysize == sizeof(Notification *));
3251 /* We don't bother to include the payload's trailing null in the hash */
3252 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3253 k->channel_len + k->payload_len + 1));
3254}
3255
3256/*
3257 * notification_match: match function to use with notification_hash
3258 */
3259static int
3260notification_match(const void *key1, const void *key2, Size keysize)
3261{
3262 const Notification *k1 = *(const Notification *const *) key1;
3263 const Notification *k2 = *(const Notification *const *) key2;
3264
3265 Assert(keysize == sizeof(Notification *));
3266 if (k1->channel_len == k2->channel_len &&
3267 k1->payload_len == k2->payload_len &&
3268 memcmp(k1->data, k2->data,
3269 k1->channel_len + k1->payload_len + 2) == 0)
3270 return 0; /* equal */
3271 return 1; /* not equal */
3272}
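notification_hash() and notification_match() treat a notification as the byte string `channel\0payload\0`. The hash covers everything except the trailing NUL. The match compares both lengths first and then every byte including both NULs, so `memcmp()` never runs past the shorter entry. Below is a sketch with a simplified, hypothetical struct and FNV-1a standing in for `hash_any()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for Notification: lengths, then "channel\0payload\0". */
typedef struct SketchNotification
{
    uint16_t channel_len;
    uint16_t payload_len;
    char     data[];
} SketchNotification;

static SketchNotification *
sketch_make(const char *channel, const char *payload)
{
    size_t clen = strlen(channel);
    size_t plen = strlen(payload);
    SketchNotification *n;

    assert(clen <= UINT16_MAX && plen <= UINT16_MAX);
    n = malloc(offsetof(SketchNotification, data) + clen + plen + 2);
    if (n == NULL)
        return NULL;
    n->channel_len = (uint16_t) clen;
    n->payload_len = (uint16_t) plen;
    memcpy(n->data, channel, clen + 1);
    memcpy(n->data + clen + 1, payload, plen + 1);
    return n;
}

/* Hash channel, separator NUL and payload, but not the trailing NUL. */
static uint32_t
sketch_notification_hash(const SketchNotification *n)
{
    size_t   len = (size_t) n->channel_len + n->payload_len + 1;
    uint32_t h = 2166136261u;

    for (size_t i = 0; i < len; i++)
    {
        h ^= (unsigned char) n->data[i];
        h *= 16777619u;
    }
    return h;
}

/* Equal iff both lengths match and all bytes, including both NULs, match. */
static bool
sketch_notification_equal(const SketchNotification *a, const SketchNotification *b)
{
    return a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data, (size_t) a->channel_len + a->payload_len + 2) == 0;
}
```

Comparing the lengths first also cheaply separates pairs such as (`"ab"`, `"c"`) and (`"a"`, `"bc"`), whose channel and payload concatenate to the same text.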
3273
3274/* Clear the pendingActions and pendingNotifies lists. */
3275static void
3277{
3278 /*
3279 * Everything's allocated in either TopTransactionContext or the context
3280 * for the subtransaction to which it corresponds. So, there's nothing to
3281 * do here except reset the pointers; the space will be reclaimed when the
3282 * contexts are deleted.
3283 */
3286 /* Also clear pendingListenActions, which is derived from pendingActions */
3288}
3289
3290/*
3291 * GUC check_hook for notify_buffers
3292 */
3293bool
3295{
3296 return check_slru_buffers("notify_buffers", newval);
3297}
Definition async.c:366
static HTAB * localChannelTable
Definition async.c:422
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:640
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:996
#define QUEUEALIGN(len)
Definition async.c:225
static bool amRegisteredListener
Definition async.c:558
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:266
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:364
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:363
void AtSubAbort_Notify(void)
Definition async.c:2507
void AtPrepare_Notify(void)
Definition async.c:1160
#define QUEUE_PAGESIZE
Definition async.c:379
void AtSubCommit_Notify(void)
Definition async.c:2437
static bool asyncQueueIsFull(void)
Definition async.c:1959
#define QUEUE_HEAD
Definition async.c:358
static void AsyncShmemInit(void *arg)
Definition async.c:829
static void initLocalChannelTable(void)
Definition async.c:750
PendingListenAction
Definition async.c:469
@ PENDING_UNLISTEN
Definition async.c:471
@ PENDING_LISTEN
Definition async.c:470
static dshash_table * globalChannelTable
Definition async.c:414
static void asyncQueueUnregister(void)
Definition async.c:1916
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2168
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:247
#define LocalChannelTableIsEmpty()
Definition async.c:425
static void initPendingListenActions(void)
Definition async.c:776
static QueuePosition queueHeadBeforeWrite
Definition async.c:566
static bool IsListeningOn(const char *channel)
Definition async.c:1903
void Async_Notify(const char *channel, const char *payload)
Definition async.c:894
volatile sig_atomic_t notifyInterruptPending
Definition async.c:552
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2950
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3294
#define QUEUE_STOP_PAGE
Definition async.c:360