PostgreSQL Source Code git master
Loading...
Searching...
No Matches
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  GlobalChannelKey
 
struct  ListenerEntry
 
struct  GlobalChannelEntry
 
struct  ListenAction
 
struct  ActionList
 
struct  PendingListenEntry
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 
struct  ChannelName
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_POS_PRECEDES(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define QUEUE_BACKEND_WAKEUP_PENDING(i)   (asyncQueueControl->backend[i].wakeupPending)
 
#define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define INITIAL_LISTENERS_ARRAY_SIZE   4
 
#define LocalChannelTableIsEmpty()    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct GlobalChannelKey GlobalChannelKey
 
typedef struct ListenerEntry ListenerEntry
 
typedef struct GlobalChannelEntry GlobalChannelEntry
 
typedef struct ActionList ActionList
 
typedef struct PendingListenEntry PendingListenEntry
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct ChannelName ChannelName
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 
enum  PendingListenAction { PENDING_LISTEN , PENDING_UNLISTEN }
 

Functions

static int asyncQueueErrdetailForIoError (const void *opaque_data)
 
static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void GlobalChannelKeyInit (GlobalChannelKey *key, Oid dboid, const char *channel)
 
static dshash_hash globalChannelTableHash (const void *key, size_t size, void *arg)
 
static void initGlobalChannelTable (void)
 
static void initLocalChannelTable (void)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void BecomeRegisteredListener (void)
 
static void PrepareTableEntriesForListen (const char *channel)
 
static void PrepareTableEntriesForUnlisten (const char *channel)
 
static void PrepareTableEntriesForUnlistenAll (void)
 
static void RemoveListenerFromChannel (GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
 
static void ApplyPendingListenActions (bool isCommit)
 
static void CleanupListenersOnExit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (QueuePosition *current, QueuePosition stop, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
static void initPendingListenActions (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControlasyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static dshash_tableglobalChannelTable = NULL
 
static dsa_areaglobalChannelDSA = NULL
 
static HTABlocalChannelTable = NULL
 
static ActionListpendingActions = NULL
 
static HTABpendingListenActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static QueuePosition queueHeadBeforeWrite
 
static QueuePosition queueHeadAfterWrite
 
static int32signalPids = NULL
 
static ProcNumbersignalProcnos = NULL
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 
static const dshash_parameters globalChannelTableDSHParams
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 226 of file async.c.

◆ INITIAL_LISTENERS_ARRAY_SIZE

#define INITIAL_LISTENERS_ARRAY_SIZE   4

Definition at line 378 of file async.c.

◆ LocalChannelTableIsEmpty

#define LocalChannelTableIsEmpty ( )     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

Definition at line 411 of file async.c.

424{
429
430typedef struct
431{
433 char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
435
436typedef struct ActionList
437{
438 int nestingLevel; /* current transaction nesting depth */
439 List *actions; /* list of ListenAction structs */
440 struct ActionList *upper; /* details for upper transaction levels */
441} ActionList;
442
444
445/*
446 * Hash table recording the final listen/unlisten intent per channel for
447 * the current transaction. Key is channel name, value is PENDING_LISTEN or
448 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
449 * per channel instead of replaying every action. This is built from the
450 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
451 * AtAbort_Notify.
452 */
453typedef enum
454{
458
459typedef struct PendingListenEntry
460{
461 char channel[NAMEDATALEN]; /* hash key */
462 PendingListenAction action; /* which action should we perform? */
464
466
467/*
468 * State for outbound notifies consists of a list of all channels+payloads
469 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
470 * until and unless the transaction commits. pendingNotifies is NULL if no
471 * NOTIFYs have been done in the current (sub) transaction.
472 *
473 * We discard duplicate notify events issued in the same transaction.
474 * Hence, in addition to the list proper (which we need to track the order
475 * of the events, since we guarantee to deliver them in order), we build a
476 * hash table which we can probe to detect duplicates. Since building the
477 * hash table is somewhat expensive, we do so only once we have at least
478 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
479 * before that we just scan the events linearly.
480 *
481 * The list is kept in CurTransactionContext. In subtransactions, each
482 * subtransaction has its own list in its own CurTransactionContext, but
483 * successful subtransactions add their entries to their parent's list.
484 * Failed subtransactions simply discard their lists. Since these lists
485 * are independent, there may be notify events in a subtransaction's list
486 * that duplicate events in some ancestor (sub) transaction; we get rid of
487 * the dups when merging the subtransaction's list into its parent's.
488 *
489 * Note: the action and notify lists do not interact within a transaction.
490 * In particular, if a transaction does NOTIFY and then LISTEN on the same
491 * condition name, it will get a self-notify at commit. This is a bit odd
492 * but is consistent with our historical behavior.
493 */
494typedef struct Notification
495{
496 uint16 channel_len; /* length of channel-name string */
497 uint16 payload_len; /* length of payload string */
498 /* null-terminated channel name, then null-terminated payload follow */
501
502typedef struct NotificationList
503{
504 int nestingLevel; /* current transaction nesting depth */
505 List *events; /* list of Notification structs */
506 HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
507 List *uniqueChannelNames; /* unique channel names being notified */
508 HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
509 struct NotificationList *upper; /* details for upper transaction levels */
511
512#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
513
514struct NotificationHash
515{
516 Notification *event; /* => the actual Notification struct */
517};
518
520
521/*
522 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
523 * (both just carry the channel name, with no payload).
524 */
525typedef struct ChannelName
526{
527 char channel[NAMEDATALEN]; /* hash key */
529
530/*
531 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
532 * called from inside a signal handler. That just sets the
533 * notifyInterruptPending flag and sets the process
534 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
535 * actually deal with the interrupt.
536 */
537volatile sig_atomic_t notifyInterruptPending = false;
538
539/* True if we've registered an on_shmem_exit cleanup */
540static bool unlistenExitRegistered = false;
541
542/* True if we're currently registered as a listener in asyncQueueControl */
543static bool amRegisteredListener = false;
544
545/*
546 * Queue head positions for direct advancement.
547 * These are captured during PreCommit_Notify while holding the heavyweight
548 * lock on database 0, ensuring no other backend can insert notifications
549 * between them. SignalBackends uses these to advance idle backends.
550 */
553
554/*
555 * Workspace arrays for SignalBackends. These are preallocated in
556 * PreCommit_Notify to avoid needing memory allocation after committing to
557 * clog.
558 */
559static int32 *signalPids = NULL;
561
562/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
563static bool tryAdvanceTail = false;
564
565/* GUC parameters */
566bool Trace_notify = false;
567
568/* For 8 KB pages this gives 8 GB of disk space */
569int max_notify_queue_pages = 1048576;
570
571/* local function prototypes */
572static int asyncQueueErrdetailForIoError(const void *opaque_data);
573static inline int64 asyncQueuePageDiff(int64 p, int64 q);
574static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
575static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
576 const char *channel);
577static dshash_hash globalChannelTableHash(const void *key, size_t size,
578 void *arg);
579static void initGlobalChannelTable(void);
580static void initLocalChannelTable(void);
581static void queue_listen(ListenActionKind action, const char *channel);
582static void Async_UnlistenOnExit(int code, Datum arg);
583static void BecomeRegisteredListener(void);
584static void PrepareTableEntriesForListen(const char *channel);
585static void PrepareTableEntriesForUnlisten(const char *channel);
586static void PrepareTableEntriesForUnlistenAll(void);
589 int idx);
590static void ApplyPendingListenActions(bool isCommit);
591static void CleanupListenersOnExit(void);
592static bool IsListeningOn(const char *channel);
593static void asyncQueueUnregister(void);
594static bool asyncQueueIsFull(void);
595static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
598static double asyncQueueUsage(void);
599static void asyncQueueFillWarning(void);
600static void SignalBackends(void);
601static void asyncQueueReadAllNotifications(void);
603 QueuePosition stop,
604 Snapshot snapshot);
605static void asyncQueueAdvanceTail(void);
606static void ProcessIncomingNotify(bool flush);
609static uint32 notification_hash(const void *key, Size keysize);
610static int notification_match(const void *key1, const void *key2, Size keysize);
611static void ClearPendingActionsAndNotifies(void);
612
613static int
615{
616 const QueuePosition *position = opaque_data;
617
618 return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
619 position->page, position->offset);
620}
621
622/*
623 * Compute the difference between two queue page numbers.
624 * Previously this function accounted for a wraparound.
625 */
626static inline int64
628{
629 return p - q;
630}
631
632/*
633 * Determines whether p precedes q.
634 * Previously this function accounted for a wraparound.
635 */
636static inline bool
638{
639 return p < q;
640}
641
642/*
643 * GlobalChannelKeyInit
644 * Prepare a global channel table key for hashing.
645 */
646static inline void
647GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
648{
649 memset(key, 0, sizeof(GlobalChannelKey));
650 key->dboid = dboid;
651 strlcpy(key->channel, channel, NAMEDATALEN);
652}
653
654/*
655 * globalChannelTableHash
656 * Hash function for global channel table keys.
657 */
658static dshash_hash
659globalChannelTableHash(const void *key, size_t size, void *arg)
660{
661 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
662 dshash_hash h;
663
665 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
667
668 return h;
669}
670
671/* parameters for the global channel table */
673 sizeof(GlobalChannelKey),
674 sizeof(GlobalChannelEntry),
679};
680
681/*
682 * initGlobalChannelTable
683 * Lazy initialization of the global channel table.
684 */
685static void
687{
688 MemoryContext oldcontext;
689
690 /* Quick exit if we already did this */
693 return;
694
695 /* Otherwise, use a lock to ensure only one process creates the table */
697
698 /* Be sure any local memory allocated by DSA routines is persistent */
700
702 {
703 /* Initialize dynamic shared hash table for global channels */
709 NULL);
710
711 /* Store handles in shared memory for other backends to use */
715 }
716 else if (!globalChannelTable)
717 {
718 /* Attach to existing dynamic shared hash table */
724 NULL);
725 }
726
727 MemoryContextSwitchTo(oldcontext);
729}
730
731/*
732 * initLocalChannelTable
733 * Lazy initialization of the local channel table.
734 * Once created, this table lasts for the life of the session.
735 */
736static void
738{
740
741 /* Quick exit if we already did this */
742 if (localChannelTable != NULL)
743 return;
744
745 /* Initialize local hash table for this backend's listened channels */
747 hash_ctl.entrysize = sizeof(ChannelName);
748
750 hash_create("Local Listen Channels",
751 64,
752 &hash_ctl,
754}
755
756/*
757 * initPendingListenActions
758 * Lazy initialization of the pending listen actions hash table.
759 * This is allocated in CurTransactionContext during PreCommit_Notify,
760 * and destroyed at transaction end.
761 */
762static void
764{
766
768 return;
769
771 hash_ctl.entrysize = sizeof(PendingListenEntry);
773
775 hash_create("Pending Listen Actions",
777 &hash_ctl,
779}
780
781/*
782 * Report space needed for our shared memory area
783 */
784Size
785AsyncShmemSize(void)
786{
787 Size size;
788
789 /* This had better match AsyncShmemInit */
790 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
791 size = add_size(size, offsetof(AsyncQueueControl, backend));
792
794
795 return size;
796}
797
798/*
799 * Initialize our shared memory area
800 */
801void
802AsyncShmemInit(void)
803{
804 bool found;
805 Size size;
806
807 /*
808 * Create or attach to the AsyncQueueControl structure.
809 */
810 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
811 size = add_size(size, offsetof(AsyncQueueControl, backend));
812
814 ShmemInitStruct("Async Queue Control", size, &found);
815
816 if (!found)
817 {
818 /* First time through, so initialize it */
821 QUEUE_STOP_PAGE = 0;
826 for (int i = 0; i < MaxBackends; i++)
827 {
834 }
835 }
836
837 /*
838 * Set up SLRU management of the pg_notify data. Note that long segment
839 * names are used in order to avoid wraparound.
840 */
841 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
842 NotifyCtl->errdetail_for_io_error = asyncQueueErrdetailForIoError;
845 SYNC_HANDLER_NONE, true);
846
847 if (!found)
848 {
849 /*
850 * During start or reboot, clean out the pg_notify directory.
851 */
853 }
854}
855
856
857/*
858 * pg_notify -
859 * SQL function to send a notification event
860 */
861Datum
863{
864 const char *channel;
865 const char *payload;
866
867 if (PG_ARGISNULL(0))
868 channel = "";
869 else
871
872 if (PG_ARGISNULL(1))
873 payload = "";
874 else
876
877 /* For NOTIFY as a statement, this is checked in ProcessUtility */
879
880 Async_Notify(channel, payload);
881
883}
884
885
886/*
887 * Async_Notify
888 *
889 * This is executed by the SQL notify command.
890 *
891 * Adds the message to the list of pending notifies.
892 * Actual notification happens during transaction commit.
893 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
894 */
895void
896Async_Notify(const char *channel, const char *payload)
897{
898 int my_level = GetCurrentTransactionNestLevel();
899 size_t channel_len;
900 size_t payload_len;
901 Notification *n;
902 MemoryContext oldcontext;
903
904 if (IsParallelWorker())
905 elog(ERROR, "cannot send notifications from a parallel worker");
906
907 if (Trace_notify)
908 elog(DEBUG1, "Async_Notify(%s)", channel);
909
910 channel_len = channel ? strlen(channel) : 0;
911 payload_len = payload ? strlen(payload) : 0;
912
913 /* a channel name must be specified */
914 if (channel_len == 0)
917 errmsg("channel name cannot be empty")));
918
919 /* enforce length limits */
920 if (channel_len >= NAMEDATALEN)
923 errmsg("channel name too long")));
924
925 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
928 errmsg("payload string too long")));
929
930 /*
931 * We must construct the Notification entry, even if we end up not using
932 * it, in order to compare it cheaply to existing list entries.
933 *
934 * The notification list needs to live until end of transaction, so store
935 * it in the transaction context.
936 */
938
940 channel_len + payload_len + 2);
941 n->channel_len = channel_len;
942 n->payload_len = payload_len;
943 strcpy(n->data, channel);
944 if (payload)
945 strcpy(n->data + channel_len + 1, payload);
946 else
947 n->data[channel_len + 1] = '\0';
948
949 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
950 {
952
953 /*
954 * First notify event in current (sub)xact. Note that we allocate the
955 * NotificationList in TopTransactionContext; the nestingLevel might
956 * get changed later by AtSubCommit_Notify.
957 */
960 sizeof(NotificationList));
961 notifies->nestingLevel = my_level;
962 notifies->events = list_make1(n);
963 /* We certainly don't need a hashtable yet */
964 notifies->hashtab = NULL;
965 /* We won't build uniqueChannelNames/Hash till later, either */
966 notifies->uniqueChannelNames = NIL;
967 notifies->uniqueChannelHash = NULL;
968 notifies->upper = pendingNotifies;
970 }
971 else
972 {
973 /* Now check for duplicates */
975 {
976 /* It's a dup, so forget it */
977 pfree(n);
978 MemoryContextSwitchTo(oldcontext);
979 return;
980 }
981
982 /* Append more events to existing list */
984 }
985
986 MemoryContextSwitchTo(oldcontext);
987}
988
989/*
990 * queue_listen
991 * Common code for listen, unlisten, unlisten all commands.
992 *
993 * Adds the request to the list of pending actions.
994 * Actual update of localChannelTable and globalChannelTable happens during
995 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
996 */
997static void
998queue_listen(ListenActionKind action, const char *channel)
999{
1000 MemoryContext oldcontext;
1002 int my_level = GetCurrentTransactionNestLevel();
1003
1004 /*
1005 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1006 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1007 * final per-channel intent is computed during PreCommit_Notify.
1008 */
1010
1011 /* space for terminating null is included in sizeof(ListenAction) */
1013 strlen(channel) + 1);
1014 actrec->action = action;
1015 strcpy(actrec->channel, channel);
1016
1017 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1018 {
1019 ActionList *actions;
1020
1021 /*
1022 * First action in current sub(xact). Note that we allocate the
1023 * ActionList in TopTransactionContext; the nestingLevel might get
1024 * changed later by AtSubCommit_Notify.
1025 */
1026 actions = (ActionList *)
1028 actions->nestingLevel = my_level;
1029 actions->actions = list_make1(actrec);
1030 actions->upper = pendingActions;
1031 pendingActions = actions;
1032 }
1033 else
1035
1036 MemoryContextSwitchTo(oldcontext);
1037}
1038
1039/*
1040 * Async_Listen
1041 *
1042 * This is executed by the SQL listen command.
1043 */
1044void
1045Async_Listen(const char *channel)
1046{
1047 if (Trace_notify)
1048 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1049
1050 queue_listen(LISTEN_LISTEN, channel);
1051}
1052
1053/*
1054 * Async_Unlisten
1055 *
1056 * This is executed by the SQL unlisten command.
1057 */
1058void
1059Async_Unlisten(const char *channel)
1060{
1061 if (Trace_notify)
1062 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1063
1064 /* If we couldn't possibly be listening, no need to queue anything */
1066 return;
1067
1068 queue_listen(LISTEN_UNLISTEN, channel);
1069}
1070
1071/*
1072 * Async_UnlistenAll
1073 *
1074 * This is invoked by UNLISTEN * command, and also at backend exit.
1075 */
1076void
1078{
1079 if (Trace_notify)
1080 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1081
1082 /* If we couldn't possibly be listening, no need to queue anything */
1084 return;
1085
1087}
1088
1089/*
1090 * SQL function: return a set of the channel names this backend is actively
1091 * listening to.
1092 *
1093 * Note: this coding relies on the fact that the localChannelTable cannot
1094 * change within a transaction.
1095 */
1096Datum
1098{
1100 HASH_SEQ_STATUS *status;
1101
1102 /* stuff done only on the first call of the function */
1103 if (SRF_IS_FIRSTCALL())
1104 {
1105 /* create a function context for cross-call persistence */
1107
1108 /* Initialize hash table iteration if we have any channels */
1109 if (localChannelTable != NULL)
1110 {
1111 MemoryContext oldcontext;
1112
1113 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1114 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1116 funcctx->user_fctx = status;
1117 MemoryContextSwitchTo(oldcontext);
1118 }
1119 else
1120 {
1121 funcctx->user_fctx = NULL;
1122 }
1123 }
1124
1125 /* stuff done on every call of the function */
1127 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1128
1129 if (status != NULL)
1130 {
1131 ChannelName *entry;
1132
1133 entry = (ChannelName *) hash_seq_search(status);
1134 if (entry != NULL)
1136 }
1137
1139}
1140
1141/*
1142 * Async_UnlistenOnExit
1143 *
1144 * This is executed at backend exit if we have done any LISTENs in this
1145 * backend. It might not be necessary anymore, if the user UNLISTENed
1146 * everything, but we don't try to detect that case.
1147 */
1148static void
1150{
1153}
1154
1155/*
1156 * AtPrepare_Notify
1157 *
1158 * This is called at the prepare phase of a two-phase
1159 * transaction. Save the state for possible commit later.
1160 */
1161void
1162AtPrepare_Notify(void)
1163{
1164 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1166 ereport(ERROR,
1168 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1169}
1170
1171/*
1172 * PreCommit_Notify
1173 *
1174 * This is called at transaction commit, before actually committing to
1175 * clog.
1176 *
1177 * If there are pending LISTEN actions, make sure we are listed in the
1178 * shared-memory listener array. This must happen before commit to
1179 * ensure we don't miss any notifies from transactions that commit
1180 * just after ours.
1181 *
1182 * If there are outbound notify requests in the pendingNotifies list,
1183 * add them to the global queue. We do that before commit so that
1184 * we can still throw error if we run out of queue space.
1185 */
1186void
1187PreCommit_Notify(void)
1188{
1189 ListCell *p;
1190
1192 return; /* no relevant statements in this xact */
1193
1194 if (Trace_notify)
1195 elog(DEBUG1, "PreCommit_Notify");
1196
1197 /* Preflight for any pending listen/unlisten actions */
1199
1200 if (pendingActions != NULL)
1201 {
1202 /* Ensure we have a local channel table */
1204 /* Create pendingListenActions hash table for this transaction */
1206
1207 /* Stage all the actions this transaction wants to perform */
1208 foreach(p, pendingActions->actions)
1209 {
1211
1212 switch (actrec->action)
1213 {
1214 case LISTEN_LISTEN:
1217 break;
1218 case LISTEN_UNLISTEN:
1220 break;
1223 break;
1224 }
1225 }
1226 }
1227
1228 /* Queue any pending notifies (must happen after the above) */
1229 if (pendingNotifies)
1230 {
1232 bool firstIteration = true;
1233
1234 /*
1235 * Build list of unique channel names being notified for use by
1236 * SignalBackends().
1237 *
1238 * If uniqueChannelHash is available, use it to efficiently get the
1239 * unique channels. Otherwise, fall back to the O(N^2) approach.
1240 */
1243 {
1244 HASH_SEQ_STATUS status;
1246
1248 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1251 channelEntry->channel);
1252 }
1253 else
1254 {
1255 /* O(N^2) approach is better for small number of notifications */
1257 {
1258 char *channel = n->data;
1259 bool found = false;
1260
1261 /* Name present in list? */
1263 {
1264 if (strcmp(oldchan, channel) == 0)
1265 {
1266 found = true;
1267 break;
1268 }
1269 }
1270 /* Add if not already in list */
1271 if (!found)
1274 channel);
1275 }
1276 }
1277
1278 /* Preallocate workspace that will be needed by SignalBackends() */
1279 if (signalPids == NULL)
1281 MaxBackends * sizeof(int32));
1282
1283 if (signalProcnos == NULL)
1285 MaxBackends * sizeof(ProcNumber));
1286
1287 /*
1288 * Make sure that we have an XID assigned to the current transaction.
1289 * GetCurrentTransactionId is cheap if we already have an XID, but not
1290 * so cheap if we don't, and we'd prefer not to do that work while
1291 * holding NotifyQueueLock.
1292 */
1294
1295 /*
1296 * Serialize writers by acquiring a special lock that we hold till
1297 * after commit. This ensures that queue entries appear in commit
1298 * order, and in particular that there are never uncommitted queue
1299 * entries ahead of committed ones, so an uncommitted transaction
1300 * can't block delivery of deliverable notifications.
1301 *
1302 * We use a heavyweight lock so that it'll automatically be released
1303 * after either commit or abort. This also allows deadlocks to be
1304 * detected, though really a deadlock shouldn't be possible here.
1305 *
1306 * The lock is on "database 0", which is pretty ugly but it doesn't
1307 * seem worth inventing a special locktag category just for this.
1308 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1309 * used by the flatfiles mechanism.)
1310 */
1313
1314 /*
1315 * For the direct advancement optimization in SignalBackends(), we
1316 * need to ensure that no other backend can insert queue entries
1317 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1318 * heavyweight lock above provides this guarantee, since it serializes
1319 * all writers.
1320 *
1321 * Note: if the heavyweight lock were ever removed for scalability
1322 * reasons, we could achieve the same guarantee by holding
1323 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1324 * than releasing and reacquiring it for each page as we do below.
1325 */
1326
1327 /* Initialize values to a safe default in case list is empty */
1330
1331 /* Now push the notifications into the queue */
1333 while (nextNotify != NULL)
1334 {
1335 /*
1336 * Add the pending notifications to the queue. We acquire and
1337 * release NotifyQueueLock once per page, which might be overkill
1338 * but it does allow readers to get in while we're doing this.
1339 *
1340 * A full queue is very uncommon and should really not happen,
1341 * given that we have so much space available in the SLRU pages.
1342 * Nevertheless we need to deal with this possibility. Note that
1343 * when we get here we are in the process of committing our
1344 * transaction, but we have not yet committed to clog, so at this
1345 * point in time we can still roll the transaction back.
1346 */
1348 if (firstIteration)
1349 {
1351 firstIteration = false;
1352 }
1354 if (asyncQueueIsFull())
1355 ereport(ERROR,
1357 errmsg("too many notifications in the NOTIFY queue")));
1361 }
1362
1363 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1364 }
1365}
1366
1367/*
1368 * AtCommit_Notify
1369 *
1370 * This is called at transaction commit, after committing to clog.
1371 *
1372 * Apply pending listen/unlisten changes and clear transaction-local state.
1373 *
1374 * If we issued any notifications in the transaction, send signals to
1375 * listening backends (possibly including ourselves) to process them.
1376 * Also, if we filled enough queue pages with new notifies, try to
1377 * advance the queue tail pointer.
1378 */
1379void
1380AtCommit_Notify(void)
1381{
1382 /*
1383 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1384 * return as soon as possible
1385 */
1387 return;
1388
1389 if (Trace_notify)
1390 elog(DEBUG1, "AtCommit_Notify");
1391
1392 /* Apply staged listen/unlisten changes */
1394
1395 /* If no longer listening to anything, get out of listener array */
1398
1399 /*
1400 * Send signals to listening backends. We need do this only if there are
1401 * pending notifies, which were previously added to the shared queue by
1402 * PreCommit_Notify().
1403 */
1404 if (pendingNotifies != NULL)
1406
1407 /*
1408 * If it's time to try to advance the global tail pointer, do that.
1409 *
1410 * (It might seem odd to do this in the sender, when more than likely the
1411 * listeners won't yet have read the messages we just sent. However,
1412 * there's less contention if only the sender does it, and there is little
1413 * need for urgency in advancing the global tail. So this typically will
1414 * be clearing out messages that were sent some time ago.)
1415 */
1416 if (tryAdvanceTail)
1417 {
1418 tryAdvanceTail = false;
1420 }
1421
1422 /* And clean up */
1424}
1425
1426/*
1427 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1428 *
1429 * This function must make sure we are ready to catch any incoming messages.
1430 */
1431static void
1433{
1434 QueuePosition head;
1435 QueuePosition max;
1437
1438 /*
1439 * Nothing to do if we are already listening to something, nor if we
1440 * already ran this routine in this transaction.
1441 */
1443 return;
1444
1445 if (Trace_notify)
1446 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1447
1448 /*
1449 * Before registering, make sure we will unlisten before dying. (Note:
1450 * this action does not get undone if we abort later.)
1451 */
1453 {
1456 }
1457
1458 /*
1459 * This is our first LISTEN, so establish our pointer.
1460 *
1461 * We set our pointer to the global tail pointer and then move it forward
1462 * over already-committed notifications. This ensures we cannot miss any
1463 * not-yet-committed notifications. We might get a few more but that
1464 * doesn't hurt.
1465 *
1466 * In some scenarios there might be a lot of committed notifications that
1467 * have not yet been pruned away (because some backend is being lazy about
1468 * reading them). To reduce our startup time, we can look at other
1469 * backends and adopt the maximum "pos" pointer of any backend that's in
1470 * our database; any notifications it's already advanced over are surely
1471 * committed and need not be re-examined by us. (We must consider only
1472 * backends connected to our DB, because others will not have bothered to
1473 * check committed-ness of notifications in our DB.)
1474 *
1475 * We need exclusive lock here so we can look at other backends' entries
1476 * and manipulate the list links.
1477 */
1479 head = QUEUE_HEAD;
1480 max = QUEUE_TAIL;
1483 {
1485 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1486 /* Also find last listening backend before this one */
1487 if (i < MyProcNumber)
1488 prevListener = i;
1489 }
1495 /* Insert backend into list of listeners at correct position */
1497 {
1500 }
1501 else
1502 {
1505 }
1507
1508 /* Now we are listed in the global array, so remember we're listening */
1509 amRegisteredListener = true;
1510
1511 /*
1512 * Try to move our pointer forward as far as possible. This will skip
1513 * over already-committed notifications, which we want to do because they
1514 * might be quite stale. Note that we are not yet listening on anything,
1515 * so we won't deliver such notifications to our frontend. Also, although
1516 * our transaction might have executed NOTIFY, those message(s) aren't
1517 * queued yet so we won't skip them here.
1518 */
1519 if (!QUEUE_POS_EQUAL(max, head))
1521}
1522
1523/*
1524 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1525 *
1526 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1527 * an entry in localChannelTable, and pre-allocating an entry in the shared
1528 * globalChannelTable with listening=false. The listening flag will be set
1529 * to true in AtCommit_Notify. If we abort later, unwanted table entries
1530 * will be removed.
1531 */
1532static void
1533PrepareTableEntriesForListen(const char *channel)
1534{
1536 GlobalChannelEntry *entry;
1537 bool found;
1539 PendingListenEntry *pending;
1540
1541 /*
1542 * Record in local pending hash that we want to LISTEN, overwriting any
1543 * earlier attempt to UNLISTEN.
1544 */
1545 pending = (PendingListenEntry *)
1547 pending->action = PENDING_LISTEN;
1548
1549 /*
1550 * Ensure that there is an entry for the channel in localChannelTable.
1551 * (Should this fail, we can just roll back.) If the transaction fails
1552 * after this point, we will remove the entry if appropriate during
1553 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1554 * to return TRUE; we assume nothing is going to consult that before
1555 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1556 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1557 * present to ensure they do the right things; see
1558 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1559 */
1561
1562 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1563 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1564 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1565
1566 if (!found)
1567 {
1568 /* New channel entry, so initialize it to a safe state */
1570 entry->numListeners = 0;
1571 entry->allocatedListeners = 0;
1572 }
1573
1574 /*
1575 * Create listenersArray if entry doesn't have one. It's tempting to fold
1576 * this into the !found case, but this coding allows us to cope in case
1577 * dsa_allocate() failed in an earlier attempt.
1578 */
1579 if (!DsaPointerIsValid(entry->listenersArray))
1580 {
1584 }
1585
1588
1589 /*
1590 * Check if we already have a ListenerEntry (possibly from earlier in this
1591 * transaction)
1592 */
1593 for (int i = 0; i < entry->numListeners; i++)
1594 {
1595 if (listeners[i].procNo == MyProcNumber)
1596 {
1597 /* Already have an entry; listening flag stays as-is until commit */
1599 return;
1600 }
1601 }
1602
1603 /* Need to add a new entry; grow array if necessary */
1604 if (entry->numListeners >= entry->allocatedListeners)
1605 {
1606 int new_size = entry->allocatedListeners * 2;
1609 sizeof(ListenerEntry) * new_size);
1611
1613 entry->listenersArray = new_array;
1617 }
1618
1619 listeners[entry->numListeners].procNo = MyProcNumber;
1620 listeners[entry->numListeners].listening = false; /* staged, not yet
1621 * committed */
1622 entry->numListeners++;
1623
1625}
1626
1627/*
1628 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1629 *
1630 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1631 * we're currently listening (committed or staged). We don't touch
1632 * globalChannelTable yet - the listener keeps receiving signals until
1633 * commit, when the entry is removed.
1634 */
1635static void
1636PrepareTableEntriesForUnlisten(const char *channel)
1637{
1638 PendingListenEntry *pending;
1639
1640 /*
1641 * If the channel name is not in localChannelTable, then we are neither
1642 * listening on it nor preparing to listen on it, so we don't need to
1643 * record an UNLISTEN action.
1644 */
1646 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1647 return;
1648
1649 /*
1650 * Record in local pending hash that we want to UNLISTEN, overwriting any
1651 * earlier attempt to LISTEN. Don't touch localChannelTable or
1652 * globalChannelTable yet - we keep receiving signals until commit.
1653 */
1654 pending = (PendingListenEntry *)
1656 pending->action = PENDING_UNLISTEN;
1657}
1658
1659/*
1660 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1661 *
1662 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1663 * about-to-be-listened channels in pendingListenActions.
1664 */
1665static void
1667{
1670 PendingListenEntry *pending;
1671
1672 /*
1673 * Scan localChannelTable, which will have the names of all channels that
1674 * we are listening on or have prepared to listen on. Record an UNLISTEN
1675 * action for each one, overwriting any earlier attempt to LISTEN.
1676 */
1678 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1679 {
1680 pending = (PendingListenEntry *)
1682 pending->action = PENDING_UNLISTEN;
1683 }
1684}
1685
1686/*
1687 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1688 *
1689 * Decrements numListeners, compacts the array, and frees the entry if empty.
1690 * Sets *entry_ptr to NULL if the entry was deleted.
1691 *
1692 * We could get the listeners pointer from the entry, but all callers
1693 * already have it at hand.
1694 */
1695static void
1698 int idx)
1699{
1700 GlobalChannelEntry *entry = *entry_ptr;
1701
1702 entry->numListeners--;
1703 if (idx < entry->numListeners)
1705 sizeof(ListenerEntry) * (entry->numListeners - idx));
1706
1707 if (entry->numListeners == 0)
1708 {
1711 /* tells caller not to release the entry's lock: */
1712 *entry_ptr = NULL;
1713 }
1714}
1715
1716/*
1717 * ApplyPendingListenActions
1718 *
1719 * Apply, or revert, staged listen/unlisten changes to the local and global
1720 * hash tables.
1721 */
1722static void
1724{
1726 PendingListenEntry *pending;
1727
1728 /* Quick exit if nothing to do */
1730 return;
1731
1732 /* We made a globalChannelTable before building pendingListenActions */
1733 if (globalChannelTable == NULL)
1734 elog(PANIC, "global channel table missing post-commit/abort");
1735
1736 /* For each staged action ... */
1738 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1739 {
1741 GlobalChannelEntry *entry;
1742 bool removeLocal = true;
1743 bool foundListener = false;
1744
1745 /*
1746 * Find the global entry for this channel. If isCommit, it had better
1747 * exist (it was created in PreCommit). In an abort, it might not
1748 * exist, in which case we are not listening and should discard any
1749 * local entry that PreCommit may have managed to create.
1750 */
1751 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1752 entry = dshash_find(globalChannelTable, &key, true);
1753 if (entry != NULL)
1754 {
1755 /* Scan entry to find the ListenerEntry for this backend */
1757
1760
1761 for (int i = 0; i < entry->numListeners; i++)
1762 {
1763 if (listeners[i].procNo != MyProcNumber)
1764 continue;
1765 foundListener = true;
1766 if (isCommit)
1767 {
1768 if (pending->action == PENDING_LISTEN)
1769 {
1770 /*
1771 * LISTEN being committed: set listening=true.
1772 * localChannelTable entry was created during
1773 * PreCommit and should be kept.
1774 */
1775 listeners[i].listening = true;
1776 removeLocal = false;
1777 }
1778 else
1779 {
1780 /*
1781 * UNLISTEN being committed: remove pre-allocated
1782 * entries from both tables.
1783 */
1785 }
1786 }
1787 else
1788 {
1789 /*
1790 * Note: this part is reachable only if the transaction
1791 * aborts after PreCommit_Notify() has made some
1792 * pendingListenActions entries, so it's pretty hard to
1793 * test.
1794 */
1795 if (!listeners[i].listening)
1796 {
1797 /*
1798 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1799 * and we weren't listening before, so remove
1800 * pre-allocated entries from both tables.
1801 */
1803 }
1804 else
1805 {
1806 /*
1807 * We're aborting, but the previous state was that
1808 * we're listening, so keep localChannelTable entry.
1809 */
1810 removeLocal = false;
1811 }
1812 }
1813 break; /* there shouldn't be another match */
1814 }
1815
1816 /* We might have already released the entry by removing it */
1817 if (entry != NULL)
1819 }
1820
1821 /*
1822 * If we're committing a LISTEN action, we should have found a
1823 * matching ListenerEntry, but otherwise it's okay if we didn't.
1824 */
1825 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1826 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1827 pending->channel, MyProcNumber);
1828
1829 /*
1830 * If we did not find a globalChannelTable entry for our backend, or
1831 * if we are unlistening, remove any localChannelTable entry that may
1832 * exist. (Note in particular that this cleans up if we created a
1833 * localChannelTable entry and then failed while trying to create a
1834 * globalChannelTable entry.)
1835 */
1838 HASH_REMOVE, NULL);
1839 }
1840}
1841
1842/*
1843 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1844 *
1845 * Remove this backend from all channels in the shared global table.
1846 */
1847static void
1849{
1850 dshash_seq_status status;
1851 GlobalChannelEntry *entry;
1852
1853 if (Trace_notify)
1854 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1855
1856 /* Clear our local cache (not really necessary, but be consistent) */
1857 if (localChannelTable != NULL)
1858 {
1861 }
1862
1863 /* Now remove our entries from the shared globalChannelTable */
1864 if (globalChannelTable == NULL)
1865 return;
1866
1867 dshash_seq_init(&status, globalChannelTable, true);
1868 while ((entry = dshash_seq_next(&status)) != NULL)
1869 {
1871
1872 if (entry->key.dboid != MyDatabaseId)
1873 continue; /* not relevant */
1874
1877
1878 for (int i = 0; i < entry->numListeners; i++)
1879 {
1880 if (listeners[i].procNo == MyProcNumber)
1881 {
1882 entry->numListeners--;
1883 if (i < entry->numListeners)
1884 memmove(&listeners[i], &listeners[i + 1],
1885 sizeof(ListenerEntry) * (entry->numListeners - i));
1886
1887 if (entry->numListeners == 0)
1888 {
1890 dshash_delete_current(&status);
1891 }
1892 break;
1893 }
1894 }
1895 }
1896 dshash_seq_term(&status);
1897}
1898
1899/*
1900 * Test whether we are actively listening on the given channel name.
1901 *
1902 * Note: this function is executed for every notification found in the queue.
1903 */
1904static bool
1905IsListeningOn(const char *channel)
1906{
1907 if (localChannelTable == NULL)
1908 return false;
1909
1910 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1911}
1912
1913/*
1914 * Remove our entry from the listeners array when we are no longer listening
1915 * on any channel. NB: must not fail if we're already not listening.
1916 */
1917static void
1919{
1920 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1921
1922 if (!amRegisteredListener) /* nothing to do */
1923 return;
1924
1925 /*
1926 * Need exclusive lock here to manipulate list links.
1927 */
1929 /* Mark our entry as invalid */
1934 /* and remove it from the list */
1937 else
1938 {
1940 {
1942 {
1944 break;
1945 }
1946 }
1947 }
1950
1951 /* mark ourselves as no longer listed in the global array */
1952 amRegisteredListener = false;
1953}
1954
1955/*
1956 * Test whether there is room to insert more notification messages.
1957 *
1958 * Caller must hold at least shared NotifyQueueLock.
1959 */
1960static bool
1961asyncQueueIsFull(void)
1962{
1963 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1965 int64 occupied = headPage - tailPage;
1966
1968}
1969
1970/*
1971 * Advance the QueuePosition to the next entry, assuming that the current
1972 * entry is of length entryLength. If we jump to a new page the function
1973 * returns true, else false.
1974 */
1975static bool
1976asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1977{
1978 int64 pageno = QUEUE_POS_PAGE(*position);
1979 int offset = QUEUE_POS_OFFSET(*position);
1980 bool pageJump = false;
1981
1982 /*
1983 * Move to the next writing position: First jump over what we have just
1984 * written or read.
1985 */
1986 offset += entryLength;
1987 Assert(offset <= QUEUE_PAGESIZE);
1988
1989 /*
1990 * In a second step check if another entry can possibly be written to the
1991 * page. If so, stay here, we have reached the next position. If not, then
1992 * we need to move on to the next page.
1993 */
1995 {
1996 pageno++;
1997 offset = 0;
1998 pageJump = true;
1999 }
2000
2001 SET_QUEUE_POS(*position, pageno, offset);
2002 return pageJump;
2003}
2004
2005/*
2006 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
2007 */
2008static void
2010{
2011 size_t channellen = n->channel_len;
2012 size_t payloadlen = n->payload_len;
2013 int entryLength;
2014
2017
2018 /* The terminators are already included in AsyncQueueEntryEmptySize */
2021 qe->length = entryLength;
2022 qe->dboid = MyDatabaseId;
2023 qe->xid = GetCurrentTransactionId();
2024 qe->srcPid = MyProcPid;
2025 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2026}
2027
2028/*
2029 * Add pending notifications to the queue.
2030 *
2031 * We go page by page here, i.e. we stop once we have to go to a new page but
2032 * we will be called again and then fill that next page. If an entry does not
2033 * fit into the current page, we write a dummy entry with an InvalidOid as the
2034 * database OID in order to fill the page. So every page is always used up to
2035 * the last byte which simplifies reading the page later.
2036 *
2037 * We are passed the list cell (in pendingNotifies->events) containing the next
2038 * notification to write and return the first still-unwritten cell back.
2039 * Eventually we will return NULL indicating all is done.
2040 *
2041 * We are holding NotifyQueueLock already from the caller and grab
2042 * page specific SLRU bank lock locally in this function.
2043 */
2044static ListCell *
2046{
2049 int64 pageno;
2050 int offset;
2051 int slotno;
2053
2054 /*
2055 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2056 * memory upon exiting. The reason for this is that if we have to advance
2057 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2058 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2059 * subsequent insertions would try to put entries into a page that slru.c
2060 * thinks doesn't exist yet.) So, use a local position variable. Note
2061 * that if we do fail, any already-inserted queue entries are forgotten;
2062 * this is okay, since they'd be useless anyway after our transaction
2063 * rolls back.
2064 */
2066
2067 /*
2068 * If this is the first write since the postmaster started, we need to
2069 * initialize the first page of the async SLRU. Otherwise, the current
2070 * page should be initialized already, so just fetch it.
2071 */
2072 pageno = QUEUE_POS_PAGE(queue_head);
2074
2075 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2077
2080 else
2081 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2082
2083 /* Note we mark the page dirty before writing in it */
2084 NotifyCtl->shared->page_dirty[slotno] = true;
2085
2086 while (nextNotify != NULL)
2087 {
2089
2090 /* Construct a valid queue entry in local variable qe */
2092
2093 offset = QUEUE_POS_OFFSET(queue_head);
2094
2095 /* Check whether the entry really fits on the current page */
2096 if (offset + qe.length <= QUEUE_PAGESIZE)
2097 {
2098 /* OK, so advance nextNotify past this item */
2100 }
2101 else
2102 {
2103 /*
2104 * Write a dummy entry to fill up the page. Actually readers will
2105 * only check dboid and since it won't match any reader's database
2106 * OID, they will ignore this entry and move on.
2107 */
2108 qe.length = QUEUE_PAGESIZE - offset;
2109 qe.dboid = InvalidOid;
2111 qe.data[0] = '\0'; /* empty channel */
2112 qe.data[1] = '\0'; /* empty payload */
2113 }
2114
2115 /* Now copy qe into the shared buffer page */
2116 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2117 &qe,
2118 qe.length);
2119
2120 /* Advance queue_head appropriately, and detect if page is full */
2121 if (asyncQueueAdvance(&(queue_head), qe.length))
2122 {
2123 LWLock *lock;
2124
2125 pageno = QUEUE_POS_PAGE(queue_head);
2126 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2127 if (lock != prevlock)
2128 {
2131 prevlock = lock;
2132 }
2133
2134 /*
2135 * Page is full, so we're done here, but first fill the next page
2136 * with zeroes. The reason to do this is to ensure that slru.c's
2137 * idea of the head page is always the same as ours, which avoids
2138 * boundary problems in SimpleLruTruncate. The test in
2139 * asyncQueueIsFull() ensured that there is room to create this
2140 * page without overrunning the queue.
2141 */
2143
2144 /*
2145 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2146 * set flag to remember that we should try to advance the tail
2147 * pointer (we don't want to actually do that right here).
2148 */
2150 tryAdvanceTail = true;
2151
2152 /* And exit the loop */
2153 break;
2154 }
2155 }
2156
2157 /* Success, so update the global QUEUE_HEAD */
2159
2161
2162 return nextNotify;
2163}
2164
2165/*
2166 * SQL function to return the fraction of the notification queue currently
2167 * occupied.
2168 */
2169Datum
2171{
2172 double usage;
2173
2174 /* Advance the queue tail so we don't report a too-large result */
2176
2180
2182}
2183
2184/*
2185 * Return the fraction of the queue that is currently occupied.
2186 *
2187 * The caller must hold NotifyQueueLock in (at least) shared mode.
2188 *
2189 * Note: we measure the distance to the logical tail page, not the physical
2190 * tail page. In some sense that's wrong, but the relative position of the
2191 * physical tail is affected by details such as SLRU segment boundaries,
2192 * so that a result based on that is unpleasantly unstable.
2193 */
2194static double
2195asyncQueueUsage(void)
2196{
2197 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2199 int64 occupied = headPage - tailPage;
2200
2201 if (occupied == 0)
2202 return (double) 0; /* fast exit for common case */
2203
2204 return (double) occupied / (double) max_notify_queue_pages;
2205}
2206
2207/*
2208 * Check whether the queue is at least half full, and emit a warning if so.
2209 *
2210 * This is unlikely given the size of the queue, but possible.
2211 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2212 *
2213 * Caller must hold exclusive NotifyQueueLock.
2214 */
2215static void
2217{
2218 double fillDegree;
2219 TimestampTz t;
2220
2222 if (fillDegree < 0.5)
2223 return;
2224
2225 t = GetCurrentTimestamp();
2226
2229 {
2232
2234 {
2236 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2239 }
2240
2242 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2243 (minPid != InvalidPid ?
2244 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2245 : 0),
2246 (minPid != InvalidPid ?
2247 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2248 : 0)));
2249
2251 }
2252}
2253
2254/*
2255 * Send signals to listening backends.
2256 *
2257 * Normally we signal only backends that are interested in the notifies that
2258 * we just sent. However, that will leave idle listeners falling further and
2259 * further behind. Waken them anyway if they're far enough behind, so they'll
2260 * advance their queue position pointers, allowing the global tail to advance.
2261 *
2262 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2263 *
2264 * This is called during CommitTransaction(), so it's important for it
2265 * to have very low probability of failure.
2266 */
2267static void
2268SignalBackends(void)
2269{
2270 int count;
2271
2272 /* Can't get here without PreCommit_Notify having made the global table */
2274
2275 /* It should have set up these arrays, too */
2277
2278 /*
2279 * Identify backends that we need to signal. We don't want to send
2280 * signals while holding the NotifyQueueLock, so this part just builds a
2281 * list of target PIDs in signalPids[] and signalProcnos[].
2282 */
2283 count = 0;
2284
2286
2287 /* Scan each channel name that we notified in this transaction */
2289 {
2291 GlobalChannelEntry *entry;
2293
2294 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2295 entry = dshash_find(globalChannelTable, &key, false);
2296 if (entry == NULL)
2297 continue; /* nobody is listening */
2298
2300 entry->listenersArray);
2301
2302 /* Identify listeners that now need waking, add them to arrays */
2303 for (int j = 0; j < entry->numListeners; j++)
2304 {
2305 ProcNumber i;
2306 int32 pid;
2307 QueuePosition pos;
2308
2309 if (!listeners[j].listening)
2310 continue; /* ignore not-yet-committed listeners */
2311
2312 i = listeners[j].procNo;
2313
2315 continue; /* already signaled, no need to repeat */
2316
2317 pid = QUEUE_BACKEND_PID(i);
2318 pos = QUEUE_BACKEND_POS(i);
2319
2320 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2321 continue; /* it's fully caught up already */
2322
2323 Assert(pid != InvalidPid);
2324
2326 signalPids[count] = pid;
2327 signalProcnos[count] = i;
2328 count++;
2329 }
2330
2332 }
2333
2334 /*
2335 * Scan all listeners. Any that are not already pending wakeup must not
2336 * be interested in our notifications (else we'd have set their wakeup
2337 * flags above). Check to see if we can directly advance their queue
2338 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2339 * them anyway so they will catch up.
2340 */
2342 {
2343 int32 pid;
2344 QueuePosition pos;
2345
2347 continue;
2348
2349 /* If it's currently advancing, we should not touch it */
2351 continue;
2352
2353 pid = QUEUE_BACKEND_PID(i);
2354 pos = QUEUE_BACKEND_POS(i);
2355
2356 /*
2357 * We can directly advance the other backend's queue pointer if it's
2358 * not currently advancing (else there are race conditions), and its
2359 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2360 * it miss some older messages), and we'd not be moving the pointer
2361 * backward.
2362 */
2365 {
2366 /* We can directly advance its pointer past what we wrote */
2368 }
2371 {
2372 /* It's idle and far behind, so wake it up */
2373 Assert(pid != InvalidPid);
2374
2376 signalPids[count] = pid;
2377 signalProcnos[count] = i;
2378 count++;
2379 }
2380 }
2381
2383
2384 /* Now send signals */
2385 for (int i = 0; i < count; i++)
2386 {
2387 int32 pid = signalPids[i];
2388
2389 /*
2390 * If we are signaling our own process, no need to involve the kernel;
2391 * just set the flag directly.
2392 */
2393 if (pid == MyProcPid)
2394 {
2396 continue;
2397 }
2398
2399 /*
2400 * Note: assuming things aren't broken, a signal failure here could
2401 * only occur if the target backend exited since we released
2402 * NotifyQueueLock; which is unlikely but certainly possible. So we
2403 * just log a low-level debug message if it happens.
2404 */
2406 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2407 }
2408}
2409
2410/*
2411 * AtAbort_Notify
2412 *
2413 * This is called at transaction abort.
2414 *
2415 * Revert any staged listen/unlisten changes and clean up transaction state.
2416 * This only does anything if we abort after PreCommit_Notify has staged
2417 * some entries.
2418 */
2419void
2420AtAbort_Notify(void)
2421{
2422 /* Revert staged listen/unlisten changes */
2424
2425 /* If we're no longer listening on anything, unregister */
2428
2429 /* And clean up */
2431}
2432
2433/*
2434 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2435 *
2436 * Reassign all items in the pending lists to the parent transaction.
2437 */
2438void
2440{
2441 int my_level = GetCurrentTransactionNestLevel();
2442
2443 /* If there are actions at our nesting level, we must reparent them. */
2444 if (pendingActions != NULL &&
2445 pendingActions->nestingLevel >= my_level)
2446 {
2447 if (pendingActions->upper == NULL ||
2448 pendingActions->upper->nestingLevel < my_level - 1)
2449 {
2450 /* nothing to merge; give the whole thing to the parent */
2452 }
2453 else
2454 {
2456
2458
2459 /*
2460 * Mustn't try to eliminate duplicates here --- see queue_listen()
2461 */
2464 childPendingActions->actions);
2466 }
2467 }
2468
2469 /* If there are notifies at our nesting level, we must reparent them. */
2470 if (pendingNotifies != NULL &&
2471 pendingNotifies->nestingLevel >= my_level)
2472 {
2473 Assert(pendingNotifies->nestingLevel == my_level);
2474
2475 if (pendingNotifies->upper == NULL ||
2476 pendingNotifies->upper->nestingLevel < my_level - 1)
2477 {
2478 /* nothing to merge; give the whole thing to the parent */
2480 }
2481 else
2482 {
2483 /*
2484 * Formerly, we didn't bother to eliminate duplicates here, but
2485 * now we must, else we fall foul of "Assert(!found)", either here
2486 * or during a later attempt to build the parent-level hashtable.
2487 */
2489 ListCell *l;
2490
2492 /* Insert all the subxact's events into parent, except for dups */
2493 foreach(l, childPendingNotifies->events)
2494 {
2496
2499 }
2501 }
2502 }
2503}
2504
2505/*
2506 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2507 */
2508void
2510{
2511 int my_level = GetCurrentTransactionNestLevel();
2512
2513 /*
2514 * All we have to do is pop the stack --- the actions/notifies made in
2515 * this subxact are no longer interesting, and the space will be freed
2516 * when CurTransactionContext is recycled. We still have to free the
2517 * ActionList and NotificationList objects themselves, though, because
2518 * those are allocated in TopTransactionContext.
2519 *
2520 * Note that there might be no entries at all, or no entries for the
2521 * current subtransaction level, either because none were ever created, or
2522 * because we reentered this routine due to trouble during subxact abort.
2523 */
2524 while (pendingActions != NULL &&
2525 pendingActions->nestingLevel >= my_level)
2526 {
2528
2531 }
2532
2533 while (pendingNotifies != NULL &&
2534 pendingNotifies->nestingLevel >= my_level)
2535 {
2537
2540 }
2541}
2542
2543/*
2544 * HandleNotifyInterrupt
2545 *
2546 * Signal handler portion of interrupt handling. Let the backend know
2547 * that there's a pending notify interrupt. If we're currently reading
2548 * from the client, this will interrupt the read and
2549 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2550 */
2551void
2553{
2554 /*
2555 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2556 * you do here.
2557 */
2558
2559 /* signal that work needs to be done */
2561
2562 /* make sure the event is processed in due course */
2564}
2565
2566/*
2567 * ProcessNotifyInterrupt
2568 *
2569 * This is called if we see notifyInterruptPending set, just before
2570 * transmitting ReadyForQuery at the end of a frontend command, and
2571 * also if a notify signal occurs while reading from the frontend.
2572 * HandleNotifyInterrupt() will cause the read to be interrupted
2573 * via the process's latch, and this routine will get called.
2574 * If we are truly idle (ie, *not* inside a transaction block),
2575 * process the incoming notifies.
2576 *
2577 * If "flush" is true, force any frontend messages out immediately.
2578 * This can be false when being called at the end of a frontend command,
2579 * since we'll flush after sending ReadyForQuery.
2580 */
2581void
2582ProcessNotifyInterrupt(bool flush)
2583{
2585 return; /* not really idle */
2586
2587 /* Loop in case another signal arrives while sending messages */
2589 ProcessIncomingNotify(flush);
2590}
2591
2592
2593/*
2594 * Read all pending notifications from the queue, and deliver appropriate
2595 * ones to my frontend. Stop when we reach queue head or an uncommitted
2596 * notification.
2597 */
2598static void
2600{
2601 QueuePosition pos;
2602 QueuePosition head;
2603 Snapshot snapshot;
2604
2605 /*
2606 * Fetch current state, indicate to others that we have woken up, and that
2607 * we are in process of advancing our position.
2608 */
2610 /* Assert checks that we have a valid state entry */
2614 head = QUEUE_HEAD;
2615
2616 if (QUEUE_POS_EQUAL(pos, head))
2617 {
2618 /* Nothing to do, we have read all notifications already. */
2620 return;
2621 }
2622
2625
2626 /*----------
2627 * Get snapshot we'll use to decide which xacts are still in progress.
2628 * This is trickier than it might seem, because of race conditions.
2629 * Consider the following example:
2630 *
2631 * Backend 1: Backend 2:
2632 *
2633 * transaction starts
2634 * UPDATE foo SET ...;
2635 * NOTIFY foo;
2636 * commit starts
2637 * queue the notify message
2638 * transaction starts
2639 * LISTEN foo; -- first LISTEN in session
2640 * SELECT * FROM foo WHERE ...;
2641 * commit to clog
2642 * commit starts
2643 * add backend 2 to array of listeners
2644 * advance to queue head (this code)
2645 * commit to clog
2646 *
2647 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2648 * wasn't committed yet. Ideally we'd ensure that client 2 would
2649 * eventually get transaction 1's notify message, but there's no way
2650 * to do that; until we're in the listener array, there's no guarantee
2651 * that the notify message doesn't get removed from the queue.
2652 *
2653 * Therefore the coding technique transaction 2 is using is unsafe:
2654 * applications must commit a LISTEN before inspecting database state,
2655 * if they want to ensure they will see notifications about subsequent
2656 * changes to that state.
2657 *
2658 * What we do guarantee is that we'll see all notifications from
2659 * transactions committing after the snapshot we take here.
2660 * BecomeRegisteredListener has already added us to the listener array,
2661 * so no not-yet-committed messages can be removed from the queue
2662 * before we see them.
2663 *----------
2664 */
2665 snapshot = RegisterSnapshot(GetLatestSnapshot());
2666
2667 /*
2668 * It is possible that we fail while trying to send a message to our
2669 * frontend (for example, because of encoding conversion failure). If
2670 * that happens it is critical that we not try to send the same message
2671 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2672 * ERRORs to FATAL, causing the client connection to be closed on error.
2673 *
2674 * We used to only skip over the offending message and try to soldier on,
2675 * but it was somewhat questionable to lose a notification and give the
2676 * client an ERROR instead. A client application is not be prepared for
2677 * that and can't tell that a notification was missed. It was also not
2678 * very useful in practice because notifications are often processed while
2679 * a connection is idle and reading a message from the client, and in that
2680 * state, any error is upgraded to FATAL anyway. Closing the connection
2681 * is a clear signal to the application that it might have missed
2682 * notifications.
2683 */
2684 {
2686 bool reachedStop;
2687
2688 ExitOnAnyError = true;
2689
2690 do
2691 {
2692 /*
2693 * Process messages up to the stop position, end of page, or an
2694 * uncommitted message.
2695 *
2696 * Our stop position is what we found to be the head's position
2697 * when we entered this function. It might have changed already.
2698 * But if it has, we will receive (or have already received and
2699 * queued) another signal and come here again.
2700 *
2701 * We are not holding NotifyQueueLock here! The queue can only
2702 * extend beyond the head pointer (see above) and we leave our
2703 * backend's pointer where it is so nobody will truncate or
2704 * rewrite pages under us. Especially we don't want to hold a lock
2705 * while sending the notifications to the frontend.
2706 */
2707 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2708 } while (!reachedStop);
2709
2710 /* Update shared state */
2715
2717 }
2718
2719 /* Done with snapshot */
2720 UnregisterSnapshot(snapshot);
2721}
2722
2723/*
2724 * Fetch notifications from the shared queue, beginning at position current,
2725 * and deliver relevant ones to my frontend.
2726 *
2727 * The function returns true once we have reached the stop position or an
2728 * uncommitted notification, and false if we have finished with the page.
2729 * In other words: once it returns true there is no need to look further.
2730 * The QueuePosition *current is advanced past all processed messages.
2731 */
2732static bool
2734 QueuePosition stop,
2735 Snapshot snapshot)
2736{
2737 int64 curpage = QUEUE_POS_PAGE(*current);
2738 int slotno;
2739 char *page_buffer;
2740 bool reachedStop = false;
2741 bool reachedEndOfPage;
2742
2743 /*
2744 * We copy the entries into a local buffer to avoid holding the SLRU lock
2745 * while we transmit them to our frontend. The local buffer must be
2746 * adequately aligned.
2747 */
2749 char *local_buf_end = local_buf;
2750
2752 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2753
2754 do
2755 {
2756 QueuePosition thisentry = *current;
2758
2759 if (QUEUE_POS_EQUAL(thisentry, stop))
2760 break;
2761
2762 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2763
2764 /*
2765 * Advance *current over this message, possibly to the next page. As
2766 * noted in the comments for asyncQueueReadAllNotifications, we must
2767 * do this before possibly failing while processing the message.
2768 */
2769 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2770
2771 /* Ignore messages destined for other databases */
2772 if (qe->dboid == MyDatabaseId)
2773 {
2774 if (XidInMVCCSnapshot(qe->xid, snapshot))
2775 {
2776 /*
2777 * The source transaction is still in progress, so we can't
2778 * process this message yet. Break out of the loop, but first
2779 * back up *current so we will reprocess the message next
2780 * time. (Note: it is unlikely but not impossible for
2781 * TransactionIdDidCommit to fail, so we can't really avoid
2782 * this advance-then-back-up behavior when dealing with an
2783 * uncommitted message.)
2784 *
2785 * Note that we must test XidInMVCCSnapshot before we test
2786 * TransactionIdDidCommit, else we might return a message from
2787 * a transaction that is not yet visible to snapshots; compare
2788 * the comments at the head of heapam_visibility.c.
2789 *
2790 * Also, while our own xact won't be listed in the snapshot,
2791 * we need not check for TransactionIdIsCurrentTransactionId
2792 * because our transaction cannot (yet) have queued any
2793 * messages.
2794 */
2795 *current = thisentry;
2796 reachedStop = true;
2797 break;
2798 }
2799
2800 /*
2801 * Quick check for the case that we're not listening on any
2802 * channels, before calling TransactionIdDidCommit(). This makes
2803 * that case a little faster, but more importantly, it ensures
2804 * that if there's a bad entry in the queue for which
2805 * TransactionIdDidCommit() fails for some reason, we can skip
2806 * over it on the first LISTEN in a session, and not get stuck on
2807 * it indefinitely. (This is a little trickier than it looks: it
2808 * works because BecomeRegisteredListener runs this code before we
2809 * have made the first entry in localChannelTable.)
2810 */
2812 continue;
2813
2814 if (TransactionIdDidCommit(qe->xid))
2815 {
2816 memcpy(local_buf_end, qe, qe->length);
2817 local_buf_end += qe->length;
2818 }
2819 else
2820 {
2821 /*
2822 * The source transaction aborted or crashed, so we just
2823 * ignore its notifications.
2824 */
2825 }
2826 }
2827
2828 /* Loop back if we're not at end of page */
2829 } while (!reachedEndOfPage);
2830
2831 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2833
2834 /*
2835 * Now that we have let go of the SLRU bank lock, send the notifications
2836 * to our backend
2837 */
2839 for (char *p = local_buf; p < local_buf_end;)
2840 {
2842
2843 /* qe->data is the null-terminated channel name */
2844 char *channel = qe->data;
2845
2846 if (IsListeningOn(channel))
2847 {
2848 /* payload follows channel name */
2849 char *payload = qe->data + strlen(channel) + 1;
2850
2851 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2852 }
2853
2854 p += qe->length;
2855 }
2856
2857 if (QUEUE_POS_EQUAL(*current, stop))
2858 reachedStop = true;
2859
2860 return reachedStop;
2861}
2862
2863/*
2864 * Advance the shared queue tail variable to the minimum of all the
2865 * per-backend tail pointers. Truncate pg_notify space if possible.
2866 *
2867 * This is (usually) called during CommitTransaction(), so it's important for
2868 * it to have very low probability of failure.
2869 */
2870static void
2872{
2873 QueuePosition min;
2876 int64 boundary;
2877
2878 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2880
2881 /*
2882 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2883 * (ie, exactly match at least one backend's queue position), so it must
2884 * be updated atomically with the actual computation. Since v13, we could
2885 * get away with not doing it like that, but it seems prudent to keep it
2886 * so.
2887 *
2888 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2889 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2890 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2891 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2892 * there are pages we can truncate but haven't yet finished doing so.
2893 *
2894 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2895 * performing SimpleLruTruncate. This is OK because no backend will try
2896 * to access the pages we are in the midst of truncating.
2897 */
2899 min = QUEUE_HEAD;
2901 {
2903 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2904 }
2905 QUEUE_TAIL = min;
2908
2909 /*
2910 * We can truncate something if the global tail advanced across an SLRU
2911 * segment boundary.
2912 *
2913 * XXX it might be better to truncate only once every several segments, to
2914 * reduce the number of directory scans.
2915 */
2918 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2919 {
2920 /*
2921 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2922 * release the lock again.
2923 */
2925
2929 }
2930
2932}
2933
2934/*
2935 * AsyncNotifyFreezeXids
2936 *
2937 * Prepare the async notification queue for CLOG truncation by freezing
2938 * transaction IDs that are about to become inaccessible.
2939 *
2940 * This function is called by VACUUM before advancing datfrozenxid. It scans
2941 * the notification queue and replaces XIDs that would become inaccessible
2942 * after CLOG truncation with special markers:
2943 * - Committed transactions are set to FrozenTransactionId
2944 * - Aborted/crashed transactions are set to InvalidTransactionId
2945 *
2946 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2947 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2948 * (or it would have held back newFrozenXid through ProcArray).
2949 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2950 * either aborted explicitly or crashed, and we can safely mark it invalid.
2951 */
2952void
2954{
2955 QueuePosition pos;
2956 QueuePosition head;
2957 int64 curpage = -1;
2958 int slotno = -1;
2959 char *page_buffer = NULL;
2960 bool page_dirty = false;
2961
2962 /*
2963 * Acquire locks in the correct order to avoid deadlocks. As per the
2964 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2965 * bank locks.
2966 *
2967 * We only need SHARED mode since we're just reading the head/tail
2968 * positions, not modifying them.
2969 */
2972
2973 pos = QUEUE_TAIL;
2974 head = QUEUE_HEAD;
2975
2976 /* Release NotifyQueueLock early, we only needed to read the positions */
2978
2979 /*
2980 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2981 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2982 * we're working.
2983 */
2984 while (!QUEUE_POS_EQUAL(pos, head))
2985 {
2987 TransactionId xid;
2988 int64 pageno = QUEUE_POS_PAGE(pos);
2989 int offset = QUEUE_POS_OFFSET(pos);
2990
2991 /* If we need a different page, release old lock and get new one */
2992 if (pageno != curpage)
2993 {
2994 LWLock *lock;
2995
2996 /* Release previous page if any */
2997 if (slotno >= 0)
2998 {
2999 if (page_dirty)
3000 {
3001 NotifyCtl->shared->page_dirty[slotno] = true;
3002 page_dirty = false;
3003 }
3005 }
3006
3007 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3009 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3010 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3011 curpage = pageno;
3012 }
3013
3014 qe = (AsyncQueueEntry *) (page_buffer + offset);
3015 xid = qe->xid;
3016
3017 if (TransactionIdIsNormal(xid) &&
3019 {
3020 if (TransactionIdDidCommit(xid))
3021 {
3022 qe->xid = FrozenTransactionId;
3023 page_dirty = true;
3024 }
3025 else
3026 {
3027 qe->xid = InvalidTransactionId;
3028 page_dirty = true;
3029 }
3030 }
3031
3032 /* Advance to next entry */
3033 asyncQueueAdvance(&pos, qe->length);
3034 }
3035
3036 /* Release final page lock if we acquired one */
3037 if (slotno >= 0)
3038 {
3039 if (page_dirty)
3040 NotifyCtl->shared->page_dirty[slotno] = true;
3042 }
3043
3045}
3046
3047/*
3048 * ProcessIncomingNotify
3049 *
3050 * Scan the queue for arriving notifications and report them to the front
3051 * end. The notifications might be from other sessions, or our own;
3052 * there's no need to distinguish here.
3053 *
3054 * If "flush" is true, force any frontend messages out immediately.
3055 *
3056 * NOTE: since we are outside any transaction, we must create our own.
3057 */
3058static void
3059ProcessIncomingNotify(bool flush)
3060{
3061 /* We *must* reset the flag */
3062 notifyInterruptPending = false;
3063
3064 /* Do nothing else if we aren't actively listening */
3066 return;
3067
3068 if (Trace_notify)
3069 elog(DEBUG1, "ProcessIncomingNotify");
3070
3071 set_ps_display("notify interrupt");
3072
3073 /*
3074 * We must run asyncQueueReadAllNotifications inside a transaction, else
3075 * bad things happen if it gets an error.
3076 */
3078
3080
3082
3083 /*
3084 * If this isn't an end-of-command case, we must flush the notify messages
3085 * to ensure frontend gets them promptly.
3086 */
3087 if (flush)
3088 pq_flush();
3089
3090 set_ps_display("idle");
3091
3092 if (Trace_notify)
3093 elog(DEBUG1, "ProcessIncomingNotify: done");
3094}
3095
3096/*
3097 * Send NOTIFY message to my front end.
3098 */
3099void
3100NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3101{
3103 {
3105
3107 pq_sendint32(&buf, srcPid);
3108 pq_sendstring(&buf, channel);
3109 pq_sendstring(&buf, payload);
3111
3112 /*
3113 * NOTE: we do not do pq_flush() here. Some level of caller will
3114 * handle it later, allowing this message to be combined into a packet
3115 * with other ones.
3116 */
3117 }
3118 else
3119 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3120}
3121
3122/* Does pendingNotifies include a match for the given event? */
3123static bool
3125{
3126 if (pendingNotifies == NULL)
3127 return false;
3128
3130 {
3131 /* Use the hash table to probe for a match */
3133 &n,
3134 HASH_FIND,
3135 NULL))
3136 return true;
3137 }
3138 else
3139 {
3140 /* Must scan the event list */
3141 ListCell *l;
3142
3143 foreach(l, pendingNotifies->events)
3144 {
3146
3147 if (n->channel_len == oldn->channel_len &&
3148 n->payload_len == oldn->payload_len &&
3149 memcmp(n->data, oldn->data,
3150 n->channel_len + n->payload_len + 2) == 0)
3151 return true;
3152 }
3153 }
3154
3155 return false;
3156}
3157
3158/*
3159 * Add a notification event to a pre-existing pendingNotifies list.
3160 *
3161 * Because pendingNotifies->events is already nonempty, this works
3162 * correctly no matter what CurrentMemoryContext is.
3163 */
3164static void
3166{
3168
3169 /* Create the hash tables if it's time to */
3172 {
3174 ListCell *l;
3175
3176 /* Create the hash table */
3177 hash_ctl.keysize = sizeof(Notification *);
3178 hash_ctl.entrysize = sizeof(struct NotificationHash);
3183 hash_create("Pending Notifies",
3184 256L,
3185 &hash_ctl,
3187
3188 /* Create the unique channel name table */
3190 hash_ctl.keysize = NAMEDATALEN;
3191 hash_ctl.entrysize = sizeof(ChannelName);
3194 hash_create("Pending Notify Channel Names",
3195 64L,
3196 &hash_ctl,
3198
3199 /* Insert all the already-existing events */
3200 foreach(l, pendingNotifies->events)
3201 {
3203 char *channel = oldn->data;
3204 bool found;
3205
3207 &oldn,
3208 HASH_ENTER,
3209 &found);
3210 Assert(!found);
3211
3212 /* Add channel name to uniqueChannelHash; might be there already */
3214 channel,
3215 HASH_ENTER,
3216 NULL);
3217 }
3218 }
3219
3220 /* Add new event to the list, in order */
3222
3223 /* Add event to the hash tables if needed */
3225 {
3226 char *channel = n->data;
3227 bool found;
3228
3230 &n,
3231 HASH_ENTER,
3232 &found);
3233 Assert(!found);
3234
3235 /* Add channel name to uniqueChannelHash; might be there already */
3237 channel,
3238 HASH_ENTER,
3239 NULL);
3240 }
3241}
3242
3243/*
3244 * notification_hash: hash function for notification hash table
3245 *
3246 * The hash "keys" are pointers to Notification structs.
3247 */
3248static uint32
3249notification_hash(const void *key, Size keysize)
3250{
3251 const Notification *k = *(const Notification *const *) key;
3252
3253 Assert(keysize == sizeof(Notification *));
3254 /* We don't bother to include the payload's trailing null in the hash */
3255 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3256 k->channel_len + k->payload_len + 1));
3257}
3258
3259/*
3260 * notification_match: match function to use with notification_hash
3261 */
3262static int
3263notification_match(const void *key1, const void *key2, Size keysize)
3264{
3265 const Notification *k1 = *(const Notification *const *) key1;
3266 const Notification *k2 = *(const Notification *const *) key2;
3267
3268 Assert(keysize == sizeof(Notification *));
3269 if (k1->channel_len == k2->channel_len &&
3270 k1->payload_len == k2->payload_len &&
3271 memcmp(k1->data, k2->data,
3272 k1->channel_len + k1->payload_len + 2) == 0)
3273 return 0; /* equal */
3274 return 1; /* not equal */
3275}
3276
3277/* Clear the pendingActions and pendingNotifies lists. */
3278static void
3280{
3281 /*
3282 * Everything's allocated in either TopTransactionContext or the context
3283 * for the subtransaction to which it corresponds. So, there's nothing to
3284 * do here except reset the pointers; the space will be reclaimed when the
3285 * contexts are deleted.
3286 */
3289 /* Also clear pendingListenActions, which is derived from pendingActions */
3291}
3292
3293/*
3294 * GUC check_hook for notify_buffers
3295 */
3296bool
3297check_notify_buffers(int *newval, void **extra, GucSource source)
3298{
3299 return check_slru_buffers("notify_buffers", newval);
3300}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
static void SignalBackends(void)
Definition async.c:2269
static double asyncQueueUsage(void)
Definition async.c:2196
#define MIN_HASHABLE_NOTIFIES
Definition async.c:513
static void PrepareTableEntriesForListen(const char *channel)
Definition async.c:1534
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition async.c:2010
#define QUEUE_FIRST_LISTENER
Definition async.c:351
#define QUEUE_POS_MAX(x, y)
Definition async.c:259
static bool tryAdvanceTail
Definition async.c:564
void HandleNotifyInterrupt(void)
Definition async.c:2553
static void BecomeRegisteredListener(void)
Definition async.c:1433
static void asyncQueueAdvanceTail(void)
Definition async.c:2872
int max_notify_queue_pages
Definition async.c:570
static ActionList * pendingActions
Definition async.c:444
static void ApplyPendingListenActions(bool isCommit)
Definition async.c:1724
#define QUEUE_BACKEND_IS_ADVANCING(i)
Definition async.c:357
static uint32 notification_hash(const void *key, Size keysize)
Definition async.c:3250
void Async_UnlistenAll(void)
Definition async.c:1078
static int32 * signalPids
Definition async.c:560
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition async.c:3101
void AtCommit_Notify(void)
Definition async.c:1381
#define QUEUE_POS_MIN(x, y)
Definition async.c:253
static void PrepareTableEntriesForUnlisten(const char *channel)
Definition async.c:1637
void ProcessNotifyInterrupt(bool flush)
Definition async.c:2583
ListenActionKind
Definition async.c:425
@ LISTEN_LISTEN
Definition async.c:426
@ LISTEN_UNLISTEN_ALL
Definition async.c:428
@ LISTEN_UNLISTEN
Definition async.c:427
static bool AsyncExistsPendingNotify(Notification *n)
Definition async.c:3125
#define QUEUE_BACKEND_POS(i)
Definition async.c:355
static const dshash_parameters globalChannelTableDSHParams
Definition async.c:673
#define INITIAL_LISTENERS_ARRAY_SIZE
Definition async.c:378
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition async.c:3264
static dshash_hash globalChannelTableHash(const void *key, size_t size, void *arg)
Definition async.c:660
#define SET_QUEUE_POS(x, y, z)
Definition async.c:240
static ProcNumber * signalProcnos
Definition async.c:561
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
Definition async.c:2734
static void ProcessIncomingNotify(bool flush)
Definition async.c:3060
static void asyncQueueReadAllNotifications(void)
Definition async.c:2600
static void Async_UnlistenOnExit(int code, Datum arg)
Definition async.c:1150
#define QUEUE_POS_OFFSET(x)
Definition async.c:238
static QueuePosition queueHeadAfterWrite
Definition async.c:553
static int asyncQueueErrdetailForIoError(const void *opaque_data)
Definition async.c:615
bool Trace_notify
Definition async.c:567
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition async.c:2046
static void ClearPendingActionsAndNotifies(void)
Definition async.c:3280
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Definition async.c:1098
Datum pg_notify(PG_FUNCTION_ARGS)
Definition async.c:863
static NotificationList * pendingNotifies
Definition async.c:520
#define AsyncQueueEntryEmptySize
Definition async.c:226
static void AddEventToPendingNotifies(Notification *n)
Definition async.c:3166
static AsyncQueueControl * asyncQueueControl
Definition async.c:346
static bool unlistenExitRegistered
Definition async.c:541
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition async.c:638
static dsa_area * globalChannelDSA
Definition async.c:401
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition async.c:1977
#define QUEUE_TAIL
Definition async.c:349
void AtAbort_Notify(void)
Definition async.c:2421
#define QUEUE_POS_PAGE(x)
Definition async.c:237
static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
Definition async.c:1697
void PreCommit_Notify(void)
Definition async.c:1188
#define QUEUE_CLEANUP_DELAY
Definition async.c:281
static void PrepareTableEntriesForUnlistenAll(void)
Definition async.c:1667
static void asyncQueueFillWarning(void)
Definition async.c:2217
#define QUEUE_BACKEND_PID(i)
Definition async.c:352
static void CleanupListenersOnExit(void)
Definition async.c:1849
static void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
Definition async.c:648
Size AsyncShmemSize(void)
Definition async.c:786
#define QUEUE_FULL_WARN_INTERVAL
Definition async.c:367
void Async_Unlisten(const char *channel)
Definition async.c:1060
static HTAB * pendingListenActions
Definition async.c:466
void Async_Listen(const char *channel)
Definition async.c:1046
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:200
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:249
static void initGlobalChannelTable(void)
Definition async.c:687
#define NotifyCtl
Definition async.c:364
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:356
static HTAB * localChannelTable
Definition async.c:408
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:628
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:999
#define QUEUEALIGN(len)
Definition async.c:224
static bool amRegisteredListener
Definition async.c:544
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:265
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:354
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:353
void AtSubAbort_Notify(void)
Definition async.c:2510
void AtPrepare_Notify(void)
Definition async.c:1163
#define QUEUE_PAGESIZE
Definition async.c:365
void AtSubCommit_Notify(void)
Definition async.c:2440
static bool asyncQueueIsFull(void)
Definition async.c:1962
#define QUEUE_HEAD
Definition async.c:348
void AsyncShmemInit(void)
Definition async.c:803
static void initLocalChannelTable(void)
Definition async.c:738
PendingListenAction
Definition async.c:455
@ PENDING_UNLISTEN
Definition async.c:457
@ PENDING_LISTEN
Definition async.c:456
static dshash_table * globalChannelTable
Definition async.c:400
static void asyncQueueUnregister(void)
Definition async.c:1919
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2171
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:246
#define LocalChannelTableIsEmpty()
Definition async.c:411
static void initPendingListenActions(void)
Definition async.c:764
static QueuePosition queueHeadBeforeWrite
Definition async.c:552
static bool IsListeningOn(const char *channel)
Definition async.c:1906
void Async_Notify(const char *channel, const char *payload)
Definition async.c:897
volatile sig_atomic_t notifyInterruptPending
Definition async.c:538
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2954
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3298
#define QUEUE_STOP_PAGE
Definition async.c:350
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1772
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:552
int32_t int32
Definition c.h:614
uint16_t uint16
Definition c.h:617
uint32_t uint32
Definition c.h:618
uint32 TransactionId
Definition c.h:738
size_t Size
Definition c.h:691
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:611
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:562
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:579
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:659
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:394
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:371
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:274
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:768
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:678
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:210
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:593
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:778
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
uint32 dshash_hash
Definition dshash.h:30
#define dshash_find_or_insert(hash_table, key, found)
Definition dshash.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
int errhint(const char *fmt,...) pg_attribute_printf(1
#define DEBUG3
Definition elog.h:28
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define INFO
Definition elog.h:34
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:47
ProcNumber MyProcNumber
Definition globals.c:90
int MaxBackends
Definition globals.c:146
bool ExitOnAnyError
Definition globals.c:123
int notify_buffers
Definition globals.c:164
struct Latch * MyLatch
Definition globals.c:63
Oid MyDatabaseId
Definition globals.c:94
#define newval
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:96
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_COMPARE
Definition hsearch.h:99
#define HASH_FUNCTION
Definition hsearch.h:98
#define IsParallelWorker()
Definition parallel.h:62
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
#define pq_flush()
Definition libpq.h:49
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext TopTransactionContext
Definition mcxt.c:171
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurTransactionContext
Definition mcxt.c:172
#define InvalidPid
Definition miscadmin.h:32
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static void usage(void)
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:343
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:94
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static int fb(int x)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:287
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:254
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, const void *opaque_data)
Definition slru.c:533
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1824
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1777
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition slru.c:380
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition slru.c:1441
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, const void *opaque_data)
Definition slru.c:637
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:200
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:360
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:171
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
List * actions
Definition async.c:440
int nestingLevel
Definition async.c:439
struct ActionList * upper
Definition async.c:441
dshash_table_handle globalChannelTableDSH
Definition async.c:341
TimestampTz lastQueueFillWarn
Definition async.c:339
dsa_handle globalChannelTableDSA
Definition async.c:340
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:220
char channel[NAMEDATALEN]
Definition async.c:528
dsa_pointer listenersArray
Definition async.c:395
int allocatedListeners
Definition async.c:397
GlobalChannelKey key
Definition async.c:394
char channel[NAMEDATALEN]
Definition async.c:383
Size keysize
Definition hsearch.h:75
Definition pg_list.h:54
bool listening
Definition async.c:389
Notification * event
Definition async.c:517
List * uniqueChannelNames
Definition async.c:508
HTAB * uniqueChannelHash
Definition async.c:509
HTAB * hashtab
Definition async.c:507
List * events
Definition async.c:506
struct NotificationList * upper
Definition async.c:510
uint16 payload_len
Definition async.c:498
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:500
uint16 channel_len
Definition async.c:497
PendingListenAction action
Definition async.c:463
char channel[NAMEDATALEN]
Definition async.c:462
int64 page
Definition async.c:233
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:446
char * text_to_cstring(const text *t)
Definition varlena.c:217
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5012
int GetCurrentTransactionNestLevel(void)
Definition xact.c:931
void StartTransactionCommand(void)
Definition xact.c:3081
void CommitTransactionCommand(void)
Definition xact.c:3179
TransactionId GetCurrentTransactionId(void)
Definition xact.c:456

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 513 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 200 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 364 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 353 of file async.c.

