PostgreSQL Source Code git master
Loading...
Searching...
No Matches
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "storage/subsystems.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  GlobalChannelKey
 
struct  ListenerEntry
 
struct  GlobalChannelEntry
 
struct  ListenAction
 
struct  ActionList
 
struct  PendingListenEntry
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 
struct  ChannelName
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_POS_PRECEDES(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define QUEUE_BACKEND_WAKEUP_PENDING(i)   (asyncQueueControl->backend[i].wakeupPending)
 
#define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
 
#define NotifyCtl   (&NotifySlruDesc)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define INITIAL_LISTENERS_ARRAY_SIZE   4
 
#define LocalChannelTableIsEmpty()    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct GlobalChannelKey GlobalChannelKey
 
typedef struct ListenerEntry ListenerEntry
 
typedef struct GlobalChannelEntry GlobalChannelEntry
 
typedef struct ActionList ActionList
 
typedef struct PendingListenEntry PendingListenEntry
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct ChannelName ChannelName
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 
enum  PendingListenAction { PENDING_LISTEN , PENDING_UNLISTEN }
 

Functions

static void AsyncShmemRequest (void *arg)
 
static void AsyncShmemInit (void *arg)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static int asyncQueueErrdetailForIoError (const void *opaque_data)
 
static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static void GlobalChannelKeyInit (GlobalChannelKey *key, Oid dboid, const char *channel)
 
static dshash_hash globalChannelTableHash (const void *key, size_t size, void *arg)
 
static void initGlobalChannelTable (void)
 
static void initLocalChannelTable (void)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void BecomeRegisteredListener (void)
 
static void PrepareTableEntriesForListen (const char *channel)
 
static void PrepareTableEntriesForUnlisten (const char *channel)
 
static void PrepareTableEntriesForUnlistenAll (void)
 
static void RemoveListenerFromChannel (GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
 
static void ApplyPendingListenActions (bool isCommit)
 
static void CleanupListenersOnExit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (QueuePosition *current, QueuePosition stop, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
static void initPendingListenActions (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControlasyncQueueControl
 
const ShmemCallbacks AsyncShmemCallbacks
 
static SlruDesc NotifySlruDesc
 
static dshash_tableglobalChannelTable = NULL
 
static dsa_areaglobalChannelDSA = NULL
 
static HTABlocalChannelTable = NULL
 
static ActionListpendingActions = NULL
 
static HTABpendingListenActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static QueuePosition queueHeadBeforeWrite
 
static QueuePosition queueHeadAfterWrite
 
static int32signalPids = NULL
 
static ProcNumbersignalProcnos = NULL
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 
static const dshash_parameters globalChannelTableDSHParams
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 227 of file async.c.

◆ INITIAL_LISTENERS_ARRAY_SIZE

#define INITIAL_LISTENERS_ARRAY_SIZE   4

Definition at line 392 of file async.c.

◆ LocalChannelTableIsEmpty

#define LocalChannelTableIsEmpty ( )     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

Definition at line 425 of file async.c.

438{
443
444typedef struct
445{
447 char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
449
450typedef struct ActionList
451{
452 int nestingLevel; /* current transaction nesting depth */
453 List *actions; /* list of ListenAction structs */
454 struct ActionList *upper; /* details for upper transaction levels */
455} ActionList;
456
458
459/*
460 * Hash table recording the final listen/unlisten intent per channel for
461 * the current transaction. Key is channel name, value is PENDING_LISTEN or
462 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
463 * per channel instead of replaying every action. This is built from the
464 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
465 * AtAbort_Notify.
466 */
467typedef enum
468{
472
473typedef struct PendingListenEntry
474{
475 char channel[NAMEDATALEN]; /* hash key */
476 PendingListenAction action; /* which action should we perform? */
478
480
481/*
482 * State for outbound notifies consists of a list of all channels+payloads
483 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
484 * until and unless the transaction commits. pendingNotifies is NULL if no
485 * NOTIFYs have been done in the current (sub) transaction.
486 *
487 * We discard duplicate notify events issued in the same transaction.
488 * Hence, in addition to the list proper (which we need to track the order
489 * of the events, since we guarantee to deliver them in order), we build a
490 * hash table which we can probe to detect duplicates. Since building the
491 * hash table is somewhat expensive, we do so only once we have at least
492 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
493 * before that we just scan the events linearly.
494 *
495 * The list is kept in CurTransactionContext. In subtransactions, each
496 * subtransaction has its own list in its own CurTransactionContext, but
497 * successful subtransactions add their entries to their parent's list.
498 * Failed subtransactions simply discard their lists. Since these lists
499 * are independent, there may be notify events in a subtransaction's list
500 * that duplicate events in some ancestor (sub) transaction; we get rid of
501 * the dups when merging the subtransaction's list into its parent's.
502 *
503 * Note: the action and notify lists do not interact within a transaction.
504 * In particular, if a transaction does NOTIFY and then LISTEN on the same
505 * condition name, it will get a self-notify at commit. This is a bit odd
506 * but is consistent with our historical behavior.
507 */
508typedef struct Notification
509{
510 uint16 channel_len; /* length of channel-name string */
511 uint16 payload_len; /* length of payload string */
512 /* null-terminated channel name, then null-terminated payload follow */
515
516typedef struct NotificationList
517{
518 int nestingLevel; /* current transaction nesting depth */
519 List *events; /* list of Notification structs */
520 HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
521 List *uniqueChannelNames; /* unique channel names being notified */
522 HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
523 struct NotificationList *upper; /* details for upper transaction levels */
525
526#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
527
528struct NotificationHash
529{
530 Notification *event; /* => the actual Notification struct */
531};
532
534
535/*
536 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
537 * (both just carry the channel name, with no payload).
538 */
539typedef struct ChannelName
540{
541 char channel[NAMEDATALEN]; /* hash key */
543
544/*
545 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
546 * called from inside a signal handler. That just sets the
547 * notifyInterruptPending flag and sets the process
548 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
549 * actually deal with the interrupt.
550 */
551volatile sig_atomic_t notifyInterruptPending = false;
552
553/* True if we've registered an on_shmem_exit cleanup */
554static bool unlistenExitRegistered = false;
555
556/* True if we're currently registered as a listener in asyncQueueControl */
557static bool amRegisteredListener = false;
558
559/*
560 * Queue head positions for direct advancement.
561 * These are captured during PreCommit_Notify while holding the heavyweight
562 * lock on database 0, ensuring no other backend can insert notifications
563 * between them. SignalBackends uses these to advance idle backends.
564 */
567
568/*
569 * Workspace arrays for SignalBackends. These are preallocated in
570 * PreCommit_Notify to avoid needing memory allocation after committing to
571 * clog.
572 */
573static int32 *signalPids = NULL;
575
576/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
577static bool tryAdvanceTail = false;
578
579/* GUC parameters */
580bool Trace_notify = false;
581
582/* For 8 KB pages this gives 8 GB of disk space */
583int max_notify_queue_pages = 1048576;
584
585/* local function prototypes */
586static inline int64 asyncQueuePageDiff(int64 p, int64 q);
587static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
588 const char *channel);
589static dshash_hash globalChannelTableHash(const void *key, size_t size,
590 void *arg);
591static void initGlobalChannelTable(void);
592static void initLocalChannelTable(void);
593static void queue_listen(ListenActionKind action, const char *channel);
594static void Async_UnlistenOnExit(int code, Datum arg);
595static void BecomeRegisteredListener(void);
596static void PrepareTableEntriesForListen(const char *channel);
597static void PrepareTableEntriesForUnlisten(const char *channel);
598static void PrepareTableEntriesForUnlistenAll(void);
601 int idx);
602static void ApplyPendingListenActions(bool isCommit);
603static void CleanupListenersOnExit(void);
604static bool IsListeningOn(const char *channel);
605static void asyncQueueUnregister(void);
606static bool asyncQueueIsFull(void);
607static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
610static double asyncQueueUsage(void);
611static void asyncQueueFillWarning(void);
612static void SignalBackends(void);
613static void asyncQueueReadAllNotifications(void);
615 QueuePosition stop,
616 Snapshot snapshot);
617static void asyncQueueAdvanceTail(void);
618static void ProcessIncomingNotify(bool flush);
621static uint32 notification_hash(const void *key, Size keysize);
622static int notification_match(const void *key1, const void *key2, Size keysize);
623static void ClearPendingActionsAndNotifies(void);
624
625static int
627{
628 const QueuePosition *position = opaque_data;
629
630 return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
631 position->page, position->offset);
632}
633
634/*
635 * Compute the difference between two queue page numbers.
636 * Previously this function accounted for a wraparound.
637 */
638static inline int64
640{
641 return p - q;
642}
643
644/*
645 * Determines whether p precedes q.
646 * Previously this function accounted for a wraparound.
647 */
648static inline bool
650{
651 return p < q;
652}
653
654/*
655 * GlobalChannelKeyInit
656 * Prepare a global channel table key for hashing.
657 */
658static inline void
659GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
660{
661 memset(key, 0, sizeof(GlobalChannelKey));
662 key->dboid = dboid;
663 strlcpy(key->channel, channel, NAMEDATALEN);
664}
665
666/*
667 * globalChannelTableHash
668 * Hash function for global channel table keys.
669 */
670static dshash_hash
671globalChannelTableHash(const void *key, size_t size, void *arg)
672{
673 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
674 dshash_hash h;
675
677 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
679
680 return h;
681}
682
683/* parameters for the global channel table */
685 sizeof(GlobalChannelKey),
686 sizeof(GlobalChannelEntry),
691};
692
693/*
694 * initGlobalChannelTable
695 * Lazy initialization of the global channel table.
696 */
697static void
699{
700 MemoryContext oldcontext;
701
702 /* Quick exit if we already did this */
705 return;
706
707 /* Otherwise, use a lock to ensure only one process creates the table */
709
710 /* Be sure any local memory allocated by DSA routines is persistent */
712
714 {
715 /* Initialize dynamic shared hash table for global channels */
721 NULL);
722
723 /* Store handles in shared memory for other backends to use */
727 }
728 else if (!globalChannelTable)
729 {
730 /* Attach to existing dynamic shared hash table */
736 NULL);
737 }
738
739 MemoryContextSwitchTo(oldcontext);
741}
742
743/*
744 * initLocalChannelTable
745 * Lazy initialization of the local channel table.
746 * Once created, this table lasts for the life of the session.
747 */
748static void
750{
752
753 /* Quick exit if we already did this */
754 if (localChannelTable != NULL)
755 return;
756
757 /* Initialize local hash table for this backend's listened channels */
759 hash_ctl.entrysize = sizeof(ChannelName);
760
762 hash_create("Local Listen Channels",
763 64,
764 &hash_ctl,
766}
767
768/*
769 * initPendingListenActions
770 * Lazy initialization of the pending listen actions hash table.
771 * This is allocated in CurTransactionContext during PreCommit_Notify,
772 * and destroyed at transaction end.
773 */
774static void
776{
778
780 return;
781
783 hash_ctl.entrysize = sizeof(PendingListenEntry);
785
787 hash_create("Pending Listen Actions",
789 &hash_ctl,
791}
792
793/*
794 * Register our shared memory needs
795 */
796static void
798{
799 Size size;
800
801 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
802 size = add_size(size, offsetof(AsyncQueueControl, backend));
803
804 ShmemRequestStruct(.name = "Async Queue Control",
805 .size = size,
806 .ptr = (void **) &asyncQueueControl,
807 );
808
810 .name = "notify",
811 .Dir = "pg_notify",
812
813 /* long segment names are used in order to avoid wraparound */
814 .long_segment_names = true,
815
816 .nslots = notify_buffers,
817
818 .sync_handler = SYNC_HANDLER_NONE,
819 .PagePrecedes = asyncQueuePagePrecedes,
820 .errdetail_for_io_error = asyncQueueErrdetailForIoError,
821
822 .buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
823 .bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
824 );
825}
826
827static void
828AsyncShmemInit(void *arg)
829{
832 QUEUE_STOP_PAGE = 0;
837 for (int i = 0; i < MaxBackends; i++)
838 {
845 }
846
847 /*
848 * During start or reboot, clean out the pg_notify directory.
849 */
851}
852
853
854/*
855 * pg_notify -
856 * SQL function to send a notification event
857 */
858Datum
860{
861 const char *channel;
862 const char *payload;
863
864 if (PG_ARGISNULL(0))
865 channel = "";
866 else
868
869 if (PG_ARGISNULL(1))
870 payload = "";
871 else
873
874 /* For NOTIFY as a statement, this is checked in ProcessUtility */
876
877 Async_Notify(channel, payload);
878
880}
881
882
883/*
884 * Async_Notify
885 *
886 * This is executed by the SQL notify command.
887 *
888 * Adds the message to the list of pending notifies.
889 * Actual notification happens during transaction commit.
890 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
891 */
892void
893Async_Notify(const char *channel, const char *payload)
894{
895 int my_level = GetCurrentTransactionNestLevel();
896 size_t channel_len;
897 size_t payload_len;
898 Notification *n;
899 MemoryContext oldcontext;
900
901 if (IsParallelWorker())
902 elog(ERROR, "cannot send notifications from a parallel worker");
903
904 if (Trace_notify)
905 elog(DEBUG1, "Async_Notify(%s)", channel);
906
907 channel_len = channel ? strlen(channel) : 0;
908 payload_len = payload ? strlen(payload) : 0;
909
910 /* a channel name must be specified */
911 if (channel_len == 0)
914 errmsg("channel name cannot be empty")));
915
916 /* enforce length limits */
917 if (channel_len >= NAMEDATALEN)
920 errmsg("channel name too long")));
921
922 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
925 errmsg("payload string too long")));
926
927 /*
928 * We must construct the Notification entry, even if we end up not using
929 * it, in order to compare it cheaply to existing list entries.
930 *
931 * The notification list needs to live until end of transaction, so store
932 * it in the transaction context.
933 */
935
937 channel_len + payload_len + 2);
938 n->channel_len = channel_len;
939 n->payload_len = payload_len;
940 strcpy(n->data, channel);
941 if (payload)
942 strcpy(n->data + channel_len + 1, payload);
943 else
944 n->data[channel_len + 1] = '\0';
945
946 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
947 {
949
950 /*
951 * First notify event in current (sub)xact. Note that we allocate the
952 * NotificationList in TopTransactionContext; the nestingLevel might
953 * get changed later by AtSubCommit_Notify.
954 */
957 sizeof(NotificationList));
958 notifies->nestingLevel = my_level;
959 notifies->events = list_make1(n);
960 /* We certainly don't need a hashtable yet */
961 notifies->hashtab = NULL;
962 /* We won't build uniqueChannelNames/Hash till later, either */
963 notifies->uniqueChannelNames = NIL;
964 notifies->uniqueChannelHash = NULL;
965 notifies->upper = pendingNotifies;
967 }
968 else
969 {
970 /* Now check for duplicates */
972 {
973 /* It's a dup, so forget it */
974 pfree(n);
975 MemoryContextSwitchTo(oldcontext);
976 return;
977 }
978
979 /* Append more events to existing list */
981 }
982
983 MemoryContextSwitchTo(oldcontext);
984}
985
986/*
987 * queue_listen
988 * Common code for listen, unlisten, unlisten all commands.
989 *
990 * Adds the request to the list of pending actions.
991 * Actual update of localChannelTable and globalChannelTable happens during
992 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
993 */
994static void
995queue_listen(ListenActionKind action, const char *channel)
996{
997 MemoryContext oldcontext;
999 int my_level = GetCurrentTransactionNestLevel();
1000
1001 /*
1002 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1003 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1004 * final per-channel intent is computed during PreCommit_Notify.
1005 */
1007
1008 /* space for terminating null is included in sizeof(ListenAction) */
1010 strlen(channel) + 1);
1011 actrec->action = action;
1012 strcpy(actrec->channel, channel);
1013
1014 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1015 {
1016 ActionList *actions;
1017
1018 /*
1019 * First action in current sub(xact). Note that we allocate the
1020 * ActionList in TopTransactionContext; the nestingLevel might get
1021 * changed later by AtSubCommit_Notify.
1022 */
1023 actions = (ActionList *)
1025 actions->nestingLevel = my_level;
1026 actions->actions = list_make1(actrec);
1027 actions->upper = pendingActions;
1028 pendingActions = actions;
1029 }
1030 else
1032
1033 MemoryContextSwitchTo(oldcontext);
1034}
1035
1036/*
1037 * Async_Listen
1038 *
1039 * This is executed by the SQL listen command.
1040 */
1041void
1042Async_Listen(const char *channel)
1043{
1044 if (Trace_notify)
1045 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1046
1047 queue_listen(LISTEN_LISTEN, channel);
1048}
1049
1050/*
1051 * Async_Unlisten
1052 *
1053 * This is executed by the SQL unlisten command.
1054 */
1055void
1056Async_Unlisten(const char *channel)
1057{
1058 if (Trace_notify)
1059 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1060
1061 /* If we couldn't possibly be listening, no need to queue anything */
1063 return;
1064
1065 queue_listen(LISTEN_UNLISTEN, channel);
1066}
1067
1068/*
1069 * Async_UnlistenAll
1070 *
1071 * This is invoked by UNLISTEN * command, and also at backend exit.
1072 */
1073void
1075{
1076 if (Trace_notify)
1077 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1078
1079 /* If we couldn't possibly be listening, no need to queue anything */
1081 return;
1082
1084}
1085
1086/*
1087 * SQL function: return a set of the channel names this backend is actively
1088 * listening to.
1089 *
1090 * Note: this coding relies on the fact that the localChannelTable cannot
1091 * change within a transaction.
1092 */
1093Datum
1095{
1097 HASH_SEQ_STATUS *status;
1098
1099 /* stuff done only on the first call of the function */
1100 if (SRF_IS_FIRSTCALL())
1101 {
1102 /* create a function context for cross-call persistence */
1104
1105 /* Initialize hash table iteration if we have any channels */
1106 if (localChannelTable != NULL)
1107 {
1108 MemoryContext oldcontext;
1109
1110 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1111 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1113 funcctx->user_fctx = status;
1114 MemoryContextSwitchTo(oldcontext);
1115 }
1116 else
1117 {
1118 funcctx->user_fctx = NULL;
1119 }
1120 }
1121
1122 /* stuff done on every call of the function */
1124 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1125
1126 if (status != NULL)
1127 {
1128 ChannelName *entry;
1129
1130 entry = (ChannelName *) hash_seq_search(status);
1131 if (entry != NULL)
1133 }
1134
1136}
1137
1138/*
1139 * Async_UnlistenOnExit
1140 *
1141 * This is executed at backend exit if we have done any LISTENs in this
1142 * backend. It might not be necessary anymore, if the user UNLISTENed
1143 * everything, but we don't try to detect that case.
1144 */
1145static void
1147{
1150}
1151
1152/*
1153 * AtPrepare_Notify
1154 *
1155 * This is called at the prepare phase of a two-phase
1156 * transaction. Save the state for possible commit later.
1157 */
1158void
1159AtPrepare_Notify(void)
1160{
1161 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1163 ereport(ERROR,
1165 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1166}
1167
1168/*
1169 * PreCommit_Notify
1170 *
1171 * This is called at transaction commit, before actually committing to
1172 * clog.
1173 *
1174 * If there are pending LISTEN actions, make sure we are listed in the
1175 * shared-memory listener array. This must happen before commit to
1176 * ensure we don't miss any notifies from transactions that commit
1177 * just after ours.
1178 *
1179 * If there are outbound notify requests in the pendingNotifies list,
1180 * add them to the global queue. We do that before commit so that
1181 * we can still throw error if we run out of queue space.
1182 */
1183void
1184PreCommit_Notify(void)
1185{
1186 ListCell *p;
1187
1189 return; /* no relevant statements in this xact */
1190
1191 if (Trace_notify)
1192 elog(DEBUG1, "PreCommit_Notify");
1193
1194 /* Preflight for any pending listen/unlisten actions */
1196
1197 if (pendingActions != NULL)
1198 {
1199 /* Ensure we have a local channel table */
1201 /* Create pendingListenActions hash table for this transaction */
1203
1204 /* Stage all the actions this transaction wants to perform */
1205 foreach(p, pendingActions->actions)
1206 {
1208
1209 switch (actrec->action)
1210 {
1211 case LISTEN_LISTEN:
1214 break;
1215 case LISTEN_UNLISTEN:
1217 break;
1220 break;
1221 }
1222 }
1223 }
1224
1225 /* Queue any pending notifies (must happen after the above) */
1226 if (pendingNotifies)
1227 {
1229 bool firstIteration = true;
1230
1231 /*
1232 * Build list of unique channel names being notified for use by
1233 * SignalBackends().
1234 *
1235 * If uniqueChannelHash is available, use it to efficiently get the
1236 * unique channels. Otherwise, fall back to the O(N^2) approach.
1237 */
1240 {
1241 HASH_SEQ_STATUS status;
1243
1245 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1248 channelEntry->channel);
1249 }
1250 else
1251 {
1252 /* O(N^2) approach is better for small number of notifications */
1254 {
1255 char *channel = n->data;
1256 bool found = false;
1257
1258 /* Name present in list? */
1260 {
1261 if (strcmp(oldchan, channel) == 0)
1262 {
1263 found = true;
1264 break;
1265 }
1266 }
1267 /* Add if not already in list */
1268 if (!found)
1271 channel);
1272 }
1273 }
1274
1275 /* Preallocate workspace that will be needed by SignalBackends() */
1276 if (signalPids == NULL)
1278 MaxBackends * sizeof(int32));
1279
1280 if (signalProcnos == NULL)
1282 MaxBackends * sizeof(ProcNumber));
1283
1284 /*
1285 * Make sure that we have an XID assigned to the current transaction.
1286 * GetCurrentTransactionId is cheap if we already have an XID, but not
1287 * so cheap if we don't, and we'd prefer not to do that work while
1288 * holding NotifyQueueLock.
1289 */
1291
1292 /*
1293 * Serialize writers by acquiring a special lock that we hold till
1294 * after commit. This ensures that queue entries appear in commit
1295 * order, and in particular that there are never uncommitted queue
1296 * entries ahead of committed ones, so an uncommitted transaction
1297 * can't block delivery of deliverable notifications.
1298 *
1299 * We use a heavyweight lock so that it'll automatically be released
1300 * after either commit or abort. This also allows deadlocks to be
1301 * detected, though really a deadlock shouldn't be possible here.
1302 *
1303 * The lock is on "database 0", which is pretty ugly but it doesn't
1304 * seem worth inventing a special locktag category just for this.
1305 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1306 * used by the flatfiles mechanism.)
1307 */
1310
1311 /*
1312 * For the direct advancement optimization in SignalBackends(), we
1313 * need to ensure that no other backend can insert queue entries
1314 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1315 * heavyweight lock above provides this guarantee, since it serializes
1316 * all writers.
1317 *
1318 * Note: if the heavyweight lock were ever removed for scalability
1319 * reasons, we could achieve the same guarantee by holding
1320 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1321 * than releasing and reacquiring it for each page as we do below.
1322 */
1323
1324 /* Initialize values to a safe default in case list is empty */
1327
1328 /* Now push the notifications into the queue */
1330 while (nextNotify != NULL)
1331 {
1332 /*
1333 * Add the pending notifications to the queue. We acquire and
1334 * release NotifyQueueLock once per page, which might be overkill
1335 * but it does allow readers to get in while we're doing this.
1336 *
1337 * A full queue is very uncommon and should really not happen,
1338 * given that we have so much space available in the SLRU pages.
1339 * Nevertheless we need to deal with this possibility. Note that
1340 * when we get here we are in the process of committing our
1341 * transaction, but we have not yet committed to clog, so at this
1342 * point in time we can still roll the transaction back.
1343 */
1345 if (firstIteration)
1346 {
1348 firstIteration = false;
1349 }
1351 if (asyncQueueIsFull())
1352 ereport(ERROR,
1354 errmsg("too many notifications in the NOTIFY queue")));
1358 }
1359
1360 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1361 }
1362}
1363
1364/*
1365 * AtCommit_Notify
1366 *
1367 * This is called at transaction commit, after committing to clog.
1368 *
1369 * Apply pending listen/unlisten changes and clear transaction-local state.
1370 *
1371 * If we issued any notifications in the transaction, send signals to
1372 * listening backends (possibly including ourselves) to process them.
1373 * Also, if we filled enough queue pages with new notifies, try to
1374 * advance the queue tail pointer.
1375 */
1376void
1377AtCommit_Notify(void)
1378{
1379 /*
1380 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1381 * return as soon as possible
1382 */
1384 return;
1385
1386 if (Trace_notify)
1387 elog(DEBUG1, "AtCommit_Notify");
1388
1389 /* Apply staged listen/unlisten changes */
1391
1392 /* If no longer listening to anything, get out of listener array */
1395
1396 /*
1397 * Send signals to listening backends. We need do this only if there are
1398 * pending notifies, which were previously added to the shared queue by
1399 * PreCommit_Notify().
1400 */
1401 if (pendingNotifies != NULL)
1403
1404 /*
1405 * If it's time to try to advance the global tail pointer, do that.
1406 *
1407 * (It might seem odd to do this in the sender, when more than likely the
1408 * listeners won't yet have read the messages we just sent. However,
1409 * there's less contention if only the sender does it, and there is little
1410 * need for urgency in advancing the global tail. So this typically will
1411 * be clearing out messages that were sent some time ago.)
1412 */
1413 if (tryAdvanceTail)
1414 {
1415 tryAdvanceTail = false;
1417 }
1418
1419 /* And clean up */
1421}
1422
1423/*
1424 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1425 *
1426 * This function must make sure we are ready to catch any incoming messages.
1427 */
1428static void
1430{
1431 QueuePosition head;
1432 QueuePosition max;
1434
1435 /*
1436 * Nothing to do if we are already listening to something, nor if we
1437 * already ran this routine in this transaction.
1438 */
1440 return;
1441
1442 if (Trace_notify)
1443 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1444
1445 /*
1446 * Before registering, make sure we will unlisten before dying. (Note:
1447 * this action does not get undone if we abort later.)
1448 */
1450 {
1453 }
1454
1455 /*
1456 * This is our first LISTEN, so establish our pointer.
1457 *
1458 * We set our pointer to the global tail pointer and then move it forward
1459 * over already-committed notifications. This ensures we cannot miss any
1460 * not-yet-committed notifications. We might get a few more but that
1461 * doesn't hurt.
1462 *
1463 * In some scenarios there might be a lot of committed notifications that
1464 * have not yet been pruned away (because some backend is being lazy about
1465 * reading them). To reduce our startup time, we can look at other
1466 * backends and adopt the maximum "pos" pointer of any backend that's in
1467 * our database; any notifications it's already advanced over are surely
1468 * committed and need not be re-examined by us. (We must consider only
1469 * backends connected to our DB, because others will not have bothered to
1470 * check committed-ness of notifications in our DB.)
1471 *
1472 * We need exclusive lock here so we can look at other backends' entries
1473 * and manipulate the list links.
1474 */
1476 head = QUEUE_HEAD;
1477 max = QUEUE_TAIL;
1480 {
1482 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1483 /* Also find last listening backend before this one */
1484 if (i < MyProcNumber)
1485 prevListener = i;
1486 }
1492 /* Insert backend into list of listeners at correct position */
1494 {
1497 }
1498 else
1499 {
1502 }
1504
1505 /* Now we are listed in the global array, so remember we're listening */
1506 amRegisteredListener = true;
1507
1508 /*
1509 * Try to move our pointer forward as far as possible. This will skip
1510 * over already-committed notifications, which we want to do because they
1511 * might be quite stale. Note that we are not yet listening on anything,
1512 * so we won't deliver such notifications to our frontend. Also, although
1513 * our transaction might have executed NOTIFY, those message(s) aren't
1514 * queued yet so we won't skip them here.
1515 */
1516 if (!QUEUE_POS_EQUAL(max, head))
1518}
1519
1520/*
1521 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1522 *
1523 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1524 * an entry in localChannelTable, and pre-allocating an entry in the shared
1525 * globalChannelTable with listening=false. The listening flag will be set
1526 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1527 * will be removed.
1528 */
1529static void
1530PrepareTableEntriesForListen(const char *channel)
1531{
1533 GlobalChannelEntry *entry;
1534 bool found;
1536 PendingListenEntry *pending;
1537
1538 /*
1539 * Record in local pending hash that we want to LISTEN, overwriting any
1540 * earlier attempt to UNLISTEN.
1541 */
1542 pending = (PendingListenEntry *)
1544 pending->action = PENDING_LISTEN;
1545
1546 /*
1547 * Ensure that there is an entry for the channel in localChannelTable.
1548 * (Should this fail, we can just roll back.) If the transaction fails
1549 * after this point, we will remove the entry if appropriate during
1550 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1551 * to return TRUE; we assume nothing is going to consult that before
1552 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1553 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1554 * present to ensure they do the right things; see
1555 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1556 */
1558
1559 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1560 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1561 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1562
1563 if (!found)
1564 {
1565 /* New channel entry, so initialize it to a safe state */
1567 entry->numListeners = 0;
1568 entry->allocatedListeners = 0;
1569 }
1570
1571 /*
1572 * Create listenersArray if entry doesn't have one. It's tempting to fold
1573 * this into the !found case, but this coding allows us to cope in case
1574 * dsa_allocate() failed in an earlier attempt.
1575 */
1576 if (!DsaPointerIsValid(entry->listenersArray))
1577 {
1581 }
1582
1585
1586 /*
1587 * Check if we already have a ListenerEntry (possibly from earlier in this
1588 * transaction)
1589 */
1590 for (int i = 0; i < entry->numListeners; i++)
1591 {
1592 if (listeners[i].procNo == MyProcNumber)
1593 {
1594 /* Already have an entry; listening flag stays as-is until commit */
1596 return;
1597 }
1598 }
1599
1600 /* Need to add a new entry; grow array if necessary */
1601 if (entry->numListeners >= entry->allocatedListeners)
1602 {
1603 int new_size = entry->allocatedListeners * 2;
1606 sizeof(ListenerEntry) * new_size);
1608
1610 entry->listenersArray = new_array;
1614 }
1615
1616 listeners[entry->numListeners].procNo = MyProcNumber;
1617 listeners[entry->numListeners].listening = false; /* staged, not yet
1618 * committed */
1619 entry->numListeners++;
1620
1622}
1623
1624/*
1625 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1626 *
1627 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1628 * we're currently listening (committed or staged). We don't touch
1629 * globalChannelTable yet - the listener keeps receiving signals until
1630 * commit, when the entry is removed.
1631 */
1632static void
1633PrepareTableEntriesForUnlisten(const char *channel)
1634{
1635 PendingListenEntry *pending;
1636
1637 /*
1638 * If the channel name is not in localChannelTable, then we are neither
1639 * listening on it nor preparing to listen on it, so we don't need to
1640 * record an UNLISTEN action.
1641 */
1643 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1644 return;
1645
1646 /*
1647 * Record in local pending hash that we want to UNLISTEN, overwriting any
1648 * earlier attempt to LISTEN. Don't touch localChannelTable or
1649 * globalChannelTable yet - we keep receiving signals until commit.
1650 */
1651 pending = (PendingListenEntry *)
1653 pending->action = PENDING_UNLISTEN;
1654}
1655
1656/*
1657 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1658 *
1659 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1660 * about-to-be-listened channels in pendingListenActions.
1661 */
1662static void
1664{
1667 PendingListenEntry *pending;
1668
1669 /*
1670 * Scan localChannelTable, which will have the names of all channels that
1671 * we are listening on or have prepared to listen on. Record an UNLISTEN
1672 * action for each one, overwriting any earlier attempt to LISTEN.
1673 */
1675 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1676 {
1677 pending = (PendingListenEntry *)
1679 pending->action = PENDING_UNLISTEN;
1680 }
1681}
1682
1683/*
1684 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1685 *
1686 * Decrements numListeners, compacts the array, and frees the entry if empty.
1687 * Sets *entry_ptr to NULL if the entry was deleted.
1688 *
1689 * We could get the listeners pointer from the entry, but all callers
1690 * already have it at hand.
1691 */
1692static void
1695 int idx)
1696{
1697 GlobalChannelEntry *entry = *entry_ptr;
1698
1699 entry->numListeners--;
1700 if (idx < entry->numListeners)
1702 sizeof(ListenerEntry) * (entry->numListeners - idx));
1703
1704 if (entry->numListeners == 0)
1705 {
1708 /* tells caller not to release the entry's lock: */
1709 *entry_ptr = NULL;
1710 }
1711}
1712
1713/*
1714 * ApplyPendingListenActions
1715 *
1716 * Apply, or revert, staged listen/unlisten changes to the local and global
1717 * hash tables.
1718 */
1719static void
1721{
1723 PendingListenEntry *pending;
1724
1725 /* Quick exit if nothing to do */
1727 return;
1728
1729 /* We made a globalChannelTable before building pendingListenActions */
1730 if (globalChannelTable == NULL)
1731 elog(PANIC, "global channel table missing post-commit/abort");
1732
1733 /* For each staged action ... */
1735 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1736 {
1738 GlobalChannelEntry *entry;
1739 bool removeLocal = true;
1740 bool foundListener = false;
1741
1742 /*
1743 * Find the global entry for this channel. If isCommit, it had better
1744 * exist (it was created in PreCommit). In an abort, it might not
1745 * exist, in which case we are not listening and should discard any
1746 * local entry that PreCommit may have managed to create.
1747 */
1748 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1749 entry = dshash_find(globalChannelTable, &key, true);
1750 if (entry != NULL)
1751 {
1752 /* Scan entry to find the ListenerEntry for this backend */
1754
1757
1758 for (int i = 0; i < entry->numListeners; i++)
1759 {
1760 if (listeners[i].procNo != MyProcNumber)
1761 continue;
1762 foundListener = true;
1763 if (isCommit)
1764 {
1765 if (pending->action == PENDING_LISTEN)
1766 {
1767 /*
1768 * LISTEN being committed: set listening=true.
1769 * localChannelTable entry was created during
1770 * PreCommit and should be kept.
1771 */
1772 listeners[i].listening = true;
1773 removeLocal = false;
1774 }
1775 else
1776 {
1777 /*
1778 * UNLISTEN being committed: remove pre-allocated
1779 * entries from both tables.
1780 */
1782 }
1783 }
1784 else
1785 {
1786 /*
1787 * Note: this part is reachable only if the transaction
1788 * aborts after PreCommit_Notify() has made some
1789 * pendingListenActions entries, so it's pretty hard to
1790 * test.
1791 */
1792 if (!listeners[i].listening)
1793 {
1794 /*
1795 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1796 * and we weren't listening before, so remove
1797 * pre-allocated entries from both tables.
1798 */
1800 }
1801 else
1802 {
1803 /*
1804 * We're aborting, but the previous state was that
1805 * we're listening, so keep localChannelTable entry.
1806 */
1807 removeLocal = false;
1808 }
1809 }
1810 break; /* there shouldn't be another match */
1811 }
1812
1813 /* We might have already released the entry by removing it */
1814 if (entry != NULL)
1816 }
1817
1818 /*
1819 * If we're committing a LISTEN action, we should have found a
1820 * matching ListenerEntry, but otherwise it's okay if we didn't.
1821 */
1822 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1823 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1824 pending->channel, MyProcNumber);
1825
1826 /*
1827 * If we did not find a globalChannelTable entry for our backend, or
1828 * if we are unlistening, remove any localChannelTable entry that may
1829 * exist. (Note in particular that this cleans up if we created a
1830 * localChannelTable entry and then failed while trying to create a
1831 * globalChannelTable entry.)
1832 */
1835 HASH_REMOVE, NULL);
1836 }
1837}
1838
1839/*
1840 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1841 *
1842 * Remove this backend from all channels in the shared global table.
1843 */
1844static void
1846{
1847 dshash_seq_status status;
1848 GlobalChannelEntry *entry;
1849
1850 if (Trace_notify)
1851 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1852
1853 /* Clear our local cache (not really necessary, but be consistent) */
1854 if (localChannelTable != NULL)
1855 {
1858 }
1859
1860 /* Now remove our entries from the shared globalChannelTable */
1861 if (globalChannelTable == NULL)
1862 return;
1863
1864 dshash_seq_init(&status, globalChannelTable, true);
1865 while ((entry = dshash_seq_next(&status)) != NULL)
1866 {
1868
1869 if (entry->key.dboid != MyDatabaseId)
1870 continue; /* not relevant */
1871
1874
1875 for (int i = 0; i < entry->numListeners; i++)
1876 {
1877 if (listeners[i].procNo == MyProcNumber)
1878 {
1879 entry->numListeners--;
1880 if (i < entry->numListeners)
1881 memmove(&listeners[i], &listeners[i + 1],
1882 sizeof(ListenerEntry) * (entry->numListeners - i));
1883
1884 if (entry->numListeners == 0)
1885 {
1887 dshash_delete_current(&status);
1888 }
1889 break;
1890 }
1891 }
1892 }
1893 dshash_seq_term(&status);
1894}
1895
1896/*
1897 * Test whether we are actively listening on the given channel name.
1898 *
1899 * Note: this function is executed for every notification found in the queue.
1900 */
1901static bool
1902IsListeningOn(const char *channel)
1903{
1904 if (localChannelTable == NULL)
1905 return false;
1906
1907 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1908}
1909
1910/*
1911 * Remove our entry from the listeners array when we are no longer listening
1912 * on any channel. NB: must not fail if we're already not listening.
1913 */
1914static void
1916{
1917 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1918
1919 if (!amRegisteredListener) /* nothing to do */
1920 return;
1921
1922 /*
1923 * Need exclusive lock here to manipulate list links.
1924 */
1926 /* Mark our entry as invalid */
1931 /* and remove it from the list */
1934 else
1935 {
1937 {
1939 {
1941 break;
1942 }
1943 }
1944 }
1947
1948 /* mark ourselves as no longer listed in the global array */
1949 amRegisteredListener = false;
1950}
1951
1952/*
1953 * Test whether there is room to insert more notification messages.
1954 *
1955 * Caller must hold at least shared NotifyQueueLock.
1956 */
1957static bool
1958asyncQueueIsFull(void)
1959{
1960 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1962 int64 occupied = headPage - tailPage;
1963
1965}
1966
1967/*
1968 * Advance the QueuePosition to the next entry, assuming that the current
1969 * entry is of length entryLength. If we jump to a new page the function
1970 * returns true, else false.
1971 */
1972static bool
1973asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1974{
1975 int64 pageno = QUEUE_POS_PAGE(*position);
1976 int offset = QUEUE_POS_OFFSET(*position);
1977 bool pageJump = false;
1978
1979 /*
1980 * Move to the next writing position: First jump over what we have just
1981 * written or read.
1982 */
1983 offset += entryLength;
1984 Assert(offset <= QUEUE_PAGESIZE);
1985
1986 /*
1987 * In a second step check if another entry can possibly be written to the
1988 * page. If so, stay here, we have reached the next position. If not, then
1989 * we need to move on to the next page.
1990 */
1992 {
1993 pageno++;
1994 offset = 0;
1995 pageJump = true;
1996 }
1997
1998 SET_QUEUE_POS(*position, pageno, offset);
1999 return pageJump;
2000}
2001
2002/*
2003 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
2004 */
2005static void
2007{
2008 size_t channellen = n->channel_len;
2009 size_t payloadlen = n->payload_len;
2010 int entryLength;
2011
2014
2015 /* The terminators are already included in AsyncQueueEntryEmptySize */
2018 qe->length = entryLength;
2019 qe->dboid = MyDatabaseId;
2020 qe->xid = GetCurrentTransactionId();
2021 qe->srcPid = MyProcPid;
2022 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2023}
2024
2025/*
2026 * Add pending notifications to the queue.
2027 *
2028 * We go page by page here, i.e. we stop once we have to go to a new page but
2029 * we will be called again and then fill that next page. If an entry does not
2030 * fit into the current page, we write a dummy entry with an InvalidOid as the
2031 * database OID in order to fill the page. So every page is always used up to
2032 * the last byte which simplifies reading the page later.
2033 *
2034 * We are passed the list cell (in pendingNotifies->events) containing the next
2035 * notification to write and return the first still-unwritten cell back.
2036 * Eventually we will return NULL indicating all is done.
2037 *
2038 * We are holding NotifyQueueLock already from the caller and grab
2039 * page specific SLRU bank lock locally in this function.
2040 */
2041static ListCell *
2043{
2046 int64 pageno;
2047 int offset;
2048 int slotno;
2050
2051 /*
2052 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2053 * memory upon exiting. The reason for this is that if we have to advance
2054 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2055 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2056 * subsequent insertions would try to put entries into a page that slru.c
2057 * thinks doesn't exist yet.) So, use a local position variable. Note
2058 * that if we do fail, any already-inserted queue entries are forgotten;
2059 * this is okay, since they'd be useless anyway after our transaction
2060 * rolls back.
2061 */
2063
2064 /*
2065 * If this is the first write since the postmaster started, we need to
2066 * initialize the first page of the async SLRU. Otherwise, the current
2067 * page should be initialized already, so just fetch it.
2068 */
2069 pageno = QUEUE_POS_PAGE(queue_head);
2071
2072 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2074
2077 else
2078 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2079
2080 /* Note we mark the page dirty before writing in it */
2081 NotifyCtl->shared->page_dirty[slotno] = true;
2082
2083 while (nextNotify != NULL)
2084 {
2086
2087 /* Construct a valid queue entry in local variable qe */
2089
2090 offset = QUEUE_POS_OFFSET(queue_head);
2091
2092 /* Check whether the entry really fits on the current page */
2093 if (offset + qe.length <= QUEUE_PAGESIZE)
2094 {
2095 /* OK, so advance nextNotify past this item */
2097 }
2098 else
2099 {
2100 /*
2101 * Write a dummy entry to fill up the page. Actually readers will
2102 * only check dboid and since it won't match any reader's database
2103 * OID, they will ignore this entry and move on.
2104 */
2105 qe.length = QUEUE_PAGESIZE - offset;
2106 qe.dboid = InvalidOid;
2108 qe.data[0] = '\0'; /* empty channel */
2109 qe.data[1] = '\0'; /* empty payload */
2110 }
2111
2112 /* Now copy qe into the shared buffer page */
2113 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2114 &qe,
2115 qe.length);
2116
2117 /* Advance queue_head appropriately, and detect if page is full */
2118 if (asyncQueueAdvance(&(queue_head), qe.length))
2119 {
2120 LWLock *lock;
2121
2122 pageno = QUEUE_POS_PAGE(queue_head);
2123 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2124 if (lock != prevlock)
2125 {
2128 prevlock = lock;
2129 }
2130
2131 /*
2132 * Page is full, so we're done here, but first fill the next page
2133 * with zeroes. The reason to do this is to ensure that slru.c's
2134 * idea of the head page is always the same as ours, which avoids
2135 * boundary problems in SimpleLruTruncate. The test in
2136 * asyncQueueIsFull() ensured that there is room to create this
2137 * page without overrunning the queue.
2138 */
2140
2141 /*
2142 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2143 * set flag to remember that we should try to advance the tail
2144 * pointer (we don't want to actually do that right here).
2145 */
2147 tryAdvanceTail = true;
2148
2149 /* And exit the loop */
2150 break;
2151 }
2152 }
2153
2154 /* Success, so update the global QUEUE_HEAD */
2156
2158
2159 return nextNotify;
2160}
2161
2162/*
2163 * SQL function to return the fraction of the notification queue currently
2164 * occupied.
2165 */
2166Datum
2168{
2169 double usage;
2170
2171 /* Advance the queue tail so we don't report a too-large result */
2173
2177
2179}
2180
2181/*
2182 * Return the fraction of the queue that is currently occupied.
2183 *
2184 * The caller must hold NotifyQueueLock in (at least) shared mode.
2185 *
2186 * Note: we measure the distance to the logical tail page, not the physical
2187 * tail page. In some sense that's wrong, but the relative position of the
2188 * physical tail is affected by details such as SLRU segment boundaries,
2189 * so that a result based on that is unpleasantly unstable.
2190 */
2191static double
2192asyncQueueUsage(void)
2193{
2194 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2196 int64 occupied = headPage - tailPage;
2197
2198 if (occupied == 0)
2199 return (double) 0; /* fast exit for common case */
2200
2201 return (double) occupied / (double) max_notify_queue_pages;
2202}
2203
2204/*
2205 * Check whether the queue is at least half full, and emit a warning if so.
2206 *
2207 * This is unlikely given the size of the queue, but possible.
2208 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2209 *
2210 * Caller must hold exclusive NotifyQueueLock.
2211 */
2212static void
2214{
2215 double fillDegree;
2216 TimestampTz t;
2217
2219 if (fillDegree < 0.5)
2220 return;
2221
2222 t = GetCurrentTimestamp();
2223
2226 {
2229
2231 {
2233 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2236 }
2237
2239 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2240 (minPid != InvalidPid ?
2241 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2242 : 0),
2243 (minPid != InvalidPid ?
2244 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2245 : 0)));
2246
2248 }
2249}
2250
2251/*
2252 * Send signals to listening backends.
2253 *
2254 * Normally we signal only backends that are interested in the notifies that
2255 * we just sent. However, that will leave idle listeners falling further and
2256 * further behind. Waken them anyway if they're far enough behind, so they'll
2257 * advance their queue position pointers, allowing the global tail to advance.
2258 *
2259 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2260 *
2261 * This is called during CommitTransaction(), so it's important for it
2262 * to have very low probability of failure.
2263 */
2264static void
2265SignalBackends(void)
2266{
2267 int count;
2268
2269 /* Can't get here without PreCommit_Notify having made the global table */
2271
2272 /* It should have set up these arrays, too */
2274
2275 /*
2276 * Identify backends that we need to signal. We don't want to send
2277 * signals while holding the NotifyQueueLock, so this part just builds a
2278 * list of target PIDs in signalPids[] and signalProcnos[].
2279 */
2280 count = 0;
2281
2283
2284 /* Scan each channel name that we notified in this transaction */
2286 {
2288 GlobalChannelEntry *entry;
2290
2291 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2292 entry = dshash_find(globalChannelTable, &key, false);
2293 if (entry == NULL)
2294 continue; /* nobody is listening */
2295
2297 entry->listenersArray);
2298
2299 /* Identify listeners that now need waking, add them to arrays */
2300 for (int j = 0; j < entry->numListeners; j++)
2301 {
2302 ProcNumber i;
2303 int32 pid;
2304 QueuePosition pos;
2305
2306 if (!listeners[j].listening)
2307 continue; /* ignore not-yet-committed listeners */
2308
2309 i = listeners[j].procNo;
2310
2312 continue; /* already signaled, no need to repeat */
2313
2314 pid = QUEUE_BACKEND_PID(i);
2315 pos = QUEUE_BACKEND_POS(i);
2316
2317 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2318 continue; /* it's fully caught up already */
2319
2320 Assert(pid != InvalidPid);
2321
2323 signalPids[count] = pid;
2324 signalProcnos[count] = i;
2325 count++;
2326 }
2327
2329 }
2330
2331 /*
2332 * Scan all listeners. Any that are not already pending wakeup must not
2333 * be interested in our notifications (else we'd have set their wakeup
2334 * flags above). Check to see if we can directly advance their queue
2335 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2336 * them anyway so they will catch up.
2337 */
2339 {
2340 int32 pid;
2341 QueuePosition pos;
2342
2344 continue;
2345
2346 /* If it's currently advancing, we should not touch it */
2348 continue;
2349
2350 pid = QUEUE_BACKEND_PID(i);
2351 pos = QUEUE_BACKEND_POS(i);
2352
2353 /*
2354 * We can directly advance the other backend's queue pointer if it's
2355 * not currently advancing (else there are race conditions), and its
2356 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2357 * it miss some older messages), and we'd not be moving the pointer
2358 * backward.
2359 */
2362 {
2363 /* We can directly advance its pointer past what we wrote */
2365 }
2368 {
2369 /* It's idle and far behind, so wake it up */
2370 Assert(pid != InvalidPid);
2371
2373 signalPids[count] = pid;
2374 signalProcnos[count] = i;
2375 count++;
2376 }
2377 }
2378
2380
2381 /* Now send signals */
2382 for (int i = 0; i < count; i++)
2383 {
2384 int32 pid = signalPids[i];
2385
2386 /*
2387 * If we are signaling our own process, no need to involve the kernel;
2388 * just set the flag directly.
2389 */
2390 if (pid == MyProcPid)
2391 {
2393 continue;
2394 }
2395
2396 /*
2397 * Note: assuming things aren't broken, a signal failure here could
2398 * only occur if the target backend exited since we released
2399 * NotifyQueueLock; which is unlikely but certainly possible. So we
2400 * just log a low-level debug message if it happens.
2401 */
2403 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2404 }
2405}
2406
2407/*
2408 * AtAbort_Notify
2409 *
2410 * This is called at transaction abort.
2411 *
2412 * Revert any staged listen/unlisten changes and clean up transaction state.
2413 * This only does anything if we abort after PreCommit_Notify has staged
2414 * some entries.
2415 */
2416void
2417AtAbort_Notify(void)
2418{
2419 /* Revert staged listen/unlisten changes */
2421
2422 /* If we're no longer listening on anything, unregister */
2425
2426 /* And clean up */
2428}
2429
2430/*
2431 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2432 *
2433 * Reassign all items in the pending lists to the parent transaction.
2434 */
2435void
2437{
2438 int my_level = GetCurrentTransactionNestLevel();
2439
2440 /* If there are actions at our nesting level, we must reparent them. */
2441 if (pendingActions != NULL &&
2442 pendingActions->nestingLevel >= my_level)
2443 {
2444 if (pendingActions->upper == NULL ||
2445 pendingActions->upper->nestingLevel < my_level - 1)
2446 {
2447 /* nothing to merge; give the whole thing to the parent */
2449 }
2450 else
2451 {
2453
2455
2456 /*
2457 * Mustn't try to eliminate duplicates here --- see queue_listen()
2458 */
2461 childPendingActions->actions);
2463 }
2464 }
2465
2466 /* If there are notifies at our nesting level, we must reparent them. */
2467 if (pendingNotifies != NULL &&
2468 pendingNotifies->nestingLevel >= my_level)
2469 {
2470 Assert(pendingNotifies->nestingLevel == my_level);
2471
2472 if (pendingNotifies->upper == NULL ||
2473 pendingNotifies->upper->nestingLevel < my_level - 1)
2474 {
2475 /* nothing to merge; give the whole thing to the parent */
2477 }
2478 else
2479 {
2480 /*
2481 * Formerly, we didn't bother to eliminate duplicates here, but
2482 * now we must, else we fall foul of "Assert(!found)", either here
2483 * or during a later attempt to build the parent-level hashtable.
2484 */
2486 ListCell *l;
2487
2489 /* Insert all the subxact's events into parent, except for dups */
2490 foreach(l, childPendingNotifies->events)
2491 {
2493
2496 }
2498 }
2499 }
2500}
2501
2502/*
2503 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2504 */
2505void
2507{
2508 int my_level = GetCurrentTransactionNestLevel();
2509
2510 /*
2511 * All we have to do is pop the stack --- the actions/notifies made in
2512 * this subxact are no longer interesting, and the space will be freed
2513 * when CurTransactionContext is recycled. We still have to free the
2514 * ActionList and NotificationList objects themselves, though, because
2515 * those are allocated in TopTransactionContext.
2516 *
2517 * Note that there might be no entries at all, or no entries for the
2518 * current subtransaction level, either because none were ever created, or
2519 * because we reentered this routine due to trouble during subxact abort.
2520 */
2521 while (pendingActions != NULL &&
2522 pendingActions->nestingLevel >= my_level)
2523 {
2525
2528 }
2529
2530 while (pendingNotifies != NULL &&
2531 pendingNotifies->nestingLevel >= my_level)
2532 {
2534
2537 }
2538}
2539
2540/*
2541 * HandleNotifyInterrupt
2542 *
2543 * Signal handler portion of interrupt handling. Let the backend know
2544 * that there's a pending notify interrupt. If we're currently reading
2545 * from the client, this will interrupt the read and
2546 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2547 */
2548void
2550{
2551 /*
2552 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2553 * you do here.
2554 */
2555
2556 /* signal that work needs to be done */
2558
2559 /* latch will be set by procsignal_sigusr1_handler */
2560}
2561
2562/*
2563 * ProcessNotifyInterrupt
2564 *
2565 * This is called if we see notifyInterruptPending set, just before
2566 * transmitting ReadyForQuery at the end of a frontend command, and
2567 * also if a notify signal occurs while reading from the frontend.
2568 * HandleNotifyInterrupt() will cause the read to be interrupted
2569 * via the process's latch, and this routine will get called.
2570 * If we are truly idle (ie, *not* inside a transaction block),
2571 * process the incoming notifies.
2572 *
2573 * If "flush" is true, force any frontend messages out immediately.
2574 * This can be false when being called at the end of a frontend command,
2575 * since we'll flush after sending ReadyForQuery.
2576 */
2577void
2578ProcessNotifyInterrupt(bool flush)
2579{
2581 return; /* not really idle */
2582
2583 /* Loop in case another signal arrives while sending messages */
2585 ProcessIncomingNotify(flush);
2586}
2587
2588
2589/*
2590 * Read all pending notifications from the queue, and deliver appropriate
2591 * ones to my frontend. Stop when we reach queue head or an uncommitted
2592 * notification.
2593 */
2594static void
2596{
2597 QueuePosition pos;
2598 QueuePosition head;
2599 Snapshot snapshot;
2600
2601 /*
2602 * Fetch current state, indicate to others that we have woken up, and that
2603 * we are in process of advancing our position.
2604 */
2606 /* Assert checks that we have a valid state entry */
2610 head = QUEUE_HEAD;
2611
2612 if (QUEUE_POS_EQUAL(pos, head))
2613 {
2614 /* Nothing to do, we have read all notifications already. */
2616 return;
2617 }
2618
2621
2622 /*----------
2623 * Get snapshot we'll use to decide which xacts are still in progress.
2624 * This is trickier than it might seem, because of race conditions.
2625 * Consider the following example:
2626 *
2627 * Backend 1: Backend 2:
2628 *
2629 * transaction starts
2630 * UPDATE foo SET ...;
2631 * NOTIFY foo;
2632 * commit starts
2633 * queue the notify message
2634 * transaction starts
2635 * LISTEN foo; -- first LISTEN in session
2636 * SELECT * FROM foo WHERE ...;
2637 * commit to clog
2638 * commit starts
2639 * add backend 2 to array of listeners
2640 * advance to queue head (this code)
2641 * commit to clog
2642 *
2643 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2644 * wasn't committed yet. Ideally we'd ensure that client 2 would
2645 * eventually get transaction 1's notify message, but there's no way
2646 * to do that; until we're in the listener array, there's no guarantee
2647 * that the notify message doesn't get removed from the queue.
2648 *
2649 * Therefore the coding technique transaction 2 is using is unsafe:
2650 * applications must commit a LISTEN before inspecting database state,
2651 * if they want to ensure they will see notifications about subsequent
2652 * changes to that state.
2653 *
2654 * What we do guarantee is that we'll see all notifications from
2655 * transactions committing after the snapshot we take here.
2656 * BecomeRegisteredListener has already added us to the listener array,
2657 * so no not-yet-committed messages can be removed from the queue
2658 * before we see them.
2659 *----------
2660 */
2661 snapshot = RegisterSnapshot(GetLatestSnapshot());
2662
2663 /*
2664 * It is possible that we fail while trying to send a message to our
2665 * frontend (for example, because of encoding conversion failure). If
2666 * that happens it is critical that we not try to send the same message
2667 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2668 * ERRORs to FATAL, causing the client connection to be closed on error.
2669 *
2670 * We used to only skip over the offending message and try to soldier on,
2671 * but it was somewhat questionable to lose a notification and give the
2672 * client an ERROR instead. A client application is not be prepared for
2673 * that and can't tell that a notification was missed. It was also not
2674 * very useful in practice because notifications are often processed while
2675 * a connection is idle and reading a message from the client, and in that
2676 * state, any error is upgraded to FATAL anyway. Closing the connection
2677 * is a clear signal to the application that it might have missed
2678 * notifications.
2679 */
2680 {
2682 bool reachedStop;
2683
2684 ExitOnAnyError = true;
2685
2686 do
2687 {
2688 /*
2689 * Process messages up to the stop position, end of page, or an
2690 * uncommitted message.
2691 *
2692 * Our stop position is what we found to be the head's position
2693 * when we entered this function. It might have changed already.
2694 * But if it has, we will receive (or have already received and
2695 * queued) another signal and come here again.
2696 *
2697 * We are not holding NotifyQueueLock here! The queue can only
2698 * extend beyond the head pointer (see above) and we leave our
2699 * backend's pointer where it is so nobody will truncate or
2700 * rewrite pages under us. Especially we don't want to hold a lock
2701 * while sending the notifications to the frontend.
2702 */
2703 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2704 } while (!reachedStop);
2705
2706 /* Update shared state */
2711
2713 }
2714
2715 /* Done with snapshot */
2716 UnregisterSnapshot(snapshot);
2717}
2718
2719/*
2720 * Fetch notifications from the shared queue, beginning at position current,
2721 * and deliver relevant ones to my frontend.
2722 *
2723 * The function returns true once we have reached the stop position or an
2724 * uncommitted notification, and false if we have finished with the page.
2725 * In other words: once it returns true there is no need to look further.
2726 * The QueuePosition *current is advanced past all processed messages.
2727 */
2728static bool
2730 QueuePosition stop,
2731 Snapshot snapshot)
2732{
2733 int64 curpage = QUEUE_POS_PAGE(*current);
2734 int slotno;
2735 char *page_buffer;
2736 bool reachedStop = false;
2737 bool reachedEndOfPage;
2738
2739 /*
2740 * We copy the entries into a local buffer to avoid holding the SLRU lock
2741 * while we transmit them to our frontend. The local buffer must be
2742 * adequately aligned.
2743 */
2745 char *local_buf_end = local_buf;
2746
2748 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2749
2750 do
2751 {
2752 QueuePosition thisentry = *current;
2754
2755 if (QUEUE_POS_EQUAL(thisentry, stop))
2756 break;
2757
2758 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2759
2760 /*
2761 * Advance *current over this message, possibly to the next page. As
2762 * noted in the comments for asyncQueueReadAllNotifications, we must
2763 * do this before possibly failing while processing the message.
2764 */
2765 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2766
2767 /* Ignore messages destined for other databases */
2768 if (qe->dboid == MyDatabaseId)
2769 {
2770 if (XidInMVCCSnapshot(qe->xid, snapshot))
2771 {
2772 /*
2773 * The source transaction is still in progress, so we can't
2774 * process this message yet. Break out of the loop, but first
2775 * back up *current so we will reprocess the message next
2776 * time. (Note: it is unlikely but not impossible for
2777 * TransactionIdDidCommit to fail, so we can't really avoid
2778 * this advance-then-back-up behavior when dealing with an
2779 * uncommitted message.)
2780 *
2781 * Note that we must test XidInMVCCSnapshot before we test
2782 * TransactionIdDidCommit, else we might return a message from
2783 * a transaction that is not yet visible to snapshots; compare
2784 * the comments at the head of heapam_visibility.c.
2785 *
2786 * Also, while our own xact won't be listed in the snapshot,
2787 * we need not check for TransactionIdIsCurrentTransactionId
2788 * because our transaction cannot (yet) have queued any
2789 * messages.
2790 */
2791 *current = thisentry;
2792 reachedStop = true;
2793 break;
2794 }
2795
2796 /*
2797 * Quick check for the case that we're not listening on any
2798 * channels, before calling TransactionIdDidCommit(). This makes
2799 * that case a little faster, but more importantly, it ensures
2800 * that if there's a bad entry in the queue for which
2801 * TransactionIdDidCommit() fails for some reason, we can skip
2802 * over it on the first LISTEN in a session, and not get stuck on
2803 * it indefinitely. (This is a little trickier than it looks: it
2804 * works because BecomeRegisteredListener runs this code before we
2805 * have made the first entry in localChannelTable.)
2806 */
2808 continue;
2809
2810 if (TransactionIdDidCommit(qe->xid))
2811 {
2812 memcpy(local_buf_end, qe, qe->length);
2813 local_buf_end += qe->length;
2814 }
2815 else
2816 {
2817 /*
2818 * The source transaction aborted or crashed, so we just
2819 * ignore its notifications.
2820 */
2821 }
2822 }
2823
2824 /* Loop back if we're not at end of page */
2825 } while (!reachedEndOfPage);
2826
2827 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2829
2830 /*
2831 * Now that we have let go of the SLRU bank lock, send the notifications
2832 * to our backend
2833 */
2835 for (char *p = local_buf; p < local_buf_end;)
2836 {
2838
2839 /* qe->data is the null-terminated channel name */
2840 char *channel = qe->data;
2841
2842 if (IsListeningOn(channel))
2843 {
2844 /* payload follows channel name */
2845 char *payload = qe->data + strlen(channel) + 1;
2846
2847 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2848 }
2849
2850 p += qe->length;
2851 }
2852
2853 if (QUEUE_POS_EQUAL(*current, stop))
2854 reachedStop = true;
2855
2856 return reachedStop;
2857}
2858
2859/*
2860 * Advance the shared queue tail variable to the minimum of all the
2861 * per-backend tail pointers. Truncate pg_notify space if possible.
2862 *
2863 * This is (usually) called during CommitTransaction(), so it's important for
2864 * it to have very low probability of failure.
2865 */
2866static void
2868{
2869 QueuePosition min;
2872 int64 boundary;
2873
2874 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2876
2877 /*
2878 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2879 * (ie, exactly match at least one backend's queue position), so it must
2880 * be updated atomically with the actual computation. Since v13, we could
2881 * get away with not doing it like that, but it seems prudent to keep it
2882 * so.
2883 *
2884 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2885 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2886 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2887 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2888 * there are pages we can truncate but haven't yet finished doing so.
2889 *
2890 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2891 * performing SimpleLruTruncate. This is OK because no backend will try
2892 * to access the pages we are in the midst of truncating.
2893 */
2895 min = QUEUE_HEAD;
2897 {
2899 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2900 }
2901 QUEUE_TAIL = min;
2904
2905 /*
2906 * We can truncate something if the global tail advanced across an SLRU
2907 * segment boundary.
2908 *
2909 * XXX it might be better to truncate only once every several segments, to
2910 * reduce the number of directory scans.
2911 */
2914 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2915 {
2916 /*
2917 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2918 * release the lock again.
2919 */
2921
2925 }
2926
2928}
2929
2930/*
2931 * AsyncNotifyFreezeXids
2932 *
2933 * Prepare the async notification queue for CLOG truncation by freezing
2934 * transaction IDs that are about to become inaccessible.
2935 *
2936 * This function is called by VACUUM before advancing datfrozenxid. It scans
2937 * the notification queue and replaces XIDs that would become inaccessible
2938 * after CLOG truncation with special markers:
2939 * - Committed transactions are set to FrozenTransactionId
2940 * - Aborted/crashed transactions are set to InvalidTransactionId
2941 *
2942 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2943 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2944 * (or it would have held back newFrozenXid through ProcArray).
2945 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2946 * either aborted explicitly or crashed, and we can safely mark it invalid.
2947 */
2948void
2950{
2951 QueuePosition pos;
2952 QueuePosition head;
2953 int64 curpage = -1;
2954 int slotno = -1;
2955 char *page_buffer = NULL;
2956 bool page_dirty = false;
2957
2958 /*
2959 * Acquire locks in the correct order to avoid deadlocks. As per the
2960 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2961 * bank locks.
2962 *
2963 * We only need SHARED mode since we're just reading the head/tail
2964 * positions, not modifying them.
2965 */
2968
2969 pos = QUEUE_TAIL;
2970 head = QUEUE_HEAD;
2971
2972 /* Release NotifyQueueLock early, we only needed to read the positions */
2974
2975 /*
2976 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2977 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2978 * we're working.
2979 */
2980 while (!QUEUE_POS_EQUAL(pos, head))
2981 {
2983 TransactionId xid;
2984 int64 pageno = QUEUE_POS_PAGE(pos);
2985 int offset = QUEUE_POS_OFFSET(pos);
2986
2987 /* If we need a different page, release old lock and get new one */
2988 if (pageno != curpage)
2989 {
2990 LWLock *lock;
2991
2992 /* Release previous page if any */
2993 if (slotno >= 0)
2994 {
2995 if (page_dirty)
2996 {
2997 NotifyCtl->shared->page_dirty[slotno] = true;
2998 page_dirty = false;
2999 }
3001 }
3002
3003 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3005 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3006 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3007 curpage = pageno;
3008 }
3009
3010 qe = (AsyncQueueEntry *) (page_buffer + offset);
3011 xid = qe->xid;
3012
3013 if (TransactionIdIsNormal(xid) &&
3015 {
3016 if (TransactionIdDidCommit(xid))
3017 {
3018 qe->xid = FrozenTransactionId;
3019 page_dirty = true;
3020 }
3021 else
3022 {
3023 qe->xid = InvalidTransactionId;
3024 page_dirty = true;
3025 }
3026 }
3027
3028 /* Advance to next entry */
3029 asyncQueueAdvance(&pos, qe->length);
3030 }
3031
3032 /* Release final page lock if we acquired one */
3033 if (slotno >= 0)
3034 {
3035 if (page_dirty)
3036 NotifyCtl->shared->page_dirty[slotno] = true;
3038 }
3039
3041}
3042
3043/*
3044 * ProcessIncomingNotify
3045 *
3046 * Scan the queue for arriving notifications and report them to the front
3047 * end. The notifications might be from other sessions, or our own;
3048 * there's no need to distinguish here.
3049 *
3050 * If "flush" is true, force any frontend messages out immediately.
3051 *
3052 * NOTE: since we are outside any transaction, we must create our own.
3053 */
3054static void
3055ProcessIncomingNotify(bool flush)
3056{
3057 /* We *must* reset the flag */
3058 notifyInterruptPending = false;
3059
3060 /* Do nothing else if we aren't actively listening */
3062 return;
3063
3064 if (Trace_notify)
3065 elog(DEBUG1, "ProcessIncomingNotify");
3066
3067 set_ps_display("notify interrupt");
3068
3069 /*
3070 * We must run asyncQueueReadAllNotifications inside a transaction, else
3071 * bad things happen if it gets an error.
3072 */
3074
3076
3078
3079 /*
3080 * If this isn't an end-of-command case, we must flush the notify messages
3081 * to ensure frontend gets them promptly.
3082 */
3083 if (flush)
3084 pq_flush();
3085
3086 set_ps_display("idle");
3087
3088 if (Trace_notify)
3089 elog(DEBUG1, "ProcessIncomingNotify: done");
3090}
3091
3092/*
3093 * Send NOTIFY message to my front end.
3094 */
3095void
3096NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3097{
3099 {
3101
3103 pq_sendint32(&buf, srcPid);
3104 pq_sendstring(&buf, channel);
3105 pq_sendstring(&buf, payload);
3107
3108 /*
3109 * NOTE: we do not do pq_flush() here. Some level of caller will
3110 * handle it later, allowing this message to be combined into a packet
3111 * with other ones.
3112 */
3113 }
3114 else
3115 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3116}
3117
3118/* Does pendingNotifies include a match for the given event? */
3119static bool
3121{
3122 if (pendingNotifies == NULL)
3123 return false;
3124
3126 {
3127 /* Use the hash table to probe for a match */
3129 &n,
3130 HASH_FIND,
3131 NULL))
3132 return true;
3133 }
3134 else
3135 {
3136 /* Must scan the event list */
3137 ListCell *l;
3138
3139 foreach(l, pendingNotifies->events)
3140 {
3142
3143 if (n->channel_len == oldn->channel_len &&
3144 n->payload_len == oldn->payload_len &&
3145 memcmp(n->data, oldn->data,
3146 n->channel_len + n->payload_len + 2) == 0)
3147 return true;
3148 }
3149 }
3150
3151 return false;
3152}
3153
3154/*
3155 * Add a notification event to a pre-existing pendingNotifies list.
3156 *
3157 * Because pendingNotifies->events is already nonempty, this works
3158 * correctly no matter what CurrentMemoryContext is.
3159 */
3160static void
3162{
3164
3165 /* Create the hash tables if it's time to */
3168 {
3170 ListCell *l;
3171
3172 /* Create the hash table */
3173 hash_ctl.keysize = sizeof(Notification *);
3174 hash_ctl.entrysize = sizeof(struct NotificationHash);
3179 hash_create("Pending Notifies",
3180 256L,
3181 &hash_ctl,
3183
3184 /* Create the unique channel name table */
3186 hash_ctl.keysize = NAMEDATALEN;
3187 hash_ctl.entrysize = sizeof(ChannelName);
3190 hash_create("Pending Notify Channel Names",
3191 64L,
3192 &hash_ctl,
3194
3195 /* Insert all the already-existing events */
3196 foreach(l, pendingNotifies->events)
3197 {
3199 char *channel = oldn->data;
3200 bool found;
3201
3203 &oldn,
3204 HASH_ENTER,
3205 &found);
3206 Assert(!found);
3207
3208 /* Add channel name to uniqueChannelHash; might be there already */
3210 channel,
3211 HASH_ENTER,
3212 NULL);
3213 }
3214 }
3215
3216 /* Add new event to the list, in order */
3218
3219 /* Add event to the hash tables if needed */
3221 {
3222 char *channel = n->data;
3223 bool found;
3224
3226 &n,
3227 HASH_ENTER,
3228 &found);
3229 Assert(!found);
3230
3231 /* Add channel name to uniqueChannelHash; might be there already */
3233 channel,
3234 HASH_ENTER,
3235 NULL);
3236 }
3237}
3238
3239/*
3240 * notification_hash: hash function for notification hash table
3241 *
3242 * The hash "keys" are pointers to Notification structs.
3243 */
3244static uint32
3245notification_hash(const void *key, Size keysize)
3246{
3247 const Notification *k = *(const Notification *const *) key;
3248
3249 Assert(keysize == sizeof(Notification *));
3250 /* We don't bother to include the payload's trailing null in the hash */
3251 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3252 k->channel_len + k->payload_len + 1));
3253}
3254
3255/*
3256 * notification_match: match function to use with notification_hash
3257 */
3258static int
3259notification_match(const void *key1, const void *key2, Size keysize)
3260{
3261 const Notification *k1 = *(const Notification *const *) key1;
3262 const Notification *k2 = *(const Notification *const *) key2;
3263
3264 Assert(keysize == sizeof(Notification *));
3265 if (k1->channel_len == k2->channel_len &&
3266 k1->payload_len == k2->payload_len &&
3267 memcmp(k1->data, k2->data,
3268 k1->channel_len + k1->payload_len + 2) == 0)
3269 return 0; /* equal */
3270 return 1; /* not equal */
3271}
3272
3273/* Clear the pendingActions and pendingNotifies lists. */
3274static void
3276{
3277 /*
3278 * Everything's allocated in either TopTransactionContext or the context
3279 * for the subtransaction to which it corresponds. So, there's nothing to
3280 * do here except reset the pointers; the space will be reclaimed when the
3281 * contexts are deleted.
3282 */
3285 /* Also clear pendingListenActions, which is derived from pendingActions */
3287}
3288
3289/*
3290 * GUC check_hook for notify_buffers
3291 */
3292bool
3293check_notify_buffers(int *newval, void **extra, GucSource source)
3294{
3295 return check_slru_buffers("notify_buffers", newval);
3296}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
static void SignalBackends(void)
Definition async.c:2266
static double asyncQueueUsage(void)
Definition async.c:2193
#define MIN_HASHABLE_NOTIFIES
Definition async.c:527
static void PrepareTableEntriesForListen(const char *channel)
Definition async.c:1531
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition async.c:2007
#define QUEUE_FIRST_LISTENER
Definition async.c:361
#define QUEUE_POS_MAX(x, y)
Definition async.c:260
static bool tryAdvanceTail
Definition async.c:578
void HandleNotifyInterrupt(void)
Definition async.c:2550
static void BecomeRegisteredListener(void)
Definition async.c:1430
static void asyncQueueAdvanceTail(void)
Definition async.c:2868
int max_notify_queue_pages
Definition async.c:584
static ActionList * pendingActions
Definition async.c:458
static void ApplyPendingListenActions(bool isCommit)
Definition async.c:1721
#define QUEUE_BACKEND_IS_ADVANCING(i)
Definition async.c:367
static uint32 notification_hash(const void *key, Size keysize)
Definition async.c:3246
void Async_UnlistenAll(void)
Definition async.c:1075
static int32 * signalPids
Definition async.c:574
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition async.c:3097
void AtCommit_Notify(void)
Definition async.c:1378
#define QUEUE_POS_MIN(x, y)
Definition async.c:254
static void PrepareTableEntriesForUnlisten(const char *channel)
Definition async.c:1634
void ProcessNotifyInterrupt(bool flush)
Definition async.c:2579
ListenActionKind
Definition async.c:439
@ LISTEN_LISTEN
Definition async.c:440
@ LISTEN_UNLISTEN_ALL
Definition async.c:442
@ LISTEN_UNLISTEN
Definition async.c:441
static bool AsyncExistsPendingNotify(Notification *n)
Definition async.c:3121
#define QUEUE_BACKEND_POS(i)
Definition async.c:365
static const dshash_parameters globalChannelTableDSHParams
Definition async.c:685
#define INITIAL_LISTENERS_ARRAY_SIZE
Definition async.c:392
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition async.c:3260
static dshash_hash globalChannelTableHash(const void *key, size_t size, void *arg)
Definition async.c:672
#define SET_QUEUE_POS(x, y, z)
Definition async.c:241
static ProcNumber * signalProcnos
Definition async.c:575
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
Definition async.c:2730
static void ProcessIncomingNotify(bool flush)
Definition async.c:3056
static void asyncQueueReadAllNotifications(void)
Definition async.c:2596
static void Async_UnlistenOnExit(int code, Datum arg)
Definition async.c:1147
#define QUEUE_POS_OFFSET(x)
Definition async.c:239
static QueuePosition queueHeadAfterWrite
Definition async.c:567
static int asyncQueueErrdetailForIoError(const void *opaque_data)
Definition async.c:627
bool Trace_notify
Definition async.c:581
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition async.c:2043
static void ClearPendingActionsAndNotifies(void)
Definition async.c:3276
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Definition async.c:1095
Datum pg_notify(PG_FUNCTION_ARGS)
Definition async.c:860
static NotificationList * pendingNotifies
Definition async.c:534
#define AsyncQueueEntryEmptySize
Definition async.c:227
static void AddEventToPendingNotifies(Notification *n)
Definition async.c:3162
static AsyncQueueControl * asyncQueueControl
Definition async.c:347
static bool unlistenExitRegistered
Definition async.c:555
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition async.c:650
static dsa_area * globalChannelDSA
Definition async.c:415
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition async.c:1974
#define QUEUE_TAIL
Definition async.c:359
void AtAbort_Notify(void)
Definition async.c:2418
#define QUEUE_POS_PAGE(x)
Definition async.c:238
static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
Definition async.c:1694
void PreCommit_Notify(void)
Definition async.c:1185
#define QUEUE_CLEANUP_DELAY
Definition async.c:282
static void PrepareTableEntriesForUnlistenAll(void)
Definition async.c:1664
static void asyncQueueFillWarning(void)
Definition async.c:2214
#define QUEUE_BACKEND_PID(i)
Definition async.c:362
static SlruDesc NotifySlruDesc
Definition async.c:375
static void AsyncShmemRequest(void *arg)
Definition async.c:798
static void CleanupListenersOnExit(void)
Definition async.c:1846
static void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
Definition async.c:660
#define QUEUE_FULL_WARN_INTERVAL
Definition async.c:381
void Async_Unlisten(const char *channel)
Definition async.c:1057
static HTAB * pendingListenActions
Definition async.c:480
void Async_Listen(const char *channel)
Definition async.c:1043
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:201
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:250
static void initGlobalChannelTable(void)
Definition async.c:699
#define NotifyCtl
Definition async.c:378
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:366
static HTAB * localChannelTable
Definition async.c:422
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:640
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:996
#define QUEUEALIGN(len)
Definition async.c:225
static bool amRegisteredListener
Definition async.c:558
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:266
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:364
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:363
void AtSubAbort_Notify(void)
Definition async.c:2507
void AtPrepare_Notify(void)
Definition async.c:1160
#define QUEUE_PAGESIZE
Definition async.c:379
void AtSubCommit_Notify(void)
Definition async.c:2437
static bool asyncQueueIsFull(void)
Definition async.c:1959
#define QUEUE_HEAD
Definition async.c:358
static void AsyncShmemInit(void *arg)
Definition async.c:829
static void initLocalChannelTable(void)
Definition async.c:750
PendingListenAction
Definition async.c:469
@ PENDING_UNLISTEN
Definition async.c:471
@ PENDING_LISTEN
Definition async.c:470
static dshash_table * globalChannelTable
Definition async.c:414
static void asyncQueueUnregister(void)
Definition async.c:1916
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2168
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:247
#define LocalChannelTableIsEmpty()
Definition async.c:425
static void initPendingListenActions(void)
Definition async.c:776
static QueuePosition queueHeadBeforeWrite
Definition async.c:566
static bool IsListeningOn(const char *channel)
Definition async.c:1903
void Async_Notify(const char *channel, const char *payload)
Definition async.c:894
volatile sig_atomic_t notifyInterruptPending
Definition async.c:552
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2950
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3294
#define QUEUE_STOP_PAGE
Definition async.c:360
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1775
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
int32_t int32
Definition c.h:620
uint16_t uint16
Definition c.h:623
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:611
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:562
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:579
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:659
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:394
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:371
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:274
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:768
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:678
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:210
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:593
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:778
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
uint32 dshash_hash
Definition dshash.h:30
#define dshash_find_or_insert(hash_table, key, found)
Definition dshash.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
void hash_destroy(HTAB *hashp)
Definition dynahash.c:802
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1317
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
int errhint(const char *fmt,...) pg_attribute_printf(1
#define DEBUG3
Definition elog.h:29
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define INFO
Definition elog.h:35
#define ereport(elevel,...)
Definition elog.h:152
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:49
ProcNumber MyProcNumber
Definition globals.c:92
int MaxBackends
Definition globals.c:149
bool ExitOnAnyError
Definition globals.c:125
int notify_buffers
Definition globals.c:167
Oid MyDatabaseId
Definition globals.c:96
#define newval
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:91
@ HASH_FIND
Definition hsearch.h:108
@ HASH_REMOVE
Definition hsearch.h:110
@ HASH_ENTER
Definition hsearch.h:109
#define HASH_CONTEXT
Definition hsearch.h:97
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_COMPARE
Definition hsearch.h:94
#define HASH_FUNCTION
Definition hsearch.h:93
#define IsParallelWorker()
Definition parallel.h:62
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int j
Definition isn.c:78
int i
Definition isn.c:77
#define pq_flush()
Definition libpq.h:49
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext TopTransactionContext
Definition mcxt.c:171
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurTransactionContext
Definition mcxt.c:172
#define InvalidPid
Definition miscadmin.h:32
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static void usage(void)
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:375
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:97
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static int fb(int x)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:288
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176
bool SlruScanDirectory(SlruDesc *ctl, SlruScanCallback callback, void *data)
Definition slru.c:1844
int SimpleLruReadPage_ReadOnly(SlruDesc *ctl, int64 pageno, const void *opaque_data)
Definition slru.c:654
void SimpleLruTruncate(SlruDesc *ctl, int64 cutoffPage)
Definition slru.c:1458
int SimpleLruZeroPage(SlruDesc *ctl, int64 pageno)
Definition slru.c:397
bool SlruScanDirCbDeleteAll(SlruDesc *ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1797
int SimpleLruReadPage(SlruDesc *ctl, int64 pageno, bool write_ok, const void *opaque_data)
Definition slru.c:550
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:377
#define SimpleLruRequest(...)
Definition slru.h:218
static LWLock * SimpleLruGetBankLock(SlruDesc *ctl, int64 pageno)
Definition slru.h:207
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
List * actions
Definition async.c:454
int nestingLevel
Definition async.c:453
struct ActionList * upper
Definition async.c:455
dshash_table_handle globalChannelTableDSH
Definition async.c:342
TimestampTz lastQueueFillWarn
Definition async.c:340
dsa_handle globalChannelTableDSA
Definition async.c:341
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:221
char channel[NAMEDATALEN]
Definition async.c:542
dsa_pointer listenersArray
Definition async.c:409
int allocatedListeners
Definition async.c:411
GlobalChannelKey key
Definition async.c:408
char channel[NAMEDATALEN]
Definition async.c:397
Size keysize
Definition hsearch.h:69
Definition pg_list.h:54
bool listening
Definition async.c:403
Notification * event
Definition async.c:531
List * uniqueChannelNames
Definition async.c:522
HTAB * uniqueChannelHash
Definition async.c:523
HTAB * hashtab
Definition async.c:521
List * events
Definition async.c:520
struct NotificationList * upper
Definition async.c:524
uint16 payload_len
Definition async.c:512
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:514
uint16 channel_len
Definition async.c:511
PendingListenAction action
Definition async.c:477
char channel[NAMEDATALEN]
Definition async.c:476
int64 page
Definition async.c:234
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:446
char * text_to_cstring(const text *t)
Definition varlena.c:217
const char * name
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040
int GetCurrentTransactionNestLevel(void)
Definition xact.c:931
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207
TransactionId GetCurrentTransactionId(void)
Definition xact.c:456

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 527 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 201 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifySlruDesc)

Definition at line 378 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 363 of file async.c.

◆ QUEUE_BACKEND_IS_ADVANCING

#define QUEUE_BACKEND_IS_ADVANCING (   i)    (asyncQueueControl->backend[i].isAdvancing)

Definition at line 367 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 362 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 365 of file async.c.

◆ QUEUE_BACKEND_WAKEUP_PENDING

#define QUEUE_BACKEND_WAKEUP_PENDING (   i)    (asyncQueueControl->backend[i].wakeupPending)

Definition at line 366 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 282 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 361 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 381 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 358 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 364 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 379 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 247 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 250 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
int y
Definition isn.c:76
int x
Definition isn.c:75

Definition at line 260 of file async.c.

261 : \
262 (x).page != (y).page ? (x) : \
263 (x).offset > (y).offset ? (x) : (y))

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 254 of file async.c.

