PostgreSQL Source Code  git master
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
static bool IsReservedOriginName (const char *name)
 
RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
static void replorigin_state_clear (RepOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationStatereplication_states
 
static ReplicationStateCtlreplication_states_ctl
 
static ReplicationStatesession_replication_state = NULL
 

Macro Definition Documentation

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 183 of file origin.c.

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 574 of file origin.c.

575 {
576  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
577  const char *path = "pg_logical/replorigin_checkpoint";
578  int tmpfd;
579  int i;
581  pg_crc32c crc;
582 
583  if (max_replication_slots == 0)
584  return;
585 
586  INIT_CRC32C(crc);
587 
588  /* make sure no old temp file is remaining */
589  if (unlink(tmppath) < 0 && errno != ENOENT)
590  ereport(PANIC,
592  errmsg("could not remove file \"%s\": %m",
593  tmppath)));
594 
595  /*
596  * no other backend can perform this at the same time; only one checkpoint
597  * can happen at a time.
598  */
599  tmpfd = OpenTransientFile(tmppath,
600  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
601  if (tmpfd < 0)
602  ereport(PANIC,
604  errmsg("could not create file \"%s\": %m",
605  tmppath)));
606 
607  /* write magic */
608  errno = 0;
609  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
610  {
611  /* if write didn't set errno, assume problem is no disk space */
612  if (errno == 0)
613  errno = ENOSPC;
614  ereport(PANIC,
616  errmsg("could not write to file \"%s\": %m",
617  tmppath)));
618  }
619  COMP_CRC32C(crc, &magic, sizeof(magic));
620 
621  /* prevent concurrent creations/drops */
622  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
623 
624  /* write actual data */
625  for (i = 0; i < max_replication_slots; i++)
626  {
627  ReplicationStateOnDisk disk_state;
628  ReplicationState *curstate = &replication_states[i];
629  XLogRecPtr local_lsn;
630 
631  if (curstate->roident == InvalidRepOriginId)
632  continue;
633 
634  /* zero, to avoid uninitialized padding bytes */
635  memset(&disk_state, 0, sizeof(disk_state));
636 
637  LWLockAcquire(&curstate->lock, LW_SHARED);
638 
639  disk_state.roident = curstate->roident;
640 
641  disk_state.remote_lsn = curstate->remote_lsn;
642  local_lsn = curstate->local_lsn;
643 
644  LWLockRelease(&curstate->lock);
645 
646  /* make sure we only write out a commit that's persistent */
647  XLogFlush(local_lsn);
648 
649  errno = 0;
650  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
651  sizeof(disk_state))
652  {
653  /* if write didn't set errno, assume problem is no disk space */
654  if (errno == 0)
655  errno = ENOSPC;
656  ereport(PANIC,
658  errmsg("could not write to file \"%s\": %m",
659  tmppath)));
660  }
661 
662  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
663  }
664 
665  LWLockRelease(ReplicationOriginLock);
666 
667  /* write out the CRC */
668  FIN_CRC32C(crc);
669  errno = 0;
670  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
671  {
672  /* if write didn't set errno, assume problem is no disk space */
673  if (errno == 0)
674  errno = ENOSPC;
675  ereport(PANIC,
677  errmsg("could not write to file \"%s\": %m",
678  tmppath)));
679  }
680 
681  if (CloseTransientFile(tmpfd) != 0)
682  ereport(PANIC,
684  errmsg("could not close file \"%s\": %m",
685  tmppath)));
686 
687  /* fsync, rename to permanent file, fsync file and directory */
688  durable_rename(tmppath, path, PANIC);
689 }
unsigned int uint32
Definition: c.h:495
#define PG_BINARY
Definition: c.h:1262
int errcode_for_file_access(void)
Definition: elog.c:883
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define PANIC
Definition: elog.h:42
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:73
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:117
static ReplicationState * replication_states
Definition: origin.c:167
#define REPLICATION_STATE_MAGIC
Definition: origin.c:183
#define InvalidRepOriginId
Definition: origin.h:33
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
return crc
int max_replication_slots
Definition: slot.c:103
XLogRecPtr remote_lsn
Definition: origin.c:143
RepOriginId roident
Definition: origin.c:142
XLogRecPtr remote_lsn
Definition: origin.c:112
XLogRecPtr local_lsn
Definition: origin.c:119
RepOriginId roident
Definition: origin.c:107
LWLock lock
Definition: origin.c:134
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2733
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ IsReservedOriginName()

