PostgreSQL Source Code git master
Loading...
Searching...
No Matches
sinvaladt.h File Reference
#include "storage/sinval.h"
Include dependency graph for sinvaladt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void SharedInvalBackendInit (bool sendOnly)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Function Documentation

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )
extern

Definition at line 703 of file sinvaladt.c.

704{
706
707 /* loop to avoid returning InvalidLocalTransactionId at wraparound */
708 do
709 {
712
713 return result;
714}
uint32 LocalTransactionId
Definition c.h:738
uint32 result
#define LocalTransactionIdIsValid(lxid)
Definition lock.h:69
static LocalTransactionId nextLocalTransactionId
Definition sinvaladt.c:218

References LocalTransactionIdIsValid, nextLocalTransactionId, and result.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)
extern

Definition at line 274 of file sinvaladt.c.

275{
279
280 if (MyProcNumber < 0)
281 elog(ERROR, "MyProcNumber not set");
283 elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
285 stateP = &segP->procState[MyProcNumber];
286
287 /*
288 * This can run in parallel with read operations, but not with write
289 * operations, since SIInsertDataEntries relies on the pgprocnos array to
290 * set hasMessages appropriately.
291 */
293
294 oldPid = stateP->procPid;
295 if (oldPid != 0)
296 {
298 elog(ERROR, "sinval slot for backend %d is already in use by process %d",
299 MyProcNumber, (int) oldPid);
300 }
301
303
304 /* Fetch next local transaction ID into local memory */
305 nextLocalTransactionId = stateP->nextLXID;
306
307 /* mark myself active, with all extant messages already read */
308 stateP->procPid = MyProcPid;
309 stateP->nextMsgNum = segP->maxMsgNum;
310 stateP->resetState = false;
311 stateP->signaled = false;
312 stateP->hasMessages = false;
313 stateP->sendOnly = sendOnly;
314
316
317 /* register exit routine to mark my entry inactive at exit */
319}
#define PANIC
Definition elog.h:44
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
int MyProcPid
Definition globals.c:49
ProcNumber MyProcNumber
Definition globals.c:92
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
static int fb(int x)
static SISeg * shmInvalBuffer
Definition sinvaladt.c:207
#define NumProcStateSlots
Definition sinvaladt.c:205
static void CleanupInvalidationState(int status, Datum arg)
Definition sinvaladt.c:330
int * pgprocnos
Definition sinvaladt.c:195
int numProcs
Definition sinvaladt.c:194

References CleanupInvalidationState(), elog, ERROR, fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, nextLocalTransactionId, SISeg::numProcs, NumProcStateSlots, on_shmem_exit(), PANIC, SISeg::pgprocnos, PointerGetDatum(), and shmInvalBuffer.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)
extern

Definition at line 579 of file sinvaladt.c.

