PostgreSQL Source Code  git master
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "funcapi.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "nodes/execnodes.h"
#include "replication/origin.h"
#include "replication/logical.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
RepOriginId replorigin_by_name (char *roname, bool missing_ok)
 
RepOriginId replorigin_create (char *roname)
 
void replorigin_drop (RepOriginId roident, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationState * replication_states
 
static ReplicationStateCtl * replication_states_ctl
 
static ReplicationState * session_replication_state = NULL
 

Macro Definition Documentation

◆ REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

◆ REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 177 of file origin.c.

Referenced by CheckPointReplicationOrigin(), and StartupReplicationOrigin().

Typedef Documentation

◆ ReplicationState

◆ ReplicationStateCtl

◆ ReplicationStateOnDisk

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 545 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

546 {
547  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
548  const char *path = "pg_logical/replorigin_checkpoint";
549  int tmpfd;
550  int i;
551  uint32 magic = REPLICATION_STATE_MAGIC;
552  pg_crc32c crc;
553 
554  if (max_replication_slots == 0)
555  return;
556 
557  INIT_CRC32C(crc);
558 
559  /* make sure no old temp file is remaining */
560  if (unlink(tmppath) < 0 && errno != ENOENT)
561  ereport(PANIC,
562  (errcode_for_file_access(),
563  errmsg("could not remove file \"%s\": %m",
564  tmppath)));
565 
566  /*
567  * no other backend can perform this at the same time, we're protected by
568  * CheckpointLock.
569  */
570  tmpfd = OpenTransientFile(tmppath,
571  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
572  if (tmpfd < 0)
573  ereport(PANIC,
574  (errcode_for_file_access(),
575  errmsg("could not create file \"%s\": %m",
576  tmppath)));
577 
578  /* write magic */
579  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
580  {
581  CloseTransientFile(tmpfd);
582  ereport(PANIC,
583  (errcode_for_file_access(),
584  errmsg("could not write to file \"%s\": %m",
585  tmppath)));
586  }
587  COMP_CRC32C(crc, &magic, sizeof(magic));
588 
589  /* prevent concurrent creations/drops */
590  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
591 
592  /* write actual data */
593  for (i = 0; i < max_replication_slots; i++)
594  {
595  ReplicationStateOnDisk disk_state;
596  ReplicationState *curstate = &replication_states[i];
597  XLogRecPtr local_lsn;
598 
599  if (curstate->roident == InvalidRepOriginId)
600  continue;
601 
602  /* zero, to avoid uninitialized padding bytes */
603  memset(&disk_state, 0, sizeof(disk_state));
604 
605  LWLockAcquire(&curstate->lock, LW_SHARED);
606 
607  disk_state.roident = curstate->roident;
608 
609  disk_state.remote_lsn = curstate->remote_lsn;
610  local_lsn = curstate->local_lsn;
611 
612  LWLockRelease(&curstate->lock);
613 
614  /* make sure we only write out a commit that's persistent */
615  XLogFlush(local_lsn);
616 
617  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
618  sizeof(disk_state))
619  {
620  CloseTransientFile(tmpfd);
621  ereport(PANIC,
622  (errcode_for_file_access(),
623  errmsg("could not write to file \"%s\": %m",
624  tmppath)));
625  }
626 
627  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
628  }
629 
630  LWLockRelease(ReplicationOriginLock);
631 
632  /* write out the CRC */
633  FIN_CRC32C(crc);
634  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
635  {
636  CloseTransientFile(tmpfd);
637  ereport(PANIC,
638  (errcode_for_file_access(),
639  errmsg("could not write to file \"%s\": %m",
640  tmppath)));
641  }
642 
643  CloseTransientFile(tmpfd);
644 
645  /* fsync, rename to permanent file, fsync file and directory */
646  durable_rename(tmppath, path, PANIC);
647 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:120
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:144
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:143
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783
#define PG_BINARY
Definition: c.h:1080
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2396
LWLock lock
Definition: origin.c:135
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:325
#define ereport(elevel, rest)
Definition: elog.h:122
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:601
int CloseTransientFile(int fd)
Definition: fd.c:2566
#define REPLICATION_STATE_MAGIC
Definition: origin.c:177
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94

◆ pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1366 of file origin.c.

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

1367 {
1368  text *name = PG_GETARG_TEXT_PP(0);
1369  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1370  RepOriginId node;
1371 
1372  replorigin_check_prerequisites(true, false);
1373 
1374  /* lock to prevent the replication origin from vanishing */
1375  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1376 
1377  node = replorigin_by_name(text_to_cstring(name), false);
1378 
1379  /*
1380  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1381  * xact hasn't committed yet. This is why this function should be used to
1382  * set up the initial replication state, but not for replay.
1383  */
1384  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1385  true /* go backward */ , true /* WAL log */ );
1386 
1387  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1388 
1389  PG_RETURN_VOID();
1390 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:182
uint16 RepOriginId
Definition: xlogdefs.h:51
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:834
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:278
#define RowExclusiveLock
Definition: lockdefs.h:38
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:314
uint64 XLogRecPtr
Definition: xlogdefs.h:21
const char * name
Definition: encode.c:521
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:516
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105

◆ pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1197 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), ReplicationState::roident, and text_to_cstring().

1198 {
1199  char *name;
1200  RepOriginId roident;
1201 
1202  replorigin_check_prerequisites(false, false);
1203 
1204  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1205  roident = replorigin_create(name);
1206 
1207  pfree(name);
1208 
1209  PG_RETURN_OID(roident);
1210 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:238
uint16 RepOriginId
Definition: xlogdefs.h:51
void pfree(void *pointer)
Definition: mcxt.c:1031
RepOriginId replorigin_create(char *roname)
Definition: origin.c:242
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:532
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:516
#define PG_RETURN_OID(x)
Definition: fmgr.h:325

◆ pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1216 of file origin.c.

References Assert, DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_drop(), ReplicationState::roident, and text_to_cstring().

1217 {
1218  char *name;
1219  RepOriginId roident;
1220 
1221  replorigin_check_prerequisites(false, false);
1222 
1223  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1224 
1225  roident = replorigin_by_name(name, false);
1226  Assert(OidIsValid(roident));
1227 
1228  replorigin_drop(roident, true);
1229 
1230  pfree(name);
1231 
1232  PG_RETURN_VOID();
1233 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:238
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:334
uint16 RepOriginId
Definition: xlogdefs.h:51
#define OidIsValid(objectId)
Definition: c.h:605
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void pfree(void *pointer)
Definition: mcxt.c:1031
#define PG_RETURN_VOID()
Definition: fmgr.h:314
#define Assert(condition)
Definition: c.h:699
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:532
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:516

◆ pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1239 of file origin.c.

References DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), ReplicationState::roident, and text_to_cstring().

1240 {
1241  char *name;
1242  RepOriginId roident;
1243 
1244  replorigin_check_prerequisites(false, false);
1245 
1246  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1247  roident = replorigin_by_name(name, true);
1248 
1249  pfree(name);
1250 
1251  if (OidIsValid(roident))
1252  PG_RETURN_OID(roident);
1253  PG_RETURN_NULL();
1254 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:238
uint16 RepOriginId
Definition: xlogdefs.h:51
#define OidIsValid(objectId)
Definition: c.h:605
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void pfree(void *pointer)
Definition: mcxt.c:1031
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:532
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:516
#define PG_RETURN_OID(x)
Definition: fmgr.h:325
#define PG_RETURN_NULL()
Definition: fmgr.h:310

◆ pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1401 of file origin.c.

References Assert, DatumGetPointer, InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), ReplicationState::roident, and text_to_cstring().

1402 {
1403  char *name;
1404  bool flush;
1405  RepOriginId roident;
1406  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1407 
1408  replorigin_check_prerequisites(true, true);
1409 
1410  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1411  flush = PG_GETARG_BOOL(1);
1412 
1413  roident = replorigin_by_name(name, false);
1414  Assert(OidIsValid(roident));
1415 
1416  remote_lsn = replorigin_get_progress(roident, flush);
1417 
1418  if (remote_lsn == InvalidXLogRecPtr)
1419  PG_RETURN_NULL();
1420 
1421  PG_RETURN_LSN(remote_lsn);
1422 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:238
uint16 RepOriginId
Definition: xlogdefs.h:51
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:244
#define OidIsValid(objectId)
Definition: c.h:605
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:532
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:516
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:957
#define PG_RETURN_NULL()
Definition: fmgr.h:310

◆ pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1299 of file origin.c.

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

1300 {
1301  replorigin_check_prerequisites(false, false);
1302 
1303  PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1304 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:324
RepOriginId replorigin_session_origin
Definition: origin.c:155
#define InvalidRepOriginId
Definition: origin.h:34

◆ pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1315 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, ReplicationState::remote_lsn, replorigin_check_prerequisites(), and replorigin_session_get_progress().

1316 {
1317  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1318  bool flush = PG_GETARG_BOOL(0);
1319 
1320  replorigin_check_prerequisites(true, false);
1321 
1322  if (session_replication_state == NULL)
1323  ereport(ERROR,
1324  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1325  errmsg("no replication origin is configured")));
1326 
1327  remote_lsn = replorigin_session_get_progress(flush);
1328 
1329  if (remote_lsn == InvalidXLogRecPtr)
1330  PG_RETURN_NULL();
1331 
1332  PG_RETURN_LSN(remote_lsn);
1333 }
static ReplicationState * session_replication_state
Definition: origin.c:174
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1165
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:244
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define PG_RETURN_NULL()
Definition: fmgr.h:310

◆ pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1282 of file origin.c.

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

1283 {
1284  replorigin_check_prerequisites(true, false);
1285 
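1286  replorigin_session_reset();
1287 
1288  replorigin_session_origin = InvalidRepOriginId;
1289  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1290  replorigin_session_origin_timestamp = 0;
1291 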
1292  PG_RETURN_VOID();
1293 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void replorigin_session_reset(void)
Definition: origin.c:1118
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define PG_RETURN_VOID()
Definition: fmgr.h:314
RepOriginId replorigin_session_origin
Definition: origin.c:155
#define InvalidRepOriginId
Definition: origin.h:34

◆ pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1260 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

1261 {
1262  char *name;
1263  RepOriginId origin;
1264 
1265  replorigin_check_prerequisites(true, false);
1266 
1267  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1268  origin = replorigin_by_name(name, false);
1269  replorigin_session_setup(origin);
1270 
1271  replorigin_session_origin = origin;
1272 
1273  pfree(name);
1274 
1275  PG_RETURN_VOID();
1276 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:238
uint16 RepOriginId
Definition: xlogdefs.h:51
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1030
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void pfree(void *pointer)
Definition: mcxt.c:1031
#define PG_RETURN_VOID()
Definition: fmgr.h:314
RepOriginId replorigin_session_origin
Definition: origin.c:155
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:532
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:516

◆ pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1354 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1355 {
1356  replorigin_check_prerequisites(true, false);
1357 
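1358  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1359  replorigin_session_origin_timestamp = 0;
1360 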
1361  PG_RETURN_VOID();
1362 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define PG_RETURN_VOID()
Definition: fmgr.h:314

◆ pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1336 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1337 {
1338  XLogRecPtr location = PG_GETARG_LSN(0);
1339 
1340  replorigin_check_prerequisites(true, false);
1341 
1342  if (session_replication_state == NULL)
1343  ereport(ERROR,
1344  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1345  errmsg("no replication origin is configured")));
1346 
1347  replorigin_session_origin_lsn = location;
1348  replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1349 
1350  PG_RETURN_VOID();
1351 }
static ReplicationState * session_replication_state
Definition: origin.c:174
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define ereport(elevel, rest)
Definition: elog.h:122
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:314
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36

◆ pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1426 of file origin.c.

References ReturnSetInfo::allowedModes, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, InvalidRepOriginId, IsA, ReplicationState::local_lsn, ReplicationState::lock, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ObjectIdGetDatum, ReplicationState::remote_lsn, REPLICATION_ORIGIN_PROGRESS_COLS, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::returnMode, ReplicationState::roident, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1427 {
1428  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1429  TupleDesc tupdesc;
1430  Tuplestorestate *tupstore;
1431  MemoryContext per_query_ctx;
1432  MemoryContext oldcontext;
1433  int i;
1434 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1435 
1436  /* we want to return 0 rows if slot is set to zero */
1437  replorigin_check_prerequisites(false, true);
1438 
1439  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1440  ereport(ERROR,
1441  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1442  errmsg("set-valued function called in context that cannot accept a set")));
1443  if (!(rsinfo->allowedModes & SFRM_Materialize))
1444  ereport(ERROR,
1445  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1446  errmsg("materialize mode required, but it is not allowed in this context")));
1447  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1448  elog(ERROR, "return type must be a row type");
1449 
1450  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1451  elog(ERROR, "wrong function definition");
1452 
1453  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1454  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1455 
1456  tupstore = tuplestore_begin_heap(true, false, work_mem);
1457  rsinfo->returnMode = SFRM_Materialize;
1458  rsinfo->setResult = tupstore;
1459  rsinfo->setDesc = tupdesc;
1460 
1461  MemoryContextSwitchTo(oldcontext);
1462 
1463 
1464  /* prevent slots from being concurrently dropped */
1465  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1466 
1467  /*
1468  * Iterate through all possible replication_states, display if they are
1469  * filled. Note that we do not take any locks, so slightly corrupted/out
1470  * of date values are a possibility.
1471  */
1472  for (i = 0; i < max_replication_slots; i++)
1473  {
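1474  ReplicationState *state;
1475  Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1476  bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1477  char *roname;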
1478 
1479  state = &replication_states[i];
1480 
1481  /* unused slot, nothing to display */
1482  if (state->roident == InvalidRepOriginId)
1483  continue;
1484 
1485  memset(values, 0, sizeof(values));
1486  memset(nulls, 1, sizeof(nulls));
1487 
1488  values[0] = ObjectIdGetDatum(state->roident);
1489  nulls[0] = false;
1490 
1491  /*
1492  * We're not preventing the origin to be dropped concurrently, so
1493  * silently accept that it might be gone.
1494  */
1495  if (replorigin_by_oid(state->roident, true,
1496  &roname))
1497  {
1498  values[1] = CStringGetTextDatum(roname);
1499  nulls[1] = false;
1500  }
1501 
1502  LWLockAcquire(&state->lock, LW_SHARED);
1503 
1504  values[2] = LSNGetDatum(state->remote_lsn);
1505  nulls[2] = false;
1506 
1507  values[3] = LSNGetDatum(state->local_lsn);
1508  nulls[3] = false;
1509 
1510  LWLockRelease(&state->lock);
1511 
1512  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1513  }
1514 
1515  tuplestore_donestoring(tupstore);
1516 
1517  LWLockRelease(ReplicationOriginLock);
1518 
1519 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1520 
1521  return (Datum) 0;
1522 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define IsA(nodeptr, _type_)
Definition: nodes.h:568
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
XLogRecPtr local_lsn
Definition: origin.c:120
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:434
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
#define ObjectIdGetDatum(X)
Definition: postgres.h:490
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:135
#define ereport(elevel, rest)
Definition: elog.h:122
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
uintptr_t Datum
Definition: postgres.h:365
int work_mem
Definition: globals.c:122
int allowedModes
Definition: execnodes.h:297
SetFunctionReturnMode returnMode
Definition: execnodes.h:299
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:225
#define InvalidRepOriginId
Definition: origin.h:34
Tuplestorestate * setResult
Definition: execnodes.h:302
static Datum values[MAXATTR]
Definition: bootstrap.c:164
ExprContext * econtext
Definition: execnodes.h:295
TupleDesc setDesc
Definition: execnodes.h:303
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define CStringGetTextDatum(s)
Definition: builtins.h:95
#define elog
Definition: elog.h:219
#define REPLICATION_ORIGIN_PROGRESS_COLS

◆ ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 998 of file origin.c.

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, and ReplicationState::origin_cv.

Referenced by replorigin_session_setup().

999 {
1000  ConditionVariable *cv = NULL;
1001 
1002  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1003 
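1004  if (session_replication_state != NULL &&
1005  session_replication_state->acquired_by == MyProcPid)
1006  {
1007  cv = &session_replication_state->origin_cv;
1008 
1009  session_replication_state->acquired_by = 0;
1010  session_replication_state = NULL;
1011  }
1012 
1013  LWLockRelease(ReplicationOriginLock);
1014 
1015  if (cv)
1016  ConditionVariableBroadcast(cv);
1017 }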
static ReplicationState * session_replication_state
Definition: origin.c:174
int MyProcPid
Definition: globals.c:42
void ConditionVariableBroadcast(ConditionVariable *cv)
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 495 of file origin.c.

References ConditionVariableInit(), i, ReplicationState::lock, LWLockInitialize(), LWLockRegisterTranche(), LWTRANCHE_REPLICATION_ORIGIN, max_replication_slots, MemSet, ReplicationState::origin_cv, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

496 {
497  bool found;
498 
499  if (max_replication_slots == 0)
500  return;
501 
503  ShmemInitStruct("ReplicationOriginState",
505  &found);
507 
508  if (!found)
509  {
510  int i;
511 
513 
515 
516  for (i = 0; i < max_replication_slots; i++)
517  {
521  }
522  }
523 
525  "replication_origin");
526 }
#define MemSet(start, val, len)
Definition: c.h:908
void LWLockRegisterTranche(int tranche_id, const char *tranche_name)
Definition: lwlock.c:602
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:677
Size ReplicationOriginShmemSize(void)
Definition: origin.c:475
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:167
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:151
int i
static ReplicationState * replication_states
Definition: origin.c:166

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 475 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

476 {
477  Size size = 0;
478 
479  /*
480  * XXX: max_replication_slots is arguably the wrong thing to use, as here
481  * we keep the replay state of *remote* transactions. But for now it seems
482  * sufficient to reuse it, lest we introduce a separate GUC.
483  */
484  if (max_replication_slots == 0)
485  return size;
486 
487  size = add_size(size, offsetof(ReplicationStateCtl, states));
488 
489  size = add_size(size,
490  mul_size(max_replication_slots, sizeof(ReplicationState)));
491  return size;
492 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:433
#define offsetof(type, field)
Definition: c.h:622

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 834 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), and xact_redo_commit().

837 {
 /*
  * Update the tracked progress of replication origin `node` in the shared
  * replication_states array, claiming a free entry if the origin has none
  * yet.
  *
  * remote_commit - LSN stored into the entry's remote_lsn
  * local_commit  - LSN stored into the entry's local_lsn; ignored when it
  *                 is InvalidXLogRecPtr
  * go_backward   - when true, overwrite the stored LSNs even if the new
  *                 values are not larger than the stored ones
  * wal_log       - when true, emit an XLOG_REPLORIGIN_SET record before
  *                 updating the in-memory entry
  *
  * Raises ERROR if the matching entry is currently acquired by a process
  * (acquired_by != 0), or if there is neither a matching nor a free entry.
  * A node of DoNotReplicateId is silently ignored.
  */
838  int i;
839  ReplicationState *replication_state = NULL;
840  ReplicationState *free_state = NULL;
841 
842  Assert(node != InvalidRepOriginId);
843 
844  /* we don't track DoNotReplicateId */
845  if (node == DoNotReplicateId)
846  return;
847 
848  /*
849  * XXX: For the case where this is called by WAL replay, it'd be more
850  * efficient to restore into a backend local hashtable and only dump into
851  * shmem after recovery is finished. Let's wait with implementing that
852  * till it's shown to be a measurable expense.
853  */
854 
855  /* Lock exclusively, as we may have to create a new table entry. */
856  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
857 
858  /*
859  * Search for either an existing slot for the origin, or a free one we can
860  * use.
861  */
862  for (i = 0; i < max_replication_slots; i++)
863  {
864  ReplicationState *curstate = &replication_states[i];
865 
866  /* remember where to insert if necessary */
867  if (curstate->roident == InvalidRepOriginId &&
868  free_state == NULL)
869  {
870  free_state = curstate;
 /* a free entry cannot match: node != InvalidRepOriginId was asserted */
871  continue;
872  }
873 
874  /* not our slot */
875  if (curstate->roident != node)
876  {
877  continue;
878  }
879 
880  /* ok, found slot */
881  replication_state = curstate;
882 
 /* the per-entry lock stays held until the LSNs have been updated below */
883  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
884 
885  /* Make sure it's not used by somebody else */
886  if (replication_state->acquired_by != 0)
887  {
888  ereport(ERROR,
889  (errcode(ERRCODE_OBJECT_IN_USE),
890  errmsg("replication origin with OID %d is already active for PID %d",
891  replication_state->roident,
892  replication_state->acquired_by)));
893  }
894 
895  break;
896  }
897 
 /* neither an existing entry for this origin nor a free one: give up */
898  if (replication_state == NULL && free_state == NULL)
899  ereport(ERROR,
900  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
901  errmsg("could not find free replication state slot for replication origin with OID %u",
902  node),
903  errhint("Increase max_replication_slots and try again.")));
904 
905  if (replication_state == NULL)
906  {
907  /* initialize new slot */
908  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
909  replication_state = free_state;
 /* a free entry is expected to carry no leftover progress */
910  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
911  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
912  replication_state->roident = node;
913  }
914 
915  Assert(replication_state->roident != InvalidRepOriginId);
916 
917  /*
918  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
919  * and the standby gets the message. Primarily this will be called during
920  * WAL replay (of commit records) where no WAL logging is necessary.
921  */
922  if (wal_log)
923  {
924  xl_replorigin_set xlrec;
925 
926  xlrec.remote_lsn = remote_commit;
927  xlrec.node_id = node;
 /* the record's force flag mirrors go_backward */
928  xlrec.force = go_backward;
929 
930  XLogBeginInsert();
931  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
932 
933  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
934  }
935 
936  /*
937  * Due to - harmless - race conditions during a checkpoint we could see
938  * values here that are older than the ones we already have in memory.
939  * Don't overwrite those.
940  */
941  if (go_backward || replication_state->remote_lsn < remote_commit)
942  replication_state->remote_lsn = remote_commit;
 /* an invalid local_commit never replaces the stored local_lsn */
943  if (local_commit != InvalidXLogRecPtr &&
944  (go_backward || replication_state->local_lsn < local_commit))
945  replication_state->local_lsn = local_commit;
946  LWLockRelease(&replication_state->lock);
947 
948  /*
949  * Release *after* changing the LSNs, slot isn't acquired and thus could
950  * otherwise be dropped anytime.
951  */
952  LWLockRelease(ReplicationOriginLock);
953 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
#define DoNotReplicateId
Definition: origin.h:35
int errcode(int sqlerrcode)
Definition: elog.c:575
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:135
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
#define ereport(elevel, rest)
Definition: elog.h:122
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:699
RepOriginId node_id
Definition: origin.h:22
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
XLogRecPtr remote_lsn
Definition: origin.h:21
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
void XLogBeginInsert(void)
Definition: xloginsert.c:120

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( char *  roname,
bool  missing_ok 
)

Definition at line 211 of file origin.c.

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, ReplicationState::roident, and SearchSysCache1().

Referenced by ApplyWorkerMain(), DropSubscription(), pg_replication_origin_advance(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), and pg_replication_origin_session_setup().

212 {
 /*
  * Look up a replication origin by name through the REPLORIGNAME syscache.
  *
  * Returns the roident stored in the matching catalog row. When no row
  * matches, returns InvalidOid if missing_ok is true and raises ERROR
  * (ERRCODE_UNDEFINED_OBJECT) otherwise.
  */
214  Oid roident = InvalidOid;
215  HeapTuple tuple;
216  Datum roname_d;
217 
 /* the syscache is keyed by a text datum, so convert the C string first */
218  roname_d = CStringGetTextDatum(roname);
219 
220  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
221  if (HeapTupleIsValid(tuple))
222  {
223  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
 /* copy the id out before the cache reference is released */
224  roident = ident->roident;
225  ReleaseSysCache(tuple);
226  }
227  else if (!missing_ok)
228  ereport(ERROR,
229  (errcode(ERRCODE_UNDEFINED_OBJECT),
230  errmsg("replication origin \"%s\" does not exist",
231  roname)));
232 
 /* still InvalidOid here if nothing was found and missing_ok was set */
233  return roident;
234 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:673
int errcode(int sqlerrcode)
Definition: elog.c:575
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
#define ereport(elevel, rest)
Definition: elog.h:122
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
uintptr_t Datum
Definition: postgres.h:365
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:95

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 434 of file origin.c.

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and pgoutput_begin_txn().

435 {
436  HeapTuple tuple;
438 
439  Assert(OidIsValid((Oid) roident));
440  Assert(roident != InvalidRepOriginId);
441  Assert(roident != DoNotReplicateId);
442 
444  ObjectIdGetDatum((Oid) roident));
445 
446  if (HeapTupleIsValid(tuple))
447  {
448  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
449  *roname = text_to_cstring(&ric->roname);
450  ReleaseSysCache(tuple);
451 
452  return true;
453  }
454  else
455  {
456  *roname = NULL;
457 
458  if (!missing_ok)
459  ereport(ERROR,
460  (errcode(ERRCODE_UNDEFINED_OBJECT),
461  errmsg("replication origin with OID %u does not exist",
462  roident)));
463 
464  return false;
465  }
466 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:673
#define DoNotReplicateId
Definition: origin.h:35
int errcode(int sqlerrcode)
Definition: elog.c:575
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:605
#define ObjectIdGetDatum(X)
Definition: postgres.h:490
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
#define ereport(elevel, rest)
Definition: elog.h:122
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:699
#define InvalidRepOriginId
Definition: origin.h:34
char * text_to_cstring(const text *t)
Definition: varlena.c:182
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 180 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, RecoveryInProgress(), and superuser().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

181 {
 /*
  * Shared precondition checks for the replication origin functions.
  * Each failed check raises ERROR; the function returns normally only
  * when every applicable check passes.
  *
  * check_slots - when true, also require max_replication_slots != 0
  * recoveryOK  - when false, also require that recovery is not in progress
  */

 /* unconditional: the caller must be a superuser */
182  if (!superuser())
183  ereport(ERROR,
184  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
185  errmsg("only superusers can query or manipulate replication origins")));
186 
 /* only enforced when the caller asked for the slot check */
187  if (check_slots && max_replication_slots == 0)
188  ereport(ERROR,
189  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 
 /* only enforced when the caller did not declare recovery acceptable */
192  if (!recoveryOK && RecoveryInProgress())
193  ereport(ERROR,
194  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195  errmsg("cannot manipulate replication origins during recovery")));
196 
197 }
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
bool RecoveryInProgress(void)
Definition: xlog.c:7939
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ replorigin_create()

RepOriginId replorigin_create ( char *  roname)

Definition at line 242 of file origin.c.

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationOriginIdentIndex, ReplicationState::roident, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), and pg_replication_origin_create().

243 {
244  Oid roident;
245  HeapTuple tuple = NULL;
246  Relation rel;
247  Datum roname_d;
248  SnapshotData SnapshotDirty;
249  SysScanDesc scan;
250  ScanKeyData key;
251 
252  roname_d = CStringGetTextDatum(roname);
253 
255 
256  /*
257  * We need the numeric replication origin to be 16bit wide, so we cannot
258  * rely on the normal oid allocation. Instead we simply scan
259  * pg_replication_origin for the first unused id. That's not particularly
260  * efficient, but this should be a fairly infrequent operation - we can
261  * easily spend a bit more code on this when it turns out it needs to be
262  * faster.
263  *
264  * We handle concurrency by taking an exclusive lock (allowing reads!)
265  * over the table for the duration of the search. Because we use a "dirty
266  * snapshot" we can read rows that other in-progress sessions have
267  * written, even though they would be invisible with normal snapshots. Due
268  * to the exclusive lock there's no danger that new rows can appear while
269  * we're checking.
270  */
271  InitDirtySnapshot(SnapshotDirty);
272 
273  rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
274 
275  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
276  {
277  bool nulls[Natts_pg_replication_origin];
278  Datum values[Natts_pg_replication_origin];
279  bool collides;
280 
282 
283  ScanKeyInit(&key,
284  Anum_pg_replication_origin_roident,
285  BTEqualStrategyNumber, F_OIDEQ,
286  ObjectIdGetDatum(roident));
287 
289  true /* indexOK */ ,
290  &SnapshotDirty,
291  1, &key);
292 
293  collides = HeapTupleIsValid(systable_getnext(scan));
294 
295  systable_endscan(scan);
296 
297  if (!collides)
298  {
299  /*
300  * Ok, found an unused roident, insert the new row and do a CCI,
301  * so our callers can look it up if they want to.
302  */
303  memset(&nulls, 0, sizeof(nulls));
304 
305  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
306  values[Anum_pg_replication_origin_roname - 1] = roname_d;
307 
308  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
309  CatalogTupleInsert(rel, tuple);
311  break;
312  }
313  }
314 
315  /* now release lock again, */
317 
318  if (tuple == NULL)
319  ereport(ERROR,
320  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
321  errmsg("could not find free replication origin OID")));
322 
323  heap_freetuple(tuple);
324  return roident;
325 }
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:502
#define RelationGetDescr(relation)
Definition: rel.h:433
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:575
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1773
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:331
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:419
#define ReplicationOriginIdentIndex
Definition: indexing.h:334
#define ObjectIdGetDatum(X)
Definition: postgres.h:490
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:406
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:163
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:103
#define ereport(elevel, rest)
Definition: elog.h:122
uintptr_t Datum
Definition: postgres.h:365
void CommandCounterIncrement(void)
Definition: xact.c:914
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1294
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:699
bool IsTransactionState(void)
Definition: xact.c:350
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:95
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define BTEqualStrategyNumber
Definition: stratnum.h:31

◆ replorigin_drop()

void replorigin_drop ( RepOriginId  roident,
bool  nowait 
)

Definition at line 334 of file origin.c.

References ReplicationState::acquired_by, Assert, CatalogTupleDelete(), CommandCounterIncrement(), ConditionVariableCancelSleep(), ConditionVariableSleep(), elog, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_close, heap_open(), HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, ObjectIdGetDatum, ReplicationState::origin_cv, ReleaseSysCache(), ReplicationState::remote_lsn, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1(), HeapTupleData::t_self, WAIT_EVENT_REPLICATION_ORIGIN_DROP, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by DropSubscription(), and pg_replication_origin_drop().

335 {
336  HeapTuple tuple;
337  Relation rel;
338  int i;
339 
341 
342  /*
343  * To interlock against concurrent drops, we hold ExclusiveLock on
344  * pg_replication_origin throughout this function.
345  */
346  rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
347 
348  /*
349  * First, clean up the slot state info, if there is any matching slot.
350  */
351 restart:
352  tuple = NULL;
353  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
354 
355  for (i = 0; i < max_replication_slots; i++)
356  {
358 
359  if (state->roident == roident)
360  {
361  /* found our slot, is it busy? */
362  if (state->acquired_by != 0)
363  {
364  ConditionVariable *cv;
365 
366  if (nowait)
367  ereport(ERROR,
368  (errcode(ERRCODE_OBJECT_IN_USE),
369  errmsg("could not drop replication origin with OID %d, in use by PID %d",
370  state->roident,
371  state->acquired_by)));
372 
373  /*
374  * We must wait and then retry. Since we don't know which CV
375  * to wait on until here, we can't readily use
376  * ConditionVariablePrepareToSleep (calling it here would be
377  * wrong, since we could miss the signal if we did so); just
378  * use ConditionVariableSleep directly.
379  */
380  cv = &state->origin_cv;
381 
382  LWLockRelease(ReplicationOriginLock);
383 
385  goto restart;
386  }
387 
388  /* first make a WAL log entry */
389  {
390  xl_replorigin_drop xlrec;
391 
392  xlrec.node_id = roident;
393  XLogBeginInsert();
394  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
395  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
396  }
397 
398  /* then clear the in-memory slot */
399  state->roident = InvalidRepOriginId;
400  state->remote_lsn = InvalidXLogRecPtr;
401  state->local_lsn = InvalidXLogRecPtr;
402  break;
403  }
404  }
405  LWLockRelease(ReplicationOriginLock);
407 
408  /*
409  * Now, we can delete the catalog entry.
410  */
412  if (!HeapTupleIsValid(tuple))
413  elog(ERROR, "cache lookup failed for replication origin with oid %u",
414  roident);
415 
416  CatalogTupleDelete(rel, &tuple->t_self);
417  ReleaseSysCache(tuple);
418 
420 
421  /* now release lock again */
423 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:575
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:256
#define heap_close(r, l)
Definition: heapam.h:97
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:490
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:122
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
void CommandCounterIncrement(void)
Definition: xact.c:914
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1294
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:113
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
RepOriginId node_id
Definition: origin.h:28
#define Assert(condition)
Definition: c.h:699
Definition: regguts.h:298
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
bool IsTransactionState(void)
Definition: xact.c:350
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 957 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

958 {
 /*
  * Return the remote_lsn recorded for replication origin `node` in the
  * shared replication_states array, or InvalidXLogRecPtr if no entry has
  * that roident.
  *
  * When `flush` is true and the entry's local_lsn is valid, XLogFlush() is
  * called on that local_lsn before returning.
  */
959  int i;
960  XLogRecPtr local_lsn = InvalidXLogRecPtr;
961  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
962 
963  /* prevent slots from being concurrently dropped */
964  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
965 
966  for (i = 0; i < max_replication_slots; i++)
967  {
969 
970  state = &replication_states[i];
971 
972  if (state->roident == node)
973  {
 /* read both LSNs under the entry's own lock */
974  LWLockAcquire(&state->lock, LW_SHARED);
975 
976  remote_lsn = state->remote_lsn;
977  local_lsn = state->local_lsn;
978 
979  LWLockRelease(&state->lock);
980 
981  break;
982  }
983  }
984 
985  LWLockRelease(ReplicationOriginLock);
986 
 /* the flush runs after both locks are released, on the copied local_lsn */
987  if (flush && local_lsn != InvalidXLogRecPtr)
988  XLogFlush(local_lsn);
989 
990  return remote_lsn;
991 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
LWLock lock
Definition: origin.c:135
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
int i
static ReplicationState * replication_states
Definition: origin.c:166

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 773 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

774 {
775  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
776 
777  switch (info)
778  {
779  case XLOG_REPLORIGIN_SET:
780  {
781  xl_replorigin_set *xlrec =
782  (xl_replorigin_set *) XLogRecGetData(record);
783 
785  xlrec->remote_lsn, record->EndRecPtr,
786  xlrec->force /* backward */ ,
787  false /* WAL log */ );
788  break;
789  }
791  {
792  xl_replorigin_drop *xlrec;
793  int i;
794 
795  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
796 
797  for (i = 0; i < max_replication_slots; i++)
798  {
800 
801  /* found our slot */
802  if (state->roident == xlrec->node_id)
803  {
804  /* reset entry */
805  state->roident = InvalidRepOriginId;
806  state->remote_lsn = InvalidXLogRecPtr;
807  state->local_lsn = InvalidXLogRecPtr;
808  break;
809  }
810  }
811  break;
812  }
813  default:
814  elog(PANIC, "replorigin_redo: unknown op code %u", info);
815  }
816 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
unsigned char uint8
Definition: c.h:323
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:834
#define PANIC
Definition: elog.h:53
RepOriginId roident
Definition: origin.c:108
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define XLogRecGetData(decoder)
Definition: xlogreader.h:226
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:222
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
RepOriginId node_id
Definition: origin.h:28
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:22
XLogRecPtr remote_lsn
Definition: origin.h:21
#define InvalidRepOriginId
Definition: origin.h:34
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define elog
Definition: elog.h:219

◆ replorigin_session_advance()

void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1147 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by EndPrepare(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

1148 {
1151 
1153  if (session_replication_state->local_lsn < local_commit)
1154  session_replication_state->local_lsn = local_commit;
1155  if (session_replication_state->remote_lsn < remote_commit)
1156  session_replication_state->remote_lsn = remote_commit;
1158 }
static ReplicationState * session_replication_state
Definition: origin.c:174
XLogRecPtr local_lsn
Definition: origin.c:120
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
LWLock lock
Definition: origin.c:135
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:699
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
#define InvalidRepOriginId
Definition: origin.h:34

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1165 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_progress().

1166 {
1167  XLogRecPtr remote_lsn;
1168  XLogRecPtr local_lsn;
1169 
1171 
1173  remote_lsn = session_replication_state->remote_lsn;
1174  local_lsn = session_replication_state->local_lsn;
1176 
1177  if (flush && local_lsn != InvalidXLogRecPtr)
1178  XLogFlush(local_lsn);
1179 
1180  return remote_lsn;
1181 }
static ReplicationState * session_replication_state
Definition: origin.c:174
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
LWLock lock
Definition: origin.c:135
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1118 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and ReplicationState::origin_cv.

Referenced by pg_replication_origin_session_reset().

1119 {
1120  ConditionVariable *cv;
1121 
1123 
1124  if (session_replication_state == NULL)
1125  ereport(ERROR,
1126  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127  errmsg("no replication origin is configured")));
1128 
1129  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1130 
1134 
1135  LWLockRelease(ReplicationOriginLock);
1136 
1138 }
static ReplicationState * session_replication_state
Definition: origin.c:174
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:575
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
#define Assert(condition)
Definition: c.h:699
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node)

Definition at line 1030 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_setup().

1031 {
1032  static bool registered_cleanup;
1033  int i;
1034  int free_slot = -1;
1035 
1036  if (!registered_cleanup)
1037  {
1039  registered_cleanup = true;
1040  }
1041 
1043 
1044  if (session_replication_state != NULL)
1045  ereport(ERROR,
1046  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1047  errmsg("cannot setup replication origin when one is already setup")));
1048 
1049  /* Lock exclusively, as we may have to create a new table entry. */
1050  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1051 
1052  /*
1053  * Search for either an existing slot for the origin, or a free one we can
1054  * use.
1055  */
1056  for (i = 0; i < max_replication_slots; i++)
1057  {
1058  ReplicationState *curstate = &replication_states[i];
1059 
1060  /* remember where to insert if necessary */
1061  if (curstate->roident == InvalidRepOriginId &&
1062  free_slot == -1)
1063  {
1064  free_slot = i;
1065  continue;
1066  }
1067 
1068  /* not our slot */
1069  if (curstate->roident != node)
1070  continue;
1071 
1072  else if (curstate->acquired_by != 0)
1073  {
1074  ereport(ERROR,
1075  (errcode(ERRCODE_OBJECT_IN_USE),
1076  errmsg("replication identifier %d is already active for PID %d",
1077  curstate->roident, curstate->acquired_by)));
1078  }
1079 
1080  /* ok, found slot */
1081  session_replication_state = curstate;
1082  }
1083 
1084 
1085  if (session_replication_state == NULL && free_slot == -1)
1086  ereport(ERROR,
1087  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1088  errmsg("could not find free replication state slot for replication origin with OID %u",
1089  node),
1090  errhint("Increase max_replication_slots and try again.")));
1091  else if (session_replication_state == NULL)
1092  {
1093  /* initialize new slot */
1098  }
1099 
1100 
1102 
1104 
1105  LWLockRelease(ReplicationOriginLock);
1106 
1107  /* probably this one is pointless */
1109 }
static ReplicationState * session_replication_state
Definition: origin.c:174
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:42
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:998
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:575
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
#define ERROR
Definition: elog.h:43
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:359
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:699
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 658 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

659 {
660  const char *path = "pg_logical/replorigin_checkpoint";
661  int fd;
662  int readBytes;
664  int last_state = 0;
665  pg_crc32c file_crc;
666  pg_crc32c crc;
667 
668  /* don't want to overwrite already existing state */
669 #ifdef USE_ASSERT_CHECKING
670  static bool already_started = false;
671 
672  Assert(!already_started);
673  already_started = true;
674 #endif
675 
676  if (max_replication_slots == 0)
677  return;
678 
679  INIT_CRC32C(crc);
680 
681  elog(DEBUG2, "starting up replication origin progress state");
682 
683  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
684 
685  /*
686  * might have had max_replication_slots == 0 last run, or we just brought
687  * up a standby.
688  */
689  if (fd < 0 && errno == ENOENT)
690  return;
691  else if (fd < 0)
692  ereport(PANIC,
694  errmsg("could not open file \"%s\": %m",
695  path)));
696 
697  /* verify magic, that is written even if nothing was active */
698  readBytes = read(fd, &magic, sizeof(magic));
699  if (readBytes != sizeof(magic))
700  ereport(PANIC,
701  (errmsg("could not read file \"%s\": %m",
702  path)));
703  COMP_CRC32C(crc, &magic, sizeof(magic));
704 
705  if (magic != REPLICATION_STATE_MAGIC)
706  ereport(PANIC,
707  (errmsg("replication checkpoint has wrong magic %u instead of %u",
708  magic, REPLICATION_STATE_MAGIC)));
709 
710  /* we can skip locking here, no other access is possible */
711 
712  /* recover individual states, until there are no more to be found */
713  while (true)
714  {
715  ReplicationStateOnDisk disk_state;
716 
717  readBytes = read(fd, &disk_state, sizeof(disk_state));
718 
719  /* no further data */
720  if (readBytes == sizeof(crc))
721  {
722  /* not pretty, but simple ... */
723  file_crc = *(pg_crc32c *) &disk_state;
724  break;
725  }
726 
727  if (readBytes < 0)
728  {
729  ereport(PANIC,
731  errmsg("could not read file \"%s\": %m",
732  path)));
733  }
734 
735  if (readBytes != sizeof(disk_state))
736  {
737  ereport(PANIC,
739  errmsg("could not read file \"%s\": read %d of %zu",
740  path, readBytes, sizeof(disk_state))));
741  }
742 
743  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
744 
745  if (last_state == max_replication_slots)
746  ereport(PANIC,
747  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
748  errmsg("could not find free replication state, increase max_replication_slots")));
749 
750  /* copy data to shared memory */
751  replication_states[last_state].roident = disk_state.roident;
752  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
753  last_state++;
754 
755  elog(LOG, "recovered replication state of node %u to %X/%X",
756  disk_state.roident,
757  (uint32) (disk_state.remote_lsn >> 32),
758  (uint32) disk_state.remote_lsn);
759  }
760 
761  /* now check checksum */
762  FIN_CRC32C(crc);
763  if (file_crc != crc)
764  ereport(PANIC,
765  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
766  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
767  crc, file_crc)));
768 
769  CloseTransientFile(fd);
770 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:144
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:143
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:53
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
RepOriginId roident
Definition: origin.c:108
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2396
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:325
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2566
#define REPLICATION_STATE_MAGIC
Definition: origin.c:177
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:699
int errmsg(const char *fmt,...)
Definition: elog.c:797
static ReplicationState * replication_states
Definition: origin.c:166
#define elog
Definition: elog.h:219
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

◆ replication_states

ReplicationState* replication_states
static

Definition at line 166 of file origin.c.

◆ replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 167 of file origin.c.

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp

◆ session_replication_state

ReplicationState* session_replication_state = NULL
static

Definition at line 174 of file origin.c.