PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)   ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (void)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void ProcessCompletedNotifies (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool backendHasSentNotifications = false
 
static bool backendTryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 182 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 405 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 156 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

◆ QUEUE_BACKEND_PID

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 231 of file async.c.

Referenced by asyncQueueAddEntries(), and SignalBackends().

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 305 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 321 of file async.c.

Referenced by asyncQueueAdvance(), asyncQueueIsFull(), asyncQueuePageDiff(), and asyncQueueUsage().

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)    ((x).page == 0 && (x).offset == 0)

Definition at line 205 of file async.c.

Referenced by asyncQueueAddEntries().

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:500

Definition at line 215 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:500

Definition at line 209 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 291 of file async.c.

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 180 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 196 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 340 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification *  n)
static

Definition at line 2370 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

2371 {
2372  Assert(pendingNotifies->events != NIL);
2373 
2374  /* Create the hash table if it's time to */
2375  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2376  pendingNotifies->hashtab == NULL)
2377  {
2378  HASHCTL hash_ctl;
2379  ListCell *l;
2380 
2381  /* Create the hash table */
2382  hash_ctl.keysize = sizeof(Notification *);
2383  hash_ctl.entrysize = sizeof(NotificationHash);
2384  hash_ctl.hash = notification_hash;
2385  hash_ctl.match = notification_match;
2386  hash_ctl.hcxt = CurTransactionContext;
2387  pendingNotifies->hashtab =
2388  hash_create("Pending Notifies",
2389  256L,
2390  &hash_ctl,
2391  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2392 
2393  /* Insert all the already-existing events */
2394  foreach(l, pendingNotifies->events)
2395  {
2396  Notification *oldn = (Notification *) lfirst(l);
2397  NotificationHash *hentry;
2398  bool found;
2399 
2400  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2401  &oldn,
2402  HASH_ENTER,
2403  &found);
2404  Assert(!found);
2405  hentry->event = oldn;
2406  }
2407  }
2408 
2409  /* Add new event to the list, in order */
2410  pendingNotifies->events = lappend(pendingNotifies->events, n);
2411 
2412  /* Add event to the hash table if needed */
2413  if (pendingNotifies->hashtab != NULL)
2414  {
2415  NotificationHash *hentry;
2416  bool found;
2417 
2418  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2419  &n,
2420  HASH_ENTER,
2421  &found);
2422  Assert(!found);
2423  hentry->event = n;
2424  }
2425 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:400
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
Size entrysize
Definition: hsearch.h:76
MemoryContext CurTransactionContext
Definition: mcxt.c:54
Notification * event
Definition: async.c:409
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static NotificationList * pendingNotifies
Definition: async.c:412
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
List * lappend(List *list, void *datum)
Definition: list.c:336
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:405
Size keysize
Definition: hsearch.h:75
HashCompareFunc match
Definition: hsearch.h:80
HTAB * hashtab
Definition: async.c:401
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2447
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2433
#define HASH_COMPARE
Definition: hsearch.h:99
static int list_length(const List *l)
Definition: pg_list.h:149
HashValueFunc hash
Definition: hsearch.h:78
#define HASH_FUNCTION
Definition: hsearch.h:98

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 765 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

766 {
767  if (Trace_notify)
768  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
769 
770  queue_listen(LISTEN_LISTEN, channel);
771 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:717
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 618 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

619 {
620  int my_level = GetCurrentTransactionNestLevel();
621  size_t channel_len;
622  size_t payload_len;
623  Notification *n;
624  MemoryContext oldcontext;
625 
626  if (IsParallelWorker())
627  elog(ERROR, "cannot send notifications from a parallel worker");
628 
629  if (Trace_notify)
630  elog(DEBUG1, "Async_Notify(%s)", channel);
631 
632  channel_len = channel ? strlen(channel) : 0;
633  payload_len = payload ? strlen(payload) : 0;
634 
635  /* a channel name must be specified */
636  if (channel_len == 0)
637  ereport(ERROR,
638  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
639  errmsg("channel name cannot be empty")));
640 
641  /* enforce length limits */
642  if (channel_len >= NAMEDATALEN)
643  ereport(ERROR,
644  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
645  errmsg("channel name too long")));
646 
647  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
648  ereport(ERROR,
649  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
650  errmsg("payload string too long")));
651 
652  /*
653  * We must construct the Notification entry, even if we end up not using
654  * it, in order to compare it cheaply to existing list entries.
655  *
656  * The notification list needs to live until end of transaction, so store
657  * it in the transaction context.
658  */
659  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
660 
661  n = (Notification *) palloc(offsetof(Notification, data) +
662  channel_len + payload_len + 2);
663  n->channel_len = channel_len;
664  n->payload_len = payload_len;
665  strcpy(n->data, channel);
666  if (payload)
667  strcpy(n->data + channel_len + 1, payload);
668  else
669  n->data[channel_len + 1] = '\0';
670 
671  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
672  {
673  NotificationList *notifies;
674 
675  /*
676  * First notify event in current (sub)xact. Note that we allocate the
677  * NotificationList in TopTransactionContext; the nestingLevel might
678  * get changed later by AtSubCommit_Notify.
679  */
680  notifies = (NotificationList *)
681  MemoryContextAlloc(TopTransactionContext,
682  sizeof(NotificationList));
683  notifies->nestingLevel = my_level;
684  notifies->events = list_make1(n);
685  /* We certainly don't need a hashtable yet */
686  notifies->hashtab = NULL;
687  notifies->upper = pendingNotifies;
688  pendingNotifies = notifies;
689  }
690  else
691  {
692  /* Now check for duplicates */
693  if (AsyncExistsPendingNotify(n))
694  {
695  /* It's a dup, so forget it */
696  pfree(n);
697  MemoryContextSwitchTo(oldcontext);
698  return;
699  }
700 
701  /* Append more events to existing list */
702  AddEventToPendingNotifies(n);
703  }
704 
705  MemoryContextSwitchTo(oldcontext);
706 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:400
MemoryContext TopTransactionContext
Definition: mcxt.c:53
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:394
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:54
int errcode(int sqlerrcode)
Definition: elog.c:698
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2329
int nestingLevel
Definition: async.c:399
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:412
#define ERROR
Definition: elog.h:46
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2370
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:402
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:157
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
HTAB * hashtab
Definition: async.c:401
uint16 channel_len
Definition: async.c:391
uint16 payload_len
Definition: async.c:392
bool Trace_notify
Definition: async.c:436
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define elog(elevel,...)
Definition: elog.h:232
#define offsetof(type, field)
Definition: c.h:727

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 779 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

780 {
781  if (Trace_notify)
782  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
783 
784  /* If we couldn't possibly be listening, no need to queue anything */
785  if (pendingActions == NULL && !unlistenExitRegistered)
786  return;
787 
788  queue_listen(LISTEN_UNLISTEN, channel);
789 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:717
static ActionList * pendingActions
Definition: async.c:360
static bool unlistenExitRegistered
Definition: async.c:424
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 797 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

798 {
799  if (Trace_notify)
800  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
801 
802  /* If we couldn't possibly be listening, no need to queue anything */
803  if (pendingActions == NULL && !unlistenExitRegistered)
804  return;
805 
806  queue_listen(LISTEN_UNLISTEN_ALL, "");
807 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:717
static ActionList * pendingActions
Definition: async.c:360
static bool unlistenExitRegistered
Definition: async.c:424
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 850 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

851 {
852  Exec_UnlistenAllCommit();
853  asyncQueueUnregister();
854 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1193
static void asyncQueueUnregister(void)
Definition: async.c:1309

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification *  n)
static

Definition at line 2329 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2330 {
2331  if (pendingNotifies == NULL)
2332  return false;
2333 
2334  if (pendingNotifies->hashtab != NULL)
2335  {
2336  /* Use the hash table to probe for a match */
2337  if (hash_search(pendingNotifies->hashtab,
2338  &n,
2339  HASH_FIND,
2340  NULL))
2341  return true;
2342  }
2343  else
2344  {
2345  /* Must scan the event list */
2346  ListCell *l;
2347 
2348  foreach(l, pendingNotifies->events)
2349  {
2350  Notification *oldn = (Notification *) lfirst(l);
2351 
2352  if (n->channel_len == oldn->channel_len &&
2353  n->payload_len == oldn->payload_len &&
2354  memcmp(n->data, oldn->data,
2355  n->channel_len + n->payload_len + 2) == 0)
2356  return true;
2357  }
2358  }
2359 
2360  return false;
2361 }
List * events
Definition: async.c:400
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:394
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static NotificationList * pendingNotifies
Definition: async.c:412
HTAB * hashtab
Definition: async.c:401
#define lfirst(lc)
Definition: pg_list.h:169
uint16 channel_len
Definition: async.c:391
uint16 payload_len
Definition: async.c:392

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell *  nextNotify)
static

Definition at line 1452 of file async.c.

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), backendTryAdvanceTail, AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), and SimpleLruZeroPage().

Referenced by PreCommit_Notify().

1453 {
1454  AsyncQueueEntry qe;
1455  QueuePosition queue_head;
1456  int pageno;
1457  int offset;
1458  int slotno;
1459 
1460  /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
1461  LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
1462 
1463  /*
1464  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1465  * memory upon exiting. The reason for this is that if we have to advance
1466  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1467  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1468  * subsequent insertions would try to put entries into a page that slru.c
1469  * thinks doesn't exist yet.) So, use a local position variable. Note
1470  * that if we do fail, any already-inserted queue entries are forgotten;
1471  * this is okay, since they'd be useless anyway after our transaction
1472  * rolls back.
1473  */
1474  queue_head = QUEUE_HEAD;
1475 
1476  /*
1477  * If this is the first write since the postmaster started, we need to
1478  * initialize the first page of the async SLRU. Otherwise, the current
1479  * page should be initialized already, so just fetch it.
1480  *
1481  * (We could also take the first path when the SLRU position has just
1482  * wrapped around, but re-zeroing the page is harmless in that case.)
1483  */
1484  pageno = QUEUE_POS_PAGE(queue_head);
1485  if (QUEUE_POS_IS_ZERO(queue_head))
1486  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1487  else
1488  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1489  InvalidTransactionId);
1490 
1491  /* Note we mark the page dirty before writing in it */
1492  NotifyCtl->shared->page_dirty[slotno] = true;
1493 
1494  while (nextNotify != NULL)
1495  {
1496  Notification *n = (Notification *) lfirst(nextNotify);
1497 
1498  /* Construct a valid queue entry in local variable qe */
1499  asyncQueueNotificationToEntry(n, &qe);
1500 
1501  offset = QUEUE_POS_OFFSET(queue_head);
1502 
1503  /* Check whether the entry really fits on the current page */
1504  if (offset + qe.length <= QUEUE_PAGESIZE)
1505  {
1506  /* OK, so advance nextNotify past this item */
1507  nextNotify = lnext(pendingNotifies->events, nextNotify);
1508  }
1509  else
1510  {
1511  /*
1512  * Write a dummy entry to fill up the page. Actually readers will
1513  * only check dboid and since it won't match any reader's database
1514  * OID, they will ignore this entry and move on.
1515  */
1516  qe.length = QUEUE_PAGESIZE - offset;
1517  qe.dboid = InvalidOid;
1518  qe.data[0] = '\0'; /* empty channel */
1519  qe.data[1] = '\0'; /* empty payload */
1520  }
1521 
1522  /* Now copy qe into the shared buffer page */
1523  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1524  &qe,
1525  qe.length);
1526 
1527  /* Advance queue_head appropriately, and detect if page is full */
1528  if (asyncQueueAdvance(&(queue_head), qe.length))
1529  {
1530  /*
1531  * Page is full, so we're done here, but first fill the next page
1532  * with zeroes. The reason to do this is to ensure that slru.c's
1533  * idea of the head page is always the same as ours, which avoids
1534  * boundary problems in SimpleLruTruncate. The test in
1535  * asyncQueueIsFull() ensured that there is room to create this
1536  * page without overrunning the queue.
1537  */
1538  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1539 
1540  /*
1541  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1542  * set flag to remember that we should try to advance the tail
1543  * pointer (we don't want to actually do that right here).
1544  */
1545  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1546  backendTryAdvanceTail = true;
1547 
1548  /* And exit the loop */
1549  break;
1550  }
1551  }
1552 
1553  /* Success, so update the global QUEUE_HEAD */
1554  QUEUE_HEAD = queue_head;
1555 
1556  LWLockRelease(NotifySLRULock);
1557 
1558  return nextNotify;
1559 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:205
List * events
Definition: async.c:400
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:322
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1416
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1381
#define QUEUE_HEAD
Definition: async.c:289
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define NotifyCtl
Definition: async.c:303
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static NotificationList * pendingNotifies
Definition: async.c:412
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:395
static bool backendTryAdvanceTail
Definition: async.c:433
#define QUEUE_PAGESIZE
Definition: async.c:304
#define InvalidTransactionId
Definition: transam.h:31
#define QUEUE_CLEANUP_DELAY
Definition: async.c:231
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:169
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:280
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition *  position,
int  entryLength 
)
static

Definition at line 1381 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1382 {
1383  int pageno = QUEUE_POS_PAGE(*position);
1384  int offset = QUEUE_POS_OFFSET(*position);
1385  bool pageJump = false;
1386 
1387  /*
1388  * Move to the next writing position: First jump over what we have just
1389  * written or read.
1390  */
1391  offset += entryLength;
1392  Assert(offset <= QUEUE_PAGESIZE);
1393 
1394  /*
1395  * In a second step check if another entry can possibly be written to the
1396  * page. If so, stay here, we have reached the next position. If not, then
1397  * we need to move on to the next page.
1398  */
1399  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1400  {
1401  pageno++;
1402  if (pageno > QUEUE_MAX_PAGE)
1403  pageno = 0; /* wrap around */
1404  offset = 0;
1405  pageJump = true;
1406  }
1407 
1408  SET_QUEUE_POS(*position, pageno, offset);
1409  return pageJump;
1410 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
#define AsyncQueueEntryEmptySize
Definition: async.c:182
#define QUEUE_PAGESIZE
Definition: async.c:304
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
#define QUEUEALIGN(len)
Definition: async.c:180
#define Assert(condition)
Definition: c.h:804
#define QUEUE_MAX_PAGE
Definition: async.c:321
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2185 of file async.c.

References Assert, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by pg_notification_queue_usage(), and ProcessCompletedNotifies().

2186 {
2187  QueuePosition min;
2188  int oldtailpage;
2189  int newtailpage;
2190  int boundary;
2191 
2192  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2193  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2194 
2195  /*
2196  * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2197  * (ie, exactly match at least one backend's queue position), so it must
2198  * be updated atomically with the actual computation. Since v13, we could
2199  * get away with not doing it like that, but it seems prudent to keep it
2200  * so.
2201  *
2202  * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2203  * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2204  * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2205  * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2206  * there are pages we can truncate but haven't yet finished doing so.
2207  *
2208  * For concurrency's sake, we don't want to hold NotifyQueueLock while
2209  * performing SimpleLruTruncate. This is OK because no backend will try
2210  * to access the pages we are in the midst of truncating.
2211  */
2212  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2213  min = QUEUE_HEAD;
2214  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2215  {
2216  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2217  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2218  }
2219  QUEUE_TAIL = min;
2220  oldtailpage = QUEUE_STOP_PAGE;
2221  LWLockRelease(NotifyQueueLock);
2222 
2223  /*
2224  * We can truncate something if the global tail advanced across an SLRU
2225  * segment boundary.
2226  *
2227  * XXX it might be better to truncate only once every several segments, to
2228  * reduce the number of directory scans.
2229  */
2230  newtailpage = QUEUE_POS_PAGE(min);
2231  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2232  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2233  {
2234  /*
2235  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2236  * release the lock again.
2237  */
2238  SimpleLruTruncate(NotifyCtl, newtailpage);
2239 
2240  /*
2241  * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
2242  * for the segment immediately prior to the old tail, allowing fresh
2243  * data into that segment.
2244  */
2245  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2246  QUEUE_STOP_PAGE = newtailpage;
2247  LWLockRelease(NotifyQueueLock);
2248  }
2249 
2250  LWLockRelease(NotifyQueueTailLock);
2251 }
#define QUEUE_TAIL
Definition: async.c:290
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1226
#define QUEUE_HEAD
Definition: async.c:289
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define NotifyCtl
Definition: async.c:303
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:500
int BackendId
Definition: backendid.h:21
#define QUEUE_STOP_PAGE
Definition: async.c:291
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_MIN(x, y)
Definition: async.c:209
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1620 of file async.c.

References Assert, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

/*
 * asyncQueueFillWarning body: once asyncQueueUsage() reports a fill degree
 * of at least 0.5, emit a WARNING with the fill percentage and, when one was
 * identified, the PID of a backend whose queue position equals the minimum
 * position found.
 *
 * NOTE(review): the embedded listing numbers skip 1631-1632, 1637, 1639 and
 * 1654, so this rendering appears to have dropped the guard of the inner
 * block, the loop header that defines i, and two further statements.
 * Presumably they involve TimestampDifferenceExceeds(), lastQueueFillWarn,
 * QUEUE_FIRST_LISTENER/QUEUE_NEXT_LISTENER and Assert, which the References
 * list names but the visible lines never use -- confirm against async.c.
 */
1621 {
1622  double fillDegree;
1623  TimestampTz t;
1624 
1625  fillDegree = asyncQueueUsage();
/* Less than half full: nothing to report. */
1626  if (fillDegree < 0.5)
1627  return;
1628 
1629  t = GetCurrentTimestamp();
1630 
1633  {
1634  QueuePosition min = QUEUE_HEAD;
1635  int32 minPid = InvalidPid;
1636 
1638  {
/* Track the running minimum position and the PID of a backend that holds it. */
1640  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1641  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1642  minPid = QUEUE_BACKEND_PID(i);
1643  }
1644 
/* The detail and hint are attached only when minPid was set to a real PID. */
1645  ereport(WARNING,
1646  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1647  (minPid != InvalidPid ?
1648  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1649  : 0),
1650  (minPid != InvalidPid ?
1651  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1652  : 0)));
1653 
1655  }
1656 }
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1591
#define QUEUE_HEAD
Definition: async.c:289
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
signed int int32
Definition: c.h:429
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
int errdetail(const char *fmt,...)
Definition: elog.c:1042
static AsyncQueueControl * asyncQueueControl
Definition: async.c:287
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:305
#define WARNING
Definition: elog.h:40
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
TimestampTz lastQueueFillWarn
Definition: async.c:282
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MIN(x, y)
Definition: async.c:209
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1350 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

/*
 * asyncQueueIsFull body: returns true when the page that would follow the
 * current head page precedes, per asyncQueuePagePrecedes(), QUEUE_STOP_PAGE
 * rounded down to a multiple of SLRU_PAGES_PER_SEGMENT.
 */
1351 {
1352  int nexthead;
1353  int boundary;
1354 
1355  /*
1356  * The queue is full if creating a new head page would create a page that
1357  * logically precedes the current global tail pointer, ie, the head
1358  * pointer would wrap around compared to the tail. We cannot create such
1359  * a head page for fear of confusing slru.c. For safety we round the tail
1360  * pointer back to a segment boundary (truncation logic in
1361  * asyncQueueAdvanceTail does not do this, so doing it here is optional).
1362  *
1363  * Note that this test is *not* dependent on how much space there is on
1364  * the current head page. This is necessary because asyncQueueAddEntries
1365  * might try to create the next head page in any case.
1366  */
/* Candidate next head page, wrapping to page 0 past QUEUE_MAX_PAGE. */
1367  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1368  if (nexthead > QUEUE_MAX_PAGE)
1369  nexthead = 0; /* wrap around */
/* Round the stop page down to the first page of its segment. */
1370  boundary = QUEUE_STOP_PAGE;
1371  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1372  return asyncQueuePagePrecedes(nexthead, boundary);
1373 }
#define QUEUE_HEAD
Definition: async.c:289
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:500
#define QUEUE_STOP_PAGE
Definition: async.c:291
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34
#define QUEUE_MAX_PAGE
Definition: async.c:321
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1416 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

/*
 * asyncQueueNotificationToEntry body: fill the queue entry *qe from the
 * pending notification *n.
 *
 * n   source notification; channel_len, payload_len and data are read
 * qe  destination entry; length, dboid, xid, srcPid and data are written
 *
 * The stored length is the unpadded entry size rounded up by QUEUEALIGN.
 */
1417 {
1418  size_t channellen = n->channel_len;
1419  size_t payloadlen = n->payload_len;
1420  int entryLength;
1421 
1422  Assert(channellen < NAMEDATALEN);
1423  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1424 
1425  /* The terminators are already included in AsyncQueueEntryEmptySize */
1426  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1427  entryLength = QUEUEALIGN(entryLength);
1428  qe->length = entryLength;
1429  qe->dboid = MyDatabaseId;
1430  qe->xid = GetCurrentTransactionId();
1431  qe->srcPid = MyProcPid;
/* The "+ 2" presumably covers the two string terminators (channel and payload) -- verify against Notification's layout. */
1432  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1433 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
int MyProcPid
Definition: globals.c:43
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:394
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:182
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
#define QUEUEALIGN(len)
Definition: async.c:180
Oid MyDatabaseId
Definition: globals.c:88
#define Assert(condition)
Definition: c.h:804
uint16 channel_len
Definition: async.c:391
uint16 payload_len
Definition: async.c:392
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 474 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

/*
 * asyncQueuePageDiff body: returns p - q, corrected for page-number
 * wraparound. A raw difference of at least (QUEUE_MAX_PAGE + 1) / 2 has
 * QUEUE_MAX_PAGE + 1 subtracted from it; a raw difference below the
 * negative of that has QUEUE_MAX_PAGE + 1 added to it.
 */
475 {
476  int diff;
477 
478  /*
479  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
480  * in the range 0..QUEUE_MAX_PAGE.
481  */
482  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
483  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
484 
485  diff = p - q;
/* Fold differences of half the page space or more back towards zero. */
486  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
487  diff -= QUEUE_MAX_PAGE + 1;
488  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
489  diff += QUEUE_MAX_PAGE + 1;
490  return diff;
491 }
#define Assert(condition)
Definition: c.h:804
#define QUEUE_MAX_PAGE
Definition: async.c:321

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 500 of file async.c.

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

/* asyncQueuePagePrecedes body: true when asyncQueuePageDiff(p, q) is negative. */
501 {
502  return asyncQueuePageDiff(p, q) < 0;
503 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:474

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2096 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

/*
 * asyncQueueProcessPageEntries body: walk the queue entries held in
 * page_buffer, starting at *current, and pass committed notifications for
 * channels this backend listens on to NotifyMyFrontEnd().
 *
 * current      in/out queue position; advanced past each entry examined,
 *              and reset to the entry's start if that entry's transaction
 *              is still in progress according to the snapshot
 * stop         position at which processing ends
 * page_buffer  buffer addressed by the queue offset of each entry
 * snapshot     passed to XidInMVCCSnapshot() to detect in-progress xacts
 *
 * Returns true if *current ended up equal to stop, or if an entry from an
 * in-progress transaction was encountered; returns false if the loop merely
 * reached the end of the page.
 */
2100 {
2101  bool reachedStop = false;
2102  bool reachedEndOfPage;
2103  AsyncQueueEntry *qe;
2104 
2105  do
2106  {
/* Remember where this entry starts so *current can be restored below. */
2107  QueuePosition thisentry = *current;
2108 
2109  if (QUEUE_POS_EQUAL(thisentry, stop))
2110  break;
2111 
2112  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2113 
2114  /*
2115  * Advance *current over this message, possibly to the next page. As
2116  * noted in the comments for asyncQueueReadAllNotifications, we must
2117  * do this before possibly failing while processing the message.
2118  */
2119  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2120 
2121  /* Ignore messages destined for other databases */
2122  if (qe->dboid == MyDatabaseId)
2123  {
2124  if (XidInMVCCSnapshot(qe->xid, snapshot))
2125  {
2126  /*
2127  * The source transaction is still in progress, so we can't
2128  * process this message yet. Break out of the loop, but first
2129  * back up *current so we will reprocess the message next
2130  * time. (Note: it is unlikely but not impossible for
2131  * TransactionIdDidCommit to fail, so we can't really avoid
2132  * this advance-then-back-up behavior when dealing with an
2133  * uncommitted message.)
2134  *
2135  * Note that we must test XidInMVCCSnapshot before we test
2136  * TransactionIdDidCommit, else we might return a message from
2137  * a transaction that is not yet visible to snapshots; compare
2138  * the comments at the head of heapam_visibility.c.
2139  *
2140  * Also, while our own xact won't be listed in the snapshot,
2141  * we need not check for TransactionIdIsCurrentTransactionId
2142  * because our transaction cannot (yet) have queued any
2143  * messages.
2144  */
2145  *current = thisentry;
2146  reachedStop = true;
2147  break;
2148  }
2149  else if (TransactionIdDidCommit(qe->xid))
2150  {
2151  /* qe->data is the null-terminated channel name */
2152  char *channel = qe->data;
2153 
2154  if (IsListeningOn(channel))
2155  {
2156  /* payload follows channel name */
2157  char *payload = qe->data + strlen(channel) + 1;
2158 
2159  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2160  }
2161  }
2162  else
2163  {
2164  /*
2165  * The source transaction aborted or crashed, so we just
2166  * ignore its notifications.
2167  */
2168  }
2169  }
2170 
2171  /* Loop back if we're not at end of page */
2172  } while (!reachedEndOfPage);
2173 
/* Also covers the early break above, taken when the entry position already equalled stop. */
2174  if (QUEUE_POS_EQUAL(*current, stop))
2175  reachedStop = true;
2176 
2177  return reachedStop;
2178 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:176
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2242
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1381
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1290
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
Oid MyDatabaseId
Definition: globals.c:88
TransactionId xid
Definition: async.c:174
int32 srcPid
Definition: async.c:175
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2305

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1931 of file async.c.

References Assert, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), ProcessCompletedNotifies(), and ProcessIncomingNotify().

/*
 * asyncQueueReadAllNotifications body: read queue pages from this backend's
 * position up to the head position sampled on entry, copying each page into
 * a local buffer and handing it to asyncQueueProcessPageEntries() until that
 * function reports the stop condition. A registered snapshot is used for
 * the whole scan and released at the end.
 *
 * NOTE(review): the embedded listing numbers skip 1947-1948, 2027 and 2070.
 * As rendered, pos is never assigned before use, the
 * SimpleLruReadPage_ReadOnly() call has no closing arguments, and the
 * PG_FINALLY block takes and releases the lock without a statement between.
 * Presumably the dropped lines use Assert, MyProcPid, MyBackendId,
 * QUEUE_BACKEND_PID, QUEUE_BACKEND_POS and InvalidTransactionId, which the
 * References list names -- confirm against async.c.
 */
1932 {
1933  volatile QueuePosition pos;
1934  QueuePosition head;
1935  Snapshot snapshot;
1936 
1937  /* page_buffer must be adequately aligned, so use a union */
1938  union
1939  {
1940  char buf[QUEUE_PAGESIZE];
1941  AsyncQueueEntry align;
1942  } page_buffer;
1943 
1944  /* Fetch current state */
1945  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1946  /* Assert checks that we have a valid state entry */
1949  head = QUEUE_HEAD;
1950  LWLockRelease(NotifyQueueLock);
1951 
1952  if (QUEUE_POS_EQUAL(pos, head))
1953  {
1954  /* Nothing to do, we have read all notifications already. */
1955  return;
1956  }
1957 
1958  /*----------
1959  * Get snapshot we'll use to decide which xacts are still in progress.
1960  * This is trickier than it might seem, because of race conditions.
1961  * Consider the following example:
1962  *
1963  * Backend 1: Backend 2:
1964  *
1965  * transaction starts
1966  * UPDATE foo SET ...;
1967  * NOTIFY foo;
1968  * commit starts
1969  * queue the notify message
1970  * transaction starts
1971  * LISTEN foo; -- first LISTEN in session
1972  * SELECT * FROM foo WHERE ...;
1973  * commit to clog
1974  * commit starts
1975  * add backend 2 to array of listeners
1976  * advance to queue head (this code)
1977  * commit to clog
1978  *
1979  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1980  * wasn't committed yet. Ideally we'd ensure that client 2 would
1981  * eventually get transaction 1's notify message, but there's no way
1982  * to do that; until we're in the listener array, there's no guarantee
1983  * that the notify message doesn't get removed from the queue.
1984  *
1985  * Therefore the coding technique transaction 2 is using is unsafe:
1986  * applications must commit a LISTEN before inspecting database state,
1987  * if they want to ensure they will see notifications about subsequent
1988  * changes to that state.
1989  *
1990  * What we do guarantee is that we'll see all notifications from
1991  * transactions committing after the snapshot we take here.
1992  * Exec_ListenPreCommit has already added us to the listener array,
1993  * so no not-yet-committed messages can be removed from the queue
1994  * before we see them.
1995  *----------
1996  */
1997  snapshot = RegisterSnapshot(GetLatestSnapshot());
1998 
1999  /*
2000  * It is possible that we fail while trying to send a message to our
2001  * frontend (for example, because of encoding conversion failure). If
2002  * that happens it is critical that we not try to send the same message
2003  * over and over again. Therefore, we place a PG_TRY block here that will
2004  * forcibly advance our queue position before we lose control to an error.
2005  * (We could alternatively retake NotifyQueueLock and move the position
2006  * before handling each individual message, but that seems like too much
2007  * lock traffic.)
2008  */
2009  PG_TRY();
2010  {
2011  bool reachedStop;
2012 
2013  do
2014  {
2015  int curpage = QUEUE_POS_PAGE(pos);
2016  int curoffset = QUEUE_POS_OFFSET(pos);
2017  int slotno;
2018  int copysize;
2019 
2020  /*
2021  * We copy the data from SLRU into a local buffer, so as to avoid
2022  * holding the NotifySLRULock while we are examining the entries
2023  * and possibly transmitting them to our frontend. Copy only the
2024  * part of the page we will actually inspect.
2025  */
/* NOTE(review): the rest of this call (listing line 2027) is missing from the rendering. */
2026  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
2028  if (curpage == QUEUE_POS_PAGE(head))
2029  {
2030  /* we only want to read as far as head */
2031  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2032  if (copysize < 0)
2033  copysize = 0; /* just for safety */
2034  }
2035  else
2036  {
2037  /* fetch all the rest of the page */
2038  copysize = QUEUE_PAGESIZE - curoffset;
2039  }
/* Data lands at the same offset in the local buffer as it has in the page. */
2040  memcpy(page_buffer.buf + curoffset,
2041  NotifyCtl->shared->page_buffer[slotno] + curoffset,
2042  copysize);
2043  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2044  LWLockRelease(NotifySLRULock);
2045 
2046  /*
2047  * Process messages up to the stop position, end of page, or an
2048  * uncommitted message.
2049  *
2050  * Our stop position is what we found to be the head's position
2051  * when we entered this function. It might have changed already.
2052  * But if it has, we will receive (or have already received and
2053  * queued) another signal and come here again.
2054  *
2055  * We are not holding NotifyQueueLock here! The queue can only
2056  * extend beyond the head pointer (see above) and we leave our
2057  * backend's pointer where it is so nobody will truncate or
2058  * rewrite pages under us. Especially we don't want to hold a lock
2059  * while sending the notifications to the frontend.
2060  */
2061  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2062  page_buffer.buf,
2063  snapshot);
2064  } while (!reachedStop);
2065  }
2066  PG_FINALLY();
2067  {
2068  /* Update shared state */
2069  LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* NOTE(review): listing line 2070 is missing here; presumably it stores pos back into shared state -- confirm. */
2071  LWLockRelease(NotifyQueueLock);
2072  }
2073  PG_END_TRY();
2074 
2075  /* Done with snapshot */
2076  UnregisterSnapshot(snapshot);
2077 }
int MyProcPid
Definition: globals.c:43
BackendId MyBackendId
Definition: globals.c:84
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:810
#define QUEUE_HEAD
Definition: async.c:289
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define NotifyCtl
Definition: async.c:303
#define QUEUE_POS_OFFSET(x)
Definition: async.c:194
static char * buf
Definition: pg_test_fsync.c:68
#define QUEUE_PAGESIZE
Definition: async.c:304
#define InvalidTransactionId
Definition: transam.h:31
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:852
#define PG_FINALLY()
Definition: elog.h:330
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:495
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2096
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:325
#define PG_TRY()
Definition: elog.h:313
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define PG_END_TRY()
Definition: elog.h:338
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1309 of file async.c.

References amRegisteredListener, Assert, i, InvalidBackendId, InvalidOid, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

/*
 * asyncQueueUnregister body: if this backend is flagged as a registered
 * listener, take NotifyQueueLock exclusively, invalidate and unlink its
 * entry (per the comments below), release the lock, and clear
 * amRegisteredListener. Asserts that listenChannels is already empty.
 *
 * NOTE(review): the embedded listing numbers skip 1321-1322, 1324-1325,
 * 1328, 1330, 1332 and 1337, so nearly all statements that modify the
 * shared entry and the listener list are absent from this rendering; only
 * the brace skeleton survives. Presumably they use QUEUE_BACKEND_PID,
 * QUEUE_BACKEND_DBOID, QUEUE_FIRST_LISTENER, QUEUE_NEXT_LISTENER,
 * MyBackendId, InvalidPid, InvalidOid and InvalidBackendId from the
 * References list -- confirm against async.c.
 */
1310 {
1311  Assert(listenChannels == NIL); /* else caller error */
1312 
1313  if (!amRegisteredListener) /* nothing to do */
1314  return;
1315 
1316  /*
1317  * Need exclusive lock here to manipulate list links.
1318  */
1319  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1320  /* Mark our entry as invalid */
1323  /* and remove it from the list */
1326  else
1327  {
1329  {
1331  {
1333  break;
1334  }
1335  }
1336  }
1338  LWLockRelease(NotifyQueueLock);
1339 
1340  /* mark ourselves as no longer listed in the global array */
1341  amRegisteredListener = false;
1342 }
#define NIL
Definition: pg_list.h:65
BackendId MyBackendId
Definition: globals.c:84
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
static List * listenChannels
Definition: async.c:328
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:427
int BackendId
Definition: backendid.h:21
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:294
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int i
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1591 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

/*
 * asyncQueueUsage body: returns the number of pages from the tail page to
 * the head page (adding QUEUE_MAX_PAGE + 1 when the raw difference is
 * negative) divided by (QUEUE_MAX_PAGE + 1) / 2. Returns 0 when head and
 * tail are on the same page.
 *
 * No lock is taken here; presumably the caller is responsible for any
 * locking needed around QUEUE_HEAD and QUEUE_TAIL -- TODO confirm.
 */
1592 {
1593  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1594  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1595  int occupied;
1596 
1597  occupied = headPage - tailPage;
1598 
1599  if (occupied == 0)
1600  return (double) 0; /* fast exit for common case */
1601 
1602  if (occupied < 0)
1603  {
1604  /* head has wrapped around, tail not yet */
1605  occupied += QUEUE_MAX_PAGE + 1;
1606  }
1607 
/* The denominator is half the page-number space, so a result of 1.0 means half of all page numbers are occupied. */
1608  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1609 }
#define QUEUE_TAIL
Definition: async.c:290
#define QUEUE_HEAD
Definition: async.c:289
#define QUEUE_MAX_PAGE
Definition: async.c:321
#define QUEUE_POS_PAGE(x)
Definition: async.c:193

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 526 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateSharedMemoryAndSemaphores().

/*
 * AsyncShmemInit body: compute the size of the control structure
 * (MaxBackends + 1 QueueBackendStatus slots plus the header up to the
 * backend[] member), create or attach to it via ShmemInitStruct(), reset the
 * head, tail and stop page when it was newly created, and install
 * asyncQueuePagePrecedes as the SLRU page-ordering callback.
 *
 * NOTE(review): the embedded listing numbers skip 540, 549-550, 554-557,
 * 565, 567 and 574. As rendered, the ShmemInitStruct() result is not
 * assigned, the per-backend loop body is empty, the SimpleLruInit() call is
 * reduced to its middle argument line, and the directory cleanup statement
 * is absent. Confirm the dropped statements against async.c.
 */
527 {
528  bool found;
529  Size size;
530 
531  /*
532  * Create or attach to the AsyncQueueControl structure.
533  *
534  * The used entries in the backend[] array run from 1 to MaxBackends; the
535  * zero'th entry is unused but must be allocated.
536  */
537  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
538  size = add_size(size, offsetof(AsyncQueueControl, backend));
539 
541  ShmemInitStruct("Async Queue Control", size, &found);
542 
/* found is false only for the process that actually created the structure. */
543  if (!found)
544  {
545  /* First time through, so initialize it */
546  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
547  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
548  QUEUE_STOP_PAGE = 0;
551  /* zero'th entry won't be used, but let's initialize it anyway */
552  for (int i = 0; i <= MaxBackends; i++)
553  {
558  }
559  }
560 
561  /*
562  * Set up SLRU management of the pg_notify data.
563  */
564  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
566  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
568 
569  if (!found)
570  {
571  /*
572  * During start or reboot, clean out the pg_notify directory.
573  */
575  }
576 }
#define QUEUE_TAIL
Definition: async.c:290
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1530
#define QUEUE_HEAD
Definition: async.c:289
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:187
#define NotifyCtl
Definition: async.c:303
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
int MaxBackends
Definition: globals.c:139
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:500
static AsyncQueueControl * asyncQueueControl
Definition: async.c:287
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:294
#define QUEUE_STOP_PAGE
Definition: async.c:291
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1553
size_t Size
Definition: c.h:540
int i
TimestampTz lastQueueFillWarn
Definition: async.c:282
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define offsetof(type, field)
Definition: c.h:727
#define InvalidPid
Definition: miscadmin.h:32

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 509 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

/*
 * AsyncShmemSize body: returns the shared-memory size computed from
 * MaxBackends + 1 QueueBackendStatus slots plus the AsyncQueueControl
 * header up to its backend[] member.
 *
 * NOTE(review): listing line 517 is missing from this rendering; presumably
 * it adds SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, ...) to size, since both
 * names appear in the References list but not in the visible lines --
 * confirm against async.c.
 */
510 {
511  Size size;
512 
513  /* This had better match AsyncShmemInit */
514  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
515  size = add_size(size, offsetof(AsyncQueueControl, backend));
516 
518 
519  return size;
520 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:139
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1756 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

/*
 * AtAbort_Notify body: abort-time cleanup for LISTEN/NOTIFY state, as
 * described by the two comments below.
 *
 * NOTE(review): every executable statement (listing lines 1763-1764 and
 * 1767) is missing from this rendering. Presumably they test
 * amRegisteredListener and listenChannels, then call
 * asyncQueueUnregister() and ClearPendingActionsAndNotifies(), all of which
 * the References list names -- confirm against async.c.
 */
1757 {
1758  /*
1759  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1760  * we have registered as a listener but have not made any entry in
1761  * listenChannels. In that case, deregister again.
1762  */
1765 
1766  /* And clean up */
1768 }
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:328
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2463
static void asyncQueueUnregister(void)
Definition: async.c:1309
static bool amRegisteredListener
Definition: async.c:427

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 990 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

/*
 * AtCommit_Notify body: at commit, apply each queued ListenAction in
 * pendingActions->actions by dispatching on its action kind to
 * Exec_ListenCommit() or Exec_UnlistenCommit() with the action's channel.
 *
 * NOTE(review): the embedded listing numbers skip 998, 1020, 1027-1028 and
 * 1031. As rendered, the early "return" has no condition, the
 * LISTEN_UNLISTEN_ALL case has no call, and the final two steps are
 * comments only. Presumably the dropped lines use pendingNotifies,
 * Exec_UnlistenAllCommit(), amRegisteredListener, asyncQueueUnregister()
 * and ClearPendingActionsAndNotifies() -- confirm against async.c.
 */
991 {
992  ListCell *p;
993 
994  /*
995  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
996  * return as soon as possible
997  */
999  return;
1000 
1001  if (Trace_notify)
1002  elog(DEBUG1, "AtCommit_Notify");
1003 
1004  /* Perform any pending listen/unlisten actions */
1005  if (pendingActions != NULL)
1006  {
1007  foreach(p, pendingActions->actions)
1008  {
1009  ListenAction *actrec = (ListenAction *) lfirst(p);
1010 
1011  switch (actrec->action)
1012  {
1013  case LISTEN_LISTEN:
1014  Exec_ListenCommit(actrec->channel);
1015  break;
1016  case LISTEN_UNLISTEN:
1017  Exec_UnlistenCommit(actrec->channel);
1018  break;
1019  case LISTEN_UNLISTEN_ALL:
1021  break;
1022  }
1023  }
1024  }
1025 
1026  /* If no longer listening to anything, get out of listener array */
1029 
1030  /* And clean up */
1032 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:360
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1193
static List * listenChannels
Definition: async.c:328
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2463
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1162
static NotificationList * pendingNotifies
Definition: async.c:412
List * actions
Definition: async.c:356
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1135
static void asyncQueueUnregister(void)
Definition: async.c:1309
static bool amRegisteredListener
Definition: async.c:427
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:350
ListenActionKind action
Definition: async.c:349

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 863 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

/*
 * AtPrepare_Notify body: raise ERRCODE_FEATURE_NOT_SUPPORTED when the
 * transaction being prepared has executed LISTEN, UNLISTEN or NOTIFY.
 *
 * NOTE(review): listing line 866, the condition guarding the ereport(), is
 * missing from this rendering; presumably it tests pendingActions and
 * pendingNotifies -- confirm against async.c.
 */
864 {
865  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
867  ereport(ERROR,
868  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
869  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
870 }
static ActionList * pendingActions
Definition: async.c:360
int errcode(int sqlerrcode)
Definition: elog.c:698
static NotificationList * pendingNotifies
Definition: async.c:412
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1846 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

/*
 * AtSubAbort_Notify body: on subtransaction abort, free every ActionList
 * and NotificationList whose nestingLevel is at or above the current
 * transaction nesting level.
 *
 * NOTE(review): listing lines 1866 and 1875 are missing from this
 * rendering. As shown, neither loop changes pendingActions or
 * pendingNotifies; presumably the dropped lines pop each stack through the
 * "upper" links named in the References list -- confirm against async.c.
 */
1847 {
1848  int my_level = GetCurrentTransactionNestLevel();
1849 
1850  /*
1851  * All we have to do is pop the stack --- the actions/notifies made in
1852  * this subxact are no longer interesting, and the space will be freed
1853  * when CurTransactionContext is recycled. We still have to free the
1854  * ActionList and NotificationList objects themselves, though, because
1855  * those are allocated in TopTransactionContext.
1856  *
1857  * Note that there might be no entries at all, or no entries for the
1858  * current subtransaction level, either because none were ever created, or
1859  * because we reentered this routine due to trouble during subxact abort.
1860  */
1861  while (pendingActions != NULL &&
1862  pendingActions->nestingLevel >= my_level)
1863  {
1864  ActionList *childPendingActions = pendingActions;
1865 
1867  pfree(childPendingActions);
1868  }
1869 
1870  while (pendingNotifies != NULL &&
1871  pendingNotifies->nestingLevel >= my_level)
1872  {
1873  NotificationList *childPendingNotifies = pendingNotifies;
1874 
1876  pfree(childPendingNotifies);
1877  }
1878 }
static ActionList * pendingActions
Definition: async.c:360
int nestingLevel
Definition: async.c:399
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:412
struct NotificationList * upper
Definition: async.c:402
int nestingLevel
Definition: async.c:355
struct ActionList * upper
Definition: async.c:357
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1776 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1777 {
1778  int my_level = GetCurrentTransactionNestLevel();
1779 
1780  /* If there are actions at our nesting level, we must reparent them. */
1781  if (pendingActions != NULL &&
1782  pendingActions->nestingLevel >= my_level)
1783  {
1784  if (pendingActions->upper == NULL ||
1785  pendingActions->upper->nestingLevel < my_level - 1)
1786  {
1787  /* nothing to merge; give the whole thing to the parent */
1788  --pendingActions->nestingLevel;
1789  }
1790  else
1791  {
1792  ActionList *childPendingActions = pendingActions;
1793 
1794  pendingActions = pendingActions->upper;
1795 
1796  /*
1797  * Mustn't try to eliminate duplicates here --- see queue_listen()
1798  */
1799  pendingActions->actions =
1800  list_concat(pendingActions->actions,
1801  childPendingActions->actions);
1802  pfree(childPendingActions);
1803  }
1804  }
1805 
1806  /* If there are notifies at our nesting level, we must reparent them. */
1807  if (pendingNotifies != NULL &&
1808  pendingNotifies->nestingLevel >= my_level)
1809  {
1810  Assert(pendingNotifies->nestingLevel == my_level);
1811 
1812  if (pendingNotifies->upper == NULL ||
1813  pendingNotifies->upper->nestingLevel < my_level - 1)
1814  {
1815  /* nothing to merge; give the whole thing to the parent */
1816  --pendingNotifies->nestingLevel;
1817  }
1818  else
1819  {
1820  /*
1821  * Formerly, we didn't bother to eliminate duplicates here, but
1822  * now we must, else we fall foul of "Assert(!found)", either here
1823  * or during a later attempt to build the parent-level hashtable.
1824  */
1825  NotificationList *childPendingNotifies = pendingNotifies;
1826  ListCell *l;
1827 
1828  pendingNotifies = pendingNotifies->upper;
1829  /* Insert all the subxact's events into parent, except for dups */
1830  foreach(l, childPendingNotifies->events)
1831  {
1832  Notification *childn = (Notification *) lfirst(l);
1833 
1834  if (!AsyncExistsPendingNotify(childn))
1835  AddEventToPendingNotifies(childn);
1836  }
1837  pfree(childPendingNotifies);
1838  }
1839  }
1840 }
List * events
Definition: async.c:400
static ActionList * pendingActions
Definition: async.c:360
List * list_concat(List *list1, const List *list2)
Definition: list.c:530
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2329
int nestingLevel
Definition: async.c:399
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:412
List * actions
Definition: async.c:356
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2370
struct NotificationList * upper
Definition: async.c:402
int nestingLevel
Definition: async.c:355
struct ActionList * upper
Definition: async.c:357
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2463 of file async.c.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2464 {
2465  /*
2466  * Everything's allocated in either TopTransactionContext or the context
2467  * for the subtransaction to which it corresponds. So, there's nothing to
2468  * do here except reset the pointers; the space will be reclaimed when the
2469  * contexts are deleted.
2470  */
2471  pendingActions = NULL;
2472  pendingNotifies = NULL;
2473 }
static ActionList * pendingActions
Definition: async.c:360
static NotificationList * pendingNotifies
Definition: async.c:412

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1135 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1136 {
1137  MemoryContext oldcontext;
1138 
1139  /* Do nothing if we are already listening on this channel */
1140  if (IsListeningOn(channel))
1141  return;
1142 
1143  /*
1144  * Add the new channel name to listenChannels.
1145  *
1146  * XXX It is theoretically possible to get an out-of-memory failure here,
1147  * which would be bad because we already committed. For the moment it
1148  * doesn't seem worth trying to guard against that, but maybe improve this
1149  * later.
1150  */
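1151  oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1152  listenChannels = lappend(listenChannels, pstrdup(channel));
1153  MemoryContextSwitchTo(oldcontext);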
1154 }
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:328
static bool IsListeningOn(const char *channel)
Definition: async.c:1290
MemoryContext TopMemoryContext
Definition: mcxt.c:48
List * lappend(List *list, void *datum)
Definition: list.c:336

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1040 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

1041 {
1042  QueuePosition head;
1043  QueuePosition max;
1044  BackendId prevListener;
1045 
1046  /*
1047  * Nothing to do if we are already listening to something, nor if we
1048  * already ran this routine in this transaction.
1049  */
1050  if (amRegisteredListener)
1051  return;
1052 
1053  if (Trace_notify)
1054  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1055 
1056  /*
1057  * Before registering, make sure we will unlisten before dying. (Note:
1058  * this action does not get undone if we abort later.)
1059  */
1060  if (!unlistenExitRegistered)
1061  {
1062  before_shmem_exit(Async_UnlistenOnExit, 0);
1063  unlistenExitRegistered = true;
1064  }
1065 
1066  /*
1067  * This is our first LISTEN, so establish our pointer.
1068  *
1069  * We set our pointer to the global tail pointer and then move it forward
1070  * over already-committed notifications. This ensures we cannot miss any
1071  * not-yet-committed notifications. We might get a few more but that
1072  * doesn't hurt.
1073  *
1074  * In some scenarios there might be a lot of committed notifications that
1075  * have not yet been pruned away (because some backend is being lazy about
1076  * reading them). To reduce our startup time, we can look at other
1077  * backends and adopt the maximum "pos" pointer of any backend that's in
1078  * our database; any notifications it's already advanced over are surely
1079  * committed and need not be re-examined by us. (We must consider only
1080  * backends connected to our DB, because others will not have bothered to
1081  * check committed-ness of notifications in our DB.)
1082  *
1083  * We need exclusive lock here so we can look at other backends' entries
1084  * and manipulate the list links.
1085  */
1086  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1087  head = QUEUE_HEAD;
1088  max = QUEUE_TAIL;
1089  prevListener = InvalidBackendId;
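1090  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1091  {
1092  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1093  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));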
1094  /* Also find last listening backend before this one */
1095  if (i < MyBackendId)
1096  prevListener = i;
1097  }
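1098  QUEUE_BACKEND_POS(MyBackendId) = max;
1099  QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
1100  QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
1101  /* Insert backend into list of listeners at correct position */
1102  if (prevListener > 0)
1103  {
1104  QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
1105  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1106  }
1107  else
1108  {
1109  QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
1110  QUEUE_FIRST_LISTENER = MyBackendId;
1111  }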
1112  LWLockRelease(NotifyQueueLock);
1113 
1114  /* Now we are listed in the global array, so remember we're listening */
1115  amRegisteredListener = true;
1116 
1117  /*
1118  * Try to move our pointer forward as far as possible. This will skip
1119  * over already-committed notifications, which we want to do because they
1120  * might be quite stale. Note that we are not yet listening on anything,
1121  * so we won't deliver such notifications to our frontend. Also, although
1122  * our transaction might have executed NOTIFY, those message(s) aren't
1123  * queued yet so we won't skip them here.
1124  */
1125  if (!QUEUE_POS_EQUAL(max, head))
1126  asyncQueueReadAllNotifications();
1127 }
#define QUEUE_TAIL
Definition: async.c:290
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
BackendId MyBackendId
Definition: globals.c:84
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
#define QUEUE_HEAD
Definition: async.c:289
static bool unlistenExitRegistered
Definition: async.c:424
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:850
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:427
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:88
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:294
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1931
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define QUEUE_POS_MAX(x, y)
Definition: async.c:215

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1193 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1194 {
1195  if (Trace_notify)
1196  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1197 
1198  list_free_deep(listenChannels);
1199  listenChannels = NIL;
1200 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static List * listenChannels
Definition: async.c:328
void list_free_deep(List *list)
Definition: list.c:1405
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1162 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1163 {
1164  ListCell *q;
1165 
1166  if (Trace_notify)
1167  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1168 
1169  foreach(q, listenChannels)
1170  {
1171  char *lchan = (char *) lfirst(q);
1172 
1173  if (strcmp(lchan, channel) == 0)
1174  {
1175  listenChannels = foreach_delete_current(listenChannels, q);
1176  pfree(lchan);
1177  break;
1178  }
1179  }
1180 
1181  /*
1182  * We do not complain about unlistening something not being listened;
1183  * should we?
1184  */
1185 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static List * listenChannels
Definition: async.c:328
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:369
void pfree(void *pointer)
Definition: mcxt.c:1169
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1889 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1890 {
1891  /*
1892  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1893  * you do here.
1894  */
1895 
1896  /* signal that work needs to be done */
1897  notifyInterruptPending = true;
1898 
1899  /* make sure the event is processed in due course */
1900  SetLatch(MyLatch);
1901 }
void SetLatch(Latch *latch)
Definition: latch.c:567
struct Latch * MyLatch
Definition: globals.c:57
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:421

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1290 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1291 {
1292  ListCell *p;
1293 
1294  foreach(p, listenChannels)
1295  {
1296  char *lchan = (char *) lfirst(p);
1297 
1298  if (strcmp(lchan, channel) == 0)
1299  return true;
1300  }
1301  return false;
1302 }
static List * listenChannels
Definition: async.c:328
#define lfirst(lc)
Definition: pg_list.h:169

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2433 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2434 {
2435  const Notification *k = *(const Notification *const *) key;
2436 
2437  Assert(keysize == sizeof(Notification *));
2438  /* We don't bother to include the payload's trailing null in the hash */
2439  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2440  k->channel_len + k->payload_len + 1));
2441 }
#define DatumGetUInt32(X)
Definition: postgres.h:530
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:394
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
#define Assert(condition)
Definition: c.h:804
uint16 channel_len
Definition: async.c:391
uint16 payload_len
Definition: async.c:392

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2447 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2448 {
2449  const Notification *k1 = *(const Notification *const *) key1;
2450  const Notification *k2 = *(const Notification *const *) key2;
2451 
2452  Assert(keysize == sizeof(Notification *));
2453  if (k1->channel_len == k2->channel_len &&
2454  k1->payload_len == k2->payload_len &&
2455  memcmp(k1->data, k2->data,
2456  k1->channel_len + k1->payload_len + 2) == 0)
2457  return 0; /* equal */
2458  return 1; /* not equal */
2459 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:394
#define Assert(condition)
Definition: c.h:804
uint16 channel_len
Definition: async.c:391
uint16 payload_len
Definition: async.c:392

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2305 of file async.c.

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2306 {
2307  if (whereToSendOutput == DestRemote)
2308  {
2309  StringInfoData buf;
2310 
2311  pq_beginmessage(&buf, 'A');
2312  pq_sendint32(&buf, srcPid);
2313  pq_sendstring(&buf, channel);
2314  pq_sendstring(&buf, payload);
2315  pq_endmessage(&buf);
2316 
2317  /*
2318  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2319  * happen at the end of the transaction, and for incoming notifies
2320  * ProcessIncomingNotify will do it after finding all the notifies.
2321  */
2322  }
2323  else
2324  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2325 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:232
CommandDest whereToSendOutput
Definition: postgres.c:92

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 817 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

818 {
819  FuncCallContext *funcctx;
820 
821  /* stuff done only on the first call of the function */
822  if (SRF_IS_FIRSTCALL())
823  {
824  /* create a function context for cross-call persistence */
825  funcctx = SRF_FIRSTCALL_INIT();
826  }
827 
828  /* stuff done on every call of the function */
829  funcctx = SRF_PERCALL_SETUP();
830 
831  if (funcctx->call_cntr < list_length(listenChannels))
832  {
833  char *channel = (char *) list_nth(listenChannels,
834  funcctx->call_cntr);
835 
836  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
837  }
838 
839  SRF_RETURN_DONE(funcctx);
840 }
uint64 call_cntr
Definition: funcapi.h:65
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:293
static List * listenChannels
Definition: async.c:328
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:297
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:299
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
static int list_length(const List *l)
Definition: pg_list.h:149
#define CStringGetTextDatum(s)
Definition: builtins.h:82
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:317
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:295

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1566 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

1567 {
1568  double usage;
1569 
1570  /* Advance the queue tail so we don't report a too-large result */
1571  asyncQueueAdvanceTail();
1572 
1573  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1574  usage = asyncQueueUsage();
1575  LWLockRelease(NotifyQueueLock);
1576 
1577  PG_RETURN_FLOAT8(usage);
1578 }
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static double asyncQueueUsage(void)
Definition: async.c:1591
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static void asyncQueueAdvanceTail(void)
Definition: async.c:2185
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
static void usage(const char *progname)
Definition: vacuumlo.c:417

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 584 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

585 {
586  const char *channel;
587  const char *payload;
588 
589  if (PG_ARGISNULL(0))
590  channel = "";
591  else
592  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
593 
594  if (PG_ARGISNULL(1))
595  payload = "";
596  else
597  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
598 
599  /* For NOTIFY as a statement, this is checked in ProcessUtility */
600  PreventCommandDuringRecovery("NOTIFY");
601 
602  Async_Notify(channel, payload);
603 
604  PG_RETURN_VOID();
605 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:445
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:618
char * text_to_cstring(const text *t)
Definition: varlena.c:223

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 888 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

889 {
890  ListCell *p;
891 
892  if (!pendingActions && !pendingNotifies)
893  return; /* no relevant statements in this xact */
894 
895  if (Trace_notify)
896  elog(DEBUG1, "PreCommit_Notify");
897 
898  /* Preflight for any pending listen/unlisten actions */
899  if (pendingActions != NULL)
900  {
901  foreach(p, pendingActions->actions)
902  {
903  ListenAction *actrec = (ListenAction *) lfirst(p);
904 
905  switch (actrec->action)
906  {
907  case LISTEN_LISTEN:
908  Exec_ListenPreCommit();
909  break;
910  case LISTEN_UNLISTEN:
911  /* there is no Exec_UnlistenPreCommit() */
912  break;
913  case LISTEN_UNLISTEN_ALL:
914  /* there is no Exec_UnlistenAllPreCommit() */
915  break;
916  }
917  }
918  }
919 
920  /* Queue any pending notifies (must happen after the above) */
921  if (pendingNotifies)
922  {
923  ListCell *nextNotify;
924 
925  /*
926  * Make sure that we have an XID assigned to the current transaction.
927  * GetCurrentTransactionId is cheap if we already have an XID, but not
928  * so cheap if we don't, and we'd prefer not to do that work while
929  * holding NotifyQueueLock.
930  */
931  (void) GetCurrentTransactionId();
932 
933  /*
934  * Serialize writers by acquiring a special lock that we hold till
935  * after commit. This ensures that queue entries appear in commit
936  * order, and in particular that there are never uncommitted queue
937  * entries ahead of committed ones, so an uncommitted transaction
938  * can't block delivery of deliverable notifications.
939  *
940  * We use a heavyweight lock so that it'll automatically be released
941  * after either commit or abort. This also allows deadlocks to be
942  * detected, though really a deadlock shouldn't be possible here.
943  *
944  * The lock is on "database 0", which is pretty ugly but it doesn't
945  * seem worth inventing a special locktag category just for this.
946  * (Historical note: before PG 9.0, a similar lock on "database 0" was
947  * used by the flatfiles mechanism.)
948  */
949  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
950  AccessExclusiveLock);
951 
952  /* Now push the notifications into the queue */
953  backendHasSentNotifications = true;
954 
955  nextNotify = list_head(pendingNotifies->events);
956  while (nextNotify != NULL)
957  {
958  /*
959  * Add the pending notifications to the queue. We acquire and
960  * release NotifyQueueLock once per page, which might be overkill
961  * but it does allow readers to get in while we're doing this.
962  *
963  * A full queue is very uncommon and should really not happen,
964  * given that we have so much space available in the SLRU pages.
965  * Nevertheless we need to deal with this possibility. Note that
966  * when we get here we are in the process of committing our
967  * transaction, but we have not yet committed to clog, so at this
968  * point in time we can still roll the transaction back.
969  */
970  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
971  asyncQueueFillWarning();
972  if (asyncQueueIsFull())
973  ereport(ERROR,
974  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
975  errmsg("too many notifications in the NOTIFY queue")));
976  nextNotify = asyncQueueAddEntries(nextNotify);
977  LWLockRelease(NotifyQueueLock);
978  }
979  }
980 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:400
static ActionList * pendingActions
Definition: async.c:360
int errcode(int sqlerrcode)
Definition: elog.c:698
static bool asyncQueueIsFull(void)
Definition: async.c:1350
static void asyncQueueFillWarning(void)
Definition: async.c:1620
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static NotificationList * pendingNotifies
Definition: async.c:412
#define ERROR
Definition: elog.h:46
static bool backendHasSentNotifications
Definition: async.c:430
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
List * actions
Definition: async.c:356
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1017
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1452
#define lfirst(lc)
Definition: pg_list.h:169
static void Exec_ListenPreCommit(void)
Definition: async.c:1040
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
bool Trace_notify
Definition: async.c:436
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
ListenActionKind action
Definition: async.c:349

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1226 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1227 {
1228  MemoryContext caller_context;
1229 
1230  /* Nothing to do if we didn't send any notifications */
1231  if (!backendHasSentNotifications)
1232  return;
1233 
1234  /*
1235  * We reset the flag immediately; otherwise, if any sort of error occurs
1236  * below, we'd be locked up in an infinite loop, because control will come
1237  * right back here after error cleanup.
1238  */
1239  backendHasSentNotifications = false;
1240 
1241  /*
1242  * We must preserve the caller's memory context (probably MessageContext)
1243  * across the transaction we do here.
1244  */
1245  caller_context = CurrentMemoryContext;
1246 
1247  if (Trace_notify)
1248  elog(DEBUG1, "ProcessCompletedNotifies");
1249 
1250  /*
1251  * We must run asyncQueueReadAllNotifications inside a transaction, else
1252  * bad things happen if it gets an error.
1253  */
1254  StartTransactionCommand();
1255 
1256  /* Send signals to other backends */
1257  SignalBackends();
1258 
1259  if (listenChannels != NIL)
1260  {
1261  /* Read the queue ourselves, and send relevant stuff to the frontend */
1262  asyncQueueReadAllNotifications();
1263  }
1264 
1265  /*
1266  * If it's time to try to advance the global tail pointer, do that.
1267  */
1268  if (backendTryAdvanceTail)
1269  {
1270  backendTryAdvanceTail = false;
1271  asyncQueueAdvanceTail();
1272  }
1273 
1274  CommitTransactionCommand();
1275 
1276  MemoryContextSwitchTo(caller_context);
1277 
1278  /* We don't need pq_flush() here since postgres.c will do one shortly */
1279 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2939
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:328
static bool backendHasSentNotifications
Definition: async.c:430
static bool backendTryAdvanceTail
Definition: async.c:433
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static void asyncQueueAdvanceTail(void)
Definition: async.c:2185
static void SignalBackends(void)
Definition: async.c:1673
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1931
void StartTransactionCommand(void)
Definition: xact.c:2838
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( void  )
static

Definition at line 2266 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2267 {
2268  /* We *must* reset the flag */
2269  notifyInterruptPending = false;
2270 
2271  /* Do nothing else if we aren't actively listening */
2272  if (listenChannels == NIL)
2273  return;
2274 
2275  if (Trace_notify)
2276  elog(DEBUG1, "ProcessIncomingNotify");
2277 
2278  set_ps_display("notify interrupt");
2279 
2280  /*
2281  * We must run asyncQueueReadAllNotifications inside a transaction, else
2282  * bad things happen if it gets an error.
2283  */
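2284  StartTransactionCommand();
2285 
2286  asyncQueueReadAllNotifications();
2287 
2288  CommitTransactionCommand();
2289 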
2290  /*
2291  * Must flush the notify messages to ensure frontend gets them promptly.
2292  */
2293  pq_flush();
2294 
2295  set_ps_display("idle");
2296 
2297  if (Trace_notify)
2298  elog(DEBUG1, "ProcessIncomingNotify: done");
2299 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:46
void CommitTransactionCommand(void)
Definition: xact.c:2939
static List * listenChannels
Definition: async.c:328
void set_ps_display(const char *activity)
Definition: ps_status.c:349
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1931
void StartTransactionCommand(void)
Definition: xact.c:2838
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:421

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1915 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1916 {
1917  if (IsTransactionOrTransactionBlock())
1918  return; /* not really idle */
1919 
1920  while (notifyInterruptPending)
1921  ProcessIncomingNotify();
1922 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
static void ProcessIncomingNotify(void)
Definition: async.c:2266
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:421

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 717 of file async.c.

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, offsetof, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

718 {
719  MemoryContext oldcontext;
720  ListenAction *actrec;
721  int my_level = GetCurrentTransactionNestLevel();
722 
723  /*
724  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
725  * be too complicated to ensure we get the right interactions of
726  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
727  * would be any performance benefit anyway in sane applications.
728  */
729  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
730 
731  /* space for terminating null is included in sizeof(ListenAction) */
732  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
733  strlen(channel) + 1);
734  actrec->action = action;
735  strcpy(actrec->channel, channel);
736 
737  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
738  {
739  ActionList *actions;
740 
741  /*
742  * First action in current sub(xact). Note that we allocate the
743  * ActionList in TopTransactionContext; the nestingLevel might get
744  * changed later by AtSubCommit_Notify.
745  */
746  actions = (ActionList *)
747  MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
748  actions->nestingLevel = my_level;
749  actions->actions = list_make1(actrec);
750  actions->upper = pendingActions;
751  pendingActions = actions;
752  }
753  else
754  pendingActions->actions = lappend(pendingActions->actions, actrec);
755 
756  MemoryContextSwitchTo(oldcontext);
757 }
MemoryContext TopTransactionContext
Definition: mcxt.c:53
static ActionList * pendingActions
Definition: async.c:360
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:54
#define list_make1(x1)
Definition: pg_list.h:206
List * actions
Definition: async.c:356
List * lappend(List *list, void *datum)
Definition: list.c:336
int nestingLevel
Definition: async.c:355
struct ActionList * upper
Definition: async.c:357
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
void * palloc(Size size)
Definition: mcxt.c:1062
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:350
ListenActionKind action
Definition: async.c:349
#define offsetof(type, field)
Definition: c.h:727

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1673 of file async.c.

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by ProcessCompletedNotifies().

1674 {
1675  int32 *pids;
1676  BackendId *ids;
1677  int count;
1678 
1679  /*
1680  * Identify backends that we need to signal. We don't want to send
1681  * signals while holding the NotifyQueueLock, so this loop just builds a
1682  * list of target PIDs.
1683  *
1684  * XXX in principle these pallocs could fail, which would be bad. Maybe
1685  * preallocate the arrays? But in practice this is only run in trivial
1686  * transactions, so there should surely be space available.
1687  */
1688  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1689  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1690  count = 0;
1691 
1692  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1693  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1694  {
1695  int32 pid = QUEUE_BACKEND_PID(i);
1696  QueuePosition pos;
1697 
1698  Assert(pid != InvalidPid);
1699  if (pid == MyProcPid)
1700  continue; /* never signal self */
1701  pos = QUEUE_BACKEND_POS(i);
1702  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1703  {
1704  /*
1705  * Always signal listeners in our own database, unless they're
1706  * already caught up (unlikely, but possible).
1707  */
1708  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1709  continue;
1710  }
1711  else
1712  {
1713  /*
1714  * Listeners in other databases should be signaled only if they
1715  * are far behind.
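1716  */
1717  if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
1718  QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
1719  continue;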
1720  }
1721  /* OK, need to signal this one */
1722  pids[count] = pid;
1723  ids[count] = i;
1724  count++;
1725  }
1726  LWLockRelease(NotifyQueueLock);
1727 
1728  /* Now send signals */
1729  for (int i = 0; i < count; i++)
1730  {
1731  int32 pid = pids[i];
1732 
1733  /*
1734  * Note: assuming things aren't broken, a signal failure here could
1735  * only occur if the target backend exited since we released
1736  * NotifyQueueLock; which is unlikely but certainly possible. So we
1737  * just log a low-level debug message if it happens.
1738  */
1739  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1740  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1741  }
1742 
1743  pfree(pids);
1744  pfree(ids);
1745 }
int MyProcPid
Definition: globals.c:43
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:289
signed int int32
Definition: c.h:429
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:474
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
void pfree(void *pointer)
Definition: mcxt.c:1169
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
int MaxBackends
Definition: globals.c:139
#define QUEUE_CLEANUP_DELAY
Definition: async.c:231
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:88
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:294
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:202
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 287 of file async.c.

◆ backendHasSentNotifications

bool backendHasSentNotifications = false
static

Definition at line 430 of file async.c.

Referenced by PreCommit_Notify(), and ProcessCompletedNotifies().

◆ backendTryAdvanceTail

bool backendTryAdvanceTail = false
static

Definition at line 433 of file async.c.

Referenced by asyncQueueAddEntries(), and ProcessCompletedNotifies().

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 328 of file async.c.

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 301 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

ActionList* pendingActions = NULL
static

Definition at line 360 of file async.c.

Referenced by AtSubAbort_Notify(), AtSubCommit_Notify(), and queue_listen().

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 412 of file async.c.

Referenced by Async_Notify(), AtSubAbort_Notify(), and AtSubCommit_Notify().

◆ Trace_notify

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 424 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().