PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/wait_event.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"
 
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
 
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_origins, bool recoveryOK)
 
static bool IsReservedOriginName (const char *name)
 
ReplOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
ReplOriginId replorigin_create (const char *roname)
 
static void replorigin_state_clear (ReplOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (ReplOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (ReplOriginId node, bool flush)
 
static void replorigin_session_reset_internal (void)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (ReplOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void replorigin_xact_clear (bool clear_origin)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

int max_active_replication_origins = 10
 
ReplOriginXactState replorigin_xact_state
 
static ReplicationState * replication_states
 
static ReplicationStateCtl * replication_states_ctl
 
static ReplicationState * session_replication_state = NULL
 

Macro Definition Documentation

◆ PG_REPLORIGIN_CHECKPOINT_FILENAME

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"

Definition at line 101 of file origin.c.

◆ PG_REPLORIGIN_CHECKPOINT_TMPFILE

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

Definition at line 102 of file origin.c.

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 193 of file origin.c.

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 605 of file origin.c.

606{
608 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
609 int tmpfd;
610 int i;
613
615 return;
616
618
619 /* make sure no old temp file is remaining */
620 if (unlink(tmppath) < 0 && errno != ENOENT)
623 errmsg("could not remove file \"%s\": %m",
624 tmppath)));
625
626 /*
627 * no other backend can perform this at the same time; only one checkpoint
628 * can happen at a time.
629 */
632 if (tmpfd < 0)
635 errmsg("could not create file \"%s\": %m",
636 tmppath)));
637
638 /* write magic */
639 errno = 0;
640 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
641 {
642 /* if write didn't set errno, assume problem is no disk space */
643 if (errno == 0)
644 errno = ENOSPC;
647 errmsg("could not write to file \"%s\": %m",
648 tmppath)));
649 }
650 COMP_CRC32C(crc, &magic, sizeof(magic));
651
652 /* prevent concurrent creations/drops */
654
655 /* write actual data */
656 for (i = 0; i < max_active_replication_origins; i++)
657 {
660 XLogRecPtr local_lsn;
661
662 if (curstate->roident == InvalidReplOriginId)
663 continue;
664
665 /* zero, to avoid uninitialized padding bytes */
666 memset(&disk_state, 0, sizeof(disk_state));
667
669
670 disk_state.roident = curstate->roident;
671
672 disk_state.remote_lsn = curstate->remote_lsn;
673 local_lsn = curstate->local_lsn;
674
675 LWLockRelease(&curstate->lock);
676
677 /* make sure we only write out a commit that's persistent */
678 XLogFlush(local_lsn);
679
680 errno = 0;
681 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
682 sizeof(disk_state))
683 {
684 /* if write didn't set errno, assume problem is no disk space */
685 if (errno == 0)
686 errno = ENOSPC;
689 errmsg("could not write to file \"%s\": %m",
690 tmppath)));
691 }
692
694 }
695
697
698 /* write out the CRC */
700 errno = 0;
701 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
702 {
703 /* if write didn't set errno, assume problem is no disk space */
704 if (errno == 0)
705 errno = ENOSPC;
708 errmsg("could not write to file \"%s\": %m",
709 tmppath)));
710 }
711
712 if (CloseTransientFile(tmpfd) != 0)
715 errmsg("could not close file \"%s\": %m",
716 tmppath)));
717
718 /* fsync, rename to permanent file, fsync file and directory */
720}
#define PG_BINARY
Definition c.h:1376
uint32_t uint32
Definition c.h:618
int errcode_for_file_access(void)
Definition elog.c:897
#define PANIC
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
static char * errmsg
int max_active_replication_origins
Definition origin.c:105
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:102
static ReplicationState * replication_states
Definition origin.c:177
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:101
#define REPLICATION_STATE_MAGIC
Definition origin.c:193
#define InvalidReplOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2767
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg, fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ IsReservedOriginName()

static bool IsReservedOriginName ( const char * name)
static

Definition at line 215 of file origin.c.

216{
217 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
219}
int pg_strcasecmp(const char *s1, const char *s2)
const char * name

References fb(), name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1561 of file origin.c.

