PostgreSQL Source Code git master
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"
 
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
 
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
static bool IsReservedOriginName (const char *name)
 
RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
static void replorigin_state_clear (RepOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationStatereplication_states
 
static ReplicationStateCtlreplication_states_ctl
 
static ReplicationStatesession_replication_state = NULL
 

Macro Definition Documentation

◆ PG_REPLORIGIN_CHECKPOINT_FILENAME

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"

Definition at line 99 of file origin.c.

◆ PG_REPLORIGIN_CHECKPOINT_TMPFILE

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

Definition at line 100 of file origin.c.

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 186 of file origin.c.

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 577 of file origin.c.

578{
579 const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
580 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
581 int tmpfd;
582 int i;
585
586 if (max_replication_slots == 0)
587 return;
588
590
591 /* make sure no old temp file is remaining */
592 if (unlink(tmppath) < 0 && errno != ENOENT)
595 errmsg("could not remove file \"%s\": %m",
596 tmppath)));
597
598 /*
599 * no other backend can perform this at the same time; only one checkpoint
600 * can happen at a time.
601 */
602 tmpfd = OpenTransientFile(tmppath,
603 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
604 if (tmpfd < 0)
607 errmsg("could not create file \"%s\": %m",
608 tmppath)));
609
610 /* write magic */
611 errno = 0;
612 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
613 {
614 /* if write didn't set errno, assume problem is no disk space */
615 if (errno == 0)
616 errno = ENOSPC;
619 errmsg("could not write to file \"%s\": %m",
620 tmppath)));
621 }
622 COMP_CRC32C(crc, &magic, sizeof(magic));
623
624 /* prevent concurrent creations/drops */
625 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
626
627 /* write actual data */
628 for (i = 0; i < max_replication_slots; i++)
629 {
630 ReplicationStateOnDisk disk_state;
632 XLogRecPtr local_lsn;
633
634 if (curstate->roident == InvalidRepOriginId)
635 continue;
636
637 /* zero, to avoid uninitialized padding bytes */
638 memset(&disk_state, 0, sizeof(disk_state));
639
640 LWLockAcquire(&curstate->lock, LW_SHARED);
641
642 disk_state.roident = curstate->roident;
643
644 disk_state.remote_lsn = curstate->remote_lsn;
645 local_lsn = curstate->local_lsn;
646
647 LWLockRelease(&curstate->lock);
648
649 /* make sure we only write out a commit that's persistent */
650 XLogFlush(local_lsn);
651
652 errno = 0;
653 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
654 sizeof(disk_state))
655 {
656 /* if write didn't set errno, assume problem is no disk space */
657 if (errno == 0)
658 errno = ENOSPC;
661 errmsg("could not write to file \"%s\": %m",
662 tmppath)));
663 }
664
665 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
666 }
667
668 LWLockRelease(ReplicationOriginLock);
669
670 /* write out the CRC */
672 errno = 0;
673 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
674 {
675 /* if write didn't set errno, assume problem is no disk space */
676 if (errno == 0)
677 errno = ENOSPC;
680 errmsg("could not write to file \"%s\": %m",
681 tmppath)));
682 }
683
684 if (CloseTransientFile(tmpfd) != 0)
687 errmsg("could not close file \"%s\": %m",
688 tmppath)));
689
690 /* fsync, rename to permanent file, fsync file and directory */
691 durable_rename(tmppath, path, PANIC);
692}
#define PG_BINARY
Definition: c.h:1244
uint32_t uint32
Definition: c.h:502
int errcode_for_file_access(void)
Definition: elog.c:876
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PANIC
Definition: elog.h:42
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:781
int CloseTransientFile(int fd)
Definition: fd.c:2831
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2655
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1179
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1899
@ LW_SHARED
Definition: lwlock.h:115
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition: origin.c:100
static ReplicationState * replication_states
Definition: origin.c:170
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition: origin.c:99
#define REPLICATION_STATE_MAGIC
Definition: origin.c:186
#define InvalidRepOriginId
Definition: origin.h:33
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
return crc
int max_replication_slots
Definition: slot.c:150
XLogRecPtr remote_lsn
Definition: origin.c:146
RepOriginId roident
Definition: origin.c:145
XLogRecPtr remote_lsn
Definition: origin.c:115
XLogRecPtr local_lsn
Definition: origin.c:122
RepOriginId roident
Definition: origin.c:110
LWLock lock
Definition: origin.c:137
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2790
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ IsReservedOriginName()

