PostgreSQL Source Code git master
Loading...
Searching...
No Matches
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "storage/subsystems.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  GlobalChannelKey
 
struct  ListenerEntry
 
struct  GlobalChannelEntry
 
struct  ListenAction
 
struct  ActionList
 
struct  PendingListenEntry
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 
struct  ChannelName
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_POS_PRECEDES(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define QUEUE_BACKEND_WAKEUP_PENDING(i)   (asyncQueueControl->backend[i].wakeupPending)
 
#define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
 
#define NotifyCtl   (&NotifySlruDesc)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define INITIAL_LISTENERS_ARRAY_SIZE   4
 
#define LocalChannelTableIsEmpty()    (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct GlobalChannelKey GlobalChannelKey
 
typedef struct ListenerEntry ListenerEntry
 
typedef struct GlobalChannelEntry GlobalChannelEntry
 
typedef struct ActionList ActionList
 
typedef struct PendingListenEntry PendingListenEntry
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct ChannelName ChannelName
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 
enum  PendingListenAction { PENDING_LISTEN , PENDING_UNLISTEN }
 

Functions

static void AsyncShmemRequest (void *arg)
 
static void AsyncShmemInit (void *arg)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static int asyncQueueErrdetailForIoError (const void *opaque_data)
 
static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static void GlobalChannelKeyInit (GlobalChannelKey *key, Oid dboid, const char *channel)
 
static dshash_hash globalChannelTableHash (const void *key, size_t size, void *arg)
 
static void initGlobalChannelTable (void)
 
static void initLocalChannelTable (void)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void BecomeRegisteredListener (void)
 
static void PrepareTableEntriesForListen (const char *channel)
 
static void PrepareTableEntriesForUnlisten (const char *channel)
 
static void PrepareTableEntriesForUnlistenAll (void)
 
static void RemoveListenerFromChannel (GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
 
static void ApplyPendingListenActions (bool isCommit)
 
static void CleanupListenersOnExit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (QueuePosition *current, QueuePosition stop, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
static void initPendingListenActions (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControlasyncQueueControl
 
const ShmemCallbacks AsyncShmemCallbacks
 
static SlruDesc NotifySlruDesc
 
static dshash_tableglobalChannelTable = NULL
 
static dsa_areaglobalChannelDSA = NULL
 
static HTABlocalChannelTable = NULL
 
static ActionListpendingActions = NULL
 
static HTABpendingListenActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static QueuePosition queueHeadBeforeWrite
 
static QueuePosition queueHeadAfterWrite
 
static int32signalPids = NULL
 
static ProcNumbersignalProcnos = NULL
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 
static const dshash_parameters globalChannelTableDSHParams
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 227 of file async.c.

◆ INITIAL_LISTENERS_ARRAY_SIZE

#define INITIAL_LISTENERS_ARRAY_SIZE   4

Definition at line 392 of file async.c.

◆ LocalChannelTableIsEmpty

#define LocalChannelTableIsEmpty ( )     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)

Definition at line 425 of file async.c.

438{
443
444typedef struct
445{
447 char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
449
450typedef struct ActionList
451{
452 int nestingLevel; /* current transaction nesting depth */
453 List *actions; /* list of ListenAction structs */
454 struct ActionList *upper; /* details for upper transaction levels */
455} ActionList;
456
458
459/*
460 * Hash table recording the final listen/unlisten intent per channel for
461 * the current transaction. Key is channel name, value is PENDING_LISTEN or
462 * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
463 * per channel instead of replaying every action. This is built from the
464 * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
465 * AtAbort_Notify.
466 */
467typedef enum
468{
472
473typedef struct PendingListenEntry
474{
475 char channel[NAMEDATALEN]; /* hash key */
476 PendingListenAction action; /* which action should we perform? */
478
480
481/*
482 * State for outbound notifies consists of a list of all channels+payloads
483 * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
484 * until and unless the transaction commits. pendingNotifies is NULL if no
485 * NOTIFYs have been done in the current (sub) transaction.
486 *
487 * We discard duplicate notify events issued in the same transaction.
488 * Hence, in addition to the list proper (which we need to track the order
489 * of the events, since we guarantee to deliver them in order), we build a
490 * hash table which we can probe to detect duplicates. Since building the
491 * hash table is somewhat expensive, we do so only once we have at least
492 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
493 * before that we just scan the events linearly.
494 *
495 * The list is kept in CurTransactionContext. In subtransactions, each
496 * subtransaction has its own list in its own CurTransactionContext, but
497 * successful subtransactions add their entries to their parent's list.
498 * Failed subtransactions simply discard their lists. Since these lists
499 * are independent, there may be notify events in a subtransaction's list
500 * that duplicate events in some ancestor (sub) transaction; we get rid of
501 * the dups when merging the subtransaction's list into its parent's.
502 *
503 * Note: the action and notify lists do not interact within a transaction.
504 * In particular, if a transaction does NOTIFY and then LISTEN on the same
505 * condition name, it will get a self-notify at commit. This is a bit odd
506 * but is consistent with our historical behavior.
507 */
508typedef struct Notification
509{
510 uint16 channel_len; /* length of channel-name string */
511 uint16 payload_len; /* length of payload string */
512 /* null-terminated channel name, then null-terminated payload follow */
515
516typedef struct NotificationList
517{
518 int nestingLevel; /* current transaction nesting depth */
519 List *events; /* list of Notification structs */
520 HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
521 List *uniqueChannelNames; /* unique channel names being notified */
522 HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
523 struct NotificationList *upper; /* details for upper transaction levels */
525
526#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
527
528struct NotificationHash
529{
530 Notification *event; /* => the actual Notification struct */
531};
532
534
535/*
536 * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
537 * (both just carry the channel name, with no payload).
538 */
539typedef struct ChannelName
540{
541 char channel[NAMEDATALEN]; /* hash key */
543
544/*
545 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
546 * called from inside a signal handler. That just sets the
547 * notifyInterruptPending flag and sets the process
548 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
549 * actually deal with the interrupt.
550 */
551volatile sig_atomic_t notifyInterruptPending = false;
552
553/* True if we've registered an on_shmem_exit cleanup */
554static bool unlistenExitRegistered = false;
555
556/* True if we're currently registered as a listener in asyncQueueControl */
557static bool amRegisteredListener = false;
558
559/*
560 * Queue head positions for direct advancement.
561 * These are captured during PreCommit_Notify while holding the heavyweight
562 * lock on database 0, ensuring no other backend can insert notifications
563 * between them. SignalBackends uses these to advance idle backends.
564 */
567
568/*
569 * Workspace arrays for SignalBackends. These are preallocated in
570 * PreCommit_Notify to avoid needing memory allocation after committing to
571 * clog.
572 */
573static int32 *signalPids = NULL;
575
576/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
577static bool tryAdvanceTail = false;
578
579/* GUC parameters */
580bool Trace_notify = false;
581
582/* For 8 KB pages this gives 8 GB of disk space */
583int max_notify_queue_pages = 1048576;
584
585/* local function prototypes */
586static inline int64 asyncQueuePageDiff(int64 p, int64 q);
587static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
588 const char *channel);
589static dshash_hash globalChannelTableHash(const void *key, size_t size,
590 void *arg);
591static void initGlobalChannelTable(void);
592static void initLocalChannelTable(void);
593static void queue_listen(ListenActionKind action, const char *channel);
594static void Async_UnlistenOnExit(int code, Datum arg);
595static void BecomeRegisteredListener(void);
596static void PrepareTableEntriesForListen(const char *channel);
597static void PrepareTableEntriesForUnlisten(const char *channel);
598static void PrepareTableEntriesForUnlistenAll(void);
601 int idx);
602static void ApplyPendingListenActions(bool isCommit);
603static void CleanupListenersOnExit(void);
604static bool IsListeningOn(const char *channel);
605static void asyncQueueUnregister(void);
606static bool asyncQueueIsFull(void);
607static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
610static double asyncQueueUsage(void);
611static void asyncQueueFillWarning(void);
612static void SignalBackends(void);
613static void asyncQueueReadAllNotifications(void);
615 QueuePosition stop,
616 Snapshot snapshot);
617static void asyncQueueAdvanceTail(void);
618static void ProcessIncomingNotify(bool flush);
621static uint32 notification_hash(const void *key, Size keysize);
622static int notification_match(const void *key1, const void *key2, Size keysize);
623static void ClearPendingActionsAndNotifies(void);
624
625static int
627{
628 const QueuePosition *position = opaque_data;
629
630 return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
631 position->page, position->offset);
632}
633
634/*
635 * Compute the difference between two queue page numbers.
636 * Previously this function accounted for a wraparound.
637 */
638static inline int64
640{
641 return p - q;
642}
643
644/*
645 * Determines whether p precedes q.
646 * Previously this function accounted for a wraparound.
647 */
648static inline bool
650{
651 return p < q;
652}
653
654/*
655 * GlobalChannelKeyInit
656 * Prepare a global channel table key for hashing.
657 */
658static inline void
659GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
660{
661 memset(key, 0, sizeof(GlobalChannelKey));
662 key->dboid = dboid;
663 strlcpy(key->channel, channel, NAMEDATALEN);
664}
665
666/*
667 * globalChannelTableHash
668 * Hash function for global channel table keys.
669 */
670static dshash_hash
671globalChannelTableHash(const void *key, size_t size, void *arg)
672{
673 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
674 dshash_hash h;
675
677 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
679
680 return h;
681}
682
683/* parameters for the global channel table */
685 sizeof(GlobalChannelKey),
686 sizeof(GlobalChannelEntry),
691};
692
693/*
694 * initGlobalChannelTable
695 * Lazy initialization of the global channel table.
696 */
697static void
699{
700 MemoryContext oldcontext;
701
702 /* Quick exit if we already did this */
705 return;
706
707 /* Otherwise, use a lock to ensure only one process creates the table */
709
710 /* Be sure any local memory allocated by DSA routines is persistent */
712
714 {
715 /* Initialize dynamic shared hash table for global channels */
721 NULL);
722
723 /* Store handles in shared memory for other backends to use */
727 }
728 else if (!globalChannelTable)
729 {
730 /* Attach to existing dynamic shared hash table */
736 NULL);
737 }
738
739 MemoryContextSwitchTo(oldcontext);
741}
742
743/*
744 * initLocalChannelTable
745 * Lazy initialization of the local channel table.
746 * Once created, this table lasts for the life of the session.
747 */
748static void
750{
752
753 /* Quick exit if we already did this */
754 if (localChannelTable != NULL)
755 return;
756
757 /* Initialize local hash table for this backend's listened channels */
759 hash_ctl.entrysize = sizeof(ChannelName);
760
762 hash_create("Local Listen Channels",
763 64,
764 &hash_ctl,
766}
767
768/*
769 * initPendingListenActions
770 * Lazy initialization of the pending listen actions hash table.
771 * This is allocated in CurTransactionContext during PreCommit_Notify,
772 * and destroyed at transaction end.
773 */
774static void
776{
778
780 return;
781
783 hash_ctl.entrysize = sizeof(PendingListenEntry);
785
787 hash_create("Pending Listen Actions",
789 &hash_ctl,
791}
792
793/*
794 * Register our shared memory needs
795 */
796static void
798{
799 Size size;
800
801 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
802 size = add_size(size, offsetof(AsyncQueueControl, backend));
803
804 ShmemRequestStruct(.name = "Async Queue Control",
805 .size = size,
806 .ptr = (void **) &asyncQueueControl,
807 );
808
810 .name = "notify",
811 .Dir = "pg_notify",
812
813 /* long segment names are used in order to avoid wraparound */
814 .long_segment_names = true,
815
816 .nslots = notify_buffers,
817
818 .sync_handler = SYNC_HANDLER_NONE,
819 .PagePrecedes = asyncQueuePagePrecedes,
820 .errdetail_for_io_error = asyncQueueErrdetailForIoError,
821
822 .buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
823 .bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
824 );
825}
826
827static void
828AsyncShmemInit(void *arg)
829{
832 QUEUE_STOP_PAGE = 0;
837 for (int i = 0; i < MaxBackends; i++)
838 {
845 }
846
847 /*
848 * During start or reboot, clean out the pg_notify directory.
849 */
851}
852
853
854/*
855 * pg_notify -
856 * SQL function to send a notification event
857 */
858Datum
860{
861 const char *channel;
862 const char *payload;
863
864 if (PG_ARGISNULL(0))
865 channel = "";
866 else
868
869 if (PG_ARGISNULL(1))
870 payload = "";
871 else
873
874 /* For NOTIFY as a statement, this is checked in ProcessUtility */
876
877 Async_Notify(channel, payload);
878
880}
881
882
883/*
884 * Async_Notify
885 *
886 * This is executed by the SQL notify command.
887 *
888 * Adds the message to the list of pending notifies.
889 * Actual notification happens during transaction commit.
890 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
891 */
892void
893Async_Notify(const char *channel, const char *payload)
894{
895 int my_level = GetCurrentTransactionNestLevel();
896 size_t channel_len;
897 size_t payload_len;
898 Notification *n;
899 MemoryContext oldcontext;
900
901 if (IsParallelWorker())
902 elog(ERROR, "cannot send notifications from a parallel worker");
903
904 if (Trace_notify)
905 elog(DEBUG1, "Async_Notify(%s)", channel);
906
907 channel_len = channel ? strlen(channel) : 0;
908 payload_len = payload ? strlen(payload) : 0;
909
910 /* a channel name must be specified */
911 if (channel_len == 0)
914 errmsg("channel name cannot be empty")));
915
916 /* enforce length limits */
917 if (channel_len >= NAMEDATALEN)
920 errmsg("channel name too long")));
921
922 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
925 errmsg("payload string too long")));
926
927 /*
928 * We must construct the Notification entry, even if we end up not using
929 * it, in order to compare it cheaply to existing list entries.
930 *
931 * The notification list needs to live until end of transaction, so store
932 * it in the transaction context.
933 */
935
937 channel_len + payload_len + 2);
938 n->channel_len = channel_len;
939 n->payload_len = payload_len;
940 strcpy(n->data, channel);
941 if (payload)
942 strcpy(n->data + channel_len + 1, payload);
943 else
944 n->data[channel_len + 1] = '\0';
945
946 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
947 {
949
950 /*
951 * First notify event in current (sub)xact. Note that we allocate the
952 * NotificationList in TopTransactionContext; the nestingLevel might
953 * get changed later by AtSubCommit_Notify.
954 */
957 sizeof(NotificationList));
958 notifies->nestingLevel = my_level;
959 notifies->events = list_make1(n);
960 /* We certainly don't need a hashtable yet */
961 notifies->hashtab = NULL;
962 /* We won't build uniqueChannelNames/Hash till later, either */
963 notifies->uniqueChannelNames = NIL;
964 notifies->uniqueChannelHash = NULL;
965 notifies->upper = pendingNotifies;
967 }
968 else
969 {
970 /* Now check for duplicates */
972 {
973 /* It's a dup, so forget it */
974 pfree(n);
975 MemoryContextSwitchTo(oldcontext);
976 return;
977 }
978
979 /* Append more events to existing list */
981 }
982
983 MemoryContextSwitchTo(oldcontext);
984}
985
986/*
987 * queue_listen
988 * Common code for listen, unlisten, unlisten all commands.
989 *
990 * Adds the request to the list of pending actions.
991 * Actual update of localChannelTable and globalChannelTable happens during
992 * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
993 */
994static void
995queue_listen(ListenActionKind action, const char *channel)
996{
997 MemoryContext oldcontext;
999 int my_level = GetCurrentTransactionNestLevel();
1000
1001 /*
1002 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1003 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1004 * final per-channel intent is computed during PreCommit_Notify.
1005 */
1007
1008 /* space for terminating null is included in sizeof(ListenAction) */
1010 strlen(channel) + 1);
1011 actrec->action = action;
1012 strcpy(actrec->channel, channel);
1013
1014 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1015 {
1016 ActionList *actions;
1017
1018 /*
1019 * First action in current sub(xact). Note that we allocate the
1020 * ActionList in TopTransactionContext; the nestingLevel might get
1021 * changed later by AtSubCommit_Notify.
1022 */
1023 actions = (ActionList *)
1025 actions->nestingLevel = my_level;
1026 actions->actions = list_make1(actrec);
1027 actions->upper = pendingActions;
1028 pendingActions = actions;
1029 }
1030 else
1032
1033 MemoryContextSwitchTo(oldcontext);
1034}
1035
1036/*
1037 * Async_Listen
1038 *
1039 * This is executed by the SQL listen command.
1040 */
1041void
1042Async_Listen(const char *channel)
1043{
1044 if (Trace_notify)
1045 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1046
1047 queue_listen(LISTEN_LISTEN, channel);
1048}
1049
1050/*
1051 * Async_Unlisten
1052 *
1053 * This is executed by the SQL unlisten command.
1054 */
1055void
1056Async_Unlisten(const char *channel)
1057{
1058 if (Trace_notify)
1059 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1060
1061 /* If we couldn't possibly be listening, no need to queue anything */
1063 return;
1064
1065 queue_listen(LISTEN_UNLISTEN, channel);
1066}
1067
1068/*
1069 * Async_UnlistenAll
1070 *
1071 * This is invoked by UNLISTEN * command, and also at backend exit.
1072 */
1073void
1075{
1076 if (Trace_notify)
1077 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1078
1079 /* If we couldn't possibly be listening, no need to queue anything */
1081 return;
1082
1084}
1085
1086/*
1087 * SQL function: return a set of the channel names this backend is actively
1088 * listening to.
1089 *
1090 * Note: this coding relies on the fact that the localChannelTable cannot
1091 * change within a transaction.
1092 */
1093Datum
1095{
1097 HASH_SEQ_STATUS *status;
1098
1099 /* stuff done only on the first call of the function */
1100 if (SRF_IS_FIRSTCALL())
1101 {
1102 /* create a function context for cross-call persistence */
1104
1105 /* Initialize hash table iteration if we have any channels */
1106 if (localChannelTable != NULL)
1107 {
1108 MemoryContext oldcontext;
1109
1110 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1111 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1113 funcctx->user_fctx = status;
1114 MemoryContextSwitchTo(oldcontext);
1115 }
1116 else
1117 {
1118 funcctx->user_fctx = NULL;
1119 }
1120 }
1121
1122 /* stuff done on every call of the function */
1124 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1125
1126 if (status != NULL)
1127 {
1128 ChannelName *entry;
1129
1130 entry = (ChannelName *) hash_seq_search(status);
1131 if (entry != NULL)
1133 }
1134
1136}
1137
1138/*
1139 * Async_UnlistenOnExit
1140 *
1141 * This is executed at backend exit if we have done any LISTENs in this
1142 * backend. It might not be necessary anymore, if the user UNLISTENed
1143 * everything, but we don't try to detect that case.
1144 */
1145static void
1147{
1150}
1151
1152/*
1153 * AtPrepare_Notify
1154 *
1155 * This is called at the prepare phase of a two-phase
1156 * transaction. Save the state for possible commit later.
1157 */
1158void
1159AtPrepare_Notify(void)
1160{
1161 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1163 ereport(ERROR,
1165 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1166}
1167
1168/*
1169 * PreCommit_Notify
1170 *
1171 * This is called at transaction commit, before actually committing to
1172 * clog.
1173 *
1174 * If there are pending LISTEN actions, make sure we are listed in the
1175 * shared-memory listener array. This must happen before commit to
1176 * ensure we don't miss any notifies from transactions that commit
1177 * just after ours.
1178 *
1179 * If there are outbound notify requests in the pendingNotifies list,
1180 * add them to the global queue. We do that before commit so that
1181 * we can still throw error if we run out of queue space.
1182 */
1183void
1184PreCommit_Notify(void)
1185{
1186 ListCell *p;
1187
1189 return; /* no relevant statements in this xact */
1190
1191 if (Trace_notify)
1192 elog(DEBUG1, "PreCommit_Notify");
1193
1194 /* Preflight for any pending listen/unlisten actions */
1196
1197 if (pendingActions != NULL)
1198 {
1199 /* Ensure we have a local channel table */
1201 /* Create pendingListenActions hash table for this transaction */
1203
1204 /* Stage all the actions this transaction wants to perform */
1205 foreach(p, pendingActions->actions)
1206 {
1208
1209 switch (actrec->action)
1210 {
1211 case LISTEN_LISTEN:
1214 break;
1215 case LISTEN_UNLISTEN:
1217 break;
1220 break;
1221 }
1222 }
1223 }
1224
1225 /* Queue any pending notifies (must happen after the above) */
1226 if (pendingNotifies)
1227 {
1229 bool firstIteration = true;
1230
1231 /*
1232 * Build list of unique channel names being notified for use by
1233 * SignalBackends().
1234 *
1235 * If uniqueChannelHash is available, use it to efficiently get the
1236 * unique channels. Otherwise, fall back to the O(N^2) approach.
1237 */
1240 {
1241 HASH_SEQ_STATUS status;
1243
1245 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1248 channelEntry->channel);
1249 }
1250 else
1251 {
1252 /* O(N^2) approach is better for small number of notifications */
1254 {
1255 char *channel = n->data;
1256 bool found = false;
1257
1258 /* Name present in list? */
1260 {
1261 if (strcmp(oldchan, channel) == 0)
1262 {
1263 found = true;
1264 break;
1265 }
1266 }
1267 /* Add if not already in list */
1268 if (!found)
1271 channel);
1272 }
1273 }
1274
1275 /* Preallocate workspace that will be needed by SignalBackends() */
1276 if (signalPids == NULL)
1278 MaxBackends * sizeof(int32));
1279
1280 if (signalProcnos == NULL)
1282 MaxBackends * sizeof(ProcNumber));
1283
1284 /*
1285 * Make sure that we have an XID assigned to the current transaction.
1286 * GetCurrentTransactionId is cheap if we already have an XID, but not
1287 * so cheap if we don't, and we'd prefer not to do that work while
1288 * holding NotifyQueueLock.
1289 */
1291
1292 /*
1293 * Serialize writers by acquiring a special lock that we hold till
1294 * after commit. This ensures that queue entries appear in commit
1295 * order, and in particular that there are never uncommitted queue
1296 * entries ahead of committed ones, so an uncommitted transaction
1297 * can't block delivery of deliverable notifications.
1298 *
1299 * We use a heavyweight lock so that it'll automatically be released
1300 * after either commit or abort. This also allows deadlocks to be
1301 * detected, though really a deadlock shouldn't be possible here.
1302 *
1303 * The lock is on "database 0", which is pretty ugly but it doesn't
1304 * seem worth inventing a special locktag category just for this.
1305 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1306 * used by the flatfiles mechanism.)
1307 */
1310
1311 /*
1312 * For the direct advancement optimization in SignalBackends(), we
1313 * need to ensure that no other backend can insert queue entries
1314 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1315 * heavyweight lock above provides this guarantee, since it serializes
1316 * all writers.
1317 *
1318 * Note: if the heavyweight lock were ever removed for scalability
1319 * reasons, we could achieve the same guarantee by holding
1320 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1321 * than releasing and reacquiring it for each page as we do below.
1322 */
1323
1324 /* Initialize values to a safe default in case list is empty */
1327
1328 /* Now push the notifications into the queue */
1330 while (nextNotify != NULL)
1331 {
1332 /*
1333 * Add the pending notifications to the queue. We acquire and
1334 * release NotifyQueueLock once per page, which might be overkill
1335 * but it does allow readers to get in while we're doing this.
1336 *
1337 * A full queue is very uncommon and should really not happen,
1338 * given that we have so much space available in the SLRU pages.
1339 * Nevertheless we need to deal with this possibility. Note that
1340 * when we get here we are in the process of committing our
1341 * transaction, but we have not yet committed to clog, so at this
1342 * point in time we can still roll the transaction back.
1343 */
1345 if (firstIteration)
1346 {
1348 firstIteration = false;
1349 }
1351 if (asyncQueueIsFull())
1352 ereport(ERROR,
1354 errmsg("too many notifications in the NOTIFY queue")));
1358 }
1359
1360 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1361 }
1362}
1363
1364/*
1365 * AtCommit_Notify
1366 *
1367 * This is called at transaction commit, after committing to clog.
1368 *
1369 * Apply pending listen/unlisten changes and clear transaction-local state.
1370 *
1371 * If we issued any notifications in the transaction, send signals to
1372 * listening backends (possibly including ourselves) to process them.
1373 * Also, if we filled enough queue pages with new notifies, try to
1374 * advance the queue tail pointer.
1375 */
1376void
1377AtCommit_Notify(void)
1378{
1379 /*
1380 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1381 * return as soon as possible
1382 */
1384 return;
1385
1386 if (Trace_notify)
1387 elog(DEBUG1, "AtCommit_Notify");
1388
1389 /* Apply staged listen/unlisten changes */
1391
1392 /* If no longer listening to anything, get out of listener array */
1395
1396 /*
1397 * Send signals to listening backends. We need do this only if there are
1398 * pending notifies, which were previously added to the shared queue by
1399 * PreCommit_Notify().
1400 */
1401 if (pendingNotifies != NULL)
1403
1404 /*
1405 * If it's time to try to advance the global tail pointer, do that.
1406 *
1407 * (It might seem odd to do this in the sender, when more than likely the
1408 * listeners won't yet have read the messages we just sent. However,
1409 * there's less contention if only the sender does it, and there is little
1410 * need for urgency in advancing the global tail. So this typically will
1411 * be clearing out messages that were sent some time ago.)
1412 */
1413 if (tryAdvanceTail)
1414 {
1415 tryAdvanceTail = false;
1417 }
1418
1419 /* And clean up */
1421}
1422
1423/*
1424 * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1425 *
1426 * This function must make sure we are ready to catch any incoming messages.
1427 */
1428static void
1430{
1431 QueuePosition head;
1432 QueuePosition max;
1434
1435 /*
1436 * Nothing to do if we are already listening to something, nor if we
1437 * already ran this routine in this transaction.
1438 */
1440 return;
1441
1442 if (Trace_notify)
1443 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1444
1445 /*
1446 * Before registering, make sure we will unlisten before dying. (Note:
1447 * this action does not get undone if we abort later.)
1448 */
1450 {
1453 }
1454
1455 /*
1456 * This is our first LISTEN, so establish our pointer.
1457 *
1458 * We set our pointer to the global tail pointer and then move it forward
1459 * over already-committed notifications. This ensures we cannot miss any
1460 * not-yet-committed notifications. We might get a few more but that
1461 * doesn't hurt.
1462 *
1463 * In some scenarios there might be a lot of committed notifications that
1464 * have not yet been pruned away (because some backend is being lazy about
1465 * reading them). To reduce our startup time, we can look at other
1466 * backends and adopt the maximum "pos" pointer of any backend that's in
1467 * our database; any notifications it's already advanced over are surely
1468 * committed and need not be re-examined by us. (We must consider only
1469 * backends connected to our DB, because others will not have bothered to
1470 * check committed-ness of notifications in our DB.)
1471 *
1472 * We need exclusive lock here so we can look at other backends' entries
1473 * and manipulate the list links.
1474 */
1476 head = QUEUE_HEAD;
1477 max = QUEUE_TAIL;
1480 {
1482 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1483 /* Also find last listening backend before this one */
1484 if (i < MyProcNumber)
1485 prevListener = i;
1486 }
1492 /* Insert backend into list of listeners at correct position */
1494 {
1497 }
1498 else
1499 {
1502 }
1504
1505 /* Now we are listed in the global array, so remember we're listening */
1506 amRegisteredListener = true;
1507
1508 /*
1509 * Try to move our pointer forward as far as possible. This will skip
1510 * over already-committed notifications, which we want to do because they
1511 * might be quite stale. Note that we are not yet listening on anything,
1512 * so we won't deliver such notifications to our frontend. Also, although
1513 * our transaction might have executed NOTIFY, those message(s) aren't
1514 * queued yet so we won't skip them here.
1515 */
1516 if (!QUEUE_POS_EQUAL(max, head))
1518}
1519
1520/*
1521 * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1522 *
1523 * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1524 * an entry in localChannelTable, and pre-allocating an entry in the shared
1525 * globalChannelTable with removeOnAbort set. AtCommit_Notify will clear
1526 * removeOnAbort; abort processing will remove entries still marked so.
1527 */
1528static void
1529PrepareTableEntriesForListen(const char *channel)
1530{
1532 GlobalChannelEntry *entry;
1533 bool found;
1535 PendingListenEntry *pending;
1536
1537 /*
1538 * Record in local pending hash that we want to LISTEN, overwriting any
1539 * earlier attempt to UNLISTEN.
1540 */
1541 pending = (PendingListenEntry *)
1543 pending->action = PENDING_LISTEN;
1544
1545 /*
1546 * Ensure that there is an entry for the channel in localChannelTable.
1547 * (Should this fail, we can just roll back.) If the transaction fails
1548 * after this point, we will remove the entry if appropriate during
1549 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1550 * to return TRUE; we assume nothing is going to consult that before
1551 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1552 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1553 * present to ensure they do the right things; see
1554 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1555 */
1557
1558 /* Pre-allocate entry in shared globalChannelTable */
1559 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1560 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1561
1562 if (!found)
1563 {
1564 /* New channel entry, so initialize it to a safe state */
1566 entry->numListeners = 0;
1567 entry->allocatedListeners = 0;
1568 }
1569
1570 /*
1571 * Create listenersArray if entry doesn't have one. It's tempting to fold
1572 * this into the !found case, but this coding allows us to cope in case
1573 * dsa_allocate() failed in an earlier attempt.
1574 */
1575 if (!DsaPointerIsValid(entry->listenersArray))
1576 {
1580 }
1581
1584
1585 /*
1586 * Check if we already have a ListenerEntry (possibly from earlier in this
1587 * transaction)
1588 */
1589 for (int i = 0; i < entry->numListeners; i++)
1590 {
1591 if (listeners[i].procNo == MyProcNumber)
1592 {
1593 /* Already have an entry; leave removeOnAbort as-is */
1595 return;
1596 }
1597 }
1598
1599 /* Need to add a new entry; grow array if necessary */
1600 if (entry->numListeners >= entry->allocatedListeners)
1601 {
1602 int new_size = entry->allocatedListeners * 2;
1605 sizeof(ListenerEntry) * new_size);
1607
1609 entry->listenersArray = new_array;
1613 }
1614
1615 listeners[entry->numListeners].procNo = MyProcNumber;
1616 listeners[entry->numListeners].removeOnAbort = true;
1617 entry->numListeners++;
1618
1620}
1621
1622/*
1623 * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1624 *
1625 * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1626 * we're currently listening (committed or staged). We don't touch
1627 * globalChannelTable yet - the listener keeps receiving signals until
1628 * commit, when the entry is removed.
1629 */
1630static void
1631PrepareTableEntriesForUnlisten(const char *channel)
1632{
1633 PendingListenEntry *pending;
1634
1635 /*
1636 * If the channel name is not in localChannelTable, then we are neither
1637 * listening on it nor preparing to listen on it, so we don't need to
1638 * record an UNLISTEN action.
1639 */
1641 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1642 return;
1643
1644 /*
1645 * Record in local pending hash that we want to UNLISTEN, overwriting any
1646 * earlier attempt to LISTEN. Don't touch localChannelTable or
1647 * globalChannelTable yet - we keep receiving signals until commit.
1648 */
1649 pending = (PendingListenEntry *)
1651 pending->action = PENDING_UNLISTEN;
1652}
1653
1654/*
1655 * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1656 *
1657 * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1658 * about-to-be-listened channels in pendingListenActions.
1659 */
1660static void
1662{
1665 PendingListenEntry *pending;
1666
1667 /*
1668 * Scan localChannelTable, which will have the names of all channels that
1669 * we are listening on or have prepared to listen on. Record an UNLISTEN
1670 * action for each one, overwriting any earlier attempt to LISTEN.
1671 */
1673 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1674 {
1675 pending = (PendingListenEntry *)
1677 pending->action = PENDING_UNLISTEN;
1678 }
1679}
1680
1681/*
1682 * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1683 *
1684 * Decrements numListeners, compacts the array, and frees the entry if empty.
1685 * Sets *entry_ptr to NULL if the entry was deleted.
1686 *
1687 * We could get the listeners pointer from the entry, but all callers
1688 * already have it at hand.
1689 */
1690static void
1693 int idx)
1694{
1695 GlobalChannelEntry *entry = *entry_ptr;
1696
1697 entry->numListeners--;
1698 if (idx < entry->numListeners)
1700 sizeof(ListenerEntry) * (entry->numListeners - idx));
1701
1702 if (entry->numListeners == 0)
1703 {
1706 /* tells caller not to release the entry's lock: */
1707 *entry_ptr = NULL;
1708 }
1709}
1710
1711/*
1712 * ApplyPendingListenActions
1713 *
1714 * Apply, or revert, staged listen/unlisten changes to the local and global
1715 * hash tables.
1716 */
1717static void
1719{
1721 PendingListenEntry *pending;
1722
1723 /* Quick exit if nothing to do */
1725 return;
1726
1727 /* We made a globalChannelTable before building pendingListenActions */
1728 if (globalChannelTable == NULL)
1729 elog(PANIC, "global channel table missing post-commit/abort");
1730
1731 /* For each staged action ... */
1733 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1734 {
1736 GlobalChannelEntry *entry;
1737 bool removeLocal = true;
1738 bool foundListener = false;
1739
1740 /*
1741 * Find the global entry for this channel. If isCommit, it had better
1742 * exist (it was created in PreCommit). In an abort, it might not
1743 * exist, in which case we are not listening and should discard any
1744 * local entry that PreCommit may have managed to create.
1745 */
1746 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1747 entry = dshash_find(globalChannelTable, &key, true);
1748 if (entry != NULL)
1749 {
1750 /* Scan entry to find the ListenerEntry for this backend */
1752
1755
1756 for (int i = 0; i < entry->numListeners; i++)
1757 {
1758 if (listeners[i].procNo != MyProcNumber)
1759 continue;
1760 foundListener = true;
1761 if (isCommit)
1762 {
1763 if (pending->action == PENDING_LISTEN)
1764 {
1765 /*
1766 * LISTEN being committed: entry is now permanent.
1767 * localChannelTable entry was created during
1768 * PreCommit and should be kept.
1769 */
1770 listeners[i].removeOnAbort = false;
1771 removeLocal = false;
1772 }
1773 else
1774 {
1775 /*
1776 * UNLISTEN being committed: remove pre-allocated
1777 * entries from both tables.
1778 */
1780 }
1781 }
1782 else
1783 {
1784 /*
1785 * Note: this part is reachable only if the transaction
1786 * aborts after PreCommit_Notify() has made some
1787 * pendingListenActions entries, so it's pretty hard to
1788 * test.
1789 */
1790 if (listeners[i].removeOnAbort)
1791 {
1792 /*
1793 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1794 * so remove pre-allocated entries from both tables.
1795 */
1797 }
1798 else
1799 {
1800 /*
1801 * Entry predates this transaction, so keep the
1802 * localChannelTable entry.
1803 */
1804 removeLocal = false;
1805 }
1806 }
1807 break; /* there shouldn't be another match */
1808 }
1809
1810 /* We might have already released the entry by removing it */
1811 if (entry != NULL)
1813 }
1814
1815 /*
1816 * If we're committing a LISTEN action, we should have found a
1817 * matching ListenerEntry, but otherwise it's okay if we didn't.
1818 */
1819 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1820 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1821 pending->channel, MyProcNumber);
1822
1823 /*
1824 * If we did not find a globalChannelTable entry for our backend, or
1825 * if we are unlistening, remove any localChannelTable entry that may
1826 * exist. (Note in particular that this cleans up if we created a
1827 * localChannelTable entry and then failed while trying to create a
1828 * globalChannelTable entry.)
1829 */
1832 HASH_REMOVE, NULL);
1833 }
1834}
1835
1836/*
1837 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
1838 *
1839 * Remove this backend from all channels in the shared global table.
1840 */
1841static void
1843{
1844 dshash_seq_status status;
1845 GlobalChannelEntry *entry;
1846
1847 if (Trace_notify)
1848 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1849
1850 /* Clear our local cache (not really necessary, but be consistent) */
1851 if (localChannelTable != NULL)
1852 {
1855 }
1856
1857 /* Now remove our entries from the shared globalChannelTable */
1858 if (globalChannelTable == NULL)
1859 return;
1860
1861 dshash_seq_init(&status, globalChannelTable, true);
1862 while ((entry = dshash_seq_next(&status)) != NULL)
1863 {
1865
1866 if (entry->key.dboid != MyDatabaseId)
1867 continue; /* not relevant */
1868
1871
1872 for (int i = 0; i < entry->numListeners; i++)
1873 {
1874 if (listeners[i].procNo == MyProcNumber)
1875 {
1876 entry->numListeners--;
1877 if (i < entry->numListeners)
1878 memmove(&listeners[i], &listeners[i + 1],
1879 sizeof(ListenerEntry) * (entry->numListeners - i));
1880
1881 if (entry->numListeners == 0)
1882 {
1884 dshash_delete_current(&status);
1885 }
1886 break;
1887 }
1888 }
1889 }
1890 dshash_seq_term(&status);
1891}
1892
1893/*
1894 * Test whether we are actively listening on the given channel name.
1895 *
1896 * Note: this function is executed for every notification found in the queue.
1897 */
1898static bool
1899IsListeningOn(const char *channel)
1900{
1901 if (localChannelTable == NULL)
1902 return false;
1903
1904 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1905}
1906
1907/*
1908 * Remove our entry from the listeners array when we are no longer listening
1909 * on any channel. NB: must not fail if we're already not listening.
1910 */
1911static void
1913{
1914 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1915
1916 if (!amRegisteredListener) /* nothing to do */
1917 return;
1918
1919 /*
1920 * Need exclusive lock here to manipulate list links.
1921 */
1923 /* Mark our entry as invalid */
1928 /* and remove it from the list */
1931 else
1932 {
1934 {
1936 {
1938 break;
1939 }
1940 }
1941 }
1944
1945 /* mark ourselves as no longer listed in the global array */
1946 amRegisteredListener = false;
1947}
1948
1949/*
1950 * Test whether there is room to insert more notification messages.
1951 *
1952 * Caller must hold at least shared NotifyQueueLock.
1953 */
1954static bool
1955asyncQueueIsFull(void)
1956{
1957 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1959 int64 occupied = headPage - tailPage;
1960
1962}
1963
1964/*
1965 * Advance the QueuePosition to the next entry, assuming that the current
1966 * entry is of length entryLength. If we jump to a new page the function
1967 * returns true, else false.
1968 */
1969static bool
1970asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1971{
1972 int64 pageno = QUEUE_POS_PAGE(*position);
1973 int offset = QUEUE_POS_OFFSET(*position);
1974 bool pageJump = false;
1975
1976 /*
1977 * Move to the next writing position: First jump over what we have just
1978 * written or read.
1979 */
1980 offset += entryLength;
1981 Assert(offset <= QUEUE_PAGESIZE);
1982
1983 /*
1984 * In a second step check if another entry can possibly be written to the
1985 * page. If so, stay here, we have reached the next position. If not, then
1986 * we need to move on to the next page.
1987 */
1989 {
1990 pageno++;
1991 offset = 0;
1992 pageJump = true;
1993 }
1994
1995 SET_QUEUE_POS(*position, pageno, offset);
1996 return pageJump;
1997}
1998
1999/*
2000 * Fill the AsyncQueueEntry at *qe with an outbound notification message.
2001 */
2002static void
2004{
2005 size_t channellen = n->channel_len;
2006 size_t payloadlen = n->payload_len;
2007 int entryLength;
2008
2011
2012 /* The terminators are already included in AsyncQueueEntryEmptySize */
2015 qe->length = entryLength;
2016 qe->dboid = MyDatabaseId;
2017 qe->xid = GetCurrentTransactionId();
2018 qe->srcPid = MyProcPid;
2019 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2020}
2021
2022/*
2023 * Add pending notifications to the queue.
2024 *
2025 * We go page by page here, i.e. we stop once we have to go to a new page but
2026 * we will be called again and then fill that next page. If an entry does not
2027 * fit into the current page, we write a dummy entry with an InvalidOid as the
2028 * database OID in order to fill the page. So every page is always used up to
2029 * the last byte which simplifies reading the page later.
2030 *
2031 * We are passed the list cell (in pendingNotifies->events) containing the next
2032 * notification to write and return the first still-unwritten cell back.
2033 * Eventually we will return NULL indicating all is done.
2034 *
2035 * We are holding NotifyQueueLock already from the caller and grab
2036 * page specific SLRU bank lock locally in this function.
2037 */
2038static ListCell *
2040{
2043 int64 pageno;
2044 int offset;
2045 int slotno;
2047
2048 /*
2049 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2050 * memory upon exiting. The reason for this is that if we have to advance
2051 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2052 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2053 * subsequent insertions would try to put entries into a page that slru.c
2054 * thinks doesn't exist yet.) So, use a local position variable. Note
2055 * that if we do fail, any already-inserted queue entries are forgotten;
2056 * this is okay, since they'd be useless anyway after our transaction
2057 * rolls back.
2058 */
2060
2061 /*
2062 * If this is the first write since the postmaster started, we need to
2063 * initialize the first page of the async SLRU. Otherwise, the current
2064 * page should be initialized already, so just fetch it.
2065 */
2066 pageno = QUEUE_POS_PAGE(queue_head);
2068
2069 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2071
2074 else
2075 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2076
2077 /* Note we mark the page dirty before writing in it */
2078 NotifyCtl->shared->page_dirty[slotno] = true;
2079
2080 while (nextNotify != NULL)
2081 {
2083
2084 /* Construct a valid queue entry in local variable qe */
2086
2087 offset = QUEUE_POS_OFFSET(queue_head);
2088
2089 /* Check whether the entry really fits on the current page */
2090 if (offset + qe.length <= QUEUE_PAGESIZE)
2091 {
2092 /* OK, so advance nextNotify past this item */
2094 }
2095 else
2096 {
2097 /*
2098 * Write a dummy entry to fill up the page. Actually readers will
2099 * only check dboid and since it won't match any reader's database
2100 * OID, they will ignore this entry and move on.
2101 */
2102 qe.length = QUEUE_PAGESIZE - offset;
2103 qe.dboid = InvalidOid;
2105 qe.data[0] = '\0'; /* empty channel */
2106 qe.data[1] = '\0'; /* empty payload */
2107 }
2108
2109 /* Now copy qe into the shared buffer page */
2110 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2111 &qe,
2112 qe.length);
2113
2114 /* Advance queue_head appropriately, and detect if page is full */
2115 if (asyncQueueAdvance(&(queue_head), qe.length))
2116 {
2117 LWLock *lock;
2118
2119 pageno = QUEUE_POS_PAGE(queue_head);
2120 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2121 if (lock != prevlock)
2122 {
2125 prevlock = lock;
2126 }
2127
2128 /*
2129 * Page is full, so we're done here, but first fill the next page
2130 * with zeroes. The reason to do this is to ensure that slru.c's
2131 * idea of the head page is always the same as ours, which avoids
2132 * boundary problems in SimpleLruTruncate. The test in
2133 * asyncQueueIsFull() ensured that there is room to create this
2134 * page without overrunning the queue.
2135 */
2137
2138 /*
2139 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2140 * set flag to remember that we should try to advance the tail
2141 * pointer (we don't want to actually do that right here).
2142 */
2144 tryAdvanceTail = true;
2145
2146 /* And exit the loop */
2147 break;
2148 }
2149 }
2150
2151 /* Success, so update the global QUEUE_HEAD */
2153
2155
2156 return nextNotify;
2157}
2158
2159/*
2160 * SQL function to return the fraction of the notification queue currently
2161 * occupied.
2162 */
2163Datum
2165{
2166 double usage;
2167
2168 /* Advance the queue tail so we don't report a too-large result */
2170
2174
2176}
2177
2178/*
2179 * Return the fraction of the queue that is currently occupied.
2180 *
2181 * The caller must hold NotifyQueueLock in (at least) shared mode.
2182 *
2183 * Note: we measure the distance to the logical tail page, not the physical
2184 * tail page. In some sense that's wrong, but the relative position of the
2185 * physical tail is affected by details such as SLRU segment boundaries,
2186 * so that a result based on that is unpleasantly unstable.
2187 */
2188static double
2189asyncQueueUsage(void)
2190{
2191 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2193 int64 occupied = headPage - tailPage;
2194
2195 if (occupied == 0)
2196 return (double) 0; /* fast exit for common case */
2197
2198 return (double) occupied / (double) max_notify_queue_pages;
2199}
2200
2201/*
2202 * Check whether the queue is at least half full, and emit a warning if so.
2203 *
2204 * This is unlikely given the size of the queue, but possible.
2205 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
2206 *
2207 * Caller must hold exclusive NotifyQueueLock.
2208 */
2209static void
2211{
2212 double fillDegree;
2213 TimestampTz t;
2214
2216 if (fillDegree < 0.5)
2217 return;
2218
2219 t = GetCurrentTimestamp();
2220
2223 {
2226
2228 {
2230 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2233 }
2234
2236 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2237 (minPid != InvalidPid ?
2238 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2239 : 0),
2240 (minPid != InvalidPid ?
2241 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2242 : 0)));
2243
2245 }
2246}
2247
2248/*
2249 * Send signals to listening backends.
2250 *
2251 * Normally we signal only backends that are interested in the notifies that
2252 * we just sent. However, that will leave idle listeners falling further and
2253 * further behind. Waken them anyway if they're far enough behind, so they'll
2254 * advance their queue position pointers, allowing the global tail to advance.
2255 *
2256 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
2257 *
2258 * This is called during CommitTransaction(), so it's important for it
2259 * to have very low probability of failure.
2260 */
2261static void
2262SignalBackends(void)
2263{
2264 int count;
2265
2266 /* Can't get here without PreCommit_Notify having made the global table */
2268
2269 /* It should have set up these arrays, too */
2271
2272 /*
2273 * Identify backends that we need to signal. We don't want to send
2274 * signals while holding the NotifyQueueLock, so this part just builds a
2275 * list of target PIDs in signalPids[] and signalProcnos[].
2276 */
2277 count = 0;
2278
2280
2281 /* Scan each channel name that we notified in this transaction */
2283 {
2285 GlobalChannelEntry *entry;
2287
2288 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2289 entry = dshash_find(globalChannelTable, &key, false);
2290 if (entry == NULL)
2291 continue; /* nobody is listening */
2292
2294 entry->listenersArray);
2295
2296 /*
2297 * Identify listeners that now need waking, add them to arrays.
2298 *
2299 * Note that we signal listeners regardless of the state of their
2300 * removeOnAbort flags. Hence a new listener that reached PreCommit,
2301 * but then failed before AtCommit_Notify, can receive a signal even
2302 * though it was never really listening. This is okay because it will
2303 * not do anything in response to that signal. If we did not do it
2304 * like this then a new listener might miss some messages due to the
2305 * direct-advance logic below.
2306 */
2307 for (int j = 0; j < entry->numListeners; j++)
2308 {
2309 ProcNumber i = listeners[j].procNo;
2310 int32 pid;
2311 QueuePosition pos;
2312
2314 continue; /* already signaled, no need to repeat */
2315
2316 pid = QUEUE_BACKEND_PID(i);
2317 pos = QUEUE_BACKEND_POS(i);
2318
2319 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2320 continue; /* it's fully caught up already */
2321
2322 Assert(pid != InvalidPid);
2323
2325 signalPids[count] = pid;
2326 signalProcnos[count] = i;
2327 count++;
2328 }
2329
2331 }
2332
2333 /*
2334 * Scan all listeners. Any that are not already pending wakeup must not
2335 * be interested in our notifications (else we'd have set their wakeup
2336 * flags above). Check to see if we can directly advance their queue
2337 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2338 * them anyway so they will catch up.
2339 */
2341 {
2342 int32 pid;
2343 QueuePosition pos;
2344
2346 continue;
2347
2348 /* If it's currently advancing, we should not touch it */
2350 continue;
2351
2352 pid = QUEUE_BACKEND_PID(i);
2353 pos = QUEUE_BACKEND_POS(i);
2354
2355 /*
2356 * We can directly advance the other backend's queue pointer if it's
2357 * not currently advancing (else there are race conditions), and its
2358 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2359 * it miss some older messages), and we'd not be moving the pointer
2360 * backward.
2361 */
2364 {
2365 /* We can directly advance its pointer past what we wrote */
2367 }
2370 {
2371 /* It's idle and far behind, so wake it up */
2372 Assert(pid != InvalidPid);
2373
2375 signalPids[count] = pid;
2376 signalProcnos[count] = i;
2377 count++;
2378 }
2379 }
2380
2382
2383 /* Now send signals */
2384 for (int i = 0; i < count; i++)
2385 {
2386 int32 pid = signalPids[i];
2387
2388 /*
2389 * If we are signaling our own process, no need to involve the kernel;
2390 * just set the flag directly.
2391 */
2392 if (pid == MyProcPid)
2393 {
2395 continue;
2396 }
2397
2398 /*
2399 * Note: assuming things aren't broken, a signal failure here could
2400 * only occur if the target backend exited since we released
2401 * NotifyQueueLock; which is unlikely but certainly possible. So we
2402 * just log a low-level debug message if it happens.
2403 */
2405 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2406 }
2407}
2408
2409/*
2410 * AtAbort_Notify
2411 *
2412 * This is called at transaction abort.
2413 *
2414 * Revert any staged listen/unlisten changes and clean up transaction state.
2415 * This only does anything if we abort after PreCommit_Notify has staged
2416 * some entries.
2417 */
2418void
2419AtAbort_Notify(void)
2420{
2421 /* Revert staged listen/unlisten changes */
2423
2424 /* If we're no longer listening on anything, unregister */
2427
2428 /* And clean up */
2430}
2431
2432/*
2433 * AtSubCommit_Notify() --- Take care of subtransaction commit.
2434 *
2435 * Reassign all items in the pending lists to the parent transaction.
2436 */
2437void
2439{
2440 int my_level = GetCurrentTransactionNestLevel();
2441
2442 /* If there are actions at our nesting level, we must reparent them. */
2443 if (pendingActions != NULL &&
2444 pendingActions->nestingLevel >= my_level)
2445 {
2446 if (pendingActions->upper == NULL ||
2447 pendingActions->upper->nestingLevel < my_level - 1)
2448 {
2449 /* nothing to merge; give the whole thing to the parent */
2451 }
2452 else
2453 {
2455
2457
2458 /*
2459 * Mustn't try to eliminate duplicates here --- see queue_listen()
2460 */
2463 childPendingActions->actions);
2465 }
2466 }
2467
2468 /* If there are notifies at our nesting level, we must reparent them. */
2469 if (pendingNotifies != NULL &&
2470 pendingNotifies->nestingLevel >= my_level)
2471 {
2472 Assert(pendingNotifies->nestingLevel == my_level);
2473
2474 if (pendingNotifies->upper == NULL ||
2475 pendingNotifies->upper->nestingLevel < my_level - 1)
2476 {
2477 /* nothing to merge; give the whole thing to the parent */
2479 }
2480 else
2481 {
2482 /*
2483 * Formerly, we didn't bother to eliminate duplicates here, but
2484 * now we must, else we fall foul of "Assert(!found)", either here
2485 * or during a later attempt to build the parent-level hashtable.
2486 */
2488 ListCell *l;
2489
2491 /* Insert all the subxact's events into parent, except for dups */
2492 foreach(l, childPendingNotifies->events)
2493 {
2495
2498 }
2500 }
2501 }
2502}
2503
2504/*
2505 * AtSubAbort_Notify() --- Take care of subtransaction abort.
2506 */
2507void
2509{
2510 int my_level = GetCurrentTransactionNestLevel();
2511
2512 /*
2513 * All we have to do is pop the stack --- the actions/notifies made in
2514 * this subxact are no longer interesting, and the space will be freed
2515 * when CurTransactionContext is recycled. We still have to free the
2516 * ActionList and NotificationList objects themselves, though, because
2517 * those are allocated in TopTransactionContext.
2518 *
2519 * Note that there might be no entries at all, or no entries for the
2520 * current subtransaction level, either because none were ever created, or
2521 * because we reentered this routine due to trouble during subxact abort.
2522 */
2523 while (pendingActions != NULL &&
2524 pendingActions->nestingLevel >= my_level)
2525 {
2527
2530 }
2531
2532 while (pendingNotifies != NULL &&
2533 pendingNotifies->nestingLevel >= my_level)
2534 {
2536
2539 }
2540}
2541
2542/*
2543 * HandleNotifyInterrupt
2544 *
2545 * Signal handler portion of interrupt handling. Let the backend know
2546 * that there's a pending notify interrupt. If we're currently reading
2547 * from the client, this will interrupt the read and
2548 * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2549 */
2550void
2552{
2553 /*
2554 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2555 * you do here.
2556 */
2557
2558 /* signal that work needs to be done */
2560
2561 /* latch will be set by procsignal_sigusr1_handler */
2562}
2563
2564/*
2565 * ProcessNotifyInterrupt
2566 *
2567 * This is called if we see notifyInterruptPending set, just before
2568 * transmitting ReadyForQuery at the end of a frontend command, and
2569 * also if a notify signal occurs while reading from the frontend.
2570 * HandleNotifyInterrupt() will cause the read to be interrupted
2571 * via the process's latch, and this routine will get called.
2572 * If we are truly idle (ie, *not* inside a transaction block),
2573 * process the incoming notifies.
2574 *
2575 * If "flush" is true, force any frontend messages out immediately.
2576 * This can be false when being called at the end of a frontend command,
2577 * since we'll flush after sending ReadyForQuery.
2578 */
2579void
2580ProcessNotifyInterrupt(bool flush)
2581{
2583 return; /* not really idle */
2584
2585 /* Loop in case another signal arrives while sending messages */
2587 ProcessIncomingNotify(flush);
2588}
2589
2590
2591/*
2592 * Read all pending notifications from the queue, and deliver appropriate
2593 * ones to my frontend. Stop when we reach queue head or an uncommitted
2594 * notification.
2595 */
2596static void
2598{
2599 QueuePosition pos;
2600 QueuePosition head;
2601 Snapshot snapshot;
2602
2603 /*
2604 * Fetch current state, indicate to others that we have woken up, and that
2605 * we are in process of advancing our position.
2606 */
2608 /* Assert checks that we have a valid state entry */
2612 head = QUEUE_HEAD;
2613
2614 if (QUEUE_POS_EQUAL(pos, head))
2615 {
2616 /* Nothing to do, we have read all notifications already. */
2618 return;
2619 }
2620
2623
2624 /*----------
2625 * Get snapshot we'll use to decide which xacts are still in progress.
2626 * This is trickier than it might seem, because of race conditions.
2627 * Consider the following example:
2628 *
2629 * Backend 1: Backend 2:
2630 *
2631 * transaction starts
2632 * UPDATE foo SET ...;
2633 * NOTIFY foo;
2634 * commit starts
2635 * queue the notify message
2636 * transaction starts
2637 * LISTEN foo; -- first LISTEN in session
2638 * SELECT * FROM foo WHERE ...;
2639 * commit to clog
2640 * commit starts
2641 * add backend 2 to array of listeners
2642 * advance to queue head (this code)
2643 * commit to clog
2644 *
2645 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2646 * wasn't committed yet. Ideally we'd ensure that client 2 would
2647 * eventually get transaction 1's notify message, but there's no way
2648 * to do that; until we're in the listener array, there's no guarantee
2649 * that the notify message doesn't get removed from the queue.
2650 *
2651 * Therefore the coding technique transaction 2 is using is unsafe:
2652 * applications must commit a LISTEN before inspecting database state,
2653 * if they want to ensure they will see notifications about subsequent
2654 * changes to that state.
2655 *
2656 * What we do guarantee is that we'll see all notifications from
2657 * transactions committing after the snapshot we take here.
2658 * BecomeRegisteredListener has already added us to the listener array,
2659 * so no not-yet-committed messages can be removed from the queue
2660 * before we see them.
2661 *----------
2662 */
2663 snapshot = RegisterSnapshot(GetLatestSnapshot());
2664
2665 /*
2666 * It is possible that we fail while trying to send a message to our
2667 * frontend (for example, because of encoding conversion failure). If
2668 * that happens it is critical that we not try to send the same message
2669 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2670 * ERRORs to FATAL, causing the client connection to be closed on error.
2671 *
2672 * We used to only skip over the offending message and try to soldier on,
2673 * but it was somewhat questionable to lose a notification and give the
2674 * client an ERROR instead. A client application is not be prepared for
2675 * that and can't tell that a notification was missed. It was also not
2676 * very useful in practice because notifications are often processed while
2677 * a connection is idle and reading a message from the client, and in that
2678 * state, any error is upgraded to FATAL anyway. Closing the connection
2679 * is a clear signal to the application that it might have missed
2680 * notifications.
2681 */
2682 {
2684 bool reachedStop;
2685
2686 ExitOnAnyError = true;
2687
2688 do
2689 {
2690 /*
2691 * Process messages up to the stop position, end of page, or an
2692 * uncommitted message.
2693 *
2694 * Our stop position is what we found to be the head's position
2695 * when we entered this function. It might have changed already.
2696 * But if it has, we will receive (or have already received and
2697 * queued) another signal and come here again.
2698 *
2699 * We are not holding NotifyQueueLock here! The queue can only
2700 * extend beyond the head pointer (see above) and we leave our
2701 * backend's pointer where it is so nobody will truncate or
2702 * rewrite pages under us. Especially we don't want to hold a lock
2703 * while sending the notifications to the frontend.
2704 */
2705 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2706 } while (!reachedStop);
2707
2708 /* Update shared state */
2713
2715 }
2716
2717 /* Done with snapshot */
2718 UnregisterSnapshot(snapshot);
2719}
2720
2721/*
2722 * Fetch notifications from the shared queue, beginning at position current,
2723 * and deliver relevant ones to my frontend.
2724 *
2725 * The function returns true once we have reached the stop position or an
2726 * uncommitted notification, and false if we have finished with the page.
2727 * In other words: once it returns true there is no need to look further.
2728 * The QueuePosition *current is advanced past all processed messages.
2729 */
2730static bool
2732 QueuePosition stop,
2733 Snapshot snapshot)
2734{
2735 int64 curpage = QUEUE_POS_PAGE(*current);
2736 int slotno;
2737 char *page_buffer;
2738 bool reachedStop = false;
2739 bool reachedEndOfPage;
2740
2741 /*
2742 * We copy the entries into a local buffer to avoid holding the SLRU lock
2743 * while we transmit them to our frontend. The local buffer must be
2744 * adequately aligned.
2745 */
2747 char *local_buf_end = local_buf;
2748
2750 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2751
2752 do
2753 {
2754 QueuePosition thisentry = *current;
2756
2757 if (QUEUE_POS_EQUAL(thisentry, stop))
2758 break;
2759
2760 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2761
2762 /*
2763 * Advance *current over this message, possibly to the next page. As
2764 * noted in the comments for asyncQueueReadAllNotifications, we must
2765 * do this before possibly failing while processing the message.
2766 */
2767 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2768
2769 /* Ignore messages destined for other databases */
2770 if (qe->dboid == MyDatabaseId)
2771 {
2772 if (XidInMVCCSnapshot(qe->xid, snapshot))
2773 {
2774 /*
2775 * The source transaction is still in progress, so we can't
2776 * process this message yet. Break out of the loop, but first
2777 * back up *current so we will reprocess the message next
2778 * time. (Note: it is unlikely but not impossible for
2779 * TransactionIdDidCommit to fail, so we can't really avoid
2780 * this advance-then-back-up behavior when dealing with an
2781 * uncommitted message.)
2782 *
2783 * Note that we must test XidInMVCCSnapshot before we test
2784 * TransactionIdDidCommit, else we might return a message from
2785 * a transaction that is not yet visible to snapshots; compare
2786 * the comments at the head of heapam_visibility.c.
2787 *
2788 * Also, while our own xact won't be listed in the snapshot,
2789 * we need not check for TransactionIdIsCurrentTransactionId
2790 * because our transaction cannot (yet) have queued any
2791 * messages.
2792 */
2793 *current = thisentry;
2794 reachedStop = true;
2795 break;
2796 }
2797
2798 /*
2799 * Quick check for the case that we're not listening on any
2800 * channels, before calling TransactionIdDidCommit(). This makes
2801 * that case a little faster, but more importantly, it ensures
2802 * that if there's a bad entry in the queue for which
2803 * TransactionIdDidCommit() fails for some reason, we can skip
2804 * over it on the first LISTEN in a session, and not get stuck on
2805 * it indefinitely. (This is a little trickier than it looks: it
2806 * works because BecomeRegisteredListener runs this code before we
2807 * have made the first entry in localChannelTable.)
2808 */
2810 continue;
2811
2812 if (TransactionIdDidCommit(qe->xid))
2813 {
2814 memcpy(local_buf_end, qe, qe->length);
2815 local_buf_end += qe->length;
2816 }
2817 else
2818 {
2819 /*
2820 * The source transaction aborted or crashed, so we just
2821 * ignore its notifications.
2822 */
2823 }
2824 }
2825
2826 /* Loop back if we're not at end of page */
2827 } while (!reachedEndOfPage);
2828
2829 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2831
2832 /*
2833 * Now that we have let go of the SLRU bank lock, send the notifications
2834 * to our backend
2835 */
2837 for (char *p = local_buf; p < local_buf_end;)
2838 {
2840
2841 /* qe->data is the null-terminated channel name */
2842 char *channel = qe->data;
2843
2844 if (IsListeningOn(channel))
2845 {
2846 /* payload follows channel name */
2847 char *payload = qe->data + strlen(channel) + 1;
2848
2849 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2850 }
2851
2852 p += qe->length;
2853 }
2854
2855 if (QUEUE_POS_EQUAL(*current, stop))
2856 reachedStop = true;
2857
2858 return reachedStop;
2859}
2860
2861/*
2862 * Advance the shared queue tail variable to the minimum of all the
2863 * per-backend tail pointers. Truncate pg_notify space if possible.
2864 *
2865 * This is (usually) called during CommitTransaction(), so it's important for
2866 * it to have very low probability of failure.
2867 */
2868static void
2870{
2871 QueuePosition min;
2874 int64 boundary;
2875
2876 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2878
2879 /*
2880 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2881 * (ie, exactly match at least one backend's queue position), so it must
2882 * be updated atomically with the actual computation. Since v13, we could
2883 * get away with not doing it like that, but it seems prudent to keep it
2884 * so.
2885 *
2886 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2887 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2888 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2889 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2890 * there are pages we can truncate but haven't yet finished doing so.
2891 *
2892 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2893 * performing SimpleLruTruncate. This is OK because no backend will try
2894 * to access the pages we are in the midst of truncating.
2895 */
2897 min = QUEUE_HEAD;
2899 {
2901 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2902 }
2903 QUEUE_TAIL = min;
2906
2907 /*
2908 * We can truncate something if the global tail advanced across an SLRU
2909 * segment boundary.
2910 *
2911 * XXX it might be better to truncate only once every several segments, to
2912 * reduce the number of directory scans.
2913 */
2916 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2917 {
2918 /*
2919 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2920 * release the lock again.
2921 */
2923
2927 }
2928
2930}
2931
2932/*
2933 * AsyncNotifyFreezeXids
2934 *
2935 * Prepare the async notification queue for CLOG truncation by freezing
2936 * transaction IDs that are about to become inaccessible.
2937 *
2938 * This function is called by VACUUM before advancing datfrozenxid. It scans
2939 * the notification queue and replaces XIDs that would become inaccessible
2940 * after CLOG truncation with special markers:
2941 * - Committed transactions are set to FrozenTransactionId
2942 * - Aborted/crashed transactions are set to InvalidTransactionId
2943 *
2944 * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2945 * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2946 * (or it would have held back newFrozenXid through ProcArray).
2947 * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2948 * either aborted explicitly or crashed, and we can safely mark it invalid.
2949 */
2950void
2952{
2953 QueuePosition pos;
2954 QueuePosition head;
2955 int64 curpage = -1;
2956 int slotno = -1;
2957 char *page_buffer = NULL;
2958 bool page_dirty = false;
2959
2960 /*
2961 * Acquire locks in the correct order to avoid deadlocks. As per the
2962 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2963 * bank locks.
2964 *
2965 * We only need SHARED mode since we're just reading the head/tail
2966 * positions, not modifying them.
2967 */
2970
2971 pos = QUEUE_TAIL;
2972 head = QUEUE_HEAD;
2973
2974 /* Release NotifyQueueLock early, we only needed to read the positions */
2976
2977 /*
2978 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2979 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2980 * we're working.
2981 */
2982 while (!QUEUE_POS_EQUAL(pos, head))
2983 {
2985 TransactionId xid;
2986 int64 pageno = QUEUE_POS_PAGE(pos);
2987 int offset = QUEUE_POS_OFFSET(pos);
2988
2989 /* If we need a different page, release old lock and get new one */
2990 if (pageno != curpage)
2991 {
2992 LWLock *lock;
2993
2994 /* Release previous page if any */
2995 if (slotno >= 0)
2996 {
2997 if (page_dirty)
2998 {
2999 NotifyCtl->shared->page_dirty[slotno] = true;
3000 page_dirty = false;
3001 }
3003 }
3004
3005 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3007 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3008 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3009 curpage = pageno;
3010 }
3011
3012 qe = (AsyncQueueEntry *) (page_buffer + offset);
3013 xid = qe->xid;
3014
3015 if (TransactionIdIsNormal(xid) &&
3017 {
3018 if (TransactionIdDidCommit(xid))
3019 {
3020 qe->xid = FrozenTransactionId;
3021 page_dirty = true;
3022 }
3023 else
3024 {
3025 qe->xid = InvalidTransactionId;
3026 page_dirty = true;
3027 }
3028 }
3029
3030 /* Advance to next entry */
3031 asyncQueueAdvance(&pos, qe->length);
3032 }
3033
3034 /* Release final page lock if we acquired one */
3035 if (slotno >= 0)
3036 {
3037 if (page_dirty)
3038 NotifyCtl->shared->page_dirty[slotno] = true;
3040 }
3041
3043}
3044
3045/*
3046 * ProcessIncomingNotify
3047 *
3048 * Scan the queue for arriving notifications and report them to the front
3049 * end. The notifications might be from other sessions, or our own;
3050 * there's no need to distinguish here.
3051 *
3052 * If "flush" is true, force any frontend messages out immediately.
3053 *
3054 * NOTE: since we are outside any transaction, we must create our own.
3055 */
3056static void
3057ProcessIncomingNotify(bool flush)
3058{
3059 /* We *must* reset the flag */
3060 notifyInterruptPending = false;
3061
3062 /* Do nothing else if we aren't actively listening */
3064 return;
3065
3066 if (Trace_notify)
3067 elog(DEBUG1, "ProcessIncomingNotify");
3068
3069 set_ps_display("notify interrupt");
3070
3071 /*
3072 * We must run asyncQueueReadAllNotifications inside a transaction, else
3073 * bad things happen if it gets an error.
3074 */
3076
3078
3080
3081 /*
3082 * If this isn't an end-of-command case, we must flush the notify messages
3083 * to ensure frontend gets them promptly.
3084 */
3085 if (flush)
3086 pq_flush();
3087
3088 set_ps_display("idle");
3089
3090 if (Trace_notify)
3091 elog(DEBUG1, "ProcessIncomingNotify: done");
3092}
3093
3094/*
3095 * Send NOTIFY message to my front end.
3096 */
3097void
3098NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3099{
3101 {
3103
3105 pq_sendint32(&buf, srcPid);
3106 pq_sendstring(&buf, channel);
3107 pq_sendstring(&buf, payload);
3109
3110 /*
3111 * NOTE: we do not do pq_flush() here. Some level of caller will
3112 * handle it later, allowing this message to be combined into a packet
3113 * with other ones.
3114 */
3115 }
3116 else
3117 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3118}
3119
3120/* Does pendingNotifies include a match for the given event? */
3121static bool
3123{
3124 if (pendingNotifies == NULL)
3125 return false;
3126
3128 {
3129 /* Use the hash table to probe for a match */
3131 &n,
3132 HASH_FIND,
3133 NULL))
3134 return true;
3135 }
3136 else
3137 {
3138 /* Must scan the event list */
3139 ListCell *l;
3140
3141 foreach(l, pendingNotifies->events)
3142 {
3144
3145 if (n->channel_len == oldn->channel_len &&
3146 n->payload_len == oldn->payload_len &&
3147 memcmp(n->data, oldn->data,
3148 n->channel_len + n->payload_len + 2) == 0)
3149 return true;
3150 }
3151 }
3152
3153 return false;
3154}
3155
3156/*
3157 * Add a notification event to a pre-existing pendingNotifies list.
3158 *
3159 * Because pendingNotifies->events is already nonempty, this works
3160 * correctly no matter what CurrentMemoryContext is.
3161 */
3162static void
3164{
3166
3167 /* Create the hash tables if it's time to */
3170 {
3172 ListCell *l;
3173
3174 /* Create the hash table */
3175 hash_ctl.keysize = sizeof(Notification *);
3176 hash_ctl.entrysize = sizeof(struct NotificationHash);
3181 hash_create("Pending Notifies",
3182 256L,
3183 &hash_ctl,
3185
3186 /* Create the unique channel name table */
3188 hash_ctl.keysize = NAMEDATALEN;
3189 hash_ctl.entrysize = sizeof(ChannelName);
3192 hash_create("Pending Notify Channel Names",
3193 64L,
3194 &hash_ctl,
3196
3197 /* Insert all the already-existing events */
3198 foreach(l, pendingNotifies->events)
3199 {
3201 char *channel = oldn->data;
3202 bool found;
3203
3205 &oldn,
3206 HASH_ENTER,
3207 &found);
3208 Assert(!found);
3209
3210 /* Add channel name to uniqueChannelHash; might be there already */
3212 channel,
3213 HASH_ENTER,
3214 NULL);
3215 }
3216 }
3217
3218 /* Add new event to the list, in order */
3220
3221 /* Add event to the hash tables if needed */
3223 {
3224 char *channel = n->data;
3225 bool found;
3226
3228 &n,
3229 HASH_ENTER,
3230 &found);
3231 Assert(!found);
3232
3233 /* Add channel name to uniqueChannelHash; might be there already */
3235 channel,
3236 HASH_ENTER,
3237 NULL);
3238 }
3239}
3240
3241/*
3242 * notification_hash: hash function for notification hash table
3243 *
3244 * The hash "keys" are pointers to Notification structs.
3245 */
3246static uint32
3247notification_hash(const void *key, Size keysize)
3248{
3249 const Notification *k = *(const Notification *const *) key;
3250
3251 Assert(keysize == sizeof(Notification *));
3252 /* We don't bother to include the payload's trailing null in the hash */
3253 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3254 k->channel_len + k->payload_len + 1));
3255}
3256
3257/*
3258 * notification_match: match function to use with notification_hash
3259 */
3260static int
3261notification_match(const void *key1, const void *key2, Size keysize)
3262{
3263 const Notification *k1 = *(const Notification *const *) key1;
3264 const Notification *k2 = *(const Notification *const *) key2;
3265
3266 Assert(keysize == sizeof(Notification *));
3267 if (k1->channel_len == k2->channel_len &&
3268 k1->payload_len == k2->payload_len &&
3269 memcmp(k1->data, k2->data,
3270 k1->channel_len + k1->payload_len + 2) == 0)
3271 return 0; /* equal */
3272 return 1; /* not equal */
3273}
3274
3275/* Clear the pendingActions and pendingNotifies lists. */
3276static void
3278{
3279 /*
3280 * Everything's allocated in either TopTransactionContext or the context
3281 * for the subtransaction to which it corresponds. So, there's nothing to
3282 * do here except reset the pointers; the space will be reclaimed when the
3283 * contexts are deleted.
3284 */
3287 /* Also clear pendingListenActions, which is derived from pendingActions */
3289}
3290
3291/*
3292 * GUC check_hook for notify_buffers
3293 */
3294bool
3295check_notify_buffers(int *newval, void **extra, GucSource source)
3296{
3297 return check_slru_buffers("notify_buffers", newval);
3298}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:263
static void SignalBackends(void)
Definition async.c:2263
static double asyncQueueUsage(void)
Definition async.c:2190
#define MIN_HASHABLE_NOTIFIES
Definition async.c:527
static void PrepareTableEntriesForListen(const char *channel)
Definition async.c:1530
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition async.c:2004
#define QUEUE_FIRST_LISTENER
Definition async.c:361
#define QUEUE_POS_MAX(x, y)
Definition async.c:260
static bool tryAdvanceTail
Definition async.c:578
void HandleNotifyInterrupt(void)
Definition async.c:2552
static void BecomeRegisteredListener(void)
Definition async.c:1430
static void asyncQueueAdvanceTail(void)
Definition async.c:2870
int max_notify_queue_pages
Definition async.c:584
static ActionList * pendingActions
Definition async.c:458
static void ApplyPendingListenActions(bool isCommit)
Definition async.c:1719
#define QUEUE_BACKEND_IS_ADVANCING(i)
Definition async.c:367
static uint32 notification_hash(const void *key, Size keysize)
Definition async.c:3248
void Async_UnlistenAll(void)
Definition async.c:1075
static int32 * signalPids
Definition async.c:574
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition async.c:3099
void AtCommit_Notify(void)
Definition async.c:1378
#define QUEUE_POS_MIN(x, y)
Definition async.c:254
static void PrepareTableEntriesForUnlisten(const char *channel)
Definition async.c:1632
void ProcessNotifyInterrupt(bool flush)
Definition async.c:2581
ListenActionKind
Definition async.c:439
@ LISTEN_LISTEN
Definition async.c:440
@ LISTEN_UNLISTEN_ALL
Definition async.c:442
@ LISTEN_UNLISTEN
Definition async.c:441
static bool AsyncExistsPendingNotify(Notification *n)
Definition async.c:3123
#define QUEUE_BACKEND_POS(i)
Definition async.c:365
static const dshash_parameters globalChannelTableDSHParams
Definition async.c:685
#define INITIAL_LISTENERS_ARRAY_SIZE
Definition async.c:392
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition async.c:3262
static dshash_hash globalChannelTableHash(const void *key, size_t size, void *arg)
Definition async.c:672
#define SET_QUEUE_POS(x, y, z)
Definition async.c:241
static ProcNumber * signalProcnos
Definition async.c:575
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
Definition async.c:2732
static void ProcessIncomingNotify(bool flush)
Definition async.c:3058
static void asyncQueueReadAllNotifications(void)
Definition async.c:2598
static void Async_UnlistenOnExit(int code, Datum arg)
Definition async.c:1147
#define QUEUE_POS_OFFSET(x)
Definition async.c:239
static QueuePosition queueHeadAfterWrite
Definition async.c:567
static int asyncQueueErrdetailForIoError(const void *opaque_data)
Definition async.c:627
bool Trace_notify
Definition async.c:581
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition async.c:2040
static void ClearPendingActionsAndNotifies(void)
Definition async.c:3278
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Definition async.c:1095
Datum pg_notify(PG_FUNCTION_ARGS)
Definition async.c:860
static NotificationList * pendingNotifies
Definition async.c:534
#define AsyncQueueEntryEmptySize
Definition async.c:227
static void AddEventToPendingNotifies(Notification *n)
Definition async.c:3164
static AsyncQueueControl * asyncQueueControl
Definition async.c:347
static bool unlistenExitRegistered
Definition async.c:555
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition async.c:650
static dsa_area * globalChannelDSA
Definition async.c:415
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition async.c:1971
#define QUEUE_TAIL
Definition async.c:359
void AtAbort_Notify(void)
Definition async.c:2420
#define QUEUE_POS_PAGE(x)
Definition async.c:238
static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
Definition async.c:1692
void PreCommit_Notify(void)
Definition async.c:1185
#define QUEUE_CLEANUP_DELAY
Definition async.c:282
static void PrepareTableEntriesForUnlistenAll(void)
Definition async.c:1662
static void asyncQueueFillWarning(void)
Definition async.c:2211
#define QUEUE_BACKEND_PID(i)
Definition async.c:362
static SlruDesc NotifySlruDesc
Definition async.c:375
static void AsyncShmemRequest(void *arg)
Definition async.c:798
static void CleanupListenersOnExit(void)
Definition async.c:1843
static void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
Definition async.c:660
#define QUEUE_FULL_WARN_INTERVAL
Definition async.c:381
void Async_Unlisten(const char *channel)
Definition async.c:1057
static HTAB * pendingListenActions
Definition async.c:480
void Async_Listen(const char *channel)
Definition async.c:1043
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:201
#define QUEUE_POS_IS_ZERO(x)
Definition async.c:250
static void initGlobalChannelTable(void)
Definition async.c:699
#define NotifyCtl
Definition async.c:378
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:366
static HTAB * localChannelTable
Definition async.c:422
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition async.c:640
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:996
#define QUEUEALIGN(len)
Definition async.c:225
static bool amRegisteredListener
Definition async.c:558
#define QUEUE_POS_PRECEDES(x, y)
Definition async.c:266
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:364
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:363
void AtSubAbort_Notify(void)
Definition async.c:2509
void AtPrepare_Notify(void)
Definition async.c:1160
#define QUEUE_PAGESIZE
Definition async.c:379
void AtSubCommit_Notify(void)
Definition async.c:2439
static bool asyncQueueIsFull(void)
Definition async.c:1956
#define QUEUE_HEAD
Definition async.c:358
static void AsyncShmemInit(void *arg)
Definition async.c:829
static void initLocalChannelTable(void)
Definition async.c:750
PendingListenAction
Definition async.c:469
@ PENDING_UNLISTEN
Definition async.c:471
@ PENDING_LISTEN
Definition async.c:470
static dshash_table * globalChannelTable
Definition async.c:414
static void asyncQueueUnregister(void)
Definition async.c:1913
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
Definition async.c:2165
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:247
#define LocalChannelTableIsEmpty()
Definition async.c:425
static void initPendingListenActions(void)
Definition async.c:776
static QueuePosition queueHeadBeforeWrite
Definition async.c:566
static bool IsListeningOn(const char *channel)
Definition async.c:1900
void Async_Notify(const char *channel, const char *payload)
Definition async.c:894
volatile sig_atomic_t notifyInterruptPending
Definition async.c:552
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
Definition async.c:2952
bool check_notify_buffers(int *newval, void **extra, GucSource source)
Definition async.c:3296
#define QUEUE_STOP_PAGE
Definition async.c:360
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1789
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
int32_t int32
Definition c.h:620
uint16_t uint16
Definition c.h:623
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
int64 TimestampTz
Definition timestamp.h:39
@ DestRemote
Definition dest.h:89
dsa_area * dsa_attach(dsa_handle handle)
Definition dsa.c:510
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition dsa.c:957
void dsa_pin_mapping(dsa_area *area)
Definition dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition dsa.c:841
void dsa_pin(dsa_area *area)
Definition dsa.c:990
uint64 dsa_pointer
Definition dsa.h:62
#define dsa_create(tranche_id)
Definition dsa.h:117
#define dsa_allocate(area, size)
Definition dsa.h:109
#define InvalidDsaPointer
Definition dsa.h:78
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DsaPointerIsValid(x)
Definition dsa.h:106
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition dshash.c:611
void dshash_delete_entry(dshash_table *hash_table, void *entry)
Definition dshash.c:562
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition dshash.c:579
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
Definition dshash.c:659
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition dshash.c:394
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition dshash.c:371
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition dshash.c:274
void dshash_seq_term(dshash_seq_status *status)
Definition dshash.c:768
void * dshash_seq_next(dshash_seq_status *status)
Definition dshash.c:678
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition dshash.c:210
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition dshash.c:593
void dshash_delete_current(dshash_seq_status *status)
Definition dshash.c:778
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
uint32 dshash_hash
Definition dshash.h:30
#define dshash_find_or_insert(hash_table, key, found)
Definition dshash.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
void hash_destroy(HTAB *hashp)
Definition dynahash.c:802
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1317
Datum arg
Definition elog.c:1323
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1
#define DEBUG3
Definition elog.h:29
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define INFO
Definition elog.h:35
#define ereport(elevel,...)
Definition elog.h:152
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_FLOAT8(x)
Definition fmgr.h:369
#define PG_ARGISNULL(n)
Definition fmgr.h:209
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
int MyProcPid
Definition globals.c:49
ProcNumber MyProcNumber
Definition globals.c:92
int MaxBackends
Definition globals.c:149
bool ExitOnAnyError
Definition globals.c:125
int notify_buffers
Definition globals.c:167
Oid MyDatabaseId
Definition globals.c:96
#define newval
GucSource
Definition guc.h:112
static Datum hash_uint32(uint32 k)
Definition hashfn.h:43
static Datum hash_any(const unsigned char *k, int keylen)
Definition hashfn.h:31
#define HASH_STRINGS
Definition hsearch.h:91
@ HASH_FIND
Definition hsearch.h:108
@ HASH_REMOVE
Definition hsearch.h:110
@ HASH_ENTER
Definition hsearch.h:109
#define HASH_CONTEXT
Definition hsearch.h:97
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_COMPARE
Definition hsearch.h:94
#define HASH_FUNCTION
Definition hsearch.h:93
#define IsParallelWorker()
Definition parallel.h:62
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int j
Definition isn.c:78
int i
Definition isn.c:77
#define pq_flush()
Definition libpq.h:49
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_concat(List *list1, const List *list2)
Definition list.c:561
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1235
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
MemoryContext TopTransactionContext
Definition mcxt.c:172
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
void * palloc(Size size)
Definition mcxt.c:1390
MemoryContext CurTransactionContext
Definition mcxt.c:173
#define InvalidPid
Definition miscadmin.h:32
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
static void usage(void)
#define NAMEDATALEN
#define SLRU_PAGES_PER_SEGMENT
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static ListCell * list_head(const List *l)
Definition pg_list.h:128
static ListCell * lnext(const List *l, const ListCell *c)
Definition pg_list.h:375
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
CommandDest whereToSendOutput
Definition postgres.c:97
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static int fb(int x)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:296
@ PROCSIG_NOTIFY_INTERRUPT
Definition procsignal.h:33
#define PqMsg_NotificationResponse
Definition protocol.h:41
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define ShmemRequestStruct(...)
Definition shmem.h:176
bool SlruScanDirectory(SlruDesc *ctl, SlruScanCallback callback, void *data)
Definition slru.c:1844
int SimpleLruReadPage_ReadOnly(SlruDesc *ctl, int64 pageno, const void *opaque_data)
Definition slru.c:654
void SimpleLruTruncate(SlruDesc *ctl, int64 cutoffPage)
Definition slru.c:1458
int SimpleLruZeroPage(SlruDesc *ctl, int64 pageno)
Definition slru.c:397
bool SlruScanDirCbDeleteAll(SlruDesc *ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1797
int SimpleLruReadPage(SlruDesc *ctl, int64 pageno, bool write_ok, const void *opaque_data)
Definition slru.c:550
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:377
#define SimpleLruRequest(...)
Definition slru.h:218
static LWLock * SimpleLruGetBankLock(SlruDesc *ctl, int64 pageno)
Definition slru.h:207
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition snapmgr.c:1869
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void UnregisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:866
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition snapmgr.c:824
List * actions
Definition async.c:454
int nestingLevel
Definition async.c:453
struct ActionList * upper
Definition async.c:455
dshash_table_handle globalChannelTableDSH
Definition async.c:342
TimestampTz lastQueueFillWarn
Definition async.c:340
dsa_handle globalChannelTableDSA
Definition async.c:341
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition async.c:221
char channel[NAMEDATALEN]
Definition async.c:542
dsa_pointer listenersArray
Definition async.c:409
int allocatedListeners
Definition async.c:411
GlobalChannelKey key
Definition async.c:408
char channel[NAMEDATALEN]
Definition async.c:397
Size keysize
Definition hsearch.h:69
Definition pg_list.h:54
bool removeOnAbort
Definition async.c:403
Notification * event
Definition async.c:531
List * uniqueChannelNames
Definition async.c:522
HTAB * uniqueChannelHash
Definition async.c:523
HTAB * hashtab
Definition async.c:521
List * events
Definition async.c:520
struct NotificationList * upper
Definition async.c:524
uint16 payload_len
Definition async.c:512
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:514
uint16 channel_len
Definition async.c:511
PendingListenAction action
Definition async.c:477
char channel[NAMEDATALEN]
Definition async.c:476
int64 page
Definition async.c:234
@ SYNC_HANDLER_NONE
Definition sync.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void PreventCommandDuringRecovery(const char *cmdname)
Definition utility.c:446
char * text_to_cstring(const text *t)
Definition varlena.c:217
const char * name
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040
int GetCurrentTransactionNestLevel(void)
Definition xact.c:931
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207
TransactionId GetCurrentTransactionId(void)
Definition xact.c:456

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 527 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 201 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifySlruDesc)