1562{
1565 ReplOriginId node;
1566
1567 replorigin_check_prerequisites(true, false);
1568
1569 /* lock to prevent the replication origin from vanishing */
1571
1572 node = replorigin_by_name(text_to_cstring(name), false);
1573
1574 /*
1575 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1576 * xact hasn't committed yet. This is why this function should be used to
1577 * set up the initial replication state, but not for replay.
1578 */
1580 true /* go backward */ , true /* WAL log */ );
1581
1583
1585}
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
#define RowExclusiveLock
Definition lockdefs.h:38
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:919
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:196
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
Definition c.h:778
char * text_to_cstring(const text *t)
Definition varlena.c:217
uint16 ReplOriginId
Definition xlogdefs.h:69
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1374 of file origin.c.

1375{
1376 char *name;
1377 ReplOriginId roident;
1378
1379 replorigin_check_prerequisites(false, false);
1380
1382
1383 /*
1384 * Replication origins "any" and "none" are reserved for system options.
1385 * The origins "pg_xxx" are reserved for internal use.
1386 */
1388 ereport(ERROR,
1390 errmsg("replication origin name \"%s\" is reserved",
1391 name),
1392 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1394
1395 /*
1396 * If built with appropriate switch, whine when regression-testing
1397 * conventions for replication origin names are violated.
1398 */
1399#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1400 if (strncmp(name, "regress_", 8) != 0)
1401 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1402#endif
1403
1404 roident = replorigin_create(name);
1405
1406 pfree(name);
1407
1408 PG_RETURN_OID(roident);
1409}
bool IsReservedName(const char *name)
Definition catalog.c:278
int errcode(int sqlerrcode)
Definition elog.c:874
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_OID(x)
Definition fmgr.h:361
void pfree(void *pointer)
Definition mcxt.c:1616
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:263
static bool IsReservedOriginName(const char *name)
Definition origin.c:215
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332

References DatumGetPointer(), elog, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), IsReservedName(), IsReservedOriginName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1415 of file origin.c.

1416{
1417 char *name;
1418
1419 replorigin_check_prerequisites(false, false);
1420
1422
1423 replorigin_drop_by_name(name, false, true);
1424
1425 pfree(name);
1426
1428}
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:448

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1434 of file origin.c.

1435{
1436 char *name;
1437 ReplOriginId roident;
1438
1439 replorigin_check_prerequisites(false, false);
1440
1442 roident = replorigin_by_name(name, true);
1443
1444 pfree(name);
1445
1446 if (OidIsValid(roident))
1447 PG_RETURN_OID(roident);
1449}
#define OidIsValid(objectId)
Definition c.h:860
#define PG_RETURN_NULL()
Definition fmgr.h:346

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1596 of file origin.c.

1597{
1598 char *name;
1599 bool flush;
1600 ReplOriginId roident;
1601 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1602
1604
1606 flush = PG_GETARG_BOOL(1);
1607
1608 roident = replorigin_by_name(name, false);
1609 Assert(OidIsValid(roident));
1610
1611 remote_lsn = replorigin_get_progress(roident, flush);
1612
1613 if (!XLogRecPtrIsValid(remote_lsn))
1615
1616 PG_RETURN_LSN(remote_lsn);
1617}
#define Assert(condition)
Definition c.h:945
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1048
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References Assert, DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), text_to_cstring(), and XLogRecPtrIsValid.

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1494 of file origin.c.

1495{
1496 replorigin_check_prerequisites(false, false);
1497
1499}
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
ReplOriginXactState replorigin_xact_state
Definition origin.c:167
ReplOriginId origin
Definition origin.h:45

References InvalidReplOriginId, ReplOriginXactState::origin, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_xact_state.

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1510 of file origin.c.

1511{
1512 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1513 bool flush = PG_GETARG_BOOL(0);
1514
1515 replorigin_check_prerequisites(true, false);
1516
1518 ereport(ERROR,
1520 errmsg("no replication origin is configured")));
1521
1522 remote_lsn = replorigin_session_get_progress(flush);
1523
1524 if (!XLogRecPtrIsValid(remote_lsn))
1526
1527 PG_RETURN_LSN(remote_lsn);
1528}
static ReplicationState * session_replication_state
Definition origin.c:190
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1329

References ereport, errcode(), errmsg, ERROR, fb(), InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), session_replication_state, and XLogRecPtrIsValid.

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1479 of file origin.c.

1480{
1481 replorigin_check_prerequisites(true, false);
1482
1484
1486
1488}
void replorigin_session_reset(void)
Definition origin.c:1277
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1353

References PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_reset(), and replorigin_xact_clear().

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1455 of file origin.c.

