PostgreSQL Source Code git master
Loading...
Searching...
No Matches
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  GlobalChannelKey
 
struct  ListenerEntry
 
struct  GlobalChannelEntry
 
struct  ListenAction
 
struct  ActionList
 
struct  PendingListenEntry
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 
struct  ChannelName
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_POS_PRECEDES(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define QUEUE_BACKEND_WAKEUP_PENDING(i)   (asyncQueueControl->backend[i].wakeupPending)
 
#define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define INITIAL_LISTENERS_ARRAY_SIZE   4
 
#define LocalChannelTableIsEmpty()    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct GlobalChannelKey GlobalChannelKey
 
typedef struct ListenerEntry ListenerEntry
 
typedef struct GlobalChannelEntry GlobalChannelEntry
 
typedef struct ActionList ActionList
 
typedef struct PendingListenEntry PendingListenEntry
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct ChannelName ChannelName
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 
enum  PendingListenAction { PENDING_LISTEN , PENDING_UNLISTEN }
 

Functions

static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void GlobalChannelKeyInit (GlobalChannelKey *key, Oid dboid, const char *channel)
 
static dshash_hash globalChannelTableHash (const void *key, size_t size, void *arg)
 
static void initGlobalChannelTable (void)
 
static void initLocalChannelTable (void)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void BecomeRegisteredListener (void)
 
static void PrepareTableEntriesForListen (const char *channel)
 
static void PrepareTableEntriesForUnlisten (const char *channel)
 
static void PrepareTableEntriesForUnlistenAll (void)
 
static void RemoveListenerFromChannel (GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
 
static void ApplyPendingListenActions (bool isCommit)
 
static void CleanupListenersOnExit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (QueuePosition *current, QueuePosition stop, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
static void initPendingListenActions (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControlasyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static dshash_tableglobalChannelTable = NULL
 
static dsa_areaglobalChannelDSA = NULL
 
static HTABlocalChannelTable = NULL
 
static ActionListpendingActions = NULL
 
static HTABpendingListenActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static QueuePosition queueHeadBeforeWrite
 
static QueuePosition queueHeadAfterWrite
 
static int32signalPids = NULL
 
static ProcNumbersignalProcnos = NULL
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 
static const dshash_parameters globalChannelTableDSHParams
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 226 of file async.c.

◆ INITIAL_LISTENERS_ARRAY_SIZE

#define INITIAL_LISTENERS_ARRAY_SIZE   4

Definition at line 378 of file async.c.

◆ LocalChannelTableIsEmpty

#define LocalChannelTableIsEmpty ( )     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

Definition at line 411 of file async.c.

424{
429
430typedef struct
431{
433 char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
435
436typedef struct ActionList
437{
438 int nestingLevel; /* current transaction nesting depth */
439 List *actions; /* list of ListenAction structs */
440 struct ActionList *upper; /* details for upper transaction levels */
441} ActionList;
442
444
445/*
446 * Hash table recording the final listen/unlisten intent per channel for
447 * the current transaction. Key is channel name, value is PENDING_LISTEN or
448 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
449 * per channel instead of replaying every action. This is built from the
450 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
451 * AtAbort_Notify.
452 */
453typedef enum
454{
458
459typedef struct PendingListenEntry
460{
461 char channel[NAMEDATALEN]; /* hash key */
462 PendingListenAction action; /* which action should we perform? */
464
466
467/*
468 * State for outbound notifies consists of a list of all channels+payloads
469 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
470 * until and unless the transaction commits. pendingNotifies is NULL if no
471 * NOTIFYs have been done in the current (sub) transaction.
472 *
473 * We discard duplicate notify events issued in the same transaction.
474 * Hence, in addition to the list proper (which we need to track the order
475 * of the events, since we guarantee to deliver them in order), we build a
476 * hash table which we can probe to detect duplicates. Since building the
477 * hash table is somewhat expensive, we do so only once we have at least
478 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
479 * before that we just scan the events linearly.
480 *
481 * The list is kept in CurTransactionContext. In subtransactions, each
482 * subtransaction has its own list in its own CurTransactionContext, but
483 * successful subtransactions add their entries to their parent's list.
484 * Failed subtransactions simply discard their lists. Since these lists
485 * are independent, there may be notify events in a subtransaction's list
486 * that duplicate events in some ancestor (sub) transaction; we get rid of
487 * the dups when merging the subtransaction's list into its parent's.
488 *
489 * Note: the action and notify lists do not interact within a transaction.
490 * In particular, if a transaction does NOTIFY and then LISTEN on the same
491 * condition name, it will get a self-notify at commit. This is a bit odd
492 * but is consistent with our historical behavior.
493 */
494typedef struct Notification
495{
496 uint16 channel_len; /* length of channel-name string */
497 uint16 payload_len; /* length of payload string */
498 /* null-terminated channel name, then null-terminated payload follow */
501
502typedef struct NotificationList
503{
504 int nestingLevel; /* current transaction nesting depth */
505 List *events; /* list of Notification structs */
506 HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
507 List *uniqueChannelNames; /* unique channel names being notified */
508 HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
509 struct NotificationList *upper; /* details for upper transaction levels */
511
512#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
513
514struct NotificationHash
515{
516 Notification *event; /* => the actual Notification struct */
517};
518
520
521/*
522 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
523 * (both just carry the channel name, with no payload).
524 */
525typedef struct ChannelName
526{
527 char channel[NAMEDATALEN]; /* hash key */
529
530/*
531 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
532 * called from inside a signal handler. That just sets the
533 * notifyInterruptPending flag and sets the process
534 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
535 * actually deal with the interrupt.
536 */
537volatile sig_atomic_t notifyInterruptPending = false;
538
539/* True if we've registered an on_shmem_exit cleanup */
540static bool unlistenExitRegistered = false;
541
542/* True if we're currently registered as a listener in asyncQueueControl */
543static bool amRegisteredListener = false;
544
545/*
546 * Queue head positions for direct advancement.
547 * These are captured during PreCommit_Notify while holding the heavyweight
548 * lock on database 0, ensuring no other backend can insert notifications
549 * between them. SignalBackends uses these to advance idle backends.
550 */
553
554/*
555 * Workspace arrays for SignalBackends. These are preallocated in
556 * PreCommit_Notify to avoid needing memory allocation after committing to
557 * clog.
558 */
559static int32 *signalPids = NULL;
561
562/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
563static bool tryAdvanceTail = false;
564
565/* GUC parameters */
566bool Trace_notify = false;
567
568/* For 8 KB pages this gives 8 GB of disk space */
569int max_notify_queue_pages = 1048576;
570
571/* local function prototypes */
572static inline int64 asyncQueuePageDiff(int64 p, int64 q);
573static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
574static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
575 const char *channel);
576static dshash_hash globalChannelTableHash(const void *key, size_t size,
577 void *arg);
578static void initGlobalChannelTable(void);
579static void initLocalChannelTable(void);
580static void queue_listen(ListenActionKind action, const char *channel);
581static void Async_UnlistenOnExit(int code, Datum arg);
582static void BecomeRegisteredListener(void);
583static void PrepareTableEntriesForListen(const char *channel);
584static void PrepareTableEntriesForUnlisten(const char *channel);
585static void PrepareTableEntriesForUnlistenAll(void);
588 int idx);
589static void ApplyPendingListenActions(bool isCommit);
590static void CleanupListenersOnExit(void);
591static bool IsListeningOn(const char *channel);
592static void asyncQueueUnregister(void);
593static bool asyncQueueIsFull(void);
594static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
597static double asyncQueueUsage(void);
598static void asyncQueueFillWarning(void);
599static void SignalBackends(void);
600static void asyncQueueReadAllNotifications(void);
602 QueuePosition stop,
603 Snapshot snapshot);
604static void asyncQueueAdvanceTail(void);
605static void ProcessIncomingNotify(bool flush);
608static uint32 notification_hash(const void *key, Size keysize);
609static int notification_match(const void *key1, const void *key2, Size keysize);
610static void ClearPendingActionsAndNotifies(void);
611
612/*
613 * Compute the difference between two queue page numbers.
614 * Previously this function accounted for a wraparound.
615 */
616static inline int64
618{
619 return p - q;
620}
621
622/*
623 * Determines whether p precedes q.
624 * Previously this function accounted for a wraparound.
625 */
626static inline bool
628{
629 return p < q;
630}
631
632/*
633 * GlobalChannelKeyInit
634 * Prepare a global channel table key for hashing.
635 */
636static inline void
637GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
638{
639 memset(key, 0, sizeof(GlobalChannelKey));
640 key->dboid = dboid;
641 strlcpy(key->channel, channel, NAMEDATALEN);
642}
643
644/*
645 * globalChannelTableHash
646 * Hash function for global channel table keys.
647 */
648static dshash_hash
649globalChannelTableHash(const void *key, size_t size, void *arg)
650{
651 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
652 dshash_hash h;
653
655 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
657
658 return h;
659}
660
661/* parameters for the global channel table */
663 sizeof(GlobalChannelKey),
664 sizeof(GlobalChannelEntry),
669};
670
671/*
672 * initGlobalChannelTable
673 * Lazy initialization of the global channel table.
674 */
675static void
677{
678 MemoryContext oldcontext;
679
680 /* Quick exit if we already did this */
683 return;
684
685 /* Otherwise, use a lock to ensure only one process creates the table */
687
688 /* Be sure any local memory allocated by DSA routines is persistent */
690
692 {
693 /* Initialize dynamic shared hash table for global channels */
699 NULL);
700
701 /* Store handles in shared memory for other backends to use */
705 }
706 else if (!globalChannelTable)
707 {
708 /* Attach to existing dynamic shared hash table */
714 NULL);
715 }
716
717 MemoryContextSwitchTo(oldcontext);
719}
720
721/*
722 * initLocalChannelTable
723 * Lazy initialization of the local channel table.
724 * Once created, this table lasts for the life of the session.
725 */
726static void
728{
730
731 /* Quick exit if we already did this */
732 if (localChannelTable != NULL)
733 return;
734
735 /* Initialize local hash table for this backend's listened channels */
737 hash_ctl.entrysize = sizeof(ChannelName);
738
740 hash_create("Local Listen Channels",
741 64,
742 &hash_ctl,
744}
745
746/*
747 * initPendingListenActions
748 * Lazy initialization of the pending listen actions hash table.
749 * This is allocated in CurTransactionContext during PreCommit_Notify,
750 * and destroyed at transaction end.
751 */
752static void
754{
756
758 return;
759
761 hash_ctl.entrysize = sizeof(PendingListenEntry);
763
765 hash_create("Pending Listen Actions",
767 &hash_ctl,
769}
770
771/*
772 * Report space needed for our shared memory area
773 */
774Size
775AsyncShmemSize(void)
776{
777 Size size;
778
779 /* This had better match AsyncShmemInit */
780 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
781 size = add_size(size, offsetof(AsyncQueueControl, backend));
782
784
785 return size;
786}
787
788/*
789 * Initialize our shared memory area
790 */
791void
792AsyncShmemInit(void)
793{
794 bool found;
795 Size size;
796
797 /*
798 * Create or attach to the AsyncQueueControl structure.
799 */
800 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
801 size = add_size(size, offsetof(AsyncQueueControl, backend));
802
804 ShmemInitStruct("Async Queue Control", size, &found);
805
806 if (!found)
807 {
808 /* First time through, so initialize it */
811 QUEUE_STOP_PAGE = 0;
816 for (int i = 0; i < MaxBackends; i++)
817 {
824 }
825 }
826
827 /*
828 * Set up SLRU management of the pg_notify data. Note that long segment
829 * names are used in order to avoid wraparound.
830 */
831 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
834 SYNC_HANDLER_NONE, true);
835
836 if (!found)
837 {
838 /*
839 * During start or reboot, clean out the pg_notify directory.
840 */
842 }
843}
844
845
846/*
847 * pg_notify -
848 * SQL function to send a notification event
849 */
850Datum
852{
853 const char *channel;
854 const char *payload;
855
856 if (PG_ARGISNULL(0))
857 channel = "";
858 else
860
861 if (PG_ARGISNULL(1))
862 payload = "";
863 else
865
866 /* For NOTIFY as a statement, this is checked in ProcessUtility */
868
869 Async_Notify(channel, payload);
870
872}
873
874
875/*
876 * Async_Notify
877 *
878 * This is executed by the SQL notify command.
879 *
880 * Adds the message to the list of pending notifies.
881 * Actual notification happens during transaction commit.
882 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
883 */
884void
885Async_Notify(const char *channel, const char *payload)
886{
887 int my_level = GetCurrentTransactionNestLevel();
888 size_t channel_len;
889 size_t payload_len;
890 Notification *n;
891 MemoryContext oldcontext;
892
893 if (IsParallelWorker())
894 elog(ERROR, "cannot send notifications from a parallel worker");
895
896 if (Trace_notify)
897 elog(DEBUG1, "Async_Notify(%s)", channel);
898
899 channel_len = channel ? strlen(channel) : 0;
900 payload_len = payload ? strlen(payload) : 0;
901
902 /* a channel name must be specified */
903 if (channel_len == 0)
906 errmsg("channel name cannot be empty")));
907
908 /* enforce length limits */
909 if (channel_len >= NAMEDATALEN)
912 errmsg("channel name too long")));
913
914 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
917 errmsg("payload string too long")));
918
919 /*
920 * We must construct the Notification entry, even if we end up not using
921 * it, in order to compare it cheaply to existing list entries.
922 *
923 * The notification list needs to live until end of transaction, so store
924 * it in the transaction context.
925 */
927
929 channel_len + payload_len + 2);
930 n->channel_len = channel_len;
931 n->payload_len = payload_len;
932 strcpy(n->data, channel);
933 if (payload)
934 strcpy(n->data + channel_len + 1, payload);
935 else
936 n->data[channel_len + 1] = '\0';
937
938 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
939 {
941
942 /*
943 * First notify event in current (sub)xact. Note that we allocate the
944 * NotificationList in TopTransactionContext; the nestingLevel might
945 * get changed later by AtSubCommit_Notify.
946 */
949 sizeof(NotificationList));
950 notifies->nestingLevel = my_level;
951 notifies->events = list_make1(n);
952 /* We certainly don't need a hashtable yet */
953 notifies->hashtab = NULL;
954 /* We won't build uniqueChannelNames/Hash till later, either */
955 notifies->uniqueChannelNames = NIL;
956 notifies->uniqueChannelHash = NULL;
957 notifies->upper = pendingNotifies;
959 }
960 else
961 {
962 /* Now check for duplicates */
964 {
965 /* It's a dup, so forget it */
966 pfree(n);
967 MemoryContextSwitchTo(oldcontext);
968 return;
969 }
970
971 /* Append more events to existing list */
973 }
974
975 MemoryContextSwitchTo(oldcontext);
976}
977
978/*
979 * queue_listen
980 * Common code for listen, unlisten, unlisten all commands.
981 *
982 * Adds the request to the list of pending actions.
983 * Actual update of localChannelTable and globalChannelTable happens during
984 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
985 */
986static void
987queue_listen(ListenActionKind action, const char *channel)
988{
989 MemoryContext oldcontext;
991 int my_level = GetCurrentTransactionNestLevel();
992
993 /*
994 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
995 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
996 * final per-channel intent is computed during PreCommit_Notify.
997 */
999
1000 /* space for terminating null is included in sizeof(ListenAction) */
1002 strlen(channel) + 1);
1003 actrec->action = action;
1004 strcpy(actrec->channel, channel);
1005
1006 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1007 {
1008 ActionList *actions;
1009
1010 /*
1011 * First action in current sub(xact). Note that we allocate the
1012 * ActionList in TopTransactionContext; the nestingLevel might get
1013 * changed later by AtSubCommit_Notify.
1014 */
1015 actions = (ActionList *)
1017 actions->nestingLevel = my_level;
1018 actions->actions = list_make1(actrec);
1019 actions->upper = pendingActions;
1020 pendingActions = actions;
1021 }
1022 else
1024
1025 MemoryContextSwitchTo(oldcontext);
1026}
1027
1028/*
1029 * Async_Listen
1030 *
1031 * This is executed by the SQL listen command.
1032 */
1033void
1034Async_Listen(const char *channel)
1035{
1036 if (Trace_notify)
1037 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1038
1039 queue_listen(LISTEN_LISTEN, channel);
1040}
1041
1042/*
1043 * Async_Unlisten
1044 *
1045 * This is executed by the SQL unlisten command.
1046 */
1047void
1048Async_Unlisten(const char *channel)
1049{
1050 if (Trace_notify)
1051 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1052
1053 /* If we couldn't possibly be listening, no need to queue anything */
1055 return;
1056
1057 queue_listen(LISTEN_UNLISTEN, channel);
1058}
1059
1060/*
1061 * Async_UnlistenAll
1062 *
1063 * This is invoked by UNLISTEN * command, and also at backend exit.
1064 */
1065void
1067{
1068 if (Trace_notify)
1069 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1070
1071 /* If we couldn't possibly be listening, no need to queue anything */
1073 return;
1074
1076}
1077
1078/*
1079 * SQL function: return a set of the channel names this backend is actively
1080 * listening to.
1081 *
1082 * Note: this coding relies on the fact that the localChannelTable cannot
1083 * change within a transaction.
1084 */
1085Datum
1087{
1089 HASH_SEQ_STATUS *status;
1090
1091 /* stuff done only on the first call of the function */
1092 if (SRF_IS_FIRSTCALL())
1093 {
1094 /* create a function context for cross-call persistence */
1096
1097 /* Initialize hash table iteration if we have any channels */
1098 if (localChannelTable != NULL)
1099 {
1100 MemoryContext oldcontext;
1101
1102 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1103 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1105 funcctx->user_fctx = status;
1106 MemoryContextSwitchTo(oldcontext);
1107 }
1108 else
1109 {
1110 funcctx->user_fctx = NULL;
1111 }
1112 }
1113
1114 /* stuff done on every call of the function */
1116 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1117
1118 if (status != NULL)
1119 {
1120 ChannelName *entry;
1121
1122 entry = (ChannelName *) hash_seq_search(status);
1123 if (entry != NULL)
1125 }
1126
1128}
1129
1130/*
1131 * Async_UnlistenOnExit
1132 *
1133 * This is executed at backend exit if we have done any LISTENs in this
1134 * backend. It might not be necessary anymore, if the user UNLISTENed
1135 * everything, but we don't try to detect that case.
1136 */
1137static void
1139{
1142}
1143
1144/*
1145 * AtPrepare_Notify
1146 *
1147 * This is called at the prepare phase of a two-phase
1148 * transaction. Save the state for possible commit later.
1149 */
1150void
1151AtPrepare_Notify(void)
1152{
1153 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1155 ereport(ERROR,
1157 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1158}
1159
1160/*
1161 * PreCommit_Notify
1162 *
1163 * This is called at transaction commit, before actually committing to
1164 * clog.
1165 *
1166 * If there are pending LISTEN actions, make sure we are listed in the
1167 * shared-memory listener array. This must happen before commit to
1168 * ensure we don't miss any notifies from transactions that commit
1169 * just after ours.
1170 *
1171 * If there are outbound notify requests in the pendingNotifies list,
1172 * add them to the global queue. We do that before commit so that
1173 * we can still throw error if we run out of queue space.
1174 */
1175void
1176PreCommit_Notify(void)
1177{
1178 ListCell *p;
1179
1181 return; /* no relevant statements in this xact */
1182
1183 if (Trace_notify)
1184 elog(DEBUG1, "PreCommit_Notify");
1185
1186 /* Preflight for any pending listen/unlisten actions */
1188
1189 if (pendingActions != NULL)
1190 {
1191 /* Ensure we have a local channel table */
1193 /* Create pendingListenActions hash table for this transaction */
1195
1196 /* Stage all the actions this transaction wants to perform */
1197 foreach(p, pendingActions->actions)
1198 {
1200
1201 switch (actrec->action)
1202 {
1203 case LISTEN_LISTEN:
1206 break;
1207 case LISTEN_UNLISTEN:
1209 break;
1212 break;
1213 }
1214 }
1215 }
1216
1217 /* Queue any pending notifies (must happen after the above) */
1218 if (pendingNotifies)
1219 {
1221 bool firstIteration = true;
1222
1223 /*
1224 * Build list of unique channel names being notified for use by
1225 * SignalBackends().
1226 *
1227 * If uniqueChannelHash is available, use it to efficiently get the
1228 * unique channels. Otherwise, fall back to the O(N^2) approach.
1229 */
1232 {
1233 HASH_SEQ_STATUS status;
1235
1237 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1240 channelEntry->channel);
1241 }
1242 else
1243 {
1244 /* O(N^2) approach is better for small number of notifications */
1246 {
1247 char *channel = n->data;
1248 bool found = false;
1249
1250 /* Name present in list? */
1252 {
1253 if (strcmp(oldchan, channel) == 0)
1254 {
1255 found = true;
1256 break;
1257 }
1258 }
1259 /* Add if not already in list */
1260 if (!found)
1263 channel);
1264 }
1265 }
1266
1267 /* Preallocate workspace that will be needed by SignalBackends() */
1268 if (signalPids == NULL)
1270 MaxBackends * sizeof(int32));
1271
1272 if (signalProcnos == NULL)
1274 MaxBackends * sizeof(ProcNumber));
1275
1276 /*
1277 * Make sure that we have an XID assigned to the current transaction.
1278 * GetCurrentTransactionId is cheap if we already have an XID, but not
1279 * so cheap if we don't, and we'd prefer not to do that work while
1280 * holding NotifyQueueLock.
1281 */
1283
1284 /*
1285 * Serialize writers by acquiring a special lock that we hold till
1286 * after commit. This ensures that queue entries appear in commit
1287 * order, and in particular that there are never uncommitted queue
1288 * entries ahead of committed ones, so an uncommitted transaction
1289 * can't block delivery of deliverable notifications.
1290 *
1291 * We use a heavyweight lock so that it'll automatically be released
1292 * after either commit or abort. This also allows deadlocks to be
1293 * detected, though really a deadlock shouldn't be possible here.
1294 *
1295 * The lock is on "database 0", which is pretty ugly but it doesn't
1296 * seem worth inventing a special locktag category just for this.
1297 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1298 * used by the flatfiles mechanism.)
1299 */
1302
1303 /*
1304 * For the direct advancement optimization in SignalBackends(), we
1305 * need to ensure that no other backend can insert queue entries
1306 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1307 * heavyweight lock above provides this guarantee, since it serializes
1308 * all writers.
1309 *
1310 * Note: if the heavyweight lock were ever removed for scalability
1311 * reasons, we could achieve the same guarantee by holding
1312 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1313 * than releasing and reacquiring it for each page as we do below.
1314 */
1315
1316 /* Initialize values to a safe default in case list is empty */
1319
1320 /* Now push the notifications into the queue */
1322 while (nextNotify != NULL)
1323 {
1324 /*
1325 * Add the pending notifications to the queue. We acquire and
1326 * release NotifyQueueLock once per page, which might be overkill
1327 * but it does allow readers to get in while we're doing this.
1328 *
1329 * A full queue is very uncommon and should really not happen,
1330 * given that we have so much space available in the SLRU pages.
1331 * Nevertheless we need to deal with this possibility. Note that
1332 * when we get here we are in the process of committing our
1333 * transaction, but we have not yet committed to clog, so at this
1334 * point in time we can still roll the transaction back.
1335 */
1337 if (firstIteration)
1338 {
1340 firstIteration = false;
1341 }
1343 if (asyncQueueIsFull())
1344 ereport(ERROR,
1346 errmsg("too many notifications in the NOTIFY queue")));
1350 }
1351
1352 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1353 }
1354}
1355
1356/*
1357 * AtCommit_Notify
1358 *
1359 * This is called at transaction commit, after committing to clog.
1360 *
1361 * Apply pending listen/unlisten changes and clear transaction-local state.
1362 *
1363 * If we issued any notifications in the transaction, send signals to
1364 * listening backends (possibly including ourselves) to process them.
1365 * Also, if we filled enough queue pages with new notifies, try to
1366 * advance the queue tail pointer.
1367 */
1368void
1369AtCommit_Notify(void)
1370{
1371 /*
1372 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1373 * return as soon as possible
1374 */
1376 return;
1377
1378 if (Trace_notify)
1379 elog(DEBUG1, "AtCommit_Notify");
1380
1381 /* Apply staged listen/unlisten changes */
1383
1384 /* If no longer listening to anything, get out of listener array */
1387
1388 /*
1389 * Send signals to listening backends. We need do this only if there are
1390 * pending notifies, which were previously added to the shared queue by
1391 * PreCommit_Notify().
1392 */
1393 if (pendingNotifies != NULL)
1395
1396 /*
1397 * If it's time to try to advance the global tail pointer, do that.
1398 *
1399 * (It might seem odd to do this in the sender, when more than likely the
1400 * listeners won't yet have read the messages we just sent. However,
1401 * there's less contention if only the sender does it, and there is little
1402 * need for urgency in advancing the global tail. So this typically will
1403 * be clearing out messages that were sent some time ago.)
1404 */
1405 if (tryAdvanceTail)
1406 {
1407 tryAdvanceTail = false;
1409 }
1410
1411 /* And clean up */
1413}
1414
1415/*
1416 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1417 *
1418 * This function must make sure we are ready to catch any incoming messages.
1419 */
1420static void
1422{
1423 QueuePosition head;
1424 QueuePosition max;
1426
1427 /*
1428 * Nothing to do if we are already listening to something, nor if we
1429 * already ran this routine in this transaction.
1430 */
1432 return;
1433
1434 if (Trace_notify)
1435 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1436
1437 /*
1438 * Before registering, make sure we will unlisten before dying. (Note:
1439 * this action does not get undone if we abort later.)
1440 */
1442 {
1445 }
1446
1447 /*
1448 * This is our first LISTEN, so establish our pointer.
1449 *
1450 * We set our pointer to the global tail pointer and then move it forward
1451 * over already-committed notifications. This ensures we cannot miss any
1452 * not-yet-committed notifications. We might get a few more but that
1453 * doesn't hurt.
1454 *
1455 * In some scenarios there might be a lot of committed notifications that
1456 * have not yet been pruned away (because some backend is being lazy about
1457 * reading them). To reduce our startup time, we can look at other
1458 * backends and adopt the maximum "pos" pointer of any backend that's in
1459 * our database; any notifications it's already advanced over are surely
1460 * committed and need not be re-examined by us. (We must consider only
1461 * backends connected to our DB, because others will not have bothered to
1462 * check committed-ness of notifications in our DB.)
1463 *
1464 * We need exclusive lock here so we can look at other backends' entries
1465 * and manipulate the list links.
1466 */
1468 head = QUEUE_HEAD;
1469 max = QUEUE_TAIL;
1472 {
1474 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1475 /* Also find last listening backend before this one */
1476 if (i < MyProcNumber)
1477 prevListener = i;
1478 }
1484 /* Insert backend into list of listeners at correct position */
1486 {
1489 }
1490 else
1491 {
1494 }
1496
1497 /* Now we are listed in the global array, so remember we're listening */
1498 amRegisteredListener = true;
1499
1500 /*
1501 * Try to move our pointer forward as far as possible. This will skip
1502 * over already-committed notifications, which we want to do because they
1503 * might be quite stale. Note that we are not yet listening on anything,
1504 * so we won't deliver such notifications to our frontend. Also, although
1505 * our transaction might have executed NOTIFY, those message(s) aren't
1506 * queued yet so we won't skip them here.
1507 */
1508 if (!QUEUE_POS_EQUAL(max, head))
1510}
1511
1512/*
1513 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1514 *
1515 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1516 * an entry in localChannelTable, and pre-allocating an entry in the shared
1517 * globalChannelTable with listening=false. The listening flag will be set
1518 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1519 * will be removed.
1520 */
1521static void
1522PrepareTableEntriesForListen(const char *channel)
1523{
1525 GlobalChannelEntry *entry;
1526 bool found;
1528 PendingListenEntry *pending;
1529
1530 /*
1531 * Record in local pending hash that we want to LISTEN, overwriting any
1532 * earlier attempt to UNLISTEN.
1533 */
1534 pending = (PendingListenEntry *)
1536 pending->action = PENDING_LISTEN;
1537
1538 /*
1539 * Ensure that there is an entry for the channel in localChannelTable.
1540 * (Should this fail, we can just roll back.) If the transaction fails
1541 * after this point, we will remove the entry if appropriate during
1542 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1543 * to return TRUE; we assume nothing is going to consult that before
1544 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1545 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1546 * present to ensure they do the right things; see
1547 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1548 */
1550
1551 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1552 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1553 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1554
1555 if (!found)
1556 {
1557 /* New channel entry, so initialize it to a safe state */
1559 entry->numListeners = 0;
1560 entry->allocatedListeners = 0;
1561 }
1562
1563 /*
1564 * Create listenersArray if entry doesn't have one. It's tempting to fold
1565 * this into the !found case, but this coding allows us to cope in case
1566 * dsa_allocate() failed in an earlier attempt.
1567 */
1568 if (!DsaPointerIsValid(entry->listenersArray))
1569 {
1573 }
1574
1577
1578 /*
1579 * Check if we already have a ListenerEntry (possibly from earlier in this
1580 * transaction)
1581 */
1582 for (int i = 0; i < entry->numListeners; i++)
1583 {
1584 if (listeners[i].procNo == MyProcNumber)
1585 {
1586 /* Already have an entry; listening flag stays as-is until commit */
1588 return;
1589 }
1590 }
1591
1592 /* Need to add a new entry; grow array if necessary */
1593 if (entry->numListeners >= entry->allocatedListeners)
1594 {
1595 int new_size = entry->allocatedListeners * 2;
1598 sizeof(ListenerEntry) * new_size);
1600
1602 entry->listenersArray = new_array;
1606 }
1607
1608 listeners[entry->numListeners].procNo = MyProcNumber;
1609 listeners[entry->numListeners].listening = false; /* staged, not yet
1610 * committed */
1611 entry->numListeners++;
1612
1614}
1615
1616/*
1617 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1618 *
1619 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1620 * we're currently listening (committed or staged). We don't touch
1621 * globalChannelTable yet - the listener keeps receiving signals until
1622 * commit, when the entry is removed.
1623 */
1624static void
1625PrepareTableEntriesForUnlisten(const char *channel)
1626{
1627 PendingListenEntry *pending;
1628
1629 /*
1630 * If the channel name is not in localChannelTable, then we are neither
1631 * listening on it nor preparing to listen on it, so we don't need to
1632 * record an UNLISTEN action.
1633 */
1635 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1636 return;
1637
1638 /*
1639 * Record in local pending hash that we want to UNLISTEN, overwriting any
1640 * earlier attempt to LISTEN. Don't touch localChannelTable or
1641 * globalChannelTable yet - we keep receiving signals until commit.
1642 */
1643 pending = (PendingListenEntry *)
1645 pending->action = PENDING_UNLISTEN;
1646}
1647
1648/*
1649 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1650 *
1651 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1652 * about-to-be-listened channels in pendingListenActions.
1653 */
1654static void
1656{
1659 PendingListenEntry *pending;
1660
1661 /*
1662 * Scan localChannelTable, which will have the names of all channels that
1663 * we are listening on or have prepared to listen on. Record an UNLISTEN
1664 * action for each one, overwriting any earlier attempt to LISTEN.
1665 */
1667 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1668 {
1669 pending = (PendingListenEntry *)
1671 pending->action = PENDING_UNLISTEN;
1672 }
1673}
1674
1675/*
1676 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1677 *
1678 * Decrements numListeners, compacts the array, and frees the entry if empty.
1679 * Sets *entry_ptr to NULL if the entry was deleted.
1680 *
1681 * We could get the listeners pointer from the entry, but all callers
1682 * already have it at hand.
1683 */
1684static void
1687 int idx)
1688{
1689 GlobalChannelEntry *entry = *entry_ptr;
1690
1691 entry->numListeners--;
1692 if (idx < entry->numListeners)
1694 sizeof(ListenerEntry) * (entry->numListeners - idx));
1695
1696 if (entry->numListeners == 0)
1697 {
1700 /* tells caller not to release the entry's lock: */
1701 *entry_ptr = NULL;
1702 }
1703}
1704
1705/*
1706 * ApplyPendingListenActions
1707 *
1708 * Apply, or revert, staged listen/unlisten changes to the local and global
1709 * hash tables.
1710 */
1711static void
1713{
1715 PendingListenEntry *pending;
1716
1717 /* Quick exit if nothing to do */
1719 return;
1720
1721 /* We made a globalChannelTable before building pendingListenActions */
1722 if (globalChannelTable == NULL)
1723 elog(PANIC, "global channel table missing post-commit/abort");
1724
1725 /* For each staged action ... */
1727 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1728 {
1730 GlobalChannelEntry *entry;
1731 bool removeLocal = true;
1732 bool foundListener = false;
1733
1734 /*
1735 * Find the global entry for this channel. If isCommit, it had better
1736 * exist (it was created in PreCommit). In an abort, it might not
1737 * exist, in which case we are not listening and should discard any
1738 * local entry that PreCommit may have managed to create.
1739 */
1740 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1741 entry = dshash_find(globalChannelTable, &key, true);
1742 if (entry != NULL)
1743 {
1744 /* Scan entry to find the ListenerEntry for this backend */
1746
1749
1750 for (int i = 0; i < entry->numListeners; i++)
1751 {
1752 if (listeners[i].procNo != MyProcNumber)
1753 continue;
1754 foundListener = true;
1755 if (isCommit)
1756 {
1757 if (pending->action == PENDING_LISTEN)
1758 {
1759 /*
1760 * LISTEN being committed: set listening=true.
1761 * localChannelTable entry was created during
1762 * PreCommit and should be kept.
1763 */
1764 listeners[i].listening = true;
1765 removeLocal = false;
1766 }
1767 else
1768 {
1769 /*
1770 * UNLISTEN being committed: remove pre-allocated
1771 * entries from both tables.
1772 */
1774 }
1775 }
1776 else
1777 {
1778 /*
1779 * Note: this part is reachable only if the transaction
1780 * aborts after PreCommit_Notify() has made some
1781 * pendingListenActions entries, so it's pretty hard to
1782 * test.
1783 */
1784 if (!listeners[i].listening)
1785 {
1786 /*
1787 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1788 * and we weren't listening before, so remove
1789 * pre-allocated entries from both tables.
1790 */
1792 }
1793 else
1794 {
1795 /*
1796 * We're aborting, but the previous state was that
1797 * we're listening, so keep localChannelTable entry.
1798 */
1799 removeLocal = false;
1800 }
1801 }
1802 break; /* there shouldn't be another match */
1803 }
1804
1805 /* We might have already released the entry by removing it */
1806 if (entry != NULL)
1808 }
1809
1810 /*
1811 * If we're committing a LISTEN action, we should have found a
1812 * matching ListenerEntry, but otherwise it's okay if we didn't.
1813 */
1814 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1815 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1816 pending->channel, MyProcNumber);
1817
1818 /*
1819 * If we did not find a globalChannelTable entry for our backend, or
1820 * if we are unlistening, remove any localChannelTable entry that may
1821 * exist. (Note in particular that this cleans up if we created a
1822 * localChannelTable entry and then failed while trying to create a
1823 * globalChannelTable entry.)
1824 */
1827 HASH_REMOVE, NULL);
1828 }
1829}
1830
1831/*
1832 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1833 *
1834 * Remove this backend from all channels in the shared global table.
1835 */
1836static void
1838{
1839 dshash_seq_status status;
1840 GlobalChannelEntry *entry;
1841
1842 if (Trace_notify)
1843 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1844
1845 /* Clear our local cache (not really necessary, but be consistent) */
1846 if (localChannelTable != NULL)
1847 {
1850 }
1851
1852 /* Now remove our entries from the shared globalChannelTable */
1853 if (globalChannelTable == NULL)
1854 return;
1855
1856 dshash_seq_init(&status, globalChannelTable, true);
1857 while ((entry = dshash_seq_next(&status)) != NULL)
1858 {
1860
1861 if (entry->key.dboid != MyDatabaseId)
1862 continue; /* not relevant */
1863
1866
1867 for (int i = 0; i < entry->numListeners; i++)
1868 {
1869 if (listeners[i].procNo == MyProcNumber)
1870 {
1871 entry->numListeners--;
1872 if (i < entry->numListeners)
1873 memmove(&listeners[i], &listeners[i + 1],
1874 sizeof(ListenerEntry) * (entry->numListeners - i));
1875
1876 if (entry->numListeners == 0)
1877 {
1879 dshash_delete_current(&status);
1880 }
1881 break;
1882 }
1883 }
1884 }
1885 dshash_seq_term(&status);
1886}
1887
1888/*
1889 * Test whether we are actively listening on the given channel name.
1890 *
1891 * Note: this function is executed for every notification found in the queue.
1892 */
1893static bool
1894IsListeningOn(const char *channel)
1895{
1896 if (localChannelTable == NULL)
1897 return false;
1898
1899 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1900}
1901
1902/*
1903 * Remove our entry from the listeners array when we are no longer listening
1904 * on any channel. NB: must not fail if we're already not listening.
1905 */
1906static void
1908{
1909 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1910
1911 if (!amRegisteredListener) /* nothing to do */
1912 return;
1913
1914 /*
1915 * Need exclusive lock here to manipulate list links.
1916 */
1918 /* Mark our entry as invalid */
1923 /* and remove it from the list */
1926 else
1927 {
1929 {
1931 {
1933 break;
1934 }
1935 }
1936 }
1939
1940 /* mark ourselves as no longer listed in the global array */
1941 amRegisteredListener = false;
1942}
1943
1944/*
1945 * Test whether there is room to insert more notification messages.
1946 *
1947 * Caller must hold at least shared NotifyQueueLock.
1948 */
1949static bool
1950asyncQueueIsFull(void)
1951{
1952 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1954 int64 occupied = headPage - tailPage;
1955
1957}
1958
1959/*
1960 * Advance the QueuePosition to the next entry, assuming that the current
1961 * entry is of length entryLength. If we jump to a new page the function
1962 * returns true, else false.
1963 */
1964static bool
1965asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1966{
1967 int64 pageno = QUEUE_POS_PAGE(*position);
1968 int offset = QUEUE_POS_OFFSET(*position);
1969 bool pageJump = false;
1970
1971 /*
1972 * Move to the next writing position: First jump over what we have just
1973 * written or read.
1974 */
1975 offset += entryLength;
1976 Assert(offset <= QUEUE_PAGESIZE);
1977
1978 /*
1979 * In a second step check if another entry can possibly be written to the
1980 * page. If so, stay here, we have reached the next position. If not, then
1981 * we need to move on to the next page.
1982 */
1984 {
1985 pageno++;
1986 offset = 0;
1987 pageJump = true;
1988 }
1989
1990 SET_QUEUE_POS(*position, pageno, offset);
1991 return pageJump;
1992}
1993
1994/*
1995 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
1996 */
1997static void
1999{
2000 size_t channellen = n->channel_len;
2001 size_t payloadlen = n->payload_len;
2002 int entryLength;
2003
2006
2007 /* The terminators are already included in AsyncQueueEntryEmptySize */
2010 qe->length = entryLength;
2011 qe->dboid = MyDatabaseId;
2012 qe->xid = GetCurrentTransactionId();
2013 qe->srcPid = MyProcPid;
2014 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2015}
2016
2017/*
2018 * Add pending notifications to the queue.
2019 *
2020 * We go page by page here, i.e. we stop once we have to go to a new page but
2021 * we will be called again and then fill that next page. If an entry does not
2022 * fit into the current page, we write a dummy entry with an InvalidOid as the
2023 * database OID in order to fill the page. So every page is always used up to
2024 * the last byte which simplifies reading the page later.
2025 *
2026 * We are passed the list cell (in pendingNotifies->events) containing the next
2027 * notification to write and return the first still-unwritten cell back.
2028 * Eventually we will return NULL indicating all is done.
2029 *
2030 * We are holding NotifyQueueLock already from the caller and grab
2031 * page specific SLRU bank lock locally in this function.
2032 */
2033static ListCell *
2035{
2038 int64 pageno;
2039 int offset;
2040 int slotno;
2042
2043 /*
2044 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2045 * memory upon exiting. The reason for this is that if we have to advance
2046 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2047 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2048 * subsequent insertions would try to put entries into a page that slru.c
2049 * thinks doesn't exist yet.) So, use a local position variable. Note
2050 * that if we do fail, any already-inserted queue entries are forgotten;
2051 * this is okay, since they'd be useless anyway after our transaction
2052 * rolls back.
2053 */
2055
2056 /*
2057 * If this is the first write since the postmaster started, we need to
2058 * initialize the first page of the async SLRU. Otherwise, the current
2059 * page should be initialized already, so just fetch it.
2060 */
2061 pageno = QUEUE_POS_PAGE(queue_head);
2063
2064 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2066
2069 else
2070 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2072
2073 /* Note we mark the page dirty before writing in it */
2074 NotifyCtl->shared->page_dirty[slotno] = true;
2075
2076 while (nextNotify != NULL)
2077 {
2079
2080 /* Construct a valid queue entry in local variable qe */
2082
2083 offset = QUEUE_POS_OFFSET(queue_head);
2084
2085 /* Check whether the entry really fits on the current page */
2086 if (offset + qe.length <= QUEUE_PAGESIZE)
2087 {
2088 /* OK, so advance nextNotify past this item */
2090 }
2091 else
2092 {
2093 /*
2094 * Write a dummy entry to fill up the page. Actually readers will
2095 * only check dboid and since it won't match any reader's database
2096 * OID, they will ignore this entry and move on.
2097 */
2098 qe.length = QUEUE_PAGESIZE - offset;
2099 qe.dboid = InvalidOid;
2101 qe.data[0] = '\0'; /* empty channel */
2102 qe.data[1] = '\0'; /* empty payload */
2103 }
2104
2105 /* Now copy qe into the shared buffer page */
2106 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2107 &qe,
2108 qe.length);
2109
2110 /* Advance queue_head appropriately, and detect if page is full */
2111 if (asyncQueueAdvance(&(queue_head), qe.length))
2112 {
2113 LWLock *lock;
2114
2115 pageno = QUEUE_POS_PAGE(queue_head);
2116 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2117 if (lock != prevlock)
2118 {
2121 prevlock = lock;
2122 }
2123
2124 /*
2125 * Page is full, so we're done here, but first fill the next page
2126 * with zeroes. The reason to do this is to ensure that slru.c's
2127 * idea of the head page is always the same as ours, which avoids
2128 * boundary problems in SimpleLruTruncate. The test in
2129 * asyncQueueIsFull() ensured that there is room to create this
2130 * page without overrunning the queue.
2131 */
2133
2134 /*
2135 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2136 * set flag to remember that we should try to advance the tail
2137 * pointer (we don't want to actually do that right here).
2138 */
2140 tryAdvanceTail = true;
2141
2142 /* And exit the loop */
2143 break;
2144 }
2145 }
2146
2147 /* Success, so update the global QUEUE_HEAD */
2149
2151
2152 return nextNotify;
2153}
2154
2155/*
2156 * SQL function to return the fraction of the notification queue currently
2157 * occupied.
2158 */
2159Datum
2161{
2162 double usage;
2163
2164 /* Advance the queue tail so we don't report a too-large result */
2166
2170
2172}
2173
2174/*
2175 * Return the fraction of the queue that is currently occupied.
2176 *
2177 * The caller must hold NotifyQueueLock in (at least) shared mode.
2178 *
2179 * Note: we measure the distance to the logical tail page, not the physical
2180 * tail page. In some sense that's wrong, but the relative position of the
2181 * physical tail is affected by details such as SLRU segment boundaries,
2182 * so that a result based on that is unpleasantly unstable.
2183 */
2184static double
2185asyncQueueUsage(void)
2186{
2187 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2189 int64 occupied = headPage - tailPage;
2190
2191 if (occupied == 0)
2192 return (double) 0; /* fast exit for common case */
2193
2194 return (double) occupied / (double) max_notify_queue_pages;
2195}
2196
2197/*
2198 * Check whether the queue is at least half full, and emit a warning if so.
2199 *
2200 * This is unlikely given the size of the queue, but possible.
2201 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2202 *
2203 * Caller must hold exclusive NotifyQueueLock.
2204 */
2205static void
2207{
2208 double fillDegree;
2209 TimestampTz t;
2210
2212 if (fillDegree < 0.5)
2213 return;
2214
2215 t = GetCurrentTimestamp();
2216
2219 {
2222
2224 {
2226 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2229 }
2230
2232 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2233 (minPid != InvalidPid ?
2234 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2235 : 0),
2236 (minPid != InvalidPid ?
2237 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2238 : 0)));
2239
2241 }
2242}
2243
2244/*
2245 * Send signals to listening backends.
2246 *
2247 * Normally we signal only backends that are interested in the notifies that
2248 * we just sent. However, that will leave idle listeners falling further and
2249 * further behind. Waken them anyway if they're far enough behind, so they'll
2250 * advance their queue position pointers, allowing the global tail to advance.
2251 *
2252 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2253 *
2254 * This is called during CommitTransaction(), so it's important for it
2255 * to have very low probability of failure.
2256 */
2257static void
2258SignalBackends(void)
2259{
2260 int count;
2261
2262 /* Can't get here without PreCommit_Notify having made the global table */
2264
2265 /* It should have set up these arrays, too */
2267
2268 /*
2269 * Identify backends that we need to signal. We don't want to send
2270 * signals while holding the NotifyQueueLock, so this part just builds a
2271 * list of target PIDs in signalPids[] and signalProcnos[].
2272 */
2273 count = 0;
2274
2276
2277 /* Scan each channel name that we notified in this transaction */
2279 {
2281 GlobalChannelEntry *entry;
2283
2284 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2285 entry = dshash_find(globalChannelTable, &key, false);
2286 if (entry == NULL)
2287 continue; /* nobody is listening */
2288
2290 entry->listenersArray);
2291
2292 /* Identify listeners that now need waking, add them to arrays */
2293 for (int j = 0; j < entry->numListeners; j++)
2294 {
2295 ProcNumber i;
2296 int32 pid;
2297 QueuePosition pos;
2298
2299 if (!listeners[j].listening)
2300 continue; /* ignore not-yet-committed listeners */
2301
2302 i = listeners[j].procNo;
2303
2305 continue; /* already signaled, no need to repeat */
2306
2307 pid = QUEUE_BACKEND_PID(i);
2308 pos = QUEUE_BACKEND_POS(i);
2309
2310 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2311 continue; /* it's fully caught up already */
2312
2313 Assert(pid != InvalidPid);
2314
2316 signalPids[count] = pid;
2317 signalProcnos[count] = i;
2318 count++;
2319 }
2320
2322 }
2323
2324 /*
2325 * Scan all listeners. Any that are not already pending wakeup must not
2326 * be interested in our notifications (else we'd have set their wakeup
2327 * flags above). Check to see if we can directly advance their queue
2328 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2329 * them anyway so they will catch up.
2330 */
2332 {
2333 int32 pid;
2334 QueuePosition pos;
2335
2337 continue;
2338
2339 /* If it's currently advancing, we should not touch it */
2341 continue;
2342
2343 pid = QUEUE_BACKEND_PID(i);
2344 pos = QUEUE_BACKEND_POS(i);
2345
2346 /*
2347 * We can directly advance the other backend's queue pointer if it's
2348 * not currently advancing (else there are race conditions), and its
2349 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2350 * it miss some older messages), and we'd not be moving the pointer
2351 * backward.
2352 */
2355 {
2356 /* We can directly advance its pointer past what we wrote */
2358 }
2361 {
2362 /* It's idle and far behind, so wake it up */
2363 Assert(pid != InvalidPid);
2364
2366 signalPids[count] = pid;
2367 signalProcnos[count] = i;
2368 count++;
2369 }
2370 }
2371
2373
2374 /* Now send signals */
2375 for (int i = 0; i < count; i++)
2376 {
2377 int32 pid = signalPids[i];
2378
2379 /*
2380 * If we are signaling our own process, no need to involve the kernel;
2381 * just set the flag directly.
2382 */
2383 if (pid == MyProcPid)
2384 {
2386 continue;
2387 }
2388
2389 /*
2390 * Note: assuming things aren't broken, a signal failure here could
2391 * only occur if the target backend exited since we released
2392 * NotifyQueueLock; which is unlikely but certainly possible. So we
2393 * just log a low-level debug message if it happens.
2394 */
2396 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2397 }
2398}
2399
2400/*
2401 * AtAbort_Notify
2402 *
2403 * This is called at transaction abort.
2404 *
2405 * Revert any staged listen/unlisten changes and clean up transaction state.
2406 * This only does anything if we abort after PreCommit_Notify has staged
2407 * some entries.
2408 */
2409void
2410AtAbort_Notify(void)
2411{
2412 /* Revert staged listen/unlisten changes */
2414
2415 /* If we're no longer listening on anything, unregister */
2418
2419 /* And clean up */
2421}
2422
2423/*
2424 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2425 *
2426 * Reassign all items in the pending lists to the parent transaction.
2427 */
2428void
2430{
2431 int my_level = GetCurrentTransactionNestLevel();
2432
2433 /* If there are actions at our nesting level, we must reparent them. */
2434 if (pendingActions != NULL &&
2435 pendingActions->nestingLevel >= my_level)
2436 {
2437 if (pendingActions->upper == NULL ||
2438 pendingActions->upper->nestingLevel < my_level - 1)
2439 {
2440 /* nothing to merge; give the whole thing to the parent */
2442 }
2443 else
2444 {
2446
2448
2449 /*
2450 * Mustn't try to eliminate duplicates here --- see queue_listen()
2451 */
2454 childPendingActions->actions);
2456 }
2457 }
2458
2459 /* If there are notifies at our nesting level, we must reparent them. */
2460 if (pendingNotifies != NULL &&
2461 pendingNotifies->nestingLevel >= my_level)
2462 {
2463 Assert(pendingNotifies->nestingLevel == my_level);
2464
2465 if (pendingNotifies->upper == NULL ||
2466 pendingNotifies->upper->nestingLevel < my_level - 1)
2467 {
2468 /* nothing to merge; give the whole thing to the parent */
2470 }
2471 else
2472 {
2473 /*
2474 * Formerly, we didn't bother to eliminate duplicates here, but
2475 * now we must, else we fall foul of "Assert(!found)", either here
2476 * or during a later attempt to build the parent-level hashtable.
2477 */
2479 ListCell *l;
2480
2482 /* Insert all the subxact's events into parent, except for dups */
2483 foreach(l, childPendingNotifies->events)
2484 {
2486
2489 }
2491 }
2492 }
2493}
2494
2495/*
2496 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2497 */
2498void
2500{
2501 int my_level = GetCurrentTransactionNestLevel();
2502
2503 /*
2504 * All we have to do is pop the stack --- the actions/notifies made in
2505 * this subxact are no longer interesting, and the space will be freed
2506 * when CurTransactionContext is recycled. We still have to free the
2507 * ActionList and NotificationList objects themselves, though, because
2508 * those are allocated in TopTransactionContext.
2509 *
2510 * Note that there might be no entries at all, or no entries for the
2511 * current subtransaction level, either because none were ever created, or
2512 * because we reentered this routine due to trouble during subxact abort.
2513 */
2514 while (pendingActions != NULL &&
2515 pendingActions->nestingLevel >= my_level)
2516 {
2518
2521 }
2522
2523 while (pendingNotifies != NULL &&
2524 pendingNotifies->nestingLevel >= my_level)
2525 {
2527
2530 }
2531}
2532
2533/*
2534 * HandleNotifyInterrupt
2535 *
2536 * Signal handler portion of interrupt handling. Let the backend know
2537 * that there's a pending notify interrupt. If we're currently reading
2538 * from the client, this will interrupt the read and
2539 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2540 */
2541void
2543{
2544 /*
2545 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2546 * you do here.
2547 */
2548
2549 /* signal that work needs to be done */
2551
2552 /* make sure the event is processed in due course */
2554}
2555
2556/*
2557 * ProcessNotifyInterrupt
2558 *
2559 * This is called if we see notifyInterruptPending set, just before
2560 * transmitting ReadyForQuery at the end of a frontend command, and
2561 * also if a notify signal occurs while reading from the frontend.
2562 * HandleNotifyInterrupt() will cause the read to be interrupted
2563 * via the process's latch, and this routine will get called.
2564 * If we are truly idle (ie, *not* inside a transaction block),
2565 * process the incoming notifies.
2566 *
2567 * If "flush" is true, force any frontend messages out immediately.
2568 * This can be false when being called at the end of a frontend command,
2569 * since we'll flush after sending ReadyForQuery.
2570 */
2571void
2572ProcessNotifyInterrupt(bool flush)
2573{
2575 return; /* not really idle */
2576
2577 /* Loop in case another signal arrives while sending messages */
2579 ProcessIncomingNotify(flush);
2580}
2581
2582
2583/*
2584 * Read all pending notifications from the queue, and deliver appropriate
2585 * ones to my frontend. Stop when we reach queue head or an uncommitted
2586 * notification.
2587 */
2588static void
2590{
2591 QueuePosition pos;
2592 QueuePosition head;
2593 Snapshot snapshot;
2594
2595 /*
2596 * Fetch current state, indicate to others that we have woken up, and that
2597 * we are in process of advancing our position.
2598 */
2600 /* Assert checks that we have a valid state entry */
2604 head = QUEUE_HEAD;
2605
2606 if (QUEUE_POS_EQUAL(pos, head))
2607 {
2608 /* Nothing to do, we have read all notifications already. */
2610 return;
2611 }
2612
2615
2616 /*----------
2617 * Get snapshot we'll use to decide which xacts are still in progress.
2618 * This is trickier than it might seem, because of race conditions.
2619 * Consider the following example:
2620 *
2621 * Backend 1: Backend 2:
2622 *
2623 * transaction starts
2624 * UPDATE foo SET ...;
2625 * NOTIFY foo;
2626 * commit starts
2627 * queue the notify message
2628 * transaction starts
2629 * LISTEN foo; -- first LISTEN in session
2630 * SELECT * FROM foo WHERE ...;
2631 * commit to clog
2632 * commit starts
2633 * add backend 2 to array of listeners
2634 * advance to queue head (this code)
2635 * commit to clog
2636 *
2637 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2638 * wasn't committed yet. Ideally we'd ensure that client 2 would
2639 * eventually get transaction 1's notify message, but there's no way
2640 * to do that; until we're in the listener array, there's no guarantee
2641 * that the notify message doesn't get removed from the queue.
2642 *
2643 * Therefore the coding technique transaction 2 is using is unsafe:
2644 * applications must commit a LISTEN before inspecting database state,
2645 * if they want to ensure they will see notifications about subsequent
2646 * changes to that state.
2647 *
2648 * What we do guarantee is that we'll see all notifications from
2649 * transactions committing after the snapshot we take here.
2650 * BecomeRegisteredListener has already added us to the listener array,
2651 * so no not-yet-committed messages can be removed from the queue
2652 * before we see them.
2653 *----------
2654 */
2655 snapshot = RegisterSnapshot(GetLatestSnapshot());
2656
2657 /*
2658 * It is possible that we fail while trying to send a message to our
2659 * frontend (for example, because of encoding conversion failure). If
2660 * that happens it is critical that we not try to send the same message
2661 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2662 * ERRORs to FATAL, causing the client connection to be closed on error.
2663 *
2664 * We used to only skip over the offending message and try to soldier on,
2665 * but it was somewhat questionable to lose a notification and give the
2666 * client an ERROR instead. A client application is not be prepared for
2667 * that and can't tell that a notification was missed. It was also not
2668 * very useful in practice because notifications are often processed while
2669 * a connection is idle and reading a message from the client, and in that
2670 * state, any error is upgraded to FATAL anyway. Closing the connection
2671 * is a clear signal to the application that it might have missed
2672 * notifications.
2673 */
2674 {
2676 bool reachedStop;
2677
2678 ExitOnAnyError = true;
2679
2680 do
2681 {
2682 /*
2683 * Process messages up to the stop position, end of page, or an
2684 * uncommitted message.
2685 *
2686 * Our stop position is what we found to be the head's position
2687 * when we entered this function. It might have changed already.
2688 * But if it has, we will receive (or have already received and
2689 * queued) another signal and come here again.
2690 *
2691 * We are not holding NotifyQueueLock here! The queue can only
2692 * extend beyond the head pointer (see above) and we leave our
2693 * backend's pointer where it is so nobody will truncate or
2694 * rewrite pages under us. Especially we don't want to hold a lock
2695 * while sending the notifications to the frontend.
2696 */
2697 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2698 } while (!reachedStop);
2699
2700 /* Update shared state */
2705
2707 }
2708
2709 /* Done with snapshot */
2710 UnregisterSnapshot(snapshot);
2711}
2712
2713/*
2714 * Fetch notifications from the shared queue, beginning at position current,
2715 * and deliver relevant ones to my frontend.
2716 *
2717 * The function returns true once we have reached the stop position or an
2718 * uncommitted notification, and false if we have finished with the page.
2719 * In other words: once it returns true there is no need to look further.
2720 * The QueuePosition *current is advanced past all processed messages.
2721 */
2722static bool
2724 QueuePosition stop,
2725 Snapshot snapshot)
2726{
2727 int64 curpage = QUEUE_POS_PAGE(*current);
2728 int slotno;
2729 char *page_buffer;
2730 bool reachedStop = false;
2731 bool reachedEndOfPage;
2732
2733 /*
2734 * We copy the entries into a local buffer to avoid holding the SLRU lock
2735 * while we transmit them to our frontend. The local buffer must be
2736 * adequately aligned.
2737 */
2739 char *local_buf_end = local_buf;
2740
2743 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2744
2745 do
2746 {
2747 QueuePosition thisentry = *current;
2749
2750 if (QUEUE_POS_EQUAL(thisentry, stop))
2751 break;
2752
2753 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2754
2755 /*
2756 * Advance *current over this message, possibly to the next page. As
2757 * noted in the comments for asyncQueueReadAllNotifications, we must
2758 * do this before possibly failing while processing the message.
2759 */
2760 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2761
2762 /* Ignore messages destined for other databases */
2763 if (qe->dboid == MyDatabaseId)
2764 {
2765 if (XidInMVCCSnapshot(qe->xid, snapshot))
2766 {
2767 /*
2768 * The source transaction is still in progress, so we can't
2769 * process this message yet. Break out of the loop, but first
2770 * back up *current so we will reprocess the message next
2771 * time. (Note: it is unlikely but not impossible for
2772 * TransactionIdDidCommit to fail, so we can't really avoid
2773 * this advance-then-back-up behavior when dealing with an
2774 * uncommitted message.)
2775 *
2776 * Note that we must test XidInMVCCSnapshot before we test
2777 * TransactionIdDidCommit, else we might return a message from
2778 * a transaction that is not yet visible to snapshots; compare
2779 * the comments at the head of heapam_visibility.c.
2780 *
2781 * Also, while our own xact won't be listed in the snapshot,
2782 * we need not check for TransactionIdIsCurrentTransactionId
2783 * because our transaction cannot (yet) have queued any
2784 * messages.
2785 */
2786 *current = thisentry;
2787 reachedStop = true;
2788 break;
2789 }
2790
2791 /*
2792 * Quick check for the case that we're not listening on any
2793 * channels, before calling TransactionIdDidCommit(). This makes
2794 * that case a little faster, but more importantly, it ensures
2795 * that if there's a bad entry in the queue for which
2796 * TransactionIdDidCommit() fails for some reason, we can skip
2797 * over it on the first LISTEN in a session, and not get stuck on
2798 * it indefinitely. (This is a little trickier than it looks: it
2799 * works because BecomeRegisteredListener runs this code before we
2800 * have made the first entry in localChannelTable.)
2801 */
2803 continue;
2804
2805 if (TransactionIdDidCommit(qe->xid))
2806 {
2807 memcpy(local_buf_end, qe, qe->length);
2808 local_buf_end += qe->length;
2809 }
2810 else
2811 {
2812 /*
2813 * The source transaction aborted or crashed, so we just
2814 * ignore its notifications.
2815 */
2816 }
2817 }
2818
2819 /* Loop back if we're not at end of page */
2820 } while (!reachedEndOfPage);
2821
2822 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2824
2825 /*
2826 * Now that we have let go of the SLRU bank lock, send the notifications
2827 * to our backend
2828 */
2830 for (char *p = local_buf; p < local_buf_end;)
2831 {
2833
2834 /* qe->data is the null-terminated channel name */
2835 char *channel = qe->data;
2836
2837 if (IsListeningOn(channel))
2838 {
2839 /* payload follows channel name */
2840 char *payload = qe->data + strlen(channel) + 1;
2841
2842 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2843 }
2844
2845 p += qe->length;
2846 }
2847
2848 if (QUEUE_POS_EQUAL(*current, stop))
2849 reachedStop = true;
2850
2851 return reachedStop;
2852}
2853
2854/*
2855 * Advance the shared queue tail variable to the minimum of all the
2856 * per-backend tail pointers. Truncate pg_notify space if possible.
2857 *
2858 * This is (usually) called during CommitTransaction(), so it's important for
2859 * it to have very low probability of failure.
2860 */
2861static void
2863{
2864 QueuePosition min;
2867 int64 boundary;
2868
2869 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2871
2872 /*
2873 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2874 * (ie, exactly match at least one backend's queue position), so it must
2875 * be updated atomically with the actual computation. Since v13, we could
2876 * get away with not doing it like that, but it seems prudent to keep it
2877 * so.
2878 *
2879 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2880 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2881 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2882 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2883 * there are pages we can truncate but haven't yet finished doing so.
2884 *
2885 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2886 * performing SimpleLruTruncate. This is OK because no backend will try
2887 * to access the pages we are in the midst of truncating.
2888 */
2890 min = QUEUE_HEAD;
2892 {
2894 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2895 }
2896 QUEUE_TAIL = min;
2899
2900 /*
2901 * We can truncate something if the global tail advanced across an SLRU
2902 * segment boundary.
2903 *
2904 * XXX it might be better to truncate only once every several segments, to
2905 * reduce the number of directory scans.
2906 */
2909 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2910 {
2911 /*
2912 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2913 * release the lock again.
2914 */
2916
2920 }
2921
2923}
2924
2925/*
2926 * AsyncNotifyFreezeXids
2927 *
2928 * Prepare the async notification queue for CLOG truncation by freezing
2929 * transaction IDs that are about to become inaccessible.
2930 *
2931 * This function is called by VACUUM before advancing datfrozenxid. It scans
2932 * the notification queue and replaces XIDs that would become inaccessible
2933 * after CLOG truncation with special markers:
2934 * - Committed transactions are set to FrozenTransactionId
2935 * - Aborted/crashed transactions are set to InvalidTransactionId
2936 *
2937 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2938 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2939 * (or it would have held back newFrozenXid through ProcArray).
2940 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2941 * either aborted explicitly or crashed, and we can safely mark it invalid.
2942 */
2943void
2945{
2946 QueuePosition pos;
2947 QueuePosition head;
2948 int64 curpage = -1;
2949 int slotno = -1;
2950 char *page_buffer = NULL;
2951 bool page_dirty = false;
2952
2953 /*
2954 * Acquire locks in the correct order to avoid deadlocks. As per the
2955 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2956 * bank locks.
2957 *
2958 * We only need SHARED mode since we're just reading the head/tail
2959 * positions, not modifying them.
2960 */
2963
2964 pos = QUEUE_TAIL;
2965 head = QUEUE_HEAD;
2966
2967 /* Release NotifyQueueLock early, we only needed to read the positions */
2969
2970 /*
2971 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2972 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2973 * we're working.
2974 */
2975 while (!QUEUE_POS_EQUAL(pos, head))
2976 {
2978 TransactionId xid;
2979 int64 pageno = QUEUE_POS_PAGE(pos);
2980 int offset = QUEUE_POS_OFFSET(pos);
2981
2982 /* If we need a different page, release old lock and get new one */
2983 if (pageno != curpage)
2984 {
2985 LWLock *lock;
2986
2987 /* Release previous page if any */
2988 if (slotno >= 0)
2989 {
2990 if (page_dirty)
2991 {
2992 NotifyCtl->shared->page_dirty[slotno] = true;
2993 page_dirty = false;
2994 }
2996 }
2997
2998 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3000 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3002 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3003 curpage = pageno;
3004 }
3005
3006 qe = (AsyncQueueEntry *) (page_buffer + offset);
3007 xid = qe->xid;
3008
3009 if (TransactionIdIsNormal(xid) &&
3011 {
3012 if (TransactionIdDidCommit(xid))
3013 {
3014 qe->xid = FrozenTransactionId;
3015 page_dirty = true;
3016 }
3017 else
3018 {
3019 qe->xid = InvalidTransactionId;
3020 page_dirty = true;
3021 }
3022 }
3023
3024 /* Advance to next entry */
3025 asyncQueueAdvance(&pos, qe->length);
3026 }
3027
3028 /* Release final page lock if we acquired one */
3029 if (slotno >= 0)
3030 {
3031 if (page_dirty)
3032 NotifyCtl->shared->page_dirty[slotno] = true;
3034 }
3035
3037}
3038
3039/*
3040 * ProcessIncomingNotify
3041 *
3042 * Scan the queue for arriving notifications and report them to the front
3043 * end. The notifications might be from other sessions, or our own;
3044 * there's no need to distinguish here.
3045 *
3046 * If "flush" is true, force any frontend messages out immediately.
3047 *
3048 * NOTE: since we are outside any transaction, we must create our own.
3049 */
3050static void
3051ProcessIncomingNotify(bool flush)
3052{
3053 /* We *must* reset the flag */
3054 notifyInterruptPending = false;
3055
3056 /* Do nothing else if we aren't actively listening */
3058 return;
3059
3060 if (Trace_notify)
3061 elog(DEBUG1, "ProcessIncomingNotify");
3062
3063 set_ps_display("notify interrupt");
3064
3065 /*
3066 * We must run asyncQueueReadAllNotifications inside a transaction, else
3067 * bad things happen if it gets an error.
3068 */
3070
3072
3074
3075 /*
3076 * If this isn't an end-of-command case, we must flush the notify messages
3077 * to ensure frontend gets them promptly.
3078 */
3079 if (flush)
3080 pq_flush();
3081
3082 set_ps_display("idle");
3083
3084 if (Trace_notify)
3085 elog(DEBUG1, "ProcessIncomingNotify: done");
3086}
3087
3088/*
3089 * Send NOTIFY message to my front end.
3090 */
3091void
3092NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3093{
3095 {
3097
3099 pq_sendint32(&buf, srcPid);
3100 pq_sendstring(&buf, channel);
3101 pq_sendstring(&buf, payload);
3103
3104 /*
3105 * NOTE: we do not do pq_flush() here. Some level of caller will
3106 * handle it later, allowing this message to be combined into a packet
3107 * with other ones.
3108 */
3109 }
3110 else
3111 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3112}
3113
3114/* Does pendingNotifies include a match for the given event? */
3115static bool
3117{
3118 if (pendingNotifies == NULL)
3119 return false;
3120
3122 {
3123 /* Use the hash table to probe for a match */
3125 &n,
3126 HASH_FIND,
3127 NULL))
3128 return true;
3129 }
3130 else
3131 {
3132 /* Must scan the event list */
3133 ListCell *l;
3134
3135 foreach(l, pendingNotifies->events)
3136 {
3138
3139 if (n->channel_len == oldn->channel_len &&
3140 n->payload_len == oldn->payload_len &&
3141 memcmp(n->data, oldn->data,
3142 n->channel_len + n->payload_len + 2) == 0)
3143 return true;
3144 }
3145 }
3146
3147 return false;
3148}
3149
3150/*
3151 * Add a notification event to a pre-existing pendingNotifies list.
3152 *
3153 * Because pendingNotifies->events is already nonempty, this works
3154 * correctly no matter what CurrentMemoryContext is.
3155 */
3156static void
3158{
3160
3161 /* Create the hash tables if it's time to */
3164 {
3166 ListCell *l;
3167
3168 /* Create the hash table */
3169 hash_ctl.keysize = sizeof(Notification *);
3170 hash_ctl.entrysize = sizeof(struct NotificationHash);
3175 hash_create("Pending Notifies",
3176 256L,
3177 &hash_ctl,
3179
3180 /* Create the unique channel name table */
3182 hash_ctl.keysize = NAMEDATALEN;
3183 hash_ctl.entrysize = sizeof(ChannelName);
3186 hash_create("Pending Notify Channel Names",
3187 64L,
3188 &hash_ctl,
3190
3191 /* Insert all the already-existing events */
3192 foreach(l, pendingNotifies->events)
3193 {
3195 char *channel = oldn->data;
3196 bool found;
3197
3199 &oldn,
3200 HASH_ENTER,
3201 &found);
3202 Assert(!found);
3203
3204 /* Add channel name to uniqueChannelHash; might be there already */
3206 channel,
3207 HASH_ENTER,
3208 NULL);
3209 }
3210 }
3211
3212 /* Add new event to the list, in order */
3214
3215 /* Add event to the hash tables if needed */
3217 {
3218 char *channel = n->data;
3219 bool found;
3220
3222 &n,
3223 HASH_ENTER,
3224 &found);
3225 Assert(!found);
3226
3227 /* Add channel name to uniqueChannelHash; might be there already */
3229 channel,
3230 HASH_ENTER,
3231 NULL);
3232 }
3233}
3234
3235/*
3236 * notification_hash: hash function for notification hash table
3237 *
3238 * The hash "keys" are pointers to Notification structs.
3239 */
3240static uint32
3241notification_hash(const void *key, Size keysize)
3242{
3243 const Notification *k = *(const Notification *const *) key;
3244
3245 Assert(keysize == sizeof(Notification *));
3246 /* We don't bother to include the payload's trailing null in the hash */
3247 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3248 k->channel_len + k->payload_len + 1));
3249}
3250
3251/*
3252 * notification_match: match function to use with notification_hash
3253 */
3254static int
3255notification_match(const void *key1, const void *key2, Size keysize)
3256{
3257 const Notification *k1 = *(const Notification *const *) key1;
3258 const Notification *k2 = *(const Notification *const *) key2;
3259
3260 Assert(keysize == sizeof(Notification *));
3261 if (k1->channel_len == k2->channel_len &&
3262 k1->payload_len == k2->payload_len &&
3263 memcmp(k1->data, k2->data,
3264 k1->channel_len + k1->payload_len + 2) == 0)
3265 return 0; /* equal */
3266 return 1; /* not equal */
3267}
3268
3269/* Clear the pendingActions and pendingNotifies lists. */
3270static void
3272{
3273 /*
3274 * Everything's allocated in either TopTransactionContext or the context
3275 * for the subtransaction to which it corresponds. So, there's nothing to
3276 * do here except reset the pointers; the space will be reclaimed when the
3277 * contexts are deleted.
3278 */
3281 /* Also clear pendingListenActions, which is derived from pendingActions */
3283}
3284
3285/*
3286 * GUC check_hook for notify_buffers
3287 */
3288bool
3289check_notify_buffers(int *newval, void **extra, GucSource source)
3290{
3291 return check_slru_buffers("notify_buffers", newval);
3292}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
static void SignalBackends(void)
Definition async.c:2259
static double asyncQueueUsage(void)
Definition async.c:2186
#define MIN_HASHABLE_NOTIFIES
Definition async.c:513
static void PrepareTableEntriesForListen(const char *channel)
Definition async.c:1523
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition async.c:1999
#define QUEUE_FIRST_LISTENER
Definition async.c:351
#define QUEUE_POS_MAX(x, y)
Definition async.c:259
static bool tryAdvanceTail
Definition async.c:564
void HandleNotifyInterrupt(void)
Definition async.c:2543
static void BecomeRegisteredListener(void)
Definition async.c:1422
static void asyncQueueAdvanceTail(void)
Definition async.c:2863
int max_notify_queue_pages
Definition async.c:570
static ActionList * pendingActions
Definition async.c:444
static void ApplyPendingListenActions(bool isCommit)
Definition async.c:1713
#define QUEUE_BACKEND_IS_ADVANCING(i)
Definition async.c:357
static uint32 notification_hash(const void *key, Size keysize)
Definition async.c:3242
void Async_UnlistenAll(void)
Definition async.c:1067
static int32 * signalPids
Definition async.c:560
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition async.c:3093
void AtCommit_Notify(void)
Definition async.c:1370
#define QUEUE_POS_MIN(x, y)
Definition async.c:253
static void PrepareTableEntriesForUnlisten(const char *channel)
Definition async.c:1626
void ProcessNotifyInterrupt(bool flush)
Definition async.c:2573
ListenActionKind
Definition async.c:425
@ LISTEN_LISTEN
Definition async.c:426
@ LISTEN_UNLISTEN_ALL
Definition async.c:428
@ LISTEN_UNLISTEN
Definition async.c:427
static bool AsyncExistsPendingNotify(Notification *n)
Definition async.c:3117
#define QUEUE_BACKEND_POS(i)
Definition async.c:355
static const dshash_parameters globalChannelTableDSHParams
Definition async.c:663
#define INITIAL_LISTENERS_ARRAY_SIZE
Definition async.c:378
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition async.c:3256
static dshash_hash globalChannelTableHash(const void *key, size_t size, void *arg)
Definition async.c:650
#define SET_QUEUE_POS(x, y, z)
Definition async.c:240
static ProcNumber * signalProcnos
Definition async.c:561
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
Definition async.c:2724
static void ProcessIncomingNotify(bool flush)
Definition async.c:3052
static void asyncQueueReadAllNotifications(void)
Definition async.c:2590
static void Async_UnlistenOnExit(int code, Datum arg)
Definition async.c:1139
#define QUEUE_POS_OFFSET(x)
Definition async.c:238
static QueuePosition queueHeadAfterWrite
Definition async.c:553
bool Trace_notify
Definition async.c:567
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition async.c:2035
static void ClearPendingActionsAndNotifies(void)
Definition async.c:3272
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Definition async.c:1087
Datum pg_notify(PG_FUNCTION_ARGS)
Definition async.c:852
static NotificationList * pendingNotifies
Definition async.c:520
#define AsyncQueueEntryEmptySize
Definition async.c:226
static void AddEventToPendingNotifies(Notification *n)
Definition async.c:3158
static AsyncQueueControl * asyncQueueControl
Definition async.c:346
static bool unlistenExitRegistered
Definition async.c:541
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition async.c:628
static dsa_area * globalChannelDSA
Definition async.c:401
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition async.c:1966
#define QUEUE_TAIL
Definition async.c:349
void AtAbort_Notify(void)
Definition async.c:2411
#define QUEUE_POS_PAGE(x)
Definition async.c:237
static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
Definition async.c:1686
void PreCommit_Notify(void)
Definition async.c:1177
#define QUEUE_CLEANUP_DELAY
Definition async.c:281
static void PrepareTableEntriesForUnlistenAll(void)
Definition async.c:1656
static void asyncQueueFillWarning(void)
Definition async.c:2207
#define QUEUE_BACKEND_PID(i)
Definition async.c:352
static void CleanupListenersOnExit(void)
Definition async.c:1838
static void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
Definition async.c:638
Size AsyncShmemSize(void)
Definition async.c:776
#define QUEUE_FULL_WARN_INTERVAL
Definition async.c:367
void Async_Unlisten(const char *channel)
Definition async.c:1049
static HTAB * pendingListenActions
Definition async.c:466
void Async_Listen(const char *channel)
Definition async.c:1035
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:200
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:249
static void initGlobalChannelTable(void)
Definition async.c:677
#define NotifyCtl
Definition async.c:364
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:356
static HTAB * localChannelTable
Definition async.c:408
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:618
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:988
#define QUEUEALIGN(len)
Definition async.c:224
static bool amRegisteredListener
Definition async.c:544
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:265
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:354
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:353
void AtSubAbort_Notify(void)
Definition async.c:2500
void AtPrepare_Notify(void)
Definition async.c:1152
#define QUEUE_PAGESIZE
Definition async.c:365
void AtSubCommit_Notify(void)
Definition async.c:2430
static bool asyncQueueIsFull(void)
Definition async.c:1951
#define QUEUE_HEAD
Definition async.c:348
void AsyncShmemInit(void)
Definition async.c:793
static void initLocalChannelTable(void)
Definition async.c:728
PendingListenAction
Definition async.c:455
@ PENDING_UNLISTEN
Definition async.c:457
@ PENDING_LISTEN
Definition async.c:456
static dshash_table * globalChannelTable
Definition async.c:400
static void asyncQueueUnregister(void)
Definition async.c:1908
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2161
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:246
#define LocalChannelTableIsEmpty()
Definition async.c:411
static void initPendingListenActions(void)
Definition async.c:754
static QueuePosition queueHeadBeforeWrite
Definition async.c:552
static bool IsListeningOn(const char *channel)
Definition async.c:1895
void Async_Notify(const char *channel, const char *payload)
Definition async.c:886
volatile sig_atomic_t notifyInterruptPending
Definition async.c:538
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2945
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3290
#define QUEUE_STOP_PAGE
Definition async.c:350
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:885
int64_t int64
Definition c.h:555
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:492
int32_t int32
Definition c.h:554
uint16_t uint16
Definition c.h:557
uint32_t uint32
Definition c.h:558
uint32 TransactionId
Definition c.h:678
size_t Size
Definition c.h:631
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:592
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:543
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:560
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:640
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:392
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:369
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:272
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:749
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
Definition dshash.c:435
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:659
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:208
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:574
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:759
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
uint32 dshash_hash
Definition dshash.h:30
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
int errhint(const char *fmt,...) pg_attribute_printf(1
#define DEBUG3
Definition elog.h:28
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define INFO
Definition elog.h:34
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:47
ProcNumber MyProcNumber
Definition globals.c:90
int MaxBackends
Definition globals.c:146
bool ExitOnAnyError
Definition globals.c:123
int notify_buffers
Definition globals.c:164
struct Latch * MyLatch
Definition globals.c:63
Oid MyDatabaseId
Definition globals.c:94
#define newval
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:96
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_COMPARE
Definition hsearch.h:99
#define HASH_FUNCTION
Definition hsearch.h:98
#define IsParallelWorker()
Definition parallel.h:62
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
#define pq_flush()
Definition libpq.h:49
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext TopTransactionContext
Definition mcxt.c:171
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurTransactionContext
Definition mcxt.c:172
#define InvalidPid
Definition miscadmin.h:32
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static void usage(void)
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:343
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:93
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:232
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static int fb(int x)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:286
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:252
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition slru.c:630
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1816
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1769
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition slru.c:527
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition slru.c:375
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition slru.c:1433
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:198
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:355
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:160
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
List * actions
Definition async.c:440
int nestingLevel
Definition async.c:439
struct ActionList * upper
Definition async.c:441
dshash_table_handle globalChannelTableDSH
Definition async.c:341
TimestampTz lastQueueFillWarn
Definition async.c:339
dsa_handle globalChannelTableDSA
Definition async.c:340
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:220
char channel[NAMEDATALEN]
Definition async.c:528
dsa_pointer listenersArray
Definition async.c:395
int allocatedListeners
Definition async.c:397
GlobalChannelKey key
Definition async.c:394
char channel[NAMEDATALEN]
Definition async.c:383
Size keysize
Definition hsearch.h:75
Definition pg_list.h:54
bool listening
Definition async.c:389
Notification * event
Definition async.c:517
List * uniqueChannelNames
Definition async.c:508
HTAB * uniqueChannelHash
Definition async.c:509
HTAB * hashtab
Definition async.c:507
List * events
Definition async.c:506
struct NotificationList * upper
Definition async.c:510
uint16 payload_len
Definition async.c:498
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:500
uint16 channel_len
Definition async.c:497
PendingListenAction action
Definition async.c:463
char channel[NAMEDATALEN]
Definition async.c:462
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:443
char * text_to_cstring(const text *t)
Definition varlena.c:215
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011
int GetCurrentTransactionNestLevel(void)
Definition xact.c:930
void StartTransactionCommand(void)
Definition xact.c:3080
void CommitTransactionCommand(void)
Definition xact.c:3178
TransactionId GetCurrentTransactionId(void)
Definition xact.c:455

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 513 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 200 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 364 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 353 of file async.c.

◆ QUEUE_BACKEND_IS_ADVANCING

#define QUEUE_BACKEND_IS_ADVANCING (   i)    (asyncQueueControl->backend[i].isAdvancing)

Definition at line 357 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 352 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 355 of file async.c.

◆ QUEUE_BACKEND_WAKEUP_PENDING

#define QUEUE_BACKEND_WAKEUP_PENDING (   i)    (asyncQueueControl->backend[i].wakeupPending)

Definition at line 356 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 281 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 351 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 367 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 348 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 354 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 365 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 246 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 249 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
int y
Definition isn.c:76
int x
Definition isn.c:75

Definition at line 259 of file async.c.

260 : \
261 (x).page != (y).page ? (x) : \
262 (x).offset > (y).offset ? (x) : (y))

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 253 of file async.c.

