PostgreSQL Source Code  git master
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
static void replorigin_drop_guts (Relation rel, RepOriginId roident, bool nowait)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationState * replication_states
 
static ReplicationStateCtl * replication_states_ctl
 
static ReplicationState * session_replication_state = NULL
 

Macro Definition Documentation

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 180 of file origin.c.

Referenced by CheckPointReplicationOrigin(), and StartupReplicationOrigin().

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 557 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

558 {
559  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
560  const char *path = "pg_logical/replorigin_checkpoint";
561  int tmpfd;
562  int i;
564  pg_crc32c crc;
565 
566  if (max_replication_slots == 0)
567  return;
568 
569  INIT_CRC32C(crc);
570 
571  /* make sure no old temp file is remaining */
572  if (unlink(tmppath) < 0 && errno != ENOENT)
573  ereport(PANIC,
575  errmsg("could not remove file \"%s\": %m",
576  tmppath)));
577 
578  /*
579  * no other backend can perform this at the same time; only one checkpoint
580  * can happen at a time.
581  */
582  tmpfd = OpenTransientFile(tmppath,
583  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
584  if (tmpfd < 0)
585  ereport(PANIC,
587  errmsg("could not create file \"%s\": %m",
588  tmppath)));
589 
590  /* write magic */
591  errno = 0;
592  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
593  {
594  /* if write didn't set errno, assume problem is no disk space */
595  if (errno == 0)
596  errno = ENOSPC;
597  ereport(PANIC,
599  errmsg("could not write to file \"%s\": %m",
600  tmppath)));
601  }
602  COMP_CRC32C(crc, &magic, sizeof(magic));
603 
604  /* prevent concurrent creations/drops */
605  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
606 
607  /* write actual data */
608  for (i = 0; i < max_replication_slots; i++)
609  {
610  ReplicationStateOnDisk disk_state;
611  ReplicationState *curstate = &replication_states[i];
612  XLogRecPtr local_lsn;
613 
614  if (curstate->roident == InvalidRepOriginId)
615  continue;
616 
617  /* zero, to avoid uninitialized padding bytes */
618  memset(&disk_state, 0, sizeof(disk_state));
619 
620  LWLockAcquire(&curstate->lock, LW_SHARED);
621 
622  disk_state.roident = curstate->roident;
623 
624  disk_state.remote_lsn = curstate->remote_lsn;
625  local_lsn = curstate->local_lsn;
626 
627  LWLockRelease(&curstate->lock);
628 
629  /* make sure we only write out a commit that's persistent */
630  XLogFlush(local_lsn);
631 
632  errno = 0;
633  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
634  sizeof(disk_state))
635  {
636  /* if write didn't set errno, assume problem is no disk space */
637  if (errno == 0)
638  errno = ENOSPC;
639  ereport(PANIC,
641  errmsg("could not write to file \"%s\": %m",
642  tmppath)));
643  }
644 
645  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
646  }
647 
648  LWLockRelease(ReplicationOriginLock);
649 
650  /* write out the CRC */
651  FIN_CRC32C(crc);
652  errno = 0;
653  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
654  {
655  /* if write didn't set errno, assume problem is no disk space */
656  if (errno == 0)
657  errno = ENOSPC;
658  ereport(PANIC,
660  errmsg("could not write to file \"%s\": %m",
661  tmppath)));
662  }
663 
664  if (CloseTransientFile(tmpfd) != 0)
665  ereport(PANIC,
667  errmsg("could not close file \"%s\": %m",
668  tmppath)));
669 
670  /* fsync, rename to permanent file, fsync file and directory */
671  durable_rename(tmppath, path, PANIC);
672 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:117
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:141
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:140
#define PANIC
Definition: elog.h:50
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2881
#define PG_BINARY
Definition: c.h:1271
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2467
LWLock lock
Definition: origin.c:132
int errcode_for_file_access(void)
Definition: elog.c:721
unsigned int uint32
Definition: c.h:441
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:692
int CloseTransientFile(int fd)
Definition: fd.c:2644
#define REPLICATION_STATE_MAGIC
Definition: origin.c:180
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
static ReplicationState * replication_states
Definition: origin.c:165
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1421 of file origin.c.

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

/*
 * SQL-callable body: look up the replication origin named by argument 0
 * (text) and hand argument 1 (an LSN) to replorigin_advance() as the remote
 * commit position, with go_backward and wal_log both true.
 *
 * The name lookup uses missing_ok = false.  The pg_replication_origin
 * relation is locked in RowExclusiveLock mode around the lookup and the
 * advance, and unlocked again before returning void.
 */
1422 {
1423  text *name = PG_GETARG_TEXT_PP(0);
1424  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1425  RepOriginId node;
1426 
1427  replorigin_check_prerequisites(true, false);
1428 
1429  /* lock to prevent the replication origin from vanishing */
1430  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1431 
1432  node = replorigin_by_name(text_to_cstring(name), false);
1433 
1434  /*
1435  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1436  * xact hasn't committed yet. This is why this function should be used to
1437  * set up the initial replication state, but not for replay.
1438  */
1439  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1440  true /* go backward */ , true /* WAL log */ );
1441 
1442  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1443 
1444  PG_RETURN_VOID();
1445 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:200
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
uint16 RepOriginId
Definition: xlogdefs.h:65
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:872
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define RowExclusiveLock
Definition: lockdefs.h:38
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:349
uint64 XLogRecPtr
Definition: xlogdefs.h:21
const char * name
Definition: encode.c:515
char * text_to_cstring(const text *t)
Definition: varlena.c:223
Definition: c.h:621
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1238 of file origin.c.

