PostgreSQL Source Code  git master
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "funcapi.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "nodes/execnodes.h"
#include "replication/origin.h"
#include "replication/logical.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/snapmgr.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
RepOriginId replorigin_by_name (char *roname, bool missing_ok)
 
RepOriginId replorigin_create (char *roname)
 
void replorigin_drop (RepOriginId roident, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationStatereplication_states
 
static ReplicationStateCtlreplication_states_ctl
 
static ReplicationStatesession_replication_state = NULL
 

Macro Definition Documentation

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 178 of file origin.c.

Referenced by CheckPointReplicationOrigin(), and StartupReplicationOrigin().

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 546 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

547 {
548  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
549  const char *path = "pg_logical/replorigin_checkpoint";
550  int tmpfd;
551  int i;
553  pg_crc32c crc;
554 
555  if (max_replication_slots == 0)
556  return;
557 
558  INIT_CRC32C(crc);
559 
560  /* make sure no old temp file is remaining */
561  if (unlink(tmppath) < 0 && errno != ENOENT)
562  ereport(PANIC,
564  errmsg("could not remove file \"%s\": %m",
565  tmppath)));
566 
567  /*
568  * no other backend can perform this at the same time, we're protected by
569  * CheckpointLock.
570  */
571  tmpfd = OpenTransientFile(tmppath,
572  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
573  if (tmpfd < 0)
574  ereport(PANIC,
576  errmsg("could not create file \"%s\": %m",
577  tmppath)));
578 
579  /* write magic */
580  errno = 0;
581  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
582  {
583  /* if write didn't set errno, assume problem is no disk space */
584  if (errno == 0)
585  errno = ENOSPC;
586  ereport(PANIC,
588  errmsg("could not write to file \"%s\": %m",
589  tmppath)));
590  }
591  COMP_CRC32C(crc, &magic, sizeof(magic));
592 
593  /* prevent concurrent creations/drops */
594  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
595 
596  /* write actual data */
597  for (i = 0; i < max_replication_slots; i++)
598  {
599  ReplicationStateOnDisk disk_state;
600  ReplicationState *curstate = &replication_states[i];
601  XLogRecPtr local_lsn;
602 
603  if (curstate->roident == InvalidRepOriginId)
604  continue;
605 
606  /* zero, to avoid uninitialized padding bytes */
607  memset(&disk_state, 0, sizeof(disk_state));
608 
609  LWLockAcquire(&curstate->lock, LW_SHARED);
610 
611  disk_state.roident = curstate->roident;
612 
613  disk_state.remote_lsn = curstate->remote_lsn;
614  local_lsn = curstate->local_lsn;
615 
616  LWLockRelease(&curstate->lock);
617 
618  /* make sure we only write out a commit that's persistent */
619  XLogFlush(local_lsn);
620 
621  errno = 0;
622  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
623  sizeof(disk_state))
624  {
625  /* if write didn't set errno, assume problem is no disk space */
626  if (errno == 0)
627  errno = ENOSPC;
628  ereport(PANIC,
630  errmsg("could not write to file \"%s\": %m",
631  tmppath)));
632  }
633 
634  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
635  }
636 
637  LWLockRelease(ReplicationOriginLock);
638 
639  /* write out the CRC */
640  FIN_CRC32C(crc);
641  errno = 0;
642  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
643  {
644  /* if write didn't set errno, assume problem is no disk space */
645  if (errno == 0)
646  errno = ENOSPC;
647  ereport(PANIC,
649  errmsg("could not write to file \"%s\": %m",
650  tmppath)));
651  }
652 
653  if (CloseTransientFile(tmpfd) != 0)
654  ereport(PANIC,
656  errmsg("could not close file \"%s\": %m",
657  tmppath)));
658 
659  /* fsync, rename to permanent file, fsync file and directory */
660  durable_rename(tmppath, path, PANIC);
661 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:121
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:145
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:144
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2798
#define PG_BINARY
Definition: c.h:1191
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
LWLock lock
Definition: origin.c:136
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:608
int CloseTransientFile(int fd)
Definition: fd.c:2434
#define REPLICATION_STATE_MAGIC
Definition: origin.c:178
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:784
int i
static ReplicationState * replication_states
Definition: origin.c:167
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1411 of file origin.c.

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

1412 {
1413  text *name = PG_GETARG_TEXT_PP(0);
1414  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1415  RepOriginId node;
1416 
1417  replorigin_check_prerequisites(true, false);
1418 
1419  /* lock to prevent the replication origin from vanishing */
1420  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1421 
1422  node = replorigin_by_name(text_to_cstring(name), false);
1423 
1424  /*
1425  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1426  * xact hasn't committed yet. This is why this function should be used to
1427  * set up the initial replication state, but not for replay.
1428  */
1429  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1430  true /* go backward */ , true /* WAL log */ );
1431 
1432  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1433 
1434  PG_RETURN_VOID();
1435 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:199
uint16 RepOriginId
Definition: xlogdefs.h:58
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:861
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:303
#define RowExclusiveLock
Definition: lockdefs.h:38
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:339
uint64 XLogRecPtr
Definition: xlogdefs.h:21
const char * name
Definition: encode.c:521
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:549
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:108

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1224 of file origin.c.

