PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 

Functions

static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControlasyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static ListlistenChannels = NIL
 
static ActionListpendingActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 191 of file async.c.

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 399 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 165 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 312 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 303 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 302 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 305 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 240 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 301 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 315 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 298 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 304 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 313 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 211 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 214 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition: async.c:478
int y
Definition: isn.c:72
int x
Definition: isn.c:71

Definition at line 224 of file async.c.

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 218 of file async.c.

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 203 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 202 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 300 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 299 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 189 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 205 of file async.c.

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 334 of file async.c.

335 {
ListenActionKind
Definition: async.c:335
@ LISTEN_LISTEN
Definition: async.c:336
@ LISTEN_UNLISTEN_ALL
Definition: async.c:338
@ LISTEN_UNLISTEN
Definition: async.c:337

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 2281 of file async.c.

2282 {
2284 
2285  /* Create the hash table if it's time to */
2287  pendingNotifies->hashtab == NULL)
2288  {
2289  HASHCTL hash_ctl;
2290  ListCell *l;
2291 
2292  /* Create the hash table */
2293  hash_ctl.keysize = sizeof(Notification *);
2294  hash_ctl.entrysize = sizeof(NotificationHash);
2295  hash_ctl.hash = notification_hash;
2296  hash_ctl.match = notification_match;
2297  hash_ctl.hcxt = CurTransactionContext;
2299  hash_create("Pending Notifies",
2300  256L,
2301  &hash_ctl,
2303 
2304  /* Insert all the already-existing events */
2305  foreach(l, pendingNotifies->events)
2306  {
2307  Notification *oldn = (Notification *) lfirst(l);
2308  bool found;
2309 
2311  &oldn,
2312  HASH_ENTER,
2313  &found);
2314  Assert(!found);
2315  }
2316  }
2317 
2318  /* Add new event to the list, in order */
2320 
2321  /* Add event to the hash table if needed */
2322  if (pendingNotifies->hashtab != NULL)
2323  {
2324  bool found;
2325 
2327  &n,
2328  HASH_ENTER,
2329  &found);
2330  Assert(!found);
2331  }
2332 }
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:399
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2340
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2354
struct NotificationHash NotificationHash
static NotificationList * pendingNotifies
Definition: async.c:406
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_COMPARE
Definition: hsearch.h:99
#define HASH_FUNCTION
Definition: hsearch.h:98
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:338
MemoryContext CurTransactionContext
Definition: mcxt.c:147
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
Size keysize
Definition: hsearch.h:75
HashValueFunc hash
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:76
HashCompareFunc match
Definition: hsearch.h:80
MemoryContext hcxt
Definition: hsearch.h:86
HTAB * hashtab
Definition: async.c:395
List * events
Definition: async.c:394

References Assert(), CurTransactionContext, HASHCTL::entrysize, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), notification_match(), and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 744 of file async.c.

745 {
746  if (Trace_notify)
747  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
748 
749  queue_listen(LISTEN_LISTEN, channel);
750 }
bool Trace_notify
Definition: async.c:427
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:696
#define DEBUG1
Definition: elog.h:30
int MyProcPid
Definition: globals.c:44

References DEBUG1, elog(), LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 597 of file async.c.

598 {
599  int my_level = GetCurrentTransactionNestLevel();
600  size_t channel_len;
601  size_t payload_len;
602  Notification *n;
603  MemoryContext oldcontext;
604 
605  if (IsParallelWorker())
606  elog(ERROR, "cannot send notifications from a parallel worker");
607 
608  if (Trace_notify)
609  elog(DEBUG1, "Async_Notify(%s)", channel);
610 
611  channel_len = channel ? strlen(channel) : 0;
612  payload_len = payload ? strlen(payload) : 0;
613 
614  /* a channel name must be specified */
615  if (channel_len == 0)
616  ereport(ERROR,
617  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
618  errmsg("channel name cannot be empty")));
619 
620  /* enforce length limits */
621  if (channel_len >= NAMEDATALEN)
622  ereport(ERROR,
623  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
624  errmsg("channel name too long")));
625 
626  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
627  ereport(ERROR,
628  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629  errmsg("payload string too long")));
630 
631  /*
632  * We must construct the Notification entry, even if we end up not using
633  * it, in order to compare it cheaply to existing list entries.
634  *
635  * The notification list needs to live until end of transaction, so store
636  * it in the transaction context.
637  */
639 
640  n = (Notification *) palloc(offsetof(Notification, data) +
641  channel_len + payload_len + 2);
642  n->channel_len = channel_len;
643  n->payload_len = payload_len;
644  strcpy(n->data, channel);
645  if (payload)
646  strcpy(n->data + channel_len + 1, payload);
647  else
648  n->data[channel_len + 1] = '\0';
649 
650  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
651  {
652  NotificationList *notifies;
653 
654  /*
655  * First notify event in current (sub)xact. Note that we allocate the
656  * NotificationList in TopTransactionContext; the nestingLevel might
657  * get changed later by AtSubCommit_Notify.
658  */
659  notifies = (NotificationList *)
661  sizeof(NotificationList));
662  notifies->nestingLevel = my_level;
663  notifies->events = list_make1(n);
664  /* We certainly don't need a hashtable yet */
665  notifies->hashtab = NULL;
666  notifies->upper = pendingNotifies;
667  pendingNotifies = notifies;
668  }
669  else
670  {
671  /* Now check for duplicates */
673  {
674  /* It's a dup, so forget it */
675  pfree(n);
676  MemoryContextSwitchTo(oldcontext);
677  return;
678  }
679 
680  /* Append more events to existing list */
682  }
683 
684  MemoryContextSwitchTo(oldcontext);
685 }
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2240
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2281
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:165
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsParallelWorker()
Definition: parallel.h:61
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void pfree(void *pointer)
Definition: mcxt.c:1456
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021
void * palloc(Size size)
Definition: mcxt.c:1226
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
int nestingLevel
Definition: async.c:393
struct NotificationList * upper
Definition: async.c:396
uint16 payload_len
Definition: async.c:386
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
uint16 channel_len
Definition: async.c:385
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:914

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog(), ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 758 of file async.c.

759 {
760  if (Trace_notify)
761  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
762 
763  /* If we couldn't possibly be listening, no need to queue anything */
764  if (pendingActions == NULL && !unlistenExitRegistered)
765  return;
766 
767  queue_listen(LISTEN_UNLISTEN, channel);
768 }
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418

