PostgreSQL Source Code git master
Loading...
Searching...
No Matches
async.h File Reference
#include <signal.h>
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void AtPrepare_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 

Variables

PGDLLIMPORT bool Trace_notify
 
PGDLLIMPORT int max_notify_queue_pages
 
PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending
 

Function Documentation

◆ Async_Listen()

void Async_Listen ( const char *channel )
extern

Definition at line 1035 of file async.c.

1036{
1037 if (Trace_notify)
1038 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1039
1040 queue_listen(LISTEN_LISTEN, channel);
1041}
@ LISTEN_LISTEN
Definition async.c:426
bool Trace_notify
Definition async.c:567
static void queue_listen(ListenActionKind action, const char *channel)
Definition async.c:988
#define DEBUG1
Definition elog.h:30
#define elog(elevel,...)
Definition elog.h:226
int MyProcPid
Definition globals.c:47

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *channel,
const char *payload
)
extern

Definition at line 886 of file async.c.

887{
888 int my_level = GetCurrentTransactionNestLevel();
889 size_t channel_len;
890 size_t payload_len;
891 Notification *n;
892 MemoryContext oldcontext;
893
894 if (IsParallelWorker())
895 elog(ERROR, "cannot send notifications from a parallel worker");
896
897 if (Trace_notify)
898 elog(DEBUG1, "Async_Notify(%s)", channel);
899
900 channel_len = channel ? strlen(channel) : 0;
901 payload_len = payload ? strlen(payload) : 0;
902
903 /* a channel name must be specified */
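904 if (channel_len == 0)
905 ereport(ERROR,
906 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
907 errmsg("channel name cannot be empty")));
908
909 /* enforce length limits */
910 if (channel_len >= NAMEDATALEN)
911 ereport(ERROR,
912 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
913 errmsg("channel name too long")));
914
915 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
916 ereport(ERROR,
917 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
918 errmsg("payload string too long")));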
919
920 /*
921 * We must construct the Notification entry, even if we end up not using
922 * it, in order to compare it cheaply to existing list entries.
923 *
924 * The notification list needs to live until end of transaction, so store
925 * it in the transaction context.
926 */
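927 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
928
929 n = (Notification *) palloc(offsetof(Notification, data) +
930 channel_len + payload_len + 2);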
931 n->channel_len = channel_len;
932 n->payload_len = payload_len;
933 strcpy(n->data, channel);
934 if (payload)
935 strcpy(n->data + channel_len + 1, payload);
936 else
937 n->data[channel_len + 1] = '\0';
938
939 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
940 {
941 NotificationList *notifies;
942
943 /*
944 * First notify event in current (sub)xact. Note that we allocate the
945 * NotificationList in TopTransactionContext; the nestingLevel might
946 * get changed later by AtSubCommit_Notify.
947 */
948 notifies = (NotificationList *)
949 MemoryContextAlloc(TopTransactionContext,
950 sizeof(NotificationList));
951 notifies->nestingLevel = my_level;
952 notifies->events = list_make1(n);
953 /* We certainly don't need a hashtable yet */
954 notifies->hashtab = NULL;
955 /* We won't build uniqueChannelNames/Hash till later, either */
956 notifies->uniqueChannelNames = NIL;
957 notifies->uniqueChannelHash = NULL;
958 notifies->upper = pendingNotifies;
959 pendingNotifies = notifies;
960 }
961 else
962 {
963 /* Now check for duplicates */
964 if (AsyncExistsPendingNotify(n))
965 {
966 /* It's a dup, so forget it */
967 pfree(n);
968 MemoryContextSwitchTo(oldcontext);
969 return;
970 }
971
972 /* Append more events to existing list */
973 AddEventToPendingNotifies(n);
974 }
975
976 MemoryContextSwitchTo(oldcontext);
977}
static bool AsyncExistsPendingNotify(Notification *n)
Definition async.c:3117
static NotificationList * pendingNotifies
Definition async.c:520
static void AddEventToPendingNotifies(Notification *n)
Definition async.c:3158
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition async.c:200
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define IsParallelWorker()
Definition parallel.h:62
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext TopTransactionContext
Definition mcxt.c:171
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurTransactionContext
Definition mcxt.c:172
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
#define NAMEDATALEN
const void * data
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
static int fb(int x)
uint16 payload_len
Definition async.c:498
char data[FLEXIBLE_ARRAY_MEMBER]
Definition async.c:500
uint16 channel_len
Definition async.c:497
int GetCurrentTransactionNestLevel(void)
Definition xact.c:930

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, fb(), GetCurrentTransactionNestLevel(), IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NIL, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *channel )
extern

Definition at line 1049 of file async.c.

1050{
1051 if (Trace_notify)
1052 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1053
1054 /* If we couldn't possibly be listening, no need to queue anything */
1055 if (pendingActions == NULL && !unlistenExitRegistered)
1056 return;
1057
1058 queue_listen(LISTEN_UNLISTEN, channel);
1059}
static ActionList * pendingActions
Definition async.c:444
@ LISTEN_UNLISTEN
Definition async.c:427
static bool unlistenExitRegistered
Definition async.c:541

References DEBUG1, elog, fb(), LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )
extern

Definition at line 1067 of file async.c.

1068{
1069 if (Trace_notify)
1070 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1071
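1072 /* If we couldn't possibly be listening, no need to queue anything */
1073 if (pendingActions == NULL && !unlistenExitRegistered)
1074 return;
1075
1076 queue_listen(LISTEN_UNLISTEN_ALL, "");
1077}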
@ LISTEN_UNLISTEN_ALL
Definition async.c:428

References DEBUG1, elog, fb(), LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)
extern

Definition at line 2945 of file async.c.

2946{
2947 QueuePosition pos;
2948 QueuePosition head;
2949 int64 curpage = -1;
2950 int slotno = -1;
2951 char *page_buffer = NULL;
2952 bool page_dirty = false;
2953
2954 /*
2955 * Acquire locks in the correct order to avoid deadlocks. As per the
2956 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2957 * bank locks.
2958 *
2959 * We only need SHARED mode since we're just reading the head/tail
2960 * positions, not modifying them.
2961 */
2964
2965 pos = QUEUE_TAIL;
2966 head = QUEUE_HEAD;
2967
2968 /* Release NotifyQueueLock early, we only needed to read the positions */
2970
2971 /*
2972 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2973 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2974 * we're working.
2975 */
2976 while (!QUEUE_POS_EQUAL(pos, head))
2977 {
2979 TransactionId xid;
2980 int64 pageno = QUEUE_POS_PAGE(pos);
2981 int offset = QUEUE_POS_OFFSET(pos);
2982
2983 /* If we need a different page, release old lock and get new one */
2984 if (pageno != curpage)
2985 {
2986 LWLock *lock;
2987
2988 /* Release previous page if any */
2989 if (slotno >= 0)
2990 {
2991 if (page_dirty)
2992 {
2993 NotifyCtl->shared->page_dirty[slotno] = true;
2994 page_dirty = false;
2995 }
2997 }
2998
2999 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3001 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3003 page_buffer = NotifyCtl->shared->page_buffer[slotno];
3004 curpage = pageno;
3005 }
3006
3007 qe = (AsyncQueueEntry *) (page_buffer + offset);
3008 xid = qe->xid;
3009
3010 if (TransactionIdIsNormal(xid) &&
3012 {
3013 if (TransactionIdDidCommit(xid))
3014 {
3015 qe->xid = FrozenTransactionId;
3016 page_dirty = true;
3017 }
3018 else
3019 {
3020 qe->xid = InvalidTransactionId;
3021 page_dirty = true;
3022 }
3023 }
3024
3025 /* Advance to next entry */
3026 asyncQueueAdvance(&pos, qe->length);
3027 }
3028
3029 /* Release final page lock if we acquired one */
3030 if (slotno >= 0)
3031 {
3032 if (page_dirty)
3033 NotifyCtl->shared->page_dirty[slotno] = true;
3035 }
3036
3038}
#define QUEUE_POS_OFFSET(x)
Definition async.c:238
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition async.c:1966
#define QUEUE_TAIL
Definition async.c:349
#define QUEUE_POS_PAGE(x)
Definition async.c:237
#define NotifyCtl
Definition async.c:364
#define QUEUE_HEAD
Definition async.c:348
#define QUEUE_POS_EQUAL(x, y)
Definition async.c:246
int64_t int64
Definition c.h:555
uint32 TransactionId
Definition c.h:678
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition slru.c:527
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:160
bool TransactionIdDidCommit(TransactionId transactionId)
Definition transam.c:126
#define FrozenTransactionId
Definition transam.h:33
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References asyncQueueAdvance(), fb(), FrozenTransactionId, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by vac_truncate_clog().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )
extern

Definition at line 793 of file async.c.

794{
795 bool found;
796 Size size;
797
798 /*
799 * Create or attach to the AsyncQueueControl structure.
800 */
801 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
802 size = add_size(size, offsetof(AsyncQueueControl, backend));
803
805 ShmemInitStruct("Async Queue Control", size, &found);
806
807 if (!found)
808 {
809 /* First time through, so initialize it */
812 QUEUE_STOP_PAGE = 0;
817 for (int i = 0; i < MaxBackends; i++)
818 {
825 }
826 }
827
828 /*
829 * Set up SLRU management of the pg_notify data. Note that long segment
830 * names are used in order to avoid wraparound.
831 */
832 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
835 SYNC_HANDLER_NONE, true);
836
837 if (!found)
838 {
839 /*
840 * During start or reboot, clean out the pg_notify directory.
841 */
843 }
844}
#define QUEUE_FIRST_LISTENER
Definition async.c:351
#define QUEUE_BACKEND_IS_ADVANCING(i)
Definition async.c:357
#define QUEUE_BACKEND_POS(i)
Definition async.c:355
#define SET_QUEUE_POS(x, y, z)
Definition async.c:240
static AsyncQueueControl * asyncQueueControl
Definition async.c:346
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition async.c:628
#define QUEUE_BACKEND_PID(i)
Definition async.c:352
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
Definition async.c:356
#define QUEUE_NEXT_LISTENER(i)
Definition async.c:354
#define QUEUE_BACKEND_DBOID(i)
Definition async.c:353
#define QUEUE_STOP_PAGE
Definition async.c:350
size_t Size
Definition c.h:631
#define DSA_HANDLE_INVALID
Definition dsa.h:139
#define DSHASH_HANDLE_INVALID
Definition dshash.h:27
int MaxBackends
Definition globals.c:146
int notify_buffers
Definition globals.c:164
int i
Definition isn.c:77
#define InvalidPid
Definition miscadmin.h:32
#define InvalidOid
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:252
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1816
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1769
dshash_table_handle globalChannelTableDSH
Definition async.c:341
TimestampTz lastQueueFillWarn
Definition async.c:339
dsa_handle globalChannelTableDSA
Definition async.c:340
@ SYNC_HANDLER_NONE
Definition sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, fb(), AsyncQueueControl::globalChannelTableDSA, AsyncQueueControl::globalChannelTableDSH, i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_IS_ADVANCING, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_BACKEND_WAKEUP_PENDING, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )
extern

Definition at line 776 of file async.c.

777{
778 Size size;
779
780 /* This had better match AsyncShmemInit */
781 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
782 size = add_size(size, offsetof(AsyncQueueControl, backend));
783
784 size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
785
786 return size;
787}
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:198

References add_size(), fb(), MaxBackends, mul_size(), notify_buffers, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )
extern

Definition at line 2411 of file async.c.

2412{
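2413 /* Revert staged listen/unlisten changes */
2414 ApplyPendingListenActions(false);
2415
2416 /* If we're no longer listening on anything, unregister */
2417 if (amRegisteredListener && LocalChannelTableIsEmpty())
2418 asyncQueueUnregister();
2419
2420 /* And clean up */
2421 ClearPendingActionsAndNotifies();
2422}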
static void ApplyPendingListenActions(bool isCommit)
Definition async.c:1713
static void ClearPendingActionsAndNotifies(void)
Definition async.c:3272
static bool amRegisteredListener
Definition async.c:544
static void asyncQueueUnregister(void)
Definition async.c:1908
#define LocalChannelTableIsEmpty()
Definition async.c:411

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and LocalChannelTableIsEmpty.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )
extern

Definition at line 1370 of file async.c.

1371{
1372 /*
1373 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1374 * return as soon as possible
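1375 */
1376 if (!pendingActions && !pendingNotifies)
1377 return;
1378
1379 if (Trace_notify)
1380 elog(DEBUG1, "AtCommit_Notify");
1381
1382 /* Apply staged listen/unlisten changes */
1383 ApplyPendingListenActions(true);
1384
1385 /* If no longer listening to anything, get out of listener array */
1386 if (amRegisteredListener && LocalChannelTableIsEmpty())
1387 asyncQueueUnregister();
1388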
1389 /*
1390 * Send signals to listening backends. We need do this only if there are
1391 * pending notifies, which were previously added to the shared queue by
1392 * PreCommit_Notify().
1393 */
1394 if (pendingNotifies != NULL)
1395 SignalBackends();
1396
1397 /*
1398 * If it's time to try to advance the global tail pointer, do that.
1399 *
1400 * (It might seem odd to do this in the sender, when more than likely the
1401 * listeners won't yet have read the messages we just sent. However,
1402 * there's less contention if only the sender does it, and there is little
1403 * need for urgency in advancing the global tail. So this typically will
1404 * be clearing out messages that were sent some time ago.)
1405 */
1406 if (tryAdvanceTail)
1407 {
1408 tryAdvanceTail = false;
1409 asyncQueueAdvanceTail();
1410 }
1411
1412 /* And clean up */
1413 ClearPendingActionsAndNotifies();
1414}
static void SignalBackends(void)
Definition async.c:2259
static bool tryAdvanceTail
Definition async.c:564
static void asyncQueueAdvanceTail(void)
Definition async.c:2863

References amRegisteredListener, ApplyPendingListenActions(), asyncQueueAdvanceTail(), asyncQueueUnregister(), ClearPendingActionsAndNotifies(), DEBUG1, elog, fb(), LocalChannelTableIsEmpty, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )
extern

Definition at line 1152 of file async.c.

1153{
1154 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1155 if (pendingActions || pendingNotifies)
1156 ereport(ERROR,
1157 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1158 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1159}

References ereport, errcode(), errmsg(), ERROR, fb(), pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )
extern

Definition at line 2500 of file async.c.

2501{
2502 int my_level = GetCurrentTransactionNestLevel();
2503
2504 /*
2505 * All we have to do is pop the stack --- the actions/notifies made in
2506 * this subxact are no longer interesting, and the space will be freed
2507 * when CurTransactionContext is recycled. We still have to free the
2508 * ActionList and NotificationList objects themselves, though, because
2509 * those are allocated in TopTransactionContext.
2510 *
2511 * Note that there might be no entries at all, or no entries for the
2512 * current subtransaction level, either because none were ever created, or
2513 * because we reentered this routine due to trouble during subxact abort.
2514 */
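2515 while (pendingActions != NULL &&
2516 pendingActions->nestingLevel >= my_level)
2517 {
2518 ActionList *childPendingActions = pendingActions;
2519
2520 pendingActions = pendingActions->upper;
2521 pfree(childPendingActions);
2522 }
2523
2524 while (pendingNotifies != NULL &&
2525 pendingNotifies->nestingLevel >= my_level)
2526 {
2527 NotificationList *childPendingNotifies = pendingNotifies;
2528
2529 pendingNotifies = pendingNotifies->upper;
2530 pfree(childPendingNotifies);
2531 }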
2532}
int nestingLevel
Definition async.c:439
struct ActionList * upper
Definition async.c:441
struct NotificationList * upper
Definition async.c:510

References fb(), GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )
extern

Definition at line 2430 of file async.c.

2431{
2432 int my_level = GetCurrentTransactionNestLevel();
2433
2434 /* If there are actions at our nesting level, we must reparent them. */
2435 if (pendingActions != NULL &&
2436 pendingActions->nestingLevel >= my_level)
2437 {
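2438 if (pendingActions->upper == NULL ||
2439 pendingActions->upper->nestingLevel < my_level - 1)
2440 {
2441 /* nothing to merge; give the whole thing to the parent */
2442 --pendingActions->nestingLevel;
2443 }
2444 else
2445 {
2446 ActionList *childPendingActions = pendingActions;
2447
2448 pendingActions = pendingActions->upper;
2449
2450 /*
2451 * Mustn't try to eliminate duplicates here --- see queue_listen()
2452 */
2453 pendingActions->actions =
2454 list_concat(pendingActions->actions,
2455 childPendingActions->actions);
2456 pfree(childPendingActions);
2457 }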
2458 }
2459
2460 /* If there are notifies at our nesting level, we must reparent them. */
2461 if (pendingNotifies != NULL &&
2462 pendingNotifies->nestingLevel >= my_level)
2463 {
2464 Assert(pendingNotifies->nestingLevel == my_level);
2465
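2466 if (pendingNotifies->upper == NULL ||
2467 pendingNotifies->upper->nestingLevel < my_level - 1)
2468 {
2469 /* nothing to merge; give the whole thing to the parent */
2470 --pendingNotifies->nestingLevel;
2471 }
2472 else
2473 {
2474 /*
2475 * Formerly, we didn't bother to eliminate duplicates here, but
2476 * now we must, else we fall foul of "Assert(!found)", either here
2477 * or during a later attempt to build the parent-level hashtable.
2478 */
2479 NotificationList *childPendingNotifies = pendingNotifies;
2480 ListCell *l;
2481
2482 pendingNotifies = pendingNotifies->upper;
2483 /* Insert all the subxact's events into parent, except for dups */
2484 foreach(l, childPendingNotifies->events)
2485 {
2486 Notification *childn = (Notification *) lfirst(l);
2487
2488 if (!AsyncExistsPendingNotify(childn))
2489 AddEventToPendingNotifies(childn);
2490 }
2491 pfree(childPendingNotifies);
2492 }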
2493 }
2494}
#define Assert(condition)
Definition c.h:885
List * list_concat(List *list1, const List *list2)
Definition list.c:561
#define lfirst(lc)
Definition pg_list.h:172
List * actions
Definition async.c:440

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), fb(), GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )
extern

Definition at line 2543 of file async.c.

2544{
2545 /*
2546 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2547 * you do here.
2548 */
2549
2550 /* signal that work needs to be done */
2551 notifyInterruptPending = true;
2552
2553 /* make sure the event is processed in due course */
2554 SetLatch(MyLatch);
2555}
volatile sig_atomic_t notifyInterruptPending
Definition async.c:538
struct Latch * MyLatch
Definition globals.c:63
void SetLatch(Latch *latch)
Definition latch.c:290

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *channel,
const char *payload,
int32 srcPid
)
extern

Definition at line 3093 of file async.c.

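3094{
3095 if (whereToSendOutput == DestRemote)
3096 {
3097 StringInfoData buf;
3098
3099 pq_beginmessage(&buf, PqMsg_NotificationResponse);
3100 pq_sendint32(&buf, srcPid);
3101 pq_sendstring(&buf, channel);
3102 pq_sendstring(&buf, payload);
3103 pq_endmessage(&buf);
3104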
3105 /*
3106 * NOTE: we do not do pq_flush() here. Some level of caller will
3107 * handle it later, allowing this message to be combined into a packet
3108 * with other ones.
3109 */
3110 }
3111 else
3112 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3113}
@ DestRemote
Definition dest.h:89
#define INFO
Definition elog.h:34
static char buf[DEFAULT_XLOG_SEG_SIZE]
CommandDest whereToSendOutput
Definition postgres.c:93
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
#define PqMsg_NotificationResponse
Definition protocol.h:41

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )
extern

Definition at line 1177 of file async.c.

1178{
1179 ListCell *p;
1180
1182 return; /* no relevant statements in this xact */
1183
1184 if (Trace_notify)
1185 elog(DEBUG1, "PreCommit_Notify");
1186
1187 /* Preflight for any pending listen/unlisten actions */
1189
1190 if (pendingActions != NULL)
1191 {
1192 /* Ensure we have a local channel table */
1194 /* Create pendingListenActions hash table for this transaction */
1196
1197 /* Stage all the actions this transaction wants to perform */
1198 foreach(p, pendingActions->actions)
1199 {
1201
1202 switch (actrec->action)
1203 {
1204 case LISTEN_LISTEN:
1207 break;
1208 case LISTEN_UNLISTEN:
1210 break;
1213 break;
1214 }
1215 }
1216 }
1217
1218 /* Queue any pending notifies (must happen after the above) */
1219 if (pendingNotifies)
1220 {
1222 bool firstIteration = true;
1223
1224 /*
1225 * Build list of unique channel names being notified for use by
1226 * SignalBackends().
1227 *
1228 * If uniqueChannelHash is available, use it to efficiently get the
1229 * unique channels. Otherwise, fall back to the O(N^2) approach.
1230 */
1233 {
1234 HASH_SEQ_STATUS status;
1236
1238 while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1241 channelEntry->channel);
1242 }
1243 else
1244 {
1245 /* O(N^2) approach is better for small number of notifications */
1247 {
1248 char *channel = n->data;
1249 bool found = false;
1250
1251 /* Name present in list? */
1253 {
1254 if (strcmp(oldchan, channel) == 0)
1255 {
1256 found = true;
1257 break;
1258 }
1259 }
1260 /* Add if not already in list */
1261 if (!found)
1264 channel);
1265 }
1266 }
1267
1268 /* Preallocate workspace that will be needed by SignalBackends() */
1269 if (signalPids == NULL)
1271 MaxBackends * sizeof(int32));
1272
1273 if (signalProcnos == NULL)
1275 MaxBackends * sizeof(ProcNumber));
1276
1277 /*
1278 * Make sure that we have an XID assigned to the current transaction.
1279 * GetCurrentTransactionId is cheap if we already have an XID, but not
1280 * so cheap if we don't, and we'd prefer not to do that work while
1281 * holding NotifyQueueLock.
1282 */
1284
1285 /*
1286 * Serialize writers by acquiring a special lock that we hold till
1287 * after commit. This ensures that queue entries appear in commit
1288 * order, and in particular that there are never uncommitted queue
1289 * entries ahead of committed ones, so an uncommitted transaction
1290 * can't block delivery of deliverable notifications.
1291 *
1292 * We use a heavyweight lock so that it'll automatically be released
1293 * after either commit or abort. This also allows deadlocks to be
1294 * detected, though really a deadlock shouldn't be possible here.
1295 *
1296 * The lock is on "database 0", which is pretty ugly but it doesn't
1297 * seem worth inventing a special locktag category just for this.
1298 * (Historical note: before PG 9.0, a similar lock on "database 0" was
1299 * used by the flatfiles mechanism.)
1300 */
1303
1304 /*
1305 * For the direct advancement optimization in SignalBackends(), we
1306 * need to ensure that no other backend can insert queue entries
1307 * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1308 * heavyweight lock above provides this guarantee, since it serializes
1309 * all writers.
1310 *
1311 * Note: if the heavyweight lock were ever removed for scalability
1312 * reasons, we could achieve the same guarantee by holding
1313 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1314 * than releasing and reacquiring it for each page as we do below.
1315 */
1316
1317 /* Initialize values to a safe default in case list is empty */
1320
1321 /* Now push the notifications into the queue */
1323 while (nextNotify != NULL)
1324 {
1325 /*
1326 * Add the pending notifications to the queue. We acquire and
1327 * release NotifyQueueLock once per page, which might be overkill
1328 * but it does allow readers to get in while we're doing this.
1329 *
1330 * A full queue is very uncommon and should really not happen,
1331 * given that we have so much space available in the SLRU pages.
1332 * Nevertheless we need to deal with this possibility. Note that
1333 * when we get here we are in the process of committing our
1334 * transaction, but we have not yet committed to clog, so at this
1335 * point in time we can still roll the transaction back.
1336 */
1338 if (firstIteration)
1339 {
1341 firstIteration = false;
1342 }
1344 if (asyncQueueIsFull())
1345 ereport(ERROR,
1347 errmsg("too many notifications in the NOTIFY queue")));
1351 }
1352
1353 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1354 }
1355}
static void PrepareTableEntriesForListen(const char *channel)
Definition async.c:1523
static void BecomeRegisteredListener(void)
Definition async.c:1422
static int32 * signalPids
Definition async.c:560
static void PrepareTableEntriesForUnlisten(const char *channel)
Definition async.c:1626
static ProcNumber * signalProcnos
Definition async.c:561
static QueuePosition queueHeadAfterWrite
Definition async.c:553
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition async.c:2035
static void PrepareTableEntriesForUnlistenAll(void)
Definition async.c:1656
static void asyncQueueFillWarning(void)
Definition async.c:2207
static void initGlobalChannelTable(void)
Definition async.c:677
static bool asyncQueueIsFull(void)
Definition async.c:1951
static void initLocalChannelTable(void)
Definition async.c:728
static void initPendingListenActions(void)
Definition async.c:754
static QueuePosition queueHeadBeforeWrite
Definition async.c:552
int32_t int32
Definition c.h:554
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
List * lappend(List *list, void *datum)
Definition list.c:339
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static ListCell * list_head(const List *l)
Definition pg_list.h:128
int ProcNumber
Definition procnumber.h:24
List * uniqueChannelNames
Definition async.c:508
HTAB * uniqueChannelHash
Definition async.c:509
List * events
Definition async.c:506
TransactionId GetCurrentTransactionId(void)
Definition xact.c:455

References AccessExclusiveLock, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), BecomeRegisteredListener(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, fb(), foreach_ptr, GetCurrentTransactionId(), hash_seq_init(), hash_seq_search(), initGlobalChannelTable(), initLocalChannelTable(), initPendingListenActions(), InvalidOid, lappend(), lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MemoryContextAlloc(), NIL, pendingActions, pendingNotifies, PrepareTableEntriesForListen(), PrepareTableEntriesForUnlisten(), PrepareTableEntriesForUnlistenAll(), QUEUE_HEAD, queueHeadAfterWrite, queueHeadBeforeWrite, SET_QUEUE_POS, signalPids, signalProcnos, TopMemoryContext, Trace_notify, NotificationList::uniqueChannelHash, and NotificationList::uniqueChannelNames.

Referenced by CommitTransaction().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)
extern

Definition at line 2573 of file async.c.

2574{
2575 if (IsTransactionOrTransactionBlock())
2576 return; /* not really idle */
2577
2578 /* Loop in case another signal arrives while sending messages */
2579 while (notifyInterruptPending)
2580 ProcessIncomingNotify(flush);
2581}
static void ProcessIncomingNotify(bool flush)
Definition async.c:3052
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

Variable Documentation

◆ max_notify_queue_pages

PGDLLIMPORT int max_notify_queue_pages
extern

Definition at line 570 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ notifyInterruptPending

◆ Trace_notify