PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/subsystems.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/wait_event.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"
 
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
 
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void ReplicationOriginShmemRequest (void *arg)
 
static void ReplicationOriginShmemInit (void *arg)
 
static void ReplicationOriginShmemAttach (void *arg)
 
static void replorigin_check_prerequisites (bool check_origins, bool recoveryOK)
 
static bool IsReservedOriginName (const char *name)
 
ReplOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
ReplOriginId replorigin_create (const char *roname)
 
static void replorigin_state_clear (ReplOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (ReplOriginId roident, bool missing_ok, char **roname)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (ReplOriginId node, bool flush)
 
static void replorigin_session_reset_internal (void)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (ReplOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void replorigin_xact_clear (bool clear_origin)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

int max_active_replication_origins = 10
 
ReplOriginXactState replorigin_xact_state
 
static ReplicationStatereplication_states
 
const ShmemCallbacks ReplicationOriginShmemCallbacks
 
static ReplicationStateCtlreplication_states_ctl
 
static ReplicationStatesession_replication_state = NULL
 

Macro Definition Documentation

◆ PG_REPLORIGIN_CHECKPOINT_FILENAME

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"

Definition at line 102 of file origin.c.

◆ PG_REPLORIGIN_CHECKPOINT_TMPFILE

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

Definition at line 103 of file origin.c.

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 204 of file origin.c.

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 614 of file origin.c.

615{
617 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
618 int tmpfd;
619 int i;
622
624 return;
625
627
628 /* make sure no old temp file is remaining */
629 if (unlink(tmppath) < 0 && errno != ENOENT)
632 errmsg("could not remove file \"%s\": %m",
633 tmppath)));
634
635 /*
636 * no other backend can perform this at the same time; only one checkpoint
637 * can happen at a time.
638 */
641 if (tmpfd < 0)
644 errmsg("could not create file \"%s\": %m",
645 tmppath)));
646
647 /* write magic */
648 errno = 0;
649 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
650 {
651 /* if write didn't set errno, assume problem is no disk space */
652 if (errno == 0)
653 errno = ENOSPC;
656 errmsg("could not write to file \"%s\": %m",
657 tmppath)));
658 }
659 COMP_CRC32C(crc, &magic, sizeof(magic));
660
661 /* prevent concurrent creations/drops */
663
664 /* write actual data */
665 for (i = 0; i < max_active_replication_origins; i++)
666 {
669 XLogRecPtr local_lsn;
670
671 if (curstate->roident == InvalidReplOriginId)
672 continue;
673
674 /* zero, to avoid uninitialized padding bytes */
675 memset(&disk_state, 0, sizeof(disk_state));
676
678
679 disk_state.roident = curstate->roident;
680
681 disk_state.remote_lsn = curstate->remote_lsn;
682 local_lsn = curstate->local_lsn;
683
684 LWLockRelease(&curstate->lock);
685
686 /* make sure we only write out a commit that's persistent */
687 XLogFlush(local_lsn);
688
689 errno = 0;
690 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
691 sizeof(disk_state))
692 {
693 /* if write didn't set errno, assume problem is no disk space */
694 if (errno == 0)
695 errno = ENOSPC;
698 errmsg("could not write to file \"%s\": %m",
699 tmppath)));
700 }
701
703 }
704
706
707 /* write out the CRC */
709 errno = 0;
710 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
711 {
712 /* if write didn't set errno, assume problem is no disk space */
713 if (errno == 0)
714 errno = ENOSPC;
717 errmsg("could not write to file \"%s\": %m",
718 tmppath)));
719 }
720
721 if (CloseTransientFile(tmpfd) != 0)
724 errmsg("could not close file \"%s\": %m",
725 tmppath)));
726
727 /* fsync, rename to permanent file, fsync file and directory */
729}
#define PG_BINARY
Definition c.h:1374
uint32_t uint32
Definition c.h:624
int errcode_for_file_access(void)
Definition elog.c:897
#define PANIC
Definition elog.h:44
#define ereport(elevel,...)
Definition elog.h:152
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
static char * errmsg
int max_active_replication_origins
Definition origin.c:106
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:103
static ReplicationState * replication_states
Definition origin.c:178
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:102
#define REPLICATION_STATE_MAGIC
Definition origin.c:204
#define InvalidReplOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:173
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:178
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg, fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ IsReservedOriginName()

