PostgreSQL Source Code git master
Loading...
Searching...
No Matches
sinvaladt.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procnumber.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
 
struct  SISeg
 

Macros

#define MAXNUMMESSAGES   4096
 
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
 
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
 
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
 
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
 
#define WRITE_QUANTUM   64
 
#define NumProcStateSlots   (MaxBackends + NUM_AUXILIARY_PROCS)
 

Typedefs

typedef struct ProcState ProcState
 
typedef struct SISeg SISeg
 

Functions

static void CleanupInvalidationState (int status, Datum arg)
 
Size SharedInvalShmemSize (void)
 
void SharedInvalShmemInit (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Variables

static SISeg *shmInvalBuffer
 
static LocalTransactionId nextLocalTransactionId
 

Macro Definition Documentation

◆ CLEANUP_MIN

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 131 of file sinvaladt.c.

◆ CLEANUP_QUANTUM

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 132 of file sinvaladt.c.

◆ MAXNUMMESSAGES

#define MAXNUMMESSAGES   4096

Definition at line 129 of file sinvaladt.c.

◆ MSGNUMWRAPAROUND

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 130 of file sinvaladt.c.

◆ NumProcStateSlots

#define NumProcStateSlots   (MaxBackends + NUM_AUXILIARY_PROCS)

Definition at line 204 of file sinvaladt.c.

◆ SIG_THRESHOLD

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 133 of file sinvaladt.c.

◆ WRITE_QUANTUM

#define WRITE_QUANTUM   64

Definition at line 134 of file sinvaladt.c.

Typedef Documentation

◆ ProcState

◆ SISeg

Function Documentation

◆ CleanupInvalidationState()

static void CleanupInvalidationState ( int  status,
Datum  arg 
)
static

Definition at line 328 of file sinvaladt.c.

329{
332 int i;
333
334 Assert(segP);
335
337
338 stateP = &segP->procState[MyProcNumber];
339
340 /* Update next local transaction ID for next holder of this proc number */
341 stateP->nextLXID = nextLocalTransactionId;
342
343 /* Mark myself inactive */
344 stateP->procPid = 0;
345 stateP->nextMsgNum = 0;
346 stateP->resetState = false;
347 stateP->signaled = false;
348
349 for (i = segP->numProcs - 1; i >= 0; i--)
350 {
351 if (segP->pgprocnos[i] == MyProcNumber)
352 {
353 if (i != segP->numProcs - 1)
354 segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
355 break;
356 }
357 }
358 if (i < 0)
359 elog(PANIC, "could not find entry in sinval array");
360 segP->numProcs--;
361
363}
#define Assert(condition)
Definition c.h:873
#define PANIC
Definition elog.h:42
#define elog(elevel,...)
Definition elog.h:226
ProcNumber MyProcNumber
Definition globals.c:90
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * arg
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342
static int fb(int x)
static LocalTransactionId nextLocalTransactionId
Definition sinvaladt.c:209

References arg, Assert, DatumGetPointer(), elog, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, nextLocalTransactionId, and PANIC.

Referenced by SharedInvalBackendInit().

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 701 of file sinvaladt.c.

702{
703 LocalTransactionId result;
704
705 /* loop to avoid returning InvalidLocalTransactionId at wraparound */
706 do
707 {
708 result = nextLocalTransactionId++;
709 } while (!LocalTransactionIdIsValid(result));
710
711 return result;
712}
uint32 LocalTransactionId
Definition c.h:668
#define LocalTransactionIdIsValid(lxid)
Definition lock.h:68

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 272 of file sinvaladt.c.

273{
277
278 if (MyProcNumber < 0)
279 elog(ERROR, "MyProcNumber not set");
281 elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
283 stateP = &segP->procState[MyProcNumber];
284
285 /*
286 * This can run in parallel with read operations, but not with write
287 * operations, since SIInsertDataEntries relies on the pgprocnos array to
288 * set hasMessages appropriately.
289 */
291
292 oldPid = stateP->procPid;
293 if (oldPid != 0)
294 {
296 elog(ERROR, "sinval slot for backend %d is already in use by process %d",
297 MyProcNumber, (int) oldPid);
298 }
299
301
302 /* Fetch next local transaction ID into local memory */
303 nextLocalTransactionId = stateP->nextLXID;
304
305 /* mark myself active, with all extant messages already read */
306 stateP->procPid = MyProcPid;
307 stateP->nextMsgNum = segP->maxMsgNum;
308 stateP->resetState = false;
309 stateP->signaled = false;
310 stateP->hasMessages = false;
311 stateP->sendOnly = sendOnly;
312
314
315 /* register exit routine to mark my entry inactive at exit */
317}
#define ERROR
Definition elog.h:39
int MyProcPid
Definition globals.c:47
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
static SISeg * shmInvalBuffer
Definition sinvaladt.c:206
#define NumProcStateSlots
Definition sinvaladt.c:204
static void CleanupInvalidationState(int status, Datum arg)
Definition sinvaladt.c:328
int * pgprocnos
Definition sinvaladt.c:194
int numProcs
Definition sinvaladt.c:193

References CleanupInvalidationState(), elog, ERROR, fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, nextLocalTransactionId, SISeg::numProcs, NumProcStateSlots, on_shmem_exit(), PANIC, SISeg::pgprocnos, PointerGetDatum(), and shmInvalBuffer.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SharedInvalShmemInit()

void SharedInvalShmemInit ( void  )

Definition at line 234 of file sinvaladt.c.

235{
236 int i;
237 bool found;
238
239 /* Allocate space in shared memory */
241 ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
242 if (found)
243 return;
244
245 /* Clear message counters, save size of procState array, init spinlock */
250
251 /* The buffer[] array is initially all unused, so we need not fill it */
252
253 /* Mark all backends inactive, and initialize nextLXID */
254 for (i = 0; i < NumProcStateSlots; i++)
255 {
256 shmInvalBuffer->procState[i].procPid = 0; /* inactive */
257 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
262 }
265}
#define InvalidLocalTransactionId
Definition lock.h:67
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
Size SharedInvalShmemSize(void)
Definition sinvaladt.c:218
#define CLEANUP_MIN
Definition sinvaladt.c:131
#define SpinLockInit(lock)
Definition spin.h:57
int nextMsgNum
Definition sinvaladt.c:142
bool signaled
Definition sinvaladt.c:144
LocalTransactionId nextLXID
Definition sinvaladt.c:161
pid_t procPid
Definition sinvaladt.c:140
bool hasMessages
Definition sinvaladt.c:145
bool resetState
Definition sinvaladt.c:143
int minMsgNum
Definition sinvaladt.c:170
int maxMsgNum
Definition sinvaladt.c:171
slock_t msgnumLock
Definition sinvaladt.c:174
int nextThreshold
Definition sinvaladt.c:172
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition sinvaladt.c:195

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, SISeg::numProcs, NumProcStateSlots, SISeg::pgprocnos, ProcState::procPid, SISeg::procState, ProcState::resetState, SharedInvalShmemSize(), ShmemInitStruct(), shmInvalBuffer, ProcState::signaled, and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ SharedInvalShmemSize()

Size SharedInvalShmemSize ( void  )

Definition at line 218 of file sinvaladt.c.

219{
220 Size size;
221
222 size = offsetof(SISeg, procState);
223 size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
224 size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
225
226 return size;
227}
size_t Size
Definition c.h:619
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497

References add_size(), fb(), mul_size(), and NumProcStateSlots.

Referenced by CalculateShmemSize(), and SharedInvalShmemInit().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 577 of file sinvaladt.c.

578{
580 int min,
581 minsig,
582 lowbound,
583 numMsgs,
584 i;
586
587 /* Lock out all writers and readers */
591
592 /*
593 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
594 * furthest-back backend that needs signaling (if any), and reset any
595 * backends that are too far back. Note that because we ignore sendOnly
596 * backends here it is possible for them to keep sending messages without
597 * a problem even when they are the only active backend.
598 */
599 min = segP->maxMsgNum;
600 minsig = min - SIG_THRESHOLD;
602
603 for (i = 0; i < segP->numProcs; i++)
604 {
605 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
606 int n = stateP->nextMsgNum;
607
608 /* Ignore if already in reset state */
609 Assert(stateP->procPid != 0);
610 if (stateP->resetState || stateP->sendOnly)
611 continue;
612
613 /*
614 * If we must free some space and this backend is preventing it, force
615 * him into reset state and then ignore until he catches up.
616 */
617 if (n < lowbound)
618 {
619 stateP->resetState = true;
620 /* no point in signaling him ... */
621 continue;
622 }
623
624 /* Track the global minimum nextMsgNum */
625 if (n < min)
626 min = n;
627
628 /* Also see who's furthest back of the unsignaled backends */
629 if (n < minsig && !stateP->signaled)
630 {
631 minsig = n;
632 needSig = stateP;
633 }
634 }
635 segP->minMsgNum = min;
636
637 /*
638 * When minMsgNum gets really large, decrement all message counters so as
639 * to forestall overflow of the counters. This happens seldom enough that
640 * folding it into the previous loop would be a loser.
641 */
642 if (min >= MSGNUMWRAPAROUND)
643 {
644 segP->minMsgNum -= MSGNUMWRAPAROUND;
645 segP->maxMsgNum -= MSGNUMWRAPAROUND;
646 for (i = 0; i < segP->numProcs; i++)
647 segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
648 }
649
650 /*
651 * Determine how many messages are still in the queue, and set the
652 * threshold at which we should repeat SICleanupQueue().
653 */
654 numMsgs = segP->maxMsgNum - segP->minMsgNum;
655 if (numMsgs < CLEANUP_MIN)
656 segP->nextThreshold = CLEANUP_MIN;
657 else
658 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
659
660 /*
661 * Lastly, signal anyone who needs a catchup interrupt. Since
662 * SendProcSignal() might not be fast, we don't want to hold locks while
663 * executing it.
664 */
665 if (needSig)
666 {
667 pid_t his_pid = needSig->procPid;
668 ProcNumber his_procNumber = (needSig - &segP->procState[0]);
669
670 needSig->signaled = true;
673 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
677 }
678 else
679 {
683 }
684}
#define DEBUG4
Definition elog.h:27
int ProcNumber
Definition procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:284
@ PROCSIG_CATCHUP_INTERRUPT
Definition procsignal.h:32
#define CLEANUP_QUANTUM
Definition sinvaladt.c:132
#define MAXNUMMESSAGES
Definition sinvaladt.c:129
#define MSGNUMWRAPAROUND
Definition sinvaladt.c:130
#define SIG_THRESHOLD
Definition sinvaladt.c:133

References Assert, CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXNUMMESSAGES, MSGNUMWRAPAROUND, ProcState::nextMsgNum, PROCSIG_CATCHUP_INTERRUPT, SendProcSignal(), shmInvalBuffer, and SIG_THRESHOLD.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage *data,
int  datasize 
)