References DEBUG1, elog(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 776 of file async.c.

777 {
778  if (Trace_notify)
779  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
780 
781  /* If we couldn't possibly be listening, no need to queue anything */
782  if (pendingActions == NULL && !unlistenExitRegistered)
783  return;
784 
786 }

References DEBUG1, elog(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 829 of file async.c.

830 {
833 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1200
static void asyncQueueUnregister(void)
Definition: async.c:1237

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 2240 of file async.c.

2241 {
2242  if (pendingNotifies == NULL)
2243  return false;
2244 
2245  if (pendingNotifies->hashtab != NULL)
2246  {
2247  /* Use the hash table to probe for a match */
2249  &n,
2250  HASH_FIND,
2251  NULL))
2252  return true;
2253  }
2254  else
2255  {
2256  /* Must scan the event list */
2257  ListCell *l;
2258 
2259  foreach(l, pendingNotifies->events)
2260  {
2261  Notification *oldn = (Notification *) lfirst(l);
2262 
2263  if (n->channel_len == oldn->channel_len &&
2264  n->payload_len == oldn->payload_len &&
2265  memcmp(n->data, oldn->data,
2266  n->channel_len + n->payload_len + 2) == 0)
2267  return true;
2268  }
2269  }
2270 
2271  return false;
2272 }
@ HASH_FIND
Definition: hsearch.h:113

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 1362 of file async.c.

1363 {
1364  AsyncQueueEntry qe;
1365  QueuePosition queue_head;
1366  int64 pageno;
1367  int offset;
1368  int slotno;
1369 
1370  /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
1371  LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
1372 
1373  /*
1374  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1375  * memory upon exiting. The reason for this is that if we have to advance
1376  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1377  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1378  * subsequent insertions would try to put entries into a page that slru.c
1379  * thinks doesn't exist yet.) So, use a local position variable. Note
1380  * that if we do fail, any already-inserted queue entries are forgotten;
1381  * this is okay, since they'd be useless anyway after our transaction
1382  * rolls back.
1383  */
1384  queue_head = QUEUE_HEAD;
1385 
1386  /*
1387  * If this is the first write since the postmaster started, we need to
1388  * initialize the first page of the async SLRU. Otherwise, the current
1389  * page should be initialized already, so just fetch it.
1390  */
1391  pageno = QUEUE_POS_PAGE(queue_head);
1392  if (QUEUE_POS_IS_ZERO(queue_head))
1393  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1394  else
1395  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1397 
1398  /* Note we mark the page dirty before writing in it */
1399  NotifyCtl->shared->page_dirty[slotno] = true;
1400 
1401  while (nextNotify != NULL)
1402  {
1403  Notification *n = (Notification *) lfirst(nextNotify);
1404 
1405  /* Construct a valid queue entry in local variable qe */
1407 
1408  offset = QUEUE_POS_OFFSET(queue_head);
1409 
1410  /* Check whether the entry really fits on the current page */
1411  if (offset + qe.length <= QUEUE_PAGESIZE)
1412  {
1413  /* OK, so advance nextNotify past this item */
1414  nextNotify = lnext(pendingNotifies->events, nextNotify);
1415  }
1416  else
1417  {
1418  /*
1419  * Write a dummy entry to fill up the page. Actually readers will
1420  * only check dboid and since it won't match any reader's database
1421  * OID, they will ignore this entry and move on.
1422  */
1423  qe.length = QUEUE_PAGESIZE - offset;
1424  qe.dboid = InvalidOid;
1425  qe.data[0] = '\0'; /* empty channel */
1426  qe.data[1] = '\0'; /* empty payload */
1427  }
1428 
1429  /* Now copy qe into the shared buffer page */
1430  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1431  &qe,
1432  qe.length);
1433 
1434  /* Advance queue_head appropriately, and detect if page is full */
1435  if (asyncQueueAdvance(&(queue_head), qe.length))
1436  {
1437  /*
1438  * Page is full, so we're done here, but first fill the next page
1439  * with zeroes. The reason to do this is to ensure that slru.c's
1440  * idea of the head page is always the same as ours, which avoids
1441  * boundary problems in SimpleLruTruncate. The test in
1442  * asyncQueueIsFull() ensured that there is room to create this
1443  * page without overrunning the queue.
1444  */
1445  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1446 
1447  /*
1448  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1449  * set flag to remember that we should try to advance the tail
1450  * pointer (we don't want to actually do that right here).
1451  */
1452  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1453  tryAdvanceTail = true;
1454 
1455  /* And exit the loop */
1456  break;
1457  }
1458  }
1459 
1460  /* Success, so update the global QUEUE_HEAD */
1461  QUEUE_HEAD = queue_head;
1462 
1463  LWLockRelease(NotifySLRULock);
1464 
1465  return nextNotify;
1466 }
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1326
static bool tryAdvanceTail
Definition: async.c:424
#define QUEUE_POS_OFFSET(x)
Definition: async.c:203
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1293
#define QUEUE_POS_PAGE(x)
Definition: async.c:202
#define QUEUE_CLEANUP_DELAY
Definition: async.c:240
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:214
#define NotifyCtl
Definition: async.c:312
#define QUEUE_PAGESIZE
Definition: async.c:313
#define QUEUE_HEAD
Definition: async.c:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
@ LW_EXCLUSIVE
Definition: lwlock.h:116
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:343
#define InvalidOid
Definition: postgres_ext.h:36
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:423
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:308
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:185
#define InvalidTransactionId
Definition: transam.h:31

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1293 of file async.c.

1294 {
1295  int64 pageno = QUEUE_POS_PAGE(*position);
1296  int offset = QUEUE_POS_OFFSET(*position);
1297  bool pageJump = false;
1298 
1299  /*
1300  * Move to the next writing position: First jump over what we have just
1301  * written or read.
1302  */
1303  offset += entryLength;
1304  Assert(offset <= QUEUE_PAGESIZE);
1305 
1306  /*
1307  * In a second step check if another entry can possibly be written to the
1308  * page. If so, stay here, we have reached the next position. If not, then
1309  * we need to move on to the next page.
1310  */
1312  {
1313  pageno++;
1314  offset = 0;
1315  pageJump = true;
1316  }
1317 
1318  SET_QUEUE_POS(*position, pageno, offset);
1319  return pageJump;
1320 }
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:205
#define AsyncQueueEntryEmptySize
Definition: async.c:191
#define QUEUEALIGN(len)
Definition: async.c:189

