PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "funcapi.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "nodes/execnodes.h"
#include "replication/origin.h"
#include "replication/logical.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct
ReplicationStateOnDisk 
ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
RepOriginId replorigin_by_name (char *roname, bool missing_ok)
 
RepOriginId replorigin_create (char *roname)
 
void replorigin_drop (RepOriginId roident, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationStatereplication_states
 
static ReplicationStateCtlreplication_states_ctl
 
static ReplicationStatesession_replication_state = NULL
 

Macro Definition Documentation

#define REPLICATION_ORIGIN_PROGRESS_COLS   4
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 177 of file origin.c.

Referenced by CheckPointReplicationOrigin(), and StartupReplicationOrigin().

Typedef Documentation

Function Documentation

void CheckPointReplicationOrigin ( void  )

Definition at line 524 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, unlink(), write, and XLogFlush().

Referenced by CheckPointGuts().

525 {
526  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
527  const char *path = "pg_logical/replorigin_checkpoint";
528  int tmpfd;
529  int i;
531  pg_crc32c crc;
532 
533  if (max_replication_slots == 0)
534  return;
535 
536  INIT_CRC32C(crc);
537 
538  /* make sure no old temp file is remaining */
539  if (unlink(tmppath) < 0 && errno != ENOENT)
540  ereport(PANIC,
542  errmsg("could not remove file \"%s\": %m",
543  tmppath)));
544 
545  /*
546  * no other backend can perform this at the same time, we're protected by
547  * CheckpointLock.
548  */
549  tmpfd = OpenTransientFile((char *) tmppath,
550  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
551  S_IRUSR | S_IWUSR);
552  if (tmpfd < 0)
553  ereport(PANIC,
555  errmsg("could not create file \"%s\": %m",
556  tmppath)));
557 
558  /* write magic */
559  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
560  {
561  CloseTransientFile(tmpfd);
562  ereport(PANIC,
564  errmsg("could not write to file \"%s\": %m",
565  tmppath)));
566  }
567  COMP_CRC32C(crc, &magic, sizeof(magic));
568 
569  /* prevent concurrent creations/drops */
570  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
571 
572  /* write actual data */
573  for (i = 0; i < max_replication_slots; i++)
574  {
575  ReplicationStateOnDisk disk_state;
576  ReplicationState *curstate = &replication_states[i];
577  XLogRecPtr local_lsn;
578 
579  if (curstate->roident == InvalidRepOriginId)
580  continue;
581 
582  /* zero, to avoid uninitialized padding bytes */
583  memset(&disk_state, 0, sizeof(disk_state));
584 
585  LWLockAcquire(&curstate->lock, LW_SHARED);
586 
587  disk_state.roident = curstate->roident;
588 
589  disk_state.remote_lsn = curstate->remote_lsn;
590  local_lsn = curstate->local_lsn;
591 
592  LWLockRelease(&curstate->lock);
593 
594  /* make sure we only write out a commit that's persistent */
595  XLogFlush(local_lsn);
596 
597  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
598  sizeof(disk_state))
599  {
600  CloseTransientFile(tmpfd);
601  ereport(PANIC,
603  errmsg("could not write to file \"%s\": %m",
604  tmppath)));
605  }
606 
607  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
608  }
609 
610  LWLockRelease(ReplicationOriginLock);
611 
612  /* write out the CRC */
613  FIN_CRC32C(crc);
614  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
615  {
616  CloseTransientFile(tmpfd);
617  ereport(PANIC,
619  errmsg("could not write to file \"%s\": %m",
620  tmppath)));
621  }
622 
623  CloseTransientFile(tmpfd);
624 
625  /* fsync, rename to permanent file, fsync file and directory */
626  durable_rename(tmppath, path, PANIC);
627 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:120
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:144
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:143
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2773
#define PG_BINARY
Definition: c.h:1027
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
LWLock lock
Definition: origin.c:135
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:258
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:593
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define REPLICATION_STATE_MAGIC
Definition: origin.c:177
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1346 of file origin.c.

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, ReplicationOriginRelationId, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

1347 {
1348  text *name = PG_GETARG_TEXT_PP(0);
1349  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1350  RepOriginId node;
1351 
1352  replorigin_check_prerequisites(true, false);
1353 
1354  /* lock to prevent the replication origin from vanishing */
1356 
1357  node = replorigin_by_name(text_to_cstring(name), false);
1358 
1359  /*
1360  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1361  * xact hasn't committed yet. This is why this function should be used to
1362  * set up the initial replication state, but not for replay.
1363  */
1364  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1365  true /* go backward */ , true /* WAL log */ );
1366 
1368 
1369  PG_RETURN_VOID();
1370 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:182
uint16 RepOriginId
Definition: xlogdefs.h:51
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:814
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:273
#define RowExclusiveLock
Definition: lockdefs.h:38
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:309
uint64 XLogRecPtr
Definition: xlogdefs.h:21
const char * name
Definition: encode.c:521
char * text_to_cstring(const text *t)
Definition: varlena.c:182
#define ReplicationOriginRelationId
Definition: c.h:433
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105
Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1177 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), and text_to_cstring().