static bool IsReservedOriginName ( const char *  name)
static

Definition at line 208 of file origin.c.

209{
210 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
211 (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
212}
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
const char * name

References name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1460 of file origin.c.

1461{
1463 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1464 RepOriginId node;
1465
1466 replorigin_check_prerequisites(true, false);
1467
1468 /* lock to prevent the replication origin from vanishing */
1469 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1470
1471 node = replorigin_by_name(text_to_cstring(name), false);
1472
1473 /*
1474 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1475 * xact hasn't committed yet. This is why this function should be used to
1476 * set up the initial replication state, but not for replay.
1477 */
1478 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1479 true /* go backward */ , true /* WAL log */ );
1480
1481 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1482
1484}
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:225
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:189
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:892
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
Definition: c.h:658
char * text_to_cstring(const text *t)
Definition: varlena.c:225
uint16 RepOriginId
Definition: xlogdefs.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1273 of file origin.c.

1274{
1275 char *name;
1276 RepOriginId roident;
1277
1278 replorigin_check_prerequisites(false, false);
1279
1281
1282 /*
1283 * Replication origins "any and "none" are reserved for system options.
1284 * The origins "pg_xxx" are reserved for internal use.
1285 */
1287 ereport(ERROR,
1288 (errcode(ERRCODE_RESERVED_NAME),
1289 errmsg("replication origin name \"%s\" is reserved",
1290 name),
1291 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1292 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1293
1294 /*
1295 * If built with appropriate switch, whine when regression-testing
1296 * conventions for replication origin names are violated.
1297 */
1298#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1299 if (strncmp(name, "regress_", 8) != 0)
1300 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1301#endif
1302
1303 roident = replorigin_create(name);
1304
1305 pfree(name);
1306
1307 PG_RETURN_OID(roident);
1308}
bool IsReservedName(const char *name)
Definition: catalog.c:247
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:853
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
void pfree(void *pointer)
Definition: mcxt.c:1524
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:256
static bool IsReservedOriginName(const char *name)
Definition: origin.c:208
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317

References DatumGetPointer(), elog, ereport, errcode(), errdetail(), errmsg(), ERROR, IsReservedName(), IsReservedOriginName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1314 of file origin.c.

1315{
1316 char *name;
1317
1318 replorigin_check_prerequisites(false, false);
1319
1321
1322 replorigin_drop_by_name(name, false, true);
1323
1324 pfree(name);
1325
1327}
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:415

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1333 of file origin.c.

1334{
1335 char *name;
1336 RepOriginId roident;
1337
1338 replorigin_check_prerequisites(false, false);
1339
1341 roident = replorigin_by_name(name, true);
1342
1343 pfree(name);
1344
1345 if (OidIsValid(roident))
1346 PG_RETURN_OID(roident);
1348}
#define OidIsValid(objectId)
Definition: c.h:746
#define PG_RETURN_NULL()
Definition: fmgr.h:345

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1495 of file origin.c.

1496{
1497 char *name;
1498 bool flush;
1499 RepOriginId roident;
1500 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1501
1503
1505 flush = PG_GETARG_BOOL(1);
1506
1507 roident = replorigin_by_name(name, false);
1508 Assert(OidIsValid(roident));
1509
1510 remote_lsn = replorigin_get_progress(roident, flush);
1511
1512 if (remote_lsn == InvalidXLogRecPtr)
1514
1515 PG_RETURN_LSN(remote_lsn);
1516}
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
Assert(PointerIsAligned(start, uint64))
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1018
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:34

References Assert(), DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), and text_to_cstring().

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1393 of file origin.c.

1394{
1395 replorigin_check_prerequisites(false, false);
1396
1398}
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
RepOriginId replorigin_session_origin
Definition: origin.c:159

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1409 of file origin.c.

1410{
1411 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1412 bool flush = PG_GETARG_BOOL(0);
1413
1414 replorigin_check_prerequisites(true, false);
1415
1416 if (session_replication_state == NULL)
1417 ereport(ERROR,
1418 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1419 errmsg("no replication origin is configured")));
1420
1421 remote_lsn = replorigin_session_get_progress(flush);
1422
1423 if (remote_lsn == InvalidXLogRecPtr)
1425
1426 PG_RETURN_LSN(remote_lsn);
1427}
static ReplicationState * session_replication_state
Definition: origin.c:183
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1241

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), and session_replication_state.

