PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_NOTIFY_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 

Variables

PGDLLIMPORT bool Trace_notify
 
PGDLLIMPORT int max_notify_queue_pages
 
PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_NOTIFY_BUFFERS

#define NUM_NOTIFY_BUFFERS   8

Definition at line 21 of file async.h.

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 744 of file async.c.

745 {
746  if (Trace_notify)
747  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
748 
749  queue_listen(LISTEN_LISTEN, channel);
750 }
@ LISTEN_LISTEN
Definition: async.c:336
bool Trace_notify
Definition: async.c:427
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:696
#define DEBUG1
Definition: elog.h:30
int MyProcPid
Definition: globals.c:45

References DEBUG1, elog(), LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 597 of file async.c.

598 {
599  int my_level = GetCurrentTransactionNestLevel();
600  size_t channel_len;
601  size_t payload_len;
602  Notification *n;
603  MemoryContext oldcontext;
604 
605  if (IsParallelWorker())
606  elog(ERROR, "cannot send notifications from a parallel worker");
607 
608  if (Trace_notify)
609  elog(DEBUG1, "Async_Notify(%s)", channel);
610 
611  channel_len = channel ? strlen(channel) : 0;
612  payload_len = payload ? strlen(payload) : 0;
613 
614  /* a channel name must be specified */
615  if (channel_len == 0)
616  ereport(ERROR,
617  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
618  errmsg("channel name cannot be empty")));
619 
620  /* enforce length limits */
621  if (channel_len >= NAMEDATALEN)
622  ereport(ERROR,
623  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
624  errmsg("channel name too long")));
625 
626  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
627  ereport(ERROR,
628  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629  errmsg("payload string too long")));
630 
631  /*
632  * We must construct the Notification entry, even if we end up not using
633  * it, in order to compare it cheaply to existing list entries.
634  *
635  * The notification list needs to live until end of transaction, so store
636  * it in the transaction context.
637  */
638  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
639 
640  n = (Notification *) palloc(offsetof(Notification, data) +
641  channel_len + payload_len + 2);
642  n->channel_len = channel_len;
643  n->payload_len = payload_len;
644  strcpy(n->data, channel);
645  if (payload)
646  strcpy(n->data + channel_len + 1, payload);
647  else
648  n->data[channel_len + 1] = '\0';
649 
650  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
651  {
652  NotificationList *notifies;
653 
654  /*
655  * First notify event in current (sub)xact. Note that we allocate the
656  * NotificationList in TopTransactionContext; the nestingLevel might
657  * get changed later by AtSubCommit_Notify.
658  */
659  notifies = (NotificationList *)
660  MemoryContextAlloc(TopTransactionContext,
661  sizeof(NotificationList));
662  notifies->nestingLevel = my_level;
663  notifies->events = list_make1(n);
664  /* We certainly don't need a hashtable yet */
665  notifies->hashtab = NULL;
666  notifies->upper = pendingNotifies;
667  pendingNotifies = notifies;
668  }
669  else
670  {
671  /* Now check for duplicates */
672  if (AsyncExistsPendingNotify(n))
673  {
674  /* It's a dup, so forget it */
675  pfree(n);
676  MemoryContextSwitchTo(oldcontext);
677  return;
678  }
679 
680  /* Append more events to existing list */
681  AddEventToPendingNotifies(n);
682  }
683 
684  MemoryContextSwitchTo(oldcontext);
685 }
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2240
static NotificationList * pendingNotifies
Definition: async.c:406
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2281
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:165
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsParallelWorker()
Definition: parallel.h:61
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void pfree(void *pointer)
Definition: mcxt.c:1431
MemoryContext CurTransactionContext
Definition: mcxt.c:147
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1034
void * palloc(Size size)
Definition: mcxt.c:1201
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
int nestingLevel
Definition: async.c:393
HTAB * hashtab
Definition: async.c:395
List * events
Definition: async.c:394
struct NotificationList * upper
Definition: async.c:396
uint16 payload_len
Definition: async.c:386
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:388
uint16 channel_len
Definition: async.c:385
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:914

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog(), ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 758 of file async.c.

759 {
760  if (Trace_notify)
761  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
762 
763  /* If we couldn't possibly be listening, no need to queue anything */
764  if (pendingActions == NULL && !unlistenExitRegistered)
765  return;
766 
767  queue_listen(LISTEN_UNLISTEN, channel);
768 }
static ActionList * pendingActions
Definition: async.c:354
@ LISTEN_UNLISTEN
Definition: async.c:337
static bool unlistenExitRegistered
Definition: async.c:418

References DEBUG1, elog(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 776 of file async.c.

777 {
778  if (Trace_notify)
779  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
780 
781  /* If we couldn't possibly be listening, no need to queue anything */
782  if (pendingActions == NULL && !unlistenExitRegistered)
783  return;
784 
785  queue_listen(LISTEN_UNLISTEN_ALL, "");
786 }
@ LISTEN_UNLISTEN_ALL
Definition: async.c:338

References DEBUG1, elog(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 504 of file async.c.

505 {
506  bool found;
507  Size size;
508 
509  /*
510  * Create or attach to the AsyncQueueControl structure.
511  *
512  * The used entries in the backend[] array run from 1 to MaxBackends; the
513  * zero'th entry is unused but must be allocated.
514  */
515  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
516  size = add_size(size, offsetof(AsyncQueueControl, backend));
517 
518  asyncQueueControl = (AsyncQueueControl *)
519  ShmemInitStruct("Async Queue Control", size, &found);
520 
521  if (!found)
522  {
523  /* First time through, so initialize it */
524  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
525  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
526  QUEUE_STOP_PAGE = 0;
527  QUEUE_FIRST_LISTENER = InvalidBackendId;
528  asyncQueueControl->lastQueueFillWarn = 0;
529  /* zero'th entry won't be used, but let's initialize it anyway */
530  for (int i = 0; i <= MaxBackends; i++)
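531  {
532  QUEUE_BACKEND_PID(i) = InvalidPid;
533  QUEUE_BACKEND_DBOID(i) = InvalidOid;
534  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
535  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
536  }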
537  }
538 
539  /*
540  * Set up SLRU management of the pg_notify data. Note that long segment
541  * names are used in order to avoid wraparound.
542  */
543  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
544  SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
545  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
546  SYNC_HANDLER_NONE, true);
547 
548  if (!found)
549  {
550  /*
551  * During start or reboot, clean out the pg_notify directory.
552  */
553  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
554  }
555 }
#define QUEUE_FIRST_LISTENER
Definition: async.c:301
#define QUEUE_BACKEND_POS(i)
Definition: async.c:305
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:205
static AsyncQueueControl * asyncQueueControl
Definition: async.c:296
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition: async.c:478
#define QUEUE_TAIL
Definition: async.c:299
#define QUEUE_BACKEND_PID(i)
Definition: async.c:302
#define NotifyCtl
Definition: async.c:312
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:304
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:303
#define QUEUE_HEAD
Definition: async.c:298
#define QUEUE_STOP_PAGE
Definition: async.c:300
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define InvalidBackendId
Definition: backendid.h:23
size_t Size
Definition: c.h:594
int MaxBackends
Definition: globals.c:143
int i
Definition: isn.c:73
@ LWTRANCHE_NOTIFY_BUFFER
Definition: lwlock.h:186
#define InvalidPid
Definition: miscadmin.h:32
#define InvalidOid
Definition: postgres_ext.h:36
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:215
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1624
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1577
TimestampTz lastQueueFillWarn
Definition: async.c:291
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 487 of file async.c.

488 {
489  Size size;
490 
491  /* This had better match AsyncShmemInit */
492  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
493  size = add_size(size, offsetof(AsyncQueueControl, backend));
494 
495  size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
496 
497  return size;
498 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:183

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1663 of file async.c.

1664 {
1665  /*
1666  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1667  * we have registered as a listener but have not made any entry in
1668  * listenChannels. In that case, deregister again.
1669  */
1670  if (amRegisteredListener && listenChannels == NIL)
1671  asyncQueueUnregister();
1672 
1673  /* And clean up */
1674  ClearPendingActionsAndNotifies();
1675 }
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2370
static List * listenChannels
Definition: async.c:322
static bool amRegisteredListener
Definition: async.c:421
static void asyncQueueUnregister(void)
Definition: async.c:1237
#define NIL
Definition: pg_list.h:68

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 974 of file async.c.

975 {
976  ListCell *p;
977 
978  /*
979  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
980  * return as soon as possible
981  */
982  if (!pendingActions && !pendingNotifies)
983  return;
984 
985  if (Trace_notify)
986  elog(DEBUG1, "AtCommit_Notify");
987 
988  /* Perform any pending listen/unlisten actions */
989  if (pendingActions != NULL)
990  {
991  foreach(p, pendingActions->actions)
992  {
993  ListenAction *actrec = (ListenAction *) lfirst(p);
994 
995  switch (actrec->action)
996  {
997  case LISTEN_LISTEN:
998  Exec_ListenCommit(actrec->channel);
999  break;
1000  case LISTEN_UNLISTEN:
1001  Exec_UnlistenCommit(actrec->channel);
1002  break;
1003  case LISTEN_UNLISTEN_ALL:
1004  Exec_UnlistenAllCommit();
1005  break;
1006  }
1007  }
1008  }
1009 
1010  /* If no longer listening to anything, get out of listener array */
1011  if (amRegisteredListener && listenChannels == NIL)
1012  asyncQueueUnregister();
1013 
1014  /*
1015  * Send signals to listening backends. We need do this only if there are
1016  * pending notifies, which were previously added to the shared queue by
1017  * PreCommit_Notify().
1018  */
1019  if (pendingNotifies != NULL)
1020  SignalBackends();
1021 
1022  /*
1023  * If it's time to try to advance the global tail pointer, do that.
1024  *
1025  * (It might seem odd to do this in the sender, when more than likely the
1026  * listeners won't yet have read the messages we just sent. However,
1027  * there's less contention if only the sender does it, and there is little
1028  * need for urgency in advancing the global tail. So this typically will
1029  * be clearing out messages that were sent some time ago.)
1030  */
1031  if (tryAdvanceTail)
1032  {
1033  tryAdvanceTail = false;
1034  asyncQueueAdvanceTail();
1035  }
1036 
1037  /* And clean up */
1038  ClearPendingActionsAndNotifies();
1039 }
static void SignalBackends(void)
Definition: async.c:1573
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1142
static bool tryAdvanceTail
Definition: async.c:424
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1169
static void asyncQueueAdvanceTail(void)
Definition: async.c:2100
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1200
#define lfirst(lc)
Definition: pg_list.h:172
List * actions
Definition: async.c:350
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:344
ListenActionKind action
Definition: async.c:343

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog(), Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 842 of file async.c.

843 {
844  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
845  if (pendingActions || pendingNotifies)
846  ereport(ERROR,
847  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
848  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
849 }

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1753 of file async.c.

1754 {
1755  int my_level = GetCurrentTransactionNestLevel();
1756 
1757  /*
1758  * All we have to do is pop the stack --- the actions/notifies made in
1759  * this subxact are no longer interesting, and the space will be freed
1760  * when CurTransactionContext is recycled. We still have to free the
1761  * ActionList and NotificationList objects themselves, though, because
1762  * those are allocated in TopTransactionContext.
1763  *
1764  * Note that there might be no entries at all, or no entries for the
1765  * current subtransaction level, either because none were ever created, or
1766  * because we reentered this routine due to trouble during subxact abort.
1767  */
1768  while (pendingActions != NULL &&
1769  pendingActions->nestingLevel >= my_level)
1770  {
1771  ActionList *childPendingActions = pendingActions;
1772 
1773  pendingActions = pendingActions->upper;
1774  pfree(childPendingActions);
1775  }
1776 
1777  while (pendingNotifies != NULL &&
1778  pendingNotifies->nestingLevel >= my_level)
1779  {
1780  NotificationList *childPendingNotifies = pendingNotifies;
1781 
1782  pendingNotifies = pendingNotifies->upper;
1783  pfree(childPendingNotifies);
1784  }
1785 }
int nestingLevel
Definition: async.c:349
struct ActionList * upper
Definition: async.c:351

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1683 of file async.c.

1684 {
1685  int my_level = GetCurrentTransactionNestLevel();
1686 
1687  /* If there are actions at our nesting level, we must reparent them. */
1688  if (pendingActions != NULL &&
1689  pendingActions->nestingLevel >= my_level)
1690  {
1691  if (pendingActions->upper == NULL ||
1692  pendingActions->upper->nestingLevel < my_level - 1)
1693  {
1694  /* nothing to merge; give the whole thing to the parent */
1695  --pendingActions->nestingLevel;
1696  }
1697  else
1698  {
1699  ActionList *childPendingActions = pendingActions;
1700 
1701  pendingActions = pendingActions->upper;
1702 
1703  /*
1704  * Mustn't try to eliminate duplicates here --- see queue_listen()
1705  */
1706  pendingActions->actions =
1707  list_concat(pendingActions->actions,
1708  childPendingActions->actions);
1709  pfree(childPendingActions);
1710  }
1711  }
1712 
1713  /* If there are notifies at our nesting level, we must reparent them. */
1714  if (pendingNotifies != NULL &&
1715  pendingNotifies->nestingLevel >= my_level)
1716  {
1717  Assert(pendingNotifies->nestingLevel == my_level);
1718 
1719  if (pendingNotifies->upper == NULL ||
1720  pendingNotifies->upper->nestingLevel < my_level - 1)
1721  {
1722  /* nothing to merge; give the whole thing to the parent */
1723  --pendingNotifies->nestingLevel;
1724  }
1725  else
1726  {
1727  /*
1728  * Formerly, we didn't bother to eliminate duplicates here, but
1729  * now we must, else we fall foul of "Assert(!found)", either here
1730  * or during a later attempt to build the parent-level hashtable.
1731  */
1732  NotificationList *childPendingNotifies = pendingNotifies;
1733  ListCell *l;
1734 
1735  pendingNotifies = pendingNotifies->upper;
1736  /* Insert all the subxact's events into parent, except for dups */
1737  foreach(l, childPendingNotifies->events)
1738  {
1739  Notification *childn = (Notification *) lfirst(l);
1740 
1741  if (!AsyncExistsPendingNotify(childn))
1742  AddEventToPendingNotifies(childn);
1743  }
1744  pfree(childPendingNotifies);
1745  }
1746  }
1747 }
Assert(fmt[strlen(fmt) - 1] !='\n')
List * list_concat(List *list1, const List *list2)
Definition: list.c:561

References ActionList::actions, AddEventToPendingNotifies(), Assert(), AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1796 of file async.c.

1797 {
1798  /*
1799  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1800  * you do here.
1801  */
1802 
1803  /* signal that work needs to be done */
1804  notifyInterruptPending = true;
1805 
1806  /* make sure the event is processed in due course */
1807  SetLatch(MyLatch);
1808 }
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:415
struct Latch * MyLatch
Definition: globals.c:59
void SetLatch(Latch *latch)
Definition: latch.c:633

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2216 of file async.c.

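2217 {
2218  if (whereToSendOutput == DestRemote)
2219  {
2220  StringInfoData buf;
2221 
2222  pq_beginmessage(&buf, PqMsg_NotificationResponse);
2223  pq_sendint32(&buf, srcPid);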
2224  pq_sendstring(&buf, channel);
2225  pq_sendstring(&buf, payload);
2226  pq_endmessage(&buf);
2227 
2228  /*
2229  * NOTE: we do not do pq_flush() here. Some level of caller will
2230  * handle it later, allowing this message to be combined into a packet
2231  * with other ones.
2232  */
2233  }
2234  else
2235  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2236 }
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
static char * buf
Definition: pg_test_fsync.c:73
CommandDest whereToSendOutput
Definition: postgres.c:89
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:198
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define PqMsg_NotificationResponse
Definition: protocol.h:41

References buf, DestRemote, elog(), INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 867 of file async.c.

868 {
869  ListCell *p;
870 
871  if (!pendingActions && !pendingNotifies)
872  return; /* no relevant statements in this xact */
873 
874  if (Trace_notify)
875  elog(DEBUG1, "PreCommit_Notify");
876 
877  /* Preflight for any pending listen/unlisten actions */
878  if (pendingActions != NULL)
879  {
880  foreach(p, pendingActions->actions)
881  {
882  ListenAction *actrec = (ListenAction *) lfirst(p);
883 
884  switch (actrec->action)
885  {
886  case LISTEN_LISTEN:
887  Exec_ListenPreCommit();
888  break;
889  case LISTEN_UNLISTEN:
890  /* there is no Exec_UnlistenPreCommit() */
891  break;
892  case LISTEN_UNLISTEN_ALL:
893  /* there is no Exec_UnlistenAllPreCommit() */
894  break;
895  }
896  }
897  }
898 
899  /* Queue any pending notifies (must happen after the above) */
900  if (pendingNotifies)
901  {
902  ListCell *nextNotify;
903 
904  /*
905  * Make sure that we have an XID assigned to the current transaction.
906  * GetCurrentTransactionId is cheap if we already have an XID, but not
907  * so cheap if we don't, and we'd prefer not to do that work while
908  * holding NotifyQueueLock.
909  */
910  (void) GetCurrentTransactionId();
911 
912  /*
913  * Serialize writers by acquiring a special lock that we hold till
914  * after commit. This ensures that queue entries appear in commit
915  * order, and in particular that there are never uncommitted queue
916  * entries ahead of committed ones, so an uncommitted transaction
917  * can't block delivery of deliverable notifications.
918  *
919  * We use a heavyweight lock so that it'll automatically be released
920  * after either commit or abort. This also allows deadlocks to be
921  * detected, though really a deadlock shouldn't be possible here.
922  *
923  * The lock is on "database 0", which is pretty ugly but it doesn't
924  * seem worth inventing a special locktag category just for this.
925  * (Historical note: before PG 9.0, a similar lock on "database 0" was
926  * used by the flatfiles mechanism.)
927  */
928  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
929  AccessExclusiveLock);
930 
931  /* Now push the notifications into the queue */
932  nextNotify = list_head(pendingNotifies->events);
933  while (nextNotify != NULL)
934  {
935  /*
936  * Add the pending notifications to the queue. We acquire and
937  * release NotifyQueueLock once per page, which might be overkill
938  * but it does allow readers to get in while we're doing this.
939  *
940  * A full queue is very uncommon and should really not happen,
941  * given that we have so much space available in the SLRU pages.
942  * Nevertheless we need to deal with this possibility. Note that
943  * when we get here we are in the process of committing our
944  * transaction, but we have not yet committed to clog, so at this
945  * point in time we can still roll the transaction back.
946  */
947  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
948  asyncQueueFillWarning();
949  if (asyncQueueIsFull())
950  ereport(ERROR,
951  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
952  errmsg("too many notifications in the NOTIFY queue")));
953  nextNotify = asyncQueueAddEntries(nextNotify);
954  LWLockRelease(NotifyQueueLock);
955  }
956 
957  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
958  }
959 }
static void Exec_ListenPreCommit(void)
Definition: async.c:1047
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1362
static void asyncQueueFillWarning(void)
Definition: async.c:1519
static bool asyncQueueIsFull(void)
Definition: async.c:1278
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessExclusiveLock
Definition: lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:116
static ListCell * list_head(const List *l)
Definition: pg_list.h:128
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:445

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog(), ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1826 of file async.c.

1827 {
1828  if (IsTransactionOrTransactionBlock())
1829  return; /* not really idle */
1830 
1831  /* Loop in case another signal arrives while sending messages */
1832  while (notifyInterruptPending)
1833  ProcessIncomingNotify(flush);
1834 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2175
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4850

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

Variable Documentation

◆ max_notify_queue_pages

PGDLLIMPORT int max_notify_queue_pages
extern

Definition at line 430 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ notifyInterruptPending

PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending
extern

◆ Trace_notify