static bool IsReservedOriginName ( const char name)
static

Definition at line 226 of file origin.c.

227{
228 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
230}
int pg_strcasecmp(const char *s1, const char *s2)
const char * name

References fb(), name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1585 of file origin.c.

1586{
1589 ReplOriginId node;
1590
1591 replorigin_check_prerequisites(true, false);
1592
1593 /* lock to prevent the replication origin from vanishing */
1595
1596 node = replorigin_by_name(text_to_cstring(name), false);
1597
1598 /*
1599 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1600 * xact hasn't committed yet. This is why this function should be used to
1601 * set up the initial replication state, but not for replay.
1602 */
1604 true /* go backward */ , true /* WAL log */ );
1605
1607
1609}
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
#define RowExclusiveLock
Definition lockdefs.h:38
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:928
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:207
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
Definition c.h:776
char * text_to_cstring(const text *t)
Definition varlena.c:217
uint16 ReplOriginId
Definition xlogdefs.h:69
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1398 of file origin.c.

1399{
1400 char *name;
1401 ReplOriginId roident;
1402
1403 replorigin_check_prerequisites(false, false);
1404
1406
1407 /*
1408 * Replication origins "any and "none" are reserved for system options.
1409 * The origins "pg_xxx" are reserved for internal use.
1410 */
1412 ereport(ERROR,
1414 errmsg("replication origin name \"%s\" is reserved",
1415 name),
1416 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1418
1419 /*
1420 * If built with appropriate switch, whine when regression-testing
1421 * conventions for replication origin names are violated.
1422 */
1423#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1424 if (strncmp(name, "regress_", 8) != 0)
1425 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1426#endif
1427
1428 roident = replorigin_create(name);
1429
1430 pfree(name);
1431
1432 PG_RETURN_OID(roident);
1433}
bool IsReservedName(const char *name)
Definition catalog.c:278
int errcode(int sqlerrcode)
Definition elog.c:874
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_OID(x)
Definition fmgr.h:361
void pfree(void *pointer)
Definition mcxt.c:1616
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
static bool IsReservedOriginName(const char *name)
Definition origin.c:226
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332

References DatumGetPointer(), elog, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), IsReservedName(), IsReservedOriginName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1439 of file origin.c.

1440{
1441 char *name;
1442
1443 replorigin_check_prerequisites(false, false);
1444
1446
1447 replorigin_drop_by_name(name, false, true);
1448
1449 pfree(name);
1450
1452}
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1458 of file origin.c.

1459{
1460 char *name;
1461 ReplOriginId roident;
1462
1463 replorigin_check_prerequisites(false, false);
1464
1466 roident = replorigin_by_name(name, true);
1467
1468 pfree(name);
1469
1470 if (OidIsValid(roident))
1471 PG_RETURN_OID(roident);
1473}
#define OidIsValid(objectId)
Definition c.h:858
#define PG_RETURN_NULL()
Definition fmgr.h:346

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1620 of file origin.c.

1621{
1622 char *name;
1623 bool flush;
1624 ReplOriginId roident;
1625 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1626
1628
1630 flush = PG_GETARG_BOOL(1);
1631
1632 roident = replorigin_by_name(name, false);
1633 Assert(OidIsValid(roident));
1634
1635 remote_lsn = replorigin_get_progress(roident, flush);
1636
1637 if (!XLogRecPtrIsValid(remote_lsn))
1639
1640 PG_RETURN_LSN(remote_lsn);
1641}
#define Assert(condition)
Definition c.h:943
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References Assert, DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), text_to_cstring(), and XLogRecPtrIsValid.

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1518 of file origin.c.

1519{
1520 replorigin_check_prerequisites(false, false);
1521
1523}
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
ReplOriginXactState replorigin_xact_state
Definition origin.c:168
ReplOriginId origin
Definition origin.h:45