◆ pg_replication_origin_session_reset()

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1354 of file origin.c.

1355{
1356 char *name;
1357 RepOriginId origin;
1358
1359 replorigin_check_prerequisites(true, false);
1360
1362 origin = replorigin_by_name(name, false);
1363 replorigin_session_setup(origin, 0);
1364
1366
1367 pfree(name);
1368
1370}
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1101

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

◆ pg_replication_origin_xact_reset()

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1430 of file origin.c.

1431{
1432 XLogRecPtr location = PG_GETARG_LSN(0);
1433
1434 replorigin_check_prerequisites(true, false);
1435
1436 if (session_replication_state == NULL)
1437 ereport(ERROR,
1438 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1439 errmsg("no replication origin is configured")));
1440
1443
1445}
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and session_replication_state.

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1520 of file origin.c.

1521{
1522 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1523 int i;
1525
1526 /* we want to return 0 rows if slot is set to zero */
1527 replorigin_check_prerequisites(false, true);
1528
1529 InitMaterializedSRF(fcinfo, 0);
1530
1531 /* prevent slots from being concurrently dropped */
1532 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1533
1534 /*
1535 * Iterate through all possible replication_states, display if they are
1536 * filled. Note that we do not take any locks, so slightly corrupted/out
1537 * of date values are a possibility.
1538 */
1539 for (i = 0; i < max_replication_slots; i++)
1540 {
1544 char *roname;
1545
1547
1548 /* unused slot, nothing to display */
1549 if (state->roident == InvalidRepOriginId)
1550 continue;
1551
1552 memset(values, 0, sizeof(values));
1553 memset(nulls, 1, sizeof(nulls));
1554
1555 values[0] = ObjectIdGetDatum(state->roident);
1556 nulls[0] = false;
1557
1558 /*
1559 * We're not preventing the origin to be dropped concurrently, so
1560 * silently accept that it might be gone.
1561 */
1562 if (replorigin_by_oid(state->roident, true,
1563 &roname))
1564 {
1565 values[1] = CStringGetTextDatum(roname);
1566 nulls[1] = false;
1567 }
1568
1569 LWLockAcquire(&state->lock, LW_SHARED);
1570
1571 values[2] = LSNGetDatum(state->remote_lsn);
1572 nulls[2] = false;
1573
1574 values[3] = LSNGetDatum(state->local_lsn);
1575 nulls[3] = false;
1576
1577 LWLockRelease(&state->lock);
1578
1579 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1580 values, nulls);
1581 }
1582
1583 LWLockRelease(ReplicationOriginLock);
1584
1585#undef REPLICATION_ORIGIN_PROGRESS_COLS
1586
1587 return (Datum) 0;
1588}
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
TupleDesc setDesc
Definition: execnodes.h:359
Tuplestorestate * setResult
Definition: execnodes.h:358
Definition: regguts.h:323
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784

References CStringGetTextDatum, i, InitMaterializedSRF(), InvalidRepOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::setDesc, ReturnSetInfo::setResult, tuplestore_putvalues(), and values.

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1059 of file origin.c.