1178 {
1179  char *name;
1180  RepOriginId roident;
1181 
1182  replorigin_check_prerequisites(false, false);
1183 
1185  roident = replorigin_create(name);
1186 
1187  pfree(name);
1188 
1189  PG_RETURN_OID(roident);
1190 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
void pfree(void *pointer)
Definition: mcxt.c:949
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:433
#define PG_RETURN_OID(x)
Definition: fmgr.h:320
Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1196 of file origin.c.

References Assert, DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_drop(), and text_to_cstring().

1197 {
1198  char *name;
1199  RepOriginId roident;
1200 
1201  replorigin_check_prerequisites(false, false);
1202 
1204 
1205  roident = replorigin_by_name(name, false);
1206  Assert(OidIsValid(roident));
1207 
1208  replorigin_drop(roident, true);
1209 
1210  pfree(name);
1211 
1212  PG_RETURN_VOID();
1213 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
uint16 RepOriginId
Definition: xlogdefs.h:51
#define OidIsValid(objectId)
Definition: c.h:532
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void pfree(void *pointer)
Definition: mcxt.c:949
#define PG_RETURN_VOID()
Definition: fmgr.h:309
#define Assert(condition)
Definition: c.h:664
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:433
Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1219 of file origin.c.

References DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

1220 {
1221  char *name;
1222  RepOriginId roident;
1223 
1224  replorigin_check_prerequisites(false, false);
1225 
1227  roident = replorigin_by_name(name, true);
1228 
1229  pfree(name);
1230 
1231  if (OidIsValid(roident))
1232  PG_RETURN_OID(roident);
1233  PG_RETURN_NULL();
1234 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
#define OidIsValid(objectId)
Definition: c.h:532
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void pfree(void *pointer)
Definition: mcxt.c:949
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:433
#define PG_RETURN_OID(x)
Definition: fmgr.h:320
#define PG_RETURN_NULL()
Definition: fmgr.h:305
Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1381 of file origin.c.

References Assert, DatumGetPointer, InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), and text_to_cstring().

1382 {
1383  char *name;
1384  bool flush;
1385  RepOriginId roident;
1386  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1387 
1388  replorigin_check_prerequisites(true, true);
1389 
1391  flush = PG_GETARG_BOOL(1);
1392 
1393  roident = replorigin_by_name(name, false);
1394  Assert(OidIsValid(roident));
1395 
1396  remote_lsn = replorigin_get_progress(roident, flush);
1397 
1398  if (remote_lsn == InvalidXLogRecPtr)
1399  PG_RETURN_NULL();
1400 
1401  PG_RETURN_LSN(remote_lsn);
1402 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
#define OidIsValid(objectId)
Definition: c.h:532
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:664
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:433
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:937
#define PG_RETURN_NULL()
Definition: fmgr.h:305
Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1279 of file origin.c.

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

1280 {
1281  replorigin_check_prerequisites(false, false);
1282 
1284 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:319
RepOriginId replorigin_session_origin
Definition: origin.c:155
#define InvalidRepOriginId
Definition: origin.h:34
Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1295 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), and replorigin_session_get_progress().

1296 {
1297  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1298  bool flush = PG_GETARG_BOOL(0);
1299 
1300  replorigin_check_prerequisites(true, false);
1301 
1302  if (session_replication_state == NULL)
1303  ereport(ERROR,
1304  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1305  errmsg("no replication origin is configured")));
1306 
1307  remote_lsn = replorigin_session_get_progress(flush);
1308 
1309  if (remote_lsn == InvalidXLogRecPtr)
1310  PG_RETURN_NULL();
1311 
1312  PG_RETURN_LSN(remote_lsn);
1313 }
static ReplicationState * session_replication_state
Definition: origin.c:174
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1145
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define PG_RETURN_NULL()
Definition: fmgr.h:305
Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1262 of file origin.c.

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

1263 {
1264  replorigin_check_prerequisites(true, false);
1265 
1267 
1271 
1272  PG_RETURN_VOID();
1273 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void replorigin_session_reset(void)
Definition: origin.c:1098
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define PG_RETURN_VOID()
Definition: fmgr.h:309
RepOriginId replorigin_session_origin
Definition: origin.c:155
#define InvalidRepOriginId
Definition: origin.h:34
Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1240 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

1241 {
1242  char *name;
1243  RepOriginId origin;
1244 
1245  replorigin_check_prerequisites(true, false);
1246 
1248  origin = replorigin_by_name(name, false);
1249  replorigin_session_setup(origin);
1250 
1251  replorigin_session_origin = origin;
1252 
1253  pfree(name);
1254 
1255  PG_RETURN_VOID();
1256 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1010
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void pfree(void *pointer)
Definition: mcxt.c:949
#define PG_RETURN_VOID()
Definition: fmgr.h:309
RepOriginId replorigin_session_origin
Definition: origin.c:155
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:433
Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1334 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1335 {
1336  replorigin_check_prerequisites(true, false);
1337 
1340 
1341  PG_RETURN_VOID();
1342 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define PG_RETURN_VOID()
Definition: fmgr.h:309
Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1316 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1317 {
1318  XLogRecPtr location = PG_GETARG_LSN(0);
1319 
1320  replorigin_check_prerequisites(true, false);
1321 
1322  if (session_replication_state == NULL)
1323  ereport(ERROR,
1324  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1325  errmsg("no replication origin is configured")));
1326 
1327  replorigin_session_origin_lsn = location;
1329 
1330  PG_RETURN_VOID();
1331 }
static ReplicationState * session_replication_state
Definition: origin.c:174
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define ereport(elevel, rest)
Definition: elog.h:122
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:309
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36
Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1406 of file origin.c.

References ReturnSetInfo::allowedModes, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, InvalidRepOriginId, IsA, ReplicationState::local_lsn, ReplicationState::lock, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ObjectIdGetDatum, ReplicationState::remote_lsn, REPLICATION_ORIGIN_PROGRESS_COLS, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::returnMode, ReplicationState::roident, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1407 {
1408  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1409  TupleDesc tupdesc;
1410  Tuplestorestate *tupstore;
1411  MemoryContext per_query_ctx;
1412  MemoryContext oldcontext;
1413  int i;
1415 
1416  /* we we want to return 0 rows if slot is set to zero */
1417  replorigin_check_prerequisites(false, true);
1418 
1419  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1420  ereport(ERROR,
1421  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1422  errmsg("set-valued function called in context that cannot accept a set")));
1423  if (!(rsinfo->allowedModes & SFRM_Materialize))
1424  ereport(ERROR,
1425  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1426  errmsg("materialize mode required, but it is not allowed in this context")));
1427  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1428  elog(ERROR, "return type must be a row type");
1429 
1430  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1431  elog(ERROR, "wrong function definition");
1432 
1433  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1434  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1435 
1436  tupstore = tuplestore_begin_heap(true, false, work_mem);
1437  rsinfo->returnMode = SFRM_Materialize;
1438  rsinfo->setResult = tupstore;
1439  rsinfo->setDesc = tupdesc;
1440 
1441  MemoryContextSwitchTo(oldcontext);
1442 
1443 
1444  /* prevent slots from being concurrently dropped */
1445  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1446 
1447  /*
1448  * Iterate through all possible replication_states, display if they are
1449  * filled. Note that we do not take any locks, so slightly corrupted/out
1450  * of date values are a possibility.
1451  */
1452  for (i = 0; i < max_replication_slots; i++)
1453  {
1457  char *roname;
1458 
1459  state = &replication_states[i];
1460 
1461  /* unused slot, nothing to display */
1462  if (state->roident == InvalidRepOriginId)
1463  continue;
1464 
1465  memset(values, 0, sizeof(values));
1466  memset(nulls, 1, sizeof(nulls));
1467 
1468  values[0] = ObjectIdGetDatum(state->roident);
1469  nulls[0] = false;
1470 
1471  /*
1472  * We're not preventing the origin to be dropped concurrently, so
1473  * silently accept that it might be gone.
1474  */
1475  if (replorigin_by_oid(state->roident, true,
1476  &roname))
1477  {
1478  values[1] = CStringGetTextDatum(roname);
1479  nulls[1] = false;
1480  }
1481 
1482  LWLockAcquire(&state->lock, LW_SHARED);
1483 
1484  values[2] = LSNGetDatum(state->remote_lsn);
1485  nulls[2] = false;
1486 
1487  values[3] = LSNGetDatum(state->local_lsn);
1488  nulls[3] = false;
1489 
1490  LWLockRelease(&state->lock);
1491 
1492  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1493  }
1494 
1495  tuplestore_donestoring(tupstore);
1496 
1497  LWLockRelease(ReplicationOriginLock);
1498 
1499 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1500 
1501  return (Datum) 0;
1502 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
XLogRecPtr local_lsn
Definition: origin.c:120
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:415
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:135
#define ereport(elevel, rest)
Definition: elog.h:122
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
uintptr_t Datum
Definition: postgres.h:372
int work_mem
Definition: globals.c:113
int allowedModes
Definition: execnodes.h:268
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
#define InvalidRepOriginId
Definition: origin.h:34
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:164
ExprContext * econtext
Definition: execnodes.h:266
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
#define REPLICATION_ORIGIN_PROGRESS_COLS
static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 978 of file origin.c.

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, and ReplicationState::origin_cv.

Referenced by replorigin_session_setup().

979 {
980  ConditionVariable *cv = NULL;
981 
982  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
983 
984  if (session_replication_state != NULL &&
986  {
988 
991  }
992 
993  LWLockRelease(ReplicationOriginLock);
994 
995  if (cv)
997 }
static ReplicationState * session_replication_state
Definition: origin.c:174
int MyProcPid
Definition: globals.c:39
int ConditionVariableBroadcast(ConditionVariable *cv)
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
void ReplicationOriginShmemInit ( void  )

Definition at line 474 of file origin.c.

References ConditionVariableInit(), i, LWLockInitialize(), LWLockRegisterTranche(), LWTRANCHE_REPLICATION_ORIGIN, max_replication_slots, MemSet, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

475 {
476  bool found;
477 
478  if (max_replication_slots == 0)
479  return;
480 
482  ShmemInitStruct("ReplicationOriginState",
484  &found);
486 
487  if (!found)
488  {
489  int i;
490 
492 
494 
495  for (i = 0; i < max_replication_slots; i++)
496  {
500  }
501  }
502 
504  "replication_origin");
505 }
#define MemSet(start, val, len)
Definition: c.h:846
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:673
Size ReplicationOriginShmemSize(void)
Definition: origin.c:454
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:167
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:151
void LWLockRegisterTranche(int tranche_id, char *tranche_name)
Definition: lwlock.c:598
int i
static ReplicationState * replication_states
Definition: origin.c:166
Size ReplicationOriginShmemSize ( void  )

Definition at line 454 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

455 {
456  Size size = 0;
457 
458  /*
459  * XXX: max_replication_slots is arguably the wrong thing to use, as here
460  * we keep the replay state of *remote* transactions. But for now it seems
461  * sufficient to reuse it, lest we introduce a separate GUC.
462  */
463  if (max_replication_slots == 0)
464  return size;
465 
466  size = add_size(size, offsetof(ReplicationStateCtl, states));
467 
468  size = add_size(size,
470  return size;
471 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:350
#define offsetof(type, field)
Definition: c.h:549
void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 814 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by pg_replication_origin_advance(), replorigin_redo(), and xact_redo_commit().

817 {
818  int i;
819  ReplicationState *replication_state = NULL;
820  ReplicationState *free_state = NULL;
821 
822  Assert(node != InvalidRepOriginId);
823 
824  /* we don't track DoNotReplicateId */
825  if (node == DoNotReplicateId)
826  return;
827 
828  /*
829  * XXX: For the case where this is called by WAL replay, it'd be more
830  * efficient to restore into a backend local hashtable and only dump into
831  * shmem after recovery is finished. Let's wait with implementing that
832  * till it's shown to be a measurable expense
833  */
834 
835  /* Lock exclusively, as we may have to create a new table entry. */
836  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
837 
838  /*
839  * Search for either an existing slot for the origin, or a free one we can
840  * use.
841  */
842  for (i = 0; i < max_replication_slots; i++)
843  {
844  ReplicationState *curstate = &replication_states[i];
845 
846  /* remember where to insert if necessary */
847  if (curstate->roident == InvalidRepOriginId &&
848  free_state == NULL)
849  {
850  free_state = curstate;
851  continue;
852  }
853 
854  /* not our slot */
855  if (curstate->roident != node)
856  {
857  continue;
858  }
859 
860  /* ok, found slot */
861  replication_state = curstate;
862 
863  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
864 
865  /* Make sure it's not used by somebody else */
866  if (replication_state->acquired_by != 0)
867  {
868  ereport(ERROR,
869  (errcode(ERRCODE_OBJECT_IN_USE),
870  errmsg("replication origin with OID %d is already active for PID %d",
871  replication_state->roident,
872  replication_state->acquired_by)));
873  }
874 
875  break;
876  }
877 
878  if (replication_state == NULL && free_state == NULL)
879  ereport(ERROR,
880  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
881  errmsg("could not find free replication state slot for replication origin with OID %u",
882  node),
883  errhint("Increase max_replication_slots and try again.")));
884 
885  if (replication_state == NULL)
886  {
887  /* initialize new slot */
888  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
889  replication_state = free_state;
890  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
891  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
892  replication_state->roident = node;
893  }
894 
895  Assert(replication_state->roident != InvalidRepOriginId);
896 
897  /*
898  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
899  * and the standby gets the message. Primarily this will be called during
900  * WAL replay (of commit records) where no WAL logging is necessary.
901  */
902  if (wal_log)
903  {
904  xl_replorigin_set xlrec;
905 
906  xlrec.remote_lsn = remote_commit;
907  xlrec.node_id = node;
908  xlrec.force = go_backward;
909 
910  XLogBeginInsert();
911  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
912 
913  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
914  }
915 
916  /*
917  * Due to - harmless - race conditions during a checkpoint we could see
918  * values here that are older than the ones we already have in memory.
919  * Don't overwrite those.
920  */
921  if (go_backward || replication_state->remote_lsn < remote_commit)
922  replication_state->remote_lsn = remote_commit;
923  if (local_commit != InvalidXLogRecPtr &&
924  (go_backward || replication_state->local_lsn < local_commit))
925  replication_state->local_lsn = local_commit;
926  LWLockRelease(&replication_state->lock);
927 
928  /*
929  * Release *after* changing the LSNs, slot isn't acquired and thus could
930  * otherwise be dropped anytime.
931  */
932  LWLockRelease(ReplicationOriginLock);
933 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
#define DoNotReplicateId
Definition: origin.h:35
int errcode(int sqlerrcode)
Definition: elog.c:575
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:135
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
#define ereport(elevel, rest)
Definition: elog.h:122
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:664
RepOriginId node_id
Definition: origin.h:22
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
XLogRecPtr remote_lsn
Definition: origin.h:21
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
void XLogBeginInsert(void)
Definition: xloginsert.c:120
RepOriginId replorigin_by_name ( char *  roname,
bool  missing_ok 
)

Definition at line 211 of file origin.c.

References CStringGetTextDatum, elog, ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, and SearchSysCache1.

Referenced by ApplyWorkerMain(), DropSubscription(), pg_replication_origin_advance(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), and pg_replication_origin_session_setup().

212 {
214  Oid roident = InvalidOid;
215  HeapTuple tuple;
216  Datum roname_d;
217 
218  roname_d = CStringGetTextDatum(roname);
219 
220  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
221  if (HeapTupleIsValid(tuple))
222  {
223  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
224  roident = ident->roident;
225  ReleaseSysCache(tuple);
226  }
227  else if (!missing_ok)
228  elog(ERROR, "cache lookup failed for replication origin '%s'",
229  roname);
230 
231  return roident;
232 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
unsigned int Oid
Definition: postgres_ext.h:31
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:159
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 415 of file origin.c.

References Assert, DoNotReplicateId, elog, ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1, and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and pgoutput_begin_txn().

416 {
417  HeapTuple tuple;
419 
420  Assert(OidIsValid((Oid) roident));
421  Assert(roident != InvalidRepOriginId);
422  Assert(roident != DoNotReplicateId);
423 
425  ObjectIdGetDatum((Oid) roident));
426 
427  if (HeapTupleIsValid(tuple))
428  {
429  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
430  *roname = text_to_cstring(&ric->roname);
431  ReleaseSysCache(tuple);
432 
433  return true;
434  }
435  else
436  {
437  *roname = NULL;
438 
439  if (!missing_ok)
440  elog(ERROR, "cache lookup failed for replication origin with oid %u",
441  roident);
442 
443  return false;
444  }
445 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
#define DoNotReplicateId
Definition: origin.h:35
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:532
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:159
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define Assert(condition)
Definition: c.h:664
#define InvalidRepOriginId
Definition: origin.h:34
char * text_to_cstring(const text *t)
Definition: varlena.c:182
#define elog
Definition: elog.h:219
static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 180 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, RecoveryInProgress(), and superuser().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

181 {
182  if (!superuser())
183  ereport(ERROR,
184  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
185  errmsg("only superusers can query or manipulate replication origins")));
186 
187  if (check_slots && max_replication_slots == 0)
188  ereport(ERROR,
189  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 
192  if (!recoveryOK && RecoveryInProgress())
193  ereport(ERROR,
194  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195  errmsg("cannot manipulate replication origins during recovery")));
196 
197 }
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
bool RecoveryInProgress(void)
Definition: xlog.c:7962
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
int errmsg(const char *fmt,...)
Definition: elog.c:797
RepOriginId replorigin_create ( char *  roname)

Definition at line 240 of file origin.c.

References Anum_pg_replication_origin_roident, Anum_pg_replication_origin_roname, Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), Natts_pg_replication_origin, ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationOriginIdentIndex, ReplicationOriginRelationId, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), and pg_replication_origin_create().

241 {
242  Oid roident;
243  HeapTuple tuple = NULL;
244  Relation rel;
245  Datum roname_d;
246  SnapshotData SnapshotDirty;
247  SysScanDesc scan;
248  ScanKeyData key;
249 
250  roname_d = CStringGetTextDatum(roname);
251 
253 
254  /*
255  * We need the numeric replication origin to be 16bit wide, so we cannot
256  * rely on the normal oid allocation. Instead we simply scan
257  * pg_replication_origin for the first unused id. That's not particularly
258  * efficient, but this should be a fairly infrequent operation - we can
259  * easily spend a bit more code on this when it turns out it needs to be
260  * faster.
261  *
262  * We handle concurrency by taking an exclusive lock (allowing reads!)
263  * over the table for the duration of the search. Because we use a "dirty
264  * snapshot" we can read rows that other in-progress sessions have
265  * written, even though they would be invisible with normal snapshots. Due
266  * to the exclusive lock there's no danger that new rows can appear while
267  * we're checking.
268  */
269  InitDirtySnapshot(SnapshotDirty);
270 
272 
273  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274  {
275  bool nulls[Natts_pg_replication_origin];
277  bool collides;
278 
280 
281  ScanKeyInit(&key,
283  BTEqualStrategyNumber, F_OIDEQ,
284  ObjectIdGetDatum(roident));
285 
287  true /* indexOK */ ,
288  &SnapshotDirty,
289  1, &key);
290 
291  collides = HeapTupleIsValid(systable_getnext(scan));
292 
293  systable_endscan(scan);
294 
295  if (!collides)
296  {
297  /*
298  * Ok, found an unused roident, insert the new row and do a CCI,
299  * so our callers can look it up if they want to.
300  */
301  memset(&nulls, 0, sizeof(nulls));
302 
304  values[Anum_pg_replication_origin_roname - 1] = roname_d;
305 
306  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307  CatalogTupleInsert(rel, tuple);
309  break;
310  }
311  }
312 
313  /* now release lock again, */
315 
316  if (tuple == NULL)
317  ereport(ERROR,
318  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319  errmsg("could not find free replication origin OID")));
320 
321  heap_freetuple(tuple);
322  return roident;
323 }
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:499
#define Anum_pg_replication_origin_roident
#define RelationGetDescr(relation)
Definition: rel.h:428
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:575
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:328
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:416
#define Anum_pg_replication_origin_roname
#define ReplicationOriginIdentIndex
Definition: indexing.h:333
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:323
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:102
#define ereport(elevel, rest)
Definition: elog.h:122
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:923
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define Assert(condition)
Definition: c.h:664
bool IsTransactionState(void)
Definition: xact.c:351
#define Natts_pg_replication_origin
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define ReplicationOriginRelationId
int errmsg(const char *fmt,...)
Definition: elog.c:797
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void replorigin_drop ( RepOriginId  roident,
bool  nowait 
)

Definition at line 332 of file origin.c.

References ReplicationState::acquired_by, Assert, CatalogTupleDelete(), CommandCounterIncrement(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), elog, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_close, heap_open(), HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, ObjectIdGetDatum, ReplicationState::origin_cv, ReleaseSysCache(), ReplicationState::remote_lsn, ReplicationOriginRelationId, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1, HeapTupleData::t_self, WAIT_EVENT_REPLICATION_ORIGIN_DROP, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by DropSubscription(), and pg_replication_origin_drop().

333 {
334  HeapTuple tuple;
335  Relation rel;
336  int i;
337 
339 
341 
342 restart:
343  tuple = NULL;
344  /* cleanup the slot state info */
345  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
346 
347  for (i = 0; i < max_replication_slots; i++)
348  {
350 
351  /* found our slot */
352  if (state->roident == roident)
353  {
354  if (state->acquired_by != 0)
355  {
356  ConditionVariable *cv;
357 
358  if (nowait)
359  ereport(ERROR,
360  (errcode(ERRCODE_OBJECT_IN_USE),
361  errmsg("could not drop replication origin with OID %d, in use by PID %d",
362  state->roident,
363  state->acquired_by)));
364  cv = &state->origin_cv;
365 
366  LWLockRelease(ReplicationOriginLock);
370  goto restart;
371  }
372 
373  /* first WAL log */
374  {
375  xl_replorigin_drop xlrec;
376 
377  xlrec.node_id = roident;
378  XLogBeginInsert();
379  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
380  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
381  }
382 
383  /* then reset the in-memory entry */
384  state->roident = InvalidRepOriginId;
385  state->remote_lsn = InvalidXLogRecPtr;
386  state->local_lsn = InvalidXLogRecPtr;
387  break;
388  }
389  }
390  LWLockRelease(ReplicationOriginLock);
391 
393  if (!HeapTupleIsValid(tuple))
394  elog(ERROR, "cache lookup failed for replication origin with oid %u",
395  roident);
396 
397  CatalogTupleDelete(rel, &tuple->t_self);
398  ReleaseSysCache(tuple);
399 
401 
402  /* now release lock again */
404 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:575
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
#define heap_close(r, l)
Definition: heapam.h:97
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:159
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:122
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
void CommandCounterIncrement(void)
Definition: xact.c:923
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:113
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
RepOriginId node_id
Definition: origin.h:28
#define Assert(condition)
Definition: c.h:664
Definition: regguts.h:298
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
bool IsTransactionState(void)
Definition: xact.c:351
#define InvalidRepOriginId
Definition: origin.h:34
#define ReplicationOriginRelationId
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 937 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

938 {
939  int i;
940  XLogRecPtr local_lsn = InvalidXLogRecPtr;
941  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
942 
943  /* prevent slots from being concurrently dropped */
944  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
945 
946  for (i = 0; i < max_replication_slots; i++)
947  {
949 
950  state = &replication_states[i];
951 
952  if (state->roident == node)
953  {
954  LWLockAcquire(&state->lock, LW_SHARED);
955 
956  remote_lsn = state->remote_lsn;
957  local_lsn = state->local_lsn;
958 
959  LWLockRelease(&state->lock);
960 
961  break;
962  }
963  }
964 
965  LWLockRelease(ReplicationOriginLock);
966 
967  if (flush && local_lsn != InvalidXLogRecPtr)
968  XLogFlush(local_lsn);
969 
970  return remote_lsn;
971 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2773
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
LWLock lock
Definition: origin.c:135
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
int i
static ReplicationState * replication_states
Definition: origin.c:166
void replorigin_redo ( XLogReaderState record)

Definition at line 753 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

754 {
755  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
756 
757  switch (info)
758  {
759  case XLOG_REPLORIGIN_SET:
760  {
761  xl_replorigin_set *xlrec =
762  (xl_replorigin_set *) XLogRecGetData(record);
763 
765  xlrec->remote_lsn, record->EndRecPtr,
766  xlrec->force /* backward */ ,
767  false /* WAL log */ );
768  break;
769  }
771  {
772  xl_replorigin_drop *xlrec;
773  int i;
774 
775  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
776 
777  for (i = 0; i < max_replication_slots; i++)
778  {
780 
781  /* found our slot */
782  if (state->roident == xlrec->node_id)
783  {
784  /* reset entry */
785  state->roident = InvalidRepOriginId;
786  state->remote_lsn = InvalidXLogRecPtr;
787  state->local_lsn = InvalidXLogRecPtr;
788  break;
789  }
790  }
791  break;
792  }
793  default:
794  elog(PANIC, "replorigin_redo: unknown op code %u", info);
795  }
796 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
unsigned char uint8
Definition: c.h:256
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:814
#define PANIC
Definition: elog.h:53
RepOriginId roident
Definition: origin.c:108
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define XLogRecGetData(decoder)
Definition: xlogreader.h:226
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:222
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
RepOriginId node_id
Definition: origin.h:28
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:22
XLogRecPtr remote_lsn
Definition: origin.h:21
#define InvalidRepOriginId
Definition: origin.h:34
int i
static ReplicationState * replication_states
Definition: origin.c:166
#define elog
Definition: elog.h:219
void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1127 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by RecordTransactionCommit(), and RecordTransactionCommitPrepared().

1128 {
1131 
1133  if (session_replication_state->local_lsn < local_commit)
1134  session_replication_state->local_lsn = local_commit;
1135  if (session_replication_state->remote_lsn < remote_commit)
1136  session_replication_state->remote_lsn = remote_commit;
1138 }
static ReplicationState * session_replication_state
Definition: origin.c:174
XLogRecPtr local_lsn
Definition: origin.c:120
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
LWLock lock
Definition: origin.c:135
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:664
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
#define InvalidRepOriginId
Definition: origin.h:34
XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1145 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_progress().

1146 {
1147  XLogRecPtr remote_lsn;
1148  XLogRecPtr local_lsn;
1149 
1151 
1153  remote_lsn = session_replication_state->remote_lsn;
1154  local_lsn = session_replication_state->local_lsn;
1156 
1157  if (flush && local_lsn != InvalidXLogRecPtr)
1158  XLogFlush(local_lsn);
1159 
1160  return remote_lsn;
1161 }
static ReplicationState * session_replication_state
Definition: origin.c:174
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2773
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
LWLock lock
Definition: origin.c:135
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:664
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
void replorigin_session_reset ( void  )

Definition at line 1098 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and ReplicationState::origin_cv.

Referenced by pg_replication_origin_session_reset().

1099 {
1100  ConditionVariable *cv;
1101 
1103 
1104  if (session_replication_state == NULL)
1105  ereport(ERROR,
1106  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1107  errmsg("no replication origin is configured")));
1108 
1109  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1110 
1114 
1115  LWLockRelease(ReplicationOriginLock);
1116 
1118 }
static ReplicationState * session_replication_state
Definition: origin.c:174
int ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:575
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
#define Assert(condition)
Definition: c.h:664
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
int errmsg(const char *fmt,...)
Definition: elog.c:797
void replorigin_session_setup ( RepOriginId  node)

Definition at line 1010 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_setup().

1011 {
1012  static bool registered_cleanup;
1013  int i;
1014  int free_slot = -1;
1015 
1016  if (!registered_cleanup)
1017  {
1019  registered_cleanup = true;
1020  }
1021 
1023 
1024  if (session_replication_state != NULL)
1025  ereport(ERROR,
1026  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1027  errmsg("cannot setup replication origin when one is already setup")));
1028 
1029  /* Lock exclusively, as we may have to create a new table entry. */
1030  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1031 
1032  /*
1033  * Search for either an existing slot for the origin, or a free one we can
1034  * use.
1035  */
1036  for (i = 0; i < max_replication_slots; i++)
1037  {
1038  ReplicationState *curstate = &replication_states[i];
1039 
1040  /* remember where to insert if necessary */
1041  if (curstate->roident == InvalidRepOriginId &&
1042  free_slot == -1)
1043  {
1044  free_slot = i;
1045  continue;
1046  }
1047 
1048  /* not our slot */
1049  if (curstate->roident != node)
1050  continue;
1051 
1052  else if (curstate->acquired_by != 0)
1053  {
1054  ereport(ERROR,
1055  (errcode(ERRCODE_OBJECT_IN_USE),
1056  errmsg("replication identifier %d is already active for PID %d",
1057  curstate->roident, curstate->acquired_by)));
1058  }
1059 
1060  /* ok, found slot */
1061  session_replication_state = curstate;
1062  }
1063 
1064 
1065  if (session_replication_state == NULL && free_slot == -1)
1066  ereport(ERROR,
1067  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1068  errmsg("could not find free replication state slot for replication origin with OID %u",
1069  node),
1070  errhint("Increase max_replication_slots and try again.")));
1071  else if (session_replication_state == NULL)
1072  {
1073  /* initialize new slot */
1078  }
1079 
1080 
1082 
1084 
1085  LWLockRelease(ReplicationOriginLock);
1086 
1087  /* probably this one is pointless */
1089 }
static ReplicationState * session_replication_state
Definition: origin.c:174
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:39
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:978
int ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:575
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define ERROR
Definition: elog.h:43
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:664
ConditionVariable origin_cv
Definition: origin.c:130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:166
void StartupReplicationOrigin ( void  )

