PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)   ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)   ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL }
 

Functions

static int asyncQueuePageDiff (int p, int q)
 
static bool asyncQueuePagePrecedes (int p, int q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 192 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 415 of file async.c.

Referenced by AddEventToPendingNotifies().

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 166 of file async.c.

Referenced by Async_Notify(), and asyncQueueNotificationToEntry().

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

◆ QUEUE_BACKEND_PID

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 241 of file async.c.

Referenced by asyncQueueAddEntries(), and SignalBackends().

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 315 of file async.c.

Referenced by asyncQueueFillWarning().

◆ QUEUE_HEAD

◆ QUEUE_MAX_PAGE

#define QUEUE_MAX_PAGE   (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

Definition at line 331 of file async.c.

Referenced by asyncQueueAdvance(), asyncQueueIsFull(), asyncQueuePageDiff(), and asyncQueueUsage().

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)    ((x).page == (y).page && (x).offset == (y).offset)

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)    ((x).page == 0 && (x).offset == 0)

Definition at line 215 of file async.c.

Referenced by asyncQueueAddEntries().

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507

Definition at line 225 of file async.c.

Referenced by Exec_ListenPreCommit().

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507

Definition at line 219 of file async.c.

Referenced by asyncQueueAdvanceTail(), and asyncQueueFillWarning().

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 301 of file async.c.

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 190 of file async.c.

Referenced by asyncQueueAdvance(), and asyncQueueNotificationToEntry().

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 206 of file async.c.

Referenced by asyncQueueAdvance(), and AsyncShmemInit().

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 350 of file async.c.

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification * n)
static

Definition at line 2343 of file async.c.

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationHash::event, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), and notification_match().

Referenced by Async_Notify(), and AtSubCommit_Notify().

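2344 {
2345  Assert(pendingNotifies->events != NIL);
2346 
2347  /* Create the hash table if it's time to */
2348  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2349  pendingNotifies->hashtab == NULL)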
2350  {
2351  HASHCTL hash_ctl;
2352  ListCell *l;
2353 
2354  /* Create the hash table */
2355  hash_ctl.keysize = sizeof(Notification *);
2356  hash_ctl.entrysize = sizeof(NotificationHash);
2357  hash_ctl.hash = notification_hash;
2358  hash_ctl.match = notification_match;
2359  hash_ctl.hcxt = CurTransactionContext;
2360  pendingNotifies->hashtab =
2361  hash_create("Pending Notifies",
2362  256L,
2363  &hash_ctl,
2364  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2365 
2366  /* Insert all the already-existing events */
2367  foreach(l, pendingNotifies->events)
2368  {
2369  Notification *oldn = (Notification *) lfirst(l);
2370  NotificationHash *hentry;
2371  bool found;
2372 
2373  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2374  &oldn,
2375  HASH_ENTER,
2376  &found);
2377  Assert(!found);
2378  hentry->event = oldn;
2379  }
2380  }
2381 
2382  /* Add new event to the list, in order */
2383  pendingNotifies->events = lappend(pendingNotifies->events, n);
2384 
2385  /* Add event to the hash table if needed */
2386  if (pendingNotifies->hashtab != NULL)
2387  {
2388  NotificationHash *hentry;
2389  bool found;
2390 
2391  hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
2392  &n,
2393  HASH_ENTER,
2394  &found);
2395  Assert(!found);
2396  hentry->event = n;
2397  }
2398 }
#define NIL
Definition: pg_list.h:65
struct NotificationHash NotificationHash
List * events
Definition: async.c:410
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
Size entrysize
Definition: hsearch.h:76
MemoryContext CurTransactionContext
Definition: mcxt.c:54
Notification * event
Definition: async.c:419
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static NotificationList * pendingNotifies
Definition: async.c:422
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
List * lappend(List *list, void *datum)
Definition: list.c:336
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:415
Size keysize
Definition: hsearch.h:75
HashCompareFunc match
Definition: hsearch.h:80
HTAB * hashtab
Definition: async.c:411
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2420
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2406
#define HASH_COMPARE
Definition: hsearch.h:99
static int list_length(const List *l)
Definition: pg_list.h:149
HashValueFunc hash
Definition: hsearch.h:78
#define HASH_FUNCTION
Definition: hsearch.h:98

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 772 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

773 {
774  if (Trace_notify)
775  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
776 
777  queue_listen(LISTEN_LISTEN, channel);
778 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 625 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

626 {
627  int my_level = GetCurrentTransactionNestLevel();
628  size_t channel_len;
629  size_t payload_len;
630  Notification *n;
631  MemoryContext oldcontext;
632 
633  if (IsParallelWorker())
634  elog(ERROR, "cannot send notifications from a parallel worker");
635 
636  if (Trace_notify)
637  elog(DEBUG1, "Async_Notify(%s)", channel);
638 
639  channel_len = channel ? strlen(channel) : 0;
640  payload_len = payload ? strlen(payload) : 0;
641 
642  /* a channel name must be specified */
643  if (channel_len == 0)
644  ereport(ERROR,
645  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
646  errmsg("channel name cannot be empty")));
647 
648  /* enforce length limits */
649  if (channel_len >= NAMEDATALEN)
650  ereport(ERROR,
651  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
652  errmsg("channel name too long")));
653 
654  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
655  ereport(ERROR,
656  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
657  errmsg("payload string too long")));
658 
659  /*
660  * We must construct the Notification entry, even if we end up not using
661  * it, in order to compare it cheaply to existing list entries.
662  *
663  * The notification list needs to live until end of transaction, so store
664  * it in the transaction context.
665  */
666  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
667 
668  n = (Notification *) palloc(offsetof(Notification, data) +
669  channel_len + payload_len + 2);
670  n->channel_len = channel_len;
671  n->payload_len = payload_len;
672  strcpy(n->data, channel);
673  if (payload)
674  strcpy(n->data + channel_len + 1, payload);
675  else
676  n->data[channel_len + 1] = '\0';
677 
678  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
679  {
680  NotificationList *notifies;
681 
682  /*
683  * First notify event in current (sub)xact. Note that we allocate the
684  * NotificationList in TopTransactionContext; the nestingLevel might
685  * get changed later by AtSubCommit_Notify.
686  */
687  notifies = (NotificationList *)
688  MemoryContextAlloc(TopTransactionContext,
689  sizeof(NotificationList));
690  notifies->nestingLevel = my_level;
691  notifies->events = list_make1(n);
692  /* We certainly don't need a hashtable yet */
693  notifies->hashtab = NULL;
694  notifies->upper = pendingNotifies;
695  pendingNotifies = notifies;
696  }
697  else
698  {
699  /* Now check for duplicates */
700  if (AsyncExistsPendingNotify(n))
701  {
702  /* It's a dup, so forget it */
703  pfree(n);
704  MemoryContextSwitchTo(oldcontext);
705  return;
706  }
707 
708  /* Append more events to existing list */
709  AddEventToPendingNotifies(n);
710  }
711 
712  MemoryContextSwitchTo(oldcontext);
713 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:410
MemoryContext TopTransactionContext
Definition: mcxt.c:53
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:54
int errcode(int sqlerrcode)
Definition: elog.c:698
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2302
int nestingLevel
Definition: async.c:409
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:422
#define ERROR
Definition: elog.h:46
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2343
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:166
struct NotificationList * upper
Definition: async.c:412
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:157
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:858
HTAB * hashtab
Definition: async.c:411
uint16 channel_len
Definition: async.c:401
uint16 payload_len
Definition: async.c:402
bool Trace_notify
Definition: async.c:443
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define elog(elevel,...)
Definition: elog.h:232
#define offsetof(type, field)
Definition: c.h:727

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 786 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

787 {
788  if (Trace_notify)
789  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
790 
791  /* If we couldn't possibly be listening, no need to queue anything */
792  if (pendingActions == NULL && !unlistenExitRegistered)
793  return;
794 
795  queue_listen(LISTEN_UNLISTEN, channel);
796 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
static ActionList * pendingActions
Definition: async.c:370
static bool unlistenExitRegistered
Definition: async.c:434
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 804 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

805 {
806  if (Trace_notify)
807  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
808 
809  /* If we couldn't possibly be listening, no need to queue anything */
810  if (pendingActions == NULL && !unlistenExitRegistered)
811  return;
812 
813  queue_listen(LISTEN_UNLISTEN_ALL, "");
814 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
static ActionList * pendingActions
Definition: async.c:370
static bool unlistenExitRegistered
Definition: async.c:434
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 857 of file async.c.

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

858 {
859  Exec_UnlistenAllCommit();
860  asyncQueueUnregister();
861 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1228
static void asyncQueueUnregister(void)
Definition: async.c:1265

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification * n)
static

Definition at line 2302 of file async.c.

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, and Notification::payload_len.

Referenced by Async_Notify(), and AtSubCommit_Notify().

2303 {
2304  if (pendingNotifies == NULL)
2305  return false;
2306 
2307  if (pendingNotifies->hashtab != NULL)
2308  {
2309  /* Use the hash table to probe for a match */
2310  if (hash_search(pendingNotifies->hashtab,
2311  &n,
2312  HASH_FIND,
2313  NULL))
2314  return true;
2315  }
2316  else
2317  {
2318  /* Must scan the event list */
2319  ListCell *l;
2320 
2321  foreach(l, pendingNotifies->events)
2322  {
2323  Notification *oldn = (Notification *) lfirst(l);
2324 
2325  if (n->channel_len == oldn->channel_len &&
2326  n->payload_len == oldn->payload_len &&
2327  memcmp(n->data, oldn->data,
2328  n->channel_len + n->payload_len + 2) == 0)
2329  return true;
2330  }
2331  }
2332 
2333  return false;
2334 }
List * events
Definition: async.c:410
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static NotificationList * pendingNotifies
Definition: async.c:422
HTAB * hashtab
Definition: async.c:411
#define lfirst(lc)
Definition: pg_list.h:169
uint16 channel_len
Definition: async.c:401
uint16 payload_len
Definition: async.c:402

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell * nextNotify)
static

Definition at line 1408 of file async.c.

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

1409 {
1410  AsyncQueueEntry qe;
1411  QueuePosition queue_head;
1412  int pageno;
1413  int offset;
1414  int slotno;
1415 
1416  /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
1417  LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
1418 
1419  /*
1420  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1421  * memory upon exiting. The reason for this is that if we have to advance
1422  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1423  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1424  * subsequent insertions would try to put entries into a page that slru.c
1425  * thinks doesn't exist yet.) So, use a local position variable. Note
1426  * that if we do fail, any already-inserted queue entries are forgotten;
1427  * this is okay, since they'd be useless anyway after our transaction
1428  * rolls back.
1429  */
1430  queue_head = QUEUE_HEAD;
1431 
1432  /*
1433  * If this is the first write since the postmaster started, we need to
1434  * initialize the first page of the async SLRU. Otherwise, the current
1435  * page should be initialized already, so just fetch it.
1436  *
1437  * (We could also take the first path when the SLRU position has just
1438  * wrapped around, but re-zeroing the page is harmless in that case.)
1439  */
1440  pageno = QUEUE_POS_PAGE(queue_head);
1441  if (QUEUE_POS_IS_ZERO(queue_head))
1442  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1443  else
1444  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1445  InvalidTransactionId);
1446 
1447  /* Note we mark the page dirty before writing in it */
1448  NotifyCtl->shared->page_dirty[slotno] = true;
1449 
1450  while (nextNotify != NULL)
1451  {
1452  Notification *n = (Notification *) lfirst(nextNotify);
1453 
1454  /* Construct a valid queue entry in local variable qe */
1455  asyncQueueNotificationToEntry(n, &qe);
1456 
1457  offset = QUEUE_POS_OFFSET(queue_head);
1458 
1459  /* Check whether the entry really fits on the current page */
1460  if (offset + qe.length <= QUEUE_PAGESIZE)
1461  {
1462  /* OK, so advance nextNotify past this item */
1463  nextNotify = lnext(pendingNotifies->events, nextNotify);
1464  }
1465  else
1466  {
1467  /*
1468  * Write a dummy entry to fill up the page. Actually readers will
1469  * only check dboid and since it won't match any reader's database
1470  * OID, they will ignore this entry and move on.
1471  */
1472  qe.length = QUEUE_PAGESIZE - offset;
1473  qe.dboid = InvalidOid;
1474  qe.data[0] = '\0'; /* empty channel */
1475  qe.data[1] = '\0'; /* empty payload */
1476  }
1477 
1478  /* Now copy qe into the shared buffer page */
1479  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1480  &qe,
1481  qe.length);
1482 
1483  /* Advance queue_head appropriately, and detect if page is full */
1484  if (asyncQueueAdvance(&(queue_head), qe.length))
1485  {
1486  /*
1487  * Page is full, so we're done here, but first fill the next page
1488  * with zeroes. The reason to do this is to ensure that slru.c's
1489  * idea of the head page is always the same as ours, which avoids
1490  * boundary problems in SimpleLruTruncate. The test in
1491  * asyncQueueIsFull() ensured that there is room to create this
1492  * page without overrunning the queue.
1493  */
1494  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1495 
1496  /*
1497  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1498  * set flag to remember that we should try to advance the tail
1499  * pointer (we don't want to actually do that right here).
1500  */
1501  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1502  tryAdvanceTail = true;
1503 
1504  /* And exit the loop */
1505  break;
1506  }
1507  }
1508 
1509  /* Success, so update the global QUEUE_HEAD */
1510  QUEUE_HEAD = queue_head;
1511 
1512  LWLockRelease(NotifySLRULock);
1513 
1514  return nextNotify;
1515 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:186
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:215
List * events
Definition: async.c:410
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:322
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1372
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1337
#define QUEUE_HEAD
Definition: async.c:299
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define NotifyCtl
Definition: async.c:313
#define QUEUE_POS_OFFSET(x)
Definition: async.c:204
static NotificationList * pendingNotifies
Definition: async.c:422
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:395
#define QUEUE_PAGESIZE
Definition: async.c:314
#define InvalidTransactionId
Definition: transam.h:31
#define QUEUE_CLEANUP_DELAY
Definition: async.c:241
#define InvalidOid
Definition: postgres_ext.h:36
#define lfirst(lc)
Definition: pg_list.h:169
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
static bool tryAdvanceTail
Definition: async.c:440
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:280
#define QUEUE_POS_PAGE(x)
Definition: async.c:203

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition * position,
int  entryLength 
)
static

Definition at line 1337 of file async.c.

References Assert, AsyncQueueEntryEmptySize, QUEUE_MAX_PAGE, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

1338 {
1339  int pageno = QUEUE_POS_PAGE(*position);
1340  int offset = QUEUE_POS_OFFSET(*position);
1341  bool pageJump = false;
1342 
1343  /*
1344  * Move to the next writing position: First jump over what we have just
1345  * written or read.
1346  */
1347  offset += entryLength;
1348  Assert(offset <= QUEUE_PAGESIZE);
1349 
1350  /*
1351  * In a second step check if another entry can possibly be written to the
1352  * page. If so, stay here, we have reached the next position. If not, then
1353  * we need to move on to the next page.
1354  */
1355  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1356  {
1357  pageno++;
1358  if (pageno > QUEUE_MAX_PAGE)
1359  pageno = 0; /* wrap around */
1360  offset = 0;
1361  pageJump = true;
1362  }
1363 
1364  SET_QUEUE_POS(*position, pageno, offset);
1365  return pageJump;
1366 }
#define QUEUE_POS_OFFSET(x)
Definition: async.c:204
#define AsyncQueueEntryEmptySize
Definition: async.c:192
#define QUEUE_PAGESIZE
Definition: async.c:314
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:206
#define QUEUEALIGN(len)
Definition: async.c:190
#define Assert(condition)
Definition: c.h:804
#define QUEUE_MAX_PAGE
Definition: async.c:331
#define QUEUE_POS_PAGE(x)
Definition: async.c:203

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2157 of file async.c.

References Assert, asyncQueuePagePrecedes(), i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

2158 {
2159  QueuePosition min;
2160  int oldtailpage;
2161  int newtailpage;
2162  int boundary;
2163 
2164  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2165  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2166 
2167  /*
2168  * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2169  * (ie, exactly match at least one backend's queue position), so it must
2170  * be updated atomically with the actual computation. Since v13, we could
2171  * get away with not doing it like that, but it seems prudent to keep it
2172  * so.
2173  *
2174  * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2175  * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2176  * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2177  * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2178  * there are pages we can truncate but haven't yet finished doing so.
2179  *
2180  * For concurrency's sake, we don't want to hold NotifyQueueLock while
2181  * performing SimpleLruTruncate. This is OK because no backend will try
2182  * to access the pages we are in the midst of truncating.
2183  */
2184  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
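2185  min = QUEUE_HEAD;
2186  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
2187  {
2188  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2189  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2190  }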
2191  QUEUE_TAIL = min;
2192  oldtailpage = QUEUE_STOP_PAGE;
2193  LWLockRelease(NotifyQueueLock);
2194 
2195  /*
2196  * We can truncate something if the global tail advanced across an SLRU
2197  * segment boundary.
2198  *
2199  * XXX it might be better to truncate only once every several segments, to
2200  * reduce the number of directory scans.
2201  */
2202  newtailpage = QUEUE_POS_PAGE(min);
2203  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2204  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2205  {
2206  /*
2207  * SimpleLruTruncate() will ask for NotifySLRULock but will also
2208  * release the lock again.
2209  */
2210  SimpleLruTruncate(NotifyCtl, newtailpage);
2211 
2212  /*
2213  * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
2214  * for the segment immediately prior to the old tail, allowing fresh
2215  * data into that segment.
2216  */
2217  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2218  QUEUE_STOP_PAGE = newtailpage;
2219  LWLockRelease(NotifyQueueLock);
2220  }
2221 
2222  LWLockRelease(NotifyQueueTailLock);
2223 }
#define QUEUE_TAIL
Definition: async.c:300
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1226
#define QUEUE_HEAD
Definition: async.c:299
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define NotifyCtl
Definition: async.c:313
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507
int BackendId
Definition: backendid.h:21
#define QUEUE_STOP_PAGE
Definition: async.c:301
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int i
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define QUEUE_POS_PAGE(x)
Definition: async.c:203
#define QUEUE_POS_MIN(x, y)
Definition: async.c:219
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1576 of file async.c.

References Assert, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

1577 {
1578  double fillDegree;
1579  TimestampTz t;
1580 
1581  fillDegree = asyncQueueUsage();
1582  if (fillDegree < 0.5)
1583  return;
1584 
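1585  t = GetCurrentTimestamp();
1586 
1587  if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
1588  t, QUEUE_FULL_WARN_INTERVAL))
1589  {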
1590  QueuePosition min = QUEUE_HEAD;
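1591  int32 minPid = InvalidPid;
1592 
1593  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1594  {
1595  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
1596  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));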
1597  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1598  minPid = QUEUE_BACKEND_PID(i);
1599  }
1600 
1601  ereport(WARNING,
1602  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1603  (minPid != InvalidPid ?
1604  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1605  : 0),
1606  (minPid != InvalidPid ?
1607  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1608  : 0)));
1609 
1610  asyncQueueControl->lastQueueFillWarn = t;
1611  }
1612 }
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
static double asyncQueueUsage(void)
Definition: async.c:1547
#define QUEUE_HEAD
Definition: async.c:299
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
signed int int32
Definition: c.h:429
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
int errdetail(const char *fmt,...)
Definition: elog.c:1042
static AsyncQueueControl * asyncQueueControl
Definition: async.c:297
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:315
#define WARNING
Definition: elog.h:40
int BackendId
Definition: backendid.h:21
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
TimestampTz lastQueueFillWarn
Definition: async.c:292
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:212
#define QUEUE_POS_MIN(x, y)
Definition: async.c:219
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1306 of file async.c.

References asyncQueuePagePrecedes(), QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, and SLRU_PAGES_PER_SEGMENT.

Referenced by PreCommit_Notify().

1307 {
1308  int nexthead;
1309  int boundary;
1310 
1311  /*
1312  * The queue is full if creating a new head page would create a page that
1313  * logically precedes the current global tail pointer, ie, the head
1314  * pointer would wrap around compared to the tail. We cannot create such
1315  * a head page for fear of confusing slru.c. For safety we round the tail
1316  * pointer back to a segment boundary (truncation logic in
1317  * asyncQueueAdvanceTail does not do this, so doing it here is optional).
1318  *
1319  * Note that this test is *not* dependent on how much space there is on
1320  * the current head page. This is necessary because asyncQueueAddEntries
1321  * might try to create the next head page in any case.
1322  */
1323  nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
1324  if (nexthead > QUEUE_MAX_PAGE)
1325  nexthead = 0; /* wrap around */
1326  boundary = QUEUE_STOP_PAGE;
1327  boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
1328  return asyncQueuePagePrecedes(nexthead, boundary);
1329 }
#define QUEUE_HEAD
Definition: async.c:299
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507
#define QUEUE_STOP_PAGE
Definition: async.c:301
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:34
#define QUEUE_MAX_PAGE
Definition: async.c:331
#define QUEUE_POS_PAGE(x)
Definition: async.c:203

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1372 of file async.c.

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, AsyncQueueEntry::data, Notification::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

1373 {
1374  size_t channellen = n->channel_len;
1375  size_t payloadlen = n->payload_len;
1376  int entryLength;
1377 
1378  Assert(channellen < NAMEDATALEN);
1379  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1380 
1381  /* The terminators are already included in AsyncQueueEntryEmptySize */
1382  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1383  entryLength = QUEUEALIGN(entryLength);
1384  qe->length = entryLength;
1385  qe->dboid = MyDatabaseId;
1386  qe->xid = GetCurrentTransactionId();
1387  qe->srcPid = MyProcPid;
1388  memcpy(qe->data, n->data, channellen + payloadlen + 2);
1389 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:186
int MyProcPid
Definition: globals.c:43
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
#define NAMEDATALEN
#define AsyncQueueEntryEmptySize
Definition: async.c:192
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:439
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:166
#define QUEUEALIGN(len)
Definition: async.c:190
Oid MyDatabaseId
Definition: globals.c:88
#define Assert(condition)
Definition: c.h:804
uint16 channel_len
Definition: async.c:401
uint16 payload_len
Definition: async.c:402
TransactionId xid
Definition: async.c:184
int32 srcPid
Definition: async.c:185

◆ asyncQueuePageDiff()

static int asyncQueuePageDiff ( int  p,
int  q 
)
static

Definition at line 481 of file async.c.

References Assert, and QUEUE_MAX_PAGE.

Referenced by asyncQueuePagePrecedes(), and SignalBackends().

482 {
483  int diff;
484 
485  /*
486  * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
487  * in the range 0..QUEUE_MAX_PAGE.
488  */
489  Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
490  Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
491 
492  diff = p - q;
493  if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
494  diff -= QUEUE_MAX_PAGE + 1;
495  else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
496  diff += QUEUE_MAX_PAGE + 1;
497  return diff;
498 }
#define Assert(condition)
Definition: c.h:804
#define QUEUE_MAX_PAGE
Definition: async.c:331

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int  p,
int  q 
)
static

Definition at line 507 of file async.c.

References asyncQueuePageDiff().

Referenced by asyncQueueAdvanceTail(), asyncQueueIsFull(), and AsyncShmemInit().

508 {
509  return asyncQueuePageDiff(p, q) < 0;
510 }
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:481

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2065 of file async.c.

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

2069 {
2070  bool reachedStop = false;
2071  bool reachedEndOfPage;
2072  AsyncQueueEntry *qe;
2073 
2074  do
2075  {
2076  QueuePosition thisentry = *current;
2077 
2078  if (QUEUE_POS_EQUAL(thisentry, stop))
2079  break;
2080 
2081  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2082 
2083  /*
2084  * Advance *current over this message, possibly to the next page. As
2085  * noted in the comments for asyncQueueReadAllNotifications, we must
2086  * do this before possibly failing while processing the message.
2087  */
2088  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2089 
2090  /* Ignore messages destined for other databases */
2091  if (qe->dboid == MyDatabaseId)
2092  {
2093  if (XidInMVCCSnapshot(qe->xid, snapshot))
2094  {
2095  /*
2096  * The source transaction is still in progress, so we can't
2097  * process this message yet. Break out of the loop, but first
2098  * back up *current so we will reprocess the message next
2099  * time. (Note: it is unlikely but not impossible for
2100  * TransactionIdDidCommit to fail, so we can't really avoid
2101  * this advance-then-back-up behavior when dealing with an
2102  * uncommitted message.)
2103  *
2104  * Note that we must test XidInMVCCSnapshot before we test
2105  * TransactionIdDidCommit, else we might return a message from
2106  * a transaction that is not yet visible to snapshots; compare
2107  * the comments at the head of heapam_visibility.c.
2108  *
2109  * Also, while our own xact won't be listed in the snapshot,
2110  * we need not check for TransactionIdIsCurrentTransactionId
2111  * because our transaction cannot (yet) have queued any
2112  * messages.
2113  */
2114  *current = thisentry;
2115  reachedStop = true;
2116  break;
2117  }
2118  else if (TransactionIdDidCommit(qe->xid))
2119  {
2120  /* qe->data is the null-terminated channel name */
2121  char *channel = qe->data;
2122 
2123  if (IsListeningOn(channel))
2124  {
2125  /* payload follows channel name */
2126  char *payload = qe->data + strlen(channel) + 1;
2127 
2128  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2129  }
2130  }
2131  else
2132  {
2133  /*
2134  * The source transaction aborted or crashed, so we just
2135  * ignore its notifications.
2136  */
2137  }
2138  }
2139 
2140  /* Loop back if we're not at end of page */
2141  } while (!reachedEndOfPage);
2142 
2143  if (QUEUE_POS_EQUAL(*current, stop))
2144  reachedStop = true;
2145 
2146  return reachedStop;
2147 }
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:186
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:2257
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1337
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
static bool IsListeningOn(const char *channel)
Definition: async.c:1246
#define QUEUE_POS_OFFSET(x)
Definition: async.c:204
Oid MyDatabaseId
Definition: globals.c:88
TransactionId xid
Definition: async.c:184
int32 srcPid
Definition: async.c:185
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:212
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2278

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1900 of file async.c.

References Assert, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyBackendId, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

1901 {
1902  volatile QueuePosition pos;
1903  QueuePosition head;
1904  Snapshot snapshot;
1905 
1906  /* page_buffer must be adequately aligned, so use a union */
1907  union
1908  {
1909  char buf[QUEUE_PAGESIZE];
1910  AsyncQueueEntry align;
1911  } page_buffer;
1912 
1913  /* Fetch current state */
1914  LWLockAcquire(NotifyQueueLock, LW_SHARED);
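1915  /* Assert checks that we have a valid state entry */
1916  Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
1917  pos = QUEUE_BACKEND_POS(MyBackendId);
1918  head = QUEUE_HEAD;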
1919  LWLockRelease(NotifyQueueLock);
1920 
1921  if (QUEUE_POS_EQUAL(pos, head))
1922  {
1923  /* Nothing to do, we have read all notifications already. */
1924  return;
1925  }
1926 
1927  /*----------
1928  * Get snapshot we'll use to decide which xacts are still in progress.
1929  * This is trickier than it might seem, because of race conditions.
1930  * Consider the following example:
1931  *
1932  * Backend 1: Backend 2:
1933  *
1934  * transaction starts
1935  * UPDATE foo SET ...;
1936  * NOTIFY foo;
1937  * commit starts
1938  * queue the notify message
1939  * transaction starts
1940  * LISTEN foo; -- first LISTEN in session
1941  * SELECT * FROM foo WHERE ...;
1942  * commit to clog
1943  * commit starts
1944  * add backend 2 to array of listeners
1945  * advance to queue head (this code)
1946  * commit to clog
1947  *
1948  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1949  * wasn't committed yet. Ideally we'd ensure that client 2 would
1950  * eventually get transaction 1's notify message, but there's no way
1951  * to do that; until we're in the listener array, there's no guarantee
1952  * that the notify message doesn't get removed from the queue.
1953  *
1954  * Therefore the coding technique transaction 2 is using is unsafe:
1955  * applications must commit a LISTEN before inspecting database state,
1956  * if they want to ensure they will see notifications about subsequent
1957  * changes to that state.
1958  *
1959  * What we do guarantee is that we'll see all notifications from
1960  * transactions committing after the snapshot we take here.
1961  * Exec_ListenPreCommit has already added us to the listener array,
1962  * so no not-yet-committed messages can be removed from the queue
1963  * before we see them.
1964  *----------
1965  */
1966  snapshot = RegisterSnapshot(GetLatestSnapshot());
1967 
1968  /*
1969  * It is possible that we fail while trying to send a message to our
1970  * frontend (for example, because of encoding conversion failure). If
1971  * that happens it is critical that we not try to send the same message
1972  * over and over again. Therefore, we place a PG_TRY block here that will
1973  * forcibly advance our queue position before we lose control to an error.
1974  * (We could alternatively retake NotifyQueueLock and move the position
1975  * before handling each individual message, but that seems like too much
1976  * lock traffic.)
1977  */
1978  PG_TRY();
1979  {
1980  bool reachedStop;
1981 
1982  do
1983  {
1984  int curpage = QUEUE_POS_PAGE(pos);
1985  int curoffset = QUEUE_POS_OFFSET(pos);
1986  int slotno;
1987  int copysize;
1988 
1989  /*
1990  * We copy the data from SLRU into a local buffer, so as to avoid
1991  * holding the NotifySLRULock while we are examining the entries
1992  * and possibly transmitting them to our frontend. Copy only the
1993  * part of the page we will actually inspect.
1994  */
1995  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1996  InvalidTransactionId);
1997  if (curpage == QUEUE_POS_PAGE(head))
1998  {
1999  /* we only want to read as far as head */
2000  copysize = QUEUE_POS_OFFSET(head) - curoffset;
2001  if (copysize < 0)
2002  copysize = 0; /* just for safety */
2003  }
2004  else
2005  {
2006  /* fetch all the rest of the page */
2007  copysize = QUEUE_PAGESIZE - curoffset;
2008  }
2009  memcpy(page_buffer.buf + curoffset,
2010  NotifyCtl->shared->page_buffer[slotno] + curoffset,
2011  copysize);
2012  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2013  LWLockRelease(NotifySLRULock);
2014 
2015  /*
2016  * Process messages up to the stop position, end of page, or an
2017  * uncommitted message.
2018  *
2019  * Our stop position is what we found to be the head's position
2020  * when we entered this function. It might have changed already.
2021  * But if it has, we will receive (or have already received and
2022  * queued) another signal and come here again.
2023  *
2024  * We are not holding NotifyQueueLock here! The queue can only
2025  * extend beyond the head pointer (see above) and we leave our
2026  * backend's pointer where it is so nobody will truncate or
2027  * rewrite pages under us. Especially we don't want to hold a lock
2028  * while sending the notifications to the frontend.
2029  */
2030  reachedStop = asyncQueueProcessPageEntries(&pos, head,
2031  page_buffer.buf,
2032  snapshot);
2033  } while (!reachedStop);
2034  }
2035  PG_FINALLY();
2036  {
2037  /* Update shared state */
2038  LWLockAcquire(NotifyQueueLock, LW_SHARED);
2039  QUEUE_BACKEND_POS(MyBackendId) = pos;
2040  LWLockRelease(NotifyQueueLock);
2041  }
2042  PG_END_TRY();
2043 
2044  /* Done with snapshot */
2045  UnregisterSnapshot(snapshot);
2046 }
int MyProcPid
Definition: globals.c:43
BackendId MyBackendId
Definition: globals.c:84
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:825
#define QUEUE_HEAD
Definition: async.c:299
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define NotifyCtl
Definition: async.c:313
#define QUEUE_POS_OFFSET(x)
Definition: async.c:204
static char * buf
Definition: pg_test_fsync.c:68
#define QUEUE_PAGESIZE
Definition: async.c:314
#define InvalidTransactionId
Definition: transam.h:31
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:867
#define PG_FINALLY()
Definition: elog.h:330
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:495
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2065
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:325
#define PG_TRY()
Definition: elog.h:313
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define PG_END_TRY()
Definition: elog.h:338
#define QUEUE_POS_PAGE(x)
Definition: async.c:203
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:212

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1265 of file async.c.

References amRegisteredListener, Assert, i, InvalidBackendId, InvalidOid, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

1266 {
1267  Assert(listenChannels == NIL); /* else caller error */
1268 
1269  if (!amRegisteredListener) /* nothing to do */
1270  return;
1271 
1272  /*
1273  * Need exclusive lock here to manipulate list links.
1274  */
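1275  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1276  /* Mark our entry as invalid */
1277  QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
1278  QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
1279  /* and remove it from the list */
1280  if (QUEUE_FIRST_LISTENER == MyBackendId)
1281  QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
1282  else
1283  {
1284  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1285  {
1286  if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
1287  {
1288  QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
1289  break;
1290  }
1291  }
1292  }
1293  QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
1294  LWLockRelease(NotifyQueueLock);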
1295 
1296  /* mark ourselves as no longer listed in the global array */
1297  amRegisteredListener = false;
1298 }
#define NIL
Definition: pg_list.h:65
BackendId MyBackendId
Definition: globals.c:84
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
static List * listenChannels
Definition: async.c:338
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:437
int BackendId
Definition: backendid.h:21
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:304
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int i
#define InvalidPid
Definition: miscadmin.h:32

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1547 of file async.c.

References QUEUE_HEAD, QUEUE_MAX_PAGE, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

1548 {
1549  int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1550  int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1551  int occupied;
1552 
1553  occupied = headPage - tailPage;
1554 
1555  if (occupied == 0)
1556  return (double) 0; /* fast exit for common case */
1557 
1558  if (occupied < 0)
1559  {
1560  /* head has wrapped around, tail not yet */
1561  occupied += QUEUE_MAX_PAGE + 1;
1562  }
1563 
1564  return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1565 }
#define QUEUE_TAIL
Definition: async.c:300
#define QUEUE_HEAD
Definition: async.c:299
#define QUEUE_MAX_PAGE
Definition: async.c:331
#define QUEUE_POS_PAGE(x)
Definition: async.c:203

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 533 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateSharedMemoryAndSemaphores().

534 {
535  bool found;
536  Size size;
537 
538  /*
539  * Create or attach to the AsyncQueueControl structure.
540  *
541  * The used entries in the backend[] array run from 1 to MaxBackends; the
542  * zero'th entry is unused but must be allocated.
543  */
544  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
545  size = add_size(size, offsetof(AsyncQueueControl, backend));
546 
547  asyncQueueControl = (AsyncQueueControl *)
548  ShmemInitStruct("Async Queue Control", size, &found);
549 
550  if (!found)
551  {
552  /* First time through, so initialize it */
553  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
554  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
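555  QUEUE_STOP_PAGE = 0;
556  QUEUE_FIRST_LISTENER = InvalidBackendId;
557  asyncQueueControl->lastQueueFillWarn = 0;
558  /* zero'th entry won't be used, but let's initialize it anyway */
559  for (int i = 0; i <= MaxBackends; i++)
560  {
561  QUEUE_BACKEND_PID(i) = InvalidPid;
562  QUEUE_BACKEND_DBOID(i) = InvalidOid;
563  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
564  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
565  }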
566  }
567 
568  /*
569  * Set up SLRU management of the pg_notify data.
570  */
571  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
572  SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
573  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
574  SYNC_HANDLER_NONE);
575 
576  if (!found)
577  {
578  /*
579  * During start or reboot, clean out the pg_notify directory.
580  */
581  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
582  }
583 }
#define QUEUE_TAIL
Definition: async.c:300
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1530
#define QUEUE_HEAD
Definition: async.c:299
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:187
#define NotifyCtl
Definition: async.c:313
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
int MaxBackends
Definition: globals.c:139
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:206
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507
static AsyncQueueControl * asyncQueueControl
Definition: async.c:297
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:304
#define QUEUE_STOP_PAGE
Definition: async.c:301
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1553
size_t Size
Definition: c.h:540
int i
TimestampTz lastQueueFillWarn
Definition: async.c:292
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define offsetof(type, field)
Definition: c.h:727
#define InvalidPid
Definition: miscadmin.h:32

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 516 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

517 {
518  Size size;
519 
520  /* This had better match AsyncShmemInit */
521  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
522  size = add_size(size, offsetof(AsyncQueueControl, backend));
523 
524  size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
525 
526  return size;
527 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:139
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1720 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1721 {
1722  /*
1723  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1724  * we have registered as a listener but have not made any entry in
1725  * listenChannels. In that case, deregister again.
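1726  */
1727  if (amRegisteredListener && listenChannels == NIL)
1728  asyncQueueUnregister();
1729 
1730  /* And clean up */
1731  ClearPendingActionsAndNotifies();
1732 }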
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:338
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2436
static void asyncQueueUnregister(void)
Definition: async.c:1265
static bool amRegisteredListener
Definition: async.c:437

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1002 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

1003 {
1004  ListCell *p;
1005 
1006  /*
1007  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1008  * return as soon as possible
1009  */
1010  if (!pendingActions && !pendingNotifies)
1011  return;
1012 
1013  if (Trace_notify)
1014  elog(DEBUG1, "AtCommit_Notify");
1015 
1016  /* Perform any pending listen/unlisten actions */
1017  if (pendingActions != NULL)
1018  {
1019  foreach(p, pendingActions->actions)
1020  {
1021  ListenAction *actrec = (ListenAction *) lfirst(p);
1022 
1023  switch (actrec->action)
1024  {
1025  case LISTEN_LISTEN:
1026  Exec_ListenCommit(actrec->channel);
1027  break;
1028  case LISTEN_UNLISTEN:
1029  Exec_UnlistenCommit(actrec->channel);
1030  break;
1031  case LISTEN_UNLISTEN_ALL:
1032  Exec_UnlistenAllCommit();
1033  break;
1034  }
1035  }
1036  }
1037 
1038  /* If no longer listening to anything, get out of listener array */
1039  if (amRegisteredListener && listenChannels == NIL)
1040  asyncQueueUnregister();
1041 
1042  /*
1043  * Send signals to listening backends. We need do this only if there are
1044  * pending notifies, which were previously added to the shared queue by
1045  * PreCommit_Notify().
1046  */
1047  if (pendingNotifies != NULL)
1048  SignalBackends();
1049 
1050  /*
1051  * If it's time to try to advance the global tail pointer, do that.
1052  *
1053  * (It might seem odd to do this in the sender, when more than likely the
1054  * listeners won't yet have read the messages we just sent. However,
1055  * there's less contention if only the sender does it, and there is little
1056  * need for urgency in advancing the global tail. So this typically will
1057  * be clearing out messages that were sent some time ago.)
1058  */
1059  if (tryAdvanceTail)
1060  {
1061  tryAdvanceTail = false;
1062  asyncQueueAdvanceTail();
1063  }
1064 
1065  /* And clean up */
1066  ClearPendingActionsAndNotifies();
1067 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:370
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1228
static List * listenChannels
Definition: async.c:338
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2436
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1197
static NotificationList * pendingNotifies
Definition: async.c:422
List * actions
Definition: async.c:366
static void asyncQueueAdvanceTail(void)
Definition: async.c:2157
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1170
static void SignalBackends(void)
Definition: async.c:1630
static void asyncQueueUnregister(void)
Definition: async.c:1265
static bool amRegisteredListener
Definition: async.c:437
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:360
ListenActionKind action
Definition: async.c:359
static bool tryAdvanceTail
Definition: async.c:440

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 870 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

871 {
872  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
873  if (pendingActions || pendingNotifies)
874  ereport(ERROR,
875  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
876  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
877 }
static ActionList * pendingActions
Definition: async.c:370
int errcode(int sqlerrcode)
Definition: elog.c:698
static NotificationList * pendingNotifies
Definition: async.c:422
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1810 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1811 {
1812  int my_level = GetCurrentTransactionNestLevel();
1813 
1814  /*
1815  * All we have to do is pop the stack --- the actions/notifies made in
1816  * this subxact are no longer interesting, and the space will be freed
1817  * when CurTransactionContext is recycled. We still have to free the
1818  * ActionList and NotificationList objects themselves, though, because
1819  * those are allocated in TopTransactionContext.
1820  *
1821  * Note that there might be no entries at all, or no entries for the
1822  * current subtransaction level, either because none were ever created, or
1823  * because we reentered this routine due to trouble during subxact abort.
1824  */
1825  while (pendingActions != NULL &&
1826  pendingActions->nestingLevel >= my_level)
1827  {
1828  ActionList *childPendingActions = pendingActions;
1829 
1830  pendingActions = pendingActions->upper;
1831  pfree(childPendingActions);
1832  }
1833 
1834  while (pendingNotifies != NULL &&
1835  pendingNotifies->nestingLevel >= my_level)
1836  {
1837  NotificationList *childPendingNotifies = pendingNotifies;
1838 
1839  pendingNotifies = pendingNotifies->upper;
1840  pfree(childPendingNotifies);
1841  }
1842 }
static ActionList * pendingActions
Definition: async.c:370
int nestingLevel
Definition: async.c:409
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:422
struct NotificationList * upper
Definition: async.c:412
int nestingLevel
Definition: async.c:365
struct ActionList * upper
Definition: async.c:367
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:858

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1740 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1741 {
1742  int my_level = GetCurrentTransactionNestLevel();
1743 
1744  /* If there are actions at our nesting level, we must reparent them. */
1745  if (pendingActions != NULL &&
1746  pendingActions->nestingLevel >= my_level)
1747  {
1748  if (pendingActions->upper == NULL ||
1749  pendingActions->upper->nestingLevel < my_level - 1)
1750  {
1751  /* nothing to merge; give the whole thing to the parent */
1752  --pendingActions->nestingLevel;
1753  }
1754  else
1755  {
1756  ActionList *childPendingActions = pendingActions;
1757 
1758  pendingActions = pendingActions->upper;
1759 
1760  /*
1761  * Mustn't try to eliminate duplicates here --- see queue_listen()
1762  */
1763  pendingActions->actions =
1764  list_concat(pendingActions->actions,
1765  childPendingActions->actions);
1766  pfree(childPendingActions);
1767  }
1768  }
1769 
1770  /* If there are notifies at our nesting level, we must reparent them. */
1771  if (pendingNotifies != NULL &&
1772  pendingNotifies->nestingLevel >= my_level)
1773  {
1774  Assert(pendingNotifies->nestingLevel == my_level);
1775 
1776  if (pendingNotifies->upper == NULL ||
1777  pendingNotifies->upper->nestingLevel < my_level - 1)
1778  {
1779  /* nothing to merge; give the whole thing to the parent */
1780  --pendingNotifies->nestingLevel;
1781  }
1782  else
1783  {
1784  /*
1785  * Formerly, we didn't bother to eliminate duplicates here, but
1786  * now we must, else we fall foul of "Assert(!found)", either here
1787  * or during a later attempt to build the parent-level hashtable.
1788  */
1789  NotificationList *childPendingNotifies = pendingNotifies;
1790  ListCell *l;
1791 
1792  pendingNotifies = pendingNotifies->upper;
1793  /* Insert all the subxact's events into parent, except for dups */
1794  foreach(l, childPendingNotifies->events)
1795  {
1796  Notification *childn = (Notification *) lfirst(l);
1797 
1798  if (!AsyncExistsPendingNotify(childn))
1799  AddEventToPendingNotifies(childn);
1800  }
1801  pfree(childPendingNotifies);
1802  }
1803  }
1804 }
List * events
Definition: async.c:410
static ActionList * pendingActions
Definition: async.c:370
List * list_concat(List *list1, const List *list2)
Definition: list.c:530
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2302
int nestingLevel
Definition: async.c:409
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:422
List * actions
Definition: async.c:366
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2343
struct NotificationList * upper
Definition: async.c:412
int nestingLevel
Definition: async.c:365
struct ActionList * upper
Definition: async.c:367
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:858
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2436 of file async.c.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

2437 {
2438  /*
2439  * Everything's allocated in either TopTransactionContext or the context
2440  * for the subtransaction to which it corresponds. So, there's nothing to
2441  * do here except reset the pointers; the space will be reclaimed when the
2442  * contexts are deleted.
2443  */
2444  pendingActions = NULL;
2445  pendingNotifies = NULL;
2446 }
static ActionList * pendingActions
Definition: async.c:370
static NotificationList * pendingNotifies
Definition: async.c:422

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1170 of file async.c.

References IsListeningOn(), lappend(), MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

1171 {
1172  MemoryContext oldcontext;
1173 
1174  /* Do nothing if we are already listening on this channel */
1175  if (IsListeningOn(channel))
1176  return;
1177 
1178  /*
1179  * Add the new channel name to listenChannels.
1180  *
1181  * XXX It is theoretically possible to get an out-of-memory failure here,
1182  * which would be bad because we already committed. For the moment it
1183  * doesn't seem worth trying to guard against that, but maybe improve this
1184  * later.
1185  */
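1186  oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1187  listenChannels = lappend(listenChannels, pstrdup(channel));
1188  MemoryContextSwitchTo(oldcontext);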
1189 }
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:338
static bool IsListeningOn(const char *channel)
Definition: async.c:1246
MemoryContext TopMemoryContext
Definition: mcxt.c:48
List * lappend(List *list, void *datum)
Definition: list.c:336

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1075 of file async.c.

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, InvalidBackendId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, MyDatabaseId, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

1076 {
1077  QueuePosition head;
1078  QueuePosition max;
1079  BackendId prevListener;
1080 
1081  /*
1082  * Nothing to do if we are already listening to something, nor if we
1083  * already ran this routine in this transaction.
1084  */
1085  if (amRegisteredListener)
1086  return;
1087 
1088  if (Trace_notify)
1089  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1090 
1091  /*
1092  * Before registering, make sure we will unlisten before dying. (Note:
1093  * this action does not get undone if we abort later.)
1094  */
1095  if (!unlistenExitRegistered)
1096  {
1097  before_shmem_exit(Async_UnlistenOnExit, 0);
1098  unlistenExitRegistered = true;
1099  }
1100 
1101  /*
1102  * This is our first LISTEN, so establish our pointer.
1103  *
1104  * We set our pointer to the global tail pointer and then move it forward
1105  * over already-committed notifications. This ensures we cannot miss any
1106  * not-yet-committed notifications. We might get a few more but that
1107  * doesn't hurt.
1108  *
1109  * In some scenarios there might be a lot of committed notifications that
1110  * have not yet been pruned away (because some backend is being lazy about
1111  * reading them). To reduce our startup time, we can look at other
1112  * backends and adopt the maximum "pos" pointer of any backend that's in
1113  * our database; any notifications it's already advanced over are surely
1114  * committed and need not be re-examined by us. (We must consider only
1115  * backends connected to our DB, because others will not have bothered to
1116  * check committed-ness of notifications in our DB.)
1117  *
1118  * We need exclusive lock here so we can look at other backends' entries
1119  * and manipulate the list links.
1120  */
1121  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1122  head = QUEUE_HEAD;
1123  max = QUEUE_TAIL;
1124  prevListener = InvalidBackendId;
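1125  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1126  {
1127  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1128  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));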
1129  /* Also find last listening backend before this one */
1130  if (i < MyBackendId)
1131  prevListener = i;
1132  }
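1133  QUEUE_BACKEND_POS(MyBackendId) = max;
1134  QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
1135  QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
1136  /* Insert backend into list of listeners at correct position */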
1137  if (prevListener > 0)
1138  {
1139  QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
1140  QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
1141  }
1142  else
1143  {
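1144  QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
1145  QUEUE_FIRST_LISTENER = MyBackendId;
1146  }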
1147  LWLockRelease(NotifyQueueLock);
1148 
1149  /* Now we are listed in the global array, so remember we're listening */
1150  amRegisteredListener = true;
1151 
1152  /*
1153  * Try to move our pointer forward as far as possible. This will skip
1154  * over already-committed notifications, which we want to do because they
1155  * might be quite stale. Note that we are not yet listening on anything,
1156  * so we won't deliver such notifications to our frontend. Also, although
1157  * our transaction might have executed NOTIFY, those message(s) aren't
1158  * queued yet so we won't skip them here.
1159  */
1160  if (!QUEUE_POS_EQUAL(max, head))
1161  asyncQueueReadAllNotifications();
1162 }
#define QUEUE_TAIL
Definition: async.c:300
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
BackendId MyBackendId
Definition: globals.c:84
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
#define QUEUE_HEAD
Definition: async.c:299
static bool unlistenExitRegistered
Definition: async.c:434
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:857
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define InvalidBackendId
Definition: backendid.h:23
static bool amRegisteredListener
Definition: async.c:437
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:88
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:304
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1900
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232
int i
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:212
#define QUEUE_POS_MAX(x, y)
Definition: async.c:225

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1228 of file async.c.

References DEBUG1, elog, list_free_deep(), MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

1229 {
1230  if (Trace_notify)
1231  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1232 
1233  list_free_deep(listenChannels);
1234  listenChannels = NIL;
1235 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static List * listenChannels
Definition: async.c:338
void list_free_deep(List *list)
Definition: list.c:1405
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1197 of file async.c.

References DEBUG1, elog, foreach_delete_current, lfirst, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

1198 {
1199  ListCell *q;
1200 
1201  if (Trace_notify)
1202  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1203 
1204  foreach(q, listenChannels)
1205  {
1206  char *lchan = (char *) lfirst(q);
1207 
1208  if (strcmp(lchan, channel) == 0)
1209  {
1210  listenChannels = foreach_delete_current(listenChannels, q);
1211  pfree(lchan);
1212  break;
1213  }
1214  }
1215 
1216  /*
1217  * We do not complain about unlistening something not being listened;
1218  * should we?
1219  */
1220 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static List * listenChannels
Definition: async.c:338
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:369
void pfree(void *pointer)
Definition: mcxt.c:1169
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1853 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1854 {
1855  /*
1856  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1857  * you do here.
1858  */
1859 
1860  /* signal that work needs to be done */
1861  notifyInterruptPending = true;
1862 
1863  /* make sure the event is processed in due course */
1864  SetLatch(MyLatch);
1865 }
void SetLatch(Latch *latch)
Definition: latch.c:567
struct Latch * MyLatch
Definition: globals.c:57
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1246 of file async.c.

References lfirst.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

1247 {
1248  ListCell *p;
1249 
1250  foreach(p, listenChannels)
1251  {
1252  char *lchan = (char *) lfirst(p);
1253 
1254  if (strcmp(lchan, channel) == 0)
1255  return true;
1256  }
1257  return false;
1258 }
static List * listenChannels
Definition: async.c:338
#define lfirst(lc)
Definition: pg_list.h:169

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2406 of file async.c.

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32, hash_any(), and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2407 {
2408  const Notification *k = *(const Notification *const *) key;
2409 
2410  Assert(keysize == sizeof(Notification *));
2411  /* We don't bother to include the payload's trailing null in the hash */
2412  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2413  k->channel_len + k->payload_len + 1));
2414 }
#define DatumGetUInt32(X)
Definition: postgres.h:530
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
#define Assert(condition)
Definition: c.h:804
uint16 channel_len
Definition: async.c:401
uint16 payload_len
Definition: async.c:402

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2420 of file async.c.

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

2421 {
2422  const Notification *k1 = *(const Notification *const *) key1;
2423  const Notification *k2 = *(const Notification *const *) key2;
2424 
2425  Assert(keysize == sizeof(Notification *));
2426  if (k1->channel_len == k2->channel_len &&
2427  k1->payload_len == k2->payload_len &&
2428  memcmp(k1->data, k2->data,
2429  k1->channel_len + k1->payload_len + 2) == 0)
2430  return 0; /* equal */
2431  return 1; /* not equal */
2432 }
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
#define Assert(condition)
Definition: c.h:804
uint16 channel_len
Definition: async.c:401
uint16 payload_len
Definition: async.c:402

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2278 of file async.c.

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2279 {
2280  if (whereToSendOutput == DestRemote)
2281  {
2282  StringInfoData buf;
2283 
2284  pq_beginmessage(&buf, 'A');
2285  pq_sendint32(&buf, srcPid);
2286  pq_sendstring(&buf, channel);
2287  pq_sendstring(&buf, payload);
2288  pq_endmessage(&buf);
2289 
2290  /*
2291  * NOTE: we do not do pq_flush() here. Some level of caller will
2292  * handle it later, allowing this message to be combined into a packet
2293  * with other ones.
2294  */
2295  }
2296  else
2297  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2298 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:232
CommandDest whereToSendOutput
Definition: postgres.c:92

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 824 of file async.c.

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

825 {
826  FuncCallContext *funcctx;
827 
828  /* stuff done only on the first call of the function */
829  if (SRF_IS_FIRSTCALL())
830  {
831  /* create a function context for cross-call persistence */
832  funcctx = SRF_FIRSTCALL_INIT();
833  }
834 
835  /* stuff done on every call of the function */
836  funcctx = SRF_PERCALL_SETUP();
837 
838  if (funcctx->call_cntr < list_length(listenChannels))
839  {
840  char *channel = (char *) list_nth(listenChannels,
841  funcctx->call_cntr);
842 
843  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
844  }
845 
846  SRF_RETURN_DONE(funcctx);
847 }
uint64 call_cntr
Definition: funcapi.h:65
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:293
static List * listenChannels
Definition: async.c:338
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:297
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:299
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
static int list_length(const List *l)
Definition: pg_list.h:149
#define CStringGetTextDatum(s)
Definition: builtins.h:86
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:317
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:295

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1522 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

1523 {
1524  double usage;
1525 
1526  /* Advance the queue tail so we don't report a too-large result */
1527  asyncQueueAdvanceTail();
1528 
1529  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1530  usage = asyncQueueUsage();
1531  LWLockRelease(NotifyQueueLock);
1532 
1533  PG_RETURN_FLOAT8(usage);
1534 }
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static double asyncQueueUsage(void)
Definition: async.c:1547
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static void asyncQueueAdvanceTail(void)
Definition: async.c:2157
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
static void usage(const char *progname)
Definition: vacuumlo.c:417

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 591 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

592 {
593  const char *channel;
594  const char *payload;
595 
596  if (PG_ARGISNULL(0))
597  channel = "";
598  else
599  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
600 
601  if (PG_ARGISNULL(1))
602  payload = "";
603  else
604  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
605 
606  /* For NOTIFY as a statement, this is checked in ProcessUtility */
607  PreventCommandDuringRecovery("NOTIFY");
608 
609  Async_Notify(channel, payload);
610 
611  PG_RETURN_VOID();
612 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:445
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:625
char * text_to_cstring(const text *t)
Definition: varlena.c:222

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 895 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

896 {
897  ListCell *p;
898 
899  if (!pendingActions && !pendingNotifies)
900  return; /* no relevant statements in this xact */
901 
902  if (Trace_notify)
903  elog(DEBUG1, "PreCommit_Notify");
904 
905  /* Preflight for any pending listen/unlisten actions */
906  if (pendingActions != NULL)
907  {
908  foreach(p, pendingActions->actions)
909  {
910  ListenAction *actrec = (ListenAction *) lfirst(p);
911 
912  switch (actrec->action)
913  {
914  case LISTEN_LISTEN:
915  Exec_ListenPreCommit();
916  break;
917  case LISTEN_UNLISTEN:
918  /* there is no Exec_UnlistenPreCommit() */
919  break;
920  case LISTEN_UNLISTEN_ALL:
921  /* there is no Exec_UnlistenAllPreCommit() */
922  break;
923  }
924  }
925  }
926 
927  /* Queue any pending notifies (must happen after the above) */
928  if (pendingNotifies)
929  {
930  ListCell *nextNotify;
931 
932  /*
933  * Make sure that we have an XID assigned to the current transaction.
934  * GetCurrentTransactionId is cheap if we already have an XID, but not
935  * so cheap if we don't, and we'd prefer not to do that work while
936  * holding NotifyQueueLock.
937  */
938  (void) GetCurrentTransactionId();
939 
940  /*
941  * Serialize writers by acquiring a special lock that we hold till
942  * after commit. This ensures that queue entries appear in commit
943  * order, and in particular that there are never uncommitted queue
944  * entries ahead of committed ones, so an uncommitted transaction
945  * can't block delivery of deliverable notifications.
946  *
947  * We use a heavyweight lock so that it'll automatically be released
948  * after either commit or abort. This also allows deadlocks to be
949  * detected, though really a deadlock shouldn't be possible here.
950  *
951  * The lock is on "database 0", which is pretty ugly but it doesn't
952  * seem worth inventing a special locktag category just for this.
953  * (Historical note: before PG 9.0, a similar lock on "database 0" was
954  * used by the flatfiles mechanism.)
955  */
956  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
957  AccessExclusiveLock);
958 
959  /* Now push the notifications into the queue */
960  nextNotify = list_head(pendingNotifies->events);
961  while (nextNotify != NULL)
962  {
963  /*
964  * Add the pending notifications to the queue. We acquire and
965  * release NotifyQueueLock once per page, which might be overkill
966  * but it does allow readers to get in while we're doing this.
967  *
968  * A full queue is very uncommon and should really not happen,
969  * given that we have so much space available in the SLRU pages.
970  * Nevertheless we need to deal with this possibility. Note that
971  * when we get here we are in the process of committing our
972  * transaction, but we have not yet committed to clog, so at this
973  * point in time we can still roll the transaction back.
974  */
975  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
976  asyncQueueFillWarning();
977  if (asyncQueueIsFull())
978  ereport(ERROR,
979  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
980  errmsg("too many notifications in the NOTIFY queue")));
981  nextNotify = asyncQueueAddEntries(nextNotify);
982  LWLockRelease(NotifyQueueLock);
983  }
984 
985  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
986  }
987 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:410
static ActionList * pendingActions
Definition: async.c:370
int errcode(int sqlerrcode)
Definition: elog.c:698
static bool asyncQueueIsFull(void)
Definition: async.c:1306
static void asyncQueueFillWarning(void)
Definition: async.c:1576
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static NotificationList * pendingNotifies
Definition: async.c:422
#define ERROR
Definition: elog.h:46
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:439
List * actions
Definition: async.c:366
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1018
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1408
#define lfirst(lc)
Definition: pg_list.h:169
static void Exec_ListenPreCommit(void)
Definition: async.c:1075
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
bool Trace_notify
Definition: async.c:443
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
ListenActionKind action
Definition: async.c:359

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 2237 of file async.c.

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

2238 {
2239  /* We *must* reset the flag */
2240  notifyInterruptPending = false;
2241 
2242  /* Do nothing else if we aren't actively listening */
2243  if (listenChannels == NIL)
2244  return;
2245 
2246  if (Trace_notify)
2247  elog(DEBUG1, "ProcessIncomingNotify");
2248 
2249  set_ps_display("notify interrupt");
2250 
2251  /*
2252  * We must run asyncQueueReadAllNotifications inside a transaction, else
2253  * bad things happen if it gets an error.
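2254  */
2255  StartTransactionCommand();
2256 
2257  asyncQueueReadAllNotifications();
2258 
2259  CommitTransactionCommand();
2260 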
2261  /*
2262  * If this isn't an end-of-command case, we must flush the notify messages
2263  * to ensure frontend gets them promptly.
2264  */
2265  if (flush)
2266  pq_flush();
2267 
2268  set_ps_display("idle");
2269 
2270  if (Trace_notify)
2271  elog(DEBUG1, "ProcessIncomingNotify: done");
2272 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
#define pq_flush()
Definition: libpq.h:46
void CommitTransactionCommand(void)
Definition: xact.c:2959
static List * listenChannels
Definition: async.c:338
void set_ps_display(const char *activity)
Definition: ps_status.c:349
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1900
void StartTransactionCommand(void)
Definition: xact.c:2858
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1883 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1884 {
1885  if (IsTransactionOrTransactionBlock())
1886  return; /* not really idle */
1887 
1888  /* Loop in case another signal arrives while sending messages */
1889  while (notifyInterruptPending)
1890  ProcessIncomingNotify(flush);
1891 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2237
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4721
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 724 of file async.c.

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, offsetof, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

725 {
726  MemoryContext oldcontext;
727  ListenAction *actrec;
728  int my_level = GetCurrentTransactionNestLevel();
729 
730  /*
731  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
732  * be too complicated to ensure we get the right interactions of
733  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
734  * would be any performance benefit anyway in sane applications.
735  */
736  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
737 
738  /* space for terminating null is included in sizeof(ListenAction) */
739  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
740  strlen(channel) + 1);
741  actrec->action = action;
742  strcpy(actrec->channel, channel);
743 
744  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
745  {
746  ActionList *actions;
747 
748  /*
749  * First action in current sub(xact). Note that we allocate the
750  * ActionList in TopTransactionContext; the nestingLevel might get
751  * changed later by AtSubCommit_Notify.
752  */
753  actions = (ActionList *)
754  MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
755  actions->nestingLevel = my_level;
756  actions->actions = list_make1(actrec);
757  actions->upper = pendingActions;
758  pendingActions = actions;
759  }
760  else
761  pendingActions->actions = lappend(pendingActions->actions, actrec);
762 
763  MemoryContextSwitchTo(oldcontext);
764 }
MemoryContext TopTransactionContext
Definition: mcxt.c:53
static ActionList * pendingActions
Definition: async.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:54
#define list_make1(x1)
Definition: pg_list.h:206
List * actions
Definition: async.c:366
List * lappend(List *list, void *datum)
Definition: list.c:336
int nestingLevel
Definition: async.c:365
struct ActionList * upper
Definition: async.c:367
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:858
void * palloc(Size size)
Definition: mcxt.c:1062
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:360
ListenActionKind action
Definition: async.c:359
#define offsetof(type, field)
Definition: c.h:727

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1630 of file async.c.

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

1631 {
1632  int32 *pids;
1633  BackendId *ids;
1634  int count;
1635 
1636  /*
1637  * Identify backends that we need to signal. We don't want to send
1638  * signals while holding the NotifyQueueLock, so this loop just builds a
1639  * list of target PIDs.
1640  *
1641  * XXX in principle these pallocs could fail, which would be bad. Maybe
1642  * preallocate the arrays? They're not that large, though.
1643  */
1644  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1645  ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
1646  count = 0;
1647 
1648  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1649  for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
1650  {
1651  int32 pid = QUEUE_BACKEND_PID(i);
1652  QueuePosition pos;
1653 
1654  Assert(pid != InvalidPid);
1655  pos = QUEUE_BACKEND_POS(i);
1656  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1657  {
1658  /*
1659  * Always signal listeners in our own database, unless they're
1660  * already caught up (unlikely, but possible).
1661  */
1662  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1663  continue;
1664  }
1665  else
1666  {
1667  /*
1668  * Listeners in other databases should be signaled only if they
1669  * are far behind.
1670  */
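1671  if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
1672  QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
1673  continue;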
1674  }
1675  /* OK, need to signal this one */
1676  pids[count] = pid;
1677  ids[count] = i;
1678  count++;
1679  }
1680  LWLockRelease(NotifyQueueLock);
1681 
1682  /* Now send signals */
1683  for (int i = 0; i < count; i++)
1684  {
1685  int32 pid = pids[i];
1686 
1687  /*
1688  * If we are signaling our own process, no need to involve the kernel;
1689  * just set the flag directly.
1690  */
1691  if (pid == MyProcPid)
1692  {
1693  notifyInterruptPending = true;
1694  continue;
1695  }
1696 
1697  /*
1698  * Note: assuming things aren't broken, a signal failure here could
1699  * only occur if the target backend exited since we released
1700  * NotifyQueueLock; which is unlikely but certainly possible. So we
1701  * just log a low-level debug message if it happens.
1702  */
1703  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
1704  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1705  }
1706 
1707  pfree(pids);
1708  pfree(ids);
1709 }
int MyProcPid
Definition: globals.c:43
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
#define DEBUG3
Definition: elog.h:23
#define QUEUE_HEAD
Definition: async.c:299
signed int int32
Definition: c.h:429
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static int asyncQueuePageDiff(int p, int q)
Definition: async.c:481
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
void pfree(void *pointer)
Definition: mcxt.c:1169
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
int MaxBackends
Definition: globals.c:139
#define QUEUE_CLEANUP_DELAY
Definition: async.c:241
int BackendId
Definition: backendid.h:21
Oid MyDatabaseId
Definition: globals.c:88
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:304
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define QUEUE_POS_PAGE(x)
Definition: async.c:203
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:212
#define InvalidPid
Definition: miscadmin.h:32

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 297 of file async.c.

◆ listenChannels

List* listenChannels = NIL
static

Definition at line 338 of file async.c.

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 311 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

ActionList* pendingActions = NULL
static

Definition at line 370 of file async.c.

Referenced by AtSubAbort_Notify(), AtSubCommit_Notify(), and queue_listen().

◆ pendingNotifies

NotificationList* pendingNotifies = NULL
static

Definition at line 422 of file async.c.

Referenced by Async_Notify(), AtSubAbort_Notify(), and AtSubCommit_Notify().

◆ Trace_notify

bool Trace_notify = false

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 440 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 434 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().