1060{
1061 ConditionVariable *cv = NULL;
1062
1063 if (session_replication_state == NULL)
1064 return;
1065
1066 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1067
1069 {
1071
1074 }
1075
1076 LWLockRelease(ReplicationOriginLock);
1077
1078 if (cv)
1080}
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:46
@ LW_EXCLUSIVE
Definition: lwlock.h:114
ConditionVariable origin_cv
Definition: origin.c:132

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, and session_replication_state.

Referenced by replorigin_session_setup().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 530 of file origin.c.

531{
532 bool found;
533
534 if (max_replication_slots == 0)
535 return;
536
538 ShmemInitStruct("ReplicationOriginState",
540 &found);
542
543 if (!found)
544 {
545 int i;
546
548
550
551 for (i = 0; i < max_replication_slots; i++)
552 {
556 }
557 }
558}
#define MemSet(start, val, len)
Definition: c.h:991
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:718
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:190
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:175
Size ReplicationOriginShmemSize(void)
Definition: origin.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:155

References ConditionVariableInit(), i, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_replication_slots, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 510 of file origin.c.

511{
512 Size size = 0;
513
514 /*
515 * XXX: max_replication_slots is arguably the wrong thing to use, as here
516 * we keep the replay state of *remote* transactions. But for now it seems
517 * sufficient to reuse it, rather than introduce a separate GUC.
518 */
519 if (max_replication_slots == 0)
520 return size;
521
522 size = add_size(size, offsetof(ReplicationStateCtl, states));
523
524 size = add_size(size,
526 return size;
527}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 892 of file origin.c.

895{
896 int i;
897 ReplicationState *replication_state = NULL;
898 ReplicationState *free_state = NULL;
899
900 Assert(node != InvalidRepOriginId);
901
902 /* we don't track DoNotReplicateId */
903 if (node == DoNotReplicateId)
904 return;
905
906 /*
907 * XXX: For the case where this is called by WAL replay, it'd be more
908 * efficient to restore into a backend local hashtable and only dump into
909 * shmem after recovery is finished. Let's wait with implementing that
910 * till it's shown to be a measurable expense
911 */
912
913 /* Lock exclusively, as we may have to create a new table entry. */
914 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
915
916 /*
917 * Search for either an existing slot for the origin, or a free one we can
918 * use.
919 */
920 for (i = 0; i < max_replication_slots; i++)
921 {
923
924 /* remember where to insert if necessary */
925 if (curstate->roident == InvalidRepOriginId &&
926 free_state == NULL)
927 {
928 free_state = curstate;
929 continue;
930 }
931
932 /* not our slot */
933 if (curstate->roident != node)
934 {
935 continue;
936 }
937
938 /* ok, found slot */
939 replication_state = curstate;
940
941 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
942
943 /* Make sure it's not used by somebody else */
944 if (replication_state->acquired_by != 0)
945 {
947 (errcode(ERRCODE_OBJECT_IN_USE),
948 errmsg("replication origin with ID %d is already active for PID %d",
949 replication_state->roident,
950 replication_state->acquired_by)));
951 }
952
953 break;
954 }
955
956 if (replication_state == NULL && free_state == NULL)
958 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
959 errmsg("could not find free replication state slot for replication origin with ID %d",
960 node),
961 errhint("Increase \"max_replication_slots\" and try again.")));
962
963 if (replication_state == NULL)
964 {
965 /* initialize new slot */
966 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
967 replication_state = free_state;
968 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
969 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
970 replication_state->roident = node;
971 }
972
973 Assert(replication_state->roident != InvalidRepOriginId);
974
975 /*
976 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
977 * and the standby gets the message. Primarily this will be called during
978 * WAL replay (of commit records) where no WAL logging is necessary.
979 */
980 if (wal_log)
981 {
982 xl_replorigin_set xlrec;
983
984 xlrec.remote_lsn = remote_commit;
985 xlrec.node_id = node;
986 xlrec.force = go_backward;
987
989 XLogRegisterData(&xlrec, sizeof(xlrec));
990
991 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
992 }
993
994 /*
995 * Due to - harmless - race conditions during a checkpoint we could see
996 * values here that are older than the ones we already have in memory. We
997 * could also see older values for prepared transactions when the prepare
998 * is sent at a later point of time along with commit prepared and there
999 * are other transactions commits between prepare and commit prepared. See
1000 * ReorderBufferFinishPrepared. Don't overwrite those.
1001 */
1002 if (go_backward || replication_state->remote_lsn < remote_commit)
1003 replication_state->remote_lsn = remote_commit;
1004 if (local_commit != InvalidXLogRecPtr &&
1005 (go_backward || replication_state->local_lsn < local_commit))
1006 replication_state->local_lsn = local_commit;
1007 LWLockRelease(&replication_state->lock);
1008
1009 /*
1010 * Release *after* changing the LSNs, slot isn't acquired and thus could
1011 * otherwise be dropped anytime.
1012 */
1013 LWLockRelease(ReplicationOriginLock);
1014}
int errhint(const char *fmt,...)
Definition: elog.c:1317
#define DoNotReplicateId
Definition: origin.h:34
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References ReplicationState::acquired_by, Assert(), DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, ReplicationState::remote_lsn, xl_replorigin_set::remote_lsn, replication_states, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 225 of file origin.c.