254 : \
255 (x).page != (y).page ? (y) : \
256 (x).offset < (y).offset ? (x) : (y))

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 238 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 237 of file async.c.

◆ QUEUE_POS_PRECEDES

#define QUEUE_POS_PRECEDES (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) || \
((x).page == (y).page && (x).offset < (y).offset))

Definition at line 265 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 350 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 349 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 224 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 240 of file async.c.

241 { \
242 (x).page = (y); \
243 (x).offset = (z); \
244 } while (0)

Typedef Documentation

◆ ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ ChannelName

◆ GlobalChannelEntry

◆ GlobalChannelKey

◆ ListenerEntry

◆ Notification

◆ NotificationList

◆ PendingListenEntry

◆ QueueBackendStatus

◆ QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 424 of file async.c.

◆ PendingListenAction

Enumerator
PENDING_LISTEN 
PENDING_UNLISTEN 

Definition at line 454 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 3158 of file async.c.

3159{
3161
3162 /* Create the hash tables if it's time to */
3165 {
3167 ListCell *l;
3168
3169 /* Create the hash table */
3170 hash_ctl.keysize = sizeof(Notification *);
3171 hash_ctl.entrysize = sizeof(struct NotificationHash);
3176 hash_create("Pending Notifies",
3177 256L,
3178 &hash_ctl,
3180
3181 /* Create the unique channel name table */
3183 hash_ctl.keysize = NAMEDATALEN;
3184 hash_ctl.entrysize = sizeof(ChannelName);
3187 hash_create("Pending Notify Channel Names",
3188 64L,
3189 &hash_ctl,
3191
3192 /* Insert all the already-existing events */
3193 foreach(l, pendingNotifies->events)
3194 {
3196 char *channel = oldn->data;
3197 bool found;
3198
3200 &oldn,
3201 HASH_ENTER,
3202 &found);
3203 Assert(!found);
3204
3205 /* Add channel name to uniqueChannelHash; might be there already */
3207 channel,
3208 HASH_ENTER,
3209 NULL);
3210 }
3211 }
3212
3213 /* Add new event to the list, in order */
3215
3216 /* Add event to the hash tables if needed */
3218 {
3219 char *channel = n->data;
3220 bool found;
3221
3223 &n,
3224 HASH_ENTER,
3225 &found);
3226 Assert(!found);
3227
3228 /* Add channel name to uniqueChannelHash; might be there already */
3230 channel,
3231 HASH_ENTER,
3232 NULL);
3233 }
3234}

