PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 

Variables

PGDLLIMPORT bool Trace_notify
 
PGDLLIMPORT int max_notify_queue_pages
 
PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending
 

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 738 of file async.c.

739 {
740  if (Trace_notify)
741  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
742 
743  queue_listen(LISTEN_LISTEN, channel);
744 }
@ LISTEN_LISTEN
Definition: async.c:334
bool Trace_notify
Definition: async.c:425
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:690
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
int MyProcPid
Definition: globals.c:45

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 591 of file async.c.

592 {
593  int my_level = GetCurrentTransactionNestLevel();
594  size_t channel_len;
595  size_t payload_len;
596  Notification *n;
597  MemoryContext oldcontext;
598 
599  if (IsParallelWorker())
600  elog(ERROR, "cannot send notifications from a parallel worker");
601 
602  if (Trace_notify)
603  elog(DEBUG1, "Async_Notify(%s)", channel);
604 
605  channel_len = channel ? strlen(channel) : 0;
606  payload_len = payload ? strlen(payload) : 0;
607 
608  /* a channel name must be specified */
609  if (channel_len == 0)
610  ereport(ERROR,
611  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
612  errmsg("channel name cannot be empty")));
613 
614  /* enforce length limits */
615  if (channel_len >= NAMEDATALEN)
616  ereport(ERROR,
617  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
618  errmsg("channel name too long")));
619 
620  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
621  ereport(ERROR,
622  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
623  errmsg("payload string too long")));
624 
625  /*
626  * We must construct the Notification entry, even if we end up not using
627  * it, in order to compare it cheaply to existing list entries.
628  *
629  * The notification list needs to live until end of transaction, so store
630  * it in the transaction context.
631  */
632  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
633 
634  n = (Notification *) palloc(offsetof(Notification, data) +
635  channel_len + payload_len + 2);
636  n->channel_len = channel_len;
637  n->payload_len = payload_len;
638  strcpy(n->data, channel);
639  if (payload)
640  strcpy(n->data + channel_len + 1, payload);
641  else
642  n->data[channel_len + 1] = '\0';
643 
644  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
645  {
646  NotificationList *notifies;
647 
648  /*
649  * First notify event in current (sub)xact. Note that we allocate the
650  * NotificationList in TopTransactionContext; the nestingLevel might
651  * get changed later by AtSubCommit_Notify.
652  */
653  notifies = (NotificationList *)
654  MemoryContextAlloc(TopTransactionContext,
655  sizeof(NotificationList));
656  notifies->nestingLevel = my_level;
657  notifies->events = list_make1(n);
658  /* We certainly don't need a hashtable yet */
659  notifies->hashtab = NULL;
660  notifies->upper = pendingNotifies;
661  pendingNotifies = notifies;
662  }
663  else
664  {
665  /* Now check for duplicates */
666  if (AsyncExistsPendingNotify(n))
667  {
668  /* It's a dup, so forget it */
669  pfree(n);
670  MemoryContextSwitchTo(oldcontext);
671  return;
672  }
673 
674  /* Append more events to existing list */
675  AddEventToPendingNotifies(n);
676  }
677 
678  MemoryContextSwitchTo(oldcontext);
679 }
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2248
static NotificationList * pendingNotifies
Definition: async.c:404
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2289
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:163
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsParallelWorker()
Definition: parallel.h:60
MemoryContext TopTransactionContext
Definition: mcxt.c:154
void pfree(void *pointer)
Definition: mcxt.c:1520
MemoryContext CurTransactionContext
Definition: mcxt.c:155
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1180
void * palloc(Size size)
Definition: mcxt.c:1316
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
MemoryContextSwitchTo(old_ctx)
int nestingLevel
Definition: async.c:391
HTAB * hashtab
Definition: async.c:393
List * events
Definition: async.c:392
struct NotificationList * upper
Definition: async.c:394
uint16 payload_len
Definition: async.c:384
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:386
uint16 channel_len
Definition: async.c:383
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:926

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 752 of file async.c.

753 {
754  if (Trace_notify)
755  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
756 
757  /* If we couldn't possibly be listening, no need to queue anything */
758  if (pendingActions == NULL && !unlistenExitRegistered)
759  return;
760 
761  queue_listen(LISTEN_UNLISTEN, channel);
762 }
static ActionList * pendingActions
Definition: async.c:352
@ LISTEN_UNLISTEN
Definition: async.c:335
static bool unlistenExitRegistered
Definition: async.c:416

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 770 of file async.c.

771 {
772  if (Trace_notify)
773  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
774 
775  /* If we couldn't possibly be listening, no need to queue anything */
776  if (pendingActions == NULL && !unlistenExitRegistered)
777  return;
778 
779  queue_listen(LISTEN_UNLISTEN_ALL, "");
780 }
@ LISTEN_UNLISTEN_ALL
Definition: async.c:336

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 502 of file async.c.

503 {
504  bool found;
505  Size size;
506 
507  /*
508  * Create or attach to the AsyncQueueControl structure.
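509  */
510  size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
511  size = add_size(size, offsetof(AsyncQueueControl, backend));
512 
513  asyncQueueControl = (AsyncQueueControl *)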
514  ShmemInitStruct("Async Queue Control", size, &found);
515 
516  if (!found)
517  {
518  /* First time through, so initialize it */
519  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
520  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
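521  QUEUE_STOP_PAGE = 0;
522  QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
523  asyncQueueControl->lastQueueFillWarn = 0;
524  for (int i = 0; i < MaxBackends; i++)
525  {
526  QUEUE_BACKEND_PID(i) = InvalidPid;
527  QUEUE_BACKEND_DBOID(i) = InvalidOid;
528  QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
529  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
530  }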
531  }
532 
533  /*
534  * Set up SLRU management of the pg_notify data. Note that long segment
535  * names are used in order to avoid wraparound.
536  */
537  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
538  SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
539  "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
540  SYNC_HANDLER_NONE, true);
541 
542  if (!found)
543  {
544  /*
545  * During start or reboot, clean out the pg_notify directory.
546  */
547  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
548  }
549 }
#define QUEUE_FIRST_LISTENER
Definition: async.c:299
#define QUEUE_BACKEND_POS(i)
Definition: async.c:303
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:203
static AsyncQueueControl * asyncQueueControl
Definition: async.c:294
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition: async.c:476
#define QUEUE_TAIL
Definition: async.c:297
#define QUEUE_BACKEND_PID(i)
Definition: async.c:300
#define NotifyCtl
Definition: async.c:310
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:302
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:301
#define QUEUE_HEAD
Definition: async.c:296
#define QUEUE_STOP_PAGE
Definition: async.c:298
size_t Size
Definition: c.h:605
int MaxBackends
Definition: globals.c:143
int notify_buffers
Definition: globals.c:165
int i
Definition: isn.c:73
@ LWTRANCHE_NOTIFY_SLRU
Definition: lwlock.h:213
@ LWTRANCHE_NOTIFY_BUFFER
Definition: lwlock.h:184
#define InvalidPid
Definition: miscadmin.h:32
#define InvalidOid
Definition: postgres_ext.h:36
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:238
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1774
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1727
TimestampTz lastQueueFillWarn
Definition: async.c:290
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), size, SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 485 of file async.c.

486 {
487  Size size;
488 
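489  /* This had better match AsyncShmemInit */
490  size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
491  size = add_size(size, offsetof(AsyncQueueControl, backend));
492 
493  size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
494 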
495  return size;
496 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:184

References add_size(), MaxBackends, mul_size(), notify_buffers, SimpleLruShmemSize(), and size.

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1671 of file async.c.

1672 {
1673  /*
1674  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1675  * we have registered as a listener but have not made any entry in
1676  * listenChannels. In that case, deregister again.
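1677  */
1678  if (amRegisteredListener && listenChannels == NIL)
1679  asyncQueueUnregister();
1680 
1681  /* And clean up */
1682  ClearPendingActionsAndNotifies();
1683 }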
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2378
static List * listenChannels
Definition: async.c:320
static bool amRegisteredListener
Definition: async.c:419
static void asyncQueueUnregister(void)
Definition: async.c:1231
#define NIL
Definition: pg_list.h:68

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 968 of file async.c.

969 {
970  ListCell *p;
971 
972  /*
973  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
974  * return as soon as possible
975  */
976  if (!pendingActions && !pendingNotifies)
977  return;
978 
979  if (Trace_notify)
980  elog(DEBUG1, "AtCommit_Notify");
981 
982  /* Perform any pending listen/unlisten actions */
983  if (pendingActions != NULL)
984  {
985  foreach(p, pendingActions->actions)
986  {
987  ListenAction *actrec = (ListenAction *) lfirst(p);
988 
989  switch (actrec->action)
990  {
991  case LISTEN_LISTEN:
992  Exec_ListenCommit(actrec->channel);
993  break;
994  case LISTEN_UNLISTEN:
995  Exec_UnlistenCommit(actrec->channel);
996  break;
997  case LISTEN_UNLISTEN_ALL:
998  Exec_UnlistenAllCommit();
999  break;
1000  }
1001  }
1002  }
1003 
1004  /* If no longer listening to anything, get out of listener array */
1005  if (amRegisteredListener && listenChannels == NIL)
1006  asyncQueueUnregister();
1007 
1008  /*
1009  * Send signals to listening backends. We need do this only if there are
1010  * pending notifies, which were previously added to the shared queue by
1011  * PreCommit_Notify().
1012  */
1013  if (pendingNotifies != NULL)
1014  SignalBackends();
1015 
1016  /*
1017  * If it's time to try to advance the global tail pointer, do that.
1018  *
1019  * (It might seem odd to do this in the sender, when more than likely the
1020  * listeners won't yet have read the messages we just sent. However,
1021  * there's less contention if only the sender does it, and there is little
1022  * need for urgency in advancing the global tail. So this typically will
1023  * be clearing out messages that were sent some time ago.)
1024  */
1025  if (tryAdvanceTail)
1026  {
1027  tryAdvanceTail = false;
1028  asyncQueueAdvanceTail();
1029  }
1030 
1031  /* And clean up */
1032  ClearPendingActionsAndNotifies();
1033 }
static void SignalBackends(void)
Definition: async.c:1581
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1136
static bool tryAdvanceTail
Definition: async.c:422
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1163
static void asyncQueueAdvanceTail(void)
Definition: async.c:2108
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1194
#define lfirst(lc)
Definition: pg_list.h:172
List * actions
Definition: async.c:348
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:342
ListenActionKind action
Definition: async.c:341

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 836 of file async.c.

837 {
838  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
839  if (pendingActions || pendingNotifies)
840  ereport(ERROR,
841  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
843 }

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1761 of file async.c.

1762 {
1763  int my_level = GetCurrentTransactionNestLevel();
1764 
1765  /*
1766  * All we have to do is pop the stack --- the actions/notifies made in
1767  * this subxact are no longer interesting, and the space will be freed
1768  * when CurTransactionContext is recycled. We still have to free the
1769  * ActionList and NotificationList objects themselves, though, because
1770  * those are allocated in TopTransactionContext.
1771  *
1772  * Note that there might be no entries at all, or no entries for the
1773  * current subtransaction level, either because none were ever created, or
1774  * because we reentered this routine due to trouble during subxact abort.
1775  */
1776  while (pendingActions != NULL &&
1777  pendingActions->nestingLevel >= my_level)
1778  {
1779  ActionList *childPendingActions = pendingActions;
1780 
1781  pendingActions = pendingActions->upper;
1782  pfree(childPendingActions);
1783  }
1784 
1785  while (pendingNotifies != NULL &&
1786  pendingNotifies->nestingLevel >= my_level)
1787  {
1788  NotificationList *childPendingNotifies = pendingNotifies;
1789 
1790  pendingNotifies = pendingNotifies->upper;
1791  pfree(childPendingNotifies);
1792  }
1793 }
int nestingLevel
Definition: async.c:347
struct ActionList * upper
Definition: async.c:349

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1691 of file async.c.

1692 {
1693  int my_level = GetCurrentTransactionNestLevel();
1694 
1695  /* If there are actions at our nesting level, we must reparent them. */
1696  if (pendingActions != NULL &&
1697  pendingActions->nestingLevel >= my_level)
1698  {
1699  if (pendingActions->upper == NULL ||
1700  pendingActions->upper->nestingLevel < my_level - 1)
1701  {
1702  /* nothing to merge; give the whole thing to the parent */
1703  --pendingActions->nestingLevel;
1704  }
1705  else
1706  {
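1707  ActionList *childPendingActions = pendingActions;
1708 
1709  pendingActions = pendingActions->upper;
1710 
1711  /*
1712  * Mustn't try to eliminate duplicates here --- see queue_listen()
1713  */
1714  pendingActions->actions =
1715  list_concat(pendingActions->actions,
1716  childPendingActions->actions);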
1717  pfree(childPendingActions);
1718  }
1719  }
1720 
1721  /* If there are notifies at our nesting level, we must reparent them. */
1722  if (pendingNotifies != NULL &&
1723  pendingNotifies->nestingLevel >= my_level)
1724  {
1725  Assert(pendingNotifies->nestingLevel == my_level);
1726 
1727  if (pendingNotifies->upper == NULL ||
1728  pendingNotifies->upper->nestingLevel < my_level - 1)
1729  {
1730  /* nothing to merge; give the whole thing to the parent */
1731  --pendingNotifies->nestingLevel;
1732  }
1733  else
1734  {
1735  /*
1736  * Formerly, we didn't bother to eliminate duplicates here, but
1737  * now we must, else we fall foul of "Assert(!found)", either here
1738  * or during a later attempt to build the parent-level hashtable.
1739  */
1740  NotificationList *childPendingNotifies = pendingNotifies;
1741  ListCell *l;
1742 
1743  pendingNotifies = pendingNotifies->upper;
1744  /* Insert all the subxact's events into parent, except for dups */
1745  foreach(l, childPendingNotifies->events)
1746  {
1747  Notification *childn = (Notification *) lfirst(l);
1748 
1749  if (!AsyncExistsPendingNotify(childn))
1750  AddEventToPendingNotifies(childn);
1751  }
1752  pfree(childPendingNotifies);
1753  }
1754  }
1755 }
#define Assert(condition)
Definition: c.h:858
List * list_concat(List *list1, const List *list2)
Definition: list.c:561

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1804 of file async.c.

1805 {
1806  /*
1807  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1808  * you do here.
1809  */
1810 
1811  /* signal that work needs to be done */
1812  notifyInterruptPending = true;
1813 
1814  /* make sure the event is processed in due course */
1815  SetLatch(MyLatch);
1816 }
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:413
struct Latch * MyLatch
Definition: globals.c:60
void SetLatch(Latch *latch)
Definition: latch.c:632

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2224 of file async.c.

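2225 {
2226  if (whereToSendOutput == DestRemote)
2227  {
2228  StringInfoData buf;
2229 
2230  pq_beginmessage(&buf, PqMsg_NotificationResponse);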
2231  pq_sendint32(&buf, srcPid);
2232  pq_sendstring(&buf, channel);
2233  pq_sendstring(&buf, payload);
2234  pq_endmessage(&buf);
2235 
2236  /*
2237  * NOTE: we do not do pq_flush() here. Some level of caller will
2238  * handle it later, allowing this message to be combined into a packet
2239  * with other ones.
2240  */
2241  }
2242  else
2243  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2244 }
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
static char * buf
Definition: pg_test_fsync.c:73
CommandDest whereToSendOutput
Definition: postgres.c:90
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
#define PqMsg_NotificationResponse
Definition: protocol.h:41

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 861 of file async.c.

862 {
863  ListCell *p;
864 
865  if (!pendingActions && !pendingNotifies)
866  return; /* no relevant statements in this xact */
867 
868  if (Trace_notify)
869  elog(DEBUG1, "PreCommit_Notify");
870 
871  /* Preflight for any pending listen/unlisten actions */
872  if (pendingActions != NULL)
873  {
874  foreach(p, pendingActions->actions)
875  {
876  ListenAction *actrec = (ListenAction *) lfirst(p);
877 
878  switch (actrec->action)
879  {
880  case LISTEN_LISTEN:
881  Exec_ListenPreCommit();
882  break;
883  case LISTEN_UNLISTEN:
884  /* there is no Exec_UnlistenPreCommit() */
885  break;
886  case LISTEN_UNLISTEN_ALL:
887  /* there is no Exec_UnlistenAllPreCommit() */
888  break;
889  }
890  }
891  }
892 
893  /* Queue any pending notifies (must happen after the above) */
894  if (pendingNotifies)
895  {
896  ListCell *nextNotify;
897 
898  /*
899  * Make sure that we have an XID assigned to the current transaction.
900  * GetCurrentTransactionId is cheap if we already have an XID, but not
901  * so cheap if we don't, and we'd prefer not to do that work while
902  * holding NotifyQueueLock.
903  */
904  (void) GetCurrentTransactionId();
905 
906  /*
907  * Serialize writers by acquiring a special lock that we hold till
908  * after commit. This ensures that queue entries appear in commit
909  * order, and in particular that there are never uncommitted queue
910  * entries ahead of committed ones, so an uncommitted transaction
911  * can't block delivery of deliverable notifications.
912  *
913  * We use a heavyweight lock so that it'll automatically be released
914  * after either commit or abort. This also allows deadlocks to be
915  * detected, though really a deadlock shouldn't be possible here.
916  *
917  * The lock is on "database 0", which is pretty ugly but it doesn't
918  * seem worth inventing a special locktag category just for this.
919  * (Historical note: before PG 9.0, a similar lock on "database 0" was
920  * used by the flatfiles mechanism.)
921  */
922  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
923  AccessExclusiveLock);
924 
925  /* Now push the notifications into the queue */
926  nextNotify = list_head(pendingNotifies->events);
927  while (nextNotify != NULL)
928  {
929  /*
930  * Add the pending notifications to the queue. We acquire and
931  * release NotifyQueueLock once per page, which might be overkill
932  * but it does allow readers to get in while we're doing this.
933  *
934  * A full queue is very uncommon and should really not happen,
935  * given that we have so much space available in the SLRU pages.
936  * Nevertheless we need to deal with this possibility. Note that
937  * when we get here we are in the process of committing our
938  * transaction, but we have not yet committed to clog, so at this
939  * point in time we can still roll the transaction back.
940  */
941  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
942  asyncQueueFillWarning();
943  if (asyncQueueIsFull())
944  ereport(ERROR,
945  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
946  errmsg("too many notifications in the NOTIFY queue")));
947  nextNotify = asyncQueueAddEntries(nextNotify);
948  LWLockRelease(NotifyQueueLock);
949  }
950 
951  /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
952  }
953 }
static void Exec_ListenPreCommit(void)
Definition: async.c:1041
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1356
static void asyncQueueFillWarning(void)
Definition: async.c:1527
static bool asyncQueueIsFull(void)
Definition: async.c:1272
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1083
#define AccessExclusiveLock
Definition: lockdefs.h:43
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static ListCell * list_head(const List *l)
Definition: pg_list.h:128
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:451

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1834 of file async.c.

1835 {
1836  if (IsTransactionOrTransactionBlock())
1837  return; /* not really idle */
1838 
1839  /* Loop in case another signal arrives while sending messages */
1840  while (notifyInterruptPending)
1841  ProcessIncomingNotify(flush);
1842 }
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2183
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4933

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

Variable Documentation

◆ max_notify_queue_pages

PGDLLIMPORT int max_notify_queue_pages
extern

Definition at line 428 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ notifyInterruptPending

PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending
extern

◆ Trace_notify