Definition at line 378 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 363 of file async.c.

◆ QUEUE_BACKEND_IS_ADVANCING

#define QUEUE_BACKEND_IS_ADVANCING (   i)    (asyncQueueControl->backend[i].isAdvancing)

Definition at line 367 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 362 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 365 of file async.c.

◆ QUEUE_BACKEND_WAKEUP_PENDING

#define QUEUE_BACKEND_WAKEUP_PENDING (   i)    (asyncQueueControl->backend[i].wakeupPending)

Definition at line 366 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 282 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 361 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 381 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 358 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 364 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 379 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 247 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 250 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
int y
Definition isn.c:76
int x
Definition isn.c:75

Definition at line 260 of file async.c.

261 : \
262 (x).page != (y).page ? (x) : \
263 (x).offset > (y).offset ? (x) : (y))

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 254 of file async.c.

255 : \
256 (x).page != (y).page ? (y) : \
257 (x).offset < (y).offset ? (x) : (y))

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 239 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 238 of file async.c.

◆ QUEUE_POS_PRECEDES

#define QUEUE_POS_PRECEDES (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) || \
((x).page == (y).page && (x).offset < (y).offset))

Definition at line 266 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 360 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 359 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 225 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 241 of file async.c.

242 { \
243 (x).page = (y); \
244 (x).offset = (z); \
245 } while (0)