References Assert, CurTransactionContext, Notification::data, NotificationList::events, fb(), HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), HASH_STRINGS, NotificationList::hashtab, lappend(), lfirst, list_length(), MIN_HASHABLE_NOTIFIES, NAMEDATALEN, NIL, notification_hash(), notification_match(), pendingNotifies, and NotificationList::uniqueChannelHash.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ ApplyPendingListenActions()

static void ApplyPendingListenActions ( bool  isCommit)
static

Definition at line 1713 of file async.c.

1714{
1716 PendingListenEntry *pending;
1717
1718 /* Quick exit if nothing to do */
1720 return;
1721
1722 /* We made a globalChannelTable before building pendingListenActions */
1723 if (globalChannelTable == NULL)
1724 elog(PANIC, "global channel table missing post-commit/abort");
1725
1726 /* For each staged action ... */
1728 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1729 {
1731 GlobalChannelEntry *entry;
1732 bool removeLocal = true;
1733 bool foundListener = false;
1734
1735 /*
1736 * Find the global entry for this channel. If isCommit, it had better
1737 * exist (it was created in PreCommit). In an abort, it might not
1738 * exist, in which case we are not listening and should discard any
1739 * local entry that PreCommit may have managed to create.
1740 */
1741 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1742 entry = dshash_find(globalChannelTable, &key, true);
1743 if (entry != NULL)
1744 {
1745 /* Scan entry to find the ListenerEntry for this backend */
1747
1750
1751 for (int i = 0; i < entry->numListeners; i++)
1752 {
1753 if (listeners[i].procNo != MyProcNumber)
1754 continue;
1755 foundListener = true;
1756 if (isCommit)
1757 {
1758 if (pending->action == PENDING_LISTEN)
1759 {
1760 /*
1761 * LISTEN being committed: set listening=true.
1762 * localChannelTable entry was created during
1763 * PreCommit and should be kept.
1764 */
1765 listeners[i].listening = true;
1766 removeLocal = false;
1767 }
1768 else
1769 {
1770 /*
1771 * UNLISTEN being committed: remove pre-allocated
1772 * entries from both tables.
1773 */
1775 }
1776 }
1777 else
1778 {
1779 /*
1780 * Note: this part is reachable only if the transaction
1781 * aborts after PreCommit_Notify() has made some
1782 * pendingListenActions entries, so it's pretty hard to
1783 * test.
1784 */
1785 if (!listeners[i].listening)
1786 {
1787 /*
1788 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1789 * and we weren't listening before, so remove
1790 * pre-allocated entries from both tables.
1791 */
1793 }
1794 else
1795 {
1796 /*
1797 * We're aborting, but the previous state was that
1798 * we're listening, so keep localChannelTable entry.
1799 */
1800 removeLocal = false;
1801 }
1802 }
1803 break; /* there shouldn't be another match */
1804 }
1805
1806 /* We might have already released the entry by removing it */
1807 if (entry != NULL)
1809 }
1810
1811 /*
1812 * If we're committing a LISTEN action, we should have found a
1813 * matching ListenerEntry, but otherwise it's okay if we didn't.
1814 */
1815 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1816 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1817 pending->channel, MyProcNumber);
1818
1819 /*
1820 * If we did not find a globalChannelTable entry for our backend, or
1821 * if we are unlistening, remove any localChannelTable entry that may
1822 * exist. (Note in particular that this cleans up if we created a
1823 * localChannelTable entry and then failed while trying to create a
1824 * globalChannelTable entry.)
1825 */
1828 HASH_REMOVE, NULL);
1829 }
1830}