1456{
1457 char *name;
1458 ReplOriginId origin;
1459 int pid;
1460
1461 replorigin_check_prerequisites(true, false);
1462
1464 origin = replorigin_by_name(name, false);
1465 pid = PG_GETARG_INT32(1);
1466 replorigin_session_setup(origin, pid);
1467
1469
1470 pfree(name);
1471
1473}
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1147

References DatumGetPointer(), name, ReplOriginXactState::origin, pfree(), PG_GETARG_DATUM, PG_GETARG_INT32, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_setup(), replorigin_xact_state, and text_to_cstring().

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1549 of file origin.c.

1550{
1551 replorigin_check_prerequisites(true, false);
1552
1553 /* Do not clear the session origin */
1554 replorigin_xact_clear(false);
1555
1557}

References PG_RETURN_VOID, replorigin_check_prerequisites(), and replorigin_xact_clear().

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1621 of file origin.c.

1622{
1623 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1624 int i;
1625#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1626
1627 /* we want to return 0 rows if slot is set to zero */
1628 replorigin_check_prerequisites(false, true);
1629
1630 InitMaterializedSRF(fcinfo, 0);
1631
1632 /* prevent slots from being concurrently dropped */
1634
1635 /*
1636 * Iterate through all possible replication_states, display if they are
1637 * filled. Note that we do not take any locks, so slightly corrupted/out
1638 * of date values are a possibility.
1639 */
1640 for (i = 0; i < max_active_replication_origins; i++)
1641 {
1645 char *roname;
1646
1648
1649 /* unused slot, nothing to display */
1650 if (state->roident == InvalidReplOriginId)
1651 continue;
1652
1653 memset(values, 0, sizeof(values));
1654 memset(nulls, 1, sizeof(nulls));
1655
1656 values[0] = ObjectIdGetDatum(state->roident);
1657 nulls[0] = false;
1658
1659 /*
1660 * We're not preventing the origin from being dropped concurrently, so
1661 * silently accept that it might be gone.
1662 */
1663 if (replorigin_by_oid(state->roident, true,
1664 &roname))
1665 {
1667 nulls[1] = false;
1668 }
1669
1670 LWLockAcquire(&state->lock, LW_SHARED);
1671
1672 values[2] = LSNGetDatum(state->remote_lsn);
1673 nulls[2] = false;
1674
1675 values[3] = LSNGetDatum(state->local_lsn);
1676 nulls[3] = false;
1677
1678 LWLockRelease(&state->lock);
1679
1680 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1681 values, nulls);
1682 }
1683
1685
1686#undef REPLICATION_ORIGIN_PROGRESS_COLS
1687
1688 return (Datum) 0;
1689}
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define CStringGetTextDatum(s)
Definition builtins.h:98
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:502
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785

References CStringGetTextDatum, fb(), i, InitMaterializedSRF(), InvalidReplOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), tuplestore_putvalues(), and values.

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1120 of file origin.c.

1121{
1123 return;
1124
1126}
static void replorigin_session_reset_internal(void)
Definition origin.c:1086

References fb(), replorigin_session_reset_internal(), and session_replication_state.

Referenced by replorigin_session_setup().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 558 of file origin.c.

559{
560 bool found;
561
563 return;
564
566 ShmemInitStruct("ReplicationOriginState",
568 &found);
570
571 if (!found)
572 {
573 int i;
574
576
578
579 for (i = 0; i < max_active_replication_origins; i++)
580 {
584 }
585 }
586}
#define MemSet(start, val, len)
Definition c.h:1109
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:699
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:182
Size ReplicationOriginShmemSize(void)
Definition origin.c:543
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:163

References ConditionVariableInit(), fb(), i, LWLockInitialize(), max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 543 of file origin.c.

