PostgreSQL Source Code git master
/*-------------------------------------------------------------------------
 *
 * async.c
 *	  Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/commands/async.c
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * Async Notification Model as of v19:
 *
 * 1. Multiple backends on same machine. Multiple backends may be listening
 *	  on each of several channels.
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *	  with actively-used pages mapped into shared memory by the slru.c module.
 *	  All notification messages are placed in the queue and later read out
 *	  by listening backends. The single queue allows us to guarantee that
 *	  notifications are received in commit order.
 *
 *	  Although there is only one queue, notifications are treated as being
 *	  database-local; this is done by including the sender's database OID
 *	  in each notification message. Listening backends ignore messages
 *	  that don't match their database OID. This is important because it
 *	  ensures senders and receivers have the same database encoding and won't
 *	  misinterpret non-ASCII text in the channel name or payload string.
 *
 *	  Since notifications are not expected to survive database crashes,
 *	  we can simply clean out the pg_notify data at any reboot, and there
 *	  is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *	  entering its PID into the array in AsyncQueueControl. It then scans all
 *	  incoming notifications in the central queue and first compares the
 *	  database OID of the notification with its own database OID and then
 *	  compares the notified channel with the list of channels that it listens
 *	  to. In case there is a match it delivers the notification event to its
 *	  frontend. Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *	  a backend-local list which will not be processed until transaction end.
 *
 *	  Duplicate notifications from the same transaction are sent out as one
 *	  notification only. This is done to save work when for example a trigger
 *	  on a 2 million row table fires a notification for each row that has been
 *	  changed. If the application needs to receive every single notification
 *	  that has been sent, it can easily add some unique string into the extra
 *	  payload parameter.
 *
 *	  When the transaction is ready to commit, PreCommit_Notify() adds the
 *	  pending notifications to the head of the queue. The head pointer of the
 *	  queue always points to the next free position and a position is just a
 *	  page number and the offset in that page. This is done before marking the
 *	  transaction as committed in clog. If we run into problems writing the
 *	  notifications, we can still call elog(ERROR, ...) and the transaction
 *	  will roll back safely.
 *
 *	  Once we have put all of the notifications into the queue, we return to
 *	  CommitTransaction() which will then do the actual transaction commit.
 *
 *	  After commit we are called another time (AtCommit_Notify()). Here we
 *	  make any required updates to the effective listen state (see below).
 *	  Then we signal any backends that may be interested in our messages
 *	  (including our own backend, if listening). This is done by
 *	  SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
 *	  each relevant backend, as described below.
 *
 *	  Finally, after we are out of the transaction altogether and about to go
 *	  idle, we scan the queue for messages that need to be sent to our
 *	  frontend (which might be notifies from other backends, or self-notifies
 *	  from our own). This step is not part of the CommitTransaction sequence
 *	  for two important reasons. First, we could get errors while sending
 *	  data to our frontend, and it's really bad for errors to happen in
 *	  post-commit cleanup. Second, in cases where a procedure issues commits
 *	  within a single frontend command, we don't want to send notifies to our
 *	  frontend until the command is done; but notifies to other backends
 *	  should go out immediately after each commit.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *	  sets the process's latch, which triggers the event to be processed
 *	  immediately if this backend is idle (i.e., it is waiting for a frontend
 *	  command and is not within a transaction block. C.f.
 *	  ProcessClientReadInterrupt()). Otherwise the handler may only set a
 *	  flag, which will cause the processing to occur just before we next go
 *	  idle.
 *
 *	  Inbound-notify processing consists of reading all of the notifications
 *	  that have arrived since scanning last time. We read every notification
 *	  until we reach either a notification from an uncommitted transaction or
 *	  the head pointer's position.
 *
 * 6. To limit disk space consumption, the tail pointer needs to be advanced
 *	  so that old pages can be truncated. This is relatively expensive
 *	  (notably, it requires an exclusive lock), so we don't want to do it
 *	  often. We make sending backends do this work if they advanced the queue
 *	  head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
 *
 * 7. So far we have not discussed how backends change their listening state,
 *	  nor how notification senders know which backends to awaken. To handle
 *	  the latter, we maintain a global channel table (implemented as a dynamic
 *	  shared hash table, or dshash) that maps channel names to the set of
 *	  backends listening on each channel. This table is created lazily on the
 *	  first LISTEN command and grows dynamically as needed. There is also a
 *	  local channel table (a plain dynahash table) in each listening backend,
 *	  tracking which channels that backend is listening to. The local table
 *	  serves to reduce the number of accesses needed to the shared table.
 *
 *	  If the current transaction has executed any LISTEN/UNLISTEN actions,
 *	  PreCommit_Notify() prepares to commit those. For LISTEN, it
 *	  pre-allocates entries in both the per-backend localChannelTable and the
 *	  shared globalChannelTable (with listening=false so that these entries
 *	  are no-ops for the moment). It also records the final per-channel
 *	  intent in pendingListenActions, so post-commit/abort processing can
 *	  apply that in a single step. Since all these allocations happen before
 *	  committing to clog, we can safely abort the transaction on failure.
 *
 *	  After commit, AtCommit_Notify() runs through pendingListenActions and
 *	  updates the backend's per-channel listening flags to activate or
 *	  deactivate listening. This happens before sending signals.
 *
 *	  SignalBackends() consults the shared global channel table to identify
 *	  listeners for the channels that the current transaction sent
 *	  notification(s) to. Each selected backend is marked as having a wakeup
 *	  pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
 *	  signal is sent to it.
 *
 * 8. While writing notifications, PreCommit_Notify() records the queue head
 *	  position both before and after the write. Because all writers serialize
 *	  on a cluster-wide heavyweight lock, no other backend can insert entries
 *	  between these two points. SignalBackends() uses this fact to directly
 *	  advance the queue pointer for any backend that is still positioned at
 *	  the old head, or within the range written, but is not interested in any
 *	  of our notifications. This avoids unnecessary wakeups for idle
 *	  listeners that have nothing to read. Backends that are not interested
 *	  in our notifications, but cannot be directly advanced, are signaled only
 *	  if they are far behind the current queue head; that is to ensure that
 *	  we can advance the queue tail without undue delay.
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.) The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.
 *
 * The amount of shared memory used for notify management (notify_buffers)
 * can be varied without affecting anything but performance. The maximum
 * amount of notification data that can be queued at one time is determined
 * by the max_notify_queue_pages GUC.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>
#include <unistd.h>
#include <signal.h>

#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"


/*
 * Maximum size of a NOTIFY payload, including terminating NULL. This
 * must be kept small enough so that a notification message fits on one
 * SLRU page. The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
	int			length;			/* total allocated length of entry */
	Oid			dboid;			/* sender's database OID */
	TransactionId xid;			/* sender's XID */
	int32		srcPid;			/* sender's PID */
	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len)		INTALIGN(len)

#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)

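The sizing arithmetic above can be illustrated with a self-contained sketch: the channel and payload strings are stored back to back in the data area, each null-terminated, and the total entry length is rounded up to an int boundary the way QUEUEALIGN()/INTALIGN() does. The header size and macro names below are stand-ins, not the backend's definitions.

```c
/*
 * Illustrative sketch (not backend code): computing a queue entry's
 * "length".  SKETCH_HEADER_SIZE stands in for the fixed header,
 * i.e. offsetof(AsyncQueueEntry, data); SKETCH_ALIGN rounds up to a
 * multiple of 4 the same way INTALIGN does on typical platforms.
 */
#include <stddef.h>
#include <string.h>

#define SKETCH_HEADER_SIZE	16	/* stand-in for the fixed entry header */
#define SKETCH_ALIGN(len)	(((len) + 3) & ~((size_t) 3))

static size_t
sketch_entry_length(const char *channel, const char *payload)
{
	/* header, then channel + NUL, then payload + NUL */
	size_t		raw = SKETCH_HEADER_SIZE +
		strlen(channel) + 1 +
		strlen(payload) + 1;

	return SKETCH_ALIGN(raw);	/* keep the next entry int-aligned */
}
```

With both strings empty the raw size is the header plus two NUL bytes, matching the "+ 2" in AsyncQueueEntryEmptySize before alignment padding.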
/*
 * Struct describing a queue position, and assorted macros for working with it
 */
typedef struct QueuePosition
{
	int64		page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} QueuePosition;

#define QUEUE_POS_PAGE(x)	((x).page)
#define QUEUE_POS_OFFSET(x) ((x).offset)

#define SET_QUEUE_POS(x,y,z) \
	do { \
		(x).page = (y); \
		(x).offset = (z); \
	} while (0)

#define QUEUE_POS_EQUAL(x,y) \
	((x).page == (y).page && (x).offset == (y).offset)

#define QUEUE_POS_IS_ZERO(x) \
	((x).page == 0 && (x).offset == 0)

/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
	 (x).page != (y).page ? (y) : \
	 (x).offset < (y).offset ? (x) : (y))

/* choose logically larger QueuePosition */
#define QUEUE_POS_MAX(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
	 (x).page != (y).page ? (x) : \
	 (x).offset > (y).offset ? (x) : (y))

/* returns true if x comes before y in queue order */
#define QUEUE_POS_PRECEDES(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) || \
	 ((x).page == (y).page && (x).offset < (y).offset))

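The ordering macros above can be restated as plain functions in a standalone sketch. Since queue page numbers no longer wrap around, "page x precedes page y" is plain integer less-than (as asyncQueuePagePrecedes implements below); positions on the same page compare by offset. The names here are illustrative stand-ins.

```c
/*
 * Illustrative sketch (not backend code): ordering of queue positions,
 * mirroring QUEUE_POS_PRECEDES and QUEUE_POS_MIN.  Pages compare with
 * plain '<' because page numbers are monotonically increasing.
 */
#include <stdbool.h>
#include <stdint.h>

typedef struct SketchPos
{
	int64_t		page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} SketchPos;

static bool
sketch_pos_precedes(SketchPos x, SketchPos y)
{
	return x.page < y.page ||
		(x.page == y.page && x.offset < y.offset);
}

static SketchPos
sketch_pos_min(SketchPos x, SketchPos y)
{
	return sketch_pos_precedes(x, y) ? x : y;
}
```

The tail pointer must be the minimum over all listening backends' positions, which is exactly a fold of sketch_pos_min over the listener list.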
/*
 * Parameter determining how often we try to advance the tail pointer:
 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
 * also the distance by which a backend that's not interested in our
 * notifications needs to be behind before we'll decide we need to wake it
 * up so it can advance its pointer.
 *
 * Resist the temptation to make this really large. While that would save
 * work in some places, it would add cost in others. In particular, this
 * should likely be less than notify_buffers, to ensure that backends
 * catch up before the pages they'll need to read fall out of SLRU cache.
 */
#define QUEUE_CLEANUP_DELAY 4

/*
 * Struct describing a listening backend's status
 */
typedef struct QueueBackendStatus
{
	int32		pid;			/* either a PID or InvalidPid */
	Oid			dboid;			/* backend's database OID, or InvalidOid */
	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
	QueuePosition pos;			/* backend has read queue up to here */
	bool		wakeupPending;	/* signal sent to backend, not yet processed */
	bool		isAdvancing;	/* backend is advancing its position */
} QueueBackendStatus;

/*
 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
 *
 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
 * NotifyQueueTailLock.
 *
 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
 * their own entries as well as the head and tail pointers. Consequently we
 * can allow a backend to update its own record while holding only SHARED lock
 * (since no other backend will inspect it).
 *
 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
 * entries of other backends and also change the head pointer. They can
 * also advance other backends' queue positions, unless the other backend
 * has isAdvancing set (i.e., is in process of doing that itself).
 *
 * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
 * mode, backends can change the tail pointers.
 *
 * The SLRU buffer pool is divided into banks, and the per-bank SLRU lock is
 * used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
 * globalChannelTable partition locks.
 *
 * Each backend uses the backend[] array entry with index equal to its
 * ProcNumber. We rely on this to make SendProcSignal fast.
 *
 * The backend[] array entries for actively-listening backends are threaded
 * together using firstListener and the nextListener links, so that we can
 * scan them without having to iterate over inactive entries. We keep this
 * list in order by ProcNumber so that the scan is cache-friendly when there
 * are many active entries.
 */
typedef struct AsyncQueueControl
{
	QueuePosition head;			/* head points to the next free location */
	QueuePosition tail;			/* tail must be <= the queue position of every
								 * listening backend */
	int64		stopPage;		/* oldest unrecycled page; must be <=
								 * tail.page */
	ProcNumber	firstListener;	/* id of first listener, or
								 * INVALID_PROC_NUMBER */
	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
	dsa_handle	globalChannelTableDSA;	/* global channel table's DSA handle */
	dshash_table_handle globalChannelTableDSH;	/* and its dshash handle */
	/* Array with room for MaxBackends entries: */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;

static AsyncQueueControl *asyncQueueControl;

#define QUEUE_HEAD			(asyncQueueControl->head)
#define QUEUE_TAIL			(asyncQueueControl->tail)
#define QUEUE_STOP_PAGE		(asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER	(asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)	(asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)	(asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)	(asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)	(asyncQueueControl->backend[i].pos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
#define QUEUE_BACKEND_IS_ADVANCING(i)	(asyncQueueControl->backend[i].isAdvancing)

/*
 * The SLRU buffer area through which we access the notification queue
 */
static SlruCtlData NotifyCtlData;

#define NotifyCtl			(&NotifyCtlData)
#define QUEUE_PAGESIZE		BLCKSZ

#define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */

/*
 * Global channel table definitions
 *
 * This hash table maps (database OID, channel name) keys to arrays of
 * ProcNumbers representing the backends listening or about to listen
 * on each channel. The "listening" flags allow us to create hash table
 * entries pre-commit and not have to assume that creating them post-commit
 * will succeed.
 */
#define INITIAL_LISTENERS_ARRAY_SIZE 4

typedef struct GlobalChannelKey
{
	Oid			dboid;			/* sender's database OID */
	char		channel[NAMEDATALEN];	/* channel name */
} GlobalChannelKey;

385
386typedef struct ListenerEntry
387{
388 ProcNumber procNo; /* listener's ProcNumber */
389 bool listening; /* true if committed listener */
391
typedef struct GlobalChannelEntry
{
	GlobalChannelKey key;		/* hash key */
	dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
	int			numListeners;	/* Number of listeners currently stored */
	int			allocatedListeners; /* Allocated size of array */
} GlobalChannelEntry;

static dsa_area *globalChannelDSA = NULL;
static dshash_table *globalChannelTable = NULL;

/*
 * localChannelTable caches the channel names this backend is listening on
 * (including those we have staged to be listened on, but not yet committed).
 * Used by IsListeningOn() for fast lookups when reading notifications.
 */
static HTAB *localChannelTable = NULL;

/* We test this condition to detect that we're not listening at all */
#define LocalChannelTableIsEmpty() \
	(localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction. As explained above,
 * we don't actually change listen state until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
typedef enum ListenActionKind
{
	LISTEN_LISTEN,
	LISTEN_UNLISTEN,
	LISTEN_UNLISTEN_ALL,
} ListenActionKind;

typedef struct
{
	ListenActionKind action;
	char		channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;

typedef struct ActionList
{
	int			nestingLevel;	/* current transaction nesting depth */
	List	   *actions;		/* list of ListenAction structs */
	struct ActionList *upper;	/* details for upper transaction levels */
} ActionList;

static ActionList *pendingActions = NULL;

/*
 * Hash table recording the final listen/unlisten intent per channel for
 * the current transaction. Key is channel name, value is PENDING_LISTEN or
 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
 * per channel instead of replaying every action. This is built from the
 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
 * AtAbort_Notify.
 */
typedef enum PendingListenAction
{
	PENDING_LISTEN,
	PENDING_UNLISTEN,
} PendingListenAction;

typedef struct PendingListenEntry
{
	char		channel[NAMEDATALEN];	/* hash key */
	PendingListenAction action; /* which action should we perform? */
} PendingListenEntry;

static HTAB *pendingListenActions = NULL;

/*
 * State for outbound notifies consists of a list of all channels+payloads
 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
 * until and unless the transaction commits. pendingNotifies is NULL if no
 * NOTIFYs have been done in the current (sub) transaction.
 *
 * We discard duplicate notify events issued in the same transaction.
 * Hence, in addition to the list proper (which we need to track the order
 * of the events, since we guarantee to deliver them in order), we build a
 * hash table which we can probe to detect duplicates. Since building the
 * hash table is somewhat expensive, we do so only once we have at least
 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
 * before that we just scan the events linearly.
 *
 * The list is kept in CurTransactionContext. In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions add their entries to their parent's list.
 * Failed subtransactions simply discard their lists. Since these lists
 * are independent, there may be notify events in a subtransaction's list
 * that duplicate events in some ancestor (sub) transaction; we get rid of
 * the dups when merging the subtransaction's list into its parent's.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit. This is a bit odd
 * but is consistent with our historical behavior.
 */
typedef struct Notification
{
	uint16		channel_len;	/* length of channel-name string */
	uint16		payload_len;	/* length of payload string */
	/* null-terminated channel name, then null-terminated payload follow */
	char		data[FLEXIBLE_ARRAY_MEMBER];
} Notification;

typedef struct NotificationList
{
	int			nestingLevel;	/* current transaction nesting depth */
	List	   *events;			/* list of Notification structs */
	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
	List	   *uniqueChannelNames; /* unique channel names being notified */
	HTAB	   *uniqueChannelHash;	/* hash of unique channel names, or NULL */
	struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;

#define MIN_HASHABLE_NOTIFIES 16	/* threshold to build hashtab */

struct NotificationHash
{
	Notification *event;		/* => the actual Notification struct */
};

static NotificationList *pendingNotifies = NULL;

/*
 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
 * (both just carry the channel name, with no payload).
 */
typedef struct ChannelName
{
	char		channel[NAMEDATALEN];	/* hash key */
} ChannelName;

/*
 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
 * called from inside a signal handler. That just sets the
 * notifyInterruptPending flag and sets the process
 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
 * actually deal with the interrupt.
 */
static volatile sig_atomic_t notifyInterruptPending = false;

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;

/*
 * Queue head positions for direct advancement.
 * These are captured during PreCommit_Notify while holding the heavyweight
 * lock on database 0, ensuring no other backend can insert notifications
 * between them. SignalBackends uses these to advance idle backends.
 */

/*
 * Workspace arrays for SignalBackends. These are preallocated in
 * PreCommit_Notify to avoid needing memory allocation after committing to
 * clog.
 */

/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameters */
bool		Trace_notify = false;

/* For 8 KB pages this gives 8 GB of disk space */
int			max_notify_queue_pages = 1048576;

/* local function prototypes */
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
										const char *channel);
static dshash_hash globalChannelTableHash(const void *key, size_t size,
										  void *arg);
static void initGlobalChannelTable(void);
static void initLocalChannelTable(void);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void BecomeRegisteredListener(void);
static void PrepareTableEntriesForListen(const char *channel);
static void PrepareTableEntriesForUnlisten(const char *channel);
static void PrepareTableEntriesForUnlistenAll(void);
									 int idx);
static void ApplyPendingListenActions(bool isCommit);
static void CleanupListenersOnExit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
									 QueuePosition stop,
									 Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static uint32 notification_hash(const void *key, Size keysize);
static int	notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

/*
 * Compute the difference between two queue page numbers.
 * Previously this function accounted for a wraparound.
 */
static inline int64
asyncQueuePageDiff(int64 p, int64 q)
{
	return p - q;
}

/*
 * Determines whether p precedes q.
 * Previously this function accounted for a wraparound.
 */
static inline bool
asyncQueuePagePrecedes(int64 p, int64 q)
{
	return p < q;
}

/*
 * GlobalChannelKeyInit
 *		Prepare a global channel table key for hashing.
 */
static inline void
GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
{
	memset(key, 0, sizeof(GlobalChannelKey));
	key->dboid = dboid;
	strlcpy(key->channel, channel, NAMEDATALEN);
}

/*
 * globalChannelTableHash
 *		Hash function for global channel table keys.
 */
static dshash_hash
globalChannelTableHash(const void *key, size_t size, void *arg)
{
	const GlobalChannelKey *k = (const GlobalChannelKey *) key;
	dshash_hash h;

	h = DatumGetUInt32(hash_uint32((uint32) k->dboid));
	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
								 strlen(k->channel)));

	return h;
}

/* parameters for the global channel table */

/*
 * initGlobalChannelTable
 *		Lazy initialization of the global channel table.
 */
static void
initGlobalChannelTable(void)
{
	MemoryContext oldcontext;

	/* Quick exit if we already did this */
	if (globalChannelTable != NULL)
		return;

	/* Otherwise, use a lock to ensure only one process creates the table */

	/* Be sure any local memory allocated by DSA routines is persistent */
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	{
		/* Initialize dynamic shared hash table for global channels */
					  NULL);

		/* Store handles in shared memory for other backends to use */
	}
	else if (!globalChannelTable)
	{
		/* Attach to existing dynamic shared hash table */
						 NULL);
	}

	MemoryContextSwitchTo(oldcontext);
}

/*
 * initLocalChannelTable
 *		Lazy initialization of the local channel table.
 *		Once created, this table lasts for the life of the session.
 */
static void
initLocalChannelTable(void)
{
	HASHCTL		hash_ctl;

	/* Quick exit if we already did this */
	if (localChannelTable != NULL)
		return;

	/* Initialize local hash table for this backend's listened channels */
	hash_ctl.keysize = NAMEDATALEN;
	hash_ctl.entrysize = sizeof(ChannelName);

	localChannelTable =
		hash_create("Local Listen Channels",
					64,
					&hash_ctl,
					HASH_ELEM | HASH_STRINGS);
}

/*
 * initPendingListenActions
 *		Lazy initialization of the pending listen actions hash table.
 *		This is allocated in CurTransactionContext during PreCommit_Notify,
 *		and destroyed at transaction end.
 */
static void
initPendingListenActions(void)
{
	HASHCTL		hash_ctl;

	if (pendingListenActions != NULL)
		return;

	hash_ctl.keysize = NAMEDATALEN;
	hash_ctl.entrysize = sizeof(PendingListenEntry);
	hash_ctl.hcxt = CurTransactionContext;

	pendingListenActions =
		hash_create("Pending Listen Actions",
					16,
					&hash_ctl,
					HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}

/*
 * Report space needed for our shared memory area
 */
Size
AsyncShmemSize(void)
{
	Size		size;

	/* This had better match AsyncShmemInit */
	size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
	size = add_size(size, offsetof(AsyncQueueControl, backend));

	size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));

	return size;
}

/*
 * Initialize our shared memory area
 */
void
AsyncShmemInit(void)
{
	bool		found;
	Size		size;

	/*
	 * Create or attach to the AsyncQueueControl structure.
	 */
	size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
	size = add_size(size, offsetof(AsyncQueueControl, backend));

	asyncQueueControl = (AsyncQueueControl *)
		ShmemInitStruct("Async Queue Control", size, &found);

	if (!found)
	{
		/* First time through, so initialize it */
		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
		QUEUE_STOP_PAGE = 0;
		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
		asyncQueueControl->lastQueueFillWarn = GetCurrentTimestamp();
		for (int i = 0; i < MaxBackends; i++)
		{
			QUEUE_BACKEND_PID(i) = InvalidPid;
			QUEUE_BACKEND_DBOID(i) = InvalidOid;
			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
			QUEUE_BACKEND_IS_ADVANCING(i) = false;
		}
	}

	/*
	 * Set up SLRU management of the pg_notify data. Note that long segment
	 * names are used in order to avoid wraparound.
	 */
	NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
	SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0, "pg_notify",
				  LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
				  SYNC_HANDLER_NONE, true);

	if (!found)
	{
		/*
		 * During start or reboot, clean out the pg_notify directory.
		 */
		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
	}
}


/*
 * pg_notify -
 *	  SQL function to send a notification event
 */
Datum
pg_notify(PG_FUNCTION_ARGS)
{
	const char *channel;
	const char *payload;

	if (PG_ARGISNULL(0))
		channel = "";
	else
		channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

	if (PG_ARGISNULL(1))
		payload = "";
	else
		payload = text_to_cstring(PG_GETARG_TEXT_PP(1));

	/* For NOTIFY as a statement, this is checked in ProcessUtility */
	PreventCommandDuringParallelMode("NOTIFY");

	Async_Notify(channel, payload);

	PG_RETURN_VOID();
}

876/*
877 * Async_Notify
878 *
879 * This is executed by the SQL notify command.
880 *
881 * Adds the message to the list of pending notifies.
882 * Actual notification happens during transaction commit.
883 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
884 */
885void
886Async_Notify(const char *channel, const char *payload)
887{
888 int my_level = GetCurrentTransactionNestLevel();
889 size_t channel_len;
890 size_t payload_len;
891 Notification *n;
892 MemoryContext oldcontext;
893
894 if (IsParallelWorker())
895 elog(ERROR, "cannot send notifications from a parallel worker");
896
897 if (Trace_notify)
898 elog(DEBUG1, "Async_Notify(%s)", channel);
899
900 channel_len = channel ? strlen(channel) : 0;
901 payload_len = payload ? strlen(payload) : 0;
902
903 /* a channel name must be specified */
904 if (channel_len == 0)
907 errmsg("channel name cannot be empty")));
908
909 /* enforce length limits */
910 if (channel_len >= NAMEDATALEN)
913 errmsg("channel name too long")));
914
915 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
918 errmsg("payload string too long")));
919
920 /*
921 * We must construct the Notification entry, even if we end up not using
922 * it, in order to compare it cheaply to existing list entries.
923 *
924 * The notification list needs to live until end of transaction, so store
925 * it in the transaction context.
926 */
928
930 channel_len + payload_len + 2);
931 n->channel_len = channel_len;
932 n->payload_len = payload_len;
933 strcpy(n->data, channel);
934 if (payload)
935 strcpy(n->data + channel_len + 1, payload);
936 else
937 n->data[channel_len + 1] = '\0';
938
939 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
940 {
942
943 /*
944 * First notify event in current (sub)xact. Note that we allocate the
945 * NotificationList in TopTransactionContext; the nestingLevel might
946 * get changed later by AtSubCommit_Notify.
947 */
950 sizeof(NotificationList));
951 notifies->nestingLevel = my_level;
952 notifies->events = list_make1(n);
953 /* We certainly don't need a hashtable yet */
954 notifies->hashtab = NULL;
955 /* We won't build uniqueChannelNames/Hash till later, either */
956 notifies->uniqueChannelNames = NIL;
957 notifies->uniqueChannelHash = NULL;
958 notifies->upper = pendingNotifies;
959 pendingNotifies = notifies;
960 }
961 else
962 {
963 /* Now check for duplicates */
964 if (AsyncExistsPendingNotify(n))
965 {
966 /* It's a dup, so forget it */
967 pfree(n);
968 MemoryContextSwitchTo(oldcontext);
969 return;
970 }
971
972 /* Append more events to existing list */
973 AddEventToPendingNotifies(n);
974 }
975
976 MemoryContextSwitchTo(oldcontext);
977}
978
979/*
980 * queue_listen
981 * Common code for listen, unlisten, unlisten all commands.
982 *
983 * Adds the request to the list of pending actions.
984 * Actual update of localChannelTable and globalChannelTable happens during
985 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
986 */
987static void
988queue_listen(ListenActionKind action, const char *channel)
989{
990 MemoryContext oldcontext;
991 ListenAction *actrec;
992 int my_level = GetCurrentTransactionNestLevel();
993
994 /*
995 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
996 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
997 * final per-channel intent is computed during PreCommit_Notify.
998 */
999 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
1000
1001 /* space for terminating null is included in sizeof(ListenAction) */
1002 actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
1003 strlen(channel) + 1);
1004 actrec->action = action;
1005 strcpy(actrec->channel, channel);
1006
1007 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1008 {
1009 ActionList *actions;
1010
1011 /*
1012 * First action in current (sub)xact. Note that we allocate the
1013 * ActionList in TopTransactionContext; the nestingLevel might get
1014 * changed later by AtSubCommit_Notify.
1015 */
1016 actions = (ActionList *)
1017 MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
1018 actions->nestingLevel = my_level;
1019 actions->actions = list_make1(actrec);
1020 actions->upper = pendingActions;
1021 pendingActions = actions;
1022 }
1023 else
1024 pendingActions->actions = lappend(pendingActions->actions, actrec);
1025
1026 MemoryContextSwitchTo(oldcontext);
1027}
1028
1029/*
1030 * Async_Listen
1031 *
1032 * This is executed by the SQL listen command.
1033 */
1034void
1035Async_Listen(const char *channel)
1036{
1037 if (Trace_notify)
1038 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1039
1040 queue_listen(LISTEN_LISTEN, channel);
1041}
1042
1043/*
1044 * Async_Unlisten
1045 *
1046 * This is executed by the SQL unlisten command.
1047 */
1048void
1049Async_Unlisten(const char *channel)
1050{
1051 if (Trace_notify)
1052 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1053
1054 /* If we couldn't possibly be listening, no need to queue anything */
1056 return;
1057
1058 queue_listen(LISTEN_UNLISTEN, channel);
1059}
1060
1061/*
1062 * Async_UnlistenAll
1063 *
1064 * This is invoked by UNLISTEN * command, and also at backend exit.
1065 */
1066void
1067Async_UnlistenAll(void)
1068{
1069 if (Trace_notify)
1070 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1071
1072 /* If we couldn't possibly be listening, no need to queue anything */
1074 return;
1075
1076 queue_listen(LISTEN_UNLISTEN_ALL, "");
1077}
1078
1079/*
1080 * SQL function: return a set of the channel names this backend is actively
1081 * listening to.
1082 *
1083 * Note: this coding relies on the fact that the localChannelTable cannot
1084 * change within a transaction.
1085 */
1086Datum
1087pg_listening_channels(PG_FUNCTION_ARGS)
1088{
1089 FuncCallContext *funcctx;
1090 HASH_SEQ_STATUS *status;
1091
1092 /* stuff done only on the first call of the function */
1093 if (SRF_IS_FIRSTCALL())
1094 {
1095 /* create a function context for cross-call persistence */
1096 funcctx = SRF_FIRSTCALL_INIT();
1097
1098 /* Initialize hash table iteration if we have any channels */
1099 if (localChannelTable != NULL)
1100 {
1101 MemoryContext oldcontext;
1102
1103 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1104 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1105 hash_seq_init(status, localChannelTable);
1106 funcctx->user_fctx = status;
1107 MemoryContextSwitchTo(oldcontext);
1108 }
1109 else
1110 {
1111 funcctx->user_fctx = NULL;
1112 }
1113 }
1114
1115 /* stuff done on every call of the function */
1116 funcctx = SRF_PERCALL_SETUP();
1117 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1118
1119 if (status != NULL)
1120 {
1121 ChannelName *entry;
1122
1123 entry = (ChannelName *) hash_seq_search(status);
1124 if (entry != NULL)
1125 SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
1126 }
1127
1128 SRF_RETURN_DONE(funcctx);
1129}
1130
1131/*
1132 * Async_UnlistenOnExit
1133 *
1134 * This is executed at backend exit if we have done any LISTENs in this
1135 * backend. It might not be necessary anymore, if the user UNLISTENed
1136 * everything, but we don't try to detect that case.
1137 */
1138static void
1144
1145/*
1146 * AtPrepare_Notify
1147 *
1148 * This is called at the prepare phase of a two-phase
1149 * transaction. Save the state for possible commit later.
1150 */
1151void
1152AtPrepare_Notify(void)
1153{
1154 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1155 if (pendingActions || pendingNotifies)
1156 ereport(ERROR,
1157 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1158 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1159}
1160
1161/*
1162 * PreCommit_Notify
1163 *
1164 * This is called at transaction commit, before actually committing to
1165 * clog.
1166 *
1167 * If there are pending LISTEN actions, make sure we are listed in the
1168 * shared-memory listener array. This must happen before commit to
1169 * ensure we don't miss any notifies from transactions that commit
1170 * just after ours.
1171 *
1172 * If there are outbound notify requests in the pendingNotifies list,
1173 * add them to the global queue. We do that before commit so that
1174 * we can still throw error if we run out of queue space.
1175 */
1176void
1177PreCommit_Notify(void)
1178{
1179 ListCell *p;
1180
1181 if (!pendingActions && !pendingNotifies)
1182 return; /* no relevant statements in this xact */
1183
1184 if (Trace_notify)
1185 elog(DEBUG1, "PreCommit_Notify");
1186
1187 /* Preflight for any pending listen/unlisten actions */
1189
1190 if (pendingActions != NULL)
1191 {
1192 /* Ensure we have a local channel table */
1194 /* Create pendingListenActions hash table for this transaction */
1196
1197 /* Stage all the actions this transaction wants to perform */
1198 foreach(p, pendingActions->actions)
1199 {
1200 ListenAction *actrec = (ListenAction *) lfirst(p);
1201
1202 switch (actrec->action)
1203 {
1204 case LISTEN_LISTEN:
1205 BecomeRegisteredListener();
1206 PrepareTableEntriesForListen(actrec->channel);
1207 break;
1208 case LISTEN_UNLISTEN:
1209 PrepareTableEntriesForUnlisten(actrec->channel);
1210 break;
1211 case LISTEN_UNLISTEN_ALL:
1212 PrepareTableEntriesForUnlistenAll();
1213 break;
1214 }
1215 }
1216 }
1217
1218 /* Queue any pending notifies (must happen after the above) */
1219 if (pendingNotifies)
1220 {
1221 ListCell *nextNotify;
1222 bool firstIteration = true;
1223
1224 /*
1225 * Build list of unique channel names being notified for use by
1226 * SignalBackends().
1227 *
1228 * If uniqueChannelHash is available, use it to efficiently get the
1229 * unique channels. Otherwise, fall back to the O(N^2) approach.
1230 */
1233 {
1234 HASH_SEQ_STATUS status;
1236
1238 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1241 channelEntry->channel);
1242 }
1243 else
1244 {
1245 /* O(N^2) approach is better for small number of notifications */
1247 {
1248 char *channel = n->data;
1249 bool found = false;
1250
1251 /* Name present in list? */
1253 {
1254 if (strcmp(oldchan, channel) == 0)
1255 {
1256 found = true;
1257 break;
1258 }
1259 }
1260 /* Add if not already in list */
1261 if (!found)
1264 channel);
1265 }
1266 }
1267
1268 /* Preallocate workspace that will be needed by SignalBackends() */
1269 if (signalPids == NULL)
1271 MaxBackends * sizeof(int32));
1272
1273 if (signalProcnos == NULL)
1275 MaxBackends * sizeof(ProcNumber));
1276
1277 /*
1278 * Make sure that we have an XID assigned to the current transaction.
1279 * GetCurrentTransactionId is cheap if we already have an XID, but not
1280 * so cheap if we don't, and we'd prefer not to do that work while
1281 * holding NotifyQueueLock.
1282 */
1283 (void) GetCurrentTransactionId();
1284
1285 /*
1286 * Serialize writers by acquiring a special lock that we hold till
1287 * after commit. This ensures that queue entries appear in commit
1288 * order, and in particular that there are never uncommitted queue
1289 * entries ahead of committed ones, so an uncommitted transaction
1290 * can't block delivery of deliverable notifications.
1291 *
1292 * We use a heavyweight lock so that it'll automatically be released
1293 * after either commit or abort. This also allows deadlocks to be
1294 * detected, though really a deadlock shouldn't be possible here.
1295 *
1296 * The lock is on "database 0", which is pretty ugly but it doesn't
1297 * seem worth inventing a special locktag category just for this.
1298 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1299 * used by the flatfiles mechanism.)
1300 */
1301 LockSharedObject(DatabaseRelationId, InvalidOid, 0,
1302 AccessExclusiveLock);
1303
1304 /*
1305 * For the direct advancement optimization in SignalBackends(), we
1306 * need to ensure that no other backend can insert queue entries
1307 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1308 * heavyweight lock above provides this guarantee, since it serializes
1309 * all writers.
1310 *
1311 * Note: if the heavyweight lock were ever removed for scalability
1312 * reasons, we could achieve the same guarantee by holding
1313 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1314 * than releasing and reacquiring it for each page as we do below.
1315 */
1316
1317 /* Initialize values to a safe default in case list is empty */
1320
1321 /* Now push the notifications into the queue */
1322 nextNotify = list_head(pendingNotifies->events);
1323 while (nextNotify != NULL)
1324 {
1325 /*
1326 * Add the pending notifications to the queue. We acquire and
1327 * release NotifyQueueLock once per page, which might be overkill
1328 * but it does allow readers to get in while we're doing this.
1329 *
1330 * A full queue is very uncommon and should really not happen,
1331 * given that we have so much space available in the SLRU pages.
1332 * Nevertheless we need to deal with this possibility. Note that
1333 * when we get here we are in the process of committing our
1334 * transaction, but we have not yet committed to clog, so at this
1335 * point in time we can still roll the transaction back.
1336 */
1337 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1338 if (firstIteration)
1339 {
1340 asyncQueueFillWarning();
1341 firstIteration = false;
1342 }
1344 if (asyncQueueIsFull())
1345 ereport(ERROR,
1346 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1347 errmsg("too many notifications in the NOTIFY queue")));
1348 nextNotify = asyncQueueAddEntries(nextNotify);
1349 LWLockRelease(NotifyQueueLock);
1350
1351 }
1352
1353 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1354 }
1355}
1356
1357/*
1358 * AtCommit_Notify
1359 *
1360 * This is called at transaction commit, after committing to clog.
1361 *
1362 * Apply pending listen/unlisten changes and clear transaction-local state.
1363 *
1364 * If we issued any notifications in the transaction, send signals to
1365 * listening backends (possibly including ourselves) to process them.
1366 * Also, if we filled enough queue pages with new notifies, try to
1367 * advance the queue tail pointer.
1368 */
1369void
1370AtCommit_Notify(void)
1371{
1372 /*
1373 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1374 * return as soon as possible
1375 */
1376 if (!pendingActions && !pendingNotifies)
1377 return;
1378
1379 if (Trace_notify)
1380 elog(DEBUG1, "AtCommit_Notify");
1381
1382 /* Apply staged listen/unlisten changes */
1383 ApplyPendingListenActions(true);
1384
1385 /* If no longer listening to anything, get out of listener array */
1386 if (LocalChannelTableIsEmpty())
1387 asyncQueueUnregister();
1388
1389 /*
1390 * Send signals to listening backends. We need to do this only if there are
1391 * pending notifies, which were previously added to the shared queue by
1392 * PreCommit_Notify().
1393 */
1394 if (pendingNotifies != NULL)
1395 SignalBackends();
1396
1397 /*
1398 * If it's time to try to advance the global tail pointer, do that.
1399 *
1400 * (It might seem odd to do this in the sender, when more than likely the
1401 * listeners won't yet have read the messages we just sent. However,
1402 * there's less contention if only the sender does it, and there is little
1403 * need for urgency in advancing the global tail. So this typically will
1404 * be clearing out messages that were sent some time ago.)
1405 */
1406 if (tryAdvanceTail)
1407 {
1408 tryAdvanceTail = false;
1409 asyncQueueAdvanceTail();
1410 }
1411
1412 /* And clean up */
1413 ClearPendingActionsAndNotifies();
1414}
1415
1416/*
1417 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1418 *
1419 * This function must make sure we are ready to catch any incoming messages.
1420 */
1421static void
1422BecomeRegisteredListener(void)
1423{
1424 QueuePosition head;
1425 QueuePosition max;
1426 ProcNumber prevListener = INVALID_PROC_NUMBER;
1427
1428 /*
1429 * Nothing to do if we are already listening to something, nor if we
1430 * already ran this routine in this transaction.
1431 */
1432 if (amRegisteredListener)
1433 return;
1434
1435 if (Trace_notify)
1436 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1437
1438 /*
1439 * Before registering, make sure we will unlisten before dying. (Note:
1440 * this action does not get undone if we abort later.)
1441 */
1442 if (!unlistenExitRegistered)
1443 {
1444 before_shmem_exit(Async_UnlistenOnExit, 0);
1445 unlistenExitRegistered = true;
1446 }
1447
1448 /*
1449 * This is our first LISTEN, so establish our pointer.
1450 *
1451 * We set our pointer to the global tail pointer and then move it forward
1452 * over already-committed notifications. This ensures we cannot miss any
1453 * not-yet-committed notifications. We might get a few more but that
1454 * doesn't hurt.
1455 *
1456 * In some scenarios there might be a lot of committed notifications that
1457 * have not yet been pruned away (because some backend is being lazy about
1458 * reading them). To reduce our startup time, we can look at other
1459 * backends and adopt the maximum "pos" pointer of any backend that's in
1460 * our database; any notifications it's already advanced over are surely
1461 * committed and need not be re-examined by us. (We must consider only
1462 * backends connected to our DB, because others will not have bothered to
1463 * check committed-ness of notifications in our DB.)
1464 *
1465 * We need exclusive lock here so we can look at other backends' entries
1466 * and manipulate the list links.
1467 */
1468 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1469 head = QUEUE_HEAD;
1470 max = QUEUE_TAIL;
1471 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER;
1472 i = QUEUE_NEXT_LISTENER(i))
1473 {
1474 if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1475 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1476 /* Also find last listening backend before this one */
1477 if (i < MyProcNumber)
1478 prevListener = i;
1479 }
1485 /* Insert backend into list of listeners at correct position */
1487 {
1490 }
1491 else
1492 {
1495 }
1497
1498 /* Now we are listed in the global array, so remember we're listening */
1499 amRegisteredListener = true;
1500
1501 /*
1502 * Try to move our pointer forward as far as possible. This will skip
1503 * over already-committed notifications, which we want to do because they
1504 * might be quite stale. Note that we are not yet listening on anything,
1505 * so we won't deliver such notifications to our frontend. Also, although
1506 * our transaction might have executed NOTIFY, those message(s) aren't
1507 * queued yet so we won't skip them here.
1508 */
1509 if (!QUEUE_POS_EQUAL(max, head))
1510 asyncQueueReadAllNotifications();
1511}
1512
1513/*
1514 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1515 *
1516 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1517 * an entry in localChannelTable, and pre-allocating an entry in the shared
1518 * globalChannelTable with listening=false. The listening flag will be set
1519 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1520 * will be removed.
1521 */
1522static void
1523PrepareTableEntriesForListen(const char *channel)
1524{
1525 GlobalChannelKey key;
1526 GlobalChannelEntry *entry;
1527 bool found;
1529 PendingListenEntry *pending;
1530
1531 /*
1532 * Record in local pending hash that we want to LISTEN, overwriting any
1533 * earlier attempt to UNLISTEN.
1534 */
1535 pending = (PendingListenEntry *)
1536 hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
1537 pending->action = PENDING_LISTEN;
1538
1539 /*
1540 * Ensure that there is an entry for the channel in localChannelTable.
1541 * (Should this fail, we can just roll back.) If the transaction fails
1542 * after this point, we will remove the entry if appropriate during
1543 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1544 * to return TRUE; we assume nothing is going to consult that before
1545 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1546 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1547 * present to ensure they do the right things; see
1548 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1549 */
1551
1552 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1553 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1554 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1555
1556 if (!found)
1557 {
1558 /* New channel entry, so initialize it to a safe state */
1560 entry->numListeners = 0;
1561 entry->allocatedListeners = 0;
1562 }
1563
1564 /*
1565 * Create listenersArray if entry doesn't have one. It's tempting to fold
1566 * this into the !found case, but this coding allows us to cope in case
1567 * dsa_allocate() failed in an earlier attempt.
1568 */
1569 if (!DsaPointerIsValid(entry->listenersArray))
1570 {
1574 }
1575
1578
1579 /*
1580 * Check if we already have a ListenerEntry (possibly from earlier in this
1581 * transaction)
1582 */
1583 for (int i = 0; i < entry->numListeners; i++)
1584 {
1585 if (listeners[i].procNo == MyProcNumber)
1586 {
1587 /* Already have an entry; listening flag stays as-is until commit */
1589 return;
1590 }
1591 }
1592
1593 /* Need to add a new entry; grow array if necessary */
1594 if (entry->numListeners >= entry->allocatedListeners)
1595 {
1596 int new_size = entry->allocatedListeners * 2;
1599 sizeof(ListenerEntry) * new_size);
1601
1603 entry->listenersArray = new_array;
1607 }
1608
1609 listeners[entry->numListeners].procNo = MyProcNumber;
1610 listeners[entry->numListeners].listening = false; /* staged, not yet
1611 * committed */
1612 entry->numListeners++;
1613
1615}
1616
1617/*
1618 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1619 *
1620 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1621 * we're currently listening (committed or staged). We don't touch
1622 * globalChannelTable yet - the listener keeps receiving signals until
1623 * commit, when the entry is removed.
1624 */
1625static void
1626PrepareTableEntriesForUnlisten(const char *channel)
1627{
1628 PendingListenEntry *pending;
1629
1630 /*
1631 * If the channel name is not in localChannelTable, then we are neither
1632 * listening on it nor preparing to listen on it, so we don't need to
1633 * record an UNLISTEN action.
1634 */
1636 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1637 return;
1638
1639 /*
1640 * Record in local pending hash that we want to UNLISTEN, overwriting any
1641 * earlier attempt to LISTEN. Don't touch localChannelTable or
1642 * globalChannelTable yet - we keep receiving signals until commit.
1643 */
1644 pending = (PendingListenEntry *)
1645 hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
1646 pending->action = PENDING_UNLISTEN;
1647}
1648
1649/*
1650 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1651 *
1652 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1653 * about-to-be-listened channels in pendingListenActions.
1654 */
1655static void
1656PrepareTableEntriesForUnlistenAll(void)
1657{
1658 HASH_SEQ_STATUS seq;
1659 ChannelName *channelEntry;
1660 PendingListenEntry *pending;
1661
1662 /*
1663 * Scan localChannelTable, which will have the names of all channels that
1664 * we are listening on or have prepared to listen on. Record an UNLISTEN
1665 * action for each one, overwriting any earlier attempt to LISTEN.
1666 */
1667 hash_seq_init(&seq, localChannelTable);
1668 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1669 {
1670 pending = (PendingListenEntry *)
1671 hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
1672 pending->action = PENDING_UNLISTEN;
1673 }
1674}
1675
1676/*
1677 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1678 *
1679 * Decrements numListeners, compacts the array, and frees the entry if empty.
1680 * Sets *entry_ptr to NULL if the entry was deleted.
1681 *
1682 * We could get the listeners pointer from the entry, but all callers
1683 * already have it at hand.
1684 */
1685static void
1686RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
1687 ListenerEntry *listeners,
1688 int idx)
1689{
1690 GlobalChannelEntry *entry = *entry_ptr;
1691
1692 entry->numListeners--;
1693 if (idx < entry->numListeners)
1694 memmove(&listeners[idx], &listeners[idx + 1],
1695 sizeof(ListenerEntry) * (entry->numListeners - idx));
1696
1697 if (entry->numListeners == 0)
1698 {
1701 /* tells caller not to release the entry's lock: */
1702 *entry_ptr = NULL;
1703 }
1704}
1705
1706/*
1707 * ApplyPendingListenActions
1708 *
1709 * Apply, or revert, staged listen/unlisten changes to the local and global
1710 * hash tables.
1711 */
1712static void
1713ApplyPendingListenActions(bool isCommit)
1714{
1715 HASH_SEQ_STATUS seq;
1716 PendingListenEntry *pending;
1717
1718 /* Quick exit if nothing to do */
1719 if (pendingListenActions == NULL)
1720 return;
1721
1722 /* We made a globalChannelTable before building pendingListenActions */
1723 if (globalChannelTable == NULL)
1724 elog(PANIC, "global channel table missing post-commit/abort");
1725
1726 /* For each staged action ... */
1727 hash_seq_init(&seq, pendingListenActions);
1728 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1729 {
1730 GlobalChannelKey key;
1731 GlobalChannelEntry *entry;
1732 bool removeLocal = true;
1733 bool foundListener = false;
1734
1735 /*
1736 * Find the global entry for this channel. If isCommit, it had better
1737 * exist (it was created in PreCommit). In an abort, it might not
1738 * exist, in which case we are not listening and should discard any
1739 * local entry that PreCommit may have managed to create.
1740 */
1741 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1742 entry = dshash_find(globalChannelTable, &key, true);
1743 if (entry != NULL)
1744 {
1745 /* Scan entry to find the ListenerEntry for this backend */
1747
1750
1751 for (int i = 0; i < entry->numListeners; i++)
1752 {
1753 if (listeners[i].procNo != MyProcNumber)
1754 continue;
1755 foundListener = true;
1756 if (isCommit)
1757 {
1758 if (pending->action == PENDING_LISTEN)
1759 {
1760 /*
1761 * LISTEN being committed: set listening=true.
1762 * localChannelTable entry was created during
1763 * PreCommit and should be kept.
1764 */
1765 listeners[i].listening = true;
1766 removeLocal = false;
1767 }
1768 else
1769 {
1770 /*
1771 * UNLISTEN being committed: remove pre-allocated
1772 * entries from both tables.
1773 */
1774 RemoveListenerFromChannel(&entry, listeners, i);
1775 }
1776 }
1777 else
1778 {
1779 /*
1780 * Note: this part is reachable only if the transaction
1781 * aborts after PreCommit_Notify() has made some
1782 * pendingListenActions entries, so it's pretty hard to
1783 * test.
1784 */
1785 if (!listeners[i].listening)
1786 {
1787 /*
1788 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1789 * and we weren't listening before, so remove
1790 * pre-allocated entries from both tables.
1791 */
1792 RemoveListenerFromChannel(&entry, listeners, i);
1793 }
1794 else
1795 {
1796 /*
1797 * We're aborting, but the previous state was that
1798 * we're listening, so keep localChannelTable entry.
1799 */
1800 removeLocal = false;
1801 }
1802 }
1803 break; /* there shouldn't be another match */
1804 }
1805
1806 /* We might have already released the entry by removing it */
1807 if (entry != NULL)
1808 dshash_release_lock(globalChannelTable, entry);
1809 }
1810
1811 /*
1812 * If we're committing a LISTEN action, we should have found a
1813 * matching ListenerEntry, but otherwise it's okay if we didn't.
1814 */
1815 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1816 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1817 pending->channel, MyProcNumber);
1818
1819 /*
1820 * If we did not find a globalChannelTable entry for our backend, or
1821 * if we are unlistening, remove any localChannelTable entry that may
1822 * exist. (Note in particular that this cleans up if we created a
1823 * localChannelTable entry and then failed while trying to create a
1824 * globalChannelTable entry.)
1825 */
1828 HASH_REMOVE, NULL);
1829 }
1830}
1831
1832/*
1833 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1834 *
1835 * Remove this backend from all channels in the shared global table.
1836 */
1837static void
1838CleanupListenersOnExit(void)
1839{
1840 dshash_seq_status status;
1841 GlobalChannelEntry *entry;
1842
1843 if (Trace_notify)
1844 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1845
1846 /* Clear our local cache (not really necessary, but be consistent) */
1847 if (localChannelTable != NULL)
1848 {
1849 hash_destroy(localChannelTable);
1850 localChannelTable = NULL;
1851 }
1852
1853 /* Now remove our entries from the shared globalChannelTable */
1854 if (globalChannelTable == NULL)
1855 return;
1856
1857 dshash_seq_init(&status, globalChannelTable, true);
1858 while ((entry = dshash_seq_next(&status)) != NULL)
1859 {
1861
1862 if (entry->key.dboid != MyDatabaseId)
1863 continue; /* not relevant */
1864
1867
1868 for (int i = 0; i < entry->numListeners; i++)
1869 {
1870 if (listeners[i].procNo == MyProcNumber)
1871 {
1872 entry->numListeners--;
1873 if (i < entry->numListeners)
1874 memmove(&listeners[i], &listeners[i + 1],
1875 sizeof(ListenerEntry) * (entry->numListeners - i));
1876
1877 if (entry->numListeners == 0)
1878 {
1880 dshash_delete_current(&status);
1881 }
1882 break;
1883 }
1884 }
1885 }
1886 dshash_seq_term(&status);
1887}
1888
1889/*
1890 * Test whether we are actively listening on the given channel name.
1891 *
1892 * Note: this function is executed for every notification found in the queue.
1893 */
1894static bool
1895IsListeningOn(const char *channel)
1896{
1897 if (localChannelTable == NULL)
1898 return false;
1899
1900 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1901}
1902
1903/*
1904 * Remove our entry from the listeners array when we are no longer listening
1905 * on any channel. NB: must not fail if we're already not listening.
1906 */
1907static void
1908asyncQueueUnregister(void)
1909{
1910 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1911
1912 if (!amRegisteredListener) /* nothing to do */
1913 return;
1914
1915 /*
1916 * Need exclusive lock here to manipulate list links.
1917 */
1919 /* Mark our entry as invalid */
1924 /* and remove it from the list */
1927 else
1928 {
1930 {
1932 {
1934 break;
1935 }
1936 }
1937 }
1940
1941 /* mark ourselves as no longer listed in the global array */
1942 amRegisteredListener = false;
1943}
1944
1945/*
1946 * Test whether there is room to insert more notification messages.
1947 *
1948 * Caller must hold at least shared NotifyQueueLock.
1949 */
1950static bool
1951asyncQueueIsFull(void)
1952{
1953 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1954 int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1955 int64 occupied = headPage - tailPage;
1956
1957 return occupied >= max_notify_queue_pages;
1958}
1959
1960/*
1961 * Advance the QueuePosition to the next entry, assuming that the current
1962 * entry is of length entryLength. If we jump to a new page the function
1963 * returns true, else false.
1964 */
1965static bool
1966asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1967{
1968 int64 pageno = QUEUE_POS_PAGE(*position);
1969 int offset = QUEUE_POS_OFFSET(*position);
1970 bool pageJump = false;
1971
1972 /*
1973 * Move to the next writing position: First jump over what we have just
1974 * written or read.
1975 */
1976 offset += entryLength;
1977 Assert(offset <= QUEUE_PAGESIZE);
1978
1979 /*
1980 * In a second step check if another entry can possibly be written to the
1981 * page. If so, stay here, we have reached the next position. If not, then
1982 * we need to move on to the next page.
1983 */
1984 if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1985 {
1986 pageno++;
1987 offset = 0;
1988 pageJump = true;
1989 }
1990
1991 SET_QUEUE_POS(*position, pageno, offset);
1992 return pageJump;
1993}
1994
1995/*
1996 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
1997 */
1998static void
1999asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
2000{
2001 size_t channellen = n->channel_len;
2002 size_t payloadlen = n->payload_len;
2003 int entryLength;
2004
2005 Assert(channellen < NAMEDATALEN);
2006 Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
2007
2008 /* The terminators are already included in AsyncQueueEntryEmptySize */
2009 entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
2010 entryLength = QUEUEALIGN(entryLength);
2011 qe->length = entryLength;
2012 qe->dboid = MyDatabaseId;
2013 qe->xid = GetCurrentTransactionId();
2014 qe->srcPid = MyProcPid;
2015 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2016}
2017
2018/*
2019 * Add pending notifications to the queue.
2020 *
2021 * We go page by page here, i.e. we stop once we have to go to a new page but
2022 * we will be called again and then fill that next page. If an entry does not
2023 * fit into the current page, we write a dummy entry with an InvalidOid as the
2024 * database OID in order to fill the page. So every page is always used up to
2025 * the last byte which simplifies reading the page later.
2026 *
2027 * We are passed the list cell (in pendingNotifies->events) containing the next
2028 * notification to write and return the first still-unwritten cell back.
2029 * Eventually we will return NULL indicating all is done.
2030 *
2031 * We are holding NotifyQueueLock already from the caller and grab
2032 * page specific SLRU bank lock locally in this function.
2033 */
2034static ListCell *
2035asyncQueueAddEntries(ListCell *nextNotify)
2036{
2037 AsyncQueueEntry qe;
2038 QueuePosition queue_head;
2039 int64 pageno;
2040 int offset;
2041 int slotno;
2042 LWLock *prevlock;
2043
2044 /*
2045 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2046 * memory upon exiting. The reason for this is that if we have to advance
2047 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2048 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2049 * subsequent insertions would try to put entries into a page that slru.c
2050 * thinks doesn't exist yet.) So, use a local position variable. Note
2051 * that if we do fail, any already-inserted queue entries are forgotten;
2052 * this is okay, since they'd be useless anyway after our transaction
2053 * rolls back.
2054 */
2055 queue_head = QUEUE_HEAD;
2056
2057 /*
2058 * If this is the first write since the postmaster started, we need to
2059 * initialize the first page of the async SLRU. Otherwise, the current
2060 * page should be initialized already, so just fetch it.
2061 */
2062 pageno = QUEUE_POS_PAGE(queue_head);
2063 prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
2064
2065 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2066 LWLockAcquire(prevlock, LW_EXCLUSIVE);
2067
2068 if (QUEUE_POS_IS_ZERO(queue_head))
2069 slotno = SimpleLruZeroPage(NotifyCtl, pageno);
2070 else
2071 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2072 InvalidTransactionId);
2073
2074 /* Note we mark the page dirty before writing in it */
2075 NotifyCtl->shared->page_dirty[slotno] = true;
2076
2077 while (nextNotify != NULL)
2078 {
2079 Notification *n = (Notification *) lfirst(nextNotify);
2080
2081 /* Construct a valid queue entry in local variable qe */
2082 asyncQueueNotificationToEntry(n, &qe);
2083
2084 offset = QUEUE_POS_OFFSET(queue_head);
2085
2086 /* Check whether the entry really fits on the current page */
2087 if (offset + qe.length <= QUEUE_PAGESIZE)
2088 {
2089 /* OK, so advance nextNotify past this item */
2090 nextNotify = lnext(pendingNotifies->events, nextNotify);
2091 }
2092 else
2093 {
2094 /*
2095 * Write a dummy entry to fill up the page. Actually readers will
2096 * only check dboid and since it won't match any reader's database
2097 * OID, they will ignore this entry and move on.
2098 */
2099 qe.length = QUEUE_PAGESIZE - offset;
2100 qe.dboid = InvalidOid;
2102 qe.data[0] = '\0'; /* empty channel */
2103 qe.data[1] = '\0'; /* empty payload */
2104 }
2105
2106 /* Now copy qe into the shared buffer page */
2107 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2108 &qe,
2109 qe.length);
2110
2111 /* Advance queue_head appropriately, and detect if page is full */
2112 if (asyncQueueAdvance(&(queue_head), qe.length))
2113 {
2114 LWLock *lock;
2115
2116 pageno = QUEUE_POS_PAGE(queue_head);
2117 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2118 if (lock != prevlock)
2119 {
2120 LWLockRelease(prevlock);
2121 LWLockAcquire(lock, LW_EXCLUSIVE);
2122 prevlock = lock;
2123 }
2124
2125 /*
2126 * Page is full, so we're done here, but first fill the next page
2127 * with zeroes. The reason to do this is to ensure that slru.c's
2128 * idea of the head page is always the same as ours, which avoids
2129 * boundary problems in SimpleLruTruncate. The test in
2130 * asyncQueueIsFull() ensured that there is room to create this
2131 * page without overrunning the queue.
2132 */
2133 slotno = SimpleLruZeroPage(NotifyCtl, pageno);
2134
2135 /*
2136 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2137 * set flag to remember that we should try to advance the tail
2138 * pointer (we don't want to actually do that right here).
2139 */
2140 if (pageno % QUEUE_CLEANUP_DELAY == 0)
2141 tryAdvanceTail = true;
2142
2143 /* And exit the loop */
2144 break;
2145 }
2146 }
2147
2148 /* Success, so update the global QUEUE_HEAD */
2149 QUEUE_HEAD = queue_head;
2150
2151 LWLockRelease(prevlock);
2152
2153 return nextNotify;
2154}
2155
2156/*
2157 * SQL function to return the fraction of the notification queue currently
2158 * occupied.
2159 */
2160Datum
2161pg_notification_queue_usage(PG_FUNCTION_ARGS)
2162{
2163 double usage;
2164
2165 /* Advance the queue tail so we don't report a too-large result */
2166 asyncQueueAdvanceTail();
2167
2168 LWLockAcquire(NotifyQueueLock, LW_SHARED);
2169 usage = asyncQueueUsage();
2170 LWLockRelease(NotifyQueueLock);
2171
2172 PG_RETURN_FLOAT8(usage);
2173}
2174
2175/*
2176 * Return the fraction of the queue that is currently occupied.
2177 *
2178 * The caller must hold NotifyQueueLock in (at least) shared mode.
2179 *
2180 * Note: we measure the distance to the logical tail page, not the physical
2181 * tail page. In some sense that's wrong, but the relative position of the
2182 * physical tail is affected by details such as SLRU segment boundaries,
2183 * so that a result based on that is unpleasantly unstable.
2184 */
2185static double
2186asyncQueueUsage(void)
2187{
2188 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2190 int64 occupied = headPage - tailPage;
2191
2192 if (occupied == 0)
2193 return (double) 0; /* fast exit for common case */
2194
2195 return (double) occupied / (double) max_notify_queue_pages;
2196}
2197
2198/*
2199 * Check whether the queue is at least half full, and emit a warning if so.
2200 *
2201 * This is unlikely given the size of the queue, but possible.
2202 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2203 *
2204 * Caller must hold exclusive NotifyQueueLock.
2205 */
2206static void
2208{
2209 double fillDegree;
2210 TimestampTz t;
2211
2213 if (fillDegree < 0.5)
2214 return;
2215
2216 t = GetCurrentTimestamp();
2217
2220 {
2223
2225 {
2227 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2230 }
2231
2233 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2234 (minPid != InvalidPid ?
2235 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2236 : 0),
2237 (minPid != InvalidPid ?
2238 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2239 : 0)));
2240
2242 }
2243}
2244
2245/*
2246 * Send signals to listening backends.
2247 *
2248 * Normally we signal only backends that are interested in the notifies that
2249 * we just sent. However, that will leave idle listeners falling further and
2250 * further behind. Waken them anyway if they're far enough behind, so they'll
2251 * advance their queue position pointers, allowing the global tail to advance.
2252 *
 2253 * Since we know the ProcNumber and the Pid, the signaling is quite cheap.

2254 *
2255 * This is called during CommitTransaction(), so it's important for it
2256 * to have very low probability of failure.
2257 */
2258static void
2260{
2261 int count;
2262
2263 /* Can't get here without PreCommit_Notify having made the global table */
2265
2266 /* It should have set up these arrays, too */
2268
2269 /*
2270 * Identify backends that we need to signal. We don't want to send
2271 * signals while holding the NotifyQueueLock, so this part just builds a
2272 * list of target PIDs in signalPids[] and signalProcnos[].
2273 */
2274 count = 0;
2275
2277
2278 /* Scan each channel name that we notified in this transaction */
2280 {
2281 GlobalChannelKey key;
2282 GlobalChannelEntry *entry;
2284
2285 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2286 entry = dshash_find(globalChannelTable, &key, false);
2287 if (entry == NULL)
2288 continue; /* nobody is listening */
2289
2291 entry->listenersArray);
2292
2293 /* Identify listeners that now need waking, add them to arrays */
2294 for (int j = 0; j < entry->numListeners; j++)
2295 {
2296 ProcNumber i;
2297 int32 pid;
2298 QueuePosition pos;
2299
2300 if (!listeners[j].listening)
2301 continue; /* ignore not-yet-committed listeners */
2302
2303 i = listeners[j].procNo;
2304
2306 continue; /* already signaled, no need to repeat */
2307
2308 pid = QUEUE_BACKEND_PID(i);
2309 pos = QUEUE_BACKEND_POS(i);
2310
2311 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2312 continue; /* it's fully caught up already */
2313
2314 Assert(pid != InvalidPid);
2315
2317 signalPids[count] = pid;
2318 signalProcnos[count] = i;
2319 count++;
2320 }
2321
2323 }
2324
2325 /*
2326 * Scan all listeners. Any that are not already pending wakeup must not
2327 * be interested in our notifications (else we'd have set their wakeup
2328 * flags above). Check to see if we can directly advance their queue
2329 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2330 * them anyway so they will catch up.
2331 */
2333 {
2334 int32 pid;
2335 QueuePosition pos;
2336
2338 continue;
2339
2340 /* If it's currently advancing, we should not touch it */
2342 continue;
2343
2344 pid = QUEUE_BACKEND_PID(i);
2345 pos = QUEUE_BACKEND_POS(i);
2346
2347 /*
2348 * We can directly advance the other backend's queue pointer if it's
2349 * not currently advancing (else there are race conditions), and its
2350 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2351 * it miss some older messages), and we'd not be moving the pointer
2352 * backward.
2353 */
2356 {
2357 /* We can directly advance its pointer past what we wrote */
2359 }
2362 {
2363 /* It's idle and far behind, so wake it up */
2364 Assert(pid != InvalidPid);
2365
2367 signalPids[count] = pid;
2368 signalProcnos[count] = i;
2369 count++;
2370 }
2371 }
2372
2374
2375 /* Now send signals */
2376 for (int i = 0; i < count; i++)
2377 {
2378 int32 pid = signalPids[i];
2379
2380 /*
2381 * If we are signaling our own process, no need to involve the kernel;
2382 * just set the flag directly.
2383 */
2384 if (pid == MyProcPid)
2385 {
2387 continue;
2388 }
2389
2390 /*
2391 * Note: assuming things aren't broken, a signal failure here could
2392 * only occur if the target backend exited since we released
2393 * NotifyQueueLock; which is unlikely but certainly possible. So we
2394 * just log a low-level debug message if it happens.
2395 */
2397 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2398 }
2399}
2400
2401/*
2402 * AtAbort_Notify
2403 *
2404 * This is called at transaction abort.
2405 *
2406 * Revert any staged listen/unlisten changes and clean up transaction state.
2407 * This only does anything if we abort after PreCommit_Notify has staged
2408 * some entries.
2409 */
2410void
2412{
2413 /* Revert staged listen/unlisten changes */
2415
2416 /* If we're no longer listening on anything, unregister */
2419
2420 /* And clean up */
2422}
2423
2424/*
2425 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2426 *
2427 * Reassign all items in the pending lists to the parent transaction.
2428 */
2429void
2431{
2432 int my_level = GetCurrentTransactionNestLevel();
2433
2434 /* If there are actions at our nesting level, we must reparent them. */
2435 if (pendingActions != NULL &&
2436 pendingActions->nestingLevel >= my_level)
2437 {
2438 if (pendingActions->upper == NULL ||
2439 pendingActions->upper->nestingLevel < my_level - 1)
2440 {
2441 /* nothing to merge; give the whole thing to the parent */
2443 }
2444 else
2445 {
2447
2449
2450 /*
2451 * Mustn't try to eliminate duplicates here --- see queue_listen()
2452 */
2455 childPendingActions->actions);
2457 }
2458 }
2459
2460 /* If there are notifies at our nesting level, we must reparent them. */
2461 if (pendingNotifies != NULL &&
2462 pendingNotifies->nestingLevel >= my_level)
2463 {
2464 Assert(pendingNotifies->nestingLevel == my_level);
2465
2466 if (pendingNotifies->upper == NULL ||
2467 pendingNotifies->upper->nestingLevel < my_level - 1)
2468 {
2469 /* nothing to merge; give the whole thing to the parent */
2471 }
2472 else
2473 {
2474 /*
2475 * Formerly, we didn't bother to eliminate duplicates here, but
2476 * now we must, else we fall foul of "Assert(!found)", either here
2477 * or during a later attempt to build the parent-level hashtable.
2478 */
2480 ListCell *l;
2481
2483 /* Insert all the subxact's events into parent, except for dups */
2484 foreach(l, childPendingNotifies->events)
2485 {
2487
2490 }
2492 }
2493 }
2494}
2495
2496/*
2497 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2498 */
2499void
2501{
2502 int my_level = GetCurrentTransactionNestLevel();
2503
2504 /*
2505 * All we have to do is pop the stack --- the actions/notifies made in
2506 * this subxact are no longer interesting, and the space will be freed
2507 * when CurTransactionContext is recycled. We still have to free the
2508 * ActionList and NotificationList objects themselves, though, because
2509 * those are allocated in TopTransactionContext.
2510 *
2511 * Note that there might be no entries at all, or no entries for the
2512 * current subtransaction level, either because none were ever created, or
2513 * because we reentered this routine due to trouble during subxact abort.
2514 */
2515 while (pendingActions != NULL &&
2516 pendingActions->nestingLevel >= my_level)
2517 {
2519
2522 }
2523
2524 while (pendingNotifies != NULL &&
2525 pendingNotifies->nestingLevel >= my_level)
2526 {
2528
2531 }
2532}
2533
2534/*
2535 * HandleNotifyInterrupt
2536 *
2537 * Signal handler portion of interrupt handling. Let the backend know
2538 * that there's a pending notify interrupt. If we're currently reading
2539 * from the client, this will interrupt the read and
2540 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2541 */
2542void
2544{
2545 /*
 2546 * Note: this is called by a SIGNAL HANDLER. You must be very wary of
 2547 * what you do here.
2548 */
2549
2550 /* signal that work needs to be done */
2552
2553 /* make sure the event is processed in due course */
2555}
2556
2557/*
2558 * ProcessNotifyInterrupt
2559 *
2560 * This is called if we see notifyInterruptPending set, just before
2561 * transmitting ReadyForQuery at the end of a frontend command, and
2562 * also if a notify signal occurs while reading from the frontend.
2563 * HandleNotifyInterrupt() will cause the read to be interrupted
2564 * via the process's latch, and this routine will get called.
2565 * If we are truly idle (ie, *not* inside a transaction block),
2566 * process the incoming notifies.
2567 *
2568 * If "flush" is true, force any frontend messages out immediately.
2569 * This can be false when being called at the end of a frontend command,
2570 * since we'll flush after sending ReadyForQuery.
2571 */
2572void
2574{
2576 return; /* not really idle */
2577
2578 /* Loop in case another signal arrives while sending messages */
2580 ProcessIncomingNotify(flush);
2581}
2582
2583
2584/*
2585 * Read all pending notifications from the queue, and deliver appropriate
2586 * ones to my frontend. Stop when we reach queue head or an uncommitted
2587 * notification.
2588 */
2589static void
2591{
2592 QueuePosition pos;
2593 QueuePosition head;
2594 Snapshot snapshot;
2595
2596 /*
2597 * Fetch current state, indicate to others that we have woken up, and that
2598 * we are in process of advancing our position.
2599 */
2601 /* Assert checks that we have a valid state entry */
2605 head = QUEUE_HEAD;
2606
2607 if (QUEUE_POS_EQUAL(pos, head))
2608 {
2609 /* Nothing to do, we have read all notifications already. */
2611 return;
2612 }
2613
2616
2617 /*----------
2618 * Get snapshot we'll use to decide which xacts are still in progress.
2619 * This is trickier than it might seem, because of race conditions.
2620 * Consider the following example:
2621 *
2622 * Backend 1: Backend 2:
2623 *
2624 * transaction starts
2625 * UPDATE foo SET ...;
2626 * NOTIFY foo;
2627 * commit starts
2628 * queue the notify message
2629 * transaction starts
2630 * LISTEN foo; -- first LISTEN in session
2631 * SELECT * FROM foo WHERE ...;
2632 * commit to clog
2633 * commit starts
2634 * add backend 2 to array of listeners
2635 * advance to queue head (this code)
2636 * commit to clog
2637 *
2638 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2639 * wasn't committed yet. Ideally we'd ensure that client 2 would
2640 * eventually get transaction 1's notify message, but there's no way
2641 * to do that; until we're in the listener array, there's no guarantee
2642 * that the notify message doesn't get removed from the queue.
2643 *
2644 * Therefore the coding technique transaction 2 is using is unsafe:
2645 * applications must commit a LISTEN before inspecting database state,
2646 * if they want to ensure they will see notifications about subsequent
2647 * changes to that state.
2648 *
2649 * What we do guarantee is that we'll see all notifications from
2650 * transactions committing after the snapshot we take here.
2651 * BecomeRegisteredListener has already added us to the listener array,
2652 * so no not-yet-committed messages can be removed from the queue
2653 * before we see them.
2654 *----------
2655 */
2656 snapshot = RegisterSnapshot(GetLatestSnapshot());
2657
2658 /*
2659 * It is possible that we fail while trying to send a message to our
2660 * frontend (for example, because of encoding conversion failure). If
2661 * that happens it is critical that we not try to send the same message
2662 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2663 * ERRORs to FATAL, causing the client connection to be closed on error.
2664 *
2665 * We used to only skip over the offending message and try to soldier on,
2666 * but it was somewhat questionable to lose a notification and give the
 2667 * client an ERROR instead. A client application is not prepared for
2668 * that and can't tell that a notification was missed. It was also not
2669 * very useful in practice because notifications are often processed while
2670 * a connection is idle and reading a message from the client, and in that
2671 * state, any error is upgraded to FATAL anyway. Closing the connection
2672 * is a clear signal to the application that it might have missed
2673 * notifications.
2674 */
2675 {
2677 bool reachedStop;
2678
2679 ExitOnAnyError = true;
2680
2681 do
2682 {
2683 /*
2684 * Process messages up to the stop position, end of page, or an
2685 * uncommitted message.
2686 *
2687 * Our stop position is what we found to be the head's position
2688 * when we entered this function. It might have changed already.
2689 * But if it has, we will receive (or have already received and
2690 * queued) another signal and come here again.
2691 *
2692 * We are not holding NotifyQueueLock here! The queue can only
2693 * extend beyond the head pointer (see above) and we leave our
2694 * backend's pointer where it is so nobody will truncate or
2695 * rewrite pages under us. Especially we don't want to hold a lock
2696 * while sending the notifications to the frontend.
2697 */
2698 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2699 } while (!reachedStop);
2700
2701 /* Update shared state */
2706
2708 }
2709
2710 /* Done with snapshot */
2711 UnregisterSnapshot(snapshot);
2712}
2713
2714/*
2715 * Fetch notifications from the shared queue, beginning at position current,
2716 * and deliver relevant ones to my frontend.
2717 *
2718 * The function returns true once we have reached the stop position or an
2719 * uncommitted notification, and false if we have finished with the page.
2720 * In other words: once it returns true there is no need to look further.
2721 * The QueuePosition *current is advanced past all processed messages.
2722 */
2723static bool
2725 QueuePosition stop,
2726 Snapshot snapshot)
2727{
2728 int64 curpage = QUEUE_POS_PAGE(*current);
2729 int slotno;
2730 char *page_buffer;
2731 bool reachedStop = false;
2732 bool reachedEndOfPage;
2733
2734 /*
2735 * We copy the entries into a local buffer to avoid holding the SLRU lock
2736 * while we transmit them to our frontend. The local buffer must be
2737 * adequately aligned.
2738 */
2740 char *local_buf_end = local_buf;
2741
2744 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2745
2746 do
2747 {
2748 QueuePosition thisentry = *current;
2750
2751 if (QUEUE_POS_EQUAL(thisentry, stop))
2752 break;
2753
2754 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2755
2756 /*
2757 * Advance *current over this message, possibly to the next page. As
2758 * noted in the comments for asyncQueueReadAllNotifications, we must
2759 * do this before possibly failing while processing the message.
2760 */
2761 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2762
2763 /* Ignore messages destined for other databases */
2764 if (qe->dboid == MyDatabaseId)
2765 {
2766 if (XidInMVCCSnapshot(qe->xid, snapshot))
2767 {
2768 /*
2769 * The source transaction is still in progress, so we can't
2770 * process this message yet. Break out of the loop, but first
2771 * back up *current so we will reprocess the message next
2772 * time. (Note: it is unlikely but not impossible for
2773 * TransactionIdDidCommit to fail, so we can't really avoid
2774 * this advance-then-back-up behavior when dealing with an
2775 * uncommitted message.)
2776 *
2777 * Note that we must test XidInMVCCSnapshot before we test
2778 * TransactionIdDidCommit, else we might return a message from
2779 * a transaction that is not yet visible to snapshots; compare
2780 * the comments at the head of heapam_visibility.c.
2781 *
2782 * Also, while our own xact won't be listed in the snapshot,
2783 * we need not check for TransactionIdIsCurrentTransactionId
2784 * because our transaction cannot (yet) have queued any
2785 * messages.
2786 */
2787 *current = thisentry;
2788 reachedStop = true;
2789 break;
2790 }
2791
2792 /*
2793 * Quick check for the case that we're not listening on any
2794 * channels, before calling TransactionIdDidCommit(). This makes
2795 * that case a little faster, but more importantly, it ensures
2796 * that if there's a bad entry in the queue for which
2797 * TransactionIdDidCommit() fails for some reason, we can skip
2798 * over it on the first LISTEN in a session, and not get stuck on
2799 * it indefinitely. (This is a little trickier than it looks: it
2800 * works because BecomeRegisteredListener runs this code before we
2801 * have made the first entry in localChannelTable.)
2802 */
2804 continue;
2805
2806 if (TransactionIdDidCommit(qe->xid))
2807 {
2808 memcpy(local_buf_end, qe, qe->length);
2809 local_buf_end += qe->length;
2810 }
2811 else
2812 {
2813 /*
2814 * The source transaction aborted or crashed, so we just
2815 * ignore its notifications.
2816 */
2817 }
2818 }
2819
2820 /* Loop back if we're not at end of page */
2821 } while (!reachedEndOfPage);
2822
2823 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2825
2826 /*
2827 * Now that we have let go of the SLRU bank lock, send the notifications
 2828 * to our frontend
2829 */
2831 for (char *p = local_buf; p < local_buf_end;)
2832 {
2834
2835 /* qe->data is the null-terminated channel name */
2836 char *channel = qe->data;
2837
2838 if (IsListeningOn(channel))
2839 {
2840 /* payload follows channel name */
2841 char *payload = qe->data + strlen(channel) + 1;
2842
2843 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2844 }
2845
2846 p += qe->length;
2847 }
2848
2849 if (QUEUE_POS_EQUAL(*current, stop))
2850 reachedStop = true;
2851
2852 return reachedStop;
2853}
2854
2855/*
2856 * Advance the shared queue tail variable to the minimum of all the
2857 * per-backend tail pointers. Truncate pg_notify space if possible.
2858 *
2859 * This is (usually) called during CommitTransaction(), so it's important for
2860 * it to have very low probability of failure.
2861 */
2862static void
2864{
2865 QueuePosition min;
2868 int64 boundary;
2869
2870 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2872
2873 /*
2874 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2875 * (ie, exactly match at least one backend's queue position), so it must
2876 * be updated atomically with the actual computation. Since v13, we could
2877 * get away with not doing it like that, but it seems prudent to keep it
2878 * so.
2879 *
2880 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2881 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2882 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2883 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2884 * there are pages we can truncate but haven't yet finished doing so.
2885 *
2886 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2887 * performing SimpleLruTruncate. This is OK because no backend will try
2888 * to access the pages we are in the midst of truncating.
2889 */
2891 min = QUEUE_HEAD;
2893 {
2895 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2896 }
2897 QUEUE_TAIL = min;
2900
2901 /*
2902 * We can truncate something if the global tail advanced across an SLRU
2903 * segment boundary.
2904 *
2905 * XXX it might be better to truncate only once every several segments, to
2906 * reduce the number of directory scans.
2907 */
2910 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2911 {
2912 /*
2913 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2914 * release the lock again.
2915 */
2917
2921 }
2922
2924}
2925
2926/*
2927 * AsyncNotifyFreezeXids
2928 *
2929 * Prepare the async notification queue for CLOG truncation by freezing
2930 * transaction IDs that are about to become inaccessible.
2931 *
2932 * This function is called by VACUUM before advancing datfrozenxid. It scans
2933 * the notification queue and replaces XIDs that would become inaccessible
2934 * after CLOG truncation with special markers:
2935 * - Committed transactions are set to FrozenTransactionId
2936 * - Aborted/crashed transactions are set to InvalidTransactionId
2937 *
2938 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2939 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2940 * (or it would have held back newFrozenXid through ProcArray).
2941 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2942 * either aborted explicitly or crashed, and we can safely mark it invalid.
2943 */
2944void
2946{
2947 QueuePosition pos;
2948 QueuePosition head;
2949 int64 curpage = -1;
2950 int slotno = -1;
2951 char *page_buffer = NULL;
2952 bool page_dirty = false;
2953
2954 /*
2955 * Acquire locks in the correct order to avoid deadlocks. As per the
2956 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2957 * bank locks.
2958 *
2959 * We only need SHARED mode since we're just reading the head/tail
2960 * positions, not modifying them.
2961 */
2964
2965 pos = QUEUE_TAIL;
2966 head = QUEUE_HEAD;
2967
2968 /* Release NotifyQueueLock early, we only needed to read the positions */
2970
2971 /*
2972 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2973 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2974 * we're working.
2975 */
2976 while (!QUEUE_POS_EQUAL(pos, head))
2977 {
2979 TransactionId xid;
2980 int64 pageno = QUEUE_POS_PAGE(pos);
2981 int offset = QUEUE_POS_OFFSET(pos);
2982
2983 /* If we need a different page, release old lock and get new one */
2984 if (pageno != curpage)
2985 {
2986 LWLock *lock;
2987
2988 /* Release previous page if any */
2989 if (slotno >= 0)
2990 {
2991 if (page_dirty)
2992 {
2993 NotifyCtl->shared->page_dirty[slotno] = true;
2994 page_dirty = false;
2995 }
2997 }
2998
2999 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3001 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3003 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3004 curpage = pageno;
3005 }
3006
3007 qe = (AsyncQueueEntry *) (page_buffer + offset);
3008 xid = qe->xid;
3009
3010 if (TransactionIdIsNormal(xid) &&
3012 {
3013 if (TransactionIdDidCommit(xid))
3014 {
3015 qe->xid = FrozenTransactionId;
3016 page_dirty = true;
3017 }
3018 else
3019 {
3020 qe->xid = InvalidTransactionId;
3021 page_dirty = true;
3022 }
3023 }
3024
3025 /* Advance to next entry */
3026 asyncQueueAdvance(&pos, qe->length);
3027 }
3028
3029 /* Release final page lock if we acquired one */
3030 if (slotno >= 0)
3031 {
3032 if (page_dirty)
3033 NotifyCtl->shared->page_dirty[slotno] = true;
3035 }
3036
3038}
3039
3040/*
3041 * ProcessIncomingNotify
3042 *
3043 * Scan the queue for arriving notifications and report them to the front
3044 * end. The notifications might be from other sessions, or our own;
3045 * there's no need to distinguish here.
3046 *
3047 * If "flush" is true, force any frontend messages out immediately.
3048 *
3049 * NOTE: since we are outside any transaction, we must create our own.
3050 */
3051static void
3053{
3054 /* We *must* reset the flag */
3055 notifyInterruptPending = false;
3056
3057 /* Do nothing else if we aren't actively listening */
3059 return;
3060
3061 if (Trace_notify)
3062 elog(DEBUG1, "ProcessIncomingNotify");
3063
3064 set_ps_display("notify interrupt");
3065
3066 /*
3067 * We must run asyncQueueReadAllNotifications inside a transaction, else
3068 * bad things happen if it gets an error.
3069 */
3071
3073
3075
3076 /*
3077 * If this isn't an end-of-command case, we must flush the notify messages
3078 * to ensure frontend gets them promptly.
3079 */
3080 if (flush)
3081 pq_flush();
3082
3083 set_ps_display("idle");
3084
3085 if (Trace_notify)
3086 elog(DEBUG1, "ProcessIncomingNotify: done");
3087}
3088
3089/*
3090 * Send NOTIFY message to my front end.
3091 */
3092void
3093NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3094{
3096 {
3098
3100 pq_sendint32(&buf, srcPid);
3101 pq_sendstring(&buf, channel);
3102 pq_sendstring(&buf, payload);
3104
3105 /*
3106 * NOTE: we do not do pq_flush() here. Some level of caller will
3107 * handle it later, allowing this message to be combined into a packet
3108 * with other ones.
3109 */
3110 }
3111 else
3112 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3113}
3114
3115/* Does pendingNotifies include a match for the given event? */
3116static bool
3118{
3119 if (pendingNotifies == NULL)
3120 return false;
3121
3123 {
3124 /* Use the hash table to probe for a match */
3126 &n,
3127 HASH_FIND,
3128 NULL))
3129 return true;
3130 }
3131 else
3132 {
3133 /* Must scan the event list */
3134 ListCell *l;
3135
3136 foreach(l, pendingNotifies->events)
3137 {
3139
3140 if (n->channel_len == oldn->channel_len &&
3141 n->payload_len == oldn->payload_len &&
3142 memcmp(n->data, oldn->data,
3143 n->channel_len + n->payload_len + 2) == 0)
3144 return true;
3145 }
3146 }
3147
3148 return false;
3149}
3150
3151/*
3152 * Add a notification event to a pre-existing pendingNotifies list.
3153 *
3154 * Because pendingNotifies->events is already nonempty, this works
3155 * correctly no matter what CurrentMemoryContext is.
3156 */
3157static void
3159{
3161
3162 /* Create the hash tables if it's time to */
3165 {
3167 ListCell *l;
3168
3169 /* Create the hash table */
3170 hash_ctl.keysize = sizeof(Notification *);
3171 hash_ctl.entrysize = sizeof(struct NotificationHash);
3176 hash_create("Pending Notifies",
3177 256L,
3178 &hash_ctl,
3180
3181 /* Create the unique channel name table */
3183 hash_ctl.keysize = NAMEDATALEN;
3184 hash_ctl.entrysize = sizeof(ChannelName);
3187 hash_create("Pending Notify Channel Names",
3188 64L,
3189 &hash_ctl,
3191
3192 /* Insert all the already-existing events */
3193 foreach(l, pendingNotifies->events)
3194 {
3196 char *channel = oldn->data;
3197 bool found;
3198
3200 &oldn,
3201 HASH_ENTER,
3202 &found);
3203 Assert(!found);
3204
3205 /* Add channel name to uniqueChannelHash; might be there already */
3207 channel,
3208 HASH_ENTER,
3209 NULL);
3210 }
3211 }
3212
3213 /* Add new event to the list, in order */
3215
3216 /* Add event to the hash tables if needed */
3218 {
3219 char *channel = n->data;
3220 bool found;
3221
3223 &n,
3224 HASH_ENTER,
3225 &found);
3226 Assert(!found);
3227
3228 /* Add channel name to uniqueChannelHash; might be there already */
3230 channel,
3231 HASH_ENTER,
3232 NULL);
3233 }
3234}
3235
3236/*
3237 * notification_hash: hash function for notification hash table
3238 *
3239 * The hash "keys" are pointers to Notification structs.
3240 */
3241static uint32
3242notification_hash(const void *key, Size keysize)
3243{
3244 const Notification *k = *(const Notification *const *) key;
3245
3246 Assert(keysize == sizeof(Notification *));
3247 /* We don't bother to include the payload's trailing null in the hash */
3248 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3249 k->channel_len + k->payload_len + 1));
3250}
3251
3252/*
3253 * notification_match: match function to use with notification_hash
3254 */
3255static int
3256notification_match(const void *key1, const void *key2, Size keysize)
3257{
3258 const Notification *k1 = *(const Notification *const *) key1;
3259 const Notification *k2 = *(const Notification *const *) key2;
3260
3261 Assert(keysize == sizeof(Notification *));
3262 if (k1->channel_len == k2->channel_len &&
3263 k1->payload_len == k2->payload_len &&
3264 memcmp(k1->data, k2->data,
3265 k1->channel_len + k1->payload_len + 2) == 0)
3266 return 0; /* equal */
3267 return 1; /* not equal */
3268}
3269
3270/* Clear the pendingActions and pendingNotifies lists. */
3271static void
3273{
3274 /*
3275 * Everything's allocated in either TopTransactionContext or the context
3276 * for the subtransaction to which it corresponds. So, there's nothing to
3277 * do here except reset the pointers; the space will be reclaimed when the
3278 * contexts are deleted.
3279 */
3282 /* Also clear pendingListenActions, which is derived from pendingActions */
3284}
3285
3286/*
3287 * GUC check_hook for notify_buffers
3288 */
3289bool
3291{
3292 return check_slru_buffers("notify_buffers", newval);
3293}
Definition async.c:988
#define QUEUEALIGN(len)
Definition async.c:224
static bool amRegisteredListener
Definition async.c:544
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:265
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:354
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:353
void AtSubAbort_Notify(void)
Definition async.c:2500
void AtPrepare_Notify(void)
Definition async.c:1152
#define QUEUE_PAGESIZE
Definition async.c:365
void AtSubCommit_Notify(void)
Definition async.c:2430
static bool asyncQueueIsFull(void)
Definition async.c:1951
#define QUEUE_HEAD
Definition async.c:348
void AsyncShmemInit(void)
Definition async.c:793
static void initLocalChannelTable(void)
Definition async.c:728
PendingListenAction
Definition async.c:455
@ PENDING_UNLISTEN
Definition async.c:457
@ PENDING_LISTEN
Definition async.c:456
static dshash_table * globalChannelTable
Definition async.c:400
static void asyncQueueUnregister(void)
Definition async.c:1908
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2161
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:246
#define LocalChannelTableIsEmpty()
Definition async.c:411
static void initPendingListenActions(void)
Definition async.c:754
static QueuePosition queueHeadBeforeWrite
Definition async.c:552
static bool IsListeningOn(const char *channel)
Definition async.c:1895
void Async_Notify(const char *channel, const char *payload)
Definition async.c:886
volatile sig_atomic_t notifyInterruptPending
Definition async.c:538
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2945
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3290
#define QUEUE_STOP_PAGE
Definition async.c:350
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:885
int64_t int64
Definition c.h:555
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:492
int32_t int32
Definition c.h:554
uint16_t uint16
Definition c.h:557
uint32_t uint32
Definition c.h:558
uint32 TransactionId
Definition c.h:678
size_t Size
Definition c.h:631
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
dsm_handle dsa_handle
Definition dsa.h:136
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:592
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:543
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:560
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:640
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:392
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:369
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:272
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:749
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
Definition dshash.c:435
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:659
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:208
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:574
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:759
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
dsa_pointer dshash_table_handle
Definition dshash.h:24
uint32 dshash_hash
Definition dshash.h:30
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
int errhint(const char *fmt,...)
#define DEBUG3
Definition elog.h:28
int errdetail(const char *fmt,...)
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define INFO
Definition elog.h:34
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:47
ProcNumber MyProcNumber
Definition globals.c:90
int MaxBackends
Definition globals.c:146
bool ExitOnAnyError
Definition globals.c:123
int notify_buffers
Definition globals.c:164
struct Latch * MyLatch
Definition globals.c:63
Oid MyDatabaseId
Definition globals.c:94
#define newval
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:96
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_COMPARE
Definition hsearch.h:99
#define HASH_FUNCTION
Definition hsearch.h:98
#define IsParallelWorker()
Definition parallel.h:62
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
#define pq_flush()
Definition libpq.h:49
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext TopTransactionContext
Definition mcxt.c:171
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurTransactionContext
Definition mcxt.c:172
#define InvalidPid
Definition miscadmin.h:32
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static void usage(void)
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:343
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:93
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:232
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:286
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:252
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition slru.c:630
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1816
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1769
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition slru.c:527
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition slru.c:375
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition slru.c:1433
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:198
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:355
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:160
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
List * actions
Definition async.c:440
int nestingLevel
Definition async.c:439
struct ActionList * upper
Definition async.c:441
ProcNumber firstListener
Definition async.c:337
QueuePosition tail
Definition async.c:333
QueuePosition head
Definition async.c:332
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:343
dshash_table_handle globalChannelTableDSH
Definition async.c:341
TimestampTz lastQueueFillWarn
Definition async.c:339
dsa_handle globalChannelTableDSA
Definition async.c:340
int32 srcPid
Definition async.c:219
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:220
TransactionId xid
Definition async.c:218
char channel[NAMEDATALEN]
Definition async.c:528
dsa_pointer listenersArray
Definition async.c:395
int allocatedListeners
Definition async.c:397
GlobalChannelKey key
Definition async.c:394
char channel[NAMEDATALEN]
Definition async.c:383
Size keysize
Definition hsearch.h:75
Definition pg_list.h:54
ListenActionKind action
Definition async.c:433
ProcNumber procNo
Definition async.c:388
bool listening
Definition async.c:389
Notification * event
Definition async.c:517
List * uniqueChannelNames
Definition async.c:508
HTAB * uniqueChannelHash
Definition async.c:509
HTAB * hashtab
Definition async.c:507
List * events
Definition async.c:506
struct NotificationList * upper
Definition async.c:510
uint16 payload_len
Definition async.c:498
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:500
uint16 channel_len
Definition async.c:497
PendingListenAction action
Definition async.c:463
char channel[NAMEDATALEN]
Definition async.c:462
ProcNumber nextListener
Definition async.c:290
QueuePosition pos
Definition async.c:291
int64 page
Definition async.c:233
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:443
char * text_to_cstring(const text *t)
Definition varlena.c:215
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011
int GetCurrentTransactionNestLevel(void)
Definition xact.c:930
void StartTransactionCommand(void)
Definition xact.c:3080
void CommitTransactionCommand(void)
Definition xact.c:3178
TransactionId GetCurrentTransactionId(void)
Definition xact.c:455