References InvalidReplOriginId, ReplOriginXactState::origin, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_xact_state.

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1534 of file origin.c.

1535{
1536 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1537 bool flush = PG_GETARG_BOOL(0);
1538
1539 replorigin_check_prerequisites(true, false);
1540
1542 ereport(ERROR,
1544 errmsg("no replication origin is configured")));
1545
1546 remote_lsn = replorigin_session_get_progress(flush);
1547
1548 if (!XLogRecPtrIsValid(remote_lsn))
1550
1551 PG_RETURN_LSN(remote_lsn);
1552}
static ReplicationState * session_replication_state
Definition origin.c:201
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1353

References ereport, errcode(), errmsg, ERROR, fb(), InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), session_replication_state, and XLogRecPtrIsValid.

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1503 of file origin.c.

1504{
1505 replorigin_check_prerequisites(true, false);
1506
1508
1510
1512}
void replorigin_session_reset(void)
Definition origin.c:1301
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1377

References PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_reset(), and replorigin_xact_clear().

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1479 of file origin.c.

1480{
1481 char *name;
1482 ReplOriginId origin;
1483 int pid;
1484
1485 replorigin_check_prerequisites(true, false);
1486
1488 origin = replorigin_by_name(name, false);
1489 pid = PG_GETARG_INT32(1);
1490 replorigin_session_setup(origin, pid);
1491
1493
1494 pfree(name);
1495
1497}
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1156

References DatumGetPointer(), name, ReplOriginXactState::origin, pfree(), PG_GETARG_DATUM, PG_GETARG_INT32, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_setup(), replorigin_xact_state, and text_to_cstring().

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1573 of file origin.c.

1574{
1575 replorigin_check_prerequisites(true, false);
1576
1577 /* Do not clear the session origin */
1578 replorigin_xact_clear(false);
1579
1581}

References PG_RETURN_VOID, replorigin_check_prerequisites(), and replorigin_xact_clear().

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1645 of file origin.c.

1646{
1647 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1648 int i;
1649#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1650
1651 /* we want to return 0 rows if slot is set to zero */
1652 replorigin_check_prerequisites(false, true);
1653
1654 InitMaterializedSRF(fcinfo, 0);
1655
1656 /* prevent slots from being concurrently dropped */
1658
1659 /*
1660 * Iterate through all possible replication_states, display if they are
1661 * filled. Note that we do not take any locks, so slightly corrupted/out
1662 * of date values are a possibility.
1663 */
1664 for (i = 0; i < max_active_replication_origins; i++)
1665 {
1669 char *roname;
1670
1672
1673 /* unused slot, nothing to display */
1674 if (state->roident == InvalidReplOriginId)
1675 continue;
1676
1677 memset(values, 0, sizeof(values));
1678 memset(nulls, 1, sizeof(nulls));
1679
1680 values[0] = ObjectIdGetDatum(state->roident);
1681 nulls[0] = false;
1682
1683 /*
1684 * We're not preventing the origin to be dropped concurrently, so
1685 * silently accept that it might be gone.
1686 */
1687 if (replorigin_by_oid(state->roident, true,
1688 &roname))
1689 {
1691 nulls[1] = false;
1692 }
1693
1694 LWLockAcquire(&state->lock, LW_SHARED);
1695
1696 values[2] = LSNGetDatum(state->remote_lsn);
1697 nulls[2] = false;
1698
1699 values[3] = LSNGetDatum(state->local_lsn);
1700 nulls[3] = false;
1701
1702 LWLockRelease(&state->lock);
1703
1704 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 values, nulls);
1706 }
1707
1709
1710#undef REPLICATION_ORIGIN_PROGRESS_COLS
1711
1712 return (Datum) 0;
1713}
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:513
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785

References CStringGetTextDatum, fb(), i, InitMaterializedSRF(), InvalidReplOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), tuplestore_putvalues(), and values.

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1129 of file origin.c.

1130{
1132 return;
1133
1135}
static void replorigin_session_reset_internal(void)
Definition origin.c:1095

References fb(), replorigin_session_reset_internal(), and session_replication_state.