◆ QUEUE_BACKEND_IS_ADVANCING

#define QUEUE_BACKEND_IS_ADVANCING (   i)    (asyncQueueControl->backend[i].isAdvancing)

Definition at line 357 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 352 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 355 of file async.c.

◆ QUEUE_BACKEND_WAKEUP_PENDING

#define QUEUE_BACKEND_WAKEUP_PENDING (   i)    (asyncQueueControl->backend[i].wakeupPending)

Definition at line 356 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 281 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 351 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 367 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 348 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 354 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 365 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 246 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 249 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
int y
Definition isn.c:76
int x
Definition isn.c:75

Definition at line 259 of file async.c.

260 : \
261 (x).page != (y).page ? (x) : \
262 (x).offset > (y).offset ? (x) : (y))

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 253 of file async.c.

254 : \
255 (x).page != (y).page ? (y) : \
256 (x).offset < (y).offset ? (x) : (y))

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 238 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 237 of file async.c.

◆ QUEUE_POS_PRECEDES

#define QUEUE_POS_PRECEDES (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) || \
((x).page == (y).page && (x).offset < (y).offset))

Definition at line 265 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 350 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 349 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 224 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 240 of file async.c.

241 { \
242 (x).page = (y); \
243 (x).offset = (z); \
244 } while (0)