255 : \
256 (x).page != (y).page ? (y) : \
257 (x).offset < (y).offset ? (x) : (y))

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 239 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 238 of file async.c.

◆ QUEUE_POS_PRECEDES

#define QUEUE_POS_PRECEDES (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) || \
((x).page == (y).page && (x).offset < (y).offset))

Definition at line 266 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 360 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 359 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 225 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 241 of file async.c.

242 { \
243 (x).page = (y); \
244 (x).offset = (z); \
245 } while (0)

Typedef Documentation

◆ ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ ChannelName

◆ GlobalChannelEntry

◆ GlobalChannelKey

◆ ListenerEntry

◆ Notification

◆ NotificationList

◆ PendingListenEntry

◆ QueueBackendStatus

◆ QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 438 of file async.c.

◆ PendingListenAction

Enumerator
PENDING_LISTEN 
PENDING_UNLISTEN 

Definition at line 468 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 3162 of file async.c.

3163{
3165
3166 /* Create the hash tables if it's time to */
3169 {
3171 ListCell *l;
3172
3173 /* Create the hash table */
3174 hash_ctl.keysize = sizeof(Notification *);
3175 hash_ctl.entrysize = sizeof(struct NotificationHash);
3180 hash_create("Pending Notifies",
3181 256L,
3182 &hash_ctl,
3184
3185 /* Create the unique channel name table */
3187 hash_ctl.keysize = NAMEDATALEN;
3188 hash_ctl.entrysize = sizeof(ChannelName);
3191 hash_create("Pending Notify Channel Names",
3192 64L,
3193 &hash_ctl,
3195
3196 /* Insert all the already-existing events */
3197 foreach(l, pendingNotifies->events)
3198 {
3200 char *channel = oldn->data;
3201 bool found;
3202
3204 &oldn,
3205 HASH_ENTER,
3206 &found);
3207 Assert(!found);
3208
3209 /* Add channel name to uniqueChannelHash; might be there already */
3211 channel,
3212 HASH_ENTER,
3213 NULL);
3214 }
3215 }
3216
3217 /* Add new event to the list, in order */
3219
3220 /* Add event to the hash tables if needed */
3222 {
3223 char *channel = n->data;
3224 bool found;
3225
3227 &n,
3228 HASH_ENTER,
3229 &found);
3230 Assert(!found);
3231
3232 /* Add channel name to uniqueChannelHash; might be there already */
3234 channel,
3235 HASH_ENTER,
3236 NULL);
3237 }
3238}