Referenced by replorigin_session_setup().

◆ ReplicationOriginShmemAttach()

static void ReplicationOriginShmemAttach ( void arg)
static

Definition at line 589 of file origin.c.

590{
592 return;
593
595}
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:193
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:164

References max_active_replication_origins, replication_states, replication_states_ctl, and ReplicationStateCtl::states.

◆ ReplicationOriginShmemInit()

static void ReplicationOriginShmemInit ( void arg)
static

◆ ReplicationOriginShmemRequest()

static void ReplicationOriginShmemRequest ( void arg)
static

Definition at line 554 of file origin.c.

555{
556 Size size = 0;
557
559 return;
560
561 size = add_size(size, offsetof(ReplicationStateCtl, states));
562 size = add_size(size,
564 ShmemRequestStruct(.name = "ReplicationOriginState",
565 .size = size,
566 .ptr = (void **) &replication_states_ctl,
567 );
568}
size_t Size
Definition c.h:689
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176

References add_size(), fb(), max_active_replication_origins, mul_size(), name, replication_states_ctl, and ShmemRequestStruct.

◆ replorigin_advance()

void replorigin_advance ( ReplOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 928 of file origin.c.

931{
932 int i;
935
937
938 /* we don't track DoNotReplicateId */
939 if (node == DoNotReplicateId)
940 return;
941
942 /*
943 * XXX: For the case where this is called by WAL replay, it'd be more
944 * efficient to restore into a backend local hashtable and only dump into
945 * shmem after recovery is finished. Let's wait with implementing that
946 * till it's shown to be a measurable expense
947 */
948
949 /* Lock exclusively, as we may have to create a new table entry. */
951
952 /*
953 * Search for either an existing slot for the origin, or a free one we can
954 * use.
955 */
956 for (i = 0; i < max_active_replication_origins; i++)
957 {
959
960 /* remember where to insert if necessary */
961 if (curstate->roident == InvalidReplOriginId &&
962 free_state == NULL)
963 {
965 continue;
966 }
967
968 /* not our slot */
969 if (curstate->roident != node)
970 {
971 continue;
972 }
973
974 /* ok, found slot */
976
978
979 /* Make sure it's not used by somebody else */
980 if (replication_state->refcount > 0)
981 {
984 (replication_state->acquired_by != 0)
985 ? errmsg("replication origin with ID %d is already active for PID %d",
986 replication_state->roident,
987 replication_state->acquired_by)
988 : errmsg("replication origin with ID %d is already active in another process",
989 replication_state->roident)));
990 }
991
992 break;
993 }
994
998 errmsg("could not find free replication state slot for replication origin with ID %d",
999 node),
1000 errhint("Increase \"max_active_replication_origins\" and try again.")));
1001
1002 if (replication_state == NULL)
1003 {
1004 /* initialize new slot */
1009 replication_state->roident = node;
1010 }
1011
1013
1014 /*
1015 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1016 * and the standby gets the message. Primarily this will be called during
1017 * WAL replay (of commit records) where no WAL logging is necessary.
1018 */
1019 if (wal_log)
1020 {
1022
1024 xlrec.node_id = node;
1025 xlrec.force = go_backward;
1026
1028 XLogRegisterData(&xlrec, sizeof(xlrec));
1029
1031 }
1032
1033 /*
1034 * Due to - harmless - race conditions during a checkpoint we could see
1035 * values here that are older than the ones we already have in memory. We
1036 * could also see older values for prepared transactions when the prepare
1037 * is sent at a later point of time along with commit prepared and there
1038 * are other transactions commits between prepare and commit prepared. See
1039 * ReorderBufferFinishPrepared. Don't overwrite those.
1040 */
1041 if (go_backward || replication_state->remote_lsn < remote_commit)
1042 replication_state->remote_lsn = remote_commit;
1044 (go_backward || replication_state->local_lsn < local_commit))
1045 replication_state->local_lsn = local_commit;
1047
1048 /*
1049 * Release *after* changing the LSNs, slot isn't acquired and thus could
1050 * otherwise be dropped anytime.
1051 */
1053}
int errhint(const char *fmt,...) pg_attribute_printf(1
@ LW_EXCLUSIVE
Definition lwlock.h:104
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:482
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:372
void XLogBeginInsert(void)
Definition xloginsert.c:153

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

ReplOriginId replorigin_by_name ( const char roname,
bool  missing_ok 
)

Definition at line 243 of file origin.c.

244{
246 Oid roident = InvalidOid;
247 HeapTuple tuple;
249
251
253 if (HeapTupleIsValid(tuple))
254 {
256 roident = ident->roident;
257 ReleaseSysCache(tuple);
258 }
259 else if (!missing_ok)
262 errmsg("replication origin \"%s\" does not exist",
263 roname)));
264
265 return roident;
266}
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221

