PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"
 
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
 
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_origins, bool recoveryOK)
 
static bool IsReservedOriginName (const char *name)
 
ReplOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
ReplOriginId replorigin_create (const char *roname)
 
static void replorigin_state_clear (ReplOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (ReplOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (ReplOriginId node, bool flush)
 
static void replorigin_session_reset_internal (void)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (ReplOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void replorigin_xact_clear (bool clear_origin)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

int max_active_replication_origins = 10
 
ReplOriginXactState replorigin_xact_state
 
static ReplicationState * replication_states
 
static ReplicationStateCtl * replication_states_ctl
 
static ReplicationState * session_replication_state = NULL
 

Macro Definition Documentation

◆ PG_REPLORIGIN_CHECKPOINT_FILENAME

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"

Definition at line 100 of file origin.c.

◆ PG_REPLORIGIN_CHECKPOINT_TMPFILE

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

Definition at line 101 of file origin.c.

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 192 of file origin.c.

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 604 of file origin.c.

605{
607 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
608 int tmpfd;
609 int i;
612
614 return;
615
617
618 /* make sure no old temp file is remaining */
619 if (unlink(tmppath) < 0 && errno != ENOENT)
622 errmsg("could not remove file \"%s\": %m",
623 tmppath)));
624
625 /*
626 * no other backend can perform this at the same time; only one checkpoint
627 * can happen at a time.
628 */
631 if (tmpfd < 0)
634 errmsg("could not create file \"%s\": %m",
635 tmppath)));
636
637 /* write magic */
638 errno = 0;
639 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
640 {
641 /* if write didn't set errno, assume problem is no disk space */
642 if (errno == 0)
643 errno = ENOSPC;
646 errmsg("could not write to file \"%s\": %m",
647 tmppath)));
648 }
649 COMP_CRC32C(crc, &magic, sizeof(magic));
650
651 /* prevent concurrent creations/drops */
653
654 /* write actual data */
655 for (i = 0; i < max_active_replication_origins; i++)
656 {
659 XLogRecPtr local_lsn;
660
661 if (curstate->roident == InvalidReplOriginId)
662 continue;
663
664 /* zero, to avoid uninitialized padding bytes */
665 memset(&disk_state, 0, sizeof(disk_state));
666
668
669 disk_state.roident = curstate->roident;
670
671 disk_state.remote_lsn = curstate->remote_lsn;
672 local_lsn = curstate->local_lsn;
673
674 LWLockRelease(&curstate->lock);
675
676 /* make sure we only write out a commit that's persistent */
677 XLogFlush(local_lsn);
678
679 errno = 0;
680 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
681 sizeof(disk_state))
682 {
683 /* if write didn't set errno, assume problem is no disk space */
684 if (errno == 0)
685 errno = ENOSPC;
688 errmsg("could not write to file \"%s\": %m",
689 tmppath)));
690 }
691
693 }
694
696
697 /* write out the CRC */
699 errno = 0;
700 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
701 {
702 /* if write didn't set errno, assume problem is no disk space */
703 if (errno == 0)
704 errno = ENOSPC;
707 errmsg("could not write to file \"%s\": %m",
708 tmppath)));
709 }
710
711 if (CloseTransientFile(tmpfd) != 0)
714 errmsg("could not close file \"%s\": %m",
715 tmppath)));
716
717 /* fsync, rename to permanent file, fsync file and directory */
719}
#define PG_BINARY
Definition c.h:1287
uint32_t uint32
Definition c.h:546
int errcode_for_file_access(void)
Definition elog.c:886
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define PANIC
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:782
int CloseTransientFile(int fd)
Definition fd.c:2854
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2677
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
int max_active_replication_origins
Definition origin.c:104
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:101
static ReplicationState * replication_states
Definition origin.c:176
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:100
#define REPLICATION_STATE_MAGIC
Definition origin.c:192
#define InvalidReplOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ IsReservedOriginName()

static bool IsReservedOriginName ( const char * name)
static

Definition at line 214 of file origin.c.

215{
216 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
218}
int pg_strcasecmp(const char *s1, const char *s2)
const char * name

References fb(), name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1560 of file origin.c.