References Assert, CurTransactionContext, Notification::data, NotificationList::events, fb(), HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), HASH_STRINGS, NotificationList::hashtab, lappend(), lfirst, list_length(), MIN_HASHABLE_NOTIFIES, NAMEDATALEN, NIL, notification_hash(), notification_match(), pendingNotifies, and NotificationList::uniqueChannelHash.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ ApplyPendingListenActions()

static void ApplyPendingListenActions ( bool  isCommit)
static

Definition at line 1721 of file async.c.

1722{
1724 PendingListenEntry *pending;
1725
1726 /* Quick exit if nothing to do */
1728 return;
1729
1730 /* We made a globalChannelTable before building pendingListenActions */
1731 if (globalChannelTable == NULL)
1732 elog(PANIC, "global channel table missing post-commit/abort");
1733
1734 /* For each staged action ... */
1736 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1737 {
1739 GlobalChannelEntry *entry;
1740 bool removeLocal = true;
1741 bool foundListener = false;
1742
1743 /*
1744 * Find the global entry for this channel. If isCommit, it had better
1745 * exist (it was created in PreCommit). In an abort, it might not
1746 * exist, in which case we are not listening and should discard any
1747 * local entry that PreCommit may have managed to create.
1748 */
1749 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1750 entry = dshash_find(globalChannelTable, &key, true);
1751 if (entry != NULL)
1752 {
1753 /* Scan entry to find the ListenerEntry for this backend */
1755
1758
1759 for (int i = 0; i < entry->numListeners; i++)
1760 {
1761 if (listeners[i].procNo != MyProcNumber)
1762 continue;
1763 foundListener = true;
1764 if (isCommit)
1765 {
1766 if (pending->action == PENDING_LISTEN)
1767 {
1768 /*
1769 * LISTEN being committed: set listening=true.
1770 * localChannelTable entry was created during
1771 * PreCommit and should be kept.
1772 */
1773 listeners[i].listening = true;
1774 removeLocal = false;
1775 }
1776 else
1777 {
1778 /*
1779 * UNLISTEN being committed: remove pre-allocated
1780 * entries from both tables.
1781 */
1783 }
1784 }
1785 else
1786 {
1787 /*
1788 * Note: this part is reachable only if the transaction
1789 * aborts after PreCommit_Notify() has made some
1790 * pendingListenActions entries, so it's pretty hard to
1791 * test.
1792 */
1793 if (!listeners[i].listening)
1794 {
1795 /*
1796 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1797 * and we weren't listening before, so remove
1798 * pre-allocated entries from both tables.
1799 */
1801 }
1802 else
1803 {
1804 /*
1805 * We're aborting, but the previous state was that
1806 * we're listening, so keep localChannelTable entry.
1807 */
1808 removeLocal = false;
1809 }
1810 }
1811 break; /* there shouldn't be another match */
1812 }
1813
1814 /* We might have already released the entry by removing it */
1815 if (entry != NULL)
1817 }
1818
1819 /*
1820 * If we're committing a LISTEN action, we should have found a
1821 * matching ListenerEntry, but otherwise it's okay if we didn't.
1822 */
1823 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1824 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1825 pending->channel, MyProcNumber);
1826
1827 /*
1828 * If we did not find a globalChannelTable entry for our backend, or
1829 * if we are unlistening, remove any localChannelTable entry that may
1830 * exist. (Note in particular that this cleans up if we created a
1831 * localChannelTable entry and then failed while trying to create a
1832 * globalChannelTable entry.)
1833 */
1836 HASH_REMOVE, NULL);
1837 }
1838}