580{
582 int min,
583 minsig,
584 lowbound,
585 numMsgs,
586 i;
588
589 /* Lock out all writers and readers */
593
594 /*
595 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
596 * furthest-back backend that needs signaling (if any), and reset any
597 * backends that are too far back. Note that because we ignore sendOnly
598 * backends here it is possible for them to keep sending messages without
599 * a problem even when they are the only active backend.
600 */
601 min = segP->maxMsgNum;
602 minsig = min - SIG_THRESHOLD;
604
605 for (i = 0; i < segP->numProcs; i++)
606 {
607 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
608 int n = stateP->nextMsgNum;
609
610 /* Ignore if already in reset state */
611 Assert(stateP->procPid != 0);
612 if (stateP->resetState || stateP->sendOnly)
613 continue;
614
615 /*
616 * If we must free some space and this backend is preventing it, force
617 * him into reset state and then ignore until he catches up.
618 */
619 if (n < lowbound)
620 {
621 stateP->resetState = true;
622 /* no point in signaling him ... */
623 continue;
624 }
625
626 /* Track the global minimum nextMsgNum */
627 if (n < min)
628 min = n;
629
630 /* Also see who's furthest back of the unsignaled backends */
631 if (n < minsig && !stateP->signaled)
632 {
633 minsig = n;
634 needSig = stateP;
635 }
636 }
637 segP->minMsgNum = min;
638
639 /*
640 * When minMsgNum gets really large, decrement all message counters so as
641 * to forestall overflow of the counters. This happens seldom enough that
642 * folding it into the previous loop would be a loser.
643 */
644 if (min >= MSGNUMWRAPAROUND)
645 {
646 segP->minMsgNum -= MSGNUMWRAPAROUND;
647 segP->maxMsgNum -= MSGNUMWRAPAROUND;
648 for (i = 0; i < segP->numProcs; i++)
649 segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
650 }
651
652 /*
653 * Determine how many messages are still in the queue, and set the
654 * threshold at which we should repeat SICleanupQueue().
655 */
656 numMsgs = segP->maxMsgNum - segP->minMsgNum;
657 if (numMsgs < CLEANUP_MIN)
658 segP->nextThreshold = CLEANUP_MIN;
659 else
660 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
661
662 /*
663 * Lastly, signal anyone who needs a catchup interrupt. Since
664 * SendProcSignal() might not be fast, we don't want to hold locks while
665 * executing it.
666 */
667 if (needSig)
668 {
669 pid_t his_pid = needSig->procPid;
670 ProcNumber his_procNumber = (needSig - &segP->procState[0]);
671
672 needSig->signaled = true;
675 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
679 }
680 else
681 {
685 }
686}
#define Assert(condition)
Definition c.h:943
#define DEBUG4
Definition elog.h:28
int i
Definition isn.c:77
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:288
@ PROCSIG_CATCHUP_INTERRUPT
Definition procsignal.h:32
#define CLEANUP_QUANTUM
Definition sinvaladt.c:133
#define MAXNUMMESSAGES
Definition sinvaladt.c:130
#define MSGNUMWRAPAROUND
Definition sinvaladt.c:131
#define SIG_THRESHOLD
Definition sinvaladt.c:134
#define CLEANUP_MIN
Definition sinvaladt.c:132
int nextMsgNum
Definition sinvaladt.c:143

References Assert, CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXNUMMESSAGES, MSGNUMWRAPAROUND, ProcState::nextMsgNum, PROCSIG_CATCHUP_INTERRUPT, SendProcSignal(), shmInvalBuffer, and SIG_THRESHOLD.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage data,
int  datasize 
)
extern

Definition at line 475 of file sinvaladt.c.

476{
477 SISeg *segP;
479 int max;
480 int n;
481
484
485 /*
486 * Before starting to take locks, do a quick, unlocked test to see whether
487 * there can possibly be anything to read. On a multiprocessor system,
488 * it's possible that this load could migrate backwards and occur before
489 * we actually enter this function, so we might miss a sinval message that
490 * was just added by some other processor. But they can't migrate
491 * backwards over a preceding lock acquisition, so it should be OK. If we
492 * haven't acquired a lock preventing against further relevant
493 * invalidations, any such occurrence is not much different than if the
494 * invalidation had arrived slightly later in the first place.
495 */
496 if (!stateP->hasMessages)
497 return 0;
498
500
501 /*
502 * We must reset hasMessages before determining how many messages we're
503 * going to read. That way, if new messages arrive after we have
504 * determined how many we're reading, the flag will get reset and we'll
505 * notice those messages part-way through.
506 *
507 * Note that, if we don't end up reading all of the messages, we had
508 * better be certain to reset this flag before exiting!
509 */
510 stateP->hasMessages = false;
511
512 /* Fetch current value of maxMsgNum using spinlock */
513 SpinLockAcquire(&segP->msgnumLock);
514 max = segP->maxMsgNum;
515 SpinLockRelease(&segP->msgnumLock);
516
517 if (stateP->resetState)
518 {
519 /*
520 * Force reset. We can say we have dealt with any messages added
521 * since the reset, as well; and that means we should clear the
522 * signaled flag, too.
523 */
524 stateP->nextMsgNum = max;
525 stateP->resetState = false;
526 stateP->signaled = false;
528 return -1;
529 }
530
531 /*
532 * Retrieve messages and advance backend's counter, until data array is
533 * full or there are no more messages.
534 *
535 * There may be other backends that haven't read the message(s), so we
536 * cannot delete them here. SICleanupQueue() will eventually remove them
537 * from the queue.
538 */
539 n = 0;
540 while (n < datasize && stateP->nextMsgNum < max)
541 {
542 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
543 stateP->nextMsgNum++;
544 }
545
546 /*
547 * If we have caught up completely, reset our "signaled" flag so that
548 * we'll get another signal if we fall behind again.
549 *
550 * If we haven't caught up completely, reset the hasMessages flag so that
551 * we see the remaining messages next time.
552 */
553 if (stateP->nextMsgNum >= max)
554 stateP->signaled = false;
555 else
556 stateP->hasMessages = true;
557
559 return n;
560}
@ LW_SHARED
Definition lwlock.h:105
const void * data
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition sinvaladt.c:196