Typedef Documentation

◆ ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ ChannelName

◆ GlobalChannelEntry

◆ GlobalChannelKey

◆ ListenerEntry

◆ Notification

◆ NotificationList

◆ PendingListenEntry

◆ QueueBackendStatus

◆ QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 424 of file async.c.

◆ PendingListenAction

Enumerator
PENDING_LISTEN 
PENDING_UNLISTEN 

Definition at line 454 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 3166 of file async.c.

3167{
3169
3170 /* Create the hash tables if it's time to */
3173 {
3175 ListCell *l;
3176
3177 /* Create the hash table */
3178 hash_ctl.keysize = sizeof(Notification *);
3179 hash_ctl.entrysize = sizeof(struct NotificationHash);
3184 hash_create("Pending Notifies",
3185 256L,
3186 &hash_ctl,
3188
3189 /* Create the unique channel name table */
3191 hash_ctl.keysize = NAMEDATALEN;
3192 hash_ctl.entrysize = sizeof(ChannelName);
3195 hash_create("Pending Notify Channel Names",
3196 64L,
3197 &hash_ctl,
3199
3200 /* Insert all the already-existing events */
3201 foreach(l, pendingNotifies->events)
3202 {
3204 char *channel = oldn->data;
3205 bool found;
3206
3208 &oldn,
3209 HASH_ENTER,
3210 &found);
3211 Assert(!found);
3212
3213 /* Add channel name to uniqueChannelHash; might be there already */
3215 channel,
3216 HASH_ENTER,
3217 NULL);
3218 }
3219 }
3220
3221 /* Add new event to the list, in order */
3223
3224 /* Add event to the hash tables if needed */
3226 {
3227 char *channel = n->data;
3228 bool found;
3229
3231 &n,
3232 HASH_ENTER,
3233 &found);
3234 Assert(!found);
3235
3236 /* Add channel name to uniqueChannelHash; might be there already */
3238 channel,
3239 HASH_ENTER,
3240 NULL);
3241 }
3242}

