PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"
 
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
 
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_origins, bool recoveryOK)
 
static bool IsReservedOriginName (const char *name)
 
RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
static void replorigin_state_clear (RepOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void replorigin_session_reset_internal (void)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

int max_active_replication_origins = 10
 
RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationStatereplication_states
 
static ReplicationStateCtlreplication_states_ctl
 
static ReplicationStatesession_replication_state = NULL
 

Macro Definition Documentation

◆ PG_REPLORIGIN_CHECKPOINT_FILENAME

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"

Definition at line 100 of file origin.c.

◆ PG_REPLORIGIN_CHECKPOINT_TMPFILE

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

Definition at line 101 of file origin.c.

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 190 of file origin.c.

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 602 of file origin.c.

603{
605 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
606 int tmpfd;
607 int i;
610
612 return;
613
615
616 /* make sure no old temp file is remaining */
617 if (unlink(tmppath) < 0 && errno != ENOENT)
620 errmsg("could not remove file \"%s\": %m",
621 tmppath)));
622
623 /*
624 * no other backend can perform this at the same time; only one checkpoint
625 * can happen at a time.
626 */
629 if (tmpfd < 0)
632 errmsg("could not create file \"%s\": %m",
633 tmppath)));
634
635 /* write magic */
636 errno = 0;
637 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
638 {
639 /* if write didn't set errno, assume problem is no disk space */
640 if (errno == 0)
641 errno = ENOSPC;
644 errmsg("could not write to file \"%s\": %m",
645 tmppath)));
646 }
647 COMP_CRC32C(crc, &magic, sizeof(magic));
648
649 /* prevent concurrent creations/drops */
651
652 /* write actual data */
653 for (i = 0; i < max_active_replication_origins; i++)
654 {
657 XLogRecPtr local_lsn;
658
659 if (curstate->roident == InvalidRepOriginId)
660 continue;
661
662 /* zero, to avoid uninitialized padding bytes */
663 memset(&disk_state, 0, sizeof(disk_state));
664
666
667 disk_state.roident = curstate->roident;
668
669 disk_state.remote_lsn = curstate->remote_lsn;
670 local_lsn = curstate->local_lsn;
671
672 LWLockRelease(&curstate->lock);
673
674 /* make sure we only write out a commit that's persistent */
675 XLogFlush(local_lsn);
676
677 errno = 0;
678 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
679 sizeof(disk_state))
680 {
681 /* if write didn't set errno, assume problem is no disk space */
682 if (errno == 0)
683 errno = ENOSPC;
686 errmsg("could not write to file \"%s\": %m",
687 tmppath)));
688 }
689
691 }
692
694
695 /* write out the CRC */
697 errno = 0;
698 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
699 {
700 /* if write didn't set errno, assume problem is no disk space */
701 if (errno == 0)
702 errno = ENOSPC;
705 errmsg("could not write to file \"%s\": %m",
706 tmppath)));
707 }
708
709 if (CloseTransientFile(tmpfd) != 0)
712 errmsg("could not close file \"%s\": %m",
713 tmppath)));
714
715 /* fsync, rename to permanent file, fsync file and directory */
717}
#define PG_BINARY
Definition c.h:1281
uint32_t uint32
Definition c.h:556
int errcode_for_file_access(void)
Definition elog.c:886
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define PANIC
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:779
int CloseTransientFile(int fd)
Definition fd.c:2851
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2674
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
int max_active_replication_origins
Definition origin.c:104
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:101
static ReplicationState * replication_states
Definition origin.c:174
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:100
#define REPLICATION_STATE_MAGIC
Definition origin.c:190
#define InvalidRepOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2784
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ IsReservedOriginName()

static bool IsReservedOriginName ( const char name)
static

Definition at line 212 of file origin.c.

213{
214 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
216}
int pg_strcasecmp(const char *s1, const char *s2)
const char * name

References fb(), name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1547 of file origin.c.