Typedef Documentation

◆ ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ ChannelName

◆ GlobalChannelEntry

◆ GlobalChannelKey

◆ ListenerEntry

◆ Notification

◆ NotificationList

◆ PendingListenEntry

◆ QueueBackendStatus

◆ QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 438 of file async.c.

◆ PendingListenAction

Enumerator
PENDING_LISTEN 
PENDING_UNLISTEN 

Definition at line 468 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 3164 of file async.c.

3165{
3167
3168 /* Create the hash tables if it's time to */
3171 {
3173 ListCell *l;
3174
3175 /* Create the hash table */
3176 hash_ctl.keysize = sizeof(Notification *);
3177 hash_ctl.entrysize = sizeof(struct NotificationHash);
3182 hash_create("Pending Notifies",
3183 256L,
3184 &hash_ctl,
3186
3187 /* Create the unique channel name table */
3189 hash_ctl.keysize = NAMEDATALEN;
3190 hash_ctl.entrysize = sizeof(ChannelName);
3193 hash_create("Pending Notify Channel Names",
3194 64L,
3195 &hash_ctl,
3197
3198 /* Insert all the already-existing events */
3199 foreach(l, pendingNotifies->events)
3200 {
3202 char *channel = oldn->data;
3203 bool found;
3204
3206 &oldn,
3207 HASH_ENTER,
3208 &found);
3209 Assert(!found);
3210
3211 /* Add channel name to uniqueChannelHash; might be there already */
3213 channel,
3214 HASH_ENTER,
3215 NULL);
3216 }
3217 }
3218
3219 /* Add new event to the list, in order */
3221
3222 /* Add event to the hash tables if needed */
3224 {
3225 char *channel = n->data;
3226 bool found;
3227
3229 &n,
3230 HASH_ENTER,
3231 &found);
3232 Assert(!found);
3233
3234 /* Add channel name to uniqueChannelHash; might be there already */
3236 channel,
3237 HASH_ENTER,
3238 NULL);
3239 }
3240}