References CStringGetTextDatum, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( ReplOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 513 of file origin.c.

514{
515 HeapTuple tuple;
517
518 Assert(OidIsValid((Oid) roident));
519 Assert(roident != InvalidReplOriginId);
520 Assert(roident != DoNotReplicateId);
521
523 ObjectIdGetDatum((Oid) roident));
524
525 if (HeapTupleIsValid(tuple))
526 {
528 *roname = text_to_cstring(&ric->roname);
529 ReleaseSysCache(tuple);
530
531 return true;
532 }
533 else
534 {
535 *roname = NULL;
536
537 if (!missing_ok)
540 errmsg("replication origin with ID %d does not exist",
541 roident)));
542
543 return false;
544 }
545}

References Assert, DoNotReplicateId, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, InvalidReplOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_check_prerequisites()

◆ replorigin_create()

ReplOriginId replorigin_create ( const char roname)

Definition at line 274 of file origin.c.

275{
276 Oid roident;
277 HeapTuple tuple = NULL;
278 Relation rel;
281 SysScanDesc scan;
283
284 /*
285 * To avoid needing a TOAST table for pg_replication_origin, we limit
286 * replication origin names to 512 bytes. This should be more than enough
287 * for all practical use.
288 */
292 errmsg("replication origin name is too long"),
293 errdetail("Replication origin names must be no longer than %d bytes.",
295
297
299
300 /*
301 * We need the numeric replication origin to be 16bit wide, so we cannot
302 * rely on the normal oid allocation. Instead we simply scan
303 * pg_replication_origin for the first unused id. That's not particularly
304 * efficient, but this should be a fairly infrequent operation - we can
305 * easily spend a bit more code on this when it turns out it needs to be
306 * faster.
307 *
308 * We handle concurrency by taking an exclusive lock (allowing reads!)
309 * over the table for the duration of the search. Because we use a "dirty
310 * snapshot" we can read rows that other in-progress sessions have
311 * written, even though they would be invisible with normal snapshots. Due
312 * to the exclusive lock there's no danger that new rows can appear while
313 * we're checking.
314 */
316
318
319 /*
320 * We want to be able to access pg_replication_origin without setting up a
321 * snapshot. To make that safe, it needs to not have a TOAST table, since
322 * TOASTed data cannot be fetched without a snapshot. As of this writing,
323 * its only varlena column is roname, which we limit to 512 bytes to avoid
324 * needing out-of-line storage. If you add a TOAST table to this catalog,
325 * be sure to set up a snapshot everywhere it might be needed. For more
326 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
327 */
328 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
329
330 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
331 {
332 bool nulls[Natts_pg_replication_origin];
334 bool collides;
335
337
338 ScanKeyInit(&key,
341 ObjectIdGetDatum(roident));
342
344 true /* indexOK */ ,
346 1, &key);
347
349
350 systable_endscan(scan);
351
352 if (!collides)
353 {
354 /*
355 * Ok, found an unused roident, insert the new row and do a CCI,
356 * so our callers can look it up if they want to.
357 */
358 memset(&nulls, 0, sizeof(nulls));
359
362
363 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
364 CatalogTupleInsert(rel, tuple);
366 break;
367 }
368 }
369
370 /* now release lock again, */
372
373 if (tuple == NULL)
376 errmsg("could not find free replication origin ID")));
377
378 heap_freetuple(tuple);
379 return roident;
380}
#define PG_UINT16_MAX
Definition c.h:671
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:612
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:523
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:542
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1130

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg, ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char name,
bool  missing_ok,
bool  nowait 
)