References Assert, CurTransactionContext, Notification::data, NotificationList::events, fb(), HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), HASH_STRINGS, NotificationList::hashtab, lappend(), lfirst, list_length(), MIN_HASHABLE_NOTIFIES, NAMEDATALEN, NIL, notification_hash(), notification_match(), pendingNotifies, and NotificationList::uniqueChannelHash.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ ApplyPendingListenActions()

static void ApplyPendingListenActions ( bool  isCommit)
static

Definition at line 1724 of file async.c.

1725{
1727 PendingListenEntry *pending;
1728
1729 /* Quick exit if nothing to do */
1731 return;
1732
1733 /* We made a globalChannelTable before building pendingListenActions */
1734 if (globalChannelTable == NULL)
1735 elog(PANIC, "global channel table missing post-commit/abort");
1736
1737 /* For each staged action ... */
1739 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1740 {
1742 GlobalChannelEntry *entry;
1743 bool removeLocal = true;
1744 bool foundListener = false;
1745
1746 /*
1747 * Find the global entry for this channel. If isCommit, it had better
1748 * exist (it was created in PreCommit). In an abort, it might not
1749 * exist, in which case we are not listening and should discard any
1750 * local entry that PreCommit may have managed to create.
1751 */
1752 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1753 entry = dshash_find(globalChannelTable, &key, true);
1754 if (entry != NULL)
1755 {
1756 /* Scan entry to find the ListenerEntry for this backend */
1758
1761
1762 for (int i = 0; i < entry->numListeners; i++)
1763 {
1764 if (listeners[i].procNo != MyProcNumber)
1765 continue;
1766 foundListener = true;
1767 if (isCommit)
1768 {
1769 if (pending->action == PENDING_LISTEN)
1770 {
1771 /*
1772 * LISTEN being committed: set listening=true.
1773 * localChannelTable entry was created during
1774 * PreCommit and should be kept.
1775 */
1776 listeners[i].listening = true;
1777 removeLocal = false;
1778 }
1779 else
1780 {
1781 /*
1782 * UNLISTEN being committed: remove pre-allocated
1783 * entries from both tables.
1784 */
1786 }
1787 }
1788 else
1789 {
1790 /*
1791 * Note: this part is reachable only if the transaction
1792 * aborts after PreCommit_Notify() has made some
1793 * pendingListenActions entries, so it's pretty hard to
1794 * test.
1795 */
1796 if (!listeners[i].listening)
1797 {
1798 /*
1799 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1800 * and we weren't listening before, so remove
1801 * pre-allocated entries from both tables.
1802 */
1804 }
1805 else
1806 {
1807 /*
1808 * We're aborting, but the previous state was that
1809 * we're listening, so keep localChannelTable entry.
1810 */
1811 removeLocal = false;
1812 }
1813 }
1814 break; /* there shouldn't be another match */
1815 }
1816
1817 /* We might have already released the entry by removing it */
1818 if (entry != NULL)
1820 }
1821
1822 /*
1823 * If we're committing a LISTEN action, we should have found a
1824 * matching ListenerEntry, but otherwise it's okay if we didn't.
1825 */
1826 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1827 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1828 pending->channel, MyProcNumber);
1829
1830 /*
1831 * If we did not find a globalChannelTable entry for our backend, or
1832 * if we are unlistening, remove any localChannelTable entry that may
1833 * exist. (Note in particular that this cleans up if we created a
1834 * localChannelTable entry and then failed while trying to create a
1835 * globalChannelTable entry.)
1836 */
1839 HASH_REMOVE, NULL);
1840 }
1841}

