PostgreSQL Source Code git master
Loading...
Searching...
No Matches
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  GlobalChannelKey
 
struct  ListenerEntry
 
struct  GlobalChannelEntry
 
struct  ListenAction
 
struct  ActionList
 
struct  PendingListenEntry
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 
struct  ChannelName
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_POS_PRECEDES(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define QUEUE_BACKEND_WAKEUP_PENDING(i)   (asyncQueueControl->backend[i].wakeupPending)
 
#define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define INITIAL_LISTENERS_ARRAY_SIZE   4
 
#define LocalChannelTableIsEmpty()    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct GlobalChannelKey GlobalChannelKey
 
typedef struct ListenerEntry ListenerEntry
 
typedef struct GlobalChannelEntry GlobalChannelEntry
 
typedef struct ActionList ActionList
 
typedef struct PendingListenEntry PendingListenEntry
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct ChannelName ChannelName
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 
enum  PendingListenAction { PENDING_LISTEN , PENDING_UNLISTEN }
 

Functions

static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void GlobalChannelKeyInit (GlobalChannelKey *key, Oid dboid, const char *channel)
 
static dshash_hash globalChannelTableHash (const void *key, size_t size, void *arg)
 
static void initGlobalChannelTable (void)
 
static void initLocalChannelTable (void)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void BecomeRegisteredListener (void)
 
static void PrepareTableEntriesForListen (const char *channel)
 
static void PrepareTableEntriesForUnlisten (const char *channel)
 
static void PrepareTableEntriesForUnlistenAll (void)
 
static void RemoveListenerFromChannel (GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
 
static void ApplyPendingListenActions (bool isCommit)
 
static void CleanupListenersOnExit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (QueuePosition *current, QueuePosition stop, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
static void initPendingListenActions (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static dshash_table * globalChannelTable = NULL
 
static dsa_area * globalChannelDSA = NULL
 
static HTAB * localChannelTable = NULL
 
static ActionList * pendingActions = NULL
 
static HTAB * pendingListenActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static QueuePosition queueHeadBeforeWrite
 
static QueuePosition queueHeadAfterWrite
 
static int32 * signalPids = NULL
 
static ProcNumber * signalProcnos = NULL
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 
static const dshash_parameters globalChannelTableDSHParams
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 225 of file async.c.

◆ INITIAL_LISTENERS_ARRAY_SIZE

#define INITIAL_LISTENERS_ARRAY_SIZE   4

Definition at line 377 of file async.c.

◆ LocalChannelTableIsEmpty

#define LocalChannelTableIsEmpty ( )     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

Definition at line 410 of file async.c.

423{
428
429typedef struct
430{
432 char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
434
435typedef struct ActionList
436{
437 int nestingLevel; /* current transaction nesting depth */
438 List *actions; /* list of ListenAction structs */
439 struct ActionList *upper; /* details for upper transaction levels */
440} ActionList;
441
443
444/*
445 * Hash table recording the final listen/unlisten intent per channel for
446 * the current transaction. Key is channel name, value is PENDING_LISTEN or
447 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
448 * per channel instead of replaying every action. This is built from the
449 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
450 * AtAbort_Notify.
451 */
452typedef enum
453{
457
458typedef struct PendingListenEntry
459{
460 char channel[NAMEDATALEN]; /* hash key */
461 PendingListenAction action; /* which action should we perform? */
463
465
466/*
467 * State for outbound notifies consists of a list of all channels+payloads
468 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
469 * until and unless the transaction commits. pendingNotifies is NULL if no
470 * NOTIFYs have been done in the current (sub) transaction.
471 *
472 * We discard duplicate notify events issued in the same transaction.
473 * Hence, in addition to the list proper (which we need to track the order
474 * of the events, since we guarantee to deliver them in order), we build a
475 * hash table which we can probe to detect duplicates. Since building the
476 * hash table is somewhat expensive, we do so only once we have at least
477 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
478 * before that we just scan the events linearly.
479 *
480 * The list is kept in CurTransactionContext. In subtransactions, each
481 * subtransaction has its own list in its own CurTransactionContext, but
482 * successful subtransactions add their entries to their parent's list.
483 * Failed subtransactions simply discard their lists. Since these lists
484 * are independent, there may be notify events in a subtransaction's list
485 * that duplicate events in some ancestor (sub) transaction; we get rid of
486 * the dups when merging the subtransaction's list into its parent's.
487 *
488 * Note: the action and notify lists do not interact within a transaction.
489 * In particular, if a transaction does NOTIFY and then LISTEN on the same
490 * condition name, it will get a self-notify at commit. This is a bit odd
491 * but is consistent with our historical behavior.
492 */
493typedef struct Notification
494{
495 uint16 channel_len; /* length of channel-name string */
496 uint16 payload_len; /* length of payload string */
497 /* null-terminated channel name, then null-terminated payload follow */
500
501typedef struct NotificationList
502{
503 int nestingLevel; /* current transaction nesting depth */
504 List *events; /* list of Notification structs */
505 HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
506 List *uniqueChannelNames; /* unique channel names being notified */
507 HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
508 struct NotificationList *upper; /* details for upper transaction levels */
510
511#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
512
513struct NotificationHash
514{
515 Notification *event; /* => the actual Notification struct */
516};
517
519
520/*
521 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
522 * (both just carry the channel name, with no payload).
523 */
524typedef struct ChannelName
525{
526 char channel[NAMEDATALEN]; /* hash key */
528
529/*
530 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
531 * called from inside a signal handler. That just sets the
532 * notifyInterruptPending flag and sets the process
533 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
534 * actually deal with the interrupt.
535 */
536volatile sig_atomic_t notifyInterruptPending = false;
537
538/* True if we've registered an on_shmem_exit cleanup */
539static bool unlistenExitRegistered = false;
540
541/* True if we're currently registered as a listener in asyncQueueControl */
542static bool amRegisteredListener = false;
543
544/*
545 * Queue head positions for direct advancement.
546 * These are captured during PreCommit_Notify while holding the heavyweight
547 * lock on database 0, ensuring no other backend can insert notifications
548 * between them. SignalBackends uses these to advance idle backends.
549 */
552
553/*
554 * Workspace arrays for SignalBackends. These are preallocated in
555 * PreCommit_Notify to avoid needing memory allocation after committing to
556 * clog.
557 */
558static int32 *signalPids = NULL;
560
561/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
562static bool tryAdvanceTail = false;
563
564/* GUC parameters */
565bool Trace_notify = false;
566
567/* For 8 KB pages this gives 8 GB of disk space */
568int max_notify_queue_pages = 1048576;
569
570/* local function prototypes */
571static inline int64 asyncQueuePageDiff(int64 p, int64 q);
572static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
573static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
574 const char *channel);
575static dshash_hash globalChannelTableHash(const void *key, size_t size,
576 void *arg);
577static void initGlobalChannelTable(void);
578static void initLocalChannelTable(void);
579static void queue_listen(ListenActionKind action, const char *channel);
580static void Async_UnlistenOnExit(int code, Datum arg);
581static void BecomeRegisteredListener(void);
582static void PrepareTableEntriesForListen(const char *channel);
583static void PrepareTableEntriesForUnlisten(const char *channel);
584static void PrepareTableEntriesForUnlistenAll(void);
587 int idx);
588static void ApplyPendingListenActions(bool isCommit);
589static void CleanupListenersOnExit(void);
590static bool IsListeningOn(const char *channel);
591static void asyncQueueUnregister(void);
592static bool asyncQueueIsFull(void);
593static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
596static double asyncQueueUsage(void);
597static void asyncQueueFillWarning(void);
598static void SignalBackends(void);
599static void asyncQueueReadAllNotifications(void);
601 QueuePosition stop,
602 Snapshot snapshot);
603static void asyncQueueAdvanceTail(void);
604static void ProcessIncomingNotify(bool flush);
607static uint32 notification_hash(const void *key, Size keysize);
608static int notification_match(const void *key1, const void *key2, Size keysize);
609static void ClearPendingActionsAndNotifies(void);
610
611/*
612 * Compute the difference between two queue page numbers.
613 * Previously this function accounted for a wraparound.
614 */
615static inline int64
617{
618 return p - q;
619}
620
621/*
622 * Determines whether p precedes q.
623 * Previously this function accounted for a wraparound.
624 */
625static inline bool
627{
628 return p < q;
629}
630
631/*
632 * GlobalChannelKeyInit
633 * Prepare a global channel table key for hashing.
634 */
635static inline void
636GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
637{
638 memset(key, 0, sizeof(GlobalChannelKey));
639 key->dboid = dboid;
640 strlcpy(key->channel, channel, NAMEDATALEN);
641}
642
643/*
644 * globalChannelTableHash
645 * Hash function for global channel table keys.
646 */
647static dshash_hash
648globalChannelTableHash(const void *key, size_t size, void *arg)
649{
650 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
651 dshash_hash h;
652
654 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
656
657 return h;
658}
659
660/* parameters for the global channel table */
662 sizeof(GlobalChannelKey),
663 sizeof(GlobalChannelEntry),
668};
669
670/*
671 * initGlobalChannelTable
672 * Lazy initialization of the global channel table.
673 */
674static void
676{
677 MemoryContext oldcontext;
678
679 /* Quick exit if we already did this */
682 return;
683
684 /* Otherwise, use a lock to ensure only one process creates the table */
686
687 /* Be sure any local memory allocated by DSA routines is persistent */
689
691 {
692 /* Initialize dynamic shared hash table for global channels */
698 NULL);
699
700 /* Store handles in shared memory for other backends to use */
704 }
705 else if (!globalChannelTable)
706 {
707 /* Attach to existing dynamic shared hash table */
713 NULL);
714 }
715
716 MemoryContextSwitchTo(oldcontext);
718}
719
720/*
721 * initLocalChannelTable
722 * Lazy initialization of the local channel table.
723 * Once created, this table lasts for the life of the session.
724 */
725static void
727{
729
730 /* Quick exit if we already did this */
731 if (localChannelTable != NULL)
732 return;
733
734 /* Initialize local hash table for this backend's listened channels */
736 hash_ctl.entrysize = sizeof(ChannelName);
737
739 hash_create("Local Listen Channels",
740 64,
741 &hash_ctl,
743}
744
745/*
746 * initPendingListenActions
747 * Lazy initialization of the pending listen actions hash table.
748 * This is allocated in CurTransactionContext during PreCommit_Notify,
749 * and destroyed at transaction end.
750 */
751static void
753{
755
757 return;
758
760 hash_ctl.entrysize = sizeof(PendingListenEntry);
762
764 hash_create("Pending Listen Actions",
766 &hash_ctl,
768}
769
770/*
771 * Report space needed for our shared memory area
772 */
773Size
774AsyncShmemSize(void)
775{
776 Size size;
777
778 /* This had better match AsyncShmemInit */
779 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
780 size = add_size(size, offsetof(AsyncQueueControl, backend));
781
783
784 return size;
785}
786
787/*
788 * Initialize our shared memory area
789 */
790void
791AsyncShmemInit(void)
792{
793 bool found;
794 Size size;
795
796 /*
797 * Create or attach to the AsyncQueueControl structure.
798 */
799 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
800 size = add_size(size, offsetof(AsyncQueueControl, backend));
801
803 ShmemInitStruct("Async Queue Control", size, &found);
804
805 if (!found)
806 {
807 /* First time through, so initialize it */
810 QUEUE_STOP_PAGE = 0;
815 for (int i = 0; i < MaxBackends; i++)
816 {
823 }
824 }
825
826 /*
827 * Set up SLRU management of the pg_notify data. Note that long segment
828 * names are used in order to avoid wraparound.
829 */
830 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
833 SYNC_HANDLER_NONE, true);
834
835 if (!found)
836 {
837 /*
838 * During start or reboot, clean out the pg_notify directory.
839 */
841 }
842}
843
844
845/*
846 * pg_notify -
847 * SQL function to send a notification event
848 */
849Datum
851{
852 const char *channel;
853 const char *payload;
854
855 if (PG_ARGISNULL(0))
856 channel = "";
857 else
859
860 if (PG_ARGISNULL(1))
861 payload = "";
862 else
864
865 /* For NOTIFY as a statement, this is checked in ProcessUtility */
867
868 Async_Notify(channel, payload);
869
871}
872
873
874/*
875 * Async_Notify
876 *
877 * This is executed by the SQL notify command.
878 *
879 * Adds the message to the list of pending notifies.
880 * Actual notification happens during transaction commit.
881 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
882 */
883void
884Async_Notify(const char *channel, const char *payload)
885{
886 int my_level = GetCurrentTransactionNestLevel();
887 size_t channel_len;
888 size_t payload_len;
889 Notification *n;
890 MemoryContext oldcontext;
891
892 if (IsParallelWorker())
893 elog(ERROR, "cannot send notifications from a parallel worker");
894
895 if (Trace_notify)
896 elog(DEBUG1, "Async_Notify(%s)", channel);
897
898 channel_len = channel ? strlen(channel) : 0;
899 payload_len = payload ? strlen(payload) : 0;
900
901 /* a channel name must be specified */
902 if (channel_len == 0)
905 errmsg("channel name cannot be empty")));
906
907 /* enforce length limits */
908 if (channel_len >= NAMEDATALEN)
911 errmsg("channel name too long")));
912
913 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
916 errmsg("payload string too long")));
917
918 /*
919 * We must construct the Notification entry, even if we end up not using
920 * it, in order to compare it cheaply to existing list entries.
921 *
922 * The notification list needs to live until end of transaction, so store
923 * it in the transaction context.
924 */
926
928 channel_len + payload_len + 2);
929 n->channel_len = channel_len;
930 n->payload_len = payload_len;
931 strcpy(n->data, channel);
932 if (payload)
933 strcpy(n->data + channel_len + 1, payload);
934 else
935 n->data[channel_len + 1] = '\0';
936
937 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
938 {
940
941 /*
942 * First notify event in current (sub)xact. Note that we allocate the
943 * NotificationList in TopTransactionContext; the nestingLevel might
944 * get changed later by AtSubCommit_Notify.
945 */
948 sizeof(NotificationList));
949 notifies->nestingLevel = my_level;
950 notifies->events = list_make1(n);
951 /* We certainly don't need a hashtable yet */
952 notifies->hashtab = NULL;
953 /* We won't build uniqueChannelNames/Hash till later, either */
954 notifies->uniqueChannelNames = NIL;
955 notifies->uniqueChannelHash = NULL;
956 notifies->upper = pendingNotifies;
958 }
959 else
960 {
961 /* Now check for duplicates */
963 {
964 /* It's a dup, so forget it */
965 pfree(n);
966 MemoryContextSwitchTo(oldcontext);
967 return;
968 }
969
970 /* Append more events to existing list */
972 }
973
974 MemoryContextSwitchTo(oldcontext);
975}
976
977/*
978 * queue_listen
979 * Common code for listen, unlisten, unlisten all commands.
980 *
981 * Adds the request to the list of pending actions.
982 * Actual update of localChannelTable and globalChannelTable happens during
983 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
984 */
985static void
986queue_listen(ListenActionKind action, const char *channel)
987{
988 MemoryContext oldcontext;
990 int my_level = GetCurrentTransactionNestLevel();
991
992 /*
993 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
994 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
995 * final per-channel intent is computed during PreCommit_Notify.
996 */
998
999 /* space for terminating null is included in sizeof(ListenAction) */
1001 strlen(channel) + 1);
1002 actrec->action = action;
1003 strcpy(actrec->channel, channel);
1004
1005 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1006 {
1007 ActionList *actions;
1008
1009 /*
1010 * First action in current (sub)xact. Note that we allocate the
1011 * ActionList in TopTransactionContext; the nestingLevel might get
1012 * changed later by AtSubCommit_Notify.
1013 */
1014 actions = (ActionList *)
1016 actions->nestingLevel = my_level;
1017 actions->actions = list_make1(actrec);
1018 actions->upper = pendingActions;
1019 pendingActions = actions;
1020 }
1021 else
1023
1024 MemoryContextSwitchTo(oldcontext);
1025}
1026
1027/*
1028 * Async_Listen
1029 *
1030 * This is executed by the SQL listen command.
1031 */
1032void
1033Async_Listen(const char *channel)
1034{
1035 if (Trace_notify)
1036 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1037
1038 queue_listen(LISTEN_LISTEN, channel);
1039}
1040
1041/*
1042 * Async_Unlisten
1043 *
1044 * This is executed by the SQL unlisten command.
1045 */
1046void
1047Async_Unlisten(const char *channel)
1048{
1049 if (Trace_notify)
1050 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1051
1052 /* If we couldn't possibly be listening, no need to queue anything */
1054 return;
1055
1056 queue_listen(LISTEN_UNLISTEN, channel);
1057}
1058
1059/*
1060 * Async_UnlistenAll
1061 *
1062 * This is invoked by UNLISTEN * command, and also at backend exit.
1063 */
1064void
1066{
1067 if (Trace_notify)
1068 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1069
1070 /* If we couldn't possibly be listening, no need to queue anything */
1072 return;
1073
1075}
1076
1077/*
1078 * SQL function: return a set of the channel names this backend is actively
1079 * listening to.
1080 *
1081 * Note: this coding relies on the fact that the localChannelTable cannot
1082 * change within a transaction.
1083 */
1084Datum
1086{
1088 HASH_SEQ_STATUS *status;
1089
1090 /* stuff done only on the first call of the function */
1091 if (SRF_IS_FIRSTCALL())
1092 {
1093 /* create a function context for cross-call persistence */
1095
1096 /* Initialize hash table iteration if we have any channels */
1097 if (localChannelTable != NULL)
1098 {
1099 MemoryContext oldcontext;
1100
1101 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1102 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1104 funcctx->user_fctx = status;
1105 MemoryContextSwitchTo(oldcontext);
1106 }
1107 else
1108 {
1109 funcctx->user_fctx = NULL;
1110 }
1111 }
1112
1113 /* stuff done on every call of the function */
1115 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1116
1117 if (status != NULL)
1118 {
1119 ChannelName *entry;
1120
1121 entry = (ChannelName *) hash_seq_search(status);
1122 if (entry != NULL)
1124 }
1125
1127}
1128
1129/*
1130 * Async_UnlistenOnExit
1131 *
1132 * This is executed at backend exit if we have done any LISTENs in this
1133 * backend. It might not be necessary anymore, if the user UNLISTENed
1134 * everything, but we don't try to detect that case.
1135 */
1136static void
1138{
1141}
1142
1143/*
1144 * AtPrepare_Notify
1145 *
1146 * This is called at the prepare phase of a two-phase
1147 * transaction. Save the state for possible commit later.
1148 */
1149void
1150AtPrepare_Notify(void)
1151{
1152 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1154 ereport(ERROR,
1156 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1157}
1158
1159/*
1160 * PreCommit_Notify
1161 *
1162 * This is called at transaction commit, before actually committing to
1163 * clog.
1164 *
1165 * If there are pending LISTEN actions, make sure we are listed in the
1166 * shared-memory listener array. This must happen before commit to
1167 * ensure we don't miss any notifies from transactions that commit
1168 * just after ours.
1169 *
1170 * If there are outbound notify requests in the pendingNotifies list,
1171 * add them to the global queue. We do that before commit so that
1172 * we can still throw error if we run out of queue space.
1173 */
1174void
1175PreCommit_Notify(void)
1176{
1177 ListCell *p;
1178
1180 return; /* no relevant statements in this xact */
1181
1182 if (Trace_notify)
1183 elog(DEBUG1, "PreCommit_Notify");
1184
1185 /* Preflight for any pending listen/unlisten actions */
1187
1188 if (pendingActions != NULL)
1189 {
1190 /* Ensure we have a local channel table */
1192 /* Create pendingListenActions hash table for this transaction */
1194
1195 /* Stage all the actions this transaction wants to perform */
1196 foreach(p, pendingActions->actions)
1197 {
1199
1200 switch (actrec->action)
1201 {
1202 case LISTEN_LISTEN:
1205 break;
1206 case LISTEN_UNLISTEN:
1208 break;
1211 break;
1212 }
1213 }
1214 }
1215
1216 /* Queue any pending notifies (must happen after the above) */
1217 if (pendingNotifies)
1218 {
1220 bool firstIteration = true;
1221
1222 /*
1223 * Build list of unique channel names being notified for use by
1224 * SignalBackends().
1225 *
1226 * If uniqueChannelHash is available, use it to efficiently get the
1227 * unique channels. Otherwise, fall back to the O(N^2) approach.
1228 */
1231 {
1232 HASH_SEQ_STATUS status;
1234
1236 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1239 channelEntry->channel);
1240 }
1241 else
1242 {
1243 /* O(N^2) approach is better for small number of notifications */
1245 {
1246 char *channel = n->data;
1247 bool found = false;
1248
1249 /* Name present in list? */
1251 {
1252 if (strcmp(oldchan, channel) == 0)
1253 {
1254 found = true;
1255 break;
1256 }
1257 }
1258 /* Add if not already in list */
1259 if (!found)
1262 channel);
1263 }
1264 }
1265
1266 /* Preallocate workspace that will be needed by SignalBackends() */
1267 if (signalPids == NULL)
1269 MaxBackends * sizeof(int32));
1270
1271 if (signalProcnos == NULL)
1273 MaxBackends * sizeof(ProcNumber));
1274
1275 /*
1276 * Make sure that we have an XID assigned to the current transaction.
1277 * GetCurrentTransactionId is cheap if we already have an XID, but not
1278 * so cheap if we don't, and we'd prefer not to do that work while
1279 * holding NotifyQueueLock.
1280 */
1282
1283 /*
1284 * Serialize writers by acquiring a special lock that we hold till
1285 * after commit. This ensures that queue entries appear in commit
1286 * order, and in particular that there are never uncommitted queue
1287 * entries ahead of committed ones, so an uncommitted transaction
1288 * can't block delivery of deliverable notifications.
1289 *
1290 * We use a heavyweight lock so that it'll automatically be released
1291 * after either commit or abort. This also allows deadlocks to be
1292 * detected, though really a deadlock shouldn't be possible here.
1293 *
1294 * The lock is on "database 0", which is pretty ugly but it doesn't
1295 * seem worth inventing a special locktag category just for this.
1296 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1297 * used by the flatfiles mechanism.)
1298 */
1301
1302 /*
1303 * For the direct advancement optimization in SignalBackends(), we
1304 * need to ensure that no other backend can insert queue entries
1305 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1306 * heavyweight lock above provides this guarantee, since it serializes
1307 * all writers.
1308 *
1309 * Note: if the heavyweight lock were ever removed for scalability
1310 * reasons, we could achieve the same guarantee by holding
1311 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1312 * than releasing and reacquiring it for each page as we do below.
1313 */
1314
1315 /* Initialize values to a safe default in case list is empty */
1318
1319 /* Now push the notifications into the queue */
1321 while (nextNotify != NULL)
1322 {
1323 /*
1324 * Add the pending notifications to the queue. We acquire and
1325 * release NotifyQueueLock once per page, which might be overkill
1326 * but it does allow readers to get in while we're doing this.
1327 *
1328 * A full queue is very uncommon and should really not happen,
1329 * given that we have so much space available in the SLRU pages.
1330 * Nevertheless we need to deal with this possibility. Note that
1331 * when we get here we are in the process of committing our
1332 * transaction, but we have not yet committed to clog, so at this
1333 * point in time we can still roll the transaction back.
1334 */
1336 if (firstIteration)
1337 {
1339 firstIteration = false;
1340 }
1342 if (asyncQueueIsFull())
1343 ereport(ERROR,
1345 errmsg("too many notifications in the NOTIFY queue")));
1349 }
1350
1351 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1352 }
1353}
1354
1355/*
1356 * AtCommit_Notify
1357 *
1358 * This is called at transaction commit, after committing to clog.
1359 *
1360 * Apply pending listen/unlisten changes and clear transaction-local state.
1361 *
1362 * If we issued any notifications in the transaction, send signals to
1363 * listening backends (possibly including ourselves) to process them.
1364 * Also, if we filled enough queue pages with new notifies, try to
1365 * advance the queue tail pointer.
 *
 * Takes no arguments and returns nothing. The order of the steps below is
 * significant: listen/unlisten state is settled first, then other backends
 * are signaled, then queue-tail maintenance is attempted, and only at the
 * very end is the transaction-local state discarded. AtAbort_Notify is the
 * rollback counterpart of this function.
1366 */
1367void
1368AtCommit_Notify(void)
1369{
1370 /*
1371 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1372 * return as soon as possible
1373 */
1375 return;
1376
1377 if (Trace_notify)
1378 elog(DEBUG1, "AtCommit_Notify");
1379
1380 /* Apply staged listen/unlisten changes */
1382
1383 /* If no longer listening to anything, get out of listener array */
1386
1387 /*
1388 * Send signals to listening backends. We need to do this only if there
1389 * are pending notifies, which were previously added to the shared queue by
1390 * PreCommit_Notify().
1391 */
1392 if (pendingNotifies != NULL)
1394
1395 /*
1396 * If it's time to try to advance the global tail pointer, do that.
1397 *
1398 * (It might seem odd to do this in the sender, when more than likely the
1399 * listeners won't yet have read the messages we just sent. However,
1400 * there's less contention if only the sender does it, and there is little
1401 * need for urgency in advancing the global tail. So this typically will
1402 * be clearing out messages that were sent some time ago.)
1403 */
1404 if (tryAdvanceTail)
1405 {
 /*
  * Consume the request before acting on it. The flag is raised by the
  * queue-insertion code when the head moves onto a new page whose number
  * is a multiple of QUEUE_CLEANUP_DELAY.
  */
1406 tryAdvanceTail = false;
1408 }
1409
1410 /* And clean up */
1412}
1413
1414/*
1415 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1416 *
1417 * This function must make sure we are ready to catch any incoming messages.
1418 */
1419static void
1421{
1422 QueuePosition head;
1423 QueuePosition max;
1425
1426 /*
1427 * Nothing to do if we are already listening to something, nor if we
1428 * already ran this routine in this transaction.
1429 */
1431 return;
1432
1433 if (Trace_notify)
1434 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1435
1436 /*
1437 * Before registering, make sure we will unlisten before dying. (Note:
1438 * this action does not get undone if we abort later.)
1439 */
1441 {
1444 }
1445
1446 /*
1447 * This is our first LISTEN, so establish our pointer.
1448 *
1449 * We set our pointer to the global tail pointer and then move it forward
1450 * over already-committed notifications. This ensures we cannot miss any
1451 * not-yet-committed notifications. We might get a few more but that
1452 * doesn't hurt.
1453 *
1454 * In some scenarios there might be a lot of committed notifications that
1455 * have not yet been pruned away (because some backend is being lazy about
1456 * reading them). To reduce our startup time, we can look at other
1457 * backends and adopt the maximum "pos" pointer of any backend that's in
1458 * our database; any notifications it's already advanced over are surely
1459 * committed and need not be re-examined by us. (We must consider only
1460 * backends connected to our DB, because others will not have bothered to
1461 * check committed-ness of notifications in our DB.)
1462 *
1463 * We need exclusive lock here so we can look at other backends' entries
1464 * and manipulate the list links.
1465 */
1467 head = QUEUE_HEAD;
1468 max = QUEUE_TAIL;
1471 {
1473 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1474 /* Also find last listening backend before this one */
1475 if (i < MyProcNumber)
1476 prevListener = i;
1477 }
1483 /* Insert backend into list of listeners at correct position */
1485 {
1488 }
1489 else
1490 {
1493 }
1495
1496 /* Now we are listed in the global array, so remember we're listening */
1497 amRegisteredListener = true;
1498
1499 /*
1500 * Try to move our pointer forward as far as possible. This will skip
1501 * over already-committed notifications, which we want to do because they
1502 * might be quite stale. Note that we are not yet listening on anything,
1503 * so we won't deliver such notifications to our frontend. Also, although
1504 * our transaction might have executed NOTIFY, those message(s) aren't
1505 * queued yet so we won't skip them here.
1506 */
1507 if (!QUEUE_POS_EQUAL(max, head))
1509}
1510
1511/*
1512 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1513 *
1514 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1515 * an entry in localChannelTable, and pre-allocating an entry in the shared
1516 * globalChannelTable with listening=false. The listening flag will be set
1517 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1518 * will be removed.
 *
 * channel: name of the channel to listen on. Together with MyDatabaseId it
 * forms the key used for the shared globalChannelTable lookup.
 *
 * Returns nothing. On return this backend has exactly one ListenerEntry in
 * the channel's listeners array: either the one it already had (left
 * untouched) or a newly appended one with listening=false.
1519 */
1520static void
1521PrepareTableEntriesForListen(const char *channel)
1522{
1524 GlobalChannelEntry *entry;
1525 bool found;
1527 PendingListenEntry *pending;
1528
1529 /*
1530 * Record in local pending hash that we want to LISTEN, overwriting any
1531 * earlier attempt to UNLISTEN.
1532 */
1533 pending = (PendingListenEntry *)
1535 pending->action = PENDING_LISTEN;
1536
1537 /*
1538 * Ensure that there is an entry for the channel in localChannelTable.
1539 * (Should this fail, we can just roll back.) If the transaction fails
1540 * after this point, we will remove the entry if appropriate during
1541 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1542 * to return TRUE; we assume nothing is going to consult that before
1543 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1544 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1545 * present to ensure they do the right things; see
1546 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1547 */
1549
1550 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1551 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1552 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1553
1554 if (!found)
1555 {
1556 /* New channel entry, so initialize it to a safe state */
1558 entry->numListeners = 0;
1559 entry->allocatedListeners = 0;
1560 }
1561
1562 /*
1563 * Create listenersArray if entry doesn't have one. It's tempting to fold
1564 * this into the !found case, but this coding allows us to cope in case
1565 * dsa_allocate() failed in an earlier attempt.
1566 */
1567 if (!DsaPointerIsValid(entry->listenersArray))
1568 {
1572 }
1573
1576
1577 /*
1578 * Check if we already have a ListenerEntry (possibly from earlier in this
1579 * transaction)
1580 */
1581 for (int i = 0; i < entry->numListeners; i++)
1582 {
1583 if (listeners[i].procNo == MyProcNumber)
1584 {
1585 /* Already have an entry; listening flag stays as-is until commit */
1587 return;
1588 }
1589 }
1590
1591 /* Need to add a new entry; grow array if necessary */
1592 if (entry->numListeners >= entry->allocatedListeners)
1593 {
 /*
  * Capacity is doubled. NOTE(review): doubling only makes progress if
  * allocatedListeners is nonzero here; presumably the listenersArray
  * creation step above sets it to a nonzero initial size -- confirm.
  */
1594 int new_size = entry->allocatedListeners * 2;
1597 sizeof(ListenerEntry) * new_size);
1599
1601 entry->listenersArray = new_array;
1605 }
1606
 /* Append our entry in the first unused slot of the listeners array */