References PendingListenEntry::action, PendingListenEntry::channel, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_REMOVE, hash_search(), hash_seq_init(), hash_seq_search(), i, GlobalChannelEntry::listenersArray, ListenerEntry::listening, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PANIC, PENDING_LISTEN, pendingListenActions, and RemoveListenerFromChannel().

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char channel)

Definition at line 1043 of file async.c.

1044{
1045 if (Trace_notify)
1046 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1047
1048 queue_listen(LISTEN_LISTEN, channel);
1049}

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char channel,
const char payload 
)

Definition at line 894 of file async.c.

895{
896 int my_level = GetCurrentTransactionNestLevel();
897 size_t channel_len;
898 size_t payload_len;
899 Notification *n;
900 MemoryContext oldcontext;
901
902 if (IsParallelWorker())
903 elog(ERROR, "cannot send notifications from a parallel worker");
904
905 if (Trace_notify)
906 elog(DEBUG1, "Async_Notify(%s)", channel);
907
908 channel_len = channel ? strlen(channel) : 0;
909 payload_len = payload ? strlen(payload) : 0;
910
911 /* a channel name must be specified */
912 if (channel_len == 0)
915 errmsg("channel name cannot be empty")));
916
917 /* enforce length limits */
918 if (channel_len >= NAMEDATALEN)
921 errmsg("channel name too long")));
922
923 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
926 errmsg("payload string too long")));
927
928 /*
929 * We must construct the Notification entry, even if we end up not using
930 * it, in order to compare it cheaply to existing list entries.
931 *
932 * The notification list needs to live until end of transaction, so store
933 * it in the transaction context.
934 */
936
938 channel_len + payload_len + 2);
939 n->channel_len = channel_len;
940 n->payload_len = payload_len;
941 strcpy(n->data, channel);
942 if (payload)
943 strcpy(n->data + channel_len + 1, payload);
944 else
945 n->data[channel_len + 1] = '\0';
946
947 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
948 {
950
951 /*
952 * First notify event in current (sub)xact. Note that we allocate the
953 * NotificationList in TopTransactionContext; the nestingLevel might
954 * get changed later by AtSubCommit_Notify.
955 */
958 sizeof(NotificationList));
959 notifies->nestingLevel = my_level;
960 notifies->events = list_make1(n);
961 /* We certainly don't need a hashtable yet */
962 notifies->hashtab = NULL;
963 /* We won't build uniqueChannelNames/Hash till later, either */
964 notifies->uniqueChannelNames = NIL;
965 notifies->uniqueChannelHash = NULL;
966 notifies->upper = pendingNotifies;
968 }
969 else
970 {
971 /* Now check for duplicates */
973 {
974 /* It's a dup, so forget it */
975 pfree(n);
976 MemoryContextSwitchTo(oldcontext);
977 return;
978 }
979
980 /* Append more events to existing list */
982 }
983
984 MemoryContextSwitchTo(oldcontext);
985}

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg, ERROR, fb(), GetCurrentTransactionNestLevel(), IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NIL, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char channel)