References DatumGetPointer, elog, ereport, errcode(), errdetail(), errmsg(), ERROR, IsReservedName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), ReplicationState::roident, text_to_cstring(), and WARNING.

1225 {
1226  char *name;
1227  RepOriginId roident;
1228 
1229  replorigin_check_prerequisites(false, false);
1230 
1232 
1233  /* Replication origins "pg_xxx" are reserved for internal use */
1234  if (IsReservedName(name))
1235  ereport(ERROR,
1236  (errcode(ERRCODE_RESERVED_NAME),
1237  errmsg("replication origin name \"%s\" is reserved",
1238  name),
1239  errdetail("Origin names starting with \"pg_\" are reserved.")));
1240 
1241  /*
1242  * If built with appropriate switch, whine when regression-testing
1243  * conventions for replication origin names are violated.
1244  */
1245 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1246  if (strncmp(name, "regress_", 8) != 0)
1247  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1248 #endif
1249 
1250  roident = replorigin_create(name);
1251 
1252  pfree(name);
1253 
1254  PG_RETURN_OID(roident);
1255 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:570
bool IsReservedName(const char *name)
Definition: catalog.c:214
void pfree(void *pointer)
Definition: mcxt.c:1031
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:860
RepOriginId replorigin_create(char *roname)
Definition: origin.c:243
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
Definition: c.h:549
#define PG_RETURN_OID(x)
Definition: fmgr.h:350

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1261 of file origin.c.

References Assert, DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_drop(), ReplicationState::roident, and text_to_cstring().

1262 {
1263  char *name;
1264  RepOriginId roident;
1265 
1266  replorigin_check_prerequisites(false, false);
1267 
1269 
1270  roident = replorigin_by_name(name, false);
1271  Assert(OidIsValid(roident));
1272 
1273  replorigin_drop(roident, true);
1274 
1275  pfree(name);
1276 
1277  PG_RETURN_VOID();
1278 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:335
uint16 RepOriginId
Definition: xlogdefs.h:58
#define OidIsValid(objectId)
Definition: c.h:638
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
void pfree(void *pointer)
Definition: mcxt.c:1031
#define PG_RETURN_VOID()
Definition: fmgr.h:339
#define Assert(condition)
Definition: c.h:732
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:549

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1284 of file origin.c.

References DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), ReplicationState::roident, and text_to_cstring().

1285 {
1286  char *name;
1287  RepOriginId roident;
1288 
1289  replorigin_check_prerequisites(false, false);
1290 
1292  roident = replorigin_by_name(name, true);
1293 
1294  pfree(name);
1295 
1296  if (OidIsValid(roident))
1297  PG_RETURN_OID(roident);
1298  PG_RETURN_NULL();
1299 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
#define OidIsValid(objectId)
Definition: c.h:638
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
void pfree(void *pointer)
Definition: mcxt.c:1031
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:549
#define PG_RETURN_OID(x)
Definition: fmgr.h:350
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1446 of file origin.c.

References Assert, DatumGetPointer, InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), ReplicationState::roident, and text_to_cstring().

1447 {
1448  char *name;
1449  bool flush;
1450  RepOriginId roident;
1451  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1452 
1453  replorigin_check_prerequisites(true, true);
1454 
1456  flush = PG_GETARG_BOOL(1);
1457 
1458  roident = replorigin_by_name(name, false);
1459  Assert(OidIsValid(roident));
1460 
1461  remote_lsn = replorigin_get_progress(roident, flush);
1462 
1463  if (remote_lsn == InvalidXLogRecPtr)
1464  PG_RETURN_NULL();
1465 
1466  PG_RETURN_LSN(remote_lsn);
1467 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
#define OidIsValid(objectId)
Definition: c.h:638
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:549
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:984
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1344 of file origin.c.

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

1345 {
1346  replorigin_check_prerequisites(false, false);
1347 
1349 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:349
RepOriginId replorigin_session_origin
Definition: origin.c:156
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1360 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_check_prerequisites(), and replorigin_session_get_progress().

1361 {
1362  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1363  bool flush = PG_GETARG_BOOL(0);
1364 
1365  replorigin_check_prerequisites(true, false);
1366 
1367  if (session_replication_state == NULL)
1368  ereport(ERROR,
1369  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1370  errmsg("no replication origin is configured")));
1371 
1372  remote_lsn = replorigin_session_get_progress(flush);
1373 
1374  if (remote_lsn == InvalidXLogRecPtr)
1375  PG_RETURN_NULL();
1376 
1377  PG_RETURN_LSN(remote_lsn);
1378 }
static ReplicationState * session_replication_state
Definition: origin.c:175
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1192
int errcode(int sqlerrcode)
Definition: elog.c:570
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1327 of file origin.c.

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

1328 {
1329  replorigin_check_prerequisites(true, false);
1330 
1332 
1336 
1337  PG_RETURN_VOID();
1338 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void replorigin_session_reset(void)
Definition: origin.c:1145
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
#define PG_RETURN_VOID()
Definition: fmgr.h:339
RepOriginId replorigin_session_origin
Definition: origin.c:156
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1305 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

1306 {
1307  char *name;
1308  RepOriginId origin;
1309 
1310  replorigin_check_prerequisites(true, false);
1311 
1313  origin = replorigin_by_name(name, false);
1314  replorigin_session_setup(origin);
1315 
1316  replorigin_session_origin = origin;
1317 
1318  pfree(name);
1319 
1320  PG_RETURN_VOID();
1321 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1057
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
void pfree(void *pointer)
Definition: mcxt.c:1031
#define PG_RETURN_VOID()
Definition: fmgr.h:339
RepOriginId replorigin_session_origin
Definition: origin.c:156
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:549

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1399 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1400 {
1401  replorigin_check_prerequisites(true, false);
1402 
1405 
1406  PG_RETURN_VOID();
1407 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
#define PG_RETURN_VOID()
Definition: fmgr.h:339

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1381 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1382 {
1383  XLogRecPtr location = PG_GETARG_LSN(0);
1384 
1385  replorigin_check_prerequisites(true, false);
1386 
1387  if (session_replication_state == NULL)
1388  ereport(ERROR,
1389  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1390  errmsg("no replication origin is configured")));
1391 
1392  replorigin_session_origin_lsn = location;
1394 
1395  PG_RETURN_VOID();
1396 }
static ReplicationState * session_replication_state
Definition: origin.c:175
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
int errcode(int sqlerrcode)
Definition: elog.c:570
#define ERROR
Definition: elog.h:43
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
#define ereport(elevel, rest)
Definition: elog.h:141
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:339
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1471 of file origin.c.

References ReturnSetInfo::allowedModes, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, InvalidRepOriginId, IsA, ReplicationState::local_lsn, ReplicationState::lock, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ObjectIdGetDatum, ReplicationState::remote_lsn, REPLICATION_ORIGIN_PROGRESS_COLS, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::returnMode, ReplicationState::roident, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1472 {
1473  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1474  TupleDesc tupdesc;
1475  Tuplestorestate *tupstore;
1476  MemoryContext per_query_ctx;
1477  MemoryContext oldcontext;
1478  int i;
1480 
1481  /* we want to return 0 rows if slot is set to zero */
1482  replorigin_check_prerequisites(false, true);
1483 
1484  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1485  ereport(ERROR,
1486  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1487  errmsg("set-valued function called in context that cannot accept a set")));
1488  if (!(rsinfo->allowedModes & SFRM_Materialize))
1489  ereport(ERROR,
1490  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1491  errmsg("materialize mode required, but it is not allowed in this context")));
1492  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1493  elog(ERROR, "return type must be a row type");
1494 
1495  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1496  elog(ERROR, "wrong function definition");
1497 
1498  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1499  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1500 
1501  tupstore = tuplestore_begin_heap(true, false, work_mem);
1502  rsinfo->returnMode = SFRM_Materialize;
1503  rsinfo->setResult = tupstore;
1504  rsinfo->setDesc = tupdesc;
1505 
1506  MemoryContextSwitchTo(oldcontext);
1507 
1508 
1509  /* prevent slots from being concurrently dropped */
1510  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1511 
1512  /*
1513  * Iterate through all possible replication_states, display if they are
1514  * filled. Note that we do not take any locks, so slightly corrupted/out
1515  * of date values are a possibility.
1516  */
1517  for (i = 0; i < max_replication_slots; i++)
1518  {
1522  char *roname;
1523 
1524  state = &replication_states[i];
1525 
1526  /* unused slot, nothing to display */
1527  if (state->roident == InvalidRepOriginId)
1528  continue;
1529 
1530  memset(values, 0, sizeof(values));
1531  memset(nulls, 1, sizeof(nulls));
1532 
1533  values[0] = ObjectIdGetDatum(state->roident);
1534  nulls[0] = false;
1535 
1536  /*
1537  * We're not preventing the origin to be dropped concurrently, so
1538  * silently accept that it might be gone.
1539  */
1540  if (replorigin_by_oid(state->roident, true,
1541  &roname))
1542  {
1543  values[1] = CStringGetTextDatum(roname);
1544  nulls[1] = false;
1545  }
1546 
1547  LWLockAcquire(&state->lock, LW_SHARED);
1548 
1549  values[2] = LSNGetDatum(state->remote_lsn);
1550  nulls[2] = false;
1551 
1552  values[3] = LSNGetDatum(state->local_lsn);
1553  nulls[3] = false;
1554 
1555  LWLockRelease(&state->lock);
1556 
1557  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1558  }
1559 
1560  tuplestore_donestoring(tupstore);
1561 
1562  LWLockRelease(ReplicationOriginLock);
1563 
1564 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1565 
1566  return (Datum) 0;
1567 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:181
#define IsA(nodeptr, _type_)
Definition: nodes.h:575
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
XLogRecPtr local_lsn
Definition: origin.c:121
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:435
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:136
#define ereport(elevel, rest)
Definition: elog.h:141
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
uintptr_t Datum
Definition: postgres.h:367
int work_mem
Definition: globals.c:121
int allowedModes
Definition: execnodes.h:303
SetFunctionReturnMode returnMode
Definition: execnodes.h:305
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:231
#define InvalidRepOriginId
Definition: origin.h:33
Tuplestorestate * setResult
Definition: execnodes.h:308
static Datum values[MAXATTR]
Definition: bootstrap.c:167
ExprContext * econtext
Definition: execnodes.h:301
TupleDesc setDesc
Definition: execnodes.h:309
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
int i
static ReplicationState * replication_states
Definition: origin.c:167
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define REPLICATION_ORIGIN_PROGRESS_COLS

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1025 of file origin.c.

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, and ReplicationState::origin_cv.

Referenced by replorigin_session_setup().

1026 {
1027  ConditionVariable *cv = NULL;
1028 
1029  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1030 
1031  if (session_replication_state != NULL &&
1033  {
1035 
1038  }
1039 
1040  LWLockRelease(ReplicationOriginLock);
1041 
1042  if (cv)
1044 }
static ReplicationState * session_replication_state
Definition: origin.c:175
int MyProcPid
Definition: globals.c:40
void ConditionVariableBroadcast(ConditionVariable *cv)
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
ConditionVariable origin_cv
Definition: origin.c:131
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 496 of file origin.c.

References ConditionVariableInit(), i, ReplicationState::lock, LWLockInitialize(), LWLockRegisterTranche(), LWTRANCHE_REPLICATION_ORIGIN, max_replication_slots, MemSet, ReplicationState::origin_cv, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

497 {
498  bool found;
499 
500  if (max_replication_slots == 0)
501  return;
502 
504  ShmemInitStruct("ReplicationOriginState",
506  &found);
508 
509  if (!found)
510  {
511  int i;
512 
514 
516 
517  for (i = 0; i < max_replication_slots; i++)
518  {
522  }
523  }
524 
526  "replication_origin");
527 }
#define MemSet(start, val, len)
Definition: c.h:955
void LWLockRegisterTranche(int tranche_id, const char *tranche_name)
Definition: lwlock.c:603
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:678
Size ReplicationOriginShmemSize(void)
Definition: origin.c:476
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:168
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:152
int i
static ReplicationState * replication_states
Definition: origin.c:167

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 476 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

477 {
478  Size size = 0;
479 
480  /*
481  * XXX: max_replication_slots is arguably the wrong thing to use, as here
482  * we keep the replay state of *remote* transactions. But for now it seems
483  * sufficient to reuse it, lest we introduce a separate GUC.
484  */
485  if (max_replication_slots == 0)
486  return size;
487 
488  size = add_size(size, offsetof(ReplicationStateCtl, states));
489 
490  size = add_size(size,
492  return size;
493 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 861 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), and xact_redo_commit().

864 {
865  int i;
866  ReplicationState *replication_state = NULL;
867  ReplicationState *free_state = NULL;
868 
869  Assert(node != InvalidRepOriginId);
870 
871  /* we don't track DoNotReplicateId */
872  if (node == DoNotReplicateId)
873  return;
874 
875  /*
876  * XXX: For the case where this is called by WAL replay, it'd be more
877  * efficient to restore into a backend local hashtable and only dump into
878  * shmem after recovery is finished. Let's wait with implementing that
879  * till it's shown to be a measurable expense
880  */
881 
882  /* Lock exclusively, as we may have to create a new table entry. */
883  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
884 
885  /*
886  * Search for either an existing slot for the origin, or a free one we can
887  * use.
888  */
889  for (i = 0; i < max_replication_slots; i++)
890  {
891  ReplicationState *curstate = &replication_states[i];
892 
893  /* remember where to insert if necessary */
894  if (curstate->roident == InvalidRepOriginId &&
895  free_state == NULL)
896  {
897  free_state = curstate;
898  continue;
899  }
900 
901  /* not our slot */
902  if (curstate->roident != node)
903  {
904  continue;
905  }
906 
907  /* ok, found slot */
908  replication_state = curstate;
909 
910  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
911 
912  /* Make sure it's not used by somebody else */
913  if (replication_state->acquired_by != 0)
914  {
915  ereport(ERROR,
916  (errcode(ERRCODE_OBJECT_IN_USE),
917  errmsg("replication origin with OID %d is already active for PID %d",
918  replication_state->roident,
919  replication_state->acquired_by)));
920  }
921 
922  break;
923  }
924 
925  if (replication_state == NULL && free_state == NULL)
926  ereport(ERROR,
927  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
928  errmsg("could not find free replication state slot for replication origin with OID %u",
929  node),
930  errhint("Increase max_replication_slots and try again.")));
931 
932  if (replication_state == NULL)
933  {
934  /* initialize new slot */
935  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
936  replication_state = free_state;
937  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
938  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
939  replication_state->roident = node;
940  }
941 
942  Assert(replication_state->roident != InvalidRepOriginId);
943 
944  /*
945  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
946  * and the standby gets the message. Primarily this will be called during
947  * WAL replay (of commit records) where no WAL logging is necessary.
948  */
949  if (wal_log)
950  {
951  xl_replorigin_set xlrec;
952 
953  xlrec.remote_lsn = remote_commit;
954  xlrec.node_id = node;
955  xlrec.force = go_backward;
956 
957  XLogBeginInsert();
958  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
959 
960  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
961  }
962 
963  /*
964  * Due to - harmless - race conditions during a checkpoint we could see
965  * values here that are older than the ones we already have in memory.
966  * Don't overwrite those.
967  */
968  if (go_backward || replication_state->remote_lsn < remote_commit)
969  replication_state->remote_lsn = remote_commit;
970  if (local_commit != InvalidXLogRecPtr &&
971  (go_backward || replication_state->local_lsn < local_commit))
972  replication_state->local_lsn = local_commit;
973  LWLockRelease(&replication_state->lock);
974 
975  /*
976  * Release *after* changing the LSNs, slot isn't acquired and thus could
977  * otherwise be dropped anytime.
978  */
979  LWLockRelease(ReplicationOriginLock);
980 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:974
XLogRecPtr local_lsn
Definition: origin.c:121
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:570
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:136
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
#define Assert(condition)
Definition: c.h:732
RepOriginId node_id
Definition: origin.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:784
int i
static ReplicationState * replication_states
Definition: origin.c:167
void XLogBeginInsert(void)
Definition: xloginsert.c:120

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( char *  roname,
bool  missing_ok 
)

Definition at line 212 of file origin.c.

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, ReplicationState::roident, and SearchSysCache1().

Referenced by ApplyWorkerMain(), DropSubscription(), pg_replication_origin_advance(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), and pg_replication_origin_session_setup().

213 {
215  Oid roident = InvalidOid;
216  HeapTuple tuple;
217  Datum roname_d;
218 
219  roname_d = CStringGetTextDatum(roname);
220 
221  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
222  if (HeapTupleIsValid(tuple))
223  {
224  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
225  roident = ident->roident;
226  ReleaseSysCache(tuple);
227  }
228  else if (!missing_ok)
229  ereport(ERROR,
230  (errcode(ERRCODE_UNDEFINED_OBJECT),
231  errmsg("replication origin \"%s\" does not exist",
232  roname)));
233 
234  return roident;
235 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:570
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1124
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1172
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define CStringGetTextDatum(s)
Definition: builtins.h:83

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 435 of file origin.c.

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and pgoutput_begin_txn().

436 {
437  HeapTuple tuple;
439 
440  Assert(OidIsValid((Oid) roident));
441  Assert(roident != InvalidRepOriginId);
442  Assert(roident != DoNotReplicateId);
443 
445  ObjectIdGetDatum((Oid) roident));
446 
447  if (HeapTupleIsValid(tuple))
448  {
449  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
450  *roname = text_to_cstring(&ric->roname);
451  ReleaseSysCache(tuple);
452 
453  return true;
454  }
455  else
456  {
457  *roname = NULL;
458 
459  if (!missing_ok)
460  ereport(ERROR,
461  (errcode(ERRCODE_UNDEFINED_OBJECT),
462  errmsg("replication origin with OID %u does not exist",
463  roident)));
464 
465  return false;
466  }
467 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:570
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:638
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1124
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1172
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:732
#define InvalidRepOriginId
Definition: origin.h:33
char * text_to_cstring(const text *t)
Definition: varlena.c:204
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 181 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, RecoveryInProgress(), and superuser().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

182 {
183  if (!superuser())
184  ereport(ERROR,
185  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
186  errmsg("only superusers can query or manipulate replication origins")));
187 
188  if (check_slots && max_replication_slots == 0)
189  ereport(ERROR,
190  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
191  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
192 
193  if (!recoveryOK && RecoveryInProgress())
194  ereport(ERROR,
195  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
196  errmsg("cannot manipulate replication origins during recovery")));
197 
198 }
int errcode(int sqlerrcode)
Definition: elog.c:570
bool superuser(void)
Definition: superuser.c:47
bool RecoveryInProgress(void)
Definition: xlog.c:7898
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int max_replication_slots
Definition: slot.c:99
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ replorigin_create()

RepOriginId replorigin_create ( char *  roname)

Definition at line 243 of file origin.c.

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationOriginIdentIndex, ReplicationState::roident, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), and pg_replication_origin_create().

244 {
245  Oid roident;
246  HeapTuple tuple = NULL;
247  Relation rel;
248  Datum roname_d;
249  SnapshotData SnapshotDirty;
250  SysScanDesc scan;
252 
253  roname_d = CStringGetTextDatum(roname);
254 
256 
257  /*
258  * We need the numeric replication origin to be 16bit wide, so we cannot
259  * rely on the normal oid allocation. Instead we simply scan
260  * pg_replication_origin for the first unused id. That's not particularly
261  * efficient, but this should be a fairly infrequent operation - we can
262  * easily spend a bit more code on this when it turns out it needs to be
263  * faster.
264  *
265  * We handle concurrency by taking an exclusive lock (allowing reads!)
266  * over the table for the duration of the search. Because we use a "dirty
267  * snapshot" we can read rows that other in-progress sessions have
268  * written, even though they would be invisible with normal snapshots. Due
269  * to the exclusive lock there's no danger that new rows can appear while
270  * we're checking.
271  */
272  InitDirtySnapshot(SnapshotDirty);
273 
274  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
275 
276  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
277  {
278  bool nulls[Natts_pg_replication_origin];
279  Datum values[Natts_pg_replication_origin];
280  bool collides;
281 
283 
284  ScanKeyInit(&key,
285  Anum_pg_replication_origin_roident,
286  BTEqualStrategyNumber, F_OIDEQ,
287  ObjectIdGetDatum(roident));
288 
290  true /* indexOK */ ,
291  &SnapshotDirty,
292  1, &key);
293 
294  collides = HeapTupleIsValid(systable_getnext(scan));
295 
296  systable_endscan(scan);
297 
298  if (!collides)
299  {
300  /*
301  * Ok, found an unused roident, insert the new row and do a CCI,
302  * so our callers can look it up if they want to.
303  */
304  memset(&nulls, 0, sizeof(nulls));
305 
306  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
307  values[Anum_pg_replication_origin_roname - 1] = roname_d;
308 
309  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
310  CatalogTupleInsert(rel, tuple);
312  break;
313  }
314  }
315 
316  /* now release lock again, */
318 
319  if (tuple == NULL)
320  ereport(ERROR,
321  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
322  errmsg("could not find free replication origin OID")));
323 
324  heap_freetuple(tuple);
325  return roident;
326 }
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:77
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:525
#define RelationGetDescr(relation)
Definition: rel.h:442
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:570
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:352
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:444
#define ReplicationOriginIdentIndex
Definition: indexing.h:340
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:439
#define ereport(elevel, rest)
Definition: elog.h:141
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:732
bool IsTransactionState(void)
Definition: xact.c:356
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
#define BTEqualStrategyNumber
Definition: stratnum.h:31

◆ replorigin_drop()

void replorigin_drop ( RepOriginId  roident,
bool  nowait 
)

Definition at line 335 of file origin.c.

References ReplicationState::acquired_by, Assert, CatalogTupleDelete(), CommandCounterIncrement(), ConditionVariableCancelSleep(), ConditionVariableSleep(), elog, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, ObjectIdGetDatum, ReplicationState::origin_cv, ReleaseSysCache(), ReplicationState::remote_lsn, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), WAIT_EVENT_REPLICATION_ORIGIN_DROP, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by DropSubscription(), and pg_replication_origin_drop().