1548{
1551 RepOriginId node;
1552
1553 replorigin_check_prerequisites(true, false);
1554
1555 /* lock to prevent the replication origin from vanishing */
1557
1558 node = replorigin_by_name(text_to_cstring(name), false);
1559
1560 /*
1561 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1562 * xact hasn't committed yet. This is why this function should be used to
1563 * set up the initial replication state, but not for replay.
1564 */
1566 true /* go backward */ , true /* WAL log */ );
1567
1569
1571}
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
#define RowExclusiveLock
Definition lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:229
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:916
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:193
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
Definition c.h:716
char * text_to_cstring(const text *t)
Definition varlena.c:214
uint16 RepOriginId
Definition xlogdefs.h:69
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1358 of file origin.c.

1359{
1360 char *name;
1361 RepOriginId roident;
1362
1363 replorigin_check_prerequisites(false, false);
1364
1366
1367 /*
1368 * Replication origins "any and "none" are reserved for system options.
1369 * The origins "pg_xxx" are reserved for internal use.
1370 */
1372 ereport(ERROR,
1374 errmsg("replication origin name \"%s\" is reserved",
1375 name),
1376 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1378
1379 /*
1380 * If built with appropriate switch, whine when regression-testing
1381 * conventions for replication origin names are violated.
1382 */
1383#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1384 if (strncmp(name, "regress_", 8) != 0)
1385 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1386#endif
1387
1388 roident = replorigin_create(name);
1389
1390 pfree(name);
1391
1392 PG_RETURN_OID(roident);
1393}
bool IsReservedName(const char *name)
Definition catalog.c:278
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errcode(int sqlerrcode)
Definition elog.c:863
#define WARNING
Definition elog.h:36
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_OID(x)
Definition fmgr.h:361
void pfree(void *pointer)
Definition mcxt.c:1616
RepOriginId replorigin_create(const char *roname)
Definition origin.c:260
static bool IsReservedOriginName(const char *name)
Definition origin.c:212
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342

References DatumGetPointer(), elog, ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), IsReservedName(), IsReservedOriginName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1399 of file origin.c.

1400{
1401 char *name;
1402
1403 replorigin_check_prerequisites(false, false);
1404
1406
1407 replorigin_drop_by_name(name, false, true);
1408
1409 pfree(name);
1410
1412}
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:445

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1418 of file origin.c.

1419{
1420 char *name;
1421 RepOriginId roident;
1422
1423 replorigin_check_prerequisites(false, false);
1424
1426 roident = replorigin_by_name(name, true);
1427
1428 pfree(name);
1429
1430 if (OidIsValid(roident))
1431 PG_RETURN_OID(roident);
1433}
#define OidIsValid(objectId)
Definition c.h:798
#define PG_RETURN_NULL()
Definition fmgr.h:346

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1582 of file origin.c.

1583{
1584 char *name;
1585 bool flush;
1586 RepOriginId roident;
1587 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1588
1590
1592 flush = PG_GETARG_BOOL(1);
1593
1594 roident = replorigin_by_name(name, false);
1595 Assert(OidIsValid(roident));
1596
1597 remote_lsn = replorigin_get_progress(roident, flush);
1598
1599 if (!XLogRecPtrIsValid(remote_lsn))
1601
1602 PG_RETURN_LSN(remote_lsn);
1603}
#define Assert(condition)
Definition c.h:883
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition origin.c:1045
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References Assert, DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), text_to_cstring(), and XLogRecPtrIsValid.

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1480 of file origin.c.

1481{
1482 replorigin_check_prerequisites(false, false);
1483
1485}
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
RepOriginId replorigin_session_origin
Definition origin.c:166

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1496 of file origin.c.

1497{
1498 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1499 bool flush = PG_GETARG_BOOL(0);
1500
1501 replorigin_check_prerequisites(true, false);
1502
1504 ereport(ERROR,
1506 errmsg("no replication origin is configured")));
1507
1508 remote_lsn = replorigin_session_get_progress(flush);
1509
1510 if (!XLogRecPtrIsValid(remote_lsn))
1512
1513 PG_RETURN_LSN(remote_lsn);
1514}
static ReplicationState * session_replication_state
Definition origin.c:187
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1326

References ereport, errcode(), errmsg(), ERROR, fb(), InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), session_replication_state, and XLogRecPtrIsValid.

◆ pg_replication_origin_session_reset()

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1439 of file origin.c.

