93#include "utils/fmgroids.h"
102#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
103#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
171 .origin_timestamp = 0
204#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
212 errmsg(
"cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
217 errmsg(
"cannot manipulate replication origins during recovery")));
256 roident =
ident->roident;
259 else if (!missing_ok)
262 errmsg(
"replication origin \"%s\" does not exist",
292 errmsg(
"replication origin name is too long"),
293 errdetail(
"Replication origin names must be no longer than %d bytes.",
358 memset(&nulls, 0,
sizeof(nulls));
376 errmsg(
"could not find free replication origin ID")));
400 if (
state->roident == roident)
403 if (
state->refcount > 0)
410 (
state->acquired_by != 0)
411 ?
errmsg(
"could not drop replication origin with ID %d, in use by PID %d",
414 :
errmsg(
"could not drop replication origin with ID %d, in use by another process",
424 cv = &
state->origin_cv;
479 elog(
ERROR,
"cache lookup failed for replication origin with ID %d",
540 errmsg(
"replication origin with ID %d does not exist",
632 errmsg(
"could not remove file \"%s\": %m",
644 errmsg(
"could not create file \"%s\": %m",
649 if ((
write(
tmpfd, &magic,
sizeof(magic))) !=
sizeof(magic))
656 errmsg(
"could not write to file \"%s\": %m",
698 errmsg(
"could not write to file \"%s\": %m",
717 errmsg(
"could not write to file \"%s\": %m",
724 errmsg(
"could not close file \"%s\": %m",
751#ifdef USE_ASSERT_CHECKING
763 elog(
DEBUG2,
"starting up replication origin progress state");
776 errmsg(
"could not open file \"%s\": %m",
786 errmsg(
"could not read file \"%s\": %m",
791 errmsg(
"could not read file \"%s\": read %d of %zu",
798 (
errmsg(
"replication checkpoint has wrong magic %u instead of %u",
814 errmsg(
"could not read file \"%s\": %m",
829 errmsg(
"could not read file \"%s\": read %d of %zu",
838 errmsg(
"could not find free replication state, increase \"max_active_replication_origins\"")));
846 errmsg(
"recovered replication state of node %d to %X/%08X",
856 errmsg(
"replication slot checkpoint has wrong checksum %u, expected %u",
862 errmsg(
"could not close file \"%s\": %m",
908 elog(
PANIC,
"replorigin_redo: unknown op code %u", info);
985 ?
errmsg(
"replication origin with ID %d is already active for PID %d",
988 :
errmsg(
"replication origin with ID %d is already active in another process",
998 errmsg(
"could not find free replication state slot for replication origin with ID %d",
1000 errhint(
"Increase \"max_active_replication_origins\" and try again.")));
1024 xlrec.node_id = node;
1072 if (
state->roident == node)
1076 remote_lsn =
state->remote_lsn;
1077 local_lsn =
state->local_lsn;
1173 errmsg(
"cannot setup replication origin when one is already setup")));
1198 if (acquired_by == 0)
1205 errmsg(
"replication origin with ID %d is already active for PID %d",
1219 errmsg(
"replication origin with ID %d is already active in another process",
1229 if (
curstate->acquired_by != acquired_by)
1232 errmsg(
"replication origin with ID %d is not active for PID %d",
1248 if (acquired_by != 0)
1251 errmsg(
"cannot use PID %d for inactive replication origin with ID %d",
1252 acquired_by, node)));
1258 errmsg(
"could not find free replication state slot for replication origin with ID %d",
1260 errhint(
"Increase \"max_active_replication_origins\" and try again.")));
1271 if (acquired_by == 0)
1308 errmsg(
"no replication origin is configured")));
1320 errmsg(
"cannot reset replication origin with ID %d because it is still in use by other processes",
1322 errdetail(
"This session is the first process for this replication origin, and other processes are currently sharing it."),
1323 errhint(
"Reset the replication origin in all other processes before retrying.")));
1414 errmsg(
"replication origin name \"%s\" is reserved",
1416 errdetail(
"Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1423#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1425 elog(
WARNING,
"replication origins created by regression test cases should have names starting with \"regress_\"");
1544 errmsg(
"no replication origin is configured")));
1564 errmsg(
"no replication origin is configured")));
1649#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1678 memset(nulls, 1,
sizeof(nulls));
1710#undef REPLICATION_ORIGIN_PROGRESS_COLS
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define OidIsValid(objectId)
bool IsReservedName(const char *name)
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
int CloseTransientFile(int fd)
int OpenTransientFile(const char *fileName, int fileFlags)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_DATUM(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_BOOL(x)
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
ReplOriginId replorigin_create(const char *roname)
static ReplicationStateCtl * replication_states_ctl
static void ReplicationOriginShmemInit(void *arg)
ReplOriginXactState replorigin_xact_state
static void ReplicationOriginShmemRequest(void *arg)
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
void replorigin_session_reset(void)
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
static bool IsReservedOriginName(const char *name)
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
int max_active_replication_origins
const ShmemCallbacks ReplicationOriginShmemCallbacks
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
static ReplicationState * replication_states
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
static void ReplicationOriginExitCleanup(int code, Datum arg)
void StartupReplicationOrigin(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
void CheckPointReplicationOrigin(void)
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
static ReplicationState * session_replication_state
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
#define REPLICATION_ORIGIN_PROGRESS_COLS
static void ReplicationOriginShmemAttach(void *arg)
XLogRecPtr replorigin_session_get_progress(bool flush)
static void replorigin_session_reset_internal(void)
void replorigin_xact_clear(bool clear_origin)
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
#define REPLICATION_STATE_MAGIC
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
void replorigin_redo(XLogReaderState *record)
#define XLOG_REPLORIGIN_DROP
#define InvalidReplOriginId
#define XLOG_REPLORIGIN_SET
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
static Datum LSNGetDatum(XLogRecPtr X)
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
static int fd(const char *x, int i)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
#define ShmemRequestStruct(...)
#define InitDirtySnapshot(snapshotdata)
#define BTEqualStrategyNumber
TimestampTz origin_timestamp
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable origin_cv
ShmemRequestCallback request_fn
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
#define PG_GETARG_TIMESTAMPTZ(n)
char * text_to_cstring(const text *t)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
bool RecoveryInProgress(void)
void XLogFlush(XLogRecPtr record)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)