226{
228 Oid roident = InvalidOid;
229 HeapTuple tuple;
230 Datum roname_d;
231
232 roname_d = CStringGetTextDatum(roname);
233
234 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
235 if (HeapTupleIsValid(tuple))
236 {
238 roident = ident->roident;
239 ReleaseSysCache(tuple);
240 }
241 else if (!missing_ok)
243 (errcode(ERRCODE_UNDEFINED_OBJECT),
244 errmsg("replication origin \"%s\" does not exist",
245 roname)));
246
247 return roident;
248}
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define ident
Definition: indent_codes.h:47
FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 469 of file origin.c.

470{
471 HeapTuple tuple;
473
474 Assert(OidIsValid((Oid) roident));
475 Assert(roident != InvalidRepOriginId);
476 Assert(roident != DoNotReplicateId);
477
478 tuple = SearchSysCache1(REPLORIGIDENT,
479 ObjectIdGetDatum((Oid) roident));
480
481 if (HeapTupleIsValid(tuple))
482 {
484 *roname = text_to_cstring(&ric->roname);
485 ReleaseSysCache(tuple);
486
487 return true;
488 }
489 else
490 {
491 *roname = NULL;
492
493 if (!missing_ok)
495 (errcode(ERRCODE_UNDEFINED_OBJECT),
496 errmsg("replication origin with ID %d does not exist",
497 roident)));
498
499 return false;
500 }
501}

References Assert(), DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 189 of file origin.c.

190{
191 if (check_slots && max_replication_slots == 0)
193 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194 errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0")));
195
196 if (!recoveryOK && RecoveryInProgress())
198 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
199 errmsg("cannot manipulate replication origins during recovery")));
200}
bool RecoveryInProgress(void)
Definition: xlog.c:6380

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, and RecoveryInProgress().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

◆ replorigin_create()

RepOriginId replorigin_create ( const char *  roname)

Definition at line 256 of file origin.c.