1561{
1564 ReplOriginId node;
1565
1566 replorigin_check_prerequisites(true, false);
1567
1568 /* lock to prevent the replication origin from vanishing */
1570
1571 node = replorigin_by_name(text_to_cstring(name), false);
1572
1573 /*
1574 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1575 * xact hasn't committed yet. This is why this function should be used to
1576 * set up the initial replication state, but not for replay.
1577 */
1579 true /* go backward */ , true /* WAL log */ );
1580
1582
1584}
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
#define RowExclusiveLock
Definition lockdefs.h:38
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:231
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:918
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:195
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
Definition c.h:706
char * text_to_cstring(const text *t)
Definition varlena.c:214
uint16 ReplOriginId
Definition xlogdefs.h:69
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1373 of file origin.c.

1374{
1375 char *name;
1376 ReplOriginId roident;
1377
1378 replorigin_check_prerequisites(false, false);
1379
1381
1382 /*
1383 * Replication origins "any" and "none" are reserved for system options.
1384 * The origins "pg_xxx" are reserved for internal use.
1385 */
1387 ereport(ERROR,
1389 errmsg("replication origin name \"%s\" is reserved",
1390 name),
1391 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1393
1394 /*
1395 * If built with appropriate switch, whine when regression-testing
1396 * conventions for replication origin names are violated.
1397 */
1398#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1399 if (strncmp(name, "regress_", 8) != 0)
1400 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1401#endif
1402
1403 roident = replorigin_create(name);
1404
1405 pfree(name);
1406
1407 PG_RETURN_OID(roident);
1408}
bool IsReservedName(const char *name)
Definition catalog.c:278
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errcode(int sqlerrcode)
Definition elog.c:863
#define WARNING
Definition elog.h:36
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_OID(x)
Definition fmgr.h:361
void pfree(void *pointer)
Definition mcxt.c:1616
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:262
static bool IsReservedOriginName(const char *name)
Definition origin.c:214
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342

References DatumGetPointer(), elog, ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), IsReservedName(), IsReservedOriginName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1414 of file origin.c.

1415{
1416 char *name;
1417
1418 replorigin_check_prerequisites(false, false);
1419
1421
1422 replorigin_drop_by_name(name, false, true);
1423
1424 pfree(name);
1425
1427}
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:447

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1433 of file origin.c.

1434{
1435 char *name;
1436 ReplOriginId roident;
1437
1438 replorigin_check_prerequisites(false, false);
1439
1441 roident = replorigin_by_name(name, true);
1442
1443 pfree(name);
1444
1445 if (OidIsValid(roident))
1446 PG_RETURN_OID(roident);
1448}
#define OidIsValid(objectId)
Definition c.h:788
#define PG_RETURN_NULL()
Definition fmgr.h:346

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1595 of file origin.c.

1596{
1597 char *name;
1598 bool flush;
1599 ReplOriginId roident;
1600 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1601
1603
1605 flush = PG_GETARG_BOOL(1);
1606
1607 roident = replorigin_by_name(name, false);
1608 Assert(OidIsValid(roident));
1609
1610 remote_lsn = replorigin_get_progress(roident, flush);
1611
1612 if (!XLogRecPtrIsValid(remote_lsn))
1614
1615 PG_RETURN_LSN(remote_lsn);
1616}
#define Assert(condition)
Definition c.h:873
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1047
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References Assert, DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), text_to_cstring(), and XLogRecPtrIsValid.

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1493 of file origin.c.

1494{
1495 replorigin_check_prerequisites(false, false);
1496
1498}
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
ReplOriginXactState replorigin_xact_state
Definition origin.c:166
ReplOriginId origin
Definition origin.h:45

References InvalidReplOriginId, ReplOriginXactState::origin, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_xact_state.

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1509 of file origin.c.

1510{
1511 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1512 bool flush = PG_GETARG_BOOL(0);
1513
1514 replorigin_check_prerequisites(true, false);
1515
1517 ereport(ERROR,
1519 errmsg("no replication origin is configured")));
1520
1521 remote_lsn = replorigin_session_get_progress(flush);
1522
1523 if (!XLogRecPtrIsValid(remote_lsn))
1525
1526 PG_RETURN_LSN(remote_lsn);
1527}
static ReplicationState * session_replication_state
Definition origin.c:189
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1328

References ereport, errcode(), errmsg(), ERROR, fb(), InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), session_replication_state, and XLogRecPtrIsValid.

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1478 of file origin.c.

1479{
1480 replorigin_check_prerequisites(true, false);
1481
1483
1485
1487}
void replorigin_session_reset(void)
Definition origin.c:1276
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1352

References PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_reset(), and replorigin_xact_clear().

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1454 of file origin.c.