References Assert(), AsyncQueueEntryEmptySize, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2100 of file async.c.

2101 {
2102  QueuePosition min;
2103  int oldtailpage;
2104  int newtailpage;
2105  int boundary;
2106 
2107  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2108  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2109 
2110  /*
2111  * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2112  * (ie, exactly match at least one backend's queue position), so it must
2113  * be updated atomically with the actual computation. Since v13, we could
2114  * get away with not doing it like that, but it seems prudent to keep it
2115  * so.
2116  *
2117  * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2118  * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2119  * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2120  * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2121  * there are pages we can truncate but haven't yet finished doing so.
2122  *
2123  * For concurrency's sake, we don't want to hold NotifyQueueLock while
2124  * performing SimpleLruTruncate. This is OK because no backend will try
2125  * to access the pages we are in the midst of truncating.
2126  */
2127  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2128  min = QUEUE_HEAD;
2130  {
2132  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2133  }
2134  QUEUE_TAIL = min;
2135  oldtailpage = QUEUE_STOP_PAGE;
2136  LWLockRelease(NotifyQueueLock);
2137 
2138  /*
2139  * We can truncate something if the global tail advanced across an SLRU
2140  * segment boundary.
2141  *
2142  * XXX it might be better to truncate only once every several segments, to
2143  * reduce the number of directory scans.
2144  */
2145  newtailpage = QUEUE_POS_PAGE(min);
2146  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2147  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2148  {
2149  /*
2150  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2151  * release the lock again.
2152  */
2153  SimpleLruTruncate(NotifyCtl, newtailpage);
2154 
2155  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2156  QUEUE_STOP_PAGE = newtailpage;
2157  LWLockRelease(NotifyQueueLock);
2158  }
2159 
2160  LWLockRelease(NotifyQueueTailLock);
2161 }
#define QUEUE_FIRST_LISTENER
Definition: async.c:301
#define QUEUE_POS_MIN(x, y)
Definition: async.c:218
#define QUEUE_BACKEND_POS(i)
Definition: async.c:305
#define QUEUE_TAIL
Definition: async.c:299
#define QUEUE_BACKEND_PID(i)
Definition: async.c:302
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:304
#define QUEUE_STOP_PAGE
Definition: async.c:300
int BackendId
Definition: backendid.h:21
int i
Definition: isn.c:73
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1254
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34

References Assert(), asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1519 of file async.c.

1520 {
1521  double fillDegree;
1522  TimestampTz t;
1523 
1524  fillDegree = asyncQueueUsage();
1525  if (fillDegree < 0.5)
1526  return;
1527 
1528  t = GetCurrentTimestamp();
1529 
1532  {
1533  QueuePosition min = QUEUE_HEAD;
1534  int32 minPid = InvalidPid;
1535 
1537  {
1539  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1540  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1541  minPid = QUEUE_BACKEND_PID(i);
1542  }
1543 
1544  ereport(WARNING,
1545  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1546  (minPid != InvalidPid ?
1547  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1548  : 0),
1549  (minPid != InvalidPid ?
1550  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1551  : 0)));
1552 
1554  }
1555 }
static double asyncQueueUsage(void)
Definition: async.c:1498
static AsyncQueueControl * asyncQueueControl
Definition: async.c:296
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:315
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:211
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1785
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1649
signed int int32
Definition: c.h:483
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
#define WARNING
Definition: elog.h:36
TimestampTz lastQueueFillWarn
Definition: async.c:291

References Assert(), asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1278 of file async.c.

1279 {
1280  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1281  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1282  int occupied = headPage - tailPage;
1283 
1284  return occupied >= max_notify_queue_pages;
1285 }
int max_notify_queue_pages
Definition: async.c:430

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1326 of file async.c.

1327 {
1328  size_t channellen = n->channel_len;
1329  size_t payloadlen = n->payload_len;
1330  int entryLength;
1331 
1332  Assert(channellen < NAMEDATALEN);
1333  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1334 
1335  /* The terminators are already included in AsyncQueueEntryEmptySize */
1336  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1337  entryLength = QUEUEALIGN(entryLength);
1338  qe->length = entryLength;
1339  qe->dboid = MyDatabaseId;
1340  qe->xid = GetCurrentTransactionId();
1341  qe->srcPid = MyProcPid;
1342  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1343 }
Oid MyDatabaseId
Definition: globals.c:89
int32 srcPid
Definition: async.c:184
TransactionId xid
Definition: async.c:183
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:445

References Assert(), AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, AsyncQueueEntry::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 468 of file async.c.

469 {
470  return p - q;
471 }

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 478 of file async.c.

479 {
480  return p < q;
481 }

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2008 of file async.c.

2012 {
2013  bool reachedStop = false;
2014  bool reachedEndOfPage;
2015  AsyncQueueEntry *qe;
2016 
2017  do
2018  {
2019  QueuePosition thisentry = *current;
2020 
2021  if (QUEUE_POS_EQUAL(thisentry, stop))
2022  break;
2023 
2024  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2025 
2026  /*
2027  * Advance *current over this message, possibly to the next page. As
2028  * noted in the comments for asyncQueueReadAllNotifications, we must
2029  * do this before possibly failing while processing the message.
2030  */
2031  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2032 
2033  /* Ignore messages destined for other databases */
2034  if (qe->dboid == MyDatabaseId)
2035  {
2036  if (XidInMVCCSnapshot(qe->xid, snapshot))
2037  {
2038  /*
2039  * The source transaction is still in progress, so we can't
2040  * process this message yet. Break out of the loop, but first
2041  * back up *current so we will reprocess the message next
2042  * time. (Note: it is unlikely but not impossible for
2043  * TransactionIdDidCommit to fail, so we can't really avoid
2044  * this advance-then-back-up behavior when dealing with an
2045  * uncommitted message.)
2046  *
2047  * Note that we must test XidInMVCCSnapshot before we test
2048  * TransactionIdDidCommit, else we might return a message from
2049  * a transaction that is not yet visible to snapshots; compare
2050  * the comments at the head of heapam_visibility.c.
2051  *
2052  * Also, while our own xact won't be listed in the snapshot,
2053  * we need not check for TransactionIdIsCurrentTransactionId
2054  * because our transaction cannot (yet) have queued any
2055  * messages.
2056  */
2057  *current = thisentry;
2058  reachedStop = true;
2059  break;
2060  }
2061  else if (TransactionIdDidCommit(qe->xid))
2062  {
2063  /* qe->data is the null-terminated channel name */
2064  char *channel = qe->data;
2065 
2066  if (IsListeningOn(channel))
2067  {
2068  /* payload follows channel name */
2069  char *payload = qe->data + strlen(channel) + 1;
2070 
2071  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2072  }
2073  }
2074  else
2075  {
2076  /*
2077  * The source transaction aborted or crashed, so we just
2078  * ignore its notifications.
2079  */
2080  }
2081  }
2082 
2083  /* Loop back if we're not at end of page */
2084  } while (!reachedEndOfPage);
2085 
2086  if (QUEUE_POS_EQUAL(*current, stop))
2087  reachedStop = true;
2088 
2089  return reachedStop;
2090 }
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2216
static bool IsListeningOn(const char *channel)
Definition: async.c:1218
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:1862
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1843 of file async.c.

