PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define AsyncCtl   (&AsyncCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (void)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void ProcessCompletedNotifies (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData AsyncCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool backendHasSentNotifications = false
 
static bool backendTryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncCtl

#define AsyncCtl   (&AsyncCtlData)

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 182 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 399 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 156 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 228 of file async.c.

Referenced by asyncQueueAddEntries(), and SignalBackends().

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 296 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 315 of file async.c.

Referenced by asyncQueueAdvance(), asyncQueueIsFull(), asyncQueuePageDiff(), and asyncQueueUsage().

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489

Definition at line 212 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489

Definition at line 206 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 180 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 196 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

typedef struct AsyncQueueControl AsyncQueueControl

◆ AsyncQueueEntry

typedef struct AsyncQueueEntry AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

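◆ NotificationHash

typedef struct NotificationHash NotificationHash

◆ NotificationList

typedef struct NotificationList NotificationList

◆ QueueBackendStatus

typedef struct QueueBackendStatus QueueBackendStatus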

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 334 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification *  n)
static

Definition at line 2312 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MemSet, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

2313 {
2314  Assert(pendingNotifies->events != NIL);
2315 
2316  /* Create the hash table if it's time to */
2317  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2318  pendingNotifies->hashtab == NULL)
2319  {
2320  HASHCTL hash_ctl;
2321  ListCell *l;
2322 
2323  /* Create the hash table */
2324  MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2325  hash_ctl.keysize = sizeof(Notification *);
2326  hash_ctl.entrysize = sizeof(NotificationHash);
2327  hash_ctl.hash = notification_hash;
2328  hash_ctl.match = notification_match;
2329  hash_ctl.hcxt = CurTransactionContext;
2330  pendingNotifies->hashtab =
2331  hash_create("Pending Notifies",
2332  256L,
2333  &hash_ctl,
2334  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2335 
2336  /* Insert all the already-existing events */
2337  foreach(l, pendingNotifies->events)
2338  {
2339  Notification *oldn = (Notification *) lfirst(l);
2340  NotificationHash *hentry;
2341  bool found;
2342 
2343  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2344  &oldn,
2345  HASH_ENTER,
2346  &found);
2347  Assert(!found);
2348  hentry->event = oldn;
2349  }
2350  }
2351 
2352  /* Add new event to the list, in order */
2353  pendingNotifies->events = lappend(pendingNotifies->events, n);
2354 
2355  /* Add event to the hash table if needed */
2356  if (pendingNotifies->hashtab != NULL)
2357  {
2358  NotificationHash *hentry;
2359  bool found;
2360 
2361  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2362  &n,
2363  HASH_ENTER,
2364  &found);
2365  Assert(!found);
2366  hentry->event = n;
2367  }
2368 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:394
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define MemSet(start, val, len)
Definition: c.h:955
Notification * event
Definition: async.c:403
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static NotificationList * pendingNotifies
Definition: async.c:406
List * lappend(List *list, void *datum)
Definition: list.c:322
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:399
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
HashCompareFunc match
Definition: hsearch.h:75
HTAB * hashtab
Definition: async.c:395
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2390
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2376
#define HASH_COMPARE
Definition: hsearch.h:90
static int list_length(const List *l)
Definition: pg_list.h:169
HashValueFunc hash
Definition: hsearch.h:74
#define HASH_FUNCTION
Definition: hsearch.h:89

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 762 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

763 {
764  if (Trace_notify)
765  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
766 
767  queue_listen(LISTEN_LISTEN, channel);
768 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 615 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

616 {
617  int my_level = GetCurrentTransactionNestLevel();
618  size_t channel_len;
619  size_t payload_len;
620  Notification *n;
621  MemoryContext oldcontext;
622 
623  if (IsParallelWorker())
624  elog(ERROR, "cannot send notifications from a parallel worker");
625 
626  if (Trace_notify)
627  elog(DEBUG1, "Async_Notify(%s)", channel);
628 
629  channel_len = channel ? strlen(channel) : 0;
630  payload_len = payload ? strlen(payload) : 0;
631 
632  /* a channel name must be specified */
633  if (channel_len == 0)
634  ereport(ERROR,
635  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636  errmsg("channel name cannot be empty")));
637 
638  /* enforce length limits */
639  if (channel_len >= NAMEDATALEN)
640  ereport(ERROR,
641  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
642  errmsg("channel name too long")));
643 
644  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
645  ereport(ERROR,
646  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
647  errmsg("payload string too long")));
648 
649  /*
650  * We must construct the Notification entry, even if we end up not using
651  * it, in order to compare it cheaply to existing list entries.
652  *
653  * The notification list needs to live until end of transaction, so store
654  * it in the transaction context.
655  */
656  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
657 
658  n = (Notification *) palloc(offsetof(Notification, data) +
659  channel_len + payload_len + 2);
660  n->channel_len = channel_len;
661  n->payload_len = payload_len;
662  strcpy(n->data, channel);
663  if (payload)
664  strcpy(n->data + channel_len + 1, payload);
665  else
666  n->data[channel_len + 1] = '\0';
667 
668  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
669  {
670  NotificationList *notifies;
671 
672  /*
673  * First notify event in current (sub)xact. Note that we allocate the
674  * NotificationList in TopTransactionContext; the nestingLevel might
675  * get changed later by AtSubCommit_Notify.
676  */
677  notifies = (NotificationList *)
678  MemoryContextAlloc(TopTransactionContext,
679  sizeof(NotificationList));
680  notifies->nestingLevel = my_level;
681  notifies->events = list_make1(n);
682  /* We certainly don't need a hashtable yet */
683  notifies->hashtab = NULL;
684  notifies->upper = pendingNotifies;
685  pendingNotifies = notifies;
686  }
687  else
688  {
689  /* Now check for duplicates */
690  if (AsyncExistsPendingNotify(n))
691  {
692  /* It's a dup, so forget it */
693  pfree(n);
694  MemoryContextSwitchTo(oldcontext);
695  return;
696  }
697 
698  /* Append more events to existing list */
699  AddEventToPendingNotifies(n);
700  }
701 
702  MemoryContextSwitchTo(oldcontext);
703 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:394
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:570
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2271
int nestingLevel
Definition: async.c:393
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2312
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:396
#define ereport(elevel, rest)
Definition: elog.h:141
#define IsParallelWorker()
Definition: parallel.h:60
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
HTAB * hashtab
Definition: async.c:395
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386
bool Trace_notify
Definition: async.c:430
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:784
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:226
#define offsetof(type, field)
Definition: c.h:655

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 776 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

777 {
778  if (Trace_notify)
779  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
780 
781  /* If we couldn't possibly be listening, no need to queue anything */
782  if (pendingActions == NULL && !unlistenExitRegistered)
783  return;
784 
785  queue_listen(LISTEN_UNLISTEN, channel);
786 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 794 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

795 {
796  if (Trace_notify)
797  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
798 
799  /* If we couldn't possibly be listening, no need to queue anything */
800  if (pendingActions == NULL && !unlistenExitRegistered)
801  return;
802 
803  queue_listen(LISTEN_UNLISTEN_ALL, "");
804 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 847 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

848 {
849  Exec_UnlistenAllCommit();
850  asyncQueueUnregister();
851 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1192
static void asyncQueueUnregister(void)
Definition: async.c:1308

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification *  n)
static

Definition at line 2271 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2272 {
2273  if (pendingNotifies == NULL)
2274  return false;
2275 
2276  if (pendingNotifies->hashtab != NULL)
2277  {
2278  /* Use the hash table to probe for a match */
2279  if (hash_search(pendingNotifies->hashtab,
2280  &n,
2281  HASH_FIND,
2282  NULL))
2283  return true;
2284  }
2285  else
2286  {
2287  /* Must scan the event list */
2288  ListCell *l;
2289 
2290  foreach(l, pendingNotifies->events)
2291  {
2292  Notification *oldn = (Notification *) lfirst(l);
2293 
2294  if (n->channel_len == oldn->channel_len &&
2295  n->payload_len == oldn->payload_len &&
2296  memcmp(n->data, oldn->data,
2297  n->channel_len + n->payload_len + 2) == 0)
2298  return true;
2299  }
2300  }
2301 
2302  return false;
2303 }
List * events
Definition: async.c:394
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static NotificationList * pendingNotifies
Definition: async.c:406
HTAB * hashtab
Definition: async.c:395
#define lfirst(lc)
Definition: pg_list.h:190
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell *  nextNotify)
static

Definition at line 1451 of file async.c.

References AsyncCtl, asyncQueueAdvance(), asyncQueueNotificationToEntry(), backendTryAdvanceTail, AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), and SimpleLruZeroPage().

Referenced by PreCommit_Notify().

1452 {
1453  AsyncQueueEntry qe;
1454  QueuePosition queue_head;
1455  int pageno;
1456  int offset;
1457  int slotno;
1458 
1459  /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
1460  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
1461 
1462  /*
1463  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1464  * memory upon exiting. The reason for this is that if we have to advance
1465  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1466  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1467  * subsequent insertions would try to put entries into a page that slru.c
1468  * thinks doesn't exist yet.) So, use a local position variable. Note
1469  * that if we do fail, any already-inserted queue entries are forgotten;
1470  * this is okay, since they'd be useless anyway after our transaction
1471  * rolls back.
1472  */
1473  queue_head = QUEUE_HEAD;
1474 
1475  /* Fetch the current page */
1476  pageno = QUEUE_POS_PAGE(queue_head);
1477  slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
1478  /* Note we mark the page dirty before writing in it */
1479  AsyncCtl->shared->page_dirty[slotno] = true;
1480 
1481  while (nextNotify != NULL)
1482  {
1483  Notification *n = (Notification *) lfirst(nextNotify);
1484 
1485  /* Construct a valid queue entry in local variable qe */
1486  asyncQueueNotificationToEntry(n, &qe);
1487 
1488  offset = QUEUE_POS_OFFSET(queue_head);
1489 
1490  /* Check whether the entry really fits on the current page */
1491  if (offset + qe.length <= QUEUE_PAGESIZE)
1492  {
1493  /* OK, so advance nextNotify past this item */
1494  nextNotify = lnext(pendingNotifies->events, nextNotify);
1495  }
1496  else
1497  {
1498  /*
1499  * Write a dummy entry to fill up the page. Actually readers will
1500  * only check dboid and since it won't match any reader's database
1501  * OID, they will ignore this entry and move on.
1502  */
1503  qe.length = QUEUE_PAGESIZE - offset;
1504  qe.dboid = InvalidOid;
1505  qe.data[0] = '\0'; /* empty channel */
1506  qe.data[1] = '\0'; /* empty payload */
1507  }
1508 
1509  /* Now copy qe into the shared buffer page */
1510  memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
1511  &qe,
1512  qe.length);
1513 
1514  /* Advance queue_head appropriately, and detect if page is full */
1515  if (asyncQueueAdvance(&(queue_head), qe.length))
1516  {
1517  /*
1518  * Page is full, so we're done here, but first fill the next page
1519  * with zeroes. The reason to do this is to ensure that slru.c's
1520  * idea of the head page is always the same as ours, which avoids
1521  * boundary problems in SimpleLruTruncate. The test in
1522  * asyncQueueIsFull() ensured that there is room to create this
1523  * page without overrunning the queue.
1524  */
1525  slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
1526 
1527  /*
1528  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1529  * set flag to remember that we should try to advance the tail
1530  * pointer (we don't want to actually do that right here).
1531  */
1532  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1533  backendTryAdvanceTail = true;
1534 
1535  /* And exit the loop */
1536  break;
1537  }
1538  }
1539 
1540  /* Success, so update the global QUEUE_HEAD */
1541  QUEUE_HEAD = queue_head;
1542 
1543  LWLockRelease(AsyncCtlLock);
1544 
1545  return nextNotify;
1546 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
List * events
Definition: async.c:394
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:321
#define AsyncCtl
Definition: async.c:294
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1415
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1380
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static NotificationList * pendingNotifies
Definition: async.c:406
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:375
static bool backendTryAdvanceTail
Definition: async.c:427
#define QUEUE_PAGESIZE
Definition: async.c:295
#define InvalidTransactionId
Definition: transam.h:31
#define QUEUE_CLEANUP_DELAY
Definition: async.c:228
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:190
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:263
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition *  position,
int  entryLength 
)
static

Definition at line 1380 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1381 {
1382  int pageno = QUEUE_POS_PAGE(*position);
1383  int offset = QUEUE_POS_OFFSET(*position);
1384  bool pageJump = false;
1385 
1386  /*
1387  * Move to the next writing position: First jump over what we have just
1388  * written or read.
1389  */
1390  offset += entryLength;
1391  Assert(offset <= QUEUE_PAGESIZE);
1392 
1393  /*
1394  * In a second step check if another entry can possibly be written to the
1395  * page. If so, stay here, we have reached the next position. If not, then
1396  * we need to move on to the next page.
1397  */
1398  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1399  {
1400  pageno++;
1401  if (pageno > QUEUE_MAX_PAGE)
1402  pageno = 0; /* wrap around */
1403  offset = 0;
1404  pageJump = true;
1405  }
1406 
1407  SET_QUEUE_POS(*position, pageno, offset);
1408  return pageJump;
1409 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
#define AsyncQueueEntryEmptySize
Definition: async.c:182
#define QUEUE_PAGESIZE
Definition: async.c:295
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
#define QUEUEALIGN(len)
Definition: async.c:180
#define Assert(condition)
Definition: c.h:732
#define QUEUE_MAX_PAGE
Definition: async.c:315
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2157 of file async.c.

References Assert, AsyncCtl, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by ProcessCompletedNotifies().

2158 {
2159  QueuePosition min;
2160  int oldtailpage;
2161  int newtailpage;
2162  int boundary;
2163 
2164  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
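2165  min = QUEUE_HEAD;
2166  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2167  {
2168  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2169  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));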
2170  }
2171  oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
2172  QUEUE_TAIL = min;
2173  LWLockRelease(AsyncQueueLock);
2174 
2175  /*
2176  * We can truncate something if the global tail advanced across an SLRU
2177  * segment boundary.
2178  *
2179  * XXX it might be better to truncate only once every several segments, to
2180  * reduce the number of directory scans.
2181  */
2182  newtailpage = QUEUE_POS_PAGE(min);
2183  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2184  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2185  {
2186  /*
2187  * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
2188  * the lock again.
2189  */
2190  SimpleLruTruncate(AsyncCtl, newtailpage);
2191  }
2192 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1185
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_MIN(x, y)
Definition: async.c:206
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1599 of file async.c.

References Assert, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

1600 {
1601  double fillDegree;
1602  TimestampTz t;
1603 
1604  fillDegree = asyncQueueUsage();
1605  if (fillDegree < 0.5)
1606  return;
1607 
1608  t = GetCurrentTimestamp();
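1609 
1610  if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
1611  t, QUEUE_FULL_WARN_INTERVAL))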
1612  {
1613  QueuePosition min = QUEUE_HEAD;
1614  int32 minPid = InvalidPid;
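1615 
1616  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1617  {
1618  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
1619  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));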
1620  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1621  minPid = QUEUE_BACKEND_PID(i);
1622  }
1623 
1624  ereport(WARNING,
1625  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1626  (minPid != InvalidPid ?
1627  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1628  : 0),
1629  (minPid != InvalidPid ?
1630  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1631  : 0)));
1632 
1633  asyncQueueControl->lastQueueFillWarn = t;
1634  }
1635 }
int errhint(const char *fmt,...)
Definition: elog.c:974
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1570
#define QUEUE_HEAD
Definition: async.c:281
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
signed int int32
Definition: c.h:346
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
int errdetail(const char *fmt,...)
Definition: elog.c:860
static AsyncQueueControl * asyncQueueControl
Definition: async.c:279
#define ereport(elevel, rest)
Definition: elog.h:141
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:296
#define WARNING
Definition: elog.h:40
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:732
int errmsg(const char *fmt,...)
Definition: elog.c:784
int i
TimestampTz lastQueueFillWarn
Definition: async.c:274
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MIN(x, y)
Definition: async.c:206
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1349 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_TAIL, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

1350 {
1351  int nexthead;
1352  int boundary;
1353 
1354  /*
1355  * The queue is full if creating a new head page would create a page that
1356  * logically precedes the current global tail pointer, ie, the head
1357  * pointer would wrap around compared to the tail. We cannot create such
1358  * a head page for fear of confusing slru.c. For safety we round the tail
1359  * pointer back to a segment boundary (compare the truncation logic in
1360  * asyncQueueAdvanceTail).
1361  *
1362  * Note that this test is *not* dependent on how much space there is on
1363  * the current head page. This is necessary because asyncQueueAddEntries
1364  * might try to create the next head page in any case.
1365  */
1366  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1367  if (nexthead > QUEUE_MAX_PAGE)
1368  nexthead = 0; /* wrap around */
1369  boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
1370  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1371  return asyncQueuePagePrecedes(nexthead, boundary);
1372 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_HEAD
Definition: async.c:281
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_MAX_PAGE
Definition: async.c:315
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1415 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

1416 {
1417  size_t channellen = n->channel_len;
1418  size_t payloadlen = n->payload_len;
1419  int entryLength;
1420 
1421  Assert(channellen < NAMEDATALEN);
1422  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1423 
1424  /* The terminators are already included in AsyncQueueEntryEmptySize */
1425  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1426  entryLength = QUEUEALIGN(entryLength);
1427  qe->length = entryLength;
1428  qe->dboid = MyDatabaseId;
1429  qe->xid = GetCurrentTransactionId();
1430  qe->srcPid = MyProcPid;
1431  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1432 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
int MyProcPid
Definition: globals.c:40
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:182
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
#define QUEUEALIGN(len)
Definition: async.c:180
Oid MyDatabaseId
Definition: globals.c:85
#define Assert(condition)
Definition: c.h:732
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 468 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

469 {
470  int diff;
471 
472  /*
473  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
474  * in the range 0..QUEUE_MAX_PAGE.
475  */
476  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
477  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
478 
479  diff = p - q;
480  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
481  diff -= QUEUE_MAX_PAGE + 1;
482  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
483  diff += QUEUE_MAX_PAGE + 1;
484  return diff;
485 }
#define Assert(condition)
Definition: c.h:732
#define QUEUE_MAX_PAGE
Definition: async.c:315

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 489 of file async.c.

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

490 {
491  return asyncQueuePageDiff(p, q) < 0;
492 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:468

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2068 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

2072 {
2073  bool reachedStop = false;
2074  bool reachedEndOfPage;
2075  AsyncQueueEntry *qe;
2076 
2077  do
2078  {
2079  QueuePosition thisentry = *current;
2080 
2081  if (QUEUE_POS_EQUAL(thisentry, stop))
2082  break;
2083 
2084  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2085 
2086  /*
2087  * Advance *current over this message, possibly to the next page. As
2088  * noted in the comments for asyncQueueReadAllNotifications, we must
2089  * do this before possibly failing while processing the message.
2090  */
2091  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2092 
2093  /* Ignore messages destined for other databases */
2094  if (qe->dboid == MyDatabaseId)
2095  {
2096  if (XidInMVCCSnapshot(qe->xid, snapshot))
2097  {
2098  /*
2099  * The source transaction is still in progress, so we can't
2100  * process this message yet. Break out of the loop, but first
2101  * back up *current so we will reprocess the message next
2102  * time. (Note: it is unlikely but not impossible for
2103  * TransactionIdDidCommit to fail, so we can't really avoid
2104  * this advance-then-back-up behavior when dealing with an
2105  * uncommitted message.)
2106  *
2107  * Note that we must test XidInMVCCSnapshot before we test
2108  * TransactionIdDidCommit, else we might return a message from
2109  * a transaction that is not yet visible to snapshots; compare
2110  * the comments at the head of heapam_visibility.c.
2111  *
2112  * Also, while our own xact won't be listed in the snapshot,
2113  * we need not check for TransactionIdIsCurrentTransactionId
2114  * because our transaction cannot (yet) have queued any
2115  * messages.
2116  */
2117  *current = thisentry;
2118  reachedStop = true;
2119  break;
2120  }
2121  else if (TransactionIdDidCommit(qe->xid))
2122  {
2123  /* qe->data is the null-terminated channel name */
2124  char *channel = qe->data;
2125 
2126  if (IsListeningOn(channel))
2127  {
2128  /* payload follows channel name */
2129  char *payload = qe->data + strlen(channel) + 1;
2130 
2131  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2132  }
2133  }
2134  else
2135  {
2136  /*
2137  * The source transaction aborted or crashed, so we just
2138  * ignore its notifications.
2139  */
2140  }
2141  }
2142 
2143  /* Loop back if we're not at end of page */
2144  } while (!reachedEndOfPage);
2145 
2146  if (QUEUE_POS_EQUAL(*current, stop))
2147  reachedStop = true;
2148 
2149  return reachedStop;
2150 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2241
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1380
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1289
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2246

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1909 of file async.c.

References Assert, AsyncCtl, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), ProcessCompletedNotifies(), and ProcessIncomingNotify().

1910 {
1911  volatile QueuePosition pos;
1912  QueuePosition oldpos;
1913  QueuePosition head;
1914  Snapshot snapshot;
1915 
1916  /* page_buffer must be adequately aligned, so use a union */
1917  union
1918  {
1919  char buf[QUEUE_PAGESIZE];
1920  AsyncQueueEntry align;
1921  } page_buffer;
1922 
1923  /* Fetch current state */
1924  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1925  /* Assert checks that we have a valid state entry */
1926  Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
1927  pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
1928  head = QUEUE_HEAD;
1929  LWLockRelease(AsyncQueueLock);
1930 
1931  if (QUEUE_POS_EQUAL(pos, head))
1932  {
1933  /* Nothing to do, we have read all notifications already. */
1934  return;
1935  }
1936 
1937  /* Get snapshot we'll use to decide which xacts are still in progress */
1938  snapshot = RegisterSnapshot(GetLatestSnapshot());
1939 
1940  /*----------
1941  * Note that we deliver everything that we see in the queue and that
1942  * matches our _current_ listening state.
1943  * Especially we do not take into account different commit times.
1944  * Consider the following example:
1945  *
1946  * Backend 1:                      Backend 2:
1947  *
1948  * transaction starts
1949  * NOTIFY foo;
1950  * commit starts
1951  *                                 transaction starts
1952  *                                 LISTEN foo;
1953  *                                 commit starts
1954  * commit to clog
1955  *                                 commit to clog
1956  *
1957  * It could happen that backend 2 sees the notification from backend 1 in
1958  * the queue. Even though the notifying transaction committed before
1959  * the listening transaction, we still deliver the notification.
1960  *
1961  * The idea is that an additional notification does not do any harm, we
1962  * just need to make sure that we do not miss a notification.
1963  *
1964  * It is possible that we fail while trying to send a message to our
1965  * frontend (for example, because of encoding conversion failure).
1966  * If that happens it is critical that we not try to send the same
1967  * message over and over again. Therefore, we place a PG_TRY block
1968  * here that will forcibly advance our backend position before we lose
1969  * control to an error. (We could alternatively retake AsyncQueueLock
1970  * and move the position before handling each individual message, but
1971  * that seems like too much lock traffic.)
1972  *----------
1973  */
1974  PG_TRY();
1975  {
1976  bool reachedStop;
1977 
1978  do
1979  {
1980  int curpage = QUEUE_POS_PAGE(pos);
1981  int curoffset = QUEUE_POS_OFFSET(pos);
1982  int slotno;
1983  int copysize;
1984 
1985  /*
1986  * We copy the data from SLRU into a local buffer, so as to avoid
1987  * holding the AsyncCtlLock while we are examining the entries and
1988  * possibly transmitting them to our frontend. Copy only the part
1989  * of the page we will actually inspect.
1990  */
1991  slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
1992  InvalidTransactionId);
1993  if (curpage == QUEUE_POS_PAGE(head))
1994  {
1995  /* we only want to read as far as head */
1996  copysize = QUEUE_POS_OFFSET(head) - curoffset;
1997  if (copysize < 0)
1998  copysize = 0; /* just for safety */
1999  }
2000  else
2001  {
2002  /* fetch all the rest of the page */
2003  copysize = QUEUE_PAGESIZE - curoffset;
2004  }
2005  memcpy(page_buffer.buf + curoffset,
2006  AsyncCtl->shared->page_buffer[slotno] + curoffset,
2007  copysize);
2008  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2009  LWLockRelease(AsyncCtlLock);
2010 
2011  /*
2012  * Process messages up to the stop position, end of page, or an
2013  * uncommitted message.
2014  *
2015  * Our stop position is what we found to be the head's position
2016  * when we entered this function. It might have changed already.
2017  * But if it has, we will receive (or have already received and
2018  * queued) another signal and come here again.
2019  *
2020  * We are not holding AsyncQueueLock here! The queue can only
2021  * extend beyond the head pointer (see above) and we leave our
2022  * backend's pointer where it is so nobody will truncate or
2023  * rewrite pages under us. Especially we don't want to hold a lock
2024  * while sending the notifications to the frontend.
2025  */
2026  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2027  page_buffer.buf,
2028  snapshot);
2029  } while (!reachedStop);
2030  }
2031  PG_CATCH();
2032  {
2033  /* Update shared state */
2034  LWLockAcquire(AsyncQueueLock, LW_SHARED);
2035  QUEUE_BACKEND_POS(MyBackendId) = pos;
2036  LWLockRelease(AsyncQueueLock);
2037 
2038  PG_RE_THROW();
2039  }
2040  PG_END_TRY();
2041 
2042  /* Update shared state */
2043  LWLockAcquire(AsyncQueueLock, LW_SHARED);
2044  QUEUE_BACKEND_POS(MyBackendId) = pos;
2045  LWLockRelease(AsyncQueueLock);
2046 
2047  /* Done with snapshot */
2048  UnregisterSnapshot(snapshot);
2049 }
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:865
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static char * buf
Definition: pg_test_fsync.c:68
#define QUEUE_PAGESIZE
Definition: async.c:295
#define InvalidTransactionId
Definition: transam.h:31
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:907
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:467
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2068
#define PG_CATCH()
Definition: elog.h:310
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define PG_RE_THROW()
Definition: elog.h:331
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
#define PG_TRY()
Definition: elog.h:301
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define PG_END_TRY()
Definition: elog.h:317
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1308 of file async.c.

References amRegisteredListener, Assert, i, InvalidBackendId, InvalidOid, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

1309 {
1310  Assert(listenChannels == NIL); /* else caller error */
1311 
1312  if (!amRegisteredListener) /* nothing to do */
1313  return;
1314 
1315  /*
1316  * Need exclusive lock here to manipulate list links.
1317  */
1318  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1319  /* Mark our entry as invalid */
1322  /* and remove it from the list */
1325  else
1326  {
1328  {
1330  {
1332  break;
1333  }
1334  }
1335  }
1337  LWLockRelease(AsyncQueueLock);
1338 
1339  /* mark ourselves as no longer listed in the global array */
1340  amRegisteredListener = false;
1341 }
#define NIL
Definition: pg_list.h:65
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
static List * listenChannels
Definition: async.c:322
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:421
int BackendId
Definition: backendid.h:21
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1570 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

1571 {
1572  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1573  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1574  int occupied;
1575 
1576  occupied = headPage - tailPage;
1577 
1578  if (occupied == 0)
1579  return (double) 0; /* fast exit for common case */
1580 
1581  if (occupied < 0)
1582  {
1583  /* head has wrapped around, tail not yet */
1584  occupied += QUEUE_MAX_PAGE + 1;
1585  }
1586 
1587  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1588 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_HEAD
Definition: async.c:281
#define QUEUE_MAX_PAGE
Definition: async.c:315
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 515 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

516 {
517  bool found;
518  int slotno;
519  Size size;
520 
521  /*
522  * Create or attach to the AsyncQueueControl structure.
523  *
524  * The used entries in the backend[] array run from 1 to MaxBackends; the
525  * zero'th entry is unused but must be allocated.
526  */
527  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
528  size = add_size(size, offsetof(AsyncQueueControl, backend));
529 
531  ShmemInitStruct("Async Queue Control", size, &found);
532 
533  if (!found)
534  {
535  /* First time through, so initialize it */
536  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
537  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
540  /* zero'th entry won't be used, but let's initialize it anyway */
541  for (int i = 0; i <= MaxBackends; i++)
542  {
547  }
548  }
549 
550  /*
551  * Set up SLRU management of the pg_notify data.
552  */
553  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
555  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
556  /* Override default assumption that writes should be fsync'd */
557  AsyncCtl->do_fsync = false;
558 
559  if (!found)
560  {
561  /*
562  * During start or reboot, clean out the pg_notify directory.
563  */
565 
566  /* Now initialize page zero to empty */
567  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
569  /* This write is just to verify that pg_notify/ is writable */
570  SimpleLruWritePage(AsyncCtl, slotno);
571  LWLockRelease(AsyncCtlLock);
572  }
573 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1370
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:135
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:578
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
static AsyncQueueControl * asyncQueueControl
Definition: async.c:279
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1393
size_t Size
Definition: c.h:466
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
TimestampTz lastQueueFillWarn
Definition: async.c:274
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:263
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define offsetof(type, field)
Definition: c.h:655
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:165

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 498 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

499 {
500  Size size;
501 
502  /* This had better match AsyncShmemInit */
503  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
504  size = add_size(size, offsetof(AsyncQueueControl, backend));
505 
507 
508  return size;
509 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:145
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1735 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1736 {
1737  /*
1738  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1739  * we have registered as a listener but have not made any entry in
1740  * listenChannels. In that case, deregister again.
1741  */
1744 
1745  /* And clean up */
1747 }
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:322
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2406
static void asyncQueueUnregister(void)
Definition: async.c:1308
static bool amRegisteredListener
Definition: async.c:421

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 987 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

988 {
989  ListCell *p;
990 
991  /*
992  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
993  * return as soon as possible
994  */
996  return;
997 
998  if (Trace_notify)
999  elog(DEBUG1, "AtCommit_Notify");
1000 
1001  /* Perform any pending listen/unlisten actions */
1002  if (pendingActions != NULL)
1003  {
1004  foreach(p, pendingActions->actions)
1005  {
1006  ListenAction *actrec = (ListenAction *) lfirst(p);
1007 
1008  switch (actrec->action)
1009  {
1010  case LISTEN_LISTEN:
1011  Exec_ListenCommit(actrec->channel);
1012  break;
1013  case LISTEN_UNLISTEN:
1014  Exec_UnlistenCommit(actrec->channel);
1015  break;
1016  case LISTEN_UNLISTEN_ALL:
1018  break;
1019  }
1020  }
1021  }
1022 
1023  /* If no longer listening to anything, get out of listener array */
1026 
1027  /* And clean up */
1029 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:354
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1192
static List * listenChannels
Definition: async.c:322
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2406
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1161
static NotificationList * pendingNotifies
Definition: async.c:406
List * actions
Definition: async.c:350
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1134
static void asyncQueueUnregister(void)
Definition: async.c:1308
static bool amRegisteredListener
Definition: async.c:421
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 860 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

861 {
862  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
864  ereport(ERROR,
865  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
866  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
867 }
static ActionList * pendingActions
Definition: async.c:354
int errcode(int sqlerrcode)
Definition: elog.c:570
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1825 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1826 {
1827  int my_level = GetCurrentTransactionNestLevel();
1828 
1829  /*
1830  * All we have to do is pop the stack --- the actions/notifies made in
1831  * this subxact are no longer interesting, and the space will be freed
1832  * when CurTransactionContext is recycled. We still have to free the
1833  * ActionList and NotificationList objects themselves, though, because
1834  * those are allocated in TopTransactionContext.
1835  *
1836  * Note that there might be no entries at all, or no entries for the
1837  * current subtransaction level, either because none were ever created,
1838  * or because we reentered this routine due to trouble during subxact
1839  * abort.
1840  */
1841  while (pendingActions != NULL &&
1842  pendingActions->nestingLevel >= my_level)
1843  {
1844  ActionList *childPendingActions = pendingActions;
1845 
1847  pfree(childPendingActions);
1848  }
1849 
1850  while (pendingNotifies != NULL &&
1851  pendingNotifies->nestingLevel >= my_level)
1852  {
1853  NotificationList *childPendingNotifies = pendingNotifies;
1854 
1856  pfree(childPendingNotifies);
1857  }
1858 }
static ActionList * pendingActions
Definition: async.c:354
int nestingLevel
Definition: async.c:393
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
struct NotificationList * upper
Definition: async.c:396
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1755 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1756 {
1757  int my_level = GetCurrentTransactionNestLevel();
1758 
1759  /* If there are actions at our nesting level, we must reparent them. */
1760  if (pendingActions != NULL &&
1761  pendingActions->nestingLevel >= my_level)
1762  {
1763  if (pendingActions->upper == NULL ||
1764  pendingActions->upper->nestingLevel < my_level - 1)
1765  {
1766  /* nothing to merge; give the whole thing to the parent */
1768  }
1769  else
1770  {
1771  ActionList *childPendingActions = pendingActions;
1772 
1774 
1775  /*
1776  * Mustn't try to eliminate duplicates here --- see queue_listen()
1777  */
1780  childPendingActions->actions);
1781  pfree(childPendingActions);
1782  }
1783  }
1784 
1785  /* If there are notifies at our nesting level, we must reparent them. */
1786  if (pendingNotifies != NULL &&
1787  pendingNotifies->nestingLevel >= my_level)
1788  {
1789  Assert(pendingNotifies->nestingLevel == my_level);
1790 
1791  if (pendingNotifies->upper == NULL ||
1792  pendingNotifies->upper->nestingLevel < my_level - 1)
1793  {
1794  /* nothing to merge; give the whole thing to the parent */
1796  }
1797  else
1798  {
1799  /*
1800  * Formerly, we didn't bother to eliminate duplicates here, but
1801  * now we must, else we fall foul of "Assert(!found)", either here
1802  * or during a later attempt to build the parent-level hashtable.
1803  */
1804  NotificationList *childPendingNotifies = pendingNotifies;
1805  ListCell *l;
1806 
1808  /* Insert all the subxact's events into parent, except for dups */
1809  foreach(l, childPendingNotifies->events)
1810  {
1811  Notification *childn = (Notification *) lfirst(l);
1812 
1813  if (!AsyncExistsPendingNotify(childn))
1814  AddEventToPendingNotifies(childn);
1815  }
1816  pfree(childPendingNotifies);
1817  }
1818  }
1819 }
List * events
Definition: async.c:394
static ActionList * pendingActions
Definition: async.c:354
List * list_concat(List *list1, const List *list2)
Definition: list.c:516
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2271
int nestingLevel
Definition: async.c:393
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
List * actions
Definition: async.c:350
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2312
struct NotificationList * upper
Definition: async.c:396
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2406 of file async.c.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2407 {
2408  /*
2409  * Everything's allocated in either TopTransactionContext or the context
2410  * for the subtransaction to which it corresponds. So, there's nothing
2411  * to do here except reset the pointers; the space will be reclaimed when
2412  * the contexts are deleted.
2413  */
2414  pendingActions = NULL;
2415  pendingNotifies = NULL;
2416 }
static ActionList * pendingActions
Definition: async.c:354
static NotificationList * pendingNotifies
Definition: async.c:406

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1134 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1135 {
1136  MemoryContext oldcontext;
1137 
1138  /* Do nothing if we are already listening on this channel */
1139  if (IsListeningOn(channel))
1140  return;
1141 
1142  /*
1143  * Add the new channel name to listenChannels.
1144  *
1145  * XXX It is theoretically possible to get an out-of-memory failure here,
1146  * which would be bad because we already committed. For the moment it
1147  * doesn't seem worth trying to guard against that, but maybe improve this
1148  * later.
1149  */
1152  MemoryContextSwitchTo(oldcontext);
1153 }
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:322
static bool IsListeningOn(const char *channel)
Definition: async.c:1289
MemoryContext TopMemoryContext
Definition: mcxt.c:44
List * lappend(List *list, void *datum)
Definition: list.c:322

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1037 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

1038 {
1039  QueuePosition head;
1040  QueuePosition max;
1041  BackendId prevListener;
1042 
1043  /*
1044  * Nothing to do if we are already listening to something, nor if we
1045  * already ran this routine in this transaction.
1046  */
1048  return;
1049 
1050  if (Trace_notify)
1051  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1052 
1053  /*
1054  * Before registering, make sure we will unlisten before dying. (Note:
1055  * this action does not get undone if we abort later.)
1056  */
1058  {
1060  unlistenExitRegistered = true;
1061  }
1062 
1063  /*
1064  * This is our first LISTEN, so establish our pointer.
1065  *
1066  * We set our pointer to the global tail pointer and then move it forward
1067  * over already-committed notifications. This ensures we cannot miss any
1068  * not-yet-committed notifications. We might get a few more but that
1069  * doesn't hurt.
1070  *
1071  * In some scenarios there might be a lot of committed notifications that
1072  * have not yet been pruned away (because some backend is being lazy about
1073  * reading them). To reduce our startup time, we can look at other
1074  * backends and adopt the maximum "pos" pointer of any backend that's in
1075  * our database; any notifications it's already advanced over are surely
1076  * committed and need not be re-examined by us. (We must consider only
1077  * backends connected to our DB, because others will not have bothered to
1078  * check committed-ness of notifications in our DB.)
1079  *
1080  * We need exclusive lock here so we can look at other backends' entries
1081  * and manipulate the list links.
1082  */
1083  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1084  head = QUEUE_HEAD;
1085  max = QUEUE_TAIL;
1086  prevListener = InvalidBackendId;
1088  {
1090  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1091  /* Also find last listening backend before this one */
1092  if (i < MyBackendId)
1093  prevListener = i;
1094  }
1098  /* Insert backend into list of listeners at correct position */
1099  if (prevListener > 0)
1100  {
1102  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1103  }
1104  else
1105  {
1108  }
1109  LWLockRelease(AsyncQueueLock);
1110 
1111  /* Now we are listed in the global array, so remember we're listening */
1112  amRegisteredListener = true;
1113 
1114  /*
1115  * Try to move our pointer forward as far as possible. This will skip over
1116  * already-committed notifications. Still, we could get notifications that
1117  * have already committed before we started to LISTEN.
1118  *
1119  * Note that we are not yet listening on anything, so we won't deliver any
1120  * notification to the frontend. Also, although our transaction might
1121  * have executed NOTIFY, those message(s) aren't queued yet so we can't
1122  * see them in the queue.
1123  */
1124  if (!QUEUE_POS_EQUAL(max, head))
1126 }
#define QUEUE_TAIL
Definition: async.c:282
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
#define QUEUE_HEAD
Definition: async.c:281
static bool unlistenExitRegistered
Definition: async.c:418
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:847
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:421
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1909
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MAX(x, y)
Definition: async.c:212

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1192 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1193 {
1194  if (Trace_notify)
1195  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1196 
1198  listenChannels = NIL;
1199 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:322
void list_free_deep(List *list)
Definition: list.c:1391
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1161 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1162 {
1163  ListCell *q;
1164 
1165  if (Trace_notify)
1166  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1167 
1168  foreach(q, listenChannels)
1169  {
1170  char *lchan = (char *) lfirst(q);
1171 
1172  if (strcmp(lchan, channel) == 0)
1173  {
1175  pfree(lchan);
1176  break;
1177  }
1178  }
1179 
1180  /*
1181  * We do not complain about unlistening something not being listened;
1182  * should we?
1183  */
1184 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:322
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:368
void pfree(void *pointer)
Definition: mcxt.c:1056
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1869 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1870 {
1871  /*
1872  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1873  * you do here.
1874  */
1875 
1876  /* signal that work needs to be done */
1877  notifyInterruptPending = true;
1878 
1879  /* make sure the event is processed in due course */
1880  SetLatch(MyLatch);
1881 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1289 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1290 {
/*
 * Report whether "channel" is present in the file-level listenChannels list.
 * The list is walked front to back and each entry is compared with strcmp(),
 * so only an exact, case-sensitive match counts. Returns true on the first
 * match, false if the list is exhausted (including when it is empty).
 */
1291  ListCell *p;
1292 
1293  foreach(p, listenChannels)
1294  {
1295  char *lchan = (char *) lfirst(p);
1296 
1297  if (strcmp(lchan, channel) == 0)
1298  return true;
1299  }
1300  return false;
1301 }
static List * listenChannels
Definition: async.c:322
#define lfirst(lc)
Definition: pg_list.h:190

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2376 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2377 {
/*
 * Hash function for a table keyed by "Notification *": "key" points at the
 * stored pointer, so it is dereferenced once to reach the Notification.
 */
2378  const Notification *k = *(const Notification *const *) key;
2379 
2380  Assert(keysize == sizeof(Notification *));
2381  /* We don't bother to include the payload's trailing null in the hash */
/*
 * The hashed span is the first channel_len + payload_len + 1 bytes of
 * data[], one byte fewer than notification_match() compares.
 */
2382  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2383  k->channel_len + k->payload_len + 1));
2384 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.c:148
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
#define Assert(condition)
Definition: c.h:732
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2390 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2391 {
/*
 * Equality function for a table keyed by "Notification *". Both keys point
 * at stored pointers and are dereferenced once. Returns 0 when the two
 * notifications are equal and 1 otherwise (memcmp-style: zero means match).
 */
2392  const Notification *k1 = *(const Notification *const *) key1;
2393  const Notification *k2 = *(const Notification *const *) key2;
2394 
2395  Assert(keysize == sizeof(Notification *));
/*
 * Compare the lengths first so the memcmp() below never runs with
 * mismatched sizes. The "+ 2" extends the comparison over two bytes beyond
 * the channel and payload text -- presumably the two terminating nulls in
 * data[] (layout not shown here; confirm against struct Notification).
 */
2396  if (k1->channel_len == k2->channel_len &&
2397  k1->payload_len == k2->payload_len &&
2398  memcmp(k1->data, k2->data,
2399  k1->channel_len + k1->payload_len + 2) == 0)
2400  return 0; /* equal */
2401  return 1; /* not equal */
2402 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
#define Assert(condition)
Definition: c.h:732
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2246 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2247 {
2249  {
2251 
2252  pq_beginmessage(&buf, 'A');
2253  pq_sendint32(&buf, srcPid);
2254  pq_sendstring(&buf, channel);
2256  pq_sendstring(&buf, payload);
2257  pq_endmessage(&buf);
2258 
2259  /*
2260  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2261  * happen at the end of the transaction, and for incoming notifies
2262  * ProcessIncomingNotify will do it after finding all the notifies.
2263  */
2264  }
2265  else
2266  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2267 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:226
CommandDest whereToSendOutput
Definition: postgres.c:90
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 814 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

815 {
/*
 * Set-returning function: each call returns one entry of listenChannels as
 * a text datum, using the SRF call counter as the list index.
 */
816  FuncCallContext *funcctx;
817 
818  /* stuff done only on the first call of the function */
819  if (SRF_IS_FIRSTCALL())
820  {
821  /* create a function context for cross-call persistence */
822  funcctx = SRF_FIRSTCALL_INIT();
/* no other per-query state is set up; call_cntr alone tracks progress */
823  }
824 
825  /* stuff done on every call of the function */
826  funcctx = SRF_PERCALL_SETUP();
827 
/* call_cntr is the index of the next channel to hand back */
828  if (funcctx->call_cntr < list_length(listenChannels))
829  {
830  char *channel = (char *) list_nth(listenChannels,
831  funcctx->call_cntr);
832 
833  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
834  }
835 
/* counter has reached the list length (or the list is empty): finished */
836  SRF_RETURN_DONE(funcctx);
837 }
uint64 call_cntr
Definition: funcapi.h:66
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:283
static List * listenChannels
Definition: async.c:322
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:287
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:289
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
static int list_length(const List *l)
Definition: pg_list.h:169
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:307
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:285

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1553 of file async.c.

References asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

1554 {
1555  double usage;
1556 
1557  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1558  usage = asyncQueueUsage();
1559  LWLockRelease(AsyncQueueLock);
1560 
1561  PG_RETURN_FLOAT8(usage);
1562 }
static void usage(void)
Definition: pg_standby.c:593
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:356
static double asyncQueueUsage(void)
Definition: async.c:1570
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 581 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

582 {
583  const char *channel;
584  const char *payload;
585 
586  if (PG_ARGISNULL(0))
587  channel = "";
588  else
589  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
590 
591  if (PG_ARGISNULL(1))
592  payload = "";
593  else
594  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
595 
596  /* For NOTIFY as a statement, this is checked in ProcessUtility */
598 
599  Async_Notify(channel, payload);
600 
601  PG_RETURN_VOID();
602 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:303
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:276
#define PG_RETURN_VOID()
Definition: fmgr.h:339
#define PG_ARGISNULL(n)
Definition: fmgr.h:204
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:615
char * text_to_cstring(const text *t)
Definition: varlena.c:204

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 885 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

886 {
887  ListCell *p;
888 
890  return; /* no relevant statements in this xact */
891 
892  if (Trace_notify)
893  elog(DEBUG1, "PreCommit_Notify");
894 
895  /* Preflight for any pending listen/unlisten actions */
896  if (pendingActions != NULL)
897  {
898  foreach(p, pendingActions->actions)
899  {
900  ListenAction *actrec = (ListenAction *) lfirst(p);
901 
902  switch (actrec->action)
903  {
904  case LISTEN_LISTEN:
906  break;
907  case LISTEN_UNLISTEN:
908  /* there is no Exec_UnlistenPreCommit() */
909  break;
910  case LISTEN_UNLISTEN_ALL:
911  /* there is no Exec_UnlistenAllPreCommit() */
912  break;
913  }
914  }
915  }
916 
917  /* Queue any pending notifies (must happen after the above) */
918  if (pendingNotifies)
919  {
920  ListCell *nextNotify;
921 
922  /*
923  * Make sure that we have an XID assigned to the current transaction.
924  * GetCurrentTransactionId is cheap if we already have an XID, but not
925  * so cheap if we don't, and we'd prefer not to do that work while
926  * holding AsyncQueueLock.
927  */
928  (void) GetCurrentTransactionId();
929 
930  /*
931  * Serialize writers by acquiring a special lock that we hold till
932  * after commit. This ensures that queue entries appear in commit
933  * order, and in particular that there are never uncommitted queue
934  * entries ahead of committed ones, so an uncommitted transaction
935  * can't block delivery of deliverable notifications.
936  *
937  * We use a heavyweight lock so that it'll automatically be released
938  * after either commit or abort. This also allows deadlocks to be
939  * detected, though really a deadlock shouldn't be possible here.
940  *
941  * The lock is on "database 0", which is pretty ugly but it doesn't
942  * seem worth inventing a special locktag category just for this.
943  * (Historical note: before PG 9.0, a similar lock on "database 0" was
944  * used by the flatfiles mechanism.)
945  */
946  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
948 
949  /* Now push the notifications into the queue */
951 
952  nextNotify = list_head(pendingNotifies->events);
953  while (nextNotify != NULL)
954  {
955  /*
956  * Add the pending notifications to the queue. We acquire and
957  * release AsyncQueueLock once per page, which might be overkill
958  * but it does allow readers to get in while we're doing this.
959  *
960  * A full queue is very uncommon and should really not happen,
961  * given that we have so much space available in the SLRU pages.
962  * Nevertheless we need to deal with this possibility. Note that
963  * when we get here we are in the process of committing our
964  * transaction, but we have not yet committed to clog, so at this
965  * point in time we can still roll the transaction back.
966  */
967  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
969  if (asyncQueueIsFull())
970  ereport(ERROR,
971  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
972  errmsg("too many notifications in the NOTIFY queue")));
973  nextNotify = asyncQueueAddEntries(nextNotify);
974  LWLockRelease(AsyncQueueLock);
975  }
976  }
977 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:394
static ActionList * pendingActions
Definition: async.c:354
int errcode(int sqlerrcode)
Definition: elog.c:570
static bool asyncQueueIsFull(void)
Definition: async.c:1349
static void asyncQueueFillWarning(void)
Definition: async.c:1599
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
static bool backendHasSentNotifications
Definition: async.c:424
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
List * actions
Definition: async.c:350
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
#define ereport(elevel, rest)
Definition: elog.h:141
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1451
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:1037
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:430
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
ListenActionKind action
Definition: async.c:343

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1225 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1226 {
1227  MemoryContext caller_context;
1228 
1229  /* Nothing to do if we didn't send any notifications */
1231  return;
1232 
1233  /*
1234  * We reset the flag immediately; otherwise, if any sort of error occurs
1235  * below, we'd be locked up in an infinite loop, because control will come
1236  * right back here after error cleanup.
1237  */
1239 
1240  /*
1241  * We must preserve the caller's memory context (probably MessageContext)
1242  * across the transaction we do here.
1243  */
1244  caller_context = CurrentMemoryContext;
1245 
1246  if (Trace_notify)
1247  elog(DEBUG1, "ProcessCompletedNotifies");
1248 
1249  /*
1250  * We must run asyncQueueReadAllNotifications inside a transaction, else
1251  * bad things happen if it gets an error.
1252  */
1254 
1255  /* Send signals to other backends */
1256  SignalBackends();
1257 
1258  if (listenChannels != NIL)
1259  {
1260  /* Read the queue ourselves, and send relevant stuff to the frontend */
1262  }
1263 
1264  /*
1265  * If it's time to try to advance the global tail pointer, do that.
1266  */
1268  {
1269  backendTryAdvanceTail = false;
1271  }
1272 
1274 
1275  MemoryContextSwitchTo(caller_context);
1276 
1277  /* We don't need pq_flush() here since postgres.c will do one shortly */
1278 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2895
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:322
static bool backendHasSentNotifications
Definition: async.c:424
static bool backendTryAdvanceTail
Definition: async.c:427
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2157
static void SignalBackends(void)
Definition: async.c:1652
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1909
void StartTransactionCommand(void)
Definition: xact.c:2794
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( void  )
static

Definition at line 2207 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2208 {
2209  /* We *must* reset the flag */
2210  notifyInterruptPending = false;
2211 
2212  /* Do nothing else if we aren't actively listening */
2213  if (listenChannels == NIL)
2214  return;
2215 
2216  if (Trace_notify)
2217  elog(DEBUG1, "ProcessIncomingNotify");
2218 
2219  set_ps_display("notify interrupt", false);
2220 
2221  /*
2222  * We must run asyncQueueReadAllNotifications inside a transaction, else
2223  * bad things happen if it gets an error.
2224  */
2226 
2228 
2230 
2231  /*
2232  * Must flush the notify messages to ensure frontend gets them promptly.
2233  */
2234  pq_flush();
2235 
2236  set_ps_display("idle", false);
2237 
2238  if (Trace_notify)
2239  elog(DEBUG1, "ProcessIncomingNotify: done");
2240 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:39
void CommitTransactionCommand(void)
Definition: xact.c:2895
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
static List * listenChannels
Definition: async.c:322
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1909
void StartTransactionCommand(void)
Definition: xact.c:2794
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:226
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1893 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by ProcessClientReadInterrupt().

1894 {
1896  return; /* not really idle */
1897 
1898  while (notifyInterruptPending)
1900 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4651
static void ProcessIncomingNotify(void)
Definition: async.c:2207
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 714 of file async.c.

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, offsetof, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

715 {
716  MemoryContext oldcontext;
717  ListenAction *actrec;
718  int my_level = GetCurrentTransactionNestLevel();
719 
720  /*
721  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
722  * be too complicated to ensure we get the right interactions of
723  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
724  * would be any performance benefit anyway in sane applications.
725  */
727 
728  /* header up to the channel field, plus the string and its terminating null */
729  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
730  strlen(channel) + 1);
731  actrec->action = action;
732  strcpy(actrec->channel, channel);
733 
734  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
735  {
736  ActionList *actions;
737 
738  /*
739  * First action in current sub(xact). Note that we allocate the
740  * ActionList in TopTransactionContext; the nestingLevel might get
741  * changed later by AtSubCommit_Notify.
742  */
743  actions = (ActionList *)
745  actions->nestingLevel = my_level;
746  actions->actions = list_make1(actrec);
747  actions->upper = pendingActions;
748  pendingActions = actions;
749  }
750  else
752 
753  MemoryContextSwitchTo(oldcontext);
754 }
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static ActionList * pendingActions
Definition: async.c:354
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define list_make1(x1)
Definition: pg_list.h:227
List * actions
Definition: async.c:350
List * lappend(List *list, void *datum)
Definition: list.c:322
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
void * palloc(Size size)
Definition: mcxt.c:949
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343
#define offsetof(type, field)
Definition: c.h:655

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1652 of file async.c.

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by ProcessCompletedNotifies().

1653 {
1654  int32 *pids;
1655  BackendId *ids;
1656  int count;
1657 
1658  /*
1659  * Identify backends that we need to signal. We don't want to send
1660  * signals while holding the AsyncQueueLock, so this loop just builds a
1661  * list of target PIDs.
1662  *
1663  * XXX in principle these pallocs could fail, which would be bad. Maybe
1664  * preallocate the arrays? But in practice this is only run in trivial
1665  * transactions, so there should surely be space available.
1666  */
1667  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1668  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1669  count = 0;
1670 
1671  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1673  {
1674  int32 pid = QUEUE_BACKEND_PID(i);
1675  QueuePosition pos;
1676 
1677  Assert(pid != InvalidPid);
1678  if (pid == MyProcPid)
1679  continue; /* never signal self */
1680  pos = QUEUE_BACKEND_POS(i);
1682  {
1683  /*
1684  * Always signal listeners in our own database, unless they're
1685  * already caught up (unlikely, but possible).
1686  */
1687  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1688  continue;
1689  }
1690  else
1691  {
1692  /*
1693  * Listeners in other databases should be signaled only if they
1694  * are far behind.
1695  */
1698  continue;
1699  }
1700  /* OK, need to signal this one */
1701  pids[count] = pid;
1702  ids[count] = i;
1703  count++;
1704  }
1705  LWLockRelease(AsyncQueueLock);
1706 
1707  /* Now send signals */
1708  for (int i = 0; i < count; i++)
1709  {
1710  int32 pid = pids[i];
1711 
1712  /*
1713  * Note: assuming things aren't broken, a signal failure here could
1714  * only occur if the target backend exited since we released
1715  * AsyncQueueLock; which is unlikely but certainly possible. So we
1716  * just log a low-level debug message if it happens.
1717  */
1718  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1719  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1720  }
1721 
1722  pfree(pids);
1723  pfree(ids);
1724 }
int MyProcPid
Definition: globals.c:40
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:281
signed int int32
Definition: c.h:346
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:468
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
void pfree(void *pointer)
Definition: mcxt.c:1056
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
int MaxBackends
Definition: globals.c:135
#define QUEUE_CLEANUP_DELAY
Definition: async.c:228
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
void * palloc(Size size)
Definition: mcxt.c:949
#define elog(elevel,...)
Definition: elog.h:226
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ AsyncCtlData

SlruCtlData AsyncCtlData
static

Definition at line 292 of file async.c.

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 279 of file async.c.

◆ backendHasSentNotifications

bool backendHasSentNotifications = false
static

Definition at line 424 of file async.c.

Referenced by PreCommit_Notify(), and ProcessCompletedNotifies().

◆ backendTryAdvanceTail

bool backendTryAdvanceTail = false
static

Definition at line 427 of file async.c.

Referenced by asyncQueueAddEntries(), and ProcessCompletedNotifies().

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 322 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

ActionList* pendingActions = NULL
static

Definition at line 354 of file async.c.

Referenced by AtSubAbort_Notify(), AtSubCommit_Notify(), and queue_listen().

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 406 of file async.c.

Referenced by Async_Notify(), AtSubAbort_Notify(), and AtSubCommit_Notify().

◆ Trace_notify

bool Trace_notify = false

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 418 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().