1455{
1456 char *name;
1457 ReplOriginId origin;
1458 int pid;
1459
1460 replorigin_check_prerequisites(true, false);
1461
1463 origin = replorigin_by_name(name, false);
1464 pid = PG_GETARG_INT32(1);
1465 replorigin_session_setup(origin, pid);
1466
1468
1469 pfree(name);
1470
1472}
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1146

References DatumGetPointer(), name, ReplOriginXactState::origin, pfree(), PG_GETARG_DATUM, PG_GETARG_INT32, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_setup(), replorigin_xact_state, and text_to_cstring().

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1548 of file origin.c.

1549{
1550 replorigin_check_prerequisites(true, false);
1551
1552 /* Do not clear the session origin */
1553 replorigin_xact_clear(false);
1554
1556}

References PG_RETURN_VOID, replorigin_check_prerequisites(), and replorigin_xact_clear().

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1620 of file origin.c.

1621{
1622 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1623 int i;
1624#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1625
1626 /* we want to return 0 rows if slot is set to zero */
1627 replorigin_check_prerequisites(false, true);
1628
1629 InitMaterializedSRF(fcinfo, 0);
1630
1631 /* prevent slots from being concurrently dropped */
1633
1634 /*
1635 * Iterate through all possible replication_states, display if they are
1636 * filled. Note that we do not take any locks, so slightly corrupted/out
1637 * of date values are a possibility.
1638 */
1639 for (i = 0; i < max_active_replication_origins; i++)
1640 {
1644 char *roname;
1645
1647
1648 /* unused slot, nothing to display */
1649 if (state->roident == InvalidReplOriginId)
1650 continue;
1651
1652 memset(values, 0, sizeof(values));
1653 memset(nulls, 1, sizeof(nulls));
1654
1655 values[0] = ObjectIdGetDatum(state->roident);
1656 nulls[0] = false;
1657
1658 /*
1659 * We're not preventing the origin from being dropped concurrently, so
1660 * silently accept that it might be gone.
1661 */
1662 if (replorigin_by_oid(state->roident, true,
1663 &roname))
1664 {
1666 nulls[1] = false;
1667 }
1668
1669 LWLockAcquire(&state->lock, LW_SHARED);
1670
1671 values[2] = LSNGetDatum(state->remote_lsn);
1672 nulls[2] = false;
1673
1674 values[3] = LSNGetDatum(state->local_lsn);
1675 nulls[3] = false;
1676
1677 LWLockRelease(&state->lock);
1678
1679 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1680 values, nulls);
1681 }
1682
1684
1685#undef REPLICATION_ORIGIN_PROGRESS_COLS
1686
1687 return (Datum) 0;
1688}
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:501
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784

References CStringGetTextDatum, fb(), i, InitMaterializedSRF(), InvalidReplOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), tuplestore_putvalues(), and values.

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1119 of file origin.c.

1120{
1122 return;
1123
1125}
static void replorigin_session_reset_internal(void)
Definition origin.c:1085

References fb(), replorigin_session_reset_internal(), and session_replication_state.

Referenced by replorigin_session_setup().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 557 of file origin.c.

558{
559 bool found;
560
562 return;
563
565 ShmemInitStruct("ReplicationOriginState",
567 &found);
569
570 if (!found)
571 {
572 int i;
573
575
577
578 for (i = 0; i < max_active_replication_origins; i++)
579 {
583 }
584 }
585}
#define MemSet(start, val, len)
Definition c.h:1013
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:181
Size ReplicationOriginShmemSize(void)
Definition origin.c:542
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:162

References ConditionVariableInit(), fb(), i, LWLockInitialize(), max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 542 of file origin.c.