References PendingListenEntry::action, PendingListenEntry::channel, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_REMOVE, hash_search(), hash_seq_init(), hash_seq_search(), i, GlobalChannelEntry::listenersArray, ListenerEntry::listening, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PANIC, PENDING_LISTEN, pendingListenActions, and RemoveListenerFromChannel().

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char channel)

Definition at line 1046 of file async.c.

1047{
1048 if (Trace_notify)
1049 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1050
1051 queue_listen(LISTEN_LISTEN, channel);
1052}

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char channel,
const char payload 
)

Definition at line 897 of file async.c.

898{
899 int my_level = GetCurrentTransactionNestLevel();
900 size_t channel_len;
901 size_t payload_len;
902 Notification *n;
903 MemoryContext oldcontext;
904
905 if (IsParallelWorker())
906 elog(ERROR, "cannot send notifications from a parallel worker");
907
908 if (Trace_notify)
909 elog(DEBUG1, "Async_Notify(%s)", channel);
910
911 channel_len = channel ? strlen(channel) : 0;
912 payload_len = payload ? strlen(payload) : 0;
913
914 /* a channel name must be specified */
915 if (channel_len == 0)
918 errmsg("channel name cannot be empty")));
919
920 /* enforce length limits */
921 if (channel_len >= NAMEDATALEN)
924 errmsg("channel name too long")));
925
926 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
929 errmsg("payload string too long")));
930
931 /*
932 * We must construct the Notification entry, even if we end up not using
933 * it, in order to compare it cheaply to existing list entries.
934 *
935 * The notification list needs to live until end of transaction, so store
936 * it in the transaction context.
937 */
939
941 channel_len + payload_len + 2);
942 n->channel_len = channel_len;
943 n->payload_len = payload_len;
944 strcpy(n->data, channel);
945 if (payload)
946 strcpy(n->data + channel_len + 1, payload);
947 else
948 n->data[channel_len + 1] = '\0';
949
950 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
951 {
953
954 /*
955 * First notify event in current (sub)xact. Note that we allocate the
956 * NotificationList in TopTransactionContext; the nestingLevel might
957 * get changed later by AtSubCommit_Notify.
958 */
961 sizeof(NotificationList));
962 notifies->nestingLevel = my_level;
963 notifies->events = list_make1(n);
964 /* We certainly don't need a hashtable yet */
965 notifies->hashtab = NULL;
966 /* We won't build uniqueChannelNames/Hash till later, either */
967 notifies->uniqueChannelNames = NIL;
968 notifies->uniqueChannelHash = NULL;
969 notifies->upper = pendingNotifies;
971 }
972 else
973 {
974 /* Now check for duplicates */
976 {
977 /* It's a dup, so forget it */
978 pfree(n);
979 MemoryContextSwitchTo(oldcontext);
980 return;
981 }
982
983 /* Append more events to existing list */
985 }
986
987 MemoryContextSwitchTo(oldcontext);
988}

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg, ERROR, fb(), GetCurrentTransactionNestLevel(), IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NIL, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char channel)