544{
545 Size size = 0;
546
548 return size;
549
550 size = add_size(size, offsetof(ReplicationStateCtl, states));
551
552 size = add_size(size,
554 return size;
555}
size_t Size
Definition c.h:691
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500

References add_size(), fb(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( ReplOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 919 of file origin.c.

922{
923 int i;
926
928
929 /* we don't track DoNotReplicateId */
930 if (node == DoNotReplicateId)
931 return;
932
933 /*
934 * XXX: For the case where this is called by WAL replay, it'd be more
935 * efficient to restore into a backend local hashtable and only dump into
936 * shmem after recovery is finished. Let's wait with implementing that
937 * till it's shown to be a measurable expense
938 */
939
940 /* Lock exclusively, as we may have to create a new table entry. */
942
943 /*
944 * Search for either an existing slot for the origin, or a free one we can
945 * use.
946 */
947 for (i = 0; i < max_active_replication_origins; i++)
948 {
950
951 /* remember where to insert if necessary */
952 if (curstate->roident == InvalidReplOriginId &&
953 free_state == NULL)
954 {
956 continue;
957 }
958
959 /* not our slot */
960 if (curstate->roident != node)
961 {
962 continue;
963 }
964
965 /* ok, found slot */
967
969
970 /* Make sure it's not used by somebody else */
971 if (replication_state->refcount > 0)
972 {
975 (replication_state->acquired_by != 0)
976 ? errmsg("replication origin with ID %d is already active for PID %d",
977 replication_state->roident,
978 replication_state->acquired_by)
979 : errmsg("replication origin with ID %d is already active in another process",
980 replication_state->roident)));
981 }
982
983 break;
984 }
985
989 errmsg("could not find free replication state slot for replication origin with ID %d",
990 node),
991 errhint("Increase \"max_active_replication_origins\" and try again.")));
992
993 if (replication_state == NULL)
994 {
995 /* initialize new slot */
1000 replication_state->roident = node;
1001 }
1002
1004
1005 /*
1006 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1007 * and the standby gets the message. Primarily this will be called during
1008 * WAL replay (of commit records) where no WAL logging is necessary.
1009 */
1010 if (wal_log)
1011 {
1013
1015 xlrec.node_id = node;
1016 xlrec.force = go_backward;
1017
1019 XLogRegisterData(&xlrec, sizeof(xlrec));
1020
1022 }
1023
1024 /*
1025 * Due to - harmless - race conditions during a checkpoint we could see
1026 * values here that are older than the ones we already have in memory. We
1027 * could also see older values for prepared transactions when the prepare
1028 * is sent at a later point of time along with commit prepared and there
1029 * are other transactions commits between prepare and commit prepared. See
1030 * ReorderBufferFinishPrepared. Don't overwrite those.
1031 */
1032 if (go_backward || replication_state->remote_lsn < remote_commit)
1033 replication_state->remote_lsn = remote_commit;
1035 (go_backward || replication_state->local_lsn < local_commit))
1036 replication_state->local_lsn = local_commit;
1038
1039 /*
1040 * Release *after* changing the LSNs, slot isn't acquired and thus could
1041 * otherwise be dropped anytime.
1042 */
1044}
int errhint(const char *fmt,...) pg_attribute_printf(1
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:479
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:369
void XLogBeginInsert(void)
Definition xloginsert.c:153

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

ReplOriginId replorigin_by_name ( const char * roname,
bool  missing_ok 
)

Definition at line 232 of file origin.c.

233{
235 Oid roident = InvalidOid;
236 HeapTuple tuple;
238
240
242 if (HeapTupleIsValid(tuple))
243 {
245 roident = ident->roident;
246 ReleaseSysCache(tuple);
247 }
248 else if (!missing_ok)
251 errmsg("replication origin \"%s\" does not exist",
252 roname)));
253
254 return roident;
255}
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220

References CStringGetTextDatum, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( ReplOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 502 of file origin.c.

503{
504 HeapTuple tuple;
506
507 Assert(OidIsValid((Oid) roident));
508 Assert(roident != InvalidReplOriginId);
509 Assert(roident != DoNotReplicateId);
510
512 ObjectIdGetDatum((Oid) roident));
513
514 if (HeapTupleIsValid(tuple))
515 {
517 *roname = text_to_cstring(&ric->roname);
518 ReleaseSysCache(tuple);
519
520 return true;
521 }
522 else
523 {
524 *roname = NULL;
525
526 if (!missing_ok)
529 errmsg("replication origin with ID %d does not exist",
530 roident)));
531
532 return false;
533 }
534}

References Assert, DoNotReplicateId, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, InvalidReplOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_check_prerequisites()

◆ replorigin_create()

ReplOriginId replorigin_create ( const char * roname)

Definition at line 263 of file origin.c.

