PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define AsyncCtl   (&AsyncCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static bool SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (void)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void ProcessCompletedNotifies (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubStart_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControlasyncQueueControl
 
static SlruCtlData AsyncCtlData
 
static ListlistenChannels = NIL
 
static ListpendingActions = NIL
 
static ListupperPendingActions = NIL
 
static NotificationListpendingNotifies = NULL
 
static ListupperPendingNotifies = NIL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool backendHasSentNotifications = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncCtl

#define AsyncCtl   (&AsyncCtlData)

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 179 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 366 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 153 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 260 of file async.c.

Referenced by asyncQueueUnregister(), AsyncShmemInit(), and Exec_ListenPreCommit().

◆ QUEUE_BACKEND_PID

◆ QUEUE_BACKEND_POS

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 270 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
 
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:432

Definition at line 209 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:432

Definition at line 203 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

◆ QUEUE_TAIL

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 177 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 193 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 308 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 2220 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MemSet, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

2221 {
2223 
2224  /* Create the hash table if it's time to */
2226  pendingNotifies->hashtab == NULL)
2227  {
2228  HASHCTL hash_ctl;
2229  ListCell *l;
2230 
2231  /* Create the hash table */
2232  MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2233  hash_ctl.keysize = sizeof(Notification *);
2234  hash_ctl.entrysize = sizeof(NotificationHash);
2235  hash_ctl.hash = notification_hash;
2236  hash_ctl.match = notification_match;
2237  hash_ctl.hcxt = CurTransactionContext;
2239  hash_create("Pending Notifies",
2240  256L,
2241  &hash_ctl,
2243 
2244  /* Insert all the already-existing events */
2245  foreach(l, pendingNotifies->events)
2246  {
2247  Notification *oldn = (Notification *) lfirst(l);
2248  NotificationHash *hentry;
2249  bool found;
2250 
2252  &oldn,
2253  HASH_ENTER,
2254  &found);
2255  Assert(!found);
2256  hentry->event = oldn;
2257  }
2258  }
2259 
2260  /* Add new event to the list, in order */
2262 
2263  /* Add event to the hash table if needed */
2264  if (pendingNotifies->hashtab != NULL)
2265  {
2266  NotificationHash *hentry;
2267  bool found;
2268 
2270  &n,
2271  HASH_ENTER,
2272  &found);
2273  Assert(!found);
2274  hentry->event = n;
2275  }
2276 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:362
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define MemSet(start, val, len)
Definition: c.h:955
Notification * event
Definition: async.c:370
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static NotificationList * pendingNotifies
Definition: async.c:373
List * lappend(List *list, void *datum)
Definition: list.c:321
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:366
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
HashCompareFunc match
Definition: hsearch.h:75
HTAB * hashtab
Definition: async.c:363
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2298
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2284
#define HASH_COMPARE
Definition: hsearch.h:90
static int list_length(const List *l)
Definition: pg_list.h:169
HashValueFunc hash
Definition: hsearch.h:74
#define HASH_FUNCTION
Definition: hsearch.h:89

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 689 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

690 {
691  if (Trace_notify)
692  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
693 
694  queue_listen(LISTEN_LISTEN, channel);
695 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:659
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 572 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextSwitchTo(), NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pfree(), and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

573 {
574  size_t channel_len;
575  size_t payload_len;
576  Notification *n;
577  MemoryContext oldcontext;
578 
579  if (IsParallelWorker())
580  elog(ERROR, "cannot send notifications from a parallel worker");
581 
582  if (Trace_notify)
583  elog(DEBUG1, "Async_Notify(%s)", channel);
584 
585  channel_len = channel ? strlen(channel) : 0;
586  payload_len = payload ? strlen(payload) : 0;
587 
588  /* a channel name must be specified */
589  if (channel_len == 0)
590  ereport(ERROR,
591  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
592  errmsg("channel name cannot be empty")));
593 
594  /* enforce length limits */
595  if (channel_len >= NAMEDATALEN)
596  ereport(ERROR,
597  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
598  errmsg("channel name too long")));
599 
600  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
601  ereport(ERROR,
602  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
603  errmsg("payload string too long")));
604 
605  /*
606  * We must construct the Notification entry, even if we end up not using
607  * it, in order to compare it cheaply to existing list entries.
608  *
609  * The notification list needs to live until end of transaction, so store
610  * it in the transaction context.
611  */
613 
614  n = (Notification *) palloc(offsetof(Notification, data) +
615  channel_len + payload_len + 2);
616  n->channel_len = channel_len;
617  n->payload_len = payload_len;
618  strcpy(n->data, channel);
619  if (payload)
620  strcpy(n->data + channel_len + 1, payload);
621  else
622  n->data[channel_len + 1] = '\0';
623 
624  /* Now check for duplicates */
626  {
627  /* It's a dup, so forget it */
628  pfree(n);
629  MemoryContextSwitchTo(oldcontext);
630  return;
631  }
632 
633  if (pendingNotifies == NULL)
634  {
635  /* First notify event in current (sub)xact */
638  /* We certainly don't need a hashtable yet */
639  pendingNotifies->hashtab = NULL;
640  }
641  else
642  {
643  /* Append more events to existing list */
645  }
646 
647  MemoryContextSwitchTo(oldcontext);
648 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:362
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:357
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:570
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2179
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1031
static NotificationList * pendingNotifies
Definition: async.c:373
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2220
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:153
#define ereport(elevel, rest)
Definition: elog.h:141
#define IsParallelWorker()
Definition: parallel.h:60
HTAB * hashtab
Definition: async.c:363
uint16 channel_len
Definition: async.c:354
uint16 payload_len
Definition: async.c:355
bool Trace_notify
Definition: async.c:396
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define offsetof(type, field)
Definition: c.h:655

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 703 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

704 {
705  if (Trace_notify)
706  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
707 
708  /* If we couldn't possibly be listening, no need to queue anything */
710  return;
711 
712  queue_listen(LISTEN_UNLISTEN, channel);
713 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:659
static bool unlistenExitRegistered
Definition: async.c:387
static List * pendingActions
Definition: async.c:321
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 721 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

722 {
723  if (Trace_notify)
724  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
725 
726  /* If we couldn't possibly be listening, no need to queue anything */
728  return;
729 
731 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:659
static bool unlistenExitRegistered
Definition: async.c:387
static List * pendingActions
Definition: async.c:321
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 774 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

775 {
778 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1104
static void asyncQueueUnregister(void)
Definition: async.c:1222

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 2179 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2180 {
2181  if (pendingNotifies == NULL)
2182  return false;
2183 
2184  if (pendingNotifies->hashtab != NULL)
2185  {
2186  /* Use the hash table to probe for a match */
2188  &n,
2189  HASH_FIND,
2190  NULL))
2191  return true;
2192  }
2193  else
2194  {
2195  /* Must scan the event list */
2196  ListCell *l;
2197 
2198  foreach(l, pendingNotifies->events)
2199  {
2200  Notification *oldn = (Notification *) lfirst(l);
2201 
2202  if (n->channel_len == oldn->channel_len &&
2203  n->payload_len == oldn->payload_len &&
2204  memcmp(n->data, oldn->data,
2205  n->channel_len + n->payload_len + 2) == 0)
2206  return true;
2207  }
2208  }
2209 
2210  return false;
2211 }
List * events
Definition: async.c:362
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:357
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static NotificationList * pendingNotifies
Definition: async.c:373
HTAB * hashtab
Definition: async.c:363
#define lfirst(lc)
Definition: pg_list.h:190
uint16 channel_len
Definition: async.c:354
uint16 payload_len
Definition: async.c:355

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 1356 of file async.c.

References AsyncCtl, asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), and SimpleLruZeroPage().