References PendingListenEntry::action, PendingListenEntry::channel, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_REMOVE, hash_search(), hash_seq_init(), hash_seq_search(), i, GlobalChannelEntry::listenersArray, ListenerEntry::listening, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PANIC, PENDING_LISTEN, pendingListenActions, and RemoveListenerFromChannel().

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char channel)

Definition at line 1035 of file async.c.

1036{
1037 if (Trace_notify)
1038 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1039
1040 queue_listen(LISTEN_LISTEN, channel);
1041}

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char channel,
const char payload 
)

Definition at line 886 of file async.c.

887{
888 int my_level = GetCurrentTransactionNestLevel();
889 size_t channel_len;
890 size_t payload_len;
891 Notification *n;
892 MemoryContext oldcontext;
893
894 if (IsParallelWorker())
895 elog(ERROR, "cannot send notifications from a parallel worker");
896
897 if (Trace_notify)
898 elog(DEBUG1, "Async_Notify(%s)", channel);
899
900 channel_len = channel ? strlen(channel) : 0;
901 payload_len = payload ? strlen(payload) : 0;
902
903 /* a channel name must be specified */
904 if (channel_len == 0)
907 errmsg("channel name cannot be empty")));
908
909 /* enforce length limits */
910 if (channel_len >= NAMEDATALEN)
913 errmsg("channel name too long")));
914
915 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
918 errmsg("payload string too long")));
919
920 /*
921 * We must construct the Notification entry, even if we end up not using
922 * it, in order to compare it cheaply to existing list entries.
923 *
924 * The notification list needs to live until end of transaction, so store
925 * it in the transaction context.
926 */
928
930 channel_len + payload_len + 2);
931 n->channel_len = channel_len;
932 n->payload_len = payload_len;
933 strcpy(n->data, channel);
934 if (payload)
935 strcpy(n->data + channel_len + 1, payload);
936 else
937 n->data[channel_len + 1] = '\0';
938
939 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
940 {
942
943 /*
944 * First notify event in current (sub)xact. Note that we allocate the
945 * NotificationList in TopTransactionContext; the nestingLevel might
946 * get changed later by AtSubCommit_Notify.
947 */
950 sizeof(NotificationList));
951 notifies->nestingLevel = my_level;
952 notifies->events = list_make1(n);
953 /* We certainly don't need a hashtable yet */
954 notifies->hashtab = NULL;
955 /* We won't build uniqueChannelNames/Hash till later, either */
956 notifies->uniqueChannelNames = NIL;
957 notifies->uniqueChannelHash = NULL;
958 notifies->upper = pendingNotifies;
960 }
961 else
962 {
963 /* Now check for duplicates */
965 {
966 /* It's a dup, so forget it */
967 pfree(n);
968 MemoryContextSwitchTo(oldcontext);
969 return;
970 }
971
972 /* Append more events to existing list */
974 }
975
976 MemoryContextSwitchTo(oldcontext);
977}

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, fb(), GetCurrentTransactionNestLevel(), IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NIL, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char channel)