static bool IsReservedOriginName ( const char *  name)
static

Definition at line 205 of file origin.c.

206 {
207  return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
209 }
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
const char * name

References LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE, name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1457 of file origin.c.

1458 {
1459  text *name = PG_GETARG_TEXT_PP(0);
1460  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1461  RepOriginId node;
1462 
1463  replorigin_check_prerequisites(true, false);
1464 
1465  /* lock to prevent the replication origin from vanishing */
1466  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1467 
1468  node = replorigin_by_name(text_to_cstring(name), false);
1469 
1470  /*
1471  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1472  * xact hasn't committed yet. This is why this function should be used to
1473  * set up the initial replication state, but not for replay.
1474  */
1475  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1476  true /* go backward */ , true /* WAL log */ );
1477 
1478  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1479 
1480  PG_RETURN_VOID();
1481 }
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:222
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:186
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:889
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
Definition: c.h:676
char * text_to_cstring(const text *t)
Definition: varlena.c:217
uint16 RepOriginId
Definition: xlogdefs.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1270 of file origin.c.

1271 {
1272  char *name;
1273  RepOriginId roident;
1274 
1275  replorigin_check_prerequisites(false, false);
1276 
1278 
1279  /*
1280  * Replication origins "any and "none" are reserved for system options.
1281  * The origins "pg_xxx" are reserved for internal use.
1282  */
1284  ereport(ERROR,
1285  (errcode(ERRCODE_RESERVED_NAME),
1286  errmsg("replication origin name \"%s\" is reserved",
1287  name),
1288  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1290 
1291  /*
1292  * If built with appropriate switch, whine when regression-testing
1293  * conventions for replication origin names are violated.
1294  */
1295 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1296  if (strncmp(name, "regress_", 8) != 0)
1297  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1298 #endif
1299 
1300  roident = replorigin_create(name);
1301 
1302  pfree(name);
1303 
1304  PG_RETURN_OID(roident);
1305 }
bool IsReservedName(const char *name)
Definition: catalog.c:219
int errdetail(const char *fmt,...)
Definition: elog.c:1208
int errcode(int sqlerrcode)
Definition: elog.c:860
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
void pfree(void *pointer)
Definition: mcxt.c:1431
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:253
static bool IsReservedOriginName(const char *name)
Definition: origin.c:205
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312

References DatumGetPointer(), elog(), ereport, errcode(), errdetail(), errmsg(), ERROR, IsReservedName(), IsReservedOriginName(), LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE, name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1311 of file origin.c.

1312 {
1313  char *name;
1314 
1315  replorigin_check_prerequisites(false, false);
1316 
1318 
1319  replorigin_drop_by_name(name, false, true);
1320 
1321  pfree(name);
1322 
1323  PG_RETURN_VOID();
1324 }
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:412

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1330 of file origin.c.

1331 {
1332  char *name;
1333  RepOriginId roident;
1334 
1335  replorigin_check_prerequisites(false, false);
1336 
1338  roident = replorigin_by_name(name, true);
1339 
1340  pfree(name);
1341 
1342  if (OidIsValid(roident))
1343  PG_RETURN_OID(roident);
1344  PG_RETURN_NULL();
1345 }
#define OidIsValid(objectId)
Definition: c.h:764
#define PG_RETURN_NULL()
Definition: fmgr.h:345

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1492 of file origin.c.

1493 {
1494  char *name;
1495  bool flush;
1496  RepOriginId roident;
1497  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1498 
1499  replorigin_check_prerequisites(true, true);
1500 
1502  flush = PG_GETARG_BOOL(1);
1503 
1504  roident = replorigin_by_name(name, false);
1505  Assert(OidIsValid(roident));
1506 
1507  remote_lsn = replorigin_get_progress(roident, flush);
1508 
1509  if (remote_lsn == InvalidXLogRecPtr)
1510  PG_RETURN_NULL();
1511 
1512  PG_RETURN_LSN(remote_lsn);
1513 }
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
Assert(fmt[strlen(fmt) - 1] !='\n')
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1015
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:34

References Assert(), DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), and text_to_cstring().

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1390 of file origin.c.

1391 {
1392  replorigin_check_prerequisites(false, false);
1393 
1395 }
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
RepOriginId replorigin_session_origin
Definition: origin.c:156

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1406 of file origin.c.

1407 {
1408  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1409  bool flush = PG_GETARG_BOOL(0);
1410 
1411  replorigin_check_prerequisites(true, false);
1412 
1413  if (session_replication_state == NULL)
1414  ereport(ERROR,
1415  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1416  errmsg("no replication origin is configured")));
1417 
1418  remote_lsn = replorigin_session_get_progress(flush);
1419 
1420  if (remote_lsn == InvalidXLogRecPtr)
1421  PG_RETURN_NULL();
1422 
1423  PG_RETURN_LSN(remote_lsn);
1424 }
static ReplicationState * session_replication_state
Definition: origin.c:180
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1238

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), and session_replication_state.

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1351 of file origin.c.