Definition at line 459 of file origin.c.

460{
461 ReplOriginId roident;
462 Relation rel;
463 HeapTuple tuple;
464
466
468
469 roident = replorigin_by_name(name, missing_ok);
470
471 /* Lock the origin to prevent concurrent drops. */
474
476 if (!HeapTupleIsValid(tuple))
477 {
478 if (!missing_ok)
479 elog(ERROR, "cache lookup failed for replication origin with ID %d",
480 roident);
481
482 /*
483 * We don't need to retain the locks if the origin is already dropped.
484 */
488 return;
489 }
490
491 replorigin_state_clear(roident, nowait);
492
493 /*
494 * Now, we can delete the catalog entry.
495 */
496 CatalogTupleDelete(rel, &tuple->t_self);
497 ReleaseSysCache(tuple);
498
500
501 /* We keep the lock on pg_replication_origin until commit */
502 table_close(rel, NoLock);
503}
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:386
ItemPointerData t_self
Definition htup.h:65

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( ReplOriginId  node,
bool  flush 
)

Definition at line 1057 of file origin.c.

1058{
1059 int i;
1060 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1061 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1062
1063 /* prevent slots from being concurrently dropped */
1065
1066 for (i = 0; i < max_active_replication_origins; i++)
1067 {
1069
1071
1072 if (state->roident == node)
1073 {
1074 LWLockAcquire(&state->lock, LW_SHARED);
1075
1076 remote_lsn = state->remote_lsn;
1077 local_lsn = state->local_lsn;
1078
1079 LWLockRelease(&state->lock);
1080
1081 break;
1082 }
1083 }
1084
1086
1087 if (flush && XLogRecPtrIsValid(local_lsn))
1088 XLogFlush(local_lsn);
1089
1090 return remote_lsn;
1091}

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 867 of file origin.c.

868{
869 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
870
871 switch (info)
872 {
874 {
877
878 replorigin_advance(xlrec->node_id,
879 xlrec->remote_lsn, record->EndRecPtr,
880 xlrec->force /* backward */ ,
881 false /* WAL log */ );
882 break;
883 }
885 {
887 int i;
888
890
891 for (i = 0; i < max_active_replication_origins; i++)
892 {
894
895 /* found our slot */
896 if (state->roident == xlrec->node_id)
897 {
898 /* reset entry */
899 state->roident = InvalidReplOriginId;
900 state->remote_lsn = InvalidXLogRecPtr;
901 state->local_lsn = InvalidXLogRecPtr;
902 break;
903 }
904 }
905 break;
906 }
907 default:
908 elog(PANIC, "replorigin_redo: unknown op code %u", info);
909 }
910}
uint8_t uint8
Definition c.h:622
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:410
#define XLogRecGetData(decoder)
Definition xlogreader.h:415

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1301 of file origin.c.

1302{
1304
1306 ereport(ERROR,
1308 errmsg("no replication origin is configured")));
1309
1310 /*
1311 * Restrict explicit resetting of the replication origin if it was first
1312 * acquired by this process and others are still using it. While the
1313 * system handles this safely (as happens if the first session exits
1314 * without calling reset), it is best to avoid doing so.
1315 */
1318 ereport(ERROR,
1320 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1322 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1323 errhint("Reset the replication origin in all other processes before retrying.")));
1324
1326}
int MyProcPid
Definition globals.c:49

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg, ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_reset_internal()

static void replorigin_session_reset_internal ( void  )
static

Definition at line 1095 of file origin.c.

1096{
1098
1100
1102
1103 /* The origin must be held by at least one process at this point. */
1105
1106 /*
1107 * Reset the PID only if the current session is the first to set up this
1108 * origin. This avoids clearing the first process's PID when any other
1109 * session releases the origin.
1110 */
1113
1115
1118
1120
1122}
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition origin.c:141

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, ReplicationState::refcount, and session_replication_state.