Referenced by PreCommit_Notify().

1357 {
1358  AsyncQueueEntry qe;
1359  QueuePosition queue_head;
1360  int pageno;
1361  int offset;
1362  int slotno;
1363 
1364  /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
1365  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
1366 
1367  /*
1368  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1369  * memory upon exiting. The reason for this is that if we have to advance
1370  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1371  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1372  * subsequent insertions would try to put entries into a page that slru.c
1373  * thinks doesn't exist yet.) So, use a local position variable. Note
1374  * that if we do fail, any already-inserted queue entries are forgotten;
1375  * this is okay, since they'd be useless anyway after our transaction
1376  * rolls back.
1377  */
1378  queue_head = QUEUE_HEAD;
1379 
1380  /* Fetch the current page */
1381  pageno = QUEUE_POS_PAGE(queue_head);
1382  slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
1383  /* Note we mark the page dirty before writing in it */
1384  AsyncCtl->shared->page_dirty[slotno] = true;
1385 
1386  while (nextNotify != NULL)
1387  {
1388  Notification *n = (Notification *) lfirst(nextNotify);
1389 
1390  /* Construct a valid queue entry in local variable qe */
1392 
1393  offset = QUEUE_POS_OFFSET(queue_head);
1394 
1395  /* Check whether the entry really fits on the current page */
1396  if (offset + qe.length <= QUEUE_PAGESIZE)
1397  {
1398  /* OK, so advance nextNotify past this item */
1399  nextNotify = lnext(pendingNotifies->events, nextNotify);
1400  }
1401  else
1402  {
1403  /*
1404  * Write a dummy entry to fill up the page. Actually readers will
1405  * only check dboid and since it won't match any reader's database
1406  * OID, they will ignore this entry and move on.
1407  */
1408  qe.length = QUEUE_PAGESIZE - offset;
1409  qe.dboid = InvalidOid;
1410  qe.data[0] = '\0'; /* empty channel */
1411  qe.data[1] = '\0'; /* empty payload */
1412  }
1413 
1414  /* Now copy qe into the shared buffer page */
1415  memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
1416  &qe,
1417  qe.length);
1418 
1419  /* Advance queue_head appropriately, and detect if page is full */
1420  if (asyncQueueAdvance(&(queue_head), qe.length))
1421  {
1422  /*
1423  * Page is full, so we're done here, but first fill the next page
1424  * with zeroes. The reason to do this is to ensure that slru.c's
1425  * idea of the head page is always the same as ours, which avoids
1426  * boundary problems in SimpleLruTruncate. The test in
1427  * asyncQueueIsFull() ensured that there is room to create this
1428  * page without overrunning the queue.
1429  */
1430  slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
1431  /* And exit the loop */
1432  break;
1433  }
1434  }
1435 
1436  /* Success, so update the global QUEUE_HEAD */
1437  QUEUE_HEAD = queue_head;
1438 
1439  LWLockRelease(AsyncCtlLock);
1440 
1441  return nextNotify;
1442 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:173
List * events
Definition: async.c:362
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:321
#define AsyncCtl
Definition: async.c:268
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1320
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1285
#define QUEUE_HEAD
Definition: async.c:257
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_POS_OFFSET(x)
Definition: async.c:191
static NotificationList * pendingNotifies
Definition: async.c:373
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:375
#define QUEUE_PAGESIZE
Definition: async.c:269
#define InvalidTransactionId
Definition: transam.h:31
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:190
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:263
#define QUEUE_POS_PAGE(x)
Definition: async.c:190

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1285 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1286 {
1287  int pageno = QUEUE_POS_PAGE(*position);
1288  int offset = QUEUE_POS_OFFSET(*position);
1289  bool pageJump = false;
1290 
1291  /*
1292  * Move to the next writing position: First jump over what we have just
1293  * written or read.
1294  */
1295  offset += entryLength;
1296  Assert(offset <= QUEUE_PAGESIZE);
1297 
1298  /*
1299  * In a second step check if another entry can possibly be written to the
1300  * page. If so, stay here, we have reached the next position. If not, then
1301  * we need to move on to the next page.
1302  */
1304  {
1305  pageno++;
1306  if (pageno > QUEUE_MAX_PAGE)
1307  pageno = 0; /* wrap around */
1308  offset = 0;
1309  pageJump = true;
1310  }
1311 
1312  SET_QUEUE_POS(*position, pageno, offset);
1313  return pageJump;
1314 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:191
#define AsyncQueueEntryEmptySize
Definition: async.c:179
#define QUEUE_PAGESIZE
Definition: async.c:269
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:193
#define QUEUEALIGN(len)
Definition: async.c:177
#define Assert(condition)
Definition: c.h:732
#define QUEUE_MAX_PAGE
Definition: async.c:289
#define QUEUE_POS_PAGE(x)
Definition: async.c:190

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2064 of file async.c.

References AsyncCtl, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by asyncQueueReadAllNotifications(), asyncQueueUnregister(), and ProcessCompletedNotifies().

2065 {
2066  QueuePosition min;
2067  int i;
2068  int oldtailpage;
2069  int newtailpage;
2070  int boundary;
2071 
2072  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
2073  min = QUEUE_HEAD;
2074  for (i = 1; i <= MaxBackends; i++)
2075  {
2076  if (QUEUE_BACKEND_PID(i) != InvalidPid)
2077  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2078  }
2079  oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
2080  QUEUE_TAIL = min;
2081  LWLockRelease(AsyncQueueLock);
2082 
2083  /*
2084  * We can truncate something if the global tail advanced across an SLRU
2085  * segment boundary.
2086  *
2087  * XXX it might be better to truncate only once every several segments, to
2088  * reduce the number of directory scans.
2089  */
2090  newtailpage = QUEUE_POS_PAGE(min);
2091  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2092  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2093  {
2094  /*
2095  * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
2096  * the lock again.
2097  */
2098  SimpleLruTruncate(AsyncCtl, newtailpage);
2099  }
2100 }
#define QUEUE_TAIL
Definition: async.c:258
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1174
#define AsyncCtl
Definition: async.c:268
#define QUEUE_HEAD
Definition: async.c:257
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int MaxBackends
Definition: globals.c:135
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:432
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
#define QUEUE_POS_PAGE(x)
Definition: async.c:190
#define QUEUE_POS_MIN(x, y)
Definition: async.c:203
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1495 of file async.c.

References asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, MaxBackends, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

1496 {
1497  double fillDegree;
1498  TimestampTz t;
1499 
1500  fillDegree = asyncQueueUsage();
1501  if (fillDegree < 0.5)
1502  return;
1503 
1504  t = GetCurrentTimestamp();
1505 
1508  {
1509  QueuePosition min = QUEUE_HEAD;
1510  int32 minPid = InvalidPid;
1511  int i;
1512 
1513  for (i = 1; i <= MaxBackends; i++)
1514  {
1515  if (QUEUE_BACKEND_PID(i) != InvalidPid)
1516  {
1517  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1518  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1519  minPid = QUEUE_BACKEND_PID(i);
1520  }
1521  }
1522 
1523  ereport(WARNING,
1524  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1525  (minPid != InvalidPid ?
1526  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1527  : 0),
1528  (minPid != InvalidPid ?
1529  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1530  : 0)));
1531 
1533  }
1534 }
int errhint(const char *fmt,...)
Definition: elog.c:974
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1466
#define QUEUE_HEAD
Definition: async.c:257
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1668
signed int int32
Definition: c.h:346
int MaxBackends
Definition: globals.c:135
int errdetail(const char *fmt,...)
Definition: elog.c:860
static AsyncQueueControl * asyncQueueControl
Definition: async.c:255
#define ereport(elevel, rest)
Definition: elog.h:141
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:270
#define WARNING
Definition: elog.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:784
int i
TimestampTz lastQueueFillWarn
Definition: async.c:250
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:199
#define QUEUE_POS_MIN(x, y)
Definition: async.c:203
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1254 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_TAIL, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