Definition at line 1049 of file async.c.

1050{
1051 if (Trace_notify)
1052 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1053
1054 /* If we couldn't possibly be listening, no need to queue anything */
1056 return;
1057
1058 queue_listen(LISTEN_UNLISTEN, channel);
1059}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 1067 of file async.c.

1068{
1069 if (Trace_notify)
1070 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1071
1072 /* If we couldn't possibly be listening, no need to queue anything */
1074 return;
1075
1077}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 1139 of file async.c.

1140{
1143}

References asyncQueueUnregister(), and CleanupListenersOnExit().

Referenced by BecomeRegisteredListener().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 3117 of file async.c.

3118{
3119 if (pendingNotifies == NULL)
3120 return false;
3121
3123 {
3124 /* Use the hash table to probe for a match */
3126 &n,
3127 HASH_FIND,
3128 NULL))
3129 return true;
3130 }
3131 else
3132 {
3133 /* Must scan the event list */
3134 ListCell *l;
3135
3136 foreach(l, pendingNotifies->events)
3137 {
3139
3140 if (n->channel_len == oldn->channel_len &&
3141 n->payload_len == oldn->payload_len &&
3142 memcmp(n->data, oldn->data,
3143 n->channel_len + n->payload_len + 2) == 0)
3144 return true;
3145 }
3146 }
3147
3148 return false;
3149}