References Assert, CurTransactionContext, Notification::data, NotificationList::events, fb(), HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), HASH_STRINGS, NotificationList::hashtab, lappend(), lfirst, list_length(), MIN_HASHABLE_NOTIFIES, NAMEDATALEN, NIL, notification_hash(), notification_match(), pendingNotifies, and NotificationList::uniqueChannelHash.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ ApplyPendingListenActions()

static void ApplyPendingListenActions ( bool  isCommit)
static

Definition at line 1719 of file async.c.

1720{
1722 PendingListenEntry *pending;
1723
1724 /* Quick exit if nothing to do */
1726 return;
1727
1728 /* We made a globalChannelTable before building pendingListenActions */
1729 if (globalChannelTable == NULL)
1730 elog(PANIC, "global channel table missing post-commit/abort");
1731
1732 /* For each staged action ... */
1734 while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
1735 {
1737 GlobalChannelEntry *entry;
1738 bool removeLocal = true;
1739 bool foundListener = false;
1740
1741 /*
1742 * Find the global entry for this channel. If isCommit, it had better
1743 * exist (it was created in PreCommit). In an abort, it might not
1744 * exist, in which case we are not listening and should discard any
1745 * local entry that PreCommit may have managed to create.
1746 */
1747 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
1748 entry = dshash_find(globalChannelTable, &key, true);
1749 if (entry != NULL)
1750 {
1751 /* Scan entry to find the ListenerEntry for this backend */
1753
1756
1757 for (int i = 0; i < entry->numListeners; i++)
1758 {
1759 if (listeners[i].procNo != MyProcNumber)
1760 continue;
1761 foundListener = true;
1762 if (isCommit)
1763 {
1764 if (pending->action == PENDING_LISTEN)
1765 {
1766 /*
1767 * LISTEN being committed: entry is now permanent.
1768 * localChannelTable entry was created during
1769 * PreCommit and should be kept.
1770 */
1771 listeners[i].removeOnAbort = false;
1772 removeLocal = false;
1773 }
1774 else
1775 {
1776 /*
1777 * UNLISTEN being committed: remove pre-allocated
1778 * entries from both tables.
1779 */
1781 }
1782 }
1783 else
1784 {
1785 /*
1786 * Note: this part is reachable only if the transaction
1787 * aborts after PreCommit_Notify() has made some
1788 * pendingListenActions entries, so it's pretty hard to
1789 * test.
1790 */
1791 if (listeners[i].removeOnAbort)
1792 {
1793 /*
1794 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
1795 * so remove pre-allocated entries from both tables.
1796 */
1798 }
1799 else
1800 {
1801 /*
1802 * Entry predates this transaction, so keep the
1803 * localChannelTable entry.
1804 */
1805 removeLocal = false;
1806 }
1807 }
1808 break; /* there shouldn't be another match */
1809 }
1810
1811 /* We might have already released the entry by removing it */
1812 if (entry != NULL)
1814 }
1815
1816 /*
1817 * If we're committing a LISTEN action, we should have found a
1818 * matching ListenerEntry, but otherwise it's okay if we didn't.
1819 */
1820 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
1821 elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
1822 pending->channel, MyProcNumber);
1823
1824 /*
1825 * If we did not find a globalChannelTable entry for our backend, or
1826 * if we are unlistening, remove any localChannelTable entry that may
1827 * exist. (Note in particular that this cleans up if we created a
1828 * localChannelTable entry and then failed while trying to create a
1829 * globalChannelTable entry.)
1830 */
1833 HASH_REMOVE, NULL);
1834 }
1835}

