PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define AsyncCtl   (&AsyncCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (void)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void ProcessCompletedNotifies (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData AsyncCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool backendHasSentNotifications = false
 
static bool backendTryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncCtl

#define AsyncCtl   (&AsyncCtlData)

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 182 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 399 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 156 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 228 of file async.c.

Referenced by asyncQueueAddEntries(), and SignalBackends().

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 296 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 315 of file async.c.

Referenced by asyncQueueAdvance(), asyncQueueIsFull(), asyncQueuePageDiff(), and asyncQueueUsage().

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489

Definition at line 212 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489

Definition at line 206 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 180 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 196 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

typedef struct AsyncQueueControl AsyncQueueControl

◆ AsyncQueueEntry

typedef struct AsyncQueueEntry AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

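◆ NotificationHash

typedef struct NotificationHash NotificationHash

◆ NotificationList

typedef struct NotificationList NotificationList

◆ QueueBackendStatus

typedef struct QueueBackendStatus QueueBackendStatus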

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 334 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification * n)
static

Definition at line 2321 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MemSet, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

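2322 {
2323  Assert(pendingNotifies->events != NIL);
2324 
2325  /* Create the hash table if it's time to */
2326  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&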
2327  pendingNotifies->hashtab == NULL)
2328  {
2329  HASHCTL hash_ctl;
2330  ListCell *l;
2331 
2332  /* Create the hash table */
2333  MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2334  hash_ctl.keysize = sizeof(Notification *);
2335  hash_ctl.entrysize = sizeof(NotificationHash);
2336  hash_ctl.hash = notification_hash;
2337  hash_ctl.match = notification_match;
2338  hash_ctl.hcxt = CurTransactionContext;
2339  pendingNotifies->hashtab =
2340  hash_create("Pending Notifies",
2341  256L,
2342  &hash_ctl,
2343  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2344 
2345  /* Insert all the already-existing events */
2346  foreach(l, pendingNotifies->events)
2347  {
2348  Notification *oldn = (Notification *) lfirst(l);
2349  NotificationHash *hentry;
2350  bool found;
2351 
2352  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2353  &oldn,
2354  HASH_ENTER,
2355  &found);
2356  Assert(!found);
2357  hentry->event = oldn;
2358  }
2359  }
2360 
2361  /* Add new event to the list, in order */
2362  pendingNotifies->events = lappend(pendingNotifies->events, n);
2363 
2364  /* Add event to the hash table if needed */
2365  if (pendingNotifies->hashtab != NULL)
2366  {
2367  NotificationHash *hentry;
2368  bool found;
2369 
2370  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2371  &n,
2372  HASH_ENTER,
2373  &found);
2374  Assert(!found);
2375  hentry->event = n;
2376  }
2377 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:394
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define MemSet(start, val, len)
Definition: c.h:962
Notification * event
Definition: async.c:403
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static NotificationList * pendingNotifies
Definition: async.c:406
List * lappend(List *list, void *datum)
Definition: list.c:322
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:399
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
HashCompareFunc match
Definition: hsearch.h:75
HTAB * hashtab
Definition: async.c:395
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2399
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2385
#define HASH_COMPARE
Definition: hsearch.h:90
static int list_length(const List *l)
Definition: pg_list.h:169
HashValueFunc hash
Definition: hsearch.h:74
#define HASH_FUNCTION
Definition: hsearch.h:89

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 762 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

763 {
764  if (Trace_notify)
765  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
766 
767  queue_listen(LISTEN_LISTEN, channel);
768 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 615 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

616 {
617  int my_level = GetCurrentTransactionNestLevel();
618  size_t channel_len;
619  size_t payload_len;
620  Notification *n;
621  MemoryContext oldcontext;
622 
623  if (IsParallelWorker())
624  elog(ERROR, "cannot send notifications from a parallel worker");
625 
626  if (Trace_notify)
627  elog(DEBUG1, "Async_Notify(%s)", channel);
628 
629  channel_len = channel ? strlen(channel) : 0;
630  payload_len = payload ? strlen(payload) : 0;
631 
632  /* a channel name must be specified */
633  if (channel_len == 0)
634  ereport(ERROR,
635  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636  errmsg("channel name cannot be empty")));
637 
638  /* enforce length limits */
639  if (channel_len >= NAMEDATALEN)
640  ereport(ERROR,
641  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
642  errmsg("channel name too long")));
643 
644  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
645  ereport(ERROR,
646  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
647  errmsg("payload string too long")));
648 
649  /*
650  * We must construct the Notification entry, even if we end up not using
651  * it, in order to compare it cheaply to existing list entries.
652  *
653  * The notification list needs to live until end of transaction, so store
654  * it in the transaction context.
655  */
656  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
657 
658  n = (Notification *) palloc(offsetof(Notification, data) +
659  channel_len + payload_len + 2);
660  n->channel_len = channel_len;
661  n->payload_len = payload_len;
662  strcpy(n->data, channel);
663  if (payload)
664  strcpy(n->data + channel_len + 1, payload);
665  else
666  n->data[channel_len + 1] = '\0';
667 
668  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
669  {
670  NotificationList *notifies;
671 
672  /*
673  * First notify event in current (sub)xact. Note that we allocate the
674  * NotificationList in TopTransactionContext; the nestingLevel might
675  * get changed later by AtSubCommit_Notify.
676  */
677  notifies = (NotificationList *)
678  MemoryContextAlloc(TopTransactionContext,
679  sizeof(NotificationList));
680  notifies->nestingLevel = my_level;
681  notifies->events = list_make1(n);
682  /* We certainly don't need a hashtable yet */
683  notifies->hashtab = NULL;
684  notifies->upper = pendingNotifies;
685  pendingNotifies = notifies;
686  }
687  else
688  {
689  /* Now check for duplicates */
690  if (AsyncExistsPendingNotify(n))
691  {
692  /* It's a dup, so forget it */
693  pfree(n);
694  MemoryContextSwitchTo(oldcontext);
695  return;
696  }
697 
698  /* Append more events to existing list */
699  AddEventToPendingNotifies(n);
700  }
701 
702  MemoryContextSwitchTo(oldcontext);
703 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:394
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:608
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2280
int nestingLevel
Definition: async.c:393
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2321
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:396
#define ereport(elevel, rest)
Definition: elog.h:141
#define IsParallelWorker()
Definition: parallel.h:60
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
HTAB * hashtab
Definition: async.c:395
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386
bool Trace_notify
Definition: async.c:430
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:228
#define offsetof(type, field)
Definition: c.h:662

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 776 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

777 {
778  if (Trace_notify)
779  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
780 
781  /* If we couldn't possibly be listening, no need to queue anything */
782  if (pendingActions == NULL && !unlistenExitRegistered)
783  return;
784 
785  queue_listen(LISTEN_UNLISTEN, channel);
786 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 794 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

795 {
796  if (Trace_notify)
797  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
798 
799  /* If we couldn't possibly be listening, no need to queue anything */
800  if (pendingActions == NULL && !unlistenExitRegistered)
801  return;
802 
803  queue_listen(LISTEN_UNLISTEN_ALL, "");
804 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 847 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

848 {
849  Exec_UnlistenAllCommit();
850  asyncQueueUnregister();
851 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1190
static void asyncQueueUnregister(void)
Definition: async.c:1306

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification * n)
static

Definition at line 2280 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2281 {
2282  if (pendingNotifies == NULL)
2283  return false;
2284 
2285  if (pendingNotifies->hashtab != NULL)
2286  {
2287  /* Use the hash table to probe for a match */
2288  if (hash_search(pendingNotifies->hashtab,
2289  &n,
2290  HASH_FIND,
2291  NULL))
2292  return true;
2293  }
2294  else
2295  {
2296  /* Must scan the event list */
2297  ListCell *l;
2298 
2299  foreach(l, pendingNotifies->events)
2300  {
2301  Notification *oldn = (Notification *) lfirst(l);
2302 
2303  if (n->channel_len == oldn->channel_len &&
2304  n->payload_len == oldn->payload_len &&
2305  memcmp(n->data, oldn->data,
2306  n->channel_len + n->payload_len + 2) == 0)
2307  return true;
2308  }
2309  }
2310 
2311  return false;
2312 }
List * events
Definition: async.c:394
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static NotificationList * pendingNotifies
Definition: async.c:406
HTAB * hashtab
Definition: async.c:395
#define lfirst(lc)
Definition: pg_list.h:190
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell * nextNotify)
static

Definition at line 1449 of file async.c.

References AsyncCtl, asyncQueueAdvance(), asyncQueueNotificationToEntry(), backendTryAdvanceTail, AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), and SimpleLruZeroPage().

Referenced by PreCommit_Notify().

1450 {
1451  AsyncQueueEntry qe;
1452  QueuePosition queue_head;
1453  int pageno;
1454  int offset;
1455  int slotno;
1456 
1457  /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
1458  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
1459 
1460  /*
1461  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1462  * memory upon exiting. The reason for this is that if we have to advance
1463  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1464  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1465  * subsequent insertions would try to put entries into a page that slru.c
1466  * thinks doesn't exist yet.) So, use a local position variable. Note
1467  * that if we do fail, any already-inserted queue entries are forgotten;
1468  * this is okay, since they'd be useless anyway after our transaction
1469  * rolls back.
1470  */
1471  queue_head = QUEUE_HEAD;
1472 
1473  /* Fetch the current page */
1474  pageno = QUEUE_POS_PAGE(queue_head);
1475  slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
1476  /* Note we mark the page dirty before writing in it */
1477  AsyncCtl->shared->page_dirty[slotno] = true;
1478 
1479  while (nextNotify != NULL)
1480  {
1481  Notification *n = (Notification *) lfirst(nextNotify);
1482 
1483  /* Construct a valid queue entry in local variable qe */
1484  asyncQueueNotificationToEntry(n, &qe);
1485 
1486  offset = QUEUE_POS_OFFSET(queue_head);
1487 
1488  /* Check whether the entry really fits on the current page */
1489  if (offset + qe.length <= QUEUE_PAGESIZE)
1490  {
1491  /* OK, so advance nextNotify past this item */
1492  nextNotify = lnext(pendingNotifies->events, nextNotify);
1493  }
1494  else
1495  {
1496  /*
1497  * Write a dummy entry to fill up the page. Actually readers will
1498  * only check dboid and since it won't match any reader's database
1499  * OID, they will ignore this entry and move on.
1500  */
1501  qe.length = QUEUE_PAGESIZE - offset;
1502  qe.dboid = InvalidOid;
1503  qe.data[0] = '\0'; /* empty channel */
1504  qe.data[1] = '\0'; /* empty payload */
1505  }
1506 
1507  /* Now copy qe into the shared buffer page */
1508  memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
1509  &qe,
1510  qe.length);
1511 
1512  /* Advance queue_head appropriately, and detect if page is full */
1513  if (asyncQueueAdvance(&(queue_head), qe.length))
1514  {
1515  /*
1516  * Page is full, so we're done here, but first fill the next page
1517  * with zeroes. The reason to do this is to ensure that slru.c's
1518  * idea of the head page is always the same as ours, which avoids
1519  * boundary problems in SimpleLruTruncate. The test in
1520  * asyncQueueIsFull() ensured that there is room to create this
1521  * page without overrunning the queue.
1522  */
1523  slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
1524 
1525  /*
1526  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1527  * set flag to remember that we should try to advance the tail
1528  * pointer (we don't want to actually do that right here).
1529  */
1530  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1531  backendTryAdvanceTail = true;
1532 
1533  /* And exit the loop */
1534  break;
1535  }
1536  }
1537 
1538  /* Success, so update the global QUEUE_HEAD */
1539  QUEUE_HEAD = queue_head;
1540 
1541  LWLockRelease(AsyncCtlLock);
1542 
1543  return nextNotify;
1544 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
List * events
Definition: async.c:394
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:321
#define AsyncCtl
Definition: async.c:294
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1413
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1378
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static NotificationList * pendingNotifies
Definition: async.c:406
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:374
static bool backendTryAdvanceTail
Definition: async.c:427
#define QUEUE_PAGESIZE
Definition: async.c:295
#define InvalidTransactionId
Definition: transam.h:31
#define QUEUE_CLEANUP_DELAY
Definition: async.c:228
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:190
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:262
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition * position,
int  entryLength 
)
static

Definition at line 1378 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1379 {
1380  int pageno = QUEUE_POS_PAGE(*position);
1381  int offset = QUEUE_POS_OFFSET(*position);
1382  bool pageJump = false;
1383 
1384  /*
1385  * Move to the next writing position: First jump over what we have just
1386  * written or read.
1387  */
1388  offset += entryLength;
1389  Assert(offset <= QUEUE_PAGESIZE);
1390 
1391  /*
1392  * In a second step check if another entry can possibly be written to the
1393  * page. If so, stay here, we have reached the next position. If not, then
1394  * we need to move on to the next page.
1395  */
1396  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1397  {
1398  pageno++;
1399  if (pageno > QUEUE_MAX_PAGE)
1400  pageno = 0; /* wrap around */
1401  offset = 0;
1402  pageJump = true;
1403  }
1404 
1405  SET_QUEUE_POS(*position, pageno, offset);
1406  return pageJump;
1407 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
#define AsyncQueueEntryEmptySize
Definition: async.c:182
#define QUEUE_PAGESIZE
Definition: async.c:295
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
#define QUEUEALIGN(len)
Definition: async.c:180
#define Assert(condition)
Definition: c.h:739
#define QUEUE_MAX_PAGE
Definition: async.c:315
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2166 of file async.c.

References Assert, AsyncCtl, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by pg_notification_queue_usage(), and ProcessCompletedNotifies().

2167 {
2168  QueuePosition min;
2169  int oldtailpage;
2170  int newtailpage;
2171  int boundary;
2172 
2173  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
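2174  min = QUEUE_HEAD;
2175  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2176  {
2177  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2178  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));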
2179  }
2180  oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
2181  QUEUE_TAIL = min;
2182  LWLockRelease(AsyncQueueLock);
2183 
2184  /*
2185  * We can truncate something if the global tail advanced across an SLRU
2186  * segment boundary.
2187  *
2188  * XXX it might be better to truncate only once every several segments, to
2189  * reduce the number of directory scans.
2190  */
2191  newtailpage = QUEUE_POS_PAGE(min);
2192  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2193  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2194  {
2195  /*
2196  * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
2197  * the lock again.
2198  */
2199  SimpleLruTruncate(AsyncCtl, newtailpage);
2200  }
2201 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1184
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:739
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_MIN(x, y)
Definition: async.c:206
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1600 of file async.c.

References Assert, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

1601 {
1602  double fillDegree;
1603  TimestampTz t;
1604 
1605  fillDegree = asyncQueueUsage();
1606  if (fillDegree < 0.5)
1607  return;
1608 
1609  t = GetCurrentTimestamp();
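1610 
1611  if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
1612  t, QUEUE_FULL_WARN_INTERVAL))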
1613  {
1614  QueuePosition min = QUEUE_HEAD;
1615  int32 minPid = InvalidPid;
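1616 
1617  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1618  {
1619  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);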
1620  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1621  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1622  minPid = QUEUE_BACKEND_PID(i);
1623  }
1624 
1625  ereport(WARNING,
1626  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1627  (minPid != InvalidPid ?
1628  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1629  : 0),
1630  (minPid != InvalidPid ?
1631  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1632  : 0)));
1633 
1634  asyncQueueControl->lastQueueFillWarn = t;
1635  }
1636 }
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1571
#define QUEUE_HEAD
Definition: async.c:281
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
signed int int32
Definition: c.h:347
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
int errdetail(const char *fmt,...)
Definition: elog.c:955
static AsyncQueueControl * asyncQueueControl
Definition: async.c:279
#define ereport(elevel, rest)
Definition: elog.h:141
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:296
#define WARNING
Definition: elog.h:40
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:739
int errmsg(const char *fmt,...)
Definition: elog.c:822
int i
TimestampTz lastQueueFillWarn
Definition: async.c:274
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MIN(x, y)
Definition: async.c:206
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1347 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_TAIL, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

1348 {
1349  int nexthead;
1350  int boundary;
1351 
1352  /*
1353  * The queue is full if creating a new head page would create a page that
1354  * logically precedes the current global tail pointer, ie, the head
1355  * pointer would wrap around compared to the tail. We cannot create such
1356  * a head page for fear of confusing slru.c. For safety we round the tail
1357  * pointer back to a segment boundary (compare the truncation logic in
1358  * asyncQueueAdvanceTail).
1359  *
1360  * Note that this test is *not* dependent on how much space there is on
1361  * the current head page. This is necessary because asyncQueueAddEntries
1362  * might try to create the next head page in any case.
1363  */
1364  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1365  if (nexthead > QUEUE_MAX_PAGE)
1366  nexthead = 0; /* wrap around */
1367  boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
1368  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1369  return asyncQueuePagePrecedes(nexthead, boundary);
1370 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_HEAD
Definition: async.c:281
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_MAX_PAGE
Definition: async.c:315
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1413 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

1414 {
1415  size_t channellen = n->channel_len;
1416  size_t payloadlen = n->payload_len;
1417  int entryLength;
1418 
1419  Assert(channellen < NAMEDATALEN);
1420  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1421 
1422  /* The terminators are already included in AsyncQueueEntryEmptySize */
1423  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1424  entryLength = QUEUEALIGN(entryLength);
1425  qe->length = entryLength;
1426  qe->dboid = MyDatabaseId;
1427  qe->xid = GetCurrentTransactionId();
1428  qe->srcPid = MyProcPid;
1429  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1430 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
int MyProcPid
Definition: globals.c:40
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:182
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
#define QUEUEALIGN(len)
Definition: async.c:180
Oid MyDatabaseId
Definition: globals.c:85
#define Assert(condition)
Definition: c.h:739
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 468 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

469 {
470  int diff;
471 
472  /*
473  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
474  * in the range 0..QUEUE_MAX_PAGE.
475  */
476  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
477  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
478 
479  diff = p - q;
480  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
481  diff -= QUEUE_MAX_PAGE + 1;
482  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
483  diff += QUEUE_MAX_PAGE + 1;
484  return diff;
485 }
#define Assert(condition)
Definition: c.h:739
#define QUEUE_MAX_PAGE
Definition: async.c:315

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 489 of file async.c.

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

490 {
491  return asyncQueuePageDiff(p, q) < 0;
492 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:468

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2077 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

2081 {
2082  bool reachedStop = false;
2083  bool reachedEndOfPage;
2084  AsyncQueueEntry *qe;
2085 
2086  do
2087  {
2088  QueuePosition thisentry = *current;
2089 
2090  if (QUEUE_POS_EQUAL(thisentry, stop))
2091  break;
2092 
2093  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2094 
2095  /*
2096  * Advance *current over this message, possibly to the next page. As
2097  * noted in the comments for asyncQueueReadAllNotifications, we must
2098  * do this before possibly failing while processing the message.
2099  */
2100  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2101 
2102  /* Ignore messages destined for other databases */
2103  if (qe->dboid == MyDatabaseId)
2104  {
2105  if (XidInMVCCSnapshot(qe->xid, snapshot))
2106  {
2107  /*
2108  * The source transaction is still in progress, so we can't
2109  * process this message yet. Break out of the loop, but first
2110  * back up *current so we will reprocess the message next
2111  * time. (Note: it is unlikely but not impossible for
2112  * TransactionIdDidCommit to fail, so we can't really avoid
2113  * this advance-then-back-up behavior when dealing with an
2114  * uncommitted message.)
2115  *
2116  * Note that we must test XidInMVCCSnapshot before we test
2117  * TransactionIdDidCommit, else we might return a message from
2118  * a transaction that is not yet visible to snapshots; compare
2119  * the comments at the head of heapam_visibility.c.
2120  *
2121  * Also, while our own xact won't be listed in the snapshot,
2122  * we need not check for TransactionIdIsCurrentTransactionId
2123  * because our transaction cannot (yet) have queued any
2124  * messages.
2125  */
2126  *current = thisentry;
2127  reachedStop = true;
2128  break;
2129  }
2130  else if (TransactionIdDidCommit(qe->xid))
2131  {
2132  /* qe->data is the null-terminated channel name */
2133  char *channel = qe->data;
2134 
2135  if (IsListeningOn(channel))
2136  {
2137  /* payload follows channel name */
2138  char *payload = qe->data + strlen(channel) + 1;
2139 
2140  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2141  }
2142  }
2143  else
2144  {
2145  /*
2146  * The source transaction aborted or crashed, so we just
2147  * ignore its notifications.
2148  */
2149  }
2150  }
2151 
2152  /* Loop back if we're not at end of page */
2153  } while (!reachedEndOfPage);
2154 
2155  if (QUEUE_POS_EQUAL(*current, stop))
2156  reachedStop = true;
2157 
2158  return reachedStop;
2159 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2241
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1378
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1287
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2255

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1911 of file async.c.

References Assert, AsyncCtl, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), ProcessCompletedNotifies(), and ProcessIncomingNotify().

1912 {
1913  volatile QueuePosition pos;
1914  QueuePosition oldpos;
1915  QueuePosition head;
1916  Snapshot snapshot;
1917 
1918  /* page_buffer must be adequately aligned, so use a union */
1919  union
1920  {
1921  char buf[QUEUE_PAGESIZE];
1922  AsyncQueueEntry align;
1923  } page_buffer;
1924 
1925  /* Fetch current state */
1926  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1927  /* Assert checks that we have a valid state entry */
1928  Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
1929  pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
1930  head = QUEUE_HEAD;
1931  LWLockRelease(AsyncQueueLock);
1932 
1933  if (QUEUE_POS_EQUAL(pos, head))
1934  {
1935  /* Nothing to do, we have read all notifications already. */
1936  return;
1937  }
1938 
1939  /*----------
1940  * Get snapshot we'll use to decide which xacts are still in progress.
1941  * This is trickier than it might seem, because of race conditions.
1942  * Consider the following example:
1943  *
1944  * Backend 1: Backend 2:
1945  *
1946  * transaction starts
1947  * UPDATE foo SET ...;
1948  * NOTIFY foo;
1949  * commit starts
1950  * queue the notify message
1951  * transaction starts
1952  * LISTEN foo; -- first LISTEN in session
1953  * SELECT * FROM foo WHERE ...;
1954  * commit to clog
1955  * commit starts
1956  * add backend 2 to array of listeners
1957  * advance to queue head (this code)
1958  * commit to clog
1959  *
1960  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1961  * wasn't committed yet. Ideally we'd ensure that client 2 would
1962  * eventually get transaction 1's notify message, but there's no way
1963  * to do that; until we're in the listener array, there's no guarantee
1964  * that the notify message doesn't get removed from the queue.
1965  *
1966  * Therefore the coding technique transaction 2 is using is unsafe:
1967  * applications must commit a LISTEN before inspecting database state,
1968  * if they want to ensure they will see notifications about subsequent
1969  * changes to that state.
1970  *
1971  * What we do guarantee is that we'll see all notifications from
1972  * transactions committing after the snapshot we take here.
1973  * Exec_ListenPreCommit has already added us to the listener array,
1974  * so no not-yet-committed messages can be removed from the queue
1975  * before we see them.
1976  *----------
1977  */
1978  snapshot = RegisterSnapshot(GetLatestSnapshot());
1979 
1980  /*
1981  * It is possible that we fail while trying to send a message to our
1982  * frontend (for example, because of encoding conversion failure). If
1983  * that happens it is critical that we not try to send the same message
1984  * over and over again. Therefore, we place a PG_TRY block here that will
1985  * forcibly advance our queue position before we lose control to an error.
1986  * (We could alternatively retake AsyncQueueLock and move the position
1987  * before handling each individual message, but that seems like too much
1988  * lock traffic.)
1989  */
1990  PG_TRY();
1991  {
1992  bool reachedStop;
1993 
1994  do
1995  {
1996  int curpage = QUEUE_POS_PAGE(pos);
1997  int curoffset = QUEUE_POS_OFFSET(pos);
1998  int slotno;
1999  int copysize;
2000 
2001  /*
2002  * We copy the data from SLRU into a local buffer, so as to avoid
2003  * holding the AsyncCtlLock while we are examining the entries and
2004  * possibly transmitting them to our frontend. Copy only the part
2005  * of the page we will actually inspect.
2006  */
2007  slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
2008  InvalidTransactionId);
2009  if (curpage == QUEUE_POS_PAGE(head))
2010  {
2011  /* we only want to read as far as head */
2012  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2013  if (copysize < 0)
2014  copysize = 0; /* just for safety */
2015  }
2016  else
2017  {
2018  /* fetch all the rest of the page */
2019  copysize = QUEUE_PAGESIZE - curoffset;
2020  }
2021  memcpy(page_buffer.buf + curoffset,
2022  AsyncCtl->shared->page_buffer[slotno] + curoffset,
2023  copysize);
2024  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2025  LWLockRelease(AsyncCtlLock);
2026 
2027  /*
2028  * Process messages up to the stop position, end of page, or an
2029  * uncommitted message.
2030  *
2031  * Our stop position is what we found to be the head's position
2032  * when we entered this function. It might have changed already.
2033  * But if it has, we will receive (or have already received and
2034  * queued) another signal and come here again.
2035  *
2036  * We are not holding AsyncQueueLock here! The queue can only
2037  * extend beyond the head pointer (see above) and we leave our
2038  * backend's pointer where it is so nobody will truncate or
2039  * rewrite pages under us. Especially we don't want to hold a lock
2040  * while sending the notifications to the frontend.
2041  */
2042  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2043  page_buffer.buf,
2044  snapshot);
2045  } while (!reachedStop);
2046  }
2047  PG_FINALLY();
2048  {
2049  /* Update shared state */
2050  LWLockAcquire(AsyncQueueLock, LW_SHARED);
2051  QUEUE_BACKEND_POS(MyBackendId) = pos;
2052  LWLockRelease(AsyncQueueLock);
2053  }
2054  PG_END_TRY();
2055 
2056  /* Done with snapshot */
2057  UnregisterSnapshot(snapshot);
2058 }
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:865
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static char * buf
Definition: pg_test_fsync.c:67
#define QUEUE_PAGESIZE
Definition: async.c:295
#define InvalidTransactionId
Definition: transam.h:31
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:907
#define PG_FINALLY()
Definition: elog.h:339
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:466
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2077
#define Assert(condition)
Definition: c.h:739
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
#define PG_TRY()
Definition: elog.h:322
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define PG_END_TRY()
Definition: elog.h:347
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1306 of file async.c.

References amRegisteredListener, Assert, i, InvalidBackendId, InvalidOid, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

1307 {
1308  Assert(listenChannels == NIL); /* else caller error */
1309 
1310  if (!amRegisteredListener) /* nothing to do */
1311  return;
1312 
1313  /*
1314  * Need exclusive lock here to manipulate list links.
1315  */
1316  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
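1317  /* Mark our entry as invalid */
1318  QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
1319  QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
1320  /* and remove it from the list */
1321  if (QUEUE_FIRST_LISTENER == MyBackendId)
1322  QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
1323  else
1324  {
1325  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1326  {
1327  if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
1328  {
1329  QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
1330  break;
1331  }
1332  }
1333  }
1334  QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
1335  LWLockRelease(AsyncQueueLock);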
1336 
1337  /* mark ourselves as no longer listed in the global array */
1338  amRegisteredListener = false;
1339 }
#define NIL
Definition: pg_list.h:65
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
static List * listenChannels
Definition: async.c:322
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:421
int BackendId
Definition: backendid.h:21
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:739
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1571 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

1572 {
1573  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1574  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1575  int occupied;
1576 
1577  occupied = headPage - tailPage;
1578 
1579  if (occupied == 0)
1580  return (double) 0; /* fast exit for common case */
1581 
1582  if (occupied < 0)
1583  {
1584  /* head has wrapped around, tail not yet */
1585  occupied += QUEUE_MAX_PAGE + 1;
1586  }
1587 
1588  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1589 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_HEAD
Definition: async.c:281
#define QUEUE_MAX_PAGE
Definition: async.c:315
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 515 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

516 {
517  bool found;
518  int slotno;
519  Size size;
520 
521  /*
522  * Create or attach to the AsyncQueueControl structure.
523  *
524  * The used entries in the backend[] array run from 1 to MaxBackends; the
525  * zero'th entry is unused but must be allocated.
526  */
527  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
528  size = add_size(size, offsetof(AsyncQueueControl, backend));
529 
530  asyncQueueControl = (AsyncQueueControl *)
531  ShmemInitStruct("Async Queue Control", size, &found);
532 
533  if (!found)
534  {
535  /* First time through, so initialize it */
536  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
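537  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
538  QUEUE_FIRST_LISTENER = InvalidBackendId;
539  asyncQueueControl->lastQueueFillWarn = 0;
540  /* zero'th entry won't be used, but let's initialize it anyway */
541  for (int i = 0; i <= MaxBackends; i++)
542  {
543  QUEUE_BACKEND_PID(i) = InvalidPid;
544  QUEUE_BACKEND_DBOID(i) = InvalidOid;
545  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
546  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
547  }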
548  }
549 
550  /*
551  * Set up SLRU management of the pg_notify data.
552  */
553  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
554  SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
555  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
556  /* Override default assumption that writes should be fsync'd */
557  AsyncCtl->do_fsync = false;
558 
559  if (!found)
560  {
561  /*
562  * During start or reboot, clean out the pg_notify directory.
563  */
564  (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
565 
566  /* Now initialize page zero to empty */
567  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
568  slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
569  /* This write is just to verify that pg_notify/ is writable */
570  SimpleLruWritePage(AsyncCtl, slotno);
571  LWLockRelease(AsyncCtlLock);
572  }
573 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1369
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:135
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:577
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
static AsyncQueueControl * asyncQueueControl
Definition: async.c:279
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1392
size_t Size
Definition: c.h:467
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
TimestampTz lastQueueFillWarn
Definition: async.c:274
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:262
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define offsetof(type, field)
Definition: c.h:662
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:164

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 498 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

499 {
500  Size size;
501 
502  /* This had better match AsyncShmemInit */
503  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
504  size = add_size(size, offsetof(AsyncQueueControl, backend));
505 
506  size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
507 
508  return size;
509 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:467
#define offsetof(type, field)
Definition: c.h:662

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1736 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1737 {
1738  /*
1739  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1740  * we have registered as a listener but have not made any entry in
1741  * listenChannels. In that case, deregister again.
1742  */
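1743  if (amRegisteredListener && listenChannels == NIL)
1744  asyncQueueUnregister();
1745 
1746  /* And clean up */
1747  ClearPendingActionsAndNotifies();
1748 }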
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:322
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2415
static void asyncQueueUnregister(void)
Definition: async.c:1306
static bool amRegisteredListener
Definition: async.c:421

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 987 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

988 {
989  ListCell *p;
990 
991  /*
992  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
993  * return as soon as possible
994  */
995  if (!pendingActions && !pendingNotifies)
996  return;
997 
998  if (Trace_notify)
999  elog(DEBUG1, "AtCommit_Notify");
1000 
1001  /* Perform any pending listen/unlisten actions */
1002  if (pendingActions != NULL)
1003  {
1004  foreach(p, pendingActions->actions)
1005  {
1006  ListenAction *actrec = (ListenAction *) lfirst(p);
1007 
1008  switch (actrec->action)
1009  {
1010  case LISTEN_LISTEN:
1011  Exec_ListenCommit(actrec->channel);
1012  break;
1013  case LISTEN_UNLISTEN:
1014  Exec_UnlistenCommit(actrec->channel);
1015  break;
1016  case LISTEN_UNLISTEN_ALL:
1017  Exec_UnlistenAllCommit();
1018  break;
1019  }
1020  }
1021  }
1022 
1023  /* If no longer listening to anything, get out of listener array */
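1024  if (amRegisteredListener && listenChannels == NIL)
1025  asyncQueueUnregister();
1026 
1027  /* And clean up */
1028  ClearPendingActionsAndNotifies();
1029 }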
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:354
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1190
static List * listenChannels
Definition: async.c:322
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2415
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1159
static NotificationList * pendingNotifies
Definition: async.c:406
List * actions
Definition: async.c:350
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1132
static void asyncQueueUnregister(void)
Definition: async.c:1306
static bool amRegisteredListener
Definition: async.c:421
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 860 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

861 {
862  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
863  if (pendingActions || pendingNotifies)
864  ereport(ERROR,
865  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
866  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
867 }
static ActionList * pendingActions
Definition: async.c:354
int errcode(int sqlerrcode)
Definition: elog.c:608
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1826 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1827 {
1828  int my_level = GetCurrentTransactionNestLevel();
1829 
1830  /*
1831  * All we have to do is pop the stack --- the actions/notifies made in
1832  * this subxact are no longer interesting, and the space will be freed
1833  * when CurTransactionContext is recycled. We still have to free the
1834  * ActionList and NotificationList objects themselves, though, because
1835  * those are allocated in TopTransactionContext.
1836  *
1837  * Note that there might be no entries at all, or no entries for the
1838  * current subtransaction level, either because none were ever created, or
1839  * because we reentered this routine due to trouble during subxact abort.
1840  */
1841  while (pendingActions != NULL &&
1842  pendingActions->nestingLevel >= my_level)
1843  {
1844  ActionList *childPendingActions = pendingActions;
1845 
1846  pendingActions = pendingActions->upper;
1847  pfree(childPendingActions);
1848  }
1849 
1850  while (pendingNotifies != NULL &&
1851  pendingNotifies->nestingLevel >= my_level)
1852  {
1853  NotificationList *childPendingNotifies = pendingNotifies;
1854 
1855  pendingNotifies = pendingNotifies->upper;
1856  pfree(childPendingNotifies);
1857  }
1858 }
static ActionList * pendingActions
Definition: async.c:354
int nestingLevel
Definition: async.c:393
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
struct NotificationList * upper
Definition: async.c:396
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1756 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1757 {
1758  int my_level = GetCurrentTransactionNestLevel();
1759 
1760  /* If there are actions at our nesting level, we must reparent them. */
1761  if (pendingActions != NULL &&
1762  pendingActions->nestingLevel >= my_level)
1763  {
1764  if (pendingActions->upper == NULL ||
1765  pendingActions->upper->nestingLevel < my_level - 1)
1766  {
1767  /* nothing to merge; give the whole thing to the parent */
1768  --pendingActions->nestingLevel;
1769  }
1770  else
1771  {
1772  ActionList *childPendingActions = pendingActions;
1773 
1774  pendingActions = pendingActions->upper;
1775 
1776  /*
1777  * Mustn't try to eliminate duplicates here --- see queue_listen()
1778  */
1779  pendingActions->actions =
1780  list_concat(pendingActions->actions,
1781  childPendingActions->actions);
1782  pfree(childPendingActions);
1783  }
1784  }
1785 
1786  /* If there are notifies at our nesting level, we must reparent them. */
1787  if (pendingNotifies != NULL &&
1788  pendingNotifies->nestingLevel >= my_level)
1789  {
1790  Assert(pendingNotifies->nestingLevel == my_level);
1791 
1792  if (pendingNotifies->upper == NULL ||
1793  pendingNotifies->upper->nestingLevel < my_level - 1)
1794  {
1795  /* nothing to merge; give the whole thing to the parent */
1796  --pendingNotifies->nestingLevel;
1797  }
1798  else
1799  {
1800  /*
1801  * Formerly, we didn't bother to eliminate duplicates here, but
1802  * now we must, else we fall foul of "Assert(!found)", either here
1803  * or during a later attempt to build the parent-level hashtable.
1804  */
1805  NotificationList *childPendingNotifies = pendingNotifies;
1806  ListCell *l;
1807 
1808  pendingNotifies = pendingNotifies->upper;
1809  /* Insert all the subxact's events into parent, except for dups */
1810  foreach(l, childPendingNotifies->events)
1811  {
1812  Notification *childn = (Notification *) lfirst(l);
1813 
1814  if (!AsyncExistsPendingNotify(childn))
1815  AddEventToPendingNotifies(childn);
1816  }
1817  pfree(childPendingNotifies);
1818  }
1819  }
1820 }
List * events
Definition: async.c:394
static ActionList * pendingActions
Definition: async.c:354
List * list_concat(List *list1, const List *list2)
Definition: list.c:516
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2280
int nestingLevel
Definition: async.c:393
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
List * actions
Definition: async.c:350
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2321
struct NotificationList * upper
Definition: async.c:396
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2415 of file async.c.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2416 {
2417  /*
2418  * Everything's allocated in either TopTransactionContext or the context
2419  * for the subtransaction to which it corresponds. So, there's nothing to
2420  * do here except reset the pointers; the space will be reclaimed when the
2421  * contexts are deleted.
2422  */
2423  pendingActions = NULL;
2424  pendingNotifies = NULL;
2425 }
static ActionList * pendingActions
Definition: async.c:354
static NotificationList * pendingNotifies
Definition: async.c:406

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1132 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1133 {
1134  MemoryContext oldcontext;
1135 
1136  /* Do nothing if we are already listening on this channel */
1137  if (IsListeningOn(channel))
1138  return;
1139 
1140  /*
1141  * Add the new channel name to listenChannels.
1142  *
1143  * XXX It is theoretically possible to get an out-of-memory failure here,
1144  * which would be bad because we already committed. For the moment it
1145  * doesn't seem worth trying to guard against that, but maybe improve this
1146  * later.
1147  */
1150  MemoryContextSwitchTo(oldcontext);
1151 }
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:322
static bool IsListeningOn(const char *channel)
Definition: async.c:1287
MemoryContext TopMemoryContext
Definition: mcxt.c:44
List * lappend(List *list, void *datum)
Definition: list.c:322

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1037 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

1038 {
1039  QueuePosition head;
1040  QueuePosition max;
1041  BackendId prevListener;
1042 
1043  /*
1044  * Nothing to do if we are already listening to something, nor if we
1045  * already ran this routine in this transaction.
1046  */
1048  return;
1049 
1050  if (Trace_notify)
1051  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1052 
1053  /*
1054  * Before registering, make sure we will unlisten before dying. (Note:
1055  * this action does not get undone if we abort later.)
1056  */
1058  {
1060  unlistenExitRegistered = true;
1061  }
1062 
1063  /*
1064  * This is our first LISTEN, so establish our pointer.
1065  *
1066  * We set our pointer to the global tail pointer and then move it forward
1067  * over already-committed notifications. This ensures we cannot miss any
1068  * not-yet-committed notifications. We might get a few more but that
1069  * doesn't hurt.
1070  *
1071  * In some scenarios there might be a lot of committed notifications that
1072  * have not yet been pruned away (because some backend is being lazy about
1073  * reading them). To reduce our startup time, we can look at other
1074  * backends and adopt the maximum "pos" pointer of any backend that's in
1075  * our database; any notifications it's already advanced over are surely
1076  * committed and need not be re-examined by us. (We must consider only
1077  * backends connected to our DB, because others will not have bothered to
1078  * check committed-ness of notifications in our DB.)
1079  *
1080  * We need exclusive lock here so we can look at other backends' entries
1081  * and manipulate the list links.
1082  */
1083  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1084  head = QUEUE_HEAD;
1085  max = QUEUE_TAIL;
1086  prevListener = InvalidBackendId;
1088  {
1090  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1091  /* Also find last listening backend before this one */
1092  if (i < MyBackendId)
1093  prevListener = i;
1094  }
1098  /* Insert backend into list of listeners at correct position */
1099  if (prevListener > 0)
1100  {
1102  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1103  }
1104  else
1105  {
1108  }
1109  LWLockRelease(AsyncQueueLock);
1110 
1111  /* Now we are listed in the global array, so remember we're listening */
1112  amRegisteredListener = true;
1113 
1114  /*
1115  * Try to move our pointer forward as far as possible. This will skip
1116  * over already-committed notifications, which we want to do because they
1117  * might be quite stale. Note that we are not yet listening on anything,
1118  * so we won't deliver such notifications to our frontend. Also, although
1119  * our transaction might have executed NOTIFY, those message(s) aren't
1120  * queued yet so we won't skip them here.
1121  */
1122  if (!QUEUE_POS_EQUAL(max, head))
1124 }
#define QUEUE_TAIL
Definition: async.c:282
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
#define QUEUE_HEAD
Definition: async.c:281
static bool unlistenExitRegistered
Definition: async.c:418
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:847
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:421
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1911
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MAX(x, y)
Definition: async.c:212

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1190 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1191 {
1192  if (Trace_notify)
1193  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1194 
1196  listenChannels = NIL;
1197 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:322
void list_free_deep(List *list)
Definition: list.c:1391
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1159 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1160 {
1161  ListCell *q;
1162 
1163  if (Trace_notify)
1164  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1165 
1166  foreach(q, listenChannels)
1167  {
1168  char *lchan = (char *) lfirst(q);
1169 
1170  if (strcmp(lchan, channel) == 0)
1171  {
1173  pfree(lchan);
1174  break;
1175  }
1176  }
1177 
1178  /*
1179  * We do not complain about unlistening a channel that we are not
1180  * listening on; should we?
1181  */
1182 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:322
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:368
void pfree(void *pointer)
Definition: mcxt.c:1056
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1869 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1870 {
1871  /*
1872  * Note: this is called by a SIGNAL HANDLER. You must be very wary of what
1873  * you do here.
1874  */
1875 
1876  /* signal that work needs to be done */
1877  notifyInterruptPending = true;
1878 
1879  /* make sure the event is processed in due course */
1880  SetLatch(MyLatch);
1881 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1287 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1288 {
1289  ListCell *p;
1290 
1291  foreach(p, listenChannels)
1292  {
1293  char *lchan = (char *) lfirst(p);
1294 
1295  if (strcmp(lchan, channel) == 0)
1296  return true;
1297  }
1298  return false;
1299 }
static List * listenChannels
Definition: async.c:322
#define lfirst(lc)
Definition: pg_list.h:190

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2385 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2386 {
2387  const Notification *k = *(const Notification *const *) key;
2388 
2389  Assert(keysize == sizeof(Notification *));
2390  /* We don't bother to include the payload's trailing null in the hash */
2391  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2392  k->channel_len + k->payload_len + 1));
2393 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.c:148
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
#define Assert(condition)
Definition: c.h:739
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2399 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2400 {
2401  const Notification *k1 = *(const Notification *const *) key1;
2402  const Notification *k2 = *(const Notification *const *) key2;
2403 
2404  Assert(keysize == sizeof(Notification *));
2405  if (k1->channel_len == k2->channel_len &&
2406  k1->payload_len == k2->payload_len &&
2407  memcmp(k1->data, k2->data,
2408  k1->channel_len + k1->payload_len + 2) == 0)
2409  return 0; /* equal */
2410  return 1; /* not equal */
2411 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
#define Assert(condition)
Definition: c.h:739
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2255 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2256 {
2258  {
2260 
2261  pq_beginmessage(&buf, 'A');
2262  pq_sendint32(&buf, srcPid);
2263  pq_sendstring(&buf, channel);
2265  pq_sendstring(&buf, payload);
2266  pq_endmessage(&buf);
2267 
2268  /*
2269  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2270  * happen at the end of the transaction, and for incoming notifies
2271  * ProcessIncomingNotify will do it after finding all the notifies.
2272  */
2273  }
2274  else
2275  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2276 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:228
CommandDest whereToSendOutput
Definition: postgres.c:90
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 814 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

815 {
816  FuncCallContext *funcctx;
817 
818  /* stuff done only on the first call of the function */
819  if (SRF_IS_FIRSTCALL())
820  {
821  /* create a function context for cross-call persistence */
822  funcctx = SRF_FIRSTCALL_INIT();
823  }
824 
825  /* stuff done on every call of the function */
826  funcctx = SRF_PERCALL_SETUP();
827 
828  if (funcctx->call_cntr < list_length(listenChannels))
829  {
830  char *channel = (char *) list_nth(listenChannels,
831  funcctx->call_cntr);
832 
833  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
834  }
835 
836  SRF_RETURN_DONE(funcctx);
837 }
uint64 call_cntr
Definition: funcapi.h:65
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:282
static List * listenChannels
Definition: async.c:322
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:286
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:288
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
static int list_length(const List *l)
Definition: pg_list.h:169
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:306
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:284

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1551 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

1552 {
1553  double usage;
1554 
1555  /* Advance the queue tail so we don't report a too-large result */
1557 
1558  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1559  usage = asyncQueueUsage();
1560  LWLockRelease(AsyncQueueLock);
1561 
1562  PG_RETURN_FLOAT8(usage);
1563 }
static void usage(void)
Definition: pg_standby.c:591
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:356
static double asyncQueueUsage(void)
Definition: async.c:1571
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static void asyncQueueAdvanceTail(void)
Definition: async.c:2166
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 581 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

582 {
583  const char *channel;
584  const char *payload;
585 
586  if (PG_ARGISNULL(0))
587  channel = "";
588  else
589  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
590 
591  if (PG_ARGISNULL(1))
592  payload = "";
593  else
594  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
595 
596  /* For NOTIFY as a statement, this is checked in ProcessUtility */
598 
599  Async_Notify(channel, payload);
600 
601  PG_RETURN_VOID();
602 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:303
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:275
#define PG_RETURN_VOID()
Definition: fmgr.h:339
#define PG_ARGISNULL(n)
Definition: fmgr.h:204
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:615
char * text_to_cstring(const text *t)
Definition: varlena.c:204

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 885 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

886 {
887  ListCell *p;
888 
890  return; /* no relevant statements in this xact */
891 
892  if (Trace_notify)
893  elog(DEBUG1, "PreCommit_Notify");
894 
895  /* Preflight for any pending listen/unlisten actions */
896  if (pendingActions != NULL)
897  {
898  foreach(p, pendingActions->actions)
899  {
900  ListenAction *actrec = (ListenAction *) lfirst(p);
901 
902  switch (actrec->action)
903  {
904  case LISTEN_LISTEN:
906  break;
907  case LISTEN_UNLISTEN:
908  /* there is no Exec_UnlistenPreCommit() */
909  break;
910  case LISTEN_UNLISTEN_ALL:
911  /* there is no Exec_UnlistenAllPreCommit() */
912  break;
913  }
914  }
915  }
916 
917  /* Queue any pending notifies (must happen after the above) */
918  if (pendingNotifies)
919  {
920  ListCell *nextNotify;
921 
922  /*
923  * Make sure that we have an XID assigned to the current transaction.
924  * GetCurrentTransactionId is cheap if we already have an XID, but not
925  * so cheap if we don't, and we'd prefer not to do that work while
926  * holding AsyncQueueLock.
927  */
928  (void) GetCurrentTransactionId();
929 
930  /*
931  * Serialize writers by acquiring a special lock that we hold till
932  * after commit. This ensures that queue entries appear in commit
933  * order, and in particular that there are never uncommitted queue
934  * entries ahead of committed ones, so an uncommitted transaction
935  * can't block delivery of deliverable notifications.
936  *
937  * We use a heavyweight lock so that it'll automatically be released
938  * after either commit or abort. This also allows deadlocks to be
939  * detected, though really a deadlock shouldn't be possible here.
940  *
941  * The lock is on "database 0", which is pretty ugly but it doesn't
942  * seem worth inventing a special locktag category just for this.
943  * (Historical note: before PG 9.0, a similar lock on "database 0" was
944  * used by the flatfiles mechanism.)
945  */
946  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
948 
949  /* Now push the notifications into the queue */
951 
952  nextNotify = list_head(pendingNotifies->events);
953  while (nextNotify != NULL)
954  {
955  /*
956  * Add the pending notifications to the queue. We acquire and
957  * release AsyncQueueLock once per page, which might be overkill
958  * but it does allow readers to get in while we're doing this.
959  *
960  * A full queue is very uncommon and should really not happen,
961  * given that we have so much space available in the SLRU pages.
962  * Nevertheless we need to deal with this possibility. Note that
963  * when we get here we are in the process of committing our
964  * transaction, but we have not yet committed to clog, so at this
965  * point in time we can still roll the transaction back.
966  */
967  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
969  if (asyncQueueIsFull())
970  ereport(ERROR,
971  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
972  errmsg("too many notifications in the NOTIFY queue")));
973  nextNotify = asyncQueueAddEntries(nextNotify);
974  LWLockRelease(AsyncQueueLock);
975  }
976  }
977 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:394
static ActionList * pendingActions
Definition: async.c:354
int errcode(int sqlerrcode)
Definition: elog.c:608
static bool asyncQueueIsFull(void)
Definition: async.c:1347
static void asyncQueueFillWarning(void)
Definition: async.c:1600
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
static bool backendHasSentNotifications
Definition: async.c:424
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
List * actions
Definition: async.c:350
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
#define ereport(elevel, rest)
Definition: elog.h:141
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1449
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:1037
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:430
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
ListenActionKind action
Definition: async.c:343

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1223 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1224 {
1225  MemoryContext caller_context;
1226 
1227  /* Nothing to do if we didn't send any notifications */
1229  return;
1230 
1231  /*
1232  * We reset the flag immediately; otherwise, if any sort of error occurs
1233  * below, we'd be locked up in an infinite loop, because control will come
1234  * right back here after error cleanup.
1235  */
1237 
1238  /*
1239  * We must preserve the caller's memory context (probably MessageContext)
1240  * across the transaction we do here.
1241  */
1242  caller_context = CurrentMemoryContext;
1243 
1244  if (Trace_notify)
1245  elog(DEBUG1, "ProcessCompletedNotifies");
1246 
1247  /*
1248  * We must run asyncQueueReadAllNotifications inside a transaction, else
1249  * bad things happen if it gets an error.
1250  */
1252 
1253  /* Send signals to other backends */
1254  SignalBackends();
1255 
1256  if (listenChannels != NIL)
1257  {
1258  /* Read the queue ourselves, and send relevant stuff to the frontend */
1260  }
1261 
1262  /*
1263  * If it's time to try to advance the global tail pointer, do that.
1264  */
1266  {
1267  backendTryAdvanceTail = false;
1269  }
1270 
1272 
1273  MemoryContextSwitchTo(caller_context);
1274 
1275  /* We don't need pq_flush() here since postgres.c will do one shortly */
1276 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2898
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:322
static bool backendHasSentNotifications
Definition: async.c:424
static bool backendTryAdvanceTail
Definition: async.c:427
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2166
static void SignalBackends(void)
Definition: async.c:1653
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1911
void StartTransactionCommand(void)
Definition: xact.c:2797
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( void  )
static

Definition at line 2216 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2217 {
2218  /* We *must* reset the flag */
2219  notifyInterruptPending = false;
2220 
2221  /* Do nothing else if we aren't actively listening */
2222  if (listenChannels == NIL)
2223  return;
2224 
2225  if (Trace_notify)
2226  elog(DEBUG1, "ProcessIncomingNotify");
2227 
2228  set_ps_display("notify interrupt", false);
2229 
2230  /*
2231  * We must run asyncQueueReadAllNotifications inside a transaction, else
2232  * bad things happen if it gets an error.
2233  */
2235 
2237 
2239 
2240  /*
2241  * Must flush the notify messages to ensure frontend gets them promptly.
2242  */
2243  pq_flush();
2244 
2245  set_ps_display("idle", false);
2246 
2247  if (Trace_notify)
2248  elog(DEBUG1, "ProcessIncomingNotify: done");
2249 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:39
void CommitTransactionCommand(void)
Definition: xact.c:2898
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
static List * listenChannels
Definition: async.c:322
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1911
void StartTransactionCommand(void)
Definition: xact.c:2797
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1895 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1896 {
1898  return; /* not really idle */
1899 
1900  while (notifyInterruptPending)
1902 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
static void ProcessIncomingNotify(void)
Definition: async.c:2216
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 714 of file async.c.

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, offsetof, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

715 {
716  MemoryContext oldcontext;
717  ListenAction *actrec;
718  int my_level = GetCurrentTransactionNestLevel();
719 
720  /*
721  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
722  * be too complicated to ensure we get the right interactions of
723  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
724  * would be any performance benefit anyway in sane applications.
725  */
727 
728  /* space for the terminating null is provided by the "+ 1" below */
729  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
730  strlen(channel) + 1);
731  actrec->action = action;
732  strcpy(actrec->channel, channel);
733 
734  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
735  {
736  ActionList *actions;
737 
738  /*
739  * First action in current sub(xact). Note that we allocate the
740  * ActionList in TopTransactionContext; the nestingLevel might get
741  * changed later by AtSubCommit_Notify.
742  */
743  actions = (ActionList *)
745  actions->nestingLevel = my_level;
746  actions->actions = list_make1(actrec);
747  actions->upper = pendingActions;
748  pendingActions = actions;
749  }
750  else
752 
753  MemoryContextSwitchTo(oldcontext);
754 }
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static ActionList * pendingActions
Definition: async.c:354
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define list_make1(x1)
Definition: pg_list.h:227
List * actions
Definition: async.c:350
List * lappend(List *list, void *datum)
Definition: list.c:322
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
void * palloc(Size size)
Definition: mcxt.c:949
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343
#define offsetof(type, field)
Definition: c.h:662

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1653 of file async.c.

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by ProcessCompletedNotifies().

1654 {
1655  int32 *pids;
1656  BackendId *ids;
1657  int count;
1658 
1659  /*
1660  * Identify backends that we need to signal. We don't want to send
1661  * signals while holding the AsyncQueueLock, so this loop just builds a
1662  * list of target PIDs.
1663  *
1664  * XXX in principle these pallocs could fail, which would be bad. Maybe
1665  * preallocate the arrays? But in practice this is only run in trivial
1666  * transactions, so there should surely be space available.
1667  */
1668  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1669  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1670  count = 0;
1671 
1672  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1674  {
1675  int32 pid = QUEUE_BACKEND_PID(i);
1676  QueuePosition pos;
1677 
1678  Assert(pid != InvalidPid);
1679  if (pid == MyProcPid)
1680  continue; /* never signal self */
1681  pos = QUEUE_BACKEND_POS(i);
1683  {
1684  /*
1685  * Always signal listeners in our own database, unless they're
1686  * already caught up (unlikely, but possible).
1687  */
1688  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1689  continue;
1690  }
1691  else
1692  {
1693  /*
1694  * Listeners in other databases should be signaled only if they
1695  * are far behind.
1696  */
1699  continue;
1700  }
1701  /* OK, need to signal this one */
1702  pids[count] = pid;
1703  ids[count] = i;
1704  count++;
1705  }
1706  LWLockRelease(AsyncQueueLock);
1707 
1708  /* Now send signals */
1709  for (int i = 0; i < count; i++)
1710  {
1711  int32 pid = pids[i];
1712 
1713  /*
1714  * Note: assuming things aren't broken, a signal failure here could
1715  * only occur if the target backend exited since we released
1716  * AsyncQueueLock; which is unlikely but certainly possible. So we
1717  * just log a low-level debug message if it happens.
1718  */
1719  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1720  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1721  }
1722 
1723  pfree(pids);
1724  pfree(ids);
1725 }
int MyProcPid
Definition: globals.c:40
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:281
signed int int32
Definition: c.h:347
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:468
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:179
void pfree(void *pointer)
Definition: mcxt.c:1056
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
int MaxBackends
Definition: globals.c:135
#define QUEUE_CLEANUP_DELAY
Definition: async.c:228
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:739
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
void * palloc(Size size)
Definition: mcxt.c:949
#define elog(elevel,...)
Definition: elog.h:228
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ AsyncCtlData

SlruCtlData AsyncCtlData
static

Definition at line 292 of file async.c.

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 279 of file async.c.

◆ backendHasSentNotifications

bool backendHasSentNotifications = false
static

Definition at line 424 of file async.c.

Referenced by PreCommit_Notify(), and ProcessCompletedNotifies().

◆ backendTryAdvanceTail

bool backendTryAdvanceTail = false
static

Definition at line 427 of file async.c.

Referenced by asyncQueueAddEntries(), and ProcessCompletedNotifies().

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 322 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

ActionList* pendingActions = NULL
static

Definition at line 354 of file async.c.

Referenced by AtSubAbort_Notify(), AtSubCommit_Notify(), and queue_listen().

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 406 of file async.c.

Referenced by Async_Notify(), AtSubAbort_Notify(), and AtSubCommit_Notify().

◆ Trace_notify

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 418 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().