1844 {
1845  volatile QueuePosition pos;
1846  QueuePosition head;
1847  Snapshot snapshot;
1848 
1849  /* page_buffer must be adequately aligned, so use a union */
1850  union
1851  {
1852  char buf[QUEUE_PAGESIZE];
1853  AsyncQueueEntry align;
1854  } page_buffer;
1855 
1856  /* Fetch current state */
1857  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1858  /* Assert checks that we have a valid state entry */
1861  head = QUEUE_HEAD;
1862  LWLockRelease(NotifyQueueLock);
1863 
1864  if (QUEUE_POS_EQUAL(pos, head))
1865  {
1866  /* Nothing to do, we have read all notifications already. */
1867  return;
1868  }
1869 
1870  /*----------
1871  * Get snapshot we'll use to decide which xacts are still in progress.
1872  * This is trickier than it might seem, because of race conditions.
1873  * Consider the following example:
1874  *
1875  * Backend 1: Backend 2:
1876  *
1877  * transaction starts
1878  * UPDATE foo SET ...;
1879  * NOTIFY foo;
1880  * commit starts
1881  * queue the notify message
1882  * transaction starts
1883  * LISTEN foo; -- first LISTEN in session
1884  * SELECT * FROM foo WHERE ...;
1885  * commit to clog
1886  * commit starts
1887  * add backend 2 to array of listeners
1888  * advance to queue head (this code)
1889  * commit to clog
1890  *
1891  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1892  * wasn't committed yet. Ideally we'd ensure that client 2 would
1893  * eventually get transaction 1's notify message, but there's no way
1894  * to do that; until we're in the listener array, there's no guarantee
1895  * that the notify message doesn't get removed from the queue.
1896  *
1897  * Therefore the coding technique transaction 2 is using is unsafe:
1898  * applications must commit a LISTEN before inspecting database state,
1899  * if they want to ensure they will see notifications about subsequent
1900  * changes to that state.
1901  *
1902  * What we do guarantee is that we'll see all notifications from
1903  * transactions committing after the snapshot we take here.
1904  * Exec_ListenPreCommit has already added us to the listener array,
1905  * so no not-yet-committed messages can be removed from the queue
1906  * before we see them.
1907  *----------
1908  */
1909  snapshot = RegisterSnapshot(GetLatestSnapshot());
1910 
1911  /*
1912  * It is possible that we fail while trying to send a message to our
1913  * frontend (for example, because of encoding conversion failure). If
1914  * that happens it is critical that we not try to send the same message
1915  * over and over again. Therefore, we place a PG_TRY block here that will
1916  * forcibly advance our queue position before we lose control to an error.
1917  * (We could alternatively retake NotifyQueueLock and move the position
1918  * before handling each individual message, but that seems like too much
1919  * lock traffic.)
1920  */
1921  PG_TRY();
1922  {
1923  bool reachedStop;
1924 
1925  do
1926  {
1927  int curpage = QUEUE_POS_PAGE(pos);
1928  int curoffset = QUEUE_POS_OFFSET(pos);
1929  int slotno;
1930  int copysize;
1931 
1932  /*
1933  * We copy the data from SLRU into a local buffer, so as to avoid
1934  * holding the NotifySLRULock while we are examining the entries
1935  * and possibly transmitting them to our frontend. Copy only the
1936  * part of the page we will actually inspect.
1937  */
1938  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1940  if (curpage == QUEUE_POS_PAGE(head))
1941  {
1942  /* we only want to read as far as head */
1943  copysize = QUEUE_POS_OFFSET(head) - curoffset;
1944  if (copysize < 0)
1945  copysize = 0; /* just for safety */
1946  }
1947  else
1948  {
1949  /* fetch all the rest of the page */
1950  copysize = QUEUE_PAGESIZE - curoffset;
1951  }
1952  memcpy(page_buffer.buf + curoffset,
1953  NotifyCtl->shared->page_buffer[slotno] + curoffset,
1954  copysize);
1955  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
1956  LWLockRelease(NotifySLRULock);
1957 
1958  /*
1959  * Process messages up to the stop position, end of page, or an
1960  * uncommitted message.
1961  *
1962  * Our stop position is what we found to be the head's position
1963  * when we entered this function. It might have changed already.
1964  * But if it has, we will receive (or have already received and
1965  * queued) another signal and come here again.
1966  *
1967  * We are not holding NotifyQueueLock here! The queue can only
1968  * extend beyond the head pointer (see above) and we leave our
1969  * backend's pointer where it is so nobody will truncate or
1970  * rewrite pages under us. Especially we don't want to hold a lock
1971  * while sending the notifications to the frontend.
1972  */
1973  reachedStop = asyncQueueProcessPageEntries(&pos, head,
1974  page_buffer.buf,
1975  snapshot);
1976  } while (!reachedStop);
1977  }
1978  PG_FINALLY();
1979  {
1980  /* Update shared state */
1981  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1983  LWLockRelease(NotifyQueueLock);
1984  }
1985  PG_END_TRY();
1986 
1987  /* Done with snapshot */
1988  UnregisterSnapshot(snapshot);
1989 }
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2008
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_FINALLY(...)
Definition: elog.h:387
BackendId MyBackendId
Definition: globals.c:85
@ LW_SHARED
Definition: lwlock.h:117
static char * buf
Definition: pg_test_fsync.c:73
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:523
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:298
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:843
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:801