References PendingListenEntry::action, PendingListenEntry::channel, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_REMOVE, hash_search(), hash_seq_init(), hash_seq_search(), i, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PANIC, PENDING_LISTEN, pendingListenActions, RemoveListenerFromChannel(), and ListenerEntry::removeOnAbort.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char channel)

Definition at line 1043 of file async.c.

1044{
1045 if (Trace_notify)
1046 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1047
1048 queue_listen(LISTEN_LISTEN, channel);
1049}

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char channel,
const char payload 
)

Definition at line 894 of file async.c.

895{
896 int my_level = GetCurrentTransactionNestLevel();
897 size_t channel_len;
898 size_t payload_len;
899 Notification *n;
900 MemoryContext oldcontext;
901
902 if (IsParallelWorker())
903 elog(ERROR, "cannot send notifications from a parallel worker");
904
905 if (Trace_notify)
906 elog(DEBUG1, "Async_Notify(%s)", channel);
907
908 channel_len = channel ? strlen(channel) : 0;
909 payload_len = payload ? strlen(payload) : 0;
910
911 /* a channel name must be specified */
912 if (channel_len == 0)
915 errmsg("channel name cannot be empty")));
916
917 /* enforce length limits */
918 if (channel_len >= NAMEDATALEN)
921 errmsg("channel name too long")));
922
923 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
926 errmsg("payload string too long")));
927
928 /*
929 * We must construct the Notification entry, even if we end up not using
930 * it, in order to compare it cheaply to existing list entries.
931 *
932 * The notification list needs to live until end of transaction, so store
933 * it in the transaction context.
934 */
936
938 channel_len + payload_len + 2);
939 n->channel_len = channel_len;
940 n->payload_len = payload_len;
941 strcpy(n->data, channel);
942 if (payload)
943 strcpy(n->data + channel_len + 1, payload);
944 else
945 n->data[channel_len + 1] = '\0';
946
947 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
948 {
950
951 /*
952 * First notify event in current (sub)xact. Note that we allocate the
953 * NotificationList in TopTransactionContext; the nestingLevel might
954 * get changed later by AtSubCommit_Notify.
955 */
958 sizeof(NotificationList));
959 notifies->nestingLevel = my_level;
960 notifies->events = list_make1(n);
961 /* We certainly don't need a hashtable yet */
962 notifies->hashtab = NULL;
963 /* We won't build uniqueChannelNames/Hash till later, either */
964 notifies->uniqueChannelNames = NIL;
965 notifies->uniqueChannelHash = NULL;
966 notifies->upper = pendingNotifies;
968 }
969 else
970 {
971 /* Now check for duplicates */
973 {
974 /* It's a dup, so forget it */
975 pfree(n);
976 MemoryContextSwitchTo(oldcontext);
977 return;
978 }
979
980 /* Append more events to existing list */
982 }
983
984 MemoryContextSwitchTo(oldcontext);
985}

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg, ERROR, fb(), GetCurrentTransactionNestLevel(), IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NIL, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char channel)

