199#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
223#define QUEUEALIGN(len) INTALIGN(len)
225#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
236#define QUEUE_POS_PAGE(x) ((x).page)
237#define QUEUE_POS_OFFSET(x) ((x).offset)
239#define SET_QUEUE_POS(x,y,z) \
245#define QUEUE_POS_EQUAL(x,y) \
246 ((x).page == (y).page && (x).offset == (y).offset)
248#define QUEUE_POS_IS_ZERO(x) \
249 ((x).page == 0 && (x).offset == 0)
252#define QUEUE_POS_MIN(x,y) \
253 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
254 (x).page != (y).page ? (y) : \
255 (x).offset < (y).offset ? (x) : (y))
258#define QUEUE_POS_MAX(x,y) \
259 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
260 (x).page != (y).page ? (x) : \
261 (x).offset > (y).offset ? (x) : (y))
264#define QUEUE_POS_PRECEDES(x,y) \
265 (asyncQueuePagePrecedes((x).page, (y).page) || \
266 ((x).page == (y).page && (x).offset < (y).offset))
280#define QUEUE_CLEANUP_DELAY 4
347#define QUEUE_HEAD (asyncQueueControl->head)
348#define QUEUE_TAIL (asyncQueueControl->tail)
349#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
350#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
351#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
352#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
353#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
354#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
355#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
356#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
363#define NotifyCtl (&NotifyCtlData)
364#define QUEUE_PAGESIZE BLCKSZ
366#define QUEUE_FULL_WARN_INTERVAL 5000
377#define INITIAL_LISTENERS_ARRAY_SIZE 4
410#define LocalChannelTableIsEmpty() \
411 (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
512#define MIN_HASHABLE_NOTIFIES 16
575 const char *channel);
894 elog(
ERROR,
"cannot send notifications from a parallel worker");
899 channel_len = channel ?
strlen(channel) : 0;
900 payload_len = payload ?
strlen(payload) : 0;
903 if (channel_len == 0)
906 errmsg(
"channel name cannot be empty")));
912 errmsg(
"channel name too long")));
917 errmsg(
"payload string too long")));
929 channel_len + payload_len + 2);
936 n->
data[channel_len + 1] =
'\0';
1157 errmsg(
"cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1247 char *channel = n->data;
1346 errmsg(
"too many notifications in the NOTIFY queue")));
1723 elog(
PANIC,
"global channel table missing post-commit/abort");
1815 elog(
PANIC,
"could not find listener entry for channel \"%s\" backend %d",
2234 errdetail(
"The server process with PID %d is among those with the oldest transactions.",
minPid)
2237 errhint(
"The NOTIFY queue cannot be emptied until that process ends its current transaction.")
2375 for (
int i = 0;
i < count;
i++)
2396 elog(
DEBUG3,
"could not signal backend with PID %d: %m", pid);
2835 char *channel =
qe->
data;
2840 char *payload =
qe->data +
strlen(channel) + 1;
2950 char *page_buffer =
NULL;
2951 bool page_dirty =
false;
3111 elog(
INFO,
"NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3218 char *channel = n->
data;
3261 if (
k1->channel_len ==
k2->channel_len &&
3262 k1->payload_len ==
k2->payload_len &&
3264 k1->channel_len +
k1->payload_len + 2) == 0)
Datum idx(PG_FUNCTION_ARGS)
static void SignalBackends(void)
static double asyncQueueUsage(void)
#define MIN_HASHABLE_NOTIFIES
static void PrepareTableEntriesForListen(const char *channel)
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
#define QUEUE_FIRST_LISTENER
#define QUEUE_POS_MAX(x, y)
static bool tryAdvanceTail
void HandleNotifyInterrupt(void)
static void BecomeRegisteredListener(void)
static void asyncQueueAdvanceTail(void)
int max_notify_queue_pages
static ActionList * pendingActions
static void ApplyPendingListenActions(bool isCommit)
#define QUEUE_BACKEND_IS_ADVANCING(i)
static uint32 notification_hash(const void *key, Size keysize)
void Async_UnlistenAll(void)
static int32 * signalPids
static SlruCtlData NotifyCtlData
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
void AtCommit_Notify(void)
#define QUEUE_POS_MIN(x, y)
static void PrepareTableEntriesForUnlisten(const char *channel)
void ProcessNotifyInterrupt(bool flush)
static bool AsyncExistsPendingNotify(Notification *n)
#define QUEUE_BACKEND_POS(i)
static const dshash_parameters globalChannelTableDSHParams
#define INITIAL_LISTENERS_ARRAY_SIZE
static int notification_match(const void *key1, const void *key2, Size keysize)
static dshash_hash globalChannelTableHash(const void *key, size_t size, void *arg)
#define SET_QUEUE_POS(x, y, z)
static ProcNumber * signalProcnos
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
static void ProcessIncomingNotify(bool flush)
static void asyncQueueReadAllNotifications(void)
static void Async_UnlistenOnExit(int code, Datum arg)
#define QUEUE_POS_OFFSET(x)
static QueuePosition queueHeadAfterWrite
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
static void ClearPendingActionsAndNotifies(void)
Datum pg_listening_channels(PG_FUNCTION_ARGS)
Datum pg_notify(PG_FUNCTION_ARGS)
static NotificationList * pendingNotifies
#define AsyncQueueEntryEmptySize
static void AddEventToPendingNotifies(Notification *n)
static AsyncQueueControl * asyncQueueControl
static bool unlistenExitRegistered
static bool asyncQueuePagePrecedes(int64 p, int64 q)
static dsa_area * globalChannelDSA
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
void AtAbort_Notify(void)
#define QUEUE_POS_PAGE(x)
static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, int idx)
void PreCommit_Notify(void)
#define QUEUE_CLEANUP_DELAY
static void PrepareTableEntriesForUnlistenAll(void)
static void asyncQueueFillWarning(void)
#define QUEUE_BACKEND_PID(i)
static void CleanupListenersOnExit(void)
static void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
Size AsyncShmemSize(void)
#define QUEUE_FULL_WARN_INTERVAL
void Async_Unlisten(const char *channel)
static HTAB * pendingListenActions
void Async_Listen(const char *channel)
#define NOTIFY_PAYLOAD_MAX_LENGTH
#define QUEUE_POS_IS_ZERO(x)
static void initGlobalChannelTable(void)
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
static HTAB * localChannelTable
static int64 asyncQueuePageDiff(int64 p, int64 q)
static void queue_listen(ListenActionKind action, const char *channel)
static bool amRegisteredListener
#define QUEUE_POS_PRECEDES(x, y)
#define QUEUE_NEXT_LISTENER(i)
#define QUEUE_BACKEND_DBOID(i)
void AtSubAbort_Notify(void)
void AtPrepare_Notify(void)
void AtSubCommit_Notify(void)
static bool asyncQueueIsFull(void)
void AsyncShmemInit(void)
static void initLocalChannelTable(void)
static dshash_table * globalChannelTable
static void asyncQueueUnregister(void)
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
#define QUEUE_POS_EQUAL(x, y)
#define LocalChannelTableIsEmpty()
static void initPendingListenActions(void)
static QueuePosition queueHeadBeforeWrite
static bool IsListeningOn(const char *channel)
void Async_Notify(const char *channel, const char *payload)
volatile sig_atomic_t notifyInterruptPending
void AsyncNotifyFreezeXids(TransactionId newFrozenXid)
bool check_notify_buffers(int *newval, void **extra, GucSource source)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
#define CStringGetTextDatum(s)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
dsa_area * dsa_attach(dsa_handle handle)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_pin_mapping(dsa_area *area)
dsa_handle dsa_get_handle(dsa_area *area)
void dsa_free(dsa_area *area, dsa_pointer dp)
void dsa_pin(dsa_area *area)
#define dsa_create(tranche_id)
#define dsa_allocate(area, size)
#define InvalidDsaPointer
#define DSA_HANDLE_INVALID
#define DsaPointerIsValid(x)
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
void dshash_delete_entry(dshash_table *hash_table, void *entry)
void dshash_release_lock(dshash_table *hash_table, void *entry)
void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, bool exclusive)
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
void dshash_seq_term(dshash_seq_status *status)
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
void * dshash_seq_next(dshash_seq_status *status)
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
void dshash_delete_current(dshash_seq_status *status)
#define DSHASH_HANDLE_INVALID
dsa_pointer dshash_table_handle
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
void hash_destroy(HTAB *hashp)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_RETURN_FLOAT8(x)
#define SRF_IS_FIRSTCALL()
#define SRF_PERCALL_SETUP()
#define SRF_RETURN_NEXT(_funcctx, _result)
#define SRF_FIRSTCALL_INIT()
#define SRF_RETURN_DONE(_funcctx)
static Datum hash_uint32(uint32 k)
static Datum hash_any(const unsigned char *k, int keylen)
#define IsParallelWorker()
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
List * lappend(List *list, void *datum)
List * list_concat(List *list1, const List *list2)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * MemoryContextAlloc(MemoryContext context, Size size)
MemoryContext TopTransactionContext
void pfree(void *pointer)
MemoryContext TopMemoryContext
MemoryContext CurTransactionContext
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define SLRU_PAGES_PER_SEGMENT
static int list_length(const List *l)
#define foreach_ptr(type, var, lst)
static ListCell * list_head(const List *l)
static ListCell * lnext(const List *l, const ListCell *c)
static rewind_source * source
static char buf[DEFAULT_XLOG_SEG_SIZE]
size_t strlcpy(char *dst, const char *src, size_t siz)
CommandDest whereToSendOutput
static uint32 DatumGetUInt32(Datum X)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_NOTIFY_INTERRUPT
#define PqMsg_NotificationResponse
static void set_ps_display(const char *activity)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Size SimpleLruShmemSize(int nslots, int nlsns)
bool check_slru_buffers(const char *name, int *newval)
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Snapshot GetLatestSnapshot(void)
void UnregisterSnapshot(Snapshot snapshot)
Snapshot RegisterSnapshot(Snapshot snapshot)
struct ActionList * upper
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]
dshash_table_handle globalChannelTableDSH
TimestampTz lastQueueFillWarn
dsa_handle globalChannelTableDSA
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
char channel[NAMEDATALEN]
dsa_pointer listenersArray
char channel[NAMEDATALEN]
List * uniqueChannelNames
struct NotificationList * upper
char data[FLEXIBLE_ARRAY_MEMBER]
PendingListenAction action
char channel[NAMEDATALEN]
bool TransactionIdDidCommit(TransactionId transactionId)
#define FrozenTransactionId
#define InvalidTransactionId
#define TransactionIdIsNormal(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
void PreventCommandDuringRecovery(const char *cmdname)
char * text_to_cstring(const text *t)
bool IsTransactionOrTransactionBlock(void)
int GetCurrentTransactionNestLevel(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
TransactionId GetCurrentTransactionId(void)