References Assert(), asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1237 of file async.c.

1238 {
1239  Assert(listenChannels == NIL); /* else caller error */
1240 
1241  if (!amRegisteredListener) /* nothing to do */
1242  return;
1243 
1244  /*
1245  * Need exclusive lock here to manipulate list links.
1246  */
1247  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1248  /* Mark our entry as invalid */
1251  /* and remove it from the list */
1254  else
1255  {
1257  {
1259  {
1261  break;
1262  }
1263  }
1264  }
1266  LWLockRelease(NotifyQueueLock);
1267 
1268  /* mark ourselves as no longer listed in the global array */
1269  amRegisteredListener = false;
1270 }
static List * listenChannels
Definition: async.c:322
static bool amRegisteredListener
Definition: async.c:421
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:303
#define InvalidBackendId
Definition: backendid.h:23

References amRegisteredListener, Assert(), i, InvalidBackendId, InvalidOid, InvalidPid, listenChannels, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1498 of file async.c.

1499 {
1500  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1501  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1502  int occupied = headPage - tailPage;
1503 
1504  if (occupied == 0)
1505  return (double) 0; /* fast exit for common case */
1506 
1507  return (double) occupied / (double) max_notify_queue_pages;
1508 }

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 504 of file async.c.

505 {
506  bool found;
507  Size size;
508 
509  /*
510  * Create or attach to the AsyncQueueControl structure.
511  *
512  * The used entries in the backend[] array run from 1 to MaxBackends; the
513  * zero'th entry is unused but must be allocated.
514  */
515  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
516  size = add_size(size, offsetof(AsyncQueueControl, backend));
517 
519  ShmemInitStruct("Async Queue Control", size, &found);
520 
521  if (!found)
522  {
523  /* First time through, so initialize it */
524  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
525  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
526  QUEUE_STOP_PAGE = 0;
529  /* zero'th entry won't be used, but let's initialize it anyway */
530  for (int i = 0; i <= MaxBackends; i++)
531  {
536  }
537  }
538 
539  /*
540  * Set up SLRU management of the pg_notify data. Note that long segment
541  * names are used in order to avoid wraparound.
542  */
543  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
545  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
546  SYNC_HANDLER_NONE, true);
547 
548  if (!found)
549  {
550  /*
551  * During start or reboot, clean out the pg_notify directory.
552  */
554  }
555 }
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
size_t Size
Definition: c.h:594
int MaxBackends
Definition: globals.c:142
@ LWTRANCHE_NOTIFY_BUFFER
Definition: lwlock.h:186
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:214
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1607
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1560
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 487 of file async.c.