Definition at line 1060 of file async.c.

1061{
1062 if (Trace_notify)
1063 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1064
1065 /* If we couldn't possibly be listening, no need to queue anything */
1067 return;
1068
1069 queue_listen(LISTEN_UNLISTEN, channel);
1070}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 1078 of file async.c.

1079{
1080 if (Trace_notify)
1081 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1082
1083 /* If we couldn't possibly be listening, no need to queue anything */
1085 return;
1086
1088}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 1150 of file async.c.

1151{
1154}

References asyncQueueUnregister(), and CleanupListenersOnExit().

Referenced by BecomeRegisteredListener().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 3125 of file async.c.

3126{
3127 if (pendingNotifies == NULL)
3128 return false;
3129
3131 {
3132 /* Use the hash table to probe for a match */
3134 &n,
3135 HASH_FIND,
3136 NULL))
3137 return true;
3138 }
3139 else
3140 {
3141 /* Must scan the event list */
3142 ListCell *l;
3143
3144 foreach(l, pendingNotifies->events)
3145 {
3147
3148 if (n->channel_len == oldn->channel_len &&
3149 n->payload_len == oldn->payload_len &&
3150 memcmp(n->data, oldn->data,
3151 n->channel_len + n->payload_len + 2) == 0)
3152 return true;
3153 }
3154 }
3155
3156 return false;
3157}

References Notification::channel_len, Notification::data, NotificationList::events, fb(), HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)

Definition at line 2954 of file async.c.

2955{
2956 QueuePosition pos;
2957 QueuePosition head;
2958 int64 curpage = -1;
2959 int slotno = -1;
2960 char *page_buffer = NULL;
2961 bool page_dirty = false;
2962
2963 /*
2964 * Acquire locks in the correct order to avoid deadlocks. As per the
2965 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2966 * bank locks.
2967 *
2968 * We only need SHARED mode since we're just reading the head/tail
2969 * positions, not modifying them.
2970 */
2973
2974 pos = QUEUE_TAIL;
2975 head = QUEUE_HEAD;
2976
2977 /* Release NotifyQueueLock early, we only needed to read the positions */
2979
2980 /*
2981 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2982 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2983 * we're working.
2984 */
2985 while (!QUEUE_POS_EQUAL(pos, head))
2986 {
2988 TransactionId xid;
2989 int64 pageno = QUEUE_POS_PAGE(pos);
2990 int offset = QUEUE_POS_OFFSET(pos);
2991
2992 /* If we need a different page, release old lock and get new one */
2993 if (pageno != curpage)
2994 {
2995 LWLock *lock;
2996
2997 /* Release previous page if any */
2998 if (slotno >= 0)
2999 {
3000 if (page_dirty)
3001 {
3002 NotifyCtl->shared->page_dirty[slotno] = true;
3003 page_dirty = false;
3004 }
3006 }
3007
3008 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3010 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3011 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3012 curpage = pageno;
3013 }
3014
3015 qe = (AsyncQueueEntry *) (page_buffer + offset);
3016 xid = qe->xid;
3017
3018 if (TransactionIdIsNormal(xid) &&
3020 {
3021 if (TransactionIdDidCommit(xid))
3022 {
3023 qe->xid = FrozenTransactionId;
3024 page_dirty = true;
3025 }
3026 else
3027 {
3028 qe->xid = InvalidTransactionId;
3029 page_dirty = true;
3030 }
3031 }
3032
3033 /* Advance to next entry */
3034 asyncQueueAdvance(&pos, qe->length);
3035 }
3036
3037 /* Release final page lock if we acquired one */
3038 if (slotno >= 0)
3039 {
3040 if (page_dirty)
3041 NotifyCtl->shared->page_dirty[slotno] = true;
3043 }
3044
3046}

References asyncQueueAdvance(), fb(), FrozenTransactionId, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by vac_truncate_clog().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 2046 of file async.c.

2047{
2050 int64 pageno;
2051 int offset;
2052 int slotno;
2054
2055 /*
2056 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2057 * memory upon exiting. The reason for this is that if we have to advance
2058 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2059 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2060 * subsequent insertions would try to put entries into a page that slru.c
2061 * thinks doesn't exist yet.) So, use a local position variable. Note
2062 * that if we do fail, any already-inserted queue entries are forgotten;
2063 * this is okay, since they'd be useless anyway after our transaction
2064 * rolls back.
2065 */
2067
2068 /*
2069 * If this is the first write since the postmaster started, we need to
2070 * initialize the first page of the async SLRU. Otherwise, the current
2071 * page should be initialized already, so just fetch it.
2072 */
2073 pageno = QUEUE_POS_PAGE(queue_head);
2075
2076 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2078
2081 else
2082 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2083
2084 /* Note we mark the page dirty before writing in it */
2085 NotifyCtl->shared->page_dirty[slotno] = true;
2086
2087 while (nextNotify != NULL)
2088 {
2090
2091 /* Construct a valid queue entry in local variable qe */
2093
2094 offset = QUEUE_POS_OFFSET(queue_head);
2095
2096 /* Check whether the entry really fits on the current page */
2097 if (offset + qe.length <= QUEUE_PAGESIZE)
2098 {
2099 /* OK, so advance nextNotify past this item */
2101 }
2102 else
2103 {
2104 /*
2105 * Write a dummy entry to fill up the page. Actually readers will
2106 * only check dboid and since it won't match any reader's database
2107 * OID, they will ignore this entry and move on.
2108 */
2109 qe.length = QUEUE_PAGESIZE - offset;
2110 qe.dboid = InvalidOid;
2112 qe.data[0] = '\0'; /* empty channel */
2113 qe.data[1] = '\0'; /* empty payload */
2114 }
2115
2116 /* Now copy qe into the shared buffer page */
2117 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2118 &qe,
2119 qe.length);
2120
2121 /* Advance queue_head appropriately, and detect if page is full */
2122 if (asyncQueueAdvance(&(queue_head), qe.length))
2123 {
2124 LWLock *lock;
2125
2126 pageno = QUEUE_POS_PAGE(queue_head);
2127 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2128 if (lock != prevlock)
2129 {
2132 prevlock = lock;
2133 }
2134
2135 /*
2136 * Page is full, so we're done here, but first fill the next page
2137 * with zeroes. The reason to do this is to ensure that slru.c's
2138 * idea of the head page is always the same as ours, which avoids
2139 * boundary problems in SimpleLruTruncate. The test in
2140 * asyncQueueIsFull() ensured that there is room to create this
2141 * page without overrunning the queue.
2142 */
2144
2145 /*
2146 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2147 * set flag to remember that we should try to advance the tail
2148 * pointer (we don't want to actually do that right here).
2149 */
2151 tryAdvanceTail = true;
2152
2153 /* And exit the loop */
2154 break;
2155 }
2156 }
2157
2158 /* Success, so update the global QUEUE_HEAD */
2160
2162
2163 return nextNotify;
2164}

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), NotificationList::events, fb(), InvalidOid, InvalidTransactionId, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1977 of file async.c.

1978{
1979 int64 pageno = QUEUE_POS_PAGE(*position);
1980 int offset = QUEUE_POS_OFFSET(*position);
1981 bool pageJump = false;
1982
1983 /*
1984 * Move to the next writing position: First jump over what we have just
1985 * written or read.
1986 */
1987 offset += entryLength;
1988 Assert(offset <= QUEUE_PAGESIZE);
1989
1990 /*
1991 * In a second step check if another entry can possibly be written to the
1992 * page. If so, stay here, we have reached the next position. If not, then
1993 * we need to move on to the next page.
1994 */
1996 {
1997 pageno++;
1998 offset = 0;
1999 pageJump = true;
2000 }
2001
2002 SET_QUEUE_POS(*position, pageno, offset);
2003 return pageJump;
2004}

References Assert, AsyncQueueEntryEmptySize, fb(), QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by AsyncNotifyFreezeXids(), asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2872 of file async.c.

2873{
2874 QueuePosition min;
2877 int64 boundary;
2878
2879 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2881
2882 /*
2883 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2884 * (ie, exactly match at least one backend's queue position), so it must
2885 * be updated atomically with the actual computation. Since v13, we could
2886 * get away with not doing it like that, but it seems prudent to keep it
2887 * so.
2888 *
2889 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2890 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2891 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2892 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2893 * there are pages we can truncate but haven't yet finished doing so.
2894 *
2895 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2896 * performing SimpleLruTruncate. This is OK because no backend will try
2897 * to access the pages we are in the midst of truncating.
2898 */
2900 min = QUEUE_HEAD;
2902 {
2904 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2905 }
2906 QUEUE_TAIL = min;
2909
2910 /*
2911 * We can truncate something if the global tail advanced across an SLRU
2912 * segment boundary.
2913 *
2914 * XXX it might be better to truncate only once every several segments, to
2915 * reduce the number of directory scans.
2916 */
2919 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2920 {
2921 /*
2922 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2923 * release the lock again.
2924 */
2926
2930 }
2931
2933}

References Assert, asyncQueuePagePrecedes(), fb(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueErrdetailForIoError()

static int asyncQueueErrdetailForIoError ( const void opaque_data)
static

Definition at line 615 of file async.c.

616{
617 const QueuePosition *position = opaque_data;
618
619 return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
620 position->page, position->offset);
621}

References errdetail(), fb(), QueuePosition::offset, and QueuePosition::page.

Referenced by AsyncShmemInit().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 2217 of file async.c.

2218{
2219 double fillDegree;
2220 TimestampTz t;
2221
2223 if (fillDegree < 0.5)
2224 return;
2225
2226 t = GetCurrentTimestamp();
2227
2230 {
2233
2235 {
2237 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2240 }
2241
2243 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2244 (minPid != InvalidPid ?
2245 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2246 : 0),
2247 (minPid != InvalidPid ?
2248 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2249 : 0)));
2250
2252 }
2253}

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg, fb(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1962 of file async.c.

1963{
1964 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1966 int64 occupied = headPage - tailPage;
1967
1969}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 2010 of file async.c.

2011{
2012 size_t channellen = n->channel_len;
2013 size_t payloadlen = n->payload_len;
2014 int entryLength;
2015
2018
2019 /* The terminators are already included in AsyncQueueEntryEmptySize */
2022 qe->length = entryLength;
2023 qe->dboid = MyDatabaseId;
2024 qe->xid = GetCurrentTransactionId();
2025 qe->srcPid = MyProcPid;
2026 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2027}

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, fb(), GetCurrentTransactionId(), MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, and QUEUEALIGN.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 628 of file async.c.

629{
630 return p - q;
631}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 638 of file async.c.

639{
640 return p < q;
641}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( QueuePosition current,
QueuePosition  stop,
Snapshot  snapshot 
)
static

Definition at line 2734 of file async.c.

2737{
2738 int64 curpage = QUEUE_POS_PAGE(*current);
2739 int slotno;
2740 char *page_buffer;
2741 bool reachedStop = false;
2742 bool reachedEndOfPage;
2743
2744 /*
2745 * We copy the entries into a local buffer to avoid holding the SLRU lock
2746 * while we transmit them to our frontend. The local buffer must be
2747 * adequately aligned.
2748 */
2750 char *local_buf_end = local_buf;
2751
2753 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2754
2755 do
2756 {
2757 QueuePosition thisentry = *current;
2759
2760 if (QUEUE_POS_EQUAL(thisentry, stop))
2761 break;
2762
2763 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2764
2765 /*
2766 * Advance *current over this message, possibly to the next page. As
2767 * noted in the comments for asyncQueueReadAllNotifications, we must
2768 * do this before possibly failing while processing the message.
2769 */
2770 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2771
2772 /* Ignore messages destined for other databases */
2773 if (qe->dboid == MyDatabaseId)
2774 {
2775 if (XidInMVCCSnapshot(qe->xid, snapshot))
2776 {
2777 /*
2778 * The source transaction is still in progress, so we can't
2779 * process this message yet. Break out of the loop, but first
2780 * back up *current so we will reprocess the message next
2781 * time. (Note: it is unlikely but not impossible for
2782 * TransactionIdDidCommit to fail, so we can't really avoid
2783 * this advance-then-back-up behavior when dealing with an
2784 * uncommitted message.)
2785 *
2786 * Note that we must test XidInMVCCSnapshot before we test
2787 * TransactionIdDidCommit, else we might return a message from
2788 * a transaction that is not yet visible to snapshots; compare
2789 * the comments at the head of heapam_visibility.c.
2790 *
2791 * Also, while our own xact won't be listed in the snapshot,
2792 * we need not check for TransactionIdIsCurrentTransactionId
2793 * because our transaction cannot (yet) have queued any
2794 * messages.
2795 */
2796 *current = thisentry;
2797 reachedStop = true;
2798 break;
2799 }
2800
2801 /*
2802 * Quick check for the case that we're not listening on any
2803 * channels, before calling TransactionIdDidCommit(). This makes
2804 * that case a little faster, but more importantly, it ensures
2805 * that if there's a bad entry in the queue for which
2806 * TransactionIdDidCommit() fails for some reason, we can skip
2807 * over it on the first LISTEN in a session, and not get stuck on
2808 * it indefinitely. (This is a little trickier than it looks: it
2809 * works because BecomeRegisteredListener runs this code before we
2810 * have made the first entry in localChannelTable.)
2811 */
2813 continue;
2814
2815 if (TransactionIdDidCommit(qe->xid))
2816 {
2817 memcpy(local_buf_end, qe, qe->length);
2818 local_buf_end += qe->length;
2819 }
2820 else
2821 {
2822 /*
2823 * The source transaction aborted or crashed, so we just
2824 * ignore its notifications.
2825 */
2826 }
2827 }
2828
2829 /* Loop back if we're not at end of page */
2830 } while (!reachedEndOfPage);
2831
2832 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2834
2835 /*
2836 * Now that we have let go of the SLRU bank lock, send the notifications
2837 * to our backend
2838 */
2840 for (char *p = local_buf; p < local_buf_end;)
2841 {
2843
2844 /* qe->data is the null-terminated channel name */
2845 char *channel = qe->data;
2846
2847 if (IsListeningOn(channel))
2848 {
2849 /* payload follows channel name */
2850 char *payload = qe->data + strlen(channel) + 1;
2851
2852 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2853 }
2854
2855 p += qe->length;
2856 }
2857
2858 if (QUEUE_POS_EQUAL(*current, stop))
2859 reachedStop = true;
2860
2861 return reachedStop;
2862}

References Assert, asyncQueueAdvance(), AsyncQueueEntry::data, fb(), IsListeningOn(), LocalChannelTableIsEmpty, LWLockRelease(), MyDatabaseId, NotifyCtl, NotifyMyFrontEnd(), QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), TransactionIdDidCommit(), and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 2600 of file async.c.

2601{
2602 QueuePosition pos;
2603 QueuePosition head;
2604 Snapshot snapshot;
2605
2606 /*
2607 * Fetch current state, indicate to others that we have woken up, and that
2608 * we are in process of advancing our position.
2609 */
2611 /* Assert checks that we have a valid state entry */
2615 head = QUEUE_HEAD;
2616
2617 if (QUEUE_POS_EQUAL(pos, head))
2618 {
2619 /* Nothing to do, we have read all notifications already. */
2621 return;
2622 }
2623
2626
2627 /*----------
2628 * Get snapshot we'll use to decide which xacts are still in progress.
2629 * This is trickier than it might seem, because of race conditions.
2630 * Consider the following example:
2631 *
2632 * Backend 1: Backend 2:
2633 *
2634 * transaction starts
2635 * UPDATE foo SET ...;
2636 * NOTIFY foo;
2637 * commit starts
2638 * queue the notify message
2639 * transaction starts
2640 * LISTEN foo; -- first LISTEN in session
2641 * SELECT * FROM foo WHERE ...;
2642 * commit to clog
2643 * commit starts
2644 * add backend 2 to array of listeners
2645 * advance to queue head (this code)
2646 * commit to clog
2647 *
2648 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2649 * wasn't committed yet. Ideally we'd ensure that client 2 would
2650 * eventually get transaction 1's notify message, but there's no way
2651 * to do that; until we're in the listener array, there's no guarantee
2652 * that the notify message doesn't get removed from the queue.
2653 *
2654 * Therefore the coding technique transaction 2 is using is unsafe:
2655 * applications must commit a LISTEN before inspecting database state,
2656 * if they want to ensure they will see notifications about subsequent
2657 * changes to that state.
2658 *
2659 * What we do guarantee is that we'll see all notifications from
2660 * transactions committing after the snapshot we take here.
2661 * BecomeRegisteredListener has already added us to the listener array,
2662 * so no not-yet-committed messages can be removed from the queue
2663 * before we see them.
2664 *----------
2665 */
2666 snapshot = RegisterSnapshot(GetLatestSnapshot());
2667
2668 /*
2669 * It is possible that we fail while trying to send a message to our
2670 * frontend (for example, because of encoding conversion failure). If
2671 * that happens it is critical that we not try to send the same message
2672 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2673 * ERRORs to FATAL, causing the client connection to be closed on error.
2674 *
2675 * We used to only skip over the offending message and try to soldier on,
2676 * but it was somewhat questionable to lose a notification and give the
2677 * client an ERROR instead. A client application is not be prepared for
2678 * that and can't tell that a notification was missed. It was also not
2679 * very useful in practice because notifications are often processed while
2680 * a connection is idle and reading a message from the client, and in that
2681 * state, any error is upgraded to FATAL anyway. Closing the connection
2682 * is a clear signal to the application that it might have missed
2683 * notifications.
2684 */
2685 {
2687 bool reachedStop;
2688
2689 ExitOnAnyError = true;
2690
2691 do
2692 {
2693 /*
2694 * Process messages up to the stop position, end of page, or an
2695 * uncommitted message.
2696 *
2697 * Our stop position is what we found to be the head's position
2698 * when we entered this function. It might have changed already.
2699 * But if it has, we will receive (or have already received and
2700 * queued) another signal and come here again.
2701 *
2702 * We are not holding NotifyQueueLock here! The queue can only
2703 * extend beyond the head pointer (see above) and we leave our
2704 * backend's pointer where it is so nobody will truncate or
2705 * rewrite pages under us. Especially we don't want to hold a lock
2706 * while sending the notifications to the frontend.
2707 */
2708 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2709 } while (!reachedStop);
2710
2711 /* Update shared state */
2716
2718 }
2719
2720 /* Done with snapshot */
2721 UnregisterSnapshot(snapshot);
2722}