Definition at line 1057 of file async.c.

1058{
1059 if (Trace_notify)
1060 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1061
1062 /* If we couldn't possibly be listening, no need to queue anything */
1064 return;
1065
1066 queue_listen(LISTEN_UNLISTEN, channel);
1067}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 1075 of file async.c.

1076{
1077 if (Trace_notify)
1078 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1079
1080 /* If we couldn't possibly be listening, no need to queue anything */
1082 return;
1083
1085}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 1147 of file async.c.

1148{
1151}

References asyncQueueUnregister(), and CleanupListenersOnExit().

Referenced by BecomeRegisteredListener().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 3121 of file async.c.

3122{
3123 if (pendingNotifies == NULL)
3124 return false;
3125
3127 {
3128 /* Use the hash table to probe for a match */
3130 &n,
3131 HASH_FIND,
3132 NULL))
3133 return true;
3134 }
3135 else
3136 {
3137 /* Must scan the event list */
3138 ListCell *l;
3139
3140 foreach(l, pendingNotifies->events)
3141 {
3143
3144 if (n->channel_len == oldn->channel_len &&
3145 n->payload_len == oldn->payload_len &&
3146 memcmp(n->data, oldn->data,
3147 n->channel_len + n->payload_len + 2) == 0)
3148 return true;
3149 }
3150 }
3151
3152 return false;
3153}