543{
544 Size size = 0;
545
547 return size;
548
549 size = add_size(size, offsetof(ReplicationStateCtl, states));
550
551 size = add_size(size,
553 return size;
554}
size_t Size
Definition c.h:619
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497

References add_size(), fb(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( ReplOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 918 of file origin.c.

921{
922 int i;
925
927
928 /* we don't track DoNotReplicateId */
929 if (node == DoNotReplicateId)
930 return;
931
932 /*
933 * XXX: For the case where this is called by WAL replay, it'd be more
934 * efficient to restore into a backend local hashtable and only dump into
935 * shmem after recovery is finished. Let's wait with implementing that
936 * till it's shown to be a measurable expense
937 */
938
939 /* Lock exclusively, as we may have to create a new table entry. */
941
942 /*
943 * Search for either an existing slot for the origin, or a free one we can
944 * use.
945 */
946 for (i = 0; i < max_active_replication_origins; i++)
947 {
949
950 /* remember where to insert if necessary */
951 if (curstate->roident == InvalidReplOriginId &&
952 free_state == NULL)
953 {
955 continue;
956 }
957
958 /* not our slot */
959 if (curstate->roident != node)
960 {
961 continue;
962 }
963
964 /* ok, found slot */
966
968
969 /* Make sure it's not used by somebody else */
970 if (replication_state->refcount > 0)
971 {
974 (replication_state->acquired_by != 0)
975 ? errmsg("replication origin with ID %d is already active for PID %d",
976 replication_state->roident,
977 replication_state->acquired_by)
978 : errmsg("replication origin with ID %d is already active in another process",
979 replication_state->roident)));
980 }
981
982 break;
983 }
984
988 errmsg("could not find free replication state slot for replication origin with ID %d",
989 node),
990 errhint("Increase \"max_active_replication_origins\" and try again.")));
991
992 if (replication_state == NULL)
993 {
994 /* initialize new slot */
999 replication_state->roident = node;
1000 }
1001
1003
1004 /*
1005 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1006 * and the standby gets the message. Primarily this will be called during
1007 * WAL replay (of commit records) where no WAL logging is necessary.
1008 */
1009 if (wal_log)
1010 {
1012
1014 xlrec.node_id = node;
1015 xlrec.force = go_backward;
1016
1018 XLogRegisterData(&xlrec, sizeof(xlrec));
1019
1021 }
1022
1023 /*
1024 * Due to - harmless - race conditions during a checkpoint we could see
1025 * values here that are older than the ones we already have in memory. We
1026 * could also see older values for prepared transactions when the prepare
1027 * is sent at a later point of time along with commit prepared and there
1028 * are other transactions commits between prepare and commit prepared. See
1029 * ReorderBufferFinishPrepared. Don't overwrite those.
1030 */
1031 if (go_backward || replication_state->remote_lsn < remote_commit)
1032 replication_state->remote_lsn = remote_commit;
1034 (go_backward || replication_state->local_lsn < local_commit))
1035 replication_state->local_lsn = local_commit;
1037
1038 /*
1039 * Release *after* changing the LSNs, slot isn't acquired and thus could
1040 * otherwise be dropped anytime.
1041 */
1043}
int errhint(const char *fmt,...)
Definition elog.c:1330
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, fb(), i, InvalidReplOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

ReplOriginId replorigin_by_name ( const char * roname,
bool  missing_ok 
)

Definition at line 231 of file origin.c.

232{
234 Oid roident = InvalidOid;
235 HeapTuple tuple;
237
239
241 if (HeapTupleIsValid(tuple))
242 {
244 roident = ident->roident;
245 ReleaseSysCache(tuple);
246 }
247 else if (!missing_ok)
250 errmsg("replication origin \"%s\" does not exist",
251 roname)));
252
253 return roident;
254}
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition syscache.c:220

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( ReplOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 501 of file origin.c.

502{
503 HeapTuple tuple;
505
506 Assert(OidIsValid((Oid) roident));
507 Assert(roident != InvalidReplOriginId);
508 Assert(roident != DoNotReplicateId);
509
511 ObjectIdGetDatum((Oid) roident));
512
513 if (HeapTupleIsValid(tuple))
514 {
516 *roname = text_to_cstring(&ric->roname);
517 ReleaseSysCache(tuple);
518
519 return true;
520 }
521 else
522 {
523 *roname = NULL;
524
525 if (!missing_ok)
528 errmsg("replication origin with ID %d does not exist",
529 roident)));
530
531 return false;
532 }
533}

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, InvalidReplOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_check_prerequisites()

◆ replorigin_create()

ReplOriginId replorigin_create ( const char * roname)

Definition at line 262 of file origin.c.