257{
258 Oid roident;
259 HeapTuple tuple = NULL;
260 Relation rel;
261 Datum roname_d;
262 SnapshotData SnapshotDirty;
263 SysScanDesc scan;
265
266 roname_d = CStringGetTextDatum(roname);
267
269
270 /*
271 * We need the numeric replication origin to be 16bit wide, so we cannot
272 * rely on the normal oid allocation. Instead we simply scan
273 * pg_replication_origin for the first unused id. That's not particularly
274 * efficient, but this should be a fairly infrequent operation - we can
275 * easily spend a bit more code on this when it turns out it needs to be
276 * faster.
277 *
278 * We handle concurrency by taking an exclusive lock (allowing reads!)
279 * over the table for the duration of the search. Because we use a "dirty
280 * snapshot" we can read rows that other in-progress sessions have
281 * written, even though they would be invisible with normal snapshots. Due
282 * to the exclusive lock there's no danger that new rows can appear while
283 * we're checking.
284 */
285 InitDirtySnapshot(SnapshotDirty);
286
287 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
288
289 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
290 {
291 bool nulls[Natts_pg_replication_origin];
292 Datum values[Natts_pg_replication_origin];
293 bool collides;
294
296
298 Anum_pg_replication_origin_roident,
299 BTEqualStrategyNumber, F_OIDEQ,
300 ObjectIdGetDatum(roident));
301
302 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
303 true /* indexOK */ ,
304 &SnapshotDirty,
305 1, &key);
306
307 collides = HeapTupleIsValid(systable_getnext(scan));
308
309 systable_endscan(scan);
310
311 if (!collides)
312 {
313 /*
314 * Ok, found an unused roident, insert the new row and do a CCI,
315 * so our callers can look it up if they want to.
316 */
317 memset(&nulls, 0, sizeof(nulls));
318
319 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
320 values[Anum_pg_replication_origin_roname - 1] = roname_d;
321
322 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
323 CatalogTupleInsert(rel, tuple);
325 break;
326 }
327 }
328
329 /* now release lock again, */
331
332 if (tuple == NULL)
334 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
335 errmsg("could not find free replication origin ID")));
336
337 heap_freetuple(tuple);
338 return roident;
339}
#define PG_UINT16_MAX
Definition: c.h:558
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define ExclusiveLock
Definition: lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define RelationGetDescr(relation)
Definition: rel.h:538
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool IsTransactionState(void)
Definition: xact.c:387
void CommandCounterIncrement(void)
Definition: xact.c:1100

References Assert(), BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum(), PG_UINT16_MAX, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 415 of file origin.c.

416{
417 RepOriginId roident;
418 Relation rel;
419 HeapTuple tuple;
420
422
423 rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
424
425 roident = replorigin_by_name(name, missing_ok);
426
427 /* Lock the origin to prevent concurrent drops. */
428 LockSharedObject(ReplicationOriginRelationId, roident, 0,
430
431 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
432 if (!HeapTupleIsValid(tuple))
433 {
434 if (!missing_ok)
435 elog(ERROR, "cache lookup failed for replication origin with ID %d",
436 roident);
437
438 /*
439 * We don't need to retain the locks if the origin is already dropped.
440 */
441 UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
444 return;
445 }
446
447 replorigin_state_clear(roident, nowait);
448
449 /*
450 * Now, we can delete the catalog entry.
451 */
452 CatalogTupleDelete(rel, &tuple->t_self);
453 ReleaseSysCache(tuple);
454
456
457 /* We keep the lock on pg_replication_origin until commit */
458 table_close(rel, NoLock);
459}
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1082
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1142
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:345
ItemPointerData t_self
Definition: htup.h:65

References AccessExclusiveLock, Assert(), CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1018 of file origin.c.

1019{
1020 int i;
1021 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1022 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1023
1024 /* prevent slots from being concurrently dropped */
1025 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1026
1027 for (i = 0; i < max_replication_slots; i++)
1028 {
1030
1032
1033 if (state->roident == node)
1034 {
1035 LWLockAcquire(&state->lock, LW_SHARED);
1036
1037 remote_lsn = state->remote_lsn;
1038 local_lsn = state->local_lsn;
1039
1040 LWLockRelease(&state->lock);
1041
1042 break;
1043 }
1044 }
1045
1046 LWLockRelease(ReplicationOriginLock);
1047
1048 if (flush && local_lsn != InvalidXLogRecPtr)
1049 XLogFlush(local_lsn);
1050
1051 return remote_lsn;
1052}

References i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, replication_states, and XLogFlush().

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 831 of file origin.c.

832{
833 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
834
835 switch (info)
836 {
838 {
839 xl_replorigin_set *xlrec =
841
843 xlrec->remote_lsn, record->EndRecPtr,
844 xlrec->force /* backward */ ,
845 false /* WAL log */ );
846 break;
847 }
849 {
850 xl_replorigin_drop *xlrec;
851 int i;
852
853 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
854
855 for (i = 0; i < max_replication_slots; i++)
856 {
858
859 /* found our slot */
860 if (state->roident == xlrec->node_id)
861 {
862 /* reset entry */
863 state->roident = InvalidRepOriginId;
864 state->remote_lsn = InvalidXLogRecPtr;
865 state->local_lsn = InvalidXLogRecPtr;
866 break;
867 }
868 }
869 break;
870 }
871 default:
872 elog(PANIC, "replorigin_redo: unknown op code %u", info);
873 }
874}
uint8_t uint8
Definition: c.h:500
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
RepOriginId node_id
Definition: origin.h:27
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1194 of file origin.c.