1440{
1441 char *name;
1442 RepOriginId origin;
1443 int pid;
1444
1445 replorigin_check_prerequisites(true, false);
1446
1448 origin = replorigin_by_name(name, false);
1449 pid = PG_GETARG_INT32(1);
1450 replorigin_session_setup(origin, pid);
1451
1453
1454 pfree(name);
1455
1457}
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition origin.c:1144

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_GETARG_INT32, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

◆ pg_replication_origin_xact_reset()

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1607 of file origin.c.

1608{
1609 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1610 int i;
1611#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1612
1613 /* we want to return 0 rows if slot is set to zero */
1614 replorigin_check_prerequisites(false, true);
1615
1616 InitMaterializedSRF(fcinfo, 0);
1617
1618 /* prevent slots from being concurrently dropped */
1620
1621 /*
1622 * Iterate through all possible replication_states, display if they are
1623 * filled. Note that we do not take any locks, so slightly corrupted/out
1624 * of date values are a possibility.
1625 */
1626 for (i = 0; i < max_active_replication_origins; i++)
1627 {
1631 char *roname;
1632
1634
1635 /* unused slot, nothing to display */
1636 if (state->roident == InvalidRepOriginId)
1637 continue;
1638
1639 memset(values, 0, sizeof(values));
1640 memset(nulls, 1, sizeof(nulls));
1641
1642 values[0] = ObjectIdGetDatum(state->roident);
1643 nulls[0] = false;
1644
1645 /*
1646 * We're not preventing the origin to be dropped concurrently, so
1647 * silently accept that it might be gone.
1648 */
1649 if (replorigin_by_oid(state->roident, true,
1650 &roname))
1651 {
1653 nulls[1] = false;
1654 }
1655
1656 LWLockAcquire(&state->lock, LW_SHARED);
1657
1658 values[2] = LSNGetDatum(state->remote_lsn);
1659 nulls[2] = false;
1660
1661 values[3] = LSNGetDatum(state->local_lsn);
1662 nulls[3] = false;
1663
1664 LWLockRelease(&state->lock);
1665
1666 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1667 values, nulls);
1668 }
1669
1671
1672#undef REPLICATION_ORIGIN_PROGRESS_COLS
1673
1674 return (Datum) 0;
1675}
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition origin.c:499
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784

References CStringGetTextDatum, fb(), i, InitMaterializedSRF(), InvalidRepOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), tuplestore_putvalues(), and values.

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1117 of file origin.c.

1118{
1120 return;
1121
1123}
static void replorigin_session_reset_internal(void)
Definition origin.c:1083

References fb(), replorigin_session_reset_internal(), and session_replication_state.

Referenced by replorigin_session_setup().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 555 of file origin.c.

556{
557 bool found;
558
560 return;
561
563 ShmemInitStruct("ReplicationOriginState",
565 &found);
567
568 if (!found)
569 {
570 int i;
571
573
575
576 for (i = 0; i < max_active_replication_origins; i++)
577 {
581 }
582 }
583}
#define MemSet(start, val, len)
Definition c.h:1023
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:179
Size ReplicationOriginShmemSize(void)
Definition origin.c:540
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:162

References ConditionVariableInit(), fb(), i, LWLockInitialize(), max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 540 of file origin.c.