Definition at line 1057 of file async.c.

1058{
1059 if (Trace_notify)
1060 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1061
1062 /* If we couldn't possibly be listening, no need to queue anything */
1064 return;
1065
1066 queue_listen(LISTEN_UNLISTEN, channel);
1067}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 1075 of file async.c.

1076{
1077 if (Trace_notify)
1078 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1079
1080 /* If we couldn't possibly be listening, no need to queue anything */
1082 return;
1083
1085}

References DEBUG1, elog, fb(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 1147 of file async.c.

1148{
1151}

References asyncQueueUnregister(), and CleanupListenersOnExit().

Referenced by BecomeRegisteredListener().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 3123 of file async.c.

3124{
3125 if (pendingNotifies == NULL)
3126 return false;
3127
3129 {
3130 /* Use the hash table to probe for a match */
3132 &n,
3133 HASH_FIND,
3134 NULL))
3135 return true;
3136 }
3137 else
3138 {
3139 /* Must scan the event list */
3140 ListCell *l;
3141
3142 foreach(l, pendingNotifies->events)
3143 {
3145
3146 if (n->channel_len == oldn->channel_len &&
3147 n->payload_len == oldn->payload_len &&
3148 memcmp(n->data, oldn->data,
3149 n->channel_len + n->payload_len + 2) == 0)
3150 return true;
3151 }
3152 }
3153
3154 return false;
3155}

References Notification::channel_len, Notification::data, NotificationList::events, fb(), HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)

Definition at line 2952 of file async.c.

2953{
2954 QueuePosition pos;
2955 QueuePosition head;
2956 int64 curpage = -1;
2957 int slotno = -1;
2958 char *page_buffer = NULL;
2959 bool page_dirty = false;
2960
2961 /*
2962 * Acquire locks in the correct order to avoid deadlocks. As per the
2963 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2964 * bank locks.
2965 *
2966 * We only need SHARED mode since we're just reading the head/tail
2967 * positions, not modifying them.
2968 */
2971
2972 pos = QUEUE_TAIL;
2973 head = QUEUE_HEAD;
2974
2975 /* Release NotifyQueueLock early, we only needed to read the positions */
2977
2978 /*
2979 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2980 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2981 * we're working.
2982 */
2983 while (!QUEUE_POS_EQUAL(pos, head))
2984 {
2986 TransactionId xid;
2987 int64 pageno = QUEUE_POS_PAGE(pos);
2988 int offset = QUEUE_POS_OFFSET(pos);
2989
2990 /* If we need a different page, release old lock and get new one */
2991 if (pageno != curpage)
2992 {
2993 LWLock *lock;
2994
2995 /* Release previous page if any */
2996 if (slotno >= 0)
2997 {
2998 if (page_dirty)
2999 {
3000 NotifyCtl->shared->page_dirty[slotno] = true;
3001 page_dirty = false;
3002 }
3004 }
3005
3006 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3008 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
3009 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3010 curpage = pageno;
3011 }
3012
3013 qe = (AsyncQueueEntry *) (page_buffer + offset);
3014 xid = qe->xid;
3015
3016 if (TransactionIdIsNormal(xid) &&
3018 {
3019 if (TransactionIdDidCommit(xid))
3020 {
3021 qe->xid = FrozenTransactionId;
3022 page_dirty = true;
3023 }
3024 else
3025 {
3026 qe->xid = InvalidTransactionId;
3027 page_dirty = true;
3028 }
3029 }
3030
3031 /* Advance to next entry */
3032 asyncQueueAdvance(&pos, qe->length);
3033 }
3034
3035 /* Release final page lock if we acquired one */
3036 if (slotno >= 0)
3037 {
3038 if (page_dirty)
3039 NotifyCtl->shared->page_dirty[slotno] = true;
3041 }
3042
3044}

References asyncQueueAdvance(), fb(), FrozenTransactionId, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by vac_truncate_clog().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 2040 of file async.c.

2041{
2044 int64 pageno;
2045 int offset;
2046 int slotno;
2048
2049 /*
2050 * We work with a local copy of QUEUE_HEAD, which we write back to shared
2051 * memory upon exiting. The reason for this is that if we have to advance
2052 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
2053 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
2054 * subsequent insertions would try to put entries into a page that slru.c
2055 * thinks doesn't exist yet.) So, use a local position variable. Note
2056 * that if we do fail, any already-inserted queue entries are forgotten;
2057 * this is okay, since they'd be useless anyway after our transaction
2058 * rolls back.
2059 */
2061
2062 /*
2063 * If this is the first write since the postmaster started, we need to
2064 * initialize the first page of the async SLRU. Otherwise, the current
2065 * page should be initialized already, so just fetch it.
2066 */
2067 pageno = QUEUE_POS_PAGE(queue_head);
2069
2070 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
2072
2075 else
2076 slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
2077
2078 /* Note we mark the page dirty before writing in it */
2079 NotifyCtl->shared->page_dirty[slotno] = true;
2080
2081 while (nextNotify != NULL)
2082 {
2084
2085 /* Construct a valid queue entry in local variable qe */
2087
2088 offset = QUEUE_POS_OFFSET(queue_head);
2089
2090 /* Check whether the entry really fits on the current page */
2091 if (offset + qe.length <= QUEUE_PAGESIZE)
2092 {
2093 /* OK, so advance nextNotify past this item */
2095 }
2096 else
2097 {
2098 /*
2099 * Write a dummy entry to fill up the page. Actually readers will
2100 * only check dboid and since it won't match any reader's database
2101 * OID, they will ignore this entry and move on.
2102 */
2103 qe.length = QUEUE_PAGESIZE - offset;
2104 qe.dboid = InvalidOid;
2106 qe.data[0] = '\0'; /* empty channel */
2107 qe.data[1] = '\0'; /* empty payload */
2108 }
2109
2110 /* Now copy qe into the shared buffer page */
2111 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
2112 &qe,
2113 qe.length);
2114
2115 /* Advance queue_head appropriately, and detect if page is full */
2116 if (asyncQueueAdvance(&(queue_head), qe.length))
2117 {
2118 LWLock *lock;
2119
2120 pageno = QUEUE_POS_PAGE(queue_head);
2121 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2122 if (lock != prevlock)
2123 {
2126 prevlock = lock;
2127 }
2128
2129 /*
2130 * Page is full, so we're done here, but first fill the next page
2131 * with zeroes. The reason to do this is to ensure that slru.c's
2132 * idea of the head page is always the same as ours, which avoids
2133 * boundary problems in SimpleLruTruncate. The test in
2134 * asyncQueueIsFull() ensured that there is room to create this
2135 * page without overrunning the queue.
2136 */
2138
2139 /*
2140 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
2141 * set flag to remember that we should try to advance the tail
2142 * pointer (we don't want to actually do that right here).
2143 */
2145 tryAdvanceTail = true;
2146
2147 /* And exit the loop */
2148 break;
2149 }
2150 }
2151
2152 /* Success, so update the global QUEUE_HEAD */
2154
2156
2157 return nextNotify;
2158}

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), NotificationList::events, fb(), InvalidOid, InvalidTransactionId, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), memcpy(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1971 of file async.c.

1972{
1973 int64 pageno = QUEUE_POS_PAGE(*position);
1974 int offset = QUEUE_POS_OFFSET(*position);
1975 bool pageJump = false;
1976
1977 /*
1978 * Move to the next writing position: First jump over what we have just
1979 * written or read.
1980 */
1981 offset += entryLength;
1982 Assert(offset <= QUEUE_PAGESIZE);
1983
1984 /*
1985 * In a second step check if another entry can possibly be written to the
1986 * page. If so, stay here, we have reached the next position. If not, then
1987 * we need to move on to the next page.
1988 */
1990 {
1991 pageno++;
1992 offset = 0;
1993 pageJump = true;
1994 }
1995
1996 SET_QUEUE_POS(*position, pageno, offset);
1997 return pageJump;
1998}

References Assert, AsyncQueueEntryEmptySize, fb(), QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by AsyncNotifyFreezeXids(), asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2870 of file async.c.

2871{
2872 QueuePosition min;
2875 int64 boundary;
2876
2877 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2879
2880 /*
2881 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2882 * (ie, exactly match at least one backend's queue position), so it must
2883 * be updated atomically with the actual computation. Since v13, we could
2884 * get away with not doing it like that, but it seems prudent to keep it
2885 * so.
2886 *
2887 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2888 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2889 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2890 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2891 * there are pages we can truncate but haven't yet finished doing so.
2892 *
2893 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2894 * performing SimpleLruTruncate. This is OK because no backend will try
2895 * to access the pages we are in the midst of truncating.
2896 */
2898 min = QUEUE_HEAD;
2900 {
2902 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2903 }
2904 QUEUE_TAIL = min;
2907
2908 /*
2909 * We can truncate something if the global tail advanced across an SLRU
2910 * segment boundary.
2911 *
2912 * XXX it might be better to truncate only once every several segments, to
2913 * reduce the number of directory scans.
2914 */
2917 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2918 {
2919 /*
2920 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2921 * release the lock again.
2922 */
2924
2928 }
2929
2931}

References Assert, asyncQueuePagePrecedes(), fb(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueErrdetailForIoError()

static int asyncQueueErrdetailForIoError ( const void opaque_data)
static

Definition at line 627 of file async.c.

628{
629 const QueuePosition *position = opaque_data;
630
631 return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
632 position->page, position->offset);
633}

References errdetail(), fb(), QueuePosition::offset, and QueuePosition::page.

Referenced by AsyncShmemRequest().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 2211 of file async.c.

2212{
2213 double fillDegree;
2214 TimestampTz t;
2215
2217 if (fillDegree < 0.5)
2218 return;
2219
2220 t = GetCurrentTimestamp();
2221
2224 {
2227
2229 {
2231 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2234 }
2235
2237 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
2238 (minPid != InvalidPid ?
2239 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
2240 : 0),
2241 (minPid != InvalidPid ?
2242 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2243 : 0)));
2244
2246 }
2247}

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg, fb(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1956 of file async.c.

1957{
1958 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1960 int64 occupied = headPage - tailPage;
1961
1963}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 2004 of file async.c.

2005{
2006 size_t channellen = n->channel_len;
2007 size_t payloadlen = n->payload_len;
2008 int entryLength;
2009
2012
2013 /* The terminators are already included in AsyncQueueEntryEmptySize */
2016 qe->length = entryLength;
2017 qe->dboid = MyDatabaseId;
2018 qe->xid = GetCurrentTransactionId();
2019 qe->srcPid = MyProcPid;
2020 memcpy(qe->data, n->data, channellen + payloadlen + 2);
2021}

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, fb(), GetCurrentTransactionId(), memcpy(), MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, and QUEUEALIGN.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 640 of file async.c.

641{
642 return p - q;
643}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 650 of file async.c.

651{
652 return p < q;
653}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemRequest().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( QueuePosition current,
QueuePosition  stop,
Snapshot  snapshot 
)
static

Definition at line 2732 of file async.c.

2735{
2736 int64 curpage = QUEUE_POS_PAGE(*current);
2737 int slotno;
2738 char *page_buffer;
2739 bool reachedStop = false;
2740 bool reachedEndOfPage;
2741
2742 /*
2743 * We copy the entries into a local buffer to avoid holding the SLRU lock
2744 * while we transmit them to our frontend. The local buffer must be
2745 * adequately aligned.
2746 */
2748 char *local_buf_end = local_buf;
2749
2751 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2752
2753 do
2754 {
2755 QueuePosition thisentry = *current;
2757
2758 if (QUEUE_POS_EQUAL(thisentry, stop))
2759 break;
2760
2761 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2762
2763 /*
2764 * Advance *current over this message, possibly to the next page. As
2765 * noted in the comments for asyncQueueReadAllNotifications, we must
2766 * do this before possibly failing while processing the message.
2767 */
2768 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2769
2770 /* Ignore messages destined for other databases */
2771 if (qe->dboid == MyDatabaseId)
2772 {
2773 if (XidInMVCCSnapshot(qe->xid, snapshot))
2774 {
2775 /*
2776 * The source transaction is still in progress, so we can't
2777 * process this message yet. Break out of the loop, but first
2778 * back up *current so we will reprocess the message next
2779 * time. (Note: it is unlikely but not impossible for
2780 * TransactionIdDidCommit to fail, so we can't really avoid
2781 * this advance-then-back-up behavior when dealing with an
2782 * uncommitted message.)
2783 *
2784 * Note that we must test XidInMVCCSnapshot before we test
2785 * TransactionIdDidCommit, else we might return a message from
2786 * a transaction that is not yet visible to snapshots; compare
2787 * the comments at the head of heapam_visibility.c.
2788 *
2789 * Also, while our own xact won't be listed in the snapshot,
2790 * we need not check for TransactionIdIsCurrentTransactionId
2791 * because our transaction cannot (yet) have queued any
2792 * messages.
2793 */
2794 *current = thisentry;
2795 reachedStop = true;
2796 break;
2797 }
2798
2799 /*
2800 * Quick check for the case that we're not listening on any
2801 * channels, before calling TransactionIdDidCommit(). This makes
2802 * that case a little faster, but more importantly, it ensures
2803 * that if there's a bad entry in the queue for which
2804 * TransactionIdDidCommit() fails for some reason, we can skip
2805 * over it on the first LISTEN in a session, and not get stuck on
2806 * it indefinitely. (This is a little trickier than it looks: it
2807 * works because BecomeRegisteredListener runs this code before we
2808 * have made the first entry in localChannelTable.)
2809 */
2811 continue;
2812
2813 if (TransactionIdDidCommit(qe->xid))
2814 {
2815 memcpy(local_buf_end, qe, qe->length);
2816 local_buf_end += qe->length;
2817 }
2818 else
2819 {
2820 /*
2821 * The source transaction aborted or crashed, so we just
2822 * ignore its notifications.
2823 */
2824 }
2825 }
2826
2827 /* Loop back if we're not at end of page */
2828 } while (!reachedEndOfPage);
2829
2830 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2832
2833 /*
2834 * Now that we have let go of the SLRU bank lock, send the notifications
2835 * to our backend
2836 */
2838 for (char *p = local_buf; p < local_buf_end;)
2839 {
2841
2842 /* qe->data is the null-terminated channel name */
2843 char *channel = qe->data;
2844
2845 if (IsListeningOn(channel))
2846 {
2847 /* payload follows channel name */
2848 char *payload = qe->data + strlen(channel) + 1;
2849
2850 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2851 }
2852
2853 p += qe->length;
2854 }
2855
2856 if (QUEUE_POS_EQUAL(*current, stop))
2857 reachedStop = true;
2858
2859 return reachedStop;
2860}

References Assert, asyncQueueAdvance(), AsyncQueueEntry::data, fb(), IsListeningOn(), LocalChannelTableIsEmpty, LWLockRelease(), memcpy(), MyDatabaseId, NotifyCtl, NotifyMyFrontEnd(), QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), TransactionIdDidCommit(), and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 2598 of file async.c.

2599{
2600 QueuePosition pos;
2601 QueuePosition head;
2602 Snapshot snapshot;
2603
2604 /*
2605 * Fetch current state, indicate to others that we have woken up, and that
2606 * we are in process of advancing our position.
2607 */
2609 /* Assert checks that we have a valid state entry */
2613 head = QUEUE_HEAD;
2614
2615 if (QUEUE_POS_EQUAL(pos, head))
2616 {
2617 /* Nothing to do, we have read all notifications already. */
2619 return;
2620 }
2621
2624
2625 /*----------
2626 * Get snapshot we'll use to decide which xacts are still in progress.
2627 * This is trickier than it might seem, because of race conditions.
2628 * Consider the following example:
2629 *
2630 * Backend 1: Backend 2:
2631 *
2632 * transaction starts
2633 * UPDATE foo SET ...;
2634 * NOTIFY foo;
2635 * commit starts
2636 * queue the notify message
2637 * transaction starts
2638 * LISTEN foo; -- first LISTEN in session
2639 * SELECT * FROM foo WHERE ...;
2640 * commit to clog
2641 * commit starts
2642 * add backend 2 to array of listeners
2643 * advance to queue head (this code)
2644 * commit to clog
2645 *
2646 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2647 * wasn't committed yet. Ideally we'd ensure that client 2 would
2648 * eventually get transaction 1's notify message, but there's no way
2649 * to do that; until we're in the listener array, there's no guarantee
2650 * that the notify message doesn't get removed from the queue.
2651 *
2652 * Therefore the coding technique transaction 2 is using is unsafe:
2653 * applications must commit a LISTEN before inspecting database state,
2654 * if they want to ensure they will see notifications about subsequent
2655 * changes to that state.
2656 *
2657 * What we do guarantee is that we'll see all notifications from
2658 * transactions committing after the snapshot we take here.
2659 * BecomeRegisteredListener has already added us to the listener array,
2660 * so no not-yet-committed messages can be removed from the queue
2661 * before we see them.
2662 *----------
2663 */
2664 snapshot = RegisterSnapshot(GetLatestSnapshot());
2665
2666 /*
2667 * It is possible that we fail while trying to send a message to our
2668 * frontend (for example, because of encoding conversion failure). If
2669 * that happens it is critical that we not try to send the same message
2670 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2671 * ERRORs to FATAL, causing the client connection to be closed on error.
2672 *
2673 * We used to only skip over the offending message and try to soldier on,
2674 * but it was somewhat questionable to lose a notification and give the
2675 * client an ERROR instead. A client application is not be prepared for
2676 * that and can't tell that a notification was missed. It was also not
2677 * very useful in practice because notifications are often processed while
2678 * a connection is idle and reading a message from the client, and in that
2679 * state, any error is upgraded to FATAL anyway. Closing the connection
2680 * is a clear signal to the application that it might have missed
2681 * notifications.
2682 */
2683 {
2685 bool reachedStop;
2686
2687 ExitOnAnyError = true;
2688
2689 do
2690 {
2691 /*
2692 * Process messages up to the stop position, end of page, or an
2693 * uncommitted message.
2694 *
2695 * Our stop position is what we found to be the head's position
2696 * when we entered this function. It might have changed already.
2697 * But if it has, we will receive (or have already received and
2698 * queued) another signal and come here again.
2699 *
2700 * We are not holding NotifyQueueLock here! The queue can only
2701 * extend beyond the head pointer (see above) and we leave our
2702 * backend's pointer where it is so nobody will truncate or
2703 * rewrite pages under us. Especially we don't want to hold a lock
2704 * while sending the notifications to the frontend.
2705 */
2706 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2707 } while (!reachedStop);
2708
2709 /* Update shared state */
2714
2716 }
2717
2718 /* Done with snapshot */
2719 UnregisterSnapshot(snapshot);
2720}