References Assert, asyncQueueProcessPageEntries(), ExitOnAnyError, fb(), GetLatestSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_HEAD, QUEUE_POS_EQUAL, RegisterSnapshot(), and UnregisterSnapshot().

Referenced by BecomeRegisteredListener(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1919 of file async.c.

1920{
1921 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1922
1923 if (!amRegisteredListener) /* nothing to do */
1924 return;
1925
1926 /*
1927 * Need exclusive lock here to manipulate list links.
1928 */
1930 /* Mark our entry as invalid */
1935 /* and remove it from the list */
1938 else
1939 {
1941 {
1943 {
1945 break;
1946 }
1947 }
1948 }
1951
1952 /* mark ourselves as no longer listed in the global array */
1953 amRegisteredListener = false;
1954}

References amRegisteredListener, Assert, fb(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, LocalChannelTableIsEmpty, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 2196 of file async.c.

2197{
2198 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2200 int64 occupied = headPage - tailPage;
2201
2202 if (occupied == 0)
2203 return (double) 0; /* fast exit for common case */
2204
2205 return (double) occupied / (double) max_notify_queue_pages;
2206}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 803 of file async.c.

804{
805 bool found;
806 Size size;
807
808 /*
809 * Create or attach to the AsyncQueueControl structure.
810 */
811 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
812 size = add_size(size, offsetof(AsyncQueueControl, backend));
813
815 ShmemInitStruct("Async Queue Control", size, &found);
816
817 if (!found)
818 {
819 /* First time through, so initialize it */
822 QUEUE_STOP_PAGE = 0;
827 for (int i = 0; i < MaxBackends; i++)
828 {
835 }
836 }
837
838 /*
839 * Set up SLRU management of the pg_notify data. Note that long segment
840 * names are used in order to avoid wraparound.
841 */
842 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
843 NotifyCtl->errdetail_for_io_error = asyncQueueErrdetailForIoError;
846 SYNC_HANDLER_NONE, true);
847
848 if (!found)
849 {
850 /*
851 * During start or reboot, clean out the pg_notify directory.
852 */
854 }
855}

References add_size(), asyncQueueControl, asyncQueueErrdetailForIoError(), asyncQueuePagePrecedes(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, fb(), AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 786 of file async.c.

787{
788 Size size;
789
790 /* This had better match AsyncShmemInit */
791 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
792 size = add_size(size, offsetof(AsyncQueueControl, backend));
793
795
796 return size;
797}

References add_size(), fb(), MaxBackends, mul_size(), notify_buffers, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 2421 of file async.c.

2422{
2423 /* Revert staged listen/unlisten changes */
2425
2426 /* If we're no longer listening on anything, unregister */
2429
2430 /* And clean up */
2432}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and LocalChannelTableIsEmpty.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1381 of file async.c.

1382{
1383 /*
1384 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1385 * return as soon as possible
1386 */
1388 return;
1389
1390 if (Trace_notify)
1391 elog(DEBUG1, "AtCommit_Notify");
1392
1393 /* Apply staged listen/unlisten changes */
1395
1396 /* If no longer listening to anything, get out of listener array */
1399
1400 /*
1401 * Send signals to listening backends. We need do this only if there are
1402 * pending notifies, which were previously added to the shared queue by
1403 * PreCommit_Notify().
1404 */
1405 if (pendingNotifies != NULL)
1407
1408 /*
1409 * If it's time to try to advance the global tail pointer, do that.
1410 *
1411 * (It might seem odd to do this in the sender, when more than likely the
1412 * listeners won't yet have read the messages we just sent. However,
1413 * there's less contention if only the sender does it, and there is little
1414 * need for urgency in advancing the global tail. So this typically will
1415 * be clearing out messages that were sent some time ago.)
1416 */
1417 if (tryAdvanceTail)
1418 {
1419 tryAdvanceTail = false;
1421 }
1422
1423 /* And clean up */
1425}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueAdvanceTail(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), DEBUG1, elog, fb(), LocalChannelTableIsEmpty, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 1163 of file async.c.

1164{
1165 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1167 ereport(ERROR,
1169 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1170}

References ereport, errcode(), errmsg, ERROR, fb(), pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 2510 of file async.c.

2511{
2512 int my_level = GetCurrentTransactionNestLevel();
2513
2514 /*
2515 * All we have to do is pop the stack --- the actions/notifies made in
2516 * this subxact are no longer interesting, and the space will be freed
2517 * when CurTransactionContext is recycled. We still have to free the
2518 * ActionList and NotificationList objects themselves, though, because
2519 * those are allocated in TopTransactionContext.
2520 *
2521 * Note that there might be no entries at all, or no entries for the
2522 * current subtransaction level, either because none were ever created, or
2523 * because we reentered this routine due to trouble during subxact abort.
2524 */
2525 while (pendingActions != NULL &&
2526 pendingActions->nestingLevel >= my_level)
2527 {
2529
2532 }
2533
2534 while (pendingNotifies != NULL &&
2535 pendingNotifies->nestingLevel >= my_level)
2536 {
2538
2541 }
2542}

References fb(), GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 2440 of file async.c.

2441{
2442 int my_level = GetCurrentTransactionNestLevel();
2443
2444 /* If there are actions at our nesting level, we must reparent them. */
2445 if (pendingActions != NULL &&
2446 pendingActions->nestingLevel >= my_level)
2447 {
2448 if (pendingActions->upper == NULL ||
2449 pendingActions->upper->nestingLevel < my_level - 1)
2450 {
2451 /* nothing to merge; give the whole thing to the parent */
2453 }
2454 else
2455 {
2457
2459
2460 /*
2461 * Mustn't try to eliminate duplicates here --- see queue_listen()
2462 */
2465 childPendingActions->actions);
2467 }
2468 }
2469
2470 /* If there are notifies at our nesting level, we must reparent them. */
2471 if (pendingNotifies != NULL &&
2472 pendingNotifies->nestingLevel >= my_level)
2473 {
2474 Assert(pendingNotifies->nestingLevel == my_level);
2475
2476 if (pendingNotifies->upper == NULL ||
2477 pendingNotifies->upper->nestingLevel < my_level - 1)
2478 {
2479 /* nothing to merge; give the whole thing to the parent */
2481 }
2482 else
2483 {
2484 /*
2485 * Formerly, we didn't bother to eliminate duplicates here, but
2486 * now we must, else we fall foul of "Assert(!found)", either here
2487 * or during a later attempt to build the parent-level hashtable.
2488 */
2490 ListCell *l;
2491
2493 /* Insert all the subxact's events into parent, except for dups */
2494 foreach(l, childPendingNotifies->events)
2495 {
2497
2500 }
2502 }
2503 }
2504}

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), fb(), GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ BecomeRegisteredListener()

static void BecomeRegisteredListener ( void  )
static

Definition at line 1433 of file async.c.

1434{
1435 QueuePosition head;
1436 QueuePosition max;
1438
1439 /*
1440 * Nothing to do if we are already listening to something, nor if we
1441 * already ran this routine in this transaction.
1442 */
1444 return;
1445
1446 if (Trace_notify)
1447 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1448
1449 /*
1450 * Before registering, make sure we will unlisten before dying. (Note:
1451 * this action does not get undone if we abort later.)
1452 */
1454 {
1457 }
1458
1459 /*
1460 * This is our first LISTEN, so establish our pointer.
1461 *
1462 * We set our pointer to the global tail pointer and then move it forward
1463 * over already-committed notifications. This ensures we cannot miss any
1464 * not-yet-committed notifications. We might get a few more but that
1465 * doesn't hurt.
1466 *
1467 * In some scenarios there might be a lot of committed notifications that
1468 * have not yet been pruned away (because some backend is being lazy about
1469 * reading them). To reduce our startup time, we can look at other
1470 * backends and adopt the maximum "pos" pointer of any backend that's in
1471 * our database; any notifications it's already advanced over are surely
1472 * committed and need not be re-examined by us. (We must consider only
1473 * backends connected to our DB, because others will not have bothered to
1474 * check committed-ness of notifications in our DB.)
1475 *
1476 * We need exclusive lock here so we can look at other backends' entries
1477 * and manipulate the list links.
1478 */
1480 head = QUEUE_HEAD;
1481 max = QUEUE_TAIL;
1484 {
1486 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1487 /* Also find last listening backend before this one */
1488 if (i < MyProcNumber)
1489 prevListener = i;
1490 }
1496 /* Insert backend into list of listeners at correct position */
1498 {
1501 }
1502 else
1503 {
1506 }
1508
1509 /* Now we are listed in the global array, so remember we're listening */
1510 amRegisteredListener = true;
1511
1512 /*
1513 * Try to move our pointer forward as far as possible. This will skip
1514 * over already-committed notifications, which we want to do because they
1515 * might be quite stale. Note that we are not yet listening on anything,
1516 * so we won't deliver such notifications to our frontend. Also, although
1517 * our transaction might have executed NOTIFY, those message(s) aren't
1518 * queued yet so we won't skip them here.
1519 */
1520 if (!QUEUE_POS_EQUAL(max, head))
1522}

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, fb(), i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ check_notify_buffers()

bool check_notify_buffers ( int newval,
void **  extra,
GucSource  source 
)

Definition at line 3298 of file async.c.

3299{
3300 return check_slru_buffers("notify_buffers", newval);
3301}

References check_slru_buffers(), and newval.

◆ CleanupListenersOnExit()

static void CleanupListenersOnExit ( void  )
static

Definition at line 1849 of file async.c.

1850{
1851 dshash_seq_status status;
1852 GlobalChannelEntry *entry;
1853
1854 if (Trace_notify)
1855 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1856
1857 /* Clear our local cache (not really necessary, but be consistent) */
1858 if (localChannelTable != NULL)
1859 {
1862 }
1863
1864 /* Now remove our entries from the shared globalChannelTable */
1865 if (globalChannelTable == NULL)
1866 return;
1867
1868 dshash_seq_init(&status, globalChannelTable, true);
1869 while ((entry = dshash_seq_next(&status)) != NULL)
1870 {
1872
1873 if (entry->key.dboid != MyDatabaseId)
1874 continue; /* not relevant */
1875
1878
1879 for (int i = 0; i < entry->numListeners; i++)
1880 {
1881 if (listeners[i].procNo == MyProcNumber)
1882 {
1883 entry->numListeners--;
1884 if (i < entry->numListeners)
1885 memmove(&listeners[i], &listeners[i + 1],
1886 sizeof(ListenerEntry) * (entry->numListeners - i));
1887
1888 if (entry->numListeners == 0)
1889 {
1891 dshash_delete_current(&status);
1892 }
1893 break;
1894 }
1895 }
1896 }
1897 dshash_seq_term(&status);
1898}

References GlobalChannelKey::dboid, DEBUG1, dsa_free(), dsa_get_address(), dshash_delete_current(), dshash_seq_init(), dshash_seq_next(), dshash_seq_term(), elog, fb(), globalChannelDSA, globalChannelTable, hash_destroy(), i, GlobalChannelEntry::key, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, MyProcPid, GlobalChannelEntry::numListeners, and Trace_notify.

Referenced by Async_UnlistenOnExit().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 3280 of file async.c.

3281{
3282 /*
3283 * Everything's allocated in either TopTransactionContext or the context
3284 * for the subtransaction to which it corresponds. So, there's nothing to
3285 * do here except reset the pointers; the space will be reclaimed when the
3286 * contexts are deleted.
3287 */
3290 /* Also clear pendingListenActions, which is derived from pendingActions */
3292}

References fb(), pendingActions, pendingListenActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ GlobalChannelKeyInit()

static void GlobalChannelKeyInit ( GlobalChannelKey key,
Oid  dboid,
const char channel 
)
inlinestatic

Definition at line 648 of file async.c.

649{
650 memset(key, 0, sizeof(GlobalChannelKey));
651 key->dboid = dboid;
652 strlcpy(key->channel, channel, NAMEDATALEN);
653}

References fb(), NAMEDATALEN, and strlcpy().

Referenced by ApplyPendingListenActions(), PrepareTableEntriesForListen(), and SignalBackends().

◆ globalChannelTableHash()

static dshash_hash globalChannelTableHash ( const void key,
size_t  size,
void arg 
)
static

Definition at line 660 of file async.c.

661{
662 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
663 dshash_hash h;
664
666 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
668
669 return h;
670}

References GlobalChannelKey::channel, DatumGetUInt32(), GlobalChannelKey::dboid, fb(), hash_any(), hash_uint32(), and NAMEDATALEN.

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 2553 of file async.c.

2554{
2555 /*
2556 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2557 * you do here.
2558 */
2559
2560 /* signal that work needs to be done */
2562
2563 /* make sure the event is processed in due course */
2565}

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ initGlobalChannelTable()

static void initGlobalChannelTable ( void  )
static

Definition at line 687 of file async.c.

688{
689 MemoryContext oldcontext;
690
691 /* Quick exit if we already did this */
694 return;
695
696 /* Otherwise, use a lock to ensure only one process creates the table */
698
699 /* Be sure any local memory allocated by DSA routines is persistent */
701
703 {
704 /* Initialize dynamic shared hash table for global channels */
710 NULL);
711
712 /* Store handles in shared memory for other backends to use */
716 }
717 else if (!globalChannelTable)
718 {
719 /* Attach to existing dynamic shared hash table */
725 NULL);
726 }
727
728 MemoryContextSwitchTo(oldcontext);
730}

References asyncQueueControl, dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, fb(), globalChannelDSA, globalChannelTable, AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, globalChannelTableDSHParams, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by PreCommit_Notify().

◆ initLocalChannelTable()

static void initLocalChannelTable ( void  )
static

Definition at line 738 of file async.c.

739{
741
742 /* Quick exit if we already did this */
743 if (localChannelTable != NULL)
744 return;
745
746 /* Initialize local hash table for this backend's listened channels */
748 hash_ctl.entrysize = sizeof(ChannelName);
749
751 hash_create("Local Listen Channels",
752 64,
753 &hash_ctl,
755}

References fb(), hash_create(), HASH_ELEM, HASH_STRINGS, HASHCTL::keysize, localChannelTable, and NAMEDATALEN.

Referenced by PreCommit_Notify().

◆ initPendingListenActions()

static void initPendingListenActions ( void  )
static

◆ IsListeningOn()

static bool IsListeningOn ( const char channel)
static

Definition at line 1906 of file async.c.

1907{
1908 if (localChannelTable == NULL)
1909 return false;
1910
1911 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1912}

References fb(), HASH_FIND, hash_search(), and localChannelTable.

Referenced by asyncQueueProcessPageEntries().

◆ notification_hash()

static uint32 notification_hash ( const void key,
Size  keysize 
)
static

Definition at line 3250 of file async.c.

3251{
3252 const Notification *k = *(const Notification *const *) key;
3253
3254 Assert(keysize == sizeof(Notification *));
3255 /* We don't bother to include the payload's trailing null in the hash */
3256 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3257 k->channel_len + k->payload_len + 1));
3258}

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void key1,
const void key2,
Size  keysize 
)
static

Definition at line 3264 of file async.c.

3265{
3266 const Notification *k1 = *(const Notification *const *) key1;
3267 const Notification *k2 = *(const Notification *const *) key2;
3268
3269 Assert(keysize == sizeof(Notification *));
3270 if (k1->channel_len == k2->channel_len &&
3271 k1->payload_len == k2->payload_len &&
3272 memcmp(k1->data, k2->data,
3273 k1->channel_len + k1->payload_len + 2) == 0)
3274 return 0; /* equal */
3275 return 1; /* not equal */
3276}

References Assert, and fb().

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char channel,
const char payload,
int32  srcPid 
)

Definition at line 3101 of file async.c.

3102{
3104 {
3106
3108 pq_sendint32(&buf, srcPid);
3109 pq_sendstring(&buf, channel);
3110 pq_sendstring(&buf, payload);
3112
3113 /*
3114 * NOTE: we do not do pq_flush() here. Some level of caller will
3115 * handle it later, allowing this message to be combined into a packet
3116 * with other ones.
3117 */
3118 }
3119 else
3120 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3121}

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 1098 of file async.c.

1099{
1101 HASH_SEQ_STATUS *status;
1102
1103 /* stuff done only on the first call of the function */
1104 if (SRF_IS_FIRSTCALL())
1105 {
1106 /* create a function context for cross-call persistence */
1108
1109 /* Initialize hash table iteration if we have any channels */
1110 if (localChannelTable != NULL)
1111 {
1112 MemoryContext oldcontext;
1113
1114 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1115 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1117 funcctx->user_fctx = status;
1118 MemoryContextSwitchTo(oldcontext);
1119 }
1120 else
1121 {
1122 funcctx->user_fctx = NULL;
1123 }
1124 }
1125
1126 /* stuff done on every call of the function */
1128 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1129
1130 if (status != NULL)
1131 {
1132 ChannelName *entry;
1133
1134 entry = (ChannelName *) hash_seq_search(status);
1135 if (entry != NULL)
1137 }
1138
1140}