References Notification::channel_len, Notification::data, NotificationList::events, fb(), HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)

Definition at line 2945 of file async.c.

2946{
2947 QueuePosition pos;
2948 QueuePosition head;
2949 int64 curpage = -1;
2950 int slotno = -1;
2951 char *page_buffer = NULL;
2952 bool page_dirty = false;
2953
2954 /*
2955 * Acquire locks in the correct order to avoid deadlocks. As per the
2956 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2957 * bank locks.
2958 *
2959 * We only need SHARED mode since we're just reading the head/tail
2960 * positions, not modifying them.
2961 */
2964
2965 pos = QUEUE_TAIL;
2966 head = QUEUE_HEAD;
2967
2968 /* Release NotifyQueueLock early, we only needed to read the positions */
2970
2971 /*
2972 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2973 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2974 * we're working.
2975 */
2976 while (!QUEUE_POS_EQUAL(pos, head))
2977 {
2979 TransactionId xid;
2980 int64 pageno = QUEUE_POS_PAGE(pos);
2981 int offset = QUEUE_POS_OFFSET(pos);
2982
2983 /* If we need a different page, release old lock and get new one */
2984 if (pageno != curpage)
2985 {
2986 LWLock *lock;
2987
2988 /* Release previous page if any */
2989 if (slotno >= 0)
2990 {
2991 if (page_dirty)
2992 {
2993 NotifyCtl->shared->page_dirty[slotno] = true;
2994 page_dirty = false;
2995 }
2997 }
2998
2999 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3001 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3003 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3004 curpage = pageno;
3005 }
3006
3007 qe = (AsyncQueueEntry *) (page_buffer + offset);
3008 xid = qe->xid;
3009
3010 if (TransactionIdIsNormal(xid) &&
3012 {
3013 if (TransactionIdDidCommit(xid))
3014 {
3015 qe->xid = FrozenTransactionId;
3016 page_dirty = true;
3017 }
3018 else
3019 {
3020 qe->xid = InvalidTransactionId;
3021 page_dirty = true;
3022 }
3023 }
3024
3025 /* Advance to next entry */
3026 asyncQueueAdvance(&pos, qe->length);
3027 }
3028
3029 /* Release final page lock if we acquired one */
3030 if (slotno >= 0)
3031 {
3032 if (page_dirty)
3033 NotifyCtl->shared->page_dirty[slotno] = true;
3035 }
3036
3038}

References asyncQueueAdvance(), fb(), FrozenTransactionId, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by vac_truncate_clog().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 2035 of file async.c.

2036{
2039 int64 pageno;
2040 int offset;
2041 int slotno;
2043
2044 /*
2045 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2046 * memory upon exiting. The reason for this is that if we have to advance
2047 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2048 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2049 * subsequent insertions would try to put entries into a page that slru.c
2050 * thinks doesn't exist yet.) So, use a local position variable. Note
2051 * that if we do fail, any already-inserted queue entries are forgotten;
2052 * this is okay, since they'd be useless anyway after our transaction
2053 * rolls back.
2054 */
2056
2057 /*
2058 * If this is the first write since the postmaster started, we need to
2059 * initialize the first page of the async SLRU. Otherwise, the current
2060 * page should be initialized already, so just fetch it.
2061 */
2062 pageno = QUEUE_POS_PAGE(queue_head);
2064
2065 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2067
2070 else
2071 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2073
2074 /* Note we mark the page dirty before writing in it */
2075 NotifyCtl->shared->page_dirty[slotno] = true;
2076
2077 while (nextNotify != NULL)
2078 {
2080
2081 /* Construct a valid queue entry in local variable qe */
2083
2084 offset = QUEUE_POS_OFFSET(queue_head);
2085
2086 /* Check whether the entry really fits on the current page */
2087 if (offset + qe.length <= QUEUE_PAGESIZE)
2088 {
2089 /* OK, so advance nextNotify past this item */
2091 }
2092 else
2093 {
2094 /*
2095 * Write a dummy entry to fill up the page. Actually readers will
2096 * only check dboid and since it won't match any reader's database
2097 * OID, they will ignore this entry and move on.
2098 */
2099 qe.length = QUEUE_PAGESIZE - offset;
2100 qe.dboid = InvalidOid;
2102 qe.data[0] = '\0'; /* empty channel */
2103 qe.data[1] = '\0'; /* empty payload */
2104 }
2105
2106 /* Now copy qe into the shared buffer page */
2107 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2108 &qe,
2109 qe.length);
2110
2111 /* Advance queue_head appropriately, and detect if page is full */
2112 if (asyncQueueAdvance(&(queue_head), qe.length))
2113 {
2114 LWLock *lock;
2115
2116 pageno = QUEUE_POS_PAGE(queue_head);
2117 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2118 if (lock != prevlock)
2119 {
2122 prevlock = lock;
2123 }
2124
2125 /*
2126 * Page is full, so we're done here, but first fill the next page
2127 * with zeroes. The reason to do this is to ensure that slru.c's
2128 * idea of the head page is always the same as ours, which avoids
2129 * boundary problems in SimpleLruTruncate. The test in
2130 * asyncQueueIsFull() ensured that there is room to create this
2131 * page without overrunning the queue.
2132 */
2134
2135 /*
2136 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2137 * set flag to remember that we should try to advance the tail
2138 * pointer (we don't want to actually do that right here).
2139 */
2141 tryAdvanceTail = true;
2142
2143 /* And exit the loop */
2144 break;
2145 }
2146 }
2147
2148 /* Success, so update the global QUEUE_HEAD */
2150
2152
2153 return nextNotify;
2154}

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), NotificationList::events, fb(), InvalidOid, InvalidTransactionId, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1966 of file async.c.

1967{
1968 int64 pageno = QUEUE_POS_PAGE(*position);
1969 int offset = QUEUE_POS_OFFSET(*position);
1970 bool pageJump = false;
1971
1972 /*
1973 * Move to the next writing position: First jump over what we have just
1974 * written or read.
1975 */
1976 offset += entryLength;
1977 Assert(offset <= QUEUE_PAGESIZE);
1978
1979 /*
1980 * In a second step check if another entry can possibly be written to the
1981 * page. If so, stay here, we have reached the next position. If not, then
1982 * we need to move on to the next page.
1983 */
1985 {
1986 pageno++;
1987 offset = 0;
1988 pageJump = true;
1989 }
1990
1991 SET_QUEUE_POS(*position, pageno, offset);
1992 return pageJump;
1993}

References Assert, AsyncQueueEntryEmptySize, fb(), QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by AsyncNotifyFreezeXids(), asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2863 of file async.c.

2864{
2865 QueuePosition min;
2868 int64 boundary;
2869
2870 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2872
2873 /*
2874 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2875 * (ie, exactly match at least one backend's queue position), so it must
2876 * be updated atomically with the actual computation. Since v13, we could
2877 * get away with not doing it like that, but it seems prudent to keep it
2878 * so.
2879 *
2880 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2881 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2882 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2883 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2884 * there are pages we can truncate but haven't yet finished doing so.
2885 *
2886 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2887 * performing SimpleLruTruncate. This is OK because no backend will try
2888 * to access the pages we are in the midst of truncating.
2889 */
2891 min = QUEUE_HEAD;
2893 {
2895 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2896 }
2897 QUEUE_TAIL = min;
2900
2901 /*
2902 * We can truncate something if the global tail advanced across an SLRU
2903 * segment boundary.
2904 *
2905 * XXX it might be better to truncate only once every several segments, to
2906 * reduce the number of directory scans.
2907 */
2910 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2911 {
2912 /*
2913 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2914 * release the lock again.
2915 */
2917
2921 }
2922
2924}

References Assert, asyncQueuePagePrecedes(), fb(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 2207 of file async.c.

2208{
2209 double fillDegree;
2210 TimestampTz t;
2211
2213 if (fillDegree < 0.5)
2214 return;
2215
2216 t = GetCurrentTimestamp();
2217
2220 {
2223
2225 {
2227 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2230 }
2231
2233 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2234 (minPid != InvalidPid ?
2235 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2236 : 0),
2237 (minPid != InvalidPid ?
2238 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2239 : 0)));
2240
2242 }
2243}

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), fb(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1951 of file async.c.

1952{
1953 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1955 int64 occupied = headPage - tailPage;
1956
1958}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1999 of file async.c.

2000{
2001 size_t channellen = n->channel_len;
2002 size_t payloadlen = n->payload_len;
2003 int entryLength;
2004
2007
2008 /* The terminators are already included in AsyncQueueEntryEmptySize */
2011 qe->length = entryLength;
2012 qe->dboid = MyDatabaseId;
2013 qe->xid = GetCurrentTransactionId();
2014 qe->srcPid = MyProcPid;
2015 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2016}

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, fb(), GetCurrentTransactionId(), MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, and QUEUEALIGN.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 618 of file async.c.

619{
620 return p - q;
621}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 628 of file async.c.

629{
630 return p < q;
631}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( QueuePosition current,
QueuePosition  stop,
Snapshot  snapshot 
)
static

Definition at line 2724 of file async.c.

2727{
2728 int64 curpage = QUEUE_POS_PAGE(*current);
2729 int slotno;
2730 char *page_buffer;
2731 bool reachedStop = false;
2732 bool reachedEndOfPage;
2733
2734 /*
2735 * We copy the entries into a local buffer to avoid holding the SLRU lock
2736 * while we transmit them to our frontend. The local buffer must be
2737 * adequately aligned.
2738 */
2740 char *local_buf_end = local_buf;
2741
2744 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2745
2746 do
2747 {
2748 QueuePosition thisentry = *current;
2750
2751 if (QUEUE_POS_EQUAL(thisentry, stop))
2752 break;
2753
2754 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2755
2756 /*
2757 * Advance *current over this message, possibly to the next page. As
2758 * noted in the comments for asyncQueueReadAllNotifications, we must
2759 * do this before possibly failing while processing the message.
2760 */
2761 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2762
2763 /* Ignore messages destined for other databases */
2764 if (qe->dboid == MyDatabaseId)
2765 {
2766 if (XidInMVCCSnapshot(qe->xid, snapshot))
2767 {
2768 /*
2769 * The source transaction is still in progress, so we can't
2770 * process this message yet. Break out of the loop, but first
2771 * back up *current so we will reprocess the message next
2772 * time. (Note: it is unlikely but not impossible for
2773 * TransactionIdDidCommit to fail, so we can't really avoid
2774 * this advance-then-back-up behavior when dealing with an
2775 * uncommitted message.)
2776 *
2777 * Note that we must test XidInMVCCSnapshot before we test
2778 * TransactionIdDidCommit, else we might return a message from
2779 * a transaction that is not yet visible to snapshots; compare
2780 * the comments at the head of heapam_visibility.c.
2781 *
2782 * Also, while our own xact won't be listed in the snapshot,
2783 * we need not check for TransactionIdIsCurrentTransactionId
2784 * because our transaction cannot (yet) have queued any
2785 * messages.
2786 */
2787 *current = thisentry;
2788 reachedStop = true;
2789 break;
2790 }
2791
2792 /*
2793 * Quick check for the case that we're not listening on any
2794 * channels, before calling TransactionIdDidCommit(). This makes
2795 * that case a little faster, but more importantly, it ensures
2796 * that if there's a bad entry in the queue for which
2797 * TransactionIdDidCommit() fails for some reason, we can skip
2798 * over it on the first LISTEN in a session, and not get stuck on
2799 * it indefinitely. (This is a little trickier than it looks: it
2800 * works because BecomeRegisteredListener runs this code before we
2801 * have made the first entry in localChannelTable.)
2802 */
2804 continue;
2805
2806 if (TransactionIdDidCommit(qe->xid))
2807 {
2808 memcpy(local_buf_end, qe, qe->length);
2809 local_buf_end += qe->length;
2810 }
2811 else
2812 {
2813 /*
2814 * The source transaction aborted or crashed, so we just
2815 * ignore its notifications.
2816 */
2817 }
2818 }
2819
2820 /* Loop back if we're not at end of page */
2821 } while (!reachedEndOfPage);
2822
2823 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2825
2826 /*
2827 * Now that we have let go of the SLRU bank lock, send the notifications
2828 * to our backend
2829 */
2831 for (char *p = local_buf; p < local_buf_end;)
2832 {
2834
2835 /* qe->data is the null-terminated channel name */
2836 char *channel = qe->data;
2837
2838 if (IsListeningOn(channel))
2839 {
2840 /* payload follows channel name */
2841 char *payload = qe->data + strlen(channel) + 1;
2842
2843 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2844 }
2845
2846 p += qe->length;
2847 }
2848
2849 if (QUEUE_POS_EQUAL(*current, stop))
2850 reachedStop = true;
2851
2852 return reachedStop;
2853}

References Assert, asyncQueueAdvance(), AsyncQueueEntry::data, fb(), InvalidTransactionId, IsListeningOn(), LocalChannelTableIsEmpty, LWLockRelease(), MyDatabaseId, NotifyCtl, NotifyMyFrontEnd(), QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), TransactionIdDidCommit(), and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 2590 of file async.c.

2591{
2592 QueuePosition pos;
2593 QueuePosition head;
2594 Snapshot snapshot;
2595
2596 /*
2597 * Fetch current state, indicate to others that we have woken up, and that
2598 * we are in process of advancing our position.
2599 */
2601 /* Assert checks that we have a valid state entry */
2605 head = QUEUE_HEAD;
2606
2607 if (QUEUE_POS_EQUAL(pos, head))
2608 {
2609 /* Nothing to do, we have read all notifications already. */
2611 return;
2612 }
2613
2616
2617 /*----------
2618 * Get snapshot we'll use to decide which xacts are still in progress.
2619 * This is trickier than it might seem, because of race conditions.
2620 * Consider the following example:
2621 *
2622 * Backend 1: Backend 2:
2623 *
2624 * transaction starts
2625 * UPDATE foo SET ...;
2626 * NOTIFY foo;
2627 * commit starts
2628 * queue the notify message
2629 * transaction starts
2630 * LISTEN foo; -- first LISTEN in session
2631 * SELECT * FROM foo WHERE ...;
2632 * commit to clog
2633 * commit starts
2634 * add backend 2 to array of listeners
2635 * advance to queue head (this code)
2636 * commit to clog
2637 *
2638 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2639 * wasn't committed yet. Ideally we'd ensure that client 2 would
2640 * eventually get transaction 1's notify message, but there's no way
2641 * to do that; until we're in the listener array, there's no guarantee
2642 * that the notify message doesn't get removed from the queue.
2643 *
2644 * Therefore the coding technique transaction 2 is using is unsafe:
2645 * applications must commit a LISTEN before inspecting database state,
2646 * if they want to ensure they will see notifications about subsequent
2647 * changes to that state.
2648 *
2649 * What we do guarantee is that we'll see all notifications from
2650 * transactions committing after the snapshot we take here.
2651 * BecomeRegisteredListener has already added us to the listener array,
2652 * so no not-yet-committed messages can be removed from the queue
2653 * before we see them.
2654 *----------
2655 */
2656 snapshot = RegisterSnapshot(GetLatestSnapshot());
2657
2658 /*
2659 * It is possible that we fail while trying to send a message to our
2660 * frontend (for example, because of encoding conversion failure). If
2661 * that happens it is critical that we not try to send the same message
2662 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2663 * ERRORs to FATAL, causing the client connection to be closed on error.
2664 *
2665 * We used to only skip over the offending message and try to soldier on,
2666 * but it was somewhat questionable to lose a notification and give the
2667 * client an ERROR instead. A client application is not be prepared for
2668 * that and can't tell that a notification was missed. It was also not
2669 * very useful in practice because notifications are often processed while
2670 * a connection is idle and reading a message from the client, and in that
2671 * state, any error is upgraded to FATAL anyway. Closing the connection
2672 * is a clear signal to the application that it might have missed
2673 * notifications.
2674 */
2675 {
2677 bool reachedStop;
2678
2679 ExitOnAnyError = true;
2680
2681 do
2682 {
2683 /*
2684 * Process messages up to the stop position, end of page, or an
2685 * uncommitted message.
2686 *
2687 * Our stop position is what we found to be the head's position
2688 * when we entered this function. It might have changed already.
2689 * But if it has, we will receive (or have already received and
2690 * queued) another signal and come here again.
2691 *
2692 * We are not holding NotifyQueueLock here! The queue can only
2693 * extend beyond the head pointer (see above) and we leave our
2694 * backend's pointer where it is so nobody will truncate or
2695 * rewrite pages under us. Especially we don't want to hold a lock
2696 * while sending the notifications to the frontend.
2697 */
2698 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2699 } while (!reachedStop);
2700
2701 /* Update shared state */
2706
2708 }
2709
2710 /* Done with snapshot */
2711 UnregisterSnapshot(snapshot);
2712}