References Assert, asyncQueueProcessPageEntries(), ExitOnAnyError, fb(), GetLatestSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_HEAD, QUEUE_POS_EQUAL, RegisterSnapshot(), and UnregisterSnapshot().

Referenced by BecomeRegisteredListener(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1913 of file async.c.

1914{
1915 Assert(LocalChannelTableIsEmpty()); /* else caller error */
1916
1917 if (!amRegisteredListener) /* nothing to do */
1918 return;
1919
1920 /*
1921 * Need exclusive lock here to manipulate list links.
1922 */
1924 /* Mark our entry as invalid */
1929 /* and remove it from the list */
1932 else
1933 {
1935 {
1937 {
1939 break;
1940 }
1941 }
1942 }
1945
1946 /* mark ourselves as no longer listed in the global array */
1947 amRegisteredListener = false;
1948}

References amRegisteredListener, Assert, fb(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, LocalChannelTableIsEmpty, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 2190 of file async.c.

2191{
2192 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2194 int64 occupied = headPage - tailPage;
2195
2196 if (occupied == 0)
2197 return (double) 0; /* fast exit for common case */
2198
2199 return (double) occupied / (double) max_notify_queue_pages;
2200}

References fb(), max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

◆ AsyncShmemRequest()

static void AsyncShmemRequest ( void arg)
static

Definition at line 798 of file async.c.

799{
800 Size size;
801
802 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
803 size = add_size(size, offsetof(AsyncQueueControl, backend));
804
805 ShmemRequestStruct(.name = "Async Queue Control",
806 .size = size,
807 .ptr = (void **) &asyncQueueControl,
808 );
809
811 .name = "notify",
812 .Dir = "pg_notify",
813
814 /* long segment names are used in order to avoid wraparound */
815 .long_segment_names = true,
816
817 .nslots = notify_buffers,
818
819 .sync_handler = SYNC_HANDLER_NONE,
820 .PagePrecedes = asyncQueuePagePrecedes,
821 .errdetail_for_io_error = asyncQueueErrdetailForIoError,
822
823 .buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
824 .bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
825 );
826}

References add_size(), asyncQueueControl, asyncQueueErrdetailForIoError(), asyncQueuePagePrecedes(), fb(), MaxBackends, mul_size(), name, notify_buffers, NotifySlruDesc, ShmemRequestStruct, SimpleLruRequest, and SYNC_HANDLER_NONE.

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 2420 of file async.c.

2421{
2422 /* Revert staged listen/unlisten changes */
2424
2425 /* If we're no longer listening on anything, unregister */
2428
2429 /* And clean up */
2431}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and LocalChannelTableIsEmpty.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1378 of file async.c.

1379{
1380 /*
1381 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1382 * return as soon as possible
1383 */
1385 return;
1386
1387 if (Trace_notify)
1388 elog(DEBUG1, "AtCommit_Notify");
1389
1390 /* Apply staged listen/unlisten changes */
1392
1393 /* If no longer listening to anything, get out of listener array */
1396
1397 /*
1398 * Send signals to listening backends. We need do this only if there are
1399 * pending notifies, which were previously added to the shared queue by
1400 * PreCommit_Notify().
1401 */
1402 if (pendingNotifies != NULL)
1404
1405 /*
1406 * If it's time to try to advance the global tail pointer, do that.
1407 *
1408 * (It might seem odd to do this in the sender, when more than likely the
1409 * listeners won't yet have read the messages we just sent. However,
1410 * there's less contention if only the sender does it, and there is little
1411 * need for urgency in advancing the global tail. So this typically will
1412 * be clearing out messages that were sent some time ago.)
1413 */
1414 if (tryAdvanceTail)
1415 {
1416 tryAdvanceTail = false;
1418 }
1419
1420 /* And clean up */
1422}

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueAdvanceTail(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), DEBUG1, elog, fb(), LocalChannelTableIsEmpty, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 1160 of file async.c.

1161{
1162 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1164 ereport(ERROR,
1166 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1167}

References ereport, errcode(), errmsg, ERROR, fb(), pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 2509 of file async.c.

2510{
2511 int my_level = GetCurrentTransactionNestLevel();
2512
2513 /*
2514 * All we have to do is pop the stack --- the actions/notifies made in
2515 * this subxact are no longer interesting, and the space will be freed
2516 * when CurTransactionContext is recycled. We still have to free the
2517 * ActionList and NotificationList objects themselves, though, because
2518 * those are allocated in TopTransactionContext.
2519 *
2520 * Note that there might be no entries at all, or no entries for the
2521 * current subtransaction level, either because none were ever created, or
2522 * because we reentered this routine due to trouble during subxact abort.
2523 */
2524 while (pendingActions != NULL &&
2525 pendingActions->nestingLevel >= my_level)
2526 {
2528
2531 }
2532
2533 while (pendingNotifies != NULL &&
2534 pendingNotifies->nestingLevel >= my_level)
2535 {
2537
2540 }
2541}

References fb(), GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 2439 of file async.c.

2440{
2441 int my_level = GetCurrentTransactionNestLevel();
2442
2443 /* If there are actions at our nesting level, we must reparent them. */
2444 if (pendingActions != NULL &&
2445 pendingActions->nestingLevel >= my_level)
2446 {
2447 if (pendingActions->upper == NULL ||
2448 pendingActions->upper->nestingLevel < my_level - 1)
2449 {
2450 /* nothing to merge; give the whole thing to the parent */
2452 }
2453 else
2454 {
2456
2458
2459 /*
2460 * Mustn't try to eliminate duplicates here --- see queue_listen()
2461 */
2464 childPendingActions->actions);
2466 }
2467 }
2468
2469 /* If there are notifies at our nesting level, we must reparent them. */
2470 if (pendingNotifies != NULL &&
2471 pendingNotifies->nestingLevel >= my_level)
2472 {
2473 Assert(pendingNotifies->nestingLevel == my_level);
2474
2475 if (pendingNotifies->upper == NULL ||
2476 pendingNotifies->upper->nestingLevel < my_level - 1)
2477 {
2478 /* nothing to merge; give the whole thing to the parent */
2480 }
2481 else
2482 {
2483 /*
2484 * Formerly, we didn't bother to eliminate duplicates here, but
2485 * now we must, else we fall foul of "Assert(!found)", either here
2486 * or during a later attempt to build the parent-level hashtable.
2487 */
2489 ListCell *l;
2490
2492 /* Insert all the subxact's events into parent, except for dups */
2493 foreach(l, childPendingNotifies->events)
2494 {
2496
2499 }
2501 }
2502 }
2503}

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), fb(), GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ BecomeRegisteredListener()

static void BecomeRegisteredListener ( void  )
static

Definition at line 1430 of file async.c.

1431{
1432 QueuePosition head;
1433 QueuePosition max;
1435
1436 /*
1437 * Nothing to do if we are already listening to something, nor if we
1438 * already ran this routine in this transaction.
1439 */
1441 return;
1442
1443 if (Trace_notify)
1444 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1445
1446 /*
1447 * Before registering, make sure we will unlisten before dying. (Note:
1448 * this action does not get undone if we abort later.)
1449 */
1451 {
1454 }
1455
1456 /*
1457 * This is our first LISTEN, so establish our pointer.
1458 *
1459 * We set our pointer to the global tail pointer and then move it forward
1460 * over already-committed notifications. This ensures we cannot miss any
1461 * not-yet-committed notifications. We might get a few more but that
1462 * doesn't hurt.
1463 *
1464 * In some scenarios there might be a lot of committed notifications that
1465 * have not yet been pruned away (because some backend is being lazy about
1466 * reading them). To reduce our startup time, we can look at other
1467 * backends and adopt the maximum "pos" pointer of any backend that's in
1468 * our database; any notifications it's already advanced over are surely
1469 * committed and need not be re-examined by us. (We must consider only
1470 * backends connected to our DB, because others will not have bothered to
1471 * check committed-ness of notifications in our DB.)
1472 *
1473 * We need exclusive lock here so we can look at other backends' entries
1474 * and manipulate the list links.
1475 */
1477 head = QUEUE_HEAD;
1478 max = QUEUE_TAIL;
1481 {
1483 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1484 /* Also find last listening backend before this one */
1485 if (i < MyProcNumber)
1486 prevListener = i;
1487 }
1493 /* Insert backend into list of listeners at correct position */
1495 {
1498 }
1499 else
1500 {
1503 }
1505
1506 /* Now we are listed in the global array, so remember we're listening */
1507 amRegisteredListener = true;
1508
1509 /*
1510 * Try to move our pointer forward as far as possible. This will skip
1511 * over already-committed notifications, which we want to do because they
1512 * might be quite stale. Note that we are not yet listening on anything,
1513 * so we won't deliver such notifications to our frontend. Also, although
1514 * our transaction might have executed NOTIFY, those message(s) aren't
1515 * queued yet so we won't skip them here.
1516 */
1517 if (!QUEUE_POS_EQUAL(max, head))
1519}

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, fb(), i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ check_notify_buffers()

bool check_notify_buffers ( int newval,
void **  extra,
GucSource  source 
)

Definition at line 3296 of file async.c.

3297{
3298 return check_slru_buffers("notify_buffers", newval);
3299}

References check_slru_buffers(), and newval.

◆ CleanupListenersOnExit()

static void CleanupListenersOnExit ( void  )
static

Definition at line 1843 of file async.c.

1844{
1845 dshash_seq_status status;
1846 GlobalChannelEntry *entry;
1847
1848 if (Trace_notify)
1849 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
1850
1851 /* Clear our local cache (not really necessary, but be consistent) */
1852 if (localChannelTable != NULL)
1853 {
1856 }
1857
1858 /* Now remove our entries from the shared globalChannelTable */
1859 if (globalChannelTable == NULL)
1860 return;
1861
1862 dshash_seq_init(&status, globalChannelTable, true);
1863 while ((entry = dshash_seq_next(&status)) != NULL)
1864 {
1866
1867 if (entry->key.dboid != MyDatabaseId)
1868 continue; /* not relevant */
1869
1872
1873 for (int i = 0; i < entry->numListeners; i++)
1874 {
1875 if (listeners[i].procNo == MyProcNumber)
1876 {
1877 entry->numListeners--;
1878 if (i < entry->numListeners)
1879 memmove(&listeners[i], &listeners[i + 1],
1880 sizeof(ListenerEntry) * (entry->numListeners - i));
1881
1882 if (entry->numListeners == 0)
1883 {
1885 dshash_delete_current(&status);
1886 }
1887 break;
1888 }
1889 }
1890 }
1891 dshash_seq_term(&status);
1892}

References GlobalChannelKey::dboid, DEBUG1, dsa_free(), dsa_get_address(), dshash_delete_current(), dshash_seq_init(), dshash_seq_next(), dshash_seq_term(), elog, fb(), globalChannelDSA, globalChannelTable, hash_destroy(), i, GlobalChannelEntry::key, GlobalChannelEntry::listenersArray, localChannelTable, MyDatabaseId, MyProcNumber, MyProcPid, GlobalChannelEntry::numListeners, and Trace_notify.

Referenced by Async_UnlistenOnExit().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 3278 of file async.c.

3279{
3280 /*
3281 * Everything's allocated in either TopTransactionContext or the context
3282 * for the subtransaction to which it corresponds. So, there's nothing to
3283 * do here except reset the pointers; the space will be reclaimed when the
3284 * contexts are deleted.
3285 */
3288 /* Also clear pendingListenActions, which is derived from pendingActions */
3290}

References fb(), pendingActions, pendingListenActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ GlobalChannelKeyInit()

static void GlobalChannelKeyInit ( GlobalChannelKey key,
Oid  dboid,
const char channel 
)
inlinestatic

Definition at line 660 of file async.c.

661{
662 memset(key, 0, sizeof(GlobalChannelKey));
663 key->dboid = dboid;
664 strlcpy(key->channel, channel, NAMEDATALEN);
665}

References fb(), NAMEDATALEN, and strlcpy().

Referenced by ApplyPendingListenActions(), PrepareTableEntriesForListen(), and SignalBackends().

◆ globalChannelTableHash()

static dshash_hash globalChannelTableHash ( const void key,
size_t  size,
void arg 
)
static

Definition at line 672 of file async.c.

673{
674 const GlobalChannelKey *k = (const GlobalChannelKey *) key;
675 dshash_hash h;
676
678 h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
680
681 return h;
682}

References GlobalChannelKey::channel, DatumGetUInt32(), GlobalChannelKey::dboid, fb(), hash_any(), hash_uint32(), and NAMEDATALEN.

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 2552 of file async.c.

2553{
2554 /*
2555 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2556 * you do here.
2557 */
2558
2559 /* signal that work needs to be done */
2561
2562 /* latch will be set by procsignal_sigusr1_handler */
2563}

References notifyInterruptPending.

Referenced by procsignal_sigusr1_handler().

◆ initGlobalChannelTable()

static void initGlobalChannelTable ( void  )
static

Definition at line 699 of file async.c.

700{
701 MemoryContext oldcontext;
702
703 /* Quick exit if we already did this */
706 return;
707
708 /* Otherwise, use a lock to ensure only one process creates the table */
710
711 /* Be sure any local memory allocated by DSA routines is persistent */
713
715 {
716 /* Initialize dynamic shared hash table for global channels */
722 NULL);
723
724 /* Store handles in shared memory for other backends to use */
728 }
729 else if (!globalChannelTable)
730 {
731 /* Attach to existing dynamic shared hash table */
737 NULL);
738 }
739
740 MemoryContextSwitchTo(oldcontext);
742}

References asyncQueueControl, dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, fb(), globalChannelDSA, globalChannelTable, AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, globalChannelTableDSHParams, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by PreCommit_Notify().

◆ initLocalChannelTable()

static void initLocalChannelTable ( void  )
static

Definition at line 750 of file async.c.

751{
753
754 /* Quick exit if we already did this */
755 if (localChannelTable != NULL)
756 return;
757
758 /* Initialize local hash table for this backend's listened channels */
760 hash_ctl.entrysize = sizeof(ChannelName);
761
763 hash_create("Local Listen Channels",
764 64,
765 &hash_ctl,
767}

References fb(), hash_create(), HASH_ELEM, HASH_STRINGS, HASHCTL::keysize, localChannelTable, and NAMEDATALEN.

Referenced by PreCommit_Notify().

◆ initPendingListenActions()

static void initPendingListenActions ( void  )
static

◆ IsListeningOn()

static bool IsListeningOn ( const char channel)
static

Definition at line 1900 of file async.c.

1901{
1902 if (localChannelTable == NULL)
1903 return false;
1904
1905 return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1906}

References fb(), HASH_FIND, hash_search(), and localChannelTable.

Referenced by asyncQueueProcessPageEntries().

◆ notification_hash()

static uint32 notification_hash ( const void key,
Size  keysize 
)
static

Definition at line 3248 of file async.c.

3249{
3250 const Notification *k = *(const Notification *const *) key;
3251
3252 Assert(keysize == sizeof(Notification *));
3253 /* We don't bother to include the payload's trailing null in the hash */
3254 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3255 k->channel_len + k->payload_len + 1));
3256}

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void key1,
const void key2,
Size  keysize 
)
static

Definition at line 3262 of file async.c.

3263{
3264 const Notification *k1 = *(const Notification *const *) key1;
3265 const Notification *k2 = *(const Notification *const *) key2;
3266
3267 Assert(keysize == sizeof(Notification *));
3268 if (k1->channel_len == k2->channel_len &&
3269 k1->payload_len == k2->payload_len &&
3270 memcmp(k1->data, k2->data,
3271 k1->channel_len + k1->payload_len + 2) == 0)
3272 return 0; /* equal */
3273 return 1; /* not equal */
3274}

References Assert, and fb().

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char channel,
const char payload,
int32  srcPid 
)

Definition at line 3099 of file async.c.

3100{
3102 {
3104
3106 pq_sendint32(&buf, srcPid);
3107 pq_sendstring(&buf, channel);
3108 pq_sendstring(&buf, payload);
3110
3111 /*
3112 * NOTE: we do not do pq_flush() here. Some level of caller will
3113 * handle it later, allowing this message to be combined into a packet
3114 * with other ones.
3115 */
3116 }
3117 else
3118 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3119}

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 1095 of file async.c.

1096{
1098 HASH_SEQ_STATUS *status;
1099
1100 /* stuff done only on the first call of the function */
1101 if (SRF_IS_FIRSTCALL())
1102 {
1103 /* create a function context for cross-call persistence */
1105
1106 /* Initialize hash table iteration if we have any channels */
1107 if (localChannelTable != NULL)
1108 {
1109 MemoryContext oldcontext;
1110
1111 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1112 status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1114 funcctx->user_fctx = status;
1115 MemoryContextSwitchTo(oldcontext);
1116 }
1117 else
1118 {
1119 funcctx->user_fctx = NULL;
1120 }
1121 }
1122
1123 /* stuff done on every call of the function */
1125 status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1126
1127 if (status != NULL)
1128 {
1129 ChannelName *entry;
1130
1131 entry = (ChannelName *) hash_seq_search(status);
1132 if (entry != NULL)
1134 }
1135
1137}

