PostgreSQL Source Code  git master
async.h File Reference
#include <signal.h>
#include "fmgr.h"
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_ASYNC_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubStart_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void ProcessCompletedNotifies (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

◆ NUM_ASYNC_BUFFERS

#define NUM_ASYNC_BUFFERS   8

Definition at line 23 of file async.h.

Referenced by AsyncShmemInit(), and AsyncShmemSize().

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 638 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

639 {
640  if (Trace_notify)
641  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
642 
643  queue_listen(LISTEN_LISTEN, channel);
644 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:608
bool Trace_notify
Definition: async.c:370
#define elog
Definition: elog.h:219

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 543 of file async.c.

References AsyncExistsPendingNotify(), Notification::channel, CurTransactionContext, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, IsParallelWorker, lappend(), MemoryContextSwitchTo(), NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload, pstrdup(), and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

544 {
545  Notification *n;
546  MemoryContext oldcontext;
547 
548  if (IsParallelWorker())
549  elog(ERROR, "cannot send notifications from a parallel worker");
550 
551  if (Trace_notify)
552  elog(DEBUG1, "Async_Notify(%s)", channel);
553 
554  /* a channel name must be specified */
555  if (!channel || !strlen(channel))
556  ereport(ERROR,
557  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
558  errmsg("channel name cannot be empty")));
559 
560  if (strlen(channel) >= NAMEDATALEN)
561  ereport(ERROR,
562  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
563  errmsg("channel name too long")));
564 
565  if (payload)
566  {
567  if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
568  ereport(ERROR,
569  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
570  errmsg("payload string too long")));
571  }
572 
573  /* no point in making duplicate entries in the list ... */
574  if (AsyncExistsPendingNotify(channel, payload))
575  return;
576 
577  /*
578  * The notification list needs to live until end of transaction, so store
579  * it in the transaction context.
580  */
581  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
582 
583  n = (Notification *) palloc(sizeof(Notification));
584  n->channel = pstrdup(channel);
585  if (payload)
586  n->payload = pstrdup(payload);
587  else
588  n->payload = "";
589 
590  /*
591  * We want to preserve the order so we need to append every notification.
592  * See comments at AsyncExistsPendingNotify().
593  */
594  pendingNotifies = lappend(pendingNotifies, n);
595 
596  MemoryContextSwitchTo(oldcontext);
597 }
#define DEBUG1
Definition: elog.h:25
char * pstrdup(const char *in)
Definition: mcxt.c:1076
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:49
int errcode(int sqlerrcode)
Definition: elog.c:575
char * channel
Definition: async.c:343
#define NAMEDATALEN
char * payload
Definition: async.c:344
#define ERROR
Definition: elog.h:43
static bool AsyncExistsPendingNotify(const char *channel, const char *payload)
Definition: async.c:2121
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:153
#define ereport(elevel, rest)
Definition: elog.h:122
#define IsParallelWorker()
Definition: parallel.h:58
List * lappend(List *list, void *datum)
Definition: list.c:128
bool Trace_notify
Definition: async.c:370
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static List * pendingNotifies
Definition: async.c:347

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 652 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

653 {
654  if (Trace_notify)
655  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
656 
657  /* If we couldn't possibly be listening, no need to queue anything */
658  if (pendingActions == NIL && !unlistenExitRegistered)
659  return;
660 
661  queue_listen(LISTEN_UNLISTEN, channel);
662 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:608
static bool unlistenExitRegistered
Definition: async.c:361
static List * pendingActions
Definition: async.c:321
bool Trace_notify
Definition: async.c:370
#define elog
Definition: elog.h:219

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 670 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

671 {
672  if (Trace_notify)
673  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
674 
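675  /* If we couldn't possibly be listening, no need to queue anything */
676  if (pendingActions == NIL && !unlistenExitRegistered)
677  return;
678 
679  queue_listen(LISTEN_UNLISTEN_ALL, "");
680 }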
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:608
static bool unlistenExitRegistered
Definition: async.c:361
static List * pendingActions
Definition: async.c:321
bool Trace_notify
Definition: async.c:370
#define elog
Definition: elog.h:219

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 443 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

444 {
445  bool found;
446  int slotno;
447  Size size;
448 
449  /*
450  * Create or attach to the AsyncQueueControl structure.
451  *
452  * The used entries in the backend[] array run from 1 to MaxBackends; the
453  * zero'th entry is unused but must be allocated.
454  */
455  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
456  size = add_size(size, offsetof(AsyncQueueControl, backend));
457 
458  asyncQueueControl = (AsyncQueueControl *)
459  ShmemInitStruct("Async Queue Control", size, &found);
460 
461  if (!found)
462  {
463  /* First time through, so initialize it */
464  int i;
465 
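466  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
467  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
468  asyncQueueControl->lastQueueFillWarn = 0;
469  /* zero'th entry won't be used, but let's initialize it anyway */
470  for (i = 0; i <= MaxBackends; i++)
471  {
472  QUEUE_BACKEND_PID(i) = InvalidPid;
473  QUEUE_BACKEND_DBOID(i) = InvalidOid;
474  SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
475  }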
476  }
477 
478  /*
479  * Set up SLRU management of the pg_notify data.
480  */
481  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
482  SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
483  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
484  /* Override default assumption that writes should be fsync'd */
485  AsyncCtl->do_fsync = false;
486 
487  if (!found)
488  {
489  /*
490  * During start or reboot, clean out the pg_notify directory.
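491  */
492  (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
493 
494  /* Now initialize page zero to empty */
495  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
496  slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
497  /* This write is just to verify that pg_notify/ is writable */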
498  SimpleLruWritePage(AsyncCtl, slotno);
499  LWLockRelease(AsyncCtlLock);
500  }
501 }
#define QUEUE_TAIL
Definition: async.c:258
#define QUEUE_BACKEND_PID(i)
Definition: async.c:259
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1353
#define AsyncCtl
Definition: async.c:268
#define QUEUE_HEAD
Definition: async.c:257
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define NUM_ASYNC_BUFFERS
Definition: async.h:23
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:126
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:193
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:578
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:403
static AsyncQueueControl * asyncQueueControl
Definition: async.c:255
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:260
#define InvalidOid
Definition: postgres_ext.h:36
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1376
size_t Size
Definition: c.h:404
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
int i
TimestampTz lastQueueFillWarn
Definition: async.c:250
#define QUEUE_BACKEND_POS(i)
Definition: async.c:261
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:263
#define QUEUE_POS_PAGE(x)
Definition: async.c:190
#define offsetof(type, field)
Definition: c.h:593
#define InvalidPid
Definition: miscadmin.h:31
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:165

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 426 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

427 {
428  Size size;
429 
430  /* This had better match AsyncShmemInit */
431  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
432  size = add_size(size, offsetof(AsyncQueueControl, backend));
433 
434  size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
435 
436  return size;
437 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:145
#define NUM_ASYNC_BUFFERS
Definition: async.h:23
int MaxBackends
Definition: globals.c:126
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:404
#define offsetof(type, field)
Definition: c.h:593

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1589 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1590 {
1591  /*
1592  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1593  * we have registered as a listener but have not made any entry in
1594  * listenChannels. In that case, deregister again.
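1595  */
1596  if (amRegisteredListener && listenChannels == NIL)
1597  asyncQueueUnregister();
1598 
1599  /* And clean up */
1600  ClearPendingActionsAndNotifies();
1601 }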
#define NIL
Definition: pg_list.h:69
static List * listenChannels
Definition: async.c:296
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2169
static void asyncQueueUnregister(void)
Definition: async.c:1188
static bool amRegisteredListener
Definition: async.c:364

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 874 of file async.c.

References ListenAction::action, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

875 {
876  ListCell *p;
877 
878  /*
879  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
880  * return as soon as possible
881  */
882  if (!pendingActions && !pendingNotifies)
883  return;
884 
885  if (Trace_notify)
886  elog(DEBUG1, "AtCommit_Notify");
887 
888  /* Perform any pending listen/unlisten actions */
889  foreach(p, pendingActions)
890  {
891  ListenAction *actrec = (ListenAction *) lfirst(p);
892 
893  switch (actrec->action)
894  {
895  case LISTEN_LISTEN:
896  Exec_ListenCommit(actrec->channel);
897  break;
898  case LISTEN_UNLISTEN:
899  Exec_UnlistenCommit(actrec->channel);
900  break;
901  case LISTEN_UNLISTEN_ALL:
902  Exec_UnlistenAllCommit();
903  break;
904  }
905  }
906 
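907  /* If no longer listening to anything, get out of listener array */
908  if (amRegisteredListener && listenChannels == NIL)
909  asyncQueueUnregister();
910 
911  /* And clean up */
912  ClearPendingActionsAndNotifies();
913 }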
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1070
static List * listenChannels
Definition: async.c:296
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2169
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1036
static List * pendingActions
Definition: async.c:321
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1009
static void asyncQueueUnregister(void)
Definition: async.c:1188
static bool amRegisteredListener
Definition: async.c:364
#define lfirst(lc)
Definition: pg_list.h:106
bool Trace_notify
Definition: async.c:370
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:318
ListenActionKind action
Definition: async.c:317
#define elog
Definition: elog.h:219
static List * pendingNotifies
Definition: async.c:347

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 750 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

751 {
752  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
753  if (pendingActions || pendingNotifies)
754  ereport(ERROR,
755  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
756  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
757 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:321
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
static List * pendingNotifies
Definition: async.c:347

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1671 of file async.c.

References GetCurrentTransactionNestLevel(), linitial_node, list_delete_first(), and list_length().

Referenced by AbortSubTransaction().

1672 {
1673  int my_level = GetCurrentTransactionNestLevel();
1674 
1675  /*
1676  * All we have to do is pop the stack --- the actions/notifies made in
1677  * this subxact are no longer interesting, and the space will be freed
1678  * when CurTransactionContext is recycled.
1679  *
1680  * This routine could be called more than once at a given nesting level if
1681  * there is trouble during subxact abort. Avoid dumping core by using
1682  * GetCurrentTransactionNestLevel as the indicator of how far we need to
1683  * prune the list.
1684  */
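1685  while (list_length(upperPendingActions) > my_level - 2)
1686  {
1687  pendingActions = linitial_node(List, upperPendingActions);
1688  upperPendingActions = list_delete_first(upperPendingActions);
1689  }
1690 
1691  while (list_length(upperPendingNotifies) > my_level - 2)
1692  {
1693  pendingNotifies = linitial_node(List, upperPendingNotifies);
1694  upperPendingNotifies = list_delete_first(upperPendingNotifies);
1695  }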
1696 }
#define linitial_node(type, l)
Definition: pg_list.h:114
static List * pendingActions
Definition: async.c:321
static List * upperPendingActions
Definition: async.c:323
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:754
static int list_length(const List *l)
Definition: pg_list.h:89
static List * upperPendingNotifies
Definition: async.c:349
List
Definition: pg_list.h:45
List * list_delete_first(List *list)
Definition: list.c:666
static List * pendingNotifies
Definition: async.c:347

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1639 of file async.c.

References Assert, GetCurrentTransactionNestLevel(), linitial_node, list_concat(), list_delete_first(), and list_length().

Referenced by CommitSubTransaction().

1640 {
1641  List *parentPendingActions;
1642  List *parentPendingNotifies;
1643 
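1644  parentPendingActions = linitial_node(List, upperPendingActions);
1645  upperPendingActions = list_delete_first(upperPendingActions);
1646 
1647  Assert(list_length(upperPendingActions) ==
1648  GetCurrentTransactionNestLevel() - 2);
1649 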
1650  /*
1651  * Mustn't try to eliminate duplicates here --- see queue_listen()
1652  */
1653  pendingActions = list_concat(parentPendingActions, pendingActions);
1654 
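1655  parentPendingNotifies = linitial_node(List, upperPendingNotifies);
1656  upperPendingNotifies = list_delete_first(upperPendingNotifies);
1657 
1658  Assert(list_length(upperPendingNotifies) ==
1659  GetCurrentTransactionNestLevel() - 2);
1660 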
1661  /*
1662  * We could try to eliminate duplicates here, but it seems not worthwhile.
1663  */
1664  pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
1665 }
List * list_concat(List *list1, List *list2)
Definition: list.c:321
#define linitial_node(type, l)
Definition: pg_list.h:114
static List * pendingActions
Definition: async.c:321
static List * upperPendingActions
Definition: async.c:323
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:754
#define Assert(condition)
Definition: c.h:670
static int list_length(const List *l)
Definition: pg_list.h:89
static List * upperPendingNotifies
Definition: async.c:349
List
Definition: pg_list.h:45
List * list_delete_first(List *list)
Definition: list.c:666
static List * pendingNotifies
Definition: async.c:347

◆ AtSubStart_Notify()

void AtSubStart_Notify ( void  )

Definition at line 1609 of file async.c.

References Assert, GetCurrentTransactionNestLevel(), lcons(), list_length(), MemoryContextSwitchTo(), NIL, and TopTransactionContext.

Referenced by StartSubTransaction().

1610 {
1611  MemoryContext old_cxt;
1612 
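1613  /* Keep the list-of-lists in TopTransactionContext for simplicity */
1614  old_cxt = MemoryContextSwitchTo(TopTransactionContext);
1615 
1616  upperPendingActions = lcons(pendingActions, upperPendingActions);
1617 
1618  Assert(list_length(upperPendingActions) ==
1619  GetCurrentTransactionNestLevel() - 1);
1620 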
1621  pendingActions = NIL;
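1622 
1623  upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
1624 
1625  Assert(list_length(upperPendingNotifies) ==
1626  GetCurrentTransactionNestLevel() - 1);
1627 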
1628  pendingNotifies = NIL;
1629 
1630  MemoryContextSwitchTo(old_cxt);
1631 }
#define NIL
Definition: pg_list.h:69
MemoryContext TopTransactionContext
Definition: mcxt.c:48
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * pendingActions
Definition: async.c:321
static List * upperPendingActions
Definition: async.c:323
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:754
List * lcons(void *datum, List *list)
Definition: list.c:259
#define Assert(condition)
Definition: c.h:670
static int list_length(const List *l)
Definition: pg_list.h:89
static List * upperPendingNotifies
Definition: async.c:349
static List * pendingNotifies
Definition: async.c:347

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1707 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1708 {
1709  /*
1710  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1711  * you do here.
1712  */
1713 
1714  /* signal that work needs to be done */
1715  notifyInterruptPending = true;
1716 
1717  /* make sure the event is processed in due course */
1718  SetLatch(MyLatch);
1719 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
struct Latch * MyLatch
Definition: globals.c:52
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:358

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2096 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2097 {
2098  if (whereToSendOutput == DestRemote)
2099  {
2100  StringInfoData buf;
2101 
2102  pq_beginmessage(&buf, 'A');
2103  pq_sendint32(&buf, srcPid);
2104  pq_sendstring(&buf, channel);
2105  if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
2106  pq_sendstring(&buf, payload);
2107  pq_endmessage(&buf);
2108 
2109  /*
2110  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2111  * happen at the end of the transaction, and for incoming notifies
2112  * ProcessIncomingNotify will do it after finding all the notifies.
2113  */
2114  }
2115  else
2116  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2117 }
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define elog
Definition: elog.h:219
CommandDest whereToSendOutput
Definition: postgres.c:88
ProtocolVersion FrontendProtocol
Definition: globals.c:27

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 775 of file async.c.

References AccessExclusiveLock, ListenAction::action, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DatabaseRelationId, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NIL, and Trace_notify.

Referenced by CommitTransaction().

776 {
777  ListCell *p;
778 
779  if (pendingActions == NIL && pendingNotifies == NIL)
780  return; /* no relevant statements in this xact */
781 
782  if (Trace_notify)
783  elog(DEBUG1, "PreCommit_Notify");
784 
785  /* Preflight for any pending listen/unlisten actions */
786  foreach(p, pendingActions)
787  {
788  ListenAction *actrec = (ListenAction *) lfirst(p);
789 
790  switch (actrec->action)
791  {
792  case LISTEN_LISTEN:
793  Exec_ListenPreCommit();
794  break;
795  case LISTEN_UNLISTEN:
796  /* there is no Exec_UnlistenPreCommit() */
797  break;
798  case LISTEN_UNLISTEN_ALL:
799  /* there is no Exec_UnlistenAllPreCommit() */
800  break;
801  }
802  }
803 
804  /* Queue any pending notifies (must happen after the above) */
805  if (pendingNotifies)
806  {
807  ListCell *nextNotify;
808 
809  /*
810  * Make sure that we have an XID assigned to the current transaction.
811  * GetCurrentTransactionId is cheap if we already have an XID, but not
812  * so cheap if we don't, and we'd prefer not to do that work while
813  * holding AsyncQueueLock.
814  */
815  (void) GetCurrentTransactionId();
816 
817  /*
818  * Serialize writers by acquiring a special lock that we hold till
819  * after commit. This ensures that queue entries appear in commit
820  * order, and in particular that there are never uncommitted queue
821  * entries ahead of committed ones, so an uncommitted transaction
822  * can't block delivery of deliverable notifications.
823  *
824  * We use a heavyweight lock so that it'll automatically be released
825  * after either commit or abort. This also allows deadlocks to be
826  * detected, though really a deadlock shouldn't be possible here.
827  *
828  * The lock is on "database 0", which is pretty ugly but it doesn't
829  * seem worth inventing a special locktag category just for this.
830  * (Historical note: before PG 9.0, a similar lock on "database 0" was
831  * used by the flatfiles mechanism.)
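832  */
833  LockSharedObject(DatabaseRelationId, InvalidOid, 0,
834  AccessExclusiveLock);
835 
836  /* Now push the notifications into the queue */
837  backendHasSentNotifications = true;
838 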
839  nextNotify = list_head(pendingNotifies);
840  while (nextNotify != NULL)
841  {
842  /*
843  * Add the pending notifications to the queue. We acquire and
844  * release AsyncQueueLock once per page, which might be overkill
845  * but it does allow readers to get in while we're doing this.
846  *
847  * A full queue is very uncommon and should really not happen,
848  * given that we have so much space available in the SLRU pages.
849  * Nevertheless we need to deal with this possibility. Note that
850  * when we get here we are in the process of committing our
851  * transaction, but we have not yet committed to clog, so at this
852  * point in time we can still roll the transaction back.
853  */
854  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
855  asyncQueueFillWarning();
856  if (asyncQueueIsFull())
857  ereport(ERROR,
858  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
859  errmsg("too many notifications in the NOTIFY queue")));
860  nextNotify = asyncQueueAddEntries(nextNotify);
861  LWLockRelease(AsyncQueueLock);
862  }
863  }
864 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
#define DatabaseRelationId
Definition: pg_database.h:29
int errcode(int sqlerrcode)
Definition: elog.c:575
static bool asyncQueueIsFull(void)
Definition: async.c:1220
static void asyncQueueFillWarning(void)
Definition: async.c:1462
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:321
static bool backendHasSentNotifications
Definition: async.c:367
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
static ListCell * list_head(const List *l)
Definition: pg_list.h:77
#define ereport(elevel, rest)
Definition: elog.h:122
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1323
#define lfirst(lc)
Definition: pg_list.h:106
static void Exec_ListenPreCommit(void)
Definition: async.c:921
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
bool Trace_notify
Definition: async.c:370
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:797
ListenActionKind action
Definition: async.c:317
#define elog
Definition: elog.h:219
static List * pendingNotifies
Definition: async.c:347

◆ ProcessCompletedNotifies()

void ProcessCompletedNotifies ( void  )

Definition at line 1101 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1102 {
1103  MemoryContext caller_context;
1104  bool signalled;
1105 
1106  /* Nothing to do if we didn't send any notifications */
1107  if (!backendHasSentNotifications)
1108  return;
1109 
1110  /*
1111  * We reset the flag immediately; otherwise, if any sort of error occurs
1112  * below, we'd be locked up in an infinite loop, because control will come
1113  * right back here after error cleanup.
1114  */
1115  backendHasSentNotifications = false;
1116 
1117  /*
1118  * We must preserve the caller's memory context (probably MessageContext)
1119  * across the transaction we do here.
1120  */
1121  caller_context = CurrentMemoryContext;
1122 
1123  if (Trace_notify)
1124  elog(DEBUG1, "ProcessCompletedNotifies");
1125 
1126  /*
1127  * We must run asyncQueueReadAllNotifications inside a transaction, else
1128  * bad things happen if it gets an error.
1129  */
1130  StartTransactionCommand();
1131 
1132  /* Send signals to other backends */
1133  signalled = SignalBackends();
1134 
1135  if (listenChannels != NIL)
1136  {
1137  /* Read the queue ourselves, and send relevant stuff to the frontend */
1138  asyncQueueReadAllNotifications();
1139  }
1140  else if (!signalled)
1141  {
1142  /*
1143  * If we found no other listening backends, and we aren't listening
1144  * ourselves, then we must execute asyncQueueAdvanceTail to flush the
1145  * queue, because ain't nobody else gonna do it. This prevents queue
1146  * overflow when we're sending useless notifies to nobody. (A new
1147  * listener could have joined since we looked, but if so this is
1148  * harmless.)
1149  */
1150  asyncQueueAdvanceTail();
1151  }
1152 
1153  CommitTransactionCommand();
1154 
1155  MemoryContextSwitchTo(caller_context);
1156 
1157  /* We don't need pq_flush() here since postgres.c will do one shortly */
1158 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2744
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:296
static bool backendHasSentNotifications
Definition: async.c:367
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void asyncQueueAdvanceTail(void)
Definition: async.c:2006
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1747
void StartTransactionCommand(void)
Definition: xact.c:2673
bool Trace_notify
Definition: async.c:370
#define elog
Definition: elog.h:219
static bool SignalBackends(void)
Definition: async.c:1517

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( void  )

Definition at line 1731 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by ProcessClientReadInterrupt().

1732 {
1733  if (IsTransactionOrTransactionBlock())
1734  return; /* not really idle */
1735 
1736  while (notifyInterruptPending)
1737  ProcessIncomingNotify();
1738 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4465
static void ProcessIncomingNotify(void)
Definition: async.c:2057
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:358

Variable Documentation

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending

◆ Trace_notify