336 {
337  HeapTuple tuple;
338  Relation rel;
339  int i;
340 
342 
343  /*
344  * To interlock against concurrent drops, we hold ExclusiveLock on
345  * pg_replication_origin throughout this function.
346  */
347  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
348 
349  /*
350  * First, clean up the slot state info, if there is any matching slot.
351  */
352 restart:
353  tuple = NULL;
354  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
355 
356  for (i = 0; i < max_replication_slots; i++)
357  {
359 
360  if (state->roident == roident)
361  {
362  /* found our slot, is it busy? */
363  if (state->acquired_by != 0)
364  {
365  ConditionVariable *cv;
366 
367  if (nowait)
368  ereport(ERROR,
369  (errcode(ERRCODE_OBJECT_IN_USE),
370  errmsg("could not drop replication origin with OID %d, in use by PID %d",
371  state->roident,
372  state->acquired_by)));
373 
374  /*
375  * We must wait and then retry. Since we don't know which CV
376  * to wait on until here, we can't readily use
377  * ConditionVariablePrepareToSleep (calling it here would be
378  * wrong, since we could miss the signal if we did so); just
379  * use ConditionVariableSleep directly.
380  */
381  cv = &state->origin_cv;
382 
383  LWLockRelease(ReplicationOriginLock);
384 
386  goto restart;
387  }
388 
389  /* first make a WAL log entry */
390  {
391  xl_replorigin_drop xlrec;
392 
393  xlrec.node_id = roident;
394  XLogBeginInsert();
395  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
396  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
397  }
398 
399  /* then clear the in-memory slot */
400  state->roident = InvalidRepOriginId;
401  state->remote_lsn = InvalidXLogRecPtr;
402  state->local_lsn = InvalidXLogRecPtr;
403  break;
404  }
405  }
406  LWLockRelease(ReplicationOriginLock);
408 
409  /*
410  * Now, we can delete the catalog entry.
411  */
413  if (!HeapTupleIsValid(tuple))
414  elog(ERROR, "cache lookup failed for replication origin with oid %u",
415  roident);
416 
417  CatalogTupleDelete(rel, &tuple->t_self);
418  ReleaseSysCache(tuple);
419 
421 
422  /* now release lock again */
424 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
XLogRecPtr local_lsn
Definition: origin.c:121
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:570
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1124
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
void CommandCounterIncrement(void)
Definition: xact.c:1003
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1172
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:114
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
RepOriginId node_id
Definition: origin.h:27
#define Assert(condition)
Definition: c.h:732
Definition: regguts.h:298
ConditionVariable origin_cv
Definition: origin.c:131
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool IsTransactionState(void)
Definition: xact.c:356
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
int i
static ReplicationState * replication_states
Definition: origin.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void XLogBeginInsert(void)
Definition: xloginsert.c:120

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 984 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

985 {
986  int i;
987  XLogRecPtr local_lsn = InvalidXLogRecPtr;
988  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
989 
990  /* prevent slots from being concurrently dropped */
991  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
992 
993  for (i = 0; i < max_replication_slots; i++)
994  {
996 
997  state = &replication_states[i];
998 
999  if (state->roident == node)
1000  {
1001  LWLockAcquire(&state->lock, LW_SHARED);
1002 
1003  remote_lsn = state->remote_lsn;
1004  local_lsn = state->local_lsn;
1005 
1006  LWLockRelease(&state->lock);
1007 
1008  break;
1009  }
1010  }
1011 
1012  LWLockRelease(ReplicationOriginLock);
1013 
1014  if (flush && local_lsn != InvalidXLogRecPtr)
1015  XLogFlush(local_lsn);
1016 
1017  return remote_lsn;
1018 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:121
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2798
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LWLock lock
Definition: origin.c:136
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
static ReplicationState * replication_states
Definition: origin.c:167

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 800 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

801 {
802  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
803 
804  switch (info)
805  {
806  case XLOG_REPLORIGIN_SET:
807  {
808  xl_replorigin_set *xlrec =
809  (xl_replorigin_set *) XLogRecGetData(record);
810 
812  xlrec->remote_lsn, record->EndRecPtr,
813  xlrec->force /* backward */ ,
814  false /* WAL log */ );
815  break;
816  }
818  {
819  xl_replorigin_drop *xlrec;
820  int i;
821 
822  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
823 
824  for (i = 0; i < max_replication_slots; i++)
825  {
827 
828  /* found our slot */
829  if (state->roident == xlrec->node_id)
830  {
831  /* reset entry */
832  state->roident = InvalidRepOriginId;
833  state->remote_lsn = InvalidXLogRecPtr;
834  state->local_lsn = InvalidXLogRecPtr;
835  break;
836  }
837  }
838  break;
839  }
840  default:
841  elog(PANIC, "replorigin_redo: unknown op code %u", info);
842  }
843 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:121
unsigned char uint8
Definition: c.h:356
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:861
#define PANIC
Definition: elog.h:53
RepOriginId roident
Definition: origin.c:109
XLogRecPtr EndRecPtr
Definition: xlogreader.h:124
#define XLogRecGetData(decoder)
Definition: xlogreader.h:237
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:233
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
RepOriginId node_id
Definition: origin.h:27
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
#define elog(elevel,...)
Definition: elog.h:226
int i
static ReplicationState * replication_states
Definition: origin.c:167

◆ replorigin_session_advance()

void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1174 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by EndPrepare(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

1175 {
1178 
1180  if (session_replication_state->local_lsn < local_commit)
1181  session_replication_state->local_lsn = local_commit;
1182  if (session_replication_state->remote_lsn < remote_commit)
1183  session_replication_state->remote_lsn = remote_commit;
1185 }
static ReplicationState * session_replication_state
Definition: origin.c:175
XLogRecPtr local_lsn
Definition: origin.c:121
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LWLock lock
Definition: origin.c:136
XLogRecPtr remote_lsn
Definition: origin.c:114
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define InvalidRepOriginId
Definition: origin.h:33

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1192 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_progress().

1193 {
1194  XLogRecPtr remote_lsn;
1195  XLogRecPtr local_lsn;
1196 
1198 
1200  remote_lsn = session_replication_state->remote_lsn;
1201  local_lsn = session_replication_state->local_lsn;
1203 
1204  if (flush && local_lsn != InvalidXLogRecPtr)
1205  XLogFlush(local_lsn);
1206 
1207  return remote_lsn;
1208 }
static ReplicationState * session_replication_state
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:121
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2798
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LWLock lock
Definition: origin.c:136
XLogRecPtr remote_lsn
Definition: origin.c:114
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1145 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and ReplicationState::origin_cv.

Referenced by pg_replication_origin_session_reset().

1146 {
1147  ConditionVariable *cv;
1148 
1150 
1151  if (session_replication_state == NULL)
1152  ereport(ERROR,
1153  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1154  errmsg("no replication origin is configured")));
1155 
1156  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1157 
1161 
1162  LWLockRelease(ReplicationOriginLock);
1163 
1165 }
static ReplicationState * session_replication_state
Definition: origin.c:175
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:570
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int max_replication_slots
Definition: slot.c:99
#define Assert(condition)
Definition: c.h:732
ConditionVariable origin_cv
Definition: origin.c:131
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node)

Definition at line 1057 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_setup().

1058 {
1059  static bool registered_cleanup;
1060  int i;
1061  int free_slot = -1;
1062 
1063  if (!registered_cleanup)
1064  {
1066  registered_cleanup = true;
1067  }
1068 
1070 
1071  if (session_replication_state != NULL)
1072  ereport(ERROR,
1073  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1074  errmsg("cannot setup replication origin when one is already setup")));
1075 
1076  /* Lock exclusively, as we may have to create a new table entry. */
1077  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1078 
1079  /*
1080  * Search for either an existing slot for the origin, or a free one we can
1081  * use.
1082  */
1083  for (i = 0; i < max_replication_slots; i++)
1084  {
1085  ReplicationState *curstate = &replication_states[i];
1086 
1087  /* remember where to insert if necessary */
1088  if (curstate->roident == InvalidRepOriginId &&
1089  free_slot == -1)
1090  {
1091  free_slot = i;
1092  continue;
1093  }
1094 
1095  /* not our slot */
1096  if (curstate->roident != node)
1097  continue;
1098 
1099  else if (curstate->acquired_by != 0)
1100  {
1101  ereport(ERROR,
1102  (errcode(ERRCODE_OBJECT_IN_USE),
1103  errmsg("replication identifier %d is already active for PID %d",
1104  curstate->roident, curstate->acquired_by)));
1105  }
1106 
1107  /* ok, found slot */
1108  session_replication_state = curstate;
1109  }
1110 
1111 
1112  if (session_replication_state == NULL && free_slot == -1)
1113  ereport(ERROR,
1114  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1115  errmsg("could not find free replication state slot for replication origin with OID %u",
1116  node),
1117  errhint("Increase max_replication_slots and try again.")));
1118  else if (session_replication_state == NULL)
1119  {
1120  /* initialize new slot */
1125  }
1126 
1127 
1129 
1131 
1132  LWLockRelease(ReplicationOriginLock);
1133 
1134  /* probably this one is pointless */
1136 }
static ReplicationState * session_replication_state
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:974
XLogRecPtr local_lsn
Definition: origin.c:121
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1025
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:570
RepOriginId roident
Definition: origin.c:109
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ERROR
Definition: elog.h:43
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define ereport(elevel, rest)
Definition: elog.h:141
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
#define Assert(condition)
Definition: c.h:732
ConditionVariable origin_cv
Definition: origin.c:131
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:784
int i
static ReplicationState * replication_states
Definition: origin.c:167

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 672 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