541{
542 Size size = 0;
543
545 return size;
546
547 size = add_size(size, offsetof(ReplicationStateCtl, states));
548
549 size = add_size(size,
551 return size;
552}
size_t Size
Definition c.h:629
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510

References add_size(), fb(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 916 of file origin.c.

919{
920 int i;
923
924 Assert(node != InvalidRepOriginId);
925
926 /* we don't track DoNotReplicateId */
927 if (node == DoNotReplicateId)
928 return;
929
930 /*
931 * XXX: For the case where this is called by WAL replay, it'd be more
932 * efficient to restore into a backend local hashtable and only dump into
933 * shmem after recovery is finished. Let's wait with implementing that
934 * till it's shown to be a measurable expense
935 */
936
937 /* Lock exclusively, as we may have to create a new table entry. */
939
940 /*
941 * Search for either an existing slot for the origin, or a free one we can
942 * use.
943 */
944 for (i = 0; i < max_active_replication_origins; i++)
945 {
947
948 /* remember where to insert if necessary */
949 if (curstate->roident == InvalidRepOriginId &&
950 free_state == NULL)
951 {
953 continue;
954 }
955
956 /* not our slot */
957 if (curstate->roident != node)
958 {
959 continue;
960 }
961
962 /* ok, found slot */
964
966
967 /* Make sure it's not used by somebody else */
968 if (replication_state->refcount > 0)
969 {
972 (replication_state->acquired_by != 0)
973 ? errmsg("replication origin with ID %d is already active for PID %d",
974 replication_state->roident,
975 replication_state->acquired_by)
976 : errmsg("replication origin with ID %d is already active in another process",
977 replication_state->roident)));
978 }
979
980 break;
981 }
982
986 errmsg("could not find free replication state slot for replication origin with ID %d",
987 node),
988 errhint("Increase \"max_active_replication_origins\" and try again.")));
989
990 if (replication_state == NULL)
991 {
992 /* initialize new slot */
997 replication_state->roident = node;
998 }
999
1001
1002 /*
1003 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1004 * and the standby gets the message. Primarily this will be called during
1005 * WAL replay (of commit records) where no WAL logging is necessary.
1006 */
1007 if (wal_log)
1008 {
1010
1012 xlrec.node_id = node;
1013 xlrec.force = go_backward;
1014
1016 XLogRegisterData(&xlrec, sizeof(xlrec));
1017
1019 }
1020
1021 /*
1022 * Due to - harmless - race conditions during a checkpoint we could see
1023 * values here that are older than the ones we already have in memory. We
1024 * could also see older values for prepared transactions when the prepare
1025 * is sent at a later point of time along with commit prepared and there
1026 * are other transactions commits between prepare and commit prepared. See
1027 * ReorderBufferFinishPrepared. Don't overwrite those.
1028 */
1029 if (go_backward || replication_state->remote_lsn < remote_commit)
1030 replication_state->remote_lsn = remote_commit;
1032 (go_backward || replication_state->local_lsn < local_commit))
1033 replication_state->local_lsn = local_commit;
1035
1036 /*
1037 * Release *after* changing the LSNs, slot isn't acquired and thus could
1038 * otherwise be dropped anytime.
1039 */
1041}
int errhint(const char *fmt,...)
Definition elog.c:1330
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, fb(), i, InvalidRepOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char roname,
bool  missing_ok 
)

Definition at line 229 of file origin.c.

230{
232 Oid roident = InvalidOid;
233 HeapTuple tuple;
235
237
239 if (HeapTupleIsValid(tuple))
240 {
242 roident = ident->roident;
243 ReleaseSysCache(tuple);
244 }
245 else if (!missing_ok)
248 errmsg("replication origin \"%s\" does not exist",
249 roname)));
250
251 return roident;
252}
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition syscache.c:220

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 499 of file origin.c.

500{
501 HeapTuple tuple;
503
504 Assert(OidIsValid((Oid) roident));
505 Assert(roident != InvalidRepOriginId);
506 Assert(roident != DoNotReplicateId);
507
509 ObjectIdGetDatum((Oid) roident));
510
511 if (HeapTupleIsValid(tuple))
512 {
514 *roname = text_to_cstring(&ric->roname);
515 ReleaseSysCache(tuple);
516
517 return true;
518 }
519 else
520 {
521 *roname = NULL;
522
523 if (!missing_ok)
526 errmsg("replication origin with ID %d does not exist",
527 roident)));
528
529 return false;
530 }
531}

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_check_prerequisites()

◆ replorigin_create()

RepOriginId replorigin_create ( const char roname)

Definition at line 260 of file origin.c.