Definition at line 473 of file sinvaladt.c.

474{
475 SISeg *segP;
477 int max;
478 int n;
479
482
483 /*
484 * Before starting to take locks, do a quick, unlocked test to see whether
485 * there can possibly be anything to read. On a multiprocessor system,
486 * it's possible that this load could migrate backwards and occur before
487 * we actually enter this function, so we might miss a sinval message that
488 * was just added by some other processor. But they can't migrate
489 * backwards over a preceding lock acquisition, so it should be OK. If we
490 * haven't acquired a lock preventing against further relevant
491 * invalidations, any such occurrence is not much different than if the
492 * invalidation had arrived slightly later in the first place.
493 */
494 if (!stateP->hasMessages)
495 return 0;
496
498
499 /*
500 * We must reset hasMessages before determining how many messages we're
501 * going to read. That way, if new messages arrive after we have
502 * determined how many we're reading, the flag will get reset and we'll
503 * notice those messages part-way through.
504 *
505 * Note that, if we don't end up reading all of the messages, we had
506 * better be certain to reset this flag before exiting!
507 */
508 stateP->hasMessages = false;
509
510 /* Fetch current value of maxMsgNum using spinlock */
511 SpinLockAcquire(&segP->msgnumLock);
512 max = segP->maxMsgNum;
513 SpinLockRelease(&segP->msgnumLock);
514
515 if (stateP->resetState)
516 {
517 /*
518 * Force reset. We can say we have dealt with any messages added
519 * since the reset, as well; and that means we should clear the
520 * signaled flag, too.
521 */
522 stateP->nextMsgNum = max;
523 stateP->resetState = false;
524 stateP->signaled = false;
526 return -1;
527 }
528
529 /*
530 * Retrieve messages and advance backend's counter, until data array is
531 * full or there are no more messages.
532 *
533 * There may be other backends that haven't read the message(s), so we
534 * cannot delete them here. SICleanupQueue() will eventually remove them
535 * from the queue.
536 */
537 n = 0;
538 while (n < datasize && stateP->nextMsgNum < max)
539 {
540 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
541 stateP->nextMsgNum++;
542 }
543
544 /*
545 * If we have caught up completely, reset our "signaled" flag so that
546 * we'll get another signal if we fall behind again.
547 *
548 * If we haven't caught up completely, reset the hasMessages flag so that
549 * we see the remaining messages next time.
550 */
551 if (stateP->nextMsgNum >= max)
552 stateP->signaled = false;
553 else
554 stateP->hasMessages = true;
555
557 return n;
558}
@ LW_SHARED
Definition lwlock.h:113
const void * data
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59

