PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_ASYNC_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubStart_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void ProcessCompletedNotifies (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_ASYNC_BUFFERS

#define NUM_ASYNC_BUFFERS   8

Definition at line 21 of file async.h.

Referenced by AsyncShmemInit(), and AsyncShmemSize().

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 699 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

700 {
701  if (Trace_notify)
702  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
703 
704  queue_listen(LISTEN_LISTEN, channel);
705 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:669
bool Trace_notify
Definition: async.c:406
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 582 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextSwitchTo(), NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pfree(), and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

583 {
584  size_t channel_len;
585  size_t payload_len;
586  Notification *n;
587  MemoryContext oldcontext;
588 
589  if (IsParallelWorker())
590  elog(ERROR, "cannot send notifications from a parallel worker");
591 
592  if (Trace_notify)
593  elog(DEBUG1, "Async_Notify(%s)", channel);
594 
595  channel_len = channel ? strlen(channel) : 0;
596  payload_len = payload ? strlen(payload) : 0;
597 
598  /* a channel name must be specified */
599  if (channel_len == 0)
600  ereport(ERROR,
601  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
602  errmsg("channel name cannot be empty")));
603 
604  /* enforce length limits */
605  if (channel_len >= NAMEDATALEN)
606  ereport(ERROR,
607  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
608  errmsg("channel name too long")));
609 
610  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
611  ereport(ERROR,
612  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
613  errmsg("payload string too long")));
614 
615  /*
616  * We must construct the Notification entry, even if we end up not using
617  * it, in order to compare it cheaply to existing list entries.
618  *
619  * The notification list needs to live until end of transaction, so store
620  * it in the transaction context.
621  */
622  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
623 
624  n = (Notification *) palloc(offsetof(Notification, data) +
625  channel_len + payload_len + 2);
626  n->channel_len = channel_len;
627  n->payload_len = payload_len;
628  strcpy(n->data, channel);
629  if (payload)
630  strcpy(n->data + channel_len + 1, payload);
631  else
632  n->data[channel_len + 1] = '\0';
633 
634  /* Now check for duplicates */
635  if (AsyncExistsPendingNotify(n))
636  {
637  /* It's a dup, so forget it */
638  pfree(n);
639  MemoryContextSwitchTo(oldcontext);
640  return;
641  }
642 
643  if (pendingNotifies == NULL)
644  {
645  /* First notify event in current (sub)xact */
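646  pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
647  pendingNotifies->events = list_make1(n);
648  /* We certainly don't need a hashtable yet */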
649  pendingNotifies->hashtab = NULL;
650  }
651  else
652  {
653  /* Append more events to existing list */
654  AddEventToPendingNotifies(n);
655  }
656 
657  MemoryContextSwitchTo(oldcontext);
658 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:372
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:367
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:570
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2214
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1031
static NotificationList * pendingNotifies
Definition: async.c:383
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2255
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:153
#define ereport(elevel, rest)
Definition: elog.h:141
#define IsParallelWorker()
Definition: parallel.h:60
HTAB * hashtab
Definition: async.c:373
uint16 channel_len
Definition: async.c:364
uint16 payload_len
Definition: async.c:365
bool Trace_notify
Definition: async.c:406
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define offsetof(type, field)
Definition: c.h:655

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 713 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

714 {
715  if (Trace_notify)
716  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
717 
718  /* If we couldn't possibly be listening, no need to queue anything */
719  if (pendingActions == NIL && !unlistenExitRegistered)
720  return;
721 
722  queue_listen(LISTEN_UNLISTEN, channel);
723 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:669
static bool unlistenExitRegistered
Definition: async.c:397
static List * pendingActions
Definition: async.c:331
bool Trace_notify
Definition: async.c:406
#define elog(elevel,...)
Definition: elog.h:226

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 731 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

732 {
733  if (Trace_notify)
734  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
735 
736  /* If we couldn't possibly be listening, no need to queue anything */
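737  if (pendingActions == NIL && !unlistenExitRegistered)
738  return;
739 
740  queue_listen(LISTEN_UNLISTEN_ALL, "");
741 }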
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:669
static bool unlistenExitRegistered
Definition: async.c:397
static List * pendingActions
Definition: async.c:331
bool Trace_notify
Definition: async.c:406
#define elog(elevel,...)
Definition: elog.h:226

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 482 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

483 {
484  bool found;
485  int slotno;
486  Size size;
487 
488  /*
489  * Create or attach to the AsyncQueueControl structure.
490  *
491  * The used entries in the backend[] array run from 1 to MaxBackends; the
492  * zero'th entry is unused but must be allocated.
493  */
494  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
495  size = add_size(size, offsetof(AsyncQueueControl, backend));
496 
497  asyncQueueControl = (AsyncQueueControl *)
498  ShmemInitStruct("Async Queue Control", size, &found);
499 
500  if (!found)
501  {
502  /* First time through, so initialize it */
503  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
504  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
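505  QUEUE_FIRST_LISTENER = InvalidBackendId;
506  asyncQueueControl->lastQueueFillWarn = 0;
507  /* zero'th entry won't be used, but let's initialize it anyway */
508  for (int i = 0; i <= MaxBackends; i++)
509  {
510  QUEUE_BACKEND_PID(i) = InvalidPid;
511  QUEUE_BACKEND_DBOID(i) = InvalidOid;
512  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
513  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
514  }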
515  }
516 
517  /*
518  * Set up SLRU management of the pg_notify data.
519  */
520  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
521  SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
522  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
523  /* Override default assumption that writes should be fsync'd */
524  AsyncCtl->do_fsync = false;
525 
526  if (!found)
527  {
528  /*
529  * During start or reboot, clean out the pg_notify directory.
530  */
531  (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
532 
533  /* Now initialize page zero to empty */
534  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
535  slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
536  /* This write is just to verify that pg_notify/ is writable */
537  SimpleLruWritePage(AsyncCtl, slotno);
538  LWLockRelease(AsyncCtlLock);
539  }
540 }
#define QUEUE_TAIL
Definition: async.c:266
#define QUEUE_BACKEND_PID(i)
Definition: async.c:268
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1370
#define AsyncCtl
Definition: async.c:278
#define QUEUE_HEAD
Definition: async.c:265
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:267
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:135
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:193
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:578
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:442
static AsyncQueueControl * asyncQueueControl
Definition: async.c:263
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:269
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:270
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1393
size_t Size
Definition: c.h:466
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
TimestampTz lastQueueFillWarn
Definition: async.c:258
#define QUEUE_BACKEND_POS(i)
Definition: async.c:271
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:263
#define QUEUE_POS_PAGE(x)
Definition: async.c:190
#define offsetof(type, field)
Definition: c.h:655
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:165

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 465 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

466 {
467  Size size;
468 
469  /* This had better match AsyncShmemInit */
470  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
471  size = add_size(size, offsetof(AsyncQueueControl, backend));
472 
473  size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
474 
475  return size;
476 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:145
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1658 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1659 {
1660  /*
1661  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1662  * we have registered as a listener but have not made any entry in
1663  * listenChannels. In that case, deregister again.
1664  */
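1665  if (amRegisteredListener && listenChannels == NIL)
1666  asyncQueueUnregister();
1667 
1668  /* And clean up */
1669  ClearPendingActionsAndNotifies();
1670 }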
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:306
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2349
static void asyncQueueUnregister(void)
Definition: async.c:1243
static bool amRegisteredListener
Definition: async.c:400

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 921 of file async.c.

References ListenAction::action, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

922 {
923  ListCell *p;
924 
925  /*
926  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
927  * return as soon as possible
928  */
929  if (!pendingActions && !pendingNotifies)
930  return;
931 
932  if (Trace_notify)
933  elog(DEBUG1, "AtCommit_Notify");
934 
935  /* Perform any pending listen/unlisten actions */
936  foreach(p, pendingActions)
937  {
938  ListenAction *actrec = (ListenAction *) lfirst(p);
939 
940  switch (actrec->action)
941  {
942  case LISTEN_LISTEN:
943  Exec_ListenCommit(actrec->channel);
944  break;
945  case LISTEN_UNLISTEN:
946  Exec_UnlistenCommit(actrec->channel);
947  break;
948  case LISTEN_UNLISTEN_ALL:
949  Exec_UnlistenAllCommit();
950  break;
951  }
952  }
953 
954  /* If no longer listening to anything, get out of listener array */
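955  if (amRegisteredListener && listenChannels == NIL)
956  asyncQueueUnregister();
957 
958  /* And clean up */
959  ClearPendingActionsAndNotifies();
960 }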
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1125
static List * listenChannels
Definition: async.c:306
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2349
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1094
static NotificationList * pendingNotifies
Definition: async.c:383
static List * pendingActions
Definition: async.c:331
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1067
static void asyncQueueUnregister(void)
Definition: async.c:1243
static bool amRegisteredListener
Definition: async.c:400
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:406
#define elog(elevel,...)
Definition: elog.h:226
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:328
ListenActionKind action
Definition: async.c:327

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 797 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

798 {
799  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
800  if (pendingActions || pendingNotifies)
801  ereport(ERROR,
802  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
803  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
804 }
int errcode(int sqlerrcode)
Definition: elog.c:570
static NotificationList * pendingNotifies
Definition: async.c:383
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:331
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1765 of file async.c.

References GetCurrentTransactionNestLevel(), linitial, linitial_node, list_delete_first(), and list_length().

Referenced by AbortSubTransaction().

1766 {
1767  int my_level = GetCurrentTransactionNestLevel();
1768 
1769  /*
1770  * All we have to do is pop the stack --- the actions/notifies made in
1771  * this subxact are no longer interesting, and the space will be freed
1772  * when CurTransactionContext is recycled.
1773  *
1774  * This routine could be called more than once at a given nesting level if
1775  * there is trouble during subxact abort. Avoid dumping core by using
1776  * GetCurrentTransactionNestLevel as the indicator of how far we need to
1777  * prune the list.
1778  */
1779  while (list_length(upperPendingActions) > my_level - 2)
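1780  {
1781  pendingActions = linitial_node(List, upperPendingActions);
1782  upperPendingActions = list_delete_first(upperPendingActions);
1783  }
1784 
1785  while (list_length(upperPendingNotifies) > my_level - 2)
1786  {
1787  pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
1788  upperPendingNotifies = list_delete_first(upperPendingNotifies);
1789  }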
1790 }
#define linitial_node(type, l)
Definition: pg_list.h:198
static NotificationList * pendingNotifies
Definition: async.c:383
#define linitial(l)
Definition: pg_list.h:195
static List * pendingActions
Definition: async.c:331
static List * upperPendingActions
Definition: async.c:333
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
static int list_length(const List *l)
Definition: pg_list.h:169
static List * upperPendingNotifies
Definition: async.c:385
List
Definition: pg_list.h:50
List * list_delete_first(List *list)
Definition: list.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1708 of file async.c.

References AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, linitial, linitial_node, list_concat(), list_delete_first(), list_length(), and pendingNotifies.

Referenced by CommitSubTransaction().

1709 {
1710  List *parentPendingActions;
1711  NotificationList *parentPendingNotifies;
1712 
1713  parentPendingActions = linitial_node(List, upperPendingActions);
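1714  upperPendingActions = list_delete_first(upperPendingActions);
1715 
1716  Assert(list_length(upperPendingActions) ==
1717  GetCurrentTransactionNestLevel() - 2);
1718 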
1719  /*
1720  * Mustn't try to eliminate duplicates here --- see queue_listen()
1721  */
1722  pendingActions = list_concat(parentPendingActions, pendingActions);
1723 
1724  parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
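1725  upperPendingNotifies = list_delete_first(upperPendingNotifies);
1726 
1727  Assert(list_length(upperPendingNotifies) ==
1728  GetCurrentTransactionNestLevel() - 2);
1729 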
1730  if (pendingNotifies == NULL)
1731  {
1732  /* easy, no notify events happened in current subxact */
1733  pendingNotifies = parentPendingNotifies;
1734  }
1735  else if (parentPendingNotifies == NULL)
1736  {
1737  /* easy, subxact's list becomes parent's */
1738  }
1739  else
1740  {
1741  /*
1742  * Formerly, we didn't bother to eliminate duplicates here, but now we
1743  * must, else we fall foul of "Assert(!found)", either here or during
1744  * a later attempt to build the parent-level hashtable.
1745  */
1746  NotificationList *childPendingNotifies = pendingNotifies;
1747  ListCell *l;
1748 
1749  pendingNotifies = parentPendingNotifies;
1750  /* Insert all the subxact's events into parent, except for dups */
1751  foreach(l, childPendingNotifies->events)
1752  {
1753  Notification *childn = (Notification *) lfirst(l);
1754 
1755  if (!AsyncExistsPendingNotify(childn))
1756  AddEventToPendingNotifies(childn);
1757  }
1758  }
1759 }
List * events
Definition: async.c:372
List * list_concat(List *list1, const List *list2)
Definition: list.c:515
#define linitial_node(type, l)
Definition: pg_list.h:198
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2214
static NotificationList * pendingNotifies
Definition: async.c:383
#define linitial(l)
Definition: pg_list.h:195
static List * pendingActions
Definition: async.c:331
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2255
static List * upperPendingActions
Definition: async.c:333
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
static int list_length(const List *l)
Definition: pg_list.h:169
static List * upperPendingNotifies
Definition: async.c:385
List
Definition: pg_list.h:50
List * list_delete_first(List *list)
Definition: list.c:857

◆ AtSubStart_Notify()

void AtSubStart_Notify ( void  )

Definition at line 1678 of file async.c.

References Assert, GetCurrentTransactionNestLevel(), lcons(), list_length(), MemoryContextSwitchTo(), NIL, and TopTransactionContext.

Referenced by StartSubTransaction().

1679 {
1680  MemoryContext old_cxt;
1681 
1682  /* Keep the list-of-lists in TopTransactionContext for simplicity */
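1683  old_cxt = MemoryContextSwitchTo(TopTransactionContext);
1684 
1685  upperPendingActions = lcons(pendingActions, upperPendingActions);
1686 
1687  Assert(list_length(upperPendingActions) ==
1688  GetCurrentTransactionNestLevel() - 1);
1689 
1690  pendingActions = NIL;
1691 
1692  upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
1693 
1694  Assert(list_length(upperPendingNotifies) ==
1695  GetCurrentTransactionNestLevel() - 1);
1696 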
1697  pendingNotifies = NULL;
1698 
1699  MemoryContextSwitchTo(old_cxt);
1700 }
#define NIL
Definition: pg_list.h:65
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static NotificationList * pendingNotifies
Definition: async.c:383
static List * pendingActions
Definition: async.c:331
static List * upperPendingActions
Definition: async.c:333
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
List * lcons(void *datum, List *list)
Definition: list.c:453
#define Assert(condition)
Definition: c.h:732
static int list_length(const List *l)
Definition: pg_list.h:169
static List * upperPendingNotifies
Definition: async.c:385

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1801 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1802 {
1803  /*
1804  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1805  * you do here.
1806  */
1807 
1808  /* signal that work needs to be done */
1809  notifyInterruptPending = true;
1810 
1811  /* make sure the event is processed in due course */
1812  SetLatch(MyLatch);
1813 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:394

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2189 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2190 {
2191  if (whereToSendOutput == DestRemote)
2192  {
2193  StringInfoData buf;
2194 
2195  pq_beginmessage(&buf, 'A');
2196  pq_sendint32(&buf, srcPid);
2197  pq_sendstring(&buf, channel);
2198  if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
2199  pq_sendstring(&buf, payload);
2200  pq_endmessage(&buf);
2201 
2202  /*
2203  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2204  * happen at the end of the transaction, and for incoming notifies
2205  * ProcessIncomingNotify will do it after finding all the notifies.
2206  */
2207  }
2208  else
2209  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2210 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:226
CommandDest whereToSendOutput
Definition: postgres.c:90
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 822 of file async.c.

References AccessExclusiveLock, ListenAction::action, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

823 {
824  ListCell *p;
825 
826  if (!pendingActions && !pendingNotifies)
827  return; /* no relevant statements in this xact */
828 
829  if (Trace_notify)
830  elog(DEBUG1, "PreCommit_Notify");
831 
832  /* Preflight for any pending listen/unlisten actions */
833  foreach(p, pendingActions)
834  {
835  ListenAction *actrec = (ListenAction *) lfirst(p);
836 
837  switch (actrec->action)
838  {
839  case LISTEN_LISTEN:
840  Exec_ListenPreCommit();
841  break;
842  case LISTEN_UNLISTEN:
843  /* there is no Exec_UnlistenPreCommit() */
844  break;
845  case LISTEN_UNLISTEN_ALL:
846  /* there is no Exec_UnlistenAllPreCommit() */
847  break;
848  }
849  }
850 
851  /* Queue any pending notifies (must happen after the above) */
852  if (pendingNotifies)
853  {
854  ListCell *nextNotify;
855 
856  /*
857  * Make sure that we have an XID assigned to the current transaction.
858  * GetCurrentTransactionId is cheap if we already have an XID, but not
859  * so cheap if we don't, and we'd prefer not to do that work while
860  * holding AsyncQueueLock.
861  */
862  (void) GetCurrentTransactionId();
863 
864  /*
865  * Serialize writers by acquiring a special lock that we hold till
866  * after commit. This ensures that queue entries appear in commit
867  * order, and in particular that there are never uncommitted queue
868  * entries ahead of committed ones, so an uncommitted transaction
869  * can't block delivery of deliverable notifications.
870  *
871  * We use a heavyweight lock so that it'll automatically be released
872  * after either commit or abort. This also allows deadlocks to be
873  * detected, though really a deadlock shouldn't be possible here.
874  *
875  * The lock is on "database 0", which is pretty ugly but it doesn't
876  * seem worth inventing a special locktag category just for this.
877  * (Historical note: before PG 9.0, a similar lock on "database 0" was
878  * used by the flatfiles mechanism.)
879  */
880  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
881  AccessExclusiveLock);
882 
883  /* Now push the notifications into the queue */
884  backendHasSentNotifications = true;
885 
886  nextNotify = list_head(pendingNotifies->events);
887  while (nextNotify != NULL)
888  {
889  /*
890  * Add the pending notifications to the queue. We acquire and
891  * release AsyncQueueLock once per page, which might be overkill
892  * but it does allow readers to get in while we're doing this.
893  *
894  * A full queue is very uncommon and should really not happen,
895  * given that we have so much space available in the SLRU pages.
896  * Nevertheless we need to deal with this possibility. Note that
897  * when we get here we are in the process of committing our
898  * transaction, but we have not yet committed to clog, so at this
899  * point in time we can still roll the transaction back.
900  */
901  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
902  asyncQueueFillWarning();
903  if (asyncQueueIsFull())
904  ereport(ERROR,
905  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
906  errmsg("too many notifications in the NOTIFY queue")));
907  nextNotify = asyncQueueAddEntries(nextNotify);
908  LWLockRelease(AsyncQueueLock);
909  }
910  }
911 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:372
int errcode(int sqlerrcode)
Definition: elog.c:570
static bool asyncQueueIsFull(void)
Definition: async.c:1293
static void asyncQueueFillWarning(void)
Definition: async.c:1534
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static NotificationList * pendingNotifies
Definition: async.c:383
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:331
static bool backendHasSentNotifications
Definition: async.c:403
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
#define ereport(elevel, rest)
Definition: elog.h:141
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1004
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1395
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:968
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:406
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
ListenActionKind action
Definition: async.c:327

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1156 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1157 {
1158  MemoryContext caller_context;
1159  bool signalled;
1160 
1161  /* Nothing to do if we didn't send any notifications */
1162  if (!backendHasSentNotifications)
1163  return;
1164 
1165  /*
1166  * We reset the flag immediately; otherwise, if any sort of error occurs
1167  * below, we'd be locked up in an infinite loop, because control will come
1168  * right back here after error cleanup.
1169  */
1170  backendHasSentNotifications = false;
1171 
1172  /*
1173  * We must preserve the caller's memory context (probably MessageContext)
1174  * across the transaction we do here.
1175  */
1176  caller_context = CurrentMemoryContext;
1177 
1178  if (Trace_notify)
1179  elog(DEBUG1, "ProcessCompletedNotifies");
1180 
1181  /*
1182  * We must run asyncQueueReadAllNotifications inside a transaction, else
1183  * bad things happen if it gets an error.
1184  */
1185  StartTransactionCommand();
1186 
1187  /* Send signals to other backends */
1188  signalled = SignalBackends();
1189 
1190  if (listenChannels != NIL)
1191  {
1192  /* Read the queue ourselves, and send relevant stuff to the frontend */
1193  asyncQueueReadAllNotifications();
1194  }
1195  else if (!signalled)
1196  {
1197  /*
1198  * If we found no other listening backends, and we aren't listening
1199  * ourselves, then we must execute asyncQueueAdvanceTail to flush the
1200  * queue, because ain't nobody else gonna do it. This prevents queue
1201  * overflow when we're sending useless notifies to nobody. (A new
1202  * listener could have joined since we looked, but if so this is
1203  * harmless.)
1204  */
1205  asyncQueueAdvanceTail();
1206  }
1207 
1208  CommitTransactionCommand();
1209 
1210  MemoryContextSwitchTo(caller_context);
1211 
1212  /* We don't need pq_flush() here since postgres.c will do one shortly */
1213 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2895
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:306
static bool backendHasSentNotifications
Definition: async.c:403
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2100
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1841
void StartTransactionCommand(void)
Definition: xact.c:2794
bool Trace_notify
Definition: async.c:406
#define elog(elevel,...)
Definition: elog.h:226
static bool SignalBackends(void)
Definition: async.c:1586

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1825 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by ProcessClientReadInterrupt().

1826 {
1827  if (IsTransactionOrTransactionBlock())
1828  return; /* not really idle */
1829 
1830  while (notifyInterruptPending)
1831  ProcessIncomingNotify();
1832 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4651
static void ProcessIncomingNotify(void)
Definition: async.c:2150
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:394

Variable Documentation

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending

◆ Trace_notify