163#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
187#define QUEUEALIGN(len) INTALIGN(len)
189#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
200#define QUEUE_POS_PAGE(x) ((x).page)
201#define QUEUE_POS_OFFSET(x) ((x).offset)
203#define SET_QUEUE_POS(x,y,z) \
209#define QUEUE_POS_EQUAL(x,y) \
210 ((x).page == (y).page && (x).offset == (y).offset)
212#define QUEUE_POS_IS_ZERO(x) \
213 ((x).page == 0 && (x).offset == 0)
216#define QUEUE_POS_MIN(x,y) \
217 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
218 (x).page != (y).page ? (y) : \
219 (x).offset < (y).offset ? (x) : (y))
222#define QUEUE_POS_MAX(x,y) \
223 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
224 (x).page != (y).page ? (x) : \
225 (x).offset > (y).offset ? (x) : (y))
238#define QUEUE_CLEANUP_DELAY 4
296#define QUEUE_HEAD (asyncQueueControl->head)
297#define QUEUE_TAIL (asyncQueueControl->tail)
298#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
299#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
300#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
301#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
302#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
303#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
310#define NotifyCtl (&NotifyCtlData)
311#define QUEUE_PAGESIZE BLCKSZ
313#define QUEUE_FULL_WARN_INTERVAL 5000
397#define MIN_HASHABLE_NOTIFIES 16
600 elog(
ERROR,
"cannot send notifications from a parallel worker");
605 channel_len = channel ? strlen(channel) : 0;
606 payload_len = payload ? strlen(payload) : 0;
609 if (channel_len == 0)
611 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
612 errmsg(
"channel name cannot be empty")));
617 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
618 errmsg(
"channel name too long")));
622 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
623 errmsg(
"payload string too long")));
635 channel_len + payload_len + 2);
638 strcpy(n->
data, channel);
640 strcpy(n->
data + channel_len + 1, payload);
642 n->
data[channel_len + 1] =
'\0';
706 strlen(channel) + 1);
708 strcpy(actrec->
channel, channel);
841 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842 errmsg(
"cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
927 while (nextNotify != NULL)
945 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
946 errmsg(
"too many notifications in the NOTIFY queue")));
1172 char *lchan = (
char *)
lfirst(q);
1174 if (strcmp(lchan, channel) == 0)
1218 char *lchan = (
char *)
lfirst(p);
1220 if (strcmp(lchan, channel) == 0)
1276 int64 occupied = headPage - tailPage;
1291 bool pageJump =
false;
1297 offset += entryLength;
1332 qe->
length = entryLength;
1336 memcpy(qe->
data, n->
data, channellen + payloadlen + 2);
1396 NotifyCtl->shared->page_dirty[slotno] =
true;
1398 while (nextNotify != NULL)
1427 memcpy(
NotifyCtl->shared->page_buffer[slotno] + offset,
1438 if (lock != prevlock)
1510 int64 occupied = headPage - tailPage;
1533 if (fillDegree < 0.5)
1553 (
errmsg(
"NOTIFY queue is %.0f%% full", fillDegree * 100),
1555 errdetail(
"The server process with PID %d is among those with the oldest transactions.", minPid)
1558 errhint(
"The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1634 for (
int i = 0;
i < count;
i++)
1655 elog(
DEBUG3,
"could not signal backend with PID %d: %m", pid);
1716 childPendingActions->
actions);
1717 pfree(childPendingActions);
1745 foreach(l, childPendingNotifies->
events)
1752 pfree(childPendingNotifies);
1782 pfree(childPendingActions);
1791 pfree(childPendingNotifies);
1960 memcpy(page_buffer.buf + curoffset,
1961 NotifyCtl->shared->page_buffer[slotno] + curoffset,
1984 }
while (!reachedStop);
2021 bool reachedStop =
false;
2022 bool reachedEndOfPage;
2065 *current = thisentry;
2072 char *channel = qe->
data;
2077 char *payload = qe->
data + strlen(channel) + 1;
2092 }
while (!reachedEndOfPage);
2243 elog(
INFO,
"NOTIFY for \"%s\" payload \"%s\"", channel, payload);
static void SignalBackends(void)
static double asyncQueueUsage(void)
#define MIN_HASHABLE_NOTIFIES
static void Exec_ListenCommit(const char *channel)
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
#define QUEUE_FIRST_LISTENER
#define QUEUE_POS_MAX(x, y)
static bool tryAdvanceTail
struct QueuePosition QueuePosition
void HandleNotifyInterrupt(void)
static void Exec_UnlistenCommit(const char *channel)
static void asyncQueueAdvanceTail(void)
int max_notify_queue_pages
static void Exec_ListenPreCommit(void)
static ActionList * pendingActions
static uint32 notification_hash(const void *key, Size keysize)
void Async_UnlistenAll(void)
static SlruCtlData NotifyCtlData
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
void AtCommit_Notify(void)
#define QUEUE_POS_MIN(x, y)
void ProcessNotifyInterrupt(bool flush)
static bool AsyncExistsPendingNotify(Notification *n)
#define QUEUE_BACKEND_POS(i)
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
static int notification_match(const void *key1, const void *key2, Size keysize)
#define SET_QUEUE_POS(x, y, z)
static void ProcessIncomingNotify(bool flush)
static void asyncQueueReadAllNotifications(void)
static void Async_UnlistenOnExit(int code, Datum arg)
#define QUEUE_POS_OFFSET(x)
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
static void ClearPendingActionsAndNotifies(void)
static List * listenChannels
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Datum pg_notify(PG_FUNCTION_ARGS)
static NotificationList * pendingNotifies
#define AsyncQueueEntryEmptySize
static void AddEventToPendingNotifies(Notification *n)
static AsyncQueueControl * asyncQueueControl
static bool unlistenExitRegistered
static bool asyncQueuePagePrecedes(int64 p, int64 q)
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
void AtAbort_Notify(void)
#define QUEUE_POS_PAGE(x)
void PreCommit_Notify(void)
#define QUEUE_CLEANUP_DELAY
struct AsyncQueueControl AsyncQueueControl
static void asyncQueueFillWarning(void)
#define QUEUE_BACKEND_PID(i)
static void Exec_UnlistenAllCommit(void)
struct ActionList ActionList
Size AsyncShmemSize(void)
#define QUEUE_FULL_WARN_INTERVAL
void Async_Unlisten(const char *channel)
void Async_Listen(const char *channel)
#define NOTIFY_PAYLOAD_MAX_LENGTH
#define QUEUE_POS_IS_ZERO(x)
static int64 asyncQueuePageDiff(int64 p, int64 q)
static void queue_listen(ListenActionKind action, const char *channel)
static bool amRegisteredListener
#define QUEUE_NEXT_LISTENER(i)
#define QUEUE_BACKEND_DBOID(i)
void AtSubAbort_Notify(void)
struct NotificationList NotificationList
void AtPrepare_Notify(void)
void AtSubCommit_Notify(void)
static bool asyncQueueIsFull(void)
void AsyncShmemInit(void)
static void asyncQueueUnregister(void)
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
struct AsyncQueueEntry AsyncQueueEntry
#define QUEUE_POS_EQUAL(x, y)
struct Notification Notification
static bool IsListeningOn(const char *channel)
void Async_Notify(const char *channel, const char *payload)
volatile sig_atomic_t notifyInterruptPending
bool check_notify_buffers(int *newval, void **extra, GucSource source)
struct QueueBackendStatus QueueBackendStatus
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
#define CStringGetTextDatum(s)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_RETURN_FLOAT8(x)
#define SRF_IS_FIRSTCALL()
#define SRF_PERCALL_SETUP()
#define SRF_RETURN_NEXT(_funcctx, _result)
#define SRF_FIRSTCALL_INIT()
#define SRF_RETURN_DONE(_funcctx)
static Datum hash_any(const unsigned char *k, int keylen)
#define IsParallelWorker()
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
List * lappend(List *list, void *datum)
List * list_concat(List *list1, const List *list2)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
@ LWTRANCHE_NOTIFY_BUFFER
void * MemoryContextAlloc(MemoryContext context, Size size)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
MemoryContext CurTransactionContext
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
static void * list_nth(const List *list, int n)
static ListCell * list_head(const List *l)
static ListCell * lnext(const List *l, const ListCell *c)
static rewind_source * source
CommandDest whereToSendOutput
static uint32 DatumGetUInt32(Datum X)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_NOTIFY_INTERRUPT
#define PqMsg_NotificationResponse
static void set_ps_display(const char *activity)
MemoryContextSwitchTo(old_ctx)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
static pg_noinline void Size size
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Size SimpleLruShmemSize(int nslots, int nlsns)
bool check_slru_buffers(const char *name, int *newval)
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
#define SLRU_PAGES_PER_SEGMENT
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Snapshot GetLatestSnapshot(void)
void UnregisterSnapshot(Snapshot snapshot)
Snapshot RegisterSnapshot(Snapshot snapshot)
struct ActionList * upper
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]
TimestampTz lastQueueFillWarn
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
char channel[FLEXIBLE_ARRAY_MEMBER]
struct NotificationList * upper
char data[FLEXIBLE_ARRAY_MEMBER]
bool TransactionIdDidCommit(TransactionId transactionId)
#define InvalidTransactionId
void PreventCommandDuringRecovery(const char *cmdname)
static void usage(const char *progname)
char * text_to_cstring(const text *t)
bool IsTransactionOrTransactionBlock(void)
int GetCurrentTransactionNestLevel(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
TransactionId GetCurrentTransactionId(void)