References ChannelName::channel, CStringGetTextDatum, fb(), hash_seq_init(), hash_seq_search(), localChannelTable, MemoryContextSwitchTo(), palloc(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 2165 of file async.c.

2166{
2167 double usage;
2168
2169 /* Advance the queue tail so we don't report a too-large result */
2171
2175
2177}

References asyncQueueAdvanceTail(), asyncQueueUsage(), fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 860 of file async.c.

861{
862 const char *channel;
863 const char *payload;
864
865 if (PG_ARGISNULL(0))
866 channel = "";
867 else
869
870 if (PG_ARGISNULL(1))
871 payload = "";
872 else
874
875 /* For NOTIFY as a statement, this is checked in ProcessUtility */
877
878 Async_Notify(channel, payload);
879
881}

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 1185 of file async.c.

1186{
1187 ListCell *p;
1188
1190 return; /* no relevant statements in this xact */
1191
1192 if (Trace_notify)
1193 elog(DEBUG1, "PreCommit_Notify");
1194
1195 /* Preflight for any pending listen/unlisten actions */
1197
1198 if (pendingActions != NULL)
1199 {
1200 /* Ensure we have a local channel table */
1202 /* Create pendingListenActions hash table for this transaction */
1204
1205 /* Stage all the actions this transaction wants to perform */
1206 foreach(p, pendingActions->actions)
1207 {
1209
1210 switch (actrec->action)
1211 {
1212 case LISTEN_LISTEN:
1215 break;
1216 case LISTEN_UNLISTEN:
1218 break;
1221 break;
1222 }
1223 }
1224 }
1225
1226 /* Queue any pending notifies (must happen after the above) */
1227 if (pendingNotifies)
1228 {
1230 bool firstIteration = true;
1231
1232 /*
1233 * Build list of unique channel names being notified for use by
1234 * SignalBackends().
1235 *
1236 * If uniqueChannelHash is available, use it to efficiently get the
1237 * unique channels. Otherwise, fall back to the O(N^2) approach.
1238 */
1241 {
1242 HASH_SEQ_STATUS status;
1244
1246 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1249 channelEntry->channel);
1250 }
1251 else
1252 {
1253 /* O(N^2) approach is better for small number of notifications */
1255 {
1256 char *channel = n->data;
1257 bool found = false;
1258
1259 /* Name present in list? */
1261 {
1262 if (strcmp(oldchan, channel) == 0)
1263 {
1264 found = true;
1265 break;
1266 }
1267 }
1268 /* Add if not already in list */
1269 if (!found)
1272 channel);
1273 }
1274 }
1275
1276 /* Preallocate workspace that will be needed by SignalBackends() */
1277 if (signalPids == NULL)
1279 MaxBackends * sizeof(int32));
1280
1281 if (signalProcnos == NULL)
1283 MaxBackends * sizeof(ProcNumber));
1284
1285 /*
1286 * Make sure that we have an XID assigned to the current transaction.
1287 * GetCurrentTransactionId is cheap if we already have an XID, but not
1288 * so cheap if we don't, and we'd prefer not to do that work while
1289 * holding NotifyQueueLock.
1290 */
1292
1293 /*
1294 * Serialize writers by acquiring a special lock that we hold till
1295 * after commit. This ensures that queue entries appear in commit
1296 * order, and in particular that there are never uncommitted queue
1297 * entries ahead of committed ones, so an uncommitted transaction
1298 * can't block delivery of deliverable notifications.
1299 *
1300 * We use a heavyweight lock so that it'll automatically be released
1301 * after either commit or abort. This also allows deadlocks to be
1302 * detected, though really a deadlock shouldn't be possible here.
1303 *
1304 * The lock is on "database 0", which is pretty ugly but it doesn't
1305 * seem worth inventing a special locktag category just for this.
1306 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1307 * used by the flatfiles mechanism.)
1308 */
1311
1312 /*
1313 * For the direct advancement optimization in SignalBackends(), we
1314 * need to ensure that no other backend can insert queue entries
1315 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1316 * heavyweight lock above provides this guarantee, since it serializes
1317 * all writers.
1318 *
1319 * Note: if the heavyweight lock were ever removed for scalability
1320 * reasons, we could achieve the same guarantee by holding
1321 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1322 * than releasing and reacquiring it for each page as we do below.
1323 */
1324
1325 /* Initialize values to a safe default in case list is empty */
1328
1329 /* Now push the notifications into the queue */
1331 while (nextNotify != NULL)
1332 {
1333 /*
1334 * Add the pending notifications to the queue. We acquire and
1335 * release NotifyQueueLock once per page, which might be overkill
1336 * but it does allow readers to get in while we're doing this.
1337 *
1338 * A full queue is very uncommon and should really not happen,
1339 * given that we have so much space available in the SLRU pages.
1340 * Nevertheless we need to deal with this possibility. Note that
1341 * when we get here we are in the process of committing our
1342 * transaction, but we have not yet committed to clog, so at this
1343 * point in time we can still roll the transaction back.
1344 */
1346 if (firstIteration)
1347 {
1349 firstIteration = false;
1350 }
1352 if (asyncQueueIsFull())
1353 ereport(ERROR,
1355 errmsg("too many notifications in the NOTIFY queue")));
1359 }
1360
1361 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1362 }
1363}

References AccessExclusiveLock, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), BecomeRegisteredListener(), DEBUG1, elog, ereport, errcode(), errmsg, ERROR, NotificationList::events, fb(), foreach_ptr, GetCurrentTransactionId(), hash_seq_init(), hash_seq_search(), initGlobalChannelTable(), initLocalChannelTable(), initPendingListenActions(), InvalidOid, lappend(), lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MemoryContextAlloc(), NIL, pendingActions, pendingNotifies, PrepareTableEntriesForListen(), PrepareTableEntriesForUnlisten(), PrepareTableEntriesForUnlistenAll(), QUEUE_HEAD, queueHeadAfterWrite, queueHeadBeforeWrite, SET_QUEUE_POS, signalPids, signalProcnos, TopMemoryContext, Trace_notify, NotificationList::uniqueChannelHash, and NotificationList::uniqueChannelNames.

Referenced by CommitTransaction().

◆ PrepareTableEntriesForListen()

static void PrepareTableEntriesForListen ( const char channel)
static

Definition at line 1530 of file async.c.

1531{
1533 GlobalChannelEntry *entry;
1534 bool found;
1536 PendingListenEntry *pending;
1537
1538 /*
1539 * Record in local pending hash that we want to LISTEN, overwriting any
1540 * earlier attempt to UNLISTEN.
1541 */
1542 pending = (PendingListenEntry *)
1544 pending->action = PENDING_LISTEN;
1545
1546 /*
1547 * Ensure that there is an entry for the channel in localChannelTable.
1548 * (Should this fail, we can just roll back.) If the transaction fails
1549 * after this point, we will remove the entry if appropriate during
1550 * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1551 * to return TRUE; we assume nothing is going to consult that before
1552 * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1553 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1554 * present to ensure they do the right things; see
1555 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1556 */
1558
1559 /* Pre-allocate entry in shared globalChannelTable */
1560 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1561 entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1562
1563 if (!found)
1564 {
1565 /* New channel entry, so initialize it to a safe state */
1567 entry->numListeners = 0;
1568 entry->allocatedListeners = 0;
1569 }
1570
1571 /*
1572 * Create listenersArray if entry doesn't have one. It's tempting to fold
1573 * this into the !found case, but this coding allows us to cope in case
1574 * dsa_allocate() failed in an earlier attempt.
1575 */
1576 if (!DsaPointerIsValid(entry->listenersArray))
1577 {
1581 }
1582
1585
1586 /*
1587 * Check if we already have a ListenerEntry (possibly from earlier in this
1588 * transaction)
1589 */
1590 for (int i = 0; i < entry->numListeners; i++)
1591 {
1592 if (listeners[i].procNo == MyProcNumber)
1593 {
1594 /* Already have an entry; leave removeOnAbort as-is */
1596 return;
1597 }
1598 }
1599
1600 /* Need to add a new entry; grow array if necessary */
1601 if (entry->numListeners >= entry->allocatedListeners)
1602 {
1603 int new_size = entry->allocatedListeners * 2;
1606 sizeof(ListenerEntry) * new_size);
1608
1610 entry->listenersArray = new_array;
1614 }
1615
1616 listeners[entry->numListeners].procNo = MyProcNumber;
1617 listeners[entry->numListeners].removeOnAbort = true;
1618 entry->numListeners++;
1619
1621}

References PendingListenEntry::action, GlobalChannelEntry::allocatedListeners, dsa_allocate, dsa_free(), dsa_get_address(), DsaPointerIsValid, dshash_find_or_insert, dshash_release_lock(), fb(), globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, HASH_ENTER, hash_search(), i, INITIAL_LISTENERS_ARRAY_SIZE, InvalidDsaPointer, GlobalChannelEntry::listenersArray, localChannelTable, memcpy(), MyDatabaseId, MyProcNumber, GlobalChannelEntry::numListeners, PENDING_LISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlisten()

static void PrepareTableEntriesForUnlisten ( const char channel)
static

Definition at line 1632 of file async.c.

1633{
1634 PendingListenEntry *pending;
1635
1636 /*
1637 * If the channel name is not in localChannelTable, then we are neither
1638 * listening on it nor preparing to listen on it, so we don't need to
1639 * record an UNLISTEN action.
1640 */
1642 if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1643 return;
1644
1645 /*
1646 * Record in local pending hash that we want to UNLISTEN, overwriting any
1647 * earlier attempt to LISTEN. Don't touch localChannelTable or
1648 * globalChannelTable yet - we keep receiving signals until commit.
1649 */
1650 pending = (PendingListenEntry *)
1652 pending->action = PENDING_UNLISTEN;
1653}

References PendingListenEntry::action, Assert, fb(), HASH_ENTER, HASH_FIND, hash_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ PrepareTableEntriesForUnlistenAll()

static void PrepareTableEntriesForUnlistenAll ( void  )
static

Definition at line 1662 of file async.c.

1663{
1666 PendingListenEntry *pending;
1667
1668 /*
1669 * Scan localChannelTable, which will have the names of all channels that
1670 * we are listening on or have prepared to listen on. Record an UNLISTEN
1671 * action for each one, overwriting any earlier attempt to LISTEN.
1672 */
1674 while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1675 {
1676 pending = (PendingListenEntry *)
1678 pending->action = PENDING_UNLISTEN;
1679 }
1680}

References PendingListenEntry::action, fb(), HASH_ENTER, hash_search(), hash_seq_init(), hash_seq_search(), localChannelTable, PENDING_UNLISTEN, and pendingListenActions.

Referenced by PreCommit_Notify().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 3058 of file async.c.

3059{
3060 /* We *must* reset the flag */
3061 notifyInterruptPending = false;
3062
3063 /* Do nothing else if we aren't actively listening */
3065 return;
3066
3067 if (Trace_notify)
3068 elog(DEBUG1, "ProcessIncomingNotify");
3069
3070 set_ps_display("notify interrupt");
3071
3072 /*
3073 * We must run asyncQueueReadAllNotifications inside a transaction, else
3074 * bad things happen if it gets an error.
3075 */
3077
3079
3081
3082 /*
3083 * If this isn't an end-of-command case, we must flush the notify messages
3084 * to ensure frontend gets them promptly.
3085 */
3086 if (flush)
3087 pq_flush();
3088
3089 set_ps_display("idle");
3090
3091 if (Trace_notify)
3092 elog(DEBUG1, "ProcessIncomingNotify: done");
3093}

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, LocalChannelTableIsEmpty, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 2581 of file async.c.

2582{
2584 return; /* not really idle */
2585
2586 /* Loop in case another signal arrives while sending messages */
2588 ProcessIncomingNotify(flush);
2589}

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char channel 
)
static

Definition at line 996 of file async.c.

997{
998 MemoryContext oldcontext;
1000 int my_level = GetCurrentTransactionNestLevel();
1001
1002 /*
1003 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
1004 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
1005 * final per-channel intent is computed during PreCommit_Notify.
1006 */
1008
1009 /* space for terminating null is included in sizeof(ListenAction) */
1011 strlen(channel) + 1);
1012 actrec->action = action;
1013 strcpy(actrec->channel, channel);
1014
1015 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1016 {
1017 ActionList *actions;
1018
1019 /*
1020 * First action in current sub(xact). Note that we allocate the
1021 * ActionList in TopTransactionContext; the nestingLevel might get
1022 * changed later by AtSubCommit_Notify.
1023 */
1024 actions = (ActionList *)
1026 actions->nestingLevel = my_level;
1027 actions->actions = list_make1(actrec);
1028 actions->upper = pendingActions;
1029 pendingActions = actions;
1030 }
1031 else
1033
1034 MemoryContextSwitchTo(oldcontext);
1035}

References ActionList::actions, CurTransactionContext, fb(), GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ RemoveListenerFromChannel()

static void RemoveListenerFromChannel ( GlobalChannelEntry **  entry_ptr,
ListenerEntry listeners,
int  idx 
)
static

Definition at line 1692 of file async.c.

1695{
1696 GlobalChannelEntry *entry = *entry_ptr;
1697
1698 entry->numListeners--;
1699 if (idx < entry->numListeners)
1701 sizeof(ListenerEntry) * (entry->numListeners - idx));
1702
1703 if (entry->numListeners == 0)
1704 {
1707 /* tells caller not to release the entry's lock: */
1708 *entry_ptr = NULL;
1709 }
1710}

References dsa_free(), dshash_delete_entry(), fb(), globalChannelDSA, globalChannelTable, idx(), GlobalChannelEntry::listenersArray, and GlobalChannelEntry::numListeners.

Referenced by ApplyPendingListenActions().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 2263 of file async.c.

2264{
2265 int count;
2266
2267 /* Can't get here without PreCommit_Notify having made the global table */
2269
2270 /* It should have set up these arrays, too */
2272
2273 /*
2274 * Identify backends that we need to signal. We don't want to send
2275 * signals while holding the NotifyQueueLock, so this part just builds a
2276 * list of target PIDs in signalPids[] and signalProcnos[].
2277 */
2278 count = 0;
2279
2281
2282 /* Scan each channel name that we notified in this transaction */
2284 {
2286 GlobalChannelEntry *entry;
2288
2289 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
2290 entry = dshash_find(globalChannelTable, &key, false);
2291 if (entry == NULL)
2292 continue; /* nobody is listening */
2293
2295 entry->listenersArray);
2296
2297 /*
2298 * Identify listeners that now need waking, add them to arrays.
2299 *
2300 * Note that we signal listeners regardless of the state of their
2301 * removeOnAbort flags. Hence a new listener that reached PreCommit,
2302 * but then failed before AtCommit_Notify, can receive a signal even
2303 * though it was never really listening. This is okay because it will
2304 * not do anything in response to that signal. If we did not do it
2305 * like this then a new listener might miss some messages due to the
2306 * direct-advance logic below.
2307 */
2308 for (int j = 0; j < entry->numListeners; j++)
2309 {
2310 ProcNumber i = listeners[j].procNo;
2311 int32 pid;
2312 QueuePosition pos;
2313
2315 continue; /* already signaled, no need to repeat */
2316
2317 pid = QUEUE_BACKEND_PID(i);
2318 pos = QUEUE_BACKEND_POS(i);
2319
2320 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
2321 continue; /* it's fully caught up already */
2322
2323 Assert(pid != InvalidPid);
2324
2326 signalPids[count] = pid;
2327 signalProcnos[count] = i;
2328 count++;
2329 }
2330
2332 }
2333
2334 /*
2335 * Scan all listeners. Any that are not already pending wakeup must not
2336 * be interested in our notifications (else we'd have set their wakeup
2337 * flags above). Check to see if we can directly advance their queue
2338 * pointers to save a wakeup. Otherwise, if they are far behind, wake
2339 * them anyway so they will catch up.
2340 */
2342 {
2343 int32 pid;
2344 QueuePosition pos;
2345
2347 continue;
2348
2349 /* If it's currently advancing, we should not touch it */
2351 continue;
2352
2353 pid = QUEUE_BACKEND_PID(i);
2354 pos = QUEUE_BACKEND_POS(i);
2355
2356 /*
2357 * We can directly advance the other backend's queue pointer if it's
2358 * not currently advancing (else there are race conditions), and its
2359 * current pointer is not behind queueHeadBeforeWrite (else we'd make
2360 * it miss some older messages), and we'd not be moving the pointer
2361 * backward.
2362 */
2365 {
2366 /* We can directly advance its pointer past what we wrote */
2368 }
2371 {
2372 /* It's idle and far behind, so wake it up */
2373 Assert(pid != InvalidPid);
2374
2376 signalPids[count] = pid;
2377 signalProcnos[count] = i;
2378 count++;
2379 }
2380 }
2381
2383
2384 /* Now send signals */
2385 for (int i = 0; i < count; i++)
2386 {
2387 int32 pid = signalPids[i];
2388
2389 /*
2390 * If we are signaling our own process, no need to involve the kernel;
2391 * just set the flag directly.
2392 */
2393 if (pid == MyProcPid)
2394 {
2396 continue;
2397 }
2398
2399 /*
2400 * Note: assuming things aren't broken, a signal failure here could
2401 * only occur if the target backend exited since we released
2402 * NotifyQueueLock; which is unlikely but certainly possible. So we
2403 * just log a low-level debug message if it happens.
2404 */
2406 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
2407 }
2408}

References Assert, asyncQueuePageDiff(), DEBUG3, dsa_get_address(), dshash_find(), dshash_release_lock(), elog, fb(), foreach_ptr, globalChannelDSA, GlobalChannelKeyInit(), globalChannelTable, i, INVALID_PROC_NUMBER, InvalidPid, j, GlobalChannelEntry::listenersArray, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcPid, notifyInterruptPending, GlobalChannelEntry::numListeners, pendingNotifies, PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, QUEUE_POS_PRECEDES, queueHeadAfterWrite, queueHeadBeforeWrite, SendProcSignal(), signalPids, signalProcnos, and NotificationList::uniqueChannelNames.

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

◆ AsyncShmemCallbacks

const ShmemCallbacks AsyncShmemCallbacks
Initial value:
= {
.request_fn = AsyncShmemRequest,
.init_fn = AsyncShmemInit,
}

Definition at line 352 of file async.c.

352 {
353 .request_fn = AsyncShmemRequest,
354 .init_fn = AsyncShmemInit,
355};

◆ globalChannelDSA

◆ globalChannelTable

◆ globalChannelTableDSHParams

const dshash_parameters globalChannelTableDSHParams
static

◆ localChannelTable

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 584 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ notifyInterruptPending

◆ NotifySlruDesc

SlruDesc NotifySlruDesc
static

Definition at line 375 of file async.c.

Referenced by AsyncShmemRequest().

◆ pendingActions

◆ pendingListenActions

◆ pendingNotifies

◆ queueHeadAfterWrite

QueuePosition queueHeadAfterWrite
static

Definition at line 567 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ queueHeadBeforeWrite

QueuePosition queueHeadBeforeWrite
static

Definition at line 566 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalPids

int32* signalPids = NULL
static

Definition at line 574 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ signalProcnos

ProcNumber* signalProcnos = NULL
static

Definition at line 575 of file async.c.

Referenced by PreCommit_Notify(), and SignalBackends().

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 578 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 555 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and BecomeRegisteredListener().