PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)   ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (void)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void ProcessCompletedNotifies (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControlasyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static ListlistenChannels = NIL
 
static ActionListpendingActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool backendHasSentNotifications = false
 
static bool backendTryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 182 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 402 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 156 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

◆ QUEUE_BACKEND_PID

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 231 of file async.c.

Referenced by asyncQueueAddEntries(), and SignalBackends().

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 299 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 318 of file async.c.

Referenced by asyncQueueAdvance(), asyncQueueIsFull(), asyncQueuePageDiff(), and asyncQueueUsage().

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
 
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)    ((x).page == 0 && (x).offset == 0)

Definition at line 205 of file async.c.

Referenced by asyncQueueAddEntries().

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492

Definition at line 215 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492

Definition at line 209 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 180 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 196 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 337 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 2328 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MemSet, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

2329 {
2331 
2332  /* Create the hash table if it's time to */
2334  pendingNotifies->hashtab == NULL)
2335  {
2336  HASHCTL hash_ctl;
2337  ListCell *l;
2338 
2339  /* Create the hash table */
2340  MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2341  hash_ctl.keysize = sizeof(Notification *);
2342  hash_ctl.entrysize = sizeof(NotificationHash);
2343  hash_ctl.hash = notification_hash;
2344  hash_ctl.match = notification_match;
2345  hash_ctl.hcxt = CurTransactionContext;
2347  hash_create("Pending Notifies",
2348  256L,
2349  &hash_ctl,
2351 
2352  /* Insert all the already-existing events */
2353  foreach(l, pendingNotifies->events)
2354  {
2355  Notification *oldn = (Notification *) lfirst(l);
2356  NotificationHash *hentry;
2357  bool found;
2358 
2360  &oldn,
2361  HASH_ENTER,
2362  &found);
2363  Assert(!found);
2364  hentry->event = oldn;
2365  }
2366  }
2367 
2368  /* Add new event to the list, in order */
2370 
2371  /* Add event to the hash table if needed */
2372  if (pendingNotifies->hashtab != NULL)
2373  {
2374  NotificationHash *hentry;
2375  bool found;
2376 
2378  &n,
2379  HASH_ENTER,
2380  &found);
2381  Assert(!found);
2382  hentry->event = n;
2383  }
2384 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:397
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define MemSet(start, val, len)
Definition: c.h:949
Notification * event
Definition: async.c:406
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:927
static NotificationList * pendingNotifies
Definition: async.c:409
List * lappend(List *list, void *datum)
Definition: list.c:321
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:402
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:328
Size keysize
Definition: hsearch.h:72
HashCompareFunc match
Definition: hsearch.h:75
HTAB * hashtab
Definition: async.c:398
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2406
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2392
#define HASH_COMPARE
Definition: hsearch.h:90
static int list_length(const List *l)
Definition: pg_list.h:169
HashValueFunc hash
Definition: hsearch.h:74
#define HASH_FUNCTION
Definition: hsearch.h:89

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 757 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

758 {
759  if (Trace_notify)
760  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
761 
762  queue_listen(LISTEN_LISTEN, channel);
763 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:709
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 610 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

611 {
612  int my_level = GetCurrentTransactionNestLevel();
613  size_t channel_len;
614  size_t payload_len;
615  Notification *n;
616  MemoryContext oldcontext;
617 
618  if (IsParallelWorker())
619  elog(ERROR, "cannot send notifications from a parallel worker");
620 
621  if (Trace_notify)
622  elog(DEBUG1, "Async_Notify(%s)", channel);
623 
624  channel_len = channel ? strlen(channel) : 0;
625  payload_len = payload ? strlen(payload) : 0;
626 
627  /* a channel name must be specified */
628  if (channel_len == 0)
629  ereport(ERROR,
630  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
631  errmsg("channel name cannot be empty")));
632 
633  /* enforce length limits */
634  if (channel_len >= NAMEDATALEN)
635  ereport(ERROR,
636  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
637  errmsg("channel name too long")));
638 
639  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
640  ereport(ERROR,
641  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
642  errmsg("payload string too long")));
643 
644  /*
645  * We must construct the Notification entry, even if we end up not using
646  * it, in order to compare it cheaply to existing list entries.
647  *
648  * The notification list needs to live until end of transaction, so store
649  * it in the transaction context.
650  */
652 
653  n = (Notification *) palloc(offsetof(Notification, data) +
654  channel_len + payload_len + 2);
655  n->channel_len = channel_len;
656  n->payload_len = payload_len;
657  strcpy(n->data, channel);
658  if (payload)
659  strcpy(n->data + channel_len + 1, payload);
660  else
661  n->data[channel_len + 1] = '\0';
662 
663  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
664  {
665  NotificationList *notifies;
666 
667  /*
668  * First notify event in current (sub)xact. Note that we allocate the
669  * NotificationList in TopTransactionContext; the nestingLevel might
670  * get changed later by AtSubCommit_Notify.
671  */
672  notifies = (NotificationList *)
674  sizeof(NotificationList));
675  notifies->nestingLevel = my_level;
676  notifies->events = list_make1(n);
677  /* We certainly don't need a hashtable yet */
678  notifies->hashtab = NULL;
679  notifies->upper = pendingNotifies;
680  pendingNotifies = notifies;
681  }
682  else
683  {
684  /* Now check for duplicates */
686  {
687  /* It's a dup, so forget it */
688  pfree(n);
689  MemoryContextSwitchTo(oldcontext);
690  return;
691  }
692 
693  /* Append more events to existing list */
695  }
696 
697  MemoryContextSwitchTo(oldcontext);
698 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:397
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2287
int nestingLevel
Definition: async.c:396
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2328
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:399
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:144
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
HTAB * hashtab
Definition: async.c:398
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389
bool Trace_notify
Definition: async.c:433
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:214
#define offsetof(type, field)
Definition: c.h:668

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 771 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

772 {
773  if (Trace_notify)
774  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
775 
776  /* If we couldn't possibly be listening, no need to queue anything */
777  if (pendingActions == NULL && !unlistenExitRegistered)
778  return;
779 
780  queue_listen(LISTEN_UNLISTEN, channel);
781 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:709
static ActionList * pendingActions
Definition: async.c:357
static bool unlistenExitRegistered
Definition: async.c:421
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 789 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

790 {
791  if (Trace_notify)
792  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
793 
794  /* If we couldn't possibly be listening, no need to queue anything */
795  if (pendingActions == NULL && !unlistenExitRegistered)
796  return;
797 
799 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:709
static ActionList * pendingActions
Definition: async.c:357
static bool unlistenExitRegistered
Definition: async.c:421
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 842 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

843 {
846 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1185
static void asyncQueueUnregister(void)
Definition: async.c:1301

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 2287 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2288 {
2289  if (pendingNotifies == NULL)
2290  return false;
2291 
2292  if (pendingNotifies->hashtab != NULL)
2293  {
2294  /* Use the hash table to probe for a match */
2296  &n,
2297  HASH_FIND,
2298  NULL))
2299  return true;
2300  }
2301  else
2302  {
2303  /* Must scan the event list */
2304  ListCell *l;
2305 
2306  foreach(l, pendingNotifies->events)
2307  {
2308  Notification *oldn = (Notification *) lfirst(l);
2309 
2310  if (n->channel_len == oldn->channel_len &&
2311  n->payload_len == oldn->payload_len &&
2312  memcmp(n->data, oldn->data,
2313  n->channel_len + n->payload_len + 2) == 0)
2314  return true;
2315  }
2316  }
2317 
2318  return false;
2319 }
List * events
Definition: async.c:397
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:927
static NotificationList * pendingNotifies
Definition: async.c:409
HTAB * hashtab
Definition: async.c:398
#define lfirst(lc)
Definition: pg_list.h:190
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 1444 of file async.c.

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), backendTryAdvanceTail, AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), and SimpleLruZeroPage().