Definition at line 638 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

639 {
640  const char *path = "pg_logical/replorigin_checkpoint";
641  int fd;
642  int readBytes;
644  int last_state = 0;
645  pg_crc32c file_crc;
646  pg_crc32c crc;
647 
648  /* don't want to overwrite already existing state */
649 #ifdef USE_ASSERT_CHECKING
650  static bool already_started = false;
651 
652  Assert(!already_started);
653  already_started = true;
654 #endif
655 
656  if (max_replication_slots == 0)
657  return;
658 
659  INIT_CRC32C(crc);
660 
661  elog(DEBUG2, "starting up replication origin progress state");
662 
663  fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
664 
665  /*
666  * might have had max_replication_slots == 0 last run, or we just brought
667  * up a standby.
668  */
669  if (fd < 0 && errno == ENOENT)
670  return;
671  else if (fd < 0)
672  ereport(PANIC,
674  errmsg("could not open file \"%s\": %m",
675  path)));
676 
677  /* verify magic, that is written even if nothing was active */
678  readBytes = read(fd, &magic, sizeof(magic));
679  if (readBytes != sizeof(magic))
680  ereport(PANIC,
681  (errmsg("could not read file \"%s\": %m",
682  path)));
683  COMP_CRC32C(crc, &magic, sizeof(magic));
684 
685  if (magic != REPLICATION_STATE_MAGIC)
686  ereport(PANIC,
687  (errmsg("replication checkpoint has wrong magic %u instead of %u",
688  magic, REPLICATION_STATE_MAGIC)));
689 
690  /* we can skip locking here, no other access is possible */
691 
692  /* recover individual states, until there are no more to be found */
693  while (true)
694  {
695  ReplicationStateOnDisk disk_state;
696 
697  readBytes = read(fd, &disk_state, sizeof(disk_state));
698 
699  /* no further data */
700  if (readBytes == sizeof(crc))
701  {
702  /* not pretty, but simple ... */
703  file_crc = *(pg_crc32c *) &disk_state;
704  break;
705  }
706 
707  if (readBytes < 0)
708  {
709  ereport(PANIC,
711  errmsg("could not read file \"%s\": %m",
712  path)));
713  }
714 
715  if (readBytes != sizeof(disk_state))
716  {
717  ereport(PANIC,
719  errmsg("could not read file \"%s\": read %d of %zu",
720  path, readBytes, sizeof(disk_state))));
721  }
722 
723  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
724 
725  if (last_state == max_replication_slots)
726  ereport(PANIC,
727  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
728  errmsg("could not find free replication state, increase max_replication_slots")));
729 
730  /* copy data to shared memory */
731  replication_states[last_state].roident = disk_state.roident;
732  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
733  last_state++;
734 
735  elog(LOG, "recovered replication state of node %u to %X/%X",
736  disk_state.roident,
737  (uint32) (disk_state.remote_lsn >> 32),
738  (uint32) disk_state.remote_lsn);
739  }
740 
741  /* now check checksum */
742  FIN_CRC32C(crc);
743  if (file_crc != crc)
744  ereport(PANIC,
745  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
746  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
747  crc, file_crc)));
748 
749  CloseTransientFile(fd);
750 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:144
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:143
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:53
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1027
RepOriginId roident
Definition: origin.c:108
#define DEBUG2
Definition: elog.h:24
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:258
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define REPLICATION_STATE_MAGIC
Definition: origin.c:177
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:664
int errmsg(const char *fmt,...)
Definition: elog.c:797
static ReplicationState * replication_states
Definition: origin.c:166
#define elog
Definition: elog.h:219
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

ReplicationState* replication_states
static

Definition at line 166 of file origin.c.

ReplicationStateCtl* replication_states_ctl
static

Definition at line 167 of file origin.c.

ReplicationState* session_replication_state = NULL
static

Definition at line 174 of file origin.c.