261{
262 Oid roident;
263 HeapTuple tuple = NULL;
264 Relation rel;
267 SysScanDesc scan;
269
270 /*
271 * To avoid needing a TOAST table for pg_replication_origin, we limit
272 * replication origin names to 512 bytes. This should be more than enough
273 * for all practical use.
274 */
278 errmsg("replication origin name is too long"),
279 errdetail("Replication origin names must be no longer than %d bytes.",
281
283
285
286 /*
287 * We need the numeric replication origin to be 16bit wide, so we cannot
288 * rely on the normal oid allocation. Instead we simply scan
289 * pg_replication_origin for the first unused id. That's not particularly
290 * efficient, but this should be a fairly infrequent operation - we can
291 * easily spend a bit more code on this when it turns out it needs to be
292 * faster.
293 *
294 * We handle concurrency by taking an exclusive lock (allowing reads!)
295 * over the table for the duration of the search. Because we use a "dirty
296 * snapshot" we can read rows that other in-progress sessions have
297 * written, even though they would be invisible with normal snapshots. Due
298 * to the exclusive lock there's no danger that new rows can appear while
299 * we're checking.
300 */
302
304
305 /*
306 * We want to be able to access pg_replication_origin without setting up a
307 * snapshot. To make that safe, it needs to not have a TOAST table, since
308 * TOASTed data cannot be fetched without a snapshot. As of this writing,
309 * its only varlena column is roname, which we limit to 512 bytes to avoid
310 * needing out-of-line storage. If you add a TOAST table to this catalog,
311 * be sure to set up a snapshot everywhere it might be needed. For more
312 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
313 */
314 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
315
316 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
317 {
318 bool nulls[Natts_pg_replication_origin];
320 bool collides;
321
323
324 ScanKeyInit(&key,
327 ObjectIdGetDatum(roident));
328
330 true /* indexOK */ ,
332 1, &key);
333
335
336 systable_endscan(scan);
337
338 if (!collides)
339 {
340 /*
341 * Ok, found an unused roident, insert the new row and do a CCI,
342 * so our callers can look it up if they want to.
343 */
344 memset(&nulls, 0, sizeof(nulls));
345
348
349 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
350 CatalogTupleInsert(rel, tuple);
352 break;
353 }
354 }
355
356 /* now release lock again, */
358
359 if (tuple == NULL)
362 errmsg("could not find free replication origin ID")));
363
364 heap_freetuple(tuple);
365 return roident;
366}
#define PG_UINT16_MAX
Definition c.h:611
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:388
void CommandCounterIncrement(void)
Definition xact.c:1101

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg(), ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char name,
bool  missing_ok,
bool  nowait 
)

Definition at line 445 of file origin.c.

446{
447 RepOriginId roident;
448 Relation rel;
449 HeapTuple tuple;
450
452
454
455 roident = replorigin_by_name(name, missing_ok);
456
457 /* Lock the origin to prevent concurrent drops. */
460
462 if (!HeapTupleIsValid(tuple))
463 {
464 if (!missing_ok)
465 elog(ERROR, "cache lookup failed for replication origin with ID %d",
466 roident);
467
468 /*
469 * We don't need to retain the locks if the origin is already dropped.
470 */
474 return;
475 }
476
477 replorigin_state_clear(roident, nowait);
478
479 /*
480 * Now, we can delete the catalog entry.
481 */
482 CatalogTupleDelete(rel, &tuple->t_self);
483 ReleaseSysCache(tuple);
484
486
487 /* We keep the lock on pg_replication_origin until commit */
488 table_close(rel, NoLock);
489}
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition origin.c:372
ItemPointerData t_self
Definition htup.h:65

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1045 of file origin.c.

1046{
1047 int i;
1048 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1049 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1050
1051 /* prevent slots from being concurrently dropped */
1053
1054 for (i = 0; i < max_active_replication_origins; i++)
1055 {
1057
1059
1060 if (state->roident == node)
1061 {
1062 LWLockAcquire(&state->lock, LW_SHARED);
1063
1064 remote_lsn = state->remote_lsn;
1065 local_lsn = state->local_lsn;
1066
1067 LWLockRelease(&state->lock);
1068
1069 break;
1070 }
1071 }
1072
1074
1075 if (flush && XLogRecPtrIsValid(local_lsn))
1076 XLogFlush(local_lsn);
1077
1078 return remote_lsn;
1079}

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 855 of file origin.c.

856{
857 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
858
859 switch (info)
860 {
862 {
865
866 replorigin_advance(xlrec->node_id,
867 xlrec->remote_lsn, record->EndRecPtr,
868 xlrec->force /* backward */ ,
869 false /* WAL log */ );
870 break;
871 }
873 {
875 int i;
876
878
879 for (i = 0; i < max_active_replication_origins; i++)
880 {
882
883 /* found our slot */
884 if (state->roident == xlrec->node_id)
885 {
886 /* reset entry */
887 state->roident = InvalidRepOriginId;
888 state->remote_lsn = InvalidXLogRecPtr;
889 state->local_lsn = InvalidXLogRecPtr;
890 break;
891 }
892 }
893 break;
894 }
895 default:
896 elog(PANIC, "replorigin_redo: unknown op code %u", info);
897 }
898}
uint8_t uint8
Definition c.h:554
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidRepOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1274 of file origin.c.