1607 listeners[entry->numListeners].procNo = MyProcNumber;
1608 listeners[entry->numListeners].listening = false; /* staged, not yet
1609 * committed */
1610 entry->numListeners++;
1611
1613}
1614
1615/*
1616 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1617 *
1618 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1619 * we're currently listening (committed or staged). We don't touch
1620 * globalChannelTable yet - the listener keeps receiving signals until
1621 * commit, when the entry is removed.
 *
 * channel: name of the channel to stop listening on. If localChannelTable
 * has no entry for this name, the function returns without recording
 * anything.
 *
 * Returns nothing; the only effect is the PENDING_UNLISTEN action stored in
 * the pending entry for this channel.
1622 */
1623static void
1624PrepareTableEntriesForUnlisten(const char *channel)
1625{
1626 PendingListenEntry *pending;
1627
1628 /*
1629 * If the channel name is not in localChannelTable, then we are neither
1630 * listening on it nor preparing to listen on it, so we don't need to
1631 * record an UNLISTEN action.
1632 */
1634 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1635 return;
1636
1637 /*
1638 * Record in local pending hash that we want to UNLISTEN, overwriting any
1639 * earlier attempt to LISTEN. Don't touch localChannelTable or
1640 * globalChannelTable yet - we keep receiving signals until commit.
1641 */
1642 pending = (PendingListenEntry *)
1644 pending->action = PENDING_UNLISTEN;
1645}
1646
1647/*
1648 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1649 *
1650 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1651 * about-to-be-listened channels in pendingListenActions.
1652 */
1653static void
1655{
1658 PendingListenEntry *pending;
1659
1660 /*
1661 * Scan localChannelTable, which will have the names of all channels that
1662 * we are listening on or have prepared to listen on. Record an UNLISTEN
1663 * action for each one, overwriting any earlier attempt to LISTEN.
1664 */
1666 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1667 {
1668 pending = (PendingListenEntry *)
1670 pending->action = PENDING_UNLISTEN;
1671 }
1672}
1673
1674/*
1675 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1676 *
1677 * Decrements numListeners, compacts the array, and frees the entry if empty.
1678 * Sets *entry_ptr to NULL if the entry was deleted.
1679 *
1680 * We could get the listeners pointer from the entry, but all callers
1681 * already have it at hand.
1682 */
1683static void
1686 int idx)
1687{
1688 GlobalChannelEntry *entry = *entry_ptr;
1689
1690 entry->numListeners--;
1691 if (idx < entry->numListeners)
1693 sizeof(ListenerEntry) * (entry->numListeners - idx));
1694
1695 if (entry->numListeners == 0)
1696 {
1699 /* tells caller not to release the entry's lock: */
1700 *entry_ptr = NULL;
1701 }
1702}
1703
1704/*
1705 * ApplyPendingListenActions
1706 *
1707 * Apply, or revert, staged listen/unlisten changes to the local and global
1708 * hash tables.
1709 */
1710static void
1712{
1714 PendingListenEntry *pending;
1715
1716 /* Quick exit if nothing to do */
1718 return;
1719
1720 /* We made a globalChannelTable before building pendingListenActions */
1721 if (globalChannelTable == NULL)
1722 elog(PANIC, "global channel table missing post-commit/abort");
1723
1724 /* For each staged action ... */
1726 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1727 {
1729 GlobalChannelEntry *entry;
1730 bool removeLocal = true;
1731 bool foundListener = false;
1732
1733 /*
1734 * Find the global entry for this channel. If isCommit, it had better
1735 * exist (it was created in PreCommit). In an abort, it might not
1736 * exist, in which case we are not listening and should discard any
1737 * local entry that PreCommit may have managed to create.
1738 */
1739 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1740 entry = dshash_find(globalChannelTable, &key, true);
1741 if (entry != NULL)
1742 {
1743 /* Scan entry to find the ListenerEntry for this backend */
1745
1748
1749 for (int i = 0; i < entry->numListeners; i++)
1750 {
1751 if (listeners[i].procNo != MyProcNumber)
1752 continue;
1753 foundListener = true;
1754 if (isCommit)
1755 {
1756 if (pending->action == PENDING_LISTEN)
1757 {
1758 /*
1759 * LISTEN being committed: set listening=true.
1760 * localChannelTable entry was created during
1761 * PreCommit and should be kept.
1762 */
1763 listeners[i].listening = true;
1764 removeLocal = false;
1765 }
1766 else
1767 {
1768 /*
1769 * UNLISTEN being committed: remove pre-allocated
1770 * entries from both tables.
1771 */
1773 }
1774 }
1775 else
1776 {
1777 /*
1778 * Note: this part is reachable only if the transaction
1779 * aborts after PreCommit_Notify() has made some
1780 * pendingListenActions entries, so it's pretty hard to
1781 * test.
1782 */
1783 if (!listeners[i].listening)
1784 {
1785 /*
1786 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1787 * and we weren't listening before, so remove
1788 * pre-allocated entries from both tables.
1789 */
1791 }
1792 else
1793 {
1794 /*
1795 * We're aborting, but the previous state was that
1796 * we're listening, so keep localChannelTable entry.
1797 */
1798 removeLocal = false;
1799 }
1800 }
1801 break; /* there shouldn't be another match */
1802 }
1803
1804 /* We might have already released the entry by removing it */
1805 if (entry != NULL)
1807 }
1808
1809 /*
1810 * If we're committing a LISTEN action, we should have found a
1811 * matching ListenerEntry, but otherwise it's okay if we didn't.
1812 */
1813 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1814 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1815 pending->channel, MyProcNumber);
1816
1817 /*
1818 * If we did not find a globalChannelTable entry for our backend, or
1819 * if we are unlistening, remove any localChannelTable entry that may
1820 * exist. (Note in particular that this cleans up if we created a
1821 * localChannelTable entry and then failed while trying to create a
1822 * globalChannelTable entry.)
1823 */
1826 HASH_REMOVE, NULL);
1827 }
1828}
1829
1830/*
1831 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1832 *
1833 * Remove this backend from all channels in the shared global table.
1834 */
1835static void
1837{
1838 dshash_seq_status status;
1839 GlobalChannelEntry *entry;
1840
1841 if (Trace_notify)
1842 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1843
1844 /* Clear our local cache (not really necessary, but be consistent) */
1845 if (localChannelTable != NULL)
1846 {
1849 }
1850
1851 /* Now remove our entries from the shared globalChannelTable */
1852 if (globalChannelTable == NULL)
1853 return;
1854
1855 dshash_seq_init(&status, globalChannelTable, true);
1856 while ((entry = dshash_seq_next(&status)) != NULL)
1857 {
1859
1860 if (entry->key.dboid != MyDatabaseId)
1861 continue; /* not relevant */
1862
1865
1866 for (int i = 0; i < entry->numListeners; i++)
1867 {
1868 if (listeners[i].procNo == MyProcNumber)
1869 {
1870 entry->numListeners--;
1871 if (i < entry->numListeners)
1872 memmove(&listeners[i], &listeners[i + 1],
1873 sizeof(ListenerEntry) * (entry->numListeners - i));
1874
1875 if (entry->numListeners == 0)
1876 {
1878 dshash_delete_current(&status);
1879 }
1880 break;
1881 }
1882 }
1883 }
1884 dshash_seq_term(&status);
1885}
1886
1887/*
1888 * Test whether we are actively listening on the given channel name.
1889 *
1890 * Note: this function is executed for every notification found in the queue.
1891 */
1892static bool
1893IsListeningOn(const char *channel)
1894{
1895 if (localChannelTable == NULL)
1896 return false;
1897
1898 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1899}
1900
1901/*
1902 * Remove our entry from the listeners array when we are no longer listening
1903 * on any channel. NB: must not fail if we're already not listening.
1904 */
1905static void
1907{
1908 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1909
1910 if (!amRegisteredListener) /* nothing to do */
1911 return;
1912
1913 /*
1914 * Need exclusive lock here to manipulate list links.
1915 */
1917 /* Mark our entry as invalid */
1922 /* and remove it from the list */
1925 else
1926 {
1928 {
1930 {
1932 break;
1933 }
1934 }
1935 }
1938
1939 /* mark ourselves as no longer listed in the global array */
1940 amRegisteredListener = false;
1941}
1942
1943/*
1944 * Test whether there is room to insert more notification messages.
1945 *
1946 * Caller must hold at least shared NotifyQueueLock.
 *
 * Takes no arguments. The decision is based on "occupied", the distance in
 * pages from the tail page to the page of QUEUE_HEAD. A true result is
 * treated by the caller as "queue full" and reported as an ERROR ("too many
 * notifications in the NOTIFY queue").
1947 */
1948static bool
1949asyncQueueIsFull(void)
1950{
1951 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
 /* number of pages lying between the tail page and the head page */
1953 int64 occupied = headPage - tailPage;
1954
1956}
1957
1958/*
1959 * Advance the QueuePosition to the next entry, assuming that the current
1960 * entry is of length entryLength. If we jump to a new page the function
1961 * returns true, else false.
 *
 * position: the queue position to advance; it is overwritten in place
 * (through SET_QUEUE_POS) with the new page number and offset.
 * entryLength: length of the entry located at the current position.
 *
 * Returns true when the position was moved to offset 0 of the following
 * page, false when it stayed on the same page.
1962 */
1963static bool
1964asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1965{
1966 int64 pageno = QUEUE_POS_PAGE(*position);
1967 int offset = QUEUE_POS_OFFSET(*position);
1968 bool pageJump = false;
1969
1970 /*
1971 * Move to the next writing position: First jump over what we have just
1972 * written or read.
1973 */
1974 offset += entryLength;
 /* An entry must never extend past the end of its page */
1975 Assert(offset <= QUEUE_PAGESIZE);
1976
1977 /*
1978 * In a second step check if another entry can possibly be written to the
1979 * page. If so, stay here, we have reached the next position. If not, then
1980 * we need to move on to the next page.
1981 */
1983 {
 /* Start of the next page; tell the caller that we crossed a boundary */
1984 pageno++;
1985 offset = 0;
1986 pageJump = true;
1987 }
1988
 /* Store the (possibly updated) page number and offset back into *position */
1989 SET_QUEUE_POS(*position, pageno, offset);
1990 return pageJump;
1991}
1992
1993/*
1994 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
1995 */
1996static void
1998{
1999 size_t channellen = n->channel_len;
2000 size_t payloadlen = n->payload_len;
2001 int entryLength;
2002
2005
2006 /* The terminators are already included in AsyncQueueEntryEmptySize */
2009 qe->length = entryLength;
2010 qe->dboid = MyDatabaseId;
2011 qe->xid = GetCurrentTransactionId();
2012 qe->srcPid = MyProcPid;
2013 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2014}
2015
2016/*
2017 * Add pending notifications to the queue.
2018 *
2019 * We go page by page here, i.e. we stop once we have to go to a new page but
2020 * we will be called again and then fill that next page. If an entry does not
2021 * fit into the current page, we write a dummy entry with an InvalidOid as the
2022 * database OID in order to fill the page. So every page is always used up to
2023 * the last byte which simplifies reading the page later.
2024 *
2025 * We are passed the list cell (in pendingNotifies->events) containing the next
2026 * notification to write and return the first still-unwritten cell back.
2027 * Eventually we will return NULL indicating all is done.
2028 *
2029 * We are holding NotifyQueueLock already from the caller and grab
2030 * page specific SLRU bank lock locally in this function.
2031 */
2032static ListCell *
2034{
2037 int64 pageno;
2038 int offset;
2039 int slotno;
2041
2042 /*
2043 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2044 * memory upon exiting. The reason for this is that if we have to advance
2045 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2046 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2047 * subsequent insertions would try to put entries into a page that slru.c
2048 * thinks doesn't exist yet.) So, use a local position variable. Note
2049 * that if we do fail, any already-inserted queue entries are forgotten;
2050 * this is okay, since they'd be useless anyway after our transaction
2051 * rolls back.
2052 */
2054
2055 /*
2056 * If this is the first write since the postmaster started, we need to
2057 * initialize the first page of the async SLRU. Otherwise, the current
2058 * page should be initialized already, so just fetch it.
2059 */
2060 pageno = QUEUE_POS_PAGE(queue_head);
2062
2063 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2065
2068 else
2069 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2071
2072 /* Note we mark the page dirty before writing in it */
2073 NotifyCtl->shared->page_dirty[slotno] = true;
2074
2075 while (nextNotify != NULL)
2076 {
2078
2079 /* Construct a valid queue entry in local variable qe */
2081
2082 offset = QUEUE_POS_OFFSET(queue_head);
2083
2084 /* Check whether the entry really fits on the current page */
2085 if (offset + qe.length <= QUEUE_PAGESIZE)
2086 {
2087 /* OK, so advance nextNotify past this item */
2089 }
2090 else
2091 {
2092 /*
2093 * Write a dummy entry to fill up the page. Actually readers will
2094 * only check dboid and since it won't match any reader's database
2095 * OID, they will ignore this entry and move on.
2096 */
2097 qe.length = QUEUE_PAGESIZE - offset;
2098 qe.dboid = InvalidOid;
2100 qe.data[0] = '\0'; /* empty channel */
2101 qe.data[1] = '\0'; /* empty payload */
2102 }
2103
2104 /* Now copy qe into the shared buffer page */
2105 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2106 &qe,
2107 qe.length);
2108
2109 /* Advance queue_head appropriately, and detect if page is full */
2110 if (asyncQueueAdvance(&(queue_head), qe.length))
2111 {
2112 LWLock *lock;
2113
2114 pageno = QUEUE_POS_PAGE(queue_head);
2115 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2116 if (lock != prevlock)
2117 {
2120 prevlock = lock;
2121 }
2122
2123 /*
2124 * Page is full, so we're done here, but first fill the next page
2125 * with zeroes. The reason to do this is to ensure that slru.c's
2126 * idea of the head page is always the same as ours, which avoids
2127 * boundary problems in SimpleLruTruncate. The test in
2128 * asyncQueueIsFull() ensured that there is room to create this
2129 * page without overrunning the queue.
2130 */
2132
2133 /*
2134 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2135 * set flag to remember that we should try to advance the tail
2136 * pointer (we don't want to actually do that right here).
2137 */
2139 tryAdvanceTail = true;
2140
2141 /* And exit the loop */
2142 break;
2143 }
2144 }
2145
2146 /* Success, so update the global QUEUE_HEAD */
2148
2150
2151 return nextNotify;
2152}
2153
2154/*
2155 * SQL function to return the fraction of the notification queue currently
2156 * occupied.
2157 */
2158Datum
2160{
2161 double usage;
2162
2163 /* Advance the queue tail so we don't report a too-large result */
2165
2169
2171}
2172
2173/*
2174 * Return the fraction of the queue that is currently occupied.
2175 *
2176 * The caller must hold NotifyQueueLock in (at least) shared mode.
2177 *
2178 * Note: we measure the distance to the logical tail page, not the physical
2179 * tail page. In some sense that's wrong, but the relative position of the
2180 * physical tail is affected by details such as SLRU segment boundaries,
2181 * so that a result based on that is unpleasantly unstable.
 *
 * Takes no arguments. The result is the number of occupied pages (head page
 * minus tail page) divided by max_notify_queue_pages, and is exactly 0 when
 * head and tail are on the same page.
2182 */
2183static double
2184asyncQueueUsage(void)
2185{
2186 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
 /* whole pages between tail and head; partial-page fill is not counted */
2188 int64 occupied = headPage - tailPage;
2189
2190 if (occupied == 0)
2191 return (double) 0; /* fast exit for common case */
2192
2193 return (double) occupied / (double) max_notify_queue_pages;
2194}
2195
2196/*
2197 * Check whether the queue is at least half full, and emit a warning if so.
2198 *
2199 * This is unlikely given the size of the queue, but possible.
2200 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2201 *
2202 * Caller must hold exclusive NotifyQueueLock.
2203 */
2204static void
2206{
2207 double fillDegree;
2208 TimestampTz t;
2209
2211 if (fillDegree < 0.5)
2212 return;
2213
2214 t = GetCurrentTimestamp();
2215
2218 {
2221
2223 {
2225 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2228 }
2229
2231 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2232 (minPid != InvalidPid ?
2233 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2234 : 0),
2235 (minPid != InvalidPid ?
2236 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2237 : 0)));
2238
2240 }
2241}
2242
2243/*
2244 * Send signals to listening backends.
2245 *
2246 * Normally we signal only backends that are interested in the notifies that
2247 * we just sent. However, that will leave idle listeners falling further and
2248 * further behind. Waken them anyway if they're far enough behind, so they'll
2249 * advance their queue position pointers, allowing the global tail to advance.
2250 *
2251 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2252 *
2253 * This is called during CommitTransaction(), so it's important for it
2254 * to have very low probability of failure.
 *
 * Takes no arguments and returns nothing. The work is done in three phases:
 * first the listeners of every channel we notified are collected, then all
 * listeners are scanned to either move their queue pointers forward directly
 * or to collect the ones that are far behind, and finally the collected
 * targets are signaled. Targets are accumulated in signalPids[] and
 * signalProcnos[], with "count" giving the number of filled slots.
2255 */
2256static void
2257SignalBackends(void)
2258{
2259 int count;
2260
2261 /* Can't get here without PreCommit_Notify having made the global table */
2263
2264 /* It should have set up these arrays, too */
2266
2267 /*
2268 * Identify backends that we need to signal. We don't want to send
2269 * signals while holding the NotifyQueueLock, so this part just builds a
2270 * list of target PIDs in signalPids[] and signalProcnos[].
2271 */
2272 count = 0;
2273
2275
2276 /* Scan each channel name that we notified in this transaction */
2278 {
2280 GlobalChannelEntry *entry;
2282
 /* Channels are keyed by (database, name); look up in our own database */
2283 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2284 entry = dshash_find(globalChannelTable, &key, false);
2285 if (entry == NULL)
2286 continue; /* nobody is listening */
2287
2289 entry->listenersArray);
2290
2291 /* Identify listeners that now need waking, add them to arrays */
2292 for (int j = 0; j < entry->numListeners; j++)
2293 {
2294 ProcNumber i;
2295 int32 pid;
2296 QueuePosition pos;
2297
2298 if (!listeners[j].listening)
2299 continue; /* ignore not-yet-committed listeners */
2300
2301 i = listeners[j].procNo;
2302
2304 continue; /* already signaled, no need to repeat */
2305
 /* Fetch the listener's PID and current read position */
2306 pid = QUEUE_BACKEND_PID(i);
2307 pos = QUEUE_BACKEND_POS(i);
2308
2309 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2310 continue; /* it's fully caught up already */
2311
2312 Assert(pid != InvalidPid);
2313
 /* Record this listener as a signal target */
2315 signalPids[count] = pid;
2316 signalProcnos[count] = i;
2317 count++;
2318 }
2319
2321 }
2322
2323 /*
2324 * Scan all listeners. Any that are not already pending wakeup must not
2325 * be interested in our notifications (else we'd have set their wakeup
2326 * flags above). Check to see if we can directly advance their queue
2327 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2328 * them anyway so they will catch up.
2329 */
2331 {
2332 int32 pid;
2333 QueuePosition pos;
2334
2336 continue;
2337
2338 /* If it's currently advancing, we should not touch it */
2340 continue;
2341
2342 pid = QUEUE_BACKEND_PID(i);
2343 pos = QUEUE_BACKEND_POS(i);
2344
2345 /*
2346 * We can directly advance the other backend's queue pointer if it's
2347 * not currently advancing (else there are race conditions), and its
2348 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2349 * it miss some older messages), and we'd not be moving the pointer
2350 * backward.
2351 */
2354 {
2355 /* We can directly advance its pointer past what we wrote */
2357 }
2360 {
2361 /* It's idle and far behind, so wake it up */
2362 Assert(pid != InvalidPid);
2363
2365 signalPids[count] = pid;
2366 signalProcnos[count] = i;
2367 count++;
2368 }
2369 }
2370
2372
2373 /* Now send signals */
2374 for (int i = 0; i < count; i++)
2375 {
2376 int32 pid = signalPids[i];
2377
2378 /*
2379 * If we are signaling our own process, no need to involve the kernel;
2380 * just set the flag directly.
2381 */
2382 if (pid == MyProcPid)
2383 {
2385 continue;
2386 }
2387
2388 /*
2389 * Note: assuming things aren't broken, a signal failure here could
2390 * only occur if the target backend exited since we released
2391 * NotifyQueueLock; which is unlikely but certainly possible. So we
2392 * just log a low-level debug message if it happens.
2393 */
2395 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2396 }
2397}
2398
2399/*
2400 * AtAbort_Notify
2401 *
2402 * This is called at transaction abort.
2403 *
2404 * Revert any staged listen/unlisten changes and clean up transaction state.
2405 * This only does anything if we abort after PreCommit_Notify has staged
2406 * some entries.
 *
 * Takes no arguments and returns nothing. The steps are performed in the
 * order shown below: staged changes are reverted first, so that the
 * "no longer listening on anything" test sees the reverted state, and the
 * transaction-local state is discarded last.
2407 */
2408void
2409AtAbort_Notify(void)
2410{
2411 /* Revert staged listen/unlisten changes */
2413
2414 /* If we're no longer listening on anything, unregister */
2417
2418 /* And clean up */
2420}
2421
2422/*
2423 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2424 *
2425 * Reassign all items in the pending lists to the parent transaction.
2426 */
2427void
2429{
2430 int my_level = GetCurrentTransactionNestLevel();
2431
2432 /* If there are actions at our nesting level, we must reparent them. */
2433 if (pendingActions != NULL &&
2434 pendingActions->nestingLevel >= my_level)
2435 {
2436 if (pendingActions->upper == NULL ||
2437 pendingActions->upper->nestingLevel < my_level - 1)
2438 {
2439 /* nothing to merge; give the whole thing to the parent */
2441 }
2442 else
2443 {
2445
2447
2448 /*
2449 * Mustn't try to eliminate duplicates here --- see queue_listen()
2450 */
2453 childPendingActions->actions);
2455 }
2456 }
2457
2458 /* If there are notifies at our nesting level, we must reparent them. */
2459 if (pendingNotifies != NULL &&
2460 pendingNotifies->nestingLevel >= my_level)
2461 {
2462 Assert(pendingNotifies->nestingLevel == my_level);
2463
2464 if (pendingNotifies->upper == NULL ||
2465 pendingNotifies->upper->nestingLevel < my_level - 1)
2466 {
2467 /* nothing to merge; give the whole thing to the parent */
2469 }
2470 else
2471 {
2472 /*
2473 * Formerly, we didn't bother to eliminate duplicates here, but
2474 * now we must, else we fall foul of "Assert(!found)", either here
2475 * or during a later attempt to build the parent-level hashtable.
2476 */
2478 ListCell *l;
2479
2481 /* Insert all the subxact's events into parent, except for dups */
2482 foreach(l, childPendingNotifies->events)
2483 {
2485
2488 }
2490 }
2491 }
2492}
2493
2494/*
2495 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2496 */
2497void
2499{
2500 int my_level = GetCurrentTransactionNestLevel();
2501
2502 /*
2503 * All we have to do is pop the stack --- the actions/notifies made in
2504 * this subxact are no longer interesting, and the space will be freed
2505 * when CurTransactionContext is recycled. We still have to free the
2506 * ActionList and NotificationList objects themselves, though, because
2507 * those are allocated in TopTransactionContext.
2508 *
2509 * Note that there might be no entries at all, or no entries for the
2510 * current subtransaction level, either because none were ever created, or
2511 * because we reentered this routine due to trouble during subxact abort.
2512 */
2513 while (pendingActions != NULL &&
2514 pendingActions->nestingLevel >= my_level)
2515 {
2517
2520 }
2521
2522 while (pendingNotifies != NULL &&
2523 pendingNotifies->nestingLevel >= my_level)
2524 {
2526
2529 }
2530}
2531
2532/*
2533 * HandleNotifyInterrupt
2534 *
2535 * Signal handler portion of interrupt handling. Let the backend know
2536 * that there's a pending notify interrupt. If we're currently reading
2537 * from the client, this will interrupt the read and
2538 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2539 */
2540void
2542{
2543 /*
2544 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2545 * you do here.
2546 */
2547
2548 /* signal that work needs to be done */
2550
2551 /* make sure the event is processed in due course */
2553}
2554
2555/*
2556 * ProcessNotifyInterrupt
2557 *
2558 * This is called if we see notifyInterruptPending set, just before
2559 * transmitting ReadyForQuery at the end of a frontend command, and
2560 * also if a notify signal occurs while reading from the frontend.
2561 * HandleNotifyInterrupt() will cause the read to be interrupted
2562 * via the process's latch, and this routine will get called.
2563 * If we are truly idle (ie, *not* inside a transaction block),
2564 * process the incoming notifies.
2565 *
2566 * If "flush" is true, force any frontend messages out immediately.
2567 * This can be false when being called at the end of a frontend command,
2568 * since we'll flush after sending ReadyForQuery.
 *
 * flush: handed unchanged to ProcessIncomingNotify().
 *
 * Returns nothing. When the backend is not idle the function returns
 * without processing anything.
2569 */
2570void
2571ProcessNotifyInterrupt(bool flush)
2572{
2574 return; /* not really idle */
2575
2576 /* Loop in case another signal arrives while sending messages */
2578 ProcessIncomingNotify(flush);
2579}
2580
2581
2582/*
2583 * Read all pending notifications from the queue, and deliver appropriate
2584 * ones to my frontend. Stop when we reach queue head or an uncommitted
2585 * notification.
2586 */
2587static void
2589{
2590 QueuePosition pos;
2591 QueuePosition head;
2592 Snapshot snapshot;
2593
2594 /*
2595 * Fetch current state, indicate to others that we have woken up, and that
2596 * we are in process of advancing our position.
2597 */
2599 /* Assert checks that we have a valid state entry */
2603 head = QUEUE_HEAD;
2604
2605 if (QUEUE_POS_EQUAL(pos, head))
2606 {
2607 /* Nothing to do, we have read all notifications already. */
2609 return;
2610 }
2611
2614
2615 /*----------
2616 * Get snapshot we'll use to decide which xacts are still in progress.
2617 * This is trickier than it might seem, because of race conditions.
2618 * Consider the following example:
2619 *
2620 * Backend 1: Backend 2:
2621 *
2622 * transaction starts
2623 * UPDATE foo SET ...;
2624 * NOTIFY foo;
2625 * commit starts
2626 * queue the notify message
2627 * transaction starts
2628 * LISTEN foo; -- first LISTEN in session
2629 * SELECT * FROM foo WHERE ...;
2630 * commit to clog
2631 * commit starts
2632 * add backend 2 to array of listeners
2633 * advance to queue head (this code)
2634 * commit to clog
2635 *
2636 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2637 * wasn't committed yet. Ideally we'd ensure that client 2 would
2638 * eventually get transaction 1's notify message, but there's no way
2639 * to do that; until we're in the listener array, there's no guarantee
2640 * that the notify message doesn't get removed from the queue.
2641 *
2642 * Therefore the coding technique transaction 2 is using is unsafe:
2643 * applications must commit a LISTEN before inspecting database state,
2644 * if they want to ensure they will see notifications about subsequent
2645 * changes to that state.
2646 *
2647 * What we do guarantee is that we'll see all notifications from
2648 * transactions committing after the snapshot we take here.
2649 * BecomeRegisteredListener has already added us to the listener array,
2650 * so no not-yet-committed messages can be removed from the queue
2651 * before we see them.
2652 *----------
2653 */
2654 snapshot = RegisterSnapshot(GetLatestSnapshot());
2655
2656 /*
2657 * It is possible that we fail while trying to send a message to our
2658 * frontend (for example, because of encoding conversion failure). If
2659 * that happens it is critical that we not try to send the same message
2660 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2661 * ERRORs to FATAL, causing the client connection to be closed on error.
2662 *
2663 * We used to only skip over the offending message and try to soldier on,
2664 * but it was somewhat questionable to lose a notification and give the
2665 * client an ERROR instead. A client application is not prepared for
2666 * that and can't tell that a notification was missed. It was also not
2667 * very useful in practice because notifications are often processed while
2668 * a connection is idle and reading a message from the client, and in that
2669 * state, any error is upgraded to FATAL anyway. Closing the connection
2670 * is a clear signal to the application that it might have missed
2671 * notifications.
2672 */
2673 {
2675 bool reachedStop;
2676
2677 ExitOnAnyError = true;
2678
2679 do
2680 {
2681 /*
2682 * Process messages up to the stop position, end of page, or an
2683 * uncommitted message.
2684 *
2685 * Our stop position is what we found to be the head's position
2686 * when we entered this function. It might have changed already.
2687 * But if it has, we will receive (or have already received and
2688 * queued) another signal and come here again.
2689 *
2690 * We are not holding NotifyQueueLock here! The queue can only
2691 * extend beyond the head pointer (see above) and we leave our
2692 * backend's pointer where it is so nobody will truncate or
2693 * rewrite pages under us. Especially we don't want to hold a lock
2694 * while sending the notifications to the frontend.
2695 */
2696 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2697 } while (!reachedStop);
2698
2699 /* Update shared state */
2704
2706 }
2707
2708 /* Done with snapshot */
2709 UnregisterSnapshot(snapshot);
2710}
2711
2712/*
2713 * Fetch notifications from the shared queue, beginning at position current,
2714 * and deliver relevant ones to my frontend.
2715 *
2716 * The function returns true once we have reached the stop position or an
2717 * uncommitted notification, and false if we have finished with the page.
2718 * In other words: once it returns true there is no need to look further.
2719 * The QueuePosition *current is advanced past all processed messages.
2720 */
2721static bool
2723 QueuePosition stop,
2724 Snapshot snapshot)
2725{
2726 int64 curpage = QUEUE_POS_PAGE(*current);
2727 int slotno;
2728 char *page_buffer;
2729 bool reachedStop = false;
2730 bool reachedEndOfPage;
2731
2732 /*
2733 * We copy the entries into a local buffer to avoid holding the SLRU lock
2734 * while we transmit them to our frontend. The local buffer must be
2735 * adequately aligned.
2736 */
2738 char *local_buf_end = local_buf;
2739
2742 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2743
2744 do
2745 {
2746 QueuePosition thisentry = *current;
2748
2749 if (QUEUE_POS_EQUAL(thisentry, stop))
2750 break;
2751
2752 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2753
2754 /*
2755 * Advance *current over this message, possibly to the next page. As
2756 * noted in the comments for asyncQueueReadAllNotifications, we must
2757 * do this before possibly failing while processing the message.
2758 */
2759 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2760
2761 /* Ignore messages destined for other databases */
2762 if (qe->dboid == MyDatabaseId)
2763 {
2764 if (XidInMVCCSnapshot(qe->xid, snapshot))
2765 {
2766 /*
2767 * The source transaction is still in progress, so we can't
2768 * process this message yet. Break out of the loop, but first
2769 * back up *current so we will reprocess the message next
2770 * time. (Note: it is unlikely but not impossible for
2771 * TransactionIdDidCommit to fail, so we can't really avoid
2772 * this advance-then-back-up behavior when dealing with an
2773 * uncommitted message.)
2774 *
2775 * Note that we must test XidInMVCCSnapshot before we test
2776 * TransactionIdDidCommit, else we might return a message from
2777 * a transaction that is not yet visible to snapshots; compare
2778 * the comments at the head of heapam_visibility.c.
2779 *
2780 * Also, while our own xact won't be listed in the snapshot,
2781 * we need not check for TransactionIdIsCurrentTransactionId
2782 * because our transaction cannot (yet) have queued any
2783 * messages.
2784 */
2785 *current = thisentry;
2786 reachedStop = true;
2787 break;
2788 }
2789
2790 /*
2791 * Quick check for the case that we're not listening on any
2792 * channels, before calling TransactionIdDidCommit(). This makes
2793 * that case a little faster, but more importantly, it ensures
2794 * that if there's a bad entry in the queue for which
2795 * TransactionIdDidCommit() fails for some reason, we can skip
2796 * over it on the first LISTEN in a session, and not get stuck on
2797 * it indefinitely. (This is a little trickier than it looks: it
2798 * works because BecomeRegisteredListener runs this code before we
2799 * have made the first entry in localChannelTable.)
2800 */
2802 continue;
2803
2804 if (TransactionIdDidCommit(qe->xid))
2805 {
2806 memcpy(local_buf_end, qe, qe->length);
2807 local_buf_end += qe->length;
2808 }
2809 else
2810 {
2811 /*
2812 * The source transaction aborted or crashed, so we just
2813 * ignore its notifications.
2814 */
2815 }
2816 }
2817
2818 /* Loop back if we're not at end of page */
2819 } while (!reachedEndOfPage);
2820
2821 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2823
2824 /*
2825 * Now that we have let go of the SLRU bank lock, send the notifications
2826 * to our backend
2827 */
2829 for (char *p = local_buf; p < local_buf_end;)
2830 {
2832
2833 /* qe->data is the null-terminated channel name */
2834 char *channel = qe->data;
2835
2836 if (IsListeningOn(channel))
2837 {
2838 /* payload follows channel name */
2839 char *payload = qe->data + strlen(channel) + 1;
2840
2841 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2842 }
2843
2844 p += qe->length;
2845 }
2846
2847 if (QUEUE_POS_EQUAL(*current, stop))
2848 reachedStop = true;
2849
2850 return reachedStop;
2851}
2852
2853/*
2854 * Advance the shared queue tail variable to the minimum of all the
2855 * per-backend tail pointers. Truncate pg_notify space if possible.
2856 *
2857 * This is (usually) called during CommitTransaction(), so it's important for
2858 * it to have very low probability of failure.
2859 */
2860static void
2862{
2863 QueuePosition min;
2866 int64 boundary;
2867
2868 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2870
2871 /*
2872 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2873 * (ie, exactly match at least one backend's queue position), so it must
2874 * be updated atomically with the actual computation. Since v13, we could
2875 * get away with not doing it like that, but it seems prudent to keep it
2876 * so.
2877 *
2878 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2879 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2880 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2881 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2882 * there are pages we can truncate but haven't yet finished doing so.
2883 *
2884 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2885 * performing SimpleLruTruncate. This is OK because no backend will try
2886 * to access the pages we are in the midst of truncating.
2887 */
2889 min = QUEUE_HEAD;
2891 {
2893 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2894 }
2895 QUEUE_TAIL = min;
2898
2899 /*
2900 * We can truncate something if the global tail advanced across an SLRU
2901 * segment boundary.
2902 *
2903 * XXX it might be better to truncate only once every several segments, to
2904 * reduce the number of directory scans.
2905 */
2908 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2909 {
2910 /*
2911 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2912 * release them again.
2913 */
2915
2919 }
2920
2922}
2923
2924/*
2925 * AsyncNotifyFreezeXids
2926 *
2927 * Prepare the async notification queue for CLOG truncation by freezing
2928 * transaction IDs that are about to become inaccessible.
2929 *
2930 * This function is called by VACUUM before advancing datfrozenxid. It scans
2931 * the notification queue and replaces XIDs that would become inaccessible
2932 * after CLOG truncation with special markers:
2933 * - Committed transactions are set to FrozenTransactionId
2934 * - Aborted/crashed transactions are set to InvalidTransactionId
2935 *
2936 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2937 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2938 * (or it would have held back newFrozenXid through ProcArray).
2939 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2940 * either aborted explicitly or crashed, and we can safely mark it invalid.
2941 */
2942void
2944{
2945 QueuePosition pos;
2946 QueuePosition head;
2947 int64 curpage = -1;
2948 int slotno = -1;
2949 char *page_buffer = NULL;
2950 bool page_dirty = false;
2951
2952 /*
2953 * Acquire locks in the correct order to avoid deadlocks. As per the
2954 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2955 * bank locks.
2956 *
2957 * We only need SHARED mode since we're just reading the head/tail
2958 * positions, not modifying them.
2959 */
2962
2963 pos = QUEUE_TAIL;
2964 head = QUEUE_HEAD;
2965
2966 /* Release NotifyQueueLock early, we only needed to read the positions */
2968
2969 /*
2970 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2971 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2972 * we're working.
2973 */
2974 while (!QUEUE_POS_EQUAL(pos, head))
2975 {
2977 TransactionId xid;
2978 int64 pageno = QUEUE_POS_PAGE(pos);
2979 int offset = QUEUE_POS_OFFSET(pos);
2980
2981 /* If we need a different page, release old lock and get new one */
2982 if (pageno != curpage)
2983 {
2984 LWLock *lock;
2985
2986 /* Release previous page if any */
2987 if (slotno >= 0)
2988 {
2989 if (page_dirty)
2990 {
2991 NotifyCtl->shared->page_dirty[slotno] = true;
2992 page_dirty = false;
2993 }
2995 }
2996
2997 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2999 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3001 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3002 curpage = pageno;
3003 }
3004
3005 qe = (AsyncQueueEntry *) (page_buffer + offset);
3006 xid = qe->xid;
3007
3008 if (TransactionIdIsNormal(xid) &&
3010 {
3011 if (TransactionIdDidCommit(xid))
3012 {
3013 qe->xid = FrozenTransactionId;
3014 page_dirty = true;
3015 }
3016 else
3017 {
3018 qe->xid = InvalidTransactionId;
3019 page_dirty = true;
3020 }
3021 }
3022
3023 /* Advance to next entry */
3024 asyncQueueAdvance(&pos, qe->length);
3025 }
3026
3027 /* Release final page lock if we acquired one */
3028 if (slotno >= 0)
3029 {
3030 if (page_dirty)
3031 NotifyCtl->shared->page_dirty[slotno] = true;
3033 }
3034
3036}
3037
3038/*
3039 * ProcessIncomingNotify
3040 *
3041 * Scan the queue for arriving notifications and report them to the front
3042 * end. The notifications might be from other sessions, or our own;
3043 * there's no need to distinguish here.
3044 *
3045 * If "flush" is true, force any frontend messages out immediately.
3046 *
3047 * NOTE: since we are outside any transaction, we must create our own.
3048 */
3049static void
3050ProcessIncomingNotify(bool flush)
3051{
3052 /* We *must* reset the flag */
3053 notifyInterruptPending = false;
3054
3055 /* Do nothing else if we aren't actively listening */
3057 return;
3058
3059 if (Trace_notify)
3060 elog(DEBUG1, "ProcessIncomingNotify");
3061
3062 set_ps_display("notify interrupt");
3063
3064 /*
3065 * We must run asyncQueueReadAllNotifications inside a transaction, else
3066 * bad things happen if it gets an error.
3067 */
3069
3071
3073
3074 /*
3075 * If this isn't an end-of-command case, we must flush the notify messages
3076 * to ensure frontend gets them promptly.
3077 */
3078 if (flush)
3079 pq_flush();
3080
3081 set_ps_display("idle");
3082
3083 if (Trace_notify)
3084 elog(DEBUG1, "ProcessIncomingNotify: done");
3085}
3086
3087/*
3088 * Send NOTIFY message to my front end.
3089 */
3090void
3091NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3092{
3094 {
3096
3098 pq_sendint32(&buf, srcPid);
3099 pq_sendstring(&buf, channel);
3100 pq_sendstring(&buf, payload);
3102
3103 /*
3104 * NOTE: we do not do pq_flush() here. Some level of caller will
3105 * handle it later, allowing this message to be combined into a packet
3106 * with other ones.
3107 */
3108 }
3109 else
3110 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3111}
3112
3113/* Does pendingNotifies include a match for the given event? */
3114static bool
3116{
3117 if (pendingNotifies == NULL)
3118 return false;
3119
3121 {
3122 /* Use the hash table to probe for a match */
3124 &n,
3125 HASH_FIND,
3126 NULL))
3127 return true;
3128 }
3129 else
3130 {
3131 /* Must scan the event list */
3132 ListCell *l;
3133
3134 foreach(l, pendingNotifies->events)
3135 {
3137
3138 if (n->channel_len == oldn->channel_len &&
3139 n->payload_len == oldn->payload_len &&
3140 memcmp(n->data, oldn->data,
3141 n->channel_len + n->payload_len + 2) == 0)
3142 return true;
3143 }
3144 }
3145
3146 return false;
3147}
3148
3149/*
3150 * Add a notification event to a pre-existing pendingNotifies list.
3151 *
3152 * Because pendingNotifies->events is already nonempty, this works
3153 * correctly no matter what CurrentMemoryContext is.
3154 */
3155static void
3157{
3159
3160 /* Create the hash tables if it's time to */
3163 {
3165 ListCell *l;
3166
3167 /* Create the hash table */
3168 hash_ctl.keysize = sizeof(Notification *);
3169 hash_ctl.entrysize = sizeof(struct NotificationHash);
3174 hash_create("Pending Notifies",
3175 256L,
3176 &hash_ctl,
3178
3179 /* Create the unique channel name table */
3181 hash_ctl.keysize = NAMEDATALEN;
3182 hash_ctl.entrysize = sizeof(ChannelName);
3185 hash_create("Pending Notify Channel Names",
3186 64L,
3187 &hash_ctl,
3189
3190 /* Insert all the already-existing events */
3191 foreach(l, pendingNotifies->events)
3192 {
3194 char *channel = oldn->data;
3195 bool found;
3196
3198 &oldn,
3199 HASH_ENTER,
3200 &found);
3201 Assert(!found);
3202
3203 /* Add channel name to uniqueChannelHash; might be there already */
3205 channel,
3206 HASH_ENTER,
3207 NULL);
3208 }
3209 }
3210
3211 /* Add new event to the list, in order */
3213
3214 /* Add event to the hash tables if needed */
3216 {
3217 char *channel = n->data;
3218 bool found;
3219
3221 &n,
3222 HASH_ENTER,
3223 &found);
3224 Assert(!found);
3225
3226 /* Add channel name to uniqueChannelHash; might be there already */
3228 channel,
3229 HASH_ENTER,
3230 NULL);
3231 }
3232}
3233
3234/*
3235 * notification_hash: hash function for notification hash table
3236 *
3237 * The hash "keys" are pointers to Notification structs.
3238 */
3239static uint32
3240notification_hash(const void *key, Size keysize)
3241{
3242 const Notification *k = *(const Notification *const *) key;
3243
3244 Assert(keysize == sizeof(Notification *));
3245 /* We don't bother to include the payload's trailing null in the hash */
3246 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3247 k->channel_len + k->payload_len + 1));
3248}
3249
3250/*
3251 * notification_match: match function to use with notification_hash
3252 */
3253static int
3254notification_match(const void *key1, const void *key2, Size keysize)
3255{
3256 const Notification *k1 = *(const Notification *const *) key1;
3257 const Notification *k2 = *(const Notification *const *) key2;
3258
3259 Assert(keysize == sizeof(Notification *));
3260 if (k1->channel_len == k2->channel_len &&
3261 k1->payload_len == k2->payload_len &&
3262 memcmp(k1->data, k2->data,
3263 k1->channel_len + k1->payload_len + 2) == 0)
3264 return 0; /* equal */
3265 return 1; /* not equal */
3266}
3267
3268/* Clear the pendingActions and pendingNotifies lists. */
3269static void
3271{
3272 /*
3273 * Everything's allocated in either TopTransactionContext or the context
3274 * for the subtransaction to which it corresponds. So, there's nothing to
3275 * do here except reset the pointers; the space will be reclaimed when the
3276 * contexts are deleted.
3277 */
3280 /* Also clear pendingListenActions, which is derived from pendingActions */
3282}
3283
3284/*
3285 * GUC check_hook for notify_buffers
3286 */
3287bool
3288check_notify_buffers(int *newval, void **extra, GucSource source)
3289{
3290 return check_slru_buffers("notify_buffers", newval);
3291}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
static void SignalBackends(void)
Definition async.c:2258
static double asyncQueueUsage(void)
Definition async.c:2185
#define MIN_HASHABLE_NOTIFIES
Definition async.c:512
static void PrepareTableEntriesForListen(const char *channel)
Definition async.c:1522
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition async.c:1998
#define QUEUE_FIRST_LISTENER
Definition async.c:350
#define QUEUE_POS_MAX(x, y)
Definition async.c:258
static bool tryAdvanceTail
Definition async.c:563
void HandleNotifyInterrupt(void)
Definition async.c:2542
static void BecomeRegisteredListener(void)
Definition async.c:1421
static void asyncQueueAdvanceTail(void)
Definition async.c:2862
int max_notify_queue_pages
Definition async.c:569
static ActionList * pendingActions
Definition async.c:443
static void ApplyPendingListenActions(bool isCommit)
Definition async.c:1712
#define QUEUE_BACKEND_IS_ADVANCING(i)
Definition async.c:356
static uint32 notification_hash(const void *key, Size keysize)
Definition async.c:3241
void Async_UnlistenAll(void)
Definition async.c:1066
static int32 * signalPids
Definition async.c:559
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition async.c:3092
void AtCommit_Notify(void)
Definition async.c:1369
#define QUEUE_POS_MIN(x, y)
Definition async.c:252
static void PrepareTableEntriesForUnlisten(const char *channel)
Definition async.c:1625
void ProcessNotifyInterrupt(bool flush)
Definition async.c:2572
ListenActionKind
Definition async.c:424
@ LISTEN_LISTEN
Definition async.c:425
@ LISTEN_UNLISTEN_ALL
Definition async.c:427
@ LISTEN_UNLISTEN
Definition async.c:426
static bool AsyncExistsPendingNotify(Notification *n)
Definition async.c:3116
#define QUEUE_BACKEND_POS(i)
Definition async.c:354
static const dshash_parameters globalChannelTableDSHParams
Definition async.c:662
#define INITIAL_LISTENERS_ARRAY_SIZE
Definition async.c:377
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition async.c:3255
static dshash_hash globalChannelTableHash(const void *key, size_t size, void *arg)
Definition async.c:649
#define SET_QUEUE_POS(x, y, z)
Definition async.c:239
static ProcNumber * signalProcnos
Definition async.c:560
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
Definition async.c:2723
static void ProcessIncomingNotify(bool flush)
Definition async.c:3051
static void asyncQueueReadAllNotifications(void)
Definition async.c:2589
static void Async_UnlistenOnExit(int code, Datum arg)
Definition async.c:1138
#define QUEUE_POS_OFFSET(x)
Definition async.c:237
static QueuePosition queueHeadAfterWrite
Definition async.c:552
bool Trace_notify
Definition async.c:566
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition async.c:2034
static void ClearPendingActionsAndNotifies(void)
Definition async.c:3271
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Definition async.c:1086
Datum pg_notify(PG_FUNCTION_ARGS)
Definition async.c:851
static NotificationList * pendingNotifies
Definition async.c:519
#define AsyncQueueEntryEmptySize
Definition async.c:225
static void AddEventToPendingNotifies(Notification *n)
Definition async.c:3157
static AsyncQueueControl * asyncQueueControl
Definition async.c:345
static bool unlistenExitRegistered
Definition async.c:540
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition async.c:627
static dsa_area * globalChannelDSA
Definition async.c:400
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition async.c:1965
#define QUEUE_TAIL
Definition async.c:348
void AtAbort_Notify(void)
Definition async.c:2410
#define QUEUE_POS_PAGE(x)
Definition async.c:236
static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
Definition async.c:1685
void PreCommit_Notify(void)
Definition async.c:1176
#define QUEUE_CLEANUP_DELAY
Definition async.c:280
static void PrepareTableEntriesForUnlistenAll(void)
Definition async.c:1655
static void asyncQueueFillWarning(void)
Definition async.c:2206
#define QUEUE_BACKEND_PID(i)
Definition async.c:351
static void CleanupListenersOnExit(void)
Definition async.c:1837
static void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
Definition async.c:637
Size AsyncShmemSize(void)
Definition async.c:775
#define QUEUE_FULL_WARN_INTERVAL
Definition async.c:366
void Async_Unlisten(const char *channel)
Definition async.c:1048
static HTAB * pendingListenActions
Definition async.c:465
void Async_Listen(const char *channel)
Definition async.c:1034
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:199
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:248
static void initGlobalChannelTable(void)
Definition async.c:676
#define NotifyCtl
Definition async.c:363
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:355
static HTAB * localChannelTable
Definition async.c:407
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:617
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:987
#define QUEUEALIGN(len)
Definition async.c:223
static bool amRegisteredListener
Definition async.c:543
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:264
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:353
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:352
void AtSubAbort_Notify(void)
Definition async.c:2499
void AtPrepare_Notify(void)
Definition async.c:1151
#define QUEUE_PAGESIZE
Definition async.c:364
void AtSubCommit_Notify(void)
Definition async.c:2429
static bool asyncQueueIsFull(void)
Definition async.c:1950
#define QUEUE_HEAD
Definition async.c:347
void AsyncShmemInit(void)
Definition async.c:792
static void initLocalChannelTable(void)
Definition async.c:727
PendingListenAction
Definition async.c:454
@ PENDING_UNLISTEN
Definition async.c:456
@ PENDING_LISTEN
Definition async.c:455
static dshash_table * globalChannelTable
Definition async.c:399
static void asyncQueueUnregister(void)
Definition async.c:1907
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2160
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:245
#define LocalChannelTableIsEmpty()
Definition async.c:410
static void initPendingListenActions(void)
Definition async.c:753
static QueuePosition queueHeadBeforeWrite
Definition async.c:551
static bool IsListeningOn(const char *channel)
Definition async.c:1894
void Async_Notify(const char *channel, const char *payload)
Definition async.c:885
volatile sig_atomic_t notifyInterruptPending
Definition async.c:537
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2944
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3289
#define QUEUE_STOP_PAGE
Definition async.c:349
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
#define CStringGetTextDatum(s)
Definition builtins.h:97
#define Assert(condition)
Definition c.h:873
int64_t int64
Definition c.h:543
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:480
int32_t int32
Definition c.h:542
uint16_t uint16
Definition c.h:545
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
size_t Size
Definition c.h:619
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:592
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:543
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:560
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:640
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:392
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:369
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:272
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:749
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
Definition dshash.c:435
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:659
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:208
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:574
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:759
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
uint32 dshash_hash
Definition dshash.h:30
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define DEBUG3
Definition elog.h:28
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define INFO
Definition elog.h:34
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:47
ProcNumber MyProcNumber
Definition globals.c:90
int MaxBackends
Definition globals.c:146
bool ExitOnAnyError
Definition globals.c:123
int notify_buffers
Definition globals.c:164
struct Latch * MyLatch
Definition globals.c:63
Oid MyDatabaseId
Definition globals.c:94
#define newval
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:96
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_COMPARE
Definition hsearch.h:99
#define HASH_FUNCTION
Definition hsearch.h:98
#define IsParallelWorker()
Definition parallel.h:60
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
#define pq_flush()
Definition libpq.h:46
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext TopTransactionContext
Definition mcxt.c:171
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurTransactionContext
Definition mcxt.c:172
#define InvalidPid
Definition miscadmin.h:32
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static void usage(void)
void * arg
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:343
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:92
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:232
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static int fb(int x)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:284
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:252
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition slru.c:630
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1816
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1769
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition slru.c:527
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition slru.c:375
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition slru.c:1433
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:198
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:355
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:160
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
List * actions
Definition async.c:439
int nestingLevel
Definition async.c:438
struct ActionList * upper
Definition async.c:440
dshash_table_handle globalChannelTableDSH
Definition async.c:340
TimestampTz lastQueueFillWarn
Definition async.c:338
dsa_handle globalChannelTableDSA
Definition async.c:339
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:219
char channel[NAMEDATALEN]
Definition async.c:527
dsa_pointer listenersArray
Definition async.c:394
int allocatedListeners
Definition async.c:396
GlobalChannelKey key
Definition async.c:393
char channel[NAMEDATALEN]
Definition async.c:382
Size keysize
Definition hsearch.h:75
Definition pg_list.h:54
bool listening
Definition async.c:388
Notification * event
Definition async.c:516
List * uniqueChannelNames
Definition async.c:507
HTAB * uniqueChannelHash
Definition async.c:508
HTAB * hashtab
Definition async.c:506
List * events
Definition async.c:505
struct NotificationList * upper
Definition async.c:509
uint16 payload_len
Definition async.c:497
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:499
uint16 channel_len
Definition async.c:496
PendingListenAction action
Definition async.c:462
char channel[NAMEDATALEN]
Definition async.c:461
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:443
char * text_to_cstring(const text *t)
Definition varlena.c:214
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011
int GetCurrentTransactionNestLevel(void)
Definition xact.c:930
void StartTransactionCommand(void)
Definition xact.c:3080
void CommitTransactionCommand(void)
Definition xact.c:3178
TransactionId GetCurrentTransactionId(void)
Definition xact.c:455

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 512 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 199 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 363 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 352 of file async.c.