Referenced by PreCommit_Notify().

1445 {
1446  AsyncQueueEntry qe;
1447  QueuePosition queue_head;
1448  int pageno;
1449  int offset;
1450  int slotno;
1451 
1452  /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
1453  LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
1454 
1455  /*
1456  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1457  * memory upon exiting. The reason for this is that if we have to advance
1458  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1459  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1460  * subsequent insertions would try to put entries into a page that slru.c
1461  * thinks doesn't exist yet.) So, use a local position variable. Note
1462  * that if we do fail, any already-inserted queue entries are forgotten;
1463  * this is okay, since they'd be useless anyway after our transaction
1464  * rolls back.
1465  */
1466  queue_head = QUEUE_HEAD;
1467 
1468  /*
1469  * If this is the first write since the postmaster started, we need to
1470  * initialize the first page of the async SLRU. Otherwise, the current
1471  * page should be initialized already, so just fetch it.
1472  *
1473  * (We could also take the first path when the SLRU position has just
1474  * wrapped around, but re-zeroing the page is harmless in that case.)
1475  */
1476  pageno = QUEUE_POS_PAGE(queue_head);
1477  if (QUEUE_POS_IS_ZERO(queue_head))
1478  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1479  else
1480  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1482 
1483  /* Note we mark the page dirty before writing in it */
1484  NotifyCtl->shared->page_dirty[slotno] = true;
1485 
1486  while (nextNotify != NULL)
1487  {
1488  Notification *n = (Notification *) lfirst(nextNotify);
1489 
1490  /* Construct a valid queue entry in local variable qe */
1492 
1493  offset = QUEUE_POS_OFFSET(queue_head);
1494 
1495  /* Check whether the entry really fits on the current page */
1496  if (offset + qe.length <= QUEUE_PAGESIZE)
1497  {
1498  /* OK, so advance nextNotify past this item */
1499  nextNotify = lnext(pendingNotifies->events, nextNotify);
1500  }
1501  else
1502  {
1503  /*
1504  * Write a dummy entry to fill up the page. Actually readers will
1505  * only check dboid and since it won't match any reader's database
1506  * OID, they will ignore this entry and move on.
1507  */
1508  qe.length = QUEUE_PAGESIZE - offset;
1509  qe.dboid = InvalidOid;
1510  qe.data[0] = '\0'; /* empty channel */
1511  qe.data[1] = '\0'; /* empty payload */
1512  }
1513 
1514  /* Now copy qe into the shared buffer page */
1515  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1516  &qe,
1517  qe.length);
1518 
1519  /* Advance queue_head appropriately, and detect if page is full */
1520  if (asyncQueueAdvance(&(queue_head), qe.length))
1521  {
1522  /*
1523  * Page is full, so we're done here, but first fill the next page
1524  * with zeroes. The reason to do this is to ensure that slru.c's
1525  * idea of the head page is always the same as ours, which avoids
1526  * boundary problems in SimpleLruTruncate. The test in
1527  * asyncQueueIsFull() ensured that there is room to create this
1528  * page without overrunning the queue.
1529  */
1530  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1531 
1532  /*
1533  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1534  * set flag to remember that we should try to advance the tail
1535  * pointer (we don't want to actually do that right here).
1536  */
1537  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1538  backendTryAdvanceTail = true;
1539 
1540  /* And exit the loop */
1541  break;
1542  }
1543  }
1544 
1545  /* Success, so update the global QUEUE_HEAD */
1546  QUEUE_HEAD = queue_head;
1547 
1548  LWLockRelease(NotifySLRULock);
1549 
1550  return nextNotify;
1551 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:205
List * events
Definition: async.c:397
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:321
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1408
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1373
#define QUEUE_HEAD
Definition: async.c:284
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NotifyCtl
Definition: async.c:297
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static NotificationList * pendingNotifies
Definition: async.c:409
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:382
static bool backendTryAdvanceTail
Definition: async.c:430
#define QUEUE_PAGESIZE
Definition: async.c:298
#define InvalidTransactionId
Definition: transam.h:31
#define QUEUE_CLEANUP_DELAY
Definition: async.c:231
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:190
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:267
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1373 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1374 {
1375  int pageno = QUEUE_POS_PAGE(*position);
1376  int offset = QUEUE_POS_OFFSET(*position);
1377  bool pageJump = false;
1378 
1379  /*
1380  * Move to the next writing position: First jump over what we have just
1381  * written or read.
1382  */
1383  offset += entryLength;
1384  Assert(offset <= QUEUE_PAGESIZE);
1385 
1386  /*
1387  * In a second step check if another entry can possibly be written to the
1388  * page. If so, stay here, we have reached the next position. If not, then
1389  * we need to move on to the next page.
1390  */
1392  {
1393  pageno++;
1394  if (pageno > QUEUE_MAX_PAGE)
1395  pageno = 0; /* wrap around */
1396  offset = 0;
1397  pageJump = true;
1398  }
1399 
1400  SET_QUEUE_POS(*position, pageno, offset);
1401  return pageJump;
1402 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
#define AsyncQueueEntryEmptySize
Definition: async.c:182
#define QUEUE_PAGESIZE
Definition: async.c:298
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
#define QUEUEALIGN(len)
Definition: async.c:180
#define Assert(condition)
Definition: c.h:745
#define QUEUE_MAX_PAGE
Definition: async.c:318
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2173 of file async.c.

References Assert, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by pg_notification_queue_usage(), and ProcessCompletedNotifies().

2174 {
2175  QueuePosition min;
2176  int oldtailpage;
2177  int newtailpage;
2178  int boundary;
2179 
2180  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2181  min = QUEUE_HEAD;
2183  {
2185  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2186  }
2187  oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
2188  QUEUE_TAIL = min;
2189  LWLockRelease(NotifyQueueLock);
2190 
2191  /*
2192  * We can truncate something if the global tail advanced across an SLRU
2193  * segment boundary.
2194  *
2195  * XXX it might be better to truncate only once every several segments, to
2196  * reduce the number of directory scans.
2197  */
2198  newtailpage = QUEUE_POS_PAGE(min);
2199  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2200  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2201  {
2202  /*
2203  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2204  * release the lock again.
2205  */
2206  SimpleLruTruncate(NotifyCtl, newtailpage);
2207  }
2208 }
#define QUEUE_TAIL
Definition: async.c:285
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1196
#define QUEUE_HEAD
Definition: async.c:284
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NotifyCtl
Definition: async.c:297
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
#define Assert(condition)
Definition: c.h:745
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_MIN(x, y)
Definition: async.c:209
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1607 of file async.c.

References Assert, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

1608 {
1609  double fillDegree;
1610  TimestampTz t;
1611 
1612  fillDegree = asyncQueueUsage();
1613  if (fillDegree < 0.5)
1614  return;
1615 
1616  t = GetCurrentTimestamp();
1617 
1620  {
1621  QueuePosition min = QUEUE_HEAD;
1622  int32 minPid = InvalidPid;
1623 
1625  {
1627  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1628  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1629  minPid = QUEUE_BACKEND_PID(i);
1630  }
1631 
1632  ereport(WARNING,
1633  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1634  (minPid != InvalidPid ?
1635  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1636  : 0),
1637  (minPid != InvalidPid ?
1638  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1639  : 0)));
1640 
1642  }
1643 }
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1578
#define QUEUE_HEAD
Definition: async.c:284
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
signed int int32
Definition: c.h:362
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
int errdetail(const char *fmt,...)
Definition: elog.c:957
static AsyncQueueControl * asyncQueueControl
Definition: async.c:282
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:299
#define WARNING
Definition: elog.h:40
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
int errmsg(const char *fmt,...)
Definition: elog.c:824
int i
TimestampTz lastQueueFillWarn
Definition: async.c:277
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MIN(x, y)
Definition: async.c:209
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1342 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_TAIL, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