673 {
674  const char *path = "pg_logical/replorigin_checkpoint";
675  int fd;
676  int readBytes;
678  int last_state = 0;
679  pg_crc32c file_crc;
680  pg_crc32c crc;
681 
682  /* don't want to overwrite already existing state */
683 #ifdef USE_ASSERT_CHECKING
684  static bool already_started = false;
685 
686  Assert(!already_started);
687  already_started = true;
688 #endif
689 
690  if (max_replication_slots == 0)
691  return;
692 
693  INIT_CRC32C(crc);
694 
695  elog(DEBUG2, "starting up replication origin progress state");
696 
697  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
698 
699  /*
700  * might have had max_replication_slots == 0 last run, or we just brought
701  * up a standby.
702  */
703  if (fd < 0 && errno == ENOENT)
704  return;
705  else if (fd < 0)
706  ereport(PANIC,
708  errmsg("could not open file \"%s\": %m",
709  path)));
710 
711  /* verify magic, that is written even if nothing was active */
712  readBytes = read(fd, &magic, sizeof(magic));
713  if (readBytes != sizeof(magic))
714  {
715  if (readBytes < 0)
716  ereport(PANIC,
718  errmsg("could not read file \"%s\": %m",
719  path)));
720  else
721  ereport(PANIC,
723  errmsg("could not read file \"%s\": read %d of %zu",
724  path, readBytes, sizeof(magic))));
725  }
726  COMP_CRC32C(crc, &magic, sizeof(magic));
727 
728  if (magic != REPLICATION_STATE_MAGIC)
729  ereport(PANIC,
730  (errmsg("replication checkpoint has wrong magic %u instead of %u",
731  magic, REPLICATION_STATE_MAGIC)));
732 
733  /* we can skip locking here, no other access is possible */
734 
735  /* recover individual states, until there are no more to be found */
736  while (true)
737  {
738  ReplicationStateOnDisk disk_state;
739 
740  readBytes = read(fd, &disk_state, sizeof(disk_state));
741 
742  /* no further data */
743  if (readBytes == sizeof(crc))
744  {
745  /* not pretty, but simple ... */
746  file_crc = *(pg_crc32c *) &disk_state;
747  break;
748  }
749 
750  if (readBytes < 0)
751  {
752  ereport(PANIC,
754  errmsg("could not read file \"%s\": %m",
755  path)));
756  }
757 
758  if (readBytes != sizeof(disk_state))
759  {
760  ereport(PANIC,
762  errmsg("could not read file \"%s\": read %d of %zu",
763  path, readBytes, sizeof(disk_state))));
764  }
765 
766  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
767 
768  if (last_state == max_replication_slots)
769  ereport(PANIC,
770  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
771  errmsg("could not find free replication state, increase max_replication_slots")));
772 
773  /* copy data to shared memory */
774  replication_states[last_state].roident = disk_state.roident;
775  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
776  last_state++;
777 
778  elog(LOG, "recovered replication state of node %u to %X/%X",
779  disk_state.roident,
780  (uint32) (disk_state.remote_lsn >> 32),
781  (uint32) disk_state.remote_lsn);
782  }
783 
784  /* now check checksum */
785  FIN_CRC32C(crc);
786  if (file_crc != crc)
787  ereport(PANIC,
788  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
789  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
790  crc, file_crc)));
791 
792  if (CloseTransientFile(fd) != 0)
793  ereport(PANIC,
795  errmsg("could not close file \"%s\": %m",
796  path)));
797 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:145
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:144
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:53
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
RepOriginId roident
Definition: origin.c:109
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:44
int CloseTransientFile(int fd)
Definition: fd.c:2434
#define REPLICATION_STATE_MAGIC
Definition: origin.c:178
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:114
#define Assert(condition)
Definition: c.h:732
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
static ReplicationState * replication_states
Definition: origin.c:167
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

◆ replication_states

ReplicationState* replication_states
static

Definition at line 167 of file origin.c.

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 168 of file origin.c.

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state

ReplicationState* session_replication_state = NULL
static

Definition at line 175 of file origin.c.