◆ QUEUE_BACKEND_IS_ADVANCING

#define QUEUE_BACKEND_IS_ADVANCING (   i)    (asyncQueueControl->backend[i].isAdvancing)

Definition at line 356 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 351 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 354 of file async.c.

◆ QUEUE_BACKEND_WAKEUP_PENDING

#define QUEUE_BACKEND_WAKEUP_PENDING (   i)    (asyncQueueControl->backend[i].wakeupPending)

Definition at line 355 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 280 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 350 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 366 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 347 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 353 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 364 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 245 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 248 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
int y
Definition isn.c:76
int x
Definition isn.c:75

Definition at line 258 of file async.c.

259 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
260 (x).page != (y).page ? (x) : \
261 (x).offset > (y).offset ? (x) : (y))

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 252 of file async.c.

253 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
254 (x).page != (y).page ? (y) : \
255 (x).offset < (y).offset ? (x) : (y))

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 237 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 236 of file async.c.

◆ QUEUE_POS_PRECEDES

#define QUEUE_POS_PRECEDES (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) || \
((x).page == (y).page && (x).offset < (y).offset))

Definition at line 264 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 349 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 348 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 223 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 239 of file async.c.

240 do { \
241 (x).page = (y); \
242 (x).offset = (z); \
243 } while (0)

