PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_NOTIFY_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void ProcessCompletedNotifies (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_NOTIFY_BUFFERS

#define NUM_NOTIFY_BUFFERS   8

Definition at line 21 of file async.h.

Referenced by AsyncShmemInit() and AsyncShmemSize().

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 765 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

766 {
767  if (Trace_notify)
768  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
769 
770  queue_listen(LISTEN_LISTEN, channel);
771 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:41
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:717
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:227

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 618 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

619 {
620  int my_level = GetCurrentTransactionNestLevel();
621  size_t channel_len;
622  size_t payload_len;
623  Notification *n;
624  MemoryContext oldcontext;
625 
626  if (IsParallelWorker())
627  elog(ERROR, "cannot send notifications from a parallel worker");
628 
629  if (Trace_notify)
630  elog(DEBUG1, "Async_Notify(%s)", channel);
631 
632  channel_len = channel ? strlen(channel) : 0;
633  payload_len = payload ? strlen(payload) : 0;
634 
635  /* a channel name must be specified */
636  if (channel_len == 0)
637  ereport(ERROR,
638  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
639  errmsg("channel name cannot be empty")));
640 
641  /* enforce length limits */
642  if (channel_len >= NAMEDATALEN)
643  ereport(ERROR,
644  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
645  errmsg("channel name too long")));
646 
647  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
648  ereport(ERROR,
649  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
650  errmsg("payload string too long")));
651 
652  /*
653  * We must construct the Notification entry, even if we end up not using
654  * it, in order to compare it cheaply to existing list entries.
655  *
656  * The notification list needs to live until end of transaction, so store
657  * it in the transaction context.
658  */
659  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
660 
661  n = (Notification *) palloc(offsetof(Notification, data) +
662  channel_len + payload_len + 2);
663  n->channel_len = channel_len;
664  n->payload_len = payload_len;
665  strcpy(n->data, channel);
666  if (payload)
667  strcpy(n->data + channel_len + 1, payload);
668  else
669  n->data[channel_len + 1] = '\0';
670 
671  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
672  {
673  NotificationList *notifies;
674 
675  /*
676  * First notify event in current (sub)xact. Note that we allocate the
677  * NotificationList in TopTransactionContext; the nestingLevel might
678  * get changed later by AtSubCommit_Notify.
679  */
680  notifies = (NotificationList *)
681  MemoryContextAlloc(TopTransactionContext,
682  sizeof(NotificationList));
683  notifies->nestingLevel = my_level;
684  notifies->events = list_make1(n);
685  /* We certainly don't need a hashtable yet */
686  notifies->hashtab = NULL;
687  notifies->upper = pendingNotifies;
688  pendingNotifies = notifies;
689  }
690  else
691  {
692  /* Now check for duplicates */
693  if (AsyncExistsPendingNotify(n))
694  {
695  /* It's a dup, so forget it */
696  pfree(n);
697  MemoryContextSwitchTo(oldcontext);
698  return;
699  }
700 
701  /* Append more events to existing list */
702  AddEventToPendingNotifies(n);
703  }
704 
705  MemoryContextSwitchTo(oldcontext);
706 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:400
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:394
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:694
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2329
int nestingLevel
Definition: async.c:399
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1057
static NotificationList * pendingNotifies
Definition: async.c:412
#define ERROR
Definition: elog.h:45
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2370
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:402
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:155
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
HTAB * hashtab
Definition: async.c:401
uint16 channel_len
Definition: async.c:391
uint16 payload_len
Definition: async.c:392
bool Trace_notify
Definition: async.c:436
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:905
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define elog(elevel,...)
Definition: elog.h:227
#define offsetof(type, field)
Definition: c.h:727

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 779 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

780 {
781  if (Trace_notify)
782  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
783 
784  /* If we couldn't possibly be listening, no need to queue anything */
785  if (pendingActions == NULL && !unlistenExitRegistered)
786  return;
787 
788  queue_listen(LISTEN_UNLISTEN, channel);
789 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:41
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:717
static ActionList * pendingActions
Definition: async.c:360
static bool unlistenExitRegistered
Definition: async.c:424
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:227

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 797 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll() and standard_ProcessUtility().

798 {
799  if (Trace_notify)
800  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
801 
802  /* If we couldn't possibly be listening, no need to queue anything */
803  if (pendingActions == NULL && !unlistenExitRegistered)
804  return;
805 
806  queue_listen(LISTEN_UNLISTEN_ALL, "");
807 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:41
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:717
static ActionList * pendingActions
Definition: async.c:360
static bool unlistenExitRegistered
Definition: async.c:424
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:227

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 526 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateSharedMemoryAndSemaphores().

527 {
528  bool found;
529  Size size;
530 
531  /*
532  * Create or attach to the AsyncQueueControl structure.
533  *
534  * The used entries in the backend[] array run from 1 to MaxBackends; the
535  * zero'th entry is unused but must be allocated.
536  */
537  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
538  size = add_size(size, offsetof(AsyncQueueControl, backend));
539 
540  asyncQueueControl = (AsyncQueueControl *)
541  ShmemInitStruct("Async Queue Control", size, &found);
542 
543  if (!found)
544  {
545  /* First time through, so initialize it */
546  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
547  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
548  QUEUE_STOP_PAGE = 0;
549  QUEUE_FIRST_LISTENER = InvalidBackendId;
550  asyncQueueControl->lastQueueFillWarn = 0;
551  /* zero'th entry won't be used, but let's initialize it anyway */
552  for (int i = 0; i <= MaxBackends; i++)
553  {
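554  QUEUE_BACKEND_PID(i) = InvalidPid;
555  QUEUE_BACKEND_DBOID(i) = InvalidOid;
556  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
557  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
558  }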
559  }
560 
561  /*
562  * Set up SLRU management of the pg_notify data.
563  */
564  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
565  SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
566  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
567  SYNC_HANDLER_NONE);
568 
569  if (!found)
570  {
571  /*
572  * During start or reboot, clean out the pg_notify directory.
573  */
574  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
575  }
576 }
#define QUEUE_TAIL
Definition: async.c:290
#define QUEUE_BACKEND_PID(i)
Definition: async.c:293
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1529
#define QUEUE_HEAD
Definition: async.c:289
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:186
#define NotifyCtl
Definition: async.c:303
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:292
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
int MaxBackends
Definition: globals.c:137
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:500
static AsyncQueueControl * asyncQueueControl
Definition: async.c:287
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:294
#define QUEUE_STOP_PAGE
Definition: async.c:291
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:295
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1552
size_t Size
Definition: c.h:540
int i
TimestampTz lastQueueFillWarn
Definition: async.c:282
#define QUEUE_BACKEND_POS(i)
Definition: async.c:296
#define offsetof(type, field)
Definition: c.h:727
#define InvalidPid
Definition: miscadmin.h:32

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 509 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

510 {
511  Size size;
512 
513  /* This had better match AsyncShmemInit */
514  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
515  size = add_size(size, offsetof(AsyncQueueControl, backend));
516 
517  size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
518 
519  return size;
520 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:155
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:137
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1756 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1757 {
1758  /*
1759  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1760  * we have registered as a listener but have not made any entry in
1761  * listenChannels. In that case, deregister again.
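1762  */
1763  if (amRegisteredListener && listenChannels == NIL)
1764  asyncQueueUnregister();
1765 
1766  /* And clean up */
1767  ClearPendingActionsAndNotifies();
1768 }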
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:328
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2463
static void asyncQueueUnregister(void)
Definition: async.c:1309
static bool amRegisteredListener
Definition: async.c:427

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 990 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

991 {
992  ListCell *p;
993 
994  /*
995  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
996  * return as soon as possible
997  */
998  if (!pendingActions && !pendingNotifies)
999  return;
1000 
1001  if (Trace_notify)
1002  elog(DEBUG1, "AtCommit_Notify");
1003 
1004  /* Perform any pending listen/unlisten actions */
1005  if (pendingActions != NULL)
1006  {
1007  foreach(p, pendingActions->actions)
1008  {
1009  ListenAction *actrec = (ListenAction *) lfirst(p);
1010 
1011  switch (actrec->action)
1012  {
1013  case LISTEN_LISTEN:
1014  Exec_ListenCommit(actrec->channel);
1015  break;
1016  case LISTEN_UNLISTEN:
1017  Exec_UnlistenCommit(actrec->channel);
1018  break;
1019  case LISTEN_UNLISTEN_ALL:
1020  Exec_UnlistenAllCommit();
1021  break;
1022  }
1023  }
1024  }
1025 
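1026  /* If no longer listening to anything, get out of listener array */
1027  if (amRegisteredListener && listenChannels == NIL)
1028  asyncQueueUnregister();
1029 
1030  /* And clean up */
1031  ClearPendingActionsAndNotifies();
1032 }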
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:360
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1193
static List * listenChannels
Definition: async.c:328
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2463
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1162
static NotificationList * pendingNotifies
Definition: async.c:412
List * actions
Definition: async.c:356
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1135
static void asyncQueueUnregister(void)
Definition: async.c:1309
static bool amRegisteredListener
Definition: async.c:427
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:227
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:350
ListenActionKind action
Definition: async.c:349

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 863 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

864 {
865  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
866  if (pendingActions || pendingNotifies)
867  ereport(ERROR,
868  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
869  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
870 }
static ActionList * pendingActions
Definition: async.c:360
int errcode(int sqlerrcode)
Definition: elog.c:694
static NotificationList * pendingNotifies
Definition: async.c:412
#define ERROR
Definition: elog.h:45
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg(const char *fmt,...)
Definition: elog.c:905

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1846 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1847 {
1848  int my_level = GetCurrentTransactionNestLevel();
1849 
1850  /*
1851  * All we have to do is pop the stack --- the actions/notifies made in
1852  * this subxact are no longer interesting, and the space will be freed
1853  * when CurTransactionContext is recycled. We still have to free the
1854  * ActionList and NotificationList objects themselves, though, because
1855  * those are allocated in TopTransactionContext.
1856  *
1857  * Note that there might be no entries at all, or no entries for the
1858  * current subtransaction level, either because none were ever created, or
1859  * because we reentered this routine due to trouble during subxact abort.
1860  */
1861  while (pendingActions != NULL &&
1862  pendingActions->nestingLevel >= my_level)
1863  {
1864  ActionList *childPendingActions = pendingActions;
1865 
1866  pendingActions = pendingActions->upper;
1867  pfree(childPendingActions);
1868  }
1869 
1870  while (pendingNotifies != NULL &&
1871  pendingNotifies->nestingLevel >= my_level)
1872  {
1873  NotificationList *childPendingNotifies = pendingNotifies;
1874 
1875  pendingNotifies = pendingNotifies->upper;
1876  pfree(childPendingNotifies);
1877  }
1878 }
static ActionList * pendingActions
Definition: async.c:360
int nestingLevel
Definition: async.c:399
void pfree(void *pointer)
Definition: mcxt.c:1057
static NotificationList * pendingNotifies
Definition: async.c:412
struct NotificationList * upper
Definition: async.c:402
int nestingLevel
Definition: async.c:355
struct ActionList * upper
Definition: async.c:357
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1776 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1777 {
1778  int my_level = GetCurrentTransactionNestLevel();
1779 
1780  /* If there are actions at our nesting level, we must reparent them. */
1781  if (pendingActions != NULL &&
1782  pendingActions->nestingLevel >= my_level)
1783  {
1784  if (pendingActions->upper == NULL ||
1785  pendingActions->upper->nestingLevel < my_level - 1)
1786  {
1787  /* nothing to merge; give the whole thing to the parent */
1788  --pendingActions->nestingLevel;
1789  }
1790  else
1791  {
1792  ActionList *childPendingActions = pendingActions;
1793 
1794  pendingActions = pendingActions->upper;
1795 
1796  /*
1797  * Mustn't try to eliminate duplicates here --- see queue_listen()
1798  */
1799  pendingActions->actions =
1800  list_concat(pendingActions->actions,
1801  childPendingActions->actions);
1802  pfree(childPendingActions);
1803  }
1804  }
1805 
1806  /* If there are notifies at our nesting level, we must reparent them. */
1807  if (pendingNotifies != NULL &&
1808  pendingNotifies->nestingLevel >= my_level)
1809  {
1810  Assert(pendingNotifies->nestingLevel == my_level);
1811 
1812  if (pendingNotifies->upper == NULL ||
1813  pendingNotifies->upper->nestingLevel < my_level - 1)
1814  {
1815  /* nothing to merge; give the whole thing to the parent */
1816  --pendingNotifies->nestingLevel;
1817  }
1818  else
1819  {
1820  /*
1821  * Formerly, we didn't bother to eliminate duplicates here, but
1822  * now we must, else we fall foul of "Assert(!found)", either here
1823  * or during a later attempt to build the parent-level hashtable.
1824  */
1825  NotificationList *childPendingNotifies = pendingNotifies;
1826  ListCell *l;
1827 
1828  pendingNotifies = pendingNotifies->upper;
1829  /* Insert all the subxact's events into parent, except for dups */
1830  foreach(l, childPendingNotifies->events)
1831  {
1832  Notification *childn = (Notification *) lfirst(l);
1833 
1834  if (!AsyncExistsPendingNotify(childn))
1835  AddEventToPendingNotifies(childn);
1836  }
1837  pfree(childPendingNotifies);
1838  }
1839  }
1840 }
List * events
Definition: async.c:400
static ActionList * pendingActions
Definition: async.c:360
List * list_concat(List *list1, const List *list2)
Definition: list.c:530
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2329
int nestingLevel
Definition: async.c:399
void pfree(void *pointer)
Definition: mcxt.c:1057
static NotificationList * pendingNotifies
Definition: async.c:412
List * actions
Definition: async.c:356
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2370
struct NotificationList * upper
Definition: async.c:402
int nestingLevel
Definition: async.c:355
struct ActionList * upper
Definition: async.c:357
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1889 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1890 {
1891  /*
1892  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1893  * you do here.
1894  */
1895 
1896  /* signal that work needs to be done */
1897  notifyInterruptPending = true;
1898 
1899  /* make sure the event is processed in due course */
1900  SetLatch(MyLatch);
1901 }
void SetLatch(Latch *latch)
Definition: latch.c:567
struct Latch * MyLatch
Definition: globals.c:55
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:421

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2305 of file async.c.

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries() and HandleParallelMessage().

2306 {
2307  if (whereToSendOutput == DestRemote)
2308  {
2309  StringInfoData buf;
2310 
2311  pq_beginmessage(&buf, 'A');
2312  pq_sendint32(&buf, srcPid);
2313  pq_sendstring(&buf, channel);
2314  pq_sendstring(&buf, payload);
2315  pq_endmessage(&buf);
2316 
2317  /*
2318  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2319  * happen at the end of the transaction, and for incoming notifies
2320  * ProcessIncomingNotify will do it after finding all the notifies.
2321  */
2322  }
2323  else
2324  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2325 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:227
CommandDest whereToSendOutput
Definition: postgres.c:92

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 888 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

889 {
890  ListCell *p;
891 
892  if (!pendingActions && !pendingNotifies)
893  return; /* no relevant statements in this xact */
894 
895  if (Trace_notify)
896  elog(DEBUG1, "PreCommit_Notify");
897 
898  /* Preflight for any pending listen/unlisten actions */
899  if (pendingActions != NULL)
900  {
901  foreach(p, pendingActions->actions)
902  {
903  ListenAction *actrec = (ListenAction *) lfirst(p);
904 
905  switch (actrec->action)
906  {
907  case LISTEN_LISTEN:
908  Exec_ListenPreCommit();
909  break;
910  case LISTEN_UNLISTEN:
911  /* there is no Exec_UnlistenPreCommit() */
912  break;
913  case LISTEN_UNLISTEN_ALL:
914  /* there is no Exec_UnlistenAllPreCommit() */
915  break;
916  }
917  }
918  }
919 
920  /* Queue any pending notifies (must happen after the above) */
921  if (pendingNotifies)
922  {
923  ListCell *nextNotify;
924 
925  /*
926  * Make sure that we have an XID assigned to the current transaction.
927  * GetCurrentTransactionId is cheap if we already have an XID, but not
928  * so cheap if we don't, and we'd prefer not to do that work while
929  * holding NotifyQueueLock.
930  */
931  (void) GetCurrentTransactionId();
932 
933  /*
934  * Serialize writers by acquiring a special lock that we hold till
935  * after commit. This ensures that queue entries appear in commit
936  * order, and in particular that there are never uncommitted queue
937  * entries ahead of committed ones, so an uncommitted transaction
938  * can't block delivery of deliverable notifications.
939  *
940  * We use a heavyweight lock so that it'll automatically be released
941  * after either commit or abort. This also allows deadlocks to be
942  * detected, though really a deadlock shouldn't be possible here.
943  *
944  * The lock is on "database 0", which is pretty ugly but it doesn't
945  * seem worth inventing a special locktag category just for this.
946  * (Historical note: before PG 9.0, a similar lock on "database 0" was
947  * used by the flatfiles mechanism.)
948  */
949  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
950  AccessExclusiveLock);
951 
952  /* Now push the notifications into the queue */
953  backendHasSentNotifications = true;
954 
955  nextNotify = list_head(pendingNotifies->events);
956  while (nextNotify != NULL)
957  {
958  /*
959  * Add the pending notifications to the queue. We acquire and
960  * release NotifyQueueLock once per page, which might be overkill
961  * but it does allow readers to get in while we're doing this.
962  *
963  * A full queue is very uncommon and should really not happen,
964  * given that we have so much space available in the SLRU pages.
965  * Nevertheless we need to deal with this possibility. Note that
966  * when we get here we are in the process of committing our
967  * transaction, but we have not yet committed to clog, so at this
968  * point in time we can still roll the transaction back.
969  */
970  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
971  asyncQueueFillWarning();
972  if (asyncQueueIsFull())
973  ereport(ERROR,
974  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
975  errmsg("too many notifications in the NOTIFY queue")));
976  nextNotify = asyncQueueAddEntries(nextNotify);
977  LWLockRelease(NotifyQueueLock);
978  }
979  }
980 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:400
static ActionList * pendingActions
Definition: async.c:360
int errcode(int sqlerrcode)
Definition: elog.c:694
static bool asyncQueueIsFull(void)
Definition: async.c:1350
static void asyncQueueFillWarning(void)
Definition: async.c:1620
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
static NotificationList * pendingNotifies
Definition: async.c:412
#define ERROR
Definition: elog.h:45
static bool backendHasSentNotifications
Definition: async.c:430
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
List * actions
Definition: async.c:356
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1016
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1452
#define lfirst(lc)
Definition: pg_list.h:169
static void Exec_ListenPreCommit(void)
Definition: async.c:1040
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
bool Trace_notify
Definition: async.c:436
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:905
#define elog(elevel,...)
Definition: elog.h:227
ListenActionKind action
Definition: async.c:349

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1226 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1227 {
1228  MemoryContext caller_context;
1229 
1230  /* Nothing to do if we didn't send any notifications */
1231  if (!backendHasSentNotifications)
1232  return;
1233 
1234  /*
1235  * We reset the flag immediately; otherwise, if any sort of error occurs
1236  * below, we'd be locked up in an infinite loop, because control will come
1237  * right back here after error cleanup.
1238  */
1239  backendHasSentNotifications = false;
1240 
1241  /*
1242  * We must preserve the caller's memory context (probably MessageContext)
1243  * across the transaction we do here.
1244  */
1245  caller_context = CurrentMemoryContext;
1246 
1247  if (Trace_notify)
1248  elog(DEBUG1, "ProcessCompletedNotifies");
1249 
1250  /*
1251  * We must run asyncQueueReadAllNotifications inside a transaction, else
1252  * bad things happen if it gets an error.
1253  */
1254  StartTransactionCommand();
1255 
1256  /* Send signals to other backends */
1257  SignalBackends();
1258 
1259  if (listenChannels != NIL)
1260  {
1261  /* Read the queue ourselves, and send relevant stuff to the frontend */
1262  asyncQueueReadAllNotifications();
1263  }
1264 
1265  /*
1266  * If it's time to try to advance the global tail pointer, do that.
1267  */
1268  if (backendTryAdvanceTail)
1269  {
1270  backendTryAdvanceTail = false;
1271  asyncQueueAdvanceTail();
1272  }
1273 
1274  CommitTransactionCommand();
1275 
1276  MemoryContextSwitchTo(caller_context);
1277 
1278  /* We don't need pq_flush() here since postgres.c will do one shortly */
1279 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2939
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:328
static bool backendHasSentNotifications
Definition: async.c:430
static bool backendTryAdvanceTail
Definition: async.c:433
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2185
static void SignalBackends(void)
Definition: async.c:1673
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1931
void StartTransactionCommand(void)
Definition: xact.c:2838
bool Trace_notify
Definition: async.c:436
#define elog(elevel,...)
Definition: elog.h:227

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1915 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain() and ProcessClientReadInterrupt().

1916 {
1917  if (IsTransactionOrTransactionBlock())
1918  return; /* not really idle */
1919 
1920  while (notifyInterruptPending)
1921  ProcessIncomingNotify();
1922 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
static void ProcessIncomingNotify(void)
Definition: async.c:2266
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:421

Variable Documentation

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending

◆ Trace_notify