263{
264 Oid roident;
265 HeapTuple tuple = NULL;
266 Relation rel;
269 SysScanDesc scan;
271
272 /*
273 * To avoid needing a TOAST table for pg_replication_origin, we limit
274 * replication origin names to 512 bytes. This should be more than enough
275 * for all practical use.
276 */
280 errmsg("replication origin name is too long"),
281 errdetail("Replication origin names must be no longer than %d bytes.",
283
285
287
288 /*
289 * We need the numeric replication origin to be 16bit wide, so we cannot
290 * rely on the normal oid allocation. Instead we simply scan
291 * pg_replication_origin for the first unused id. That's not particularly
292 * efficient, but this should be a fairly infrequent operation - we can
293 * easily spend a bit more code on this when it turns out it needs to be
294 * faster.
295 *
296 * We handle concurrency by taking an exclusive lock (allowing reads!)
297 * over the table for the duration of the search. Because we use a "dirty
298 * snapshot" we can read rows that other in-progress sessions have
299 * written, even though they would be invisible with normal snapshots. Due
300 * to the exclusive lock there's no danger that new rows can appear while
301 * we're checking.
302 */
304
306
307 /*
308 * We want to be able to access pg_replication_origin without setting up a
309 * snapshot. To make that safe, it needs to not have a TOAST table, since
310 * TOASTed data cannot be fetched without a snapshot. As of this writing,
311 * its only varlena column is roname, which we limit to 512 bytes to avoid
312 * needing out-of-line storage. If you add a TOAST table to this catalog,
313 * be sure to set up a snapshot everywhere it might be needed. For more
314 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
315 */
316 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
317
318 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
319 {
320 bool nulls[Natts_pg_replication_origin];
322 bool collides;
323
325
326 ScanKeyInit(&key,
329 ObjectIdGetDatum(roident));
330
332 true /* indexOK */ ,
334 1, &key);
335
337
338 systable_endscan(scan);
339
340 if (!collides)
341 {
342 /*
343 * Ok, found an unused roident, insert the new row and do a CCI,
344 * so our callers can look it up if they want to.
345 */
346 memset(&nulls, 0, sizeof(nulls));
347
350
351 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
352 CatalogTupleInsert(rel, tuple);
354 break;
355 }
356 }
357
358 /* now release lock again, */
360
361 if (tuple == NULL)
364 errmsg("could not find free replication origin ID")));
365
366 heap_freetuple(tuple);
367 return roident;
368}
#define PG_UINT16_MAX
Definition c.h:601
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:388
void CommandCounterIncrement(void)
Definition xact.c:1101

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg(), ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char * name,
bool  missing_ok,
bool  nowait 
)

Definition at line 447 of file origin.c.

448{
449 ReplOriginId roident;
450 Relation rel;
451 HeapTuple tuple;
452
454
456
457 roident = replorigin_by_name(name, missing_ok);
458
459 /* Lock the origin to prevent concurrent drops. */
462
464 if (!HeapTupleIsValid(tuple))
465 {
466 if (!missing_ok)
467 elog(ERROR, "cache lookup failed for replication origin with ID %d",
468 roident);
469
470 /*
471 * We don't need to retain the locks if the origin is already dropped.
472 */
476 return;
477 }
478
479 replorigin_state_clear(roident, nowait);
480
481 /*
482 * Now, we can delete the catalog entry.
483 */
484 CatalogTupleDelete(rel, &tuple->t_self);
485 ReleaseSysCache(tuple);
486
488
489 /* We keep the lock on pg_replication_origin until commit */
490 table_close(rel, NoLock);
491}
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:374
ItemPointerData t_self
Definition htup.h:65

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( ReplOriginId  node,
bool  flush 
)

Definition at line 1047 of file origin.c.

1048{
1049 int i;
1050 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1051 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1052
1053 /* prevent slots from being concurrently dropped */
1055
1056 for (i = 0; i < max_active_replication_origins; i++)
1057 {
1059
1061
1062 if (state->roident == node)
1063 {
1064 LWLockAcquire(&state->lock, LW_SHARED);
1065
1066 remote_lsn = state->remote_lsn;
1067 local_lsn = state->local_lsn;
1068
1069 LWLockRelease(&state->lock);
1070
1071 break;
1072 }
1073 }
1074
1076
1077 if (flush && XLogRecPtrIsValid(local_lsn))
1078 XLogFlush(local_lsn);
1079
1080 return remote_lsn;
1081}

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState * record)

Definition at line 857 of file origin.c.

