PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
sinvaladt.h File Reference
#include "storage/lock.h"
#include "storage/sinval.h"
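Include dependency graph for sinvaladt.h: (graph image not reproduced in this text rendering)
This graph shows which files directly or indirectly include this file: (graph image not reproduced in this text rendering)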

Go to the source code of this file.

Functions

Size SharedInvalShmemSize (void)
 
void SharedInvalShmemInit (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Function Documentation

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 700 of file sinvaladt.c.

/*
 * Summary: returns the current value of the file-static counter
 * nextLocalTransactionId and post-increments it.  The do/while retries
 * whenever the value handed out fails LocalTransactionIdIsValid, so the
 * invalid value is skipped when the uint32 counter wraps around.
 */
701{
702 LocalTransactionId result;
703
704 /* loop to avoid returning InvalidLocalTransactionId at wraparound */
705 do
706 {
707 result = nextLocalTransactionId++;
708 } while (!LocalTransactionIdIsValid(result));
709
710 return result;
711}
uint32 LocalTransactionId
Definition: c.h:608
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:66
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:208

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 271 of file sinvaladt.c.

/*
 * Summary: validates MyProcNumber against 0 and NumProcStateSlots, then, while
 * holding SInvalWriteLock exclusively, claims procState[MyProcNumber]
 * (raising ERROR if another PID already owns it), appends MyProcNumber to the
 * pgprocnos array, loads nextLocalTransactionId from the slot's nextLXID, and
 * marks the slot active with every existing message treated as already read.
 * After releasing the lock it registers CleanupInvalidationState with
 * on_shmem_exit so the slot is marked inactive when the process exits.
 */
272{
273 ProcState *stateP;
274 pid_t oldPid;
275 SISeg *segP = shmInvalBuffer;
276
277 if (MyProcNumber < 0)
278 elog(ERROR, "MyProcNumber not set");
279 if (MyProcNumber >= NumProcStateSlots)
280 elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
281 MyProcNumber, NumProcStateSlots);
282 stateP = &segP->procState[MyProcNumber];
283
284 /*
285 * This can run in parallel with read operations, but not with write
286 * operations, since SIInsertDataEntries relies on the pgprocnos array to
287 * set hasMessages appropriately.
288 */
289 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
290
291 oldPid = stateP->procPid;
292 if (oldPid != 0)
293 {
294 LWLockRelease(SInvalWriteLock);
295 elog(ERROR, "sinval slot for backend %d is already in use by process %d",
296 MyProcNumber, (int) oldPid);
297 }
298
299 shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
300
301 /* Fetch next local transaction ID into local memory */
302 nextLocalTransactionId = stateP->nextLXID;
303
304 /* mark myself active, with all extant messages already read */
305 stateP->procPid = MyProcPid;
306 stateP->nextMsgNum = segP->maxMsgNum;
307 stateP->resetState = false;
308 stateP->signaled = false;
309 stateP->hasMessages = false;
310 stateP->sendOnly = sendOnly;
311
312 LWLockRelease(SInvalWriteLock);
313
314 /* register exit routine to mark my entry inactive at exit */
315 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
316}
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
int MyProcPid
Definition: globals.c:46
ProcNumber MyProcNumber
Definition: globals.c:89
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:205
#define NumProcStateSlots
Definition: sinvaladt.c:203
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:327
int nextMsgNum
Definition: sinvaladt.c:142
bool signaled
Definition: sinvaladt.c:144
LocalTransactionId nextLXID
Definition: sinvaladt.c:161
pid_t procPid
Definition: sinvaladt.c:140
bool hasMessages
Definition: sinvaladt.c:145
bool sendOnly
Definition: sinvaladt.c:153
bool resetState
Definition: sinvaladt.c:143
int maxMsgNum
Definition: sinvaladt.c:171
int * pgprocnos
Definition: sinvaladt.c:194
int numProcs
Definition: sinvaladt.c:193
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:195

References CleanupInvalidationState(), elog, ERROR, ProcState::hasMessages, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MyProcNumber, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::numProcs, NumProcStateSlots, on_shmem_exit(), PANIC, SISeg::pgprocnos, PointerGetDatum(), ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SharedInvalShmemInit()

void SharedInvalShmemInit ( void  )

Definition at line 233 of file sinvaladt.c.

/*
 * Summary: attaches to (or creates) the "shmInvalBuffer" shared memory struct
 * sized by SharedInvalShmemSize().  Only on first creation (found == false)
 * does it zero minMsgNum and maxMsgNum, set nextThreshold to CLEANUP_MIN,
 * initialize msgnumLock, mark every procState slot inactive with
 * nextLXID = InvalidLocalTransactionId, and point pgprocnos at the int array
 * that SharedInvalShmemSize() reserves right after procState[NumProcStateSlots].
 */
234{
235 int i;
236 bool found;
237
238 /* Allocate space in shared memory */
239 shmInvalBuffer = (SISeg *)
240 ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
241 if (found)
242 return;
243
244 /* Clear message counters, save size of procState array, init spinlock */
245 shmInvalBuffer->minMsgNum = 0;
246 shmInvalBuffer->maxMsgNum = 0;
247 shmInvalBuffer->nextThreshold = CLEANUP_MIN;
248 SpinLockInit(&shmInvalBuffer->msgnumLock);
249
250 /* The buffer[] array is initially all unused, so we need not fill it */
251
252 /* Mark all backends inactive, and initialize nextLXID */
253 for (i = 0; i < NumProcStateSlots; i++)
254 {
255 shmInvalBuffer->procState[i].procPid = 0; /* inactive */
256 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
257 shmInvalBuffer->procState[i].resetState = false;
258 shmInvalBuffer->procState[i].signaled = false;
259 shmInvalBuffer->procState[i].hasMessages = false;
260 shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
261 }
262 shmInvalBuffer->numProcs = 0;
263 shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
264}
int i
Definition: isn.c:72
#define InvalidLocalTransactionId
Definition: lock.h:65
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
Size SharedInvalShmemSize(void)
Definition: sinvaladt.c:217
#define CLEANUP_MIN
Definition: sinvaladt.c:131
#define SpinLockInit(lock)
Definition: spin.h:57
int minMsgNum
Definition: sinvaladt.c:170
slock_t msgnumLock
Definition: sinvaladt.c:174
int nextThreshold
Definition: sinvaladt.c:172

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, SISeg::numProcs, NumProcStateSlots, SISeg::pgprocnos, ProcState::procPid, SISeg::procState, ProcState::resetState, SharedInvalShmemSize(), ShmemInitStruct(), shmInvalBuffer, ProcState::signaled, and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ SharedInvalShmemSize()

Size SharedInvalShmemSize ( void  )

Definition at line 217 of file sinvaladt.c.

/*
 * Summary: computes the byte size of the shared invalidation segment:
 * the SISeg header up to its procState member, plus NumProcStateSlots
 * ProcState entries, plus NumProcStateSlots ints for the pgprocnos array
 * that follows them.  The arithmetic goes through add_size/mul_size.
 */
218{
219 Size size;
220
221 size = offsetof(SISeg, procState);
222 size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
223 size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
224
225 return size;
226}
size_t Size
Definition: c.h:559
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), mul_size(), NumProcStateSlots, and size.

Referenced by CalculateShmemSize(), and SharedInvalShmemInit().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 576 of file sinvaladt.c.

/*
 * Summary: with SInvalWriteLock (acquired here unless callerHasWriteLock)
 * and SInvalReadLock held exclusively, recomputes minMsgNum over registered
 * backends that are neither in resetState nor sendOnly.  Any backend whose
 * nextMsgNum is below maxMsgNum - MAXNUMMESSAGES + minFree is forced into
 * resetState.  When minMsgNum reaches MSGNUMWRAPAROUND, minMsgNum, maxMsgNum
 * and every registered backend's nextMsgNum are all reduced by that amount.
 * It then recomputes nextThreshold and finally, after dropping the locks,
 * sends PROCSIG_CATCHUP_INTERRUPT to the furthest-behind unsignaled backend
 * that is more than SIG_THRESHOLD messages behind.  On return, SInvalWriteLock
 * is held exactly when callerHasWriteLock is true.
 */
577{
578 SISeg *segP = shmInvalBuffer;
579 int min,
580 minsig,
581 lowbound,
582 numMsgs,
583 i;
584 ProcState *needSig = NULL;
585
586 /* Lock out all writers and readers */
587 if (!callerHasWriteLock)
588 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
589 LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
590
591 /*
592 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
593 * furthest-back backend that needs signaling (if any), and reset any
594 * backends that are too far back. Note that because we ignore sendOnly
595 * backends here it is possible for them to keep sending messages without
596 * a problem even when they are the only active backend.
597 */
598 min = segP->maxMsgNum;
599 minsig = min - SIG_THRESHOLD;
600 lowbound = min - MAXNUMMESSAGES + minFree;
601
602 for (i = 0; i < segP->numProcs; i++)
603 {
604 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
605 int n = stateP->nextMsgNum;
606
607 /* Ignore if already in reset state */
608 Assert(stateP->procPid != 0);
609 if (stateP->resetState || stateP->sendOnly)
610 continue;
611
612 /*
613 * If we must free some space and this backend is preventing it, force
614 * him into reset state and then ignore until he catches up.
615 */
616 if (n < lowbound)
617 {
618 stateP->resetState = true;
619 /* no point in signaling him ... */
620 continue;
621 }
622
623 /* Track the global minimum nextMsgNum */
624 if (n < min)
625 min = n;
626
627 /* Also see who's furthest back of the unsignaled backends */
628 if (n < minsig && !stateP->signaled)
629 {
630 minsig = n;
631 needSig = stateP;
632 }
633 }
634 segP->minMsgNum = min;
635
636 /*
637 * When minMsgNum gets really large, decrement all message counters so as
638 * to forestall overflow of the counters. This happens seldom enough that
639 * folding it into the previous loop would be a loser.
640 */
641 if (min >= MSGNUMWRAPAROUND)
642 {
643 segP->minMsgNum -= MSGNUMWRAPAROUND;
644 segP->maxMsgNum -= MSGNUMWRAPAROUND;
645 for (i = 0; i < segP->numProcs; i++)
646 segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
647 }
648
649 /*
650 * Determine how many messages are still in the queue, and set the
651 * threshold at which we should repeat SICleanupQueue().
652 */
653 numMsgs = segP->maxMsgNum - segP->minMsgNum;
654 if (numMsgs < CLEANUP_MIN)
655 segP->nextThreshold = CLEANUP_MIN;
656 else
657 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
658
659 /*
660 * Lastly, signal anyone who needs a catchup interrupt. Since
661 * SendProcSignal() might not be fast, we don't want to hold locks while
662 * executing it.
663 */
664 if (needSig)
665 {
666 pid_t his_pid = needSig->procPid;
667 ProcNumber his_procNumber = (needSig - &segP->procState[0]);
668
669 needSig->signaled = true;
670 LWLockRelease(SInvalReadLock);
671 LWLockRelease(SInvalWriteLock);
672 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
673 SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
674 if (callerHasWriteLock)
675 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
676 }
677 else
678 {
679 LWLockRelease(SInvalReadLock);
680 if (!callerHasWriteLock)
681 LWLockRelease(SInvalWriteLock);
682 }
683}
#define Assert(condition)
Definition: c.h:812
#define DEBUG4
Definition: elog.h:27
int ProcNumber
Definition: procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_CATCHUP_INTERRUPT
Definition: procsignal.h:32
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:132
#define MAXNUMMESSAGES
Definition: sinvaladt.c:129
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:130
#define SIG_THRESHOLD
Definition: sinvaladt.c:133

References Assert, CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, SISeg::numProcs, SISeg::pgprocnos, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage * data,
int  datasize 
)

Definition at line 472 of file sinvaladt.c.

/*
 * Summary: copies up to datasize pending invalidation messages for this
 * backend (procState[MyProcNumber]) into data and returns how many were
 * copied.  It returns 0 without taking any lock when hasMessages is false.
 * It returns -1 when the backend was in resetState; in that case nextMsgNum
 * jumps to maxMsgNum and resetState and signaled are cleared.  SInvalReadLock
 * is held in shared mode, and maxMsgNum is read under msgnumLock.
 */
473{
474 SISeg *segP;
475 ProcState *stateP;
476 int max;
477 int n;
478
479 segP = shmInvalBuffer;
480 stateP = &segP->procState[MyProcNumber];
481
482 /*
483 * Before starting to take locks, do a quick, unlocked test to see whether
484 * there can possibly be anything to read. On a multiprocessor system,
485 * it's possible that this load could migrate backwards and occur before
486 * we actually enter this function, so we might miss a sinval message that
487 * was just added by some other processor. But they can't migrate
488 * backwards over a preceding lock acquisition, so it should be OK. If we
489 * haven't acquired a lock preventing against further relevant
490 * invalidations, any such occurrence is not much different than if the
491 * invalidation had arrived slightly later in the first place.
492 */
493 if (!stateP->hasMessages)
494 return 0;
495
496 LWLockAcquire(SInvalReadLock, LW_SHARED);
497
498 /*
499 * We must reset hasMessages before determining how many messages we're
500 * going to read. That way, if new messages arrive after we have
501 * determined how many we're reading, the flag will get reset and we'll
502 * notice those messages part-way through.
503 *
504 * Note that, if we don't end up reading all of the messages, we had
505 * better be certain to reset this flag before exiting!
506 */
507 stateP->hasMessages = false;
508
509 /* Fetch current value of maxMsgNum using spinlock */
510 SpinLockAcquire(&segP->msgnumLock);
511 max = segP->maxMsgNum;
512 SpinLockRelease(&segP->msgnumLock);
513
514 if (stateP->resetState)
515 {
516 /*
517 * Force reset. We can say we have dealt with any messages added
518 * since the reset, as well; and that means we should clear the
519 * signaled flag, too.
520 */
521 stateP->nextMsgNum = max;
522 stateP->resetState = false;
523 stateP->signaled = false;
524 LWLockRelease(SInvalReadLock);
525 return -1;
526 }
527
528 /*
529 * Retrieve messages and advance backend's counter, until data array is
530 * full or there are no more messages.
531 *
532 * There may be other backends that haven't read the message(s), so we
533 * cannot delete them here. SICleanupQueue() will eventually remove them
534 * from the queue.
535 */
536 n = 0;
537 while (n < datasize && stateP->nextMsgNum < max)
538 {
539 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
540 stateP->nextMsgNum++;
541 }
542
543 /*
544 * If we have caught up completely, reset our "signaled" flag so that
545 * we'll get another signal if we fall behind again.
546 *
547 * If we haven't caught up completely, reset the hasMessages flag so that
548 * we see the remaining messages next time.
549 */
550 if (stateP->nextMsgNum >= max)
551 stateP->signaled = false;
552 else
553 stateP->hasMessages = true;
554
555 LWLockRelease(SInvalReadLock);
556 return n;
557}
@ LW_SHARED
Definition: lwlock.h:115
const void * data
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:179

References SISeg::buffer, data, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyProcNumber, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage * data,
int  n 
)

Definition at line 369 of file sinvaladt.c.

/*
 * Summary: appends n messages from data to the shared circular buffer in
 * chunks of at most WRITE_QUANTUM.  For each chunk, while holding
 * SInvalWriteLock exclusively, it calls SICleanupQueue(true, nthistime)
 * until the chunk fits and the queue length is below nextThreshold, then
 * copies the chunk in.  It then publishes the new maxMsgNum under msgnumLock
 * and sets hasMessages on every backend listed in pgprocnos.
 */
370{
371 SISeg *segP = shmInvalBuffer;
372
373 /*
374 * N can be arbitrarily large. We divide the work into groups of no more
375 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
376 * an unreasonably long time. (This is not so much because we care about
377 * letting in other writers, as that some just-caught-up backend might be
378 * trying to do SICleanupQueue to pass on its signal, and we don't want it
379 * to have to wait a long time.) Also, we need to consider calling
380 * SICleanupQueue every so often.
381 */
382 while (n > 0)
383 {
384 int nthistime = Min(n, WRITE_QUANTUM);
385 int numMsgs;
386 int max;
387 int i;
388
389 n -= nthistime;
390
391 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
392
393 /*
394 * If the buffer is full, we *must* acquire some space. Clean the
395 * queue and reset anyone who is preventing space from being freed.
396 * Otherwise, clean the queue only when it's exceeded the next
397 * fullness threshold. We have to loop and recheck the buffer state
398 * after any call of SICleanupQueue.
399 */
400 for (;;)
401 {
402 numMsgs = segP->maxMsgNum - segP->minMsgNum;
403 if (numMsgs + nthistime > MAXNUMMESSAGES ||
404 numMsgs >= segP->nextThreshold)
405 SICleanupQueue(true, nthistime);
406 else
407 break;
408 }
409
410 /*
411 * Insert new message(s) into proper slot of circular buffer
412 */
413 max = segP->maxMsgNum;
414 while (nthistime-- > 0)
415 {
416 segP->buffer[max % MAXNUMMESSAGES] = *data++;
417 max++;
418 }
419
420 /* Update current value of maxMsgNum using spinlock */
421 SpinLockAcquire(&segP->msgnumLock);
422 segP->maxMsgNum = max;
423 SpinLockRelease(&segP->msgnumLock);
424
425 /*
426 * Now that the maxMsgNum change is globally visible, we give everyone
427 * a swift kick to make sure they read the newly added messages.
428 * Releasing SInvalWriteLock will enforce a full memory barrier, so
429 * these (unlocked) changes will be committed to memory before we exit
430 * the function.
431 */
432 for (i = 0; i < segP->numProcs; i++)
433 {
434 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
435
436 stateP->hasMessages = true;
437 }
438
439 LWLockRelease(SInvalWriteLock);
440 }
441}
#define Min(x, y)
Definition: c.h:958
#define WRITE_QUANTUM
Definition: sinvaladt.c:134
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:576

References SISeg::buffer, data, ProcState::hasMessages, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::numProcs, SISeg::pgprocnos, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().