References Notification::channel_len, Notification::data, NotificationList::events, fb(), HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)

Definition at line 2950 of file async.c.

2951{
2952 QueuePosition pos;
2953 QueuePosition head;
2954 int64 curpage = -1;
2955 int slotno = -1;
2956 char *page_buffer = NULL;
2957 bool page_dirty = false;
2958
2959 /*
2960 * Acquire locks in the correct order to avoid deadlocks. As per the
2961 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2962 * bank locks.
2963 *
2964 * We only need SHARED mode since we're just reading the head/tail
2965 * positions, not modifying them.
2966 */
2969
2970 pos = QUEUE_TAIL;
2971 head = QUEUE_HEAD;
2972
2973 /* Release NotifyQueueLock early, we only needed to read the positions */
2975
2976 /*
2977 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2978 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2979 * we're working.
2980 */
2981 while (!QUEUE_POS_EQUAL(pos, head))
2982 {
2984 TransactionId xid;
2985 int64 pageno = QUEUE_POS_PAGE(pos);
2986 int offset = QUEUE_POS_OFFSET(pos);
2987
2988 /* If we need a different page, release old lock and get new one */
2989 if (pageno != curpage)
2990 {
2991 LWLock *lock;
2992
2993 /* Release previous page if any */
2994 if (slotno >= 0)
2995 {
2996 if (page_dirty)
2997 {
2998 NotifyCtl->shared->page_dirty[slotno] = true;
2999 page_dirty = false;
3000 }
3002 }
3003
3004 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3006 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3007 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3008 curpage = pageno;
3009 }
3010
3011 qe = (AsyncQueueEntry *) (page_buffer + offset);
3012 xid = qe->xid;
3013
3014 if (TransactionIdIsNormal(xid) &&
3016 {
3017 if (TransactionIdDidCommit(xid))
3018 {
3019 qe->xid = FrozenTransactionId;
3020 page_dirty = true;
3021 }
3022 else
3023 {
3024 qe->xid = InvalidTransactionId;
3025 page_dirty = true;
3026 }
3027 }
3028
3029 /* Advance to next entry */
3030 asyncQueueAdvance(&pos, qe->length);
3031 }
3032
3033 /* Release final page lock if we acquired one */
3034 if (slotno >= 0)
3035 {
3036 if (page_dirty)
3037 NotifyCtl->shared->page_dirty[slotno] = true;
3039 }
3040
3042}

References asyncQueueAdvance(), fb(), FrozenTransactionId, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by vac_truncate_clog().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 2043 of file async.c.

2044{
2047 int64 pageno;
2048 int offset;
2049 int slotno;
2051
2052 /*
2053 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2054 * memory upon exiting. The reason for this is that if we have to advance
2055 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2056 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2057 * subsequent insertions would try to put entries into a page that slru.c
2058 * thinks doesn't exist yet.) So, use a local position variable. Note
2059 * that if we do fail, any already-inserted queue entries are forgotten;
2060 * this is okay, since they'd be useless anyway after our transaction
2061 * rolls back.
2062 */
2064
2065 /*
2066 * If this is the first write since the postmaster started, we need to
2067 * initialize the first page of the async SLRU. Otherwise, the current
2068 * page should be initialized already, so just fetch it.
2069 */
2070 pageno = QUEUE_POS_PAGE(queue_head);
2072
2073 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2075
2078 else
2079 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2080
2081 /* Note we mark the page dirty before writing in it */
2082 NotifyCtl->shared->page_dirty[slotno] = true;
2083
2084 while (nextNotify != NULL)
2085 {
2087
2088 /* Construct a valid queue entry in local variable qe */
2090
2091 offset = QUEUE_POS_OFFSET(queue_head);
2092
2093 /* Check whether the entry really fits on the current page */
2094 if (offset + qe.length <= QUEUE_PAGESIZE)
2095 {
2096 /* OK, so advance nextNotify past this item */
2098 }
2099 else
2100 {
2101 /*
2102 * Write a dummy entry to fill up the page. Actually readers will
2103 * only check dboid and since it won't match any reader's database
2104 * OID, they will ignore this entry and move on.
2105 */
2106 qe.length = QUEUE_PAGESIZE - offset;
2107 qe.dboid = InvalidOid;
2109 qe.data[0] = '\0'; /* empty channel */
2110 qe.data[1] = '\0'; /* empty payload */
2111 }
2112
2113 /* Now copy qe into the shared buffer page */
2114 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2115 &qe,
2116 qe.length);
2117
2118 /* Advance queue_head appropriately, and detect if page is full */
2119 if (asyncQueueAdvance(&(queue_head), qe.length))
2120 {
2121 LWLock *lock;
2122
2123 pageno = QUEUE_POS_PAGE(queue_head);
2124 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2125 if (lock != prevlock)
2126 {
2129 prevlock = lock;
2130 }
2131
2132 /*
2133 * Page is full, so we're done here, but first fill the next page
2134 * with zeroes. The reason to do this is to ensure that slru.c's
2135 * idea of the head page is always the same as ours, which avoids
2136 * boundary problems in SimpleLruTruncate. The test in
2137 * asyncQueueIsFull() ensured that there is room to create this
2138 * page without overrunning the queue.
2139 */
2141
2142 /*
2143 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2144 * set flag to remember that we should try to advance the tail
2145 * pointer (we don't want to actually do that right here).
2146 */
2148 tryAdvanceTail = true;
2149
2150 /* And exit the loop */
2151 break;
2152 }
2153 }
2154
2155 /* Success, so update the global QUEUE_HEAD */
2157
2159
2160 return nextNotify;
2161}

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), NotificationList::events, fb(), InvalidOid, InvalidTransactionId, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), memcpy(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1974 of file async.c.

1975{
1976 int64 pageno = QUEUE_POS_PAGE(*position);
1977 int offset = QUEUE_POS_OFFSET(*position);
1978 bool pageJump = false;
1979
1980 /*
1981 * Move to the next writing position: First jump over what we have just
1982 * written or read.
1983 */
1984 offset += entryLength;
1985 Assert(offset <= QUEUE_PAGESIZE);
1986
1987 /*
1988 * In a second step check if another entry can possibly be written to the
1989 * page. If so, stay here, we have reached the next position. If not, then
1990 * we need to move on to the next page.
1991 */
1993 {
1994 pageno++;
1995 offset = 0;
1996 pageJump = true;
1997 }
1998
1999 SET_QUEUE_POS(*position, pageno, offset);
2000 return pageJump;
2001}

References Assert, AsyncQueueEntryEmptySize, fb(), QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by AsyncNotifyFreezeXids(), asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2868 of file async.c.

2869{
2870 QueuePosition min;
2873 int64 boundary;
2874
2875 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2877
2878 /*
2879 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2880 * (ie, exactly match at least one backend's queue position), so it must
2881 * be updated atomically with the actual computation. Since v13, we could
2882 * get away with not doing it like that, but it seems prudent to keep it
2883 * so.
2884 *
2885 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2886 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2887 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2888 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2889 * there are pages we can truncate but haven't yet finished doing so.
2890 *
2891 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2892 * performing SimpleLruTruncate. This is OK because no backend will try
2893 * to access the pages we are in the midst of truncating.
2894 */
2896 min = QUEUE_HEAD;
2898 {
2900 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2901 }
2902 QUEUE_TAIL = min;
2905
2906 /*
2907 * We can truncate something if the global tail advanced across an SLRU
2908 * segment boundary.
2909 *
2910 * XXX it might be better to truncate only once every several segments, to
2911 * reduce the number of directory scans.
2912 */
2915 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2916 {
2917 /*
2918 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2919 * release the lock again.
2920 */
2922
2926 }
2927
2929}

References Assert, asyncQueuePagePrecedes(), fb(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueErrdetailForIoError()

static int asyncQueueErrdetailForIoError ( const void opaque_data)
static

Definition at line 627 of file async.c.

628{
629 const QueuePosition *position = opaque_data;
630
631 return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
632 position->page, position->offset);
633}

References errdetail(), fb(), QueuePosition::offset, and QueuePosition::page.

Referenced by AsyncShmemRequest().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 2214 of file async.c.

2215{
2216 double fillDegree;
2217 TimestampTz t;
2218
2220 if (fillDegree < 0.5)
2221 return;
2222
2223 t = GetCurrentTimestamp();
2224
2227 {
2230
2232 {
2234 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2237 }
2238
2240 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2241 (minPid != InvalidPid ?
2242 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2243 : 0),
2244 (minPid != InvalidPid ?
2245 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2246 : 0)));
2247
2249 }
2250}

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg, fb(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1959 of file async.c.

1960{
1961 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1963 int64 occupied = headPage - tailPage;
1964
1966}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 2007 of file async.c.

2008{
2009 size_t channellen = n->channel_len;
2010 size_t payloadlen = n->payload_len;
2011 int entryLength;
2012
2015
2016 /* The terminators are already included in AsyncQueueEntryEmptySize */
2019 qe->length = entryLength;
2020 qe->dboid = MyDatabaseId;
2021 qe->xid = GetCurrentTransactionId();
2022 qe->srcPid = MyProcPid;
2023 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2024}

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, fb(), GetCurrentTransactionId(), memcpy(), MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, and QUEUEALIGN.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 640 of file async.c.

641{
642 return p - q;
643}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 650 of file async.c.

651{
652 return p < q;
653}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemRequest().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( QueuePosition current,
QueuePosition  stop,
Snapshot  snapshot 
)
static

Definition at line 2730 of file async.c.

2733{
2734 int64 curpage = QUEUE_POS_PAGE(*current);
2735 int slotno;
2736 char *page_buffer;
2737 bool reachedStop = false;
2738 bool reachedEndOfPage;
2739
2740 /*
2741 * We copy the entries into a local buffer to avoid holding the SLRU lock
2742 * while we transmit them to our frontend. The local buffer must be
2743 * adequately aligned.
2744 */
2746 char *local_buf_end = local_buf;
2747
2749 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2750
2751 do
2752 {
2753 QueuePosition thisentry = *current;
2755
2756 if (QUEUE_POS_EQUAL(thisentry, stop))
2757 break;
2758
2759 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2760
2761 /*
2762 * Advance *current over this message, possibly to the next page. As
2763 * noted in the comments for asyncQueueReadAllNotifications, we must
2764 * do this before possibly failing while processing the message.
2765 */
2766 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2767
2768 /* Ignore messages destined for other databases */
2769 if (qe->dboid == MyDatabaseId)
2770 {
2771 if (XidInMVCCSnapshot(qe->xid, snapshot))
2772 {
2773 /*
2774 * The source transaction is still in progress, so we can't
2775 * process this message yet. Break out of the loop, but first
2776 * back up *current so we will reprocess the message next
2777 * time. (Note: it is unlikely but not impossible for
2778 * TransactionIdDidCommit to fail, so we can't really avoid
2779 * this advance-then-back-up behavior when dealing with an
2780 * uncommitted message.)
2781 *
2782 * Note that we must test XidInMVCCSnapshot before we test
2783 * TransactionIdDidCommit, else we might return a message from
2784 * a transaction that is not yet visible to snapshots; compare
2785 * the comments at the head of heapam_visibility.c.
2786 *
2787 * Also, while our own xact won't be listed in the snapshot,
2788 * we need not check for TransactionIdIsCurrentTransactionId
2789 * because our transaction cannot (yet) have queued any
2790 * messages.
2791 */
2792 *current = thisentry;
2793 reachedStop = true;
2794 break;
2795 }
2796
2797 /*
2798 * Quick check for the case that we're not listening on any
2799 * channels, before calling TransactionIdDidCommit(). This makes
2800 * that case a little faster, but more importantly, it ensures
2801 * that if there's a bad entry in the queue for which
2802 * TransactionIdDidCommit() fails for some reason, we can skip
2803 * over it on the first LISTEN in a session, and not get stuck on
2804 * it indefinitely. (This is a little trickier than it looks: it
2805 * works because BecomeRegisteredListener runs this code before we
2806 * have made the first entry in localChannelTable.)
2807 */
2809 continue;
2810
2811 if (TransactionIdDidCommit(qe->xid))
2812 {
2813 memcpy(local_buf_end, qe, qe->length);
2814 local_buf_end += qe->length;
2815 }
2816 else
2817 {
2818 /*
2819 * The source transaction aborted or crashed, so we just
2820 * ignore its notifications.
2821 */
2822 }
2823 }
2824
2825 /* Loop back if we're not at end of page */
2826 } while (!reachedEndOfPage);
2827
2828 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2830
2831 /*
2832 * Now that we have let go of the SLRU bank lock, send the notifications
2833 * to our backend
2834 */
2836 for (char *p = local_buf; p < local_buf_end;)
2837 {
2839
2840 /* qe->data is the null-terminated channel name */
2841 char *channel = qe->data;
2842
2843 if (IsListeningOn(channel))
2844 {
2845 /* payload follows channel name */
2846 char *payload = qe->data + strlen(channel) + 1;
2847
2848 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2849 }
2850
2851 p += qe->length;
2852 }
2853
2854 if (QUEUE_POS_EQUAL(*current, stop))
2855 reachedStop = true;
2856
2857 return reachedStop;
2858}

References Assert, asyncQueueAdvance(), AsyncQueueEntry::data, fb(), IsListeningOn(), LocalChannelTableIsEmpty, LWLockRelease(), memcpy(), MyDatabaseId, NotifyCtl, NotifyMyFrontEnd(), QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), TransactionIdDidCommit(), and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 2596 of file async.c.

2597{
2598 QueuePosition pos;
2599 QueuePosition head;
2600 Snapshot snapshot;
2601
2602 /*
2603 * Fetch current state, indicate to others that we have woken up, and that
2604 * we are in process of advancing our position.
2605 */
2607 /* Assert checks that we have a valid state entry */
2611 head = QUEUE_HEAD;
2612
2613 if (QUEUE_POS_EQUAL(pos, head))
2614 {
2615 /* Nothing to do, we have read all notifications already. */
2617 return;
2618 }
2619
2622
2623 /*----------
2624 * Get snapshot we'll use to decide which xacts are still in progress.
2625 * This is trickier than it might seem, because of race conditions.
2626 * Consider the following example:
2627 *
2628 * Backend 1: Backend 2:
2629 *
2630 * transaction starts
2631 * UPDATE foo SET ...;
2632 * NOTIFY foo;
2633 * commit starts
2634 * queue the notify message
2635 * transaction starts
2636 * LISTEN foo; -- first LISTEN in session
2637 * SELECT * FROM foo WHERE ...;
2638 * commit to clog
2639 * commit starts
2640 * add backend 2 to array of listeners
2641 * advance to queue head (this code)
2642 * commit to clog
2643 *
2644 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2645 * wasn't committed yet. Ideally we'd ensure that client 2 would
2646 * eventually get transaction 1's notify message, but there's no way
2647 * to do that; until we're in the listener array, there's no guarantee
2648 * that the notify message doesn't get removed from the queue.
2649 *
2650 * Therefore the coding technique transaction 2 is using is unsafe:
2651 * applications must commit a LISTEN before inspecting database state,
2652 * if they want to ensure they will see notifications about subsequent
2653 * changes to that state.
2654 *
2655 * What we do guarantee is that we'll see all notifications from
2656 * transactions committing after the snapshot we take here.
2657 * BecomeRegisteredListener has already added us to the listener array,
2658 * so no not-yet-committed messages can be removed from the queue
2659 * before we see them.
2660 *----------
2661 */
2662 snapshot = RegisterSnapshot(GetLatestSnapshot());
2663
2664 /*
2665 * It is possible that we fail while trying to send a message to our
2666 * frontend (for example, because of encoding conversion failure). If
2667 * that happens it is critical that we not try to send the same message
2668 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2669 * ERRORs to FATAL, causing the client connection to be closed on error.
2670 *
2671 * We used to only skip over the offending message and try to soldier on,
2672 * but it was somewhat questionable to lose a notification and give the
2673 * client an ERROR instead. A client application is not be prepared for
2674 * that and can't tell that a notification was missed. It was also not
2675 * very useful in practice because notifications are often processed while
2676 * a connection is idle and reading a message from the client, and in that
2677 * state, any error is upgraded to FATAL anyway. Closing the connection
2678 * is a clear signal to the application that it might have missed
2679 * notifications.
2680 */
2681 {
2683 bool reachedStop;
2684
2685 ExitOnAnyError = true;
2686
2687 do
2688 {
2689 /*
2690 * Process messages up to the stop position, end of page, or an
2691 * uncommitted message.
2692 *
2693 * Our stop position is what we found to be the head's position
2694 * when we entered this function. It might have changed already.
2695 * But if it has, we will receive (or have already received and
2696 * queued) another signal and come here again.
2697 *
2698 * We are not holding NotifyQueueLock here! The queue can only
2699 * extend beyond the head pointer (see above) and we leave our
2700 * backend's pointer where it is so nobody will truncate or
2701 * rewrite pages under us. Especially we don't want to hold a lock
2702 * while sending the notifications to the frontend.
2703 */
2704 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2705 } while (!reachedStop);
2706
2707 /* Update shared state */
2712
2714 }
2715
2716 /* Done with snapshot */
2717 UnregisterSnapshot(snapshot);
2718}

