PostgreSQL Source Code  git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 
typedef struct NotificationHash NotificationHash
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 

Functions

static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 189 of file async.c.

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 397 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 163 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 310 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 301 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 300 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 303 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 238 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 299 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 313 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 296 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 302 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 311 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 209 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 212 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition: async.c:476
int y
Definition: isn.c:72
int x
Definition: isn.c:71

Definition at line 222 of file async.c.

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 216 of file async.c.

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 201 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 200 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 298 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 297 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 187 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 203 of file async.c.

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationHash

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 332 of file async.c.

333 {
ListenActionKind
Definition: async.c:333
@ LISTEN_LISTEN
Definition: async.c:334
@ LISTEN_UNLISTEN_ALL
Definition: async.c:336
@ LISTEN_UNLISTEN
Definition: async.c:335

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification *  n)
static

Definition at line 2289 of file async.c.

2290 {
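2291  Assert(pendingNotifies->events != NIL);
2292 
2293  /* Create the hash table if it's time to */
2294  if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2295  pendingNotifies->hashtab == NULL)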
2296  {
2297  HASHCTL hash_ctl;
2298  ListCell *l;
2299 
2300  /* Create the hash table */
2301  hash_ctl.keysize = sizeof(Notification *);
2302  hash_ctl.entrysize = sizeof(NotificationHash);
2303  hash_ctl.hash = notification_hash;
2304  hash_ctl.match = notification_match;
2305  hash_ctl.hcxt = CurTransactionContext;
2306  pendingNotifies->hashtab =
2307  hash_create("Pending Notifies",
2308  256L,
2309  &hash_ctl,
2310  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2311 
2312  /* Insert all the already-existing events */
2313  foreach(l, pendingNotifies->events)
2314  {
2315  Notification *oldn = (Notification *) lfirst(l);
2316  bool found;
2317 
2318  (void) hash_search(pendingNotifies->hashtab,
2319  &oldn,
2320  HASH_ENTER,
2321  &found);
2322  Assert(!found);
2323  }
2324  }
2325 
2326  /* Add new event to the list, in order */
2327  pendingNotifies->events = lappend(pendingNotifies->events, n);
2328 
2329  /* Add event to the hash table if needed */
2330  if (pendingNotifies->hashtab != NULL)
2331  {
2332  bool found;
2333 
2334  (void) hash_search(pendingNotifies->hashtab,
2335  &n,
2336  HASH_ENTER,
2337  &found);
2338  Assert(!found);
2339  }
2340 }
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:397
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2348
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2362
struct NotificationHash NotificationHash
static NotificationList * pendingNotifies
Definition: async.c:404
#define Assert(condition)
Definition: c.h:858
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_COMPARE
Definition: hsearch.h:99
#define HASH_FUNCTION
Definition: hsearch.h:98
List * lappend(List *list, void *datum)
Definition: list.c:339
MemoryContext CurTransactionContext
Definition: mcxt.c:155
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
Size keysize
Definition: hsearch.h:75
HashValueFunc hash
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:76
HashCompareFunc match
Definition: hsearch.h:80
MemoryContext hcxt
Definition: hsearch.h:86
HTAB * hashtab
Definition: async.c:393
List * events
Definition: async.c:392

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), notification_match(), and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 738 of file async.c.

739 {
740  if (Trace_notify)
741  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
742 
743  queue_listen(LISTEN_LISTEN, channel);
744 }
bool Trace_notify
Definition: async.c:425
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:690
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
int MyProcPid
Definition: globals.c:45

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 591 of file async.c.

592 {
593  int my_level = GetCurrentTransactionNestLevel();
594  size_t channel_len;
595  size_t payload_len;
596  Notification *n;
597  MemoryContext oldcontext;
598 
599  if (IsParallelWorker())
600  elog(ERROR, "cannot send notifications from a parallel worker");
601 
602  if (Trace_notify)
603  elog(DEBUG1, "Async_Notify(%s)", channel);
604 
605  channel_len = channel ? strlen(channel) : 0;
606  payload_len = payload ? strlen(payload) : 0;
607 
608  /* a channel name must be specified */
609  if (channel_len == 0)
610  ereport(ERROR,
611  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
612  errmsg("channel name cannot be empty")));
613 
614  /* enforce length limits */
615  if (channel_len >= NAMEDATALEN)
616  ereport(ERROR,
617  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
618  errmsg("channel name too long")));
619 
620  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
621  ereport(ERROR,
622  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
623  errmsg("payload string too long")));
624 
625  /*
626  * We must construct the Notification entry, even if we end up not using
627  * it, in order to compare it cheaply to existing list entries.
628  *
629  * The notification list needs to live until end of transaction, so store
630  * it in the transaction context.
631  */
632  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
633 
634  n = (Notification *) palloc(offsetof(Notification, data) +
635  channel_len + payload_len + 2);
636  n->channel_len = channel_len;
637  n->payload_len = payload_len;
638  strcpy(n->data, channel);
639  if (payload)
640  strcpy(n->data + channel_len + 1, payload);
641  else
642  n->data[channel_len + 1] = '\0';
643 
644  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
645  {
646  NotificationList *notifies;
647 
648  /*
649  * First notify event in current (sub)xact. Note that we allocate the
650  * NotificationList in TopTransactionContext; the nestingLevel might
651  * get changed later by AtSubCommit_Notify.
652  */
653  notifies = (NotificationList *)
654  MemoryContextAlloc(TopTransactionContext,
655  sizeof(NotificationList));
656  notifies->nestingLevel = my_level;
657  notifies->events = list_make1(n);
658  /* We certainly don't need a hashtable yet */
659  notifies->hashtab = NULL;
660  notifies->upper = pendingNotifies;
661  pendingNotifies = notifies;
662  }
663  else
664  {
665  /* Now check for duplicates */
666  if (AsyncExistsPendingNotify(n))
667  {
668  /* It's a dup, so forget it */
669  pfree(n);
670  MemoryContextSwitchTo(oldcontext);
671  return;
672  }
673 
674  /* Append more events to existing list */
675  AddEventToPendingNotifies(n);
676  }
677 
678  MemoryContextSwitchTo(oldcontext);
679 }
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2248
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2289
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:163
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsParallelWorker()
Definition: parallel.h:60
MemoryContext TopTransactionContext
Definition: mcxt.c:154
void pfree(void *pointer)
Definition: mcxt.c:1520
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1180
void * palloc(Size size)
Definition: mcxt.c:1316
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
MemoryContextSwitchTo(old_ctx)
int nestingLevel
Definition: async.c:391
struct NotificationList * upper
Definition: async.c:394
uint16 payload_len
Definition: async.c:384
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:386
uint16 channel_len
Definition: async.c:383
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:926

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 752 of file async.c.

753 {
754  if (Trace_notify)
755  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
756 
757  /* If we couldn't possibly be listening, no need to queue anything */
758  if (pendingActions == NULL && !unlistenExitRegistered)
759  return;
760 
761  queue_listen(LISTEN_UNLISTEN, channel);
762 }
static ActionList * pendingActions
Definition: async.c:352
static bool unlistenExitRegistered
Definition: async.c:416

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 770 of file async.c.

771 {
772  if (Trace_notify)
773  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
774 
775  /* If we couldn't possibly be listening, no need to queue anything */
776  if (pendingActions == NULL && !unlistenExitRegistered)
777  return;
778 
779  queue_listen(LISTEN_UNLISTEN_ALL, "");
780 }

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 823 of file async.c.

824 {
825  Exec_UnlistenAllCommit();
826  asyncQueueUnregister();
827 }
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1194
static void asyncQueueUnregister(void)
Definition: async.c:1231

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification *  n)
static

Definition at line 2248 of file async.c.

2249 {
2250  if (pendingNotifies == NULL)
2251  return false;
2252 
2253  if (pendingNotifies->hashtab != NULL)
2254  {
2255  /* Use the hash table to probe for a match */
2256  if (hash_search(pendingNotifies->hashtab,
2257  &n,
2258  HASH_FIND,
2259  NULL))
2260  return true;
2261  }
2262  else
2263  {
2264  /* Must scan the event list */
2265  ListCell *l;
2266 
2267  foreach(l, pendingNotifies->events)
2268  {
2269  Notification *oldn = (Notification *) lfirst(l);
2270 
2271  if (n->channel_len == oldn->channel_len &&
2272  n->payload_len == oldn->payload_len &&
2273  memcmp(n->data, oldn->data,
2274  n->channel_len + n->payload_len + 2) == 0)
2275  return true;
2276  }
2277  }
2278 
2279  return false;
2280 }
@ HASH_FIND
Definition: hsearch.h:113

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell *  nextNotify)
static

Definition at line 1356 of file async.c.

1357 {
1358  AsyncQueueEntry qe;
1359  QueuePosition queue_head;
1360  int64 pageno;
1361  int offset;
1362  int slotno;
1363  LWLock *prevlock;
1364 
1365  /*
1366  * We work with a local copy of QUEUE_HEAD, which we write back to shared
1367  * memory upon exiting. The reason for this is that if we have to advance
1368  * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1369  * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1370  * subsequent insertions would try to put entries into a page that slru.c
1371  * thinks doesn't exist yet.) So, use a local position variable. Note
1372  * that if we do fail, any already-inserted queue entries are forgotten;
1373  * this is okay, since they'd be useless anyway after our transaction
1374  * rolls back.
1375  */
1376  queue_head = QUEUE_HEAD;
1377 
1378  /*
1379  * If this is the first write since the postmaster started, we need to
1380  * initialize the first page of the async SLRU. Otherwise, the current
1381  * page should be initialized already, so just fetch it.
1382  */
1383  pageno = QUEUE_POS_PAGE(queue_head);
1384  prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
1385 
1386  /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
1387  LWLockAcquire(prevlock, LW_EXCLUSIVE);
1388 
1389  if (QUEUE_POS_IS_ZERO(queue_head))
1390  slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1391  else
1392  slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1393  InvalidTransactionId);
1394 
1395  /* Note we mark the page dirty before writing in it */
1396  NotifyCtl->shared->page_dirty[slotno] = true;
1397 
1398  while (nextNotify != NULL)
1399  {
1400  Notification *n = (Notification *) lfirst(nextNotify);
1401 
1402  /* Construct a valid queue entry in local variable qe */
1403  asyncQueueNotificationToEntry(n, &qe);
1404 
1405  offset = QUEUE_POS_OFFSET(queue_head);
1406 
1407  /* Check whether the entry really fits on the current page */
1408  if (offset + qe.length <= QUEUE_PAGESIZE)
1409  {
1410  /* OK, so advance nextNotify past this item */
1411  nextNotify = lnext(pendingNotifies->events, nextNotify);
1412  }
1413  else
1414  {
1415  /*
1416  * Write a dummy entry to fill up the page. Actually readers will
1417  * only check dboid and since it won't match any reader's database
1418  * OID, they will ignore this entry and move on.
1419  */
1420  qe.length = QUEUE_PAGESIZE - offset;
1421  qe.dboid = InvalidOid;
1422  qe.data[0] = '\0'; /* empty channel */
1423  qe.data[1] = '\0'; /* empty payload */
1424  }
1425 
1426  /* Now copy qe into the shared buffer page */
1427  memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1428  &qe,
1429  qe.length);
1430 
1431  /* Advance queue_head appropriately, and detect if page is full */
1432  if (asyncQueueAdvance(&(queue_head), qe.length))
1433  {
1434  LWLock *lock;
1435 
1436  pageno = QUEUE_POS_PAGE(queue_head);
1437  lock = SimpleLruGetBankLock(NotifyCtl, pageno);
1438  if (lock != prevlock)
1439  {
1440  LWLockRelease(prevlock);
1441  LWLockAcquire(lock, LW_EXCLUSIVE);
1442  prevlock = lock;
1443  }
1444 
1445  /*
1446  * Page is full, so we're done here, but first fill the next page
1447  * with zeroes. The reason to do this is to ensure that slru.c's
1448  * idea of the head page is always the same as ours, which avoids
1449  * boundary problems in SimpleLruTruncate. The test in
1450  * asyncQueueIsFull() ensured that there is room to create this
1451  * page without overrunning the queue.
1452  */
1453  slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1454 
1455  /*
1456  * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1457  * set flag to remember that we should try to advance the tail
1458  * pointer (we don't want to actually do that right here).
1459  */
1460  if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1461  tryAdvanceTail = true;
1462 
1463  /* And exit the loop */
1464  break;
1465  }
1466  }
1467 
1468  /* Success, so update the global QUEUE_HEAD */
1469  QUEUE_HEAD = queue_head;
1470 
1471  LWLockRelease(prevlock);
1472 
1473  return nextNotify;
1474 }
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1320
static bool tryAdvanceTail
Definition: async.c:422
#define QUEUE_POS_OFFSET(x)
Definition: async.c:201
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1287
#define QUEUE_POS_PAGE(x)
Definition: async.c:200
#define QUEUE_CLEANUP_DELAY
Definition: async.c:238
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:212
#define NotifyCtl
Definition: async.c:310
#define QUEUE_PAGESIZE
Definition: async.c:311
#define QUEUE_HEAD
Definition: async.c:296
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:343
#define InvalidOid
Definition: postgres_ext.h:36
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:488
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:361
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition: slru.h:179
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:183
Definition: lwlock.h:42
#define InvalidTransactionId
Definition: transam.h:31

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition *  position,
int  entryLength 
)
static

Definition at line 1287 of file async.c.

1288 {
1289  int64 pageno = QUEUE_POS_PAGE(*position);
1290  int offset = QUEUE_POS_OFFSET(*position);
1291  bool pageJump = false;
1292 
1293  /*
1294  * Move to the next writing position: First jump over what we have just
1295  * written or read.
1296  */
1297  offset += entryLength;
1298  Assert(offset <= QUEUE_PAGESIZE);
1299 
1300  /*
1301  * In a second step check if another entry can possibly be written to the
1302  * page. If so, stay here, we have reached the next position. If not, then
1303  * we need to move on to the next page.
1304  */
1305  if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1306  {
1307  pageno++;
1308  offset = 0;
1309  pageJump = true;
1310  }
1311 
1312  SET_QUEUE_POS(*position, pageno, offset);
1313  return pageJump;
1314 }
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:203
#define AsyncQueueEntryEmptySize
Definition: async.c:189
#define QUEUEALIGN(len)
Definition: async.c:187

References Assert, AsyncQueueEntryEmptySize, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2108 of file async.c.

2109 {
2110  QueuePosition min;
2111  int oldtailpage;
2112  int newtailpage;
2113  int boundary;
2114 
2115  /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2116  LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2117 
2118  /*
2119  * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2120  * (ie, exactly match at least one backend's queue position), so it must
2121  * be updated atomically with the actual computation. Since v13, we could
2122  * get away with not doing it like that, but it seems prudent to keep it
2123  * so.
2124  *
2125  * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2126  * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2127  * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2128  * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2129  * there are pages we can truncate but haven't yet finished doing so.
2130  *
2131  * For concurrency's sake, we don't want to hold NotifyQueueLock while
2132  * performing SimpleLruTruncate. This is OK because no backend will try
2133  * to access the pages we are in the midst of truncating.
2134  */
2135  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2136  min = QUEUE_HEAD;
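2137  for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
2138  {
2139  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2140  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));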
2141  }
2142  QUEUE_TAIL = min;
2143  oldtailpage = QUEUE_STOP_PAGE;
2144  LWLockRelease(NotifyQueueLock);
2145 
2146  /*
2147  * We can truncate something if the global tail advanced across an SLRU
2148  * segment boundary.
2149  *
2150  * XXX it might be better to truncate only once every several segments, to
2151  * reduce the number of directory scans.
2152  */
2153  newtailpage = QUEUE_POS_PAGE(min);
2154  boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2155  if (asyncQueuePagePrecedes(oldtailpage, boundary))
2156  {
2157  /*
2158  * SimpleLruTruncate() will ask for SLRU bank locks but will also
2159  * release the lock again.
2160  */
2161  SimpleLruTruncate(NotifyCtl, newtailpage);
2162 
2163  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2164  QUEUE_STOP_PAGE = newtailpage;
2165  LWLockRelease(NotifyQueueLock);
2166  }
2167 
2168  LWLockRelease(NotifyQueueTailLock);
2169 }
#define QUEUE_FIRST_LISTENER
Definition: async.c:299
#define QUEUE_POS_MIN(x, y)
Definition: async.c:216
#define QUEUE_BACKEND_POS(i)
Definition: async.c:303
#define QUEUE_TAIL
Definition: async.c:297
#define QUEUE_BACKEND_PID(i)
Definition: async.c:300
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:302
#define QUEUE_STOP_PAGE
Definition: async.c:298
int i
Definition: isn.c:73
#define InvalidPid
Definition: miscadmin.h:32
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int ProcNumber
Definition: procnumber.h:24
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1391
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:39

References Assert, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1527 of file async.c.

1528 {
1529  double fillDegree;
1530  TimestampTz t;
1531 
1532  fillDegree = asyncQueueUsage();
1533  if (fillDegree < 0.5)
1534  return;
1535 
1536  t = GetCurrentTimestamp();
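1537 
1538  if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
1539  t, QUEUE_FULL_WARN_INTERVAL))
1540  {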
1541  QueuePosition min = QUEUE_HEAD;
1542  int32 minPid = InvalidPid;
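1543 
1544  for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
1545  {
1546  Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
1547  min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));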
1548  if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1549  minPid = QUEUE_BACKEND_PID(i);
1550  }
1551 
1552  ereport(WARNING,
1553  (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1554  (minPid != InvalidPid ?
1555  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1556  : 0),
1557  (minPid != InvalidPid ?
1558  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1559  : 0)));
1560 
1561  asyncQueueControl->lastQueueFillWarn = t;
1562  }
1563 }
static double asyncQueueUsage(void)
Definition: async.c:1506
static AsyncQueueControl * asyncQueueControl
Definition: async.c:294
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:313
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:209
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1790
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
signed int int32
Definition: c.h:494
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1205
int errhint(const char *fmt,...)
Definition: elog.c:1319
#define WARNING
Definition: elog.h:36
TimestampTz lastQueueFillWarn
Definition: async.c:290

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1272 of file async.c.

1273 { /* Reports whether the pages between queue tail and head have reached max_notify_queue_pages. */
1274  int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1275  int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1276  int64 occupied = headPage - tailPage;
1277  /* int64 throughout: queue page numbers are 64-bit, so plain int would truncate them */
1278  return occupied >= max_notify_queue_pages;
1279 }
int max_notify_queue_pages
Definition: async.c:428

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1320 of file async.c.

1321 { /* Fill queue entry *qe from notification *n: length, database, xid, sender PID, then the string data. */
1322  size_t channellen = n->channel_len;
1323  size_t payloadlen = n->payload_len;
1324  int entryLength;
1325 
1326  Assert(channellen < NAMEDATALEN);
1327  Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1328 
1329  /* The terminators are already included in AsyncQueueEntryEmptySize */
1330  entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1331  entryLength = QUEUEALIGN(entryLength); /* round up; QUEUEALIGN is INTALIGN */
1332  qe->length = entryLength;
1333  qe->dboid = MyDatabaseId;
1334  qe->xid = GetCurrentTransactionId();
1335  qe->srcPid = MyProcPid;
1336  memcpy(qe->data, n->data, channellen + payloadlen + 2); /* +2 copies the null terminators of both channel and payload */
1337 }
Oid MyDatabaseId
Definition: globals.c:91
int32 srcPid
Definition: async.c:182
TransactionId xid
Definition: async.c:181
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:451

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, AsyncQueueEntry::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 466 of file async.c.

467 { /* Signed difference in pages: how far page p is past page q. */
468  return p - q;
469 }

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 476 of file async.c.

477 { /* True when page p is numerically before page q; a plain comparison with no wraparound handling. */
478  return p < q;
479 }

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2016 of file async.c.

2020 { /* Walk entries in page_buffer starting at *current; returns true once stop or a still-in-progress message is reached, false if the page ended first. */
2021  bool reachedStop = false;
2022  bool reachedEndOfPage;
2023  AsyncQueueEntry *qe;
2024 
2025  do
2026  {
2027  QueuePosition thisentry = *current;
2028 
2029  if (QUEUE_POS_EQUAL(thisentry, stop))
2030  break;
2031 
2032  qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); /* entry lives at thisentry's offset within page_buffer */
2033 
2034  /*
2035  * Advance *current over this message, possibly to the next page. As
2036  * noted in the comments for asyncQueueReadAllNotifications, we must
2037  * do this before possibly failing while processing the message.
2038  */
2039  reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2040 
2041  /* Ignore messages destined for other databases */
2042  if (qe->dboid == MyDatabaseId)
2043  {
2044  if (XidInMVCCSnapshot(qe->xid, snapshot))
2045  {
2046  /*
2047  * The source transaction is still in progress, so we can't
2048  * process this message yet. Break out of the loop, but first
2049  * back up *current so we will reprocess the message next
2050  * time. (Note: it is unlikely but not impossible for
2051  * TransactionIdDidCommit to fail, so we can't really avoid
2052  * this advance-then-back-up behavior when dealing with an
2053  * uncommitted message.)
2054  *
2055  * Note that we must test XidInMVCCSnapshot before we test
2056  * TransactionIdDidCommit, else we might return a message from
2057  * a transaction that is not yet visible to snapshots; compare
2058  * the comments at the head of heapam_visibility.c.
2059  *
2060  * Also, while our own xact won't be listed in the snapshot,
2061  * we need not check for TransactionIdIsCurrentTransactionId
2062  * because our transaction cannot (yet) have queued any
2063  * messages.
2064  */
2065  *current = thisentry;
2066  reachedStop = true;
2067  break;
2068  }
2069  else if (TransactionIdDidCommit(qe->xid))
2070  {
2071  /* qe->data is the null-terminated channel name */
2072  char *channel = qe->data;
2073 
2074  if (IsListeningOn(channel))
2075  {
2076  /* payload follows channel name */
2077  char *payload = qe->data + strlen(channel) + 1;
2078 
2079  NotifyMyFrontEnd(channel, payload, qe->srcPid);
2080  }
2081  }
2082  else
2083  {
2084  /*
2085  * The source transaction aborted or crashed, so we just
2086  * ignore its notifications.
2087  */
2088  }
2089  }
2090 
2091  /* Loop back if we're not at end of page */
2092  } while (!reachedEndOfPage);
2093 
2094  if (QUEUE_POS_EQUAL(*current, stop)) /* also covers the early break above and a page that ends exactly at stop */
2095  reachedStop = true;
2096 
2097  return reachedStop;
2098 }
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2224
static bool IsListeningOn(const char *channel)
Definition: async.c:1212
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:1856
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1851 of file async.c.

1852 {
1853  volatile QueuePosition pos;
1854  QueuePosition head;
1855  Snapshot snapshot;
1856 
1857  /* page_buffer must be adequately aligned, so use a union */
1858  union
1859  {
1860  char buf[QUEUE_PAGESIZE];
1861  AsyncQueueEntry align;
1862  } page_buffer;
1863 
1864  /* Fetch current state */
1865  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1866  /* Assert checks that we have a valid state entry */
1869  head = QUEUE_HEAD;
1870  LWLockRelease(NotifyQueueLock);
1871 
1872  if (QUEUE_POS_EQUAL(pos, head))
1873  {
1874  /* Nothing to do, we have read all notifications already. */
1875  return;
1876  }
1877 
1878  /*----------
1879  * Get snapshot we'll use to decide which xacts are still in progress.
1880  * This is trickier than it might seem, because of race conditions.
1881  * Consider the following example:
1882  *
1883  * Backend 1: Backend 2:
1884  *
1885  * transaction starts
1886  * UPDATE foo SET ...;
1887  * NOTIFY foo;
1888  * commit starts
1889  * queue the notify message
1890  * transaction starts
1891  * LISTEN foo; -- first LISTEN in session
1892  * SELECT * FROM foo WHERE ...;
1893  * commit to clog
1894  * commit starts
1895  * add backend 2 to array of listeners
1896  * advance to queue head (this code)
1897  * commit to clog
1898  *
1899  * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1900  * wasn't committed yet. Ideally we'd ensure that client 2 would
1901  * eventually get transaction 1's notify message, but there's no way
1902  * to do that; until we're in the listener array, there's no guarantee
1903  * that the notify message doesn't get removed from the queue.
1904  *
1905  * Therefore the coding technique transaction 2 is using is unsafe:
1906  * applications must commit a LISTEN before inspecting database state,
1907  * if they want to ensure they will see notifications about subsequent
1908  * changes to that state.
1909  *
1910  * What we do guarantee is that we'll see all notifications from
1911  * transactions committing after the snapshot we take here.
1912  * Exec_ListenPreCommit has already added us to the listener array,
1913  * so no not-yet-committed messages can be removed from the queue
1914  * before we see them.
1915  *----------
1916  */
1917  snapshot = RegisterSnapshot(GetLatestSnapshot());
1918 
1919  /*
1920  * It is possible that we fail while trying to send a message to our
1921  * frontend (for example, because of encoding conversion failure). If
1922  * that happens it is critical that we not try to send the same message
1923  * over and over again. Therefore, we place a PG_TRY block here that will
1924  * forcibly advance our queue position before we lose control to an error.
1925  * (We could alternatively retake NotifyQueueLock and move the position
1926  * before handling each individual message, but that seems like too much
1927  * lock traffic.)
1928  */
1929  PG_TRY();
1930  {
1931  bool reachedStop;
1932 
1933  do
1934  {
1935  int curpage = QUEUE_POS_PAGE(pos);
1936  int curoffset = QUEUE_POS_OFFSET(pos);
1937  int slotno;
1938  int copysize;
1939 
1940  /*
1941  * We copy the data from SLRU into a local buffer, so as to avoid
1942  * holding the SLRU lock while we are examining the entries and
1943  * possibly transmitting them to our frontend. Copy only the part
1944  * of the page we will actually inspect.
1945  */
1946  slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1948  if (curpage == QUEUE_POS_PAGE(head))
1949  {
1950  /* we only want to read as far as head */
1951  copysize = QUEUE_POS_OFFSET(head) - curoffset;
1952  if (copysize < 0)
1953  copysize = 0; /* just for safety */
1954  }
1955  else
1956  {
1957  /* fetch all the rest of the page */
1958  copysize = QUEUE_PAGESIZE - curoffset;
1959  }
1960  memcpy(page_buffer.buf + curoffset,
1961  NotifyCtl->shared->page_buffer[slotno] + curoffset,
1962  copysize);
1963  /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
1965 
1966  /*
1967  * Process messages up to the stop position, end of page, or an
1968  * uncommitted message.
1969  *
1970  * Our stop position is what we found to be the head's position
1971  * when we entered this function. It might have changed already.
1972  * But if it has, we will receive (or have already received and
1973  * queued) another signal and come here again.
1974  *
1975  * We are not holding NotifyQueueLock here! The queue can only
1976  * extend beyond the head pointer (see above) and we leave our
1977  * backend's pointer where it is so nobody will truncate or
1978  * rewrite pages under us. Especially we don't want to hold a lock
1979  * while sending the notifications to the frontend.
1980  */
1981  reachedStop = asyncQueueProcessPageEntries(&pos, head,
1982  page_buffer.buf,
1983  snapshot);
1984  } while (!reachedStop);
1985  }
1986  PG_FINALLY();
1987  {
1988  /* Update shared state */
1989  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1991  LWLockRelease(NotifyQueueLock);
1992  }
1993  PG_END_TRY();
1994 
1995  /* Done with snapshot */
1996  UnregisterSnapshot(snapshot);
1997 }
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2016
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_FINALLY(...)
Definition: elog.h:387
ProcNumber MyProcNumber
Definition: globals.c:87
@ LW_SHARED
Definition: lwlock.h:115
static char * buf
Definition: pg_test_fsync.c:73
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:591
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:291
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:836
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:794

References Assert, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1231 of file async.c.

1232 {
1233  Assert(listenChannels == NIL); /* else caller error */
1234 
1235  if (!amRegisteredListener) /* nothing to do */
1236  return;
1237 
1238  /*
1239  * Need exclusive lock here to manipulate list links.
1240  */
1241  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1242  /* Mark our entry as invalid */
1245  /* and remove it from the list */
1248  else
1249  {
1251  {
1253  {
1255  break;
1256  }
1257  }
1258  }
1260  LWLockRelease(NotifyQueueLock);
1261 
1262  /* mark ourselves as no longer listed in the global array */
1263  amRegisteredListener = false;
1264 }
static List * listenChannels
Definition: async.c:320
static bool amRegisteredListener
Definition: async.c:419
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:301

References amRegisteredListener, Assert, i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, listenChannels, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1506 of file async.c.

1507 { /* Returns the occupied fraction of the queue: pages between tail and head, divided by max_notify_queue_pages. */
1508  int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1509  int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1510  int64 occupied = headPage - tailPage;
1511  /* int64 throughout: queue page numbers are 64-bit, so plain int would truncate them */
1512  if (occupied == 0)
1513  return (double) 0; /* fast exit for common case */
1514 
1515  return (double) occupied / (double) max_notify_queue_pages;
1516 }

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 502 of file async.c.

503 {
504  bool found;
505  Size size;
506 
507  /*
508  * Create or attach to the AsyncQueueControl structure.
509  */
511  size = add_size(size, offsetof(AsyncQueueControl, backend));
512 
514  ShmemInitStruct("Async Queue Control", size, &found);
515 
516  if (!found)
517  {
518  /* First time through, so initialize it */
519  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
520  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
521  QUEUE_STOP_PAGE = 0;
524  for (int i = 0; i < MaxBackends; i++)
525  {
530  }
531  }
532 
533  /*
534  * Set up SLRU management of the pg_notify data. Note that long segment
535  * names are used in order to avoid wraparound.
536  */
537  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
538  SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
540  SYNC_HANDLER_NONE, true);
541 
542  if (!found)
543  {
544  /*
545  * During start or reboot, clean out the pg_notify directory.
546  */
548  }
549 }
size_t Size
Definition: c.h:605
int MaxBackends
Definition: globals.c:143
int notify_buffers
Definition: globals.c:165
@ LWTRANCHE_NOTIFY_SLRU
Definition: lwlock.h:213
@ LWTRANCHE_NOTIFY_BUFFER
Definition: lwlock.h:184
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:238
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1774
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1727
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), size, SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 485 of file async.c.

486 {
487  Size size;
488 
489  /* This had better match AsyncShmemInit */
491  size = add_size(size, offsetof(AsyncQueueControl, backend));
492 
494 
495  return size;
496 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:184

References add_size(), MaxBackends, mul_size(), notify_buffers, SimpleLruShmemSize(), and size.

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1671 of file async.c.

1672 {
1673  /*
1674  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1675  * we have registered as a listener but have not made any entry in
1676  * listenChannels. In that case, deregister again.
1677  */
1680 
1681  /* And clean up */
1683 }
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2378

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 968 of file async.c.

969 {
970  ListCell *p;
971 
972  /*
973  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
974  * return as soon as possible
975  */
977  return;
978 
979  if (Trace_notify)
980  elog(DEBUG1, "AtCommit_Notify");
981 
982  /* Perform any pending listen/unlisten actions */
983  if (pendingActions != NULL)
984  {
985  foreach(p, pendingActions->actions)
986  {
987  ListenAction *actrec = (ListenAction *) lfirst(p);
988 
989  switch (actrec->action)
990  {
991  case LISTEN_LISTEN:
992  Exec_ListenCommit(actrec->channel);
993  break;
994  case LISTEN_UNLISTEN:
995  Exec_UnlistenCommit(actrec->channel);
996  break;
997  case LISTEN_UNLISTEN_ALL:
999  break;
1000  }
1001  }
1002  }
1003 
1004  /* If no longer listening to anything, get out of listener array */
1007 
1008  /*
1009  * Send signals to listening backends. We need do this only if there are
1010  * pending notifies, which were previously added to the shared queue by
1011  * PreCommit_Notify().
1012  */
1013  if (pendingNotifies != NULL)
1014  SignalBackends();
1015 
1016  /*
1017  * If it's time to try to advance the global tail pointer, do that.
1018  *
1019  * (It might seem odd to do this in the sender, when more than likely the
1020  * listeners won't yet have read the messages we just sent. However,
1021  * there's less contention if only the sender does it, and there is little
1022  * need for urgency in advancing the global tail. So this typically will
1023  * be clearing out messages that were sent some time ago.)
1024  */
1025  if (tryAdvanceTail)
1026  {
1027  tryAdvanceTail = false;
1029  }
1030 
1031  /* And clean up */
1033 }
static void SignalBackends(void)
Definition: async.c:1581
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1136
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1163
static void asyncQueueAdvanceTail(void)
Definition: async.c:2108
List * actions
Definition: async.c:348
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:342
ListenActionKind action
Definition: async.c:341

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 836 of file async.c.

837 {
838  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
840  ereport(ERROR,
841  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
843 }

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1761 of file async.c.

1762 {
1763  int my_level = GetCurrentTransactionNestLevel();
1764 
1765  /*
1766  * All we have to do is pop the stack --- the actions/notifies made in
1767  * this subxact are no longer interesting, and the space will be freed
1768  * when CurTransactionContext is recycled. We still have to free the
1769  * ActionList and NotificationList objects themselves, though, because
1770  * those are allocated in TopTransactionContext.
1771  *
1772  * Note that there might be no entries at all, or no entries for the
1773  * current subtransaction level, either because none were ever created, or
1774  * because we reentered this routine due to trouble during subxact abort.
1775  */
1776  while (pendingActions != NULL &&
1777  pendingActions->nestingLevel >= my_level)
1778  {
1779  ActionList *childPendingActions = pendingActions;
1780 
1782  pfree(childPendingActions);
1783  }
1784 
1785  while (pendingNotifies != NULL &&
1786  pendingNotifies->nestingLevel >= my_level)
1787  {
1788  NotificationList *childPendingNotifies = pendingNotifies;
1789 
1791  pfree(childPendingNotifies);
1792  }
1793 }
int nestingLevel
Definition: async.c:347
struct ActionList * upper
Definition: async.c:349

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1691 of file async.c.

1692 {
1693  int my_level = GetCurrentTransactionNestLevel();
1694 
1695  /* If there are actions at our nesting level, we must reparent them. */
1696  if (pendingActions != NULL &&
1697  pendingActions->nestingLevel >= my_level)
1698  {
1699  if (pendingActions->upper == NULL ||
1700  pendingActions->upper->nestingLevel < my_level - 1)
1701  {
1702  /* nothing to merge; give the whole thing to the parent */
1704  }
1705  else
1706  {
1707  ActionList *childPendingActions = pendingActions;
1708 
1710 
1711  /*
1712  * Mustn't try to eliminate duplicates here --- see queue_listen()
1713  */
1716  childPendingActions->actions);
1717  pfree(childPendingActions);
1718  }
1719  }
1720 
1721  /* If there are notifies at our nesting level, we must reparent them. */
1722  if (pendingNotifies != NULL &&
1723  pendingNotifies->nestingLevel >= my_level)
1724  {
1725  Assert(pendingNotifies->nestingLevel == my_level);
1726 
1727  if (pendingNotifies->upper == NULL ||
1728  pendingNotifies->upper->nestingLevel < my_level - 1)
1729  {
1730  /* nothing to merge; give the whole thing to the parent */
1732  }
1733  else
1734  {
1735  /*
1736  * Formerly, we didn't bother to eliminate duplicates here, but
1737  * now we must, else we fall foul of "Assert(!found)", either here
1738  * or during a later attempt to build the parent-level hashtable.
1739  */
1740  NotificationList *childPendingNotifies = pendingNotifies;
1741  ListCell *l;
1742 
1744  /* Insert all the subxact's events into parent, except for dups */
1745  foreach(l, childPendingNotifies->events)
1746  {
1747  Notification *childn = (Notification *) lfirst(l);
1748 
1749  if (!AsyncExistsPendingNotify(childn))
1750  AddEventToPendingNotifies(childn);
1751  }
1752  pfree(childPendingNotifies);
1753  }
1754  }
1755 }
List * list_concat(List *list1, const List *list2)
Definition: list.c:561

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ check_notify_buffers()

bool check_notify_buffers ( int *  newval,
void **  extra,
GucSource  source 
)

Definition at line 2394 of file async.c.

2395 { /* Validates a proposed notify_buffers value by delegating to check_slru_buffers(); extra and source are not used. */
2396  return check_slru_buffers("notify_buffers", newval);
2397 }
#define newval
bool check_slru_buffers(const char *name, int *newval)
Definition: slru.c:341

References check_slru_buffers(), and newval.

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2378 of file async.c.

2379 {
2380  /*
2381  * Everything's allocated in either TopTransactionContext or the context
2382  * for the subtransaction to which it corresponds. So, there's nothing to
2383  * do here except reset the pointers; the space will be reclaimed when the
2384  * contexts are deleted.
2385  */
2386  pendingActions = NULL; /* forget the pending listen/unlisten action list */
2387  pendingNotifies = NULL; /* forget the pending notifications */
2388 }

References pendingActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1136 of file async.c.

1137 {
1138  MemoryContext oldcontext;
1139 
1140  /* Do nothing if we are already listening on this channel */
1141  if (IsListeningOn(channel))
1142  return;
1143 
1144  /*
1145  * Add the new channel name to listenChannels.
1146  *
1147  * XXX It is theoretically possible to get an out-of-memory failure here,
1148  * which would be bad because we already committed. For the moment it
1149  * doesn't seem worth trying to guard against that, but maybe improve this
1150  * later.
1151  */
1154  MemoryContextSwitchTo(oldcontext);
1155 }
char * pstrdup(const char *in)
Definition: mcxt.c:1695
MemoryContext TopMemoryContext
Definition: mcxt.c:149

References IsListeningOn(), lappend(), listenChannels, MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1041 of file async.c.

1042 {
1043  QueuePosition head;
1044  QueuePosition max;
1045  ProcNumber prevListener;
1046 
1047  /*
1048  * Nothing to do if we are already listening to something, nor if we
1049  * already ran this routine in this transaction.
1050  */
1052  return;
1053 
1054  if (Trace_notify)
1055  elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1056 
1057  /*
1058  * Before registering, make sure we will unlisten before dying. (Note:
1059  * this action does not get undone if we abort later.)
1060  */
1062  {
1064  unlistenExitRegistered = true;
1065  }
1066 
1067  /*
1068  * This is our first LISTEN, so establish our pointer.
1069  *
1070  * We set our pointer to the global tail pointer and then move it forward
1071  * over already-committed notifications. This ensures we cannot miss any
1072  * not-yet-committed notifications. We might get a few more but that
1073  * doesn't hurt.
1074  *
1075  * In some scenarios there might be a lot of committed notifications that
1076  * have not yet been pruned away (because some backend is being lazy about
1077  * reading them). To reduce our startup time, we can look at other
1078  * backends and adopt the maximum "pos" pointer of any backend that's in
1079  * our database; any notifications it's already advanced over are surely
1080  * committed and need not be re-examined by us. (We must consider only
1081  * backends connected to our DB, because others will not have bothered to
1082  * check committed-ness of notifications in our DB.)
1083  *
1084  * We need exclusive lock here so we can look at other backends' entries
1085  * and manipulate the list links.
1086  */
1087  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1088  head = QUEUE_HEAD;
1089  max = QUEUE_TAIL;
1090  prevListener = INVALID_PROC_NUMBER;
1092  {
1094  max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1095  /* Also find last listening backend before this one */
1096  if (i < MyProcNumber)
1097  prevListener = i;
1098  }
1102  /* Insert backend into list of listeners at correct position */
1103  if (prevListener != INVALID_PROC_NUMBER)
1104  {
1106  QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
1107  }
1108  else
1109  {
1112  }
1113  LWLockRelease(NotifyQueueLock);
1114 
1115  /* Now we are listed in the global array, so remember we're listening */
1116  amRegisteredListener = true;
1117 
1118  /*
1119  * Try to move our pointer forward as far as possible. This will skip
1120  * over already-committed notifications, which we want to do because they
1121  * might be quite stale. Note that we are not yet listening on anything,
1122  * so we won't deliver such notifications to our frontend. Also, although
1123  * our transaction might have executed NOTIFY, those message(s) aren't
1124  * queued yet so we won't skip them here.
1125  */
1126  if (!QUEUE_POS_EQUAL(max, head))
1128 }
#define QUEUE_POS_MAX(x, y)
Definition: async.c:222
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1851
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:823
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1194 of file async.c.

1195 {
1196  if (Trace_notify)
1197  elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1198 
1200  listenChannels = NIL;
1201 }
void list_free_deep(List *list)
Definition: list.c:1560

References DEBUG1, elog, list_free_deep(), listenChannels, MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1163 of file async.c.

1164 {
1165  ListCell *q;
1166 
1167  if (Trace_notify)
1168  elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1169 
1170  foreach(q, listenChannels)
1171  {
1172  char *lchan = (char *) lfirst(q);
1173 
1174  if (strcmp(lchan, channel) == 0)
1175  {
1177  pfree(lchan);
1178  break;
1179  }
1180  }
1181 
1182  /*
1183  * We do not complain about unlistening something not being listened;
1184  * should we?
1185  */
1186 }
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391

References DEBUG1, elog, foreach_delete_current, lfirst, listenChannels, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1804 of file async.c.

1805 {
1806  /*
1807  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1808  * you do here.
1809  */
1810 
1811  /* signal that work needs to be done */
1812  notifyInterruptPending = true;
1813 
1814  /* make sure the event is processed in due course */
1815  SetLatch(MyLatch);
1816 }
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:413
struct Latch * MyLatch
Definition: globals.c:60
void SetLatch(Latch *latch)
Definition: latch.c:632

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1212 of file async.c.

1213 {
1214  ListCell *p;
1215 
1216  foreach(p, listenChannels)
1217  {
1218  char *lchan = (char *) lfirst(p);
1219 
1220  if (strcmp(lchan, channel) == 0)
1221  return true;
1222  }
1223  return false;
1224 }

References lfirst, and listenChannels.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2348 of file async.c.

2349 {
2350  const Notification *k = *(const Notification *const *) key;
2351 
2352  Assert(keysize == sizeof(Notification *));
2353  /* We don't bother to include the payload's trailing null in the hash */
2354  return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2355  k->channel_len + k->payload_len + 1));
2356 }
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), sort-test::key, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2362 of file async.c.

2363 {
2364  const Notification *k1 = *(const Notification *const *) key1;
2365  const Notification *k2 = *(const Notification *const *) key2;
2366 
2367  Assert(keysize == sizeof(Notification *));
2368  if (k1->channel_len == k2->channel_len &&
2369  k1->payload_len == k2->payload_len &&
2370  memcmp(k1->data, k2->data,
2371  k1->channel_len + k1->payload_len + 2) == 0)
2372  return 0; /* equal */
2373  return 1; /* not equal */
2374 }

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2224 of file async.c.

2225 {
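2226  if (whereToSendOutput == DestRemote)
2227  {
2228  StringInfoData buf;
2229 
2230  pq_beginmessage(&buf, PqMsg_NotificationResponse);
2231  pq_sendint32(&buf, srcPid);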
2232  pq_sendstring(&buf, channel);
2233  pq_sendstring(&buf, payload);
2234  pq_endmessage(&buf);
2235 
2236  /*
2237  * NOTE: we do not do pq_flush() here. Some level of caller will
2238  * handle it later, allowing this message to be combined into a packet
2239  * with other ones.
2240  */
2241  }
2242  else
2243  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2244 }
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
CommandDest whereToSendOutput
Definition: postgres.c:90
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
#define PqMsg_NotificationResponse
Definition: protocol.h:41

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 790 of file async.c.

791 {
792  FuncCallContext *funcctx;
793 
794  /* stuff done only on the first call of the function */
795  if (SRF_IS_FIRSTCALL())
796  {
797  /* create a function context for cross-call persistence */
798  funcctx = SRF_FIRSTCALL_INIT();
799  }
800 
801  /* stuff done on every call of the function */
802  funcctx = SRF_PERCALL_SETUP();
803 
804  if (funcctx->call_cntr < list_length(listenChannels))
805  {
806  char *channel = (char *) list_nth(listenChannels,
807  funcctx->call_cntr);
808 
809  SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
810  }
811 
812  SRF_RETURN_DONE(funcctx);
813 }
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:328
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
uint64 call_cntr
Definition: funcapi.h:65

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), listenChannels, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1481 of file async.c.

1482 {
1483  double usage;
1484 
1485  /* Advance the queue tail so we don't report a too-large result */
1486  asyncQueueAdvanceTail();
1487 
1488  LWLockAcquire(NotifyQueueLock, LW_SHARED);
1489  usage = asyncQueueUsage();
1490  LWLockRelease(NotifyQueueLock);
1491 
1492  PG_RETURN_FLOAT8(usage);
1493 }
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static void usage(const char *progname)
Definition: vacuumlo.c:414

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 557 of file async.c.

558 {
559  const char *channel;
560  const char *payload;
561 
562  if (PG_ARGISNULL(0))
563  channel = "";
564  else
565  channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
566 
567  if (PG_ARGISNULL(1))
568  payload = "";
569  else
570  payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
571 
572  /* For NOTIFY as a statement, this is checked in ProcessUtility */
573  PreventCommandDuringRecovery("NOTIFY");
574 
575  Async_Notify(channel, payload);
576 
577  PG_RETURN_VOID();
578 }
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:591
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:441
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 861 of file async.c.

862 {
863  ListCell *p;
864 
865  if (!pendingActions && !pendingNotifies)
866  return; /* no relevant statements in this xact */
867 
868  if (Trace_notify)
869  elog(DEBUG1, "PreCommit_Notify");
870 
871  /* Preflight for any pending listen/unlisten actions */
872  if (pendingActions != NULL)
873  {
874  foreach(p, pendingActions->actions)
875  {
876  ListenAction *actrec = (ListenAction *) lfirst(p);
877 
878  switch (actrec->action)
879  {
880  case LISTEN_LISTEN:
881  Exec_ListenPreCommit();
882  break;
883  case LISTEN_UNLISTEN:
884  /* there is no Exec_UnlistenPreCommit() */
885  break;
886  case LISTEN_UNLISTEN_ALL:
887  /* there is no Exec_UnlistenAllPreCommit() */
888  break;
889  }
890  }
891  }
892 
893  /* Queue any pending notifies (must happen after the above) */
894  if (pendingNotifies)
895  {
896  ListCell *nextNotify;
897 
898  /*
899  * Make sure that we have an XID assigned to the current transaction.
900  * GetCurrentTransactionId is cheap if we already have an XID, but not
901  * so cheap if we don't, and we'd prefer not to do that work while
902  * holding NotifyQueueLock.
903  */
904  (void) GetCurrentTransactionId();
905 
906  /*
907  * Serialize writers by acquiring a special lock that we hold till
908  * after commit. This ensures that queue entries appear in commit
909  * order, and in particular that there are never uncommitted queue
910  * entries ahead of committed ones, so an uncommitted transaction
911  * can't block delivery of deliverable notifications.
912  *
913  * We use a heavyweight lock so that it'll automatically be released
914  * after either commit or abort. This also allows deadlocks to be
915  * detected, though really a deadlock shouldn't be possible here.
916  *
917  * The lock is on "database 0", which is pretty ugly but it doesn't
918  * seem worth inventing a special locktag category just for this.
919  * (Historical note: before PG 9.0, a similar lock on "database 0" was
920  * used by the flatfiles mechanism.)
921  */
922  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
923  AccessExclusiveLock);
924 
925  /* Now push the notifications into the queue */
926  nextNotify = list_head(pendingNotifies->events);
927  while (nextNotify != NULL)
928  {
929  /*
930  * Add the pending notifications to the queue. We acquire and
931  * release NotifyQueueLock once per page, which might be overkill
932  * but it does allow readers to get in while we're doing this.
933  *
934  * A full queue is very uncommon and should really not happen,
935  * given that we have so much space available in the SLRU pages.
936  * Nevertheless we need to deal with this possibility. Note that
937  * when we get here we are in the process of committing our
938  * transaction, but we have not yet committed to clog, so at this
939  * point in time we can still roll the transaction back.
940  */
941  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
942  asyncQueueFillWarning();
943  if (asyncQueueIsFull())
944  ereport(ERROR,
945  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
946  errmsg("too many notifications in the NOTIFY queue")));
947  nextNotify = asyncQueueAddEntries(nextNotify);
948  LWLockRelease(NotifyQueueLock);
949  }
950 
951  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
952  }
953 }
static void Exec_ListenPreCommit(void)
Definition: async.c:1041
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1356
static void asyncQueueFillWarning(void)
Definition: async.c:1527
static bool asyncQueueIsFull(void)
Definition: async.c:1272
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1083
#define AccessExclusiveLock
Definition: lockdefs.h:43
static ListCell * list_head(const List *l)
Definition: pg_list.h:128

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 2183 of file async.c.

2184 {
2185  /* We *must* reset the flag */
2186  notifyInterruptPending = false;
2187 
2188  /* Do nothing else if we aren't actively listening */
2189  if (listenChannels == NIL)
2190  return;
2191 
2192  if (Trace_notify)
2193  elog(DEBUG1, "ProcessIncomingNotify");
2194 
2195  set_ps_display("notify interrupt");
2196 
2197  /*
2198  * We must run asyncQueueReadAllNotifications inside a transaction, else
2199  * bad things happen if it gets an error.
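2200  */
2201  StartTransactionCommand();
2202 
2203  asyncQueueReadAllNotifications();
2204 
2205  CommitTransactionCommand();
2206 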
2207  /*
2208  * If this isn't an end-of-command case, we must flush the notify messages
2209  * to ensure frontend gets them promptly.
2210  */
2211  if (flush)
2212  pq_flush();
2213 
2214  set_ps_display("idle");
2215 
2216  if (Trace_notify)
2217  elog(DEBUG1, "ProcessIncomingNotify: done");
2218 }
#define pq_flush()
Definition: libpq.h:46
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void StartTransactionCommand(void)
Definition: xact.c:2995
void CommitTransactionCommand(void)
Definition: xact.c:3093

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, listenChannels, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1834 of file async.c.

1835 {
1836  if (IsTransactionOrTransactionBlock())
1837  return; /* not really idle */
1838 
1839  /* Loop in case another signal arrives while sending messages */
1840  while (notifyInterruptPending)
1841  ProcessIncomingNotify(flush);
1842 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2183
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4933

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 690 of file async.c.

691 {
692  MemoryContext oldcontext;
693  ListenAction *actrec;
694  int my_level = GetCurrentTransactionNestLevel();
695 
696  /*
697  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
698  * be too complicated to ensure we get the right interactions of
699  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
700  * would be any performance benefit anyway in sane applications.
701  */
702  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
703 
704  /* space for terminating null is included in sizeof(ListenAction) */
705  actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
706  strlen(channel) + 1);
707  actrec->action = action;
708  strcpy(actrec->channel, channel);
709 
710  if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
711  {
712  ActionList *actions;
713 
714  /*
715  * First action in current sub(xact). Note that we allocate the
716  * ActionList in TopTransactionContext; the nestingLevel might get
717  * changed later by AtSubCommit_Notify.
718  */
719  actions = (ActionList *)
720  MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
721  actions->nestingLevel = my_level;
722  actions->actions = list_make1(actrec);
723  actions->upper = pendingActions;
724  pendingActions = actions;
725  }
726  else
727  pendingActions->actions = lappend(pendingActions->actions, actrec);
728 
729  MemoryContextSwitchTo(oldcontext);
730 }

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1581 of file async.c.

1582 {
1583  int32 *pids;
1584  ProcNumber *procnos;
1585  int count;
1586 
1587  /*
1588  * Identify backends that we need to signal. We don't want to send
1589  * signals while holding the NotifyQueueLock, so this loop just builds a
1590  * list of target PIDs.
1591  *
1592  * XXX in principle these pallocs could fail, which would be bad. Maybe
1593  * preallocate the arrays? They're not that large, though.
1594  */
1595  pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1596  procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
1597  count = 0;
1598 
1599  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
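1600  for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
1601  {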
1602  int32 pid = QUEUE_BACKEND_PID(i);
1603  QueuePosition pos;
1604 
1605  Assert(pid != InvalidPid);
1606  pos = QUEUE_BACKEND_POS(i);
1607  if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1608  {
1609  /*
1610  * Always signal listeners in our own database, unless they're
1611  * already caught up (unlikely, but possible).
1612  */
1613  if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1614  continue;
1615  }
1616  else
1617  {
1618  /*
1619  * Listeners in other databases should be signaled only if they
1620  * are far behind.
1621  */
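1622  if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
1623  QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
1624  continue;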
1625  }
1626  /* OK, need to signal this one */
1627  pids[count] = pid;
1628  procnos[count] = i;
1629  count++;
1630  }
1631  LWLockRelease(NotifyQueueLock);
1632 
1633  /* Now send signals */
1634  for (int i = 0; i < count; i++)
1635  {
1636  int32 pid = pids[i];
1637 
1638  /*
1639  * If we are signaling our own process, no need to involve the kernel;
1640  * just set the flag directly.
1641  */
1642  if (pid == MyProcPid)
1643  {
1644  notifyInterruptPending = true;
1645  continue;
1646  }
1647 
1648  /*
1649  * Note: assuming things aren't broken, a signal failure here could
1650  * only occur if the target backend exited since we released
1651  * NotifyQueueLock; which is unlikely but certainly possible. So we
1652  * just log a low-level debug message if it happens.
1653  */
1654  if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
1655  elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1656  }
1657 
1658  pfree(pids);
1659  pfree(procnos);
1660 }
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition: async.c:466
#define DEBUG3
Definition: elog.h:28
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:257
@ PROCSIG_NOTIFY_INTERRUPT
Definition: procsignal.h:33

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 294 of file async.c.

Referenced by asyncQueueFillWarning(), and AsyncShmemInit().

◆ listenChannels

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 428 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 308 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

◆ pendingNotifies

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 422 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 416 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().