References ChannelName::channel, CStringGetTextDatum, fb(), hash_seq_init(), hash_seq_search(), localChannelTable, MemoryContextSwitchTo(), palloc(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 2171 of file async.c.

2172{
2173 double usage;
2174
2175 /* Advance the queue tail so we don't report a too-large result */
2177
2181
2183}

References asyncQueueAdvanceTail(), asyncQueueUsage(), fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 863 of file async.c.

864{
865 const char *channel;
866 const char *payload;
867
868 if (PG_ARGISNULL(0))
869 channel = "";
870 else
872
873 if (PG_ARGISNULL(1))
874 payload = "";
875 else
877
878 /* For NOTIFY as a statement, this is checked in ProcessUtility */
880
881 Async_Notify(channel, payload);
882
884}

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 1188 of file async.c.

1189{
1190 ListCell *p;
1191
1193 return; /* no relevant statements in this xact */
1194
1195 if (Trace_notify)
1196 elog(DEBUG1, "PreCommit_Notify");
1197
1198 /* Preflight for any pending listen/unlisten actions */
1200
1201 if (pendingActions != NULL)
1202 {
1203 /* Ensure we have a local channel table */
1205 /* Create pendingListenActions hash table for this transaction */
1207
1208 /* Stage all the actions this transaction wants to perform */
1209 foreach(p, pendingActions->actions)
1210 {
1212
1213 switch (actrec->action)
1214 {
1215 case LISTEN_LISTEN:
1218 break;
1219 case LISTEN_UNLISTEN:
1221 break;
1224 break;
1225 }
1226 }
1227 }
1228
1229 /* Queue any pending notifies (must happen after the above) */
1230 if (pendingNotifies)
1231 {
1233 bool firstIteration = true;
1234
1235 /*
1236 * Build list of unique channel names being notified for use by
1237 * SignalBackends().
1238 *
1239 * If uniqueChannelHash is available, use it to efficiently get the
1240 * unique channels. Otherwise, fall back to the O(N^2) approach.
1241 */
1244 {
1245 HASH_SEQ_STATUS status;
1247
1249 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1252 channelEntry->channel);
1253 }
1254 else
1255 {
1256 /* O(N^2) approach is better for small number of notifications */
1258 {
1259 char *channel = n->data;
1260 bool found = false;
1261
1262 /* Name present in list? */
1264 {
1265 if (strcmp(oldchan, channel) == 0)
1266 {
1267 found = true;
1268 break;
1269 }
1270 }
1271 /* Add if not already in list */
1272 if (!found)
1275 channel);
1276 }
1277 }
1278
1279 /* Preallocate workspace that will be needed by SignalBackends() */
1280 if (signalPids == NULL)
1282 MaxBackends * sizeof(int32));
1283
1284 if (signalProcnos == NULL)
1286 MaxBackends * sizeof(ProcNumber));
1287
1288 /*
1289 * Make sure that we have an XID assigned to the current transaction.
1290 * GetCurrentTransactionId is cheap if we already have an XID, but not
1291 * so cheap if we don't, and we'd prefer not to do that work while
1292 * holding NotifyQueueLock.
1293 */
1295
1296 /*
1297 * Serialize writers by acquiring a special lock that we hold till
1298 * after commit. This ensures that queue entries appear in commit
1299 * order, and in particular that there are never uncommitted queue
1300 * entries ahead of committed ones, so an uncommitted transaction
1301 * can't block delivery of deliverable notifications.
1302 *
1303 * We use a heavyweight lock so that it'll automatically be released
1304 * after either commit or abort. This also allows deadlocks to be
1305 * detected, though really a deadlock shouldn't be possible here.
1306 *
1307 * The lock is on "database 0", which is pretty ugly but it doesn't
1308 * seem worth inventing a special locktag category just for this.
1309 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1310 * used by the flatfiles mechanism.)
1311 */
1314
1315 /*
1316 * For the direct advancement optimization in SignalBackends(), we
1317 * need to ensure that no other backend can insert queue entries
1318 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1319 * heavyweight lock above provides this guarantee, since it serializes
1320 * all writers.
1321 *
1322 * Note: if the heavyweight lock were ever removed for scalability
1323 * reasons, we could achieve the same guarantee by holding
1324 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1325 * than releasing and reacquiring it for each page as we do below.
1326 */
1327
1328 /* Initialize values to a safe default in case list is empty */
1331
1332 /* Now push the notifications into the queue */
1334 while (nextNotify != NULL)
1335 {
1336 /*
1337 * Add the pending notifications to the queue. We acquire and
1338 * release NotifyQueueLock once per page, which might be overkill
1339 * but it does allow readers to get in while we're doing this.
1340 *
1341 * A full queue is very uncommon and should really not happen,
1342 * given that we have so much space available in the SLRU pages.
1343 * Nevertheless we need to deal with this possibility. Note that
1344 * when we get here we are in the process of committing our
1345 * transaction, but we have not yet committed to clog, so at this
1346 * point in time we can still roll the transaction back.
1347 */
1349 if (firstIteration)
1350 {
1352 firstIteration = false;
1353 }
1355 if (asyncQueueIsFull())
1356 ereport(ERROR,
1358 errmsg("too many notifications in the NOTIFY queue")));
1362 }
1363
1364 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1365 }
1366}

References AccessExclusiveLock, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), BecomeRegisteredListener(), DEBUG1, elog, ereport, errcode(), errmsg, ERROR, NotificationList::events, fb(), foreach_ptr, GetCurrentTransactionId(), hash_seq_init(), hash_seq_search(), initGlobalChannelTable(), initLocalChannelTable(), initPendingListenActions(), InvalidOid, lappend(), lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MemoryContextAlloc(), NIL, pendingActions, pendingNotifies, PrepareTableEntriesForListen(), PrepareTableEntriesForUnlisten(), PrepareTableEntriesForUnlistenAll(), QUEUE_HEAD, queueHeadAfterWrite, queueHeadBeforeWrite, SET_QUEUE_POS, signalPids, signalProcnos, TopMemoryContext, Trace_notify, NotificationList::uniqueChannelHash, and NotificationList::uniqueChannelNames.

Referenced by CommitTransaction().

◆ PrepareTableEntriesForListen()

static void PrepareTableEntriesForListen ( const char channel)
static

Definition at line 1534 of file async.c.

1535{
1537 GlobalChannelEntry *entry;
1538 bool found;
1540 PendingListenEntry *pending;
1541
1542 /*
1543 * Record in local pending hash that we want to LISTEN, overwriting any
1544 * earlier attempt to UNLISTEN.
1545 */
1546 pending = (PendingListenEntry *)
1548 pending->action = PENDING_LISTEN;
1549
1550 /*
1551 * Ensure that there is an entry for the channel in localChannelTable.
1552 * (Should this fail, we can just roll back.) If the transaction fails
1553 * after this point, we will remove the entry if appropriate during
1554 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1555 * to return TRUE; we assume nothing is going to consult that before
1556 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1557 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1558 * present to ensure they do the right things; see
1559 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1560 */
1562
1563 /* Pre-allocate entry in shared globalChannelTable with listening=false */
1564 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1565 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1566
1567 if (!found)
1568 {
1569 /* New channel entry, so initialize it to a safe state */
1571 entry->numListeners = 0;
1572 entry->allocatedListeners = 0;
1573 }
1574
1575 /*
1576 * Create listenersArray if entry doesn't have one. It's tempting to fold
1577 * this into the !found case, but this coding allows us to cope in case
1578 * dsa_allocate() failed in an earlier attempt.
1579 */
1580 if (!DsaPointerIsValid(entry->listenersArray))
1581 {
1585 }
1586
1589
1590 /*
1591 * Check if we already have a ListenerEntry (possibly from earlier in this
1592 * transaction)
1593 */
1594 for (int i = 0; i < entry->numListeners; i++)
1595 {
1596 if (listeners[i].procNo == MyProcNumber)
1597 {
1598 /* Already have an entry; listening flag stays as-is until commit */
1600 return;
1601 }
1602 }
1603
1604 /* Need to add a new entry; grow array if necessary */
1605 if (entry->numListeners >= entry->allocatedListeners)
1606 {
1607 int new_size = entry->allocatedListeners * 2;
1610 sizeof(ListenerEntry) * new_size);
1612
1614 entry->listenersArray = new_array;
1618 }
1619
1620 listeners[entry->numListeners].procNo = MyProcNumber;
1621 listeners[entry->numListeners].listening = false; /* staged, not yet
1622 * committed */
1623 entry->numListeners++;
1624
1626}

References PendingListenEntry::action, GlobalChannelEntry::allocatedListeners, dsa_allocate, dsa_free(), dsa_get_address(), DsaPointerIsValid, dshash_find_or_insert, dshash_release_lock(), fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_ENTER, hash_search(), i, INITIAL_LISTENERS_ARRAY_SIZE, InvalidDsaPointer, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PENDING_LISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlisten()

static void PrepareTableEntriesForUnlisten ( const char channel)
static

Definition at line 1637 of file async.c.

1638{
1639 PendingListenEntry *pending;
1640
1641 /*
1642 * If the channel name is not in localChannelTable, then we are neither
1643 * listening on it nor preparing to listen on it, so we don't need to
1644 * record an UNLISTEN action.
1645 */
1647 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1648 return;
1649
1650 /*
1651 * Record in local pending hash that we want to UNLISTEN, overwriting any
1652 * earlier attempt to LISTEN. Don't touch localChannelTable or
1653 * globalChannelTable yet - we keep receiving signals until commit.
1654 */
1655 pending = (PendingListenEntry *)
1657 pending->action = PENDING_UNLISTEN;
1658}

References PendingListenEntry::action, Assert, fb(), HASH_ENTER, HASH_FIND, hash_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlistenAll()

static void PrepareTableEntriesForUnlistenAll ( void  )
static

Definition at line 1667 of file async.c.

1668{
1671 PendingListenEntry *pending;
1672
1673 /*
1674 * Scan localChannelTable, which will have the names of all channels that
1675 * we are listening on or have prepared to listen on. Record an UNLISTEN
1676 * action for each one, overwriting any earlier attempt to LISTEN.
1677 */
1679 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1680 {
1681 pending = (PendingListenEntry *)
1683 pending->action = PENDING_UNLISTEN;
1684 }
1685}

References PendingListenEntry::action, fb(), HASH_ENTER, hash_search(), hash_seq_init(), hash_seq_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 3060 of file async.c.

3061{
3062 /* We *must* reset the flag */
3063 notifyInterruptPending = false;
3064
3065 /* Do nothing else if we aren't actively listening */
3067 return;
3068
3069 if (Trace_notify)
3070 elog(DEBUG1, "ProcessIncomingNotify");
3071
3072 set_ps_display("notify interrupt");
3073
3074 /*
3075 * We must run asyncQueueReadAllNotifications inside a transaction, else
3076 * bad things happen if it gets an error.
3077 */
3079
3081
3083
3084 /*
3085 * If this isn't an end-of-command case, we must flush the notify messages
3086 * to ensure frontend gets them promptly.
3087 */
3088 if (flush)
3089 pq_flush();
3090
3091 set_ps_display("idle");
3092
3093 if (Trace_notify)
3094 elog(DEBUG1, "ProcessIncomingNotify: done");
3095}

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, LocalChannelTableIsEmpty, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 2583 of file async.c.

2584{
2586 return; /* not really idle */
2587
2588 /* Loop in case another signal arrives while sending messages */
2590 ProcessIncomingNotify(flush);
2591}

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char channel 
)
static

Definition at line 999 of file async.c.

1000{
1001 MemoryContext oldcontext;
1003 int my_level = GetCurrentTransactionNestLevel();
1004
1005 /*
1006 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1007 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1008 * final per-channel intent is computed during PreCommit_Notify.
1009 */
1011
1012 /* space for terminating null is included in sizeof(ListenAction) */
1014 strlen(channel) + 1);
1015 actrec->action = action;
1016 strcpy(actrec->channel, channel);
1017
1018 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1019 {
1020 ActionList *actions;
1021
1022 /*
1023 * First action in current sub(xact). Note that we allocate the
1024 * ActionList in TopTransactionContext; the nestingLevel might get
1025 * changed later by AtSubCommit_Notify.
1026 */
1027 actions = (ActionList *)
1029 actions->nestingLevel = my_level;
1030 actions->actions = list_make1(actrec);
1031 actions->upper = pendingActions;
1032 pendingActions = actions;
1033 }
1034 else
1036
1037 MemoryContextSwitchTo(oldcontext);
1038}

References ActionList::actions, CurTransactionContext, fb(), GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ RemoveListenerFromChannel()

static void RemoveListenerFromChannel ( GlobalChannelEntry **  entry_ptr,
ListenerEntry listeners,
int  idx 
)
static

Definition at line 1697 of file async.c.

1700{
1701 GlobalChannelEntry *entry = *entry_ptr;
1702
1703 entry->numListeners--;
1704 if (idx < entry->numListeners)
1706 sizeof(ListenerEntry) * (entry->numListeners - idx));
1707
1708 if (entry->numListeners == 0)
1709 {
1712 /* tells caller not to release the entry's lock: */
1713 *entry_ptr = NULL;
1714 }
1715}

References dsa_free(), dshash_delete_entry(), fb(), globalChannelDSA, globalChannelTable, idx(), GlobalChannelEntry::listenersArray, and GlobalChannelEntry::numListeners.

Referenced by ApplyPendingListenActions().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 2269 of file async.c.

2270{
2271 int count;
2272
2273 /* Can't get here without PreCommit_Notify having made the global table */
2275
2276 /* It should have set up these arrays, too */
2278
2279 /*
2280 * Identify backends that we need to signal. We don't want to send
2281 * signals while holding the NotifyQueueLock, so this part just builds a
2282 * list of target PIDs in signalPids[] and signalProcnos[].
2283 */
2284 count = 0;
2285
2287
2288 /* Scan each channel name that we notified in this transaction */
2290 {
2292 GlobalChannelEntry *entry;
2294
2295 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2296 entry = dshash_find(globalChannelTable, &key, false);
2297 if (entry == NULL)
2298 continue; /* nobody is listening */
2299
2301 entry->listenersArray);
2302
2303 /* Identify listeners that now need waking, add them to arrays */
2304 for (int j = 0; j < entry->numListeners; j++)
2305 {
2306 ProcNumber i;
2307 int32 pid;
2308 QueuePosition pos;
2309
2310 if (!listeners[j].listening)
2311 continue; /* ignore not-yet-committed listeners */
2312
2313 i = listeners[j].procNo;
2314
2316 continue; /* already signaled, no need to repeat */
2317
2318 pid = QUEUE_BACKEND_PID(i);
2319 pos = QUEUE_BACKEND_POS(i);
2320
2321 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2322 continue; /* it's fully caught up already */
2323
2324 Assert(pid != InvalidPid);
2325
2327 signalPids[count] = pid;
2328 signalProcnos[count] = i;
2329 count++;
2330 }
2331
2333 }
2334
2335 /*
2336 * Scan all listeners. Any that are not already pending wakeup must not
2337 * be interested in our notifications (else we'd have set their wakeup
2338 * flags above). Check to see if we can directly advance their queue
2339 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2340 * them anyway so they will catch up.
2341 */
2343 {
2344 int32 pid;
2345 QueuePosition pos;
2346
2348 continue;
2349
2350 /* If it's currently advancing, we should not touch it */
2352 continue;
2353
2354 pid = QUEUE_BACKEND_PID(i);
2355 pos = QUEUE_BACKEND_POS(i);
2356
2357 /*
2358 * We can directly advance the other backend's queue pointer if it's
2359 * not currently advancing (else there are race conditions), and its
2360 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2361 * it miss some older messages), and we'd not be moving the pointer
2362 * backward.
2363 */
2366 {
2367 /* We can directly advance its pointer past what we wrote */
2369 }
2372 {
2373 /* It's idle and far behind, so wake it up */
2374 Assert(pid != InvalidPid);
2375
2377 signalPids[count] = pid;
2378 signalProcnos[count] = i;
2379 count++;
2380 }
2381 }
2382
2384
2385 /* Now send signals */
2386 for (int i = 0; i < count; i++)
2387 {
2388 int32 pid = signalPids[i];
2389
2390 /*
2391 * If we are signaling our own process, no need to involve the kernel;
2392 * just set the flag directly.
2393 */
2394 if (pid == MyProcPid)
2395 {
2397 continue;
2398 }
2399
2400 /*
2401 * Note: assuming things aren't broken, a signal failure here could
2402 * only occur if the target backend exited since we released
2403 * NotifyQueueLock; which is unlikely but certainly possible. So we
2404 * just log a low-level debug message if it happens.
2405 */
2407 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2408 }
2409}

References Assert, asyncQueuePageDiff(), DEBUG3, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), foreach_ptr, globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, i, INVALID_PROC_NUMBER, InvalidPid, j, GlobalChannelEntry::listenersArray, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcPid, notifyInterruptPending, GlobalChannelEntry::numListeners, pendingNotifies, PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, QUEUE_POS_PRECEDES, queueHeadAfterWrite, queueHeadBeforeWrite, SendProcSignal(), signalPids, signalProcnos, and NotificationList::uniqueChannelNames.

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 346 of file async.c.

Referenced by asyncQueueFillWarning(), AsyncShmemInit(), and initGlobalChannelTable().

◆ globalChannelDSA

◆ globalChannelTable

◆ globalChannelTableDSHParams

const dshash_parameters globalChannelTableDSHParams
static

◆ localChannelTable

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 570 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 362 of file async.c.

◆ notifyInterruptPending

◆ pendingActions

◆ pendingListenActions

◆ pendingNotifies

◆ queueHeadAfterWrite

QueuePosition queueHeadAfterWrite
static

Definition at line 553 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ queueHeadBeforeWrite

QueuePosition queueHeadBeforeWrite
static

Definition at line 552 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalPids

int32* signalPids = NULL
static

Definition at line 560 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalProcnos

ProcNumber* signalProcnos = NULL
static

Definition at line 561 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 564 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 541 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and BecomeRegisteredListener().