PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_NOTIFY_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_NOTIFY_BUFFERS

#define NUM_NOTIFY_BUFFERS   8

Definition at line 21 of file async.h.

Referenced by AsyncShmemInit(), and AsyncShmemSize().

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 772 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

773 {
774  if (Trace_notify)
775  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
776 
777  queue_listen(LISTEN_LISTEN, channel);
778 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 625 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

626 {
627  int my_level = GetCurrentTransactionNestLevel();
628  size_t channel_len;
629  size_t payload_len;
630  Notification *n;
631  MemoryContext oldcontext;
632 
633  if (IsParallelWorker())
634  elog(ERROR, "cannot send notifications from a parallel worker");
635 
636  if (Trace_notify)
637  elog(DEBUG1, "Async_Notify(%s)", channel);
638 
639  channel_len = channel ? strlen(channel) : 0;
640  payload_len = payload ? strlen(payload) : 0;
641 
642  /* a channel name must be specified */
643  if (channel_len == 0)
644  ereport(ERROR,
645  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
646  errmsg("channel name cannot be empty")));
647 
648  /* enforce length limits */
649  if (channel_len >= NAMEDATALEN)
650  ereport(ERROR,
651  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
652  errmsg("channel name too long")));
653 
654  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
655  ereport(ERROR,
656  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
657  errmsg("payload string too long")));
658 
659  /*
660  * We must construct the Notification entry, even if we end up not using
661  * it, in order to compare it cheaply to existing list entries.
662  *
663  * The notification list needs to live until end of transaction, so store
664  * it in the transaction context.
665  */
666  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
667 
668  n = (Notification *) palloc(offsetof(Notification, data) +
669  channel_len + payload_len + 2);
670  n->channel_len = channel_len;
671  n->payload_len = payload_len;
672  strcpy(n->data, channel);
673  if (payload)
674  strcpy(n->data + channel_len + 1, payload);
675  else
676  n->data[channel_len + 1] = '\0';
677 
678  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
679  {
680  NotificationList *notifies;
681 
682  /*
683  * First notify event in current (sub)xact. Note that we allocate the
684  * NotificationList in TopTransactionContext; the nestingLevel might
685  * get changed later by AtSubCommit_Notify.
686  */
687  notifies = (NotificationList *)
688  MemoryContextAlloc(TopTransactionContext,
689  sizeof(NotificationList));
690  notifies->nestingLevel = my_level;
691  notifies->events = list_make1(n);
692  /* We certainly don't need a hashtable yet */
693  notifies->hashtab = NULL;
694  notifies->upper = pendingNotifies;
695  pendingNotifies = notifies;
696  }
697  else
698  {
699  /* Now check for duplicates */
700  if (AsyncExistsPendingNotify(n))
701  {
702  /* It's a dup, so forget it */
703  pfree(n);
704  MemoryContextSwitchTo(oldcontext);
705  return;
706  }
707 
708  /* Append more events to existing list */
709  AddEventToPendingNotifies(n);
710  }
711 
712  MemoryContextSwitchTo(oldcontext);
713 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:410
MemoryContext TopTransactionContext
Definition: mcxt.c:53
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:404
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:54
int errcode(int sqlerrcode)
Definition: elog.c:698
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2302
int nestingLevel
Definition: async.c:409
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:422
#define ERROR
Definition: elog.h:46
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2343
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:166
struct NotificationList * upper
Definition: async.c:412
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:157
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
HTAB * hashtab
Definition: async.c:411
uint16 channel_len
Definition: async.c:401
uint16 payload_len
Definition: async.c:402
bool Trace_notify
Definition: async.c:443
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define elog(elevel,...)
Definition: elog.h:232
#define offsetof(type, field)
Definition: c.h:727

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 786 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

787 {
788  if (Trace_notify)
789  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
790 
791  /* If we couldn't possibly be listening, no need to queue anything */
792  if (pendingActions == NULL && !unlistenExitRegistered)
793  return;
794 
795  queue_listen(LISTEN_UNLISTEN, channel);
796 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
static ActionList * pendingActions
Definition: async.c:370
static bool unlistenExitRegistered
Definition: async.c:434
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 804 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

805 {
806  if (Trace_notify)
807  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
808 
809  /* If we couldn't possibly be listening, no need to queue anything */
810  if (pendingActions == NULL && !unlistenExitRegistered)
811  return;
812 
813  queue_listen(LISTEN_UNLISTEN_ALL, "");
814 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:724
static ActionList * pendingActions
Definition: async.c:370
static bool unlistenExitRegistered
Definition: async.c:434
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 533 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateSharedMemoryAndSemaphores().

534 {
535  bool found;
536  Size size;
537 
538  /*
539  * Create or attach to the AsyncQueueControl structure.
540  *
541  * The used entries in the backend[] array run from 1 to MaxBackends; the
542  * zero'th entry is unused but must be allocated.
543  */
544  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
545  size = add_size(size, offsetof(AsyncQueueControl, backend));
546 
547  asyncQueueControl = (AsyncQueueControl *)
548  ShmemInitStruct("Async Queue Control", size, &found);
549 
550  if (!found)
551  {
552  /* First time through, so initialize it */
553  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
554  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
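555  QUEUE_STOP_PAGE = 0;
556  QUEUE_FIRST_LISTENER = InvalidBackendId;
557  asyncQueueControl->lastQueueFillWarn = 0;
558  /* zero'th entry won't be used, but let's initialize it anyway */
559  for (int i = 0; i <= MaxBackends; i++)
560  {
561  QUEUE_BACKEND_PID(i) = InvalidPid;
562  QUEUE_BACKEND_DBOID(i) = InvalidOid;
563  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
564  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
565  }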
566  }
567 
568  /*
569  * Set up SLRU management of the pg_notify data.
570  */
571  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
572  SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
573  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
574  SYNC_HANDLER_NONE);
575 
576  if (!found)
577  {
578  /*
579  * During start or reboot, clean out the pg_notify directory.
580  */
581  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
582  }
583 }
#define QUEUE_TAIL
Definition: async.c:300
#define QUEUE_BACKEND_PID(i)
Definition: async.c:303
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1530
#define QUEUE_HEAD
Definition: async.c:299
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:187
#define NotifyCtl
Definition: async.c:313
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:302
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
int MaxBackends
Definition: globals.c:139
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:206
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:507
static AsyncQueueControl * asyncQueueControl
Definition: async.c:297
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:304
#define QUEUE_STOP_PAGE
Definition: async.c:301
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:305
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1553
size_t Size
Definition: c.h:540
int i
TimestampTz lastQueueFillWarn
Definition: async.c:292
#define QUEUE_BACKEND_POS(i)
Definition: async.c:306
#define offsetof(type, field)
Definition: c.h:727
#define InvalidPid
Definition: miscadmin.h:32

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 516 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

517 {
518  Size size;
519 
520  /* This had better match AsyncShmemInit */
521  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
522  size = add_size(size, offsetof(AsyncQueueControl, backend));
523 
524  size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
525 
526  return size;
527 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:139
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1720 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1721 {
1722  /*
1723  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1724  * we have registered as a listener but have not made any entry in
1725  * listenChannels. In that case, deregister again.
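1726  */
1727  if (amRegisteredListener && listenChannels == NIL)
1728  asyncQueueUnregister();
1729 
1730  /* And clean up */
1731  ClearPendingActionsAndNotifies();
1732 }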
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:338
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2436
static void asyncQueueUnregister(void)
Definition: async.c:1265
static bool amRegisteredListener
Definition: async.c:437

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 1002 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

1003 {
1004  ListCell *p;
1005 
1006  /*
1007  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1008  * return as soon as possible
1009  */
1010  if (!pendingActions && !pendingNotifies)
1011  return;
1012 
1013  if (Trace_notify)
1014  elog(DEBUG1, "AtCommit_Notify");
1015 
1016  /* Perform any pending listen/unlisten actions */
1017  if (pendingActions != NULL)
1018  {
1019  foreach(p, pendingActions->actions)
1020  {
1021  ListenAction *actrec = (ListenAction *) lfirst(p);
1022 
1023  switch (actrec->action)
1024  {
1025  case LISTEN_LISTEN:
1026  Exec_ListenCommit(actrec->channel);
1027  break;
1028  case LISTEN_UNLISTEN:
1029  Exec_UnlistenCommit(actrec->channel);
1030  break;
1031  case LISTEN_UNLISTEN_ALL:
1032  Exec_UnlistenAllCommit();
1033  break;
1034  }
1035  }
1036  }
1037 
1038  /* If no longer listening to anything, get out of listener array */
1039  if (amRegisteredListener && listenChannels == NIL)
1040  asyncQueueUnregister();
1041 
1042  /*
1043  * Send signals to listening backends. We need do this only if there are
1044  * pending notifies, which were previously added to the shared queue by
1045  * PreCommit_Notify().
1046  */
1047  if (pendingNotifies != NULL)
1048  SignalBackends();
1049 
1050  /*
1051  * If it's time to try to advance the global tail pointer, do that.
1052  *
1053  * (It might seem odd to do this in the sender, when more than likely the
1054  * listeners won't yet have read the messages we just sent. However,
1055  * there's less contention if only the sender does it, and there is little
1056  * need for urgency in advancing the global tail. So this typically will
1057  * be clearing out messages that were sent some time ago.)
1058  */
1059  if (tryAdvanceTail)
1060  {
1061  tryAdvanceTail = false;
1062  asyncQueueAdvanceTail();
1063  }
1064 
1065  /* And clean up */
1066  ClearPendingActionsAndNotifies();
1067 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:370
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1228
static List * listenChannels
Definition: async.c:338
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2436
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1197
static NotificationList * pendingNotifies
Definition: async.c:422
List * actions
Definition: async.c:366
static void asyncQueueAdvanceTail(void)
Definition: async.c:2157
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1170
static void SignalBackends(void)
Definition: async.c:1630
static void asyncQueueUnregister(void)
Definition: async.c:1265
static bool amRegisteredListener
Definition: async.c:437
#define lfirst(lc)
Definition: pg_list.h:169
bool Trace_notify
Definition: async.c:443
#define elog(elevel,...)
Definition: elog.h:232
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:360
ListenActionKind action
Definition: async.c:359
static bool tryAdvanceTail
Definition: async.c:440

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 870 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

871 {
872  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
873  if (pendingActions || pendingNotifies)
874  ereport(ERROR,
875  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
876  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
877 }
static ActionList * pendingActions
Definition: async.c:370
int errcode(int sqlerrcode)
Definition: elog.c:698
static NotificationList * pendingNotifies
Definition: async.c:422
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1810 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1811 {
1812  int my_level = GetCurrentTransactionNestLevel();
1813 
1814  /*
1815  * All we have to do is pop the stack --- the actions/notifies made in
1816  * this subxact are no longer interesting, and the space will be freed
1817  * when CurTransactionContext is recycled. We still have to free the
1818  * ActionList and NotificationList objects themselves, though, because
1819  * those are allocated in TopTransactionContext.
1820  *
1821  * Note that there might be no entries at all, or no entries for the
1822  * current subtransaction level, either because none were ever created, or
1823  * because we reentered this routine due to trouble during subxact abort.
1824  */
1825  while (pendingActions != NULL &&
1826  pendingActions->nestingLevel >= my_level)
1827  {
1828  ActionList *childPendingActions = pendingActions;
1829 
1830  pendingActions = pendingActions->upper;
1831  pfree(childPendingActions);
1832  }
1833 
1834  while (pendingNotifies != NULL &&
1835  pendingNotifies->nestingLevel >= my_level)
1836  {
1837  NotificationList *childPendingNotifies = pendingNotifies;
1838 
1839  pendingNotifies = pendingNotifies->upper;
1840  pfree(childPendingNotifies);
1841  }
1842 }
static ActionList * pendingActions
Definition: async.c:370
int nestingLevel
Definition: async.c:409
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:422
struct NotificationList * upper
Definition: async.c:412
int nestingLevel
Definition: async.c:365
struct ActionList * upper
Definition: async.c:367
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1740 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1741 {
1742  int my_level = GetCurrentTransactionNestLevel();
1743 
1744  /* If there are actions at our nesting level, we must reparent them. */
1745  if (pendingActions != NULL &&
1746  pendingActions->nestingLevel >= my_level)
1747  {
1748  if (pendingActions->upper == NULL ||
1749  pendingActions->upper->nestingLevel < my_level - 1)
1750  {
1751  /* nothing to merge; give the whole thing to the parent */
1752  --pendingActions->nestingLevel;
1753  }
1754  else
1755  {
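1756  ActionList *childPendingActions = pendingActions;
1757 
1758  pendingActions = pendingActions->upper;
1759 
1760  /*
1761  * Mustn't try to eliminate duplicates here --- see queue_listen()
1762  */
1763  pendingActions->actions =
1764  list_concat(pendingActions->actions,
1765  childPendingActions->actions);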
1766  pfree(childPendingActions);
1767  }
1768  }
1769 
1770  /* If there are notifies at our nesting level, we must reparent them. */
1771  if (pendingNotifies != NULL &&
1772  pendingNotifies->nestingLevel >= my_level)
1773  {
1774  Assert(pendingNotifies->nestingLevel == my_level);
1775 
1776  if (pendingNotifies->upper == NULL ||
1777  pendingNotifies->upper->nestingLevel < my_level - 1)
1778  {
1779  /* nothing to merge; give the whole thing to the parent */
1780  --pendingNotifies->nestingLevel;
1781  }
1782  else
1783  {
1784  /*
1785  * Formerly, we didn't bother to eliminate duplicates here, but
1786  * now we must, else we fall foul of "Assert(!found)", either here
1787  * or during a later attempt to build the parent-level hashtable.
1788  */
1789  NotificationList *childPendingNotifies = pendingNotifies;
1790  ListCell *l;
1791 
1792  pendingNotifies = pendingNotifies->upper;
1793  /* Insert all the subxact's events into parent, except for dups */
1794  foreach(l, childPendingNotifies->events)
1795  {
1796  Notification *childn = (Notification *) lfirst(l);
1797 
1798  if (!AsyncExistsPendingNotify(childn))
1799  AddEventToPendingNotifies(childn);
1800  }
1801  pfree(childPendingNotifies);
1802  }
1803  }
1804 }
List * events
Definition: async.c:410
static ActionList * pendingActions
Definition: async.c:370
List * list_concat(List *list1, const List *list2)
Definition: list.c:530
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2302
int nestingLevel
Definition: async.c:409
void pfree(void *pointer)
Definition: mcxt.c:1169
static NotificationList * pendingNotifies
Definition: async.c:422
List * actions
Definition: async.c:366
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2343
struct NotificationList * upper
Definition: async.c:412
int nestingLevel
Definition: async.c:365
struct ActionList * upper
Definition: async.c:367
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1853 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1854 {
1855  /*
1856  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1857  * you do here.
1858  */
1859 
1860  /* signal that work needs to be done */
1861  notifyInterruptPending = true;
1862 
1863  /* make sure the event is processed in due course */
1864  SetLatch(MyLatch);
1865 }
void SetLatch(Latch *latch)
Definition: latch.c:567
struct Latch * MyLatch
Definition: globals.c:57
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2278 of file async.c.

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2279 {
2280  if (whereToSendOutput == DestRemote)
2281  {
2282  StringInfoData buf;
2283 
2284  pq_beginmessage(&buf, 'A');
2285  pq_sendint32(&buf, srcPid);
2286  pq_sendstring(&buf, channel);
2287  pq_sendstring(&buf, payload);
2288  pq_endmessage(&buf);
2289 
2290  /*
2291  * NOTE: we do not do pq_flush() here. Some level of caller will
2292  * handle it later, allowing this message to be combined into a packet
2293  * with other ones.
2294  */
2295  }
2296  else
2297  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2298 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:68
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:232
CommandDest whereToSendOutput
Definition: postgres.c:92

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 895 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

896 {
897  ListCell *p;
898 
899  if (!pendingActions && !pendingNotifies)
900  return; /* no relevant statements in this xact */
901 
902  if (Trace_notify)
903  elog(DEBUG1, "PreCommit_Notify");
904 
905  /* Preflight for any pending listen/unlisten actions */
906  if (pendingActions != NULL)
907  {
908  foreach(p, pendingActions->actions)
909  {
910  ListenAction *actrec = (ListenAction *) lfirst(p);
911 
912  switch (actrec->action)
913  {
914  case LISTEN_LISTEN:
915  Exec_ListenPreCommit();
916  break;
917  case LISTEN_UNLISTEN:
918  /* there is no Exec_UnlistenPreCommit() */
919  break;
920  case LISTEN_UNLISTEN_ALL:
921  /* there is no Exec_UnlistenAllPreCommit() */
922  break;
923  }
924  }
925  }
926 
927  /* Queue any pending notifies (must happen after the above) */
928  if (pendingNotifies)
929  {
930  ListCell *nextNotify;
931 
932  /*
933  * Make sure that we have an XID assigned to the current transaction.
934  * GetCurrentTransactionId is cheap if we already have an XID, but not
935  * so cheap if we don't, and we'd prefer not to do that work while
936  * holding NotifyQueueLock.
937  */
938  (void) GetCurrentTransactionId();
939 
940  /*
941  * Serialize writers by acquiring a special lock that we hold till
942  * after commit. This ensures that queue entries appear in commit
943  * order, and in particular that there are never uncommitted queue
944  * entries ahead of committed ones, so an uncommitted transaction
945  * can't block delivery of deliverable notifications.
946  *
947  * We use a heavyweight lock so that it'll automatically be released
948  * after either commit or abort. This also allows deadlocks to be
949  * detected, though really a deadlock shouldn't be possible here.
950  *
951  * The lock is on "database 0", which is pretty ugly but it doesn't
952  * seem worth inventing a special locktag category just for this.
953  * (Historical note: before PG 9.0, a similar lock on "database 0" was
954  * used by the flatfiles mechanism.)
955  */
956  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
957  AccessExclusiveLock);
958 
959  /* Now push the notifications into the queue */
960  nextNotify = list_head(pendingNotifies->events);
961  while (nextNotify != NULL)
962  {
963  /*
964  * Add the pending notifications to the queue. We acquire and
965  * release NotifyQueueLock once per page, which might be overkill
966  * but it does allow readers to get in while we're doing this.
967  *
968  * A full queue is very uncommon and should really not happen,
969  * given that we have so much space available in the SLRU pages.
970  * Nevertheless we need to deal with this possibility. Note that
971  * when we get here we are in the process of committing our
972  * transaction, but we have not yet committed to clog, so at this
973  * point in time we can still roll the transaction back.
974  */
975  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
976  asyncQueueFillWarning();
977  if (asyncQueueIsFull())
978  ereport(ERROR,
979  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
980  errmsg("too many notifications in the NOTIFY queue")));
981  nextNotify = asyncQueueAddEntries(nextNotify);
982  LWLockRelease(NotifyQueueLock);
983  }
984 
985  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
986  }
987 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:410
static ActionList * pendingActions
Definition: async.c:370
int errcode(int sqlerrcode)
Definition: elog.c:698
static bool asyncQueueIsFull(void)
Definition: async.c:1306
static void asyncQueueFillWarning(void)
Definition: async.c:1576
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
static NotificationList * pendingNotifies
Definition: async.c:422
#define ERROR
Definition: elog.h:46
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
List * actions
Definition: async.c:366
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1017
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1408
#define lfirst(lc)
Definition: pg_list.h:169
static void Exec_ListenPreCommit(void)
Definition: async.c:1075
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
bool Trace_notify
Definition: async.c:443
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
ListenActionKind action
Definition: async.c:359

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1883 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1884 {
1885  if (IsTransactionOrTransactionBlock())
1886  return; /* not really idle */
1887 
1888  /* Loop in case another signal arrives while sending messages */
1889  while (notifyInterruptPending)
1890  ProcessIncomingNotify(flush);
1891 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2237
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4711
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:431

Variable Documentation

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending

◆ Trace_notify