PostgreSQL Source Code  git master
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
RepOriginId replorigin_by_name (char *roname, bool missing_ok)
 
RepOriginId replorigin_create (char *roname)
 
void replorigin_drop (RepOriginId roident, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationState * replication_states
 
static ReplicationStateCtl * replication_states_ctl
 
static ReplicationState * session_replication_state = NULL
 

Macro Definition Documentation

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 174 of file origin.c.

Referenced by CheckPointReplicationOrigin(), and StartupReplicationOrigin().

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 542 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

543 {
544  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
545  const char *path = "pg_logical/replorigin_checkpoint";
546  int tmpfd;
547  int i;
548  uint32 magic = REPLICATION_STATE_MAGIC;
549  pg_crc32c crc;
550 
551  if (max_replication_slots == 0)
552  return;
553 
554  INIT_CRC32C(crc);
555 
556  /* make sure no old temp file is remaining */
557  if (unlink(tmppath) < 0 && errno != ENOENT)
558  ereport(PANIC,
559  (errcode_for_file_access(),
560  errmsg("could not remove file \"%s\": %m",
561  tmppath)));
562 
563  /*
564  * no other backend can perform this at the same time, we're protected by
565  * CheckpointLock.
566  */
567  tmpfd = OpenTransientFile(tmppath,
568  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
569  if (tmpfd < 0)
570  ereport(PANIC,
571  (errcode_for_file_access(),
572  errmsg("could not create file \"%s\": %m",
573  tmppath)));
574 
575  /* write magic */
576  errno = 0;
577  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
578  {
579  /* if write didn't set errno, assume problem is no disk space */
580  if (errno == 0)
581  errno = ENOSPC;
582  ereport(PANIC,
583  (errcode_for_file_access(),
584  errmsg("could not write to file \"%s\": %m",
585  tmppath)));
586  }
587  COMP_CRC32C(crc, &magic, sizeof(magic));
588 
589  /* prevent concurrent creations/drops */
590  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
591 
592  /* write actual data */
593  for (i = 0; i < max_replication_slots; i++)
594  {
595  ReplicationStateOnDisk disk_state;
596  ReplicationState *curstate = &replication_states[i];
597  XLogRecPtr local_lsn;
598 
599  if (curstate->roident == InvalidRepOriginId)
600  continue;
601 
602  /* zero, to avoid uninitialized padding bytes */
603  memset(&disk_state, 0, sizeof(disk_state));
604 
605  LWLockAcquire(&curstate->lock, LW_SHARED);
606 
607  disk_state.roident = curstate->roident;
608 
609  disk_state.remote_lsn = curstate->remote_lsn;
610  local_lsn = curstate->local_lsn;
611 
612  LWLockRelease(&curstate->lock);
613 
614  /* make sure we only write out a commit that's persistent */
615  XLogFlush(local_lsn);
616 
617  errno = 0;
618  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
619  sizeof(disk_state))
620  {
621  /* if write didn't set errno, assume problem is no disk space */
622  if (errno == 0)
623  errno = ENOSPC;
624  ereport(PANIC,
625  (errcode_for_file_access(),
626  errmsg("could not write to file \"%s\": %m",
627  tmppath)));
628  }
629 
630  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
631  }
632 
633  LWLockRelease(ReplicationOriginLock);
634 
635  /* write out the CRC */
636  FIN_CRC32C(crc);
637  errno = 0;
638  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
639  {
640  /* if write didn't set errno, assume problem is no disk space */
641  if (errno == 0)
642  errno = ENOSPC;
643  ereport(PANIC,
644  (errcode_for_file_access(),
645  errmsg("could not write to file \"%s\": %m",
646  tmppath)));
647  }
648 
649  if (CloseTransientFile(tmpfd) != 0)
650  ereport(PANIC,
651  (errcode_for_file_access(),
652  errmsg("could not close file \"%s\": %m",
653  tmppath)));
654 
655  /* fsync, rename to permanent file, fsync file and directory */
656  durable_rename(tmppath, path, PANIC);
657 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:117
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:141
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:140
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2805
#define PG_BINARY
Definition: c.h:1222
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
LWLock lock
Definition: origin.c:132
int errcode_for_file_access(void)
Definition: elog.c:631
unsigned int uint32
Definition: c.h:359
#define ereport(elevel, rest)
Definition: elog.h:141
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:643
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define REPLICATION_STATE_MAGIC
Definition: origin.c:174
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:822
int i
static ReplicationState * replication_states
Definition: origin.c:163
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1407 of file origin.c.

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

1408 {
1409  text *name = PG_GETARG_TEXT_PP(0);
1410  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1411  RepOriginId node;
1412 
1413  replorigin_check_prerequisites(true, false);
1414 
1415  /* lock to prevent the replication origin from vanishing */
1416  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1417 
1418  node = replorigin_by_name(text_to_cstring(name), false);
1419 
1420  /*
1421  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1422  * xact hasn't committed yet. This is why this function should be used to
1423  * set up the initial replication state, but not for replay.
1424  */
1425  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1426  true /* go backward */ , true /* WAL log */ );
1427 
1428  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1429 
1430  PG_RETURN_VOID();
1431 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:199
uint16 RepOriginId
Definition: xlogdefs.h:58
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:857
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:303
#define RowExclusiveLock
Definition: lockdefs.h:38
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:339
uint64 XLogRecPtr
Definition: xlogdefs.h:21
const char * name
Definition: encode.c:521
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:556
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:108

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1220 of file origin.c.

References DatumGetPointer, elog, ereport, errcode(), errdetail(), errmsg(), ERROR, IsReservedName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), ReplicationState::roident, text_to_cstring(), and WARNING.

1221 {
1222  char *name;
1223  RepOriginId roident;
1224 
1225  replorigin_check_prerequisites(false, false);
1226 
1227  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1228 
1229  /* Replication origins "pg_xxx" are reserved for internal use */
1230  if (IsReservedName(name))
1231  ereport(ERROR,
1232  (errcode(ERRCODE_RESERVED_NAME),
1233  errmsg("replication origin name \"%s\" is reserved",
1234  name),
1235  errdetail("Origin names starting with \"pg_\" are reserved.")));
1236 
1237  /*
1238  * If built with appropriate switch, whine when regression-testing
1239  * conventions for replication origin names are violated.
1240  */
1241 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1242  if (strncmp(name, "regress_", 8) != 0)
1243  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1244 #endif
1245 
1246  roident = replorigin_create(name);
1247 
1248  pfree(name);
1249 
1250  PG_RETURN_OID(roident);
1251 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:608
bool IsReservedName(const char *name)
Definition: catalog.c:213
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:955
RepOriginId replorigin_create(char *roname)
Definition: origin.c:239
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
Definition: c.h:556
#define PG_RETURN_OID(x)
Definition: fmgr.h:350

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1257 of file origin.c.

References Assert, DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_drop(), ReplicationState::roident, and text_to_cstring().

1258 {
1259  char *name;
1260  RepOriginId roident;
1261 
1262  replorigin_check_prerequisites(false, false);
1263 
1264  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1265 
1266  roident = replorigin_by_name(name, false);
1267  Assert(OidIsValid(roident));
1268 
1269  replorigin_drop(roident, true);
1270 
1271  pfree(name);
1272 
1273  PG_RETURN_VOID();
1274 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:331
uint16 RepOriginId
Definition: xlogdefs.h:58
#define OidIsValid(objectId)
Definition: c.h:645
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
void pfree(void *pointer)
Definition: mcxt.c:1056
#define PG_RETURN_VOID()
Definition: fmgr.h:339
#define Assert(condition)
Definition: c.h:739
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:556

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1280 of file origin.c.

References DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), ReplicationState::roident, and text_to_cstring().

1281 {
1282  char *name;
1283  RepOriginId roident;
1284 
1285  replorigin_check_prerequisites(false, false);
1286 
1287  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1288  roident = replorigin_by_name(name, true);
1289 
1290  pfree(name);
1291 
1292  if (OidIsValid(roident))
1293  PG_RETURN_OID(roident);
1294  PG_RETURN_NULL();
1295 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
#define OidIsValid(objectId)
Definition: c.h:645
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
void pfree(void *pointer)
Definition: mcxt.c:1056
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:556
#define PG_RETURN_OID(x)
Definition: fmgr.h:350
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1442 of file origin.c.

References Assert, DatumGetPointer, InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), ReplicationState::roident, and text_to_cstring().

1443 {
1444  char *name;
1445  bool flush;
1446  RepOriginId roident;
1447  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1448 
1449  replorigin_check_prerequisites(true, true);
1450 
1451  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1452  flush = PG_GETARG_BOOL(1);
1453 
1454  roident = replorigin_by_name(name, false);
1455  Assert(OidIsValid(roident));
1456 
1457  remote_lsn = replorigin_get_progress(roident, flush);
1458 
1459  if (remote_lsn == InvalidXLogRecPtr)
1460  PG_RETURN_NULL();
1461 
1462  PG_RETURN_LSN(remote_lsn);
1463 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
#define OidIsValid(objectId)
Definition: c.h:645
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:556
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:980
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1340 of file origin.c.

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

1341 {
1342  replorigin_check_prerequisites(false, false);
1343 
1344  PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1345 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:349
RepOriginId replorigin_session_origin
Definition: origin.c:152
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1356 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_check_prerequisites(), and replorigin_session_get_progress().

1357 {
1358  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1359  bool flush = PG_GETARG_BOOL(0);
1360 
1361  replorigin_check_prerequisites(true, false);
1362 
1363  if (session_replication_state == NULL)
1364  ereport(ERROR,
1365  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1366  errmsg("no replication origin is configured")));
1367 
1368  remote_lsn = replorigin_session_get_progress(flush);
1369 
1370  if (remote_lsn == InvalidXLogRecPtr)
1371  PG_RETURN_NULL();
1372 
1373  PG_RETURN_LSN(remote_lsn);
1374 }
static ReplicationState * session_replication_state
Definition: origin.c:171
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1188
int errcode(int sqlerrcode)
Definition: elog.c:608
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1323 of file origin.c.

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

1324 {
1325  replorigin_check_prerequisites(true, false);
1326 
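1327  replorigin_session_reset();
1328 
1329  replorigin_session_origin = InvalidRepOriginId;
1330  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1331  replorigin_session_origin_timestamp = 0;
1332 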
1333  PG_RETURN_VOID();
1334 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void replorigin_session_reset(void)
Definition: origin.c:1141
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
#define PG_RETURN_VOID()
Definition: fmgr.h:339
RepOriginId replorigin_session_origin
Definition: origin.c:152
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1301 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

1302 {
1303  char *name;
1304  RepOriginId origin;
1305 
1306  replorigin_check_prerequisites(true, false);
1307 
1308  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1309  origin = replorigin_by_name(name, false);
1310  replorigin_session_setup(origin);
1311 
1312  replorigin_session_origin = origin;
1313 
1314  pfree(name);
1315 
1316  PG_RETURN_VOID();
1317 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:263
uint16 RepOriginId
Definition: xlogdefs.h:58
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1053
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
void pfree(void *pointer)
Definition: mcxt.c:1056
#define PG_RETURN_VOID()
Definition: fmgr.h:339
RepOriginId replorigin_session_origin
Definition: origin.c:152
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:549
char * text_to_cstring(const text *t)
Definition: varlena.c:204
Definition: c.h:556

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1395 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1396 {
1397  replorigin_check_prerequisites(true, false);
1398 
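1399  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1400  replorigin_session_origin_timestamp = 0;
1401 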
1402  PG_RETURN_VOID();
1403 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
#define PG_RETURN_VOID()
Definition: fmgr.h:339

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1377 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1378 {
1379  XLogRecPtr location = PG_GETARG_LSN(0);
1380 
1381  replorigin_check_prerequisites(true, false);
1382 
1383  if (session_replication_state == NULL)
1384  ereport(ERROR,
1385  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1386  errmsg("no replication origin is configured")));
1387 
1388  replorigin_session_origin_lsn = location;
1389  replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1390 
1391  PG_RETURN_VOID();
1392 }
static ReplicationState * session_replication_state
Definition: origin.c:171
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
#define ereport(elevel, rest)
Definition: elog.h:141
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:339
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1467 of file origin.c.

References ReturnSetInfo::allowedModes, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, InvalidRepOriginId, IsA, ReplicationState::local_lsn, ReplicationState::lock, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ObjectIdGetDatum, ReplicationState::remote_lsn, REPLICATION_ORIGIN_PROGRESS_COLS, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::returnMode, ReplicationState::roident, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1468 {
1469  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1470  TupleDesc tupdesc;
1471  Tuplestorestate *tupstore;
1472  MemoryContext per_query_ctx;
1473  MemoryContext oldcontext;
1474  int i;
1475 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1476 
1477  /* we want to return 0 rows if slot is set to zero */
1478  replorigin_check_prerequisites(false, true);
1479 
1480  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1481  ereport(ERROR,
1482  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1483  errmsg("set-valued function called in context that cannot accept a set")));
1484  if (!(rsinfo->allowedModes & SFRM_Materialize))
1485  ereport(ERROR,
1486  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1487  errmsg("materialize mode required, but it is not allowed in this context")));
1488  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1489  elog(ERROR, "return type must be a row type");
1490 
1491  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1492  elog(ERROR, "wrong function definition");
1493 
1494  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1495  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1496 
1497  tupstore = tuplestore_begin_heap(true, false, work_mem);
1498  rsinfo->returnMode = SFRM_Materialize;
1499  rsinfo->setResult = tupstore;
1500  rsinfo->setDesc = tupdesc;
1501 
1502  MemoryContextSwitchTo(oldcontext);
1503 
1504 
1505  /* prevent slots from being concurrently dropped */
1506  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1507 
1508  /*
1509  * Iterate through all possible replication_states, display if they are
1510  * filled. Note that we do not take any locks, so slightly corrupted/out
1511  * of date values are a possibility.
1512  */
1513  for (i = 0; i < max_replication_slots; i++)
1514  {
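1515  ReplicationState *state;
1516  Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1517  bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1518  char *roname;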
1519 
1520  state = &replication_states[i];
1521 
1522  /* unused slot, nothing to display */
1523  if (state->roident == InvalidRepOriginId)
1524  continue;
1525 
1526  memset(values, 0, sizeof(values));
1527  memset(nulls, 1, sizeof(nulls));
1528 
1529  values[0] = ObjectIdGetDatum(state->roident);
1530  nulls[0] = false;
1531 
1532  /*
1533  * We're not preventing the origin to be dropped concurrently, so
1534  * silently accept that it might be gone.
1535  */
1536  if (replorigin_by_oid(state->roident, true,
1537  &roname))
1538  {
1539  values[1] = CStringGetTextDatum(roname);
1540  nulls[1] = false;
1541  }
1542 
1543  LWLockAcquire(&state->lock, LW_SHARED);
1544 
1545  values[2] = LSNGetDatum(state->remote_lsn);
1546  nulls[2] = false;
1547 
1548  values[3] = LSNGetDatum(state->local_lsn);
1549  nulls[3] = false;
1550 
1551  LWLockRelease(&state->lock);
1552 
1553  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1554  }
1555 
1556  tuplestore_donestoring(tupstore);
1557 
1558  LWLockRelease(ReplicationOriginLock);
1559 
1560 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1561 
1562  return (Datum) 0;
1563 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:177
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
XLogRecPtr local_lsn
Definition: origin.c:117
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:431
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:132
#define ereport(elevel, rest)
Definition: elog.h:141
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
uintptr_t Datum
Definition: postgres.h:367
int work_mem
Definition: globals.c:121
int allowedModes
Definition: execnodes.h:302
SetFunctionReturnMode returnMode
Definition: execnodes.h:304
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:230
#define InvalidRepOriginId
Definition: origin.h:33
Tuplestorestate * setResult
Definition: execnodes.h:307
static Datum values[MAXATTR]
Definition: bootstrap.c:167
ExprContext * econtext
Definition: execnodes.h:300
TupleDesc setDesc
Definition: execnodes.h:308
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int i
static ReplicationState * replication_states
Definition: origin.c:163
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define REPLICATION_ORIGIN_PROGRESS_COLS

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1021 of file origin.c.

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, and ReplicationState::origin_cv.

Referenced by replorigin_session_setup().

1022 {
1023  ConditionVariable *cv = NULL;
1024 
1025  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1026 
1027  if (session_replication_state != NULL &&
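1028  session_replication_state->acquired_by == MyProcPid)
1029  {
1030  cv = &session_replication_state->origin_cv;
1031 
1032  session_replication_state->acquired_by = 0;
1033  session_replication_state = NULL;
1034  }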
1035 
1036  LWLockRelease(ReplicationOriginLock);
1037 
1038  if (cv)
1039  ConditionVariableBroadcast(cv);
1040 }
static ReplicationState * session_replication_state
Definition: origin.c:171
int MyProcPid
Definition: globals.c:40
void ConditionVariableBroadcast(ConditionVariable *cv)
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 492 of file origin.c.

References ConditionVariableInit(), i, ReplicationState::lock, LWLockInitialize(), LWLockRegisterTranche(), LWTRANCHE_REPLICATION_ORIGIN, max_replication_slots, MemSet, ReplicationState::origin_cv, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

493 {
494  bool found;
495 
496  if (max_replication_slots == 0)
497  return;
498 
500  ShmemInitStruct("ReplicationOriginState",
502  &found);
504 
505  if (!found)
506  {
507  int i;
508 
510 
512 
513  for (i = 0; i < max_replication_slots; i++)
514  {
518  }
519  }
520 
522  "replication_origin");
523 }
#define MemSet(start, val, len)
Definition: c.h:962
void LWLockRegisterTranche(int tranche_id, const char *tranche_name)
Definition: lwlock.c:603
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:678
Size ReplicationOriginShmemSize(void)
Definition: origin.c:472
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:164
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:148
int i
static ReplicationState * replication_states
Definition: origin.c:163

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 472 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

473 {
474  Size size = 0;
475 
476  /*
477  * XXX: max_replication_slots is arguably the wrong thing to use, as here
478  * we keep the replay state of *remote* transactions. But for now it seems
479  * sufficient to reuse it, rather than introduce a separate GUC.
480  */
481  if (max_replication_slots == 0)
482  return size;
483 
484  size = add_size(size, offsetof(ReplicationStateCtl, states));
485 
486  size = add_size(size,
488  return size;
489 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:467
#define offsetof(type, field)
Definition: c.h:662

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 857 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), and xact_redo_commit().

860 {
861  int i;
862  ReplicationState *replication_state = NULL;
863  ReplicationState *free_state = NULL;
864 
865  Assert(node != InvalidRepOriginId);
866 
867  /* we don't track DoNotReplicateId */
868  if (node == DoNotReplicateId)
869  return;
870 
871  /*
872  * XXX: For the case where this is called by WAL replay, it'd be more
873  * efficient to restore into a backend local hashtable and only dump into
874  * shmem after recovery is finished. Let's hold off on implementing that
875  * until it's shown to be a measurable expense.
876  */
877 
878  /* Lock exclusively, as we may have to create a new table entry. */
879  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
880 
881  /*
882  * Search for either an existing slot for the origin, or a free one we can
883  * use.
884  */
885  for (i = 0; i < max_replication_slots; i++)
886  {
887  ReplicationState *curstate = &replication_states[i];
888 
889  /* remember where to insert if necessary */
890  if (curstate->roident == InvalidRepOriginId &&
891  free_state == NULL)
892  {
893  free_state = curstate;
894  continue;
895  }
896 
897  /* not our slot */
898  if (curstate->roident != node)
899  {
900  continue;
901  }
902 
903  /* ok, found slot */
904  replication_state = curstate;
905 
906  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
907 
908  /* Make sure it's not used by somebody else */
909  if (replication_state->acquired_by != 0)
910  {
911  ereport(ERROR,
912  (errcode(ERRCODE_OBJECT_IN_USE),
913  errmsg("replication origin with OID %d is already active for PID %d",
914  replication_state->roident,
915  replication_state->acquired_by)));
916  }
917 
918  break;
919  }
920 
921  if (replication_state == NULL && free_state == NULL)
922  ereport(ERROR,
923  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
924  errmsg("could not find free replication state slot for replication origin with OID %u",
925  node),
926  errhint("Increase max_replication_slots and try again.")));
927 
928  if (replication_state == NULL)
929  {
930  /* initialize new slot */
931  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
932  replication_state = free_state;
933  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
934  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
935  replication_state->roident = node;
936  }
937 
938  Assert(replication_state->roident != InvalidRepOriginId);
939 
940  /*
941  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
942  * and the standby gets the message. Primarily this will be called during
943  * WAL replay (of commit records) where no WAL logging is necessary.
944  */
945  if (wal_log)
946  {
947  xl_replorigin_set xlrec;
948 
949  xlrec.remote_lsn = remote_commit;
950  xlrec.node_id = node;
951  xlrec.force = go_backward;
952 
953  XLogBeginInsert();
954  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
955 
956  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
957  }
958 
959  /*
960  * Due to - harmless - race conditions during a checkpoint we could see
961  * values here that are older than the ones we already have in memory.
962  * Don't overwrite those.
963  */
964  if (go_backward || replication_state->remote_lsn < remote_commit)
965  replication_state->remote_lsn = remote_commit;
966  if (local_commit != InvalidXLogRecPtr &&
967  (go_backward || replication_state->local_lsn < local_commit))
968  replication_state->local_lsn = local_commit;
969  LWLockRelease(&replication_state->lock);
970 
971  /*
972  * Release *after* changing the LSNs; the slot isn't acquired and thus
973  * could otherwise be dropped at any time.
974  */
975  LWLockRelease(ReplicationOriginLock);
976 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:1069
XLogRecPtr local_lsn
Definition: origin.c:117
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:608
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:132
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:739
RepOriginId node_id
Definition: origin.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:822
int i
static ReplicationState * replication_states
Definition: origin.c:163
void XLogBeginInsert(void)
Definition: xloginsert.c:120

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( char *  roname,
bool  missing_ok 
)

Definition at line 208 of file origin.c.

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, ReplicationState::roident, and SearchSysCache1().

Referenced by ApplyWorkerMain(), DropSubscription(), pg_replication_origin_advance(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), and pg_replication_origin_session_setup().

209 {
211  Oid roident = InvalidOid;
212  HeapTuple tuple;
213  Datum roname_d;
214 
215  roname_d = CStringGetTextDatum(roname);
216 
217  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
218  if (HeapTupleIsValid(tuple))
219  {
220  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
221  roident = ident->roident;
222  ReleaseSysCache(tuple);
223  }
224  else if (!missing_ok)
225  ereport(ERROR,
226  (errcode(ERRCODE_UNDEFINED_OBJECT),
227  errmsg("replication origin \"%s\" does not exist",
228  roname)));
229 
230  return roident;
231 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:608
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define CStringGetTextDatum(s)
Definition: builtins.h:83

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 431 of file origin.c.

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and pgoutput_begin_txn().

432 {
433  HeapTuple tuple;
435 
436  Assert(OidIsValid((Oid) roident));
437  Assert(roident != InvalidRepOriginId);
438  Assert(roident != DoNotReplicateId);
439 
441  ObjectIdGetDatum((Oid) roident));
442 
443  if (HeapTupleIsValid(tuple))
444  {
445  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
446  *roname = text_to_cstring(&ric->roname);
447  ReleaseSysCache(tuple);
448 
449  return true;
450  }
451  else
452  {
453  *roname = NULL;
454 
455  if (!missing_ok)
456  ereport(ERROR,
457  (errcode(ERRCODE_UNDEFINED_OBJECT),
458  errmsg("replication origin with OID %u does not exist",
459  roident)));
460 
461  return false;
462  }
463 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:608
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:645
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:739
#define InvalidRepOriginId
Definition: origin.h:33
char * text_to_cstring(const text *t)
Definition: varlena.c:204
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 177 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, RecoveryInProgress(), and superuser().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

178 {
179  if (!superuser())
180  ereport(ERROR,
181  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
182  errmsg("only superusers can query or manipulate replication origins")));
183 
184  if (check_slots && max_replication_slots == 0)
185  ereport(ERROR,
186  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
187  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
188 
189  if (!recoveryOK && RecoveryInProgress())
190  ereport(ERROR,
191  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
192  errmsg("cannot manipulate replication origins during recovery")));
193 
194 }
int errcode(int sqlerrcode)
Definition: elog.c:608
bool superuser(void)
Definition: superuser.c:46
bool RecoveryInProgress(void)
Definition: xlog.c:7935
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int max_replication_slots
Definition: slot.c:99
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ replorigin_create()

RepOriginId replorigin_create ( char *  roname)

Definition at line 239 of file origin.c.

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationOriginIdentIndex, ReplicationState::roident, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), and pg_replication_origin_create().

240 {
241  Oid roident;
242  HeapTuple tuple = NULL;
243  Relation rel;
244  Datum roname_d;
245  SnapshotData SnapshotDirty;
246  SysScanDesc scan;
248 
249  roname_d = CStringGetTextDatum(roname);
250 
252 
253  /*
254  * We need the numeric replication origin ID to be 16 bits wide, so we cannot
255  * rely on the normal oid allocation. Instead we simply scan
256  * pg_replication_origin for the first unused id. That's not particularly
257  * efficient, but this should be a fairly infrequent operation - we can
258  * easily spend a bit more code on this when it turns out it needs to be
259  * faster.
260  *
261  * We handle concurrency by taking an exclusive lock (allowing reads!)
262  * over the table for the duration of the search. Because we use a "dirty
263  * snapshot" we can read rows that other in-progress sessions have
264  * written, even though they would be invisible with normal snapshots. Due
265  * to the exclusive lock there's no danger that new rows can appear while
266  * we're checking.
267  */
268  InitDirtySnapshot(SnapshotDirty);
269 
270  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
271 
272  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
273  {
274  bool nulls[Natts_pg_replication_origin];
275  Datum values[Natts_pg_replication_origin];
276  bool collides;
277 
279 
280  ScanKeyInit(&key,
281  Anum_pg_replication_origin_roident,
282  BTEqualStrategyNumber, F_OIDEQ,
283  ObjectIdGetDatum(roident));
284 
286  true /* indexOK */ ,
287  &SnapshotDirty,
288  1, &key);
289 
290  collides = HeapTupleIsValid(systable_getnext(scan));
291 
292  systable_endscan(scan);
293 
294  if (!collides)
295  {
296  /*
297  * Ok, found an unused roident, insert the new row and do a CCI,
298  * so our callers can look it up if they want to.
299  */
300  memset(&nulls, 0, sizeof(nulls));
301 
302  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
303  values[Anum_pg_replication_origin_roname - 1] = roname_d;
304 
305  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
306  CatalogTupleInsert(rel, tuple);
308  break;
309  }
310  }
311 
312  /* now release lock again */
314 
315  if (tuple == NULL)
316  ereport(ERROR,
317  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
318  errmsg("could not find free replication origin OID")));
319 
320  heap_freetuple(tuple);
321  return roident;
322 }
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:76
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:525
#define RelationGetDescr(relation)
Definition: rel.h:448
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:608
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:352
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:444
#define ReplicationOriginIdentIndex
Definition: indexing.h:340
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:440
#define ereport(elevel, rest)
Definition: elog.h:141
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:739
bool IsTransactionState(void)
Definition: xact.c:355
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:83
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
#define BTEqualStrategyNumber
Definition: stratnum.h:31

◆ replorigin_drop()

void replorigin_drop ( RepOriginId  roident,
bool  nowait 
)

Definition at line 331 of file origin.c.

References ReplicationState::acquired_by, Assert, CatalogTupleDelete(), CommandCounterIncrement(), ConditionVariableCancelSleep(), ConditionVariableSleep(), elog, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, ObjectIdGetDatum, ReplicationState::origin_cv, ReleaseSysCache(), ReplicationState::remote_lsn, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), WAIT_EVENT_REPLICATION_ORIGIN_DROP, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by DropSubscription(), and pg_replication_origin_drop().

332 {
333  HeapTuple tuple;
334  Relation rel;
335  int i;
336 
338 
339  /*
340  * To interlock against concurrent drops, we hold ExclusiveLock on
341  * pg_replication_origin throughout this function.
342  */
343  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
344 
345  /*
346  * First, clean up the slot state info, if there is any matching slot.
347  */
348 restart:
349  tuple = NULL;
350  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
351 
352  for (i = 0; i < max_replication_slots; i++)
353  {
355 
356  if (state->roident == roident)
357  {
358  /* found our slot, is it busy? */
359  if (state->acquired_by != 0)
360  {
361  ConditionVariable *cv;
362 
363  if (nowait)
364  ereport(ERROR,
365  (errcode(ERRCODE_OBJECT_IN_USE),
366  errmsg("could not drop replication origin with OID %d, in use by PID %d",
367  state->roident,
368  state->acquired_by)));
369 
370  /*
371  * We must wait and then retry. Since we don't know which CV
372  * to wait on until here, we can't readily use
373  * ConditionVariablePrepareToSleep (calling it here would be
374  * wrong, since we could miss the signal if we did so); just
375  * use ConditionVariableSleep directly.
376  */
377  cv = &state->origin_cv;
378 
379  LWLockRelease(ReplicationOriginLock);
380 
382  goto restart;
383  }
384 
385  /* first make a WAL log entry */
386  {
387  xl_replorigin_drop xlrec;
388 
389  xlrec.node_id = roident;
390  XLogBeginInsert();
391  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
392  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
393  }
394 
395  /* then clear the in-memory slot */
396  state->roident = InvalidRepOriginId;
397  state->remote_lsn = InvalidXLogRecPtr;
398  state->local_lsn = InvalidXLogRecPtr;
399  break;
400  }
401  }
402  LWLockRelease(ReplicationOriginLock);
404 
405  /*
406  * Now, we can delete the catalog entry.
407  */
409  if (!HeapTupleIsValid(tuple))
410  elog(ERROR, "cache lookup failed for replication origin with oid %u",
411  roident);
412 
413  CatalogTupleDelete(rel, &tuple->t_self);
414  ReleaseSysCache(tuple);
415 
417 
418  /* now release lock again */
420 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
XLogRecPtr local_lsn
Definition: origin.c:117
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:608
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
void CommandCounterIncrement(void)
Definition: xact.c:1005
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:110
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
RepOriginId node_id
Definition: origin.h:27
#define Assert(condition)
Definition: c.h:739
Definition: regguts.h:298
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
bool IsTransactionState(void)
Definition: xact.c:355
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int i
static ReplicationState * replication_states
Definition: origin.c:163
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void XLogBeginInsert(void)
Definition: xloginsert.c:120

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 980 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

981 {
982  int i;
983  XLogRecPtr local_lsn = InvalidXLogRecPtr;
984  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
985 
986  /* prevent slots from being concurrently dropped */
987  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
988 
989  for (i = 0; i < max_replication_slots; i++)
990  {
992 
993  state = &replication_states[i];
994 
995  if (state->roident == node)
996  {
997  LWLockAcquire(&state->lock, LW_SHARED);
998 
999  remote_lsn = state->remote_lsn;
1000  local_lsn = state->local_lsn;
1001 
1002  LWLockRelease(&state->lock);
1003 
1004  break;
1005  }
1006  }
1007 
1008  LWLockRelease(ReplicationOriginLock);
1009 
1010  if (flush && local_lsn != InvalidXLogRecPtr)
1011  XLogFlush(local_lsn);
1012 
1013  return remote_lsn;
1014 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2805
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LWLock lock
Definition: origin.c:132
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int i
static ReplicationState * replication_states
Definition: origin.c:163

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 796 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

797 {
798  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
799 
800  switch (info)
801  {
802  case XLOG_REPLORIGIN_SET:
803  {
804  xl_replorigin_set *xlrec =
805  (xl_replorigin_set *) XLogRecGetData(record);
806 
808  xlrec->remote_lsn, record->EndRecPtr,
809  xlrec->force /* backward */ ,
810  false /* WAL log */ );
811  break;
812  }
814  {
815  xl_replorigin_drop *xlrec;
816  int i;
817 
818  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
819 
820  for (i = 0; i < max_replication_slots; i++)
821  {
823 
824  /* found our slot */
825  if (state->roident == xlrec->node_id)
826  {
827  /* reset entry */
828  state->roident = InvalidRepOriginId;
829  state->remote_lsn = InvalidXLogRecPtr;
830  state->local_lsn = InvalidXLogRecPtr;
831  break;
832  }
833  }
834  break;
835  }
836  default:
837  elog(PANIC, "replorigin_redo: unknown op code %u", info);
838  }
839 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
unsigned char uint8
Definition: c.h:357
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:857
#define PANIC
Definition: elog.h:53
RepOriginId roident
Definition: origin.c:105
XLogRecPtr EndRecPtr
Definition: xlogreader.h:132
#define XLogRecGetData(decoder)
Definition: xlogreader.h:283
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:279
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
RepOriginId node_id
Definition: origin.h:27
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
#define elog(elevel,...)
Definition: elog.h:228
int i
static ReplicationState * replication_states
Definition: origin.c:163

◆ replorigin_session_advance()

void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1170 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by EndPrepare(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

1171 {
1174 
1176  if (session_replication_state->local_lsn < local_commit)
1177  session_replication_state->local_lsn = local_commit;
1178  if (session_replication_state->remote_lsn < remote_commit)
1179  session_replication_state->remote_lsn = remote_commit;
1181 }
static ReplicationState * session_replication_state
Definition: origin.c:171
XLogRecPtr local_lsn
Definition: origin.c:117
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LWLock lock
Definition: origin.c:132
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:739
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define InvalidRepOriginId
Definition: origin.h:33

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1188 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_progress().

1189 {
1190  XLogRecPtr remote_lsn;
1191  XLogRecPtr local_lsn;
1192 
1194 
1196  remote_lsn = session_replication_state->remote_lsn;
1197  local_lsn = session_replication_state->local_lsn;
1199 
1200  if (flush && local_lsn != InvalidXLogRecPtr)
1201  XLogFlush(local_lsn);
1202 
1203  return remote_lsn;
1204 }
static ReplicationState * session_replication_state
Definition: origin.c:171
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2805
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LWLock lock
Definition: origin.c:132
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1141 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and ReplicationState::origin_cv.

Referenced by pg_replication_origin_session_reset().

1142 {
1143  ConditionVariable *cv;
1144 
1146 
1147  if (session_replication_state == NULL)
1148  ereport(ERROR,
1149  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1150  errmsg("no replication origin is configured")));
1151 
1152  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1153 
1157 
1158  LWLockRelease(ReplicationOriginLock);
1159 
1161 }
static ReplicationState * session_replication_state
Definition: origin.c:171
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:608
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int max_replication_slots
Definition: slot.c:99
#define Assert(condition)
Definition: c.h:739
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node)

Definition at line 1053 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_setup().

1054 {
1055  static bool registered_cleanup;
1056  int i;
1057  int free_slot = -1;
1058 
1059  if (!registered_cleanup)
1060  {
1062  registered_cleanup = true;
1063  }
1064 
1066 
1067  if (session_replication_state != NULL)
1068  ereport(ERROR,
1069  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1070  errmsg("cannot setup replication origin when one is already setup")));
1071 
1072  /* Lock exclusively, as we may have to create a new table entry. */
1073  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1074 
1075  /*
1076  * Search for either an existing slot for the origin, or a free one we can
1077  * use.
1078  */
1079  for (i = 0; i < max_replication_slots; i++)
1080  {
1081  ReplicationState *curstate = &replication_states[i];
1082 
1083  /* remember where to insert if necessary */
1084  if (curstate->roident == InvalidRepOriginId &&
1085  free_slot == -1)
1086  {
1087  free_slot = i;
1088  continue;
1089  }
1090 
1091  /* not our slot */
1092  if (curstate->roident != node)
1093  continue;
1094 
1095  else if (curstate->acquired_by != 0)
1096  {
1097  ereport(ERROR,
1098  (errcode(ERRCODE_OBJECT_IN_USE),
1099  errmsg("replication origin with OID %d is already active for PID %d",
1100  curstate->roident, curstate->acquired_by)));
1101  }
1102 
1103  /* ok, found slot */
1104  session_replication_state = curstate;
1105  }
1106 
1107 
1108  if (session_replication_state == NULL && free_slot == -1)
1109  ereport(ERROR,
1110  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1111  errmsg("could not find free replication state slot for replication origin with OID %u",
1112  node),
1113  errhint("Increase max_replication_slots and try again.")));
1114  else if (session_replication_state == NULL)
1115  {
1116  /* initialize new slot */
1121  }
1122 
1123 
1125 
1127 
1128  LWLockRelease(ReplicationOriginLock);
1129 
1130  /* probably this one is pointless */
1132 }
static ReplicationState * session_replication_state
Definition: origin.c:171
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:1069
XLogRecPtr local_lsn
Definition: origin.c:117
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1021
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:608
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define ERROR
Definition: elog.h:43
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define ereport(elevel, rest)
Definition: elog.h:141
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:739
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:822
int i
static ReplicationState * replication_states
Definition: origin.c:163

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 668 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

669 {
670  const char *path = "pg_logical/replorigin_checkpoint";
671  int fd;
672  int readBytes;
674  int last_state = 0;
675  pg_crc32c file_crc;
676  pg_crc32c crc;
677 
678  /* don't want to overwrite already existing state */
679 #ifdef USE_ASSERT_CHECKING
680  static bool already_started = false;
681 
682  Assert(!already_started);
683  already_started = true;
684 #endif
685 
686  if (max_replication_slots == 0)
687  return;
688 
689  INIT_CRC32C(crc);
690 
691  elog(DEBUG2, "starting up replication origin progress state");
692 
693  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
694 
695  /*
696  * might have had max_replication_slots == 0 last run, or we just brought
697  * up a standby.
698  */
699  if (fd < 0 && errno == ENOENT)
700  return;
701  else if (fd < 0)
702  ereport(PANIC,
704  errmsg("could not open file \"%s\": %m",
705  path)));
706 
707  /* verify magic, that is written even if nothing was active */
708  readBytes = read(fd, &magic, sizeof(magic));
709  if (readBytes != sizeof(magic))
710  {
711  if (readBytes < 0)
712  ereport(PANIC,
714  errmsg("could not read file \"%s\": %m",
715  path)));
716  else
717  ereport(PANIC,
719  errmsg("could not read file \"%s\": read %d of %zu",
720  path, readBytes, sizeof(magic))));
721  }
722  COMP_CRC32C(crc, &magic, sizeof(magic));
723 
724  if (magic != REPLICATION_STATE_MAGIC)
725  ereport(PANIC,
726  (errmsg("replication checkpoint has wrong magic %u instead of %u",
727  magic, REPLICATION_STATE_MAGIC)));
728 
729  /* we can skip locking here, no other access is possible */
730 
731  /* recover individual states, until there are no more to be found */
732  while (true)
733  {
734  ReplicationStateOnDisk disk_state;
735 
736  readBytes = read(fd, &disk_state, sizeof(disk_state));
737 
738  /* no further data */
739  if (readBytes == sizeof(crc))
740  {
741  /* not pretty, but simple ... */
742  file_crc = *(pg_crc32c *) &disk_state;
743  break;
744  }
745 
746  if (readBytes < 0)
747  {
748  ereport(PANIC,
750  errmsg("could not read file \"%s\": %m",
751  path)));
752  }
753 
754  if (readBytes != sizeof(disk_state))
755  {
756  ereport(PANIC,
758  errmsg("could not read file \"%s\": read %d of %zu",
759  path, readBytes, sizeof(disk_state))));
760  }
761 
762  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
763 
764  if (last_state == max_replication_slots)
765  ereport(PANIC,
766  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
767  errmsg("could not find free replication state, increase max_replication_slots")));
768 
769  /* copy data to shared memory */
770  replication_states[last_state].roident = disk_state.roident;
771  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
772  last_state++;
773 
774  elog(LOG, "recovered replication state of node %u to %X/%X",
775  disk_state.roident,
776  (uint32) (disk_state.remote_lsn >> 32),
777  (uint32) disk_state.remote_lsn);
778  }
779 
780  /* now check checksum */
781  FIN_CRC32C(crc);
782  if (file_crc != crc)
783  ereport(PANIC,
784  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
785  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
786  crc, file_crc)));
787 
788  if (CloseTransientFile(fd) != 0)
789  ereport(PANIC,
791  errmsg("could not close file \"%s\": %m",
792  path)));
793 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:141
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:140
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:53
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
RepOriginId roident
Definition: origin.c:105
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:631
unsigned int uint32
Definition: c.h:359
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define REPLICATION_STATE_MAGIC
Definition: origin.c:174
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:739
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
static ReplicationState * replication_states
Definition: origin.c:163
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

◆ replication_states

ReplicationState* replication_states
static

Definition at line 163 of file origin.c.

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 164 of file origin.c.

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state

ReplicationState* session_replication_state = NULL
static

Definition at line 171 of file origin.c.