1352 {
1353  char *name;
1354  RepOriginId origin;
1355 
1356  replorigin_check_prerequisites(true, false);
1357 
1359  origin = replorigin_by_name(name, false);
1360  replorigin_session_setup(origin, 0);
1361 
1362  replorigin_session_origin = origin;
1363 
1364  pfree(name);
1365 
1366  PG_RETURN_VOID();
1367 }
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1098

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

◆ pg_replication_origin_xact_reset()

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1427 of file origin.c.

1428 {
1429  XLogRecPtr location = PG_GETARG_LSN(0);
1430 
1431  replorigin_check_prerequisites(true, false);
1432 
1433  if (session_replication_state == NULL)
1434  ereport(ERROR,
1435  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1436  errmsg("no replication origin is configured")));
1437 
1438  replorigin_session_origin_lsn = location;
1440 
1441  PG_RETURN_VOID();
1442 }
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and session_replication_state.

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1517 of file origin.c.

1518 {
1519  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1520  int i;
1522 
1523  /* we want to return 0 rows if slot is set to zero */
1524  replorigin_check_prerequisites(false, true);
1525 
1526  InitMaterializedSRF(fcinfo, 0);
1527 
1528  /* prevent slots from being concurrently dropped */
1529  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1530 
1531  /*
1532  * Iterate through all possible replication_states, display if they are
1533  * filled. Note that we do not take any locks, so slightly corrupted/out
1534  * of date values are a possibility.
1535  */
1536  for (i = 0; i < max_replication_slots; i++)
1537  {
1541  char *roname;
1542 
1544 
1545  /* unused slot, nothing to display */
1546  if (state->roident == InvalidRepOriginId)
1547  continue;
1548 
1549  memset(values, 0, sizeof(values));
1550  memset(nulls, 1, sizeof(nulls));
1551 
1552  values[0] = ObjectIdGetDatum(state->roident);
1553  nulls[0] = false;
1554 
1555  /*
1556  * We're not preventing the origin to be dropped concurrently, so
1557  * silently accept that it might be gone.
1558  */
1559  if (replorigin_by_oid(state->roident, true,
1560  &roname))
1561  {
1562  values[1] = CStringGetTextDatum(roname);
1563  nulls[1] = false;
1564  }
1565 
1566  LWLockAcquire(&state->lock, LW_SHARED);
1567 
1568  values[2] = LSNGetDatum(state->remote_lsn);
1569  nulls[2] = false;
1570 
1571  values[3] = LSNGetDatum(state->local_lsn);
1572  nulls[3] = false;
1573 
1574  LWLockRelease(&state->lock);
1575 
1576  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1577  values, nulls);
1578  }
1579 
1580  LWLockRelease(ReplicationOriginLock);
1581 
1582 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1583 
1584  return (Datum) 0;
1585 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:97
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:466
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
Definition: regguts.h:323
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750

References CStringGetTextDatum, i, InitMaterializedSRF(), InvalidRepOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::setDesc, ReturnSetInfo::setResult, tuplestore_putvalues(), and values.

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1056 of file origin.c.

1057 {
1058  ConditionVariable *cv = NULL;
1059 
1060  if (session_replication_state == NULL)
1061  return;
1062 
1063  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1064 
1066  {
1068 
1071  }
1072 
1073  LWLockRelease(ReplicationOriginLock);
1074 
1075  if (cv)
1077 }
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:45
@ LW_EXCLUSIVE
Definition: lwlock.h:116
ConditionVariable origin_cv
Definition: origin.c:129

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, and session_replication_state.

Referenced by replorigin_session_setup().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 527 of file origin.c.

