PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_ASYNC_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void ProcessCompletedNotifies (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_ASYNC_BUFFERS

#define NUM_ASYNC_BUFFERS   8

Definition at line 21 of file async.h.

Referenced by AsyncShmemInit(), and AsyncShmemSize().

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 762 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

763 {
764  if (Trace_notify)
765  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
766 
767  queue_listen(LISTEN_LISTEN, channel);
768 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 615 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

616 {
617  int my_level = GetCurrentTransactionNestLevel();
618  size_t channel_len;
619  size_t payload_len;
620  Notification *n;
621  MemoryContext oldcontext;
622 
623  if (IsParallelWorker())
624  elog(ERROR, "cannot send notifications from a parallel worker");
625 
626  if (Trace_notify)
627  elog(DEBUG1, "Async_Notify(%s)", channel);
628 
629  channel_len = channel ? strlen(channel) : 0;
630  payload_len = payload ? strlen(payload) : 0;
631 
632  /* a channel name must be specified */
633  if (channel_len == 0)
634  ereport(ERROR,
635  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636  errmsg("channel name cannot be empty")));
637 
638  /* enforce length limits */
639  if (channel_len >= NAMEDATALEN)
640  ereport(ERROR,
641  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
642  errmsg("channel name too long")));
643 
644  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
645  ereport(ERROR,
646  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
647  errmsg("payload string too long")));
648 
649  /*
650  * We must construct the Notification entry, even if we end up not using
651  * it, in order to compare it cheaply to existing list entries.
652  *
653  * The notification list needs to live until end of transaction, so store
654  * it in the transaction context.
655  */
657 
658  n = (Notification *) palloc(offsetof(Notification, data) +
659  channel_len + payload_len + 2);
660  n->channel_len = channel_len;
661  n->payload_len = payload_len;
662  strcpy(n->data, channel);
663  if (payload)
664  strcpy(n->data + channel_len + 1, payload);
665  else
666  n->data[channel_len + 1] = '\0';
667 
668  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
669  {
670  NotificationList *notifies;
671 
672  /*
673  * First notify event in current (sub)xact. Note that we allocate the
674  * NotificationList in TopTransactionContext; the nestingLevel might
675  * get changed later by AtSubCommit_Notify.
676  */
677  notifies = (NotificationList *)
679  sizeof(NotificationList));
680  notifies->nestingLevel = my_level;
681  notifies->events = list_make1(n);
682  /* We certainly don't need a hashtable yet */
683  notifies->hashtab = NULL;
684  notifies->upper = pendingNotifies;
685  pendingNotifies = notifies;
686  }
687  else
688  {
689  /* Now check for duplicates */
691  {
692  /* It's a dup, so forget it */
693  pfree(n);
694  MemoryContextSwitchTo(oldcontext);
695  return;
696  }
697 
698  /* Append more events to existing list */
700  }
701 
702  MemoryContextSwitchTo(oldcontext);
703 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:394
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:608
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2280
int nestingLevel
Definition: async.c:393
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2321
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:396
#define ereport(elevel, rest)
Definition: elog.h:141
#define IsParallelWorker()
Definition: parallel.h:60
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
HTAB * hashtab
Definition: async.c:395
uint16 channel_len
Definition: async.c:385
uint16 payload_len
Definition: async.c:386
bool Trace_notify
Definition: async.c:430
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:228
#define offsetof(type, field)
Definition: c.h:662

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 776 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

