93 #include "utils/fmgroids.h"
182 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
189 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 errmsg(
"cannot query or manipulate replication origin when max_replication_slots = 0")));
194 (
errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195 errmsg(
"cannot manipulate replication origins during recovery")));
234 roident =
ident->roident;
237 else if (!missing_ok)
239 (
errcode(ERRCODE_UNDEFINED_OBJECT),
240 errmsg(
"replication origin \"%s\" does not exist",
287 bool nulls[Natts_pg_replication_origin];
294 Anum_pg_replication_origin_roident,
313 memset(&nulls, 0,
sizeof(nulls));
316 values[Anum_pg_replication_origin_roname - 1] = roname_d;
330 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331 errmsg(
"could not find free replication origin ID")));
355 if (
state->roident == roident)
358 if (
state->acquired_by != 0)
364 (
errcode(ERRCODE_OBJECT_IN_USE),
365 errmsg(
"could not drop replication origin with ID %d, in use by PID %d",
367 state->acquired_by)));
376 cv = &
state->origin_cv;
431 elog(
ERROR,
"cache lookup failed for replication origin with ID %d",
491 (
errcode(ERRCODE_UNDEFINED_OBJECT),
492 errmsg(
"replication origin with ID %d does not exist",
575 const char *tmppath =
"pg_logical/replorigin_checkpoint.tmp";
576 const char *path =
"pg_logical/replorigin_checkpoint";
588 if (unlink(tmppath) < 0 && errno != ENOENT)
591 errmsg(
"could not remove file \"%s\": %m",
599 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY);
603 errmsg(
"could not create file \"%s\": %m",
608 if ((
write(tmpfd, &magic,
sizeof(magic))) !=
sizeof(magic))
615 errmsg(
"could not write to file \"%s\": %m",
634 memset(&disk_state, 0,
sizeof(disk_state));
649 if ((
write(tmpfd, &disk_state,
sizeof(disk_state))) !=
657 errmsg(
"could not write to file \"%s\": %m",
676 errmsg(
"could not write to file \"%s\": %m",
683 errmsg(
"could not close file \"%s\": %m",
701 const char *path =
"pg_logical/replorigin_checkpoint";
710 #ifdef USE_ASSERT_CHECKING
711 static bool already_started =
false;
714 already_started =
true;
722 elog(
DEBUG2,
"starting up replication origin progress state");
730 if (
fd < 0 && errno == ENOENT)
735 errmsg(
"could not open file \"%s\": %m",
739 readBytes =
read(
fd, &magic,
sizeof(magic));
740 if (readBytes !=
sizeof(magic))
745 errmsg(
"could not read file \"%s\": %m",
750 errmsg(
"could not read file \"%s\": read %d of %zu",
751 path, readBytes,
sizeof(magic))));
757 (
errmsg(
"replication checkpoint has wrong magic %u instead of %u",
767 readBytes =
read(
fd, &disk_state,
sizeof(disk_state));
770 if (readBytes ==
sizeof(
crc))
781 errmsg(
"could not read file \"%s\": %m",
785 if (readBytes !=
sizeof(disk_state))
789 errmsg(
"could not read file \"%s\": read %d of %zu",
790 path, readBytes,
sizeof(disk_state))));
797 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 errmsg(
"could not find free replication state, increase max_replication_slots")));
806 (
errmsg(
"recovered replication state of node %d to %X/%X",
816 errmsg(
"replication slot checkpoint has wrong checksum %u, expected %u",
822 errmsg(
"could not close file \"%s\": %m",
868 elog(
PANIC,
"replorigin_redo: unknown op code %u", info);
890 bool go_backward,
bool wal_log)
924 free_state = curstate;
935 replication_state = curstate;
943 (
errcode(ERRCODE_OBJECT_IN_USE),
944 errmsg(
"replication origin with ID %d is already active for PID %d",
952 if (replication_state == NULL && free_state == NULL)
954 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955 errmsg(
"could not find free replication state slot for replication origin with ID %d",
957 errhint(
"Increase max_replication_slots and try again.")));
959 if (replication_state == NULL)
963 replication_state = free_state;
966 replication_state->
roident = node;
982 xlrec.
force = go_backward;
998 if (go_backward || replication_state->
remote_lsn < remote_commit)
999 replication_state->
remote_lsn = remote_commit;
1001 (go_backward || replication_state->
local_lsn < local_commit))
1002 replication_state->
local_lsn = local_commit;
1029 if (
state->roident == node)
1033 remote_lsn =
state->remote_lsn;
1034 local_lsn =
state->local_lsn;
1097 static bool registered_cleanup;
1101 if (!registered_cleanup)
1104 registered_cleanup =
true;
1111 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1112 errmsg(
"cannot setup replication origin when one is already setup")));
1134 if (curstate->
roident != node)
1137 else if (curstate->
acquired_by != 0 && acquired_by == 0)
1140 (
errcode(ERRCODE_OBJECT_IN_USE),
1141 errmsg(
"replication origin with ID %d is already active for PID %d",
1152 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1153 errmsg(
"could not find free replication state slot for replication origin with ID %d",
1155 errhint(
"Increase max_replication_slots and try again.")));
1168 if (acquired_by == 0)
1171 elog(
ERROR,
"could not find replication state slot for replication origin with OID %u which was acquired by %d",
1195 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1196 errmsg(
"no replication origin is configured")));
1281 (
errcode(ERRCODE_RESERVED_NAME),
1282 errmsg(
"replication origin name \"%s\" is reserved",
1284 errdetail(
"Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1291 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1292 if (strncmp(
name,
"regress_", 8) != 0)
1293 elog(
WARNING,
"replication origins created by regression test cases should have names starting with \"regress_\"");
1411 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1412 errmsg(
"no replication origin is configured")));
1431 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1432 errmsg(
"no replication origin is configured")));
1546 memset(nulls, 1,
sizeof(nulls));
1578 #undef REPLICATION_ORIGIN_PROGRESS_COLS
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
#define OidIsValid(objectId)
bool IsReservedName(const char *name)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
void ConditionVariableCancelSleep(void)
elog(ERROR, "%s: %s", p2, msg)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
int CloseTransientFile(int fd)
int OpenTransientFile(const char *fileName, int fileFlags)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_DATUM(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_BOOL(x)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Assert(fmt[strlen(fmt) - 1] !='\n')
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
static ReplicationStateCtl * replication_states_ctl
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Size ReplicationOriginShmemSize(void)
RepOriginId replorigin_create(const char *roname)
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
void replorigin_session_reset(void)
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
static ReplicationState * replication_states
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
static void ReplicationOriginExitCleanup(int code, Datum arg)
void StartupReplicationOrigin(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
static void replorigin_state_clear(RepOriginId roident, bool nowait)
void replorigin_session_setup(RepOriginId node, int acquired_by)
void CheckPointReplicationOrigin(void)
static ReplicationState * session_replication_state
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
void ReplicationOriginShmemInit(void)
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
#define REPLICATION_STATE_MAGIC
XLogRecPtr replorigin_session_origin_lsn
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
void replorigin_redo(XLogReaderState *record)
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define InvalidRepOriginId
#define XLOG_REPLORIGIN_DROP
#define XLOG_REPLORIGIN_SET
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
static Datum LSNGetDatum(XLogRecPtr X)
FormData_pg_replication_origin * Form_pg_replication_origin
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
static int fd(const char *x, int i)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
int max_replication_slots
#define InitDirtySnapshot(snapshotdata)
#define BTEqualStrategyNumber
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable origin_cv
Tuplestorestate * setResult
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
#define PG_GETARG_TIMESTAMPTZ(n)
char * text_to_cstring(const text *t)
@ WAIT_EVENT_REPLICATION_ORIGIN_DROP
bool IsTransactionState(void)
void CommandCounterIncrement(void)
bool RecoveryInProgress(void)
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
void XLogRegisterData(char *data, uint32 len)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)