528 {
529  bool found;
530 
531  if (max_replication_slots == 0)
532  return;
533 
535  ShmemInitStruct("ReplicationOriginState",
537  &found);
539 
540  if (!found)
541  {
542  int i;
543 
545 
547 
548  for (i = 0; i < max_replication_slots; i++)
549  {
553  }
554  }
555 }
#define MemSet(start, val, len)
Definition: c.h:1009
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:703
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:190
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:172
Size ReplicationOriginShmemSize(void)
Definition: origin.c:507
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:152

References ConditionVariableInit(), i, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_replication_slots, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 507 of file origin.c.

508 {
509  Size size = 0;
510 
511  /*
512  * XXX: max_replication_slots is arguably the wrong thing to use, as here
513  * we keep the replay state of *remote* transactions. But for now it seems
514  * sufficient to reuse it, rather than introduce a separate GUC.
515  */
516  if (max_replication_slots == 0)
517  return size;
518 
519  size = add_size(size, offsetof(ReplicationStateCtl, states));
520 
521  size = add_size(size,
523  return size;
524 }
size_t Size
Definition: c.h:594
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 889 of file origin.c.

892 {
893  int i;
894  ReplicationState *replication_state = NULL;
895  ReplicationState *free_state = NULL;
896 
897  Assert(node != InvalidRepOriginId);
898 
899  /* we don't track DoNotReplicateId */
900  if (node == DoNotReplicateId)
901  return;
902 
903  /*
904  * XXX: For the case where this is called by WAL replay, it'd be more
905  * efficient to restore into a backend local hashtable and only dump into
906  * shmem after recovery is finished. Let's wait with implementing that
907  * till it's shown to be a measurable expense
908  */
909 
910  /* Lock exclusively, as we may have to create a new table entry. */
911  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
912 
913  /*
914  * Search for either an existing slot for the origin, or a free one we can
915  * use.
916  */
917  for (i = 0; i < max_replication_slots; i++)
918  {
919  ReplicationState *curstate = &replication_states[i];
920 
921  /* remember where to insert if necessary */
922  if (curstate->roident == InvalidRepOriginId &&
923  free_state == NULL)
924  {
925  free_state = curstate;
926  continue;
927  }
928 
929  /* not our slot */
930  if (curstate->roident != node)
931  {
932  continue;
933  }
934 
935  /* ok, found slot */
936  replication_state = curstate;
937 
938  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
939 
940  /* Make sure it's not used by somebody else */
941  if (replication_state->acquired_by != 0)
942  {
943  ereport(ERROR,
944  (errcode(ERRCODE_OBJECT_IN_USE),
945  errmsg("replication origin with ID %d is already active for PID %d",
946  replication_state->roident,
947  replication_state->acquired_by)));
948  }
949 
950  break;
951  }
952 
953  if (replication_state == NULL && free_state == NULL)
954  ereport(ERROR,
955  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
956  errmsg("could not find free replication state slot for replication origin with ID %d",
957  node),
958  errhint("Increase max_replication_slots and try again.")));
959 
960  if (replication_state == NULL)
961  {
962  /* initialize new slot */
963  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
964  replication_state = free_state;
965  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
966  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
967  replication_state->roident = node;
968  }
969 
970  Assert(replication_state->roident != InvalidRepOriginId);
971 
972  /*
973  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
974  * and the standby gets the message. Primarily this will be called during
975  * WAL replay (of commit records) where no WAL logging is necessary.
976  */
977  if (wal_log)
978  {
979  xl_replorigin_set xlrec;
980 
981  xlrec.remote_lsn = remote_commit;
982  xlrec.node_id = node;
983  xlrec.force = go_backward;
984 
985  XLogBeginInsert();
986  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
987 
988  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
989  }
990 
991  /*
992  * Due to - harmless - race conditions during a checkpoint we could see
993  * values here that are older than the ones we already have in memory. We
994  * could also see older values for prepared transactions when the prepare
995  * is sent at a later point of time along with commit prepared and there
996  * are other transactions commits between prepare and commit prepared. See
997  * ReorderBufferFinishPrepared. Don't overwrite those.
998  */
999  if (go_backward || replication_state->remote_lsn < remote_commit)
1000  replication_state->remote_lsn = remote_commit;
1001  if (local_commit != InvalidXLogRecPtr &&
1002  (go_backward || replication_state->local_lsn < local_commit))
1003  replication_state->local_lsn = local_commit;
1004  LWLockRelease(&replication_state->lock);
1005 
1006  /*
1007  * Release *after* changing the LSNs, slot isn't acquired and thus could
1008  * otherwise be dropped anytime.
1009  */
1010  LWLockRelease(ReplicationOriginLock);
1011 }
int errhint(const char *fmt,...)
Definition: elog.c:1322
#define DoNotReplicateId
Definition: origin.h:34
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:365
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:475
void XLogBeginInsert(void)
Definition: xloginsert.c:150

