92#include "utils/fmgroids.h"
100#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
187#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
194 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 errmsg(
"cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
199 (
errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 errmsg(
"cannot manipulate replication origins during recovery")));
239 roident =
ident->roident;
242 else if (!missing_ok)
244 (
errcode(ERRCODE_UNDEFINED_OBJECT),
245 errmsg(
"replication origin \"%s\" does not exist",
292 bool nulls[Natts_pg_replication_origin];
299 Anum_pg_replication_origin_roident,
318 memset(&nulls, 0,
sizeof(nulls));
321 values[Anum_pg_replication_origin_roname - 1] = roname_d;
335 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
336 errmsg(
"could not find free replication origin ID")));
360 if (
state->roident == roident)
363 if (
state->acquired_by != 0)
369 (
errcode(ERRCODE_OBJECT_IN_USE),
370 errmsg(
"could not drop replication origin with ID %d, in use by PID %d",
372 state->acquired_by)));
381 cv = &
state->origin_cv;
436 elog(
ERROR,
"cache lookup failed for replication origin with ID %d",
496 (
errcode(ERRCODE_UNDEFINED_OBJECT),
497 errmsg(
"replication origin with ID %d does not exist",
588 if (unlink(tmppath) < 0 && errno != ENOENT)
591 errmsg(
"could not remove file \"%s\": %m",
599 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY);
603 errmsg(
"could not create file \"%s\": %m",
608 if ((
write(tmpfd, &magic,
sizeof(magic))) !=
sizeof(magic))
615 errmsg(
"could not write to file \"%s\": %m",
634 memset(&disk_state, 0,
sizeof(disk_state));
649 if ((
write(tmpfd, &disk_state,
sizeof(disk_state))) !=
657 errmsg(
"could not write to file \"%s\": %m",
676 errmsg(
"could not write to file \"%s\": %m",
683 errmsg(
"could not close file \"%s\": %m",
710#ifdef USE_ASSERT_CHECKING
711 static bool already_started =
false;
714 already_started =
true;
722 elog(
DEBUG2,
"starting up replication origin progress state");
730 if (
fd < 0 && errno == ENOENT)
735 errmsg(
"could not open file \"%s\": %m",
739 readBytes =
read(
fd, &magic,
sizeof(magic));
740 if (readBytes !=
sizeof(magic))
745 errmsg(
"could not read file \"%s\": %m",
750 errmsg(
"could not read file \"%s\": read %d of %zu",
751 path, readBytes,
sizeof(magic))));
757 (
errmsg(
"replication checkpoint has wrong magic %u instead of %u",
767 readBytes =
read(
fd, &disk_state,
sizeof(disk_state));
770 if (readBytes ==
sizeof(
crc))
781 errmsg(
"could not read file \"%s\": %m",
785 if (readBytes !=
sizeof(disk_state))
789 errmsg(
"could not read file \"%s\": read %d of %zu",
790 path, readBytes,
sizeof(disk_state))));
797 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 errmsg(
"could not find free replication state, increase \"max_active_replication_origins\"")));
806 (
errmsg(
"recovered replication state of node %d to %X/%X",
816 errmsg(
"replication slot checkpoint has wrong checksum %u, expected %u",
822 errmsg(
"could not close file \"%s\": %m",
868 elog(
PANIC,
"replorigin_redo: unknown op code %u", info);
890 bool go_backward,
bool wal_log)
924 free_state = curstate;
935 replication_state = curstate;
943 (
errcode(ERRCODE_OBJECT_IN_USE),
944 errmsg(
"replication origin with ID %d is already active for PID %d",
952 if (replication_state == NULL && free_state == NULL)
954 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955 errmsg(
"could not find free replication state slot for replication origin with ID %d",
957 errhint(
"Increase \"max_active_replication_origins\" and try again.")));
959 if (replication_state == NULL)
963 replication_state = free_state;
966 replication_state->
roident = node;
982 xlrec.
force = go_backward;
998 if (go_backward || replication_state->
remote_lsn < remote_commit)
999 replication_state->
remote_lsn = remote_commit;
1001 (go_backward || replication_state->
local_lsn < local_commit))
1002 replication_state->
local_lsn = local_commit;
1029 if (
state->roident == node)
1033 remote_lsn =
state->remote_lsn;
1034 local_lsn =
state->local_lsn;
1099 static bool registered_cleanup;
1103 if (!registered_cleanup)
1106 registered_cleanup =
true;
1113 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1114 errmsg(
"cannot setup replication origin when one is already setup")));
1136 if (curstate->
roident != node)
1139 else if (curstate->
acquired_by != 0 && acquired_by == 0)
1142 (
errcode(ERRCODE_OBJECT_IN_USE),
1143 errmsg(
"replication origin with ID %d is already active for PID %d",
1155 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1156 errmsg(
"could not find free replication state slot for replication origin with ID %d",
1158 errhint(
"Increase \"max_active_replication_origins\" and try again.")));
1171 if (acquired_by == 0)
1174 elog(
ERROR,
"could not find replication state slot for replication origin with OID %u which was acquired by %d",
1198 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1199 errmsg(
"no replication origin is configured")));
1284 (
errcode(ERRCODE_RESERVED_NAME),
1285 errmsg(
"replication origin name \"%s\" is reserved",
1287 errdetail(
"Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1288 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1294#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1295 if (strncmp(
name,
"regress_", 8) != 0)
1296 elog(
WARNING,
"replication origins created by regression test cases should have names starting with \"regress_\"");
1414 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1415 errmsg(
"no replication origin is configured")));
1434 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1435 errmsg(
"no replication origin is configured")));
1549 memset(nulls, 1,
sizeof(nulls));
1581#undef REPLICATION_ORIGIN_PROGRESS_COLS
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
#define OidIsValid(objectId)
bool IsReservedName(const char *name)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
int CloseTransientFile(int fd)
int OpenTransientFile(const char *fileName, int fileFlags)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_DATUM(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_BOOL(x)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
static ReplicationStateCtl * replication_states_ctl
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Size ReplicationOriginShmemSize(void)
RepOriginId replorigin_create(const char *roname)
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
void replorigin_session_reset(void)
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
int max_active_replication_origins
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
static ReplicationState * replication_states
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
static void ReplicationOriginExitCleanup(int code, Datum arg)
void StartupReplicationOrigin(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
static void replorigin_state_clear(RepOriginId roident, bool nowait)
void replorigin_session_setup(RepOriginId node, int acquired_by)
void CheckPointReplicationOrigin(void)
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
static ReplicationState * session_replication_state
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
void ReplicationOriginShmemInit(void)
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
#define REPLICATION_STATE_MAGIC
XLogRecPtr replorigin_session_origin_lsn
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
void replorigin_redo(XLogReaderState *record)
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define InvalidRepOriginId
#define XLOG_REPLORIGIN_DROP
#define XLOG_REPLORIGIN_SET
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
static Datum LSNGetDatum(XLogRecPtr X)
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
static int fd(const char *x, int i)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define InitDirtySnapshot(snapshotdata)
#define BTEqualStrategyNumber
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable origin_cv
Tuplestorestate * setResult
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
#define PG_GETARG_TIMESTAMPTZ(n)
char * text_to_cstring(const text *t)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
bool RecoveryInProgress(void)
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)