488 {
489  Size size;
490 
491  /* This had better match AsyncShmemInit */
492  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
493  size = add_size(size, offsetof(AsyncQueueControl, backend));
494 
496 
497  return size;
498 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:182

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1663 of file async.c.

1664 {
1665  /*
1666  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1667  * we have registered as a listener but have not made any entry in
1668  * listenChannels. In that case, deregister again.
1669  */
1672 
1673  /* And clean up */
1675 }
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2370

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 974 of file async.c.

975 {
976  ListCell *p;
977 
978  /*
979  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
980  * return as soon as possible
981  */
983  return;
984 
985  if (Trace_notify)
986  elog(DEBUG1, "AtCommit_Notify");
987 
988  /* Perform any pending listen/unlisten actions */
989  if (pendingActions != NULL)
990  {
991  foreach(p, pendingActions->actions)
992  {
993  ListenAction *actrec = (ListenAction *) lfirst(p);
994 
995  switch (actrec->action)
996  {
997  case LISTEN_LISTEN:
998  Exec_ListenCommit(actrec->channel);
999  break;
1000  case LISTEN_UNLISTEN:
1001  Exec_UnlistenCommit(actrec->channel);
1002  break;
1003  case LISTEN_UNLISTEN_ALL:
1005  break;
1006  }
1007  }
1008  }
1009 
1010  /* If no longer listening to anything, get out of listener array */
1013 
1014  /*
1015  * Send signals to listening backends. We need do this only if there are
1016  * pending notifies, which were previously added to the shared queue by
1017  * PreCommit_Notify().
1018  */
1019  if (pendingNotifies != NULL)
1020  SignalBackends();
1021 
1022  /*
1023  * If it's time to try to advance the global tail pointer, do that.
1024  *
1025  * (It might seem odd to do this in the sender, when more than likely the
1026  * listeners won't yet have read the messages we just sent. However,
1027  * there's less contention if only the sender does it, and there is little
1028  * need for urgency in advancing the global tail. So this typically will
1029  * be clearing out messages that were sent some time ago.)
1030  */
1031  if (tryAdvanceTail)
1032  {
1033  tryAdvanceTail = false;
1035  }
1036 
1037  /* And clean up */
1039 }
static void SignalBackends(void)
Definition: async.c:1573
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1142
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1169
static void asyncQueueAdvanceTail(void)
Definition: async.c:2100
List * actions
Definition: async.c:350
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog(), Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 842 of file async.c.

843 {
844  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
846  ereport(ERROR,
847  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
848  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
849 }

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1753 of file async.c.

1754 {
1755  int my_level = GetCurrentTransactionNestLevel();
1756 
1757  /*
1758  * All we have to do is pop the stack --- the actions/notifies made in
1759  * this subxact are no longer interesting, and the space will be freed
1760  * when CurTransactionContext is recycled. We still have to free the
1761  * ActionList and NotificationList objects themselves, though, because
1762  * those are allocated in TopTransactionContext.
1763  *
1764  * Note that there might be no entries at all, or no entries for the
1765  * current subtransaction level, either because none were ever created, or
1766  * because we reentered this routine due to trouble during subxact abort.
1767  */
1768  while (pendingActions != NULL &&
1769  pendingActions->nestingLevel >= my_level)
1770  {
1771  ActionList *childPendingActions = pendingActions;
1772 
1774  pfree(childPendingActions);
1775  }
1776 
1777  while (pendingNotifies != NULL &&
1778  pendingNotifies->nestingLevel >= my_level)
1779  {
1780  NotificationList *childPendingNotifies = pendingNotifies;
1781 
1783  pfree(childPendingNotifies);
1784  }
1785 }
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1683 of file async.c.

1684 {
1685  int my_level = GetCurrentTransactionNestLevel();
1686 
1687  /* If there are actions at our nesting level, we must reparent them. */
1688  if (pendingActions != NULL &&
1689  pendingActions->nestingLevel >= my_level)
1690  {
1691  if (pendingActions->upper == NULL ||
1692  pendingActions->upper->nestingLevel < my_level - 1)
1693  {
1694  /* nothing to merge; give the whole thing to the parent */
1696  }
1697  else
1698  {
1699  ActionList *childPendingActions = pendingActions;
1700 
1702 
1703  /*
1704  * Mustn't try to eliminate duplicates here --- see queue_listen()
1705  */
1708  childPendingActions->actions);
1709  pfree(childPendingActions);
1710  }
1711  }
1712 
1713  /* If there are notifies at our nesting level, we must reparent them. */
1714  if (pendingNotifies != NULL &&
1715  pendingNotifies->nestingLevel >= my_level)
1716  {
1717  Assert(pendingNotifies->nestingLevel == my_level);
1718 
1719  if (pendingNotifies->upper == NULL ||
1720  pendingNotifies->upper->nestingLevel < my_level - 1)
1721  {
1722  /* nothing to merge; give the whole thing to the parent */
1724  }
1725  else
1726  {
1727  /*
1728  * Formerly, we didn't bother to eliminate duplicates here, but
1729  * now we must, else we fall foul of "Assert(!found)", either here
1730  * or during a later attempt to build the parent-level hashtable.
1731  */
1732  NotificationList *childPendingNotifies = pendingNotifies;
1733  ListCell *l;
1734 
1736  /* Insert all the subxact's events into parent, except for dups */
1737  foreach(l, childPendingNotifies->events)
1738  {
1739  Notification *childn = (Notification *) lfirst(l);
1740 
1741  if (!AsyncExistsPendingNotify(childn))
1742  AddEventToPendingNotifies(childn);
1743  }
1744  pfree(childPendingNotifies);
1745  }
1746  }
1747 }
List * list_concat(List *list1, const List *list2)
Definition: list.c:560

References ActionList::actions, AddEventToPendingNotifies(), Assert(), AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2370 of file async.c.

2371 {
2372  /*
2373  * Everything's allocated in either TopTransactionContext or the context
2374  * for the subtransaction to which it corresponds. So, there's nothing to
2375  * do here except reset the pointers; the space will be reclaimed when the
2376  * contexts are deleted.
2377  */
2378  pendingActions = NULL;
2379  pendingNotifies = NULL;
2380 }

References pendingActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1142 of file async.c.

1143 {
1144  MemoryContext oldcontext;
1145 
1146  /* Do nothing if we are already listening on this channel */
1147  if (IsListeningOn(channel))
1148  return;
1149 
1150  /*
1151  * Add the new channel name to listenChannels.
1152  *
1153  * XXX It is theoretically possible to get an out-of-memory failure here,
1154  * which would be bad because we already committed. For the moment it
1155  * doesn't seem worth trying to guard against that, but maybe improve this
1156  * later.
1157  */
1160  MemoryContextSwitchTo(oldcontext);
1161 }
char * pstrdup(const char *in)
Definition: mcxt.c:1644
MemoryContext TopMemoryContext
Definition: mcxt.c:141

References IsListeningOn(), lappend(), listenChannels, MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1047 of file async.c.

1048 {
1049  QueuePosition head;
1050  QueuePosition max;
1051  BackendId prevListener;
1052 
1053  /*
1054  * Nothing to do if we are already listening to something, nor if we
1055  * already ran this routine in this transaction.
1056  */
1058  return;
1059 
1060  if (Trace_notify)
1061  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1062 
1063  /*
1064  * Before registering, make sure we will unlisten before dying. (Note:
1065  * this action does not get undone if we abort later.)
1066  */
1068  {
1070  unlistenExitRegistered = true;
1071  }
1072 
1073  /*
1074  * This is our first LISTEN, so establish our pointer.
1075  *
1076  * We set our pointer to the global tail pointer and then move it forward
1077  * over already-committed notifications. This ensures we cannot miss any
1078  * not-yet-committed notifications. We might get a few more but that
1079  * doesn't hurt.
1080  *
1081  * In some scenarios there might be a lot of committed notifications that
1082  * have not yet been pruned away (because some backend is being lazy about
1083  * reading them). To reduce our startup time, we can look at other
1084  * backends and adopt the maximum "pos" pointer of any backend that's in
1085  * our database; any notifications it's already advanced over are surely
1086  * committed and need not be re-examined by us. (We must consider only
1087  * backends connected to our DB, because others will not have bothered to
1088  * check committed-ness of notifications in our DB.)
1089  *
1090  * We need exclusive lock here so we can look at other backends' entries
1091  * and manipulate the list links.
1092  */
1093  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1094  head = QUEUE_HEAD;
1095  max = QUEUE_TAIL;
1096  prevListener = InvalidBackendId;
1098  {
1100  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1101  /* Also find last listening backend before this one */
1102  if (i < MyBackendId)
1103  prevListener = i;
1104  }
1108  /* Insert backend into list of listeners at correct position */
1109  if (prevListener > 0)
1110  {
1112  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1113  }
1114  else
1115  {
1118  }
1119  LWLockRelease(NotifyQueueLock);
1120 
1121  /* Now we are listed in the global array, so remember we're listening */
1122  amRegisteredListener = true;
1123 
1124  /*
1125  * Try to move our pointer forward as far as possible. This will skip
1126  * over already-committed notifications, which we want to do because they
1127  * might be quite stale. Note that we are not yet listening on anything,
1128  * so we won't deliver such notifications to our frontend. Also, although
1129  * our transaction might have executed NOTIFY, those message(s) aren't
1130  * queued yet so we won't skip them here.
1131  */
1132  if (!QUEUE_POS_EQUAL(max, head))
1134 }
#define QUEUE_POS_MAX(x, y)
Definition: async.c:224
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1843
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:829
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog(), i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1200 of file async.c.

1201 {
1202  if (Trace_notify)
1203  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1204 
1206  listenChannels = NIL;
1207 }
void list_free_deep(List *list)
Definition: list.c:1559

References DEBUG1, elog(), list_free_deep(), listenChannels, MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1169 of file async.c.

1170 {
1171  ListCell *q;
1172 
1173  if (Trace_notify)
1174  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1175 
1176  foreach(q, listenChannels)
1177  {
1178  char *lchan = (char *) lfirst(q);
1179 
1180  if (strcmp(lchan, channel) == 0)
1181  {
1183  pfree(lchan);
1184  break;
1185  }
1186  }
1187 
1188  /*
1189  * We do not complain about unlistening something not being listened;
1190  * should we?
1191  */
1192 }
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:390

References DEBUG1, elog(), foreach_delete_current, lfirst, listenChannels, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1796 of file async.c.

1797 {
1798  /*
1799  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1800  * you do here.
1801  */
1802 
1803  /* signal that work needs to be done */
1804  notifyInterruptPending = true;
1805 
1806  /* make sure the event is processed in due course */
1807  SetLatch(MyLatch);
1808 }
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415
struct Latch * MyLatch
Definition: globals.c:58
void SetLatch(Latch *latch)
Definition: latch.c:633

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1218 of file async.c.

1219 {
1220  ListCell *p;
1221 
1222  foreach(p, listenChannels)
1223  {
1224  char *lchan = (char *) lfirst(p);
1225 
1226  if (strcmp(lchan, channel) == 0)
1227  return true;
1228  }
1229  return false;
1230 }

References lfirst, and listenChannels.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2340 of file async.c.

2341 {
2342  const Notification *k = *(const Notification *const *) key;
2343 
2344  Assert(keysize == sizeof(Notification *));
2345  /* We don't bother to include the payload's trailing null in the hash */
2346  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2347  k->channel_len + k->payload_len + 1));
2348 }
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222