References ReplicationState::acquired_by, Assert(), DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, ReplicationState::remote_lsn, xl_replorigin_set::remote_lsn, replication_states, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 222 of file origin.c.

223 {
225  Oid roident = InvalidOid;
226  HeapTuple tuple;
227  Datum roname_d;
228 
229  roname_d = CStringGetTextDatum(roname);
230 
231  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
232  if (HeapTupleIsValid(tuple))
233  {
235  roident = ident->roident;
236  ReleaseSysCache(tuple);
237  }
238  else if (!missing_ok)
239  ereport(ERROR,
240  (errcode(ERRCODE_UNDEFINED_OBJECT),
241  errmsg("replication origin \"%s\" does not exist",
242  roname)));
243 
244  return roident;
245 }
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define ident
Definition: indent_codes.h:47
FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:267
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:219

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 466 of file origin.c.

467 {
468  HeapTuple tuple;
470 
471  Assert(OidIsValid((Oid) roident));
472  Assert(roident != InvalidRepOriginId);
473  Assert(roident != DoNotReplicateId);
474 
475  tuple = SearchSysCache1(REPLORIGIDENT,
476  ObjectIdGetDatum((Oid) roident));
477 
478  if (HeapTupleIsValid(tuple))
479  {
480  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
481  *roname = text_to_cstring(&ric->roname);
482  ReleaseSysCache(tuple);
483 
484  return true;
485  }
486  else
487  {
488  *roname = NULL;
489 
490  if (!missing_ok)
491  ereport(ERROR,
492  (errcode(ERRCODE_UNDEFINED_OBJECT),
493  errmsg("replication origin with ID %d does not exist",
494  roident)));
495 
496  return false;
497  }
498 }

References Assert(), DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 186 of file origin.c.

187 {
188  if (check_slots && max_replication_slots == 0)
189  ereport(ERROR,
190  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
191  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
192 
193  if (!recoveryOK && RecoveryInProgress())
194  ereport(ERROR,
195  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
196  errmsg("cannot manipulate replication origins during recovery")));
197 }
bool RecoveryInProgress(void)
Definition: xlog.c:6211

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, and RecoveryInProgress().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

◆ replorigin_create()

RepOriginId replorigin_create ( const char *  roname)

Definition at line 253 of file origin.c.

254 {
255  Oid roident;
256  HeapTuple tuple = NULL;
257  Relation rel;
258  Datum roname_d;
259  SnapshotData SnapshotDirty;
260  SysScanDesc scan;
262 
263  roname_d = CStringGetTextDatum(roname);
264 
266 
267  /*
268  * We need the numeric replication origin to be 16bit wide, so we cannot
269  * rely on the normal oid allocation. Instead we simply scan
270  * pg_replication_origin for the first unused id. That's not particularly
271  * efficient, but this should be a fairly infrequent operation - we can
272  * easily spend a bit more code on this when it turns out it needs to be
273  * faster.
274  *
275  * We handle concurrency by taking an exclusive lock (allowing reads!)
276  * over the table for the duration of the search. Because we use a "dirty
277  * snapshot" we can read rows that other in-progress sessions have
278  * written, even though they would be invisible with normal snapshots. Due
279  * to the exclusive lock there's no danger that new rows can appear while
280  * we're checking.
281  */
282  InitDirtySnapshot(SnapshotDirty);
283 
284  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
285 
286  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
287  {
288  bool nulls[Natts_pg_replication_origin];
289  Datum values[Natts_pg_replication_origin];
290  bool collides;
291 
293 
294  ScanKeyInit(&key,
295  Anum_pg_replication_origin_roident,
296  BTEqualStrategyNumber, F_OIDEQ,
297  ObjectIdGetDatum(roident));
298 
299  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
300  true /* indexOK */ ,
301  &SnapshotDirty,
302  1, &key);
303 
304  collides = HeapTupleIsValid(systable_getnext(scan));
305 
306  systable_endscan(scan);
307 
308  if (!collides)
309  {
310  /*
311  * Ok, found an unused roident, insert the new row and do a CCI,
312  * so our callers can look it up if they want to.
313  */
314  memset(&nulls, 0, sizeof(nulls));
315 
316  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
317  values[Anum_pg_replication_origin_roname - 1] = roname_d;
318 
319  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
320  CatalogTupleInsert(rel, tuple);
322  break;
323  }
324  }
325 
326  /* now release lock again, */
328 
329  if (tuple == NULL)
330  ereport(ERROR,
331  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
332  errmsg("could not find free replication origin ID")));
333 
334  heap_freetuple(tuple);
335  return roident;
336 }
#define PG_UINT16_MAX
Definition: c.h:576
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define ExclusiveLock
Definition: lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define RelationGetDescr(relation)
Definition: rel.h:530
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:40
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool IsTransactionState(void)
Definition: xact.c:378
void CommandCounterIncrement(void)
Definition: xact.c:1078