References DatumGetPointer, elog, ereport, errcode(), errdetail(), errmsg(), ERROR, IsReservedName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), ReplicationState::roident, text_to_cstring(), and WARNING.

1239 {
1240  char *name;
1241  RepOriginId roident;
1242 
1243  replorigin_check_prerequisites(false, false);
1244 
1246 
1247  /* Replication origins "pg_xxx" are reserved for internal use */
1248  if (IsReservedName(name))
1249  ereport(ERROR,
1250  (errcode(ERRCODE_RESERVED_NAME),
1251  errmsg("replication origin name \"%s\" is reserved",
1252  name),
1253  errdetail("Origin names starting with \"pg_\" are reserved.")));
1254 
1255  /*
1256  * If built with appropriate switch, whine when regression-testing
1257  * conventions for replication origin names are violated.
1258  */
1259 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1260  if (strncmp(name, "regress_", 8) != 0)
1261  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1262 #endif
1263 
1264  roident = replorigin_create(name);
1265 
1266  pfree(name);
1267 
1268  PG_RETURN_OID(roident);
1269 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
uint16 RepOriginId
Definition: xlogdefs.h:65
int errcode(int sqlerrcode)
Definition: elog.c:698
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
bool IsReservedName(const char *name)
Definition: catalog.c:218
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
int errdetail(const char *fmt,...)
Definition: elog.c:1042
#define WARNING
Definition: elog.h:40
#define ereport(elevel,...)
Definition: elog.h:157
const char * name
Definition: encode.c:515
#define DatumGetPointer(X)
Definition: postgres.h:593
char * text_to_cstring(const text *t)
Definition: varlena.c:223
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
Definition: c.h:621
#define PG_RETURN_OID(x)
Definition: fmgr.h:360

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1275 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

1276 {
1277  char *name;
1278 
1279  replorigin_check_prerequisites(false, false);
1280 
1282 
1283  replorigin_drop_by_name(name, false, true);
1284 
1285  pfree(name);
1286 
1287  PG_RETURN_VOID();
1288 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:414
void pfree(void *pointer)
Definition: mcxt.c:1169
#define PG_RETURN_VOID()
Definition: fmgr.h:349
const char * name
Definition: encode.c:515
#define DatumGetPointer(X)
Definition: postgres.h:593
char * text_to_cstring(const text *t)
Definition: varlena.c:223
Definition: c.h:621

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1294 of file origin.c.

References DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), ReplicationState::roident, and text_to_cstring().

1295 {
1296  char *name;
1297  RepOriginId roident;
1298 
1299  replorigin_check_prerequisites(false, false);
1300 
1302  roident = replorigin_by_name(name, true);
1303 
1304  pfree(name);
1305 
1306  if (OidIsValid(roident))
1307  PG_RETURN_OID(roident);
1308  PG_RETURN_NULL();
1309 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
uint16 RepOriginId
Definition: xlogdefs.h:65
#define OidIsValid(objectId)
Definition: c.h:710
void pfree(void *pointer)
Definition: mcxt.c:1169
const char * name
Definition: encode.c:515
#define DatumGetPointer(X)
Definition: postgres.h:593
char * text_to_cstring(const text *t)
Definition: varlena.c:223
Definition: c.h:621
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
#define PG_RETURN_NULL()
Definition: fmgr.h:345

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1456 of file origin.c.

References Assert, DatumGetPointer, InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), ReplicationState::roident, and text_to_cstring().

1457 {
1458  char *name;
1459  bool flush;
1460  RepOriginId roident;
1461  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1462 
1463  replorigin_check_prerequisites(true, true);
1464 
1466  flush = PG_GETARG_BOOL(1);
1467 
1468  roident = replorigin_by_name(name, false);
1469  Assert(OidIsValid(roident));
1470 
1471  remote_lsn = replorigin_get_progress(roident, flush);
1472 
1473  if (remote_lsn == InvalidXLogRecPtr)
1474  PG_RETURN_NULL();
1475 
1476  PG_RETURN_LSN(remote_lsn);
1477 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
uint16 RepOriginId
Definition: xlogdefs.h:65
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define OidIsValid(objectId)
Definition: c.h:710
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
const char * name
Definition: encode.c:515
#define DatumGetPointer(X)
Definition: postgres.h:593
char * text_to_cstring(const text *t)
Definition: varlena.c:223
Definition: c.h:621
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:998
#define PG_RETURN_NULL()
Definition: fmgr.h:345

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1354 of file origin.c.

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

1355 {
1356  replorigin_check_prerequisites(false, false);
1357 
1359 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1370 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_check_prerequisites(), and replorigin_session_get_progress().

/*
 * SQL-callable body: report the remote LSN returned by
 * replorigin_session_get_progress() for the current session.
 *
 * Argument 0 (bool) is forwarded unchanged as the "flush" parameter.
 * Raises ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE) when
 * session_replication_state is NULL.  Returns SQL NULL when the reported LSN
 * equals InvalidXLogRecPtr, otherwise the LSN itself.
 */
1371 {
1372  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1373  bool flush = PG_GETARG_BOOL(0);
1374 
1375  replorigin_check_prerequisites(true, false);
1376 
1377  if (session_replication_state == NULL)
1378  ereport(ERROR,
1379  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1380  errmsg("no replication origin is configured")));
1381 
1382  remote_lsn = replorigin_session_get_progress(flush);
1383 
1384  /* an invalid LSN is surfaced to SQL as NULL rather than as 0/0 */
1385  if (remote_lsn == InvalidXLogRecPtr)
1386  PG_RETURN_NULL();
1387 
1388  PG_RETURN_LSN(remote_lsn);
1389 }
static ReplicationState * session_replication_state
Definition: origin.c:177
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define PG_RETURN_NULL()
Definition: fmgr.h:345

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1337 of file origin.c.

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

1338 {
1339  replorigin_check_prerequisites(true, false);
1340 
1342 
1346 
1347  PG_RETURN_VOID();
1348 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void replorigin_session_reset(void)
Definition: origin.c:1159
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
#define PG_RETURN_VOID()
Definition: fmgr.h:349
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1315 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

1316 {
1317  char *name;
1318  RepOriginId origin;
1319 
1320  replorigin_check_prerequisites(true, false);
1321 
1323  origin = replorigin_by_name(name, false);
1324  replorigin_session_setup(origin);
1325 
1326  replorigin_session_origin = origin;
1327 
1328  pfree(name);
1329 
1330  PG_RETURN_VOID();
1331 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
uint16 RepOriginId
Definition: xlogdefs.h:65
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
void pfree(void *pointer)
Definition: mcxt.c:1169
#define PG_RETURN_VOID()
Definition: fmgr.h:349
RepOriginId replorigin_session_origin
Definition: origin.c:154
const char * name
Definition: encode.c:515
#define DatumGetPointer(X)
Definition: postgres.h:593
char * text_to_cstring(const text *t)
Definition: varlena.c:223
Definition: c.h:621

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1409 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1410 {
1411  replorigin_check_prerequisites(true, false);
1412 
1415 
1416  PG_RETURN_VOID();
1417 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
#define PG_RETURN_VOID()
Definition: fmgr.h:349

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1391 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1392 {
1393  XLogRecPtr location = PG_GETARG_LSN(0);
1394 
1395  replorigin_check_prerequisites(true, false);
1396 
1397  if (session_replication_state == NULL)
1398  ereport(ERROR,
1399  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1400  errmsg("no replication origin is configured")));
1401 
1402  replorigin_session_origin_lsn = location;
1404 
1405  PG_RETURN_VOID();
1406 }
static ReplicationState * session_replication_state
Definition: origin.c:177
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
int errcode(int sqlerrcode)
Definition: elog.c:698
#define ERROR
Definition: elog.h:46
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define ereport(elevel,...)
Definition: elog.h:157
#define PG_RETURN_VOID()
Definition: fmgr.h:349
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1481 of file origin.c.

References ReturnSetInfo::allowedModes, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, InvalidRepOriginId, IsA, ReplicationState::local_lsn, ReplicationState::lock, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ObjectIdGetDatum, ReplicationState::remote_lsn, REPLICATION_ORIGIN_PROGRESS_COLS, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::returnMode, ReplicationState::roident, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1482 {
1483  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1484  TupleDesc tupdesc;
1485  Tuplestorestate *tupstore;
1486  MemoryContext per_query_ctx;
1487  MemoryContext oldcontext;
1488  int i;
1490 
1491  /* we want to return 0 rows if slot is set to zero */
1492  replorigin_check_prerequisites(false, true);
1493 
1494  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1495  ereport(ERROR,
1496  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1497  errmsg("set-valued function called in context that cannot accept a set")));
1498  if (!(rsinfo->allowedModes & SFRM_Materialize))
1499  ereport(ERROR,
1500  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1501  errmsg("materialize mode required, but it is not allowed in this context")));
1502  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1503  elog(ERROR, "return type must be a row type");
1504 
1505  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1506  elog(ERROR, "wrong function definition");
1507 
1508  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1509  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1510 
1511  tupstore = tuplestore_begin_heap(true, false, work_mem);
1512  rsinfo->returnMode = SFRM_Materialize;
1513  rsinfo->setResult = tupstore;
1514  rsinfo->setDesc = tupdesc;
1515 
1516  MemoryContextSwitchTo(oldcontext);
1517 
1518 
1519  /* prevent slots from being concurrently dropped */
1520  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1521 
1522  /*
1523  * Iterate through all possible replication_states, display if they are
1524  * filled. ReplicationOriginLock is held in shared mode for the whole scan,
1525  * and each state's LSNs are read while holding that state's own lock.
1526  */
1527  for (i = 0; i < max_replication_slots; i++)
1528  {
1532  char *roname;
1533 
1534  state = &replication_states[i];
1535 
1536  /* unused slot, nothing to display */
1537  if (state->roident == InvalidRepOriginId)
1538  continue;
1539 
1540  memset(values, 0, sizeof(values));
1541  memset(nulls, 1, sizeof(nulls));
1542 
1543  values[0] = ObjectIdGetDatum(state->roident);
1544  nulls[0] = false;
1545 
1546  /*
1547  * We're not preventing the origin to be dropped concurrently, so
1548  * silently accept that it might be gone.
1549  */
1550  if (replorigin_by_oid(state->roident, true,
1551  &roname))
1552  {
1553  values[1] = CStringGetTextDatum(roname);
1554  nulls[1] = false;
1555  }
1556 
1557  LWLockAcquire(&state->lock, LW_SHARED);
1558 
1559  values[2] = LSNGetDatum(state->remote_lsn);
1560  nulls[2] = false;
1561 
1562  values[3] = LSNGetDatum(state->local_lsn);
1563  nulls[3] = false;
1564 
1565  LWLockRelease(&state->lock);
1566 
1567  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1568  }
1569 
1570  tuplestore_donestoring(tupstore);
1571 
1572  LWLockRelease(ReplicationOriginLock);
1573 
1574 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1575 
1576  return (Datum) 0;
1577 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
XLogRecPtr local_lsn
Definition: origin.c:117
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:449
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
LWLock lock
Definition: origin.c:132
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
uintptr_t Datum
Definition: postgres.h:411
int work_mem
Definition: globals.c:124
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
Definition: regguts.h:317
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
#define InvalidRepOriginId
Definition: origin.h:33
Tuplestorestate * setResult
Definition: execnodes.h:310
static Datum values[MAXATTR]
Definition: bootstrap.c:166
ExprContext * econtext
Definition: execnodes.h:303
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int i
static ReplicationState * replication_states
Definition: origin.c:165
#define CStringGetTextDatum(s)
Definition: builtins.h:82
#define REPLICATION_ORIGIN_PROGRESS_COLS

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1039 of file origin.c.

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, and ReplicationState::origin_cv.

Referenced by replorigin_session_setup().

1040 {
1041  ConditionVariable *cv = NULL;
1042 
1043  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1044 
1045  if (session_replication_state != NULL &&
1047  {
1049 
1052  }
1053 
1054  LWLockRelease(ReplicationOriginLock);
1055 
1056  if (cv)
1058 }
static ReplicationState * session_replication_state
Definition: origin.c:177
int MyProcPid
Definition: globals.c:43
void ConditionVariableBroadcast(ConditionVariable *cv)
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 510 of file origin.c.

References ConditionVariableInit(), i, ReplicationState::lock, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_replication_slots, MemSet, ReplicationState::origin_cv, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

511 {
512  bool found;
513 
514  if (max_replication_slots == 0)
515  return;
516 
518  ShmemInitStruct("ReplicationOriginState",
520  &found);
522 
523  if (!found)
524  {
525  int i;
526 
528 
530 
531  for (i = 0; i < max_replication_slots; i++)
532  {
536  }
537  }
538 }
#define MemSet(start, val, len)
Definition: c.h:1008
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:736
Size ReplicationOriginShmemSize(void)
Definition: origin.c:490
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:170
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:150
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 490 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

491 {
492  Size size = 0;
493 
494  /*
495  * XXX: max_replication_slots is arguably the wrong thing to use, as here
496  * we keep the replay state of *remote* transactions. But for now it seems
497  * sufficient to reuse it, rather than introduce a separate GUC.
498  */
499  if (max_replication_slots == 0)
500  return size;
501 
502  size = add_size(size, offsetof(ReplicationStateCtl, states));
503 
504  size = add_size(size,
506  return size;
507 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 872 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

875 {
876  int i;
877  ReplicationState *replication_state = NULL;
878  ReplicationState *free_state = NULL;
879 
880  Assert(node != InvalidRepOriginId);
881 
882  /* we don't track DoNotReplicateId */
883  if (node == DoNotReplicateId)
884  return;
885 
886  /*
887  * XXX: For the case where this is called by WAL replay, it'd be more
888  * efficient to restore into a backend local hashtable and only dump into
889  * shmem after recovery is finished. Let's wait with implementing that
890  * till it's shown to be a measurable expense
891  */
892 
893  /* Lock exclusively, as we may have to create a new table entry. */
894  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
895 
896  /*
897  * Search for either an existing slot for the origin, or a free one we can
898  * use.
899  */
900  for (i = 0; i < max_replication_slots; i++)
901  {
902  ReplicationState *curstate = &replication_states[i];
903 
904  /* remember where to insert if necessary */
905  if (curstate->roident == InvalidRepOriginId &&
906  free_state == NULL)
907  {
908  free_state = curstate;
909  continue;
910  }
911 
912  /* not our slot */
913  if (curstate->roident != node)
914  {
915  continue;
916  }
917 
918  /* ok, found slot */
919  replication_state = curstate;
920 
921  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
922 
923  /* Make sure it's not used by somebody else */
924  if (replication_state->acquired_by != 0)
925  {
926  ereport(ERROR,
927  (errcode(ERRCODE_OBJECT_IN_USE),
928  errmsg("replication origin with OID %d is already active for PID %d",
929  replication_state->roident,
930  replication_state->acquired_by)));
931  }
932 
933  break;
934  }
935 
936  if (replication_state == NULL && free_state == NULL)
937  ereport(ERROR,
938  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
939  errmsg("could not find free replication state slot for replication origin with OID %u",
940  node),
941  errhint("Increase max_replication_slots and try again.")));
942 
943  if (replication_state == NULL)
944  {
945  /* initialize new slot */
946  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
947  replication_state = free_state;
948  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
949  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
950  replication_state->roident = node;
951  }
952 
953  Assert(replication_state->roident != InvalidRepOriginId);
954 
955  /*
956  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
957  * and the standby gets the message. Primarily this will be called during
958  * WAL replay (of commit records) where no WAL logging is necessary.
959  */
960  if (wal_log)
961  {
962  xl_replorigin_set xlrec;
963 
964  xlrec.remote_lsn = remote_commit;
965  xlrec.node_id = node;
966  xlrec.force = go_backward;
967 
968  XLogBeginInsert();
969  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
970 
971  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
972  }
973 
974  /*
975  * Due to - harmless - race conditions during a checkpoint we could see
976  * values here that are older than the ones we already have in memory. We
977  * could also see older values for prepared transactions when the prepare
978  * is sent at a later point of time along with commit prepared and there
979  * are other transaction commits between prepare and commit prepared. See
980  * ReorderBufferFinishPrepared. Don't overwrite those.
981  */
982  if (go_backward || replication_state->remote_lsn < remote_commit)
983  replication_state->remote_lsn = remote_commit;
984  if (local_commit != InvalidXLogRecPtr &&
985  (go_backward || replication_state->local_lsn < local_commit))
986  replication_state->local_lsn = local_commit;
987  LWLockRelease(&replication_state->lock);
988 
989  /*
990  * Release *after* changing the LSNs, slot isn't acquired and thus could
991  * otherwise be dropped anytime.
992  */
993  LWLockRelease(ReplicationOriginLock);
994 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:1156
XLogRecPtr local_lsn
Definition: origin.c:117
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:698
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define ERROR
Definition: elog.h:46
LWLock lock
Definition: origin.c:132
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:340
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:432
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:804
RepOriginId node_id
Definition: origin.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
static ReplicationState * replication_states
Definition: origin.c:165
void XLogBeginInsert(void)
Definition: xloginsert.c:135

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 209 of file origin.c.

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, ReplicationState::roident, and SearchSysCache1().

Referenced by ApplyWorkerMain(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), and replorigin_drop_by_name().

210 {
212  Oid roident = InvalidOid;
213  HeapTuple tuple;
214  Datum roname_d;
215 
216  roname_d = CStringGetTextDatum(roname);
217 
218  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
219  if (HeapTupleIsValid(tuple))
220  {
221  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
222  roident = ident->roident;
223  ReleaseSysCache(tuple);
224  }
225  else if (!missing_ok)
226  ereport(ERROR,
227  (errcode(ERRCODE_UNDEFINED_OBJECT),
228  errmsg("replication origin \"%s\" does not exist",
229  roname)));
230 
231  return roident;
232 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
int errcode(int sqlerrcode)
Definition: elog.c:698
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:46
FormData_pg_replication_origin * Form_pg_replication_origin
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
uintptr_t Datum
Definition: postgres.h:411
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define CStringGetTextDatum(s)
Definition: builtins.h:82

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 449 of file origin.c.

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and send_repl_origin().

450 {
451  HeapTuple tuple;
453 
454  Assert(OidIsValid((Oid) roident));
455  Assert(roident != InvalidRepOriginId);
456  Assert(roident != DoNotReplicateId);
457 
459  ObjectIdGetDatum((Oid) roident));
460 
461  if (HeapTupleIsValid(tuple))
462  {
463  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
464  *roname = text_to_cstring(&ric->roname);
465  ReleaseSysCache(tuple);
466 
467  return true;
468  }
469  else
470  {
471  *roname = NULL;
472 
473  if (!missing_ok)
474  ereport(ERROR,
475  (errcode(ERRCODE_UNDEFINED_OBJECT),
476  errmsg("replication origin with OID %u does not exist",
477  roident)));
478 
479  return false;
480  }
481 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:698
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
FormData_pg_replication_origin * Form_pg_replication_origin
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define ereport(elevel,...)
Definition: elog.h:157
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:804
#define InvalidRepOriginId
Definition: origin.h:33
char * text_to_cstring(const text *t)
Definition: varlena.c:223
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 183 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, and RecoveryInProgress().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

184 {
185  if (check_slots && max_replication_slots == 0)
186  ereport(ERROR,
187  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
189 
190  if (!recoveryOK && RecoveryInProgress())
191  ereport(ERROR,
192  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
193  errmsg("cannot manipulate replication origins during recovery")));
194 
195 }
int errcode(int sqlerrcode)
Definition: elog.c:698
bool RecoveryInProgress(void)
Definition: xlog.c:8248
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ replorigin_create()

RepOriginId replorigin_create ( const char *  roname)

Definition at line 240 of file origin.c.

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationState::roident, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), LogicalRepSyncTableStart(), and pg_replication_origin_create().

241 {
242  Oid roident;
243  HeapTuple tuple = NULL;
244  Relation rel;
245  Datum roname_d;
246  SnapshotData SnapshotDirty;
247  SysScanDesc scan;
249 
250  roname_d = CStringGetTextDatum(roname);
251 
253 
254  /*
255  * We need the numeric replication origin to be 16bit wide, so we cannot
256  * rely on the normal oid allocation. Instead we simply scan
257  * pg_replication_origin for the first unused id. That's not particularly
258  * efficient, but this should be a fairly infrequent operation - we can
259  * easily spend a bit more code on this when it turns out it needs to be
260  * faster.
261  *
262  * We handle concurrency by taking an exclusive lock (allowing reads!)
263  * over the table for the duration of the search. Because we use a "dirty
264  * snapshot" we can read rows that other in-progress sessions have
265  * written, even though they would be invisible with normal snapshots. Due
266  * to the exclusive lock there's no danger that new rows can appear while
267  * we're checking.
268  */
269  InitDirtySnapshot(SnapshotDirty);
270 
271  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
272 
273  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274  {
275  bool nulls[Natts_pg_replication_origin];
276  Datum values[Natts_pg_replication_origin];
277  bool collides;
278 
280 
281  ScanKeyInit(&key,
282  Anum_pg_replication_origin_roident,
283  BTEqualStrategyNumber, F_OIDEQ,
284  ObjectIdGetDatum(roident));
285 
286  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
287  true /* indexOK */ ,
288  &SnapshotDirty,
289  1, &key);
290 
291  collides = HeapTupleIsValid(systable_getnext(scan));
292 
293  systable_endscan(scan);
294 
295  if (!collides)
296  {
297  /*
298  * Ok, found an unused roident, insert the new row and do a CCI,
299  * so our callers can look it up if they want to.
300  */
301  memset(&nulls, 0, sizeof(nulls));
302 
303  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
304  values[Anum_pg_replication_origin_roname - 1] = roname_d;
305 
306  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307  CatalogTupleInsert(rel, tuple);
309  break;
310  }
311  }
312 
313  /* now release lock again */
315 
316  if (tuple == NULL)
317  ereport(ERROR,
318  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319  errmsg("could not find free replication origin OID")));
320 
321  heap_freetuple(tuple);
322  return roident;
323 }
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:74
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:595
#define RelationGetDescr(relation)
Definition: rel.h:503
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:698
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:383
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:502
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
#define PG_UINT16_MAX
Definition: c.h:522
uintptr_t Datum
Definition: postgres.h:411
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:804
bool IsTransactionState(void)
Definition: xact.c:371
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:82
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
#define BTEqualStrategyNumber
Definition: stratnum.h:31

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 414 of file origin.c.

References Assert, ExclusiveLock, IsTransactionState(), NoLock, OidIsValid, replorigin_by_name(), replorigin_drop_guts(), ReplicationState::roident, table_close(), and table_open().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), and process_syncing_tables_for_apply().

415 {
416  RepOriginId roident;
417  Relation rel;
418 
420 
421  /*
422  * To interlock against concurrent drops, we hold ExclusiveLock on
423  * pg_replication_origin till xact commit.
424  *
425  * XXX We can optimize this by acquiring the lock on a specific origin by
426  * using LockSharedObject if required. However, for that, we first need to
427  * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
428  * the specific origin and then re-check if the origin still exists.
429  */
430  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
431 
432  roident = replorigin_by_name(name, missing_ok);
433 
434  if (OidIsValid(roident))
435  replorigin_drop_guts(rel, roident, nowait);
436 
437  /* We keep the lock on pg_replication_origin until commit */
438  table_close(rel, NoLock);
439 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define ExclusiveLock
Definition: lockdefs.h:44
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
uint16 RepOriginId
Definition: xlogdefs.h:65
#define OidIsValid(objectId)
Definition: c.h:710
static void replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
Definition: origin.c:329
#define NoLock
Definition: lockdefs.h:34
#define Assert(condition)
Definition: c.h:804
bool IsTransactionState(void)
Definition: xact.c:371
const char * name
Definition: encode.c:515
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ replorigin_drop_guts()

static void replorigin_drop_guts ( Relation  rel,
RepOriginId  roident,
bool  nowait 
)
static

Definition at line 329 of file origin.c.

References ReplicationState::acquired_by, CatalogTupleDelete(), CommandCounterIncrement(), ConditionVariableCancelSleep(), ConditionVariableSleep(), elog, ereport, errcode(), errmsg(), ERROR, HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, ObjectIdGetDatum, ReplicationState::origin_cv, ReleaseSysCache(), ReplicationState::remote_lsn, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1(), HeapTupleData::t_self, WAIT_EVENT_REPLICATION_ORIGIN_DROP, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

330 {
331  HeapTuple tuple;
332  int i;
333 
334  /*
335  * First, clean up the slot state info, if there is any matching slot.
336  */
337 restart:
338  tuple = NULL;
339  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
340 
341  for (i = 0; i < max_replication_slots; i++)
342  {
344 
345  if (state->roident == roident)
346  {
347  /* found our slot, is it busy? */
348  if (state->acquired_by != 0)
349  {
350  ConditionVariable *cv;
351 
352  if (nowait)
353  ereport(ERROR,
354  (errcode(ERRCODE_OBJECT_IN_USE),
355  errmsg("could not drop replication origin with OID %d, in use by PID %d",
356  state->roident,
357  state->acquired_by)));
358 
359  /*
360  * We must wait and then retry. Since we don't know which CV
361  * to wait on until here, we can't readily use
362  * ConditionVariablePrepareToSleep (calling it here would be
363  * wrong, since we could miss the signal if we did so); just
364  * use ConditionVariableSleep directly.
365  */
366  cv = &state->origin_cv;
367 
368  LWLockRelease(ReplicationOriginLock);
369 
371  goto restart;
372  }
373 
374  /* first make a WAL log entry */
375  {
376  xl_replorigin_drop xlrec;
377 
378  xlrec.node_id = roident;
379  XLogBeginInsert();
380  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
381  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
382  }
383 
384  /* then clear the in-memory slot */
385  state->roident = InvalidRepOriginId;
386  state->remote_lsn = InvalidXLogRecPtr;
387  state->local_lsn = InvalidXLogRecPtr;
388  break;
389  }
390  }
391  LWLockRelease(ReplicationOriginLock);
393 
394  /*
395  * Now, we can delete the catalog entry.
396  */
398  if (!HeapTupleIsValid(tuple))
399  elog(ERROR, "cache lookup failed for replication origin with oid %u",
400  roident);
401 
402  CatalogTupleDelete(rel, &tuple->t_self);
403  ReleaseSysCache(tuple);
404 
406 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
int errcode(int sqlerrcode)
Definition: elog.c:698
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
ItemPointerData t_self
Definition: htup.h:65
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:340
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:432
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
void CommandCounterIncrement(void)
Definition: xact.c:1021
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:110
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
RepOriginId node_id
Definition: origin.h:27
Definition: regguts.h:317
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int i
static ReplicationState * replication_states
Definition: origin.c:165
void XLogBeginInsert(void)
Definition: xloginsert.c:135

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 998 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

999 {
1000  int i;
1001  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1002  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1003 
1004  /* prevent slots from being concurrently dropped */
1005  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1006 
1007  for (i = 0; i < max_replication_slots; i++)
1008  {
1010 
1011  state = &replication_states[i];
1012 
1013  if (state->roident == node)
1014  {
1015  LWLockAcquire(&state->lock, LW_SHARED);
1016 
1017  remote_lsn = state->remote_lsn;
1018  local_lsn = state->local_lsn;
1019 
1020  LWLockRelease(&state->lock);
1021 
1022  break;
1023  }
1024  }
1025 
1026  LWLockRelease(ReplicationOriginLock);
1027 
1028  if (flush && local_lsn != InvalidXLogRecPtr)
1029  XLogFlush(local_lsn);
1030 
1031  return remote_lsn;
1032 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2881
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
LWLock lock
Definition: origin.c:132
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState *  record)

Definition at line 811 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

812 {
813  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
814 
815  switch (info)
816  {
817  case XLOG_REPLORIGIN_SET:
818  {
819  xl_replorigin_set *xlrec =
820  (xl_replorigin_set *) XLogRecGetData(record);
821 
823  xlrec->remote_lsn, record->EndRecPtr,
824  xlrec->force /* backward */ ,
825  false /* WAL log */ );
826  break;
827  }
829  {
830  xl_replorigin_drop *xlrec;
831  int i;
832 
833  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
834 
835  for (i = 0; i < max_replication_slots; i++)
836  {
838 
839  /* found our slot */
840  if (state->roident == xlrec->node_id)
841  {
842  /* reset entry */
843  state->roident = InvalidRepOriginId;
844  state->remote_lsn = InvalidXLogRecPtr;
845  state->local_lsn = InvalidXLogRecPtr;
846  break;
847  }
848  }
849  break;
850  }
851  default:
852  elog(PANIC, "replorigin_redo: unknown op code %u", info);
853  }
854 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
unsigned char uint8
Definition: c.h:439
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:872
#define PANIC
Definition: elog.h:50
RepOriginId roident
Definition: origin.c:105
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
RepOriginId node_id
Definition: origin.h:27
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:317
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
#define elog(elevel,...)
Definition: elog.h:232
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ replorigin_session_advance()

void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1188 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by EndPrepare(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

1189 {
1192 
1194  if (session_replication_state->local_lsn < local_commit)
1195  session_replication_state->local_lsn = local_commit;
1196  if (session_replication_state->remote_lsn < remote_commit)
1197  session_replication_state->remote_lsn = remote_commit;
1199 }
static ReplicationState * session_replication_state
Definition: origin.c:177
XLogRecPtr local_lsn
Definition: origin.c:117
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
LWLock lock
Definition: origin.c:132
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
#define InvalidRepOriginId
Definition: origin.h:33

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1206 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), LogicalRepSyncTableStart(), and pg_replication_origin_session_progress().

1207 {
1208  XLogRecPtr remote_lsn;
1209  XLogRecPtr local_lsn;
1210 
1212 
1214  remote_lsn = session_replication_state->remote_lsn;
1215  local_lsn = session_replication_state->local_lsn;
1217 
1218  if (flush && local_lsn != InvalidXLogRecPtr)
1219  XLogFlush(local_lsn);
1220 
1221  return remote_lsn;
1222 }
static ReplicationState * session_replication_state
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2881
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
LWLock lock
Definition: origin.c:132
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1159 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and ReplicationState::origin_cv.

Referenced by pg_replication_origin_session_reset().

1160 {
1161  ConditionVariable *cv;
1162 
1164 
1165  if (session_replication_state == NULL)
1166  ereport(ERROR,
1167  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1168  errmsg("no replication origin is configured")));
1169 
1170  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1171 
1175 
1176  LWLockRelease(ReplicationOriginLock);
1177 
1179 }
static ReplicationState * session_replication_state
Definition: origin.c:177
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:698
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
#define Assert(condition)
Definition: c.h:804
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node)

Definition at line 1071 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), LogicalRepSyncTableStart(), and pg_replication_origin_session_setup().

1072 {
1073  static bool registered_cleanup;
1074  int i;
1075  int free_slot = -1;
1076 
1077  if (!registered_cleanup)
1078  {
1080  registered_cleanup = true;
1081  }
1082 
1084 
1085  if (session_replication_state != NULL)
1086  ereport(ERROR,
1087  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1088  errmsg("cannot setup replication origin when one is already setup")));
1089 
1090  /* Lock exclusively, as we may have to create a new table entry. */
1091  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1092 
1093  /*
1094  * Search for either an existing slot for the origin, or a free one we can
1095  * use.
1096  */
1097  for (i = 0; i < max_replication_slots; i++)
1098  {
1099  ReplicationState *curstate = &replication_states[i];
1100 
1101  /* remember where to insert if necessary */
1102  if (curstate->roident == InvalidRepOriginId &&
1103  free_slot == -1)
1104  {
1105  free_slot = i;
1106  continue;
1107  }
1108 
1109  /* not our slot */
1110  if (curstate->roident != node)
1111  continue;
1112 
1113  else if (curstate->acquired_by != 0)
1114  {
1115  ereport(ERROR,
1116  (errcode(ERRCODE_OBJECT_IN_USE),
1117  errmsg("replication origin with OID %d is already active for PID %d",
1118  curstate->roident, curstate->acquired_by)));
1119  }
1120 
1121  /* ok, found slot */
1122  session_replication_state = curstate;
1123  }
1124 
1125 
1126  if (session_replication_state == NULL && free_slot == -1)
1127  ereport(ERROR,
1128  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1129  errmsg("could not find free replication state slot for replication origin with OID %u",
1130  node),
1131  errhint("Increase max_replication_slots and try again.")));
1132  else if (session_replication_state == NULL)
1133  {
1134  /* initialize new slot */
1139  }
1140 
1141 
1143 
1145 
1146  LWLockRelease(ReplicationOriginLock);
1147 
1148  /* probably this one is pointless */
1150 }
static ReplicationState * session_replication_state
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:43
int errhint(const char *fmt,...)
Definition: elog.c:1156
XLogRecPtr local_lsn
Definition: origin.c:117
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1039
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:698
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define ERROR
Definition: elog.h:46
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:804
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 683 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