References Assert, asyncQueueProcessPageEntries(), ExitOnAnyError, fb(), GetLatestSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_HEAD, QUEUE_POS_EQUAL, RegisterSnapshot(), and UnregisterSnapshot().

Referenced by BecomeRegisteredListener(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1916 of file async.c.

1917{
1918 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1919
1920 if (!amRegisteredListener) /* nothing to do */
1921 return;
1922
1923 /*
1924 * Need exclusive lock here to manipulate list links.
1925 */
1927 /* Mark our entry as invalid */
1932 /* and remove it from the list */
1935 else
1936 {
1938 {
1940 {
1942 break;
1943 }
1944 }
1945 }
1948
1949 /* mark ourselves as no longer listed in the global array */
1950 amRegisteredListener = false;
1951}

References amRegisteredListener, Assert, fb(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, LocalChannelTableIsEmpty, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 2193 of file async.c.

2194{
2195 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2197 int64 occupied = headPage - tailPage;
2198
2199 if (occupied == 0)
2200 return (double) 0; /* fast exit for common case */
2201
2202 return (double) occupied / (double) max_notify_queue_pages;
2203}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

◆ AsyncShmemRequest()

static void AsyncShmemRequest ( void arg)
static

Definition at line 798 of file async.c.

799{
800 Size size;
801
802 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
803 size = add_size(size, offsetof(AsyncQueueControl, backend));
804
805 ShmemRequestStruct(.name = "Async Queue Control",
806 .size = size,
807 .ptr = (void **) &asyncQueueControl,
808 );
809
811 .name = "notify",
812 .Dir = "pg_notify",
813
814 /* long segment names are used in order to avoid wraparound */
815 .long_segment_names = true,
816
817 .nslots = notify_buffers,
818
819 .sync_handler = SYNC_HANDLER_NONE,
820 .PagePrecedes = asyncQueuePagePrecedes,
821 .errdetail_for_io_error = asyncQueueErrdetailForIoError,
822
823 .buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
824 .bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
825 );
826}

References add_size(), asyncQueueControl, asyncQueueErrdetailForIoError(), asyncQueuePagePrecedes(), fb(), MaxBackends, mul_size(), name, notify_buffers, NotifySlruDesc, ShmemRequestStruct, SimpleLruRequest, and SYNC_HANDLER_NONE.

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 2418 of file async.c.

2419{
2420 /* Revert staged listen/unlisten changes */
2422
2423 /* If we're no longer listening on anything, unregister */
2426
2427 /* And clean up */
2429}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and LocalChannelTableIsEmpty.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1378 of file async.c.

1379{
1380 /*
1381 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1382 * return as soon as possible
1383 */
1385 return;
1386
1387 if (Trace_notify)
1388 elog(DEBUG1, "AtCommit_Notify");
1389
1390 /* Apply staged listen/unlisten changes */
1392
1393 /* If no longer listening to anything, get out of listener array */
1396
1397 /*
1398 * Send signals to listening backends. We need do this only if there are
1399 * pending notifies, which were previously added to the shared queue by
1400 * PreCommit_Notify().
1401 */
1402 if (pendingNotifies != NULL)
1404
1405 /*
1406 * If it's time to try to advance the global tail pointer, do that.
1407 *
1408 * (It might seem odd to do this in the sender, when more than likely the
1409 * listeners won't yet have read the messages we just sent. However,
1410 * there's less contention if only the sender does it, and there is little
1411 * need for urgency in advancing the global tail. So this typically will
1412 * be clearing out messages that were sent some time ago.)
1413 */
1414 if (tryAdvanceTail)
1415 {
1416 tryAdvanceTail = false;
1418 }
1419
1420 /* And clean up */
1422}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueAdvanceTail(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), DEBUG1, elog, fb(), LocalChannelTableIsEmpty, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 1160 of file async.c.

1161{
1162 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1164 ereport(ERROR,
1166 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1167}

References ereport, errcode(), errmsg, ERROR, fb(), pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 2507 of file async.c.

2508{
2509 int my_level = GetCurrentTransactionNestLevel();
2510
2511 /*
2512 * All we have to do is pop the stack --- the actions/notifies made in
2513 * this subxact are no longer interesting, and the space will be freed
2514 * when CurTransactionContext is recycled. We still have to free the
2515 * ActionList and NotificationList objects themselves, though, because
2516 * those are allocated in TopTransactionContext.
2517 *
2518 * Note that there might be no entries at all, or no entries for the
2519 * current subtransaction level, either because none were ever created, or
2520 * because we reentered this routine due to trouble during subxact abort.
2521 */
2522 while (pendingActions != NULL &&
2523 pendingActions->nestingLevel >= my_level)
2524 {
2526
2529 }
2530
2531 while (pendingNotifies != NULL &&
2532 pendingNotifies->nestingLevel >= my_level)
2533 {
2535
2538 }
2539}

References fb(), GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 2437 of file async.c.

2438{
2439 int my_level = GetCurrentTransactionNestLevel();
2440
2441 /* If there are actions at our nesting level, we must reparent them. */
2442 if (pendingActions != NULL &&
2443 pendingActions->nestingLevel >= my_level)
2444 {
2445 if (pendingActions->upper == NULL ||
2446 pendingActions->upper->nestingLevel < my_level - 1)
2447 {
2448 /* nothing to merge; give the whole thing to the parent */
2450 }
2451 else
2452 {
2454
2456
2457 /*
2458 * Mustn't try to eliminate duplicates here --- see queue_listen()
2459 */
2462 childPendingActions->actions);
2464 }
2465 }
2466
2467 /* If there are notifies at our nesting level, we must reparent them. */
2468 if (pendingNotifies != NULL &&
2469 pendingNotifies->nestingLevel >= my_level)
2470 {
2471 Assert(pendingNotifies->nestingLevel == my_level);
2472
2473 if (pendingNotifies->upper == NULL ||
2474 pendingNotifies->upper->nestingLevel < my_level - 1)
2475 {
2476 /* nothing to merge; give the whole thing to the parent */
2478 }
2479 else
2480 {
2481 /*
2482 * Formerly, we didn't bother to eliminate duplicates here, but
2483 * now we must, else we fall foul of "Assert(!found)", either here
2484 * or during a later attempt to build the parent-level hashtable.
2485 */
2487 ListCell *l;
2488
2490 /* Insert all the subxact's events into parent, except for dups */
2491 foreach(l, childPendingNotifies->events)
2492 {
2494
2497 }
2499 }
2500 }
2501}

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), fb(), GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ BecomeRegisteredListener()

static void BecomeRegisteredListener ( void  )
static

Definition at line 1430 of file async.c.

1431{
1432 QueuePosition head;
1433 QueuePosition max;
1435
1436 /*
1437 * Nothing to do if we are already listening to something, nor if we
1438 * already ran this routine in this transaction.
1439 */
1441 return;
1442
1443 if (Trace_notify)
1444 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1445
1446 /*
1447 * Before registering, make sure we will unlisten before dying. (Note:
1448 * this action does not get undone if we abort later.)
1449 */
1451 {
1454 }
1455
1456 /*
1457 * This is our first LISTEN, so establish our pointer.
1458 *
1459 * We set our pointer to the global tail pointer and then move it forward
1460 * over already-committed notifications. This ensures we cannot miss any
1461 * not-yet-committed notifications. We might get a few more but that
1462 * doesn't hurt.
1463 *
1464 * In some scenarios there might be a lot of committed notifications that
1465 * have not yet been pruned away (because some backend is being lazy about
1466 * reading them). To reduce our startup time, we can look at other
1467 * backends and adopt the maximum "pos" pointer of any backend that's in
1468 * our database; any notifications it's already advanced over are surely
1469 * committed and need not be re-examined by us. (We must consider only
1470 * backends connected to our DB, because others will not have bothered to
1471 * check committed-ness of notifications in our DB.)
1472 *
1473 * We need exclusive lock here so we can look at other backends' entries
1474 * and manipulate the list links.
1475 */
1477 head = QUEUE_HEAD;
1478 max = QUEUE_TAIL;
1481 {
1483 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1484 /* Also find last listening backend before this one */
1485 if (i < MyProcNumber)
1486 prevListener = i;
1487 }
1493 /* Insert backend into list of listeners at correct position */
1495 {
1498 }
1499 else
1500 {
1503 }
1505
1506 /* Now we are listed in the global array, so remember we're listening */
1507 amRegisteredListener = true;
1508
1509 /*
1510 * Try to move our pointer forward as far as possible. This will skip
1511 * over already-committed notifications, which we want to do because they
1512 * might be quite stale. Note that we are not yet listening on anything,
1513 * so we won't deliver such notifications to our frontend. Also, although
1514 * our transaction might have executed NOTIFY, those message(s) aren't
1515 * queued yet so we won't skip them here.
1516 */
1517 if (!QUEUE_POS_EQUAL(max, head))
1519}

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, fb(), i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ check_notify_buffers()

bool check_notify_buffers ( int newval,
void **  extra,
GucSource  source 
)

Definition at line 3294 of file async.c.

3295{
3296 return check_slru_buffers("notify_buffers", newval);
3297}

References check_slru_buffers(), and newval.

◆ CleanupListenersOnExit()

static void CleanupListenersOnExit ( void  )
static

Definition at line 1846 of file async.c.

1847{
1848 dshash_seq_status status;
1849 GlobalChannelEntry *entry;
1850
1851 if (Trace_notify)
1852 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1853
1854 /* Clear our local cache (not really necessary, but be consistent) */
1855 if (localChannelTable != NULL)
1856 {
1859 }
1860
1861 /* Now remove our entries from the shared globalChannelTable */
1862 if (globalChannelTable == NULL)
1863 return;
1864
1865 dshash_seq_init(&status, globalChannelTable, true);
1866 while ((entry = dshash_seq_next(&status)) != NULL)
1867 {
1869
1870 if (entry->key.dboid != MyDatabaseId)
1871 continue; /* not relevant */
1872
1875
1876 for (int i = 0; i < entry->numListeners; i++)
1877 {
1878 if (listeners[i].procNo == MyProcNumber)
1879 {
1880 entry->numListeners--;
1881 if (i < entry->numListeners)
1882 memmove(&listeners[i], &listeners[i + 1],
1883 sizeof(ListenerEntry) * (entry->numListeners - i));
1884
1885 if (entry->numListeners == 0)
1886 {
1888 dshash_delete_current(&status);
1889 }
1890 break;
1891 }
1892 }
1893 }
1894 dshash_seq_term(&status);
1895}

References GlobalChannelKey::dboid, DEBUG1, dsa_free(), dsa_get_address(), dshash_delete_current(), dshash_seq_init(), dshash_seq_next(), dshash_seq_term(), elog, fb(), globalChannelDSA, globalChannelTable, hash_destroy(), i, GlobalChannelEntry::key, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, MyProcPid, GlobalChannelEntry::numListeners, and Trace_notify.

Referenced by Async_UnlistenOnExit().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 3276 of file async.c.

3277{
3278 /*
3279 * Everything's allocated in either TopTransactionContext or the context
3280 * for the subtransaction to which it corresponds. So, there's nothing to
3281 * do here except reset the pointers; the space will be reclaimed when the
3282 * contexts are deleted.
3283 */
3286 /* Also clear pendingListenActions, which is derived from pendingActions */
3288}

References fb(), pendingActions, pendingListenActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ GlobalChannelKeyInit()

static void GlobalChannelKeyInit ( GlobalChannelKey key,
Oid  dboid,
const char channel 
)
inlinestatic

Definition at line 660 of file async.c.

661{
662 memset(key, 0, sizeof(GlobalChannelKey));
663 key->dboid = dboid;
664 strlcpy(key->channel, channel, NAMEDATALEN);
665}

References fb(), NAMEDATALEN, and strlcpy().

Referenced by ApplyPendingListenActions(), PrepareTableEntriesForListen(), and SignalBackends().

◆ globalChannelTableHash()

static dshash_hash globalChannelTableHash ( const void key,
size_t  size,
void arg 
)
static

Definition at line 672 of file async.c.

673{
674 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
675 dshash_hash h;
676
678 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
680
681 return h;
682}

References GlobalChannelKey::channel, DatumGetUInt32(), GlobalChannelKey::dboid, fb(), hash_any(), hash_uint32(), and NAMEDATALEN.

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 2550 of file async.c.

2551{
2552 /*
2553 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2554 * you do here.
2555 */
2556
2557 /* signal that work needs to be done */
2559
2560 /* latch will be set by procsignal_sigusr1_handler */
2561}

References notifyInterruptPending.

Referenced by procsignal_sigusr1_handler().

◆ initGlobalChannelTable()

static void initGlobalChannelTable ( void  )
static

Definition at line 699 of file async.c.

700{
701 MemoryContext oldcontext;
702
703 /* Quick exit if we already did this */
706 return;
707
708 /* Otherwise, use a lock to ensure only one process creates the table */
710
711 /* Be sure any local memory allocated by DSA routines is persistent */
713
715 {
716 /* Initialize dynamic shared hash table for global channels */
722 NULL);
723
724 /* Store handles in shared memory for other backends to use */
728 }
729 else if (!globalChannelTable)
730 {
731 /* Attach to existing dynamic shared hash table */
737 NULL);
738 }
739
740 MemoryContextSwitchTo(oldcontext);
742}

References asyncQueueControl, dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, fb(), globalChannelDSA, globalChannelTable, AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, globalChannelTableDSHParams, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by PreCommit_Notify().

◆ initLocalChannelTable()

static void initLocalChannelTable ( void  )
static

Definition at line 750 of file async.c.

751{
753
754 /* Quick exit if we already did this */
755 if (localChannelTable != NULL)
756 return;
757
758 /* Initialize local hash table for this backend's listened channels */
760 hash_ctl.entrysize = sizeof(ChannelName);
761
763 hash_create("Local Listen Channels",
764 64,
765 &hash_ctl,
767}

References fb(), hash_create(), HASH_ELEM, HASH_STRINGS, HASHCTL::keysize, localChannelTable, and NAMEDATALEN.

Referenced by PreCommit_Notify().

◆ initPendingListenActions()

static void initPendingListenActions ( void  )
static

◆ IsListeningOn()

static bool IsListeningOn ( const char channel)
static

Definition at line 1903 of file async.c.

1904{
1905 if (localChannelTable == NULL)
1906 return false;
1907
1908 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1909}

References fb(), HASH_FIND, hash_search(), and localChannelTable.

Referenced by asyncQueueProcessPageEntries().

◆ notification_hash()

static uint32 notification_hash ( const void key,
Size  keysize 
)
static

Definition at line 3246 of file async.c.

3247{
3248 const Notification *k = *(const Notification *const *) key;
3249
3250 Assert(keysize == sizeof(Notification *));
3251 /* We don't bother to include the payload's trailing null in the hash */
3252 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3253 k->channel_len + k->payload_len + 1));
3254}

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void key1,
const void key2,
Size  keysize 
)
static

Definition at line 3260 of file async.c.

3261{
3262 const Notification *k1 = *(const Notification *const *) key1;
3263 const Notification *k2 = *(const Notification *const *) key2;
3264
3265 Assert(keysize == sizeof(Notification *));
3266 if (k1->channel_len == k2->channel_len &&
3267 k1->payload_len == k2->payload_len &&
3268 memcmp(k1->data, k2->data,
3269 k1->channel_len + k1->payload_len + 2) == 0)
3270 return 0; /* equal */
3271 return 1; /* not equal */
3272}

References Assert, and fb().

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char channel,
const char payload,
int32  srcPid 
)

Definition at line 3097 of file async.c.

3098{
3100 {
3102
3104 pq_sendint32(&buf, srcPid);
3105 pq_sendstring(&buf, channel);
3106 pq_sendstring(&buf, payload);
3108
3109 /*
3110 * NOTE: we do not do pq_flush() here. Some level of caller will
3111 * handle it later, allowing this message to be combined into a packet
3112 * with other ones.
3113 */
3114 }
3115 else
3116 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3117}

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 1095 of file async.c.

1096{
1098 HASH_SEQ_STATUS *status;
1099
1100 /* stuff done only on the first call of the function */
1101 if (SRF_IS_FIRSTCALL())
1102 {
1103 /* create a function context for cross-call persistence */
1105
1106 /* Initialize hash table iteration if we have any channels */
1107 if (localChannelTable != NULL)
1108 {
1109 MemoryContext oldcontext;
1110
1111 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1112 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1114 funcctx->user_fctx = status;
1115 MemoryContextSwitchTo(oldcontext);
1116 }
1117 else
1118 {
1119 funcctx->user_fctx = NULL;
1120 }
1121 }
1122
1123 /* stuff done on every call of the function */
1125 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1126
1127 if (status != NULL)
1128 {
1129 ChannelName *entry;
1130
1131 entry = (ChannelName *) hash_seq_search(status);
1132 if (entry != NULL)
1134 }
1135
1137}