Referenced by ReplicationOriginExitCleanup(), and replorigin_session_reset().

◆ replorigin_session_setup()

void replorigin_session_setup ( ReplOriginId  node,
int  acquired_by 
)

Definition at line 1156 of file origin.c.

1157{
1158 static bool registered_cleanup;
1159 int i;
1160 int free_slot = -1;
1161
1162 if (!registered_cleanup)
1163 {
1165 registered_cleanup = true;
1166 }
1167
1169
1171 ereport(ERROR,
1173 errmsg("cannot setup replication origin when one is already setup")));
1174
1175 /* Lock exclusively, as we may have to create a new table entry. */
1177
1178 /*
1179 * Search for either an existing slot for the origin, or a free one we can
1180 * use.
1181 */
1182 for (i = 0; i < max_active_replication_origins; i++)
1183 {
1185
1186 /* remember where to insert if necessary */
1187 if (curstate->roident == InvalidReplOriginId &&
1188 free_slot == -1)
1189 {
1190 free_slot = i;
1191 continue;
1192 }
1193
1194 /* not our slot */
1195 if (curstate->roident != node)
1196 continue;
1197
1198 if (acquired_by == 0)
1199 {
1200 /* With acquired_by == 0, we need the origin to be free */
1201 if (curstate->acquired_by != 0)
1202 {
1203 ereport(ERROR,
1205 errmsg("replication origin with ID %d is already active for PID %d",
1206 curstate->roident, curstate->acquired_by)));
1207 }
1208 else if (curstate->refcount > 0)
1209 {
1210 /*
1211 * The origin is in use, but PID is not recorded. This can
1212 * happen if the process that originally acquired the origin
1213 * exited without releasing it. To ensure correctness, other
1214 * processes cannot acquire the origin until all processes
1215 * currently using it have released it.
1216 */
1217 ereport(ERROR,
1219 errmsg("replication origin with ID %d is already active in another process",
1220 curstate->roident)));
1221 }
1222 }
1223 else
1224 {
1225 /*
1226 * With acquired_by != 0, we need the origin to be active by the
1227 * given PID
1228 */
1229 if (curstate->acquired_by != acquired_by)
1230 ereport(ERROR,
1232 errmsg("replication origin with ID %d is not active for PID %d",
1233 curstate->roident, acquired_by)));
1234
1235 /*
1236 * Here, it is okay to have refcount > 0 as more than one process
1237 * can safely re-use the origin.
1238 */
1239 }
1240
1241 /* ok, found slot */
1243 break;
1244 }
1245
1247 {
1248 if (acquired_by != 0)
1249 ereport(ERROR,
1251 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1252 acquired_by, node)));
1253
1254 /* initialize new slot */
1255 if (free_slot == -1)
1256 ereport(ERROR,
1258 errmsg("could not find free replication state slot for replication origin with ID %d",
1259 node),
1260 errhint("Increase \"max_active_replication_origins\" and try again.")));
1261
1266 }
1267
1268
1270
1271 if (acquired_by == 0)
1272 {
1275 }
1276 else
1277 {
1278 /*
1279 * Sanity check: the origin must already be acquired by the process
1280 * passed as input, and at least one process must be using it.
1281 */
1284 }
1285
1287
1289
1290 /* probably this one is pointless */
1292}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1129

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_state_clear()

static void replorigin_state_clear ( ReplOriginId  roident,
bool  nowait 
)
static

Definition at line 386 of file origin.c.

