PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_NOTIFY_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void ProcessCompletedNotifies (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_NOTIFY_BUFFERS

#define NUM_NOTIFY_BUFFERS   8

Definition at line 21 of file async.h.

Referenced by AsyncShmemInit(), and AsyncShmemSize().

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 757 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

758 {
759  if (Trace_notify)
760  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
761 
762  queue_listen(LISTEN_LISTEN, channel);
763 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:709
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 610 of file async.c.

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, AsyncQueueEntry::data, Notification::data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, offsetof, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

611 {
612  int my_level = GetCurrentTransactionNestLevel();
613  size_t channel_len;
614  size_t payload_len;
615  Notification *n;
616  MemoryContext oldcontext;
617 
618  if (IsParallelWorker())
619  elog(ERROR, "cannot send notifications from a parallel worker");
620 
621  if (Trace_notify)
622  elog(DEBUG1, "Async_Notify(%s)", channel);
623 
624  channel_len = channel ? strlen(channel) : 0;
625  payload_len = payload ? strlen(payload) : 0;
626 
627  /* a channel name must be specified */
628  if (channel_len == 0)
629  ereport(ERROR,
630  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
631  errmsg("channel name cannot be empty")));
632 
633  /* enforce length limits */
634  if (channel_len >= NAMEDATALEN)
635  ereport(ERROR,
636  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
637  errmsg("channel name too long")));
638 
639  if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
640  ereport(ERROR,
641  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
642  errmsg("payload string too long")));
643 
644  /*
645  * We must construct the Notification entry, even if we end up not using
646  * it, in order to compare it cheaply to existing list entries.
647  *
648  * The notification list needs to live until end of transaction, so store
649  * it in the transaction context.
650  */
651  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
652 
653  n = (Notification *) palloc(offsetof(Notification, data) +
654  channel_len + payload_len + 2);
655  n->channel_len = channel_len;
656  n->payload_len = payload_len;
657  strcpy(n->data, channel);
658  if (payload)
659  strcpy(n->data + channel_len + 1, payload);
660  else
661  n->data[channel_len + 1] = '\0';
662 
663  if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
664  {
665  NotificationList *notifies;
666 
667  /*
668  * First notify event in current (sub)xact. Note that we allocate the
669  * NotificationList in TopTransactionContext; the nestingLevel might
670  * get changed later by AtSubCommit_Notify.
671  */
672  notifies = (NotificationList *)
673  MemoryContextAlloc(TopTransactionContext,
674  sizeof(NotificationList));
675  notifies->nestingLevel = my_level;
676  notifies->events = list_make1(n);
677  /* We certainly don't need a hashtable yet */
678  notifies->hashtab = NULL;
679  notifies->upper = pendingNotifies;
680  pendingNotifies = notifies;
681  }
682  else
683  {
684  /* Now check for duplicates */
685  if (AsyncExistsPendingNotify(n))
686  {
687  /* It's a dup, so forget it */
688  pfree(n);
689  MemoryContextSwitchTo(oldcontext);
690  return;
691  }
692 
693  /* Append more events to existing list */
694  AddEventToPendingNotifies(n);
695  }
696 
697  MemoryContextSwitchTo(oldcontext);
698 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:397
MemoryContext TopTransactionContext
Definition: mcxt.c:49
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:50
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2287
int nestingLevel
Definition: async.c:396
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2328
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:156
struct NotificationList * upper
Definition: async.c:399
#define IsParallelWorker()
Definition: parallel.h:61
#define ereport(elevel,...)
Definition: elog.h:144
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
HTAB * hashtab
Definition: async.c:398
uint16 channel_len
Definition: async.c:388
uint16 payload_len
Definition: async.c:389
bool Trace_notify
Definition: async.c:433
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:214
#define offsetof(type, field)
Definition: c.h:661

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 771 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

772 {
773  if (Trace_notify)
774  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
775 
776  /* If we couldn't possibly be listening, no need to queue anything */
777  if (pendingActions == NULL && !unlistenExitRegistered)
778  return;
779 
780  queue_listen(LISTEN_UNLISTEN, channel);
781 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:709
static ActionList * pendingActions
Definition: async.c:357
static bool unlistenExitRegistered
Definition: async.c:421
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 789 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

790 {
791  if (Trace_notify)
792  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
793 
794  /* If we couldn't possibly be listening, no need to queue anything */
795  if (pendingActions == NULL && !unlistenExitRegistered)
796  return;
797 
798  queue_listen(LISTEN_UNLISTEN_ALL, "");
799 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:709
static ActionList * pendingActions
Definition: async.c:357
static bool unlistenExitRegistered
Definition: async.c:421
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 518 of file async.c.

References add_size(), asyncQueuePagePrecedes(), i, InvalidBackendId, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, MaxBackends, mul_size(), NotifyCtl, NUM_NOTIFY_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

519 {
520  bool found;
521  Size size;
522 
523  /*
524  * Create or attach to the AsyncQueueControl structure.
525  *
526  * The used entries in the backend[] array run from 1 to MaxBackends; the
527  * zero'th entry is unused but must be allocated.
528  */
529  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
530  size = add_size(size, offsetof(AsyncQueueControl, backend));
531 
532  asyncQueueControl = (AsyncQueueControl *)
533  ShmemInitStruct("Async Queue Control", size, &found);
534 
535  if (!found)
536  {
537  /* First time through, so initialize it */
538  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
539  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
540  QUEUE_FIRST_LISTENER = InvalidBackendId;
541  asyncQueueControl->lastQueueFillWarn = 0;
542  /* zero'th entry won't be used, but let's initialize it anyway */
543  for (int i = 0; i <= MaxBackends; i++)
544  {
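545  QUEUE_BACKEND_PID(i) = InvalidPid;
546  QUEUE_BACKEND_DBOID(i) = InvalidOid;
547  QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
548  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
549  }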
550  }
551 
552  /*
553  * Set up SLRU management of the pg_notify data.
554  */
555  NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
556  SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
557  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER);
558  /* Override default assumption that writes should be fsync'd */
559  NotifyCtl->do_fsync = false;
560 
561  if (!found)
562  {
563  /*
564  * During start or reboot, clean out the pg_notify directory.
565  */
566  (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
567  }
568 }
#define QUEUE_TAIL
Definition: async.c:285
#define QUEUE_BACKEND_PID(i)
Definition: async.c:287
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1401
#define QUEUE_HEAD
Definition: async.c:284
#define NotifyCtl
Definition: async.c:297
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
#define QUEUE_FIRST_LISTENER
Definition: async.c:286
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
int MaxBackends
Definition: globals.c:135
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:196
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:492
static AsyncQueueControl * asyncQueueControl
Definition: async.c:282
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define InvalidBackendId
Definition: backendid.h:23
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:288
#define InvalidOid
Definition: postgres_ext.h:36
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:289
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1424
size_t Size
Definition: c.h:466
int i
TimestampTz lastQueueFillWarn
Definition: async.c:277
#define QUEUE_BACKEND_POS(i)
Definition: async.c:290
#define offsetof(type, field)
Definition: c.h:661
#define InvalidPid
Definition: miscadmin.h:32
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:175

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 501 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_NOTIFY_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

502 {
503  Size size;
504 
505  /* This had better match AsyncShmemInit */
506  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
507  size = add_size(size, offsetof(AsyncQueueControl, backend));
508 
509  size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
510 
511  return size;
512 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define NUM_NOTIFY_BUFFERS
Definition: async.h:21
int MaxBackends
Definition: globals.c:135
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:661

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1743 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1744 {
1745  /*
1746  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1747  * we have registered as a listener but have not made any entry in
1748  * listenChannels. In that case, deregister again.
1749  */
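1750  if (amRegisteredListener && listenChannels == NIL)
1751  asyncQueueUnregister();
1752 
1753  /* And clean up */
1754  ClearPendingActionsAndNotifies();
1755 }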
#define NIL
Definition: pg_list.h:65
static List * listenChannels
Definition: async.c:325
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2422
static void asyncQueueUnregister(void)
Definition: async.c:1301
static bool amRegisteredListener
Definition: async.c:424

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 982 of file async.c.

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

983 {
984  ListCell *p;
985 
986  /*
987  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
988  * return as soon as possible
989  */
990  if (!pendingActions && !pendingNotifies)
991  return;
992 
993  if (Trace_notify)
994  elog(DEBUG1, "AtCommit_Notify");
995 
996  /* Perform any pending listen/unlisten actions */
997  if (pendingActions != NULL)
998  {
999  foreach(p, pendingActions->actions)
1000  {
1001  ListenAction *actrec = (ListenAction *) lfirst(p);
1002 
1003  switch (actrec->action)
1004  {
1005  case LISTEN_LISTEN:
1006  Exec_ListenCommit(actrec->channel);
1007  break;
1008  case LISTEN_UNLISTEN:
1009  Exec_UnlistenCommit(actrec->channel);
1010  break;
1011  case LISTEN_UNLISTEN_ALL:
1012  Exec_UnlistenAllCommit();
1013  break;
1014  }
1015  }
1016  }
1017 
1018  /* If no longer listening to anything, get out of listener array */
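1019  if (amRegisteredListener && listenChannels == NIL)
1020  asyncQueueUnregister();
1021 
1022  /* And clean up */
1023  ClearPendingActionsAndNotifies();
1024 }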
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
static ActionList * pendingActions
Definition: async.c:357
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1185
static List * listenChannels
Definition: async.c:325
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2422
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1154
static NotificationList * pendingNotifies
Definition: async.c:409
List * actions
Definition: async.c:353
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1127
static void asyncQueueUnregister(void)
Definition: async.c:1301
static bool amRegisteredListener
Definition: async.c:424
#define lfirst(lc)
Definition: pg_list.h:190
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:347
ListenActionKind action
Definition: async.c:346

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 855 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

856 {
857  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
858  if (pendingActions || pendingNotifies)
859  ereport(ERROR,
860  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
861  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
862 }
static ActionList * pendingActions
Definition: async.c:357
int errcode(int sqlerrcode)
Definition: elog.c:610
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1833 of file async.c.

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

1834 {
1835  int my_level = GetCurrentTransactionNestLevel();
1836 
1837  /*
1838  * All we have to do is pop the stack --- the actions/notifies made in
1839  * this subxact are no longer interesting, and the space will be freed
1840  * when CurTransactionContext is recycled. We still have to free the
1841  * ActionList and NotificationList objects themselves, though, because
1842  * those are allocated in TopTransactionContext.
1843  *
1844  * Note that there might be no entries at all, or no entries for the
1845  * current subtransaction level, either because none were ever created, or
1846  * because we reentered this routine due to trouble during subxact abort.
1847  */
1848  while (pendingActions != NULL &&
1849  pendingActions->nestingLevel >= my_level)
1850  {
1851  ActionList *childPendingActions = pendingActions;
1852 
1853  pendingActions = pendingActions->upper;
1854  pfree(childPendingActions);
1855  }
1856 
1857  while (pendingNotifies != NULL &&
1858  pendingNotifies->nestingLevel >= my_level)
1859  {
1860  NotificationList *childPendingNotifies = pendingNotifies;
1861 
1862  pendingNotifies = pendingNotifies->upper;
1863  pfree(childPendingNotifies);
1864  }
1865 }
static ActionList * pendingActions
Definition: async.c:357
int nestingLevel
Definition: async.c:396
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:409
struct NotificationList * upper
Definition: async.c:399
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1763 of file async.c.

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

1764 {
1765  int my_level = GetCurrentTransactionNestLevel();
1766 
1767  /* If there are actions at our nesting level, we must reparent them. */
1768  if (pendingActions != NULL &&
1769  pendingActions->nestingLevel >= my_level)
1770  {
1771  if (pendingActions->upper == NULL ||
1772  pendingActions->upper->nestingLevel < my_level - 1)
1773  {
1774  /* nothing to merge; give the whole thing to the parent */
1775  --pendingActions->nestingLevel;
1776  }
1777  else
1778  {
1779  ActionList *childPendingActions = pendingActions;
1780 
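1781  pendingActions = pendingActions->upper;
1782 
1783  /*
1784  * Mustn't try to eliminate duplicates here --- see queue_listen()
1785  */
1786  pendingActions->actions =
1787  list_concat(pendingActions->actions,
1788  childPendingActions->actions);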
1789  pfree(childPendingActions);
1790  }
1791  }
1792 
1793  /* If there are notifies at our nesting level, we must reparent them. */
1794  if (pendingNotifies != NULL &&
1795  pendingNotifies->nestingLevel >= my_level)
1796  {
1797  Assert(pendingNotifies->nestingLevel == my_level);
1798 
1799  if (pendingNotifies->upper == NULL ||
1800  pendingNotifies->upper->nestingLevel < my_level - 1)
1801  {
1802  /* nothing to merge; give the whole thing to the parent */
1803  --pendingNotifies->nestingLevel;
1804  }
1805  else
1806  {
1807  /*
1808  * Formerly, we didn't bother to eliminate duplicates here, but
1809  * now we must, else we fall foul of "Assert(!found)", either here
1810  * or during a later attempt to build the parent-level hashtable.
1811  */
1812  NotificationList *childPendingNotifies = pendingNotifies;
1813  ListCell *l;
1814 
1815  pendingNotifies = pendingNotifies->upper;
1816  /* Insert all the subxact's events into parent, except for dups */
1817  foreach(l, childPendingNotifies->events)
1818  {
1819  Notification *childn = (Notification *) lfirst(l);
1820 
1821  if (!AsyncExistsPendingNotify(childn))
1822  AddEventToPendingNotifies(childn);
1823  }
1824  pfree(childPendingNotifies);
1825  }
1826  }
1827 }
List * events
Definition: async.c:397
static ActionList * pendingActions
Definition: async.c:357
List * list_concat(List *list1, const List *list2)
Definition: list.c:515
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2287
int nestingLevel
Definition: async.c:396
void pfree(void *pointer)
Definition: mcxt.c:1056
static NotificationList * pendingNotifies
Definition: async.c:409
List * actions
Definition: async.c:353
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2328
struct NotificationList * upper
Definition: async.c:399
int nestingLevel
Definition: async.c:352
struct ActionList * upper
Definition: async.c:354
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1876 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1877 {
1878  /*
1879  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1880  * you do here.
1881  */
1882 
1883  /* signal that work needs to be done */
1884  notifyInterruptPending = true;
1885 
1886  /* make sure the event is processed in due course */
1887  SetLatch(MyLatch);
1888 }
void SetLatch(Latch *latch)
Definition: latch.c:457
struct Latch * MyLatch
Definition: globals.c:54
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2262 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2263 {
2264  if (whereToSendOutput == DestRemote)
2265  {
2266  StringInfoData buf;
2267 
2268  pq_beginmessage(&buf, 'A');
2269  pq_sendint32(&buf, srcPid);
2270  pq_sendstring(&buf, channel);
2271  if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
2272  pq_sendstring(&buf, payload);
2273  pq_endmessage(&buf);
2274 
2275  /*
2276  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2277  * happen at the end of the transaction, and for incoming notifies
2278  * ProcessIncomingNotify will do it after finding all the notifies.
2279  */
2280  }
2281  else
2282  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2283 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog(elevel,...)
Definition: elog.h:214
CommandDest whereToSendOutput
Definition: postgres.c:91
ProtocolVersion FrontendProtocol
Definition: globals.c:28

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 880 of file async.c.

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and Trace_notify.

Referenced by CommitTransaction().

881 {
882  ListCell *p;
883 
884  if (!pendingActions && !pendingNotifies)
885  return; /* no relevant statements in this xact */
886 
887  if (Trace_notify)
888  elog(DEBUG1, "PreCommit_Notify");
889 
890  /* Preflight for any pending listen/unlisten actions */
891  if (pendingActions != NULL)
892  {
893  foreach(p, pendingActions->actions)
894  {
895  ListenAction *actrec = (ListenAction *) lfirst(p);
896 
897  switch (actrec->action)
898  {
899  case LISTEN_LISTEN:
900  Exec_ListenPreCommit();
901  break;
902  case LISTEN_UNLISTEN:
903  /* there is no Exec_UnlistenPreCommit() */
904  break;
905  case LISTEN_UNLISTEN_ALL:
906  /* there is no Exec_UnlistenAllPreCommit() */
907  break;
908  }
909  }
910  }
911 
912  /* Queue any pending notifies (must happen after the above) */
913  if (pendingNotifies)
914  {
915  ListCell *nextNotify;
916 
917  /*
918  * Make sure that we have an XID assigned to the current transaction.
919  * GetCurrentTransactionId is cheap if we already have an XID, but not
920  * so cheap if we don't, and we'd prefer not to do that work while
921  * holding NotifyQueueLock.
922  */
923  (void) GetCurrentTransactionId();
924 
925  /*
926  * Serialize writers by acquiring a special lock that we hold till
927  * after commit. This ensures that queue entries appear in commit
928  * order, and in particular that there are never uncommitted queue
929  * entries ahead of committed ones, so an uncommitted transaction
930  * can't block delivery of deliverable notifications.
931  *
932  * We use a heavyweight lock so that it'll automatically be released
933  * after either commit or abort. This also allows deadlocks to be
934  * detected, though really a deadlock shouldn't be possible here.
935  *
936  * The lock is on "database 0", which is pretty ugly but it doesn't
937  * seem worth inventing a special locktag category just for this.
938  * (Historical note: before PG 9.0, a similar lock on "database 0" was
939  * used by the flatfiles mechanism.)
940  */
941  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
942  AccessExclusiveLock);
943 
944  /* Now push the notifications into the queue */
945  backendHasSentNotifications = true;
946 
947  nextNotify = list_head(pendingNotifies->events);
948  while (nextNotify != NULL)
949  {
950  /*
951  * Add the pending notifications to the queue. We acquire and
952  * release NotifyQueueLock once per page, which might be overkill
953  * but it does allow readers to get in while we're doing this.
954  *
955  * A full queue is very uncommon and should really not happen,
956  * given that we have so much space available in the SLRU pages.
957  * Nevertheless we need to deal with this possibility. Note that
958  * when we get here we are in the process of committing our
959  * transaction, but we have not yet committed to clog, so at this
960  * point in time we can still roll the transaction back.
961  */
962  LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
963  asyncQueueFillWarning();
964  if (asyncQueueIsFull())
965  ereport(ERROR,
966  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
967  errmsg("too many notifications in the NOTIFY queue")));
968  nextNotify = asyncQueueAddEntries(nextNotify);
969  LWLockRelease(NotifyQueueLock);
970  }
971  }
972 }
#define DEBUG1
Definition: elog.h:25
List * events
Definition: async.c:397
static ActionList * pendingActions
Definition: async.c:357
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool asyncQueueIsFull(void)
Definition: async.c:1342
static void asyncQueueFillWarning(void)
Definition: async.c:1607
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
static NotificationList * pendingNotifies
Definition: async.c:409
#define ERROR
Definition: elog.h:43
static bool backendHasSentNotifications
Definition: async.c:427
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
List * actions
Definition: async.c:353
static ListCell * list_head(const List *l)
Definition: pg_list.h:125
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1444
#define lfirst(lc)
Definition: pg_list.h:190
static void Exec_ListenPreCommit(void)
Definition: async.c:1032
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
bool Trace_notify
Definition: async.c:433
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
ListenActionKind action
Definition: async.c:346

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1218 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, backendTryAdvanceTail, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1219 {
1220  MemoryContext caller_context;
1221 
1222  /* Nothing to do if we didn't send any notifications */
1223  if (!backendHasSentNotifications)
1224  return;
1225 
1226  /*
1227  * We reset the flag immediately; otherwise, if any sort of error occurs
1228  * below, we'd be locked up in an infinite loop, because control will come
1229  * right back here after error cleanup.
1230  */
1231  backendHasSentNotifications = false;
1232 
1233  /*
1234  * We must preserve the caller's memory context (probably MessageContext)
1235  * across the transaction we do here.
1236  */
1237  caller_context = CurrentMemoryContext;
1238 
1239  if (Trace_notify)
1240  elog(DEBUG1, "ProcessCompletedNotifies");
1241 
1242  /*
1243  * We must run asyncQueueReadAllNotifications inside a transaction, else
1244  * bad things happen if it gets an error.
1245  */
1246  StartTransactionCommand();
1247 
1248  /* Send signals to other backends */
1249  SignalBackends();
1250 
1251  if (listenChannels != NIL)
1252  {
1253  /* Read the queue ourselves, and send relevant stuff to the frontend */
1254  asyncQueueReadAllNotifications();
1255  }
1256 
1257  /*
1258  * If it's time to try to advance the global tail pointer, do that.
1259  */
1260  if (backendTryAdvanceTail)
1261  {
1262  backendTryAdvanceTail = false;
1263  asyncQueueAdvanceTail();
1264  }
1265 
1266  CommitTransactionCommand();
1267 
1268  MemoryContextSwitchTo(caller_context);
1269 
1270  /* We don't need pq_flush() here since postgres.c will do one shortly */
1271 }
#define NIL
Definition: pg_list.h:65
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2917
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:325
static bool backendHasSentNotifications
Definition: async.c:427
static bool backendTryAdvanceTail
Definition: async.c:430
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void asyncQueueAdvanceTail(void)
Definition: async.c:2173
static void SignalBackends(void)
Definition: async.c:1660
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1918
void StartTransactionCommand(void)
Definition: xact.c:2816
bool Trace_notify
Definition: async.c:433
#define elog(elevel,...)
Definition: elog.h:214

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1902 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

1903 {
1904  if (IsTransactionOrTransactionBlock())
1905  return; /* not really idle */
1906 
1907  while (notifyInterruptPending)
1908  ProcessIncomingNotify();
1909 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4672
static void ProcessIncomingNotify(void)
Definition: async.c:2223
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:418

Variable Documentation

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending

◆ Trace_notify