1275{
1277
1279 ereport(ERROR,
1281 errmsg("no replication origin is configured")));
1282
1283 /*
1284 * Restrict explicit resetting of the replication origin if it was first
1285 * acquired by this process and others are still using it. While the
1286 * system handles this safely (as happens if the first session exits
1287 * without calling reset), it is best to avoid doing so.
1288 */
1291 ereport(ERROR,
1293 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1295 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1296 errhint("Reset the replication origin in all other processes before retrying.")));
1297
1299}
int MyProcPid
Definition globals.c:47

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_reset_internal()

static void replorigin_session_reset_internal ( void  )
static

Definition at line 1083 of file origin.c.

1084{
1086
1088
1090
1091 /* The origin must be held by at least one process at this point. */
1093
1094 /*
1095 * Reset the PID only if the current session is the first to set up this
1096 * origin. This avoids clearing the first process's PID when any other
1097 * session releases the origin.
1098 */
1101
1103
1106
1108
1110}
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition origin.c:139

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, ReplicationState::refcount, and session_replication_state.

Referenced by ReplicationOriginExitCleanup(), and replorigin_session_reset().

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1144 of file origin.c.

1145{
1146 static bool registered_cleanup;
1147 int i;
1148 int free_slot = -1;
1149
1150 if (!registered_cleanup)
1151 {
1153 registered_cleanup = true;
1154 }
1155
1157
1159 ereport(ERROR,
1161 errmsg("cannot setup replication origin when one is already setup")));
1162
1163 /* Lock exclusively, as we may have to create a new table entry. */
1165
1166 /*
1167 * Search for either an existing slot for the origin, or a free one we can
1168 * use.
1169 */
1170 for (i = 0; i < max_active_replication_origins; i++)
1171 {
1173
1174 /* remember where to insert if necessary */
1175 if (curstate->roident == InvalidRepOriginId &&
1176 free_slot == -1)
1177 {
1178 free_slot = i;
1179 continue;
1180 }
1181
1182 /* not our slot */
1183 if (curstate->roident != node)
1184 continue;
1185
1186 else if (curstate->acquired_by != 0 && acquired_by == 0)
1187 {
1188 ereport(ERROR,
1190 errmsg("replication origin with ID %d is already active for PID %d",
1191 curstate->roident, curstate->acquired_by)));
1192 }
1193
1194 else if (curstate->acquired_by != acquired_by)
1195 {
1196 ereport(ERROR,
1198 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1199 node, acquired_by)));
1200 }
1201
1202 /*
1203 * The origin is in use, but PID is not recorded. This can happen if
1204 * the process that originally acquired the origin exited without
1205 * releasing it. To ensure correctness, other processes cannot acquire
1206 * the origin until all processes currently using it have released it.
1207 */
1208 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1209 ereport(ERROR,
1211 errmsg("replication origin with ID %d is already active in another process",
1212 curstate->roident)));
1213
1214 /* ok, found slot */
1216 break;
1217 }
1218
1219
1221 ereport(ERROR,
1223 errmsg("could not find free replication state slot for replication origin with ID %d",
1224 node),
1225 errhint("Increase \"max_active_replication_origins\" and try again.")));
1226 else if (session_replication_state == NULL)
1227 {
1228 if (acquired_by)
1229 ereport(ERROR,
1231 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1232 acquired_by, node)));
1233
1234 /* initialize new slot */
1239 }
1240
1241
1243
1244 if (acquired_by == 0)
1245 {
1248 }
1249 else
1250 {
1251 /*
1252 * Sanity check: the origin must already be acquired by the process
1253 * passed as input, and at least one process must be using it.
1254 */
1257 }
1258
1260
1262
1263 /* probably this one is pointless */
1265}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1117

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, fb(), i, InvalidRepOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_state_clear()

static void replorigin_state_clear ( RepOriginId  roident,
bool  nowait 
)
static