264{
265 Oid roident;
266 HeapTuple tuple = NULL;
267 Relation rel;
270 SysScanDesc scan;
272
273 /*
274 * To avoid needing a TOAST table for pg_replication_origin, we limit
275 * replication origin names to 512 bytes. This should be more than enough
276 * for all practical use.
277 */
281 errmsg("replication origin name is too long"),
282 errdetail("Replication origin names must be no longer than %d bytes.",
284
286
288
289 /*
290 * We need the numeric replication origin to be 16bit wide, so we cannot
291 * rely on the normal oid allocation. Instead we simply scan
292 * pg_replication_origin for the first unused id. That's not particularly
293 * efficient, but this should be a fairly infrequent operation - we can
294 * easily spend a bit more code on this when it turns out it needs to be
295 * faster.
296 *
297 * We handle concurrency by taking an exclusive lock (allowing reads!)
298 * over the table for the duration of the search. Because we use a "dirty
299 * snapshot" we can read rows that other in-progress sessions have
300 * written, even though they would be invisible with normal snapshots. Due
301 * to the exclusive lock there's no danger that new rows can appear while
302 * we're checking.
303 */
305
307
308 /*
309 * We want to be able to access pg_replication_origin without setting up a
310 * snapshot. To make that safe, it needs to not have a TOAST table, since
311 * TOASTed data cannot be fetched without a snapshot. As of this writing,
312 * its only varlena column is roname, which we limit to 512 bytes to avoid
313 * needing out-of-line storage. If you add a TOAST table to this catalog,
314 * be sure to set up a snapshot everywhere it might be needed. For more
315 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
316 */
317 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
318
319 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
320 {
321 bool nulls[Natts_pg_replication_origin];
323 bool collides;
324
326
327 ScanKeyInit(&key,
330 ObjectIdGetDatum(roident));
331
333 true /* indexOK */ ,
335 1, &key);
336
338
339 systable_endscan(scan);
340
341 if (!collides)
342 {
343 /*
344 * Ok, found an unused roident, insert the new row and do a CCI,
345 * so our callers can look it up if they want to.
346 */
347 memset(&nulls, 0, sizeof(nulls));
348
351
352 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
353 CatalogTupleInsert(rel, tuple);
355 break;
356 }
357 }
358
359 /* now release lock again, */
361
362 if (tuple == NULL)
365 errmsg("could not find free replication origin ID")));
366
367 heap_freetuple(tuple);
368 return roident;
369}
#define PG_UINT16_MAX
Definition c.h:673
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1102

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg, ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char * name,
bool  missing_ok,
bool  nowait 
)

Definition at line 448 of file origin.c.

449{
450 ReplOriginId roident;
451 Relation rel;
452 HeapTuple tuple;
453
455
457
458 roident = replorigin_by_name(name, missing_ok);
459
460 /* Lock the origin to prevent concurrent drops. */
463
465 if (!HeapTupleIsValid(tuple))
466 {
467 if (!missing_ok)
468 elog(ERROR, "cache lookup failed for replication origin with ID %d",
469 roident);
470
471 /*
472 * We don't need to retain the locks if the origin is already dropped.
473 */
477 return;
478 }
479
480 replorigin_state_clear(roident, nowait);
481
482 /*
483 * Now, we can delete the catalog entry.
484 */
485 CatalogTupleDelete(rel, &tuple->t_self);
486 ReleaseSysCache(tuple);
487
489
490 /* We keep the lock on pg_replication_origin until commit */
491 table_close(rel, NoLock);
492}
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:375
ItemPointerData t_self
Definition htup.h:65

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( ReplOriginId  node,
bool  flush 
)

Definition at line 1048 of file origin.c.

1049{
1050 int i;
1051 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1052 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1053
1054 /* prevent slots from being concurrently dropped */
1056
1057 for (i = 0; i < max_active_replication_origins; i++)
1058 {
1060
1062
1063 if (state->roident == node)
1064 {
1065 LWLockAcquire(&state->lock, LW_SHARED);
1066
1067 remote_lsn = state->remote_lsn;
1068 local_lsn = state->local_lsn;
1069
1070 LWLockRelease(&state->lock);
1071
1072 break;
1073 }
1074 }
1075
1077
1078 if (flush && XLogRecPtrIsValid(local_lsn))
1079 XLogFlush(local_lsn);
1080
1081 return remote_lsn;
1082}

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState * record)

Definition at line 858 of file origin.c.

