1/*-------------------------------------------------------------------------
2 *
3 * async.c
4 * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/async.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15/*-------------------------------------------------------------------------
16 * Async Notification Model as of v19:
17 *
18 * 1. Multiple backends on same machine. Multiple backends may be listening
19 * on each of several channels.
20 *
21 * 2. There is one central queue in disk-based storage (directory pg_notify/),
22 * with actively-used pages mapped into shared memory by the slru.c module.
23 * All notification messages are placed in the queue and later read out
24 * by listening backends. The single queue allows us to guarantee that
25 * notifications are received in commit order.
26 *
27 * Although there is only one queue, notifications are treated as being
28 * database-local; this is done by including the sender's database OID
29 * in each notification message. Listening backends ignore messages
30 * that don't match their database OID. This is important because it
31 * ensures senders and receivers have the same database encoding and won't
32 * misinterpret non-ASCII text in the channel name or payload string.
33 *
34 * Since notifications are not expected to survive database crashes,
35 * we can simply clean out the pg_notify data at any reboot, and there
36 * is no need for WAL support or fsync'ing.
37 *
38 * 3. Every backend that is listening on at least one channel registers by
39 * entering its PID into the array in AsyncQueueControl. It then scans all
40 * incoming notifications in the central queue and first compares the
41 * database OID of the notification with its own database OID and then
42 * compares the notified channel with the list of channels that it listens
43 * to. In case there is a match it delivers the notification event to its
44 * frontend. Non-matching events are simply skipped.
45 *
46 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
47 * a backend-local list which will not be processed until transaction end.
48 *
49 * Duplicate notifications from the same transaction are sent out as one
50 * notification only. This is done to save work when for example a trigger
51 * on a 2 million row table fires a notification for each row that has been
52 * changed. If the application needs to receive every single notification
53 * that has been sent, it can easily add some unique string into the extra
54 * payload parameter.
55 *
56 * When the transaction is ready to commit, PreCommit_Notify() adds the
57 * pending notifications to the head of the queue. The head pointer of the
58 * queue always points to the next free position and a position is just a
59 * page number and the offset in that page. This is done before marking the
60 * transaction as committed in clog. If we run into problems writing the
61 * notifications, we can still call elog(ERROR, ...) and the transaction
62 * will roll back safely.
63 *
64 * Once we have put all of the notifications into the queue, we return to
65 * CommitTransaction() which will then do the actual transaction commit.
66 *
67 * After commit we are called another time (AtCommit_Notify()). Here we
68 * make any required updates to the effective listen state (see below).
69 * Then we signal any backends that may be interested in our messages
70 * (including our own backend, if listening). This is done by
71 * SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
72 * each relevant backend, as described below.
73 *
74 * Finally, after we are out of the transaction altogether and about to go
75 * idle, we scan the queue for messages that need to be sent to our
76 * frontend (which might be notifies from other backends, or self-notifies
77 * from our own). This step is not part of the CommitTransaction sequence
78 * for two important reasons. First, we could get errors while sending
79 * data to our frontend, and it's really bad for errors to happen in
80 * post-commit cleanup. Second, in cases where a procedure issues commits
81 * within a single frontend command, we don't want to send notifies to our
82 * frontend until the command is done; but notifies to other backends
83 * should go out immediately after each commit.
84 *
85 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
86 * sets the process's latch, which triggers the event to be processed
87 * immediately if this backend is idle (i.e., it is waiting for a frontend
88 * command and is not within a transaction block. C.f.
89 * ProcessClientReadInterrupt()). Otherwise the handler may only set a
90 * flag, which will cause the processing to occur just before we next go
91 * idle.
92 *
93 * Inbound-notify processing consists of reading all of the notifications
94 * that have arrived since scanning last time. We read every notification
95 * until we reach either a notification from an uncommitted transaction or
96 * the head pointer's position.
97 *
98 * 6. To limit disk space consumption, the tail pointer needs to be advanced
99 * so that old pages can be truncated. This is relatively expensive
100 * (notably, it requires an exclusive lock), so we don't want to do it
101 * often. We make sending backends do this work if they advanced the queue
102 * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
103 *
104 * 7. So far we have not discussed how backends change their listening state,
105 * nor how notification senders know which backends to awaken. To handle
106 * the latter, we maintain a global channel table (implemented as a dynamic
107 * shared hash table, or dshash) that maps channel names to the set of
108 * backends listening on each channel. This table is created lazily on the
109 * first LISTEN command and grows dynamically as needed. There is also a
110 * local channel table (a plain dynahash table) in each listening backend,
111 * tracking which channels that backend is listening to. The local table
112 * serves to reduce the number of accesses needed to the shared table.
113 *
114 * If the current transaction has executed any LISTEN/UNLISTEN actions,
115 * PreCommit_Notify() prepares to commit those. For LISTEN, it
116 * pre-allocates entries in both the per-backend localChannelTable and the
117 * shared globalChannelTable (with listening=false so that these entries
118 * are no-ops for the moment). It also records the final per-channel
119 * intent in pendingListenActions, so post-commit/abort processing can
120 * apply that in a single step. Since all these allocations happen before
121 * committing to clog, we can safely abort the transaction on failure.
122 *
123 * After commit, AtCommit_Notify() runs through pendingListenActions and
124 * updates the backend's per-channel listening flags to activate or
125 * deactivate listening. This happens before sending signals.
126 *
127 * SignalBackends() consults the shared global channel table to identify
128 * listeners for the channels that the current transaction sent
129 * notification(s) to. Each selected backend is marked as having a wakeup
130 * pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
131 * signal is sent to it.
132 *
133 * 8. While writing notifications, PreCommit_Notify() records the queue head
134 * position both before and after the write. Because all writers serialize
135 * on a cluster-wide heavyweight lock, no other backend can insert entries
136 * between these two points. SignalBackends() uses this fact to directly
137 * advance the queue pointer for any backend that is still positioned at
138 * the old head, or within the range written, but is not interested in any
139 * of our notifications. This avoids unnecessary wakeups for idle
140 * listeners that have nothing to read. Backends that are not interested
141 * in our notifications, but cannot be directly advanced, are signaled only
142 * if they are far behind the current queue head; that is to ensure that
143 * we can advance the queue tail without undue delay.
144 *
145 * An application that listens on the same channel it notifies will get
146 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
147 * by comparing be_pid in the NOTIFY message to the application's own backend's
148 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
149 * frontend during startup.) The above design guarantees that notifies from
150 * other backends will never be missed by ignoring self-notifies.
151 *
152 * The amount of shared memory used for notify management (notify_buffers)
153 * can be varied without affecting anything but performance. The maximum
154 * amount of notification data that can be queued at one time is determined
155 * by the max_notify_queue_pages GUC.
156 *-------------------------------------------------------------------------
157 */
158
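/*
 * Illustrative sketch (not part of this file): a minimal libpq client that
 * listens on a channel and, as suggested above, filters out self-notifies by
 * comparing be_pid against PQbackendPID().  The connection string and the
 * channel name "my_channel" are examples only, and error handling is omitted.
 *
 *		#include <stdio.h>
 *		#include <sys/select.h>
 *		#include "libpq-fe.h"
 *
 *		static void
 *		listen_loop(const char *conninfo)
 *		{
 *			PGconn	   *conn = PQconnectdb(conninfo);
 *
 *			if (PQstatus(conn) != CONNECTION_OK)
 *				return;
 *			PQclear(PQexec(conn, "LISTEN my_channel"));
 *
 *			for (;;)
 *			{
 *				int			sock = PQsocket(conn);
 *				fd_set		mask;
 *				PGnotify   *notify;
 *
 *				FD_ZERO(&mask);
 *				FD_SET(sock, &mask);
 *				if (select(sock + 1, &mask, NULL, NULL, NULL) < 0)
 *					break;
 *
 *				PQconsumeInput(conn);
 *				while ((notify = PQnotifies(conn)) != NULL)
 *				{
 *					if (notify->be_pid != PQbackendPID(conn))
 *						printf("channel \"%s\": %s\n",
 *							   notify->relname, notify->extra);
 *					PQfreemem(notify);
 *				}
 *			}
 *		}
 */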
159#include "postgres.h"
160
161#include <limits.h>
162#include <unistd.h>
163#include <signal.h>
164
165#include "access/parallel.h"
166#include "access/slru.h"
167#include "access/transam.h"
168#include "access/xact.h"
169#include "catalog/pg_database.h"
170#include "commands/async.h"
171#include "common/hashfn.h"
172#include "funcapi.h"
173#include "lib/dshash.h"
174#include "libpq/libpq.h"
175#include "libpq/pqformat.h"
176#include "miscadmin.h"
177#include "storage/dsm_registry.h"
178#include "storage/ipc.h"
179#include "storage/lmgr.h"
180#include "storage/procsignal.h"
181#include "tcop/tcopprot.h"
182#include "utils/builtins.h"
183#include "utils/dsa.h"
184#include "utils/guc_hooks.h"
185#include "utils/memutils.h"
186#include "utils/ps_status.h"
187#include "utils/snapmgr.h"
188#include "utils/timestamp.h"
189
190
191/*
192 * Maximum size of a NOTIFY payload, including terminating NULL. This
193 * must be kept small enough so that a notification message fits on one
194 * SLRU page. The magic fudge factor here is noncritical as long as it's
195 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
196 * than that, so changes in that data structure won't affect user-visible
197 * restrictions.
198 */
199#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
200
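/*
 * For example, with the default 8 kB BLCKSZ and a NAMEDATALEN of 64, this
 * works out to 8192 - 64 - 128 = 8000 bytes of payload (counting the
 * terminating null byte).
 */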
201/*
202 * Struct representing an entry in the global notify queue
203 *
204 * This struct declaration has the maximal length, but in a real queue entry
205 * the data area is only big enough for the actual channel and payload strings
206 * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
207 * entry size, if both channel and payload strings are empty (but note it
208 * doesn't include alignment padding).
209 *
210 * The "length" field should always be rounded up to the next QUEUEALIGN
211 * multiple so that all fields are properly aligned.
212 */
213typedef struct AsyncQueueEntry
214{
215 int length; /* total allocated length of entry */
216 Oid dboid; /* sender's database OID */
217 TransactionId xid; /* sender's XID */
218 int32 srcPid; /* sender's PID */
219 char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
220} AsyncQueueEntry;
221
222/* Currently, no field of AsyncQueueEntry requires more than int alignment */
223#define QUEUEALIGN(len) INTALIGN(len)
224
225#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
226
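/*
 * Illustrative sketch (not part of this file): how the stored size of a
 * notification entry follows from the rules above.  channel_len and
 * payload_len are string lengths excluding their terminating null bytes;
 * AsyncQueueEntryEmptySize already accounts for those two terminators, and
 * QUEUEALIGN rounds up so the next entry's fields stay int-aligned.
 */
static inline int
example_entry_length(size_t channel_len, size_t payload_len)
{
	return QUEUEALIGN(AsyncQueueEntryEmptySize + channel_len + payload_len);
}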
227/*
228 * Struct describing a queue position, and assorted macros for working with it
229 */
230typedef struct QueuePosition
231{
232 int64 page; /* SLRU page number */
233 int offset; /* byte offset within page */
234} QueuePosition;
235
236#define QUEUE_POS_PAGE(x) ((x).page)
237#define QUEUE_POS_OFFSET(x) ((x).offset)
238
239#define SET_QUEUE_POS(x,y,z) \
240 do { \
241 (x).page = (y); \
242 (x).offset = (z); \
243 } while (0)
244
245#define QUEUE_POS_EQUAL(x,y) \
246 ((x).page == (y).page && (x).offset == (y).offset)
247
248#define QUEUE_POS_IS_ZERO(x) \
249 ((x).page == 0 && (x).offset == 0)
250
251/* choose logically smaller QueuePosition */
252#define QUEUE_POS_MIN(x,y) \
253 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
254 (x).page != (y).page ? (y) : \
255 (x).offset < (y).offset ? (x) : (y))
256
257/* choose logically larger QueuePosition */
258#define QUEUE_POS_MAX(x,y) \
259 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
260 (x).page != (y).page ? (x) : \
261 (x).offset > (y).offset ? (x) : (y))
262
263/* returns true if x comes before y in queue order */
264#define QUEUE_POS_PRECEDES(x,y) \
265 (asyncQueuePagePrecedes((x).page, (y).page) || \
266 ((x).page == (y).page && (x).offset < (y).offset))
267
268/*
269 * Parameter determining how often we try to advance the tail pointer:
270 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
271 * also the distance by which a backend that's not interested in our
272 * notifications needs to be behind before we'll decide we need to wake it
273 * up so it can advance its pointer.
274 *
275 * Resist the temptation to make this really large. While that would save
276 * work in some places, it would add cost in others. In particular, this
277 * should likely be less than notify_buffers, to ensure that backends
278 * catch up before the pages they'll need to read fall out of SLRU cache.
279 */
280#define QUEUE_CLEANUP_DELAY 4
281
282/*
283 * Struct describing a listening backend's status
284 */
285typedef struct QueueBackendStatus
286{
287 int32 pid; /* either a PID or InvalidPid */
288 Oid dboid; /* backend's database OID, or InvalidOid */
289 ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
290 QueuePosition pos; /* backend has read queue up to here */
291 bool wakeupPending; /* signal sent to backend, not yet processed */
292 bool isAdvancing; /* backend is advancing its position */
293} QueueBackendStatus;
294
295/*
296 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
297 *
298 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
299 * NotifyQueueTailLock.
300 *
301 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
302 * their own entries as well as the head and tail pointers. Consequently we
303 * can allow a backend to update its own record while holding only SHARED lock
304 * (since no other backend will inspect it).
305 *
306 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
307 * entries of other backends and also change the head pointer. They can
308 * also advance other backends' queue positions, unless the other backend
309 * has isAdvancing set (i.e., is in process of doing that itself).
310 *
311 * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
312 * mode, backends can change the tail pointers.
313 *
314 * The SLRU buffer pool is divided into banks, and the per-bank SLRU lock is
315 * used as the control lock for the pg_notify SLRU buffers.
316 * In order to avoid deadlocks, whenever we need multiple locks, we first get
317 * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
318 * globalChannelTable partition locks.
319 *
320 * Each backend uses the backend[] array entry with index equal to its
321 * ProcNumber. We rely on this to make SendProcSignal fast.
322 *
323 * The backend[] array entries for actively-listening backends are threaded
324 * together using firstListener and the nextListener links, so that we can
325 * scan them without having to iterate over inactive entries. We keep this
326 * list in order by ProcNumber so that the scan is cache-friendly when there
327 * are many active entries.
328 */
329typedef struct AsyncQueueControl
330{
331 QueuePosition head; /* head points to the next free location */
332 QueuePosition tail; /* tail must be <= the queue position of every
333 * listening backend */
334 int64 stopPage; /* oldest unrecycled page; must be <=
335 * tail.page */
336 ProcNumber firstListener; /* id of first listener, or
337 * INVALID_PROC_NUMBER */
338 TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
339 dsa_handle globalChannelTableDSA; /* global channel table's DSA handle */
340 dshash_table_handle globalChannelTableDSH; /* and its dshash handle */
341 /* Array with room for MaxBackends entries: */
342 QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
343} AsyncQueueControl;
344
345static AsyncQueueControl *asyncQueueControl;
346
347#define QUEUE_HEAD (asyncQueueControl->head)
348#define QUEUE_TAIL (asyncQueueControl->tail)
349#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
350#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
351#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
352#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
353#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
354#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
355#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
356#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
357
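/*
 * Illustrative sketch (not part of this file): per the locking rules above,
 * a backend may update its own backend[] entry while holding NotifyQueueLock
 * in shared mode only, since no other backend will inspect that entry under
 * shared lock.  "newpos" is a hypothetical value the caller has computed.
 */
static void
example_update_own_position(QueuePosition newpos)
{
	LWLockAcquire(NotifyQueueLock, LW_SHARED);
	/* our own entry is indexed by MyProcNumber */
	QUEUE_BACKEND_POS(MyProcNumber) = newpos;
	LWLockRelease(NotifyQueueLock);
}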
358/*
359 * The SLRU buffer area through which we access the notification queue
360 */
361static SlruCtlData NotifyCtlData;
362
363#define NotifyCtl (&NotifyCtlData)
364#define QUEUE_PAGESIZE BLCKSZ
365
366#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
367
368/*
369 * Global channel table definitions
370 *
371 * This hash table maps (database OID, channel name) keys to arrays of
372 * ProcNumbers representing the backends listening or about to listen
373 * on each channel. The "listening" flags allow us to create hash table
374 * entries pre-commit and not have to assume that creating them post-commit
375 * will succeed.
376 */
377#define INITIAL_LISTENERS_ARRAY_SIZE 4
378
379typedef struct GlobalChannelKey
380{
381 Oid dboid; /* database OID */
382 char channel[NAMEDATALEN]; /* channel name */
383} GlobalChannelKey;
384
385typedef struct ListenerEntry
386{
387 ProcNumber procNo; /* listener's ProcNumber */
388 bool listening; /* true if committed listener */
389} ListenerEntry;
390
391typedef struct GlobalChannelEntry
392{
393 GlobalChannelKey key; /* hash key */
394 dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
395 int numListeners; /* Number of listeners currently stored */
396 int allocatedListeners; /* Allocated size of array */
397} GlobalChannelEntry;
398
401
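/*
 * Illustrative sketch (not part of this file): finding the committed
 * listeners for one channel in the global channel table, roughly along the
 * lines of what SignalBackends() does.  "channel_dsa" stands in for whatever
 * dsa_area the backend has attached for this table, and "report" is a
 * hypothetical callback; both names are assumptions for the example only.
 */
static void
example_scan_listeners(dsa_area *channel_dsa, const char *channel,
					   void (*report) (ProcNumber procno))
{
	GlobalChannelKey key;
	GlobalChannelEntry *entry;

	GlobalChannelKeyInit(&key, MyDatabaseId, channel);

	/* a shared (non-exclusive) lock on the entry is enough for reading */
	entry = dshash_find(globalChannelTable, &key, false);
	if (entry == NULL)
		return;					/* nobody is listening on this channel */

	{
		ListenerEntry *listeners = (ListenerEntry *)
			dsa_get_address(channel_dsa, entry->listenersArray);

		for (int i = 0; i < entry->numListeners; i++)
		{
			if (listeners[i].listening)
				report(listeners[i].procNo);
		}
	}

	dshash_release_lock(globalChannelTable, entry);
}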
402/*
403 * localChannelTable caches the channel names this backend is listening on
404 * (including those we have staged to be listened on, but not yet committed).
405 * Used by IsListeningOn() for fast lookups when reading notifications.
406 */
407static HTAB *localChannelTable = NULL;
408
409/* We test this condition to detect that we're not listening at all */
410#define LocalChannelTableIsEmpty() \
411 (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
412
413/*
414 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
415 * all actions requested in the current transaction. As explained above,
416 * we don't actually change listen state until we reach transaction commit.
417 *
418 * The list is kept in CurTransactionContext. In subtransactions, each
419 * subtransaction has its own list in its own CurTransactionContext, but
420 * successful subtransactions attach their lists to their parent's list.
421 * Failed subtransactions simply discard their lists.
422 */
423typedef enum
424{
425 LISTEN_LISTEN,
426 LISTEN_UNLISTEN,
427 LISTEN_UNLISTEN_ALL
428} ListenActionKind;
429
430typedef struct
431{
432 ListenActionKind action;
433 char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
434} ListenAction;
435
436typedef struct ActionList
437{
438 int nestingLevel; /* current transaction nesting depth */
439 List *actions; /* list of ListenAction structs */
440 struct ActionList *upper; /* details for upper transaction levels */
441} ActionList;
442
443static ActionList *pendingActions = NULL;
444
445/*
446 * Hash table recording the final listen/unlisten intent per channel for
447 * the current transaction. Key is channel name, value is PENDING_LISTEN or
448 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
449 * per channel instead of replaying every action. This is built from the
450 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
451 * AtAbort_Notify.
452 */
453typedef enum
454{
455 PENDING_LISTEN,
456 PENDING_UNLISTEN
457} PendingListenAction;
458
459typedef struct PendingListenEntry
460{
461 char channel[NAMEDATALEN]; /* hash key */
462 PendingListenAction action; /* which action should we perform? */
463} PendingListenEntry;
464
465static HTAB *pendingListenActions = NULL;
466
467/*
468 * State for outbound notifies consists of a list of all channels+payloads
469 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
470 * until and unless the transaction commits. pendingNotifies is NULL if no
471 * NOTIFYs have been done in the current (sub) transaction.
472 *
473 * We discard duplicate notify events issued in the same transaction.
474 * Hence, in addition to the list proper (which we need to track the order
475 * of the events, since we guarantee to deliver them in order), we build a
476 * hash table which we can probe to detect duplicates. Since building the
477 * hash table is somewhat expensive, we do so only once we have at least
478 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
479 * before that we just scan the events linearly.
480 *
481 * The list is kept in CurTransactionContext. In subtransactions, each
482 * subtransaction has its own list in its own CurTransactionContext, but
483 * successful subtransactions add their entries to their parent's list.
484 * Failed subtransactions simply discard their lists. Since these lists
485 * are independent, there may be notify events in a subtransaction's list
486 * that duplicate events in some ancestor (sub) transaction; we get rid of
487 * the dups when merging the subtransaction's list into its parent's.
488 *
489 * Note: the action and notify lists do not interact within a transaction.
490 * In particular, if a transaction does NOTIFY and then LISTEN on the same
491 * condition name, it will get a self-notify at commit. This is a bit odd
492 * but is consistent with our historical behavior.
493 */
494typedef struct Notification
495{
496 uint16 channel_len; /* length of channel-name string */
497 uint16 payload_len; /* length of payload string */
498 /* null-terminated channel name, then null-terminated payload follow */
499 char data[FLEXIBLE_ARRAY_MEMBER]; /* VARIABLE LENGTH FIELD */
500} Notification;
501
502typedef struct NotificationList
503{
504 int nestingLevel; /* current transaction nesting depth */
505 List *events; /* list of Notification structs */
506 HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
507 List *uniqueChannelNames; /* unique channel names being notified */
508 HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
509 struct NotificationList *upper; /* details for upper transaction levels */
510} NotificationList;
511
512#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
513
514struct NotificationHash
515{
516 Notification *event; /* => the actual Notification struct */
517};
518
519static NotificationList *pendingNotifies = NULL;
520
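/*
 * Illustrative sketch (not part of this file): the linear-scan half of the
 * duplicate check described above, used while the event list is still short.
 * Once at least MIN_HASHABLE_NOTIFIES events have accumulated, the real code
 * probes a hash table instead.  The helper name is hypothetical.
 */
static bool
example_is_duplicate_notify(NotificationList *notifies, Notification *n)
{
	ListCell   *l;

	foreach(l, notifies->events)
	{
		Notification *oldn = (Notification *) lfirst(l);

		/* channel and payload are stored back-to-back, each null-terminated */
		if (oldn->channel_len == n->channel_len &&
			oldn->payload_len == n->payload_len &&
			memcmp(oldn->data, n->data,
				   n->channel_len + n->payload_len + 2) == 0)
			return true;
	}
	return false;
}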
521/*
522 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
523 * (both just carry the channel name, with no payload).
524 */
525typedef struct ChannelName
526{
527 char channel[NAMEDATALEN]; /* hash key */
528} ChannelName;
529
530/*
531 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
532 * called from inside a signal handler. That just sets the
533 * notifyInterruptPending flag and sets the process
534 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
535 * actually deal with the interrupt.
536 */
537volatile sig_atomic_t notifyInterruptPending = false;
538
539/* True if we've registered an on_shmem_exit cleanup */
540static bool unlistenExitRegistered = false;
541
542/* True if we're currently registered as a listener in asyncQueueControl */
543static bool amRegisteredListener = false;
544
545/*
546 * Queue head positions for direct advancement.
547 * These are captured during PreCommit_Notify while holding the heavyweight
548 * lock on database 0, ensuring no other backend can insert notifications
549 * between them. SignalBackends uses these to advance idle backends.
550 */
551static QueuePosition queueHeadBeforeWrite;
552static QueuePosition queueHeadAfterWrite;
553
554/*
555 * Workspace arrays for SignalBackends. These are preallocated in
556 * PreCommit_Notify to avoid needing memory allocation after committing to
557 * clog.
558 */
559static int32 *signalPids = NULL;
560static ProcNumber *signalProcnos = NULL;
561
562/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
563static bool tryAdvanceTail = false;
564
565/* GUC parameters */
566bool Trace_notify = false;
567
568/* For 8 KB pages this gives 8 GB of disk space */
569int max_notify_queue_pages = 1048576;
570
571/* local function prototypes */
572static inline int64 asyncQueuePageDiff(int64 p, int64 q);
573static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
574static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
575 const char *channel);
576static dshash_hash globalChannelTableHash(const void *key, size_t size,
577 void *arg);
578static void initGlobalChannelTable(void);
579static void initLocalChannelTable(void);
580static void queue_listen(ListenActionKind action, const char *channel);
581static void Async_UnlistenOnExit(int code, Datum arg);
582static void BecomeRegisteredListener(void);
583static void PrepareTableEntriesForListen(const char *channel);
584static void PrepareTableEntriesForUnlisten(const char *channel);
585static void PrepareTableEntriesForUnlistenAll(void);
588 int idx);
589static void ApplyPendingListenActions(bool isCommit);
590static void CleanupListenersOnExit(void);
591static bool IsListeningOn(const char *channel);
592static void asyncQueueUnregister(void);
593static bool asyncQueueIsFull(void);
594static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
597static double asyncQueueUsage(void);
598static void asyncQueueFillWarning(void);
599static void SignalBackends(void);
600static void asyncQueueReadAllNotifications(void);
602 QueuePosition stop,
603 Snapshot snapshot);
604static void asyncQueueAdvanceTail(void);
605static void ProcessIncomingNotify(bool flush);
608static uint32 notification_hash(const void *key, Size keysize);
609static int notification_match(const void *key1, const void *key2, Size keysize);
610static void ClearPendingActionsAndNotifies(void);
611
612/*
613 * Compute the difference between two queue page numbers.
614 * Previously this function accounted for a wraparound.
615 */
616static inline int64
617asyncQueuePageDiff(int64 p, int64 q)
618{
619 return p - q;
620}
621
622/*
623 * Determines whether p precedes q.
624 * Previously this function accounted for a wraparound.
625 */
626static inline bool
627asyncQueuePagePrecedes(int64 p, int64 q)
628{
629 return p < q;
630}
631
632/*
633 * GlobalChannelKeyInit
634 * Prepare a global channel table key for hashing.
635 */
636static inline void
637GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
638{
639 memset(key, 0, sizeof(GlobalChannelKey));
640 key->dboid = dboid;
641 strlcpy(key->channel, channel, NAMEDATALEN);
642}
643
644/*
645 * globalChannelTableHash
646 * Hash function for global channel table keys.
647 */
648static dshash_hash
649globalChannelTableHash(const void *key, size_t size, void *arg)
650{
651 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
652 dshash_hash h;
653
655 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
657
658 return h;
659}
660
661/* parameters for the global channel table */
670
671/*
672 * initGlobalChannelTable
673 * Lazy initialization of the global channel table.
674 */
675static void
677{
678 MemoryContext oldcontext;
679
680 /* Quick exit if we already did this */
683 return;
684
685 /* Otherwise, use a lock to ensure only one process creates the table */
687
688 /* Be sure any local memory allocated by DSA routines is persistent */
690
692 {
693 /* Initialize dynamic shared hash table for global channels */
699 NULL);
700
701 /* Store handles in shared memory for other backends to use */
705 }
706 else if (!globalChannelTable)
707 {
708 /* Attach to existing dynamic shared hash table */
714 NULL);
715 }
716
717 MemoryContextSwitchTo(oldcontext);
719}
720
721/*
722 * initLocalChannelTable
723 * Lazy initialization of the local channel table.
724 * Once created, this table lasts for the life of the session.
725 */
726static void
728{
730
731 /* Quick exit if we already did this */
732 if (localChannelTable != NULL)
733 return;
734
735 /* Initialize local hash table for this backend's listened channels */
737 hash_ctl.entrysize = sizeof(ChannelName);
738
740 hash_create("Local Listen Channels",
741 64,
742 &hash_ctl,
744}
745
746/*
747 * initPendingListenActions
748 * Lazy initialization of the pending listen actions hash table.
749 * This is allocated in CurTransactionContext during PreCommit_Notify,
750 * and destroyed at transaction end.
751 */
752static void
754{
756
758 return;
759
761 hash_ctl.entrysize = sizeof(PendingListenEntry);
763
765 hash_create("Pending Listen Actions",
767 &hash_ctl,
769}
770
771/*
772 * Report space needed for our shared memory area
773 */
774Size
776{
777 Size size;
778
779 /* This had better match AsyncShmemInit */
780 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
781 size = add_size(size, offsetof(AsyncQueueControl, backend));
782
784
785 return size;
786}
787
788/*
789 * Initialize our shared memory area
790 */
791void
793{
794 bool found;
795 Size size;
796
797 /*
798 * Create or attach to the AsyncQueueControl structure.
799 */
800 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
801 size = add_size(size, offsetof(AsyncQueueControl, backend));
802
804 ShmemInitStruct("Async Queue Control", size, &found);
805
806 if (!found)
807 {
808 /* First time through, so initialize it */
811 QUEUE_STOP_PAGE = 0;
816 for (int i = 0; i < MaxBackends; i++)
817 {
824 }
825 }
826
827 /*
828 * Set up SLRU management of the pg_notify data. Note that long segment
829 * names are used in order to avoid wraparound.
830 */
831 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
834 SYNC_HANDLER_NONE, true);
835
836 if (!found)
837 {
838 /*
839 * During start or reboot, clean out the pg_notify directory.
840 */
842 }
843}
844
845
846/*
847 * pg_notify -
848 * SQL function to send a notification event
849 */
850Datum
852{
853 const char *channel;
854 const char *payload;
855
856 if (PG_ARGISNULL(0))
857 channel = "";
858 else
860
861 if (PG_ARGISNULL(1))
862 payload = "";
863 else
865
866 /* For NOTIFY as a statement, this is checked in ProcessUtility */
868
869 Async_Notify(channel, payload);
870
872}
873
874
875/*
876 * Async_Notify
877 *
878 * This is executed by the SQL notify command.
879 *
880 * Adds the message to the list of pending notifies.
881 * Actual notification happens during transaction commit.
882 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
883 */
884void
885Async_Notify(const char *channel, const char *payload)
886{
887 int my_level = GetCurrentTransactionNestLevel();
888 size_t channel_len;
889 size_t payload_len;
890 Notification *n;
891 MemoryContext oldcontext;
892
893 if (IsParallelWorker())
894 elog(ERROR, "cannot send notifications from a parallel worker");
895
896 if (Trace_notify)
897 elog(DEBUG1, "Async_Notify(%s)", channel);
898
899 channel_len = channel ? strlen(channel) : 0;
900 payload_len = payload ? strlen(payload) : 0;
901
902 /* a channel name must be specified */
903 if (channel_len == 0)
906 errmsg("channel name cannot be empty")));
907
908 /* enforce length limits */
909 if (channel_len >= NAMEDATALEN)
912 errmsg("channel name too long")));
913
914 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
917 errmsg("payload string too long")));
918
919 /*
920 * We must construct the Notification entry, even if we end up not using
921 * it, in order to compare it cheaply to existing list entries.
922 *
923 * The notification list needs to live until end of transaction, so store
924 * it in the transaction context.
925 */
927
929 channel_len + payload_len + 2);
930 n->channel_len = channel_len;
931 n->payload_len = payload_len;
932 strcpy(n->data, channel);
933 if (payload)
934 strcpy(n->data + channel_len + 1, payload);
935 else
936 n->data[channel_len + 1] = '\0';
937
938 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
939 {
941
942 /*
943 * First notify event in current (sub)xact. Note that we allocate the
944 * NotificationList in TopTransactionContext; the nestingLevel might
945 * get changed later by AtSubCommit_Notify.
946 */
949 sizeof(NotificationList));
950 notifies->nestingLevel = my_level;
951 notifies->events = list_make1(n);
952 /* We certainly don't need a hashtable yet */
953 notifies->hashtab = NULL;
954 /* We won't build uniqueChannelNames/Hash till later, either */
955 notifies->uniqueChannelNames = NIL;
956 notifies->uniqueChannelHash = NULL;
957 notifies->upper = pendingNotifies;
959 }
960 else
961 {
962 /* Now check for duplicates */
964 {
965 /* It's a dup, so forget it */
966 pfree(n);
967 MemoryContextSwitchTo(oldcontext);
968 return;
969 }
970
971 /* Append more events to existing list */
973 }
974
975 MemoryContextSwitchTo(oldcontext);
976}
977
978/*
979 * queue_listen
980 * Common code for listen, unlisten, unlisten all commands.
981 *
982 * Adds the request to the list of pending actions.
983 * Actual update of localChannelTable and globalChannelTable happens during
984 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
985 */
986static void
987queue_listen(ListenActionKind action, const char *channel)
988{
989 MemoryContext oldcontext;
991 int my_level = GetCurrentTransactionNestLevel();
992
993 /*
994 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
995 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
996 * final per-channel intent is computed during PreCommit_Notify.
997 */
999
1000 /* space for terminating null is included in sizeof(ListenAction) */
1002 strlen(channel) + 1);
1003 actrec->action = action;
1004 strcpy(actrec->channel, channel);
1005
1006 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1007 {
1008 ActionList *actions;
1009
1010 /*
1011 * First action in current sub(xact). Note that we allocate the
1012 * ActionList in TopTransactionContext; the nestingLevel might get
1013 * changed later by AtSubCommit_Notify.
1014 */
1015 actions = (ActionList *)
1017 actions->nestingLevel = my_level;
1018 actions->actions = list_make1(actrec);
1019 actions->upper = pendingActions;
1020 pendingActions = actions;
1021 }
1022 else
1024
1025 MemoryContextSwitchTo(oldcontext);
1026}
1027
1028/*
1029 * Async_Listen
1030 *
1031 * This is executed by the SQL listen command.
1032 */
1033void
1034Async_Listen(const char *channel)
1035{
1036 if (Trace_notify)
1037 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1038
1039 queue_listen(LISTEN_LISTEN, channel);
1040}
1041
1042/*
1043 * Async_Unlisten
1044 *
1045 * This is executed by the SQL unlisten command.
1046 */
1047void
1048Async_Unlisten(const char *channel)
1049{
1050 if (Trace_notify)
1051 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1052
1053 /* If we couldn't possibly be listening, no need to queue anything */
1055 return;
1056
1057 queue_listen(LISTEN_UNLISTEN, channel);
1058}
1059
1060/*
1061 * Async_UnlistenAll
1062 *
1063 * This is invoked by UNLISTEN * command, and also at backend exit.
1064 */
1065void
1067{
1068 if (Trace_notify)
1069 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1070
1071 /* If we couldn't possibly be listening, no need to queue anything */
1073 return;
1074
1076}
1077
1078/*
1079 * SQL function: return a set of the channel names this backend is actively
1080 * listening to.
1081 *
1082 * Note: this coding relies on the fact that the localChannelTable cannot
1083 * change within a transaction.
1084 */
1085Datum
1087{
1089 HASH_SEQ_STATUS *status;
1090
1091 /* stuff done only on the first call of the function */
1092 if (SRF_IS_FIRSTCALL())
1093 {
1094 /* create a function context for cross-call persistence */
1096
1097 /* Initialize hash table iteration if we have any channels */
1098 if (localChannelTable != NULL)
1099 {
1100 MemoryContext oldcontext;
1101
1102 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1103 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1105 funcctx->user_fctx = status;
1106 MemoryContextSwitchTo(oldcontext);
1107 }
1108 else
1109 {
1110 funcctx->user_fctx = NULL;
1111 }
1112 }
1113
1114 /* stuff done on every call of the function */
1116 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1117
1118 if (status != NULL)
1119 {
1120 ChannelName *entry;
1121
1122 entry = (ChannelName *) hash_seq_search(status);
1123 if (entry != NULL)
1125 }
1126
1128}
1129
1130/*
1131 * Async_UnlistenOnExit
1132 *
1133 * This is executed at backend exit if we have done any LISTENs in this
1134 * backend. It might not be necessary anymore, if the user UNLISTENed
1135 * everything, but we don't try to detect that case.
1136 */
1137static void
1143
1144/*
1145 * AtPrepare_Notify
1146 *
1147 * This is called at the prepare phase of a two-phase
1148 * transaction. Save the state for possible commit later.
1149 */
1150void
1152{
1153 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1155 ereport(ERROR,
1157 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1158}
1159
1160/*
1161 * PreCommit_Notify
1162 *
1163 * This is called at transaction commit, before actually committing to
1164 * clog.
1165 *
1166 * If there are pending LISTEN actions, make sure we are listed in the
1167 * shared-memory listener array. This must happen before commit to
1168 * ensure we don't miss any notifies from transactions that commit
1169 * just after ours.
1170 *
1171 * If there are outbound notify requests in the pendingNotifies list,
1172 * add them to the global queue. We do that before commit so that
1173 * we can still throw error if we run out of queue space.
1174 */
1175void
1177{
1178 ListCell *p;
1179
1181 return; /* no relevant statements in this xact */
1182
1183 if (Trace_notify)
1184 elog(DEBUG1, "PreCommit_Notify");
1185
1186 /* Preflight for any pending listen/unlisten actions */
1188
1189 if (pendingActions != NULL)
1190 {
1191 /* Ensure we have a local channel table */
1193 /* Create pendingListenActions hash table for this transaction */
1195
1196 /* Stage all the actions this transaction wants to perform */
1197 foreach(p, pendingActions->actions)
1198 {
1200
1201 switch (actrec->action)
1202 {
1203 case LISTEN_LISTEN:
1206 break;
1207 case LISTEN_UNLISTEN:
1209 break;
1212 break;
1213 }
1214 }
1215 }
1216
1217 /* Queue any pending notifies (must happen after the above) */
1218 if (pendingNotifies)
1219 {
1221 bool firstIteration = true;
1222
1223 /*
1224 * Build list of unique channel names being notified for use by
1225 * SignalBackends().
1226 *
1227 * If uniqueChannelHash is available, use it to efficiently get the
1228 * unique channels. Otherwise, fall back to the O(N^2) approach.
1229 */
1232 {
1233 HASH_SEQ_STATUS status;
1235
1237 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1240 channelEntry->channel);
1241 }
1242 else
1243 {
1244 /* O(N^2) approach is better for small number of notifications */
1246 {
1247 char *channel = n->data;
1248 bool found = false;
1249
1250 /* Name present in list? */
1252 {
1253 if (strcmp(oldchan, channel) == 0)
1254 {
1255 found = true;
1256 break;
1257 }
1258 }
1259 /* Add if not already in list */
1260 if (!found)
1263 channel);
1264 }
1265 }
1266
1267 /* Preallocate workspace that will be needed by SignalBackends() */
1268 if (signalPids == NULL)
1270 MaxBackends * sizeof(int32));
1271
1272 if (signalProcnos == NULL)
1274 MaxBackends * sizeof(ProcNumber));
1275
1276 /*
1277 * Make sure that we have an XID assigned to the current transaction.
1278 * GetCurrentTransactionId is cheap if we already have an XID, but not
1279 * so cheap if we don't, and we'd prefer not to do that work while
1280 * holding NotifyQueueLock.
1281 */
1282 (void) GetCurrentTransactionId();
1283
1284 /*
1285 * Serialize writers by acquiring a special lock that we hold till
1286 * after commit. This ensures that queue entries appear in commit
1287 * order, and in particular that there are never uncommitted queue
1288 * entries ahead of committed ones, so an uncommitted transaction
1289 * can't block delivery of deliverable notifications.
1290 *
1291 * We use a heavyweight lock so that it'll automatically be released
1292 * after either commit or abort. This also allows deadlocks to be
1293 * detected, though really a deadlock shouldn't be possible here.
1294 *
1295 * The lock is on "database 0", which is pretty ugly but it doesn't
1296 * seem worth inventing a special locktag category just for this.
1297 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1298 * used by the flatfiles mechanism.)
1299 */
1300 LockSharedObject(DatabaseRelationId, InvalidOid, 0,
1301 AccessExclusiveLock);
1302
1303 /*
1304 * For the direct advancement optimization in SignalBackends(), we
1305 * need to ensure that no other backend can insert queue entries
1306 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1307 * heavyweight lock above provides this guarantee, since it serializes
1308 * all writers.
1309 *
1310 * Note: if the heavyweight lock were ever removed for scalability
1311 * reasons, we could achieve the same guarantee by holding
1312 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1313 * than releasing and reacquiring it for each page as we do below.
1314 */
1315
1316 /* Initialize values to a safe default in case list is empty */
1319
1320 /* Now push the notifications into the queue */
1322 while (nextNotify != NULL)
1323 {
1324 /*
1325 * Add the pending notifications to the queue. We acquire and
1326 * release NotifyQueueLock once per page, which might be overkill
1327 * but it does allow readers to get in while we're doing this.
1328 *
1329 * A full queue is very uncommon and should really not happen,
1330 * given that we have so much space available in the SLRU pages.
1331 * Nevertheless we need to deal with this possibility. Note that
1332 * when we get here we are in the process of committing our
1333 * transaction, but we have not yet committed to clog, so at this
1334 * point in time we can still roll the transaction back.
1335 */
1337 if (firstIteration)
1338 {
1340 firstIteration = false;
1341 }
1343 if (asyncQueueIsFull())
1344 ereport(ERROR,
1346 errmsg("too many notifications in the NOTIFY queue")));
1350 }
1351
1352 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1353 }
1354}
1355
1356/*
1357 * AtCommit_Notify
1358 *
1359 * This is called at transaction commit, after committing to clog.
1360 *
1361 * Apply pending listen/unlisten changes and clear transaction-local state.
1362 *
1363 * If we issued any notifications in the transaction, send signals to
1364 * listening backends (possibly including ourselves) to process them.
1365 * Also, if we filled enough queue pages with new notifies, try to
1366 * advance the queue tail pointer.
1367 */
1368void
1370{
1371 /*
1372 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1373 * return as soon as possible
1374 */
1376 return;
1377
1378 if (Trace_notify)
1379 elog(DEBUG1, "AtCommit_Notify");
1380
1381 /* Apply staged listen/unlisten changes */
1383
1384 /* If no longer listening to anything, get out of listener array */
1387
1388 /*
1389 * Send signals to listening backends. We need do this only if there are
1390 * pending notifies, which were previously added to the shared queue by
1391 * PreCommit_Notify().
1392 */
1393 if (pendingNotifies != NULL)
1395
1396 /*
1397 * If it's time to try to advance the global tail pointer, do that.
1398 *
1399 * (It might seem odd to do this in the sender, when more than likely the
1400 * listeners won't yet have read the messages we just sent. However,
1401 * there's less contention if only the sender does it, and there is little
1402 * need for urgency in advancing the global tail. So this typically will
1403 * be clearing out messages that were sent some time ago.)
1404 */
1405 if (tryAdvanceTail)
1406 {
1407 tryAdvanceTail = false;
1409 }
1410
1411 /* And clean up */
1413}
1414
1415/*
1416 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1417 *
1418 * This function must make sure we are ready to catch any incoming messages.
1419 */
1420static void
1422{
1423 QueuePosition head;
1424 QueuePosition max;
1426
1427 /*
1428 * Nothing to do if we are already listening to something, nor if we
1429 * already ran this routine in this transaction.
1430 */
1432 return;
1433
1434 if (Trace_notify)
1435 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1436
1437 /*
1438 * Before registering, make sure we will unlisten before dying. (Note:
1439 * this action does not get undone if we abort later.)
1440 */
1442 {
1445 }
1446
1447 /*
1448 * This is our first LISTEN, so establish our pointer.
1449 *
1450 * We set our pointer to the global tail pointer and then move it forward
1451 * over already-committed notifications. This ensures we cannot miss any
1452 * not-yet-committed notifications. We might get a few more but that
1453 * doesn't hurt.
1454 *
1455 * In some scenarios there might be a lot of committed notifications that
1456 * have not yet been pruned away (because some backend is being lazy about
1457 * reading them). To reduce our startup time, we can look at other
1458 * backends and adopt the maximum "pos" pointer of any backend that's in
1459 * our database; any notifications it's already advanced over are surely
1460 * committed and need not be re-examined by us. (We must consider only
1461 * backends connected to our DB, because others will not have bothered to
1462 * check committed-ness of notifications in our DB.)
1463 *
1464 * We need exclusive lock here so we can look at other backends' entries
1465 * and manipulate the list links.
1466 */
1468 head = QUEUE_HEAD;
1469 max = QUEUE_TAIL;
1472 {
1474 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1475 /* Also find last listening backend before this one */
1476 if (i < MyProcNumber)
1477 prevListener = i;
1478 }
1484 /* Insert backend into list of listeners at correct position */
1486 {
1489 }
1490 else
1491 {
1494 }
1496
1497 /* Now we are listed in the global array, so remember we're listening */
1498 amRegisteredListener = true;
1499
1500 /*
1501 * Try to move our pointer forward as far as possible. This will skip
1502 * over already-committed notifications, which we want to do because they
1503 * might be quite stale. Note that we are not yet listening on anything,
1504 * so we won't deliver such notifications to our frontend. Also, although
1505 * our transaction might have executed NOTIFY, those message(s) aren't
1506 * queued yet so we won't skip them here.
1507 */
1508 if (!QUEUE_POS_EQUAL(max, head))
1510}
1511
1512/*
1513 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1514 *
1515 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1516 * an entry in localChannelTable, and pre-allocating an entry in the shared
1517 * globalChannelTable with listening=false. The listening flag will be set
1518 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1519 * will be removed.
1520 */
1521static void
1523{
1524 GlobalChannelKey key;
1525 GlobalChannelEntry *entry;
1526 bool found;
1528 PendingListenEntry *pending;
1529
1530 /*
1531 * Record in local pending hash that we want to LISTEN, overwriting any
1532 * earlier attempt to UNLISTEN.
1533 */
1534 pending = (PendingListenEntry *)
1536 pending->action = PENDING_LISTEN;
1537
1538 /*
1539 * Ensure that there is an entry for the channel in localChannelTable.
1540 * (Should this fail, we can just roll back.) If the transaction fails
1541 * after this point, we will remove the entry if appropriate during
1542 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1543 * to return TRUE; we assume nothing is going to consult that before
1544 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1545 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1546 * present to ensure they do the right things; see
1547 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1548 */
1550
1551 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1552 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1553 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1554
1555 if (!found)
1556 {
1557 /* New channel entry, so initialize it to a safe state */
1559 entry->numListeners = 0;
1560 entry->allocatedListeners = 0;
1561 }
1562
1563 /*
1564 * Create listenersArray if entry doesn't have one. It's tempting to fold
1565 * this into the !found case, but this coding allows us to cope in case
1566 * dsa_allocate() failed in an earlier attempt.
1567 */
1568 if (!DsaPointerIsValid(entry->listenersArray))
1569 {
1573 }
1574
1577
1578 /*
1579 * Check if we already have a ListenerEntry (possibly from earlier in this
1580 * transaction)
1581 */
1582 for (int i = 0; i < entry->numListeners; i++)
1583 {
1584 if (listeners[i].procNo == MyProcNumber)
1585 {
1586 /* Already have an entry; listening flag stays as-is until commit */
1588 return;
1589 }
1590 }
1591
1592 /* Need to add a new entry; grow array if necessary */
1593 if (entry->numListeners >= entry->allocatedListeners)
1594 {
1595 int new_size = entry->allocatedListeners * 2;
1598 sizeof(ListenerEntry) * new_size);
1600
1602 entry->listenersArray = new_array;
1606 }
1607
1608 listeners[entry->numListeners].procNo = MyProcNumber;
1609 listeners[entry->numListeners].listening = false; /* staged, not yet
1610 * committed */
1611 entry->numListeners++;
1612
1614}
1615
1616/*
1617 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1618 *
1619 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1620 * we're currently listening (committed or staged). We don't touch
1621 * globalChannelTable yet - the listener keeps receiving signals until
1622 * commit, when the entry is removed.
1623 */
1624static void
1626{
1627 PendingListenEntry *pending;
1628
1629 /*
1630 * If the channel name is not in localChannelTable, then we are neither
1631 * listening on it nor preparing to listen on it, so we don't need to
1632 * record an UNLISTEN action.
1633 */
1635 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1636 return;
1637
1638 /*
1639 * Record in local pending hash that we want to UNLISTEN, overwriting any
1640 * earlier attempt to LISTEN. Don't touch localChannelTable or
1641 * globalChannelTable yet - we keep receiving signals until commit.
1642 */
1643 pending = (PendingListenEntry *)
1645 pending->action = PENDING_UNLISTEN;
1646}
1647
1648/*
1649 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1650 *
1651 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1652 * about-to-be-listened channels in pendingListenActions.
1653 */
1654static void
1656{
1659 PendingListenEntry *pending;
1660
1661 /*
1662 * Scan localChannelTable, which will have the names of all channels that
1663 * we are listening on or have prepared to listen on. Record an UNLISTEN
1664 * action for each one, overwriting any earlier attempt to LISTEN.
1665 */
1667 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1668 {
1669 pending = (PendingListenEntry *)
1671 pending->action = PENDING_UNLISTEN;
1672 }
1673}
1674
1675/*
1676 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1677 *
1678 * Decrements numListeners, compacts the array, and frees the entry if empty.
1679 * Sets *entry_ptr to NULL if the entry was deleted.
1680 *
1681 * We could get the listeners pointer from the entry, but all callers
1682 * already have it at hand.
1683 */
1684static void
1687 int idx)
1688{
1689 GlobalChannelEntry *entry = *entry_ptr;
1690
1691 entry->numListeners--;
1692 if (idx < entry->numListeners)
1694 sizeof(ListenerEntry) * (entry->numListeners - idx));
1695
1696 if (entry->numListeners == 0)
1697 {
1700 /* tells caller not to release the entry's lock: */
1701 *entry_ptr = NULL;
1702 }
1703}
1704
1705/*
1706 * ApplyPendingListenActions
1707 *
1708 * Apply, or revert, staged listen/unlisten changes to the local and global
1709 * hash tables.
1710 */
1711static void
1713{
1715 PendingListenEntry *pending;
1716
1717 /* Quick exit if nothing to do */
1719 return;
1720
1721 /* We made a globalChannelTable before building pendingListenActions */
1722 if (globalChannelTable == NULL)
1723 elog(PANIC, "global channel table missing post-commit/abort");
1724
1725 /* For each staged action ... */
1727 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1728 {
1729 GlobalChannelKey key;
1730 GlobalChannelEntry *entry;
1731 bool removeLocal = true;
1732 bool foundListener = false;
1733
1734 /*
1735 * Find the global entry for this channel. If isCommit, it had better
1736 * exist (it was created in PreCommit). In an abort, it might not
1737 * exist, in which case we are not listening and should discard any
1738 * local entry that PreCommit may have managed to create.
1739 */
1740 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1741 entry = dshash_find(globalChannelTable, &key, true);
1742 if (entry != NULL)
1743 {
1744 /* Scan entry to find the ListenerEntry for this backend */
1746
1749
1750 for (int i = 0; i < entry->numListeners; i++)
1751 {
1752 if (listeners[i].procNo != MyProcNumber)
1753 continue;
1754 foundListener = true;
1755 if (isCommit)
1756 {
1757 if (pending->action == PENDING_LISTEN)
1758 {
1759 /*
1760 * LISTEN being committed: set listening=true.
1761 * localChannelTable entry was created during
1762 * PreCommit and should be kept.
1763 */
1764 listeners[i].listening = true;
1765 removeLocal = false;
1766 }
1767 else
1768 {
1769 /*
1770 * UNLISTEN being committed: remove pre-allocated
1771 * entries from both tables.
1772 */
1774 }
1775 }
1776 else
1777 {
1778 /*
1779 * Note: this part is reachable only if the transaction
1780 * aborts after PreCommit_Notify() has made some
1781 * pendingListenActions entries, so it's pretty hard to
1782 * test.
1783 */
1784 if (!listeners[i].listening)
1785 {
1786 /*
1787 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1788 * and we weren't listening before, so remove
1789 * pre-allocated entries from both tables.
1790 */
1792 }
1793 else
1794 {
1795 /*
1796 * We're aborting, but the previous state was that
1797 * we're listening, so keep localChannelTable entry.
1798 */
1799 removeLocal = false;
1800 }
1801 }
1802 break; /* there shouldn't be another match */
1803 }
1804
1805 /* We might have already released the entry by removing it */
1806 if (entry != NULL)
1808 }
1809
1810 /*
1811 * If we're committing a LISTEN action, we should have found a
1812 * matching ListenerEntry, but otherwise it's okay if we didn't.
1813 */
1814 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1815 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1816 pending->channel, MyProcNumber);
1817
1818 /*
1819 * If we did not find a globalChannelTable entry for our backend, or
1820 * if we are unlistening, remove any localChannelTable entry that may
1821 * exist. (Note in particular that this cleans up if we created a
1822 * localChannelTable entry and then failed while trying to create a
1823 * globalChannelTable entry.)
1824 */
1827 HASH_REMOVE, NULL);
1828 }
1829}
1830
1831/*
1832 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1833 *
1834 * Remove this backend from all channels in the shared global table.
1835 */
1836static void
1838{
1839 dshash_seq_status status;
1840 GlobalChannelEntry *entry;
1841
1842 if (Trace_notify)
1843 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1844
1845 /* Clear our local cache (not really necessary, but be consistent) */
1846 if (localChannelTable != NULL)
1847 {
1850 }
1851
1852 /* Now remove our entries from the shared globalChannelTable */
1853 if (globalChannelTable == NULL)
1854 return;
1855
1856 dshash_seq_init(&status, globalChannelTable, true);
1857 while ((entry = dshash_seq_next(&status)) != NULL)
1858 {
1860
1861 if (entry->key.dboid != MyDatabaseId)
1862 continue; /* not relevant */
1863
1866
1867 for (int i = 0; i < entry->numListeners; i++)
1868 {
1869 if (listeners[i].procNo == MyProcNumber)
1870 {
1871 entry->numListeners--;
1872 if (i < entry->numListeners)
1873 memmove(&listeners[i], &listeners[i + 1],
1874 sizeof(ListenerEntry) * (entry->numListeners - i));
1875
1876 if (entry->numListeners == 0)
1877 {
1879 dshash_delete_current(&status);
1880 }
1881 break;
1882 }
1883 }
1884 }
1885 dshash_seq_term(&status);
1886}
1887
1888/*
1889 * Test whether we are actively listening on the given channel name.
1890 *
1891 * Note: this function is executed for every notification found in the queue.
1892 */
1893static bool
1894IsListeningOn(const char *channel)
1895{
1896 if (localChannelTable == NULL)
1897 return false;
1898
1899 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1900}
1901
1902/*
1903 * Remove our entry from the listeners array when we are no longer listening
1904 * on any channel. NB: must not fail if we're already not listening.
1905 */
1906static void
1908{
1909 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1910
1911 if (!amRegisteredListener) /* nothing to do */
1912 return;
1913
1914 /*
1915 * Need exclusive lock here to manipulate list links.
1916 */
1918 /* Mark our entry as invalid */
1923 /* and remove it from the list */
1926 else
1927 {
1929 {
1931 {
1933 break;
1934 }
1935 }
1936 }
1939
1940 /* mark ourselves as no longer listed in the global array */
1941 amRegisteredListener = false;
1942}
1943
1944/*
1945 * Test whether there is room to insert more notification messages.
1946 *
1947 * Caller must hold at least shared NotifyQueueLock.
1948 */
1949static bool
1951{
1952 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1954 int64 occupied = headPage - tailPage;
1955
1957}
1958
1959/*
1960 * Advance the QueuePosition to the next entry, assuming that the current
1961 * entry is of length entryLength. If we jump to a new page the function
1962 * returns true, else false.
1963 */
1964static bool
1966{
1967 int64 pageno = QUEUE_POS_PAGE(*position);
1968 int offset = QUEUE_POS_OFFSET(*position);
1969 bool pageJump = false;
1970
1971 /*
1972 * Move to the next writing position: First jump over what we have just
1973 * written or read.
1974 */
1975 offset += entryLength;
1976 Assert(offset <= QUEUE_PAGESIZE);
1977
1978 /*
1979 * In a second step, check whether another entry can possibly be written to
1980 * the page. If so, stay here; we have reached the next position. If not,
1981 * we need to move on to the next page.
1982 */
1984 {
1985 pageno++;
1986 offset = 0;
1987 pageJump = true;
1988 }
1989
1990 SET_QUEUE_POS(*position, pageno, offset);
1991 return pageJump;
1992}
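/*
 * Illustrative note (not part of the original source): assuming QUEUE_PAGESIZE
 * is BLCKSZ (8192 by default), the position jumps to offset 0 of the next page
 * whenever the space remaining after the current entry is smaller than
 * QUEUEALIGN(AsyncQueueEntryEmptySize), i.e. too small to hold even a minimal
 * entry; otherwise it simply moves forward within the same page.
 */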
1993
1994/*
1995 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
1996 */
1997static void
1999{
2000 size_t channellen = n->channel_len;
2001 size_t payloadlen = n->payload_len;
2002 int entryLength;
2003
2006
2007 /* The terminators are already included in AsyncQueueEntryEmptySize */
2010 qe->length = entryLength;
2011 qe->dboid = MyDatabaseId;
2012 qe->xid = GetCurrentTransactionId();
2013 qe->srcPid = MyProcPid;
2014 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2015}
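/*
 * Illustrative note (not part of the original source): the entry built here
 * consists of the fixed AsyncQueueEntry header fields (length, dboid, xid,
 * srcPid) followed in data[] by the NUL-terminated channel name and then the
 * NUL-terminated payload, which is why channellen + payloadlen + 2 bytes are
 * copied from the Notification.
 */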
2016
2017/*
2018 * Add pending notifications to the queue.
2019 *
2020 * We go page by page here, i.e. we stop once we have to go to a new page but
2021 * we will be called again and then fill that next page. If an entry does not
2022 * fit into the current page, we write a dummy entry with an InvalidOid as the
2023 * database OID in order to fill the page. So every page is always used up to
2024 * the last byte, which simplifies reading the page later.
2025 *
2026 * We are passed the list cell (in pendingNotifies->events) containing the next
2027 * notification to write and return the first still-unwritten cell back.
2028 * Eventually we will return NULL indicating all is done.
2029 *
2030 * We are already holding NotifyQueueLock from the caller, and grab the
2031 * page-specific SLRU bank lock locally in this function.
2032 */
2033static ListCell *
2035{
2038 int64 pageno;
2039 int offset;
2040 int slotno;
2042
2043 /*
2044 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2045 * memory upon exiting. The reason for this is that if we have to advance
2046 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2047 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2048 * subsequent insertions would try to put entries into a page that slru.c
2049 * thinks doesn't exist yet.) So, use a local position variable. Note
2050 * that if we do fail, any already-inserted queue entries are forgotten;
2051 * this is okay, since they'd be useless anyway after our transaction
2052 * rolls back.
2053 */
2055
2056 /*
2057 * If this is the first write since the postmaster started, we need to
2058 * initialize the first page of the async SLRU. Otherwise, the current
2059 * page should be initialized already, so just fetch it.
2060 */
2061 pageno = QUEUE_POS_PAGE(queue_head);
2063
2064 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2066
2069 else
2070 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2072
2073 /* Note we mark the page dirty before writing in it */
2074 NotifyCtl->shared->page_dirty[slotno] = true;
2075
2076 while (nextNotify != NULL)
2077 {
2079
2080 /* Construct a valid queue entry in local variable qe */
2082
2083 offset = QUEUE_POS_OFFSET(queue_head);
2084
2085 /* Check whether the entry really fits on the current page */
2086 if (offset + qe.length <= QUEUE_PAGESIZE)
2087 {
2088 /* OK, so advance nextNotify past this item */
2090 }
2091 else
2092 {
2093 /*
2094 * Write a dummy entry to fill up the page. Readers only check
2095 * dboid, and since it won't match any reader's database OID,
2096 * they will ignore this entry and move on.
2097 */
2098 qe.length = QUEUE_PAGESIZE - offset;
2099 qe.dboid = InvalidOid;
2101 qe.data[0] = '\0'; /* empty channel */
2102 qe.data[1] = '\0'; /* empty payload */
2103 }
2104
2105 /* Now copy qe into the shared buffer page */
2106 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2107 &qe,
2108 qe.length);
2109
2110 /* Advance queue_head appropriately, and detect if page is full */
2111 if (asyncQueueAdvance(&(queue_head), qe.length))
2112 {
2113 LWLock *lock;
2114
2115 pageno = QUEUE_POS_PAGE(queue_head);
2116 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2117 if (lock != prevlock)
2118 {
2121 prevlock = lock;
2122 }
2123
2124 /*
2125 * Page is full, so we're done here, but first fill the next page
2126 * with zeroes. The reason to do this is to ensure that slru.c's
2127 * idea of the head page is always the same as ours, which avoids
2128 * boundary problems in SimpleLruTruncate. The test in
2129 * asyncQueueIsFull() ensured that there is room to create this
2130 * page without overrunning the queue.
2131 */
2133
2134 /*
2135 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2136 * set flag to remember that we should try to advance the tail
2137 * pointer (we don't want to actually do that right here).
2138 */
2140 tryAdvanceTail = true;
2141
2142 /* And exit the loop */
2143 break;
2144 }
2145 }
2146
2147 /* Success, so update the global QUEUE_HEAD */
2149
2151
2152 return nextNotify;
2153}
2154
2155/*
2156 * SQL function to return the fraction of the notification queue currently
2157 * occupied.
2158 */
2159Datum
2161{
2162 double usage;
2163
2164 /* Advance the queue tail so we don't report a too-large result */
2166
2170
2172}
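/*
 * Usage sketch (illustrative, not part of the original source):
 *
 *     SELECT pg_notification_queue_usage();
 *
 * returns a double precision value between 0 (empty) and 1 (full), e.g.
 * 0.5 when the queue is half full.
 */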
2173
2174/*
2175 * Return the fraction of the queue that is currently occupied.
2176 *
2177 * The caller must hold NotifyQueueLock in (at least) shared mode.
2178 *
2179 * Note: we measure the distance to the logical tail page, not the physical
2180 * tail page. In some sense that's wrong, but the relative position of the
2181 * physical tail is affected by details such as SLRU segment boundaries,
2182 * so a result based on it would be unpleasantly unstable.
2183 */
2184static double
2186{
2187 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2189 int64 occupied = headPage - tailPage;
2190
2191 if (occupied == 0)
2192 return (double) 0; /* fast exit for common case */
2193
2194 return (double) occupied / (double) max_notify_queue_pages;
2195}
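/*
 * Worked example (illustrative, not part of the original source): if the head
 * is on page 2048 and the logical tail is on page 1024, then occupied is 1024
 * pages; with the default max_notify_queue_pages of 1048576, the reported
 * usage is 1024 / 1048576, roughly 0.001.
 */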
2196
2197/*
2198 * Check whether the queue is at least half full, and emit a warning if so.
2199 *
2200 * This is unlikely given the size of the queue, but possible.
2201 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2202 *
2203 * Caller must hold exclusive NotifyQueueLock.
2204 */
2205static void
2207{
2208 double fillDegree;
2209 TimestampTz t;
2210
2212 if (fillDegree < 0.5)
2213 return;
2214
2215 t = GetCurrentTimestamp();
2216
2219 {
2222
2224 {
2226 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2229 }
2230
2232 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2233 (minPid != InvalidPid ?
2234 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2235 : 0),
2236 (minPid != InvalidPid ?
2237 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2238 : 0)));
2239
2241 }
2242}
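/*
 * Illustrative output (not part of the original source; the PID is
 * hypothetical):
 *
 *     WARNING:  NOTIFY queue is 75% full
 *     DETAIL:  The server process with PID 12345 is among those with the
 *              oldest transactions.
 *     HINT:  The NOTIFY queue cannot be emptied until that process ends
 *            its current transaction.
 *
 * emitted at most once every QUEUE_FULL_WARN_INTERVAL.
 */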
2243
2244/*
2245 * Send signals to listening backends.
2246 *
2247 * Normally we signal only backends that are interested in the notifies that
2248 * we just sent. However, that will leave idle listeners falling further and
2249 * further behind. Wake them anyway if they're far enough behind, so they'll
2250 * advance their queue position pointers, allowing the global tail to advance.
2251 *
2252 * Since we know the ProcNumber and the PID, the signaling is quite cheap.
2253 *
2254 * This is called during CommitTransaction(), so it's important for it
2255 * to have very low probability of failure.
2256 */
2257static void
2259{
2260 int count;
2261
2262 /* Can't get here without PreCommit_Notify having made the global table */
2264
2265 /* It should have set up these arrays, too */
2267
2268 /*
2269 * Identify backends that we need to signal. We don't want to send
2270 * signals while holding the NotifyQueueLock, so this part just builds a
2271 * list of target PIDs in signalPids[] and signalProcnos[].
2272 */
2273 count = 0;
2274
2276
2277 /* Scan each channel name that we notified in this transaction */
2279 {
2280 GlobalChannelKey key;
2281 GlobalChannelEntry *entry;
2283
2284 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2285 entry = dshash_find(globalChannelTable, &key, false);
2286 if (entry == NULL)
2287 continue; /* nobody is listening */
2288
2290 entry->listenersArray);
2291
2292 /* Identify listeners that now need waking, add them to arrays */
2293 for (int j = 0; j < entry->numListeners; j++)
2294 {
2295 ProcNumber i;
2296 int32 pid;
2297 QueuePosition pos;
2298
2299 if (!listeners[j].listening)
2300 continue; /* ignore not-yet-committed listeners */
2301
2302 i = listeners[j].procNo;
2303
2305 continue; /* already signaled, no need to repeat */
2306
2307 pid = QUEUE_BACKEND_PID(i);
2308 pos = QUEUE_BACKEND_POS(i);
2309
2310 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2311 continue; /* it's fully caught up already */
2312
2313 Assert(pid != InvalidPid);
2314
2316 signalPids[count] = pid;
2317 signalProcnos[count] = i;
2318 count++;
2319 }
2320
2322 }
2323
2324 /*
2325 * Scan all listeners. Any that are not already pending wakeup must not
2326 * be interested in our notifications (else we'd have set their wakeup
2327 * flags above). Check to see if we can directly advance their queue
2328 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2329 * them anyway so they will catch up.
2330 */
2332 {
2333 int32 pid;
2334 QueuePosition pos;
2335
2337 continue;
2338
2339 /* If it's currently advancing, we should not touch it */
2341 continue;
2342
2343 pid = QUEUE_BACKEND_PID(i);
2344 pos = QUEUE_BACKEND_POS(i);
2345
2346 /*
2347 * We can directly advance the other backend's queue pointer if it's
2348 * not currently advancing (else there are race conditions), and its
2349 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2350 * it miss some older messages), and we'd not be moving the pointer
2351 * backward.
2352 */
2355 {
2356 /* We can directly advance its pointer past what we wrote */
2358 }
2361 {
2362 /* It's idle and far behind, so wake it up */
2363 Assert(pid != InvalidPid);
2364
2366 signalPids[count] = pid;
2367 signalProcnos[count] = i;
2368 count++;
2369 }
2370 }
2371
2373
2374 /* Now send signals */
2375 for (int i = 0; i < count; i++)
2376 {
2377 int32 pid = signalPids[i];
2378
2379 /*
2380 * If we are signaling our own process, no need to involve the kernel;
2381 * just set the flag directly.
2382 */
2383 if (pid == MyProcPid)
2384 {
2386 continue;
2387 }
2388
2389 /*
2390 * Note: assuming things aren't broken, a signal failure here could
2391 * only occur if the target backend exited since we released
2392 * NotifyQueueLock, which is unlikely but certainly possible. So we
2393 * just log a low-level debug message if it happens.
2394 */
2396 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2397 }
2398}
2399
2400/*
2401 * AtAbort_Notify
2402 *
2403 * This is called at transaction abort.
2404 *
2405 * Revert any staged listen/unlisten changes and clean up transaction state.
2406 * This only does anything if we abort after PreCommit_Notify has staged
2407 * some entries.
2408 */
2409void
2411{
2412 /* Revert staged listen/unlisten changes */
2414
2415 /* If we're no longer listening on anything, unregister */
2418
2419 /* And clean up */
2421}
2422
2423/*
2424 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2425 *
2426 * Reassign all items in the pending lists to the parent transaction.
2427 */
2428void
2430{
2431 int my_level = GetCurrentTransactionNestLevel();
2432
2433 /* If there are actions at our nesting level, we must reparent them. */
2434 if (pendingActions != NULL &&
2435 pendingActions->nestingLevel >= my_level)
2436 {
2437 if (pendingActions->upper == NULL ||
2438 pendingActions->upper->nestingLevel < my_level - 1)
2439 {
2440 /* nothing to merge; give the whole thing to the parent */
2442 }
2443 else
2444 {
2446
2448
2449 /*
2450 * Mustn't try to eliminate duplicates here --- see queue_listen()
2451 */
2454 childPendingActions->actions);
2456 }
2457 }
2458
2459 /* If there are notifies at our nesting level, we must reparent them. */
2460 if (pendingNotifies != NULL &&
2461 pendingNotifies->nestingLevel >= my_level)
2462 {
2463 Assert(pendingNotifies->nestingLevel == my_level);
2464
2465 if (pendingNotifies->upper == NULL ||
2466 pendingNotifies->upper->nestingLevel < my_level - 1)
2467 {
2468 /* nothing to merge; give the whole thing to the parent */
2470 }
2471 else
2472 {
2473 /*
2474 * Formerly, we didn't bother to eliminate duplicates here, but
2475 * now we must, else we fall foul of "Assert(!found)", either here
2476 * or during a later attempt to build the parent-level hashtable.
2477 */
2479 ListCell *l;
2480
2482 /* Insert all the subxact's events into parent, except for dups */
2483 foreach(l, childPendingNotifies->events)
2484 {
2486
2489 }
2491 }
2492 }
2493}
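/*
 * Behavioral sketch (illustrative, not part of the original source):
 *
 *     BEGIN;
 *     SAVEPOINT s1;
 *     NOTIFY ch, 'hello';
 *     RELEASE SAVEPOINT s1;   -- pending notify is reparented to the parent
 *     COMMIT;                 -- and delivered here
 *
 * Had the savepoint been rolled back instead, AtSubAbort_Notify() would have
 * discarded the pending notification.
 */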
2494
2495/*
2496 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2497 */
2498void
2500{
2501 int my_level = GetCurrentTransactionNestLevel();
2502
2503 /*
2504 * All we have to do is pop the stack --- the actions/notifies made in
2505 * this subxact are no longer interesting, and the space will be freed
2506 * when CurTransactionContext is recycled. We still have to free the
2507 * ActionList and NotificationList objects themselves, though, because
2508 * those are allocated in TopTransactionContext.
2509 *
2510 * Note that there might be no entries at all, or no entries for the
2511 * current subtransaction level, either because none were ever created, or
2512 * because we reentered this routine due to trouble during subxact abort.
2513 */
2514 while (pendingActions != NULL &&
2515 pendingActions->nestingLevel >= my_level)
2516 {
2518
2521 }
2522
2523 while (pendingNotifies != NULL &&
2524 pendingNotifies->nestingLevel >= my_level)
2525 {
2527
2530 }
2531}
2532
2533/*
2534 * HandleNotifyInterrupt
2535 *
2536 * Signal handler portion of interrupt handling. Let the backend know
2537 * that there's a pending notify interrupt. If we're currently reading
2538 * from the client, this will interrupt the read and
2539 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2540 */
2541void
2543{
2544 /*
2545 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2546 * you do here.
2547 */
2548
2549 /* signal that work needs to be done */
2551
2552 /* make sure the event is processed in due course */
2554}
2555
2556/*
2557 * ProcessNotifyInterrupt
2558 *
2559 * This is called if we see notifyInterruptPending set, just before
2560 * transmitting ReadyForQuery at the end of a frontend command, and
2561 * also if a notify signal occurs while reading from the frontend.
2562 * HandleNotifyInterrupt() will cause the read to be interrupted
2563 * via the process's latch, and this routine will get called.
2564 * If we are truly idle (ie, *not* inside a transaction block),
2565 * process the incoming notifies.
2566 *
2567 * If "flush" is true, force any frontend messages out immediately.
2568 * This can be false when being called at the end of a frontend command,
2569 * since we'll flush after sending ReadyForQuery.
2570 */
2571void
2573{
2575 return; /* not really idle */
2576
2577 /* Loop in case another signal arrives while sending messages */
2579 ProcessIncomingNotify(flush);
2580}
2581
2582
2583/*
2584 * Read all pending notifications from the queue, and deliver appropriate
2585 * ones to my frontend. Stop when we reach the queue head or an uncommitted
2586 * notification.
2587 */
2588static void
2590{
2591 QueuePosition pos;
2592 QueuePosition head;
2593 Snapshot snapshot;
2594
2595 /*
2596 * Fetch the current state, and indicate to others that we have woken up
2597 * and are in the process of advancing our position.
2598 */
2600 /* Assert checks that we have a valid state entry */
2604 head = QUEUE_HEAD;
2605
2606 if (QUEUE_POS_EQUAL(pos, head))
2607 {
2608 /* Nothing to do, we have read all notifications already. */
2610 return;
2611 }
2612
2615
2616 /*----------
2617 * Get snapshot we'll use to decide which xacts are still in progress.
2618 * This is trickier than it might seem, because of race conditions.
2619 * Consider the following example:
2620 *
2621 * Backend 1: Backend 2:
2622 *
2623 * transaction starts
2624 * UPDATE foo SET ...;
2625 * NOTIFY foo;
2626 * commit starts
2627 * queue the notify message
2628 * transaction starts
2629 * LISTEN foo; -- first LISTEN in session
2630 * SELECT * FROM foo WHERE ...;
2631 * commit to clog
2632 * commit starts
2633 * add backend 2 to array of listeners
2634 * advance to queue head (this code)
2635 * commit to clog
2636 *
2637 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2638 * wasn't committed yet. Ideally we'd ensure that client 2 would
2639 * eventually get transaction 1's notify message, but there's no way
2640 * to do that; until we're in the listener array, there's no guarantee
2641 * that the notify message doesn't get removed from the queue.
2642 *
2643 * Therefore the coding technique transaction 2 is using is unsafe:
2644 * applications must commit a LISTEN before inspecting database state,
2645 * if they want to ensure they will see notifications about subsequent
2646 * changes to that state.
2647 *
2648 * What we do guarantee is that we'll see all notifications from
2649 * transactions committing after the snapshot we take here.
2650 * BecomeRegisteredListener has already added us to the listener array,
2651 * so no not-yet-committed messages can be removed from the queue
2652 * before we see them.
2653 *----------
2654 */
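/*
 * Illustrative client pattern (not part of the original source): to avoid
 * the race described above, an application should do
 *
 *     LISTEN foo; COMMIT;           -- register and commit first
 *     SELECT * FROM foo WHERE ...;  -- then inspect current state
 *
 * so that any change committed after the LISTEN is guaranteed to produce a
 * notification that this session will see.
 */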
2655 snapshot = RegisterSnapshot(GetLatestSnapshot());
2656
2657 /*
2658 * It is possible that we fail while trying to send a message to our
2659 * frontend (for example, because of encoding conversion failure). If
2660 * that happens it is critical that we not try to send the same message
2661 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2662 * ERRORs to FATAL, causing the client connection to be closed on error.
2663 *
2664 * We used to only skip over the offending message and try to soldier on,
2665 * but it was somewhat questionable to lose a notification and give the
2666 * client an ERROR instead. A client application is not prepared for
2667 * that and can't tell that a notification was missed. It was also not
2668 * very useful in practice because notifications are often processed while
2669 * a connection is idle and reading a message from the client, and in that
2670 * state, any error is upgraded to FATAL anyway. Closing the connection
2671 * is a clear signal to the application that it might have missed
2672 * notifications.
2673 */
2674 {
2676 bool reachedStop;
2677
2678 ExitOnAnyError = true;
2679
2680 do
2681 {
2682 /*
2683 * Process messages up to the stop position, end of page, or an
2684 * uncommitted message.
2685 *
2686 * Our stop position is what we found to be the head's position
2687 * when we entered this function. It might have changed already.
2688 * But if it has, we will receive (or have already received and
2689 * queued) another signal and come here again.
2690 *
2691 * We are not holding NotifyQueueLock here! The queue can only
2692 * extend beyond the head pointer (see above) and we leave our
2693 * backend's pointer where it is so nobody will truncate or
2694 * rewrite pages under us. In particular, we don't want to hold a
2695 * lock while sending the notifications to the frontend.
2696 */
2697 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2698 } while (!reachedStop);
2699
2700 /* Update shared state */
2705
2707 }
2708
2709 /* Done with snapshot */
2710 UnregisterSnapshot(snapshot);
2711}
2712
2713/*
2714 * Fetch notifications from the shared queue, beginning at position current,
2715 * and deliver relevant ones to my frontend.
2716 *
2717 * The function returns true once we have reached the stop position or an
2718 * uncommitted notification, and false if we have finished with the page.
2719 * In other words: once it returns true there is no need to look further.
2720 * The QueuePosition *current is advanced past all processed messages.
2721 */
2722static bool
2724 QueuePosition stop,
2725 Snapshot snapshot)
2726{
2727 int64 curpage = QUEUE_POS_PAGE(*current);
2728 int slotno;
2729 char *page_buffer;
2730 bool reachedStop = false;
2731 bool reachedEndOfPage;
2732
2733 /*
2734 * We copy the entries into a local buffer to avoid holding the SLRU lock
2735 * while we transmit them to our frontend. The local buffer must be
2736 * adequately aligned.
2737 */
2739 char *local_buf_end = local_buf;
2740
2743 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2744
2745 do
2746 {
2747 QueuePosition thisentry = *current;
2749
2750 if (QUEUE_POS_EQUAL(thisentry, stop))
2751 break;
2752
2753 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2754
2755 /*
2756 * Advance *current over this message, possibly to the next page. As
2757 * noted in the comments for asyncQueueReadAllNotifications, we must
2758 * do this before possibly failing while processing the message.
2759 */
2760 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2761
2762 /* Ignore messages destined for other databases */
2763 if (qe->dboid == MyDatabaseId)
2764 {
2765 if (XidInMVCCSnapshot(qe->xid, snapshot))
2766 {
2767 /*
2768 * The source transaction is still in progress, so we can't
2769 * process this message yet. Break out of the loop, but first
2770 * back up *current so we will reprocess the message next
2771 * time. (Note: it is unlikely but not impossible for
2772 * TransactionIdDidCommit to fail, so we can't really avoid
2773 * this advance-then-back-up behavior when dealing with an
2774 * uncommitted message.)
2775 *
2776 * Note that we must test XidInMVCCSnapshot before we test
2777 * TransactionIdDidCommit, else we might return a message from
2778 * a transaction that is not yet visible to snapshots; compare
2779 * the comments at the head of heapam_visibility.c.
2780 *
2781 * Also, while our own xact won't be listed in the snapshot,
2782 * we need not check for TransactionIdIsCurrentTransactionId
2783 * because our transaction cannot (yet) have queued any
2784 * messages.
2785 */
2786 *current = thisentry;
2787 reachedStop = true;
2788 break;
2789 }
2790
2791 /*
2792 * Quick check for the case that we're not listening on any
2793 * channels, before calling TransactionIdDidCommit(). This makes
2794 * that case a little faster, but more importantly, it ensures
2795 * that if there's a bad entry in the queue for which
2796 * TransactionIdDidCommit() fails for some reason, we can skip
2797 * over it on the first LISTEN in a session, and not get stuck on
2798 * it indefinitely. (This is a little trickier than it looks: it
2799 * works because BecomeRegisteredListener runs this code before we
2800 * have made the first entry in localChannelTable.)
2801 */
2803 continue;
2804
2805 if (TransactionIdDidCommit(qe->xid))
2806 {
2807 memcpy(local_buf_end, qe, qe->length);
2808 local_buf_end += qe->length;
2809 }
2810 else
2811 {
2812 /*
2813 * The source transaction aborted or crashed, so we just
2814 * ignore its notifications.
2815 */
2816 }
2817 }
2818
2819 /* Loop back if we're not at end of page */
2820 } while (!reachedEndOfPage);
2821
2822 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2824
2825 /*
2826 * Now that we have let go of the SLRU bank lock, send the notifications
2827 * to our frontend.
2828 */
2830 for (char *p = local_buf; p < local_buf_end;)
2831 {
2833
2834 /* qe->data is the null-terminated channel name */
2835 char *channel = qe->data;
2836
2837 if (IsListeningOn(channel))
2838 {
2839 /* payload follows channel name */
2840 char *payload = qe->data + strlen(channel) + 1;
2841
2842 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2843 }
2844
2845 p += qe->length;
2846 }
2847
2848 if (QUEUE_POS_EQUAL(*current, stop))
2849 reachedStop = true;
2850
2851 return reachedStop;
2852}
2853
2854/*
2855 * Advance the shared queue tail variable to the minimum of all the
2856 * per-backend tail pointers. Truncate pg_notify space if possible.
2857 *
2858 * This is (usually) called during CommitTransaction(), so it's important for
2859 * it to have very low probability of failure.
2860 */
2861static void
2863{
2864 QueuePosition min;
2867 int64 boundary;
2868
2869 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2871
2872 /*
2873 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2874 * (ie, exactly match at least one backend's queue position), so it must
2875 * be updated atomically with the actual computation. Since v13, we could
2876 * get away with not doing it like that, but it seems prudent to keep it
2877 * so.
2878 *
2879 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2880 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2881 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2882 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2883 * there are pages we can truncate but haven't yet finished doing so.
2884 *
2885 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2886 * performing SimpleLruTruncate. This is OK because no backend will try
2887 * to access the pages we are in the midst of truncating.
2888 */
2890 min = QUEUE_HEAD;
2892 {
2894 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2895 }
2896 QUEUE_TAIL = min;
2899
2900 /*
2901 * We can truncate something if the global tail advanced across an SLRU
2902 * segment boundary.
2903 *
2904 * XXX it might be better to truncate only once every several segments, to
2905 * reduce the number of directory scans.
2906 */
2909 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2910 {
2911 /*
2912 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2913 * release the lock again.
2914 */
2916
2920 }
2921
2923}
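/*
 * Illustrative note (not part of the original source): assuming the boundary
 * is rounded down to an SLRU segment start as in released PostgreSQL sources,
 * and with SLRU_PAGES_PER_SEGMENT being 32, files under pg_notify/ are only
 * removed once the logical tail has advanced past a whole 32-page segment.
 */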
2924
2925/*
2926 * AsyncNotifyFreezeXids
2927 *
2928 * Prepare the async notification queue for CLOG truncation by freezing
2929 * transaction IDs that are about to become inaccessible.
2930 *
2931 * This function is called by VACUUM before advancing datfrozenxid. It scans
2932 * the notification queue and replaces XIDs that would become inaccessible
2933 * after CLOG truncation with special markers:
2934 * - Committed transactions are set to FrozenTransactionId
2935 * - Aborted/crashed transactions are set to InvalidTransactionId
2936 *
2937 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2938 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2939 * (or it would have held back newFrozenXid through ProcArray).
2940 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2941 * either aborted explicitly or crashed, and we can safely mark it invalid.
2942 */
2943void
2945{
2946 QueuePosition pos;
2947 QueuePosition head;
2948 int64 curpage = -1;
2949 int slotno = -1;
2950 char *page_buffer = NULL;
2951 bool page_dirty = false;
2952
2953 /*
2954 * Acquire locks in the correct order to avoid deadlocks. As per the
2955 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2956 * bank locks.
2957 *
2958 * We only need SHARED mode since we're just reading the head/tail
2959 * positions, not modifying them.
2960 */
2963
2964 pos = QUEUE_TAIL;
2965 head = QUEUE_HEAD;
2966
2967 /* Release NotifyQueueLock early, we only needed to read the positions */
2969
2970 /*
2971 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2972 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2973 * we're working.
2974 */
2975 while (!QUEUE_POS_EQUAL(pos, head))
2976 {
2978 TransactionId xid;
2979 int64 pageno = QUEUE_POS_PAGE(pos);
2980 int offset = QUEUE_POS_OFFSET(pos);
2981
2982 /* If we need a different page, release old lock and get new one */
2983 if (pageno != curpage)
2984 {
2985 LWLock *lock;
2986
2987 /* Release previous page if any */
2988 if (slotno >= 0)
2989 {
2990 if (page_dirty)
2991 {
2992 NotifyCtl->shared->page_dirty[slotno] = true;
2993 page_dirty = false;
2994 }
2996 }
2997
2998 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3000 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3002 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3003 curpage = pageno;
3004 }
3005
3006 qe = (AsyncQueueEntry *) (page_buffer + offset);
3007 xid = qe->xid;
3008
3009 if (TransactionIdIsNormal(xid) &&
3011 {
3012 if (TransactionIdDidCommit(xid))
3013 {
3014 qe->xid = FrozenTransactionId;
3015 page_dirty = true;
3016 }
3017 else
3018 {
3019 qe->xid = InvalidTransactionId;
3020 page_dirty = true;
3021 }
3022 }
3023
3024 /* Advance to next entry */
3025 asyncQueueAdvance(&pos, qe->length);
3026 }
3027
3028 /* Release final page lock if we acquired one */
3029 if (slotno >= 0)
3030 {
3031 if (page_dirty)
3032 NotifyCtl->shared->page_dirty[slotno] = true;
3034 }
3035
3037}
3038
3039/*
3040 * ProcessIncomingNotify
3041 *
3042 * Scan the queue for arriving notifications and report them to the front
3043 * end. The notifications might be from other sessions, or our own;
3044 * there's no need to distinguish here.
3045 *
3046 * If "flush" is true, force any frontend messages out immediately.
3047 *
3048 * NOTE: since we are outside any transaction, we must create our own.
3049 */
3050static void
3052{
3053 /* We *must* reset the flag */
3054 notifyInterruptPending = false;
3055
3056 /* Do nothing else if we aren't actively listening */
3058 return;
3059
3060 if (Trace_notify)
3061 elog(DEBUG1, "ProcessIncomingNotify");
3062
3063 set_ps_display("notify interrupt");
3064
3065 /*
3066 * We must run asyncQueueReadAllNotifications inside a transaction, else
3067 * bad things happen if it gets an error.
3068 */
3070
3072
3074
3075 /*
3076 * If this isn't an end-of-command case, we must flush the notify messages
3077 * to ensure the frontend gets them promptly.
3078 */
3079 if (flush)
3080 pq_flush();
3081
3082 set_ps_display("idle");
3083
3084 if (Trace_notify)
3085 elog(DEBUG1, "ProcessIncomingNotify: done");
3086}
3087
3088/*
3089 * Send NOTIFY message to my front end.
3090 */
3091void
3092NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3093{
3095 {
3097
3099 pq_sendint32(&buf, srcPid);
3100 pq_sendstring(&buf, channel);
3101 pq_sendstring(&buf, payload);
3103
3104 /*
3105 * NOTE: we do not do pq_flush() here. Some level of caller will
3106 * handle it later, allowing this message to be combined into a packet
3107 * with other ones.
3108 */
3109 }
3110 else
3111 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3112}
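/*
 * Protocol sketch (illustrative, not part of the original source): the
 * message sent above is the frontend protocol's NotificationResponse ('A'),
 * containing the notifying backend's PID as an int32 followed by the channel
 * name and the payload as NUL-terminated strings.
 */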
3113
3114/* Does pendingNotifies include a match for the given event? */
3115static bool
3117{
3118 if (pendingNotifies == NULL)
3119 return false;
3120
3122 {
3123 /* Use the hash table to probe for a match */
3125 &n,
3126 HASH_FIND,
3127 NULL))
3128 return true;
3129 }
3130 else
3131 {
3132 /* Must scan the event list */
3133 ListCell *l;
3134
3135 foreach(l, pendingNotifies->events)
3136 {
3138
3139 if (n->channel_len == oldn->channel_len &&
3140 n->payload_len == oldn->payload_len &&
3141 memcmp(n->data, oldn->data,
3142 n->channel_len + n->payload_len + 2) == 0)
3143 return true;
3144 }
3145 }
3146
3147 return false;
3148}
3149
3150/*
3151 * Add a notification event to a pre-existing pendingNotifies list.
3152 *
3153 * Because pendingNotifies->events is already nonempty, this works
3154 * correctly no matter what CurrentMemoryContext is.
3155 */
3156static void
3158{
3160
3161 /* Create the hash tables if it's time to */
3164 {
3166 ListCell *l;
3167
3168 /* Create the hash table */
3169 hash_ctl.keysize = sizeof(Notification *);
3170 hash_ctl.entrysize = sizeof(struct NotificationHash);
3175 hash_create("Pending Notifies",
3176 256L,
3177 &hash_ctl,
3179
3180 /* Create the unique channel name table */
3182 hash_ctl.keysize = NAMEDATALEN;
3183 hash_ctl.entrysize = sizeof(ChannelName);
3186 hash_create("Pending Notify Channel Names",
3187 64L,
3188 &hash_ctl,
3190
3191 /* Insert all the already-existing events */
3192 foreach(l, pendingNotifies->events)
3193 {
3195 char *channel = oldn->data;
3196 bool found;
3197
3199 &oldn,
3200 HASH_ENTER,
3201 &found);
3202 Assert(!found);
3203
3204 /* Add channel name to uniqueChannelHash; might be there already */
3206 channel,
3207 HASH_ENTER,
3208 NULL);
3209 }
3210 }
3211
3212 /* Add new event to the list, in order */
3214
3215 /* Add event to the hash tables if needed */
3217 {
3218 char *channel = n->data;
3219 bool found;
3220
3222 &n,
3223 HASH_ENTER,
3224 &found);
3225 Assert(!found);
3226
3227 /* Add channel name to uniqueChannelHash; might be there already */
3229 channel,
3230 HASH_ENTER,
3231 NULL);
3232 }
3233}
3234
3235/*
3236 * notification_hash: hash function for notification hash table
3237 *
3238 * The hash "keys" are pointers to Notification structs.
3239 */
3240static uint32
3241notification_hash(const void *key, Size keysize)
3242{
3243 const Notification *k = *(const Notification *const *) key;
3244
3245 Assert(keysize == sizeof(Notification *));
3246 /* We don't bother to include the payload's trailing null in the hash */
3247 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3248 k->channel_len + k->payload_len + 1));
3249}
3250
3251/*
3252 * notification_match: match function to use with notification_hash
3253 */
3254static int
3255notification_match(const void *key1, const void *key2, Size keysize)
3256{
3257 const Notification *k1 = *(const Notification *const *) key1;
3258 const Notification *k2 = *(const Notification *const *) key2;
3259
3260 Assert(keysize == sizeof(Notification *));
3261 if (k1->channel_len == k2->channel_len &&
3262 k1->payload_len == k2->payload_len &&
3263 memcmp(k1->data, k2->data,
3264 k1->channel_len + k1->payload_len + 2) == 0)
3265 return 0; /* equal */
3266 return 1; /* not equal */
3267}
3268
3269/* Clear the pendingActions and pendingNotifies lists. */
3270static void
3272{
3273 /*
3274 * Everything's allocated in either TopTransactionContext or the context
3275 * for the subtransaction to which it corresponds. So, there's nothing to
3276 * do here except reset the pointers; the space will be reclaimed when the
3277 * contexts are deleted.
3278 */
3281 /* Also clear pendingListenActions, which is derived from pendingActions */
3283}
3284
3285/*
3286 * GUC check_hook for notify_buffers
3287 */
3288bool
3290{
3291 return check_slru_buffers("notify_buffers", newval);
3292}
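/*
 * Configuration sketch (illustrative, not part of the original source):
 * notify_buffers sets the number of shared buffers used for the pg_notify
 * SLRU and can be set in postgresql.conf, e.g.
 *
 *     notify_buffers = 32
 *
 * check_slru_buffers() rejects values that are not valid SLRU buffer counts.
 */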