References Assert(), Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), sort-test::key, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2354 of file async.c.

2355 {
2356  const Notification *k1 = *(const Notification *const *) key1;
2357  const Notification *k2 = *(const Notification *const *) key2;
2358 
2359  Assert(keysize == sizeof(Notification *));
2360  if (k1->channel_len == k2->channel_len &&
2361  k1->payload_len == k2->payload_len &&
2362  memcmp(k1->data, k2->data,
2363  k1->channel_len + k1->payload_len + 2) == 0)
2364  return 0; /* equal */
2365  return 1; /* not equal */
2366 }

References Assert(), Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2216 of file async.c.

2217 {
2219  {
2221 
2223  pq_sendint32(&buf, srcPid);
2224  pq_sendstring(&buf, channel);
2225  pq_sendstring(&buf, payload);
2226  pq_endmessage(&buf);
2227 
2228  /*
2229  * NOTE: we do not do pq_flush() here. Some level of caller will
2230  * handle it later, allowing this message to be combined into a packet
2231  * with other ones.
2232  */
2233  }
2234  else
2235  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2236 }
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
CommandDest whereToSendOutput
Definition: postgres.c:89
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:198
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define PqMsg_NotificationResponse
Definition: protocol.h:41

References buf, DestRemote, elog(), INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 796 of file async.c.

797 {
798  FuncCallContext *funcctx;
799 
800  /* stuff done only on the first call of the function */
801  if (SRF_IS_FIRSTCALL())
802  {
803  /* create a function context for cross-call persistence */
804  funcctx = SRF_FIRSTCALL_INIT();
805  }
806 
807  /* stuff done on every call of the function */
808  funcctx = SRF_PERCALL_SETUP();
809 
810  if (funcctx->call_cntr < list_length(listenChannels))
811  {
812  char *channel = (char *) list_nth(listenChannels,
813  funcctx->call_cntr);
814 
815  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
816  }
817 
818  SRF_RETURN_DONE(funcctx);
819 }
#define CStringGetTextDatum(s)
Definition: builtins.h:94
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:328
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
uint64 call_cntr
Definition: funcapi.h:65

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), listenChannels, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1473 of file async.c.

1474 {
1475  double usage;
1476 
1477  /* Advance the queue tail so we don't report a too-large result */
1479 
1480  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1481  usage = asyncQueueUsage();
1482  LWLockRelease(NotifyQueueLock);
1483 
1485 }
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static void usage(const char *progname)
Definition: vacuumlo.c:414

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 563 of file async.c.

564 {
565  const char *channel;
566  const char *payload;
567 
568  if (PG_ARGISNULL(0))
569  channel = "";
570  else
571  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
572 
573  if (PG_ARGISNULL(1))
574  payload = "";
575  else
576  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
577 
578  /* For NOTIFY as a statement, this is checked in ProcessUtility */
580 
581  Async_Notify(channel, payload);
582 
583  PG_RETURN_VOID();
584 }
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:597
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:448
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 867 of file async.c.

868 {
869  ListCell *p;
870 
872  return; /* no relevant statements in this xact */
873 
874  if (Trace_notify)
875  elog(DEBUG1, "PreCommit_Notify");
876 
877  /* Preflight for any pending listen/unlisten actions */
878  if (pendingActions != NULL)
879  {
880  foreach(p, pendingActions->actions)
881  {
882  ListenAction *actrec = (ListenAction *) lfirst(p);
883 
884  switch (actrec->action)
885  {
886  case LISTEN_LISTEN:
888  break;
889  case LISTEN_UNLISTEN:
890  /* there is no Exec_UnlistenPreCommit() */
891  break;
892  case LISTEN_UNLISTEN_ALL:
893  /* there is no Exec_UnlistenAllPreCommit() */
894  break;
895  }
896  }
897  }
898 
899  /* Queue any pending notifies (must happen after the above) */
900  if (pendingNotifies)
901  {
902  ListCell *nextNotify;
903 
904  /*
905  * Make sure that we have an XID assigned to the current transaction.
906  * GetCurrentTransactionId is cheap if we already have an XID, but not
907  * so cheap if we don't, and we'd prefer not to do that work while
908  * holding NotifyQueueLock.
909  */
910  (void) GetCurrentTransactionId();
911 
912  /*
913  * Serialize writers by acquiring a special lock that we hold till
914  * after commit. This ensures that queue entries appear in commit
915  * order, and in particular that there are never uncommitted queue
916  * entries ahead of committed ones, so an uncommitted transaction
917  * can't block delivery of deliverable notifications.
918  *
919  * We use a heavyweight lock so that it'll automatically be released
920  * after either commit or abort. This also allows deadlocks to be
921  * detected, though really a deadlock shouldn't be possible here.
922  *
923  * The lock is on "database 0", which is pretty ugly but it doesn't
924  * seem worth inventing a special locktag category just for this.
925  * (Historical note: before PG 9.0, a similar lock on "database 0" was
926  * used by the flatfiles mechanism.)
927  */
928  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
930 
931  /* Now push the notifications into the queue */
932  nextNotify = list_head(pendingNotifies->events);
933  while (nextNotify != NULL)
934  {
935  /*
936  * Add the pending notifications to the queue. We acquire and
937  * release NotifyQueueLock once per page, which might be overkill
938  * but it does allow readers to get in while we're doing this.
939  *
940  * A full queue is very uncommon and should really not happen,
941  * given that we have so much space available in the SLRU pages.
942  * Nevertheless we need to deal with this possibility. Note that
943  * when we get here we are in the process of committing our
944  * transaction, but we have not yet committed to clog, so at this
945  * point in time we can still roll the transaction back.
946  */
947  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
949  if (asyncQueueIsFull())
950  ereport(ERROR,
951  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
952  errmsg("too many notifications in the NOTIFY queue")));
953  nextNotify = asyncQueueAddEntries(nextNotify);
954  LWLockRelease(NotifyQueueLock);
955  }
956 
957  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
958  }
959 }
static void Exec_ListenPreCommit(void)
Definition: async.c:1047
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1362
static void asyncQueueFillWarning(void)
Definition: async.c:1519
static bool asyncQueueIsFull(void)
Definition: async.c:1278
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessExclusiveLock
Definition: lockdefs.h:43
static ListCell * list_head(const List *l)
Definition: pg_list.h:128

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog(), ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 2175 of file async.c.