859{
860 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
861
862 switch (info)
863 {
865 {
868
869 replorigin_advance(xlrec->node_id,
870 xlrec->remote_lsn, record->EndRecPtr,
871 xlrec->force /* backward */ ,
872 false /* WAL log */ );
873 break;
874 }
876 {
878 int i;
879
881
882 for (i = 0; i < max_active_replication_origins; i++)
883 {
885
886 /* found our slot */
887 if (state->roident == xlrec->node_id)
888 {
889 /* reset entry */
890 state->roident = InvalidReplOriginId;
891 state->remote_lsn = InvalidXLogRecPtr;
892 state->local_lsn = InvalidXLogRecPtr;
893 break;
894 }
895 }
896 break;
897 }
898 default:
899 elog(PANIC, "replorigin_redo: unknown op code %u", info);
900 }
901}
uint8_t uint8
Definition c.h:616
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1277 of file origin.c.

1278{
1280
1282 ereport(ERROR,
1284 errmsg("no replication origin is configured")));
1285
1286 /*
1287 * Restrict explicit resetting of the replication origin if it was first
1288 * acquired by this process and others are still using it. While the
1289 * system handles this safely (as happens if the first session exits
1290 * without calling reset), it is best to avoid doing so.
1291 */
1294 ereport(ERROR,
1296 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1298 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1299 errhint("Reset the replication origin in all other processes before retrying.")));
1300
1302}
int MyProcPid
Definition globals.c:47

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg, ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_reset_internal()

static void replorigin_session_reset_internal ( void  )
static

Definition at line 1086 of file origin.c.

1087{
1089
1091
1093
1094 /* The origin must be held by at least one process at this point. */
1096
1097 /*
1098 * Reset the PID only if the current session is the first to set up this
1099 * origin. This avoids clearing the first process's PID when any other
1100 * session releases the origin.
1101 */
1104
1106
1109
1111
1113}
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition origin.c:140

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, ReplicationState::refcount, and session_replication_state.

Referenced by ReplicationOriginExitCleanup(), and replorigin_session_reset().

◆ replorigin_session_setup()

void replorigin_session_setup ( ReplOriginId  node,
int  acquired_by 
)

Definition at line 1147 of file origin.c.

1148{
1149 static bool registered_cleanup;
1150 int i;
1151 int free_slot = -1;
1152
1153 if (!registered_cleanup)
1154 {
1156 registered_cleanup = true;
1157 }
1158
1160
1162 ereport(ERROR,
1164 errmsg("cannot setup replication origin when one is already setup")));
1165
1166 /* Lock exclusively, as we may have to create a new table entry. */
1168
1169 /*
1170 * Search for either an existing slot for the origin, or a free one we can
1171 * use.
1172 */
1173 for (i = 0; i < max_active_replication_origins; i++)
1174 {
1176
1177 /* remember where to insert if necessary */
1178 if (curstate->roident == InvalidReplOriginId &&
1179 free_slot == -1)
1180 {
1181 free_slot = i;
1182 continue;
1183 }
1184
1185 /* not our slot */
1186 if (curstate->roident != node)
1187 continue;
1188
1189 else if (curstate->acquired_by != 0 && acquired_by == 0)
1190 {
1191 ereport(ERROR,
1193 errmsg("replication origin with ID %d is already active for PID %d",
1194 curstate->roident, curstate->acquired_by)));
1195 }
1196
1197 else if (curstate->acquired_by != acquired_by)
1198 {
1199 ereport(ERROR,
1201 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1202 node, acquired_by)));
1203 }
1204
1205 /*
1206 * The origin is in use, but PID is not recorded. This can happen if
1207 * the process that originally acquired the origin exited without
1208 * releasing it. To ensure correctness, other processes cannot acquire
1209 * the origin until all processes currently using it have released it.
1210 */
1211 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1212 ereport(ERROR,
1214 errmsg("replication origin with ID %d is already active in another process",
1215 curstate->roident)));
1216
1217 /* ok, found slot */
1219 break;
1220 }
1221
1222
1224 ereport(ERROR,
1226 errmsg("could not find free replication state slot for replication origin with ID %d",
1227 node),
1228 errhint("Increase \"max_active_replication_origins\" and try again.")));
1229 else if (session_replication_state == NULL)
1230 {
1231 if (acquired_by)
1232 ereport(ERROR,
1234 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1235 acquired_by, node)));
1236
1237 /* initialize new slot */
1242 }
1243
1244
1246
1247 if (acquired_by == 0)
1248 {
1251 }
1252 else
1253 {
1254 /*
1255 * Sanity check: the origin must already be acquired by the process
1256 * passed as input, and at least one process must be using it.
1257 */
1260 }
1261
1263
1265
1266 /* probably this one is pointless */
1268}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1120

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_state_clear()