References Assert(), BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum(), PG_UINT16_MAX, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 412 of file origin.c.

413 {
414  RepOriginId roident;
415  Relation rel;
416  HeapTuple tuple;
417 
419 
420  rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
421 
422  roident = replorigin_by_name(name, missing_ok);
423 
424  /* Lock the origin to prevent concurrent drops. */
425  LockSharedObject(ReplicationOriginRelationId, roident, 0,
427 
428  tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
429  if (!HeapTupleIsValid(tuple))
430  {
431  if (!missing_ok)
432  elog(ERROR, "cache lookup failed for replication origin with ID %d",
433  roident);
434 
435  /*
436  * We don't need to retain the locks if the origin is already dropped.
437  */
438  UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
441  return;
442  }
443 
444  replorigin_state_clear(roident, nowait);
445 
446  /*
447  * Now, we can delete the catalog entry.
448  */
449  CatalogTupleDelete(rel, &tuple->t_self);
450  ReleaseSysCache(tuple);
451 
453 
454  /* We keep the lock on pg_replication_origin until commit */
455  table_close(rel, NoLock);
456 }
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1105
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:342
ItemPointerData t_self
Definition: htup.h:65

References AccessExclusiveLock, Assert(), CatalogTupleDelete(), CommandCounterIncrement(), elog(), ERROR, HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1015 of file origin.c.

1016 {
1017  int i;
1018  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1019  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1020 
1021  /* prevent slots from being concurrently dropped */
1022  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1023 
1024  for (i = 0; i < max_replication_slots; i++)
1025  {
1027 
1029 
1030  if (state->roident == node)
1031  {
1032  LWLockAcquire(&state->lock, LW_SHARED);
1033 
1034  remote_lsn = state->remote_lsn;
1035  local_lsn = state->local_lsn;
1036 
1037  LWLockRelease(&state->lock);
1038 
1039  break;
1040  }
1041  }
1042 
1043  LWLockRelease(ReplicationOriginLock);
1044 
1045  if (flush && local_lsn != InvalidXLogRecPtr)
1046  XLogFlush(local_lsn);
1047 
1048  return remote_lsn;
1049 }

References i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, replication_states, and XLogFlush().

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 828 of file origin.c.

829 {
830  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
831 
832  switch (info)
833  {
834  case XLOG_REPLORIGIN_SET:
835  {
836  xl_replorigin_set *xlrec =
837  (xl_replorigin_set *) XLogRecGetData(record);
838 
840  xlrec->remote_lsn, record->EndRecPtr,
841  xlrec->force /* backward */ ,
842  false /* WAL log */ );
843  break;
844  }
846  {
847  xl_replorigin_drop *xlrec;
848  int i;
849 
850  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
851 
852  for (i = 0; i < max_replication_slots; i++)
853  {
855 
856  /* found our slot */
857  if (state->roident == xlrec->node_id)
858  {
859  /* reset entry */
860  state->roident = InvalidRepOriginId;
861  state->remote_lsn = InvalidXLogRecPtr;
862  state->local_lsn = InvalidXLogRecPtr;
863  break;
864  }
865  }
866  break;
867  }
868  default:
869  elog(PANIC, "replorigin_redo: unknown op code %u", info);
870  }
871 }
unsigned char uint8
Definition: c.h:493
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
RepOriginId node_id
Definition: origin.h:27
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References elog(), XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1238 of file origin.c.