2176 {
2177  /* We *must* reset the flag */
2178  notifyInterruptPending = false;
2179 
2180  /* Do nothing else if we aren't actively listening */
2181  if (listenChannels == NIL)
2182  return;
2183 
2184  if (Trace_notify)
2185  elog(DEBUG1, "ProcessIncomingNotify");
2186 
2187  set_ps_display("notify interrupt");
2188 
2189  /*
2190  * We must run asyncQueueReadAllNotifications inside a transaction, else
2191  * bad things happen if it gets an error.
2192  */
2194 
2196 
2198 
2199  /*
2200  * If this isn't an end-of-command case, we must flush the notify messages
2201  * to ensure frontend gets them promptly.
2202  */
2203  if (flush)
2204  pq_flush();
2205 
2206  set_ps_display("idle");
2207 
2208  if (Trace_notify)
2209  elog(DEBUG1, "ProcessIncomingNotify: done");
2210 }
#define pq_flush()
Definition: libpq.h:46
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void StartTransactionCommand(void)
Definition: xact.c:2937
void CommitTransactionCommand(void)
Definition: xact.c:3034

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog(), listenChannels, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1826 of file async.c.

1827 {
1829  return; /* not really idle */
1830 
1831  /* Loop in case another signal arrives while sending messages */
1832  while (notifyInterruptPending)
1833  ProcessIncomingNotify(flush);
1834 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2175
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4834

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 696 of file async.c.

697 {
698  MemoryContext oldcontext;
699  ListenAction *actrec;
700  int my_level = GetCurrentTransactionNestLevel();
701 
702  /*
703  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
704  * be too complicated to ensure we get the right interactions of
705  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
706  * would be any performance benefit anyway in sane applications.
707  */
709 
710  /* space for terminating null is included in sizeof(ListenAction) */
711  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
712  strlen(channel) + 1);
713  actrec->action = action;
714  strcpy(actrec->channel, channel);
715 
716  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
717  {
718  ActionList *actions;
719 
720  /*
721  * First action in current sub(xact). Note that we allocate the
722  * ActionList in TopTransactionContext; the nestingLevel might get
723  * changed later by AtSubCommit_Notify.
724  */
725  actions = (ActionList *)
727  actions->nestingLevel = my_level;
728  actions->actions = list_make1(actrec);
729  actions->upper = pendingActions;
730  pendingActions = actions;
731  }
732  else
734 
735  MemoryContextSwitchTo(oldcontext);
736 }

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1573 of file async.c.

1574 {
1575  int32 *pids;
1576  BackendId *ids;
1577  int count;
1578 
1579  /*
1580  * Identify backends that we need to signal. We don't want to send
1581  * signals while holding the NotifyQueueLock, so this loop just builds a
1582  * list of target PIDs.
1583  *
1584  * XXX in principle these pallocs could fail, which would be bad. Maybe
1585  * preallocate the arrays? They're not that large, though.
1586  */
1587  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1588  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1589  count = 0;
1590 
1591  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1593  {
1594  int32 pid = QUEUE_BACKEND_PID(i);
1595  QueuePosition pos;
1596 
1597  Assert(pid != InvalidPid);
1598  pos = QUEUE_BACKEND_POS(i);
1600  {
1601  /*
1602  * Always signal listeners in our own database, unless they're
1603  * already caught up (unlikely, but possible).
1604  */
1605  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1606  continue;
1607  }
1608  else
1609  {
1610  /*
1611  * Listeners in other databases should be signaled only if they
1612  * are far behind.
1613  */
1616  continue;
1617  }
1618  /* OK, need to signal this one */
1619  pids[count] = pid;
1620  ids[count] = i;
1621  count++;
1622  }
1623  LWLockRelease(NotifyQueueLock);
1624 
1625  /* Now send signals */
1626  for (int i = 0; i < count; i++)
1627  {
1628  int32 pid = pids[i];
1629 
1630  /*
1631  * If we are signaling our own process, no need to involve the kernel;
1632  * just set the flag directly.
1633  */
1634  if (pid == MyProcPid)
1635  {
1636  notifyInterruptPending = true;
1637  continue;
1638  }
1639 
1640  /*
1641  * Note: assuming things aren't broken, a signal failure here could
1642  * only occur if the target backend exited since we released
1643  * NotifyQueueLock; which is unlikely but certainly possible. So we
1644  * just log a low-level debug message if it happens.
1645  */
1646  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1647  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1648  }
1649 
1650  pfree(pids);
1651  pfree(ids);
1652 }
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition: async.c:468
#define DEBUG3
Definition: elog.h:28
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_NOTIFY_INTERRUPT
Definition: procsignal.h:33

References Assert(), asyncQueuePageDiff(), DEBUG3, elog(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 296 of file async.c.

Referenced by asyncQueueFillWarning(), and AsyncShmemInit().

◆ listenChannels

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 430 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 310 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

◆ pendingNotifies

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 424 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 418 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().