1343 {
1344  int nexthead;
1345  int boundary;
1346 
1347  /*
1348  * The queue is full if creating a new head page would create a page that
1349  * logically precedes the current global tail pointer, ie, the head
1350  * pointer would wrap around compared to the tail. We cannot create such
1351  * a head page for fear of confusing slru.c. For safety we round the tail
1352  * pointer back to a segment boundary (compare the truncation logic in
1353  * asyncQueueAdvanceTail).
1354  *
1355  * Note that this test is *not* dependent on how much space there is on
1356  * the current head page. This is necessary because asyncQueueAddEntries
1357  * might try to create the next head page in any case.
1358  */
1359  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1360  if (nexthead > QUEUE_MAX_PAGE)
1361  nexthead = 0; /* wrap around */
1362  boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
1363  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1364  return asyncQueuePagePrecedes(nexthead, boundary);
1365 }
#define QUEUE_TAIL
Definition: async.c:285
#define QUEUE_HEAD
Definition: async.c:284
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_MAX_PAGE
Definition: async.c:318
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1408 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

1409 {
1410  size_t channellen = n->channel_len;
1411  size_t payloadlen = n->payload_len;
1412  int entryLength;
1413 
1414  Assert(channellen < NAMEDATALEN);
1415  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1416 
1417  /* The terminators are already included in AsyncQueueEntryEmptySize */
1418  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1419  entryLength = QUEUEALIGN(entryLength);
1420  qe->length = entryLength;
1421  qe->dboid = MyDatabaseId;
1422  qe->xid = GetCurrentTransactionId();
1423  qe->srcPid = MyProcPid;
1424  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1425 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
int MyProcPid
Definition: globals.c:40
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:182
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
#define QUEUEALIGN(len)
Definition: async.c:180
Oid MyDatabaseId
Definition: globals.c:85
#define Assert(condition)
Definition: c.h:745
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 471 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

472 {
473  int diff;
474 
475  /*
476  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
477  * in the range 0..QUEUE_MAX_PAGE.
478  */
479  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
480  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
481 
482  diff = p - q;
483  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
484  diff -= QUEUE_MAX_PAGE + 1;
485  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
486  diff += QUEUE_MAX_PAGE + 1;
487  return diff;
488 }
#define Assert(condition)
Definition: c.h:745
#define QUEUE_MAX_PAGE
Definition: async.c:318

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 492 of file async.c.

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

493 {
494  return asyncQueuePageDiff(p, q) < 0;
495 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:471

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2084 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

2088 {
2089  bool reachedStop = false;
2090  bool reachedEndOfPage;
2091  AsyncQueueEntry *qe;
2092 
2093  do
2094  {
2095  QueuePosition thisentry = *current;
2096 
2097  if (QUEUE_POS_EQUAL(thisentry, stop))
2098  break;
2099 
2100  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2101 
2102  /*
2103  * Advance *current over this message, possibly to the next page. As
2104  * noted in the comments for asyncQueueReadAllNotifications, we must
2105  * do this before possibly failing while processing the message.
2106  */
2107  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2108 
2109  /* Ignore messages destined for other databases */
2110  if (qe->dboid == MyDatabaseId)
2111  {
2112  if (XidInMVCCSnapshot(qe->xid, snapshot))
2113  {
2114  /*
2115  * The source transaction is still in progress, so we can't
2116  * process this message yet. Break out of the loop, but first
2117  * back up *current so we will reprocess the message next
2118  * time. (Note: it is unlikely but not impossible for
2119  * TransactionIdDidCommit to fail, so we can't really avoid
2120  * this advance-then-back-up behavior when dealing with an
2121  * uncommitted message.)
2122  *
2123  * Note that we must test XidInMVCCSnapshot before we test
2124  * TransactionIdDidCommit, else we might return a message from
2125  * a transaction that is not yet visible to snapshots; compare
2126  * the comments at the head of heapam_visibility.c.
2127  *
2128  * Also, while our own xact won't be listed in the snapshot,
2129  * we need not check for TransactionIdIsCurrentTransactionId
2130  * because our transaction cannot (yet) have queued any
2131  * messages.
2132  */
2133  *current = thisentry;
2134  reachedStop = true;
2135  break;
2136  }
2137  else if (TransactionIdDidCommit(qe->xid))
2138  {
2139  /* qe->data is the null-terminated channel name */
2140  char *channel = qe->data;
2141 
2142  if (IsListeningOn(channel))
2143  {
2144  /* payload follows channel name */
2145  char *payload = qe->data + strlen(channel) + 1;
2146 
2147  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2148  }
2149  }
2150  else
2151  {
2152  /*
2153  * The source transaction aborted or crashed, so we just
2154  * ignore its notifications.
2155  */
2156  }
2157  }
2158 
2159  /* Loop back if we're not at end of page */
2160  } while (!reachedEndOfPage);
2161 
2162  if (QUEUE_POS_EQUAL(*current, stop))
2163  reachedStop = true;
2164 
2165  return reachedStop;
2166 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2259
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1373
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1282
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2262

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1918 of file async.c.

References Assert, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), ProcessCompletedNotifies(), and ProcessIncomingNotify().

1919 {
1920  volatile QueuePosition pos;
1921  QueuePosition oldpos;
1922  QueuePosition head;
1923  Snapshot snapshot;
1924 
1925  /* page_buffer must be adequately aligned, so use a union */
1926  union
1927  {
1928  char buf[QUEUE_PAGESIZE];
1929  AsyncQueueEntry align;
1930  } page_buffer;
1931 
1932  /* Fetch current state */
1933  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1934  /* Assert checks that we have a valid state entry */
1936  pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
1937  head = QUEUE_HEAD;
1938  LWLockRelease(NotifyQueueLock);
1939 
1940  if (QUEUE_POS_EQUAL(pos, head))
1941  {
1942  /* Nothing to do, we have read all notifications already. */
1943  return;
1944  }
1945 
1946  /*----------
1947  * Get snapshot we'll use to decide which xacts are still in progress.
1948  * This is trickier than it might seem, because of race conditions.
1949  * Consider the following example:
1950  *
1951  * Backend 1: Backend 2:
1952  *
1953  * transaction starts
1954  * UPDATE foo SET ...;
1955  * NOTIFY foo;
1956  * commit starts
1957  * queue the notify message
1958  * transaction starts
1959  * LISTEN foo; -- first LISTEN in session
1960  * SELECT * FROM foo WHERE ...;
1961  * commit to clog
1962  * commit starts
1963  * add backend 2 to array of listeners
1964  * advance to queue head (this code)
1965  * commit to clog
1966  *
1967  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1968  * wasn't committed yet. Ideally we'd ensure that client 2 would
1969  * eventually get transaction 1's notify message, but there's no way
1970  * to do that; until we're in the listener array, there's no guarantee
1971  * that the notify message doesn't get removed from the queue.
1972  *
1973  * Therefore the coding technique transaction 2 is using is unsafe:
1974  * applications must commit a LISTEN before inspecting database state,
1975  * if they want to ensure they will see notifications about subsequent
1976  * changes to that state.
1977  *
1978  * What we do guarantee is that we'll see all notifications from
1979  * transactions committing after the snapshot we take here.
1980  * Exec_ListenPreCommit has already added us to the listener array,
1981  * so no not-yet-committed messages can be removed from the queue
1982  * before we see them.
1983  *----------
1984  */
1985  snapshot = RegisterSnapshot(GetLatestSnapshot());
1986 
1987  /*
1988  * It is possible that we fail while trying to send a message to our
1989  * frontend (for example, because of encoding conversion failure). If
1990  * that happens it is critical that we not try to send the same message
1991  * over and over again. Therefore, we place a PG_TRY block here that will
1992  * forcibly advance our queue position before we lose control to an error.
1993  * (We could alternatively retake NotifyQueueLock and move the position
1994  * before handling each individual message, but that seems like too much
1995  * lock traffic.)
1996  */
1997  PG_TRY();
1998  {
1999  bool reachedStop;
2000 
2001  do
2002  {
2003  int curpage = QUEUE_POS_PAGE(pos);
2004  int curoffset = QUEUE_POS_OFFSET(pos);
2005  int slotno;
2006  int copysize;
2007 
2008  /*
2009  * We copy the data from SLRU into a local buffer, so as to avoid
2010  * holding the NotifySLRULock while we are examining the entries
2011  * and possibly transmitting them to our frontend. Copy only the
2012  * part of the page we will actually inspect.
2013  */
2014  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
2016  if (curpage == QUEUE_POS_PAGE(head))
2017  {
2018  /* we only want to read as far as head */
2019  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2020  if (copysize < 0)
2021  copysize = 0; /* just for safety */
2022  }
2023  else
2024  {
2025  /* fetch all the rest of the page */
2026  copysize = QUEUE_PAGESIZE - curoffset;
2027  }
2028  memcpy(page_buffer.buf + curoffset,
2029  NotifyCtl->shared->page_buffer[slotno] + curoffset,
2030  copysize);
2031  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2032  LWLockRelease(NotifySLRULock);
2033 
2034  /*
2035  * Process messages up to the stop position, end of page, or an
2036  * uncommitted message.
2037  *
2038  * Our stop position is what we found to be the head's position
2039  * when we entered this function. It might have changed already.
2040  * But if it has, we will receive (or have already received and
2041  * queued) another signal and come here again.
2042  *
2043  * We are not holding NotifyQueueLock here! The queue can only
2044  * extend beyond the head pointer (see above) and we leave our
2045  * backend's pointer where it is so nobody will truncate or
2046  * rewrite pages under us. Especially we don't want to hold a lock
2047  * while sending the notifications to the frontend.
2048  */
2049  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2050  page_buffer.buf,
2051  snapshot);
2052  } while (!reachedStop);
2053  }
2054  PG_FINALLY();
2055  {
2056  /* Update shared state */
2057  LWLockAcquire(NotifyQueueLock, LW_SHARED);
2059  LWLockRelease(NotifyQueueLock);
2060  }
2061  PG_END_TRY();
2062 
2063  /* Done with snapshot */
2064  UnregisterSnapshot(snapshot);
2065 }
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:856
#define QUEUE_HEAD
Definition: async.c:284
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NotifyCtl
Definition: async.c:297
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static char * buf
Definition: pg_test_fsync.c:67
#define QUEUE_PAGESIZE
Definition: async.c:298
#define InvalidTransactionId
Definition: transam.h:31
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:898
#define PG_FINALLY()
Definition: elog.h:312
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:482
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2084
#define Assert(condition)
Definition: c.h:745
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:374
#define PG_TRY()
Definition: elog.h:295
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define PG_END_TRY()
Definition: elog.h:320
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1301 of file async.c.

References amRegisteredListener, Assert, i, InvalidBackendId, InvalidOid, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

1302 {
1303  Assert(listenChannels == NIL); /* else caller error */
1304 
1305  if (!amRegisteredListener) /* nothing to do */
1306  return;
1307 
1308  /*
1309  * Need exclusive lock here to manipulate list links.
1310  */
1311  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1312  /* Mark our entry as invalid */
1315  /* and remove it from the list */
1318  else
1319  {
1321  {
1323  {
1325  break;
1326  }
1327  }
1328  }
1330  LWLockRelease(NotifyQueueLock);
1331 
1332  /* mark ourselves as no longer listed in the global array */
1333  amRegisteredListener = false;
1334 }
#define NIL
Definition: pg_list.h:65
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
static List * listenChannels
Definition: async.c:325
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:424
int BackendId
Definition: backendid.h:21
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:288
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
#define Assert(condition)
Definition: c.h:745
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
int i
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1578 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

1579 {
1580  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1581  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1582  int occupied;
1583 
1584  occupied = headPage - tailPage;
1585 
1586  if (occupied == 0)
1587  return (double) 0; /* fast exit for common case */
1588 
1589  if (occupied < 0)
1590  {
1591  /* head has wrapped around, tail not yet */
1592  occupied += QUEUE_MAX_PAGE + 1;
1593  }
1594 
1595  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1596 }
#define QUEUE_TAIL
Definition: async.c:285
#define QUEUE_HEAD
Definition: async.c:284
#define QUEUE_MAX_PAGE
Definition: async.c:318
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 518 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

519 {
520  bool found;
521  Size size;
522 
523  /*
524  * Create or attach to the AsyncQueueControl structure.
525  *
526  * The used entries in the backend[] array run from 1 to MaxBackends; the
527  * zero'th entry is unused but must be allocated.
528  */
529  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
530  size = add_size(size, offsetof(AsyncQueueControl, backend));
531 
533  ShmemInitStruct("Async Queue Control", size, &found);
534 
535  if (!found)
536  {
537  /* First time through, so initialize it */
538  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
539  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
542  /* zero'th entry won't be used, but let's initialize it anyway */
543  for (int i = 0; i <= MaxBackends; i++)
544  {
549  }
550  }
551 
552  /*
553  * Set up SLRU management of the pg_notify data.
554  */
555  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
557  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER);
558  /* Override default assumption that writes should be fsync'd */
559  NotifyCtl->do_fsync = false;
560 
561  if (!found)
562  {
563  /*
564  * During start or reboot, clean out the pg_notify directory.
565  */
567  }
568 }
#define QUEUE_TAIL
Definition: async.c:285
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1384
#define QUEUE_HEAD
Definition: async.c:284
#define NotifyCtl
Definition: async.c:297
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
int MaxBackends
Definition: globals.c:136
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
static AsyncQueueControl * asyncQueueControl
Definition: async.c:282
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:288
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1407
size_t Size
Definition: c.h:473
int i
TimestampTz lastQueueFillWarn
Definition: async.c:277
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define offsetof(type, field)
Definition: c.h:668
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:175

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 501 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