Typedef Documentation

◆ ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ ChannelName

◆ GlobalChannelEntry

◆ GlobalChannelKey

◆ ListenerEntry

◆ Notification

◆ NotificationList

◆ PendingListenEntry

◆ QueueBackendStatus

◆ QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 423 of file async.c.

◆ PendingListenAction

Enumerator
PENDING_LISTEN 
PENDING_UNLISTEN 

Definition at line 453 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification * n)
static

Definition at line 3157 of file async.c.

3158{
3160
3161 /* Create the hash tables if it's time to */
3164 {
3166 ListCell *l;
3167
3168 /* Create the hash table */
3169 hash_ctl.keysize = sizeof(Notification *);
3170 hash_ctl.entrysize = sizeof(struct NotificationHash);
3175 hash_create("Pending Notifies",
3176 256L,
3177 &hash_ctl,
3179
3180 /* Create the unique channel name table */
3182 hash_ctl.keysize = NAMEDATALEN;
3183 hash_ctl.entrysize = sizeof(ChannelName);
3186 hash_create("Pending Notify Channel Names",
3187 64L,
3188 &hash_ctl,
3190
3191 /* Insert all the already-existing events */
3192 foreach(l, pendingNotifies->events)
3193 {
3195 char *channel = oldn->data;
3196 bool found;
3197
3199 &oldn,
3200 HASH_ENTER,
3201 &found);
3202 Assert(!found);
3203
3204 /* Add channel name to uniqueChannelHash; might be there already */
3206 channel,
3207 HASH_ENTER,
3208 NULL);
3209 }
3210 }
3211
3212 /* Add new event to the list, in order */
3214
3215 /* Add event to the hash tables if needed */
3217 {
3218 char *channel = n->data;
3219 bool found;
3220
3222 &n,
3223 HASH_ENTER,
3224 &found);
3225 Assert(!found);
3226
3227 /* Add channel name to uniqueChannelHash; might be there already */
3229 channel,
3230 HASH_ENTER,
3231 NULL);
3232 }
3233}