684 {
685  const char *path = "pg_logical/replorigin_checkpoint";
686  int fd;
687  int readBytes;
689  int last_state = 0;
690  pg_crc32c file_crc;
691  pg_crc32c crc;
692 
693  /* don't want to overwrite already existing state */
694 #ifdef USE_ASSERT_CHECKING
695  static bool already_started = false;
696 
697  Assert(!already_started);
698  already_started = true;
699 #endif
700 
701  if (max_replication_slots == 0)
702  return;
703 
704  INIT_CRC32C(crc);
705 
706  elog(DEBUG2, "starting up replication origin progress state");
707 
708  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
709 
710  /*
711  * might have had max_replication_slots == 0 last run, or we just brought
712  * up a standby.
713  */
714  if (fd < 0 && errno == ENOENT)
715  return;
716  else if (fd < 0)
717  ereport(PANIC,
719  errmsg("could not open file \"%s\": %m",
720  path)));
721 
722  /* verify magic, that is written even if nothing was active */
723  readBytes = read(fd, &magic, sizeof(magic));
724  if (readBytes != sizeof(magic))
725  {
726  if (readBytes < 0)
727  ereport(PANIC,
729  errmsg("could not read file \"%s\": %m",
730  path)));
731  else
732  ereport(PANIC,
734  errmsg("could not read file \"%s\": read %d of %zu",
735  path, readBytes, sizeof(magic))));
736  }
737  COMP_CRC32C(crc, &magic, sizeof(magic));
738 
739  if (magic != REPLICATION_STATE_MAGIC)
740  ereport(PANIC,
741  (errmsg("replication checkpoint has wrong magic %u instead of %u",
742  magic, REPLICATION_STATE_MAGIC)));
743 
744  /* we can skip locking here, no other access is possible */
745 
746  /* recover individual states, until there are no more to be found */
747  while (true)
748  {
749  ReplicationStateOnDisk disk_state;
750 
751  readBytes = read(fd, &disk_state, sizeof(disk_state));
752 
753  /* no further data */
754  if (readBytes == sizeof(crc))
755  {
756  /* not pretty, but simple ... */
757  file_crc = *(pg_crc32c *) &disk_state;
758  break;
759  }
760 
761  if (readBytes < 0)
762  {
763  ereport(PANIC,
765  errmsg("could not read file \"%s\": %m",
766  path)));
767  }
768 
769  if (readBytes != sizeof(disk_state))
770  {
771  ereport(PANIC,
773  errmsg("could not read file \"%s\": read %d of %zu",
774  path, readBytes, sizeof(disk_state))));
775  }
776 
777  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
778 
779  if (last_state == max_replication_slots)
780  ereport(PANIC,
781  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
782  errmsg("could not find free replication state, increase max_replication_slots")));
783 
784  /* copy data to shared memory */
785  replication_states[last_state].roident = disk_state.roident;
786  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
787  last_state++;
788 
789  ereport(LOG,
790  (errmsg("recovered replication state of node %u to %X/%X",
791  disk_state.roident,
792  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
793  }
794 
795  /* now check checksum */
796  FIN_CRC32C(crc);
797  if (file_crc != crc)
798  ereport(PANIC,
799  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
800  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
801  crc, file_crc)));
802 
803  if (CloseTransientFile(fd) != 0)
804  ereport(PANIC,
806  errmsg("could not close file \"%s\": %m",
807  path)));
808 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:141
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:140
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:50
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
RepOriginId roident
Definition: origin.c:105
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2467
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:721
unsigned int uint32
Definition: c.h:441
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2644
#define REPLICATION_STATE_MAGIC
Definition: origin.c:180
#define ereport(elevel,...)
Definition: elog.h:157
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:804
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
static ReplicationState * replication_states
Definition: origin.c:165
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

◆ replication_states

ReplicationState* replication_states
static

Definition at line 165 of file origin.c.

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 170 of file origin.c.

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state

ReplicationState* session_replication_state = NULL
static

Definition at line 177 of file origin.c.