777 {
778  if (Trace_notify)
779  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
780 
781  /* If we couldn't possibly be listening, no need to queue anything */
782  if (pendingActions == NULL && !unlistenExitRegistered)
783  return;
784 
785  queue_listen(LISTEN_UNLISTEN, channel);
786 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 794 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

795 {
796  if (Trace_notify)
797  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
798 
799  /* If we couldn't possibly be listening, no need to queue anything */
800  if (pendingActions == NULL && !unlistenExitRegistered)
801  return;
802 
804 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:714
static ActionList * pendingActions
Definition: async.c:354
static bool unlistenExitRegistered
Definition: async.c:418
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 515 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

516 {
517  bool found;
518  int slotno;
519  Size size;
520 
521  /*
522  * Create or attach to the AsyncQueueControl structure.
523  *
524  * The used entries in the backend[] array run from 1 to MaxBackends; the
525  * zero'th entry is unused but must be allocated.
526  */
527  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
528  size = add_size(size, offsetof(AsyncQueueControl, backend));
529 
531  ShmemInitStruct("Async Queue Control", size, &found);
532 
533  if (!found)
534  {
535  /* First time through, so initialize it */
536  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
537  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
540  /* zero'th entry won't be used, but let's initialize it anyway */
541  for (int i = 0; i <= MaxBackends; i++)
542  {
547  }
548  }
549 
550  /*
551  * Set up SLRU management of the pg_notify data.
552  */
553  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
555  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
556  /* Override default assumption that writes should be fsync'd */
557  AsyncCtl->do_fsync = false;
558 
559  if (!found)
560  {
561  /*
562  * During start or reboot, clean out the pg_notify directory.
563  */
565 
566  /* Now initialize page zero to empty */
567  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
569  /* This write is just to verify that pg_notify/ is writable */
570  SimpleLruWritePage(AsyncCtl, slotno);
571  LWLockRelease(AsyncCtlLock);
572  }
573 }
#define QUEUE_TAIL
Definition: async.c:282
#define QUEUE_BACKEND_PID(i)
Definition: async.c:284
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1369
#define AsyncCtl
Definition: async.c:294
#define QUEUE_HEAD
Definition: async.c:281
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:283
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:135
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:577
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:489
static AsyncQueueControl * asyncQueueControl
Definition: async.c:279
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:285
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:286
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1392
size_t Size
Definition: c.h:467
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
TimestampTz lastQueueFillWarn
Definition: async.c:274
#define QUEUE_BACKEND_POS(i)
Definition: async.c:287
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:262
#define QUEUE_POS_PAGE(x)
Definition: async.c:193
#define offsetof(type, field)
Definition: c.h:662
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:164

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 498 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

499 {
500  Size size;
501 
502  /* This had better match AsyncShmemInit */
503  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
504  size = add_size(size, offsetof(AsyncQueueControl, backend));
505 
507 
508  return size;
509 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define NUM_ASYNC_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:467
#define offsetof(type, field)
Definition: c.h:662

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1736 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1737 {
1738  /*
1739  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1740  * we have registered as a listener but have not made any entry in
1741  * listenChannels. In that case, deregister again.
1742  */
1745 
1746  /* And clean up */
1748 }
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:322
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2415
static void asyncQueueUnregister(void)
Definition: async.c:1306
static bool amRegisteredListener
Definition: async.c:421

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 987 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

988 {
989  ListCell *p;
990 
991  /*
992  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
993  * return as soon as possible
994  */
996  return;
997 
998  if (Trace_notify)
999  elog(DEBUG1, "AtCommit_Notify");
1000 
1001  /* Perform any pending listen/unlisten actions */
1002  if (pendingActions != NULL)
1003  {
1004  foreach(p, pendingActions->actions)
1005  {
1006  ListenAction *actrec = (ListenAction *) lfirst(p);
1007 
1008  switch (actrec->action)
1009  {
1010  case LISTEN_LISTEN:
1011  Exec_ListenCommit(actrec->channel);
1012  break;
1013  case LISTEN_UNLISTEN:
1014  Exec_UnlistenCommit(actrec->channel);
1015  break;
1016  case LISTEN_UNLISTEN_ALL:
1018  break;
1019  }
1020  }
1021  }
1022 
1023  /* If no longer listening to anything, get out of listener array */
1026 
1027  /* And clean up */
1029 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:354
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1190
static List * listenChannels
Definition: async.c:322
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2415
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1159
static NotificationList * pendingNotifies
Definition: async.c:406
List * actions
Definition: async.c:350
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1132
static void asyncQueueUnregister(void)
Definition: async.c:1306
static bool amRegisteredListener
Definition: async.c:421
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 860 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

861 {
862  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
864  ereport(ERROR,
865  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
866  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
867 }
static ActionList * pendingActions
Definition: async.c:354
int errcode(int sqlerrcode)
Definition: elog.c:608
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1826 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1827 {
1828  int my_level = GetCurrentTransactionNestLevel();
1829 
1830  /*
1831  * All we have to do is pop the stack --- the actions/notifies made in
1832  * this subxact are no longer interesting, and the space will be freed
1833  * when CurTransactionContext is recycled. We still have to free the
1834  * ActionList and NotificationList objects themselves, though, because
1835  * those are allocated in TopTransactionContext.
1836  *
1837  * Note that there might be no entries at all, or no entries for the
1838  * current subtransaction level, either because none were ever created, or
1839  * because we reentered this routine due to trouble during subxact abort.
1840  */
1841  while (pendingActions != NULL &&
1842  pendingActions->nestingLevel >= my_level)
1843  {
1844  ActionList *childPendingActions = pendingActions;
1845 
1847  pfree(childPendingActions);
1848  }
1849 
1850  while (pendingNotifies != NULL &&
1851  pendingNotifies->nestingLevel >= my_level)
1852  {
1853  NotificationList *childPendingNotifies = pendingNotifies;
1854 
1856  pfree(childPendingNotifies);
1857  }
1858 }
static ActionList * pendingActions
Definition: async.c:354
int nestingLevel
Definition: async.c:393
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
struct NotificationList * upper
Definition: async.c:396
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1756 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1757 {
1758  int my_level = GetCurrentTransactionNestLevel();
1759 
1760  /* If there are actions at our nesting level, we must reparent them. */
1761  if (pendingActions != NULL &&
1762  pendingActions->nestingLevel >= my_level)
1763  {
1764  if (pendingActions->upper == NULL ||
1765  pendingActions->upper->nestingLevel < my_level - 1)
1766  {
1767  /* nothing to merge; give the whole thing to the parent */
1769  }
1770  else
1771  {
1772  ActionList *childPendingActions = pendingActions;
1773 
1775 
1776  /*
1777  * Mustn't try to eliminate duplicates here --- see queue_listen()
1778  */
1781  childPendingActions->actions);
1782  pfree(childPendingActions);
1783  }
1784  }
1785 
1786  /* If there are notifies at our nesting level, we must reparent them. */
1787  if (pendingNotifies != NULL &&
1788  pendingNotifies->nestingLevel >= my_level)
1789  {
1790  Assert(pendingNotifies->nestingLevel == my_level);
1791 
1792  if (pendingNotifies->upper == NULL ||
1793  pendingNotifies->upper->nestingLevel < my_level - 1)
1794  {
1795  /* nothing to merge; give the whole thing to the parent */
1797  }
1798  else
1799  {
1800  /*
1801  * Formerly, we didn't bother to eliminate duplicates here, but
1802  * now we must, else we fall foul of "Assert(!found)", either here
1803  * or during a later attempt to build the parent-level hashtable.
1804  */
1805  NotificationList *childPendingNotifies = pendingNotifies;
1806  ListCell *l;
1807 
1809  /* Insert all the subxact's events into parent, except for dups */
1810  foreach(l, childPendingNotifies->events)
1811  {
1812  Notification *childn = (Notification *) lfirst(l);
1813 
1814  if (!AsyncExistsPendingNotify(childn))
1815  AddEventToPendingNotifies(childn);
1816  }
1817  pfree(childPendingNotifies);
1818  }
1819  }
1820 }
List * events
Definition: async.c:394
static ActionList * pendingActions
Definition: async.c:354
List * list_concat(List *list1, const List *list2)
Definition: list.c:516
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2280
int nestingLevel
Definition: async.c:393
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:406
List * actions
Definition: async.c:350
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2321
struct NotificationList * upper
Definition: async.c:396
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1869 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1870 {
1871  /*
1872  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1873  * you do here.
1874  */
1875 
1876  /* signal that work needs to be done */
1877  notifyInterruptPending = true;
1878 
1879  /* make sure the event is processed in due course */
1880  SetLatch(MyLatch);
1881 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2255 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2256 {
2258  {
2260 
2261  pq_beginmessage(&buf, 'A');
2262  pq_sendint32(&buf, srcPid);
2263  pq_sendstring(&buf, channel);
2265  pq_sendstring(&buf, payload);
2266  pq_endmessage(&buf);
2267 
2268  /*
2269  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2270  * happen at the end of the transaction, and for incoming notifies
2271  * ProcessIncomingNotify will do it after finding all the notifies.
2272  */
2273  }
2274  else
2275  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2276 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:228
CommandDest whereToSendOutput
Definition: postgres.c:89
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 885 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

886 {
887  ListCell *p;
888 
890  return; /* no relevant statements in this xact */
891 
892  if (Trace_notify)
893  elog(DEBUG1, "PreCommit_Notify");
894 
895  /* Preflight for any pending listen/unlisten actions */
896  if (pendingActions != NULL)
897  {
898  foreach(p, pendingActions->actions)
899  {
900  ListenAction *actrec = (ListenAction *) lfirst(p);
901 
902  switch (actrec->action)
903  {
904  case LISTEN_LISTEN:
906  break;
907  case LISTEN_UNLISTEN:
908  /* there is no Exec_UnlistenPreCommit() */
909  break;
910  case LISTEN_UNLISTEN_ALL:
911  /* there is no Exec_UnlistenAllPreCommit() */
912  break;
913  }
914  }
915  }
916 
917  /* Queue any pending notifies (must happen after the above) */
918  if (pendingNotifies)
919  {
920  ListCell *nextNotify;
921 
922  /*
923  * Make sure that we have an XID assigned to the current transaction.
924  * GetCurrentTransactionId is cheap if we already have an XID, but not
925  * so cheap if we don't, and we'd prefer not to do that work while
926  * holding AsyncQueueLock.
927  */
928  (void) GetCurrentTransactionId();
929 
930  /*
931  * Serialize writers by acquiring a special lock that we hold till
932  * after commit. This ensures that queue entries appear in commit
933  * order, and in particular that there are never uncommitted queue
934  * entries ahead of committed ones, so an uncommitted transaction
935  * can't block delivery of deliverable notifications.
936  *
937  * We use a heavyweight lock so that it'll automatically be released
938  * after either commit or abort. This also allows deadlocks to be
939  * detected, though really a deadlock shouldn't be possible here.
940  *
941  * The lock is on "database 0", which is pretty ugly but it doesn't
942  * seem worth inventing a special locktag category just for this.
943  * (Historical note: before PG 9.0, a similar lock on "database 0" was
944  * used by the flatfiles mechanism.)
945  */
946  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
948 
949  /* Now push the notifications into the queue */
951 
952  nextNotify = list_head(pendingNotifies->events);
953  while (nextNotify != NULL)
954  {
955  /*
956  * Add the pending notifications to the queue. We acquire and
957  * release AsyncQueueLock once per page, which might be overkill
958  * but it does allow readers to get in while we're doing this.
959  *
960  * A full queue is very uncommon and should really not happen,
961  * given that we have so much space available in the SLRU pages.
962  * Nevertheless we need to deal with this possibility. Note that
963  * when we get here we are in the process of committing our
964  * transaction, but we have not yet committed to clog, so at this
965  * point in time we can still roll the transaction back.
966  */
967  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
969  if (asyncQueueIsFull())
970  ereport(ERROR,
971  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
972  errmsg("too many notifications in the NOTIFY queue")));
973  nextNotify = asyncQueueAddEntries(nextNotify);
974  LWLockRelease(AsyncQueueLock);
975  }
976  }
977 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:394
static ActionList * pendingActions
Definition: async.c:354
int errcode(int sqlerrcode)
Definition: elog.c:608
static bool asyncQueueIsFull(void)
Definition: async.c:1347
static void asyncQueueFillWarning(void)
Definition: async.c:1600
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static NotificationList * pendingNotifies
Definition: async.c:406
#define ERROR
Definition: elog.h:43
static bool backendHasSentNotifications
Definition: async.c:424
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
List * actions
Definition: async.c:350
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
#define ereport(elevel, rest)
Definition: elog.h:141
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1449
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:1037
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool Trace_notify
Definition: async.c:430
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
ListenActionKind action
Definition: async.c:343

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1223 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1224 {
1225  MemoryContext caller_context;
1226 
1227  /* Nothing to do if we didn't send any notifications */
1229  return;
1230 
1231  /*
1232  * We reset the flag immediately; otherwise, if any sort of error occurs
1233  * below, we'd be locked up in an infinite loop, because control will come
1234  * right back here after error cleanup.
1235  */
1237 
1238  /*
1239  * We must preserve the caller's memory context (probably MessageContext)
1240  * across the transaction we do here.
1241  */
1242  caller_context = CurrentMemoryContext;
1243 
1244  if (Trace_notify)
1245  elog(DEBUG1, "ProcessCompletedNotifies");
1246 
1247  /*
1248  * We must run asyncQueueReadAllNotifications inside a transaction, else
1249  * bad things happen if it gets an error.
1250  */
1252 
1253  /* Send signals to other backends */
1254  SignalBackends();
1255 
1256  if (listenChannels != NIL)
1257  {
1258  /* Read the queue ourselves, and send relevant stuff to the frontend */
1260  }
1261 
1262  /*
1263  * If it's time to try to advance the global tail pointer, do that.
1264  */
1266  {
1267  backendTryAdvanceTail = false;
1269  }
1270 
1272 
1273  MemoryContextSwitchTo(caller_context);
1274 
1275  /* We don't need pq_flush() here since postgres.c will do one shortly */
1276 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2898
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:322
static bool backendHasSentNotifications
Definition: async.c:424
static bool backendTryAdvanceTail
Definition: async.c:427
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2166
static void SignalBackends(void)
Definition: async.c:1653
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1911
void StartTransactionCommand(void)
Definition: xact.c:2797
bool Trace_notify
Definition: async.c:430
#define elog(elevel,...)
Definition: elog.h:228

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1895 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1896 {
1898  return; /* not really idle */
1899 
1900  while (notifyInterruptPending)
1902 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
static void ProcessIncomingNotify(void)
Definition: async.c:2216
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415

Variable Documentation

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending

◆ Trace_notify