References Assert, CurTransactionContext, Notification::data, NotificationList::events, fb(), HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), HASH_STRINGS, NotificationList::hashtab, lappend(), lfirst, list_length(), MIN_HASHABLE_NOTIFIES, NAMEDATALEN, NIL, notification_hash(), notification_match(), pendingNotifies, and NotificationList::uniqueChannelHash.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ ApplyPendingListenActions()

static void ApplyPendingListenActions ( bool  isCommit)
static

Definition at line 1712 of file async.c.

1713{
1715 PendingListenEntry *pending;
1716
1717 /* Quick exit if nothing to do */
1719 return;
1720
1721 /* We made a globalChannelTable before building pendingListenActions */
1722 if (globalChannelTable == NULL)
1723 elog(PANIC, "global channel table missing post-commit/abort");
1724
1725 /* For each staged action ... */
1727 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1728 {
1730 GlobalChannelEntry *entry;
1731 bool removeLocal = true;
1732 bool foundListener = false;
1733
1734 /*
1735 * Find the global entry for this channel. If isCommit, it had better
1736 * exist (it was created in PreCommit). In an abort, it might not
1737 * exist, in which case we are not listening and should discard any
1738 * local entry that PreCommit may have managed to create.
1739 */
1740 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1741 entry = dshash_find(globalChannelTable, &key, true);
1742 if (entry != NULL)
1743 {
1744 /* Scan entry to find the ListenerEntry for this backend */
1746
1749
1750 for (int i = 0; i < entry->numListeners; i++)
1751 {
1752 if (listeners[i].procNo != MyProcNumber)
1753 continue;
1754 foundListener = true;
1755 if (isCommit)
1756 {
1757 if (pending->action == PENDING_LISTEN)
1758 {
1759 /*
1760 * LISTEN being committed: set listening=true.
1761 * localChannelTable entry was created during
1762 * PreCommit and should be kept.
1763 */
1764 listeners[i].listening = true;
1765 removeLocal = false;
1766 }
1767 else
1768 {
1769 /*
1770 * UNLISTEN being committed: remove pre-allocated
1771 * entries from both tables.
1772 */
1774 }
1775 }
1776 else
1777 {
1778 /*
1779 * Note: this part is reachable only if the transaction
1780 * aborts after PreCommit_Notify() has made some
1781 * pendingListenActions entries, so it's pretty hard to
1782 * test.
1783 */
1784 if (!listeners[i].listening)
1785 {
1786 /*
1787 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1788 * and we weren't listening before, so remove
1789 * pre-allocated entries from both tables.
1790 */
1792 }
1793 else
1794 {
1795 /*
1796 * We're aborting, but the previous state was that
1797 * we're listening, so keep localChannelTable entry.
1798 */
1799 removeLocal = false;
1800 }
1801 }
1802 break; /* there shouldn't be another match */
1803 }
1804
1805 /* We might have already released the entry by removing it */
1806 if (entry != NULL)
1808 }
1809
1810 /*
1811 * If we're committing a LISTEN action, we should have found a
1812 * matching ListenerEntry, but otherwise it's okay if we didn't.
1813 */
1814 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1815 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1816 pending->channel, MyProcNumber);
1817
1818 /*
1819 * If we did not find a globalChannelTable entry for our backend, or
1820 * if we are unlistening, remove any localChannelTable entry that may
1821 * exist. (Note in particular that this cleans up if we created a
1822 * localChannelTable entry and then failed while trying to create a
1823 * globalChannelTable entry.)
1824 */
1827 HASH_REMOVE, NULL);
1828 }
1829}

References PendingListenEntry::action, PendingListenEntry::channel, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_REMOVE, hash_search(), hash_seq_init(), hash_seq_search(), i, GlobalChannelEntry::listenersArray, ListenerEntry::listening, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PANIC, PENDING_LISTEN, pendingListenActions, and RemoveListenerFromChannel().

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char * channel)

Definition at line 1034 of file async.c.

1035{
1036 if (Trace_notify)
1037 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1038
1039 queue_listen(LISTEN_LISTEN, channel);
1040}

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char * channel,
const char * payload 
)

Definition at line 885 of file async.c.