static void replorigin_state_clear ( ReplOriginId  roident,
bool  nowait 
)
static

Definition at line 375 of file origin.c.

376{
377 int i;
378
379 /*
380 * Clean up the slot state info, if there is any matching slot.
381 */
382restart:
384
385 for (i = 0; i < max_active_replication_origins; i++)
386 {
388
389 if (state->roident == roident)
390 {
391 /* found our slot, is it busy? */
392 if (state->refcount > 0)
393 {
395
396 if (nowait)
399 (state->acquired_by != 0)
400 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
401 state->roident,
402 state->acquired_by)
403 : errmsg("could not drop replication origin with ID %d, in use by another process",
404 state->roident)));
405
406 /*
407 * We must wait and then retry. Since we don't know which CV
408 * to wait on until here, we can't readily use
409 * ConditionVariablePrepareToSleep (calling it here would be
410 * wrong, since we could miss the signal if we did so); just
411 * use ConditionVariableSleep directly.
412 */
413 cv = &state->origin_cv;
414
416
418 goto restart;
419 }
420
421 /* first make a WAL log entry */
422 {
424
425 xlrec.node_id = roident;
427 XLogRegisterData(&xlrec, sizeof(xlrec));
429 }
430
431 /* then clear the in-memory slot */
432 state->roident = InvalidReplOriginId;
433 state->remote_lsn = InvalidXLogRecPtr;
434 state->local_lsn = InvalidXLogRecPtr;
435 break;
436 }
437 }
440}
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
ReplOriginId node_id
Definition origin.h:27

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg, ERROR, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

◆ replorigin_xact_clear()

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 731 of file origin.c.

732{
733 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
734 int fd;
735 int readBytes;
737 int last_state = 0;
740
741 /* don't want to overwrite already existing state */
742#ifdef USE_ASSERT_CHECKING
743 static bool already_started = false;
744
746 already_started = true;
747#endif
748
750 return;
751
753
754 elog(DEBUG2, "starting up replication origin progress state");
755
757
758 /*
759 * might have had max_active_replication_origins == 0 last run, or we just
760 * brought up a standby.
761 */
762 if (fd < 0 && errno == ENOENT)
763 return;
764 else if (fd < 0)
767 errmsg("could not open file \"%s\": %m",
768 path)));
769
770 /* verify magic, that is written even if nothing was active */
771 readBytes = read(fd, &magic, sizeof(magic));
772 if (readBytes != sizeof(magic))
773 {
774 if (readBytes < 0)
777 errmsg("could not read file \"%s\": %m",
778 path)));
779 else
782 errmsg("could not read file \"%s\": read %d of %zu",
783 path, readBytes, sizeof(magic))));
784 }
785 COMP_CRC32C(crc, &magic, sizeof(magic));
786
787 if (magic != REPLICATION_STATE_MAGIC)
789 (errmsg("replication checkpoint has wrong magic %u instead of %u",
790 magic, REPLICATION_STATE_MAGIC)));
791
792 /* we can skip locking here, no other access is possible */
793
794 /* recover individual states, until there are no more to be found */
795 while (true)
796 {
798
799 readBytes = read(fd, &disk_state, sizeof(disk_state));
800
801 if (readBytes < 0)
802 {
805 errmsg("could not read file \"%s\": %m",
806 path)));
807 }
808
809 /* no further data */
810 if (readBytes == sizeof(crc))
811 {
812 memcpy(&file_crc, &disk_state, sizeof(file_crc));
813 break;
814 }
815
816 if (readBytes != sizeof(disk_state))
817 {
820 errmsg("could not read file \"%s\": read %d of %zu",
821 path, readBytes, sizeof(disk_state))));
822 }
823
825
829 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
830
831 /* copy data to shared memory */
834 last_state++;
835
836 ereport(LOG,
837 errmsg("recovered replication state of node %d to %X/%08X",
838 disk_state.roident,
839 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
840 }
841
842 /* now check checksum */
844 if (file_crc != crc)
847 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
848 crc, file_crc)));
849
850 if (CloseTransientFile(fd) != 0)
853 errmsg("could not close file \"%s\": %m",
854 path)));
855}
#define LOG
Definition elog.h:31
#define DEBUG2
Definition elog.h:29
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replication_states

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 182 of file origin.c.

Referenced by ReplicationOriginShmemInit().

◆ replorigin_xact_state

◆ session_replication_state