References data, fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MAXNUMMESSAGES, MyProcNumber, SISeg::procState, shmInvalBuffer, SpinLockAcquire(), and SpinLockRelease().

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage data,
int  n 
)
extern

Definition at line 372 of file sinvaladt.c.

373{
375
376 /*
377 * N can be arbitrarily large. We divide the work into groups of no more
378 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
379 * an unreasonably long time. (This is not so much because we care about
380 * letting in other writers, as that some just-caught-up backend might be
381 * trying to do SICleanupQueue to pass on its signal, and we don't want it
382 * to have to wait a long time.) Also, we need to consider calling
383 * SICleanupQueue every so often.
384 */
385 while (n > 0)
386 {
387 int nthistime = Min(n, WRITE_QUANTUM);
388 int numMsgs;
389 int max;
390 int i;
391
392 n -= nthistime;
393
395
396 /*
397 * If the buffer is full, we *must* acquire some space. Clean the
398 * queue and reset anyone who is preventing space from being freed.
399 * Otherwise, clean the queue only when it's exceeded the next
400 * fullness threshold. We have to loop and recheck the buffer state
401 * after any call of SICleanupQueue.
402 */
403 for (;;)
404 {
405 numMsgs = segP->maxMsgNum - segP->minMsgNum;
407 numMsgs >= segP->nextThreshold)
409 else
410 break;
411 }
412
413 /*
414 * Insert new message(s) into proper slot of circular buffer
415 */
416 max = segP->maxMsgNum;
417 while (nthistime-- > 0)
418 {
419 segP->buffer[max % MAXNUMMESSAGES] = *data++;
420 max++;
421 }
422
423 /* Update current value of maxMsgNum using spinlock */
424 SpinLockAcquire(&segP->msgnumLock);
425 segP->maxMsgNum = max;
426 SpinLockRelease(&segP->msgnumLock);
427
428 /*
429 * Now that the maxMsgNum change is globally visible, we give everyone
430 * a swift kick to make sure they read the newly added messages.
431 * Releasing SInvalWriteLock will enforce a full memory barrier, so
432 * these (unlocked) changes will be committed to memory before we exit
433 * the function.
434 */
435 for (i = 0; i < segP->numProcs; i++)
436 {
437 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
438
439 stateP->hasMessages = true;
440 }
441
443 }
444}
#define Min(x, y)
Definition c.h:1091
#define WRITE_QUANTUM
Definition sinvaladt.c:135
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition sinvaladt.c:579
bool hasMessages
Definition sinvaladt.c:146

References data, fb(), ProcState::hasMessages, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXNUMMESSAGES, Min, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire(), SpinLockRelease(), and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().