References data, fb(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MAXNUMMESSAGES, MyProcNumber, SISeg::procState, shmInvalBuffer, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage *data,
int  n 
)

Definition at line 370 of file sinvaladt.c.

371{
373
374 /*
375 * N can be arbitrarily large. We divide the work into groups of no more
376 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
377 * an unreasonably long time. (This is not so much because we care about
378 * letting in other writers, as that some just-caught-up backend might be
379 * trying to do SICleanupQueue to pass on its signal, and we don't want it
380 * to have to wait a long time.) Also, we need to consider calling
381 * SICleanupQueue every so often.
382 */
383 while (n > 0)
384 {
385 int nthistime = Min(n, WRITE_QUANTUM);
386 int numMsgs;
387 int max;
388 int i;
389
390 n -= nthistime;
391
393
394 /*
395 * If the buffer is full, we *must* acquire some space. Clean the
396 * queue and reset anyone who is preventing space from being freed.
397 * Otherwise, clean the queue only when it's exceeded the next
398 * fullness threshold. We have to loop and recheck the buffer state
399 * after any call of SICleanupQueue.
400 */
401 for (;;)
402 {
403 numMsgs = segP->maxMsgNum - segP->minMsgNum;
405 numMsgs >= segP->nextThreshold)
407 else
408 break;
409 }
410
411 /*
412 * Insert new message(s) into proper slot of circular buffer
413 */
414 max = segP->maxMsgNum;
415 while (nthistime-- > 0)
416 {
417 segP->buffer[max % MAXNUMMESSAGES] = *data++;
418 max++;
419 }
420
421 /* Update current value of maxMsgNum using spinlock */
422 SpinLockAcquire(&segP->msgnumLock);
423 segP->maxMsgNum = max;
424 SpinLockRelease(&segP->msgnumLock);
425
426 /*
427 * Now that the maxMsgNum change is globally visible, we give everyone
428 * a swift kick to make sure they read the newly added messages.
429 * Releasing SInvalWriteLock will enforce a full memory barrier, so
430 * these (unlocked) changes will be committed to memory before we exit
431 * the function.
432 */
433 for (i = 0; i < segP->numProcs; i++)
434 {
435 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
436
437 stateP->hasMessages = true;
438 }
439
441 }
442}
#define Min(x, y)
Definition c.h:997
#define WRITE_QUANTUM
Definition sinvaladt.c:134
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition sinvaladt.c:577

References data, fb(), ProcState::hasMessages, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXNUMMESSAGES, Min, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

Variable Documentation

◆ nextLocalTransactionId

LocalTransactionId nextLocalTransactionId
static

◆ shmInvalBuffer

SISeg* shmInvalBuffer
static