858{
859 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
860
861 switch (info)
862 {
864 {
867
868 replorigin_advance(xlrec->node_id,
869 xlrec->remote_lsn, record->EndRecPtr,
870 xlrec->force /* backward */ ,
871 false /* WAL log */ );
872 break;
873 }
875 {
877 int i;
878
880
881 for (i = 0; i < max_active_replication_origins; i++)
882 {
884
885 /* found our slot */
886 if (state->roident == xlrec->node_id)
887 {
888 /* reset entry */
889 state->roident = InvalidReplOriginId;
890 state->remote_lsn = InvalidXLogRecPtr;
891 state->local_lsn = InvalidXLogRecPtr;
892 break;
893 }
894 }
895 break;
896 }
897 default:
898 elog(PANIC, "replorigin_redo: unknown op code %u", info);
899 }
900}
uint8_t uint8
Definition c.h:544
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1276 of file origin.c.

1277{
1279
1281 ereport(ERROR,
1283 errmsg("no replication origin is configured")));
1284
1285 /*
1286 * Restrict explicit resetting of the replication origin if it was first
1287 * acquired by this process and others are still using it. While the
1288 * system handles this safely (as happens if the first session exits
1289 * without calling reset), it is best to avoid doing so.
1290 */
1293 ereport(ERROR,
1295 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1297 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1298 errhint("Reset the replication origin in all other processes before retrying.")));
1299
1301}
int MyProcPid
Definition globals.c:47

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_reset_internal()

static void replorigin_session_reset_internal ( void  )
static

Definition at line 1085 of file origin.c.

1086{
1088
1090
1092
1093 /* The origin must be held by at least one process at this point. */
1095
1096 /*
1097 * Reset the PID only if the current session is the first to set up this
1098 * origin. This avoids clearing the first process's PID when any other
1099 * session releases the origin.
1100 */
1103
1105
1108
1110
1112}
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition origin.c:139

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, ReplicationState::refcount, and session_replication_state.

Referenced by ReplicationOriginExitCleanup(), and replorigin_session_reset().

◆ replorigin_session_setup()

void replorigin_session_setup ( ReplOriginId  node,
int  acquired_by 
)

Definition at line 1146 of file origin.c.

1147{
1148 static bool registered_cleanup;
1149 int i;
1150 int free_slot = -1;
1151
1152 if (!registered_cleanup)
1153 {
1155 registered_cleanup = true;
1156 }
1157
1159
1161 ereport(ERROR,
1163 errmsg("cannot setup replication origin when one is already setup")));
1164
1165 /* Lock exclusively, as we may have to create a new table entry. */
1167
1168 /*
1169 * Search for either an existing slot for the origin, or a free one we can
1170 * use.
1171 */
1172 for (i = 0; i < max_active_replication_origins; i++)
1173 {
1175
1176 /* remember where to insert if necessary */
1177 if (curstate->roident == InvalidReplOriginId &&
1178 free_slot == -1)
1179 {
1180 free_slot = i;
1181 continue;
1182 }
1183
1184 /* not our slot */
1185 if (curstate->roident != node)
1186 continue;
1187
1188 else if (curstate->acquired_by != 0 && acquired_by == 0)
1189 {
1190 ereport(ERROR,
1192 errmsg("replication origin with ID %d is already active for PID %d",
1193 curstate->roident, curstate->acquired_by)));
1194 }
1195
1196 else if (curstate->acquired_by != acquired_by)
1197 {
1198 ereport(ERROR,
1200 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1201 node, acquired_by)));
1202 }
1203
1204 /*
1205 * The origin is in use, but PID is not recorded. This can happen if
1206 * the process that originally acquired the origin exited without
1207 * releasing it. To ensure correctness, other processes cannot acquire
1208 * the origin until all processes currently using it have released it.
1209 */
1210 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1211 ereport(ERROR,
1213 errmsg("replication origin with ID %d is already active in another process",
1214 curstate->roident)));
1215
1216 /* ok, found slot */
1218 break;
1219 }
1220
1221
1223 ereport(ERROR,
1225 errmsg("could not find free replication state slot for replication origin with ID %d",
1226 node),
1227 errhint("Increase \"max_active_replication_origins\" and try again.")));
1228 else if (session_replication_state == NULL)
1229 {
1230 if (acquired_by)
1231 ereport(ERROR,
1233 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1234 acquired_by, node)));
1235
1236 /* initialize new slot */
1241 }
1242
1243
1245
1246 if (acquired_by == 0)
1247 {
1250 }
1251 else
1252 {
1253 /*
1254 * Sanity check: the origin must already be acquired by the process
1255 * passed as input, and at least one process must be using it.
1256 */
1259 }
1260
1262
1264
1265 /* probably this one is pointless */
1267}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1119

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, fb(), i, InvalidReplOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_state_clear()