502 {
503  Size size;
504 
505  /* This had better match AsyncShmemInit */
506  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
507  size = add_size(size, offsetof(AsyncQueueControl, backend));
508 
510 
511  return size;
512 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:136
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:473
#define offsetof(type, field)
Definition: c.h:668

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1743 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1744 {
1745  /*
1746  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1747  * we have registered as a listener but have not made any entry in
1748  * listenChannels. In that case, deregister again.
1749  */
1752 
1753  /* And clean up */
1755 }
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:325
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2422
static void asyncQueueUnregister(void)
Definition: async.c:1301
static bool amRegisteredListener
Definition: async.c:424

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 982 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

983 {
984  ListCell *p;
985 
986  /*
987  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
988  * return as soon as possible
989  */
991  return;
992 
993  if (Trace_notify)
994  elog(DEBUG1, "AtCommit_Notify");
995 
996  /* Perform any pending listen/unlisten actions */
997  if (pendingActions != NULL)
998  {
999  foreach(p, pendingActions->actions)
1000  {
1001  ListenAction *actrec = (ListenAction *) lfirst(p);
1002 
1003  switch (actrec->action)
1004  {
1005  case LISTEN_LISTEN:
1006  Exec_ListenCommit(actrec->channel);
1007  break;
1008  case LISTEN_UNLISTEN:
1009  Exec_UnlistenCommit(actrec->channel);
1010  break;
1011  case LISTEN_UNLISTEN_ALL:
1013  break;
1014  }
1015  }
1016  }
1017 
1018  /* If no longer listening to anything, get out of listener array */
1021 
1022  /* And clean up */
1024 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:357
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1185
static List * listenChannels
Definition: async.c:325
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2422
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1154
static NotificationList * pendingNotifies
Definition: async.c:409
List * actions
Definition: async.c:353
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1127
static void asyncQueueUnregister(void)
Definition: async.c:1301
static bool amRegisteredListener
Definition: async.c:424
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:347
ListenActionKind action
Definition: async.c:346

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 855 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

856 {
857  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
859  ereport(ERROR,
860  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
861  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
862 }
static ActionList * pendingActions
Definition: async.c:357
int errcode(int sqlerrcode)
Definition: elog.c:610
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1833 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1834 {
1835  int my_level = GetCurrentTransactionNestLevel();
1836 
1837  /*
1838  * All we have to do is pop the stack --- the actions/notifies made in
1839  * this subxact are no longer interesting, and the space will be freed
1840  * when CurTransactionContext is recycled. We still have to free the
1841  * ActionList and NotificationList objects themselves, though, because
1842  * those are allocated in TopTransactionContext.
1843  *
1844  * Note that there might be no entries at all, or no entries for the
1845  * current subtransaction level, either because none were ever created, or
1846  * because we reentered this routine due to trouble during subxact abort.
1847  */
1848  while (pendingActions != NULL &&
1849  pendingActions->nestingLevel >= my_level)
1850  {
1851  ActionList *childPendingActions = pendingActions;
1852 
1854  pfree(childPendingActions);
1855  }
1856 
1857  while (pendingNotifies != NULL &&
1858  pendingNotifies->nestingLevel >= my_level)
1859  {
1860  NotificationList *childPendingNotifies = pendingNotifies;
1861 
1863  pfree(childPendingNotifies);
1864  }
1865 }
static ActionList * pendingActions
Definition: async.c:357
int nestingLevel
Definition: async.c:396
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:409
struct NotificationList * upper
Definition: async.c:399
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1763 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1764 {
1765  int my_level = GetCurrentTransactionNestLevel();
1766 
1767  /* If there are actions at our nesting level, we must reparent them. */
1768  if (pendingActions != NULL &&
1769  pendingActions->nestingLevel >= my_level)
1770  {
1771  if (pendingActions->upper == NULL ||
1772  pendingActions->upper->nestingLevel < my_level - 1)
1773  {
1774  /* nothing to merge; give the whole thing to the parent */
1776  }
1777  else
1778  {
1779  ActionList *childPendingActions = pendingActions;
1780 
1782 
1783  /*
1784  * Mustn't try to eliminate duplicates here --- see queue_listen()
1785  */
1788  childPendingActions->actions);
1789  pfree(childPendingActions);
1790  }
1791  }
1792 
1793  /* If there are notifies at our nesting level, we must reparent them. */
1794  if (pendingNotifies != NULL &&
1795  pendingNotifies->nestingLevel >= my_level)
1796  {
1797  Assert(pendingNotifies->nestingLevel == my_level);
1798 
1799  if (pendingNotifies->upper == NULL ||
1800  pendingNotifies->upper->nestingLevel < my_level - 1)
1801  {
1802  /* nothing to merge; give the whole thing to the parent */
1804  }
1805  else
1806  {
1807  /*
1808  * Formerly, we didn't bother to eliminate duplicates here, but
1809  * now we must, else we fall foul of "Assert(!found)", either here
1810  * or during a later attempt to build the parent-level hashtable.
1811  */
1812  NotificationList *childPendingNotifies = pendingNotifies;
1813  ListCell *l;
1814 
1816  /* Insert all the subxact's events into parent, except for dups */
1817  foreach(l, childPendingNotifies->events)
1818  {
1819  Notification *childn = (Notification *) lfirst(l);
1820 
1821  if (!AsyncExistsPendingNotify(childn))
1822  AddEventToPendingNotifies(childn);
1823  }
1824  pfree(childPendingNotifies);
1825  }
1826  }
1827 }
List * events
Definition: async.c:397
static ActionList * pendingActions
Definition: async.c:357
List * list_concat(List *list1, const List *list2)
Definition: list.c:515
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2287
int nestingLevel
Definition: async.c:396
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:409
List * actions
Definition: async.c:353
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2328
struct NotificationList * upper
Definition: async.c:399
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2422 of file async.c.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2423 {
2424  /*
2425  * Everything's allocated in either TopTransactionContext or the context
2426  * for the subtransaction to which it corresponds. So, there's nothing to
2427  * do here except reset the pointers; the space will be reclaimed when the
2428  * contexts are deleted.
2429  */
2430  pendingActions = NULL;
2431  pendingNotifies = NULL;
2432 }
static ActionList * pendingActions
Definition: async.c:357
static NotificationList * pendingNotifies
Definition: async.c:409

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1127 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1128 {
1129  MemoryContext oldcontext;
1130 
1131  /* Do nothing if we are already listening on this channel */
1132  if (IsListeningOn(channel))
1133  return;
1134 
1135  /*
1136  * Add the new channel name to listenChannels.
1137  *
1138  * XXX It is theoretically possible to get an out-of-memory failure here,
1139  * which would be bad because we already committed. For the moment it
1140  * doesn't seem worth trying to guard against that, but maybe improve this
1141  * later.
1142  */
1145  MemoryContextSwitchTo(oldcontext);
1146 }
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:325
static bool IsListeningOn(const char *channel)
Definition: async.c:1282
MemoryContext TopMemoryContext
Definition: mcxt.c:44
List * lappend(List *list, void *datum)
Definition: list.c:321

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1032 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

1033 {
1034  QueuePosition head;
1035  QueuePosition max;
1036  BackendId prevListener;
1037 
1038  /*
1039  * Nothing to do if we are already listening to something, nor if we
1040  * already ran this routine in this transaction.
1041  */
1043  return;
1044 
1045  if (Trace_notify)
1046  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1047 
1048  /*
1049  * Before registering, make sure we will unlisten before dying. (Note:
1050  * this action does not get undone if we abort later.)
1051  */
1053  {
1055  unlistenExitRegistered = true;
1056  }
1057 
1058  /*
1059  * This is our first LISTEN, so establish our pointer.
1060  *
1061  * We set our pointer to the global tail pointer and then move it forward
1062  * over already-committed notifications. This ensures we cannot miss any
1063  * not-yet-committed notifications. We might get a few more but that
1064  * doesn't hurt.
1065  *
1066  * In some scenarios there might be a lot of committed notifications that
1067  * have not yet been pruned away (because some backend is being lazy about
1068  * reading them). To reduce our startup time, we can look at other
1069  * backends and adopt the maximum "pos" pointer of any backend that's in
1070  * our database; any notifications it's already advanced over are surely
1071  * committed and need not be re-examined by us. (We must consider only
1072  * backends connected to our DB, because others will not have bothered to
1073  * check committed-ness of notifications in our DB.)
1074  *
1075  * We need exclusive lock here so we can look at other backends' entries
1076  * and manipulate the list links.
1077  */
1078  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1079  head = QUEUE_HEAD;
1080  max = QUEUE_TAIL;
1081  prevListener = InvalidBackendId;
1083  {
1085  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1086  /* Also find last listening backend before this one */
1087  if (i < MyBackendId)
1088  prevListener = i;
1089  }
1093  /* Insert backend into list of listeners at correct position */
1094  if (prevListener > 0)
1095  {
1097  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1098  }
1099  else
1100  {
1103  }
1104  LWLockRelease(NotifyQueueLock);
1105 
1106  /* Now we are listed in the global array, so remember we're listening */
1107  amRegisteredListener = true;
1108 
1109  /*
1110  * Try to move our pointer forward as far as possible. This will skip
1111  * over already-committed notifications, which we want to do because they
1112  * might be quite stale. Note that we are not yet listening on anything,
1113  * so we won't deliver such notifications to our frontend. Also, although
1114  * our transaction might have executed NOTIFY, those message(s) aren't
1115  * queued yet so we won't skip them here.
1116  */
1117  if (!QUEUE_POS_EQUAL(max, head))
1119 }
#define QUEUE_TAIL
Definition: async.c:285
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
#define QUEUE_HEAD
Definition: async.c:284
static bool unlistenExitRegistered
Definition: async.c:421
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:842
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:424
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:288
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1918
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MAX(x, y)
Definition: async.c:215

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1185 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1186 {
1187  if (Trace_notify)
1188  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1189 
1191  listenChannels = NIL;
1192 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:325
void list_free_deep(List *list)
Definition: list.c:1390
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1154 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1155 {
1156  ListCell *q;
1157 
1158  if (Trace_notify)
1159  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1160 
1161  foreach(q, listenChannels)
1162  {
1163  char *lchan = (char *) lfirst(q);
1164 
1165  if (strcmp(lchan, channel) == 0)
1166  {
1168  pfree(lchan);
1169  break;
1170  }
1171  }
1172 
1173  /*
1174  * We do not complain about unlistening something not being listened;
1175  * should we?
1176  */
1177 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:325
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:368
void pfree(void *pointer)
Definition: mcxt.c:1056
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1876 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1877 {
1878  /*
1879  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1880  * you do here.
1881  */
1882 
1883  /* signal that work needs to be done */
1884  notifyInterruptPending = true;
1885 
1886  /* make sure the event is processed in due course */
1887  SetLatch(MyLatch);
1888 }
void SetLatch(Latch *latch)
Definition: latch.c:505
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1282 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1283 {
1284  ListCell *p;
1285 
1286  foreach(p, listenChannels)
1287  {
1288  char *lchan = (char *) lfirst(p);
1289 
1290  if (strcmp(lchan, channel) == 0)
1291  return true;
1292  }
1293  return false;
1294 }
static List * listenChannels
Definition: async.c:325
#define lfirst(lc)
Definition: pg_list.h:190

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2392 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2393 {
2394  const Notification *k = *(const Notification *const *) key;
2395 
2396  Assert(keysize == sizeof(Notification *));
2397  /* We don't bother to include the payload's trailing null in the hash */
2398  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2399  k->channel_len + k->payload_len + 1));
2400 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
#define Assert(condition)
Definition: c.h:745
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2406 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2407 {
2408  const Notification *k1 = *(const Notification *const *) key1;
2409  const Notification *k2 = *(const Notification *const *) key2;
2410 
2411  Assert(keysize == sizeof(Notification *));
2412  if (k1->channel_len == k2->channel_len &&
2413  k1->payload_len == k2->payload_len &&
2414  memcmp(k1->data, k2->data,
2415  k1->channel_len + k1->payload_len + 2) == 0)
2416  return 0; /* equal */
2417  return 1; /* not equal */
2418 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
#define Assert(condition)
Definition: c.h:745
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2262 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2263 {
2265  {
2267 
2268  pq_beginmessage(&buf, 'A');
2269  pq_sendint32(&buf, srcPid);
2270  pq_sendstring(&buf, channel);
2272  pq_sendstring(&buf, payload);
2273  pq_endmessage(&buf);
2274 
2275  /*
2276  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2277  * happen at the end of the transaction, and for incoming notifies
2278  * ProcessIncomingNotify will do it after finding all the notifies.
2279  */
2280  }
2281  else
2282  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2283 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:214
CommandDest whereToSendOutput
Definition: postgres.c:91
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 809 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

810 {
811  FuncCallContext *funcctx;
812 
813  /* stuff done only on the first call of the function */
814  if (SRF_IS_FIRSTCALL())
815  {
816  /* create a function context for cross-call persistence */
817  funcctx = SRF_FIRSTCALL_INIT();
818  }
819 
820  /* stuff done on every call of the function */
821  funcctx = SRF_PERCALL_SETUP();
822 
823  if (funcctx->call_cntr < list_length(listenChannels))
824  {
825  char *channel = (char *) list_nth(listenChannels,
826  funcctx->call_cntr);
827 
828  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
829  }
830 
831  SRF_RETURN_DONE(funcctx);
832 }
uint64 call_cntr
Definition: funcapi.h:65
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:293
static List * listenChannels
Definition: async.c:325
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:297
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:299
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
static int list_length(const List *l)
Definition: pg_list.h:169
#define CStringGetTextDatum(s)
Definition: builtins.h:86
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:317
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:295

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1558 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

1559 {
1560  double usage;
1561 
1562  /* Advance the queue tail so we don't report a too-large result */
1564 
1565  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1566  usage = asyncQueueUsage();
1567  LWLockRelease(NotifyQueueLock);
1568 
1569  PG_RETURN_FLOAT8(usage);
1570 }
static void usage(void)
Definition: pg_standby.c:589
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:365
static double asyncQueueUsage(void)
Definition: async.c:1578
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static void asyncQueueAdvanceTail(void)
Definition: async.c:2173
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 576 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

577 {
578  const char *channel;
579  const char *payload;
580 
581  if (PG_ARGISNULL(0))
582  channel = "";
583  else
584  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
585 
586  if (PG_ARGISNULL(1))
587  payload = "";
588  else
589  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
590 
591  /* For NOTIFY as a statement, this is checked in ProcessUtility */
593 
594  Async_Notify(channel, payload);
595 
596  PG_RETURN_VOID();
597 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:308
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:444
#define PG_RETURN_VOID()
Definition: fmgr.h:348
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:610
char * text_to_cstring(const text *t)
Definition: varlena.c:205

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 880 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

881 {
882  ListCell *p;
883 
885  return; /* no relevant statements in this xact */
886 
887  if (Trace_notify)
888  elog(DEBUG1, "PreCommit_Notify");
889 
890  /* Preflight for any pending listen/unlisten actions */
891  if (pendingActions != NULL)
892  {
893  foreach(p, pendingActions->actions)
894  {
895  ListenAction *actrec = (ListenAction *) lfirst(p);
896 
897  switch (actrec->action)
898  {
899  case LISTEN_LISTEN:
901  break;
902  case LISTEN_UNLISTEN:
903  /* there is no Exec_UnlistenPreCommit() */
904  break;
905  case LISTEN_UNLISTEN_ALL:
906  /* there is no Exec_UnlistenAllPreCommit() */
907  break;
908  }
909  }
910  }
911 
912  /* Queue any pending notifies (must happen after the above) */
913  if (pendingNotifies)
914  {
915  ListCell *nextNotify;
916 
917  /*
918  * Make sure that we have an XID assigned to the current transaction.
919  * GetCurrentTransactionId is cheap if we already have an XID, but not
920  * so cheap if we don't, and we'd prefer not to do that work while
921  * holding NotifyQueueLock.
922  */
923  (void) GetCurrentTransactionId();
924 
925  /*
926  * Serialize writers by acquiring a special lock that we hold till
927  * after commit. This ensures that queue entries appear in commit
928  * order, and in particular that there are never uncommitted queue
929  * entries ahead of committed ones, so an uncommitted transaction
930  * can't block delivery of deliverable notifications.
931  *
932  * We use a heavyweight lock so that it'll automatically be released
933  * after either commit or abort. This also allows deadlocks to be
934  * detected, though really a deadlock shouldn't be possible here.
935  *
936  * The lock is on "database 0", which is pretty ugly but it doesn't
937  * seem worth inventing a special locktag category just for this.
938  * (Historical note: before PG 9.0, a similar lock on "database 0" was
939  * used by the flatfiles mechanism.)
940  */
941  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
943 
944  /* Now push the notifications into the queue */
946 
947  nextNotify = list_head(pendingNotifies->events);
948  while (nextNotify != NULL)
949  {
950  /*
951  * Add the pending notifications to the queue. We acquire and
952  * release NotifyQueueLock once per page, which might be overkill
953  * but it does allow readers to get in while we're doing this.
954  *
955  * A full queue is very uncommon and should really not happen,
956  * given that we have so much space available in the SLRU pages.
957  * Nevertheless we need to deal with this possibility. Note that
958  * when we get here we are in the process of committing our
959  * transaction, but we have not yet committed to clog, so at this
960  * point in time we can still roll the transaction back.
961  */
962  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
964  if (asyncQueueIsFull())
965  ereport(ERROR,
966  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
967  errmsg("too many notifications in the NOTIFY queue")));
968  nextNotify = asyncQueueAddEntries(nextNotify);
969  LWLockRelease(NotifyQueueLock);
970  }
971  }
972 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:397
static ActionList * pendingActions
Definition: async.c:357
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool asyncQueueIsFull(void)
Definition: async.c:1342
static void asyncQueueFillWarning(void)
Definition: async.c:1607
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
static bool backendHasSentNotifications
Definition: async.c:427
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
List * actions
Definition: async.c:353
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1444
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:1032
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
bool Trace_notify
Definition: async.c:433
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
ListenActionKind action
Definition: async.c:346

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1218 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1219 {
1220  MemoryContext caller_context;
1221 
1222  /* Nothing to do if we didn't send any notifications */
1224  return;
1225 
1226  /*
1227  * We reset the flag immediately; otherwise, if any sort of error occurs
1228  * below, we'd be locked up in an infinite loop, because control will come
1229  * right back here after error cleanup.
1230  */
1232 
1233  /*
1234  * We must preserve the caller's memory context (probably MessageContext)
1235  * across the transaction we do here.
1236  */
1237  caller_context = CurrentMemoryContext;
1238 
1239  if (Trace_notify)
1240  elog(DEBUG1, "ProcessCompletedNotifies");
1241 
1242  /*
1243  * We must run asyncQueueReadAllNotifications inside a transaction, else
1244  * bad things happen if it gets an error.
1245  */
1247 
1248  /* Send signals to other backends */
1249  SignalBackends();
1250 
1251  if (listenChannels != NIL)
1252  {
1253  /* Read the queue ourselves, and send relevant stuff to the frontend */
1255  }
1256 
1257  /*
1258  * If it's time to try to advance the global tail pointer, do that.
1259  */
1261  {
1262  backendTryAdvanceTail = false;
1264  }
1265 
1267 
1268  MemoryContextSwitchTo(caller_context);
1269 
1270  /* We don't need pq_flush() here since postgres.c will do one shortly */
1271 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2947
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:325
static bool backendHasSentNotifications
Definition: async.c:427
static bool backendTryAdvanceTail
Definition: async.c:430
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2173
static void SignalBackends(void)
Definition: async.c:1660
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1918
void StartTransactionCommand(void)
Definition: xact.c:2846
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( void  )
static

Definition at line 2223 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2224 {
2225  /* We *must* reset the flag */
2226  notifyInterruptPending = false;
2227 
2228  /* Do nothing else if we aren't actively listening */
2229  if (listenChannels == NIL)
2230  return;
2231 
2232  if (Trace_notify)
2233  elog(DEBUG1, "ProcessIncomingNotify");
2234 
2235  set_ps_display("notify interrupt");
2236 
2237  /*
2238  * We must run asyncQueueReadAllNotifications inside a transaction, else
2239  * bad things happen if it gets an error.
2240  */
2242 
2244 
2246 
2247  /*
2248  * Must flush the notify messages to ensure frontend gets them promptly.
2249  */
2250  pq_flush();
2251 
2252  set_ps_display("idle");
2253 
2254  if (Trace_notify)
2255  elog(DEBUG1, "ProcessIncomingNotify: done");
2256 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:39
void CommitTransactionCommand(void)
Definition: xact.c:2947
static List * listenChannels
Definition: async.c:325
void set_ps_display(const char *activity)
Definition: ps_status.c:349
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1918
void StartTransactionCommand(void)
Definition: xact.c:2846
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1902 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1903 {
1905  return; /* not really idle */
1906 
1907  while (notifyInterruptPending)
1909 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
static void ProcessIncomingNotify(void)
Definition: async.c:2223
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 709 of file async.c.

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, offsetof, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

710 {
711  MemoryContext oldcontext;
712  ListenAction *actrec;
713  int my_level = GetCurrentTransactionNestLevel();
714 
715  /*
716  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
717  * be too complicated to ensure we get the right interactions of
718  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
719  * would be any performance benefit anyway in sane applications.
720  */
722 
723  /* space for terminating null is included in sizeof(ListenAction) */
724  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
725  strlen(channel) + 1);
726  actrec->action = action;
727  strcpy(actrec->channel, channel);
728 
729  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
730  {
731  ActionList *actions;
732 
733  /*
734  * First action in current sub(xact). Note that we allocate the
735  * ActionList in TopTransactionContext; the nestingLevel might get
736  * changed later by AtSubCommit_Notify.
737  */
738  actions = (ActionList *)
740  actions->nestingLevel = my_level;
741  actions->actions = list_make1(actrec);
742  actions->upper = pendingActions;
743  pendingActions = actions;
744  }
745  else
747 
748  MemoryContextSwitchTo(oldcontext);
749 }
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static ActionList * pendingActions
Definition: async.c:357
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define list_make1(x1)
Definition: pg_list.h:227
List * actions
Definition: async.c:353
List * lappend(List *list, void *datum)
Definition: list.c:321
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
void * palloc(Size size)
Definition: mcxt.c:949
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:347
ListenActionKind action
Definition: async.c:346
#define offsetof(type, field)
Definition: c.h:668

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1660 of file async.c.

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by ProcessCompletedNotifies().

1661 {
1662  int32 *pids;
1663  BackendId *ids;
1664  int count;
1665 
1666  /*
1667  * Identify backends that we need to signal. We don't want to send
1668  * signals while holding the NotifyQueueLock, so this loop just builds a
1669  * list of target PIDs.
1670  *
1671  * XXX in principle these pallocs could fail, which would be bad. Maybe
1672  * preallocate the arrays? But in practice this is only run in trivial
1673  * transactions, so there should surely be space available.
1674  */
1675  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1676  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1677  count = 0;
1678 
1679  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1681  {
1682  int32 pid = QUEUE_BACKEND_PID(i);
1683  QueuePosition pos;
1684 
1685  Assert(pid != InvalidPid);
1686  if (pid == MyProcPid)
1687  continue; /* never signal self */
1688  pos = QUEUE_BACKEND_POS(i);
1690  {
1691  /*
1692  * Always signal listeners in our own database, unless they're
1693  * already caught up (unlikely, but possible).
1694  */
1695  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1696  continue;
1697  }
1698  else
1699  {
1700  /*
1701  * Listeners in other databases should be signaled only if they
1702  * are far behind.
1703  */
1706  continue;
1707  }
1708  /* OK, need to signal this one */
1709  pids[count] = pid;
1710  ids[count] = i;
1711  count++;
1712  }
1713  LWLockRelease(NotifyQueueLock);
1714 
1715  /* Now send signals */
1716  for (int i = 0; i < count; i++)
1717  {
1718  int32 pid = pids[i];
1719 
1720  /*
1721  * Note: assuming things aren't broken, a signal failure here could
1722  * only occur if the target backend exited since we released
1723  * NotifyQueueLock; which is unlikely but certainly possible. So we
1724  * just log a low-level debug message if it happens.
1725  */
1726  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1727  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1728  }
1729 
1730  pfree(pids);
1731  pfree(ids);
1732 }
int MyProcPid
Definition: globals.c:40
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:284
signed int int32
Definition: c.h:362
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:471
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
void pfree(void *pointer)
Definition: mcxt.c:1056
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
int MaxBackends
Definition: globals.c:136
#define QUEUE_CLEANUP_DELAY
Definition: async.c:231
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:288
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
#define Assert(condition)
Definition: c.h:745
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
void * palloc(Size size)
Definition: mcxt.c:949
#define elog(elevel,...)
Definition: elog.h:214
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 282 of file async.c.

◆ backendHasSentNotifications

bool backendHasSentNotifications = false
static

Definition at line 427 of file async.c.

Referenced by PreCommit_Notify(), and ProcessCompletedNotifies().

◆ backendTryAdvanceTail

bool backendTryAdvanceTail = false
static

Definition at line 430 of file async.c.

Referenced by asyncQueueAddEntries(), and ProcessCompletedNotifies().

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 325 of file async.c.

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 295 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

ActionList* pendingActions = NULL
static

Definition at line 357 of file async.c.

Referenced by AtSubAbort_Notify(), AtSubCommit_Notify(), and queue_listen().

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 409 of file async.c.

Referenced by Async_Notify(), AtSubAbort_Notify(), and AtSubCommit_Notify().

◆ Trace_notify

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 421 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().