1239 {
1240  XLogRecPtr remote_lsn;
1241  XLogRecPtr local_lsn;
1242 
1244 
1246  remote_lsn = session_replication_state->remote_lsn;
1247  local_lsn = session_replication_state->local_lsn;
1249 
1250  if (flush && local_lsn != InvalidXLogRecPtr)
1251  XLogFlush(local_lsn);
1252 
1253  return remote_lsn;
1254 }

References Assert(), InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, session_replication_state, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), pg_replication_origin_session_progress(), and run_apply_worker().

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1191 of file origin.c.

1192 {
1193  ConditionVariable *cv;
1194 
1196 
1197  if (session_replication_state == NULL)
1198  ereport(ERROR,
1199  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1200  errmsg("no replication origin is configured")));
1201 
1202  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1203 
1207 
1208  LWLockRelease(ReplicationOriginLock);
1209 
1211 }

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::origin_cv, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and process_syncing_tables_for_sync().

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1098 of file origin.c.

1099 {
1100  static bool registered_cleanup;
1101  int i;
1102  int free_slot = -1;
1103 
1104  if (!registered_cleanup)
1105  {
1107  registered_cleanup = true;
1108  }
1109 
1111 
1112  if (session_replication_state != NULL)
1113  ereport(ERROR,
1114  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1115  errmsg("cannot setup replication origin when one is already setup")));
1116 
1117  /* Lock exclusively, as we may have to create a new table entry. */
1118  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1119 
1120  /*
1121  * Search for either an existing slot for the origin, or a free one we can
1122  * use.
1123  */
1124  for (i = 0; i < max_replication_slots; i++)
1125  {
1126  ReplicationState *curstate = &replication_states[i];
1127 
1128  /* remember where to insert if necessary */
1129  if (curstate->roident == InvalidRepOriginId &&
1130  free_slot == -1)
1131  {
1132  free_slot = i;
1133  continue;
1134  }
1135 
1136  /* not our slot */
1137  if (curstate->roident != node)
1138  continue;
1139 
1140  else if (curstate->acquired_by != 0 && acquired_by == 0)
1141  {
1142  ereport(ERROR,
1143  (errcode(ERRCODE_OBJECT_IN_USE),
1144  errmsg("replication origin with ID %d is already active for PID %d",
1145  curstate->roident, curstate->acquired_by)));
1146  }
1147 
1148  /* ok, found slot */
1149  session_replication_state = curstate;
1150  break;
1151  }
1152 
1153 
1154  if (session_replication_state == NULL && free_slot == -1)
1155  ereport(ERROR,
1156  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1157  errmsg("could not find free replication state slot for replication origin with ID %d",
1158  node),
1159  errhint("Increase max_replication_slots and try again.")));
1160  else if (session_replication_state == NULL)
1161  {
1162  /* initialize new slot */
1167  }
1168 
1169 
1171 
1172  if (acquired_by == 0)
1174  else if (session_replication_state->acquired_by != acquired_by)
1175  elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1176  node, acquired_by);
1177 
1178  LWLockRelease(ReplicationOriginLock);
1179 
1180  /* probably this one is pointless */
1182 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1056

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), elog(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, and session_replication_state.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_state_clear()

static void replorigin_state_clear ( RepOriginId  roident,
bool  nowait 
)
static

Definition at line 342 of file origin.c.

343 {
344  int i;
345 
346  /*
347  * Clean up the slot state info, if there is any matching slot.
348  */
349 restart:
350  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
351 
352  for (i = 0; i < max_replication_slots; i++)
353  {
355 
356  if (state->roident == roident)
357  {
358  /* found our slot, is it busy? */
359  if (state->acquired_by != 0)
360  {
361  ConditionVariable *cv;
362 
363  if (nowait)
364  ereport(ERROR,
365  (errcode(ERRCODE_OBJECT_IN_USE),
366  errmsg("could not drop replication origin with ID %d, in use by PID %d",
367  state->roident,
368  state->acquired_by)));
369 
370  /*
371  * We must wait and then retry. Since we don't know which CV
372  * to wait on until here, we can't readily use
373  * ConditionVariablePrepareToSleep (calling it here would be
374  * wrong, since we could miss the signal if we did so); just
375  * use ConditionVariableSleep directly.
376  */
377  cv = &state->origin_cv;
378 
379  LWLockRelease(ReplicationOriginLock);
380 
381  ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
382  goto restart;
383  }
384 
385  /* first make a WAL log entry */
386  {
387  xl_replorigin_drop xlrec;
388 
389  xlrec.node_id = roident;
390  XLogBeginInsert();
391  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
392  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
393  }
394 
395  /* then clear the in-memory slot */
396  state->roident = InvalidRepOriginId;
397  state->remote_lsn = InvalidXLogRecPtr;
398  state->local_lsn = InvalidXLogRecPtr;
399  break;
400  }
401  }
402  LWLockRelease(ReplicationOriginLock);
404 }
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 700 of file origin.c.

701 {
702  const char *path = "pg_logical/replorigin_checkpoint";
703  int fd;
704  int readBytes;
706  int last_state = 0;
707  pg_crc32c file_crc;
708  pg_crc32c crc;
709 
710  /* don't want to overwrite already existing state */
711 #ifdef USE_ASSERT_CHECKING
712  static bool already_started = false;
713 
714  Assert(!already_started);
715  already_started = true;
716 #endif
717 
718  if (max_replication_slots == 0)
719  return;
720 
721  INIT_CRC32C(crc);
722 
723  elog(DEBUG2, "starting up replication origin progress state");
724 
725  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
726 
727  /*
728  * might have had max_replication_slots == 0 last run, or we just brought
729  * up a standby.
730  */
731  if (fd < 0 && errno == ENOENT)
732  return;
733  else if (fd < 0)
734  ereport(PANIC,
736  errmsg("could not open file \"%s\": %m",
737  path)));
738 
739  /* verify magic, that is written even if nothing was active */
740  readBytes = read(fd, &magic, sizeof(magic));
741  if (readBytes != sizeof(magic))
742  {
743  if (readBytes < 0)
744  ereport(PANIC,
746  errmsg("could not read file \"%s\": %m",
747  path)));
748  else
749  ereport(PANIC,
751  errmsg("could not read file \"%s\": read %d of %zu",
752  path, readBytes, sizeof(magic))));
753  }
754  COMP_CRC32C(crc, &magic, sizeof(magic));
755 
756  if (magic != REPLICATION_STATE_MAGIC)
757  ereport(PANIC,
758  (errmsg("replication checkpoint has wrong magic %u instead of %u",
759  magic, REPLICATION_STATE_MAGIC)));
760 
761  /* we can skip locking here, no other access is possible */
762 
763  /* recover individual states, until there are no more to be found */
764  while (true)
765  {
766  ReplicationStateOnDisk disk_state;
767 
768  readBytes = read(fd, &disk_state, sizeof(disk_state));
769 
770  /* no further data */
771  if (readBytes == sizeof(crc))
772  {
773  /* not pretty, but simple ... */
774  file_crc = *(pg_crc32c *) &disk_state;
775  break;
776  }
777 
778  if (readBytes < 0)
779  {
780  ereport(PANIC,
782  errmsg("could not read file \"%s\": %m",
783  path)));
784  }
785 
786  if (readBytes != sizeof(disk_state))
787  {
788  ereport(PANIC,
790  errmsg("could not read file \"%s\": read %d of %zu",
791  path, readBytes, sizeof(disk_state))));
792  }
793 
794  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
795 
796  if (last_state == max_replication_slots)
797  ereport(PANIC,
798  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
799  errmsg("could not find free replication state, increase max_replication_slots")));
800 
801  /* copy data to shared memory */
802  replication_states[last_state].roident = disk_state.roident;
803  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
804  last_state++;
805 
806  ereport(LOG,
807  (errmsg("recovered replication state of node %d to %X/%X",
808  disk_state.roident,
809  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
810  }
811 
812  /* now check checksum */
813  FIN_CRC32C(crc);
814  if (file_crc != crc)
815  ereport(PANIC,
817  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
818  crc, file_crc)));
819 
820  if (CloseTransientFile(fd) != 0)
821  ereport(PANIC,
823  errmsg("could not close file \"%s\": %m",
824  path)));
825 }
#define LOG
Definition: elog.h:31
#define DEBUG2
Definition: elog.h:29
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References Assert(), CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog(), ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ replication_states

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 172 of file origin.c.

Referenced by ReplicationOriginShmemInit().

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state