References Assert, asyncQueueProcessPageEntries(), ExitOnAnyError, fb(), GetLatestSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_HEAD, QUEUE_POS_EQUAL, RegisterSnapshot(), and UnregisterSnapshot().

Referenced by BecomeRegisteredListener(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1908 of file async.c.

1909{
1910 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1911
1912 if (!amRegisteredListener) /* nothing to do */
1913 return;
1914
1915 /*
1916 * Need exclusive lock here to manipulate list links.
1917 */
1919 /* Mark our entry as invalid */
1924 /* and remove it from the list */
1927 else
1928 {
1930 {
1932 {
1934 break;
1935 }
1936 }
1937 }
1940
1941 /* mark ourselves as no longer listed in the global array */
1942 amRegisteredListener = false;
1943}

References amRegisteredListener, Assert, fb(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, LocalChannelTableIsEmpty, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 2186 of file async.c.

2187{
2188 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2190 int64 occupied = headPage - tailPage;
2191
2192 if (occupied == 0)
2193 return (double) 0; /* fast exit for common case */
2194
2195 return (double) occupied / (double) max_notify_queue_pages;
2196}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 793 of file async.c.

794{
795 bool found;
796 Size size;
797
798 /*
799 * Create or attach to the AsyncQueueControl structure.
800 */
801 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
802 size = add_size(size, offsetof(AsyncQueueControl, backend));
803
805 ShmemInitStruct("Async Queue Control", size, &found);
806
807 if (!found)
808 {
809 /* First time through, so initialize it */
812 QUEUE_STOP_PAGE = 0;
817 for (int i = 0; i < MaxBackends; i++)
818 {
825 }
826 }
827
828 /*
829 * Set up SLRU management of the pg_notify data. Note that long segment
830 * names are used in order to avoid wraparound.
831 */
832 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
835 SYNC_HANDLER_NONE, true);
836
837 if (!found)
838 {
839 /*
840 * During start or reboot, clean out the pg_notify directory.
841 */
843 }
844}

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, fb(), AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 776 of file async.c.

777{
778 Size size;
779
780 /* This had better match AsyncShmemInit */
781 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
782 size = add_size(size, offsetof(AsyncQueueControl, backend));
783
785
786 return size;
787}

References add_size(), fb(), MaxBackends, mul_size(), notify_buffers, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 2411 of file async.c.

2412{
2413 /* Revert staged listen/unlisten changes */
2415
2416 /* If we're no longer listening on anything, unregister */
2419
2420 /* And clean up */
2422}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and LocalChannelTableIsEmpty.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1370 of file async.c.

1371{
1372 /*
1373 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1374 * return as soon as possible
1375 */
1377 return;
1378
1379 if (Trace_notify)
1380 elog(DEBUG1, "AtCommit_Notify");
1381
1382 /* Apply staged listen/unlisten changes */
1384
1385 /* If no longer listening to anything, get out of listener array */
1388
1389 /*
1390 * Send signals to listening backends. We need do this only if there are
1391 * pending notifies, which were previously added to the shared queue by
1392 * PreCommit_Notify().
1393 */
1394 if (pendingNotifies != NULL)
1396
1397 /*
1398 * If it's time to try to advance the global tail pointer, do that.
1399 *
1400 * (It might seem odd to do this in the sender, when more than likely the
1401 * listeners won't yet have read the messages we just sent. However,
1402 * there's less contention if only the sender does it, and there is little
1403 * need for urgency in advancing the global tail. So this typically will
1404 * be clearing out messages that were sent some time ago.)
1405 */
1406 if (tryAdvanceTail)
1407 {
1408 tryAdvanceTail = false;
1410 }
1411
1412 /* And clean up */
1414}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueAdvanceTail(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), DEBUG1, elog, fb(), LocalChannelTableIsEmpty, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 1152 of file async.c.

1153{
1154 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1156 ereport(ERROR,
1158 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1159}

References ereport, errcode(), errmsg(), ERROR, fb(), pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 2500 of file async.c.

2501{
2502 int my_level = GetCurrentTransactionNestLevel();
2503
2504 /*
2505 * All we have to do is pop the stack --- the actions/notifies made in
2506 * this subxact are no longer interesting, and the space will be freed
2507 * when CurTransactionContext is recycled. We still have to free the
2508 * ActionList and NotificationList objects themselves, though, because
2509 * those are allocated in TopTransactionContext.
2510 *
2511 * Note that there might be no entries at all, or no entries for the
2512 * current subtransaction level, either because none were ever created, or
2513 * because we reentered this routine due to trouble during subxact abort.
2514 */
2515 while (pendingActions != NULL &&
2516 pendingActions->nestingLevel >= my_level)
2517 {
2519
2522 }
2523
2524 while (pendingNotifies != NULL &&
2525 pendingNotifies->nestingLevel >= my_level)
2526 {
2528
2531 }
2532}

References fb(), GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 2430 of file async.c.

2431{
2432 int my_level = GetCurrentTransactionNestLevel();
2433
2434 /* If there are actions at our nesting level, we must reparent them. */
2435 if (pendingActions != NULL &&
2436 pendingActions->nestingLevel >= my_level)
2437 {
2438 if (pendingActions->upper == NULL ||
2439 pendingActions->upper->nestingLevel < my_level - 1)
2440 {
2441 /* nothing to merge; give the whole thing to the parent */
2443 }
2444 else
2445 {
2447
2449
2450 /*
2451 * Mustn't try to eliminate duplicates here --- see queue_listen()
2452 */
2455 childPendingActions->actions);
2457 }
2458 }
2459
2460 /* If there are notifies at our nesting level, we must reparent them. */
2461 if (pendingNotifies != NULL &&
2462 pendingNotifies->nestingLevel >= my_level)
2463 {
2464 Assert(pendingNotifies->nestingLevel == my_level);
2465
2466 if (pendingNotifies->upper == NULL ||
2467 pendingNotifies->upper->nestingLevel < my_level - 1)
2468 {
2469 /* nothing to merge; give the whole thing to the parent */
2471 }
2472 else
2473 {
2474 /*
2475 * Formerly, we didn't bother to eliminate duplicates here, but
2476 * now we must, else we fall foul of "Assert(!found)", either here
2477 * or during a later attempt to build the parent-level hashtable.
2478 */
2480 ListCell *l;
2481
2483 /* Insert all the subxact's events into parent, except for dups */
2484 foreach(l, childPendingNotifies->events)
2485 {
2487
2490 }
2492 }
2493 }
2494}

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), fb(), GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ BecomeRegisteredListener()

static void BecomeRegisteredListener ( void  )
static

Definition at line 1422 of file async.c.

1423{
1424 QueuePosition head;
1425 QueuePosition max;
1427
1428 /*
1429 * Nothing to do if we are already listening to something, nor if we
1430 * already ran this routine in this transaction.
1431 */
1433 return;
1434
1435 if (Trace_notify)
1436 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1437
1438 /*
1439 * Before registering, make sure we will unlisten before dying. (Note:
1440 * this action does not get undone if we abort later.)
1441 */
1443 {
1446 }
1447
1448 /*
1449 * This is our first LISTEN, so establish our pointer.
1450 *
1451 * We set our pointer to the global tail pointer and then move it forward
1452 * over already-committed notifications. This ensures we cannot miss any
1453 * not-yet-committed notifications. We might get a few more but that
1454 * doesn't hurt.
1455 *
1456 * In some scenarios there might be a lot of committed notifications that
1457 * have not yet been pruned away (because some backend is being lazy about
1458 * reading them). To reduce our startup time, we can look at other
1459 * backends and adopt the maximum "pos" pointer of any backend that's in
1460 * our database; any notifications it's already advanced over are surely
1461 * committed and need not be re-examined by us. (We must consider only
1462 * backends connected to our DB, because others will not have bothered to
1463 * check committed-ness of notifications in our DB.)
1464 *
1465 * We need exclusive lock here so we can look at other backends' entries
1466 * and manipulate the list links.
1467 */
1469 head = QUEUE_HEAD;
1470 max = QUEUE_TAIL;
1473 {
1475 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1476 /* Also find last listening backend before this one */
1477 if (i < MyProcNumber)
1478 prevListener = i;
1479 }
1485 /* Insert backend into list of listeners at correct position */
1487 {
1490 }
1491 else
1492 {
1495 }
1497
1498 /* Now we are listed in the global array, so remember we're listening */
1499 amRegisteredListener = true;
1500
1501 /*
1502 * Try to move our pointer forward as far as possible. This will skip
1503 * over already-committed notifications, which we want to do because they
1504 * might be quite stale. Note that we are not yet listening on anything,
1505 * so we won't deliver such notifications to our frontend. Also, although
1506 * our transaction might have executed NOTIFY, those message(s) aren't
1507 * queued yet so we won't skip them here.
1508 */
1509 if (!QUEUE_POS_EQUAL(max, head))
1511}

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, fb(), i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ check_notify_buffers()

bool check_notify_buffers ( int newval,
void **  extra,
GucSource  source 
)

Definition at line 3290 of file async.c.

3291{
3292 return check_slru_buffers("notify_buffers", newval);
3293}

References check_slru_buffers(), and newval.

◆ CleanupListenersOnExit()

static void CleanupListenersOnExit ( void  )
static

Definition at line 1838 of file async.c.

1839{
1840 dshash_seq_status status;
1841 GlobalChannelEntry *entry;
1842
1843 if (Trace_notify)
1844 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1845
1846 /* Clear our local cache (not really necessary, but be consistent) */
1847 if (localChannelTable != NULL)
1848 {
1851 }
1852
1853 /* Now remove our entries from the shared globalChannelTable */
1854 if (globalChannelTable == NULL)
1855 return;
1856
1857 dshash_seq_init(&status, globalChannelTable, true);
1858 while ((entry = dshash_seq_next(&status)) != NULL)
1859 {
1861
1862 if (entry->key.dboid != MyDatabaseId)
1863 continue; /* not relevant */
1864
1867
1868 for (int i = 0; i < entry->numListeners; i++)
1869 {
1870 if (listeners[i].procNo == MyProcNumber)
1871 {
1872 entry->numListeners--;
1873 if (i < entry->numListeners)
1874 memmove(&listeners[i], &listeners[i + 1],
1875 sizeof(ListenerEntry) * (entry->numListeners - i));
1876
1877 if (entry->numListeners == 0)
1878 {
1880 dshash_delete_current(&status);
1881 }
1882 break;
1883 }
1884 }
1885 }
1886 dshash_seq_term(&status);
1887}

References GlobalChannelKey::dboid, DEBUG1, dsa_free(), dsa_get_address(), dshash_delete_current(), dshash_seq_init(), dshash_seq_next(), dshash_seq_term(), elog, fb(), globalChannelDSA, globalChannelTable, hash_destroy(), i, GlobalChannelEntry::key, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, MyProcPid, GlobalChannelEntry::numListeners, and Trace_notify.

Referenced by Async_UnlistenOnExit().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 3272 of file async.c.

3273{
3274 /*
3275 * Everything's allocated in either TopTransactionContext or the context
3276 * for the subtransaction to which it corresponds. So, there's nothing to
3277 * do here except reset the pointers; the space will be reclaimed when the
3278 * contexts are deleted.
3279 */
3282 /* Also clear pendingListenActions, which is derived from pendingActions */
3284}

References fb(), pendingActions, pendingListenActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ GlobalChannelKeyInit()

static void GlobalChannelKeyInit ( GlobalChannelKey key,
Oid  dboid,
const char channel 
)
inlinestatic

Definition at line 638 of file async.c.

639{
640 memset(key, 0, sizeof(GlobalChannelKey));
641 key->dboid = dboid;
642 strlcpy(key->channel, channel, NAMEDATALEN);
643}

References fb(), NAMEDATALEN, and strlcpy().

Referenced by ApplyPendingListenActions(), PrepareTableEntriesForListen(), and SignalBackends().

◆ globalChannelTableHash()

static dshash_hash globalChannelTableHash ( const void key,
size_t  size,
void arg 
)
static

Definition at line 650 of file async.c.

651{
652 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
653 dshash_hash h;
654
656 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
658
659 return h;
660}

References GlobalChannelKey::channel, DatumGetUInt32(), GlobalChannelKey::dboid, fb(), hash_any(), hash_uint32(), and NAMEDATALEN.

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 2543 of file async.c.

2544{
2545 /*
2546 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2547 * you do here.
2548 */
2549
2550 /* signal that work needs to be done */
2552
2553 /* make sure the event is processed in due course */
2555}

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ initGlobalChannelTable()

static void initGlobalChannelTable ( void  )
static

Definition at line 677 of file async.c.

678{
679 MemoryContext oldcontext;
680
681 /* Quick exit if we already did this */
684 return;
685
686 /* Otherwise, use a lock to ensure only one process creates the table */
688
689 /* Be sure any local memory allocated by DSA routines is persistent */
691
693 {
694 /* Initialize dynamic shared hash table for global channels */
700 NULL);
701
702 /* Store handles in shared memory for other backends to use */
706 }
707 else if (!globalChannelTable)
708 {
709 /* Attach to existing dynamic shared hash table */
715 NULL);
716 }
717
718 MemoryContextSwitchTo(oldcontext);
720}

References asyncQueueControl, dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, fb(), globalChannelDSA, globalChannelTable, AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, globalChannelTableDSHParams, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by PreCommit_Notify().

◆ initLocalChannelTable()

static void initLocalChannelTable ( void  )
static

Definition at line 728 of file async.c.

729{
731
732 /* Quick exit if we already did this */
733 if (localChannelTable != NULL)
734 return;
735
736 /* Initialize local hash table for this backend's listened channels */
738 hash_ctl.entrysize = sizeof(ChannelName);
739
741 hash_create("Local Listen Channels",
742 64,
743 &hash_ctl,
745}

References fb(), hash_create(), HASH_ELEM, HASH_STRINGS, HASHCTL::keysize, localChannelTable, and NAMEDATALEN.

Referenced by PreCommit_Notify().

◆ initPendingListenActions()

static void initPendingListenActions ( void  )
static

◆ IsListeningOn()

static bool IsListeningOn ( const char channel)
static

Definition at line 1895 of file async.c.

1896{
1897 if (localChannelTable == NULL)
1898 return false;
1899
1900 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1901}

References fb(), HASH_FIND, hash_search(), and localChannelTable.

Referenced by asyncQueueProcessPageEntries().

◆ notification_hash()

static uint32 notification_hash ( const void key,
Size  keysize 
)
static

Definition at line 3242 of file async.c.

3243{
3244 const Notification *k = *(const Notification *const *) key;
3245
3246 Assert(keysize == sizeof(Notification *));
3247 /* We don't bother to include the payload's trailing null in the hash */
3248 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3249 k->channel_len + k->payload_len + 1));
3250}

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void key1,
const void key2,
Size  keysize 
)
static

Definition at line 3256 of file async.c.

3257{
3258 const Notification *k1 = *(const Notification *const *) key1;
3259 const Notification *k2 = *(const Notification *const *) key2;
3260
3261 Assert(keysize == sizeof(Notification *));
3262 if (k1->channel_len == k2->channel_len &&
3263 k1->payload_len == k2->payload_len &&
3264 memcmp(k1->data, k2->data,
3265 k1->channel_len + k1->payload_len + 2) == 0)
3266 return 0; /* equal */
3267 return 1; /* not equal */
3268}

References Assert, and fb().

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char channel,
const char payload,
int32  srcPid 
)

Definition at line 3093 of file async.c.

3094{
3096 {
3098
3100 pq_sendint32(&buf, srcPid);
3101 pq_sendstring(&buf, channel);
3102 pq_sendstring(&buf, payload);
3104
3105 /*
3106 * NOTE: we do not do pq_flush() here. Some level of caller will
3107 * handle it later, allowing this message to be combined into a packet
3108 * with other ones.
3109 */
3110 }
3111 else
3112 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3113}

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 1087 of file async.c.

1088{
1090 HASH_SEQ_STATUS *status;
1091
1092 /* stuff done only on the first call of the function */
1093 if (SRF_IS_FIRSTCALL())
1094 {
1095 /* create a function context for cross-call persistence */
1097
1098 /* Initialize hash table iteration if we have any channels */
1099 if (localChannelTable != NULL)
1100 {
1101 MemoryContext oldcontext;
1102
1103 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1104 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1106 funcctx->user_fctx = status;
1107 MemoryContextSwitchTo(oldcontext);
1108 }
1109 else
1110 {
1111 funcctx->user_fctx = NULL;
1112 }
1113 }
1114
1115 /* stuff done on every call of the function */
1117 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1118
1119 if (status != NULL)
1120 {
1121 ChannelName *entry;
1122
1123 entry = (ChannelName *) hash_seq_search(status);
1124 if (entry != NULL)
1126 }
1127
1129}

