PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 192 of file async.c.

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 415 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 166 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 313 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 304 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 303 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 306 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 241 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 302 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 315 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 299 of file async.c.

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 331 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 305 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 314 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 212 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 215 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507
int y
Definition: isn.c:72
int x
Definition: isn.c:71

Definition at line 225 of file async.c.

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 219 of file async.c.

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 204 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 203 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 301 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 300 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 190 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 206 of file async.c.

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

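◆ AsyncQueueControl

typedef struct AsyncQueueControl AsyncQueueControl

◆ AsyncQueueEntry

typedef struct AsyncQueueEntry AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

typedef struct NotificationHash NotificationHash

◆ NotificationList

typedef struct NotificationList NotificationList

◆ QueueBackendStatus

typedef struct QueueBackendStatus QueueBackendStatus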

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 350 of file async.c.

351 {
352  LISTEN_LISTEN,
353  LISTEN_UNLISTEN,
354  LISTEN_UNLISTEN_ALL
355 } ListenActionKind;
ListenActionKind
Definition: async.c:351
@ LISTEN_LISTEN
Definition: async.c:352
@ LISTEN_UNLISTEN_ALL
Definition: async.c:354
@ LISTEN_UNLISTEN
Definition: async.c:353

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification *  n)
static

Definition at line 2343 of file async.c.

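2344 {
2345  Assert(pendingNotifies->events != NIL);
2346 
2347  /* Create the hash table if it's time to */
2348  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2349  pendingNotifies->hashtab == NULL)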
2350  {
2351  HASHCTL hash_ctl;
2352  ListCell *l;
2353 
2354  /* Create the hash table */
2355  hash_ctl.keysize = sizeof(Notification *);
2356  hash_ctl.entrysize = sizeof(NotificationHash);
2357  hash_ctl.hash = notification_hash;
2358  hash_ctl.match = notification_match;
2359  hash_ctl.hcxt = CurTransactionContext;
2360  pendingNotifies->hashtab =
2361  hash_create("Pending Notifies",
2362  256L,
2363  &hash_ctl,
2364  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2365 
2366  /* Insert all the already-existing events */
2367  foreach(l, pendingNotifies->events)
2368  {
2369  Notification *oldn = (Notification *) lfirst(l);
2370  NotificationHash *hentry;
2371  bool found;
2372 
2373  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2374  &oldn,
2375  HASH_ENTER,
2376  &found);
2377  Assert(!found);
2378  hentry->event = oldn;
2379  }
2380  }
2381 
2382  /* Add new event to the list, in order */
2383  pendingNotifies->events = lappend(pendingNotifies->events, n);
2384 
2385  /* Add event to the hash table if needed */
2386  if (pendingNotifies->hashtab != NULL)
2387  {
2388  NotificationHash *hentry;
2389  bool found;
2390 
2391  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2392  &n,
2393  HASH_ENTER,
2394  &found);
2395  Assert(!found);
2396  hentry->event = n;
2397  }
2398 }
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:415
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2406
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2420
struct NotificationHash NotificationHash
static NotificationList * pendingNotifies
Definition: async.c:422
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_COMPARE
Definition: hsearch.h:99
#define HASH_FUNCTION
Definition: hsearch.h:98
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:338
MemoryContext CurTransactionContext
Definition: mcxt.c:147
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
Size keysize
Definition: hsearch.h:75
HashValueFunc hash
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:76
HashCompareFunc match
Definition: hsearch.h:80
MemoryContext hcxt
Definition: hsearch.h:86
Notification * event
Definition: async.c:419
HTAB * hashtab
Definition: async.c:411
List * events
Definition: async.c:410

References Assert(), CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), notification_match(), and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 772 of file async.c.

773 {
774  if (Trace_notify)
775  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
776 
777  queue_listen(LISTEN_LISTEN, channel);
778 }
bool Trace_notify
Definition: async.c:443
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
#define DEBUG1
Definition: elog.h:30
int MyProcPid
Definition: globals.c:44

References DEBUG1, elog(), LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 625 of file async.c.

626 {
627  int my_level = GetCurrentTransactionNestLevel();
628  size_t channel_len;
629  size_t payload_len;
630  Notification *n;
631  MemoryContext oldcontext;
632 
633  if (IsParallelWorker())
634  elog(ERROR, "cannot send notifications from a parallel worker");
635 
636  if (Trace_notify)
637  elog(DEBUG1, "Async_Notify(%s)", channel);
638 
639  channel_len = channel ? strlen(channel) : 0;
640  payload_len = payload ? strlen(payload) : 0;
641 
642  /* a channel name must be specified */
643  if (channel_len == 0)
644  ereport(ERROR,
645  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
646  errmsg("channel name cannot be empty")));
647 
648  /* enforce length limits */
649  if (channel_len >= NAMEDATALEN)
650  ereport(ERROR,
651  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
652  errmsg("channel name too long")));
653 
654  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
655  ereport(ERROR,
656  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
657  errmsg("payload string too long")));
658 
659  /*
660  * We must construct the Notification entry, even if we end up not using
661  * it, in order to compare it cheaply to existing list entries.
662  *
663  * The notification list needs to live until end of transaction, so store
664  * it in the transaction context.
665  */
666  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
667 
668  n = (Notification *) palloc(offsetof(Notification, data) +
669  channel_len + payload_len + 2);
670  n->channel_len = channel_len;
671  n->payload_len = payload_len;
672  strcpy(n->data, channel);
673  if (payload)
674  strcpy(n->data + channel_len + 1, payload);
675  else
676  n->data[channel_len + 1] = '\0';
677 
678  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
679  {
680  NotificationList *notifies;
681 
682  /*
683  * First notify event in current (sub)xact. Note that we allocate the
684  * NotificationList in TopTransactionContext; the nestingLevel might
685  * get changed later by AtSubCommit_Notify.
686  */
687  notifies = (NotificationList *)
688  MemoryContextAlloc(TopTransactionContext,
689  sizeof(NotificationList));
690  notifies->nestingLevel = my_level;
691  notifies->events = list_make1(n);
692  /* We certainly don't need a hashtable yet */
693  notifies->hashtab = NULL;
694  notifies->upper = pendingNotifies;
695  pendingNotifies = notifies;
696  }
697  else
698  {
699  /* Now check for duplicates */
700  if (AsyncExistsPendingNotify(n))
701  {
702  /* It's a dup, so forget it */
703  pfree(n);
704  MemoryContextSwitchTo(oldcontext);
705  return;
706  }
707 
708  /* Append more events to existing list */
709  AddEventToPendingNotifies(n);
710  }
711 
712  MemoryContextSwitchTo(oldcontext);
713 }
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2302
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2343
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:166
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsParallelWorker()
Definition: parallel.h:61
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void pfree(void *pointer)
Definition: mcxt.c:1456
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021
void * palloc(Size size)
Definition: mcxt.c:1226
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
int nestingLevel
Definition: async.c:409
struct NotificationList * upper
Definition: async.c:412
uint16 payload_len
Definition: async.c:402
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
uint16 channel_len
Definition: async.c:401
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:914

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog(), ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 786 of file async.c.

787 {
788  if (Trace_notify)
789  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
790 
791  /* If we couldn't possibly be listening, no need to queue anything */
792  if (pendingActions == NULL && !unlistenExitRegistered)
793  return;
794 
795  queue_listen(LISTEN_UNLISTEN, channel);
796 }
static ActionList * pendingActions
Definition: async.c:370
static bool unlistenExitRegistered
Definition: async.c:434

References DEBUG1, elog(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 804 of file async.c.

805 {
806  if (Trace_notify)
807  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
808 
809  /* If we couldn't possibly be listening, no need to queue anything */
810  if (pendingActions == NULL && !unlistenExitRegistered)
811  return;
812 
813  queue_listen(LISTEN_UNLISTEN_ALL, "");
814 }

References DEBUG1, elog(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 857 of file async.c.

858 {
859  Exec_UnlistenAllCommit();
860  asyncQueueUnregister();
861 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1228
static void asyncQueueUnregister(void)
Definition: async.c:1265

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification *  n)
static

Definition at line 2302 of file async.c.

2303 {
2304  if (pendingNotifies == NULL)
2305  return false;
2306 
2307  if (pendingNotifies->hashtab != NULL)
2308  {
2309  /* Use the hash table to probe for a match */
2310  if (hash_search(pendingNotifies->hashtab,
2311  &n,
2312  HASH_FIND,
2313  NULL))
2314  return true;
2315  }
2316  else
2317  {
2318  /* Must scan the event list */
2319  ListCell *l;
2320 
2321  foreach(l, pendingNotifies->events)
2322  {
2323  Notification *oldn = (Notification *) lfirst(l);
2324 
2325  if (n->channel_len == oldn->channel_len &&
2326  n->payload_len == oldn->payload_len &&
2327  memcmp(n->data, oldn->data,
2328  n->channel_len + n->payload_len + 2) == 0)
2329  return true;
2330  }
2331  }
2332 
2333  return false;
2334 }
@ HASH_FIND
Definition: hsearch.h:113

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell *  nextNotify)
static

Definition at line 1408 of file async.c.

1409 {
1410  AsyncQueueEntry qe;
1411  QueuePosition queue_head;
1412  int pageno;
1413  int offset;
1414  int slotno;
1415 
1416  /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
1417  LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
1418 
1419  /*
1420  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1421  * memory upon exiting. The reason for this is that if we have to advance
1422  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1423  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1424  * subsequent insertions would try to put entries into a page that slru.c
1425  * thinks doesn't exist yet.) So, use a local position variable. Note
1426  * that if we do fail, any already-inserted queue entries are forgotten;
1427  * this is okay, since they'd be useless anyway after our transaction
1428  * rolls back.
1429  */
1430  queue_head = QUEUE_HEAD;
1431 
1432  /*
1433  * If this is the first write since the postmaster started, we need to
1434  * initialize the first page of the async SLRU. Otherwise, the current
1435  * page should be initialized already, so just fetch it.
1436  *
1437  * (We could also take the first path when the SLRU position has just
1438  * wrapped around, but re-zeroing the page is harmless in that case.)
1439  */
1440  pageno = QUEUE_POS_PAGE(queue_head);
1441  if (QUEUE_POS_IS_ZERO(queue_head))
1442  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1443  else
1444  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1445  InvalidTransactionId);
1446 
1447  /* Note we mark the page dirty before writing in it */
1448  NotifyCtl->shared->page_dirty[slotno] = true;
1449 
1450  while (nextNotify != NULL)
1451  {
1452  Notification *n = (Notification *) lfirst(nextNotify);
1453 
1454  /* Construct a valid queue entry in local variable qe */
1455  asyncQueueNotificationToEntry(n, &qe);
1456 
1457  offset = QUEUE_POS_OFFSET(queue_head);
1458 
1459  /* Check whether the entry really fits on the current page */
1460  if (offset + qe.length <= QUEUE_PAGESIZE)
1461  {
1462  /* OK, so advance nextNotify past this item */
1463  nextNotify = lnext(pendingNotifies->events, nextNotify);
1464  }
1465  else
1466  {
1467  /*
1468  * Write a dummy entry to fill up the page. Actually readers will
1469  * only check dboid and since it won't match any reader's database
1470  * OID, they will ignore this entry and move on.
1471  */
1472  qe.length = QUEUE_PAGESIZE - offset;
1473  qe.dboid = InvalidOid;
1474  qe.data[0] = '\0'; /* empty channel */
1475  qe.data[1] = '\0'; /* empty payload */
1476  }
1477 
1478  /* Now copy qe into the shared buffer page */
1479  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1480  &qe,
1481  qe.length);
1482 
1483  /* Advance queue_head appropriately, and detect if page is full */
1484  if (asyncQueueAdvance(&(queue_head), qe.length))
1485  {
1486  /*
1487  * Page is full, so we're done here, but first fill the next page
1488  * with zeroes. The reason to do this is to ensure that slru.c's
1489  * idea of the head page is always the same as ours, which avoids
1490  * boundary problems in SimpleLruTruncate. The test in
1491  * asyncQueueIsFull() ensured that there is room to create this
1492  * page without overrunning the queue.
1493  */
1494  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1495 
1496  /*
1497  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1498  * set flag to remember that we should try to advance the tail
1499  * pointer (we don't want to actually do that right here).
1500  */
1501  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1502  tryAdvanceTail = true;
1503 
1504  /* And exit the loop */
1505  break;
1506  }
1507  }
1508 
1509  /* Success, so update the global QUEUE_HEAD */
1510  QUEUE_HEAD = queue_head;
1511 
1512  LWLockRelease(NotifySLRULock);
1513 
1514  return nextNotify;
1515 }
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1372
static bool tryAdvanceTail
Definition: async.c:440
#define QUEUE_POS_OFFSET(x)
Definition: async.c:204
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1337
#define QUEUE_POS_PAGE(x)
Definition: async.c:203
#define QUEUE_CLEANUP_DELAY
Definition: async.c:241
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:215
#define NotifyCtl
Definition: async.c:313
#define QUEUE_PAGESIZE
Definition: async.c:314
#define QUEUE_HEAD
Definition: async.c:299
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_EXCLUSIVE
Definition: lwlock.h:116
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:343
#define InvalidOid
Definition: postgres_ext.h:36
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:281
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:396
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:186
#define InvalidTransactionId
Definition: transam.h:31

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition *  position,
int  entryLength 
)
static

Definition at line 1337 of file async.c.

1338 {
1339  int pageno = QUEUE_POS_PAGE(*position);
1340  int offset = QUEUE_POS_OFFSET(*position);
1341  bool pageJump = false;
1342 
1343  /*
1344  * Move to the next writing position: First jump over what we have just
1345  * written or read.
1346  */
1347  offset += entryLength;
1348  Assert(offset <= QUEUE_PAGESIZE);
1349 
1350  /*
1351  * In a second step check if another entry can possibly be written to the
1352  * page. If so, stay here, we have reached the next position. If not, then
1353  * we need to move on to the next page.
1354  */
1355  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1356  {
1357  pageno++;
1358  if (pageno > QUEUE_MAX_PAGE)
1359  pageno = 0; /* wrap around */
1360  offset = 0;
1361  pageJump = true;
1362  }
1363 
1364  SET_QUEUE_POS(*position, pageno, offset);
1365  return pageJump;
1366 }
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:206
#define AsyncQueueEntryEmptySize
Definition: async.c:192
#define QUEUE_MAX_PAGE
Definition: async.c:331
#define QUEUEALIGN(len)
Definition: async.c:190

References Assert(), AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2157 of file async.c.

2158 {
2159  QueuePosition min;
2160  int oldtailpage;
2161  int newtailpage;
2162  int boundary;
2163 
2164  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2165  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2166 
2167  /*
2168  * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2169  * (ie, exactly match at least one backend's queue position), so it must
2170  * be updated atomically with the actual computation. Since v13, we could
2171  * get away with not doing it like that, but it seems prudent to keep it
2172  * so.
2173  *
2174  * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2175  * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2176  * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2177  * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2178  * there are pages we can truncate but haven't yet finished doing so.
2179  *
2180  * For concurrency's sake, we don't want to hold NotifyQueueLock while
2181  * performing SimpleLruTruncate. This is OK because no backend will try
2182  * to access the pages we are in the midst of truncating.
2183  */
2184  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2185  min = QUEUE_HEAD;
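2186  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2187  {
2188  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2189  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));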
2190  }
2191  QUEUE_TAIL = min;
2192  oldtailpage = QUEUE_STOP_PAGE;
2193  LWLockRelease(NotifyQueueLock);
2194 
2195  /*
2196  * We can truncate something if the global tail advanced across an SLRU
2197  * segment boundary.
2198  *
2199  * XXX it might be better to truncate only once every several segments, to
2200  * reduce the number of directory scans.
2201  */
2202  newtailpage = QUEUE_POS_PAGE(min);
2203  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2204  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2205  {
2206  /*
2207  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2208  * release the lock again.
2209  */
2210  SimpleLruTruncate(NotifyCtl, newtailpage);
2211 
2212  /*
2213  * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
2214  * for the segment immediately prior to the old tail, allowing fresh
2215  * data into that segment.
2216  */
2217  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2218  QUEUE_STOP_PAGE = newtailpage;
2219  LWLockRelease(NotifyQueueLock);
2220  }
2221 
2222  LWLockRelease(NotifyQueueTailLock);
2223 }
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
#define QUEUE_POS_MIN(x, y)
Definition: async.c:219
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define QUEUE_TAIL
Definition: async.c:300
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
#define QUEUE_STOP_PAGE
Definition: async.c:301
int BackendId
Definition: backendid.h:21
int i
Definition: isn.c:73
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1227
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34

References Assert(), asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1576 of file async.c.

1577 {
1578  double fillDegree;
1579  TimestampTz t;
1580 
1581  fillDegree = asyncQueueUsage();
1582  if (fillDegree < 0.5)
1583  return;
1584 
1585  t = GetCurrentTimestamp();
1586 
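1587  if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
1588  t, QUEUE_FULL_WARN_INTERVAL))
1589  {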
1590  QueuePosition min = QUEUE_HEAD;
1591  int32 minPid = InvalidPid;
1592 
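1593  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1594  {
1595  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);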
1596  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1597  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1598  minPid = QUEUE_BACKEND_PID(i);
1599  }
1600 
1601  ereport(WARNING,
1602  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1603  (minPid != InvalidPid ?
1604  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1605  : 0),
1606  (minPid != InvalidPid ?
1607  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1608  : 0)));
1609 
1610  asyncQueueControl->lastQueueFillWarn = t;
1611  }
1612 }
static double asyncQueueUsage(void)
Definition: async.c:1547
static AsyncQueueControl * asyncQueueControl
Definition: async.c:297
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:315
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:212
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
signed int int32
Definition: c.h:478
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
#define WARNING
Definition: elog.h:36
TimestampTz lastQueueFillWarn
Definition: async.c:292

References Assert(), asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1306 of file async.c.

1307 {
1308  int nexthead;
1309  int boundary;
1310 
1311  /*
1312  * The queue is full if creating a new head page would create a page that
1313  * logically precedes the current global tail pointer, ie, the head
1314  * pointer would wrap around compared to the tail. We cannot create such
1315  * a head page for fear of confusing slru.c. For safety we round the tail
1316  * pointer back to a segment boundary (truncation logic in
1317  * asyncQueueAdvanceTail does not do this, so doing it here is optional).
1318  *
1319  * Note that this test is *not* dependent on how much space there is on
1320  * the current head page. This is necessary because asyncQueueAddEntries
1321  * might try to create the next head page in any case.
1322  */
1323  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1324  if (nexthead > QUEUE_MAX_PAGE)
1325  nexthead = 0; /* wrap around */
1326  boundary = QUEUE_STOP_PAGE;
1327  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1328  return asyncQueuePagePrecedes(nexthead, boundary);
1329 }

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1372 of file async.c.

1373 {
1374  size_t channellen = n->channel_len;
1375  size_t payloadlen = n->payload_len;
1376  int entryLength;
1377 
1378  Assert(channellen < NAMEDATALEN);
1379  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1380 
1381  /* The terminators are already included in AsyncQueueEntryEmptySize */
1382  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1383  entryLength = QUEUEALIGN(entryLength);
1384  qe->length = entryLength;
1385  qe->dboid = MyDatabaseId;
1386  qe->xid = GetCurrentTransactionId();
1387  qe->srcPid = MyProcPid;
1388  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1389 }
Oid MyDatabaseId
Definition: globals.c:89
int32 srcPid
Definition: async.c:185
TransactionId xid
Definition: async.c:184
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:445

References Assert(), AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, AsyncQueueEntry::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 481 of file async.c.

482 {
483  int diff;
484 
485  /*
486  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
487  * in the range 0..QUEUE_MAX_PAGE.
488  */
489  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
490  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
491 
492  diff = p - q;
493  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
494  diff -= QUEUE_MAX_PAGE + 1;
495  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
496  diff += QUEUE_MAX_PAGE + 1;
497  return diff;
498 }

References Assert(), and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 507 of file async.c.

508 {
509  return asyncQueuePageDiff(p, q) < 0;
510 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:481

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2065 of file async.c.

2069 {
2070  bool reachedStop = false;
2071  bool reachedEndOfPage;
2072  AsyncQueueEntry *qe;
2073 
2074  do
2075  {
2076  QueuePosition thisentry = *current;
2077 
2078  if (QUEUE_POS_EQUAL(thisentry, stop))
2079  break;
2080 
2081  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2082 
2083  /*
2084  * Advance *current over this message, possibly to the next page. As
2085  * noted in the comments for asyncQueueReadAllNotifications, we must
2086  * do this before possibly failing while processing the message.
2087  */
2088  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2089 
2090  /* Ignore messages destined for other databases */
2091  if (qe->dboid == MyDatabaseId)
2092  {
2093  if (XidInMVCCSnapshot(qe->xid, snapshot))
2094  {
2095  /*
2096  * The source transaction is still in progress, so we can't
2097  * process this message yet. Break out of the loop, but first
2098  * back up *current so we will reprocess the message next
2099  * time. (Note: it is unlikely but not impossible for
2100  * TransactionIdDidCommit to fail, so we can't really avoid
2101  * this advance-then-back-up behavior when dealing with an
2102  * uncommitted message.)
2103  *
2104  * Note that we must test XidInMVCCSnapshot before we test
2105  * TransactionIdDidCommit, else we might return a message from
2106  * a transaction that is not yet visible to snapshots; compare
2107  * the comments at the head of heapam_visibility.c.
2108  *
2109  * Also, while our own xact won't be listed in the snapshot,
2110  * we need not check for TransactionIdIsCurrentTransactionId
2111  * because our transaction cannot (yet) have queued any
2112  * messages.
2113  */
2114  *current = thisentry;
2115  reachedStop = true;
2116  break;
2117  }
2118  else if (TransactionIdDidCommit(qe->xid))
2119  {
2120  /* qe->data is the null-terminated channel name */
2121  char *channel = qe->data;
2122 
2123  if (IsListeningOn(channel))
2124  {
2125  /* payload follows channel name */
2126  char *payload = qe->data + strlen(channel) + 1;
2127 
2128  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2129  }
2130  }
2131  else
2132  {
2133  /*
2134  * The source transaction aborted or crashed, so we just
2135  * ignore its notifications.
2136  */
2137  }
2138  }
2139 
2140  /* Loop back if we're not at end of page */
2141  } while (!reachedEndOfPage);
2142 
2143  if (QUEUE_POS_EQUAL(*current, stop))
2144  reachedStop = true;
2145 
2146  return reachedStop;
2147 }
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2278
static bool IsListeningOn(const char *channel)
Definition: async.c:1246
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2287
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1900 of file async.c.

1901 {
1902  volatile QueuePosition pos;
1903  QueuePosition head;
1904  Snapshot snapshot;
1905 
1906  /* page_buffer must be adequately aligned, so use a union */
1907  union
1908  {
1909  char buf[QUEUE_PAGESIZE];
1910  AsyncQueueEntry align;
1911  } page_buffer;
1912 
1913  /* Fetch current state */
1914  LWLockAcquire(NotifyQueueLock, LW_SHARED);
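1915  /* Assert checks that we have a valid state entry */
1916  Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
1917  pos = QUEUE_BACKEND_POS(MyBackendId);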
1918  head = QUEUE_HEAD;
1919  LWLockRelease(NotifyQueueLock);
1920 
1921  if (QUEUE_POS_EQUAL(pos, head))
1922  {
1923  /* Nothing to do, we have read all notifications already. */
1924  return;
1925  }
1926 
1927  /*----------
1928  * Get snapshot we'll use to decide which xacts are still in progress.
1929  * This is trickier than it might seem, because of race conditions.
1930  * Consider the following example:
1931  *
1932  * Backend 1: Backend 2:
1933  *
1934  * transaction starts
1935  * UPDATE foo SET ...;
1936  * NOTIFY foo;
1937  * commit starts
1938  * queue the notify message
1939  * transaction starts
1940  * LISTEN foo; -- first LISTEN in session
1941  * SELECT * FROM foo WHERE ...;
1942  * commit to clog
1943  * commit starts
1944  * add backend 2 to array of listeners
1945  * advance to queue head (this code)
1946  * commit to clog
1947  *
1948  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1949  * wasn't committed yet. Ideally we'd ensure that client 2 would
1950  * eventually get transaction 1's notify message, but there's no way
1951  * to do that; until we're in the listener array, there's no guarantee
1952  * that the notify message doesn't get removed from the queue.
1953  *
1954  * Therefore the coding technique transaction 2 is using is unsafe:
1955  * applications must commit a LISTEN before inspecting database state,
1956  * if they want to ensure they will see notifications about subsequent
1957  * changes to that state.
1958  *
1959  * What we do guarantee is that we'll see all notifications from
1960  * transactions committing after the snapshot we take here.
1961  * Exec_ListenPreCommit has already added us to the listener array,
1962  * so no not-yet-committed messages can be removed from the queue
1963  * before we see them.
1964  *----------
1965  */
1966  snapshot = RegisterSnapshot(GetLatestSnapshot());
1967 
1968  /*
1969  * It is possible that we fail while trying to send a message to our
1970  * frontend (for example, because of encoding conversion failure). If
1971  * that happens it is critical that we not try to send the same message
1972  * over and over again. Therefore, we place a PG_TRY block here that will
1973  * forcibly advance our queue position before we lose control to an error.
1974  * (We could alternatively retake NotifyQueueLock and move the position
1975  * before handling each individual message, but that seems like too much
1976  * lock traffic.)
1977  */
1978  PG_TRY();
1979  {
1980  bool reachedStop;
1981 
1982  do
1983  {
1984  int curpage = QUEUE_POS_PAGE(pos);
1985  int curoffset = QUEUE_POS_OFFSET(pos);
1986  int slotno;
1987  int copysize;
1988 
1989  /*
1990  * We copy the data from SLRU into a local buffer, so as to avoid
1991  * holding the NotifySLRULock while we are examining the entries
1992  * and possibly transmitting them to our frontend. Copy only the
1993  * part of the page we will actually inspect.
1994  */
1995  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1996  InvalidTransactionId);
1997  if (curpage == QUEUE_POS_PAGE(head))
1998  {
1999  /* we only want to read as far as head */
2000  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2001  if (copysize < 0)
2002  copysize = 0; /* just for safety */
2003  }
2004  else
2005  {
2006  /* fetch all the rest of the page */
2007  copysize = QUEUE_PAGESIZE - curoffset;
2008  }
2009  memcpy(page_buffer.buf + curoffset,
2010  NotifyCtl->shared->page_buffer[slotno] + curoffset,
2011  copysize);
2012  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2013  LWLockRelease(NotifySLRULock);
2014 
2015  /*
2016  * Process messages up to the stop position, end of page, or an
2017  * uncommitted message.
2018  *
2019  * Our stop position is what we found to be the head's position
2020  * when we entered this function. It might have changed already.
2021  * But if it has, we will receive (or have already received and
2022  * queued) another signal and come here again.
2023  *
2024  * We are not holding NotifyQueueLock here! The queue can only
2025  * extend beyond the head pointer (see above) and we leave our
2026  * backend's pointer where it is so nobody will truncate or
2027  * rewrite pages under us. Especially we don't want to hold a lock
2028  * while sending the notifications to the frontend.
2029  */
2030  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2031  page_buffer.buf,
2032  snapshot);
2033  } while (!reachedStop);
2034  }
2035  PG_FINALLY();
2036  {
2037  /* Update shared state */
2038  LWLockAcquire(NotifyQueueLock, LW_SHARED);
2039  QUEUE_BACKEND_POS(MyBackendId) = pos;
2040  LWLockRelease(NotifyQueueLock);
2041  }
2042  PG_END_TRY();
2043 
2044  /* Done with snapshot */
2045  UnregisterSnapshot(snapshot);
2046 }
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2065
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_FINALLY(...)
Definition: elog.h:387
BackendId MyBackendId
Definition: globals.c:85
@ LW_SHARED
Definition: lwlock.h:117
static char * buf
Definition: pg_test_fsync.c:67
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:496
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:326
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:871
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:829

References Assert(), asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1265 of file async.c.

1266 {
1267  Assert(listenChannels == NIL); /* else caller error */
1268 
1269  if (!amRegisteredListener) /* nothing to do */
1270  return;
1271 
1272  /*
1273  * Need exclusive lock here to manipulate list links.
1274  */
1275  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
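1276  /* Mark our entry as invalid */
1277  QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
1278  QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
1279  /* and remove it from the list */
1280  if (QUEUE_FIRST_LISTENER == MyBackendId)
1281  QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
1282  else
1283  {
1284  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1285  {
1286  if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
1287  {
1288  QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
1289  break;
1290  }
1291  }
1292  }
1293  QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
1294  LWLockRelease(NotifyQueueLock);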
1295 
1296  /* mark ourselves as no longer listed in the global array */
1297  amRegisteredListener = false;
1298 }
static List * listenChannels
Definition: async.c:338
static bool amRegisteredListener
Definition: async.c:437
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:304
#define InvalidBackendId
Definition: backendid.h:23

References amRegisteredListener, Assert(), i, InvalidBackendId, InvalidOid, InvalidPid, listenChannels, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1547 of file async.c.

1548 {
1549  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1550  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1551  int occupied;
1552 
1553  occupied = headPage - tailPage;
1554 
1555  if (occupied == 0)
1556  return (double) 0; /* fast exit for common case */
1557 
1558  if (occupied < 0)
1559  {
1560  /* head has wrapped around, tail not yet */
1561  occupied += QUEUE_MAX_PAGE + 1;
1562  }
1563 
1564  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1565 }

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 533 of file async.c.

534 {
535  bool found;
536  Size size;
537 
538  /*
539  * Create or attach to the AsyncQueueControl structure.
540  *
541  * The used entries in the backend[] array run from 1 to MaxBackends; the
542  * zero'th entry is unused but must be allocated.
543  */
544  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
545  size = add_size(size, offsetof(AsyncQueueControl, backend));
546 
547  asyncQueueControl = (AsyncQueueControl *)
548  ShmemInitStruct("Async Queue Control", size, &found);
549 
550  if (!found)
551  {
552  /* First time through, so initialize it */
553  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
554  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
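555  QUEUE_STOP_PAGE = 0;
556  QUEUE_FIRST_LISTENER = InvalidBackendId;
557  asyncQueueControl->lastQueueFillWarn = 0;
558  /* zero'th entry won't be used, but let's initialize it anyway */
559  for (int i = 0; i <= MaxBackends; i++)
560  {
561  QUEUE_BACKEND_PID(i) = InvalidPid;
562  QUEUE_BACKEND_DBOID(i) = InvalidOid;
563  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
564  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
565  }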
566  }
567 
568  /*
569  * Set up SLRU management of the pg_notify data.
570  */
571  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
572  SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
573  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
574  SYNC_HANDLER_NONE);
575 
576  if (!found)
577  {
578  /*
579  * During start or reboot, clean out the pg_notify directory.
580  */
581  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
582  }
583 }
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
size_t Size
Definition: c.h:589
int MaxBackends
Definition: globals.c:140
@ LWTRANCHE_NOTIFY_BUFFER
Definition: lwlock.h:186
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1531
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:188
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1554
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateSharedMemoryAndSemaphores().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 516 of file async.c.

517 {
518  Size size;
519 
520  /* This had better match AsyncShmemInit */
521  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
522  size = add_size(size, offsetof(AsyncQueueControl, backend));
523 
524  size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
525 
526  return size;
527 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1720 of file async.c.

1721 {
1722  /*
1723  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1724  * we have registered as a listener but have not made any entry in
1725  * listenChannels. In that case, deregister again.
1726  */
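1727  if (amRegisteredListener && listenChannels == NIL)
1728  asyncQueueUnregister();
1729 
1730  /* And clean up */
1731  ClearPendingActionsAndNotifies();
1732 }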
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2436

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1002 of file async.c.

1003 {
1004  ListCell *p;
1005 
1006  /*
1007  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1008  * return as soon as possible
1009  */
1010  if (!pendingActions && !pendingNotifies)
1011  return;
1012 
1013  if (Trace_notify)
1014  elog(DEBUG1, "AtCommit_Notify");
1015 
1016  /* Perform any pending listen/unlisten actions */
1017  if (pendingActions != NULL)
1018  {
1019  foreach(p, pendingActions->actions)
1020  {
1021  ListenAction *actrec = (ListenAction *) lfirst(p);
1022 
1023  switch (actrec->action)
1024  {
1025  case LISTEN_LISTEN:
1026  Exec_ListenCommit(actrec->channel);
1027  break;
1028  case LISTEN_UNLISTEN:
1029  Exec_UnlistenCommit(actrec->channel);
1030  break;
1031  case LISTEN_UNLISTEN_ALL:
1032  Exec_UnlistenAllCommit();
1033  break;
1034  }
1035  }
1036  }
1037 
1038  /* If no longer listening to anything, get out of listener array */
1039  if (amRegisteredListener && listenChannels == NIL)
1040  asyncQueueUnregister();
1041 
1042  /*
1043  * Send signals to listening backends. We need do this only if there are
1044  * pending notifies, which were previously added to the shared queue by
1045  * PreCommit_Notify().
1046  */
1047  if (pendingNotifies != NULL)
1048  SignalBackends();
1049 
1050  /*
1051  * If it's time to try to advance the global tail pointer, do that.
1052  *
1053  * (It might seem odd to do this in the sender, when more than likely the
1054  * listeners won't yet have read the messages we just sent. However,
1055  * there's less contention if only the sender does it, and there is little
1056  * need for urgency in advancing the global tail. So this typically will
1057  * be clearing out messages that were sent some time ago.)
1058  */
1059  if (tryAdvanceTail)
1060  {
1061  tryAdvanceTail = false;
1062  asyncQueueAdvanceTail();
1063  }
1064 
1065  /* And clean up */
1066  ClearPendingActionsAndNotifies();
1067 }
static void SignalBackends(void)
Definition: async.c:1630
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1170
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1197
static void asyncQueueAdvanceTail(void)
Definition: async.c:2157
List * actions
Definition: async.c:366
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:360
ListenActionKind action
Definition: async.c:359

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog(), Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 870 of file async.c.

871 {
872  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
873  if (pendingActions || pendingNotifies)
874  ereport(ERROR,
875  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
876  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
877 }

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1810 of file async.c.

1811 {
1812  int my_level = GetCurrentTransactionNestLevel();
1813 
1814  /*
1815  * All we have to do is pop the stack --- the actions/notifies made in
1816  * this subxact are no longer interesting, and the space will be freed
1817  * when CurTransactionContext is recycled. We still have to free the
1818  * ActionList and NotificationList objects themselves, though, because
1819  * those are allocated in TopTransactionContext.
1820  *
1821  * Note that there might be no entries at all, or no entries for the
1822  * current subtransaction level, either because none were ever created, or
1823  * because we reentered this routine due to trouble during subxact abort.
1824  */
1825  while (pendingActions != NULL &&
1826  pendingActions->nestingLevel >= my_level)
1827  {
1828  ActionList *childPendingActions = pendingActions;
1829 
1830  pendingActions = pendingActions->upper;
1831  pfree(childPendingActions);
1832  }
1833 
1834  while (pendingNotifies != NULL &&
1835  pendingNotifies->nestingLevel >= my_level)
1836  {
1837  NotificationList *childPendingNotifies = pendingNotifies;
1838 
1839  pendingNotifies = pendingNotifies->upper;
1840  pfree(childPendingNotifies);
1841  }
1842 }
int nestingLevel
Definition: async.c:365
struct ActionList * upper
Definition: async.c:367

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1740 of file async.c.

1741 {
1742  int my_level = GetCurrentTransactionNestLevel();
1743 
1744  /* If there are actions at our nesting level, we must reparent them. */
1745  if (pendingActions != NULL &&
1746  pendingActions->nestingLevel >= my_level)
1747  {
1748  if (pendingActions->upper == NULL ||
1749  pendingActions->upper->nestingLevel < my_level - 1)
1750  {
1751  /* nothing to merge; give the whole thing to the parent */
1752  --pendingActions->nestingLevel;
1753  }
1754  else
1755  {
1756  ActionList *childPendingActions = pendingActions;
1757 
1758  pendingActions = pendingActions->upper;
1759 
1760  /*
1761  * Mustn't try to eliminate duplicates here --- see queue_listen()
1762  */
1763  pendingActions->actions =
1764  list_concat(pendingActions->actions,
1765  childPendingActions->actions);
1766  pfree(childPendingActions);
1767  }
1768  }
1769 
1770  /* If there are notifies at our nesting level, we must reparent them. */
1771  if (pendingNotifies != NULL &&
1772  pendingNotifies->nestingLevel >= my_level)
1773  {
1774  Assert(pendingNotifies->nestingLevel == my_level);
1775 
1776  if (pendingNotifies->upper == NULL ||
1777  pendingNotifies->upper->nestingLevel < my_level - 1)
1778  {
1779  /* nothing to merge; give the whole thing to the parent */
1780  --pendingNotifies->nestingLevel;
1781  }
1782  else
1783  {
1784  /*
1785  * Formerly, we didn't bother to eliminate duplicates here, but
1786  * now we must, else we fall foul of "Assert(!found)", either here
1787  * or during a later attempt to build the parent-level hashtable.
1788  */
1789  NotificationList *childPendingNotifies = pendingNotifies;
1790  ListCell *l;
1791 
1792  pendingNotifies = pendingNotifies->upper;
1793  /* Insert all the subxact's events into parent, except for dups */
1794  foreach(l, childPendingNotifies->events)
1795  {
1796  Notification *childn = (Notification *) lfirst(l);
1797 
1798  if (!AsyncExistsPendingNotify(childn))
1799  AddEventToPendingNotifies(childn);
1800  }
1801  pfree(childPendingNotifies);
1802  }
1803  }
1804 }
List * list_concat(List *list1, const List *list2)
Definition: list.c:560

References ActionList::actions, AddEventToPendingNotifies(), Assert(), AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2436 of file async.c.

2437 {
2438  /*
2439  * Everything's allocated in either TopTransactionContext or the context
2440  * for the subtransaction to which it corresponds. So, there's nothing to
2441  * do here except reset the pointers; the space will be reclaimed when the
2442  * contexts are deleted.
2443  */
2444  pendingActions = NULL;
2445  pendingNotifies = NULL;
2446 }

References pendingActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1170 of file async.c.

1171 {
1172  MemoryContext oldcontext;
1173 
1174  /* Do nothing if we are already listening on this channel */
1175  if (IsListeningOn(channel))
1176  return;
1177 
1178  /*
1179  * Add the new channel name to listenChannels.
1180  *
1181  * XXX It is theoretically possible to get an out-of-memory failure here,
1182  * which would be bad because we already committed. For the moment it
1183  * doesn't seem worth trying to guard against that, but maybe improve this
1184  * later.
1185  */
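1186  oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1187  listenChannels = lappend(listenChannels, pstrdup(channel));
1188  MemoryContextSwitchTo(oldcontext);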
1189 }
char * pstrdup(const char *in)
Definition: mcxt.c:1644
MemoryContext TopMemoryContext
Definition: mcxt.c:141

References IsListeningOn(), lappend(), listenChannels, MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1075 of file async.c.

1076 {
1077  QueuePosition head;
1078  QueuePosition max;
1079  BackendId prevListener;
1080 
1081  /*
1082  * Nothing to do if we are already listening to something, nor if we
1083  * already ran this routine in this transaction.
1084  */
1085  if (amRegisteredListener)
1086  return;
1087 
1088  if (Trace_notify)
1089  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1090 
1091  /*
1092  * Before registering, make sure we will unlisten before dying. (Note:
1093  * this action does not get undone if we abort later.)
1094  */
1095  if (!unlistenExitRegistered)
1096  {
1097  before_shmem_exit(Async_UnlistenOnExit, 0);
1098  unlistenExitRegistered = true;
1099  }
1100 
1101  /*
1102  * This is our first LISTEN, so establish our pointer.
1103  *
1104  * We set our pointer to the global tail pointer and then move it forward
1105  * over already-committed notifications. This ensures we cannot miss any
1106  * not-yet-committed notifications. We might get a few more but that
1107  * doesn't hurt.
1108  *
1109  * In some scenarios there might be a lot of committed notifications that
1110  * have not yet been pruned away (because some backend is being lazy about
1111  * reading them). To reduce our startup time, we can look at other
1112  * backends and adopt the maximum "pos" pointer of any backend that's in
1113  * our database; any notifications it's already advanced over are surely
1114  * committed and need not be re-examined by us. (We must consider only
1115  * backends connected to our DB, because others will not have bothered to
1116  * check committed-ness of notifications in our DB.)
1117  *
1118  * We need exclusive lock here so we can look at other backends' entries
1119  * and manipulate the list links.
1120  */
1121  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1122  head = QUEUE_HEAD;
1123  max = QUEUE_TAIL;
1124  prevListener = InvalidBackendId;
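1125  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1126  {
1127  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1128  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));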
1129  /* Also find last listening backend before this one */
1130  if (i < MyBackendId)
1131  prevListener = i;
1132  }
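1133  QUEUE_BACKEND_POS(MyBackendId) = max;
1134  QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
1135  QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
1136  /* Insert backend into list of listeners at correct position */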
1137  if (prevListener > 0)
1138  {
1139  QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
1140  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1141  }
1142  else
1143  {
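1144  QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
1145  QUEUE_FIRST_LISTENER = MyBackendId;
1146  }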
1147  LWLockRelease(NotifyQueueLock);
1148 
1149  /* Now we are listed in the global array, so remember we're listening */
1150  amRegisteredListener = true;
1151 
1152  /*
1153  * Try to move our pointer forward as far as possible. This will skip
1154  * over already-committed notifications, which we want to do because they
1155  * might be quite stale. Note that we are not yet listening on anything,
1156  * so we won't deliver such notifications to our frontend. Also, although
1157  * our transaction might have executed NOTIFY, those message(s) aren't
1158  * queued yet so we won't skip them here.
1159  */
1160  if (!QUEUE_POS_EQUAL(max, head))
1161  asyncQueueReadAllNotifications();
1162 }
#define QUEUE_POS_MAX(x, y)
Definition: async.c:225
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1900
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:857
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog(), i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1228 of file async.c.

1229 {
1230  if (Trace_notify)
1231  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1232 
1233  list_free_deep(listenChannels);
1234  listenChannels = NIL;
1235 }
void list_free_deep(List *list)
Definition: list.c:1559

References DEBUG1, elog(), list_free_deep(), listenChannels, MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1197 of file async.c.

1198 {
1199  ListCell *q;
1200 
1201  if (Trace_notify)
1202  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1203 
1204  foreach(q, listenChannels)
1205  {
1206  char *lchan = (char *) lfirst(q);
1207 
1208  if (strcmp(lchan, channel) == 0)
1209  {
1210  listenChannels = foreach_delete_current(listenChannels, q);
1211  pfree(lchan);
1212  break;
1213  }
1214  }
1215 
1216  /*
1217  * We do not complain about unlistening something not being listened;
1218  * should we?
1219  */
1220 }
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:390

References DEBUG1, elog(), foreach_delete_current, lfirst, listenChannels, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1853 of file async.c.

1854 {
1855  /*
1856  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1857  * you do here.
1858  */
1859 
1860  /* signal that work needs to be done */
1861  notifyInterruptPending = true;
1862 
1863  /* make sure the event is processed in due course */
1864  SetLatch(MyLatch);
1865 }
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431
struct Latch * MyLatch
Definition: globals.c:58
void SetLatch(Latch *latch)
Definition: latch.c:607

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1246 of file async.c.

1247 {
1248  ListCell *p;
1249 
1250  foreach(p, listenChannels)
1251  {
1252  char *lchan = (char *) lfirst(p);
1253 
1254  if (strcmp(lchan, channel) == 0)
1255  return true;
1256  }
1257  return false;
1258 }

References lfirst, and listenChannels.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2406 of file async.c.

2407 {
2408  const Notification *k = *(const Notification *const *) key;
2409 
2410  Assert(keysize == sizeof(Notification *));
2411  /* We don't bother to include the payload's trailing null in the hash */
2412  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2413  k->channel_len + k->payload_len + 1));
2414 }
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222

References Assert(), Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), key, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2420 of file async.c.

2421 {
2422  const Notification *k1 = *(const Notification *const *) key1;
2423  const Notification *k2 = *(const Notification *const *) key2;
2424 
2425  Assert(keysize == sizeof(Notification *));
2426  if (k1->channel_len == k2->channel_len &&
2427  k1->payload_len == k2->payload_len &&
2428  memcmp(k1->data, k2->data,
2429  k1->channel_len + k1->payload_len + 2) == 0)
2430  return 0; /* equal */
2431  return 1; /* not equal */
2432 }

References Assert(), Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2278 of file async.c.

2279 {
2280  if (whereToSendOutput == DestRemote)
2281  {
2282  StringInfoData buf;
2283 
2284  pq_beginmessage(&buf, 'A');
2285  pq_sendint32(&buf, srcPid);
2286  pq_sendstring(&buf, channel);
2287  pq_sendstring(&buf, payload);
2288  pq_endmessage(&buf);
2289 
2290  /*
2291  * NOTE: we do not do pq_flush() here. Some level of caller will
2292  * handle it later, allowing this message to be combined into a packet
2293  * with other ones.
2294  */
2295  }
2296  else
2297  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2298 }
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
CommandDest whereToSendOutput
Definition: postgres.c:88
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:198
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145

References buf, DestRemote, elog(), INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 824 of file async.c.

825 {
826  FuncCallContext *funcctx;
827 
828  /* stuff done only on the first call of the function */
829  if (SRF_IS_FIRSTCALL())
830  {
831  /* create a function context for cross-call persistence */
832  funcctx = SRF_FIRSTCALL_INIT();
833  }
834 
835  /* stuff done on every call of the function */
836  funcctx = SRF_PERCALL_SETUP();
837 
838  if (funcctx->call_cntr < list_length(listenChannels))
839  {
840  char *channel = (char *) list_nth(listenChannels,
841  funcctx->call_cntr);
842 
843  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
844  }
845 
846  SRF_RETURN_DONE(funcctx);
847 }
#define CStringGetTextDatum(s)
Definition: builtins.h:94
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:328
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
uint64 call_cntr
Definition: funcapi.h:65

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), listenChannels, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1522 of file async.c.

1523 {
1524  double usage;
1525 
1526  /* Advance the queue tail so we don't report a too-large result */
1527  asyncQueueAdvanceTail();
1528 
1529  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1530  usage = asyncQueueUsage();
1531  LWLockRelease(NotifyQueueLock);
1532 
1533  PG_RETURN_FLOAT8(usage);
1534 }
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static void usage(const char *progname)
Definition: vacuumlo.c:414

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 591 of file async.c.

592 {
593  const char *channel;
594  const char *payload;
595 
596  if (PG_ARGISNULL(0))
597  channel = "";
598  else
599  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
600 
601  if (PG_ARGISNULL(1))
602  payload = "";
603  else
604  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
605 
606  /* For NOTIFY as a statement, this is checked in ProcessUtility */
607  PreventCommandDuringRecovery("NOTIFY");
608 
609  Async_Notify(channel, payload);
610 
611  PG_RETURN_VOID();
612 }
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:625
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:448
char * text_to_cstring(const text *t)
Definition: varlena.c:215

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 895 of file async.c.

896 {
897  ListCell *p;
898 
899  if (!pendingActions && !pendingNotifies)
900  return; /* no relevant statements in this xact */
901 
902  if (Trace_notify)
903  elog(DEBUG1, "PreCommit_Notify");
904 
905  /* Preflight for any pending listen/unlisten actions */
906  if (pendingActions != NULL)
907  {
908  foreach(p, pendingActions->actions)
909  {
910  ListenAction *actrec = (ListenAction *) lfirst(p);
911 
912  switch (actrec->action)
913  {
914  case LISTEN_LISTEN:
915  Exec_ListenPreCommit();
916  break;
917  case LISTEN_UNLISTEN:
918  /* there is no Exec_UnlistenPreCommit() */
919  break;
920  case LISTEN_UNLISTEN_ALL:
921  /* there is no Exec_UnlistenAllPreCommit() */
922  break;
923  }
924  }
925  }
926 
927  /* Queue any pending notifies (must happen after the above) */
928  if (pendingNotifies)
929  {
930  ListCell *nextNotify;
931 
932  /*
933  * Make sure that we have an XID assigned to the current transaction.
934  * GetCurrentTransactionId is cheap if we already have an XID, but not
935  * so cheap if we don't, and we'd prefer not to do that work while
936  * holding NotifyQueueLock.
937  */
938  (void) GetCurrentTransactionId();
939 
940  /*
941  * Serialize writers by acquiring a special lock that we hold till
942  * after commit. This ensures that queue entries appear in commit
943  * order, and in particular that there are never uncommitted queue
944  * entries ahead of committed ones, so an uncommitted transaction
945  * can't block delivery of deliverable notifications.
946  *
947  * We use a heavyweight lock so that it'll automatically be released
948  * after either commit or abort. This also allows deadlocks to be
949  * detected, though really a deadlock shouldn't be possible here.
950  *
951  * The lock is on "database 0", which is pretty ugly but it doesn't
952  * seem worth inventing a special locktag category just for this.
953  * (Historical note: before PG 9.0, a similar lock on "database 0" was
954  * used by the flatfiles mechanism.)
955  */
956  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
957  AccessExclusiveLock);
958 
959  /* Now push the notifications into the queue */
960  nextNotify = list_head(pendingNotifies->events);
961  while (nextNotify != NULL)
962  {
963  /*
964  * Add the pending notifications to the queue. We acquire and
965  * release NotifyQueueLock once per page, which might be overkill
966  * but it does allow readers to get in while we're doing this.
967  *
968  * A full queue is very uncommon and should really not happen,
969  * given that we have so much space available in the SLRU pages.
970  * Nevertheless we need to deal with this possibility. Note that
971  * when we get here we are in the process of committing our
972  * transaction, but we have not yet committed to clog, so at this
973  * point in time we can still roll the transaction back.
974  */
975  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
976  asyncQueueFillWarning();
977  if (asyncQueueIsFull())
978  ereport(ERROR,
979  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
980  errmsg("too many notifications in the NOTIFY queue")));
981  nextNotify = asyncQueueAddEntries(nextNotify);
982  LWLockRelease(NotifyQueueLock);
983  }
984 
985  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
986  }
987 }
static void Exec_ListenPreCommit(void)
Definition: async.c:1075
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1408
static void asyncQueueFillWarning(void)
Definition: async.c:1576
static bool asyncQueueIsFull(void)
Definition: async.c:1306
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessExclusiveLock
Definition: lockdefs.h:43
static ListCell * list_head(const List *l)
Definition: pg_list.h:128

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog(), ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 2237 of file async.c.

2238 {
2239  /* We *must* reset the flag */
2240  notifyInterruptPending = false;
2241 
2242  /* Do nothing else if we aren't actively listening */
2243  if (listenChannels == NIL)
2244  return;
2245 
2246  if (Trace_notify)
2247  elog(DEBUG1, "ProcessIncomingNotify");
2248 
2249  set_ps_display("notify interrupt");
2250 
2251  /*
2252  * We must run asyncQueueReadAllNotifications inside a transaction, else
2253  * bad things happen if it gets an error.
2254  */
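2255  StartTransactionCommand();
2256 
2257  asyncQueueReadAllNotifications();
2258 
2259  CommitTransactionCommand();
2260 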
2261  /*
2262  * If this isn't an end-of-command case, we must flush the notify messages
2263  * to ensure frontend gets them promptly.
2264  */
2265  if (flush)
2266  pq_flush();
2267 
2268  set_ps_display("idle");
2269 
2270  if (Trace_notify)
2271  elog(DEBUG1, "ProcessIncomingNotify: done");
2272 }
#define pq_flush()
Definition: libpq.h:46
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void StartTransactionCommand(void)
Definition: xact.c:2937
void CommitTransactionCommand(void)
Definition: xact.c:3034

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog(), listenChannels, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1883 of file async.c.

1884 {
1885  if (IsTransactionOrTransactionBlock())
1886  return; /* not really idle */
1887 
1888  /* Loop in case another signal arrives while sending messages */
1889  while (notifyInterruptPending)
1890  ProcessIncomingNotify(flush);
1891 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2237
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4833

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 724 of file async.c.

725 {
726  MemoryContext oldcontext;
727  ListenAction *actrec;
728  int my_level = GetCurrentTransactionNestLevel();
729 
730  /*
731  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
732  * be too complicated to ensure we get the right interactions of
733  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
734  * would be any performance benefit anyway in sane applications.
735  */
736  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
737 
738  /* space for terminating null is included in sizeof(ListenAction) */
739  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
740  strlen(channel) + 1);
741  actrec->action = action;
742  strcpy(actrec->channel, channel);
743 
744  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
745  {
746  ActionList *actions;
747 
748  /*
749  * First action in current sub(xact). Note that we allocate the
750  * ActionList in TopTransactionContext; the nestingLevel might get
751  * changed later by AtSubCommit_Notify.
752  */
753  actions = (ActionList *)
754  MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
755  actions->nestingLevel = my_level;
756  actions->actions = list_make1(actrec);
757  actions->upper = pendingActions;
758  pendingActions = actions;
759  }
760  else
761  pendingActions->actions = lappend(pendingActions->actions, actrec);
762 
763  MemoryContextSwitchTo(oldcontext);
764 }

References action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1630 of file async.c.

1631 {
1632  int32 *pids;
1633  BackendId *ids;
1634  int count;
1635 
1636  /*
1637  * Identify backends that we need to signal. We don't want to send
1638  * signals while holding the NotifyQueueLock, so this loop just builds a
1639  * list of target PIDs.
1640  *
1641  * XXX in principle these pallocs could fail, which would be bad. Maybe
1642  * preallocate the arrays? They're not that large, though.
1643  */
1644  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1645  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1646  count = 0;
1647 
1648  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1649  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1650  {
1651  int32 pid = QUEUE_BACKEND_PID(i);
1652  QueuePosition pos;
1653 
1654  Assert(pid != InvalidPid);
1655  pos = QUEUE_BACKEND_POS(i);
1656  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1657  {
1658  /*
1659  * Always signal listeners in our own database, unless they're
1660  * already caught up (unlikely, but possible).
1661  */
1662  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1663  continue;
1664  }
1665  else
1666  {
1667  /*
1668  * Listeners in other databases should be signaled only if they
1669  * are far behind.
1670  */
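1671  if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
1672  QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
1673  continue;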
1674  }
1675  /* OK, need to signal this one */
1676  pids[count] = pid;
1677  ids[count] = i;
1678  count++;
1679  }
1680  LWLockRelease(NotifyQueueLock);
1681 
1682  /* Now send signals */
1683  for (int i = 0; i < count; i++)
1684  {
1685  int32 pid = pids[i];
1686 
1687  /*
1688  * If we are signaling our own process, no need to involve the kernel;
1689  * just set the flag directly.
1690  */
1691  if (pid == MyProcPid)
1692  {
1693  notifyInterruptPending = true;
1694  continue;
1695  }
1696 
1697  /*
1698  * Note: assuming things aren't broken, a signal failure here could
1699  * only occur if the target backend exited since we released
1700  * NotifyQueueLock; which is unlikely but certainly possible. So we
1701  * just log a low-level debug message if it happens.
1702  */
1703  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1704  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1705  }
1706 
1707  pfree(pids);
1708  pfree(ids);
1709 }
#define DEBUG3
Definition: elog.h:28
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_NOTIFY_INTERRUPT
Definition: procsignal.h:33

References Assert(), asyncQueuePageDiff(), DEBUG3, elog(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 297 of file async.c.

Referenced by asyncQueueFillWarning(), and AsyncShmemInit().

◆ listenChannels

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 311 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

◆ pendingNotifies

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 440 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 434 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().