PostgreSQL Source Code git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 

Functions

static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControl * asyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static List * listenChannels = NIL
 
static ActionList * pendingActions = NULL
 
static NotificationList * pendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 189 of file async.c.

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 397 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 163 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 310 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 301 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 300 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 303 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 238 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 299 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 313 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 296 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 302 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 311 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 209 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 212 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition: async.c:476
int y
Definition: isn.c:71
int x
Definition: isn.c:70

Definition at line 222 of file async.c.

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 216 of file async.c.

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 201 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 200 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 298 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 297 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 187 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
  z 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 203 of file async.c.

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

typedef struct AsyncQueueControl AsyncQueueControl

◆ AsyncQueueEntry

typedef struct AsyncQueueEntry AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

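◆ NotificationList

typedef struct NotificationList NotificationList

◆ QueueBackendStatus

typedef struct QueueBackendStatus QueueBackendStatus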

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 332 of file async.c.

333{
334 LISTEN_LISTEN,
335 LISTEN_UNLISTEN,
336 LISTEN_UNLISTEN_ALL,
337} ListenActionKind;
ListenActionKind
Definition: async.c:333
@ LISTEN_LISTEN
Definition: async.c:334
@ LISTEN_UNLISTEN_ALL
Definition: async.c:336
@ LISTEN_UNLISTEN
Definition: async.c:335

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification *  n)
static

Definition at line 2289 of file async.c.

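2290{
2291 Assert(pendingNotifies->events != NIL);
2292
2293 /* Create the hash table if it's time to */
2294 if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
2295 pendingNotifies->hashtab == NULL)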
2296 {
2297 HASHCTL hash_ctl;
2298 ListCell *l;
2299
2300 /* Create the hash table */
2301 hash_ctl.keysize = sizeof(Notification *);
2302 hash_ctl.entrysize = sizeof(struct NotificationHash);
2303 hash_ctl.hash = notification_hash;
2304 hash_ctl.match = notification_match;
2305 hash_ctl.hcxt = CurTransactionContext;
2306 pendingNotifies->hashtab =
2307 hash_create("Pending Notifies",
2308 256L,
2309 &hash_ctl,
2310 HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
2311
2312 /* Insert all the already-existing events */
2313 foreach(l, pendingNotifies->events)
2314 {
2315 Notification *oldn = (Notification *) lfirst(l);
2316 bool found;
2317
2318 (void) hash_search(pendingNotifies->hashtab,
2319 &oldn,
2320 HASH_ENTER,
2321 &found);
2322 Assert(!found);
2323 }
2324 }
2325
2326 /* Add new event to the list, in order */
2327 pendingNotifies->events = lappend(pendingNotifies->events, n);
2328
2329 /* Add event to the hash table if needed */
2330 if (pendingNotifies->hashtab != NULL)
2331 {
2332 bool found;
2333
2334 (void) hash_search(pendingNotifies->hashtab,
2335 &n,
2336 HASH_ENTER,
2337 &found);
2338 Assert(!found);
2339 }
2340}
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:397
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2348
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2362
static NotificationList * pendingNotifies
Definition: async.c:404
#define Assert(condition)
Definition: c.h:815
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_COMPARE
Definition: hsearch.h:99
#define HASH_FUNCTION
Definition: hsearch.h:98
List * lappend(List *list, void *datum)
Definition: list.c:339
MemoryContext CurTransactionContext
Definition: mcxt.c:155
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
Size keysize
Definition: hsearch.h:75
HashValueFunc hash
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:76
HashCompareFunc match
Definition: hsearch.h:80
MemoryContext hcxt
Definition: hsearch.h:86
HTAB * hashtab
Definition: async.c:393
List * events
Definition: async.c:392

References Assert, CurTransactionContext, HASHCTL::entrysize, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), notification_match(), and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 738 of file async.c.

739{
740 if (Trace_notify)
741 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
742
743 queue_listen(LISTEN_LISTEN, channel);
744}
bool Trace_notify
Definition: async.c:425
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:690
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:225
int MyProcPid
Definition: globals.c:46

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 591 of file async.c.

592{
593 int my_level = GetCurrentTransactionNestLevel();
594 size_t channel_len;
595 size_t payload_len;
596 Notification *n;
597 MemoryContext oldcontext;
598
599 if (IsParallelWorker())
600 elog(ERROR, "cannot send notifications from a parallel worker");
601
602 if (Trace_notify)
603 elog(DEBUG1, "Async_Notify(%s)", channel);
604
605 channel_len = channel ? strlen(channel) : 0;
606 payload_len = payload ? strlen(payload) : 0;
607
608 /* a channel name must be specified */
609 if (channel_len == 0)
610 ereport(ERROR,
611 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
612 errmsg("channel name cannot be empty")));
613
614 /* enforce length limits */
615 if (channel_len >= NAMEDATALEN)
616 ereport(ERROR,
617 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
618 errmsg("channel name too long")));
619
620 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
621 ereport(ERROR,
622 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
623 errmsg("payload string too long")));
624
625 /*
626 * We must construct the Notification entry, even if we end up not using
627 * it, in order to compare it cheaply to existing list entries.
628 *
629 * The notification list needs to live until end of transaction, so store
630 * it in the transaction context.
631 */
632 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
633
634 n = (Notification *) palloc(offsetof(Notification, data) +
635 channel_len + payload_len + 2);
636 n->channel_len = channel_len;
637 n->payload_len = payload_len;
638 strcpy(n->data, channel);
639 if (payload)
640 strcpy(n->data + channel_len + 1, payload);
641 else
642 n->data[channel_len + 1] = '\0';
643
644 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
645 {
646 NotificationList *notifies;
647
648 /*
649 * First notify event in current (sub)xact. Note that we allocate the
650 * NotificationList in TopTransactionContext; the nestingLevel might
651 * get changed later by AtSubCommit_Notify.
652 */
653 notifies = (NotificationList *)
654 MemoryContextAlloc(TopTransactionContext,
655 sizeof(NotificationList));
656 notifies->nestingLevel = my_level;
657 notifies->events = list_make1(n);
658 /* We certainly don't need a hashtable yet */
659 notifies->hashtab = NULL;
660 notifies->upper = pendingNotifies;
661 pendingNotifies = notifies;
662 }
663 else
664 {
665 /* Now check for duplicates */
666 if (AsyncExistsPendingNotify(n))
667 {
668 /* It's a dup, so forget it */
669 pfree(n);
670 MemoryContextSwitchTo(oldcontext);
671 return;
672 }
673
674 /* Append more events to existing list */
675 AddEventToPendingNotifies(n);
676 }
677
678 MemoryContextSwitchTo(oldcontext);
679}
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2248
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2289
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:163
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsParallelWorker()
Definition: parallel.h:60
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
MemoryContext TopTransactionContext
Definition: mcxt.c:154
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
int nestingLevel
Definition: async.c:391
struct NotificationList * upper
Definition: async.c:394
uint16 payload_len
Definition: async.c:384
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:386
uint16 channel_len
Definition: async.c:383
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:928

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 752 of file async.c.

753{
754 if (Trace_notify)
755 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
756
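757 /* If we couldn't possibly be listening, no need to queue anything */
758 if (pendingActions == NULL && !unlistenExitRegistered)
759 return;
760
761 queue_listen(LISTEN_UNLISTEN, channel);
762}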
static ActionList * pendingActions
Definition: async.c:352
static bool unlistenExitRegistered
Definition: async.c:416

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 770 of file async.c.

771{
772 if (Trace_notify)
773 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
774
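775 /* If we couldn't possibly be listening, no need to queue anything */
776 if (pendingActions == NULL && !unlistenExitRegistered)
777 return;
778
779 queue_listen(LISTEN_UNLISTEN_ALL, "");
780}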

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 823 of file async.c.

824{
825 Exec_UnlistenAllCommit();
826 asyncQueueUnregister();
827}
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1194
static void asyncQueueUnregister(void)
Definition: async.c:1231

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification *  n)
static

Definition at line 2248 of file async.c.

2249{
2250 if (pendingNotifies == NULL)
2251 return false;
2252
2253 if (pendingNotifies->hashtab != NULL)
2254 {
2255 /* Use the hash table to probe for a match */
2256 if (hash_search(pendingNotifies->hashtab,
2257 &n,
2258 HASH_FIND,
2259 NULL))
2260 return true;
2261 }
2262 else
2263 {
2264 /* Must scan the event list */
2265 ListCell *l;
2266
2267 foreach(l, pendingNotifies->events)
2268 {
2269 Notification *oldn = (Notification *) lfirst(l);
2270
2271 if (n->channel_len == oldn->channel_len &&
2272 n->payload_len == oldn->payload_len &&
2273 memcmp(n->data, oldn->data,
2274 n->channel_len + n->payload_len + 2) == 0)
2275 return true;
2276 }
2277 }
2278
2279 return false;
2280}
@ HASH_FIND
Definition: hsearch.h:113

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell *  nextNotify)
static

Definition at line 1356 of file async.c.

1357{
1358 AsyncQueueEntry qe;
1359 QueuePosition queue_head;
1360 int64 pageno;
1361 int offset;
1362 int slotno;
1363 LWLock *prevlock;
1364
1365 /*
1366 * We work with a local copy of QUEUE_HEAD, which we write back to shared
1367 * memory upon exiting. The reason for this is that if we have to advance
1368 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1369 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1370 * subsequent insertions would try to put entries into a page that slru.c
1371 * thinks doesn't exist yet.) So, use a local position variable. Note
1372 * that if we do fail, any already-inserted queue entries are forgotten;
1373 * this is okay, since they'd be useless anyway after our transaction
1374 * rolls back.
1375 */
1376 queue_head = QUEUE_HEAD;
1377
1378 /*
1379 * If this is the first write since the postmaster started, we need to
1380 * initialize the first page of the async SLRU. Otherwise, the current
1381 * page should be initialized already, so just fetch it.
1382 */
1383 pageno = QUEUE_POS_PAGE(queue_head);
1384 prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
1385
1386 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
1387 LWLockAcquire(prevlock, LW_EXCLUSIVE);
1388
1389 if (QUEUE_POS_IS_ZERO(queue_head))
1390 slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1391 else
1392 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1393 InvalidTransactionId);
1394
1395 /* Note we mark the page dirty before writing in it */
1396 NotifyCtl->shared->page_dirty[slotno] = true;
1397
1398 while (nextNotify != NULL)
1399 {
1400 Notification *n = (Notification *) lfirst(nextNotify);
1401
1402 /* Construct a valid queue entry in local variable qe */
1403 asyncQueueNotificationToEntry(n, &qe);
1404
1405 offset = QUEUE_POS_OFFSET(queue_head);
1406
1407 /* Check whether the entry really fits on the current page */
1408 if (offset + qe.length <= QUEUE_PAGESIZE)
1409 {
1410 /* OK, so advance nextNotify past this item */
1411 nextNotify = lnext(pendingNotifies->events, nextNotify);
1412 }
1413 else
1414 {
1415 /*
1416 * Write a dummy entry to fill up the page. Actually readers will
1417 * only check dboid and since it won't match any reader's database
1418 * OID, they will ignore this entry and move on.
1419 */
1420 qe.length = QUEUE_PAGESIZE - offset;
1421 qe.dboid = InvalidOid;
1422 qe.data[0] = '\0'; /* empty channel */
1423 qe.data[1] = '\0'; /* empty payload */
1424 }
1425
1426 /* Now copy qe into the shared buffer page */
1427 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1428 &qe,
1429 qe.length);
1430
1431 /* Advance queue_head appropriately, and detect if page is full */
1432 if (asyncQueueAdvance(&(queue_head), qe.length))
1433 {
1434 LWLock *lock;
1435
1436 pageno = QUEUE_POS_PAGE(queue_head);
1437 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
1438 if (lock != prevlock)
1439 {
1440 LWLockRelease(prevlock);
1441 LWLockAcquire(lock, LW_EXCLUSIVE);
1442 prevlock = lock;
1443 }
1444
1445 /*
1446 * Page is full, so we're done here, but first fill the next page
1447 * with zeroes. The reason to do this is to ensure that slru.c's
1448 * idea of the head page is always the same as ours, which avoids
1449 * boundary problems in SimpleLruTruncate. The test in
1450 * asyncQueueIsFull() ensured that there is room to create this
1451 * page without overrunning the queue.
1452 */
1453 slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1454
1455 /*
1456 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1457 * set flag to remember that we should try to advance the tail
1458 * pointer (we don't want to actually do that right here).
1459 */
1460 if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1461 tryAdvanceTail = true;
1462
1463 /* And exit the loop */
1464 break;
1465 }
1466 }
1467
1468 /* Success, so update the global QUEUE_HEAD */
1469 QUEUE_HEAD = queue_head;
1470
1471 LWLockRelease(prevlock);
1472
1473 return nextNotify;
1474}
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1320
static bool tryAdvanceTail
Definition: async.c:422
#define QUEUE_POS_OFFSET(x)
Definition: async.c:201
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1287
#define QUEUE_POS_PAGE(x)
Definition: async.c:200
#define QUEUE_CLEANUP_DELAY
Definition: async.c:238
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:212
#define NotifyCtl
Definition: async.c:310
#define QUEUE_PAGESIZE
Definition: async.c:311
#define QUEUE_HEAD
Definition: async.c:296
int64_t int64
Definition: c.h:485
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:343
#define InvalidOid
Definition: postgres_ext.h:37
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:502
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:375
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition: slru.h:175
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:183
Definition: lwlock.h:42
#define InvalidTransactionId
Definition: transam.h:31

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition *  position,
int  entryLength 
)
static

Definition at line 1287 of file async.c.

1288{
1289 int64 pageno = QUEUE_POS_PAGE(*position);
1290 int offset = QUEUE_POS_OFFSET(*position);
1291 bool pageJump = false;
1292
1293 /*
1294 * Move to the next writing position: First jump over what we have just
1295 * written or read.
1296 */
1297 offset += entryLength;
1298 Assert(offset <= QUEUE_PAGESIZE);
1299
1300 /*
1301 * In a second step check if another entry can possibly be written to the
1302 * page. If so, stay here, we have reached the next position. If not, then
1303 * we need to move on to the next page.
1304 */
1305 if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1306 {
1307 pageno++;
1308 offset = 0;
1309 pageJump = true;
1310 }
1311
1312 SET_QUEUE_POS(*position, pageno, offset);
1313 return pageJump;
1314}
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:203
#define AsyncQueueEntryEmptySize
Definition: async.c:189
#define QUEUEALIGN(len)
Definition: async.c:187

References Assert, AsyncQueueEntryEmptySize, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2108 of file async.c.

2109{
2110 QueuePosition min;
2111 int64 oldtailpage;
2112 int64 newtailpage;
2113 int64 boundary;
2114
2115 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2116 LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2117
2118 /*
2119 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2120 * (ie, exactly match at least one backend's queue position), so it must
2121 * be updated atomically with the actual computation. Since v13, we could
2122 * get away with not doing it like that, but it seems prudent to keep it
2123 * so.
2124 *
2125 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2126 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2127 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2128 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2129 * there are pages we can truncate but haven't yet finished doing so.
2130 *
2131 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2132 * performing SimpleLruTruncate. This is OK because no backend will try
2133 * to access the pages we are in the midst of truncating.
2134 */
2135 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
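2136 min = QUEUE_HEAD;
2137 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
2138 {
2139 Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2140 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2141 }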
2142 QUEUE_TAIL = min;
2143 oldtailpage = QUEUE_STOP_PAGE;
2144 LWLockRelease(NotifyQueueLock);
2145
2146 /*
2147 * We can truncate something if the global tail advanced across an SLRU
2148 * segment boundary.
2149 *
2150 * XXX it might be better to truncate only once every several segments, to
2151 * reduce the number of directory scans.
2152 */
2153 newtailpage = QUEUE_POS_PAGE(min);
2154 boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2155 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2156 {
2157 /*
2158 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2159 * release the lock again.
2160 */
2161 SimpleLruTruncate(NotifyCtl, newtailpage);
2162
2163 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2164 QUEUE_STOP_PAGE = newtailpage;
2165 LWLockRelease(NotifyQueueLock);
2166 }
2167
2168 LWLockRelease(NotifyQueueTailLock);
2169}
#define QUEUE_FIRST_LISTENER
Definition: async.c:299
#define QUEUE_POS_MIN(x, y)
Definition: async.c:216
#define QUEUE_BACKEND_POS(i)
Definition: async.c:303
#define QUEUE_TAIL
Definition: async.c:297
#define QUEUE_BACKEND_PID(i)
Definition: async.c:300
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:302
#define QUEUE_STOP_PAGE
Definition: async.c:298
int i
Definition: isn.c:72
#define InvalidPid
Definition: miscadmin.h:32
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int ProcNumber
Definition: procnumber.h:24
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1408
#define SLRU_PAGES_PER_SEGMENT
Definition: slru.h:39

References Assert, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1527 of file async.c.

1528{
1529 double fillDegree;
1530 TimestampTz t;
1531
1532 fillDegree = asyncQueueUsage();
1533 if (fillDegree < 0.5)
1534 return;
1535
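1536 t = GetCurrentTimestamp();
1537
1538 if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
1539 t, QUEUE_FULL_WARN_INTERVAL))
1540 {
1541 QueuePosition min = QUEUE_HEAD;
1542 int32 minPid = InvalidPid;
1543
1544 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
1545 {
1546 Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
1547 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1548 if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
1549 minPid = QUEUE_BACKEND_PID(i);
1550 }
1551
1552 ereport(WARNING,
1553 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1554 (minPid != InvalidPid ?
1555 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1556 : 0),
1557 (minPid != InvalidPid ?
1558 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1559 : 0)));
1560
1561 asyncQueueControl->lastQueueFillWarn = t;
1562 }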
1563}
static double asyncQueueUsage(void)
Definition: async.c:1506
static AsyncQueueControl * asyncQueueControl
Definition: async.c:294
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:313
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:209
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1780
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
int32_t int32
Definition: c.h:484
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
#define WARNING
Definition: elog.h:36
TimestampTz lastQueueFillWarn
Definition: async.c:290

References Assert, asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1272 of file async.c.

/*
 * asyncQueueIsFull: report whether the notification queue has reached its
 * configured page limit.
 *
 * The number of occupied pages is the difference between the page numbers
 * of QUEUE_HEAD and QUEUE_TAIL; the queue counts as full once that
 * difference is at least max_notify_queue_pages.
 *
 * NOTE(review): no lock is taken in this function; presumably the caller
 * already holds NotifyQueueLock while head/tail are read -- confirm.
 */
1273{
1274 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1275 int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1276 int64 occupied = headPage - tailPage;
1277
1278 return occupied >= max_notify_queue_pages;
1279}
int max_notify_queue_pages
Definition: async.c:428

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1320 of file async.c.

/*
 * asyncQueueNotificationToEntry: fill in the queue entry *qe from the
 * pending notification *n.
 *
 * Sets qe->length to the QUEUEALIGN'ed size of the entry (fixed header plus
 * channel, payload and their two terminators), stamps the entry with this
 * backend's database OID and PID, and copies the channel and payload
 * strings, terminators included, from n->data into qe->data.
 *
 * NOTE(review): source line 1334 is missing from this listing.  The
 * References list for this function names GetCurrentTransactionId() and
 * AsyncQueueEntry::xid, so that line presumably stores the current
 * transaction id in qe->xid -- confirm against async.c.
 */
1321{
1322 size_t channellen = n->channel_len;
1323 size_t payloadlen = n->payload_len;
1324 int entryLength;
1325
1326 Assert(channellen < NAMEDATALEN);
1327 Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1328
1329 /* The terminators are already included in AsyncQueueEntryEmptySize */
1330 entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1331 entryLength = QUEUEALIGN(entryLength);
1332 qe->length = entryLength;
1333 qe->dboid = MyDatabaseId;
1335 qe->srcPid = MyProcPid;
/* "+ 2" covers the terminator after the channel name and after the payload */
1336 memcpy(qe->data, n->data, channellen + payloadlen + 2);
1337}
Oid MyDatabaseId
Definition: globals.c:93
int32 srcPid
Definition: async.c:182
TransactionId xid
Definition: async.c:181
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:453

References Assert, AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, AsyncQueueEntry::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inline static

Definition at line 466 of file async.c.

/*
 * asyncQueuePageDiff: return the signed distance p - q between two queue
 * page numbers.  A positive result means p is the larger page number.
 */
467{
468 return p - q;
469}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inline static

Definition at line 476 of file async.c.

/*
 * asyncQueuePagePrecedes: return true if page number p sorts before page
 * number q.  This is a plain "<" comparison on the int64 page numbers,
 * with no wraparound handling.
 */
477{
478 return p < q;
479}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( volatile QueuePosition current,
QueuePosition  stop,
char *  page_buffer,
Snapshot  snapshot 
)
static

Definition at line 2016 of file async.c.

/*
 * asyncQueueProcessPageEntries: walk the queue entries of one page that has
 * been copied into page_buffer, starting at *current.
 *
 * current     - in/out read position; advanced past every entry consumed.
 * stop        - position at which to stop reading.
 * page_buffer - local copy of the page; entries are located at
 *               page_buffer + QUEUE_POS_OFFSET(position).
 * snapshot    - snapshot used to decide whether an entry's source
 *               transaction is still in progress.
 *
 * Entries whose dboid differs from MyDatabaseId are stepped over.  For an
 * entry of our database: if its xid is still in progress per the snapshot,
 * *current is reset to that entry and the scan ends; if the xid committed
 * and we are listening on the entry's channel, the notification is passed
 * to NotifyMyFrontEnd(); otherwise the entry is ignored.
 *
 * Returns true if the scan ended at the stop position or at an
 * uncommitted entry; returns false if it ended because
 * asyncQueueAdvance() reported the end of the page before either of those.
 */
2020{
2021 bool reachedStop = false;
2022 bool reachedEndOfPage;
2023 AsyncQueueEntry *qe;
2024
2025 do
2026 {
2027 QueuePosition thisentry = *current;
2028
2029 if (QUEUE_POS_EQUAL(thisentry, stop))
2030 break;
2031
2032 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2033
2034 /*
2035 * Advance *current over this message, possibly to the next page. As
2036 * noted in the comments for asyncQueueReadAllNotifications, we must
2037 * do this before possibly failing while processing the message.
2038 */
2039 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2040
2041 /* Ignore messages destined for other databases */
2042 if (qe->dboid == MyDatabaseId)
2043 {
2044 if (XidInMVCCSnapshot(qe->xid, snapshot))
2045 {
2046 /*
2047 * The source transaction is still in progress, so we can't
2048 * process this message yet. Break out of the loop, but first
2049 * back up *current so we will reprocess the message next
2050 * time. (Note: it is unlikely but not impossible for
2051 * TransactionIdDidCommit to fail, so we can't really avoid
2052 * this advance-then-back-up behavior when dealing with an
2053 * uncommitted message.)
2054 *
2055 * Note that we must test XidInMVCCSnapshot before we test
2056 * TransactionIdDidCommit, else we might return a message from
2057 * a transaction that is not yet visible to snapshots; compare
2058 * the comments at the head of heapam_visibility.c.
2059 *
2060 * Also, while our own xact won't be listed in the snapshot,
2061 * we need not check for TransactionIdIsCurrentTransactionId
2062 * because our transaction cannot (yet) have queued any
2063 * messages.
2064 */
2065 *current = thisentry;
2066 reachedStop = true;
2067 break;
2068 }
2069 else if (TransactionIdDidCommit(qe->xid))
2070 {
2071 /* qe->data is the null-terminated channel name */
2072 char *channel = qe->data;
2073
2074 if (IsListeningOn(channel))
2075 {
2076 /* payload follows channel name */
2077 char *payload = qe->data + strlen(channel) + 1;
2078
2079 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2080 }
2081 }
2082 else
2083 {
2084 /*
2085 * The source transaction aborted or crashed, so we just
2086 * ignore its notifications.
2087 */
2088 }
2089 }
2090
2091 /* Loop back if we're not at end of page */
2092 } while (!reachedEndOfPage);
2093
/* Also report a stop when the last advance landed exactly on the stop position */
2094 if (QUEUE_POS_EQUAL(*current, stop))
2095 reachedStop = true;
2096
2097 return reachedStop;
2098}
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2224
static bool IsListeningOn(const char *channel)
Definition: async.c:1212
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:1800
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1851 of file async.c.

1852{
1853 volatile QueuePosition pos;
1854 QueuePosition head;
1855 Snapshot snapshot;
1856
1857 /* page_buffer must be adequately aligned, so use a union */
1858 union
1859 {
1860 char buf[QUEUE_PAGESIZE];
1861 AsyncQueueEntry align;
1862 } page_buffer;
1863
1864 /* Fetch current state */
1865 LWLockAcquire(NotifyQueueLock, LW_SHARED);
1866 /* Assert checks that we have a valid state entry */
1869 head = QUEUE_HEAD;
1870 LWLockRelease(NotifyQueueLock);
1871
1872 if (QUEUE_POS_EQUAL(pos, head))
1873 {
1874 /* Nothing to do, we have read all notifications already. */
1875 return;
1876 }
1877
1878 /*----------
1879 * Get snapshot we'll use to decide which xacts are still in progress.
1880 * This is trickier than it might seem, because of race conditions.
1881 * Consider the following example:
1882 *
1883 * Backend 1: Backend 2:
1884 *
1885 * transaction starts
1886 * UPDATE foo SET ...;
1887 * NOTIFY foo;
1888 * commit starts
1889 * queue the notify message
1890 * transaction starts
1891 * LISTEN foo; -- first LISTEN in session
1892 * SELECT * FROM foo WHERE ...;
1893 * commit to clog
1894 * commit starts
1895 * add backend 2 to array of listeners
1896 * advance to queue head (this code)
1897 * commit to clog
1898 *
1899 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1900 * wasn't committed yet. Ideally we'd ensure that client 2 would
1901 * eventually get transaction 1's notify message, but there's no way
1902 * to do that; until we're in the listener array, there's no guarantee
1903 * that the notify message doesn't get removed from the queue.
1904 *
1905 * Therefore the coding technique transaction 2 is using is unsafe:
1906 * applications must commit a LISTEN before inspecting database state,
1907 * if they want to ensure they will see notifications about subsequent
1908 * changes to that state.
1909 *
1910 * What we do guarantee is that we'll see all notifications from
1911 * transactions committing after the snapshot we take here.
1912 * Exec_ListenPreCommit has already added us to the listener array,
1913 * so no not-yet-committed messages can be removed from the queue
1914 * before we see them.
1915 *----------
1916 */
1917 snapshot = RegisterSnapshot(GetLatestSnapshot());
1918
1919 /*
1920 * It is possible that we fail while trying to send a message to our
1921 * frontend (for example, because of encoding conversion failure). If
1922 * that happens it is critical that we not try to send the same message
1923 * over and over again. Therefore, we place a PG_TRY block here that will
1924 * forcibly advance our queue position before we lose control to an error.
1925 * (We could alternatively retake NotifyQueueLock and move the position
1926 * before handling each individual message, but that seems like too much
1927 * lock traffic.)
1928 */
1929 PG_TRY();
1930 {
1931 bool reachedStop;
1932
1933 do
1934 {
1935 int64 curpage = QUEUE_POS_PAGE(pos);
1936 int curoffset = QUEUE_POS_OFFSET(pos);
1937 int slotno;
1938 int copysize;
1939
1940 /*
1941 * We copy the data from SLRU into a local buffer, so as to avoid
1942 * holding the SLRU lock while we are examining the entries and
1943 * possibly transmitting them to our frontend. Copy only the part
1944 * of the page we will actually inspect.
1945 */
1946 slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1948 if (curpage == QUEUE_POS_PAGE(head))
1949 {
1950 /* we only want to read as far as head */
1951 copysize = QUEUE_POS_OFFSET(head) - curoffset;
1952 if (copysize < 0)
1953 copysize = 0; /* just for safety */
1954 }
1955 else
1956 {
1957 /* fetch all the rest of the page */
1958 copysize = QUEUE_PAGESIZE - curoffset;
1959 }
1960 memcpy(page_buffer.buf + curoffset,
1961 NotifyCtl->shared->page_buffer[slotno] + curoffset,
1962 copysize);
1963 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
1965
1966 /*
1967 * Process messages up to the stop position, end of page, or an
1968 * uncommitted message.
1969 *
1970 * Our stop position is what we found to be the head's position
1971 * when we entered this function. It might have changed already.
1972 * But if it has, we will receive (or have already received and
1973 * queued) another signal and come here again.
1974 *
1975 * We are not holding NotifyQueueLock here! The queue can only
1976 * extend beyond the head pointer (see above) and we leave our
1977 * backend's pointer where it is so nobody will truncate or
1978 * rewrite pages under us. Especially we don't want to hold a lock
1979 * while sending the notifications to the frontend.
1980 */
1981 reachedStop = asyncQueueProcessPageEntries(&pos, head,
1982 page_buffer.buf,
1983 snapshot);
1984 } while (!reachedStop);
1985 }
1986 PG_FINALLY();
1987 {
1988 /* Update shared state */
1989 LWLockAcquire(NotifyQueueLock, LW_SHARED);
1991 LWLockRelease(NotifyQueueLock);
1992 }
1993 PG_END_TRY();
1994
1995 /* Done with snapshot */
1996 UnregisterSnapshot(snapshot);
1997}
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
Definition: async.c:2016
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_FINALLY(...)
Definition: elog.h:388
ProcNumber MyProcNumber
Definition: globals.c:89
@ LW_SHARED
Definition: lwlock.h:115
static char * buf
Definition: pg_test_fsync.c:72
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:605
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:283
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:794
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:752

References Assert, asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1231 of file async.c.

1232{
1233 Assert(listenChannels == NIL); /* else caller error */
1234
1235 if (!amRegisteredListener) /* nothing to do */
1236 return;
1237
1238 /*
1239 * Need exclusive lock here to manipulate list links.
1240 */
1241 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1242 /* Mark our entry as invalid */
1245 /* and remove it from the list */
1248 else
1249 {
1251 {
1253 {
1255 break;
1256 }
1257 }
1258 }
1260 LWLockRelease(NotifyQueueLock);
1261
1262 /* mark ourselves as no longer listed in the global array */
1263 amRegisteredListener = false;
1264}
static List * listenChannels
Definition: async.c:320
static bool amRegisteredListener
Definition: async.c:419
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:301

References amRegisteredListener, Assert, i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, listenChannels, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1506 of file async.c.

/*
 * asyncQueueUsage: return the fraction of the notification queue that is
 * currently occupied.
 *
 * The result is (head page - tail page) / max_notify_queue_pages as a
 * double; it is exactly 0 when head and tail are on the same page.
 *
 * NOTE(review): no lock is taken in this function; presumably callers hold
 * NotifyQueueLock while head/tail are read -- confirm.
 */
1507{
1508 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1509 int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1510 int64 occupied = headPage - tailPage;
1511
1512 if (occupied == 0)
1513 return (double) 0; /* fast exit for common case */
1514
1515 return (double) occupied / (double) max_notify_queue_pages;
1516}

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 502 of file async.c.

503{
504 bool found;
505 Size size;
506
507 /*
508 * Create or attach to the AsyncQueueControl structure.
509 */
511 size = add_size(size, offsetof(AsyncQueueControl, backend));
512
514 ShmemInitStruct("Async Queue Control", size, &found);
515
516 if (!found)
517 {
518 /* First time through, so initialize it */
521 QUEUE_STOP_PAGE = 0;
524 for (int i = 0; i < MaxBackends; i++)
525 {
530 }
531 }
532
533 /*
534 * Set up SLRU management of the pg_notify data. Note that long segment
535 * names are used in order to avoid wraparound.
536 */
537 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
540 SYNC_HANDLER_NONE, true);
541
542 if (!found)
543 {
544 /*
545 * During start or reboot, clean out the pg_notify directory.
546 */
548 }
549}
size_t Size
Definition: c.h:562
int MaxBackends
Definition: globals.c:145
int notify_buffers
Definition: globals.c:163
@ LWTRANCHE_NOTIFY_SLRU
Definition: lwlock.h:213
@ LWTRANCHE_NOTIFY_BUFFER
Definition: lwlock.h:184
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
static pg_noinline void Size size
Definition: slab.c:607
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:252
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1791
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1744
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), size, SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 485 of file async.c.

486{
487 Size size;
488
489 /* This had better match AsyncShmemInit */
491 size = add_size(size, offsetof(AsyncQueueControl, backend));
492
494
495 return size;
496}
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:199

References add_size(), MaxBackends, mul_size(), notify_buffers, SimpleLruShmemSize(), and size.

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1671 of file async.c.

1672{
1673 /*
1674 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1675 * we have registered as a listener but have not made any entry in
1676 * listenChannels. In that case, deregister again.
1677 */
1680
1681 /* And clean up */
1683}
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2378

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 968 of file async.c.

969{
970 ListCell *p;
971
972 /*
973 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
974 * return as soon as possible
975 */
977 return;
978
979 if (Trace_notify)
980 elog(DEBUG1, "AtCommit_Notify");
981
982 /* Perform any pending listen/unlisten actions */
983 if (pendingActions != NULL)
984 {
985 foreach(p, pendingActions->actions)
986 {
987 ListenAction *actrec = (ListenAction *) lfirst(p);
988
989 switch (actrec->action)
990 {
991 case LISTEN_LISTEN:
992 Exec_ListenCommit(actrec->channel);
993 break;
994 case LISTEN_UNLISTEN:
996 break;
999 break;
1000 }
1001 }
1002 }
1003
1004 /* If no longer listening to anything, get out of listener array */
1007
1008 /*
1009 * Send signals to listening backends. We need do this only if there are
1010 * pending notifies, which were previously added to the shared queue by
1011 * PreCommit_Notify().
1012 */
1013 if (pendingNotifies != NULL)
1015
1016 /*
1017 * If it's time to try to advance the global tail pointer, do that.
1018 *
1019 * (It might seem odd to do this in the sender, when more than likely the
1020 * listeners won't yet have read the messages we just sent. However,
1021 * there's less contention if only the sender does it, and there is little
1022 * need for urgency in advancing the global tail. So this typically will
1023 * be clearing out messages that were sent some time ago.)
1024 */
1025 if (tryAdvanceTail)
1026 {
1027 tryAdvanceTail = false;
1029 }
1030
1031 /* And clean up */
1033}
static void SignalBackends(void)
Definition: async.c:1581
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1136
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1163
static void asyncQueueAdvanceTail(void)
Definition: async.c:2108
List * actions
Definition: async.c:348
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:342
ListenActionKind action
Definition: async.c:341

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 836 of file async.c.

837{
838 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
841 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
843}

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1761 of file async.c.

/*
 * AtSubAbort_Notify: discard the pending LISTEN/UNLISTEN actions and
 * pending notifications that belong to the aborting subtransaction.
 *
 * Every ActionList and NotificationList stack entry whose nestingLevel is
 * at or above the current transaction nest level is freed with pfree().
 *
 * NOTE(review): source lines 1781 and 1790 are missing from this listing.
 * The References list names ActionList::upper and NotificationList::upper,
 * so those lines presumably pop the stack (set the head to its ->upper)
 * before the saved child pointer is freed -- confirm against async.c.
 */
1762{
1763 int my_level = GetCurrentTransactionNestLevel();
1764
1765 /*
1766 * All we have to do is pop the stack --- the actions/notifies made in
1767 * this subxact are no longer interesting, and the space will be freed
1768 * when CurTransactionContext is recycled. We still have to free the
1769 * ActionList and NotificationList objects themselves, though, because
1770 * those are allocated in TopTransactionContext.
1771 *
1772 * Note that there might be no entries at all, or no entries for the
1773 * current subtransaction level, either because none were ever created, or
1774 * because we reentered this routine due to trouble during subxact abort.
1775 */
1776 while (pendingActions != NULL &&
1777 pendingActions->nestingLevel >= my_level)
1778 {
1779 ActionList *childPendingActions = pendingActions;
1780
1782 pfree(childPendingActions);
1783 }
1784
1785 while (pendingNotifies != NULL &&
1786 pendingNotifies->nestingLevel >= my_level)
1787 {
1788 NotificationList *childPendingNotifies = pendingNotifies;
1789
1791 pfree(childPendingNotifies);
1792 }
1793}
int nestingLevel
Definition: async.c:347
struct ActionList * upper
Definition: async.c:349

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1691 of file async.c.

1692{
1693 int my_level = GetCurrentTransactionNestLevel();
1694
1695 /* If there are actions at our nesting level, we must reparent them. */
1696 if (pendingActions != NULL &&
1697 pendingActions->nestingLevel >= my_level)
1698 {
1699 if (pendingActions->upper == NULL ||
1700 pendingActions->upper->nestingLevel < my_level - 1)
1701 {
1702 /* nothing to merge; give the whole thing to the parent */
1704 }
1705 else
1706 {
1707 ActionList *childPendingActions = pendingActions;
1708
1710
1711 /*
1712 * Mustn't try to eliminate duplicates here --- see queue_listen()
1713 */
1716 childPendingActions->actions);
1717 pfree(childPendingActions);
1718 }
1719 }
1720
1721 /* If there are notifies at our nesting level, we must reparent them. */
1722 if (pendingNotifies != NULL &&
1723 pendingNotifies->nestingLevel >= my_level)
1724 {
1725 Assert(pendingNotifies->nestingLevel == my_level);
1726
1727 if (pendingNotifies->upper == NULL ||
1728 pendingNotifies->upper->nestingLevel < my_level - 1)
1729 {
1730 /* nothing to merge; give the whole thing to the parent */
1732 }
1733 else
1734 {
1735 /*
1736 * Formerly, we didn't bother to eliminate duplicates here, but
1737 * now we must, else we fall foul of "Assert(!found)", either here
1738 * or during a later attempt to build the parent-level hashtable.
1739 */
1740 NotificationList *childPendingNotifies = pendingNotifies;
1741 ListCell *l;
1742
1744 /* Insert all the subxact's events into parent, except for dups */
1745 foreach(l, childPendingNotifies->events)
1746 {
1747 Notification *childn = (Notification *) lfirst(l);
1748
1749 if (!AsyncExistsPendingNotify(childn))
1751 }
1752 pfree(childPendingNotifies);
1753 }
1754 }
1755}
List * list_concat(List *list1, const List *list2)
Definition: list.c:561

References ActionList::actions, AddEventToPendingNotifies(), Assert, AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ check_notify_buffers()

bool check_notify_buffers ( int *  newval,
void **  extra,
GucSource  source 
)

Definition at line 2394 of file async.c.

/*
 * check_notify_buffers: GUC check hook for "notify_buffers".
 *
 * Delegates validation of *newval to check_slru_buffers() and returns its
 * result; the extra and source arguments are not used.
 */
2395{
2396 return check_slru_buffers("notify_buffers", newval);
2397}
#define newval
bool check_slru_buffers(const char *name, int *newval)
Definition: slru.c:355

References check_slru_buffers(), and newval.

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2378 of file async.c.

/*
 * ClearPendingActionsAndNotifies: forget all pending LISTEN/UNLISTEN
 * actions and pending notifications by resetting both list heads to NULL.
 * Nothing is freed explicitly here; see the comment in the body.
 */
2379{
2380 /*
2381 * Everything's allocated in either TopTransactionContext or the context
2382 * for the subtransaction to which it corresponds. So, there's nothing to
2383 * do here except reset the pointers; the space will be reclaimed when the
2384 * contexts are deleted.
2385 */
2386 pendingActions = NULL;
2387 pendingNotifies = NULL;
2388}

References pendingActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1136 of file async.c.

1137{
1138 MemoryContext oldcontext;
1139
1140 /* Do nothing if we are already listening on this channel */
1141 if (IsListeningOn(channel))
1142 return;
1143
1144 /*
1145 * Add the new channel name to listenChannels.
1146 *
1147 * XXX It is theoretically possible to get an out-of-memory failure here,
1148 * which would be bad because we already committed. For the moment it
1149 * doesn't seem worth trying to guard against that, but maybe improve this
1150 * later.
1151 */
1154 MemoryContextSwitchTo(oldcontext);
1155}
char * pstrdup(const char *in)
Definition: mcxt.c:1696
MemoryContext TopMemoryContext
Definition: mcxt.c:149

References IsListeningOn(), lappend(), listenChannels, MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1041 of file async.c.

1042{
1043 QueuePosition head;
1044 QueuePosition max;
1045 ProcNumber prevListener;
1046
1047 /*
1048 * Nothing to do if we are already listening to something, nor if we
1049 * already ran this routine in this transaction.
1050 */
1052 return;
1053
1054 if (Trace_notify)
1055 elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1056
1057 /*
1058 * Before registering, make sure we will unlisten before dying. (Note:
1059 * this action does not get undone if we abort later.)
1060 */
1062 {
1065 }
1066
1067 /*
1068 * This is our first LISTEN, so establish our pointer.
1069 *
1070 * We set our pointer to the global tail pointer and then move it forward
1071 * over already-committed notifications. This ensures we cannot miss any
1072 * not-yet-committed notifications. We might get a few more but that
1073 * doesn't hurt.
1074 *
1075 * In some scenarios there might be a lot of committed notifications that
1076 * have not yet been pruned away (because some backend is being lazy about
1077 * reading them). To reduce our startup time, we can look at other
1078 * backends and adopt the maximum "pos" pointer of any backend that's in
1079 * our database; any notifications it's already advanced over are surely
1080 * committed and need not be re-examined by us. (We must consider only
1081 * backends connected to our DB, because others will not have bothered to
1082 * check committed-ness of notifications in our DB.)
1083 *
1084 * We need exclusive lock here so we can look at other backends' entries
1085 * and manipulate the list links.
1086 */
1087 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1088 head = QUEUE_HEAD;
1089 max = QUEUE_TAIL;
1090 prevListener = INVALID_PROC_NUMBER;
1092 {
1094 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1095 /* Also find last listening backend before this one */
1096 if (i < MyProcNumber)
1097 prevListener = i;
1098 }
1102 /* Insert backend into list of listeners at correct position */
1103 if (prevListener != INVALID_PROC_NUMBER)
1104 {
1106 QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
1107 }
1108 else
1109 {
1112 }
1113 LWLockRelease(NotifyQueueLock);
1114
1115 /* Now we are listed in the global array, so remember we're listening */
1116 amRegisteredListener = true;
1117
1118 /*
1119 * Try to move our pointer forward as far as possible. This will skip
1120 * over already-committed notifications, which we want to do because they
1121 * might be quite stale. Note that we are not yet listening on anything,
1122 * so we won't deliver such notifications to our frontend. Also, although
1123 * our transaction might have executed NOTIFY, those message(s) aren't
1124 * queued yet so we won't skip them here.
1125 */
1126 if (!QUEUE_POS_EQUAL(max, head))
1128}
#define QUEUE_POS_MAX(x, y)
Definition: async.c:222
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1851
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:823
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1194 of file async.c.

1195{
1196 if (Trace_notify)
1197 elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1198
1201}
void list_free_deep(List *list)
Definition: list.c:1560

References DEBUG1, elog, list_free_deep(), listenChannels, MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1163 of file async.c.

/*
 * Exec_UnlistenCommit: stop listening on the named channel.
 *
 * Scans listenChannels for the first entry equal (by strcmp) to channel,
 * frees that entry's string and stops scanning.  If no entry matches,
 * nothing happens and no error is raised.
 *
 * NOTE(review): source line 1176 is missing from this listing.  The
 * References list names foreach_delete_current, so that line presumably
 * unlinks the matching cell from listenChannels before the string is
 * freed -- confirm against async.c.
 */
1164{
1165 ListCell *q;
1166
1167 if (Trace_notify)
1168 elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1169
1170 foreach(q, listenChannels)
1171 {
1172 char *lchan = (char *) lfirst(q);
1173
1174 if (strcmp(lchan, channel) == 0)
1175 {
1177 pfree(lchan);
1178 break;
1179 }
1180 }
1181
1182 /*
1183 * We do not complain about unlistening something not being listened;
1184 * should we?
1185 */
1186}
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391

References DEBUG1, elog, foreach_delete_current, lfirst, listenChannels, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1804 of file async.c.

1805{
1806 /*
1807 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1808 * you do here.
1809 */
1810
1811 /* signal that work needs to be done */
1813
1814 /* make sure the event is processed in due course */
1816}
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:413
struct Latch * MyLatch
Definition: globals.c:62
void SetLatch(Latch *latch)
Definition: latch.c:632

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1212 of file async.c.

/*
 * IsListeningOn: return true if channel is present in listenChannels.
 *
 * This is a linear scan comparing each list entry to channel with
 * strcmp(), so the match is exact and case-sensitive.
 */
1213{
1214 ListCell *p;
1215
1216 foreach(p, listenChannels)
1217 {
1218 char *lchan = (char *) lfirst(p);
1219
1220 if (strcmp(lchan, channel) == 0)
1221 return true;
1222 }
1223 return false;
1224}

References lfirst, and listenChannels.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2348 of file async.c.

/*
 * notification_hash: hash function for a table keyed by Notification
 * pointers.
 *
 * key points at a "Notification *" (hence the double indirection), and
 * keysize must equal sizeof(Notification *).  The hash covers the first
 * channel_len + payload_len + 1 bytes of the notification's data, i.e.
 * everything up to but excluding the payload's trailing null.
 */
2349{
2350 const Notification *k = *(const Notification *const *) key;
2351
2352 Assert(keysize == sizeof(Notification *));
2353 /* We don't bother to include the payload's trailing null in the hash */
2354 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2355 k->channel_len + k->payload_len + 1));
2356}
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:227

References Assert, Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), sort-test::key, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2362 of file async.c.

2363{
2364 const Notification *k1 = *(const Notification *const *) key1;
2365 const Notification *k2 = *(const Notification *const *) key2;
2366
2367 Assert(keysize == sizeof(Notification *));
2368 if (k1->channel_len == k2->channel_len &&
2369 k1->payload_len == k2->payload_len &&
2370 memcmp(k1->data, k2->data,
2371 k1->channel_len + k1->payload_len + 2) == 0)
2372 return 0; /* equal */
2373 return 1; /* not equal */
2374}

References Assert, Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2224 of file async.c.

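2225{
2226 if (whereToSendOutput == DestRemote)
2227 {
2228 StringInfoData buf;
2229
2230 pq_beginmessage(&buf, PqMsg_NotificationResponse);
2231 pq_sendint32(&buf, srcPid);
2232 pq_sendstring(&buf, channel);
2233 pq_sendstring(&buf, payload);
2234 pq_endmessage(&buf);
2235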
2236 /*
2237 * NOTE: we do not do pq_flush() here. Some level of caller will
2238 * handle it later, allowing this message to be combined into a packet
2239 * with other ones.
2240 */
2241 }
2242 else
2243 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2244}
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
CommandDest whereToSendOutput
Definition: postgres.c:90
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
#define PqMsg_NotificationResponse
Definition: protocol.h:41

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and HandleParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 790 of file async.c.

791{
792 FuncCallContext *funcctx;
793
794 /* stuff done only on the first call of the function */
795 if (SRF_IS_FIRSTCALL())
796 {
797 /* create a function context for cross-call persistence */
798 funcctx = SRF_FIRSTCALL_INIT();
799 }
800
801 /* stuff done on every call of the function */
802 funcctx = SRF_PERCALL_SETUP();
803
804 if (funcctx->call_cntr < list_length(listenChannels))
805 {
806 char *channel = (char *) list_nth(listenChannels,
807 funcctx->call_cntr);
808
809 SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
810 }
811
812 SRF_RETURN_DONE(funcctx);
813}
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:328
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
uint64 call_cntr
Definition: funcapi.h:65

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), listenChannels, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1481 of file async.c.

1482{
1483 double usage;
1484
1485 /* Advance the queue tail so we don't report a too-large result */
1486 asyncQueueAdvanceTail();
1487
1488 LWLockAcquire(NotifyQueueLock, LW_SHARED);
1489 usage = asyncQueueUsage();
1490 LWLockRelease(NotifyQueueLock);
1491
1492 PG_RETURN_FLOAT8(usage);
1493}
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static void usage(const char *progname)
Definition: vacuumlo.c:414

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 557 of file async.c.

558{
559 const char *channel;
560 const char *payload;
561
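562 if (PG_ARGISNULL(0))
563 channel = "";
564 else
565 channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
566
567 if (PG_ARGISNULL(1))
568 payload = "";
569 else
570 payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
571
572 /* For NOTIFY as a statement, this is checked in ProcessUtility */
573 PreventCommandDuringRecovery("NOTIFY");
574
575 Async_Notify(channel, payload);
576
577 PG_RETURN_VOID();
578}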
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:591
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:441
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 861 of file async.c.

862{
863 ListCell *p;
864
865 if (!pendingActions && !pendingNotifies)
866 return; /* no relevant statements in this xact */
867
868 if (Trace_notify)
869 elog(DEBUG1, "PreCommit_Notify");
870
871 /* Preflight for any pending listen/unlisten actions */
872 if (pendingActions != NULL)
873 {
874 foreach(p, pendingActions->actions)
875 {
876 ListenAction *actrec = (ListenAction *) lfirst(p);
877
878 switch (actrec->action)
879 {
880 case LISTEN_LISTEN:
881 Exec_ListenPreCommit();
882 break;
883 case LISTEN_UNLISTEN:
884 /* there is no Exec_UnlistenPreCommit() */
885 break;
886 case LISTEN_UNLISTEN_ALL:
887 /* there is no Exec_UnlistenAllPreCommit() */
888 break;
889 }
890 }
891 }
892
893 /* Queue any pending notifies (must happen after the above) */
894 if (pendingNotifies)
895 {
896 ListCell *nextNotify;
897
898 /*
899 * Make sure that we have an XID assigned to the current transaction.
900 * GetCurrentTransactionId is cheap if we already have an XID, but not
901 * so cheap if we don't, and we'd prefer not to do that work while
902 * holding NotifyQueueLock.
903 */
904 (void) GetCurrentTransactionId();
905
906 /*
907 * Serialize writers by acquiring a special lock that we hold till
908 * after commit. This ensures that queue entries appear in commit
909 * order, and in particular that there are never uncommitted queue
910 * entries ahead of committed ones, so an uncommitted transaction
911 * can't block delivery of deliverable notifications.
912 *
913 * We use a heavyweight lock so that it'll automatically be released
914 * after either commit or abort. This also allows deadlocks to be
915 * detected, though really a deadlock shouldn't be possible here.
916 *
917 * The lock is on "database 0", which is pretty ugly but it doesn't
918 * seem worth inventing a special locktag category just for this.
919 * (Historical note: before PG 9.0, a similar lock on "database 0" was
920 * used by the flatfiles mechanism.)
921 */
922 LockSharedObject(DatabaseRelationId, InvalidOid, 0,
923 AccessExclusiveLock);
924
925 /* Now push the notifications into the queue */
926 nextNotify = list_head(pendingNotifies->events);
927 while (nextNotify != NULL)
928 {
929 /*
930 * Add the pending notifications to the queue. We acquire and
931 * release NotifyQueueLock once per page, which might be overkill
932 * but it does allow readers to get in while we're doing this.
933 *
934 * A full queue is very uncommon and should really not happen,
935 * given that we have so much space available in the SLRU pages.
936 * Nevertheless we need to deal with this possibility. Note that
937 * when we get here we are in the process of committing our
938 * transaction, but we have not yet committed to clog, so at this
939 * point in time we can still roll the transaction back.
940 */
941 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
942 asyncQueueFillWarning();
943 if (asyncQueueIsFull())
944 ereport(ERROR,
945 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
946 errmsg("too many notifications in the NOTIFY queue")));
947 nextNotify = asyncQueueAddEntries(nextNotify);
948 LWLockRelease(NotifyQueueLock);
949 }
950
951 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
952 }
953}
static void Exec_ListenPreCommit(void)
Definition: async.c:1041
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1356
static void asyncQueueFillWarning(void)
Definition: async.c:1527
static bool asyncQueueIsFull(void)
Definition: async.c:1272
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1072
#define AccessExclusiveLock
Definition: lockdefs.h:43
static ListCell * list_head(const List *l)
Definition: pg_list.h:128

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 2183 of file async.c.

2184{
2185 /* We *must* reset the flag */
2186 notifyInterruptPending = false;
2187
2188 /* Do nothing else if we aren't actively listening */
2189 if (listenChannels == NIL)
2190 return;
2191
2192 if (Trace_notify)
2193 elog(DEBUG1, "ProcessIncomingNotify");
2194
2195 set_ps_display("notify interrupt");
2196
2197 /*
2198 * We must run asyncQueueReadAllNotifications inside a transaction, else
2199 * bad things happen if it gets an error.
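2200 */
2201 StartTransactionCommand();
2202
2203 asyncQueueReadAllNotifications();
2204
2205 CommitTransactionCommand();
2206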
2207 /*
2208 * If this isn't an end-of-command case, we must flush the notify messages
2209 * to ensure frontend gets them promptly.
2210 */
2211 if (flush)
2212 pq_flush();
2213
2214 set_ps_display("idle");
2215
2216 if (Trace_notify)
2217 elog(DEBUG1, "ProcessIncomingNotify: done");
2218}
#define pq_flush()
Definition: libpq.h:46
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void StartTransactionCommand(void)
Definition: xact.c:3051
void CommitTransactionCommand(void)
Definition: xact.c:3149

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, listenChannels, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1834 of file async.c.

1835{
1836 if (IsTransactionOrTransactionBlock())
1837 return; /* not really idle */
1838
1839 /* Loop in case another signal arrives while sending messages */
1840 while (notifyInterruptPending)
1841 ProcessIncomingNotify(flush);
1842}
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2183
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4981

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 690 of file async.c.

691{
692 MemoryContext oldcontext;
693 ListenAction *actrec;
694 int my_level = GetCurrentTransactionNestLevel();
695
696 /*
697 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
698 * be too complicated to ensure we get the right interactions of
699 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
700 * would be any performance benefit anyway in sane applications.
701 */
702 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
703
704 /* allocate the fixed header plus the channel name and its terminating null */
705 actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
706 strlen(channel) + 1);
707 actrec->action = action;
708 strcpy(actrec->channel, channel);
709
710 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
711 {
712 ActionList *actions;
713
714 /*
715 * First action in current sub(xact). Note that we allocate the
716 * ActionList in TopTransactionContext; the nestingLevel might get
717 * changed later by AtSubCommit_Notify.
718 */
719 actions = (ActionList *)
720 MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
721 actions->nestingLevel = my_level;
722 actions->actions = list_make1(actrec);
723 actions->upper = pendingActions;
724 pendingActions = actions;
725 }
726 else
727 pendingActions->actions = lappend(pendingActions->actions, actrec);
728
729 MemoryContextSwitchTo(oldcontext);
730}

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1581 of file async.c.

1582{
1583 int32 *pids;
1584 ProcNumber *procnos;
1585 int count;
1586
1587 /*
1588 * Identify backends that we need to signal. We don't want to send
1589 * signals while holding the NotifyQueueLock, so this loop just builds a
1590 * list of target PIDs.
1591 *
1592 * XXX in principle these pallocs could fail, which would be bad. Maybe
1593 * preallocate the arrays? They're not that large, though.
1594 */
1595 pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1596 procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
1597 count = 0;
1598
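1599 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1600 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
1601 {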
1602 int32 pid = QUEUE_BACKEND_PID(i);
1603 QueuePosition pos;
1604
1605 Assert(pid != InvalidPid);
1606 pos = QUEUE_BACKEND_POS(i);
1607 if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1608 {
1609 /*
1610 * Always signal listeners in our own database, unless they're
1611 * already caught up (unlikely, but possible).
1612 */
1613 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1614 continue;
1615 }
1616 else
1617 {
1618 /*
1619 * Listeners in other databases should be signaled only if they
1620 * are far behind.
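1621 */
1622 if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
1623 QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
1624 continue;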
1625 }
1626 /* OK, need to signal this one */
1627 pids[count] = pid;
1628 procnos[count] = i;
1629 count++;
1630 }
1631 LWLockRelease(NotifyQueueLock);
1632
1633 /* Now send signals */
1634 for (int i = 0; i < count; i++)
1635 {
1636 int32 pid = pids[i];
1637
1638 /*
1639 * If we are signaling our own process, no need to involve the kernel;
1640 * just set the flag directly.
1641 */
1642 if (pid == MyProcPid)
1643 {
1644 notifyInterruptPending = true;
1645 continue;
1646 }
1647
1648 /*
1649 * Note: assuming things aren't broken, a signal failure here could
1650 * only occur if the target backend exited since we released
1651 * NotifyQueueLock; which is unlikely but certainly possible. So we
1652 * just log a low-level debug message if it happens.
1653 */
1654 if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
1655 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1656 }
1657
1658 pfree(pids);
1659 pfree(procnos);
1660}
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition: async.c:466
#define DEBUG3
Definition: elog.h:28
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_NOTIFY_INTERRUPT
Definition: procsignal.h:33

References Assert, asyncQueuePageDiff(), DEBUG3, elog, i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 294 of file async.c.

Referenced by asyncQueueFillWarning(), and AsyncShmemInit().

◆ listenChannels

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 428 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 308 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

◆ pendingNotifies

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 422 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 416 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().