1255 {
1256  int nexthead;
1257  int boundary;
1258 
1259  /*
1260  * The queue is full if creating a new head page would create a page that
1261  * logically precedes the current global tail pointer, ie, the head
1262  * pointer would wrap around compared to the tail. We cannot create such
1263  * a head page for fear of confusing slru.c. For safety we round the tail
1264  * pointer back to a segment boundary (compare the truncation logic in
1265  * asyncQueueAdvanceTail).
1266  *
1267  * Note that this test is *not* dependent on how much space there is on
1268  * the current head page. This is necessary because asyncQueueAddEntries
1269  * might try to create the next head page in any case.
1270  */
1271  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1272  if (nexthead > QUEUE_MAX_PAGE)
1273  nexthead = 0; /* wrap around */
1274  boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
1275  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1276  return asyncQueuePagePrecedes(nexthead, boundary);
1277 }
#define QUEUE_TAIL
Definition: async.c:258
#define QUEUE_HEAD
Definition: async.c:257
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:432
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:33
#define QUEUE_MAX_PAGE
Definition: async.c:289
#define QUEUE_POS_PAGE(x)
Definition: async.c:190

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1320 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

1321 {
1322  size_t channellen = n->channel_len;
1323  size_t payloadlen = n->payload_len;
1324  int entryLength;
1325 
1326  Assert(channellen < NAMEDATALEN);
1327  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1328 
1329  /* The terminators are already included in AsyncQueueEntryEmptySize */
1330  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1331  entryLength = QUEUEALIGN(entryLength);
1332  qe->length = entryLength;
1333  qe->dboid = MyDatabaseId;
1334  qe->xid = GetCurrentTransactionId();
1335  qe->srcPid = MyProcPid;
1336  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1337 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:173
int MyProcPid
Definition: globals.c:40
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:357
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:179
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:153
#define QUEUEALIGN(len)
Definition: async.c:177
Oid MyDatabaseId
Definition: globals.c:85
#define Assert(condition)
Definition: c.h:732
uint16 channel_len
Definition: async.c:354
uint16 payload_len
Definition: async.c:355
TransactionId xid
Definition: async.c:171
int32 srcPid
Definition: async.c:172

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 432 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

433 {
434  int diff;
435 
436  /*
437  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
438  * in the range 0..QUEUE_MAX_PAGE.
439  */
440  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
441  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
442 
443  diff = p - q;
444  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
445  diff -= QUEUE_MAX_PAGE + 1;
446  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
447  diff += QUEUE_MAX_PAGE + 1;
448  return diff < 0;
449 }
#define Assert(condition)
Definition: c.h:732
#define QUEUE_MAX_PAGE
Definition: async.c:289

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 1975 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

1979 {
1980  bool reachedStop = false;
1981  bool reachedEndOfPage;
1982  AsyncQueueEntry *qe;
1983 
1984  do
1985  {
1986  QueuePosition thisentry = *current;
1987 
1988  if (QUEUE_POS_EQUAL(thisentry, stop))
1989  break;
1990 
1991  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
1992 
1993  /*
1994  * Advance *current over this message, possibly to the next page. As
1995  * noted in the comments for asyncQueueReadAllNotifications, we must
1996  * do this before possibly failing while processing the message.
1997  */
1998  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
1999 
2000  /* Ignore messages destined for other databases */
2001  if (qe->dboid == MyDatabaseId)
2002  {
2003  if (XidInMVCCSnapshot(qe->xid, snapshot))
2004  {
2005  /*
2006  * The source transaction is still in progress, so we can't
2007  * process this message yet. Break out of the loop, but first
2008  * back up *current so we will reprocess the message next
2009  * time. (Note: it is unlikely but not impossible for
2010  * TransactionIdDidCommit to fail, so we can't really avoid
2011  * this advance-then-back-up behavior when dealing with an
2012  * uncommitted message.)
2013  *
2014  * Note that we must test XidInMVCCSnapshot before we test
2015  * TransactionIdDidCommit, else we might return a message from
2016  * a transaction that is not yet visible to snapshots; compare
2017  * the comments at the head of heapam_visibility.c.
2018  *
2019  * Also, while our own xact won't be listed in the snapshot,
2020  * we need not check for TransactionIdIsCurrentTransactionId
2021  * because our transaction cannot (yet) have queued any
2022  * messages.
2023  */
2024  *current = thisentry;
2025  reachedStop = true;
2026  break;
2027  }
2028  else if (TransactionIdDidCommit(qe->xid))
2029  {
2030  /* qe->data is the null-terminated channel name */
2031  char *channel = qe->data;
2032 
2033  if (IsListeningOn(channel))
2034  {
2035  /* payload follows channel name */
2036  char *payload = qe->data + strlen(channel) + 1;
2037 
2038  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2039  }
2040  }
2041  else
2042  {
2043  /*
2044  * The source transaction aborted or crashed, so we just
2045  * ignore its notifications.
2046  */
2047  }
2048  }
2049 
2050  /* Loop back if we're not at end of page */
2051  } while (!reachedEndOfPage);
2052 
2053  if (QUEUE_POS_EQUAL(*current, stop))
2054  reachedStop = true;
2055 
2056  return reachedStop;
2057 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:173
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2241
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1285
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1203
#define QUEUE_POS_OFFSET(x)
Definition: async.c:191
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
Definition: async.c:171
int32 srcPid
Definition: async.c:172
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:199
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2154

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1805 of file async.c.

References Assert, AsyncCtl, asyncQueueAdvanceTail(), asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), ProcessCompletedNotifies(), and ProcessIncomingNotify().

1806 {
1807  volatile QueuePosition pos;
1808  QueuePosition oldpos;
1809  QueuePosition head;
1810  Snapshot snapshot;
1811  bool advanceTail;
1812 
1813  /* page_buffer must be adequately aligned, so use a union */
1814  union
1815  {
1816  char buf[QUEUE_PAGESIZE];
1817  AsyncQueueEntry align;
1818  } page_buffer;
1819 
1820  /* Fetch current state */
1821  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1822  /* Assert checks that we have a valid state entry */
1824  pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
1825  head = QUEUE_HEAD;
1826  LWLockRelease(AsyncQueueLock);
1827 
1828  if (QUEUE_POS_EQUAL(pos, head))
1829  {
1830  /* Nothing to do, we have read all notifications already. */
1831  return;
1832  }
1833 
1834  /* Get snapshot we'll use to decide which xacts are still in progress */
1835  snapshot = RegisterSnapshot(GetLatestSnapshot());
1836 
1837  /*----------
1838  * Note that we deliver everything that we see in the queue and that
1839  * matches our _current_ listening state.
1840  * Especially we do not take into account different commit times.
1841  * Consider the following example:
1842  *
1843  * Backend 1: Backend 2:
1844  *
1845  * transaction starts
1846  * NOTIFY foo;
1847  * commit starts
1848  * transaction starts
1849  * LISTEN foo;
1850  * commit starts
1851  * commit to clog
1852  * commit to clog
1853  *
1854  * It could happen that backend 2 sees the notification from backend 1 in
1855  * the queue. Even though the notifying transaction committed before
1856  * the listening transaction, we still deliver the notification.
1857  *
1858  * The idea is that an additional notification does not do any harm, we
1859  * just need to make sure that we do not miss a notification.
1860  *
1861  * It is possible that we fail while trying to send a message to our
1862  * frontend (for example, because of encoding conversion failure).
1863  * If that happens it is critical that we not try to send the same
1864  * message over and over again. Therefore, we place a PG_TRY block
1865  * here that will forcibly advance our backend position before we lose
1866  * control to an error. (We could alternatively retake AsyncQueueLock
1867  * and move the position before handling each individual message, but
1868  * that seems like too much lock traffic.)
1869  *----------
1870  */
1871  PG_TRY();
1872  {
1873  bool reachedStop;
1874 
1875  do
1876  {
1877  int curpage = QUEUE_POS_PAGE(pos);
1878  int curoffset = QUEUE_POS_OFFSET(pos);
1879  int slotno;
1880  int copysize;
1881 
1882  /*
1883  * We copy the data from SLRU into a local buffer, so as to avoid
1884  * holding the AsyncCtlLock while we are examining the entries and
1885  * possibly transmitting them to our frontend. Copy only the part
1886  * of the page we will actually inspect.
1887  */
1888  slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
1890  if (curpage == QUEUE_POS_PAGE(head))
1891  {
1892  /* we only want to read as far as head */
1893  copysize = QUEUE_POS_OFFSET(head) - curoffset;
1894  if (copysize < 0)
1895  copysize = 0; /* just for safety */
1896  }
1897  else
1898  {
1899  /* fetch all the rest of the page */
1900  copysize = QUEUE_PAGESIZE - curoffset;
1901  }
1902  memcpy(page_buffer.buf + curoffset,
1903  AsyncCtl->shared->page_buffer[slotno] + curoffset,
1904  copysize);
1905  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
1906  LWLockRelease(AsyncCtlLock);
1907 
1908  /*
1909  * Process messages up to the stop position, end of page, or an
1910  * uncommitted message.
1911  *
1912  * Our stop position is what we found to be the head's position
1913  * when we entered this function. It might have changed already.
1914  * But if it has, we will receive (or have already received and
1915  * queued) another signal and come here again.
1916  *
1917  * We are not holding AsyncQueueLock here! The queue can only
1918  * extend beyond the head pointer (see above) and we leave our
1919  * backend's pointer where it is so nobody will truncate or
1920  * rewrite pages under us. Especially we don't want to hold a lock
1921  * while sending the notifications to the frontend.
1922  */
1923  reachedStop = asyncQueueProcessPageEntries(&pos, head,
1924  page_buffer.buf,
1925  snapshot);
1926  } while (!reachedStop);
1927  }
1928  PG_CATCH();
1929  {
1930  /* Update shared state */
1931  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1933  advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
1934  LWLockRelease(AsyncQueueLock);
1935 
1936  /* If we were the laziest backend, try to advance the tail pointer */
1937  if (advanceTail)
1939 
1940  PG_RE_THROW();
1941  }
1942  PG_END_TRY();
1943 
1944  /* Update shared state */
1945  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1947  advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
1948  LWLockRelease(AsyncQueueLock);
1949 
1950  /* If we were the laziest backend, try to advance the tail pointer */
1951  if (advanceTail)
1953 
1954  /* Done with snapshot */
1955  UnregisterSnapshot(snapshot);
1956 }
#define QUEUE_TAIL
Definition: async.c:258
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:865
#define AsyncCtl
Definition: async.c:268
#define QUEUE_HEAD
Definition: async.c:257
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define QUEUE_POS_OFFSET(x)
Definition: async.c:191
static char * buf
Definition: pg_test_fsync.c:68
#define QUEUE_PAGESIZE
Definition: async.c:269
#define InvalidTransactionId
Definition: transam.h:31
static void asyncQueueAdvanceTail(void)
Definition: async.c:2064
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:907
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:467
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:1975
#define PG_CATCH()
Definition: elog.h:310
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define PG_RE_THROW()
Definition: elog.h:331
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
#define PG_TRY()
Definition: elog.h:301
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
#define PG_END_TRY()
Definition: elog.h:317
#define QUEUE_POS_PAGE(x)
Definition: async.c:190
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:199

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1222 of file async.c.

References amRegisteredListener, Assert, asyncQueueAdvanceTail(), InvalidOid, InvalidPid, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_POS_EQUAL, and QUEUE_TAIL.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

1223 {
1224  bool advanceTail;
1225 
1226  Assert(listenChannels == NIL); /* else caller error */
1227 
1228  if (!amRegisteredListener) /* nothing to do */
1229  return;
1230 
1231  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1232  /* check if entry is valid and oldest ... */
1233  advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
1235  /* ... then mark it invalid */
1238  LWLockRelease(AsyncQueueLock);
1239 
1240  /* mark ourselves as no longer listed in the global array */
1241  amRegisteredListener = false;
1242 
1243  /* If we were the laziest backend, try to advance the tail pointer */
1244  if (advanceTail)
1246 }
#define NIL
Definition: pg_list.h:65
#define QUEUE_TAIL
Definition: async.c:258
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
static List * listenChannels
Definition: async.c:296
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static void asyncQueueAdvanceTail(void)
Definition: async.c:2064
static bool amRegisteredListener
Definition: async.c:390
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:260
#define InvalidOid
Definition: postgres_ext.h:36
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:199
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1466 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

1467 {
1468  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1469  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1470  int occupied;
1471 
1472  occupied = headPage - tailPage;
1473 
1474  if (occupied == 0)
1475  return (double) 0; /* fast exit for common case */
1476 
1477  if (occupied < 0)
1478  {
1479  /* head has wrapped around, tail not yet */
1480  occupied += QUEUE_MAX_PAGE + 1;
1481  }
1482 
1483  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1484 }
#define QUEUE_TAIL
Definition: async.c:258
#define QUEUE_HEAD
Definition: async.c:257
#define QUEUE_MAX_PAGE
Definition: async.c:289
#define QUEUE_POS_PAGE(x)
Definition: async.c:190

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 472 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

473 {
474  bool found;
475  int slotno;
476  Size size;
477 
478  /*
479  * Create or attach to the AsyncQueueControl structure.
480  *
481  * The used entries in the backend[] array run from 1 to MaxBackends; the
482  * zero'th entry is unused but must be allocated.
483  */
484  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
485  size = add_size(size, offsetof(AsyncQueueControl, backend));
486 
488  ShmemInitStruct("Async Queue Control", size, &found);
489 
490  if (!found)
491  {
492  /* First time through, so initialize it */
493  int i;
494 
495  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
496  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
498  /* zero'th entry won't be used, but let's initialize it anyway */
499  for (i = 0; i <= MaxBackends; i++)
500  {
504  }
505  }
506 
507  /*
508  * Set up SLRU management of the pg_notify data.
509  */
510  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
512  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
513  /* Override default assumption that writes should be fsync'd */
514  AsyncCtl->do_fsync = false;
515 
516  if (!found)
517  {
518  /*
519  * During start or reboot, clean out the pg_notify directory.
520  */
522 
523  /* Now initialize page zero to empty */
524  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
526  /* This write is just to verify that pg_notify/ is writable */
527  SimpleLruWritePage(AsyncCtl, slotno);
528  LWLockRelease(AsyncCtlLock);
529  }
530 }
#define QUEUE_TAIL
Definition: async.c:258
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1359
#define AsyncCtl
Definition: async.c:268
#define QUEUE_HEAD
Definition: async.c:257
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:135
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:193
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:578
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:432
static AsyncQueueControl * asyncQueueControl
Definition: async.c:255
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:260
#define InvalidOid
Definition: postgres_ext.h:36
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1382
size_t Size
Definition: c.h:466
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
TimestampTz lastQueueFillWarn
Definition: async.c:250
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:263
#define QUEUE_POS_PAGE(x)
Definition: async.c:190
#define offsetof(type, field)
Definition: c.h:655
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:165

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 455 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

456 {
457  Size size;
458 
459  /* This had better match AsyncShmemInit */
460  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
461  size = add_size(size, offsetof(AsyncQueueControl, backend));
462 
464 
465  return size;
466 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:145
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1622 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1623 {
1624  /*
1625  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1626  * we have registered as a listener but have not made any entry in
1627  * listenChannels. In that case, deregister again.
1628  */
1631 
1632  /* And clean up */
1634 }
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:296
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2314
static void asyncQueueUnregister(void)
Definition: async.c:1222
static bool amRegisteredListener
Definition: async.c:390

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 911 of file async.c.

References ListenAction::action, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

912 {
913  ListCell *p;
914 
915  /*
916  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
917  * return as soon as possible
918  */
920  return;
921 
922  if (Trace_notify)
923  elog(DEBUG1, "AtCommit_Notify");
924 
925  /* Perform any pending listen/unlisten actions */
926  foreach(p, pendingActions)
927  {
928  ListenAction *actrec = (ListenAction *) lfirst(p);
929 
930  switch (actrec->action)
931  {
932  case LISTEN_LISTEN:
933  Exec_ListenCommit(actrec->channel);
934  break;
935  case LISTEN_UNLISTEN:
936  Exec_UnlistenCommit(actrec->channel);
937  break;
938  case LISTEN_UNLISTEN_ALL:
940  break;
941  }
942  }
943 
944  /* If no longer listening to anything, get out of listener array */
947 
948  /* And clean up */
950 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1104
static List * listenChannels
Definition: async.c:296
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2314
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1073
static NotificationList * pendingNotifies
Definition: async.c:373
static List * pendingActions
Definition: async.c:321
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1046
static void asyncQueueUnregister(void)
Definition: async.c:1222
static bool amRegisteredListener
Definition: async.c:390
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:318
ListenActionKind action
Definition: async.c:317

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 787 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

788 {
789  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
791  ereport(ERROR,
792  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
793  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
794 }
int errcode(int sqlerrcode)
Definition: elog.c:570
static NotificationList * pendingNotifies
Definition: async.c:373
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:321
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1729 of file async.c.

References GetCurrentTransactionNestLevel(), linitial, linitial_node, list_delete_first(), and list_length().

Referenced by AbortSubTransaction().

1730 {
1731  int my_level = GetCurrentTransactionNestLevel();
1732 
1733  /*
1734  * All we have to do is pop the stack --- the actions/notifies made in
1735  * this subxact are no longer interesting, and the space will be freed
1736  * when CurTransactionContext is recycled.
1737  *
1738  * This routine could be called more than once at a given nesting level if
1739  * there is trouble during subxact abort. Avoid dumping core by using
1740  * GetCurrentTransactionNestLevel as the indicator of how far we need to
1741  * prune the list.
1742  */
1743  while (list_length(upperPendingActions) > my_level - 2)
1744  {
1747  }
1748 
1749  while (list_length(upperPendingNotifies) > my_level - 2)
1750  {
1753  }
1754 }
#define linitial_node(type, l)
Definition: pg_list.h:198
static NotificationList * pendingNotifies
Definition: async.c:373
#define linitial(l)
Definition: pg_list.h:195
static List * pendingActions
Definition: async.c:321
static List * upperPendingActions
Definition: async.c:323
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
static int list_length(const List *l)
Definition: pg_list.h:169
static List * upperPendingNotifies
Definition: async.c:375
Definition: pg_list.h:50
List * list_delete_first(List *list)
Definition: list.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1672 of file async.c.

References AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, linitial, linitial_node, list_concat(), list_delete_first(), list_length(), and pendingNotifies.

Referenced by CommitSubTransaction().

1673 {
1674  List *parentPendingActions;
1675  NotificationList *parentPendingNotifies;
1676 
1677  parentPendingActions = linitial_node(List, upperPendingActions);
1679 
1682 
1683  /*
1684  * Mustn't try to eliminate duplicates here --- see queue_listen()
1685  */
1686  pendingActions = list_concat(parentPendingActions, pendingActions);
1687 
1688  parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
1690 
1693 
1694  if (pendingNotifies == NULL)
1695  {
1696  /* easy, no notify events happened in current subxact */
1697  pendingNotifies = parentPendingNotifies;
1698  }
1699  else if (parentPendingNotifies == NULL)
1700  {
1701  /* easy, subxact's list becomes parent's */
1702  }
1703  else
1704  {
1705  /*
1706  * Formerly, we didn't bother to eliminate duplicates here, but now we
1707  * must, else we fall foul of "Assert(!found)", either here or during
1708  * a later attempt to build the parent-level hashtable.
1709  */
1710  NotificationList *childPendingNotifies = pendingNotifies;
1711  ListCell *l;
1712 
1713  pendingNotifies = parentPendingNotifies;
1714  /* Insert all the subxact's events into parent, except for dups */
1715  foreach(l, childPendingNotifies->events)
1716  {
1717  Notification *childn = (Notification *) lfirst(l);
1718 
1719  if (!AsyncExistsPendingNotify(childn))
1720  AddEventToPendingNotifies(childn);
1721  }
1722  }
1723 }
List * events
Definition: async.c:362
List * list_concat(List *list1, const List *list2)
Definition: list.c:515
#define linitial_node(type, l)
Definition: pg_list.h:198
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2179
static NotificationList * pendingNotifies
Definition: async.c:373
#define linitial(l)
Definition: pg_list.h:195
static List * pendingActions
Definition: async.c:321
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2220
static List * upperPendingActions
Definition: async.c:323
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
static int list_length(const List *l)
Definition: pg_list.h:169
static List * upperPendingNotifies
Definition: async.c:375
Definition: pg_list.h:50
List * list_delete_first(List *list)
Definition: list.c:857

◆ AtSubStart_Notify()

void AtSubStart_Notify ( void  )

Definition at line 1642 of file async.c.

References Assert, GetCurrentTransactionNestLevel(), lcons(), list_length(), MemoryContextSwitchTo(), NIL, and TopTransactionContext.

Referenced by StartSubTransaction().

1643 {
1644  MemoryContext old_cxt;
1645 
1646  /* Keep the list-of-lists in TopTransactionContext for simplicity */
1648 
1650 
1653 
1654  pendingActions = NIL;
1655 
1657 
1660 
1661  pendingNotifies = NULL;
1662 
1663  MemoryContextSwitchTo(old_cxt);
1664 }
#define NIL
Definition: pg_list.h:65
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static NotificationList * pendingNotifies
Definition: async.c:373
static List * pendingActions
Definition: async.c:321
static List * upperPendingActions
Definition: async.c:323
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
List * lcons(void *datum, List *list)
Definition: list.c:453
#define Assert(condition)
Definition: c.h:732
static int list_length(const List *l)
Definition: pg_list.h:169
static List * upperPendingNotifies
Definition: async.c:375

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2314 of file async.c.

References NIL.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2315 {
2316  /*
2317  * We used to have to explicitly deallocate the list members and nodes,
2318  * because they were malloc'd. Now, since we know they are palloc'd in
2319  * CurTransactionContext, we need not do that --- they'll go away
2320  * automatically at transaction exit. We need only reset the list head
2321  * pointers.
2322  */
2323  pendingActions = NIL;
2324  pendingNotifies = NULL;
2325 }
#define NIL
Definition: pg_list.h:65
static NotificationList * pendingNotifies
Definition: async.c:373
static List * pendingActions
Definition: async.c:321

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1046 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1047 {
1048  MemoryContext oldcontext;
1049 
1050  /* Do nothing if we are already listening on this channel */
1051  if (IsListeningOn(channel))
1052  return;
1053 
1054  /*
1055  * Add the new channel name to listenChannels.
1056  *
1057  * XXX It is theoretically possible to get an out-of-memory failure here,
1058  * which would be bad because we already committed. For the moment it
1059  * doesn't seem worth trying to guard against that, but maybe improve this
1060  * later.
1061  */
1064  MemoryContextSwitchTo(oldcontext);
1065 }
char * pstrdup(const char *in)
Definition: mcxt.c:1161
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:296
static bool IsListeningOn(const char *channel)
Definition: async.c:1203
MemoryContext TopMemoryContext
Definition: mcxt.c:44
List * lappend(List *list, void *datum)
Definition: list.c:321

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 958 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_POS_PAGE, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

959 {
960  QueuePosition head;
961  QueuePosition max;
962  int i;
963 
964  /*
965  * Nothing to do if we are already listening to something, nor if we
966  * already ran this routine in this transaction.
967  */
969  return;
970 
971  if (Trace_notify)
972  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
973 
974  /*
975  * Before registering, make sure we will unlisten before dying. (Note:
976  * this action does not get undone if we abort later.)
977  */
979  {
981  unlistenExitRegistered = true;
982  }
983 
984  /*
985  * This is our first LISTEN, so establish our pointer.
986  *
987  * We set our pointer to the global tail pointer and then move it forward
988  * over already-committed notifications. This ensures we cannot miss any
989  * not-yet-committed notifications. We might get a few more but that
990  * doesn't hurt.
991  *
992  * In some scenarios there might be a lot of committed notifications that
993  * have not yet been pruned away (because some backend is being lazy about
994  * reading them). To reduce our startup time, we can look at other
995  * backends and adopt the maximum "pos" pointer of any backend that's in
996  * our database; any notifications it's already advanced over are surely
997  * committed and need not be re-examined by us. (We must consider only
998  * backends connected to our DB, because others will not have bothered to
999  * check committed-ness of notifications in our DB.) But we only bother
1000  * with that if there's more than a page worth of notifications
1001  * outstanding, otherwise scanning all the other backends isn't worth it.
1002  *
1003  * We need exclusive lock here so we can look at other backends' entries.
1004  */
1005  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1006  head = QUEUE_HEAD;
1007  max = QUEUE_TAIL;
1008  if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
1009  {
1010  for (i = 1; i <= MaxBackends; i++)
1011  {
1013  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1014  }
1015  }
1019  LWLockRelease(AsyncQueueLock);
1020 
1021  /* Now we are listed in the global array, so remember we're listening */
1022  amRegisteredListener = true;
1023 
1024  /*
1025  * Try to move our pointer forward as far as possible. This will skip over
1026  * already-committed notifications. Still, we could get notifications that
1027  * have already committed before we started to LISTEN.
1028  *
1029  * Note that we are not yet listening on anything, so we won't deliver any
1030  * notification to the frontend. Also, although our transaction might
1031  * have executed NOTIFY, those message(s) aren't queued yet so we can't
1032  * see them in the queue.
1033  *
1034  * This will also advance the global tail pointer if possible.
1035  */
1036  if (!QUEUE_POS_EQUAL(max, head))
1038 }
#define QUEUE_TAIL
Definition: async.c:258
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
#define QUEUE_HEAD
Definition: async.c:257
static bool unlistenExitRegistered
Definition: async.c:387
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:774
int MaxBackends
Definition: globals.c:135
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
static bool amRegisteredListener
Definition: async.c:390
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:260
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1805
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
#define QUEUE_POS_PAGE(x)
Definition: async.c:190
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:199
#define QUEUE_POS_MAX(x, y)
Definition: async.c:209

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1104 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1105 {
1106  if (Trace_notify)
1107  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1108 
1110  listenChannels = NIL;
1111 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:296
void list_free_deep(List *list)
Definition: list.c:1387
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1073 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1074 {
1075  ListCell *q;
1076 
1077  if (Trace_notify)
1078  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1079 
1080  foreach(q, listenChannels)
1081  {
1082  char *lchan = (char *) lfirst(q);
1083 
1084  if (strcmp(lchan, channel) == 0)
1085  {
1087  pfree(lchan);
1088  break;
1089  }
1090  }
1091 
1092  /*
1093  * We do not complain about unlistening something not being listened;
1094  * should we?
1095  */
1096 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:296
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:368
void pfree(void *pointer)
Definition: mcxt.c:1031
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1765 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1766 {
1767  /*
1768  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1769  * you do here.
1770  */
1771 
1772  /* signal that work needs to be done */
1773  notifyInterruptPending = true;
1774 
1775  /* make sure the event is processed in due course */
1776  SetLatch(MyLatch);
1777 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:384

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1203 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1204 {
1205  ListCell *p;
1206 
1207  foreach(p, listenChannels)
1208  {
1209  char *lchan = (char *) lfirst(p);
1210 
1211  if (strcmp(lchan, channel) == 0)
1212  return true;
1213  }
1214  return false;
1215 }
static List * listenChannels
Definition: async.c:296
#define lfirst(lc)
Definition: pg_list.h:190

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2284 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2285 {
2286  const Notification *k = *(const Notification *const *) key;
2287 
2288  Assert(keysize == sizeof(Notification *));
2289  /* We don't bother to include the payload's trailing null in the hash */
2290  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2291  k->channel_len + k->payload_len + 1));
2292 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:357
#define Assert(condition)
Definition: c.h:732
uint16 channel_len
Definition: async.c:354
uint16 payload_len
Definition: async.c:355
Datum hash_any(register const unsigned char *k, register int keylen)
Definition: hashfn.c:148

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2298 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2299 {
2300  const Notification *k1 = *(const Notification *const *) key1;
2301  const Notification *k2 = *(const Notification *const *) key2;
2302 
2303  Assert(keysize == sizeof(Notification *));
2304  if (k1->channel_len == k2->channel_len &&
2305  k1->payload_len == k2->payload_len &&
2306  memcmp(k1->data, k2->data,
2307  k1->channel_len + k1->payload_len + 2) == 0)
2308  return 0; /* equal */
2309  return 1; /* not equal */
2310 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:357
#define Assert(condition)
Definition: c.h:732
uint16 channel_len
Definition: async.c:354
uint16 payload_len
Definition: async.c:355

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2154 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2155 {
2157  {
2159 
2160  pq_beginmessage(&buf, 'A');
2161  pq_sendint32(&buf, srcPid);
2162  pq_sendstring(&buf, channel);
2164  pq_sendstring(&buf, payload);
2165  pq_endmessage(&buf);
2166 
2167  /*
2168  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2169  * happen at the end of the transaction, and for incoming notifies
2170  * ProcessIncomingNotify will do it after finding all the notifies.
2171  */
2172  }
2173  else
2174  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2175 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:226
CommandDest whereToSendOutput
Definition: postgres.c:90
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 741 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

742 {
743  FuncCallContext *funcctx;
744 
745  /* stuff done only on the first call of the function */
746  if (SRF_IS_FIRSTCALL())
747  {
748  /* create a function context for cross-call persistence */
749  funcctx = SRF_FIRSTCALL_INIT();
750  }
751 
752  /* stuff done on every call of the function */
753  funcctx = SRF_PERCALL_SETUP();
754 
755  if (funcctx->call_cntr < list_length(listenChannels))
756  {
757  char *channel = (char *) list_nth(listenChannels,
758  funcctx->call_cntr);
759 
760  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
761  }
762 
763  SRF_RETURN_DONE(funcctx);
764 }
uint64 call_cntr
Definition: funcapi.h:66
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:283
static List * listenChannels
Definition: async.c:296
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:287
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:289
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
static int list_length(const List *l)
Definition: pg_list.h:169
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:307
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:285

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1449 of file async.c.

References asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

1450 {
1451  double usage;
1452 
1453  LWLockAcquire(AsyncQueueLock, LW_SHARED);
1454  usage = asyncQueueUsage();
1455  LWLockRelease(AsyncQueueLock);
1456 
1457  PG_RETURN_FLOAT8(usage);
1458 }
static void usage(void)
Definition: pg_standby.c:593
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:356
static double asyncQueueUsage(void)
Definition: async.c:1466
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 538 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

539 {
540  const char *channel;
541  const char *payload;
542 
543  if (PG_ARGISNULL(0))
544  channel = "";
545  else
546  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
547 
548  if (PG_ARGISNULL(1))
549  payload = "";
550  else
551  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
552 
553  /* For NOTIFY as a statement, this is checked in ProcessUtility */
555 
556  Async_Notify(channel, payload);
557 
558  PG_RETURN_VOID();
559 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:303
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:276
#define PG_RETURN_VOID()
Definition: fmgr.h:339
#define PG_ARGISNULL(n)
Definition: fmgr.h:204
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:572
char * text_to_cstring(const text *t)
Definition: varlena.c:204

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 812 of file async.c.

References AccessExclusiveLock, ListenAction::action, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

813 {
814  ListCell *p;
815 
817  return; /* no relevant statements in this xact */
818 
819  if (Trace_notify)
820  elog(DEBUG1, "PreCommit_Notify");
821 
822  /* Preflight for any pending listen/unlisten actions */
823  foreach(p, pendingActions)
824  {
825  ListenAction *actrec = (ListenAction *) lfirst(p);
826 
827  switch (actrec->action)
828  {
829  case LISTEN_LISTEN:
831  break;
832  case LISTEN_UNLISTEN:
833  /* there is no Exec_UnlistenPreCommit() */
834  break;
835  case LISTEN_UNLISTEN_ALL:
836  /* there is no Exec_UnlistenAllPreCommit() */
837  break;
838  }
839  }
840 
841  /* Queue any pending notifies (must happen after the above) */
842  if (pendingNotifies)
843  {
844  ListCell *nextNotify;
845 
846  /*
847  * Make sure that we have an XID assigned to the current transaction.
848  * GetCurrentTransactionId is cheap if we already have an XID, but not
849  * so cheap if we don't, and we'd prefer not to do that work while
850  * holding AsyncQueueLock.
851  */
852  (void) GetCurrentTransactionId();
853 
854  /*
855  * Serialize writers by acquiring a special lock that we hold till
856  * after commit. This ensures that queue entries appear in commit
857  * order, and in particular that there are never uncommitted queue
858  * entries ahead of committed ones, so an uncommitted transaction
859  * can't block delivery of deliverable notifications.
860  *
861  * We use a heavyweight lock so that it'll automatically be released
862  * after either commit or abort. This also allows deadlocks to be
863  * detected, though really a deadlock shouldn't be possible here.
864  *
865  * The lock is on "database 0", which is pretty ugly but it doesn't
866  * seem worth inventing a special locktag category just for this.
867  * (Historical note: before PG 9.0, a similar lock on "database 0" was
868  * used by the flatfiles mechanism.)
869  */
870  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
872 
873  /* Now push the notifications into the queue */
875 
876  nextNotify = list_head(pendingNotifies->events);
877  while (nextNotify != NULL)
878  {
879  /*
880  * Add the pending notifications to the queue. We acquire and
881  * release AsyncQueueLock once per page, which might be overkill
882  * but it does allow readers to get in while we're doing this.
883  *
884  * A full queue is very uncommon and should really not happen,
885  * given that we have so much space available in the SLRU pages.
886  * Nevertheless we need to deal with this possibility. Note that
887  * when we get here we are in the process of committing our
888  * transaction, but we have not yet committed to clog, so at this
889  * point in time we can still roll the transaction back.
890  */
891  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
893  if (asyncQueueIsFull())
894  ereport(ERROR,
895  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
896  errmsg("too many notifications in the NOTIFY queue")));
897  nextNotify = asyncQueueAddEntries(nextNotify);
898  LWLockRelease(AsyncQueueLock);
899  }
900  }
901 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:362
int errcode(int sqlerrcode)
Definition: elog.c:570
static bool asyncQueueIsFull(void)
Definition: async.c:1254
static void asyncQueueFillWarning(void)
Definition: async.c:1495
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static NotificationList * pendingNotifies
Definition: async.c:373
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:321
static bool backendHasSentNotifications
Definition: async.c:393
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
#define ereport(elevel, rest)
Definition: elog.h:141
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1004
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1356
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:958
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:396
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
ListenActionKind action
Definition: async.c:317

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1135 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1136 {
1137  MemoryContext caller_context;
1138  bool signalled;
1139 
1140  /* Nothing to do if we didn't send any notifications */
1142  return;
1143 
1144  /*
1145  * We reset the flag immediately; otherwise, if any sort of error occurs
1146  * below, we'd be locked up in an infinite loop, because control will come
1147  * right back here after error cleanup.
1148  */
1150 
1151  /*
1152  * We must preserve the caller's memory context (probably MessageContext)
1153  * across the transaction we do here.
1154  */
1155  caller_context = CurrentMemoryContext;
1156 
1157  if (Trace_notify)
1158  elog(DEBUG1, "ProcessCompletedNotifies");
1159 
1160  /*
1161  * We must run asyncQueueReadAllNotifications inside a transaction, else
1162  * bad things happen if it gets an error.
1163  */
1165 
1166  /* Send signals to other backends */
1167  signalled = SignalBackends();
1168 
1169  if (listenChannels != NIL)
1170  {
1171  /* Read the queue ourselves, and send relevant stuff to the frontend */
1173  }
1174  else if (!signalled)
1175  {
1176  /*
1177  * If we found no other listening backends, and we aren't listening
1178  * ourselves, then we must execute asyncQueueAdvanceTail to flush the
1179  * queue, because ain't nobody else gonna do it. This prevents queue
1180  * overflow when we're sending useless notifies to nobody. (A new
1181  * listener could have joined since we looked, but if so this is
1182  * harmless.)
1183  */
1185  }
1186 
1188 
1189  MemoryContextSwitchTo(caller_context);
1190 
1191  /* We don't need pq_flush() here since postgres.c will do one shortly */
1192 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2895
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:296
static bool backendHasSentNotifications
Definition: async.c:393
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2064
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1805
void StartTransactionCommand(void)
Definition: xact.c:2794
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226
static bool SignalBackends(void)
Definition: async.c:1550

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( void  )
static

Definition at line 2115 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2116 {
2117  /* We *must* reset the flag */
2118  notifyInterruptPending = false;
2119 
2120  /* Do nothing else if we aren't actively listening */
2121  if (listenChannels == NIL)
2122  return;
2123 
2124  if (Trace_notify)
2125  elog(DEBUG1, "ProcessIncomingNotify");
2126 
2127  set_ps_display("notify interrupt", false);
2128 
2129  /*
2130  * We must run asyncQueueReadAllNotifications inside a transaction, else
2131  * bad things happen if it gets an error.
2132  */
2134 
2136 
2138 
2139  /*
2140  * Must flush the notify messages to ensure frontend gets them promptly.
2141  */
2142  pq_flush();
2143 
2144  set_ps_display("idle", false);
2145 
2146  if (Trace_notify)
2147  elog(DEBUG1, "ProcessIncomingNotify: done");
2148 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:39
void CommitTransactionCommand(void)
Definition: xact.c:2895
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
static List * listenChannels
Definition: async.c:296
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1805
void StartTransactionCommand(void)
Definition: xact.c:2794
bool Trace_notify
Definition: async.c:396
#define elog(elevel,...)
Definition: elog.h:226
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:384

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1789 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by ProcessClientReadInterrupt().

1790 {
1792  return; /* not really idle */
1793 
1794  while (notifyInterruptPending)
1796 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4627
static void ProcessIncomingNotify(void)
Definition: async.c:2115
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:384

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 659 of file async.c.

References generate_unaccent_rules::action, ListenAction::action, ListenAction::channel, CurTransactionContext, lappend(), MemoryContextSwitchTo(), offsetof, and palloc().

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

660 {
661  MemoryContext oldcontext;
662  ListenAction *actrec;
663 
664  /*
665  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
666  * be too complicated to ensure we get the right interactions of
667  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
668  * would be any performance benefit anyway in sane applications.
669  */
671 
672  /* space for terminating null is included in sizeof(ListenAction) */
673  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
674  strlen(channel) + 1);
675  actrec->action = action;
676  strcpy(actrec->channel, channel);
677 
679 
680  MemoryContextSwitchTo(oldcontext);
681 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
static List * pendingActions
Definition: async.c:321
List * lappend(List *list, void *datum)
Definition: list.c:321
void * palloc(Size size)
Definition: mcxt.c:924
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:318
ListenActionKind action
Definition: async.c:317
#define offsetof(type, field)
Definition: c.h:655

◆ SignalBackends()

static bool SignalBackends ( void  )
static

Definition at line 1550 of file async.c.

References DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyProcPid, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_EQUAL, and SendProcSignal().

Referenced by ProcessCompletedNotifies().

1551 {
1552  bool signalled = false;
1553  int32 *pids;
1554  BackendId *ids;
1555  int count;
1556  int i;
1557  int32 pid;
1558 
1559  /*
1560  * Identify all backends that are listening and not already up-to-date. We
1561  * don't want to send signals while holding the AsyncQueueLock, so we just
1562  * build a list of target PIDs.
1563  *
1564  * XXX in principle these pallocs could fail, which would be bad. Maybe
1565  * preallocate the arrays? But in practice this is only run in trivial
1566  * transactions, so there should surely be space available.
1567  */
1568  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1569  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1570  count = 0;
1571 
1572  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1573  for (i = 1; i <= MaxBackends; i++)
1574  {
1575  pid = QUEUE_BACKEND_PID(i);
1576  if (pid != InvalidPid && pid != MyProcPid)
1577  {
1579 
1580  if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1581  {
1582  pids[count] = pid;
1583  ids[count] = i;
1584  count++;
1585  }
1586  }
1587  }
1588  LWLockRelease(AsyncQueueLock);
1589 
1590  /* Now send signals */
1591  for (i = 0; i < count; i++)
1592  {
1593  pid = pids[i];
1594 
1595  /*
1596  * Note: assuming things aren't broken, a signal failure here could
1597  * only occur if the target backend exited since we released
1598  * AsyncQueueLock; which is unlikely but certainly possible. So we
1599  * just log a low-level debug message if it happens.
1600  */
1601  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1602  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1603  else
1604  signalled = true;
1605  }
1606 
1607  pfree(pids);
1608  pfree(ids);
1609 
1610  return signalled;
1611 }
int MyProcPid
Definition: globals.c:40
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:257
signed int int32
Definition: c.h:346
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
void pfree(void *pointer)
Definition: mcxt.c:1031
int MaxBackends
Definition: globals.c:135
int BackendId
Definition: backendid.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
void * palloc(Size size)
Definition: mcxt.c:924
#define elog(elevel,...)
Definition: elog.h:226
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:199
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ AsyncCtlData

SlruCtlData AsyncCtlData
static

Definition at line 266 of file async.c.

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 255 of file async.c.

◆ backendHasSentNotifications

bool backendHasSentNotifications = false
static

Definition at line 393 of file async.c.

Referenced by PreCommit_Notify(), and ProcessCompletedNotifies().

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 296 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

List* pendingActions = NIL
static

Definition at line 321 of file async.c.

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 373 of file async.c.

Referenced by AtSubCommit_Notify().

◆ Trace_notify

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 387 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().

◆ upperPendingActions

List* upperPendingActions = NIL
static

Definition at line 323 of file async.c.

◆ upperPendingNotifies

List* upperPendingNotifies = NIL
static

Definition at line 375 of file async.c.