static void replorigin_state_clear ( ReplOriginId  roident,
bool  nowait 
)
static

Definition at line 374 of file origin.c.

375{
376 int i;
377
378 /*
379 * Clean up the slot state info, if there is any matching slot.
380 */
381restart:
383
384 for (i = 0; i < max_active_replication_origins; i++)
385 {
387
388 if (state->roident == roident)
389 {
390 /* found our slot, is it busy? */
391 if (state->refcount > 0)
392 {
394
395 if (nowait)
398 (state->acquired_by != 0)
399 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
400 state->roident,
401 state->acquired_by)
402 : errmsg("could not drop replication origin with ID %d, in use by another process",
403 state->roident)));
404
405 /*
406 * We must wait and then retry. Since we don't know which CV
407 * to wait on until here, we can't readily use
408 * ConditionVariablePrepareToSleep (calling it here would be
409 * wrong, since we could miss the signal if we did so); just
410 * use ConditionVariableSleep directly.
411 */
412 cv = &state->origin_cv;
413
415
417 goto restart;
418 }
419
420 /* first make a WAL log entry */
421 {
423
424 xlrec.node_id = roident;
426 XLogRegisterData(&xlrec, sizeof(xlrec));
428 }
429
430 /* then clear the in-memory slot */
431 state->roident = InvalidReplOriginId;
432 state->remote_lsn = InvalidXLogRecPtr;
433 state->local_lsn = InvalidXLogRecPtr;
434 break;
435 }
436 }
439}
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
ReplOriginId node_id
Definition origin.h:27

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg(), ERROR, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

◆ replorigin_xact_clear()

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 730 of file origin.c.

731{
732 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
733 int fd;
734 int readBytes;
736 int last_state = 0;
739
740 /* don't want to overwrite already existing state */
741#ifdef USE_ASSERT_CHECKING
742 static bool already_started = false;
743
745 already_started = true;
746#endif
747
749 return;
750
752
753 elog(DEBUG2, "starting up replication origin progress state");
754
756
757 /*
758 * might have had max_active_replication_origins == 0 last run, or we just
759 * brought up a standby.
760 */
761 if (fd < 0 && errno == ENOENT)
762 return;
763 else if (fd < 0)
766 errmsg("could not open file \"%s\": %m",
767 path)));
768
769 /* verify magic, that is written even if nothing was active */
770 readBytes = read(fd, &magic, sizeof(magic));
771 if (readBytes != sizeof(magic))
772 {
773 if (readBytes < 0)
776 errmsg("could not read file \"%s\": %m",
777 path)));
778 else
781 errmsg("could not read file \"%s\": read %d of %zu",
782 path, readBytes, sizeof(magic))));
783 }
784 COMP_CRC32C(crc, &magic, sizeof(magic));
785
786 if (magic != REPLICATION_STATE_MAGIC)
788 (errmsg("replication checkpoint has wrong magic %u instead of %u",
789 magic, REPLICATION_STATE_MAGIC)));
790
791 /* we can skip locking here, no other access is possible */
792
793 /* recover individual states, until there are no more to be found */
794 while (true)
795 {
797
798 readBytes = read(fd, &disk_state, sizeof(disk_state));
799
800 if (readBytes < 0)
801 {
804 errmsg("could not read file \"%s\": %m",
805 path)));
806 }
807
808 /* no further data */
809 if (readBytes == sizeof(crc))
810 {
811 memcpy(&file_crc, &disk_state, sizeof(file_crc));
812 break;
813 }
814
815 if (readBytes != sizeof(disk_state))
816 {
819 errmsg("could not read file \"%s\": read %d of %zu",
820 path, readBytes, sizeof(disk_state))));
821 }
822
824
828 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
829
830 /* copy data to shared memory */
833 last_state++;
834
835 ereport(LOG,
836 errmsg("recovered replication state of node %d to %X/%08X",
837 disk_state.roident,
838 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
839 }
840
841 /* now check checksum */
843 if (file_crc != crc)
846 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
847 crc, file_crc)));
848
849 if (CloseTransientFile(fd) != 0)
852 errmsg("could not close file \"%s\": %m",
853 path)));
854}
#define LOG
Definition elog.h:31
#define DEBUG2
Definition elog.h:29
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replication_states

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 181 of file origin.c.

Referenced by ReplicationOriginShmemInit().

◆ replorigin_xact_state

◆ session_replication_state