1195{
1197
1199
1200 if (session_replication_state == NULL)
1201 ereport(ERROR,
1202 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1203 errmsg("no replication origin is configured")));
1204
1205 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1206
1210
1211 LWLockRelease(ReplicationOriginLock);
1212
1214}

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::origin_cv, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and process_syncing_tables_for_sync().

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1101 of file origin.c.

1102{
1103 static bool registered_cleanup;
1104 int i;
1105 int free_slot = -1;
1106
1107 if (!registered_cleanup)
1108 {
1110 registered_cleanup = true;
1111 }
1112
1114
1115 if (session_replication_state != NULL)
1116 ereport(ERROR,
1117 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1118 errmsg("cannot setup replication origin when one is already setup")));
1119
1120 /* Lock exclusively, as we may have to create a new table entry. */
1121 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1122
1123 /*
1124 * Search for either an existing slot for the origin, or a free one we can
1125 * use.
1126 */
1127 for (i = 0; i < max_replication_slots; i++)
1128 {
1130
1131 /* remember where to insert if necessary */
1132 if (curstate->roident == InvalidRepOriginId &&
1133 free_slot == -1)
1134 {
1135 free_slot = i;
1136 continue;
1137 }
1138
1139 /* not our slot */
1140 if (curstate->roident != node)
1141 continue;
1142
1143 else if (curstate->acquired_by != 0 && acquired_by == 0)
1144 {
1145 ereport(ERROR,
1146 (errcode(ERRCODE_OBJECT_IN_USE),
1147 errmsg("replication origin with ID %d is already active for PID %d",
1148 curstate->roident, curstate->acquired_by)));
1149 }
1150
1151 /* ok, found slot */
1152 session_replication_state = curstate;
1153 break;
1154 }
1155
1156
1157 if (session_replication_state == NULL && free_slot == -1)
1158 ereport(ERROR,
1159 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1160 errmsg("could not find free replication state slot for replication origin with ID %d",
1161 node),
1162 errhint("Increase \"max_replication_slots\" and try again.")));
1163 else if (session_replication_state == NULL)
1164 {
1165 /* initialize new slot */
1170 }
1171
1172
1174
1175 if (acquired_by == 0)
1177 else if (session_replication_state->acquired_by != acquired_by)
1178 elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1179 node, acquired_by);
1180
1181 LWLockRelease(ReplicationOriginLock);
1182
1183 /* probably this one is pointless */
1185}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1059

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, and session_replication_state.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_state_clear()

static void replorigin_state_clear ( RepOriginId  roident,
bool  nowait 
)
static

Definition at line 345 of file origin.c.