886{
887 int my_level = GetCurrentTransactionNestLevel();
888 size_t channel_len;
889 size_t payload_len;
890 Notification *n;
891 MemoryContext oldcontext;
892
893 if (IsParallelWorker())
894 elog(ERROR, "cannot send notifications from a parallel worker");
895
896 if (Trace_notify)
897 elog(DEBUG1, "Async_Notify(%s)", channel);
898
899 channel_len = channel ? strlen(channel) : 0;
900 payload_len = payload ? strlen(payload) : 0;
901
902 /* a channel name must be specified */
903 if (channel_len == 0)
906 errmsg("channel name cannot be empty")));
907
908 /* enforce length limits */
909 if (channel_len >= NAMEDATALEN)
912 errmsg("channel name too long")));
913
914 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
917 errmsg("payload string too long")));
918
919 /*
920 * We must construct the Notification entry, even if we end up not using
921 * it, in order to compare it cheaply to existing list entries.
922 *
923 * The notification list needs to live until end of transaction, so store
924 * it in the transaction context.
925 */
927
929 channel_len + payload_len + 2);
930 n->channel_len = channel_len;
931 n->payload_len = payload_len;
932 strcpy(n->data, channel);
933 if (payload)
934 strcpy(n->data + channel_len + 1, payload);
935 else
936 n->data[channel_len + 1] = '\0';
937
938 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
939 {
941
942 /*
943 * First notify event in current (sub)xact. Note that we allocate the
944 * NotificationList in TopTransactionContext; the nestingLevel might
945 * get changed later by AtSubCommit_Notify.
946 */
949 sizeof(NotificationList));
950 notifies->nestingLevel = my_level;
951 notifies->events = list_make1(n);
952 /* We certainly don't need a hashtable yet */
953 notifies->hashtab = NULL;
954 /* We won't build uniqueChannelNames/Hash till later, either */
955 notifies->uniqueChannelNames = NIL;
956 notifies->uniqueChannelHash = NULL;
957 notifies->upper = pendingNotifies;
959 }
960 else
961 {
962 /* Now check for duplicates */
964 {
965 /* It's a dup, so forget it */
966 pfree(n);
967 MemoryContextSwitchTo(oldcontext);
968 return;
969 }
970
971 /* Append more events to existing list */
973 }
974
975 MemoryContextSwitchTo(oldcontext);
976}

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, fb(), GetCurrentTransactionNestLevel(), IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NIL, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char * channel)

Definition at line 1048 of file async.c.

1049{
1050 if (Trace_notify)
1051 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1052
1053 /* If we couldn't possibly be listening, no need to queue anything */
1055 return;
1056
1057 queue_listen(LISTEN_UNLISTEN, channel);
1058}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 1066 of file async.c.

1067{
1068 if (Trace_notify)
1069 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1070
1071 /* If we couldn't possibly be listening, no need to queue anything */
1073 return;
1074
1076}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 1138 of file async.c.

1139{
1142}

References asyncQueueUnregister(), and CleanupListenersOnExit().

Referenced by BecomeRegisteredListener().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification * n)
static

Definition at line 3116 of file async.c.

3117{
3118 if (pendingNotifies == NULL)
3119 return false;
3120
3122 {
3123 /* Use the hash table to probe for a match */
3125 &n,
3126 HASH_FIND,
3127 NULL))
3128 return true;
3129 }
3130 else
3131 {
3132 /* Must scan the event list */
3133 ListCell *l;
3134
3135 foreach(l, pendingNotifies->events)
3136 {
3138
3139 if (n->channel_len == oldn->channel_len &&
3140 n->payload_len == oldn->payload_len &&
3141 memcmp(n->data, oldn->data,
3142 n->channel_len + n->payload_len + 2) == 0)
3143 return true;
3144 }
3145 }
3146
3147 return false;
3148}

References Notification::channel_len, Notification::data, NotificationList::events, fb(), HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)

Definition at line 2944 of file async.c.

2945{
2946 QueuePosition pos;
2947 QueuePosition head;
2948 int64 curpage = -1;
2949 int slotno = -1;
2950 char *page_buffer = NULL;
2951 bool page_dirty = false;
2952
2953 /*
2954 * Acquire locks in the correct order to avoid deadlocks. As per the
2955 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2956 * bank locks.
2957 *
2958 * We only need SHARED mode since we're just reading the head/tail
2959 * positions, not modifying them.
2960 */
2963
2964 pos = QUEUE_TAIL;
2965 head = QUEUE_HEAD;
2966
2967 /* Release NotifyQueueLock early, we only needed to read the positions */
2969
2970 /*
2971 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2972 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2973 * we're working.
2974 */
2975 while (!QUEUE_POS_EQUAL(pos, head))
2976 {
2978 TransactionId xid;
2979 int64 pageno = QUEUE_POS_PAGE(pos);
2980 int offset = QUEUE_POS_OFFSET(pos);
2981
2982 /* If we need a different page, release old lock and get new one */
2983 if (pageno != curpage)
2984 {
2985 LWLock *lock;
2986
2987 /* Release previous page if any */
2988 if (slotno >= 0)
2989 {
2990 if (page_dirty)
2991 {
2992 NotifyCtl->shared->page_dirty[slotno] = true;
2993 page_dirty = false;
2994 }
2996 }
2997
2998 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3000 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3002 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3003 curpage = pageno;
3004 }
3005
3006 qe = (AsyncQueueEntry *) (page_buffer + offset);
3007 xid = qe->xid;
3008
3009 if (TransactionIdIsNormal(xid) &&
3011 {
3012 if (TransactionIdDidCommit(xid))
3013 {
3014 qe->xid = FrozenTransactionId;
3015 page_dirty = true;
3016 }
3017 else
3018 {
3019 qe->xid = InvalidTransactionId;
3020 page_dirty = true;
3021 }
3022 }
3023
3024 /* Advance to next entry */
3025 asyncQueueAdvance(&pos, qe->length);
3026 }
3027
3028 /* Release final page lock if we acquired one */
3029 if (slotno >= 0)
3030 {
3031 if (page_dirty)
3032 NotifyCtl->shared->page_dirty[slotno] = true;
3034 }
3035
3037}

References asyncQueueAdvance(), fb(), FrozenTransactionId, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by vac_truncate_clog().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell * nextNotify)
static

Definition at line 2034 of file async.c.

2035{
2038 int64 pageno;
2039 int offset;
2040 int slotno;
2042
2043 /*
2044 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2045 * memory upon exiting. The reason for this is that if we have to advance
2046 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2047 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2048 * subsequent insertions would try to put entries into a page that slru.c
2049 * thinks doesn't exist yet.) So, use a local position variable. Note
2050 * that if we do fail, any already-inserted queue entries are forgotten;
2051 * this is okay, since they'd be useless anyway after our transaction
2052 * rolls back.
2053 */
2055
2056 /*
2057 * If this is the first write since the postmaster started, we need to
2058 * initialize the first page of the async SLRU. Otherwise, the current
2059 * page should be initialized already, so just fetch it.
2060 */
2061 pageno = QUEUE_POS_PAGE(queue_head);
2063
2064 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2066
2069 else
2070 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2072
2073 /* Note we mark the page dirty before writing in it */
2074 NotifyCtl->shared->page_dirty[slotno] = true;
2075
2076 while (nextNotify != NULL)
2077 {
2079
2080 /* Construct a valid queue entry in local variable qe */
2082
2083 offset = QUEUE_POS_OFFSET(queue_head);
2084
2085 /* Check whether the entry really fits on the current page */
2086 if (offset + qe.length <= QUEUE_PAGESIZE)
2087 {
2088 /* OK, so advance nextNotify past this item */
2090 }
2091 else
2092 {
2093 /*
2094 * Write a dummy entry to fill up the page. Actually readers will
2095 * only check dboid and since it won't match any reader's database
2096 * OID, they will ignore this entry and move on.
2097 */
2098 qe.length = QUEUE_PAGESIZE - offset;
2099 qe.dboid = InvalidOid;
2101 qe.data[0] = '\0'; /* empty channel */
2102 qe.data[1] = '\0'; /* empty payload */
2103 }
2104
2105 /* Now copy qe into the shared buffer page */
2106 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2107 &qe,
2108 qe.length);
2109
2110 /* Advance queue_head appropriately, and detect if page is full */
2111 if (asyncQueueAdvance(&(queue_head), qe.length))
2112 {
2113 LWLock *lock;
2114
2115 pageno = QUEUE_POS_PAGE(queue_head);
2116 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2117 if (lock != prevlock)
2118 {
2121 prevlock = lock;
2122 }
2123
2124 /*
2125 * Page is full, so we're done here, but first fill the next page
2126 * with zeroes. The reason to do this is to ensure that slru.c's
2127 * idea of the head page is always the same as ours, which avoids
2128 * boundary problems in SimpleLruTruncate. The test in
2129 * asyncQueueIsFull() ensured that there is room to create this
2130 * page without overrunning the queue.
2131 */
2133
2134 /*
2135 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2136 * set flag to remember that we should try to advance the tail
2137 * pointer (we don't want to actually do that right here).
2138 */
2140 tryAdvanceTail = true;
2141
2142 /* And exit the loop */
2143 break;
2144 }
2145 }
2146
2147 /* Success, so update the global QUEUE_HEAD */
2149
2151
2152 return nextNotify;
2153}

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), NotificationList::events, fb(), InvalidOid, InvalidTransactionId, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition * position,
int  entryLength 
)
static

Definition at line 1965 of file async.c.

1966{
1967 int64 pageno = QUEUE_POS_PAGE(*position);
1968 int offset = QUEUE_POS_OFFSET(*position);
1969 bool pageJump = false;
1970
1971 /*
1972 * Move to the next writing position: First jump over what we have just
1973 * written or read.
1974 */
1975 offset += entryLength;
1976 Assert(offset <= QUEUE_PAGESIZE);
1977
1978 /*
1979 * In a second step check if another entry can possibly be written to the
1980 * page. If so, stay here, we have reached the next position. If not, then
1981 * we need to move on to the next page.
1982 */
1984 {
1985 pageno++;
1986 offset = 0;
1987 pageJump = true;
1988 }
1989
1990 SET_QUEUE_POS(*position, pageno, offset);
1991 return pageJump;
1992}

References Assert, AsyncQueueEntryEmptySize, fb(), QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by AsyncNotifyFreezeXids(), asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2862 of file async.c.

2863{
2864 QueuePosition min;
2867 int64 boundary;
2868
2869 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2871
2872 /*
2873 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2874 * (ie, exactly match at least one backend's queue position), so it must
2875 * be updated atomically with the actual computation. Since v13, we could
2876 * get away with not doing it like that, but it seems prudent to keep it
2877 * so.
2878 *
2879 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2880 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2881 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2882 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2883 * there are pages we can truncate but haven't yet finished doing so.
2884 *
2885 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2886 * performing SimpleLruTruncate. This is OK because no backend will try
2887 * to access the pages we are in the midst of truncating.
2888 */
2890 min = QUEUE_HEAD;
2892 {
2894 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2895 }
2896 QUEUE_TAIL = min;
2899
2900 /*
2901 * We can truncate something if the global tail advanced across an SLRU
2902 * segment boundary.
2903 *
2904 * XXX it might be better to truncate only once every several segments, to
2905 * reduce the number of directory scans.
2906 */
2909 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2910 {
2911 /*
2912 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2913 * release the lock again.
2914 */
2916
2920 }
2921
2923}

References Assert, asyncQueuePagePrecedes(), fb(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 2206 of file async.c.

2207{
2208 double fillDegree;
2209 TimestampTz t;
2210
2212 if (fillDegree < 0.5)
2213 return;
2214
2215 t = GetCurrentTimestamp();
2216
2219 {
2222
2224 {
2226 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2229 }
2230
2232 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2233 (minPid != InvalidPid ?
2234 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2235 : 0),
2236 (minPid != InvalidPid ?
2237 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2238 : 0)));
2239
2241 }
2242}

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), fb(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1950 of file async.c.

1951{
1952 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1954 int64 occupied = headPage - tailPage;
1955
1957}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification * n,
AsyncQueueEntry * qe 
)
static

Definition at line 1998 of file async.c.

1999{
2000 size_t channellen = n->channel_len;
2001 size_t payloadlen = n->payload_len;
2002 int entryLength;
2003
2006
2007 /* The terminators are already included in AsyncQueueEntryEmptySize */
2010 qe->length = entryLength;
2011 qe->dboid = MyDatabaseId;
2012 qe->xid = GetCurrentTransactionId();
2013 qe->srcPid = MyProcPid;
2014 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2015}

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, fb(), GetCurrentTransactionId(), MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, and QUEUEALIGN.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inline static

Definition at line 617 of file async.c.

618{
619 return p - q;
620}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inline static

Definition at line 627 of file async.c.

628{
629 return p < q;
630}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( QueuePosition * current,
QueuePosition  stop,
Snapshot  snapshot 
)
static

Definition at line 2723 of file async.c.

2726{
2727 int64 curpage = QUEUE_POS_PAGE(*current);
2728 int slotno;
2729 char *page_buffer;
2730 bool reachedStop = false;
2731 bool reachedEndOfPage;
2732
2733 /*
2734 * We copy the entries into a local buffer to avoid holding the SLRU lock
2735 * while we transmit them to our frontend. The local buffer must be
2736 * adequately aligned.
2737 */
2739 char *local_buf_end = local_buf;
2740
2743 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2744
2745 do
2746 {
2747 QueuePosition thisentry = *current;
2749
2750 if (QUEUE_POS_EQUAL(thisentry, stop))
2751 break;
2752
2753 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2754
2755 /*
2756 * Advance *current over this message, possibly to the next page. As
2757 * noted in the comments for asyncQueueReadAllNotifications, we must
2758 * do this before possibly failing while processing the message.
2759 */
2760 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2761
2762 /* Ignore messages destined for other databases */
2763 if (qe->dboid == MyDatabaseId)
2764 {
2765 if (XidInMVCCSnapshot(qe->xid, snapshot))
2766 {
2767 /*
2768 * The source transaction is still in progress, so we can't
2769 * process this message yet. Break out of the loop, but first
2770 * back up *current so we will reprocess the message next
2771 * time. (Note: it is unlikely but not impossible for
2772 * TransactionIdDidCommit to fail, so we can't really avoid
2773 * this advance-then-back-up behavior when dealing with an
2774 * uncommitted message.)
2775 *
2776 * Note that we must test XidInMVCCSnapshot before we test
2777 * TransactionIdDidCommit, else we might return a message from
2778 * a transaction that is not yet visible to snapshots; compare
2779 * the comments at the head of heapam_visibility.c.
2780 *
2781 * Also, while our own xact won't be listed in the snapshot,
2782 * we need not check for TransactionIdIsCurrentTransactionId
2783 * because our transaction cannot (yet) have queued any
2784 * messages.
2785 */
2786 *current = thisentry;
2787 reachedStop = true;
2788 break;
2789 }
2790
2791 /*
2792 * Quick check for the case that we're not listening on any
2793 * channels, before calling TransactionIdDidCommit(). This makes
2794 * that case a little faster, but more importantly, it ensures
2795 * that if there's a bad entry in the queue for which
2796 * TransactionIdDidCommit() fails for some reason, we can skip
2797 * over it on the first LISTEN in a session, and not get stuck on
2798 * it indefinitely. (This is a little trickier than it looks: it
2799 * works because BecomeRegisteredListener runs this code before we
2800 * have made the first entry in localChannelTable.)
2801 */
2803 continue;
2804
2805 if (TransactionIdDidCommit(qe->xid))
2806 {
2807 memcpy(local_buf_end, qe, qe->length);
2808 local_buf_end += qe->length;
2809 }
2810 else
2811 {
2812 /*
2813 * The source transaction aborted or crashed, so we just
2814 * ignore its notifications.
2815 */
2816 }
2817 }
2818
2819 /* Loop back if we're not at end of page */
2820 } while (!reachedEndOfPage);
2821
2822 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2824
2825 /*
2826 * Now that we have let go of the SLRU bank lock, send the notifications
2827 * to our frontend
2828 */
2830 for (char *p = local_buf; p < local_buf_end;)
2831 {
2833
2834 /* qe->data is the null-terminated channel name */
2835 char *channel = qe->data;
2836
2837 if (IsListeningOn(channel))
2838 {
2839 /* payload follows channel name */
2840 char *payload = qe->data + strlen(channel) + 1;
2841
2842 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2843 }
2844
2845 p += qe->length;
2846 }
2847
2848 if (QUEUE_POS_EQUAL(*current, stop))
2849 reachedStop = true;
2850
2851 return reachedStop;
2852}

References Assert, asyncQueueAdvance(), AsyncQueueEntry::data, fb(), InvalidTransactionId, IsListeningOn(), LocalChannelTableIsEmpty, LWLockRelease(), MyDatabaseId, NotifyCtl, NotifyMyFrontEnd(), QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), TransactionIdDidCommit(), and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 2589 of file async.c.

