PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
async.h File Reference
#include <signal.h>
#include "fmgr.h"
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define NUM_ASYNC_BUFFERS   8
 

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubStart_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void ProcessCompletedNotifies (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (void)
 

Variables

bool Trace_notify
 
volatile sig_atomic_t notifyInterruptPending
 

Macro Definition Documentation

#define NUM_ASYNC_BUFFERS   8

Definition at line 23 of file async.h.

Referenced by AsyncShmemInit(), and AsyncShmemSize().

Function Documentation

void Async_Listen ( const char *  channel)

Definition at line 635 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

636 {
637  if (Trace_notify)
638  elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
639 
640  queue_listen(LISTEN_LISTEN, channel);
641 }
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:605
bool Trace_notify
Definition: async.c:368
#define elog
Definition: elog.h:219
void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 540 of file async.c.

References AsyncExistsPendingNotify(), Notification::channel, CurTransactionContext, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, IsParallelWorker, lappend(), MemoryContextSwitchTo(), NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload, pstrdup(), and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

541 {
542  Notification *n;
543  MemoryContext oldcontext;
544 
545  if (IsParallelWorker())
546  elog(ERROR, "cannot send notifications from a parallel worker");
547 
548  if (Trace_notify)
549  elog(DEBUG1, "Async_Notify(%s)", channel);
550 
551  /* a channel name must be specified */
552  if (!channel || !strlen(channel))
553  ereport(ERROR,
554  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
555  errmsg("channel name cannot be empty")));
556 
557  if (strlen(channel) >= NAMEDATALEN)
558  ereport(ERROR,
559  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
560  errmsg("channel name too long")));
561 
562  if (payload)
563  {
564  if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
565  ereport(ERROR,
566  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
567  errmsg("payload string too long")));
568  }
569 
570  /* no point in making duplicate entries in the list ... */
571  if (AsyncExistsPendingNotify(channel, payload))
572  return;
573 
574  /*
575  * The notification list needs to live until end of transaction, so store
576  * it in the transaction context.
577  */
578  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
579 
580  n = (Notification *) palloc(sizeof(Notification));
581  n->channel = pstrdup(channel);
582  if (payload)
583  n->payload = pstrdup(payload);
584  else
585  n->payload = "";
586 
587  /*
588  * We want to preserve the order so we need to append every notification.
589  * See comments at AsyncExistsPendingNotify().
590  */
591  pendingNotifies = lappend(pendingNotifies, n);
592 
593  MemoryContextSwitchTo(oldcontext);
594 }
#define DEBUG1
Definition: elog.h:25
char * pstrdup(const char *in)
Definition: mcxt.c:1165
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:49
int errcode(int sqlerrcode)
Definition: elog.c:575
char * channel
Definition: async.c:341
#define NAMEDATALEN
char * payload
Definition: async.c:342
#define ERROR
Definition: elog.h:43
static bool AsyncExistsPendingNotify(const char *channel, const char *payload)
Definition: async.c:2102
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:151
#define ereport(elevel, rest)
Definition: elog.h:122
#define IsParallelWorker()
Definition: parallel.h:53
List * lappend(List *list, void *datum)
Definition: list.c:128
bool Trace_notify
Definition: async.c:368
void * palloc(Size size)
Definition: mcxt.c:891
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static List * pendingNotifies
Definition: async.c:345
void Async_Unlisten ( const char *  channel)

Definition at line 649 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

650 {
651  if (Trace_notify)
652  elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
653 
654  /* If we couldn't possibly be listening, no need to queue anything */
655  if (pendingActions == NIL && !unlistenExitRegistered)
656  return;
657 
658  queue_listen(LISTEN_UNLISTEN, channel);
659 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:605
static bool unlistenExitRegistered
Definition: async.c:359
static List * pendingActions
Definition: async.c:319
bool Trace_notify
Definition: async.c:368
#define elog
Definition: elog.h:219
void Async_UnlistenAll ( void  )

Definition at line 667 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

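668 {
669  if (Trace_notify)
670  elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
671 
672  /* If we couldn't possibly be listening, no need to queue anything */
673  if (pendingActions == NIL && !unlistenExitRegistered)
674  return;
675 
676  queue_listen(LISTEN_UNLISTEN_ALL, "");
677 }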
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:605
static bool unlistenExitRegistered
Definition: async.c:359
static List * pendingActions
Definition: async.c:319
bool Trace_notify
Definition: async.c:368
#define elog
Definition: elog.h:219
void AsyncShmemInit ( void  )

Definition at line 440 of file async.c.

References add_size(), AsyncCtl, asyncQueuePagePrecedes(), i, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWTRANCHE_ASYNC_BUFFERS, MaxBackends, mul_size(), NULL, NUM_ASYNC_BUFFERS, offsetof, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

441 {
442  bool found;
443  int slotno;
444  Size size;
445 
446  /*
447  * Create or attach to the AsyncQueueControl structure.
448  *
449  * The used entries in the backend[] array run from 1 to MaxBackends; the
450  * zero'th entry is unused but must be allocated.
451  */
452  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
453  size = add_size(size, offsetof(AsyncQueueControl, backend));
454 
456  ShmemInitStruct("Async Queue Control", size, &found);
457 
458  if (!found)
459  {
460  /* First time through, so initialize it */
461  int i;
462 
463  SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
464  SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
466  /* zero'th entry won't be used, but let's initialize it anyway */
467  for (i = 0; i <= MaxBackends; i++)
468  {
472  }
473  }
474 
475  /*
476  * Set up SLRU management of the pg_notify data.
477  */
478  AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
480  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
481  /* Override default assumption that writes should be fsync'd */
482  AsyncCtl->do_fsync = false;
483 
484  if (!found)
485  {
486  /*
487  * During start or reboot, clean out the pg_notify directory.
488  */
490 
491  /* Now initialize page zero to empty */
492  LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
494  /* This write is just to verify that pg_notify/ is writable */
495  SimpleLruWritePage(AsyncCtl, slotno);
496  LWLockRelease(AsyncCtlLock);
497  }
498 }
#define QUEUE_TAIL
Definition: async.c:256
#define QUEUE_BACKEND_PID(i)
Definition: async.c:257
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1338
#define AsyncCtl
Definition: async.c:266
#define QUEUE_HEAD
Definition: async.c:255
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
#define NUM_ASYNC_BUFFERS
Definition: async.h:23
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
int MaxBackends
Definition: globals.c:126
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:191
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:573
static bool asyncQueuePagePrecedes(int p, int q)
Definition: async.c:400
static AsyncQueueControl * asyncQueueControl
Definition: async.c:253
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:258
#define InvalidOid
Definition: postgres_ext.h:36
#define NULL
Definition: c.h:226
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1361
size_t Size
Definition: c.h:353
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
int i
TimestampTz lastQueueFillWarn
Definition: async.c:248
#define QUEUE_BACKEND_POS(i)
Definition: async.c:259
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:258
#define QUEUE_POS_PAGE(x)
Definition: async.c:188
#define offsetof(type, field)
Definition: c.h:551
#define InvalidPid
Definition: miscadmin.h:31
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:164
Size AsyncShmemSize ( void  )

Definition at line 423 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, offsetof, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

424 {
425  Size size;
426 
427  /* This had better match AsyncShmemInit */
428  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
429  size = add_size(size, offsetof(AsyncQueueControl, backend));
430 
431  size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
432 
433  return size;
434 }
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define NUM_ASYNC_BUFFERS
Definition: async.h:23
int MaxBackends
Definition: globals.c:126
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:353
#define offsetof(type, field)
Definition: c.h:551
void AtAbort_Notify ( void  )

Definition at line 1584 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

1585 {
1586  /*
1587  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1588  * we have registered as a listener but have not made any entry in
1589  * listenChannels. In that case, deregister again.
1590  */
1591  if (amRegisteredListener && listenChannels == NIL)
1592  asyncQueueUnregister();
1593 
1594  /* And clean up */
1595  ClearPendingActionsAndNotifies();
1596 }
#define NIL
Definition: pg_list.h:69
static List * listenChannels
Definition: async.c:294
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2150
static void asyncQueueUnregister(void)
Definition: async.c:1183
static bool amRegisteredListener
Definition: async.c:362
void AtCommit_Notify ( void  )

Definition at line 871 of file async.c.

References ListenAction::action, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

872 {
873  ListCell *p;
874 
875  /*
876  * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
877  * return as soon as possible
878  */
879  if (!pendingActions && !pendingNotifies)
880  return;
881 
882  if (Trace_notify)
883  elog(DEBUG1, "AtCommit_Notify");
884 
885  /* Perform any pending listen/unlisten actions */
886  foreach(p, pendingActions)
887  {
888  ListenAction *actrec = (ListenAction *) lfirst(p);
889 
890  switch (actrec->action)
891  {
892  case LISTEN_LISTEN:
893  Exec_ListenCommit(actrec->channel);
894  break;
895  case LISTEN_UNLISTEN:
896  Exec_UnlistenCommit(actrec->channel);
897  break;
898  case LISTEN_UNLISTEN_ALL:
899  Exec_UnlistenAllCommit();
900  break;
901  }
902  }
903 
904  /* If no longer listening to anything, get out of listener array */
905  if (amRegisteredListener && listenChannels == NIL)
906  asyncQueueUnregister();
907 
908  /* And clean up */
909  ClearPendingActionsAndNotifies();
910 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1065
static List * listenChannels
Definition: async.c:294
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2150
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1031
static List * pendingActions
Definition: async.c:319
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1004
static void asyncQueueUnregister(void)
Definition: async.c:1183
static bool amRegisteredListener
Definition: async.c:362
#define lfirst(lc)
Definition: pg_list.h:106
bool Trace_notify
Definition: async.c:368
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:316
ListenActionKind action
Definition: async.c:315
#define elog
Definition: elog.h:219
static List * pendingNotifies
Definition: async.c:345
void AtPrepare_Notify ( void  )

Definition at line 747 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

748 {
749  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
750  if (pendingActions || pendingNotifies)
751  ereport(ERROR,
752  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
753  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
754 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:319
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
static List * pendingNotifies
Definition: async.c:345
void AtSubAbort_Notify ( void  )

Definition at line 1666 of file async.c.

References castNode, GetCurrentTransactionNestLevel(), linitial, list_delete_first(), and list_length().

Referenced by AbortSubTransaction().

1667 {
1668  int my_level = GetCurrentTransactionNestLevel();
1669 
1670  /*
1671  * All we have to do is pop the stack --- the actions/notifies made in
1672  * this subxact are no longer interesting, and the space will be freed
1673  * when CurTransactionContext is recycled.
1674  *
1675  * This routine could be called more than once at a given nesting level if
1676  * there is trouble during subxact abort. Avoid dumping core by using
1677  * GetCurrentTransactionNestLevel as the indicator of how far we need to
1678  * prune the list.
1679  */
1680  while (list_length(upperPendingActions) > my_level - 2)
1681  {
1684  }
1685 
1686  while (list_length(upperPendingNotifies) > my_level - 2)
1687  {
1690  }
1691 }
#define castNode(_type_, nodeptr)
Definition: nodes.h:577
#define linitial(l)
Definition: pg_list.h:110
static List * pendingActions
Definition: async.c:319
static List * upperPendingActions
Definition: async.c:321
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:760
static int list_length(const List *l)
Definition: pg_list.h:89
static List * upperPendingNotifies
Definition: async.c:347
List
Definition: pg_list.h:45
List * list_delete_first(List *list)
Definition: list.c:666
static List * pendingNotifies
Definition: async.c:345
void AtSubCommit_Notify ( void  )

Definition at line 1634 of file async.c.

References Assert, castNode, GetCurrentTransactionNestLevel(), linitial, list_concat(), list_delete_first(), and list_length().

Referenced by CommitSubTransaction().

1635 {
1636  List *parentPendingActions;
1637  List *parentPendingNotifies;
1638 
1639  parentPendingActions = castNode(List, linitial(upperPendingActions));
1641 
1644 
1645  /*
1646  * Mustn't try to eliminate duplicates here --- see queue_listen()
1647  */
1648  pendingActions = list_concat(parentPendingActions, pendingActions);
1649 
1650  parentPendingNotifies = castNode(List, linitial(upperPendingNotifies));
1652 
1655 
1656  /*
1657  * We could try to eliminate duplicates here, but it seems not worthwhile.
1658  */
1659  pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
1660 }
#define castNode(_type_, nodeptr)
Definition: nodes.h:577
List * list_concat(List *list1, List *list2)
Definition: list.c:321
#define linitial(l)
Definition: pg_list.h:110
static List * pendingActions
Definition: async.c:319
static List * upperPendingActions
Definition: async.c:321
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:760
#define Assert(condition)
Definition: c.h:671
static int list_length(const List *l)
Definition: pg_list.h:89
static List * upperPendingNotifies
Definition: async.c:347
List
Definition: pg_list.h:45
List * list_delete_first(List *list)
Definition: list.c:666
static List * pendingNotifies
Definition: async.c:345
void AtSubStart_Notify ( void  )

Definition at line 1604 of file async.c.

References Assert, GetCurrentTransactionNestLevel(), lcons(), list_length(), MemoryContextSwitchTo(), NIL, and TopTransactionContext.

Referenced by StartSubTransaction().

1605 {
1606  MemoryContext old_cxt;
1607 
1608  /* Keep the list-of-lists in TopTransactionContext for simplicity */
1610 
1612 
1615 
1616  pendingActions = NIL;
1617 
1619 
1622 
1623  pendingNotifies = NIL;
1624 
1625  MemoryContextSwitchTo(old_cxt);
1626 }
#define NIL
Definition: pg_list.h:69
MemoryContext TopTransactionContext
Definition: mcxt.c:48
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * pendingActions
Definition: async.c:319
static List * upperPendingActions
Definition: async.c:321
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:760
List * lcons(void *datum, List *list)
Definition: list.c:259
#define Assert(condition)
Definition: c.h:671
static int list_length(const List *l)
Definition: pg_list.h:89
static List * upperPendingNotifies
Definition: async.c:347
static List * pendingNotifies
Definition: async.c:345
void HandleNotifyInterrupt ( void  )

Definition at line 1702 of file async.c.

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

1703 {
1704  /*
1705  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1706  * you do here.
1707  */
1708 
1709  /* signal that work needs to be done */
1710  notifyInterruptPending = true;
1711 
1712  /* make sure the event is processed in due course */
1713  SetLatch(MyLatch);
1714 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:380
struct Latch * MyLatch
Definition: globals.c:51
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:356
void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2077 of file async.c.

References buf, DestRemote, elog, FrontendProtocol, INFO, PG_PROTOCOL_MAJOR, pq_beginmessage(), pq_endmessage(), pq_sendint(), pq_sendstring(), and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

2078 {
2079  if (whereToSendOutput == DestRemote)
2080  {
2082 
2083  pq_beginmessage(&buf, 'A');
2084  pq_sendint(&buf, srcPid, sizeof(int32));
2085  pq_sendstring(&buf, channel);
2086  if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
2087  pq_sendstring(&buf, payload);
2088  pq_endmessage(&buf);
2089 
2090  /*
2091  * NOTE: we do not do pq_flush() here. For a self-notify, it will
2092  * happen at the end of the transaction, and for incoming notifies
2093  * ProcessIncomingNotify will do it after finding all the notifies.
2094  */
2095  }
2096  else
2097  elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2098 }
#define INFO
Definition: elog.h:33
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
signed int int32
Definition: c.h:253
static char * buf
Definition: pg_test_fsync.c:65
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define elog
Definition: elog.h:219
CommandDest whereToSendOutput
Definition: postgres.c:86
ProtocolVersion FrontendProtocol
Definition: globals.c:27
void PreCommit_Notify ( void  )

Definition at line 772 of file async.c.

References AccessExclusiveLock, ListenAction::action, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), backendHasSentNotifications, DatabaseRelationId, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NIL, NULL, and Trace_notify.

Referenced by CommitTransaction().

773 {
774  ListCell *p;
775 
776  if (pendingActions == NIL && pendingNotifies == NIL)
777  return; /* no relevant statements in this xact */
778 
779  if (Trace_notify)
780  elog(DEBUG1, "PreCommit_Notify");
781 
782  /* Preflight for any pending listen/unlisten actions */
783  foreach(p, pendingActions)
784  {
785  ListenAction *actrec = (ListenAction *) lfirst(p);
786 
787  switch (actrec->action)
788  {
789  case LISTEN_LISTEN:
790  Exec_ListenPreCommit();
791  break;
792  case LISTEN_UNLISTEN:
793  /* there is no Exec_UnlistenPreCommit() */
794  break;
795  case LISTEN_UNLISTEN_ALL:
796  /* there is no Exec_UnlistenAllPreCommit() */
797  break;
798  }
799  }
800 
801  /* Queue any pending notifies */
802  if (pendingNotifies)
803  {
804  ListCell *nextNotify;
805 
806  /*
807  * Make sure that we have an XID assigned to the current transaction.
808  * GetCurrentTransactionId is cheap if we already have an XID, but not
809  * so cheap if we don't, and we'd prefer not to do that work while
810  * holding AsyncQueueLock.
811  */
812  (void) GetCurrentTransactionId();
813 
814  /*
815  * Serialize writers by acquiring a special lock that we hold till
816  * after commit. This ensures that queue entries appear in commit
817  * order, and in particular that there are never uncommitted queue
818  * entries ahead of committed ones, so an uncommitted transaction
819  * can't block delivery of deliverable notifications.
820  *
821  * We use a heavyweight lock so that it'll automatically be released
822  * after either commit or abort. This also allows deadlocks to be
823  * detected, though really a deadlock shouldn't be possible here.
824  *
825  * The lock is on "database 0", which is pretty ugly but it doesn't
826  * seem worth inventing a special locktag category just for this.
827  * (Historical note: before PG 9.0, a similar lock on "database 0" was
828  * used by the flatfiles mechanism.)
829  */
832 
833  /* Now push the notifications into the queue */
834  backendHasSentNotifications = true;
835 
836  nextNotify = list_head(pendingNotifies);
837  while (nextNotify != NULL)
838  {
839  /*
840  * Add the pending notifications to the queue. We acquire and
841  * release AsyncQueueLock once per page, which might be overkill
842  * but it does allow readers to get in while we're doing this.
843  *
844  * A full queue is very uncommon and should really not happen,
845  * given that we have so much space available in the SLRU pages.
846  * Nevertheless we need to deal with this possibility. Note that
847  * when we get here we are in the process of committing our
848  * transaction, but we have not yet committed to clog, so at this
849  * point in time we can still roll the transaction back.
850  */
851  LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
852  asyncQueueFillWarning();
853  if (asyncQueueIsFull())
854  ereport(ERROR,
855  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
856  errmsg("too many notifications in the NOTIFY queue")));
857  nextNotify = asyncQueueAddEntries(nextNotify);
858  LWLockRelease(AsyncQueueLock);
859  }
860  }
861 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
#define DatabaseRelationId
Definition: pg_database.h:29
int errcode(int sqlerrcode)
Definition: elog.c:575
static bool asyncQueueIsFull(void)
Definition: async.c:1215
static void asyncQueueFillWarning(void)
Definition: async.c:1457
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
#define ERROR
Definition: elog.h:43
static List * pendingActions
Definition: async.c:319
static bool backendHasSentNotifications
Definition: async.c:365
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:416
static ListCell * list_head(const List *l)
Definition: pg_list.h:77
#define ereport(elevel, rest)
Definition: elog.h:122
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define InvalidOid
Definition: postgres_ext.h:36
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1318
#define NULL
Definition: c.h:226
#define lfirst(lc)
Definition: pg_list.h:106
static void Exec_ListenPreCommit(void)
Definition: async.c:918
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
bool Trace_notify
Definition: async.c:368
#define AccessExclusiveLock
Definition: lockdefs.h:46
int errmsg(const char *fmt,...)
Definition: elog.c:797
ListenActionKind action
Definition: async.c:315
#define elog
Definition: elog.h:219
static List * pendingNotifies
Definition: async.c:345
void ProcessCompletedNotifies ( void  )

Definition at line 1096 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

1097 {
1098  MemoryContext caller_context;
1099  bool signalled;
1100 
1101  /* Nothing to do if we didn't send any notifications */
1102  if (!backendHasSentNotifications)
1103  return;
1104 
1105  /*
1106  * We reset the flag immediately; otherwise, if any sort of error occurs
1107  * below, we'd be locked up in an infinite loop, because control will come
1108  * right back here after error cleanup.
1109  */
1110  backendHasSentNotifications = false;
1111 
1112  /*
1113  * We must preserve the caller's memory context (probably MessageContext)
1114  * across the transaction we do here.
1115  */
1116  caller_context = CurrentMemoryContext;
1117 
1118  if (Trace_notify)
1119  elog(DEBUG1, "ProcessCompletedNotifies");
1120 
1121  /*
1122  * We must run asyncQueueReadAllNotifications inside a transaction, else
1123  * bad things happen if it gets an error.
1124  */
1125  StartTransactionCommand();
1126 
1127  /* Send signals to other backends */
1128  signalled = SignalBackends();
1129 
1130  if (listenChannels != NIL)
1131  {
1132  /* Read the queue ourselves, and send relevant stuff to the frontend */
1133  asyncQueueReadAllNotifications();
1134  }
1135  else if (!signalled)
1136  {
1137  /*
1138  * If we found no other listening backends, and we aren't listening
1139  * ourselves, then we must execute asyncQueueAdvanceTail to flush the
1140  * queue, because ain't nobody else gonna do it. This prevents queue
1141  * overflow when we're sending useless notifies to nobody. (A new
1142  * listener could have joined since we looked, but if so this is
1143  * harmless.)
1144  */
1145  asyncQueueAdvanceTail();
1146  }
1147 
1148  CommitTransactionCommand();
1149 
1150  MemoryContextSwitchTo(caller_context);
1151 
1152  /* We don't need pq_flush() here since postgres.c will do one shortly */
1153 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
void CommitTransactionCommand(void)
Definition: xact.c:2745
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static List * listenChannels
Definition: async.c:294
static bool backendHasSentNotifications
Definition: async.c:365
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void asyncQueueAdvanceTail(void)
Definition: async.c:1987
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1742
void StartTransactionCommand(void)
Definition: xact.c:2675
bool Trace_notify
Definition: async.c:368
#define elog
Definition: elog.h:219
static bool SignalBackends(void)
Definition: async.c:1512
void ProcessNotifyInterrupt ( void  )

Definition at line 1726 of file async.c.

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by ProcessClientReadInterrupt().

1727 {
1728  if (IsTransactionOrTransactionBlock())
1729  return; /* not really idle */
1730 
1731  while (notifyInterruptPending)
1732  ProcessIncomingNotify();
1733 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
static void ProcessIncomingNotify(void)
Definition: async.c:2038
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:356

Variable Documentation

volatile sig_atomic_t notifyInterruptPending