PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)   ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (void)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void ProcessCompletedNotifies (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool backendHasSentNotifications = false
 
static bool backendTryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 182 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 402 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 156 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

◆ QUEUE_BACKEND_PID

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 231 of file async.c.

Referenced by asyncQueueAddEntries(), and SignalBackends().

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 302 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 318 of file async.c.

Referenced by asyncQueueAdvance(), asyncQueueIsFull(), asyncQueuePageDiff(), and asyncQueueUsage().

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)    ((x).page == 0 && (x).offset == 0)

Definition at line 205 of file async.c.

Referenced by asyncQueueAddEntries().

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492

Definition at line 215 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492

Definition at line 209 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 180 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 196 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 337 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification *  n)
static

Definition at line 2340 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MemSet, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

2341 {
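2342  Assert(pendingNotifies->events != NIL);
2343 
2344  /* Create the hash table if it's time to */
2345  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2346  pendingNotifies->hashtab == NULL)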
2347  {
2348  HASHCTL hash_ctl;
2349  ListCell *l;
2350 
2351  /* Create the hash table */
2352  MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2353  hash_ctl.keysize = sizeof(Notification *);
2354  hash_ctl.entrysize = sizeof(NotificationHash);
2355  hash_ctl.hash = notification_hash;
2356  hash_ctl.match = notification_match;
2357  hash_ctl.hcxt = CurTransactionContext;
2358  pendingNotifies->hashtab =
2359  hash_create("Pending Notifies",
2360  256L,
2361  &hash_ctl,
2362  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2363 
2364  /* Insert all the already-existing events */
2365  foreach(l, pendingNotifies->events)
2366  {
2367  Notification *oldn = (Notification *) lfirst(l);
2368  NotificationHash *hentry;
2369  bool found;
2370 
2371  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2372  &oldn,
2373  HASH_ENTER,
2374  &found);
2375  Assert(!found);
2376  hentry->event = oldn;
2377  }
2378  }
2379 
2380  /* Add new event to the list, in order */
2381  pendingNotifies->events = lappend(pendingNotifies->events, n);
2382 
2383  /* Add event to the hash table if needed */
2384  if (pendingNotifies->hashtab != NULL)
2385  {
2386  NotificationHash *hentry;
2387  bool found;
2388 
2389  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2390  &n,
2391  HASH_ENTER,
2392  &found);
2393  Assert(!found);
2394  hentry->event = n;
2395  }
2396 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:397
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
MemoryContext hcxt
Definition: hsearch.h:77
Size entrysize
Definition: hsearch.h:72
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define MemSet(start, val, len)
Definition: c.h:950
Notification * event
Definition: async.c:406
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
static NotificationList * pendingNotifies
Definition: async.c:409
List * lappend(List *list, void *datum)
Definition: list.c:321
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:402
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
Size keysize
Definition: hsearch.h:71
HashCompareFunc match
Definition: hsearch.h:74
HTAB * hashtab
Definition: async.c:398
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2418
#define Assert(condition)
Definition: c.h:746
#define lfirst(lc)
Definition: pg_list.h:169
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2404
#define HASH_COMPARE
Definition: hsearch.h:88
static int list_length(const List *l)
Definition: pg_list.h:149
HashValueFunc hash
Definition: hsearch.h:73
#define HASH_FUNCTION
Definition: hsearch.h:87

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 756 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

757 {
758  if (Trace_notify)
759  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
760 
761  queue_listen(LISTEN_LISTEN, channel);
762 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:708
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 609 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

610 {
611  int my_level = GetCurrentTransactionNestLevel();
612  size_t channel_len;
613  size_t payload_len;
614  Notification *n;
615  MemoryContext oldcontext;
616 
617  if (IsParallelWorker())
618  elog(ERROR, "cannot send notifications from a parallel worker");
619 
620  if (Trace_notify)
621  elog(DEBUG1, "Async_Notify(%s)", channel);
622 
623  channel_len = channel ? strlen(channel) : 0;
624  payload_len = payload ? strlen(payload) : 0;
625 
626  /* a channel name must be specified */
627  if (channel_len == 0)
628  ereport(ERROR,
629  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
630  errmsg("channel name cannot be empty")));
631 
632  /* enforce length limits */
633  if (channel_len >= NAMEDATALEN)
634  ereport(ERROR,
635  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636  errmsg("channel name too long")));
637 
638  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
639  ereport(ERROR,
640  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
641  errmsg("payload string too long")));
642 
643  /*
644  * We must construct the Notification entry, even if we end up not using
645  * it, in order to compare it cheaply to existing list entries.
646  *
647  * The notification list needs to live until end of transaction, so store
648  * it in the transaction context.
649  */
650  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
651 
652  n = (Notification *) palloc(offsetof(Notification, data) +
653  channel_len + payload_len + 2);
654  n->channel_len = channel_len;
655  n->payload_len = payload_len;
656  strcpy(n->data, channel);
657  if (payload)
658  strcpy(n->data + channel_len + 1, payload);
659  else
660  n->data[channel_len + 1] = '\0';
661 
662  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
663  {
664  NotificationList *notifies;
665 
666  /*
667  * First notify event in current (sub)xact. Note that we allocate the
668  * NotificationList in TopTransactionContext; the nestingLevel might
669  * get changed later by AtSubCommit_Notify.
670  */
671  notifies = (NotificationList *)
672  MemoryContextAlloc(TopTransactionContext,
673  sizeof(NotificationList));
674  notifies->nestingLevel = my_level;
675  notifies->events = list_make1(n);
676  /* We certainly don't need a hashtable yet */
677  notifies->hashtab = NULL;
678  notifies->upper = pendingNotifies;
679  pendingNotifies = notifies;
680  }
681  else
682  {
683  /* Now check for duplicates */
684  if (AsyncExistsPendingNotify(n))
685  {
686  /* It's a dup, so forget it */
687  pfree(n);
688  MemoryContextSwitchTo(oldcontext);
689  return;
690  }
691 
692  /* Append more events to existing list */
693  AddEventToPendingNotifies(n);
694  }
695 
696  MemoryContextSwitchTo(oldcontext);
697 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:397
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2299
int nestingLevel
Definition: async.c:396
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1057
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2340
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:399
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:144
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
HTAB * hashtab
Definition: async.c:398
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389
bool Trace_notify
Definition: async.c:433
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:821
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define elog(elevel,...)
Definition: elog.h:214
#define offsetof(type, field)
Definition: c.h:669

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 770 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

771 {
772  if (Trace_notify)
773  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
774 
775  /* If we couldn't possibly be listening, no need to queue anything */
776  if (pendingActions == NULL && !unlistenExitRegistered)
777  return;
778 
779  queue_listen(LISTEN_UNLISTEN, channel);
780 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:708
static ActionList * pendingActions
Definition: async.c:357
static bool unlistenExitRegistered
Definition: async.c:421
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 788 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

789 {
790  if (Trace_notify)
791  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
792 
793  /* If we couldn't possibly be listening, no need to queue anything */
794  if (pendingActions == NULL && !unlistenExitRegistered)
795  return;
796 
797  queue_listen(LISTEN_UNLISTEN_ALL, "");
798 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:708
static ActionList * pendingActions
Definition: async.c:357
static bool unlistenExitRegistered
Definition: async.c:421
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 841 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

842 {
843  Exec_UnlistenAllCommit();
844  asyncQueueUnregister();
845 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1184
static void asyncQueueUnregister(void)
Definition: async.c:1300

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification *  n)
static

Definition at line 2299 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2300 {
2301  if (pendingNotifies == NULL)
2302  return false;
2303 
2304  if (pendingNotifies->hashtab != NULL)
2305  {
2306  /* Use the hash table to probe for a match */
2307  if (hash_search(pendingNotifies->hashtab,
2308  &n,
2309  HASH_FIND,
2310  NULL))
2311  return true;
2312  }
2313  else
2314  {
2315  /* Must scan the event list */
2316  ListCell *l;
2317 
2318  foreach(l, pendingNotifies->events)
2319  {
2320  Notification *oldn = (Notification *) lfirst(l);
2321 
2322  if (n->channel_len == oldn->channel_len &&
2323  n->payload_len == oldn->payload_len &&
2324  memcmp(n->data, oldn->data,
2325  n->channel_len + n->payload_len + 2) == 0)
2326  return true;
2327  }
2328  }
2329 
2330  return false;
2331 }
List * events
Definition: async.c:397
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
static NotificationList * pendingNotifies
Definition: async.c:409
HTAB * hashtab
Definition: async.c:398
#define lfirst(lc)
Definition: pg_list.h:169
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell *  nextNotify)
static

Definition at line 1443 of file async.c.

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), backendTryAdvanceTail, AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), and SimpleLruZeroPage().

Referenced by PreCommit_Notify().

1444 {
1445  AsyncQueueEntry qe;
1446  QueuePosition queue_head;
1447  int pageno;
1448  int offset;
1449  int slotno;
1450 
1451  /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
1452  LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
1453 
1454  /*
1455  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1456  * memory upon exiting. The reason for this is that if we have to advance
1457  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1458  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1459  * subsequent insertions would try to put entries into a page that slru.c
1460  * thinks doesn't exist yet.) So, use a local position variable. Note
1461  * that if we do fail, any already-inserted queue entries are forgotten;
1462  * this is okay, since they'd be useless anyway after our transaction
1463  * rolls back.
1464  */
1465  queue_head = QUEUE_HEAD;
1466 
1467  /*
1468  * If this is the first write since the postmaster started, we need to
1469  * initialize the first page of the async SLRU. Otherwise, the current
1470  * page should be initialized already, so just fetch it.
1471  *
1472  * (We could also take the first path when the SLRU position has just
1473  * wrapped around, but re-zeroing the page is harmless in that case.)
1474  */
1475  pageno = QUEUE_POS_PAGE(queue_head);
1476  if (QUEUE_POS_IS_ZERO(queue_head))
1477  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1478  else
1479  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1480  InvalidTransactionId);
1481 
1482  /* Note we mark the page dirty before writing in it */
1483  NotifyCtl->shared->page_dirty[slotno] = true;
1484 
1485  while (nextNotify != NULL)
1486  {
1487  Notification *n = (Notification *) lfirst(nextNotify);
1488 
1489  /* Construct a valid queue entry in local variable qe */
1490  asyncQueueNotificationToEntry(n, &qe);
1491 
1492  offset = QUEUE_POS_OFFSET(queue_head);
1493 
1494  /* Check whether the entry really fits on the current page */
1495  if (offset + qe.length <= QUEUE_PAGESIZE)
1496  {
1497  /* OK, so advance nextNotify past this item */
1498  nextNotify = lnext(pendingNotifies->events, nextNotify);
1499  }
1500  else
1501  {
1502  /*
1503  * Write a dummy entry to fill up the page. Actually readers will
1504  * only check dboid and since it won't match any reader's database
1505  * OID, they will ignore this entry and move on.
1506  */
1507  qe.length = QUEUE_PAGESIZE - offset;
1508  qe.dboid = InvalidOid;
1509  qe.data[0] = '\0'; /* empty channel */
1510  qe.data[1] = '\0'; /* empty payload */
1511  }
1512 
1513  /* Now copy qe into the shared buffer page */
1514  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1515  &qe,
1516  qe.length);
1517 
1518  /* Advance queue_head appropriately, and detect if page is full */
1519  if (asyncQueueAdvance(&(queue_head), qe.length))
1520  {
1521  /*
1522  * Page is full, so we're done here, but first fill the next page
1523  * with zeroes. The reason to do this is to ensure that slru.c's
1524  * idea of the head page is always the same as ours, which avoids
1525  * boundary problems in SimpleLruTruncate. The test in
1526  * asyncQueueIsFull() ensured that there is room to create this
1527  * page without overrunning the queue.
1528  */
1529  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1530 
1531  /*
1532  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1533  * set flag to remember that we should try to advance the tail
1534  * pointer (we don't want to actually do that right here).
1535  */
1536  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1537  backendTryAdvanceTail = true;
1538 
1539  /* And exit the loop */
1540  break;
1541  }
1542  }
1543 
1544  /* Success, so update the global QUEUE_HEAD */
1545  QUEUE_HEAD = queue_head;
1546 
1547  LWLockRelease(NotifySLRULock);
1548 
1549  return nextNotify;
1550 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:205
List * events
Definition: async.c:397
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:310
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1407
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1372
#define QUEUE_HEAD
Definition: async.c:287
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NotifyCtl
Definition: async.c:300
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static NotificationList * pendingNotifies
Definition: async.c:409
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:394
static bool backendTryAdvanceTail
Definition: async.c:430
#define QUEUE_PAGESIZE
Definition: async.c:301
#define InvalidTransactionId
Definition: transam.h:31
#define QUEUE_CLEANUP_DELAY
Definition: async.c:231
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:169
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:279
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition *  position,
int  entryLength 
)
static

Definition at line 1372 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1373 {
1374  int pageno = QUEUE_POS_PAGE(*position);
1375  int offset = QUEUE_POS_OFFSET(*position);
1376  bool pageJump = false;
1377 
1378  /*
1379  * Move to the next writing position: First jump over what we have just
1380  * written or read.
1381  */
1382  offset += entryLength;
1383  Assert(offset <= QUEUE_PAGESIZE);
1384 
1385  /*
1386  * In a second step check if another entry can possibly be written to the
1387  * page. If so, stay here, we have reached the next position. If not, then
1388  * we need to move on to the next page.
1389  */
1390  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1391  {
1392  pageno++;
1393  if (pageno > QUEUE_MAX_PAGE)
1394  pageno = 0; /* wrap around */
1395  offset = 0;
1396  pageJump = true;
1397  }
1398 
1399  SET_QUEUE_POS(*position, pageno, offset);
1400  return pageJump;
1401 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
#define AsyncQueueEntryEmptySize
Definition: async.c:182
#define QUEUE_PAGESIZE
Definition: async.c:301
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
#define QUEUEALIGN(len)
Definition: async.c:180
#define Assert(condition)
Definition: c.h:746
#define QUEUE_MAX_PAGE
Definition: async.c:318
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2171 of file async.c.

References Assert, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by pg_notification_queue_usage(), and ProcessCompletedNotifies().

2172 {
2173  QueuePosition min;
2174  int oldtailpage;
2175  int newtailpage;
2176  int boundary;
2177 
2178  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2179  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2180 
2181  /* Compute the new tail. */
2182  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2183  min = QUEUE_HEAD;
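2184  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2185  {
2186  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2187  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));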
2188  }
2189  oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
2190  LWLockRelease(NotifyQueueLock);
2191 
2192  /*
2193  * We can truncate something if the global tail advanced across an SLRU
2194  * segment boundary.
2195  *
2196  * XXX it might be better to truncate only once every several segments, to
2197  * reduce the number of directory scans.
2198  */
2199  newtailpage = QUEUE_POS_PAGE(min);
2200  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2201  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2202  {
2203  /*
2204  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2205  * release the lock again.
2206  */
2207  SimpleLruTruncate(NotifyCtl, newtailpage);
2208  }
2209 
2210  /*
2211  * Advertise the new tail. This changes asyncQueueIsFull()'s verdict for
2212  * the segment immediately prior to the new tail, allowing fresh data into
2213  * that segment.
2214  */
2215  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2216  QUEUE_TAIL = min;
2217  LWLockRelease(NotifyQueueLock);
2218 
2219  LWLockRelease(NotifyQueueTailLock);
2220 }
#define QUEUE_TAIL
Definition: async.c:288
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1225
#define QUEUE_HEAD
Definition: async.c:287
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NotifyCtl
Definition: async.c:300
#define QUEUE_FIRST_LISTENER
Definition: async.c:289
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:292
#define Assert(condition)
Definition: c.h:746
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34
#define QUEUE_BACKEND_POS(i)
Definition: async.c:293
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_MIN(x, y)
Definition: async.c:209
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1606 of file async.c.

References Assert, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

/*
 * asyncQueueFillWarning: emit a WARNING describing how full the NOTIFY queue
 * is, once asyncQueueUsage() reports a fill degree of at least 0.5.
 *
 * Takes no arguments and returns nothing.  Below a fill degree of 0.5 the
 * function returns before doing anything else.  The detail and hint parts of
 * the message are attached only when a listener PID was found (minPid is not
 * InvalidPid).
 */
1607 {
1608  double fillDegree;
1609  TimestampTz t;
1610 
1611  fillDegree = asyncQueueUsage();
1612  if (fillDegree < 0.5)
1613  return;
1614 
1615  t = GetCurrentTimestamp();
1616 
/*
 * NOTE(review): listing lines 1617-1618 are not shown in this excerpt.  The
 * References list names TimestampDifferenceExceeds, lastQueueFillWarn and
 * QUEUE_FULL_WARN_INTERVAL, so presumably the block below is guarded by a
 * "time since last warning" test -- confirm against async.c.
 */
1619  {
1620  QueuePosition min = QUEUE_HEAD;
1621  int32 minPid = InvalidPid;
1622 
/*
 * NOTE(review): listing lines 1623 and 1625 are not shown.  The loop variable
 * i is used below, and the References list names QUEUE_FIRST_LISTENER,
 * QUEUE_NEXT_LISTENER and Assert; presumably 1623 is the listener-list loop
 * header and 1625 an assertion -- confirm against async.c.
 */
1624  {
1626  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1627  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1628  minPid = QUEUE_BACKEND_PID(i); /* remember the PID whose position is the current minimum */
1629  }
1630 
1631  ereport(WARNING,
1632  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1633  (minPid != InvalidPid ?
1634  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1635  : 0),
1636  (minPid != InvalidPid ?
1637  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1638  : 0)));
1639 
/*
 * NOTE(review): listing line 1640 is not shown; presumably it records t as
 * the time of this warning -- confirm against async.c.
 */
1641  }
1642 }
int errhint(const char *fmt,...)
Definition: elog.c:1068
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1577
#define QUEUE_HEAD
Definition: async.c:287
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1677
signed int int32
Definition: c.h:363
#define QUEUE_FIRST_LISTENER
Definition: async.c:289
int errdetail(const char *fmt,...)
Definition: elog.c:954
static AsyncQueueControl * asyncQueueControl
Definition: async.c:285
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:302
#define WARNING
Definition: elog.h:40
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:292
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:746
int errmsg(const char *fmt,...)
Definition: elog.c:821
int i
TimestampTz lastQueueFillWarn
Definition: async.c:280
#define QUEUE_BACKEND_POS(i)
Definition: async.c:293
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MIN(x, y)
Definition: async.c:209
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1341 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_TAIL, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

/*
 * asyncQueueIsFull: report whether the queue has no room for a new head page.
 *
 * Returns true when the page that would follow the current head page
 * (wrapping to 0 once it exceeds QUEUE_MAX_PAGE) logically precedes the tail
 * page rounded down to a multiple of SLRU_PAGES_PER_SEGMENT, as judged by
 * asyncQueuePagePrecedes(); returns false otherwise.  The function only reads
 * QUEUE_HEAD and QUEUE_TAIL and acquires no locks itself.
 */
1342 {
1343  int nexthead;
1344  int boundary;
1345 
1346  /*
1347  * The queue is full if creating a new head page would create a page that
1348  * logically precedes the current global tail pointer, ie, the head
1349  * pointer would wrap around compared to the tail. We cannot create such
1350  * a head page for fear of confusing slru.c. For safety we round the tail
1351  * pointer back to a segment boundary (compare the truncation logic in
1352  * asyncQueueAdvanceTail).
1353  *
1354  * Note that this test is *not* dependent on how much space there is on
1355  * the current head page. This is necessary because asyncQueueAddEntries
1356  * might try to create the next head page in any case.
1357  */
1358  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1359  if (nexthead > QUEUE_MAX_PAGE)
1360  nexthead = 0; /* wrap around */
1361  boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
1362  boundary -= boundary % SLRU_PAGES_PER_SEGMENT; /* round tail page down to its segment's first page */
1363  return asyncQueuePagePrecedes(nexthead, boundary);
1364 }
#define QUEUE_TAIL
Definition: async.c:288
#define QUEUE_HEAD
Definition: async.c:287
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34
#define QUEUE_MAX_PAGE
Definition: async.c:318
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1407 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

/*
 * asyncQueueNotificationToEntry: fill the queue entry *qe from the
 * notification *n.
 *
 * n  - source notification; channel_len, payload_len and data are read.
 * qe - destination entry; length, dboid, xid, srcPid and data are written.
 *
 * Returns nothing.  The stored length is AsyncQueueEntryEmptySize plus both
 * string lengths, rounded up by QUEUEALIGN.  The xid is obtained by calling
 * GetCurrentTransactionId().
 */
1408 {
1409  size_t channellen = n->channel_len;
1410  size_t payloadlen = n->payload_len;
1411  int entryLength;
1412 
1413  Assert(channellen < NAMEDATALEN);
1414  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1415 
1416  /* The terminators are already included in AsyncQueueEntryEmptySize */
1417  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1418  entryLength = QUEUEALIGN(entryLength);
1419  qe->length = entryLength;
1420  qe->dboid = MyDatabaseId;
1421  qe->xid = GetCurrentTransactionId();
1422  qe->srcPid = MyProcPid;
/* the "+ 2" presumably covers the two terminator bytes (after channel and after payload) -- confirm against the Notification layout */
1423  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1424 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
int MyProcPid
Definition: globals.c:40
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:182
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
#define QUEUEALIGN(len)
Definition: async.c:180
Oid MyDatabaseId
Definition: globals.c:85
#define Assert(condition)
Definition: c.h:746
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 471 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

/*
 * asyncQueuePageDiff: compute the wraparound-aware difference p - q between
 * two queue page numbers.
 *
 * p, q - page numbers; both are asserted to lie in 0..QUEUE_MAX_PAGE.
 *
 * Returns p - q, shifted by one full cycle (QUEUE_MAX_PAGE + 1) when the raw
 * difference is at least half a cycle in either direction, so that a
 * negative result means p is logically before q.
 */
472 {
473  int diff;
474 
475  /*
476  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
477  * in the range 0..QUEUE_MAX_PAGE.
478  */
479  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
480  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
481 
482  diff = p - q;
483  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
484  diff -= QUEUE_MAX_PAGE + 1; /* p is far ahead numerically: treat it as behind q */
485  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
486  diff += QUEUE_MAX_PAGE + 1; /* p is far behind numerically: treat it as ahead of q */
487  return diff;
488 }
#define Assert(condition)
Definition: c.h:746
#define QUEUE_MAX_PAGE
Definition: async.c:318

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 492 of file async.c.

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

/*
 * asyncQueuePagePrecedes: return true when page p logically precedes page q,
 * i.e. when asyncQueuePageDiff(p, q) is negative.
 */
493 {
494  return asyncQueuePageDiff(p, q) < 0;
495 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:471

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2082 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

/*
 * asyncQueueProcessPageEntries: process the queue entries held in
 * page_buffer, starting at *current.
 *
 * current     - in/out read position; advanced past each entry examined.
 * stop        - position at which processing ends.
 * page_buffer - page image; an entry is located at page_buffer plus the
 *               offset part of the current position.
 * snapshot    - passed to XidInMVCCSnapshot() to detect senders that are
 *               still in progress.
 *
 * Returns true if *current equals stop when the loop ends, or if an entry
 * from a still-in-progress transaction was found (in which case *current is
 * reset to that entry).  Returns false if the loop ended at the end of the
 * page with *current not equal to stop.
 *
 * Entries whose dboid differs from MyDatabaseId are skipped.  For entries of
 * committed transactions, NotifyMyFrontEnd() is called when IsListeningOn()
 * accepts the channel name.
 */
2086 {
2087  bool reachedStop = false;
2088  bool reachedEndOfPage;
2089  AsyncQueueEntry *qe;
2090 
2091  do
2092  {
2093  QueuePosition thisentry = *current;
2094 
2095  if (QUEUE_POS_EQUAL(thisentry, stop))
2096  break;
2097 
2098  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2099 
2100  /*
2101  * Advance *current over this message, possibly to the next page. As
2102  * noted in the comments for asyncQueueReadAllNotifications, we must
2103  * do this before possibly failing while processing the message.
2104  */
2105  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2106 
2107  /* Ignore messages destined for other databases */
2108  if (qe->dboid == MyDatabaseId)
2109  {
2110  if (XidInMVCCSnapshot(qe->xid, snapshot))
2111  {
2112  /*
2113  * The source transaction is still in progress, so we can't
2114  * process this message yet. Break out of the loop, but first
2115  * back up *current so we will reprocess the message next
2116  * time. (Note: it is unlikely but not impossible for
2117  * TransactionIdDidCommit to fail, so we can't really avoid
2118  * this advance-then-back-up behavior when dealing with an
2119  * uncommitted message.)
2120  *
2121  * Note that we must test XidInMVCCSnapshot before we test
2122  * TransactionIdDidCommit, else we might return a message from
2123  * a transaction that is not yet visible to snapshots; compare
2124  * the comments at the head of heapam_visibility.c.
2125  *
2126  * Also, while our own xact won't be listed in the snapshot,
2127  * we need not check for TransactionIdIsCurrentTransactionId
2128  * because our transaction cannot (yet) have queued any
2129  * messages.
2130  */
2131  *current = thisentry;
2132  reachedStop = true;
2133  break;
2134  }
2135  else if (TransactionIdDidCommit(qe->xid))
2136  {
2137  /* qe->data is the null-terminated channel name */
2138  char *channel = qe->data;
2139 
2140  if (IsListeningOn(channel))
2141  {
2142  /* payload follows channel name */
2143  char *payload = qe->data + strlen(channel) + 1;
2144 
2145  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2146  }
2147  }
2148  else
2149  {
2150  /*
2151  * The source transaction aborted or crashed, so we just
2152  * ignore its notifications.
2153  */
2154  }
2155  }
2156 
2157  /* Loop back if we're not at end of page */
2158  } while (!reachedEndOfPage);
2159 
/* Also counts as "stopped" when the last advance landed exactly on stop. */
2160  if (QUEUE_POS_EQUAL(*current, stop))
2161  reachedStop = true;
2162 
2163  return reachedStop;
2164 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2238
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1372
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1281
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2274

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1917 of file async.c.

References Assert, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), ProcessCompletedNotifies(), and ProcessIncomingNotify().

/*
 * asyncQueueReadAllNotifications: read queue entries from the local position
 * pos up to the head position sampled on entry, one page at a time.
 *
 * Takes no arguments and returns nothing.  If pos already equals the sampled
 * head, the function returns immediately.  Otherwise it registers a snapshot
 * from GetLatestSnapshot(), copies the relevant part of each SLRU page into
 * a local buffer, and passes it to asyncQueueProcessPageEntries() until that
 * function reports that the stop condition was reached.  The PG_FINALLY
 * section runs on both normal and error exit; the snapshot is unregistered
 * afterwards on the normal path.
 */
1918 {
1919  volatile QueuePosition pos;
1920  QueuePosition head;
1921  Snapshot snapshot;
1922 
1923  /* page_buffer must be adequately aligned, so use a union */
1924  union
1925  {
1926  char buf[QUEUE_PAGESIZE];
1927  AsyncQueueEntry align;
1928  } page_buffer;
1929 
1930  /* Fetch current state */
1931  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1932  /* Assert checks that we have a valid state entry */
/*
 * NOTE(review): listing lines 1933-1934 are not shown in this excerpt.  The
 * References list names Assert, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS,
 * MyBackendId and MyProcPid; presumably these lines hold the assertion and
 * load pos from this backend's shared entry -- confirm against async.c.
 */
1935  head = QUEUE_HEAD;
1936  LWLockRelease(NotifyQueueLock);
1937 
1938  if (QUEUE_POS_EQUAL(pos, head))
1939  {
1940  /* Nothing to do, we have read all notifications already. */
1941  return;
1942  }
1943 
1944  /*----------
1945  * Get snapshot we'll use to decide which xacts are still in progress.
1946  * This is trickier than it might seem, because of race conditions.
1947  * Consider the following example:
1948  *
1949  * Backend 1: Backend 2:
1950  *
1951  * transaction starts
1952  * UPDATE foo SET ...;
1953  * NOTIFY foo;
1954  * commit starts
1955  * queue the notify message
1956  * transaction starts
1957  * LISTEN foo; -- first LISTEN in session
1958  * SELECT * FROM foo WHERE ...;
1959  * commit to clog
1960  * commit starts
1961  * add backend 2 to array of listeners
1962  * advance to queue head (this code)
1963  * commit to clog
1964  *
1965  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1966  * wasn't committed yet. Ideally we'd ensure that client 2 would
1967  * eventually get transaction 1's notify message, but there's no way
1968  * to do that; until we're in the listener array, there's no guarantee
1969  * that the notify message doesn't get removed from the queue.
1970  *
1971  * Therefore the coding technique transaction 2 is using is unsafe:
1972  * applications must commit a LISTEN before inspecting database state,
1973  * if they want to ensure they will see notifications about subsequent
1974  * changes to that state.
1975  *
1976  * What we do guarantee is that we'll see all notifications from
1977  * transactions committing after the snapshot we take here.
1978  * Exec_ListenPreCommit has already added us to the listener array,
1979  * so no not-yet-committed messages can be removed from the queue
1980  * before we see them.
1981  *----------
1982  */
1983  snapshot = RegisterSnapshot(GetLatestSnapshot());
1984 
1985  /*
1986  * It is possible that we fail while trying to send a message to our
1987  * frontend (for example, because of encoding conversion failure). If
1988  * that happens it is critical that we not try to send the same message
1989  * over and over again. Therefore, we place a PG_TRY block here that will
1990  * forcibly advance our queue position before we lose control to an error.
1991  * (We could alternatively retake NotifyQueueLock and move the position
1992  * before handling each individual message, but that seems like too much
1993  * lock traffic.)
1994  */
1995  PG_TRY();
1996  {
1997  bool reachedStop;
1998 
1999  do
2000  {
2001  int curpage = QUEUE_POS_PAGE(pos);
2002  int curoffset = QUEUE_POS_OFFSET(pos);
2003  int slotno;
2004  int copysize;
2005 
2006  /*
2007  * We copy the data from SLRU into a local buffer, so as to avoid
2008  * holding the NotifySLRULock while we are examining the entries
2009  * and possibly transmitting them to our frontend. Copy only the
2010  * part of the page we will actually inspect.
2011  */
/*
 * NOTE(review): listing line 2013, the continuation of the call below, is
 * not shown; the References list names InvalidTransactionId, presumably the
 * third argument -- confirm against async.c.
 */
2012  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
2014  if (curpage == QUEUE_POS_PAGE(head))
2015  {
2016  /* we only want to read as far as head */
2017  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2018  if (copysize < 0)
2019  copysize = 0; /* just for safety */
2020  }
2021  else
2022  {
2023  /* fetch all the rest of the page */
2024  copysize = QUEUE_PAGESIZE - curoffset;
2025  }
/* Copy to the same offset in the local buffer so entry offsets stay valid. */
2026  memcpy(page_buffer.buf + curoffset,
2027  NotifyCtl->shared->page_buffer[slotno] + curoffset,
2028  copysize);
2029  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2030  LWLockRelease(NotifySLRULock);
2031 
2032  /*
2033  * Process messages up to the stop position, end of page, or an
2034  * uncommitted message.
2035  *
2036  * Our stop position is what we found to be the head's position
2037  * when we entered this function. It might have changed already.
2038  * But if it has, we will receive (or have already received and
2039  * queued) another signal and come here again.
2040  *
2041  * We are not holding NotifyQueueLock here! The queue can only
2042  * extend beyond the head pointer (see above) and we leave our
2043  * backend's pointer where it is so nobody will truncate or
2044  * rewrite pages under us. Especially we don't want to hold a lock
2045  * while sending the notifications to the frontend.
2046  */
2047  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2048  page_buffer.buf,
2049  snapshot);
2050  } while (!reachedStop);
2051  }
2052  PG_FINALLY();
2053  {
2054  /* Update shared state */
2055  LWLockAcquire(NotifyQueueLock, LW_SHARED);
/*
 * NOTE(review): listing line 2056 is not shown; presumably it stores pos
 * back into this backend's shared queue position -- confirm against async.c.
 */
2057  LWLockRelease(NotifyQueueLock);
2058  }
2059  PG_END_TRY();
2060 
2061  /* Done with snapshot */
2062  UnregisterSnapshot(snapshot);
2063 }
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:810
#define QUEUE_HEAD
Definition: async.c:287
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NotifyCtl
Definition: async.c:300
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static char * buf
Definition: pg_test_fsync.c:68
#define QUEUE_PAGESIZE
Definition: async.c:301
#define InvalidTransactionId
Definition: transam.h:31
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:852
#define PG_FINALLY()
Definition: elog.h:312
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:494
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2082
#define Assert(condition)
Definition: c.h:746
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:325
#define PG_TRY()
Definition: elog.h:295
#define QUEUE_BACKEND_POS(i)
Definition: async.c:293
#define PG_END_TRY()
Definition: elog.h:320
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1300 of file async.c.

References amRegisteredListener, Assert, i, InvalidBackendId, InvalidOid, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

/*
 * asyncQueueUnregister: clear this backend's listener registration.
 *
 * Takes no arguments and returns nothing.  Asserts that listenChannels is
 * NIL, returns early when amRegisteredListener is false, and otherwise works
 * under NotifyQueueLock held exclusively before setting amRegisteredListener
 * to false.
 *
 * NOTE(review): listing lines 1312-1313, 1315-1316, 1319, 1321, 1323 and
 * 1328 are not shown in this excerpt, so the statements that invalidate the
 * shared entry and unlink it from the listener list are missing here.  The
 * References list names QUEUE_BACKEND_PID, QUEUE_BACKEND_DBOID,
 * QUEUE_FIRST_LISTENER, QUEUE_NEXT_LISTENER, MyBackendId, InvalidPid,
 * InvalidOid and InvalidBackendId -- confirm details against async.c.
 */
1301 {
1302  Assert(listenChannels == NIL); /* else caller error */
1303 
1304  if (!amRegisteredListener) /* nothing to do */
1305  return;
1306 
1307  /*
1308  * Need exclusive lock here to manipulate list links.
1309  */
1310  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1311  /* Mark our entry as invalid */
1314  /* and remove it from the list */
1317  else
1318  {
1320  {
1322  {
1324  break;
1325  }
1326  }
1327  }
1329  LWLockRelease(NotifyQueueLock);
1330 
1331  /* mark ourselves as no longer listed in the global array */
1332  amRegisteredListener = false;
1333 }
#define NIL
Definition: pg_list.h:65
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
static List * listenChannels
Definition: async.c:325
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define QUEUE_FIRST_LISTENER
Definition: async.c:289
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:424
int BackendId
Definition: backendid.h:21
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:291
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:292
#define Assert(condition)
Definition: c.h:746
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
int i
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1577 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

/*
 * asyncQueueUsage: return the queue's fill degree as a double.
 *
 * The result is the number of pages between the tail page and the head page
 * (adding QUEUE_MAX_PAGE + 1 when the raw difference is negative), divided
 * by (QUEUE_MAX_PAGE + 1) / 2.  Returns exactly 0 when head and tail are on
 * the same page.  Only reads QUEUE_HEAD and QUEUE_TAIL and acquires no locks
 * itself.
 */
1578 {
1579  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1580  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1581  int occupied;
1582 
1583  occupied = headPage - tailPage;
1584 
1585  if (occupied == 0)
1586  return (double) 0; /* fast exit for common case */
1587 
1588  if (occupied < 0)
1589  {
1590  /* head has wrapped around, tail not yet */
1591  occupied += QUEUE_MAX_PAGE + 1;
1592  }
1593 
/* The divisor is half the page-number space, so 1.0 means half of all page numbers are in use. */
1594  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1595 }
#define QUEUE_TAIL
Definition: async.c:288
#define QUEUE_HEAD
Definition: async.c:287
#define QUEUE_MAX_PAGE
Definition: async.c:318
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 518 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateSharedMemoryAndSemaphores().

/*
 * AsyncShmemInit: create or attach to the "Async Queue Control" shared
 * structure and set up SLRU management for pg_notify.
 *
 * Takes no arguments and returns nothing.  The structure size is
 * (MaxBackends + 1) QueueBackendStatus entries plus the offset of the
 * backend[] member.  When the structure is newly created (!found), the head
 * and tail positions are set to (0, 0) and the backend[] entries are
 * initialized.
 *
 * NOTE(review): listing lines 532, 540-541, 545-548, 556, 558 and 565 are
 * not shown in this excerpt.  The References list names SimpleLruInit,
 * NUM_NOTIFY_BUFFERS, SYNC_HANDLER_NONE, SlruScanDirectory,
 * SlruScanDirCbDeleteAll, QUEUE_FIRST_LISTENER, lastQueueFillWarn and the
 * QUEUE_BACKEND_* macros -- confirm the missing statements against async.c.
 */
519 {
520  bool found;
521  Size size;
522 
523  /*
524  * Create or attach to the AsyncQueueControl structure.
525  *
526  * The used entries in the backend[] array run from 1 to MaxBackends; the
527  * zero'th entry is unused but must be allocated.
528  */
529  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
530  size = add_size(size, offsetof(AsyncQueueControl, backend));
531 
533  ShmemInitStruct("Async Queue Control", size, &found);
534 
535  if (!found)
536  {
537  /* First time through, so initialize it */
538  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
539  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
542  /* zero'th entry won't be used, but let's initialize it anyway */
543  for (int i = 0; i <= MaxBackends; i++)
544  {
549  }
550  }
551 
552  /*
553  * Set up SLRU management of the pg_notify data.
554  */
555  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
557  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
559 
560  if (!found)
561  {
562  /*
563  * During start or reboot, clean out the pg_notify directory.
564  */
566  }
567 }
#define QUEUE_TAIL
Definition: async.c:288
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1426
#define QUEUE_HEAD
Definition: async.c:287
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:186
#define NotifyCtl
Definition: async.c:300
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:289
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
int MaxBackends
Definition: globals.c:136
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
static AsyncQueueControl * asyncQueueControl
Definition: async.c:285
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:291
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:292
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1449
size_t Size
Definition: c.h:474
int i
TimestampTz lastQueueFillWarn
Definition: async.c:280
#define QUEUE_BACKEND_POS(i)
Definition: async.c:293
#define offsetof(type, field)
Definition: c.h:669
#define InvalidPid
Definition: miscadmin.h:32

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 501 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

/*
 * AsyncShmemSize: return the amount of shared memory this module needs.
 *
 * The visible part of the computation is (MaxBackends + 1)
 * QueueBackendStatus entries plus the offset of AsyncQueueControl's backend
 * member, using the mul_size/add_size helpers.
 *
 * NOTE(review): listing line 509 is not shown in this excerpt; the
 * References list names SimpleLruShmemSize and NUM_NOTIFY_BUFFERS, so
 * presumably the SLRU buffer size is added there -- confirm against async.c.
 */
502 {
503  Size size;
504 
505  /* This had better match AsyncShmemInit */
506  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
507  size = add_size(size, offsetof(AsyncQueueControl, backend));
508 
510 
511  return size;
512 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:155
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:136
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:474
#define offsetof(type, field)
Definition: c.h:669

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1742 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

/*
 * AtAbort_Notify: notify-related cleanup, called from AbortTransaction()
 * according to the "Referenced by" list.
 *
 * Takes no arguments and returns nothing.
 *
 * NOTE(review): listing lines 1749-1750 and 1753 are not shown in this
 * excerpt, so no executable statement of this function is visible.  The
 * References list names amRegisteredListener, NIL, asyncQueueUnregister and
 * ClearPendingActionsAndNotifies; presumably the first gap is the
 * conditional deregistration described in the comment and the second is the
 * cleanup call -- confirm against async.c.
 */
1743 {
1744  /*
1745  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1746  * we have registered as a listener but have not made any entry in
1747  * listenChannels. In that case, deregister again.
1748  */
1751 
1752  /* And clean up */
1754 }
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:325
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2434
static void asyncQueueUnregister(void)
Definition: async.c:1300
static bool amRegisteredListener
Definition: async.c:424

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 981 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

/*
 * AtCommit_Notify: apply the pending LISTEN/UNLISTEN actions, called from
 * CommitTransaction() according to the "Referenced by" list.
 *
 * Takes no arguments and returns nothing.  When pendingActions is not NULL,
 * each ListenAction in pendingActions->actions is dispatched on its action
 * kind: LISTEN_LISTEN calls Exec_ListenCommit() and LISTEN_UNLISTEN calls
 * Exec_UnlistenCommit(), both with the action's channel name.  A DEBUG1
 * message is logged first when Trace_notify is set.
 */
982 {
983  ListCell *p;
984 
985  /*
986  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
987  * return as soon as possible
988  */
/*
 * NOTE(review): listing line 989, the condition guarding the early return
 * below, is not shown in this excerpt -- confirm against async.c.
 */
990  return;
991 
992  if (Trace_notify)
993  elog(DEBUG1, "AtCommit_Notify");
994 
995  /* Perform any pending listen/unlisten actions */
996  if (pendingActions != NULL)
997  {
998  foreach(p, pendingActions->actions)
999  {
1000  ListenAction *actrec = (ListenAction *) lfirst(p);
1001 
1002  switch (actrec->action)
1003  {
1004  case LISTEN_LISTEN:
1005  Exec_ListenCommit(actrec->channel);
1006  break;
1007  case LISTEN_UNLISTEN:
1008  Exec_UnlistenCommit(actrec->channel);
1009  break;
1010  case LISTEN_UNLISTEN_ALL:
/* NOTE(review): listing line 1011 is not shown; the References list names Exec_UnlistenAllCommit, presumably called here -- confirm. */
1012  break;
1013  }
1014  }
1015  }
1016 
1017  /* If no longer listening to anything, get out of listener array */
/*
 * NOTE(review): listing lines 1018-1019 and 1022 are not shown; the
 * References list names amRegisteredListener, NIL, asyncQueueUnregister and
 * ClearPendingActionsAndNotifies -- confirm against async.c.
 */
1020 
1021  /* And clean up */
1023 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:357
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1184
static List * listenChannels
Definition: async.c:325
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2434
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1153
static NotificationList * pendingNotifies
Definition: async.c:409
List * actions
Definition: async.c:353
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1126
static void asyncQueueUnregister(void)
Definition: async.c:1300
static bool amRegisteredListener
Definition: async.c:424
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:347
ListenActionKind action
Definition: async.c:346

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 854 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

/*
 * AtPrepare_Notify: raise an ERROR with ERRCODE_FEATURE_NOT_SUPPORTED,
 * refusing to PREPARE a transaction that has executed LISTEN, UNLISTEN, or
 * NOTIFY.  Called from PrepareTransaction() according to the "Referenced by"
 * list.
 *
 * NOTE(review): listing line 857, the condition guarding the ereport, is not
 * shown in this excerpt; presumably it tests for pending actions or pending
 * notifies -- confirm against async.c.
 */
855 {
856  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
858  ereport(ERROR,
859  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
860  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
861 }
static ActionList * pendingActions
Definition: async.c:357
int errcode(int sqlerrcode)
Definition: elog.c:610
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:821

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1832 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

/*
 * AtSubAbort_Notify: discard pending actions and notifies that belong to the
 * aborting subtransaction.
 *
 * Takes no arguments and returns nothing.  For each of pendingActions and
 * pendingNotifies, while the top stack entry's nestingLevel is at least the
 * current transaction nest level, that entry is freed with pfree().  Both
 * loops tolerate an empty (NULL) stack.
 */
1833 {
1834  int my_level = GetCurrentTransactionNestLevel();
1835 
1836  /*
1837  * All we have to do is pop the stack --- the actions/notifies made in
1838  * this subxact are no longer interesting, and the space will be freed
1839  * when CurTransactionContext is recycled. We still have to free the
1840  * ActionList and NotificationList objects themselves, though, because
1841  * those are allocated in TopTransactionContext.
1842  *
1843  * Note that there might be no entries at all, or no entries for the
1844  * current subtransaction level, either because none were ever created, or
1845  * because we reentered this routine due to trouble during subxact abort.
1846  */
1847  while (pendingActions != NULL &&
1848  pendingActions->nestingLevel >= my_level)
1849  {
1850  ActionList *childPendingActions = pendingActions;
1851 
/* NOTE(review): listing line 1852 is not shown; presumably it pops the stack by moving pendingActions to its upper entry -- confirm. */
1853  pfree(childPendingActions);
1854  }
1855 
1856  while (pendingNotifies != NULL &&
1857  pendingNotifies->nestingLevel >= my_level)
1858  {
1859  NotificationList *childPendingNotifies = pendingNotifies;
1860 
/* NOTE(review): listing line 1861 is not shown; presumably it moves pendingNotifies to its upper entry -- confirm. */
1862  pfree(childPendingNotifies);
1863  }
1864 }
static ActionList * pendingActions
Definition: async.c:357
int nestingLevel
Definition: async.c:396
void pfree(void *pointer)
Definition: mcxt.c:1057
static NotificationList * pendingNotifies
Definition: async.c:409
struct NotificationList * upper
Definition: async.c:399
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1762 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

/*
 * AtSubCommit_Notify: hand the committing subtransaction's pending actions
 * and notifies over to its parent level.
 *
 * Takes no arguments and returns nothing.  For each of the two stacks, work
 * is done only when the top entry's nestingLevel is at least the current
 * nest level.  If there is no upper entry, or the upper entry's level is
 * below my_level - 1, the whole entry is given to the parent; otherwise the
 * child's contents are merged into the upper entry and the child list object
 * is freed with pfree().  When merging notifies, an event is added only if
 * AsyncExistsPendingNotify() does not already find it.
 */
1763 {
1764  int my_level = GetCurrentTransactionNestLevel();
1765 
1766  /* If there are actions at our nesting level, we must reparent them. */
1767  if (pendingActions != NULL &&
1768  pendingActions->nestingLevel >= my_level)
1769  {
1770  if (pendingActions->upper == NULL ||
1771  pendingActions->upper->nestingLevel < my_level - 1)
1772  {
1773  /* nothing to merge; give the whole thing to the parent */
/* NOTE(review): listing line 1774 is not shown; presumably it lowers the entry's nestingLevel by one -- confirm against async.c. */
1775  }
1776  else
1777  {
1778  ActionList *childPendingActions = pendingActions;
1779 
/* NOTE(review): listing line 1780 is not shown; presumably it moves pendingActions to its upper entry -- confirm. */
1781 
1782  /*
1783  * Mustn't try to eliminate duplicates here --- see queue_listen()
1784  */
/* NOTE(review): listing lines 1785-1786 are not shown; the References list names list_concat, of which the next line is the final argument -- confirm. */
1787  childPendingActions->actions);
1788  pfree(childPendingActions);
1789  }
1790  }
1791 
1792  /* If there are notifies at our nesting level, we must reparent them. */
1793  if (pendingNotifies != NULL &&
1794  pendingNotifies->nestingLevel >= my_level)
1795  {
1796  Assert(pendingNotifies->nestingLevel == my_level);
1797 
1798  if (pendingNotifies->upper == NULL ||
1799  pendingNotifies->upper->nestingLevel < my_level - 1)
1800  {
1801  /* nothing to merge; give the whole thing to the parent */
/* NOTE(review): listing line 1802 is not shown; presumably it lowers the entry's nestingLevel by one -- confirm against async.c. */
1803  }
1804  else
1805  {
1806  /*
1807  * Formerly, we didn't bother to eliminate duplicates here, but
1808  * now we must, else we fall foul of "Assert(!found)", either here
1809  * or during a later attempt to build the parent-level hashtable.
1810  */
1811  NotificationList *childPendingNotifies = pendingNotifies;
1812  ListCell *l;
1813 
/* NOTE(review): listing line 1814 is not shown; presumably it moves pendingNotifies to its upper entry before the merge loop -- confirm. */
1815  /* Insert all the subxact's events into parent, except for dups */
1816  foreach(l, childPendingNotifies->events)
1817  {
1818  Notification *childn = (Notification *) lfirst(l);
1819 
1820  if (!AsyncExistsPendingNotify(childn))
1821  AddEventToPendingNotifies(childn);
1822  }
1823  pfree(childPendingNotifies);
1824  }
1825  }
1826 }
List * events
Definition: async.c:397
static ActionList * pendingActions
Definition: async.c:357
List * list_concat(List *list1, const List *list2)
Definition: list.c:515
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2299
int nestingLevel
Definition: async.c:396
void pfree(void *pointer)
Definition: mcxt.c:1057
static NotificationList * pendingNotifies
Definition: async.c:409
List * actions
Definition: async.c:353
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2340
struct NotificationList * upper
Definition: async.c:399
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
#define Assert(condition)
Definition: c.h:746
#define lfirst(lc)
Definition: pg_list.h:169

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2434 of file async.c.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2435 {
2436  /*
2437  * Everything's allocated in either TopTransactionContext or the context
2438  * for the subtransaction to which it corresponds. So, there's nothing to
2439  * do here except reset the pointers; the space will be reclaimed when the
2440  * contexts are deleted.
2441  */
2442  pendingActions = NULL;
2443  pendingNotifies = NULL;
2444 }
static ActionList * pendingActions
Definition: async.c:357
static NotificationList * pendingNotifies
Definition: async.c:409

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1126 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1127 {
1128  MemoryContext oldcontext;
1129 
1130  /* Do nothing if we are already listening on this channel */
1131  if (IsListeningOn(channel))
1132  return;
1133 
1134  /*
1135  * Add the new channel name to listenChannels.
1136  *
1137  * XXX It is theoretically possible to get an out-of-memory failure here,
1138  * which would be bad because we already committed. For the moment it
1139  * doesn't seem worth trying to guard against that, but maybe improve this
1140  * later.
1141  */
1144  MemoryContextSwitchTo(oldcontext);
1145 }
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:325
static bool IsListeningOn(const char *channel)
Definition: async.c:1281
MemoryContext TopMemoryContext
Definition: mcxt.c:44
List * lappend(List *list, void *datum)
Definition: list.c:321

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1031 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

1032 {
1033  QueuePosition head;
1034  QueuePosition max;
1035  BackendId prevListener;
1036 
1037  /*
1038  * Nothing to do if we are already listening to something, nor if we
1039  * already ran this routine in this transaction.
1040  */
1042  return;
1043 
1044  if (Trace_notify)
1045  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1046 
1047  /*
1048  * Before registering, make sure we will unlisten before dying. (Note:
1049  * this action does not get undone if we abort later.)
1050  */
1052  {
1054  unlistenExitRegistered = true;
1055  }
1056 
1057  /*
1058  * This is our first LISTEN, so establish our pointer.
1059  *
1060  * We set our pointer to the global tail pointer and then move it forward
1061  * over already-committed notifications. This ensures we cannot miss any
1062  * not-yet-committed notifications. We might get a few more but that
1063  * doesn't hurt.
1064  *
1065  * In some scenarios there might be a lot of committed notifications that
1066  * have not yet been pruned away (because some backend is being lazy about
1067  * reading them). To reduce our startup time, we can look at other
1068  * backends and adopt the maximum "pos" pointer of any backend that's in
1069  * our database; any notifications it's already advanced over are surely
1070  * committed and need not be re-examined by us. (We must consider only
1071  * backends connected to our DB, because others will not have bothered to
1072  * check committed-ness of notifications in our DB.)
1073  *
1074  * We need exclusive lock here so we can look at other backends' entries
1075  * and manipulate the list links.
1076  */
1077  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1078  head = QUEUE_HEAD;
1079  max = QUEUE_TAIL;
1080  prevListener = InvalidBackendId;
1082  {
1084  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1085  /* Also find last listening backend before this one */
1086  if (i < MyBackendId)
1087  prevListener = i;
1088  }
1092  /* Insert backend into list of listeners at correct position */
1093  if (prevListener > 0)
1094  {
1096  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1097  }
1098  else
1099  {
1102  }
1103  LWLockRelease(NotifyQueueLock);
1104 
1105  /* Now we are listed in the global array, so remember we're listening */
1106  amRegisteredListener = true;
1107 
1108  /*
1109  * Try to move our pointer forward as far as possible. This will skip
1110  * over already-committed notifications, which we want to do because they
1111  * might be quite stale. Note that we are not yet listening on anything,
1112  * so we won't deliver such notifications to our frontend. Also, although
1113  * our transaction might have executed NOTIFY, those message(s) aren't
1114  * queued yet so we won't skip them here.
1115  */
1116  if (!QUEUE_POS_EQUAL(max, head))
1118 }
#define QUEUE_TAIL
Definition: async.c:288
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
BackendId MyBackendId
Definition: globals.c:81
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
#define QUEUE_HEAD
Definition: async.c:287
static bool unlistenExitRegistered
Definition: async.c:421
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define QUEUE_FIRST_LISTENER
Definition: async.c:289
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:841
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:424
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:291
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1917
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:292
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:293
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MAX(x, y)
Definition: async.c:215

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1184 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1185 {
1186  if (Trace_notify)
1187  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1188 
1190  listenChannels = NIL;
1191 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:325
void list_free_deep(List *list)
Definition: list.c:1390
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1153 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1154 {
1155  ListCell *q;
1156 
1157  if (Trace_notify)
1158  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1159 
1160  foreach(q, listenChannels)
1161  {
1162  char *lchan = (char *) lfirst(q);
1163 
1164  if (strcmp(lchan, channel) == 0)
1165  {
1167  pfree(lchan);
1168  break;
1169  }
1170  }
1171 
1172  /*
1173  * We do not complain about unlistening a channel that we were not
1174  * listening on; should we?
1175  */
1176 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static List * listenChannels
Definition: async.c:325
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:357
void pfree(void *pointer)
Definition: mcxt.c:1057
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1875 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1876 {
1877  /*
1878  * Note: this is called by a SIGNAL HANDLER. You must be very wary of
1879  * what you do here.
1880  */
1881 
1882  /* signal that work needs to be done */
1883  notifyInterruptPending = true;
1884 
1885  /* make sure the event is processed in due course */
1886  SetLatch(MyLatch);
1887 }
void SetLatch(Latch *latch)
Definition: latch.c:505
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1281 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1282 {
1283  ListCell *p;
1284 
1285  foreach(p, listenChannels)
1286  {
1287  char *lchan = (char *) lfirst(p);
1288 
1289  if (strcmp(lchan, channel) == 0)
1290  return true;
1291  }
1292  return false;
1293 }
static List * listenChannels
Definition: async.c:325
#define lfirst(lc)
Definition: pg_list.h:169

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2404 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2405 {
2406  const Notification *k = *(const Notification *const *) key;
2407 
2408  Assert(keysize == sizeof(Notification *));
2409  /* We don't bother to include the payload's trailing null in the hash */
2410  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2411  k->channel_len + k->payload_len + 1));
2412 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
#define Assert(condition)
Definition: c.h:746
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2418 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2419 {
2420  const Notification *k1 = *(const Notification *const *) key1;
2421  const Notification *k2 = *(const Notification *const *) key2;
2422 
2423  Assert(keysize == sizeof(Notification *));
2424  if (k1->channel_len == k2->channel_len &&
2425  k1->payload_len == k2->payload_len &&
2426  memcmp(k1->data, k2->data,
2427  k1->channel_len + k1->payload_len + 2) == 0)
2428  return 0; /* equal */
2429  return 1; /* not equal */
2430 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
#define Assert(condition)
Definition: c.h:746
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2274 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2275 {
2277  {
2279 
2280  pq_beginmessage(&buf, 'A');
2281  pq_sendint32(&buf, srcPid);
2282  pq_sendstring(&buf, channel);
2284  pq_sendstring(&buf, payload);
2285  pq_endmessage(&buf);
2286 
2287  /*
2288  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2289  * happen at the end of the transaction, and for incoming notifies
2290  * ProcessIncomingNotify will do it after finding all the notifies.
2291  */
2292  }
2293  else
2294  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2295 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:214
CommandDest whereToSendOutput
Definition: postgres.c:91
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 808 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

809 {
810  FuncCallContext *funcctx;
811 
812  /* stuff done only on the first call of the function */
813  if (SRF_IS_FIRSTCALL())
814  {
815  /* create a function context for cross-call persistence */
816  funcctx = SRF_FIRSTCALL_INIT();
817  }
818 
819  /* stuff done on every call of the function */
820  funcctx = SRF_PERCALL_SETUP();
821 
822  if (funcctx->call_cntr < list_length(listenChannels))
823  {
824  char *channel = (char *) list_nth(listenChannels,
825  funcctx->call_cntr);
826 
827  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
828  }
829 
830  SRF_RETURN_DONE(funcctx);
831 }
uint64 call_cntr
Definition: funcapi.h:65
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:294
static List * listenChannels
Definition: async.c:325
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:298
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:300
static void * list_nth(const List *list, int n)
Definition: pg_list.h:266
static int list_length(const List *l)
Definition: pg_list.h:149
#define CStringGetTextDatum(s)
Definition: builtins.h:86
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:318
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:296

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1557 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), and PG_RETURN_FLOAT8.

1558 {
1559  double usage;
1560 
1561  /* Advance the queue tail so we don't report a too-large result */
1563 
1564  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1565  usage = asyncQueueUsage();
1566  LWLockRelease(NotifyQueueLock);
1567 
1568  PG_RETURN_FLOAT8(usage);
1569 }
static void usage(void)
Definition: pg_standby.c:589
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:365
static double asyncQueueUsage(void)
Definition: async.c:1577
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static void asyncQueueAdvanceTail(void)
Definition: async.c:2171
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 575 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

576 {
577  const char *channel;
578  const char *payload;
579 
580  if (PG_ARGISNULL(0))
581  channel = "";
582  else
583  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
584 
585  if (PG_ARGISNULL(1))
586  payload = "";
587  else
588  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
589 
590  /* For NOTIFY as a statement, this is checked in ProcessUtility */
592 
593  Async_Notify(channel, payload);
594 
595  PG_RETURN_VOID();
596 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:308
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:444
#define PG_RETURN_VOID()
Definition: fmgr.h:348
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:609
char * text_to_cstring(const text *t)
Definition: varlena.c:221

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 879 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

880 {
881  ListCell *p;
882 
884  return; /* no relevant statements in this xact */
885 
886  if (Trace_notify)
887  elog(DEBUG1, "PreCommit_Notify");
888 
889  /* Preflight for any pending listen/unlisten actions */
890  if (pendingActions != NULL)
891  {
892  foreach(p, pendingActions->actions)
893  {
894  ListenAction *actrec = (ListenAction *) lfirst(p);
895 
896  switch (actrec->action)
897  {
898  case LISTEN_LISTEN:
900  break;
901  case LISTEN_UNLISTEN:
902  /* there is no Exec_UnlistenPreCommit() */
903  break;
904  case LISTEN_UNLISTEN_ALL:
905  /* there is no Exec_UnlistenAllPreCommit() */
906  break;
907  }
908  }
909  }
910 
911  /* Queue any pending notifies (must happen after the above) */
912  if (pendingNotifies)
913  {
914  ListCell *nextNotify;
915 
916  /*
917  * Make sure that we have an XID assigned to the current transaction.
918  * GetCurrentTransactionId is cheap if we already have an XID, but not
919  * so cheap if we don't, and we'd prefer not to do that work while
920  * holding NotifyQueueLock.
921  */
922  (void) GetCurrentTransactionId();
923 
924  /*
925  * Serialize writers by acquiring a special lock that we hold till
926  * after commit. This ensures that queue entries appear in commit
927  * order, and in particular that there are never uncommitted queue
928  * entries ahead of committed ones, so an uncommitted transaction
929  * can't block delivery of deliverable notifications.
930  *
931  * We use a heavyweight lock so that it'll automatically be released
932  * after either commit or abort. This also allows deadlocks to be
933  * detected, though really a deadlock shouldn't be possible here.
934  *
935  * The lock is on "database 0", which is pretty ugly but it doesn't
936  * seem worth inventing a special locktag category just for this.
937  * (Historical note: before PG 9.0, a similar lock on "database 0" was
938  * used by the flatfiles mechanism.)
939  */
940  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
942 
943  /* Now push the notifications into the queue */
945 
946  nextNotify = list_head(pendingNotifies->events);
947  while (nextNotify != NULL)
948  {
949  /*
950  * Add the pending notifications to the queue. We acquire and
951  * release NotifyQueueLock once per page, which might be overkill
952  * but it does allow readers to get in while we're doing this.
953  *
954  * A full queue is very uncommon and should really not happen,
955  * given that we have so much space available in the SLRU pages.
956  * Nevertheless we need to deal with this possibility. Note that
957  * when we get here we are in the process of committing our
958  * transaction, but we have not yet committed to clog, so at this
959  * point in time we can still roll the transaction back.
960  */
961  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
963  if (asyncQueueIsFull())
964  ereport(ERROR,
965  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
966  errmsg("too many notifications in the NOTIFY queue")));
967  nextNotify = asyncQueueAddEntries(nextNotify);
968  LWLockRelease(NotifyQueueLock);
969  }
970  }
971 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:397
static ActionList * pendingActions
Definition: async.c:357
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool asyncQueueIsFull(void)
Definition: async.c:1341
static void asyncQueueFillWarning(void)
Definition: async.c:1606
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
static bool backendHasSentNotifications
Definition: async.c:427
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
List * actions
Definition: async.c:353
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1017
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1443
#define lfirst(lc)
Definition: pg_list.h:169
static void Exec_ListenPreCommit(void)
Definition: async.c:1031
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
bool Trace_notify
Definition: async.c:433
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define elog(elevel,...)
Definition: elog.h:214
ListenActionKind action
Definition: async.c:346

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1217 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1218 {
1219  MemoryContext caller_context;
1220 
1221  /* Nothing to do if we didn't send any notifications */
1223  return;
1224 
1225  /*
1226  * We reset the flag immediately; otherwise, if any sort of error occurs
1227  * below, we'd be locked up in an infinite loop, because control will come
1228  * right back here after error cleanup.
1229  */
1231 
1232  /*
1233  * We must preserve the caller's memory context (probably MessageContext)
1234  * across the transaction we do here.
1235  */
1236  caller_context = CurrentMemoryContext;
1237 
1238  if (Trace_notify)
1239  elog(DEBUG1, "ProcessCompletedNotifies");
1240 
1241  /*
1242  * We must run asyncQueueReadAllNotifications inside a transaction, else
1243  * bad things happen if it gets an error.
1244  */
1246 
1247  /* Send signals to other backends */
1248  SignalBackends();
1249 
1250  if (listenChannels != NIL)
1251  {
1252  /* Read the queue ourselves, and send relevant stuff to the frontend */
1254  }
1255 
1256  /*
1257  * If it's time to try to advance the global tail pointer, do that.
1258  */
1260  {
1261  backendTryAdvanceTail = false;
1263  }
1264 
1266 
1267  MemoryContextSwitchTo(caller_context);
1268 
1269  /* We don't need pq_flush() here since postgres.c will do one shortly */
1270 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2947
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:325
static bool backendHasSentNotifications
Definition: async.c:427
static bool backendTryAdvanceTail
Definition: async.c:430
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2171
static void SignalBackends(void)
Definition: async.c:1659
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1917
void StartTransactionCommand(void)
Definition: xact.c:2846
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( void  )
static

Definition at line 2235 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2236 {
2237  /* We *must* reset the flag */
2238  notifyInterruptPending = false;
2239 
2240  /* Do nothing else if we aren't actively listening */
2241  if (listenChannels == NIL)
2242  return;
2243 
2244  if (Trace_notify)
2245  elog(DEBUG1, "ProcessIncomingNotify");
2246 
2247  set_ps_display("notify interrupt");
2248 
2249  /*
2250  * We must run asyncQueueReadAllNotifications inside a transaction, else
2251  * bad things happen if it gets an error.
2252  */
2254 
2256 
2258 
2259  /*
2260  * Must flush the notify messages to ensure frontend gets them promptly.
2261  */
2262  pq_flush();
2263 
2264  set_ps_display("idle");
2265 
2266  if (Trace_notify)
2267  elog(DEBUG1, "ProcessIncomingNotify: done");
2268 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:39
void CommitTransactionCommand(void)
Definition: xact.c:2947
static List * listenChannels
Definition: async.c:325
void set_ps_display(const char *activity)
Definition: ps_status.c:349
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1917
void StartTransactionCommand(void)
Definition: xact.c:2846
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1901 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1902 {
1904  return; /* not really idle */
1905 
1906  while (notifyInterruptPending)
1908 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
static void ProcessIncomingNotify(void)
Definition: async.c:2235
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 708 of file async.c.

References ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, offsetof, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

709 {
710  MemoryContext oldcontext;
711  ListenAction *actrec;
712  int my_level = GetCurrentTransactionNestLevel();
713 
714  /*
715  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
716  * be too complicated to ensure we get the right interactions of
717  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
718  * would be any performance benefit anyway in sane applications.
719  */
721 
722  /* the "+ 1" below reserves space for the channel's terminating null */
723  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
724  strlen(channel) + 1);
725  actrec->action = action;
726  strcpy(actrec->channel, channel);
727 
728  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
729  {
730  ActionList *actions;
731 
732  /*
733  * First action in current sub(xact). Note that we allocate the
734  * ActionList in TopTransactionContext; the nestingLevel might get
735  * changed later by AtSubCommit_Notify.
736  */
737  actions = (ActionList *)
739  actions->nestingLevel = my_level;
740  actions->actions = list_make1(actrec);
741  actions->upper = pendingActions;
742  pendingActions = actions;
743  }
744  else
746 
747  MemoryContextSwitchTo(oldcontext);
748 }
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static ActionList * pendingActions
Definition: async.c:357
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
#define list_make1(x1)
Definition: pg_list.h:206
List * actions
Definition: async.c:353
List * lappend(List *list, void *datum)
Definition: list.c:321
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
void * palloc(Size size)
Definition: mcxt.c:950
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:347
ListenActionKind action
Definition: async.c:346
#define offsetof(type, field)
Definition: c.h:669

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1659 of file async.c.

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by ProcessCompletedNotifies().

1660 {
1661  int32 *pids;
1662  BackendId *ids;
1663  int count;
1664 
1665  /*
1666  * Identify backends that we need to signal. We don't want to send
1667  * signals while holding the NotifyQueueLock, so this loop just builds a
1668  * list of target PIDs.
1669  *
1670  * XXX in principle these pallocs could fail, which would be bad. Maybe
1671  * preallocate the arrays? But in practice this is only run in trivial
1672  * transactions, so there should surely be space available.
1673  */
1674  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1675  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1676  count = 0;
1677 
1678  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1680  {
1681  int32 pid = QUEUE_BACKEND_PID(i);
1682  QueuePosition pos;
1683 
1684  Assert(pid != InvalidPid);
1685  if (pid == MyProcPid)
1686  continue; /* never signal self */
1687  pos = QUEUE_BACKEND_POS(i);
1689  {
1690  /*
1691  * Always signal listeners in our own database, unless they're
1692  * already caught up (unlikely, but possible).
1693  */
1694  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1695  continue;
1696  }
1697  else
1698  {
1699  /*
1700  * Listeners in other databases should be signaled only if they
1701  * are far behind.
1702  */
1705  continue;
1706  }
1707  /* OK, need to signal this one */
1708  pids[count] = pid;
1709  ids[count] = i;
1710  count++;
1711  }
1712  LWLockRelease(NotifyQueueLock);
1713 
1714  /* Now send signals */
1715  for (int i = 0; i < count; i++)
1716  {
1717  int32 pid = pids[i];
1718 
1719  /*
1720  * Note: assuming things aren't broken, a signal failure here could
1721  * only occur if the target backend exited since we released
1722  * NotifyQueueLock; which is unlikely but certainly possible. So we
1723  * just log a low-level debug message if it happens.
1724  */
1725  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1726  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1727  }
1728 
1729  pfree(pids);
1730  pfree(ids);
1731 }
int MyProcPid
Definition: globals.c:40
#define QUEUE_BACKEND_PID(i)
Definition: async.c:290
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:287
signed int int32
Definition: c.h:363
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:471
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
void pfree(void *pointer)
Definition: mcxt.c:1057
#define QUEUE_FIRST_LISTENER
Definition: async.c:289
int MaxBackends
Definition: globals.c:136
#define QUEUE_CLEANUP_DELAY
Definition: async.c:231
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:85
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:291
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:292
#define Assert(condition)
Definition: c.h:746
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
void * palloc(Size size)
Definition: mcxt.c:950
#define elog(elevel,...)
Definition: elog.h:214
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:293
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 285 of file async.c.

◆ backendHasSentNotifications

bool backendHasSentNotifications = false
static

Definition at line 427 of file async.c.

Referenced by PreCommit_Notify(), and ProcessCompletedNotifies().

◆ backendTryAdvanceTail

bool backendTryAdvanceTail = false
static

Definition at line 430 of file async.c.

Referenced by asyncQueueAddEntries(), and ProcessCompletedNotifies().

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 325 of file async.c.

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 298 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

ActionList* pendingActions = NULL
static

Definition at line 357 of file async.c.

Referenced by AtSubAbort_Notify(), AtSubCommit_Notify(), and queue_listen().

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 409 of file async.c.

Referenced by Async_Notify(), AtSubAbort_Notify(), and AtSubCommit_Notify().

◆ Trace_notify

bool Trace_notify = false

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 421 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().