Definition at line 372 of file origin.c.

373{
374 int i;
375
376 /*
377 * Clean up the slot state info, if there is any matching slot.
378 */
379restart:
381
382 for (i = 0; i < max_active_replication_origins; i++)
383 {
385
386 if (state->roident == roident)
387 {
388 /* found our slot, is it busy? */
389 if (state->refcount > 0)
390 {
392
393 if (nowait)
396 (state->acquired_by != 0)
397 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
398 state->roident,
399 state->acquired_by)
400 : errmsg("could not drop replication origin with ID %d, in use by another process",
401 state->roident)));
402
403 /*
404 * We must wait and then retry. Since we don't know which CV
405 * to wait on until here, we can't readily use
406 * ConditionVariablePrepareToSleep (calling it here would be
407 * wrong, since we could miss the signal if we did so); just
408 * use ConditionVariableSleep directly.
409 */
410 cv = &state->origin_cv;
411
413
415 goto restart;
416 }
417
418 /* first make a WAL log entry */
419 {
421
422 xlrec.node_id = roident;
424 XLogRegisterData(&xlrec, sizeof(xlrec));
426 }
427
428 /* then clear the in-memory slot */
429 state->roident = InvalidRepOriginId;
430 state->remote_lsn = InvalidXLogRecPtr;
431 state->local_lsn = InvalidXLogRecPtr;
432 break;
433 }
434 }
437}
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
RepOriginId node_id
Definition origin.h:27

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg(), ERROR, fb(), i, InvalidRepOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 728 of file origin.c.

729{
730 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
731 int fd;
732 int readBytes;
734 int last_state = 0;
737
738 /* don't want to overwrite already existing state */
739#ifdef USE_ASSERT_CHECKING
740 static bool already_started = false;
741
743 already_started = true;
744#endif
745
747 return;
748
750
751 elog(DEBUG2, "starting up replication origin progress state");
752
754
755 /*
756 * might have had max_active_replication_origins == 0 last run, or we just
757 * brought up a standby.
758 */
759 if (fd < 0 && errno == ENOENT)
760 return;
761 else if (fd < 0)
764 errmsg("could not open file \"%s\": %m",
765 path)));
766
767 /* verify magic, that is written even if nothing was active */
768 readBytes = read(fd, &magic, sizeof(magic));
769 if (readBytes != sizeof(magic))
770 {
771 if (readBytes < 0)
774 errmsg("could not read file \"%s\": %m",
775 path)));
776 else
779 errmsg("could not read file \"%s\": read %d of %zu",
780 path, readBytes, sizeof(magic))));
781 }
782 COMP_CRC32C(crc, &magic, sizeof(magic));
783
784 if (magic != REPLICATION_STATE_MAGIC)
786 (errmsg("replication checkpoint has wrong magic %u instead of %u",
787 magic, REPLICATION_STATE_MAGIC)));
788
789 /* we can skip locking here, no other access is possible */
790
791 /* recover individual states, until there are no more to be found */
792 while (true)
793 {
795
796 readBytes = read(fd, &disk_state, sizeof(disk_state));
797
798 if (readBytes < 0)
799 {
802 errmsg("could not read file \"%s\": %m",
803 path)));
804 }
805
806 /* no further data */
807 if (readBytes == sizeof(crc))
808 {
809 memcpy(&file_crc, &disk_state, sizeof(file_crc));
810 break;
811 }
812
813 if (readBytes != sizeof(disk_state))
814 {
817 errmsg("could not read file \"%s\": read %d of %zu",
818 path, readBytes, sizeof(disk_state))));
819 }
820
822
826 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
827
828 /* copy data to shared memory */
831 last_state++;
832
833 ereport(LOG,
834 errmsg("recovered replication state of node %d to %X/%08X",
835 disk_state.roident,
836 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
837 }
838
839 /* now check checksum */
841 if (file_crc != crc)
844 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
845 crc, file_crc)));
846
847 if (CloseTransientFile(fd) != 0)
850 errmsg("could not close file \"%s\": %m",
851 path)));
852}
#define LOG
Definition elog.h:31
#define DEBUG2
Definition elog.h:29
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replication_states

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 179 of file origin.c.

Referenced by ReplicationOriginShmemInit().

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state