References ChannelName::channel, CStringGetTextDatum, fb(), hash_seq_init(), hash_seq_search(), localChannelTable, MemoryContextSwitchTo(), palloc(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 2161 of file async.c.

2162{
2163 double usage;
2164
2165 /* Advance the queue tail so we don't report a too-large result */
2167
2171
2173}

References asyncQueueAdvanceTail(), asyncQueueUsage(), fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 852 of file async.c.

853{
854 const char *channel;
855 const char *payload;
856
857 if (PG_ARGISNULL(0))
858 channel = "";
859 else
861
862 if (PG_ARGISNULL(1))
863 payload = "";
864 else
866
867 /* For NOTIFY as a statement, this is checked in ProcessUtility */
869
870 Async_Notify(channel, payload);
871
873}

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 1177 of file async.c.

1178{
1179 ListCell *p;
1180
1182 return; /* no relevant statements in this xact */
1183
1184 if (Trace_notify)
1185 elog(DEBUG1, "PreCommit_Notify");
1186
1187 /* Preflight for any pending listen/unlisten actions */
1189
1190 if (pendingActions != NULL)
1191 {
1192 /* Ensure we have a local channel table */
1194 /* Create pendingListenActions hash table for this transaction */
1196
1197 /* Stage all the actions this transaction wants to perform */
1198 foreach(p, pendingActions->actions)
1199 {
1201
1202 switch (actrec->action)
1203 {
1204 case LISTEN_LISTEN:
1207 break;
1208 case LISTEN_UNLISTEN:
1210 break;
1213 break;
1214 }
1215 }
1216 }
1217
1218 /* Queue any pending notifies (must happen after the above) */
1219 if (pendingNotifies)
1220 {
1222 bool firstIteration = true;
1223
1224 /*
1225 * Build list of unique channel names being notified for use by
1226 * SignalBackends().
1227 *
1228 * If uniqueChannelHash is available, use it to efficiently get the
1229 * unique channels. Otherwise, fall back to the O(N^2) approach.
1230 */
1233 {
1234 HASH_SEQ_STATUS status;
1236
1238 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1241 channelEntry->channel);
1242 }
1243 else
1244 {
1245 /* O(N^2) approach is better for small number of notifications */
1247 {
1248 char *channel = n->data;
1249 bool found = false;
1250
1251 /* Name present in list? */
1253 {
1254 if (strcmp(oldchan, channel) == 0)
1255 {
1256 found = true;
1257 break;
1258 }
1259 }
1260 /* Add if not already in list */
1261 if (!found)
1264 channel);
1265 }
1266 }
1267
1268 /* Preallocate workspace that will be needed by SignalBackends() */
1269 if (signalPids == NULL)
1271 MaxBackends * sizeof(int32));
1272
1273 if (signalProcnos == NULL)
1275 MaxBackends * sizeof(ProcNumber));
1276
1277 /*
1278 * Make sure that we have an XID assigned to the current transaction.
1279 * GetCurrentTransactionId is cheap if we already have an XID, but not
1280 * so cheap if we don't, and we'd prefer not to do that work while
1281 * holding NotifyQueueLock.
1282 */
1284
1285 /*
1286 * Serialize writers by acquiring a special lock that we hold till
1287 * after commit. This ensures that queue entries appear in commit
1288 * order, and in particular that there are never uncommitted queue
1289 * entries ahead of committed ones, so an uncommitted transaction
1290 * can't block delivery of deliverable notifications.
1291 *
1292 * We use a heavyweight lock so that it'll automatically be released
1293 * after either commit or abort. This also allows deadlocks to be
1294 * detected, though really a deadlock shouldn't be possible here.
1295 *
1296 * The lock is on "database 0", which is pretty ugly but it doesn't
1297 * seem worth inventing a special locktag category just for this.
1298 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1299 * used by the flatfiles mechanism.)
1300 */
1303
1304 /*
1305 * For the direct advancement optimization in SignalBackends(), we
1306 * need to ensure that no other backend can insert queue entries
1307 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1308 * heavyweight lock above provides this guarantee, since it serializes
1309 * all writers.
1310 *
1311 * Note: if the heavyweight lock were ever removed for scalability
1312 * reasons, we could achieve the same guarantee by holding
1313 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1314 * than releasing and reacquiring it for each page as we do below.
1315 */
1316
1317 /* Initialize values to a safe default in case list is empty */
1320
1321 /* Now push the notifications into the queue */
1323 while (nextNotify != NULL)
1324 {
1325 /*
1326 * Add the pending notifications to the queue. We acquire and
1327 * release NotifyQueueLock once per page, which might be overkill
1328 * but it does allow readers to get in while we're doing this.
1329 *
1330 * A full queue is very uncommon and should really not happen,
1331 * given that we have so much space available in the SLRU pages.
1332 * Nevertheless we need to deal with this possibility. Note that
1333 * when we get here we are in the process of committing our
1334 * transaction, but we have not yet committed to clog, so at this
1335 * point in time we can still roll the transaction back.
1336 */
1338 if (firstIteration)
1339 {
1341 firstIteration = false;
1342 }
1344 if (asyncQueueIsFull())
1345 ereport(ERROR,
1347 errmsg("too many notifications in the NOTIFY queue")));
1351 }
1352
1353 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1354 }
1355}

References AccessExclusiveLock, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), BecomeRegisteredListener(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, fb(), foreach_ptr, GetCurrentTransactionId(), hash_seq_init(), hash_seq_search(), initGlobalChannelTable(), initLocalChannelTable(), initPendingListenActions(), InvalidOid, lappend(), lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MemoryContextAlloc(), NIL, pendingActions, pendingNotifies, PrepareTableEntriesForListen(), PrepareTableEntriesForUnlisten(), PrepareTableEntriesForUnlistenAll(), QUEUE_HEAD, queueHeadAfterWrite, queueHeadBeforeWrite, SET_QUEUE_POS, signalPids, signalProcnos, TopMemoryContext, Trace_notify, NotificationList::uniqueChannelHash, and NotificationList::uniqueChannelNames.

Referenced by CommitTransaction().

◆ PrepareTableEntriesForListen()

static void PrepareTableEntriesForListen ( const char channel)
static

Definition at line 1523 of file async.c.

1524{
1526 GlobalChannelEntry *entry;
1527 bool found;
1529 PendingListenEntry *pending;
1530
1531 /*
1532 * Record in local pending hash that we want to LISTEN, overwriting any
1533 * earlier attempt to UNLISTEN.
1534 */
1535 pending = (PendingListenEntry *)
1537 pending->action = PENDING_LISTEN;
1538
1539 /*
1540 * Ensure that there is an entry for the channel in localChannelTable.
1541 * (Should this fail, we can just roll back.) If the transaction fails
1542 * after this point, we will remove the entry if appropriate during
1543 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1544 * to return TRUE; we assume nothing is going to consult that before
1545 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1546 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1547 * present to ensure they do the right things; see
1548 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1549 */
1551
1552 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1553 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1554 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1555
1556 if (!found)
1557 {
1558 /* New channel entry, so initialize it to a safe state */
1560 entry->numListeners = 0;
1561 entry->allocatedListeners = 0;
1562 }
1563
1564 /*
1565 * Create listenersArray if entry doesn't have one. It's tempting to fold
1566 * this into the !found case, but this coding allows us to cope in case
1567 * dsa_allocate() failed in an earlier attempt.
1568 */
1569 if (!DsaPointerIsValid(entry->listenersArray))
1570 {
1574 }
1575
1578
1579 /*
1580 * Check if we already have a ListenerEntry (possibly from earlier in this
1581 * transaction)
1582 */
1583 for (int i = 0; i < entry->numListeners; i++)
1584 {
1585 if (listeners[i].procNo == MyProcNumber)
1586 {
1587 /* Already have an entry; listening flag stays as-is until commit */
1589 return;
1590 }
1591 }
1592
1593 /* Need to add a new entry; grow array if necessary */
1594 if (entry->numListeners >= entry->allocatedListeners)
1595 {
1596 int new_size = entry->allocatedListeners * 2;
1599 sizeof(ListenerEntry) * new_size);
1601
1603 entry->listenersArray = new_array;
1607 }
1608
1609 listeners[entry->numListeners].procNo = MyProcNumber;
1610 listeners[entry->numListeners].listening = false; /* staged, not yet
1611 * committed */
1612 entry->numListeners++;
1613
1615}

References PendingListenEntry::action, GlobalChannelEntry::allocatedListeners, dsa_allocate, dsa_free(), dsa_get_address(), DsaPointerIsValid, dshash_find_or_insert(), dshash_release_lock(), fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_ENTER, hash_search(), i, INITIAL_LISTENERS_ARRAY_SIZE, InvalidDsaPointer, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PENDING_LISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlisten()

static void PrepareTableEntriesForUnlisten ( const char channel)
static

Definition at line 1626 of file async.c.

1627{
1628 PendingListenEntry *pending;
1629
1630 /*
1631 * If the channel name is not in localChannelTable, then we are neither
1632 * listening on it nor preparing to listen on it, so we don't need to
1633 * record an UNLISTEN action.
1634 */
1636 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1637 return;
1638
1639 /*
1640 * Record in local pending hash that we want to UNLISTEN, overwriting any
1641 * earlier attempt to LISTEN. Don't touch localChannelTable or
1642 * globalChannelTable yet - we keep receiving signals until commit.
1643 */
1644 pending = (PendingListenEntry *)
1646 pending->action = PENDING_UNLISTEN;
1647}

References PendingListenEntry::action, Assert, fb(), HASH_ENTER, HASH_FIND, hash_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlistenAll()

static void PrepareTableEntriesForUnlistenAll ( void  )
static

Definition at line 1656 of file async.c.

1657{
1660 PendingListenEntry *pending;
1661
1662 /*
1663 * Scan localChannelTable, which will have the names of all channels that
1664 * we are listening on or have prepared to listen on. Record an UNLISTEN
1665 * action for each one, overwriting any earlier attempt to LISTEN.
1666 */
1668 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1669 {
1670 pending = (PendingListenEntry *)
1672 pending->action = PENDING_UNLISTEN;
1673 }
1674}

References PendingListenEntry::action, fb(), HASH_ENTER, hash_search(), hash_seq_init(), hash_seq_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 3052 of file async.c.

3053{
3054 /* We *must* reset the flag */
3055 notifyInterruptPending = false;
3056
3057 /* Do nothing else if we aren't actively listening */
3059 return;
3060
3061 if (Trace_notify)
3062 elog(DEBUG1, "ProcessIncomingNotify");
3063
3064 set_ps_display("notify interrupt");
3065
3066 /*
3067 * We must run asyncQueueReadAllNotifications inside a transaction, else
3068 * bad things happen if it gets an error.
3069 */
3071
3073
3075
3076 /*
3077 * If this isn't an end-of-command case, we must flush the notify messages
3078 * to ensure frontend gets them promptly.
3079 */
3080 if (flush)
3081 pq_flush();
3082
3083 set_ps_display("idle");
3084
3085 if (Trace_notify)
3086 elog(DEBUG1, "ProcessIncomingNotify: done");
3087}

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, LocalChannelTableIsEmpty, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 2573 of file async.c.

2574{
2576 return; /* not really idle */
2577
2578 /* Loop in case another signal arrives while sending messages */
2580 ProcessIncomingNotify(flush);
2581}

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char channel 
)
static

Definition at line 988 of file async.c.

989{
990 MemoryContext oldcontext;
992 int my_level = GetCurrentTransactionNestLevel();
993
994 /*
995 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
996 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
997 * final per-channel intent is computed during PreCommit_Notify.
998 */
1000
1001 /* space for terminating null is included in sizeof(ListenAction) */
1003 strlen(channel) + 1);
1004 actrec->action = action;
1005 strcpy(actrec->channel, channel);
1006
1007 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1008 {
1009 ActionList *actions;
1010
1011 /*
1012 * First action in current sub(xact). Note that we allocate the
1013 * ActionList in TopTransactionContext; the nestingLevel might get
1014 * changed later by AtSubCommit_Notify.
1015 */
1016 actions = (ActionList *)
1018 actions->nestingLevel = my_level;
1019 actions->actions = list_make1(actrec);
1020 actions->upper = pendingActions;
1021 pendingActions = actions;
1022 }
1023 else
1025
1026 MemoryContextSwitchTo(oldcontext);
1027}

References ActionList::actions, CurTransactionContext, fb(), GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ RemoveListenerFromChannel()

static void RemoveListenerFromChannel ( GlobalChannelEntry **  entry_ptr,
ListenerEntry listeners,
int  idx 
)
static

Definition at line 1686 of file async.c.

1689{
1690 GlobalChannelEntry *entry = *entry_ptr;
1691
1692 entry->numListeners--;
1693 if (idx < entry->numListeners)
1695 sizeof(ListenerEntry) * (entry->numListeners - idx));
1696
1697 if (entry->numListeners == 0)
1698 {
1701 /* tells caller not to release the entry's lock: */
1702 *entry_ptr = NULL;
1703 }
1704}

References dsa_free(), dshash_delete_entry(), fb(), globalChannelDSA, globalChannelTable, idx(), GlobalChannelEntry::listenersArray, and GlobalChannelEntry::numListeners.

Referenced by ApplyPendingListenActions().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 2259 of file async.c.

2260{
2261 int count;
2262
2263 /* Can't get here without PreCommit_Notify having made the global table */
2265
2266 /* It should have set up these arrays, too */
2268
2269 /*
2270 * Identify backends that we need to signal. We don't want to send
2271 * signals while holding the NotifyQueueLock, so this part just builds a
2272 * list of target PIDs in signalPids[] and signalProcnos[].
2273 */
2274 count = 0;
2275
2277
2278 /* Scan each channel name that we notified in this transaction */
2280 {
2282 GlobalChannelEntry *entry;
2284
2285 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2286 entry = dshash_find(globalChannelTable, &key, false);
2287 if (entry == NULL)
2288 continue; /* nobody is listening */
2289
2291 entry->listenersArray);
2292
2293 /* Identify listeners that now need waking, add them to arrays */
2294 for (int j = 0; j < entry->numListeners; j++)
2295 {
2296 ProcNumber i;
2297 int32 pid;
2298 QueuePosition pos;
2299
2300 if (!listeners[j].listening)
2301 continue; /* ignore not-yet-committed listeners */
2302
2303 i = listeners[j].procNo;
2304
2306 continue; /* already signaled, no need to repeat */
2307
2308 pid = QUEUE_BACKEND_PID(i);
2309 pos = QUEUE_BACKEND_POS(i);
2310
2311 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2312 continue; /* it's fully caught up already */
2313
2314 Assert(pid != InvalidPid);
2315
2317 signalPids[count] = pid;
2318 signalProcnos[count] = i;
2319 count++;
2320 }
2321
2323 }
2324
2325 /*
2326 * Scan all listeners. Any that are not already pending wakeup must not
2327 * be interested in our notifications (else we'd have set their wakeup
2328 * flags above). Check to see if we can directly advance their queue
2329 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2330 * them anyway so they will catch up.
2331 */
2333 {
2334 int32 pid;
2335 QueuePosition pos;
2336
2338 continue;
2339
2340 /* If it's currently advancing, we should not touch it */
2342 continue;
2343
2344 pid = QUEUE_BACKEND_PID(i);
2345 pos = QUEUE_BACKEND_POS(i);
2346
2347 /*
2348 * We can directly advance the other backend's queue pointer if it's
2349 * not currently advancing (else there are race conditions), and its
2350 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2351 * it miss some older messages), and we'd not be moving the pointer
2352 * backward.
2353 */
2356 {
2357 /* We can directly advance its pointer past what we wrote */
2359 }
2362 {
2363 /* It's idle and far behind, so wake it up */
2364 Assert(pid != InvalidPid);
2365
2367 signalPids[count] = pid;
2368 signalProcnos[count] = i;
2369 count++;
2370 }
2371 }
2372
2374
2375 /* Now send signals */
2376 for (int i = 0; i < count; i++)
2377 {
2378 int32 pid = signalPids[i];
2379
2380 /*
2381 * If we are signaling our own process, no need to involve the kernel;
2382 * just set the flag directly.
2383 */
2384 if (pid == MyProcPid)
2385 {
2387 continue;
2388 }
2389
2390 /*
2391 * Note: assuming things aren't broken, a signal failure here could
2392 * only occur if the target backend exited since we released
2393 * NotifyQueueLock; which is unlikely but certainly possible. So we
2394 * just log a low-level debug message if it happens.
2395 */
2397 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2398 }
2399}

References Assert, asyncQueuePageDiff(), DEBUG3, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), foreach_ptr, globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, i, INVALID_PROC_NUMBER, InvalidPid, j, GlobalChannelEntry::listenersArray, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcPid, notifyInterruptPending, GlobalChannelEntry::numListeners, pendingNotifies, PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, QUEUE_POS_PRECEDES, queueHeadAfterWrite, queueHeadBeforeWrite, SendProcSignal(), signalPids, signalProcnos, and NotificationList::uniqueChannelNames.

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 346 of file async.c.

Referenced by asyncQueueFillWarning(), AsyncShmemInit(), and initGlobalChannelTable().

◆ globalChannelDSA

◆ globalChannelTable

◆ globalChannelTableDSHParams

const dshash_parameters globalChannelTableDSHParams
static

◆ localChannelTable

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 570 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 362 of file async.c.

◆ notifyInterruptPending

◆ pendingActions

◆ pendingListenActions

◆ pendingNotifies

◆ queueHeadAfterWrite

QueuePosition queueHeadAfterWrite
static

Definition at line 553 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ queueHeadBeforeWrite

QueuePosition queueHeadBeforeWrite
static

Definition at line 552 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalPids

int32* signalPids = NULL
static

Definition at line 560 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalProcnos

ProcNumber* signalProcnos = NULL
static

Definition at line 561 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 564 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 541 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and BecomeRegisteredListener().