References ChannelName::channel, CStringGetTextDatum, fb(), hash_seq_init(), hash_seq_search(), localChannelTable, MemoryContextSwitchTo(), palloc(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 2168 of file async.c.

2169{
2170 double usage;
2171
2172 /* Advance the queue tail so we don't report a too-large result */
2174
2178
2180}

References asyncQueueAdvanceTail(), asyncQueueUsage(), fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 860 of file async.c.

861{
862 const char *channel;
863 const char *payload;
864
865 if (PG_ARGISNULL(0))
866 channel = "";
867 else
869
870 if (PG_ARGISNULL(1))
871 payload = "";
872 else
874
875 /* For NOTIFY as a statement, this is checked in ProcessUtility */
877
878 Async_Notify(channel, payload);
879
881}

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 1185 of file async.c.

1186{
1187 ListCell *p;
1188
1190 return; /* no relevant statements in this xact */
1191
1192 if (Trace_notify)
1193 elog(DEBUG1, "PreCommit_Notify");
1194
1195 /* Preflight for any pending listen/unlisten actions */
1197
1198 if (pendingActions != NULL)
1199 {
1200 /* Ensure we have a local channel table */
1202 /* Create pendingListenActions hash table for this transaction */
1204
1205 /* Stage all the actions this transaction wants to perform */
1206 foreach(p, pendingActions->actions)
1207 {
1209
1210 switch (actrec->action)
1211 {
1212 case LISTEN_LISTEN:
1215 break;
1216 case LISTEN_UNLISTEN:
1218 break;
1221 break;
1222 }
1223 }
1224 }
1225
1226 /* Queue any pending notifies (must happen after the above) */
1227 if (pendingNotifies)
1228 {
1230 bool firstIteration = true;
1231
1232 /*
1233 * Build list of unique channel names being notified for use by
1234 * SignalBackends().
1235 *
1236 * If uniqueChannelHash is available, use it to efficiently get the
1237 * unique channels. Otherwise, fall back to the O(N^2) approach.
1238 */
1241 {
1242 HASH_SEQ_STATUS status;
1244
1246 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1249 channelEntry->channel);
1250 }
1251 else
1252 {
1253 /* O(N^2) approach is better for small number of notifications */
1255 {
1256 char *channel = n->data;
1257 bool found = false;
1258
1259 /* Name present in list? */
1261 {
1262 if (strcmp(oldchan, channel) == 0)
1263 {
1264 found = true;
1265 break;
1266 }
1267 }
1268 /* Add if not already in list */
1269 if (!found)
1272 channel);
1273 }
1274 }
1275
1276 /* Preallocate workspace that will be needed by SignalBackends() */
1277 if (signalPids == NULL)
1279 MaxBackends * sizeof(int32));
1280
1281 if (signalProcnos == NULL)
1283 MaxBackends * sizeof(ProcNumber));
1284
1285 /*
1286 * Make sure that we have an XID assigned to the current transaction.
1287 * GetCurrentTransactionId is cheap if we already have an XID, but not
1288 * so cheap if we don't, and we'd prefer not to do that work while
1289 * holding NotifyQueueLock.
1290 */
1292
1293 /*
1294 * Serialize writers by acquiring a special lock that we hold till
1295 * after commit. This ensures that queue entries appear in commit
1296 * order, and in particular that there are never uncommitted queue
1297 * entries ahead of committed ones, so an uncommitted transaction
1298 * can't block delivery of deliverable notifications.
1299 *
1300 * We use a heavyweight lock so that it'll automatically be released
1301 * after either commit or abort. This also allows deadlocks to be
1302 * detected, though really a deadlock shouldn't be possible here.
1303 *
1304 * The lock is on "database 0", which is pretty ugly but it doesn't
1305 * seem worth inventing a special locktag category just for this.
1306 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1307 * used by the flatfiles mechanism.)
1308 */
1311
1312 /*
1313 * For the direct advancement optimization in SignalBackends(), we
1314 * need to ensure that no other backend can insert queue entries
1315 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1316 * heavyweight lock above provides this guarantee, since it serializes
1317 * all writers.
1318 *
1319 * Note: if the heavyweight lock were ever removed for scalability
1320 * reasons, we could achieve the same guarantee by holding
1321 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1322 * than releasing and reacquiring it for each page as we do below.
1323 */
1324
1325 /* Initialize values to a safe default in case list is empty */
1328
1329 /* Now push the notifications into the queue */
1331 while (nextNotify != NULL)
1332 {
1333 /*
1334 * Add the pending notifications to the queue. We acquire and
1335 * release NotifyQueueLock once per page, which might be overkill
1336 * but it does allow readers to get in while we're doing this.
1337 *
1338 * A full queue is very uncommon and should really not happen,
1339 * given that we have so much space available in the SLRU pages.
1340 * Nevertheless we need to deal with this possibility. Note that
1341 * when we get here we are in the process of committing our
1342 * transaction, but we have not yet committed to clog, so at this
1343 * point in time we can still roll the transaction back.
1344 */
1346 if (firstIteration)
1347 {
1349 firstIteration = false;
1350 }
1352 if (asyncQueueIsFull())
1353 ereport(ERROR,
1355 errmsg("too many notifications in the NOTIFY queue")));
1359 }
1360
1361 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1362 }
1363}

References AccessExclusiveLock, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), BecomeRegisteredListener(), DEBUG1, elog, ereport, errcode(), errmsg, ERROR, NotificationList::events, fb(), foreach_ptr, GetCurrentTransactionId(), hash_seq_init(), hash_seq_search(), initGlobalChannelTable(), initLocalChannelTable(), initPendingListenActions(), InvalidOid, lappend(), lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MemoryContextAlloc(), NIL, pendingActions, pendingNotifies, PrepareTableEntriesForListen(), PrepareTableEntriesForUnlisten(), PrepareTableEntriesForUnlistenAll(), QUEUE_HEAD, queueHeadAfterWrite, queueHeadBeforeWrite, SET_QUEUE_POS, signalPids, signalProcnos, TopMemoryContext, Trace_notify, NotificationList::uniqueChannelHash, and NotificationList::uniqueChannelNames.

Referenced by CommitTransaction().

◆ PrepareTableEntriesForListen()

static void PrepareTableEntriesForListen ( const char channel)
static

Definition at line 1531 of file async.c.

1532{
1534 GlobalChannelEntry *entry;
1535 bool found;
1537 PendingListenEntry *pending;
1538
1539 /*
1540 * Record in local pending hash that we want to LISTEN, overwriting any
1541 * earlier attempt to UNLISTEN.
1542 */
1543 pending = (PendingListenEntry *)
1545 pending->action = PENDING_LISTEN;
1546
1547 /*
1548 * Ensure that there is an entry for the channel in localChannelTable.
1549 * (Should this fail, we can just roll back.) If the transaction fails
1550 * after this point, we will remove the entry if appropriate during
1551 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1552 * to return TRUE; we assume nothing is going to consult that before
1553 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1554 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1555 * present to ensure they do the right things; see
1556 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1557 */
1559
1560 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1561 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1562 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1563
1564 if (!found)
1565 {
1566 /* New channel entry, so initialize it to a safe state */
1568 entry->numListeners = 0;
1569 entry->allocatedListeners = 0;
1570 }
1571
1572 /*
1573 * Create listenersArray if entry doesn't have one. It's tempting to fold
1574 * this into the !found case, but this coding allows us to cope in case
1575 * dsa_allocate() failed in an earlier attempt.
1576 */
1577 if (!DsaPointerIsValid(entry->listenersArray))
1578 {
1582 }
1583
1586
1587 /*
1588 * Check if we already have a ListenerEntry (possibly from earlier in this
1589 * transaction)
1590 */
1591 for (int i = 0; i < entry->numListeners; i++)
1592 {
1593 if (listeners[i].procNo == MyProcNumber)
1594 {
1595 /* Already have an entry; listening flag stays as-is until commit */
1597 return;
1598 }
1599 }
1600
1601 /* Need to add a new entry; grow array if necessary */
1602 if (entry->numListeners >= entry->allocatedListeners)
1603 {
1604 int new_size = entry->allocatedListeners * 2;
1607 sizeof(ListenerEntry) * new_size);
1609
1611 entry->listenersArray = new_array;
1615 }
1616
1617 listeners[entry->numListeners].procNo = MyProcNumber;
1618 listeners[entry->numListeners].listening = false; /* staged, not yet
1619 * committed */
1620 entry->numListeners++;
1621
1623}

References PendingListenEntry::action, GlobalChannelEntry::allocatedListeners, dsa_allocate, dsa_free(), dsa_get_address(), DsaPointerIsValid, dshash_find_or_insert, dshash_release_lock(), fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_ENTER, hash_search(), i, INITIAL_LISTENERS_ARRAY_SIZE, InvalidDsaPointer, GlobalChannelEntry::listenersArray, localChannelTable, memcpy(), MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PENDING_LISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlisten()

static void PrepareTableEntriesForUnlisten ( const char channel)
static

Definition at line 1634 of file async.c.

1635{
1636 PendingListenEntry *pending;
1637
1638 /*
1639 * If the channel name is not in localChannelTable, then we are neither
1640 * listening on it nor preparing to listen on it, so we don't need to
1641 * record an UNLISTEN action.
1642 */
1644 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1645 return;
1646
1647 /*
1648 * Record in local pending hash that we want to UNLISTEN, overwriting any
1649 * earlier attempt to LISTEN. Don't touch localChannelTable or
1650 * globalChannelTable yet - we keep receiving signals until commit.
1651 */
1652 pending = (PendingListenEntry *)
1654 pending->action = PENDING_UNLISTEN;
1655}

References PendingListenEntry::action, Assert, fb(), HASH_ENTER, HASH_FIND, hash_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlistenAll()

static void PrepareTableEntriesForUnlistenAll ( void  )
static

Definition at line 1664 of file async.c.

1665{
1668 PendingListenEntry *pending;
1669
1670 /*
1671 * Scan localChannelTable, which will have the names of all channels that
1672 * we are listening on or have prepared to listen on. Record an UNLISTEN
1673 * action for each one, overwriting any earlier attempt to LISTEN.
1674 */
1676 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1677 {
1678 pending = (PendingListenEntry *)
1680 pending->action = PENDING_UNLISTEN;
1681 }
1682}

References PendingListenEntry::action, fb(), HASH_ENTER, hash_search(), hash_seq_init(), hash_seq_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 3056 of file async.c.

3057{
3058 /* We *must* reset the flag */
3059 notifyInterruptPending = false;
3060
3061 /* Do nothing else if we aren't actively listening */
3063 return;
3064
3065 if (Trace_notify)
3066 elog(DEBUG1, "ProcessIncomingNotify");
3067
3068 set_ps_display("notify interrupt");
3069
3070 /*
3071 * We must run asyncQueueReadAllNotifications inside a transaction, else
3072 * bad things happen if it gets an error.
3073 */
3075
3077
3079
3080 /*
3081 * If this isn't an end-of-command case, we must flush the notify messages
3082 * to ensure frontend gets them promptly.
3083 */
3084 if (flush)
3085 pq_flush();
3086
3087 set_ps_display("idle");
3088
3089 if (Trace_notify)
3090 elog(DEBUG1, "ProcessIncomingNotify: done");
3091}

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, LocalChannelTableIsEmpty, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 2579 of file async.c.

2580{
2582 return; /* not really idle */
2583
2584 /* Loop in case another signal arrives while sending messages */
2586 ProcessIncomingNotify(flush);
2587}

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char channel 
)
static

Definition at line 996 of file async.c.

997{
998 MemoryContext oldcontext;
1000 int my_level = GetCurrentTransactionNestLevel();
1001
1002 /*
1003 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1004 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1005 * final per-channel intent is computed during PreCommit_Notify.
1006 */
1008
1009 /* space for terminating null is included in sizeof(ListenAction) */
1011 strlen(channel) + 1);
1012 actrec->action = action;
1013 strcpy(actrec->channel, channel);
1014
1015 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1016 {
1017 ActionList *actions;
1018
1019 /*
1020 * First action in current sub(xact). Note that we allocate the
1021 * ActionList in TopTransactionContext; the nestingLevel might get
1022 * changed later by AtSubCommit_Notify.
1023 */
1024 actions = (ActionList *)
1026 actions->nestingLevel = my_level;
1027 actions->actions = list_make1(actrec);
1028 actions->upper = pendingActions;
1029 pendingActions = actions;
1030 }
1031 else
1033
1034 MemoryContextSwitchTo(oldcontext);
1035}

References ActionList::actions, CurTransactionContext, fb(), GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ RemoveListenerFromChannel()

static void RemoveListenerFromChannel ( GlobalChannelEntry **  entry_ptr,
ListenerEntry listeners,
int  idx 
)
static

Definition at line 1694 of file async.c.

1697{
1698 GlobalChannelEntry *entry = *entry_ptr;
1699
1700 entry->numListeners--;
1701 if (idx < entry->numListeners)
1703 sizeof(ListenerEntry) * (entry->numListeners - idx));
1704
1705 if (entry->numListeners == 0)
1706 {
1709 /* tells caller not to release the entry's lock: */
1710 *entry_ptr = NULL;
1711 }
1712}

References dsa_free(), dshash_delete_entry(), fb(), globalChannelDSA, globalChannelTable, idx(), GlobalChannelEntry::listenersArray, and GlobalChannelEntry::numListeners.

Referenced by ApplyPendingListenActions().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 2266 of file async.c.

2267{
2268 int count;
2269
2270 /* Can't get here without PreCommit_Notify having made the global table */
2272
2273 /* It should have set up these arrays, too */
2275
2276 /*
2277 * Identify backends that we need to signal. We don't want to send
2278 * signals while holding the NotifyQueueLock, so this part just builds a
2279 * list of target PIDs in signalPids[] and signalProcnos[].
2280 */
2281 count = 0;
2282
2284
2285 /* Scan each channel name that we notified in this transaction */
2287 {
2289 GlobalChannelEntry *entry;
2291
2292 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2293 entry = dshash_find(globalChannelTable, &key, false);
2294 if (entry == NULL)
2295 continue; /* nobody is listening */
2296
2298 entry->listenersArray);
2299
2300 /* Identify listeners that now need waking, add them to arrays */
2301 for (int j = 0; j < entry->numListeners; j++)
2302 {
2303 ProcNumber i;
2304 int32 pid;
2305 QueuePosition pos;
2306
2307 if (!listeners[j].listening)
2308 continue; /* ignore not-yet-committed listeners */
2309
2310 i = listeners[j].procNo;
2311
2313 continue; /* already signaled, no need to repeat */
2314
2315 pid = QUEUE_BACKEND_PID(i);
2316 pos = QUEUE_BACKEND_POS(i);
2317
2318 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2319 continue; /* it's fully caught up already */
2320
2321 Assert(pid != InvalidPid);
2322
2324 signalPids[count] = pid;
2325 signalProcnos[count] = i;
2326 count++;
2327 }
2328
2330 }
2331
2332 /*
2333 * Scan all listeners. Any that are not already pending wakeup must not
2334 * be interested in our notifications (else we'd have set their wakeup
2335 * flags above). Check to see if we can directly advance their queue
2336 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2337 * them anyway so they will catch up.
2338 */
2340 {
2341 int32 pid;
2342 QueuePosition pos;
2343
2345 continue;
2346
2347 /* If it's currently advancing, we should not touch it */
2349 continue;
2350
2351 pid = QUEUE_BACKEND_PID(i);
2352 pos = QUEUE_BACKEND_POS(i);
2353
2354 /*
2355 * We can directly advance the other backend's queue pointer if it's
2356 * not currently advancing (else there are race conditions), and its
2357 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2358 * it miss some older messages), and we'd not be moving the pointer
2359 * backward.
2360 */
2363 {
2364 /* We can directly advance its pointer past what we wrote */
2366 }
2369 {
2370 /* It's idle and far behind, so wake it up */
2371 Assert(pid != InvalidPid);
2372
2374 signalPids[count] = pid;
2375 signalProcnos[count] = i;
2376 count++;
2377 }
2378 }
2379
2381
2382 /* Now send signals */
2383 for (int i = 0; i < count; i++)
2384 {
2385 int32 pid = signalPids[i];
2386
2387 /*
2388 * If we are signaling our own process, no need to involve the kernel;
2389 * just set the flag directly.
2390 */
2391 if (pid == MyProcPid)
2392 {
2394 continue;
2395 }
2396
2397 /*
2398 * Note: assuming things aren't broken, a signal failure here could
2399 * only occur if the target backend exited since we released
2400 * NotifyQueueLock; which is unlikely but certainly possible. So we
2401 * just log a low-level debug message if it happens.
2402 */
2404 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2405 }
2406}

References Assert, asyncQueuePageDiff(), DEBUG3, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), foreach_ptr, globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, i, INVALID_PROC_NUMBER, InvalidPid, j, GlobalChannelEntry::listenersArray, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcPid, notifyInterruptPending, GlobalChannelEntry::numListeners, pendingNotifies, PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, QUEUE_POS_PRECEDES, queueHeadAfterWrite, queueHeadBeforeWrite, SendProcSignal(), signalPids, signalProcnos, and NotificationList::uniqueChannelNames.

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

◆ AsyncShmemCallbacks

const ShmemCallbacks AsyncShmemCallbacks
Initial value:
= {
.request_fn = AsyncShmemRequest,
.init_fn = AsyncShmemInit,
}

Definition at line 352 of file async.c.

352 {
353 .request_fn = AsyncShmemRequest,
354 .init_fn = AsyncShmemInit,
355};

◆ globalChannelDSA

◆ globalChannelTable

◆ globalChannelTableDSHParams

const dshash_parameters globalChannelTableDSHParams
static

◆ localChannelTable

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 584 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ notifyInterruptPending

◆ NotifySlruDesc

SlruDesc NotifySlruDesc
static

Definition at line 375 of file async.c.

Referenced by AsyncShmemRequest().

◆ pendingActions

◆ pendingListenActions

◆ pendingNotifies

◆ queueHeadAfterWrite

QueuePosition queueHeadAfterWrite
static

Definition at line 567 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ queueHeadBeforeWrite

QueuePosition queueHeadBeforeWrite
static

Definition at line 566 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalPids

int32* signalPids = NULL
static

Definition at line 574 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalProcnos

ProcNumber* signalProcnos = NULL
static

Definition at line 575 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 578 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 555 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and BecomeRegisteredListener().