2590{
2591 QueuePosition pos;
2592 QueuePosition head;
2593 Snapshot snapshot;
2594
2595 /*
2596 * Fetch current state, indicate to others that we have woken up, and that
2597 * we are in process of advancing our position.
2598 */
2600 /* Assert checks that we have a valid state entry */
2604 head = QUEUE_HEAD;
2605
2606 if (QUEUE_POS_EQUAL(pos, head))
2607 {
2608 /* Nothing to do, we have read all notifications already. */
2610 return;
2611 }
2612
2615
2616 /*----------
2617 * Get snapshot we'll use to decide which xacts are still in progress.
2618 * This is trickier than it might seem, because of race conditions.
2619 * Consider the following example:
2620 *
2621 * Backend 1: Backend 2:
2622 *
2623 * transaction starts
2624 * UPDATE foo SET ...;
2625 * NOTIFY foo;
2626 * commit starts
2627 * queue the notify message
2628 * transaction starts
2629 * LISTEN foo; -- first LISTEN in session
2630 * SELECT * FROM foo WHERE ...;
2631 * commit to clog
2632 * commit starts
2633 * add backend 2 to array of listeners
2634 * advance to queue head (this code)
2635 * commit to clog
2636 *
2637 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2638 * wasn't committed yet. Ideally we'd ensure that client 2 would
2639 * eventually get transaction 1's notify message, but there's no way
2640 * to do that; until we're in the listener array, there's no guarantee
2641 * that the notify message doesn't get removed from the queue.
2642 *
2643 * Therefore the coding technique transaction 2 is using is unsafe:
2644 * applications must commit a LISTEN before inspecting database state,
2645 * if they want to ensure they will see notifications about subsequent
2646 * changes to that state.
2647 *
2648 * What we do guarantee is that we'll see all notifications from
2649 * transactions committing after the snapshot we take here.
2650 * BecomeRegisteredListener has already added us to the listener array,
2651 * so no not-yet-committed messages can be removed from the queue
2652 * before we see them.
2653 *----------
2654 */
2655 snapshot = RegisterSnapshot(GetLatestSnapshot());
2656
2657 /*
2658 * It is possible that we fail while trying to send a message to our
2659 * frontend (for example, because of encoding conversion failure). If
2660 * that happens it is critical that we not try to send the same message
2661 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2662 * ERRORs to FATAL, causing the client connection to be closed on error.
2663 *
2664 * We used to only skip over the offending message and try to soldier on,
2665 * but it was somewhat questionable to lose a notification and give the
2666 * client an ERROR instead. A client application might not be prepared for
2667 * that and can't tell that a notification was missed. It was also not
2668 * very useful in practice because notifications are often processed while
2669 * a connection is idle and reading a message from the client, and in that
2670 * state, any error is upgraded to FATAL anyway. Closing the connection
2671 * is a clear signal to the application that it might have missed
2672 * notifications.
2673 */
2674 {
2676 bool reachedStop;
2677
2678 ExitOnAnyError = true;
2679
2680 do
2681 {
2682 /*
2683 * Process messages up to the stop position, end of page, or an
2684 * uncommitted message.
2685 *
2686 * Our stop position is what we found to be the head's position
2687 * when we entered this function. It might have changed already.
2688 * But if it has, we will receive (or have already received and
2689 * queued) another signal and come here again.
2690 *
2691 * We are not holding NotifyQueueLock here! The queue can only
2692 * extend beyond the head pointer (see above) and we leave our
2693 * backend's pointer where it is so nobody will truncate or
2694 * rewrite pages under us. Especially we don't want to hold a lock
2695 * while sending the notifications to the frontend.
2696 */
2697 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2698 } while (!reachedStop);
2699
2700 /* Update shared state */
2705
2707 }
2708
2709 /* Done with snapshot */
2710 UnregisterSnapshot(snapshot);
2711}

References Assert, asyncQueueProcessPageEntries(), ExitOnAnyError, fb(), GetLatestSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_HEAD, QUEUE_POS_EQUAL, RegisterSnapshot(), and UnregisterSnapshot().

Referenced by BecomeRegisteredListener(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1907 of file async.c.

1908{
1909 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1910
1911 if (!amRegisteredListener) /* nothing to do */
1912 return;
1913
1914 /*
1915 * Need exclusive lock here to manipulate list links.
1916 */
1918 /* Mark our entry as invalid */
1923 /* and remove it from the list */
1926 else
1927 {
1929 {
1931 {
1933 break;
1934 }
1935 }
1936 }
1939
1940 /* mark ourselves as no longer listed in the global array */
1941 amRegisteredListener = false;
1942}

References amRegisteredListener, Assert, fb(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, LocalChannelTableIsEmpty, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 2185 of file async.c.

2186{
2187 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2189 int64 occupied = headPage - tailPage;
2190
2191 if (occupied == 0)
2192 return (double) 0; /* fast exit for common case */
2193
2194 return (double) occupied / (double) max_notify_queue_pages;
2195}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 792 of file async.c.

793{
794 bool found;
795 Size size;
796
797 /*
798 * Create or attach to the AsyncQueueControl structure.
799 */
800 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
801 size = add_size(size, offsetof(AsyncQueueControl, backend));
802
804 ShmemInitStruct("Async Queue Control", size, &found);
805
806 if (!found)
807 {
808 /* First time through, so initialize it */
811 QUEUE_STOP_PAGE = 0;
816 for (int i = 0; i < MaxBackends; i++)
817 {
824 }
825 }
826
827 /*
828 * Set up SLRU management of the pg_notify data. Note that long segment
829 * names are used in order to avoid wraparound.
830 */
831 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
834 SYNC_HANDLER_NONE, true);
835
836 if (!found)
837 {
838 /*
839 * During start or reboot, clean out the pg_notify directory.
840 */
842 }
843}

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, fb(), AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 775 of file async.c.

776{
777 Size size;
778
779 /* This had better match AsyncShmemInit */
780 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
781 size = add_size(size, offsetof(AsyncQueueControl, backend));
782
784
785 return size;
786}

References add_size(), fb(), MaxBackends, mul_size(), notify_buffers, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 2410 of file async.c.

2411{
2412 /* Revert staged listen/unlisten changes */
2414
2415 /* If we're no longer listening on anything, unregister */
2418
2419 /* And clean up */
2421}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and LocalChannelTableIsEmpty.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1369 of file async.c.

1370{
1371 /*
1372 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1373 * return as soon as possible
1374 */
1376 return;
1377
1378 if (Trace_notify)
1379 elog(DEBUG1, "AtCommit_Notify");
1380
1381 /* Apply staged listen/unlisten changes */
1383
1384 /* If no longer listening to anything, get out of listener array */
1387
1388 /*
1389 * Send signals to listening backends. We need do this only if there are
1390 * pending notifies, which were previously added to the shared queue by
1391 * PreCommit_Notify().
1392 */
1393 if (pendingNotifies != NULL)
1395
1396 /*
1397 * If it's time to try to advance the global tail pointer, do that.
1398 *
1399 * (It might seem odd to do this in the sender, when more than likely the
1400 * listeners won't yet have read the messages we just sent. However,
1401 * there's less contention if only the sender does it, and there is little
1402 * need for urgency in advancing the global tail. So this typically will
1403 * be clearing out messages that were sent some time ago.)
1404 */
1405 if (tryAdvanceTail)
1406 {
1407 tryAdvanceTail = false;
1409 }
1410
1411 /* And clean up */
1413}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueAdvanceTail(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), DEBUG1, elog, fb(), LocalChannelTableIsEmpty, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 1151 of file async.c.

1152{
1153 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1155 ereport(ERROR,
1157 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1158}

References ereport, errcode(), errmsg(), ERROR, fb(), pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 2499 of file async.c.

2500{
2501 int my_level = GetCurrentTransactionNestLevel();
2502
2503 /*
2504 * All we have to do is pop the stack --- the actions/notifies made in
2505 * this subxact are no longer interesting, and the space will be freed
2506 * when CurTransactionContext is recycled. We still have to free the
2507 * ActionList and NotificationList objects themselves, though, because
2508 * those are allocated in TopTransactionContext.
2509 *
2510 * Note that there might be no entries at all, or no entries for the
2511 * current subtransaction level, either because none were ever created, or
2512 * because we reentered this routine due to trouble during subxact abort.
2513 */
2514 while (pendingActions != NULL &&
2515 pendingActions->nestingLevel >= my_level)
2516 {
2518
2521 }
2522
2523 while (pendingNotifies != NULL &&
2524 pendingNotifies->nestingLevel >= my_level)
2525 {
2527
2530 }
2531}

References fb(), GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 2429 of file async.c.

2430{
2431 int my_level = GetCurrentTransactionNestLevel();
2432
2433 /* If there are actions at our nesting level, we must reparent them. */
2434 if (pendingActions != NULL &&
2435 pendingActions->nestingLevel >= my_level)
2436 {
2437 if (pendingActions->upper == NULL ||
2438 pendingActions->upper->nestingLevel < my_level - 1)
2439 {
2440 /* nothing to merge; give the whole thing to the parent */
2442 }
2443 else
2444 {
2446
2448
2449 /*
2450 * Mustn't try to eliminate duplicates here --- see queue_listen()
2451 */
2454 childPendingActions->actions);
2456 }
2457 }
2458
2459 /* If there are notifies at our nesting level, we must reparent them. */
2460 if (pendingNotifies != NULL &&
2461 pendingNotifies->nestingLevel >= my_level)
2462 {
2463 Assert(pendingNotifies->nestingLevel == my_level);
2464
2465 if (pendingNotifies->upper == NULL ||
2466 pendingNotifies->upper->nestingLevel < my_level - 1)
2467 {
2468 /* nothing to merge; give the whole thing to the parent */
2470 }
2471 else
2472 {
2473 /*
2474 * Formerly, we didn't bother to eliminate duplicates here, but
2475 * now we must, else we fall foul of "Assert(!found)", either here
2476 * or during a later attempt to build the parent-level hashtable.
2477 */
2479 ListCell *l;
2480
2482 /* Insert all the subxact's events into parent, except for dups */
2483 foreach(l, childPendingNotifies->events)
2484 {
2486
2489 }
2491 }
2492 }
2493}

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), fb(), GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ BecomeRegisteredListener()

static void BecomeRegisteredListener ( void  )
static

Definition at line 1421 of file async.c.

1422{
1423 QueuePosition head;
1424 QueuePosition max;
1426
1427 /*
1428 * Nothing to do if we are already listening to something, nor if we
1429 * already ran this routine in this transaction.
1430 */
1432 return;
1433
1434 if (Trace_notify)
1435 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1436
1437 /*
1438 * Before registering, make sure we will unlisten before dying. (Note:
1439 * this action does not get undone if we abort later.)
1440 */
1442 {
1445 }
1446
1447 /*
1448 * This is our first LISTEN, so establish our pointer.
1449 *
1450 * We set our pointer to the global tail pointer and then move it forward
1451 * over already-committed notifications. This ensures we cannot miss any
1452 * not-yet-committed notifications. We might get a few more but that
1453 * doesn't hurt.
1454 *
1455 * In some scenarios there might be a lot of committed notifications that
1456 * have not yet been pruned away (because some backend is being lazy about
1457 * reading them). To reduce our startup time, we can look at other
1458 * backends and adopt the maximum "pos" pointer of any backend that's in
1459 * our database; any notifications it's already advanced over are surely
1460 * committed and need not be re-examined by us. (We must consider only
1461 * backends connected to our DB, because others will not have bothered to
1462 * check committed-ness of notifications in our DB.)
1463 *
1464 * We need exclusive lock here so we can look at other backends' entries
1465 * and manipulate the list links.
1466 */
1468 head = QUEUE_HEAD;
1469 max = QUEUE_TAIL;
1472 {
1474 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1475 /* Also find last listening backend before this one */
1476 if (i < MyProcNumber)
1477 prevListener = i;
1478 }
1484 /* Insert backend into list of listeners at correct position */
1486 {
1489 }
1490 else
1491 {
1494 }
1496
1497 /* Now we are listed in the global array, so remember we're listening */
1498 amRegisteredListener = true;
1499
1500 /*
1501 * Try to move our pointer forward as far as possible. This will skip
1502 * over already-committed notifications, which we want to do because they
1503 * might be quite stale. Note that we are not yet listening on anything,
1504 * so we won't deliver such notifications to our frontend. Also, although
1505 * our transaction might have executed NOTIFY, those message(s) aren't
1506 * queued yet so we won't skip them here.
1507 */
1508 if (!QUEUE_POS_EQUAL(max, head))
1510}

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, fb(), i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ check_notify_buffers()

bool check_notify_buffers ( int * newval,
void **  extra,
GucSource  source 
)

Definition at line 3289 of file async.c.

3290{
3291 return check_slru_buffers("notify_buffers", newval);
3292}

References check_slru_buffers(), and newval.

◆ CleanupListenersOnExit()

static void CleanupListenersOnExit ( void  )
static

Definition at line 1837 of file async.c.

1838{
1839 dshash_seq_status status;
1840 GlobalChannelEntry *entry;
1841
1842 if (Trace_notify)
1843 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1844
1845 /* Clear our local cache (not really necessary, but be consistent) */
1846 if (localChannelTable != NULL)
1847 {
1850 }
1851
1852 /* Now remove our entries from the shared globalChannelTable */
1853 if (globalChannelTable == NULL)
1854 return;
1855
1856 dshash_seq_init(&status, globalChannelTable, true);
1857 while ((entry = dshash_seq_next(&status)) != NULL)
1858 {
1860
1861 if (entry->key.dboid != MyDatabaseId)
1862 continue; /* not relevant */
1863
1866
1867 for (int i = 0; i < entry->numListeners; i++)
1868 {
1869 if (listeners[i].procNo == MyProcNumber)
1870 {
1871 entry->numListeners--;
1872 if (i < entry->numListeners)
1873 memmove(&listeners[i], &listeners[i + 1],
1874 sizeof(ListenerEntry) * (entry->numListeners - i));
1875
1876 if (entry->numListeners == 0)
1877 {
1879 dshash_delete_current(&status);
1880 }
1881 break;
1882 }
1883 }
1884 }
1885 dshash_seq_term(&status);
1886}

References GlobalChannelKey::dboid, DEBUG1, dsa_free(), dsa_get_address(), dshash_delete_current(), dshash_seq_init(), dshash_seq_next(), dshash_seq_term(), elog, fb(), globalChannelDSA, globalChannelTable, hash_destroy(), i, GlobalChannelEntry::key, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, MyProcPid, GlobalChannelEntry::numListeners, and Trace_notify.

Referenced by Async_UnlistenOnExit().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 3271 of file async.c.

3272{
3273 /*
3274 * Everything's allocated in either TopTransactionContext or the context
3275 * for the subtransaction to which it corresponds. So, there's nothing to
3276 * do here except reset the pointers; the space will be reclaimed when the
3277 * contexts are deleted.
3278 */
3281 /* Also clear pendingListenActions, which is derived from pendingActions */
3283}

References fb(), pendingActions, pendingListenActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ GlobalChannelKeyInit()

static void GlobalChannelKeyInit ( GlobalChannelKey * key,
Oid  dboid,
const char * channel 
)
inline static

Definition at line 637 of file async.c.

638{
639 memset(key, 0, sizeof(GlobalChannelKey));
640 key->dboid = dboid;
641 strlcpy(key->channel, channel, NAMEDATALEN);
642}

References fb(), NAMEDATALEN, and strlcpy().

Referenced by ApplyPendingListenActions(), PrepareTableEntriesForListen(), and SignalBackends().

◆ globalChannelTableHash()

static dshash_hash globalChannelTableHash ( const void * key,
size_t  size,
void * arg 
)
static

Definition at line 649 of file async.c.

650{
651 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
652 dshash_hash h;
653
655 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
657
658 return h;
659}

References GlobalChannelKey::channel, DatumGetUInt32(), GlobalChannelKey::dboid, fb(), hash_any(), hash_uint32(), and NAMEDATALEN.

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 2542 of file async.c.

2543{
2544 /*
2545 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2546 * you do here.
2547 */
2548
2549 /* signal that work needs to be done */
2551
2552 /* make sure the event is processed in due course */
2554}

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ initGlobalChannelTable()

static void initGlobalChannelTable ( void  )
static

Definition at line 676 of file async.c.

677{
678 MemoryContext oldcontext;
679
680 /* Quick exit if we already did this */
683 return;
684
685 /* Otherwise, use a lock to ensure only one process creates the table */
687
688 /* Be sure any local memory allocated by DSA routines is persistent */
690
692 {
693 /* Initialize dynamic shared hash table for global channels */
699 NULL);
700
701 /* Store handles in shared memory for other backends to use */
705 }
706 else if (!globalChannelTable)
707 {
708 /* Attach to existing dynamic shared hash table */
714 NULL);
715 }
716
717 MemoryContextSwitchTo(oldcontext);
719}

References asyncQueueControl, dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, fb(), globalChannelDSA, globalChannelTable, AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, globalChannelTableDSHParams, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by PreCommit_Notify().

◆ initLocalChannelTable()

static void initLocalChannelTable ( void  )
static

Definition at line 727 of file async.c.

728{
730
731 /* Quick exit if we already did this */
732 if (localChannelTable != NULL)
733 return;
734
735 /* Initialize local hash table for this backend's listened channels */
737 hash_ctl.entrysize = sizeof(ChannelName);
738
740 hash_create("Local Listen Channels",
741 64,
742 &hash_ctl,
744}

References fb(), hash_create(), HASH_ELEM, HASH_STRINGS, HASHCTL::keysize, localChannelTable, and NAMEDATALEN.

Referenced by PreCommit_Notify().

◆ initPendingListenActions()

static void initPendingListenActions ( void  )
static

◆ IsListeningOn()

static bool IsListeningOn ( const char * channel)
static

Definition at line 1894 of file async.c.

1895{
1896 if (localChannelTable == NULL)
1897 return false;
1898
1899 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1900}

References fb(), HASH_FIND, hash_search(), and localChannelTable.

Referenced by asyncQueueProcessPageEntries().

◆ notification_hash()

static uint32 notification_hash ( const void * key,
Size  keysize 
)
static

Definition at line 3241 of file async.c.

3242{
3243 const Notification *k = *(const Notification *const *) key;
3244
3245 Assert(keysize == sizeof(Notification *));
3246 /* We don't bother to include the payload's trailing null in the hash */
3247 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3248 k->channel_len + k->payload_len + 1));
3249}

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void * key1,
const void * key2,
Size  keysize 
)
static

Definition at line 3255 of file async.c.

3256{
3257 const Notification *k1 = *(const Notification *const *) key1;
3258 const Notification *k2 = *(const Notification *const *) key2;
3259
3260 Assert(keysize == sizeof(Notification *));
3261 if (k1->channel_len == k2->channel_len &&
3262 k1->payload_len == k2->payload_len &&
3263 memcmp(k1->data, k2->data,
3264 k1->channel_len + k1->payload_len + 2) == 0)
3265 return 0; /* equal */
3266 return 1; /* not equal */
3267}

References Assert, and fb().

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char * channel,
const char * payload,
int32  srcPid 
)

Definition at line 3092 of file async.c.

3093{
3095 {
3097
3099 pq_sendint32(&buf, srcPid);
3100 pq_sendstring(&buf, channel);
3101 pq_sendstring(&buf, payload);
3103
3104 /*
3105 * NOTE: we do not do pq_flush() here. Some level of caller will
3106 * handle it later, allowing this message to be combined into a packet
3107 * with other ones.
3108 */
3109 }
3110 else
3111 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3112}

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 1086 of file async.c.

