PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "funcapi.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "nodes/execnodes.h"
#include "replication/origin.h"
#include "replication/logical.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/copydir.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct  ReplicationState
 
struct  ReplicationStateOnDisk
 
struct  ReplicationStateCtl
 

Macros

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
#define REPLICATION_ORIGIN_PROGRESS_COLS   4
 

Typedefs

typedef struct ReplicationState ReplicationState
 
typedef struct ReplicationStateOnDisk ReplicationStateOnDisk
 
typedef struct ReplicationStateCtl ReplicationStateCtl
 

Functions

static void replorigin_check_prerequisites (bool check_slots, bool recoveryOK)
 
RepOriginId replorigin_by_name (char *roname, bool missing_ok)
 
RepOriginId replorigin_create (char *roname)
 
void replorigin_drop (RepOriginId roident)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
static void ReplicationOriginExitCleanup (int code, Datum arg)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
Datum pg_replication_origin_create (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_drop (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_oid (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_is_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_session_progress (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_setup (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_xact_reset (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_advance (PG_FUNCTION_ARGS)
 
Datum pg_replication_origin_progress (PG_FUNCTION_ARGS)
 
Datum pg_show_replication_origin_status (PG_FUNCTION_ARGS)
 

Variables

RepOriginId replorigin_session_origin = InvalidRepOriginId
 
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr
 
TimestampTz replorigin_session_origin_timestamp = 0
 
static ReplicationState * replication_states
 
static ReplicationStateCtl * replication_states_ctl
 
static ReplicationState * session_replication_state = NULL
 

Macro Definition Documentation

#define REPLICATION_ORIGIN_PROGRESS_COLS   4
#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 172 of file origin.c.

Referenced by CheckPointReplicationOrigin(), and StartupReplicationOrigin().

Typedef Documentation

Function Documentation

void CheckPointReplicationOrigin ( void  )

Definition at line 504 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, unlink(), write, and XLogFlush().

Referenced by CheckPointGuts().

505 {
506  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
507  const char *path = "pg_logical/replorigin_checkpoint";
508  int tmpfd;
509  int i;
510  uint32 magic = REPLICATION_STATE_MAGIC;
511  pg_crc32c crc;
512 
513  if (max_replication_slots == 0)
514  return;
515 
516  INIT_CRC32C(crc);
517 
518  /* make sure no old temp file is remaining */
519  if (unlink(tmppath) < 0 && errno != ENOENT)
520  ereport(PANIC,
521  (errcode_for_file_access(),
522  errmsg("could not remove file \"%s\": %m",
523  tmppath)));
524 
525  /*
526  * no other backend can perform this at the same time, we're protected by
527  * CheckpointLock.
528  */
529  tmpfd = OpenTransientFile((char *) tmppath,
530  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
531  S_IRUSR | S_IWUSR);
532  if (tmpfd < 0)
533  ereport(PANIC,
534  (errcode_for_file_access(),
535  errmsg("could not create file \"%s\": %m",
536  tmppath)));
537 
538  /* write magic */
539  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
540  {
541  CloseTransientFile(tmpfd);
542  ereport(PANIC,
543  (errcode_for_file_access(),
544  errmsg("could not write to file \"%s\": %m",
545  tmppath)));
546  }
547  COMP_CRC32C(crc, &magic, sizeof(magic));
548 
549  /* prevent concurrent creations/drops */
550  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
551 
552  /* write actual data */
553  for (i = 0; i < max_replication_slots; i++)
554  {
555  ReplicationStateOnDisk disk_state;
556  ReplicationState *curstate = &replication_states[i];
557  XLogRecPtr local_lsn;
558 
559  if (curstate->roident == InvalidRepOriginId)
560  continue;
561 
562  /* zero, to avoid uninitialized padding bytes */
563  memset(&disk_state, 0, sizeof(disk_state));
564 
565  LWLockAcquire(&curstate->lock, LW_SHARED);
566 
567  disk_state.roident = curstate->roident;
568 
569  disk_state.remote_lsn = curstate->remote_lsn;
570  local_lsn = curstate->local_lsn;
571 
572  LWLockRelease(&curstate->lock);
573 
574  /* make sure we only write out a commit that's persistent */
575  XLogFlush(local_lsn);
576 
577  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
578  sizeof(disk_state))
579  {
580  CloseTransientFile(tmpfd);
581  ereport(PANIC,
582  (errcode_for_file_access(),
583  errmsg("could not write to file \"%s\": %m",
584  tmppath)));
585  }
586 
587  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
588  }
589 
590  LWLockRelease(ReplicationOriginLock);
591 
592  /* write out the CRC */
593  FIN_CRC32C(crc);
594  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
595  {
596  CloseTransientFile(tmpfd);
597  ereport(PANIC,
598  (errcode_for_file_access(),
599  errmsg("could not write to file \"%s\": %m",
600  tmppath)));
601  }
602 
603  CloseTransientFile(tmpfd);
604 
605  /* fsync, rename to permanent file, fsync file and directory */
606  durable_rename(tmppath, path, PANIC);
607 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:120
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:139
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:138
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2757
#define PG_BINARY
Definition: c.h:1038
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LWLock lock
Definition: origin.c:130
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:593
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define REPLICATION_STATE_MAGIC
Definition: origin.c:172
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:161
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1311 of file origin.c.

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, ReplicationOriginRelationId, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

1312 {
1313  text *name = PG_GETARG_TEXT_PP(0);
1314  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1315  RepOriginId node;
1316 
1317  replorigin_check_prerequisites(true, false);
1318 
1319  /* lock to prevent the replication origin from vanishing */
1320  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1321 
1322  node = replorigin_by_name(text_to_cstring(name), false);
1323 
1324  /*
1325  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1326  * xact hasn't committed yet. This is why this function should be used to
1327  * set up the initial replication state, but not for replay.
1328  */
1329  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1330  true /* go backward */ , true /* WAL log */ );
1331 
1332  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1333 
1334  PG_RETURN_VOID();
1335 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:182
uint16 RepOriginId
Definition: xlogdefs.h:51
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:794
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:273
#define RowExclusiveLock
Definition: lockdefs.h:38
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:309
uint64 XLogRecPtr
Definition: xlogdefs.h:21
const char * name
Definition: encode.c:521
char * text_to_cstring(const text *t)
Definition: varlena.c:182
#define ReplicationOriginRelationId
Definition: c.h:439
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105
Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1142 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), and text_to_cstring().

1143 {
1144  char *name;
1145  RepOriginId roident;
1146 
1147  replorigin_check_prerequisites(false, false);
1148 
1149  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1150  roident = replorigin_create(name);
1151 
1152  pfree(name);
1153 
1154  PG_RETURN_OID(roident);
1155 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
void pfree(void *pointer)
Definition: mcxt.c:950
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:439
#define PG_RETURN_OID(x)
Definition: fmgr.h:320
Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1161 of file origin.c.

References Assert, DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_drop(), and text_to_cstring().

1162 {
1163  char *name;
1164  RepOriginId roident;
1165 
1166  replorigin_check_prerequisites(false, false);
1167 
1168  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1169 
1170  roident = replorigin_by_name(name, false);
1171  Assert(OidIsValid(roident));
1172 
1173  replorigin_drop(roident);
1174 
1175  pfree(name);
1176 
1177  PG_RETURN_VOID();
1178 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
#define OidIsValid(objectId)
Definition: c.h:538
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
void pfree(void *pointer)
Definition: mcxt.c:950
void replorigin_drop(RepOriginId roident)
Definition: origin.c:327
#define PG_RETURN_VOID()
Definition: fmgr.h:309
#define Assert(condition)
Definition: c.h:675
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:439
Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1184 of file origin.c.

References DatumGetPointer, name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

1185 {
1186  char *name;
1187  RepOriginId roident;
1188 
1189  replorigin_check_prerequisites(false, false);
1190 
1191  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1192  roident = replorigin_by_name(name, true);
1193 
1194  pfree(name);
1195 
1196  if (OidIsValid(roident))
1197  PG_RETURN_OID(roident);
1198  PG_RETURN_NULL();
1199 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
#define OidIsValid(objectId)
Definition: c.h:538
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
void pfree(void *pointer)
Definition: mcxt.c:950
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:439
#define PG_RETURN_OID(x)
Definition: fmgr.h:320
#define PG_RETURN_NULL()
Definition: fmgr.h:305
Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1346 of file origin.c.

References Assert, DatumGetPointer, InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), and text_to_cstring().

1347 {
1348  char *name;
1349  bool flush;
1350  RepOriginId roident;
1351  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1352 
1353  replorigin_check_prerequisites(true, true);
1354 
1355  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1356  flush = PG_GETARG_BOOL(1);
1357 
1358  roident = replorigin_by_name(name, false);
1359  Assert(OidIsValid(roident));
1360 
1361  remote_lsn = replorigin_get_progress(roident, flush);
1362 
1363  if (remote_lsn == InvalidXLogRecPtr)
1364  PG_RETURN_NULL();
1365 
1366  PG_RETURN_LSN(remote_lsn);
1367 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
#define OidIsValid(objectId)
Definition: c.h:538
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:439
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:917
#define PG_RETURN_NULL()
Definition: fmgr.h:305
Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1244 of file origin.c.

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

1245 {
1246  replorigin_check_prerequisites(false, false);
1247 
1248  PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1249 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:319
RepOriginId replorigin_session_origin
Definition: origin.c:150
#define InvalidRepOriginId
Definition: origin.h:34
Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1260 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, NULL, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), and replorigin_session_get_progress().

1261 {
1262  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1263  bool flush = PG_GETARG_BOOL(0);
1264 
1265  replorigin_check_prerequisites(true, false);
1266 
1267  if (session_replication_state == NULL)
1268  ereport(ERROR,
1269  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1270  errmsg("no replication origin is configured")));
1271 
1272  remote_lsn = replorigin_session_get_progress(flush);
1273 
1274  if (remote_lsn == InvalidXLogRecPtr)
1275  PG_RETURN_NULL();
1276 
1277  PG_RETURN_LSN(remote_lsn);
1278 }
static ReplicationState * session_replication_state
Definition: origin.c:169
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1110
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define PG_RETURN_NULL()
Definition: fmgr.h:305
Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1227 of file origin.c.

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

1228 {
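1229  replorigin_check_prerequisites(true, false);
1230 
1231  replorigin_session_reset();
1232 
1233  replorigin_session_origin = InvalidRepOriginId;
1234  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1235  replorigin_session_origin_timestamp = 0;
1236 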
1237  PG_RETURN_VOID();
1238 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void replorigin_session_reset(void)
Definition: origin.c:1068
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
#define PG_RETURN_VOID()
Definition: fmgr.h:309
RepOriginId replorigin_session_origin
Definition: origin.c:150
#define InvalidRepOriginId
Definition: origin.h:34
Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1205 of file origin.c.

References DatumGetPointer, name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

1206 {
1207  char *name;
1208  RepOriginId origin;
1209 
1210  replorigin_check_prerequisites(true, false);
1211 
1212  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1213  origin = replorigin_by_name(name, false);
1214  replorigin_session_setup(origin);
1215 
1216  replorigin_session_origin = origin;
1217 
1218  pfree(name);
1219 
1220  PG_RETURN_VOID();
1221 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint16 RepOriginId
Definition: xlogdefs.h:51
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:983
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
void pfree(void *pointer)
Definition: mcxt.c:950
#define PG_RETURN_VOID()
Definition: fmgr.h:309
RepOriginId replorigin_session_origin
Definition: origin.c:150
const char * name
Definition: encode.c:521
#define DatumGetPointer(X)
Definition: postgres.h:555
char * text_to_cstring(const text *t)
Definition: varlena.c:182
Definition: c.h:439
Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1299 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1300 {
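1301  replorigin_check_prerequisites(true, false);
1302 
1303  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1304  replorigin_session_origin_timestamp = 0;
1305 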
1306  PG_RETURN_VOID();
1307 }
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
#define PG_RETURN_VOID()
Definition: fmgr.h:309
Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1281 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, NULL, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

1282 {
1283  XLogRecPtr location = PG_GETARG_LSN(0);
1284 
1285  replorigin_check_prerequisites(true, false);
1286 
1287  if (session_replication_state == NULL)
1288  ereport(ERROR,
1289  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1290  errmsg("no replication origin is configured")));
1291 
1292  replorigin_session_origin_lsn = location;
1293  replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1294 
1295  PG_RETURN_VOID();
1296 }
static ReplicationState * session_replication_state
Definition: origin.c:169
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
#define ereport(elevel, rest)
Definition: elog.h:122
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
#define PG_RETURN_VOID()
Definition: fmgr.h:309
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36
Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1371 of file origin.c.

References ReturnSetInfo::allowedModes, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, InvalidRepOriginId, IsA, ReplicationState::local_lsn, ReplicationState::lock, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), NULL, ObjectIdGetDatum, ReplicationState::remote_lsn, REPLICATION_ORIGIN_PROGRESS_COLS, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::returnMode, ReplicationState::roident, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1372 {
1373  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1374  TupleDesc tupdesc;
1375  Tuplestorestate *tupstore;
1376  MemoryContext per_query_ctx;
1377  MemoryContext oldcontext;
1378  int i;
1379 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1380 
1381  /* we want to return 0 rows if slot is set to zero */
1382  replorigin_check_prerequisites(false, true);
1383 
1384  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1385  ereport(ERROR,
1386  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1387  errmsg("set-valued function called in context that cannot accept a set")));
1388  if (!(rsinfo->allowedModes & SFRM_Materialize))
1389  ereport(ERROR,
1390  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1391  errmsg("materialize mode required, but it is not allowed in this context")));
1392  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1393  elog(ERROR, "return type must be a row type");
1394 
1395  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1396  elog(ERROR, "wrong function definition");
1397 
1398  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1399  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1400 
1401  tupstore = tuplestore_begin_heap(true, false, work_mem);
1402  rsinfo->returnMode = SFRM_Materialize;
1403  rsinfo->setResult = tupstore;
1404  rsinfo->setDesc = tupdesc;
1405 
1406  MemoryContextSwitchTo(oldcontext);
1407 
1408 
1409  /* prevent slots from being concurrently dropped */
1410  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1411 
1412  /*
1413  * Iterate through all possible replication_states, display if they are
1414  * filled. Note that we do not take any locks, so slightly corrupted/out
1415  * of date values are a possibility.
1416  */
1417  for (i = 0; i < max_replication_slots; i++)
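1418  {
1419  ReplicationState *state;
1420  Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1421  bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1422  char *roname;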
1423 
1424  state = &replication_states[i];
1425 
1426  /* unused slot, nothing to display */
1427  if (state->roident == InvalidRepOriginId)
1428  continue;
1429 
1430  memset(values, 0, sizeof(values));
1431  memset(nulls, 1, sizeof(nulls));
1432 
1433  values[0] = ObjectIdGetDatum(state->roident);
1434  nulls[0] = false;
1435 
1436  /*
1437  * We're not preventing the origin to be dropped concurrently, so
1438  * silently accept that it might be gone.
1439  */
1440  if (replorigin_by_oid(state->roident, true,
1441  &roname))
1442  {
1443  values[1] = CStringGetTextDatum(roname);
1444  nulls[1] = false;
1445  }
1446 
1447  LWLockAcquire(&state->lock, LW_SHARED);
1448 
1449  values[2] = LSNGetDatum(state->remote_lsn);
1450  nulls[2] = false;
1451 
1452  values[3] = LSNGetDatum(state->local_lsn);
1453  nulls[3] = false;
1454 
1455  LWLockRelease(&state->lock);
1456 
1457  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1458  }
1459 
1460  tuplestore_donestoring(tupstore);
1461 
1462  LWLockRelease(ReplicationOriginLock);
1463 
1464 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1465 
1466  return (Datum) 0;
1467 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
XLogRecPtr local_lsn
Definition: origin.c:120
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:398
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:130
#define ereport(elevel, rest)
Definition: elog.h:122
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
uintptr_t Datum
Definition: postgres.h:372
int work_mem
Definition: globals.c:113
int allowedModes
Definition: execnodes.h:268
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define NULL
Definition: c.h:229
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
#define InvalidRepOriginId
Definition: origin.h:34
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
ExprContext * econtext
Definition: execnodes.h:266
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:161
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
#define REPLICATION_ORIGIN_PROGRESS_COLS
static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 958 of file origin.c.

References ReplicationState::acquired_by, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, and NULL.

Referenced by replorigin_session_setup().

959 {
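960  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
961 
962  if (session_replication_state != NULL &&
963  session_replication_state->acquired_by == MyProcPid)
964  {
965  session_replication_state->acquired_by = 0;
966  session_replication_state = NULL;
967  }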
968 
969  LWLockRelease(ReplicationOriginLock);
970 }
static ReplicationState * session_replication_state
Definition: origin.c:169
int MyProcPid
Definition: globals.c:39
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define NULL
Definition: c.h:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
void ReplicationOriginShmemInit ( void  )

Definition at line 457 of file origin.c.

References i, LWLockInitialize(), LWLockRegisterTranche(), LWTRANCHE_REPLICATION_ORIGIN, max_replication_slots, MemSet, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

458 {
459  bool found;
460 
461  if (max_replication_slots == 0)
462  return;
463 
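464  replication_states_ctl = (ReplicationStateCtl *)
465  ShmemInitStruct("ReplicationOriginState",
466  ReplicationOriginShmemSize(),
467  &found);
468  replication_states = replication_states_ctl->states;
469 
470  if (!found)
471  {
472  int i;
473 
474  replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
475 
476  MemSet(replication_states, 0, ReplicationOriginShmemSize());
477 
478  for (i = 0; i < max_replication_slots; i++)
479  LWLockInitialize(&replication_states[i].lock,
480  replication_states_ctl->tranche_id);
481  }
482 
483  LWLockRegisterTranche(replication_states_ctl->tranche_id,
484  "replication_origin");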
485 }
#define MemSet(start, val, len)
Definition: c.h:857
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:667
Size ReplicationOriginShmemSize(void)
Definition: origin.c:437
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:162
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:146
void LWLockRegisterTranche(int tranche_id, char *tranche_name)
Definition: lwlock.c:592
int i
static ReplicationState * replication_states
Definition: origin.c:161
Size ReplicationOriginShmemSize ( void  )

Definition at line 437 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

438 {
439  Size size = 0;
440 
441  /*
442  * XXX: max_replication_slots is arguably the wrong thing to use, as here
443  * we keep the replay state of *remote* transactions. But for now it seems
444  * sufficient to reuse it, lest we introduce a separate GUC.
445  */
446  if (max_replication_slots == 0)
447  return size;
448 
449  size = add_size(size, offsetof(ReplicationStateCtl, states));
450 
451  size = add_size(size,
452  mul_size(max_replication_slots, sizeof(ReplicationState)));
453  return size;
454 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 794 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, NULL, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by pg_replication_origin_advance(), replorigin_redo(), and xact_redo_commit().

/*
 * replorigin_advance body: record replay progress for replication origin
 * `node` in the shared replication_states array.
 *
 * node          - origin to update; must not be InvalidRepOriginId.
 *                 DoNotReplicateId is silently ignored.
 * remote_commit - value to store in the entry's remote_lsn.
 * local_commit  - value to store in the entry's local_lsn; passing
 *                 InvalidXLogRecPtr leaves local_lsn untouched.
 * go_backward   - if true, the LSNs are overwritten unconditionally;
 *                 otherwise they are only ever moved forward.
 * wal_log       - if true, an XLOG_REPLORIGIN_SET record is emitted
 *                 before the in-memory values are changed.
 *
 * If no entry exists for `node`, the first free entry (roident ==
 * InvalidRepOriginId) is claimed for it.
 *
 * Raises ERROR if the matching entry has a non-zero acquired_by, or if
 * there is neither a matching entry nor a free one.
 *
 * ReplicationOriginLock is held exclusively from the search until after
 * the LSNs have been updated; the per-entry lock is held exclusively
 * while the entry is modified.
 */
797 {
798  int i;
799  ReplicationState *replication_state = NULL;
800  ReplicationState *free_state = NULL;
801 
802  Assert(node != InvalidRepOriginId);
803 
804  /* we don't track DoNotReplicateId */
805  if (node == DoNotReplicateId)
806  return;
807 
808  /*
809  * XXX: For the case where this is called by WAL replay, it'd be more
810  * efficient to restore into a backend local hashtable and only dump into
811  * shmem after recovery is finished. Let's wait with implementing that
812  * till it's shown to be a measurable expense
813  */
814 
815  /* Lock exclusively, as we may have to create a new table entry. */
816  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
817 
818  /*
819  * Search for either an existing slot for the origin, or a free one we can
820  * use.
821  */
822  for (i = 0; i < max_replication_slots; i++)
823  {
824  ReplicationState *curstate = &replication_states[i];
825 
    /*
     * Only the first free entry is remembered.  Later free entries fall
     * through to the roident comparison below and are skipped there,
     * because node is never InvalidRepOriginId (asserted above).
     */
826  /* remember where to insert if necessary */
827  if (curstate->roident == InvalidRepOriginId &&
828  free_state == NULL)
829  {
830  free_state = curstate;
831  continue;
832  }
833 
834  /* not our slot */
835  if (curstate->roident != node)
836  {
837  continue;
838  }
839 
840  /* ok, found slot */
841  replication_state = curstate;
842 
    /* the per-entry lock taken here is released at line 906 */
843  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
844 
845  /* Make sure it's not used by somebody else */
846  if (replication_state->acquired_by != 0)
847  {
848  ereport(ERROR,
849  (errcode(ERRCODE_OBJECT_IN_USE),
850  errmsg("replication origin with OID %d is already active for PID %d",
851  replication_state->roident,
852  replication_state->acquired_by)));
853  }
854 
855  break;
856  }
857 
    /* neither an existing entry nor a free one: nothing we can update */
858  if (replication_state == NULL && free_state == NULL)
859  ereport(ERROR,
860  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
861  errmsg("could not find free replication state slot for replication origin with OID %u",
862  node),
863  errhint("Increase max_replication_slots and try again.")));
864 
865  if (replication_state == NULL)
866  {
867  /* initialize new slot */
868  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
869  replication_state = free_state;
    /* a free entry is expected to carry no stale progress */
870  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
871  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
872  replication_state->roident = node;
873  }
874 
875  Assert(replication_state->roident != InvalidRepOriginId);
876 
877  /*
878  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
879  * and the standby gets the message. Primarily this will be called during
880  * WAL replay (of commit records) where no WAL logging is necessary.
881  */
882  if (wal_log)
883  {
884  xl_replorigin_set xlrec;
885 
    /* the record carries remote_commit, the origin id and go_backward */
886  xlrec.remote_lsn = remote_commit;
887  xlrec.node_id = node;
888  xlrec.force = go_backward;
889 
890  XLogBeginInsert();
891  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
892 
893  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
894  }
895 
896  /*
897  * Due to - harmless - race conditions during a checkpoint we could see
898  * values here that are older than the ones we already have in memory.
899  * Don't overwrite those.
900  */
901  if (go_backward || replication_state->remote_lsn < remote_commit)
902  replication_state->remote_lsn = remote_commit;
    /* an invalid local_commit means "leave local_lsn alone" */
903  if (local_commit != InvalidXLogRecPtr &&
904  (go_backward || replication_state->local_lsn < local_commit))
905  replication_state->local_lsn = local_commit;
906  LWLockRelease(&replication_state->lock);
907 
908  /*
909  * Release *after* changing the LSNs, slot isn't acquired and thus could
910  * otherwise be dropped anytime.
911  */
912  LWLockRelease(ReplicationOriginLock);
913 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
#define DoNotReplicateId
Definition: origin.h:35
int errcode(int sqlerrcode)
Definition: elog.c:575
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define ERROR
Definition: elog.h:43
LWLock lock
Definition: origin.c:130
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
#define ereport(elevel, rest)
Definition: elog.h:122
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
RepOriginId node_id
Definition: origin.h:22
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
XLogRecPtr remote_lsn
Definition: origin.h:21
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:161
void XLogBeginInsert(void)
Definition: xloginsert.c:120
RepOriginId replorigin_by_name ( char *  roname,
bool  missing_ok 
)

Definition at line 206 of file origin.c.

References CStringGetTextDatum, elog, ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, and SearchSysCache1.

Referenced by ApplyWorkerMain(), DropSubscription(), pg_replication_origin_advance(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), and pg_replication_origin_session_setup().

/*
 * replorigin_by_name body: map a replication origin name to its numeric
 * identifier via the REPLORIGNAME syscache.
 *
 * roname     - name of the origin to look up.
 * missing_ok - if false, a failed lookup raises ERROR; if true, a failed
 *              lookup returns InvalidOid.
 *
 * Returns the roident stored in the matching catalog row.
 *
 * NOTE(review): this listing omits source line 208; `ident` is presumably
 * declared there as a Form_pg_replication_origin - confirm against
 * origin.c.
 */
207 {
209  Oid roident = InvalidOid;
210  HeapTuple tuple;
211  Datum roname_d;
212 
    /* the syscache key must be a text Datum, not a C string */
213  roname_d = CStringGetTextDatum(roname);
214 
215  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
216  if (HeapTupleIsValid(tuple))
217  {
218  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
    /* copy the id out before the cache reference is released */
219  roident = ident->roident;
220  ReleaseSysCache(tuple);
221  }
222  else if (!missing_ok)
223  elog(ERROR, "cache lookup failed for replication origin '%s'",
224  roname);
225 
    /* still InvalidOid here if the lookup failed and missing_ok was set */
226  return roident;
227 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
unsigned int Oid
Definition: postgres_ext.h:31
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:156
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 398 of file origin.c.

References Assert, DoNotReplicateId, elog, ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, NULL, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1, and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and pgoutput_begin_txn().

399 {
400  HeapTuple tuple;
402 
403  Assert(OidIsValid((Oid) roident));
404  Assert(roident != InvalidRepOriginId);
405  Assert(roident != DoNotReplicateId);
406 
408  ObjectIdGetDatum((Oid) roident));
409 
410  if (HeapTupleIsValid(tuple))
411  {
412  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
413  *roname = text_to_cstring(&ric->roname);
414  ReleaseSysCache(tuple);
415 
416  return true;
417  }
418  else
419  {
420  *roname = NULL;
421 
422  if (!missing_ok)
423  elog(ERROR, "cache lookup failed for replication origin with oid %u",
424  roident);
425 
426  return false;
427  }
428 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
#define DoNotReplicateId
Definition: origin.h:35
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:538
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:156
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
FormData_pg_replication_origin * Form_pg_replication_origin
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define InvalidRepOriginId
Definition: origin.h:34
char * text_to_cstring(const text *t)
Definition: varlena.c:182
#define elog
Definition: elog.h:219
static void replorigin_check_prerequisites ( bool  check_slots,
bool  recoveryOK 
)
static

Definition at line 175 of file origin.c.

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, RecoveryInProgress(), and superuser().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

/*
 * replorigin_check_prerequisites body: common precondition checks for the
 * replication origin functions; raises ERROR on the first check that fails.
 *
 * check_slots - if true, fail when max_replication_slots is 0.
 * recoveryOK  - if false, fail while RecoveryInProgress() returns true.
 *
 * The superuser check is unconditional and is performed first.
 */
176 {
177  if (!superuser())
178  ereport(ERROR,
179  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
180  errmsg("only superusers can query or manipulate replication origins")));
181 
    /* only enforced for callers that asked for it via check_slots */
182  if (check_slots && max_replication_slots == 0)
183  ereport(ERROR,
184  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
185  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
186 
    /* callers that tolerate recovery pass recoveryOK = true to skip this */
187  if (!recoveryOK && RecoveryInProgress())
188  ereport(ERROR,
189  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
190  errmsg("cannot manipulate replication origins during recovery")));
191 
192 }
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
bool RecoveryInProgress(void)
Definition: xlog.c:7878
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
int errmsg(const char *fmt,...)
Definition: elog.c:797
RepOriginId replorigin_create ( char *  roname)

Definition at line 235 of file origin.c.

References Anum_pg_replication_origin_roident, Anum_pg_replication_origin_roname, Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), Natts_pg_replication_origin, NULL, ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationOriginIdentIndex, ReplicationOriginRelationId, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), and pg_replication_origin_create().

236 {
237  Oid roident;
238  HeapTuple tuple = NULL;
239  Relation rel;
240  Datum roname_d;
241  SnapshotData SnapshotDirty;
242  SysScanDesc scan;
243  ScanKeyData key;
244 
245  roname_d = CStringGetTextDatum(roname);
246 
248 
249  /*
250  * We need the numeric replication origin to be 16bit wide, so we cannot
251  * rely on the normal oid allocation. Instead we simply scan
252  * pg_replication_origin for the first unused id. That's not particularly
253  * efficient, but this should be a fairly infrequent operation - we can
254  * easily spend a bit more code on this when it turns out it needs to be
255  * faster.
256  *
257  * We handle concurrency by taking an exclusive lock (allowing reads!)
258  * over the table for the duration of the search. Because we use a "dirty
259  * snapshot" we can read rows that other in-progress sessions have
260  * written, even though they would be invisible with normal snapshots. Due
261  * to the exclusive lock there's no danger that new rows can appear while
262  * we're checking.
263  */
264  InitDirtySnapshot(SnapshotDirty);
265 
267 
268  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
269  {
270  bool nulls[Natts_pg_replication_origin];
272  bool collides;
273 
275 
276  ScanKeyInit(&key,
278  BTEqualStrategyNumber, F_OIDEQ,
279  ObjectIdGetDatum(roident));
280 
282  true /* indexOK */ ,
283  &SnapshotDirty,
284  1, &key);
285 
286  collides = HeapTupleIsValid(systable_getnext(scan));
287 
288  systable_endscan(scan);
289 
290  if (!collides)
291  {
292  /*
293  * Ok, found an unused roident, insert the new row and do a CCI,
294  * so our callers can look it up if they want to.
295  */
296  memset(&nulls, 0, sizeof(nulls));
297 
299  values[Anum_pg_replication_origin_roname - 1] = roname_d;
300 
301  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
302  CatalogTupleInsert(rel, tuple);
304  break;
305  }
306  }
307 
308  /* now release lock again, */
310 
311  if (tuple == NULL)
312  ereport(ERROR,
313  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
314  errmsg("could not find free replication origin OID")));
315 
316  heap_freetuple(tuple);
317  return roident;
318 }
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:499
#define Anum_pg_replication_origin_roident
#define RelationGetDescr(relation)
Definition: rel.h:428
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:575
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:328
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:416
#define Anum_pg_replication_origin_roname
#define ReplicationOriginIdentIndex
Definition: indexing.h:333
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:338
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:100
#define ereport(elevel, rest)
Definition: elog.h:122
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:922
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
bool IsTransactionState(void)
Definition: xact.c:350
#define Natts_pg_replication_origin
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define ReplicationOriginRelationId
int errmsg(const char *fmt,...)
Definition: elog.c:797
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void replorigin_drop ( RepOriginId  roident)

Definition at line 327 of file origin.c.

References ReplicationState::acquired_by, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_close, heap_open(), HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, NULL, ObjectIdGetDatum, ReleaseSysCache(), ReplicationState::remote_lsn, ReplicationOriginRelationId, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1, HeapTupleData::t_self, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by DropSubscription(), and pg_replication_origin_drop().

328 {
329  HeapTuple tuple = NULL;
330  Relation rel;
331  int i;
332 
334 
336 
337  /* cleanup the slot state info */
338  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
339 
340  for (i = 0; i < max_replication_slots; i++)
341  {
343 
344  /* found our slot */
345  if (state->roident == roident)
346  {
347  if (state->acquired_by != 0)
348  {
349  ereport(ERROR,
350  (errcode(ERRCODE_OBJECT_IN_USE),
351  errmsg("could not drop replication origin with OID %d, in use by PID %d",
352  state->roident,
353  state->acquired_by)));
354  }
355 
356  /* first WAL log */
357  {
358  xl_replorigin_drop xlrec;
359 
360  xlrec.node_id = roident;
361  XLogBeginInsert();
362  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
363  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
364  }
365 
366  /* then reset the in-memory entry */
367  state->roident = InvalidRepOriginId;
368  state->remote_lsn = InvalidXLogRecPtr;
369  state->local_lsn = InvalidXLogRecPtr;
370  break;
371  }
372  }
373  LWLockRelease(ReplicationOriginLock);
374 
376  if (!HeapTupleIsValid(tuple))
377  elog(ERROR, "cache lookup failed for replication origin with oid %u",
378  roident);
379 
380  CatalogTupleDelete(rel, &tuple->t_self);
381  ReleaseSysCache(tuple);
382 
384 
385  /* now release lock again, */
387 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:575
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
#define heap_close(r, l)
Definition: heapam.h:97
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:156
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:122
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
void CommandCounterIncrement(void)
Definition: xact.c:922
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
RepOriginId node_id
Definition: origin.h:28
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
bool IsTransactionState(void)
Definition: xact.c:350
#define InvalidRepOriginId
Definition: origin.h:34
#define ReplicationOriginRelationId
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:161
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 917 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

/*
 * replorigin_get_progress body: return the remote_lsn recorded for
 * replication origin `node`.
 *
 * node  - origin whose progress is requested.
 * flush - if true and the entry has a valid local_lsn, XLogFlush() is
 *         called with that local_lsn before returning.
 *
 * Returns InvalidXLogRecPtr when no entry in replication_states has
 * roident == node.
 *
 * NOTE(review): this listing omits source line 928; `state` is presumably
 * declared there as a ReplicationState pointer - confirm against origin.c.
 */
918 {
919  int i;
920  XLogRecPtr local_lsn = InvalidXLogRecPtr;
921  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
922 
923  /* prevent slots from being concurrently dropped */
924  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
925 
926  for (i = 0; i < max_replication_slots; i++)
927  {
929 
930  state = &replication_states[i];
931 
932  if (state->roident == node)
933  {
    /* read both LSNs under the entry's own lock, in shared mode */
934  LWLockAcquire(&state->lock, LW_SHARED);
935 
936  remote_lsn = state->remote_lsn;
937  local_lsn = state->local_lsn;
938 
939  LWLockRelease(&state->lock);
940 
941  break;
942  }
943  }
944 
945  LWLockRelease(ReplicationOriginLock);
946 
    /* the flush runs after both locks have been released */
947  if (flush && local_lsn != InvalidXLogRecPtr)
948  XLogFlush(local_lsn);
949 
950  return remote_lsn;
951 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2757
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LWLock lock
Definition: origin.c:130
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
int i
static ReplicationState * replication_states
Definition: origin.c:161
void replorigin_redo ( XLogReaderState record)

Definition at line 733 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

734 {
735  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
736 
737  switch (info)
738  {
739  case XLOG_REPLORIGIN_SET:
740  {
741  xl_replorigin_set *xlrec =
742  (xl_replorigin_set *) XLogRecGetData(record);
743 
745  xlrec->remote_lsn, record->EndRecPtr,
746  xlrec->force /* backward */ ,
747  false /* WAL log */ );
748  break;
749  }
751  {
752  xl_replorigin_drop *xlrec;
753  int i;
754 
755  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
756 
757  for (i = 0; i < max_replication_slots; i++)
758  {
760 
761  /* found our slot */
762  if (state->roident == xlrec->node_id)
763  {
764  /* reset entry */
765  state->roident = InvalidRepOriginId;
766  state->remote_lsn = InvalidXLogRecPtr;
767  state->local_lsn = InvalidXLogRecPtr;
768  break;
769  }
770  }
771  break;
772  }
773  default:
774  elog(PANIC, "replorigin_redo: unknown op code %u", info);
775  }
776 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
unsigned char uint8
Definition: c.h:266
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:794
#define PANIC
Definition: elog.h:53
RepOriginId roident
Definition: origin.c:108
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define XLogRecGetData(decoder)
Definition: xlogreader.h:220
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:216
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
RepOriginId node_id
Definition: origin.h:28
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:22
XLogRecPtr remote_lsn
Definition: origin.h:21
#define InvalidRepOriginId
Definition: origin.h:34
int i
static ReplicationState * replication_states
Definition: origin.c:161
#define elog
Definition: elog.h:219
void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1092 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NULL, ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by RecordTransactionCommit(), and RecordTransactionCommitPrepared().

1093 {
1096 
1098  if (session_replication_state->local_lsn < local_commit)
1099  session_replication_state->local_lsn = local_commit;
1100  if (session_replication_state->remote_lsn < remote_commit)
1101  session_replication_state->remote_lsn = remote_commit;
1103 }
static ReplicationState * session_replication_state
Definition: origin.c:169
XLogRecPtr local_lsn
Definition: origin.c:120
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LWLock lock
Definition: origin.c:130
XLogRecPtr remote_lsn
Definition: origin.c:113
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define InvalidRepOriginId
Definition: origin.h:34
XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1110 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), NULL, ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_progress().

1111 {
1112  XLogRecPtr remote_lsn;
1113  XLogRecPtr local_lsn;
1114 
1116 
1118  remote_lsn = session_replication_state->remote_lsn;
1119  local_lsn = session_replication_state->local_lsn;
1121 
1122  if (flush && local_lsn != InvalidXLogRecPtr)
1123  XLogFlush(local_lsn);
1124 
1125  return remote_lsn;
1126 }
static ReplicationState * session_replication_state
Definition: origin.c:169
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:120
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2757
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LWLock lock
Definition: origin.c:130
XLogRecPtr remote_lsn
Definition: origin.c:113
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
void replorigin_session_reset ( void  )

Definition at line 1068 of file origin.c.

References ReplicationState::acquired_by, Assert, ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and NULL.

Referenced by pg_replication_origin_session_reset().

1069 {
1071 
1073  ereport(ERROR,
1074  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1075  errmsg("no replication origin is configured")));
1076 
1077  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1078 
1081 
1082  LWLockRelease(ReplicationOriginLock);
1083 }
static ReplicationState * session_replication_state
Definition: origin.c:169
int errcode(int sqlerrcode)
Definition: elog.c:575
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
int errmsg(const char *fmt,...)
Definition: elog.c:797
void replorigin_session_setup ( RepOriginId  node)

Definition at line 983 of file origin.c.

References ReplicationState::acquired_by, Assert, ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, NULL, on_shmem_exit(), ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_setup().

984 {
985  static bool registered_cleanup;
986  int i;
987  int free_slot = -1;
988 
989  if (!registered_cleanup)
990  {
992  registered_cleanup = true;
993  }
994 
996 
998  ereport(ERROR,
999  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1000  errmsg("cannot setup replication origin when one is already setup")));
1001 
1002  /* Lock exclusively, as we may have to create a new table entry. */
1003  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1004 
1005  /*
1006  * Search for either an existing slot for the origin, or a free one we can
1007  * use.
1008  */
1009  for (i = 0; i < max_replication_slots; i++)
1010  {
1011  ReplicationState *curstate = &replication_states[i];
1012 
1013  /* remember where to insert if necessary */
1014  if (curstate->roident == InvalidRepOriginId &&
1015  free_slot == -1)
1016  {
1017  free_slot = i;
1018  continue;
1019  }
1020 
1021  /* not our slot */
1022  if (curstate->roident != node)
1023  continue;
1024 
1025  else if (curstate->acquired_by != 0)
1026  {
1027  ereport(ERROR,
1028  (errcode(ERRCODE_OBJECT_IN_USE),
1029  errmsg("replication identifier %d is already active for PID %d",
1030  curstate->roident, curstate->acquired_by)));
1031  }
1032 
1033  /* ok, found slot */
1034  session_replication_state = curstate;
1035  }
1036 
1037 
1038  if (session_replication_state == NULL && free_slot == -1)
1039  ereport(ERROR,
1040  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1041  errmsg("could not find free replication state slot for replication origin with OID %u",
1042  node),
1043  errhint("Increase max_replication_slots and try again.")));
1044  else if (session_replication_state == NULL)
1045  {
1046  /* initialize new slot */
1051  }
1052 
1053 
1055 
1057 
1058  LWLockRelease(ReplicationOriginLock);
1059 }
static ReplicationState * session_replication_state
Definition: origin.c:169
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:39
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:958
int errcode(int sqlerrcode)
Definition: elog.c:575
RepOriginId roident
Definition: origin.c:108
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define ERROR
Definition: elog.h:43
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define ereport(elevel, rest)
Definition: elog.h:122
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define InvalidRepOriginId
Definition: origin.h:34
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static ReplicationState * replication_states
Definition: origin.c:161
void StartupReplicationOrigin ( void  )

Definition at line 618 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

619 {
620  const char *path = "pg_logical/replorigin_checkpoint";
621  int fd;
622  int readBytes;
624  int last_state = 0;
625  pg_crc32c file_crc;
626  pg_crc32c crc;
627 
628  /* don't want to overwrite already existing state */
629 #ifdef USE_ASSERT_CHECKING
630  static bool already_started = false;
631 
632  Assert(!already_started);
633  already_started = true;
634 #endif
635 
636  if (max_replication_slots == 0)
637  return;
638 
639  INIT_CRC32C(crc);
640 
641  elog(DEBUG2, "starting up replication origin progress state");
642 
643  fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
644 
645  /*
646  * might have had max_replication_slots == 0 last run, or we just brought
647  * up a standby.
648  */
649  if (fd < 0 && errno == ENOENT)
650  return;
651  else if (fd < 0)
652  ereport(PANIC,
654  errmsg("could not open file \"%s\": %m",
655  path)));
656 
657  /* verify magic, that is written even if nothing was active */
658  readBytes = read(fd, &magic, sizeof(magic));
659  if (readBytes != sizeof(magic))
660  ereport(PANIC,
661  (errmsg("could not read file \"%s\": %m",
662  path)));
663  COMP_CRC32C(crc, &magic, sizeof(magic));
664 
665  if (magic != REPLICATION_STATE_MAGIC)
666  ereport(PANIC,
667  (errmsg("replication checkpoint has wrong magic %u instead of %u",
668  magic, REPLICATION_STATE_MAGIC)));
669 
670  /* we can skip locking here, no other access is possible */
671 
672  /* recover individual states, until there are no more to be found */
673  while (true)
674  {
675  ReplicationStateOnDisk disk_state;
676 
677  readBytes = read(fd, &disk_state, sizeof(disk_state));
678 
679  /* no further data */
680  if (readBytes == sizeof(crc))
681  {
682  /* not pretty, but simple ... */
683  file_crc = *(pg_crc32c *) &disk_state;
684  break;
685  }
686 
687  if (readBytes < 0)
688  {
689  ereport(PANIC,
691  errmsg("could not read file \"%s\": %m",
692  path)));
693  }
694 
695  if (readBytes != sizeof(disk_state))
696  {
697  ereport(PANIC,
699  errmsg("could not read file \"%s\": read %d of %zu",
700  path, readBytes, sizeof(disk_state))));
701  }
702 
703  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
704 
705  if (last_state == max_replication_slots)
706  ereport(PANIC,
707  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
708  errmsg("could not find free replication state, increase max_replication_slots")));
709 
710  /* copy data to shared memory */
711  replication_states[last_state].roident = disk_state.roident;
712  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
713  last_state++;
714 
715  elog(LOG, "recovered replication state of node %u to %X/%X",
716  disk_state.roident,
717  (uint32) (disk_state.remote_lsn >> 32),
718  (uint32) disk_state.remote_lsn);
719  }
720 
721  /* now check checksum */
722  FIN_CRC32C(crc);
723  if (file_crc != crc)
724  ereport(PANIC,
725  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
726  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
727  crc, file_crc)));
728 
729  CloseTransientFile(fd);
730 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:139
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:138
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:53
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
RepOriginId roident
Definition: origin.c:108
#define DEBUG2
Definition: elog.h:24
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define REPLICATION_STATE_MAGIC
Definition: origin.c:172
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
static ReplicationState * replication_states
Definition: origin.c:161
#define elog
Definition: elog.h:219
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

ReplicationState* replication_states
static

Definition at line 161 of file origin.c.

ReplicationStateCtl* replication_states_ctl
static

Definition at line 162 of file origin.c.

ReplicationState* session_replication_state = NULL
static

Definition at line 169 of file origin.c.