346{
347 int i;
348
349 /*
350 * Clean up the slot state info, if there is any matching slot.
351 */
352restart:
353 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
354
355 for (i = 0; i < max_replication_slots; i++)
356 {
358
359 if (state->roident == roident)
360 {
361 /* found our slot, is it busy? */
362 if (state->acquired_by != 0)
363 {
365
366 if (nowait)
368 (errcode(ERRCODE_OBJECT_IN_USE),
369 errmsg("could not drop replication origin with ID %d, in use by PID %d",
370 state->roident,
371 state->acquired_by)));
372
373 /*
374 * We must wait and then retry. Since we don't know which CV
375 * to wait on until here, we can't readily use
376 * ConditionVariablePrepareToSleep (calling it here would be
377 * wrong, since we could miss the signal if we did so); just
378 * use ConditionVariableSleep directly.
379 */
380 cv = &state->origin_cv;
381
382 LWLockRelease(ReplicationOriginLock);
383
384 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
385 goto restart;
386 }
387
388 /* first make a WAL log entry */
389 {
390 xl_replorigin_drop xlrec;
391
392 xlrec.node_id = roident;
394 XLogRegisterData(&xlrec, sizeof(xlrec));
395 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
396 }
397
398 /* then clear the in-memory slot */
399 state->roident = InvalidRepOriginId;
400 state->remote_lsn = InvalidXLogRecPtr;
401 state->local_lsn = InvalidXLogRecPtr;
402 break;
403 }
404 }
405 LWLockRelease(ReplicationOriginLock);
407}
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 703 of file origin.c.

704{
705 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
706 int fd;
707 int readBytes;
709 int last_state = 0;
710 pg_crc32c file_crc;
712
713 /* don't want to overwrite already existing state */
714#ifdef USE_ASSERT_CHECKING
715 static bool already_started = false;
716
717 Assert(!already_started);
718 already_started = true;
719#endif
720
721 if (max_replication_slots == 0)
722 return;
723
725
726 elog(DEBUG2, "starting up replication origin progress state");
727
728 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
729
730 /*
731 * might have had max_replication_slots == 0 last run, or we just brought
732 * up a standby.
733 */
734 if (fd < 0 && errno == ENOENT)
735 return;
736 else if (fd < 0)
739 errmsg("could not open file \"%s\": %m",
740 path)));
741
742 /* verify magic, that is written even if nothing was active */
743 readBytes = read(fd, &magic, sizeof(magic));
744 if (readBytes != sizeof(magic))
745 {
746 if (readBytes < 0)
749 errmsg("could not read file \"%s\": %m",
750 path)));
751 else
754 errmsg("could not read file \"%s\": read %d of %zu",
755 path, readBytes, sizeof(magic))));
756 }
757 COMP_CRC32C(crc, &magic, sizeof(magic));
758
759 if (magic != REPLICATION_STATE_MAGIC)
761 (errmsg("replication checkpoint has wrong magic %u instead of %u",
762 magic, REPLICATION_STATE_MAGIC)));
763
764 /* we can skip locking here, no other access is possible */
765
766 /* recover individual states, until there are no more to be found */
767 while (true)
768 {
769 ReplicationStateOnDisk disk_state;
770
771 readBytes = read(fd, &disk_state, sizeof(disk_state));
772
773 /* no further data */
774 if (readBytes == sizeof(crc))
775 {
776 /* not pretty, but simple ... */
777 file_crc = *(pg_crc32c *) &disk_state;
778 break;
779 }
780
781 if (readBytes < 0)
782 {
785 errmsg("could not read file \"%s\": %m",
786 path)));
787 }
788
789 if (readBytes != sizeof(disk_state))
790 {
793 errmsg("could not read file \"%s\": read %d of %zu",
794 path, readBytes, sizeof(disk_state))));
795 }
796
797 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
798
799 if (last_state == max_replication_slots)
801 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
802 errmsg("could not find free replication state, increase \"max_replication_slots\"")));
803
804 /* copy data to shared memory */
805 replication_states[last_state].roident = disk_state.roident;
806 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
807 last_state++;
808
809 ereport(LOG,
810 (errmsg("recovered replication state of node %d to %X/%X",
811 disk_state.roident,
812 LSN_FORMAT_ARGS(disk_state.remote_lsn))));
813 }
814
815 /* now check checksum */
817 if (file_crc != crc)
820 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
821 crc, file_crc)));
822
823 if (CloseTransientFile(fd) != 0)
826 errmsg("could not close file \"%s\": %m",
827 path)));
828}
#define LOG
Definition: elog.h:31
#define DEBUG2
Definition: elog.h:29
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References Assert(), CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ replication_states

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 175 of file origin.c.

Referenced by ReplicationOriginShmemInit().

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state