1087{
1089 HASH_SEQ_STATUS *status;
1090
1091 /* stuff done only on the first call of the function */
1092 if (SRF_IS_FIRSTCALL())
1093 {
1094 /* create a function context for cross-call persistence */
1096
1097 /* Initialize hash table iteration if we have any channels */
1098 if (localChannelTable != NULL)
1099 {
1100 MemoryContext oldcontext;
1101
1102 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1103 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1105 funcctx->user_fctx = status;
1106 MemoryContextSwitchTo(oldcontext);
1107 }
1108 else
1109 {
1110 funcctx->user_fctx = NULL;
1111 }
1112 }
1113
1114 /* stuff done on every call of the function */
1116 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1117
1118 if (status != NULL)
1119 {
1120 ChannelName *entry;
1121
1122 entry = (ChannelName *) hash_seq_search(status);
1123 if (entry != NULL)
1125 }
1126
1128}

References ChannelName::channel, CStringGetTextDatum, fb(), hash_seq_init(), hash_seq_search(), localChannelTable, MemoryContextSwitchTo(), palloc(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 2160 of file async.c.

2161{
2162 double usage;
2163
2164 /* Advance the queue tail so we don't report a too-large result */
2166
2170
2172}

References asyncQueueAdvanceTail(), asyncQueueUsage(), fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 851 of file async.c.

852{
853 const char *channel;
854 const char *payload;
855
856 if (PG_ARGISNULL(0))
857 channel = "";
858 else
860
861 if (PG_ARGISNULL(1))
862 payload = "";
863 else
865
866 /* For NOTIFY as a statement, this is checked in ProcessUtility */
868
869 Async_Notify(channel, payload);
870
872}

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 1176 of file async.c.

1177{
1178 ListCell *p;
1179
1181 return; /* no relevant statements in this xact */
1182
1183 if (Trace_notify)
1184 elog(DEBUG1, "PreCommit_Notify");
1185
1186 /* Preflight for any pending listen/unlisten actions */
1188
1189 if (pendingActions != NULL)
1190 {
1191 /* Ensure we have a local channel table */
1193 /* Create pendingListenActions hash table for this transaction */
1195
1196 /* Stage all the actions this transaction wants to perform */
1197 foreach(p, pendingActions->actions)
1198 {
1200
1201 switch (actrec->action)
1202 {
1203 case LISTEN_LISTEN:
1206 break;
1207 case LISTEN_UNLISTEN:
1209 break;
1212 break;
1213 }
1214 }
1215 }
1216
1217 /* Queue any pending notifies (must happen after the above) */
1218 if (pendingNotifies)
1219 {
1221 bool firstIteration = true;
1222
1223 /*
1224 * Build list of unique channel names being notified for use by
1225 * SignalBackends().
1226 *
1227 * If uniqueChannelHash is available, use it to efficiently get the
1228 * unique channels. Otherwise, fall back to the O(N^2) approach.
1229 */
1232 {
1233 HASH_SEQ_STATUS status;
1235
1237 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1240 channelEntry->channel);
1241 }
1242 else
1243 {
1244 /* O(N^2) approach is better for small number of notifications */
1246 {
1247 char *channel = n->data;
1248 bool found = false;
1249
1250 /* Name present in list? */
1252 {
1253 if (strcmp(oldchan, channel) == 0)
1254 {
1255 found = true;
1256 break;
1257 }
1258 }
1259 /* Add if not already in list */
1260 if (!found)
1263 channel);
1264 }
1265 }
1266
1267 /* Preallocate workspace that will be needed by SignalBackends() */
1268 if (signalPids == NULL)
1270 MaxBackends * sizeof(int32));
1271
1272 if (signalProcnos == NULL)
1274 MaxBackends * sizeof(ProcNumber));
1275
1276 /*
1277 * Make sure that we have an XID assigned to the current transaction.
1278 * GetCurrentTransactionId is cheap if we already have an XID, but not
1279 * so cheap if we don't, and we'd prefer not to do that work while
1280 * holding NotifyQueueLock.
1281 */
1283
1284 /*
1285 * Serialize writers by acquiring a special lock that we hold till
1286 * after commit. This ensures that queue entries appear in commit
1287 * order, and in particular that there are never uncommitted queue
1288 * entries ahead of committed ones, so an uncommitted transaction
1289 * can't block delivery of deliverable notifications.
1290 *
1291 * We use a heavyweight lock so that it'll automatically be released
1292 * after either commit or abort. This also allows deadlocks to be
1293 * detected, though really a deadlock shouldn't be possible here.
1294 *
1295 * The lock is on "database 0", which is pretty ugly but it doesn't
1296 * seem worth inventing a special locktag category just for this.
1297 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1298 * used by the flatfiles mechanism.)
1299 */
1302
1303 /*
1304 * For the direct advancement optimization in SignalBackends(), we
1305 * need to ensure that no other backend can insert queue entries
1306 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1307 * heavyweight lock above provides this guarantee, since it serializes
1308 * all writers.
1309 *
1310 * Note: if the heavyweight lock were ever removed for scalability
1311 * reasons, we could achieve the same guarantee by holding
1312 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1313 * than releasing and reacquiring it for each page as we do below.
1314 */
1315
1316 /* Initialize values to a safe default in case list is empty */
1319
1320 /* Now push the notifications into the queue */
1322 while (nextNotify != NULL)
1323 {
1324 /*
1325 * Add the pending notifications to the queue. We acquire and
1326 * release NotifyQueueLock once per page, which might be overkill
1327 * but it does allow readers to get in while we're doing this.
1328 *
1329 * A full queue is very uncommon and should really not happen,
1330 * given that we have so much space available in the SLRU pages.
1331 * Nevertheless we need to deal with this possibility. Note that
1332 * when we get here we are in the process of committing our
1333 * transaction, but we have not yet committed to clog, so at this
1334 * point in time we can still roll the transaction back.
1335 */
1337 if (firstIteration)
1338 {
1340 firstIteration = false;
1341 }
1343 if (asyncQueueIsFull())
1344 ereport(ERROR,
1346 errmsg("too many notifications in the NOTIFY queue")));
1350 }
1351
1352 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1353 }
1354}

References AccessExclusiveLock, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), BecomeRegisteredListener(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, fb(), foreach_ptr, GetCurrentTransactionId(), hash_seq_init(), hash_seq_search(), initGlobalChannelTable(), initLocalChannelTable(), initPendingListenActions(), InvalidOid, lappend(), lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MemoryContextAlloc(), NIL, pendingActions, pendingNotifies, PrepareTableEntriesForListen(), PrepareTableEntriesForUnlisten(), PrepareTableEntriesForUnlistenAll(), QUEUE_HEAD, queueHeadAfterWrite, queueHeadBeforeWrite, SET_QUEUE_POS, signalPids, signalProcnos, TopMemoryContext, Trace_notify, NotificationList::uniqueChannelHash, and NotificationList::uniqueChannelNames.

Referenced by CommitTransaction().

◆ PrepareTableEntriesForListen()

static void PrepareTableEntriesForListen ( const char * channel)
static

Definition at line 1522 of file async.c.

1523{
1525 GlobalChannelEntry *entry;
1526 bool found;
1528 PendingListenEntry *pending;
1529
1530 /*
1531 * Record in local pending hash that we want to LISTEN, overwriting any
1532 * earlier attempt to UNLISTEN.
1533 */
1534 pending = (PendingListenEntry *)
1536 pending->action = PENDING_LISTEN;
1537
1538 /*
1539 * Ensure that there is an entry for the channel in localChannelTable.
1540 * (Should this fail, we can just roll back.) If the transaction fails
1541 * after this point, we will remove the entry if appropriate during
1542 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1543 * to return TRUE; we assume nothing is going to consult that before
1544 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1545 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1546 * present to ensure they do the right things; see
1547 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1548 */
1550
1551 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1552 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1553 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1554
1555 if (!found)
1556 {
1557 /* New channel entry, so initialize it to a safe state */
1559 entry->numListeners = 0;
1560 entry->allocatedListeners = 0;
1561 }
1562
1563 /*
1564 * Create listenersArray if entry doesn't have one. It's tempting to fold
1565 * this into the !found case, but this coding allows us to cope in case
1566 * dsa_allocate() failed in an earlier attempt.
1567 */
1568 if (!DsaPointerIsValid(entry->listenersArray))
1569 {
1573 }
1574
1577
1578 /*
1579 * Check if we already have a ListenerEntry (possibly from earlier in this
1580 * transaction)
1581 */
1582 for (int i = 0; i < entry->numListeners; i++)
1583 {
1584 if (listeners[i].procNo == MyProcNumber)
1585 {
1586 /* Already have an entry; listening flag stays as-is until commit */
1588 return;
1589 }
1590 }
1591
1592 /* Need to add a new entry; grow array if necessary */
1593 if (entry->numListeners >= entry->allocatedListeners)
1594 {
1595 int new_size = entry->allocatedListeners * 2;
1598 sizeof(ListenerEntry) * new_size);
1600
1602 entry->listenersArray = new_array;
1606 }
1607
1608 listeners[entry->numListeners].procNo = MyProcNumber;
1609 listeners[entry->numListeners].listening = false; /* staged, not yet
1610 * committed */
1611 entry->numListeners++;
1612
1614}

References PendingListenEntry::action, GlobalChannelEntry::allocatedListeners, dsa_allocate, dsa_free(), dsa_get_address(), DsaPointerIsValid, dshash_find_or_insert(), dshash_release_lock(), fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_ENTER, hash_search(), i, INITIAL_LISTENERS_ARRAY_SIZE, InvalidDsaPointer, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PENDING_LISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlisten()

static void PrepareTableEntriesForUnlisten ( const char * channel)
static

Definition at line 1625 of file async.c.

1626{
1627 PendingListenEntry *pending;
1628
1629 /*
1630 * If the channel name is not in localChannelTable, then we are neither
1631 * listening on it nor preparing to listen on it, so we don't need to
1632 * record an UNLISTEN action.
1633 */
1635 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1636 return;
1637
1638 /*
1639 * Record in local pending hash that we want to UNLISTEN, overwriting any
1640 * earlier attempt to LISTEN. Don't touch localChannelTable or
1641 * globalChannelTable yet - we keep receiving signals until commit.
1642 */
1643 pending = (PendingListenEntry *)
1645 pending->action = PENDING_UNLISTEN;
1646}

References PendingListenEntry::action, Assert, fb(), HASH_ENTER, HASH_FIND, hash_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlistenAll()

static void PrepareTableEntriesForUnlistenAll ( void  )
static

Definition at line 1655 of file async.c.

1656{
1659 PendingListenEntry *pending;
1660
1661 /*
1662 * Scan localChannelTable, which will have the names of all channels that
1663 * we are listening on or have prepared to listen on. Record an UNLISTEN
1664 * action for each one, overwriting any earlier attempt to LISTEN.
1665 */
1667 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1668 {
1669 pending = (PendingListenEntry *)
1671 pending->action = PENDING_UNLISTEN;
1672 }
1673}

References PendingListenEntry::action, fb(), HASH_ENTER, hash_search(), hash_seq_init(), hash_seq_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 3051 of file async.c.

3052{
3053 /* We *must* reset the flag */
3054 notifyInterruptPending = false;
3055
3056 /* Do nothing else if we aren't actively listening */
3058 return;
3059
3060 if (Trace_notify)
3061 elog(DEBUG1, "ProcessIncomingNotify");
3062
3063 set_ps_display("notify interrupt");
3064
3065 /*
3066 * We must run asyncQueueReadAllNotifications inside a transaction, else
3067 * bad things happen if it gets an error.
3068 */
3070
3072
3074
3075 /*
3076 * If this isn't an end-of-command case, we must flush the notify messages
3077 * to ensure frontend gets them promptly.
3078 */
3079 if (flush)
3080 pq_flush();
3081
3082 set_ps_display("idle");
3083
3084 if (Trace_notify)
3085 elog(DEBUG1, "ProcessIncomingNotify: done");
3086}

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, LocalChannelTableIsEmpty, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 2572 of file async.c.

2573{
2575 return; /* not really idle */
2576
2577 /* Loop in case another signal arrives while sending messages */
2579 ProcessIncomingNotify(flush);
2580}

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char * channel 
)
static

Definition at line 987 of file async.c.

988{
989 MemoryContext oldcontext;
991 int my_level = GetCurrentTransactionNestLevel();
992
993 /*
994 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
995 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
996 * final per-channel intent is computed during PreCommit_Notify.
997 */
999
1000 /* space for terminating null is included in sizeof(ListenAction) */
1002 strlen(channel) + 1);
1003 actrec->action = action;
1004 strcpy(actrec->channel, channel);
1005
1006 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1007 {
1008 ActionList *actions;
1009
1010 /*
1011 * First action in current sub(xact). Note that we allocate the
1012 * ActionList in TopTransactionContext; the nestingLevel might get
1013 * changed later by AtSubCommit_Notify.
1014 */
1015 actions = (ActionList *)
1017 actions->nestingLevel = my_level;
1018 actions->actions = list_make1(actrec);
1019 actions->upper = pendingActions;
1020 pendingActions = actions;
1021 }
1022 else
1024
1025 MemoryContextSwitchTo(oldcontext);
1026}

References ActionList::actions, CurTransactionContext, fb(), GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ RemoveListenerFromChannel()

static void RemoveListenerFromChannel ( GlobalChannelEntry **  entry_ptr,
ListenerEntry * listeners,
int  idx 
)
static

Definition at line 1685 of file async.c.

1688{
1689 GlobalChannelEntry *entry = *entry_ptr;
1690
1691 entry->numListeners--;
1692 if (idx < entry->numListeners)
1694 sizeof(ListenerEntry) * (entry->numListeners - idx));
1695
1696 if (entry->numListeners == 0)
1697 {
1700 /* tells caller not to release the entry's lock: */
1701 *entry_ptr = NULL;
1702 }
1703}

References dsa_free(), dshash_delete_entry(), fb(), globalChannelDSA, globalChannelTable, idx(), GlobalChannelEntry::listenersArray, and GlobalChannelEntry::numListeners.

Referenced by ApplyPendingListenActions().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 2258 of file async.c.

2259{
2260 int count;
2261
2262 /* Can't get here without PreCommit_Notify having made the global table */
2264
2265 /* It should have set up these arrays, too */
2267
2268 /*
2269 * Identify backends that we need to signal. We don't want to send
2270 * signals while holding the NotifyQueueLock, so this part just builds a
2271 * list of target PIDs in signalPids[] and signalProcnos[].
2272 */
2273 count = 0;
2274
2276
2277 /* Scan each channel name that we notified in this transaction */
2279 {
2281 GlobalChannelEntry *entry;
2283
2284 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2285 entry = dshash_find(globalChannelTable, &key, false);
2286 if (entry == NULL)
2287 continue; /* nobody is listening */
2288
2290 entry->listenersArray);
2291
2292 /* Identify listeners that now need waking, add them to arrays */
2293 for (int j = 0; j < entry->numListeners; j++)
2294 {
2295 ProcNumber i;
2296 int32 pid;
2297 QueuePosition pos;
2298
2299 if (!listeners[j].listening)
2300 continue; /* ignore not-yet-committed listeners */
2301
2302 i = listeners[j].procNo;
2303
2305 continue; /* already signaled, no need to repeat */
2306
2307 pid = QUEUE_BACKEND_PID(i);
2308 pos = QUEUE_BACKEND_POS(i);
2309
2310 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2311 continue; /* it's fully caught up already */
2312
2313 Assert(pid != InvalidPid);
2314
2316 signalPids[count] = pid;
2317 signalProcnos[count] = i;
2318 count++;
2319 }
2320
2322 }
2323
2324 /*
2325 * Scan all listeners. Any that are not already pending wakeup must not
2326 * be interested in our notifications (else we'd have set their wakeup
2327 * flags above). Check to see if we can directly advance their queue
2328 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2329 * them anyway so they will catch up.
2330 */
2332 {
2333 int32 pid;
2334 QueuePosition pos;
2335
2337 continue;
2338
2339 /* If it's currently advancing, we should not touch it */
2341 continue;
2342
2343 pid = QUEUE_BACKEND_PID(i);
2344 pos = QUEUE_BACKEND_POS(i);
2345
2346 /*
2347 * We can directly advance the other backend's queue pointer if it's
2348 * not currently advancing (else there are race conditions), and its
2349 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2350 * it miss some older messages), and we'd not be moving the pointer
2351 * backward.
2352 */
2355 {
2356 /* We can directly advance its pointer past what we wrote */
2358 }
2361 {
2362 /* It's idle and far behind, so wake it up */
2363 Assert(pid != InvalidPid);
2364
2366 signalPids[count] = pid;
2367 signalProcnos[count] = i;
2368 count++;
2369 }
2370 }
2371
2373
2374 /* Now send signals */
2375 for (int i = 0; i < count; i++)
2376 {
2377 int32 pid = signalPids[i];
2378
2379 /*
2380 * If we are signaling our own process, no need to involve the kernel;
2381 * just set the flag directly.
2382 */
2383 if (pid == MyProcPid)
2384 {
2386 continue;
2387 }
2388
2389 /*
2390 * Note: assuming things aren't broken, a signal failure here could
2391 * only occur if the target backend exited since we released
2392 * NotifyQueueLock; which is unlikely but certainly possible. So we
2393 * just log a low-level debug message if it happens.
2394 */
2396 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2397 }
2398}

References Assert, asyncQueuePageDiff(), DEBUG3, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), foreach_ptr, globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, i, INVALID_PROC_NUMBER, InvalidPid, j, GlobalChannelEntry::listenersArray, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcPid, notifyInterruptPending, GlobalChannelEntry::numListeners, pendingNotifies, PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, QUEUE_POS_PRECEDES, queueHeadAfterWrite, queueHeadBeforeWrite, SendProcSignal(), signalPids, signalProcnos, and NotificationList::uniqueChannelNames.

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 345 of file async.c.

Referenced by asyncQueueFillWarning(), AsyncShmemInit(), and initGlobalChannelTable().

◆ globalChannelDSA

◆ globalChannelTable

◆ globalChannelTableDSHParams

const dshash_parameters globalChannelTableDSHParams
static

◆ localChannelTable

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 569 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 361 of file async.c.

◆ notifyInterruptPending

◆ pendingActions

◆ pendingListenActions

◆ pendingNotifies

◆ queueHeadAfterWrite

QueuePosition queueHeadAfterWrite
static

Definition at line 552 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ queueHeadBeforeWrite

QueuePosition queueHeadBeforeWrite
static

Definition at line 551 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalPids

int32* signalPids = NULL
static

Definition at line 559 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalProcnos

ProcNumber* signalProcnos = NULL
static

Definition at line 560 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 563 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 540 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and BecomeRegisteredListener().