387{
388 int i;
389
390 /*
391 * Clean up the slot state info, if there is any matching slot.
392 */
393restart:
395
396 for (i = 0; i < max_active_replication_origins; i++)
397 {
399
400 if (state->roident == roident)
401 {
402 /* found our slot, is it busy? */
403 if (state->refcount > 0)
404 {
406
407 if (nowait)
410 (state->acquired_by != 0)
411 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
412 state->roident,
413 state->acquired_by)
414 : errmsg("could not drop replication origin with ID %d, in use by another process",
415 state->roident)));
416
417 /*
418 * We must wait and then retry. Since we don't know which CV
419 * to wait on until here, we can't readily use
420 * ConditionVariablePrepareToSleep (calling it here would be
421 * wrong, since we could miss the signal if we did so); just
422 * use ConditionVariableSleep directly.
423 */
424 cv = &state->origin_cv;
425
427
429 goto restart;
430 }
431
432 /* first make a WAL log entry */
433 {
435
436 xlrec.node_id = roident;
438 XLogRegisterData(&xlrec, sizeof(xlrec));
440 }
441
442 /* then clear the in-memory slot */
443 state->roident = InvalidReplOriginId;
444 state->remote_lsn = InvalidXLogRecPtr;
445 state->local_lsn = InvalidXLogRecPtr;
446 break;
447 }
448 }
451}
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
ReplOriginId node_id
Definition origin.h:27

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg, ERROR, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

◆ replorigin_xact_clear()

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 740 of file origin.c.

741{
742 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
743 int fd;
744 int readBytes;
746 int last_state = 0;
749
750 /* don't want to overwrite already existing state */
751#ifdef USE_ASSERT_CHECKING
752 static bool already_started = false;
753
755 already_started = true;
756#endif
757
759 return;
760
762
763 elog(DEBUG2, "starting up replication origin progress state");
764
766
767 /*
768 * might have had max_active_replication_origins == 0 last run, or we just
769 * brought up a standby.
770 */
771 if (fd < 0 && errno == ENOENT)
772 return;
773 else if (fd < 0)
776 errmsg("could not open file \"%s\": %m",
777 path)));
778
779 /* verify magic, that is written even if nothing was active */
780 readBytes = read(fd, &magic, sizeof(magic));
781 if (readBytes != sizeof(magic))
782 {
783 if (readBytes < 0)
786 errmsg("could not read file \"%s\": %m",
787 path)));
788 else
791 errmsg("could not read file \"%s\": read %d of %zu",
792 path, readBytes, sizeof(magic))));
793 }
794 COMP_CRC32C(crc, &magic, sizeof(magic));
795
796 if (magic != REPLICATION_STATE_MAGIC)
798 (errmsg("replication checkpoint has wrong magic %u instead of %u",
799 magic, REPLICATION_STATE_MAGIC)));
800
801 /* we can skip locking here, no other access is possible */
802
803 /* recover individual states, until there are no more to be found */
804 while (true)
805 {
807
808 readBytes = read(fd, &disk_state, sizeof(disk_state));
809
810 if (readBytes < 0)
811 {
814 errmsg("could not read file \"%s\": %m",
815 path)));
816 }
817
818 /* no further data */
819 if (readBytes == sizeof(crc))
820 {
821 memcpy(&file_crc, &disk_state, sizeof(file_crc));
822 break;
823 }
824
825 if (readBytes != sizeof(disk_state))
826 {
829 errmsg("could not read file \"%s\": read %d of %zu",
830 path, readBytes, sizeof(disk_state))));
831 }
832
834
838 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
839
840 /* copy data to shared memory */
843 last_state++;
844
845 ereport(LOG,
846 errmsg("recovered replication state of node %d to %X/%08X",
847 disk_state.roident,
848 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
849 }
850
851 /* now check checksum */
853 if (file_crc != crc)
856 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
857 crc, file_crc)));
858
859 if (CloseTransientFile(fd) != 0)
862 errmsg("could not close file \"%s\": %m",
863 path)));
864}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
#define LOG
Definition elog.h:32
#define DEBUG2
Definition elog.h:30
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, memcpy(), OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replication_states

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

◆ ReplicationOriginShmemCallbacks

const ShmemCallbacks ReplicationOriginShmemCallbacks
Initial value:
= {
}
static void ReplicationOriginShmemInit(void *arg)
Definition origin.c:571
static void ReplicationOriginShmemRequest(void *arg)
Definition origin.c:554
static void ReplicationOriginShmemAttach(void *arg)
Definition origin.c:589

Definition at line 184 of file origin.c.

184 {
185 .request_fn = ReplicationOriginShmemRequest,
187 .attach_fn = ReplicationOriginShmemAttach,
188};

◆ replorigin_xact_state

◆ session_replication_state