PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.h File Reference
Include dependency graph for origin.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 
struct  ReplOriginXactState
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidReplOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 
#define MAX_RONAME_LEN   512
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 
typedef struct ReplOriginXactState ReplOriginXactState
 

Functions

ReplOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
ReplOriginId replorigin_create (const char *roname)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (ReplOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (ReplOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (ReplOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void replorigin_xact_clear (bool clear_origin)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 

Variables

PGDLLIMPORT ReplOriginXactState replorigin_xact_state
 
PGDLLIMPORT int max_active_replication_origins
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidReplOriginId

#define InvalidReplOriginId   0

Definition at line 33 of file origin.h.

◆ MAX_RONAME_LEN

#define MAX_RONAME_LEN   512

Definition at line 41 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Typedef Documentation

◆ ReplOriginXactState

◆ xl_replorigin_drop

◆ xl_replorigin_set

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )
extern

Definition at line 614 of file origin.c.

615{
617 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
618 int tmpfd;
619 int i;
622
624 return;
625
627
628 /* make sure no old temp file is remaining */
629 if (unlink(tmppath) < 0 && errno != ENOENT)
632 errmsg("could not remove file \"%s\": %m",
633 tmppath)));
634
635 /*
636 * no other backend can perform this at the same time; only one checkpoint
637 * can happen at a time.
638 */
641 if (tmpfd < 0)
644 errmsg("could not create file \"%s\": %m",
645 tmppath)));
646
647 /* write magic */
648 errno = 0;
649 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
650 {
651 /* if write didn't set errno, assume problem is no disk space */
652 if (errno == 0)
653 errno = ENOSPC;
656 errmsg("could not write to file \"%s\": %m",
657 tmppath)));
658 }
659 COMP_CRC32C(crc, &magic, sizeof(magic));
660
661 /* prevent concurrent creations/drops */
663
664 /* write actual data */
665 for (i = 0; i < max_active_replication_origins; i++)
666 {
669 XLogRecPtr local_lsn;
670
671 if (curstate->roident == InvalidReplOriginId)
672 continue;
673
674 /* zero, to avoid uninitialized padding bytes */
675 memset(&disk_state, 0, sizeof(disk_state));
676
678
679 disk_state.roident = curstate->roident;
680
681 disk_state.remote_lsn = curstate->remote_lsn;
682 local_lsn = curstate->local_lsn;
683
684 LWLockRelease(&curstate->lock);
685
686 /* make sure we only write out a commit that's persistent */
687 XLogFlush(local_lsn);
688
689 errno = 0;
690 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
691 sizeof(disk_state))
692 {
693 /* if write didn't set errno, assume problem is no disk space */
694 if (errno == 0)
695 errno = ENOSPC;
698 errmsg("could not write to file \"%s\": %m",
699 tmppath)));
700 }
701
703 }
704
706
707 /* write out the CRC */
709 errno = 0;
710 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
711 {
712 /* if write didn't set errno, assume problem is no disk space */
713 if (errno == 0)
714 errno = ENOSPC;
717 errmsg("could not write to file \"%s\": %m",
718 tmppath)));
719 }
720
721 if (CloseTransientFile(tmpfd) != 0)
724 errmsg("could not close file \"%s\": %m",
725 tmppath)));
726
727 /* fsync, rename to permanent file, fsync file and directory */
729}
#define PG_BINARY
Definition c.h:1374
uint32_t uint32
Definition c.h:624
int errcode_for_file_access(void)
Definition elog.c:897
#define PANIC
Definition elog.h:44
#define ereport(elevel,...)
Definition elog.h:152
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
static char * errmsg
int max_active_replication_origins
Definition origin.c:106
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:103
static ReplicationState * replication_states
Definition origin.c:178
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:102
#define REPLICATION_STATE_MAGIC
Definition origin.c:204
#define InvalidReplOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:173
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:178
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg, fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ replorigin_advance()

void replorigin_advance ( ReplOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)
extern

Definition at line 928 of file origin.c.

931{
932 int i;
935
937
938 /* we don't track DoNotReplicateId */
939 if (node == DoNotReplicateId)
940 return;
941
942 /*
943 * XXX: For the case where this is called by WAL replay, it'd be more
944 * efficient to restore into a backend local hashtable and only dump into
945 * shmem after recovery is finished. Let's wait with implementing that
946 * till it's shown to be a measurable expense
947 */
948
949 /* Lock exclusively, as we may have to create a new table entry. */
951
952 /*
953 * Search for either an existing slot for the origin, or a free one we can
954 * use.
955 */
956 for (i = 0; i < max_active_replication_origins; i++)
957 {
959
960 /* remember where to insert if necessary */
961 if (curstate->roident == InvalidReplOriginId &&
962 free_state == NULL)
963 {
965 continue;
966 }
967
968 /* not our slot */
969 if (curstate->roident != node)
970 {
971 continue;
972 }
973
974 /* ok, found slot */
976
978
979 /* Make sure it's not used by somebody else */
980 if (replication_state->refcount > 0)
981 {
984 (replication_state->acquired_by != 0)
985 ? errmsg("replication origin with ID %d is already active for PID %d",
986 replication_state->roident,
987 replication_state->acquired_by)
988 : errmsg("replication origin with ID %d is already active in another process",
989 replication_state->roident)));
990 }
991
992 break;
993 }
994
998 errmsg("could not find free replication state slot for replication origin with ID %d",
999 node),
1000 errhint("Increase \"max_active_replication_origins\" and try again.")));
1001
1002 if (replication_state == NULL)
1003 {
1004 /* initialize new slot */
1009 replication_state->roident = node;
1010 }
1011
1013
1014 /*
1015 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1016 * and the standby gets the message. Primarily this will be called during
1017 * WAL replay (of commit records) where no WAL logging is necessary.
1018 */
1019 if (wal_log)
1020 {
1022
1024 xlrec.node_id = node;
1025 xlrec.force = go_backward;
1026
1028 XLogRegisterData(&xlrec, sizeof(xlrec));
1029
1031 }
1032
1033 /*
1034 * Due to - harmless - race conditions during a checkpoint we could see
1035 * values here that are older than the ones we already have in memory. We
1036 * could also see older values for prepared transactions when the prepare
1037 * is sent at a later point of time along with commit prepared and there
1038 * are other transactions commits between prepare and commit prepared. See
1039 * ReorderBufferFinishPrepared. Don't overwrite those.
1040 */
1041 if (go_backward || replication_state->remote_lsn < remote_commit)
1042 replication_state->remote_lsn = remote_commit;
1044 (go_backward || replication_state->local_lsn < local_commit))
1045 replication_state->local_lsn = local_commit;
1047
1048 /*
1049 * Release *after* changing the LSNs, slot isn't acquired and thus could
1050 * otherwise be dropped anytime.
1051 */
1053}
#define Assert(condition)
Definition c.h:943
int errcode(int sqlerrcode)
Definition elog.c:874
int errhint(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:40
@ LW_EXCLUSIVE
Definition lwlock.h:104
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:482
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:372
void XLogBeginInsert(void)
Definition xloginsert.c:153

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

ReplOriginId replorigin_by_name ( const char * roname,
bool  missing_ok 
)
extern

Definition at line 243 of file origin.c.

244{
246 Oid roident = InvalidOid;
247 HeapTuple tuple;
249
251
253 if (HeapTupleIsValid(tuple))
254 {
256 roident = ident->roident;
257 ReleaseSysCache(tuple);
258 }
259 else if (!missing_ok)
262 errmsg("replication origin \"%s\" does not exist",
263 roname)));
264
265 return roident;
266}
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221

References CStringGetTextDatum, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( ReplOriginId  roident,
bool  missing_ok,
char **  roname 
)
extern

Definition at line 513 of file origin.c.

514{
515 HeapTuple tuple;
517
518 Assert(OidIsValid((Oid) roident));
519 Assert(roident != InvalidReplOriginId);
520 Assert(roident != DoNotReplicateId);
521
523 ObjectIdGetDatum((Oid) roident));
524
525 if (HeapTupleIsValid(tuple))
526 {
528 *roname = text_to_cstring(&ric->roname);
529 ReleaseSysCache(tuple);
530
531 return true;
532 }
533 else
534 {
535 *roname = NULL;
536
537 if (!missing_ok)
540 errmsg("replication origin with ID %d does not exist",
541 roident)));
542
543 return false;
544 }
545}
#define OidIsValid(objectId)
Definition c.h:858
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
char * text_to_cstring(const text *t)
Definition varlena.c:217

References Assert, DoNotReplicateId, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, InvalidReplOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_create()

ReplOriginId replorigin_create ( const char * roname)
extern

Definition at line 274 of file origin.c.

275{
276 Oid roident;
277 HeapTuple tuple = NULL;
278 Relation rel;
281 SysScanDesc scan;
283
284 /*
285 * To avoid needing a TOAST table for pg_replication_origin, we limit
286 * replication origin names to 512 bytes. This should be more than enough
287 * for all practical use.
288 */
292 errmsg("replication origin name is too long"),
293 errdetail("Replication origin names must be no longer than %d bytes.",
295
297
299
300 /*
301 * We need the numeric replication origin to be 16bit wide, so we cannot
302 * rely on the normal oid allocation. Instead we simply scan
303 * pg_replication_origin for the first unused id. That's not particularly
304 * efficient, but this should be a fairly infrequent operation - we can
305 * easily spend a bit more code on this when it turns out it needs to be
306 * faster.
307 *
308 * We handle concurrency by taking an exclusive lock (allowing reads!)
309 * over the table for the duration of the search. Because we use a "dirty
310 * snapshot" we can read rows that other in-progress sessions have
311 * written, even though they would be invisible with normal snapshots. Due
312 * to the exclusive lock there's no danger that new rows can appear while
313 * we're checking.
314 */
316
318
319 /*
320 * We want to be able to access pg_replication_origin without setting up a
321 * snapshot. To make that safe, it needs to not have a TOAST table, since
322 * TOASTed data cannot be fetched without a snapshot. As of this writing,
323 * its only varlena column is roname, which we limit to 512 bytes to avoid
324 * needing out-of-line storage. If you add a TOAST table to this catalog,
325 * be sure to set up a snapshot everywhere it might be needed. For more
326 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
327 */
328 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
329
330 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
331 {
332 bool nulls[Natts_pg_replication_origin];
334 bool collides;
335
337
338 ScanKeyInit(&key,
341 ObjectIdGetDatum(roident));
342
344 true /* indexOK */ ,
346 1, &key);
347
349
350 systable_endscan(scan);
351
352 if (!collides)
353 {
354 /*
355 * Ok, found an unused roident, insert the new row and do a CCI,
356 * so our callers can look it up if they want to.
357 */
358 memset(&nulls, 0, sizeof(nulls));
359
362
363 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
364 CatalogTupleInsert(rel, tuple);
366 break;
367 }
368 }
369
370 /* now release lock again, */
372
373 if (tuple == NULL)
376 errmsg("could not find free replication origin ID")));
377
378 heap_freetuple(tuple);
379 return roident;
380}
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define PG_UINT16_MAX
Definition c.h:671
int errdetail(const char *fmt,...) pg_attribute_printf(1
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:612
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:523
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:542
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1130

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg, ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState * record 
)
extern

Definition at line 19 of file replorigindesc.c.

20{
21 char *rec = XLogRecGetData(record);
22 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
23
24 switch (info)
25 {
27 {
29
30 xlrec = (xl_replorigin_set *) rec;
31
32 appendStringInfo(buf, "set %u; lsn %X/%08X; force: %d",
33 xlrec->node_id,
34 LSN_FORMAT_ARGS(xlrec->remote_lsn),
35 xlrec->force);
36 break;
37 }
39 {
41
42 xlrec = (xl_replorigin_drop *) rec;
43
44 appendStringInfo(buf, "drop %u", xlrec->node_id);
45 break;
46 }
47 }
48}
uint8_t uint8
Definition c.h:622
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:410
#define XLogRecGetData(decoder)
Definition xlogreader.h:415

References appendStringInfo(), buf, fb(), LSN_FORMAT_ARGS, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char * name,
bool  missing_ok,
bool  nowait 
)
extern

Definition at line 459 of file origin.c.

460{
461 ReplOriginId roident;
462 Relation rel;
463 HeapTuple tuple;
464
466
468
469 roident = replorigin_by_name(name, missing_ok);
470
471 /* Lock the origin to prevent concurrent drops. */
474
476 if (!HeapTupleIsValid(tuple))
477 {
478 if (!missing_ok)
479 elog(ERROR, "cache lookup failed for replication origin with ID %d",
480 roident);
481
482 /*
483 * We don't need to retain the locks if the origin is already dropped.
484 */
488 return;
489 }
490
491 replorigin_state_clear(roident, nowait);
492
493 /*
494 * Now, we can delete the catalog entry.
495 */
496 CatalogTupleDelete(rel, &tuple->t_self);
497 ReleaseSysCache(tuple);
498
500
501 /* We keep the lock on pg_replication_origin until commit */
502 table_close(rel, NoLock);
503}
#define elog(elevel,...)
Definition elog.h:228
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:386
ItemPointerData t_self
Definition htup.h:65
const char * name
uint16 ReplOriginId
Definition xlogdefs.h:69

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( ReplOriginId  node,
bool  flush 
)
extern

Definition at line 1057 of file origin.c.

1058{
1059 int i;
1060 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1061 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1062
1063 /* prevent slots from being concurrently dropped */
1065
1066 for (i = 0; i < max_active_replication_origins; i++)
1067 {
1069
1071
1072 if (state->roident == node)
1073 {
1074 LWLockAcquire(&state->lock, LW_SHARED);
1075
1076 remote_lsn = state->remote_lsn;
1077 local_lsn = state->local_lsn;
1078
1079 LWLockRelease(&state->lock);
1080
1081 break;
1082 }
1083 }
1084
1086
1087 if (flush && XLogRecPtrIsValid(local_lsn))
1088 XLogFlush(local_lsn);
1089
1090 return remote_lsn;
1091}
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_identify()

const char * replorigin_identify ( uint8  info)
extern

Definition at line 51 of file replorigindesc.c.

52{
53 switch (info)
54 {
56 return "SET";
58 return "DROP";
59 default:
60 return NULL;
61 }
62}

References fb(), XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState * record)
extern

Definition at line 867 of file origin.c.

868{
869 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
870
871 switch (info)
872 {
874 {
877
878 replorigin_advance(xlrec->node_id,
879 xlrec->remote_lsn, record->EndRecPtr,
880 xlrec->force /* backward */ ,
881 false /* WAL log */ );
882 break;
883 }
885 {
887 int i;
888
890
891 for (i = 0; i < max_active_replication_origins; i++)
892 {
894
895 /* found our slot */
896 if (state->roident == xlrec->node_id)
897 {
898 /* reset entry */
899 state->roident = InvalidReplOriginId;
900 state->remote_lsn = InvalidXLogRecPtr;
901 state->local_lsn = InvalidXLogRecPtr;
902 break;
903 }
904 }
905 break;
906 }
907 default:
908 elog(PANIC, "replorigin_redo: unknown op code %u", info);
909 }
910}
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:928
XLogRecPtr EndRecPtr
Definition xlogreader.h:206

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)
extern

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )
extern

Definition at line 1301 of file origin.c.

1302{
1304
1306 ereport(ERROR,
1308 errmsg("no replication origin is configured")));
1309
1310 /*
1311 * Restrict explicit resetting of the replication origin if it was first
1312 * acquired by this process and others are still using it. While the
1313 * system handles this safely (as happens if the first session exits
1314 * without calling reset), it is best to avoid doing so.
1315 */
1318 ereport(ERROR,
1320 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1322 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1323 errhint("Reset the replication origin in all other processes before retrying.")));
1324
1326}
int MyProcPid
Definition globals.c:49
static void replorigin_session_reset_internal(void)
Definition origin.c:1095

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg, ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_setup()

void replorigin_session_setup ( ReplOriginId  node,
int  acquired_by 
)
extern

Definition at line 1156 of file origin.c.

1157{
1158 static bool registered_cleanup;
1159 int i;
1160 int free_slot = -1;
1161
1162 if (!registered_cleanup)
1163 {
1165 registered_cleanup = true;
1166 }
1167
1169
1171 ereport(ERROR,
1173 errmsg("cannot setup replication origin when one is already setup")));
1174
1175 /* Lock exclusively, as we may have to create a new table entry. */
1177
1178 /*
1179 * Search for either an existing slot for the origin, or a free one we can
1180 * use.
1181 */
1182 for (i = 0; i < max_active_replication_origins; i++)
1183 {
1185
1186 /* remember where to insert if necessary */
1187 if (curstate->roident == InvalidReplOriginId &&
1188 free_slot == -1)
1189 {
1190 free_slot = i;
1191 continue;
1192 }
1193
1194 /* not our slot */
1195 if (curstate->roident != node)
1196 continue;
1197
1198 if (acquired_by == 0)
1199 {
1200 /* With acquired_by == 0, we need the origin to be free */
1201 if (curstate->acquired_by != 0)
1202 {
1203 ereport(ERROR,
1205 errmsg("replication origin with ID %d is already active for PID %d",
1206 curstate->roident, curstate->acquired_by)));
1207 }
1208 else if (curstate->refcount > 0)
1209 {
1210 /*
1211 * The origin is in use, but PID is not recorded. This can
1212 * happen if the process that originally acquired the origin
1213 * exited without releasing it. To ensure correctness, other
1214 * processes cannot acquire the origin until all processes
1215 * currently using it have released it.
1216 */
1217 ereport(ERROR,
1219 errmsg("replication origin with ID %d is already active in another process",
1220 curstate->roident)));
1221 }
1222 }
1223 else
1224 {
1225 /*
1226 * With acquired_by != 0, we need the origin to be active by the
1227 * given PID
1228 */
1229 if (curstate->acquired_by != acquired_by)
1230 ereport(ERROR,
1232 errmsg("replication origin with ID %d is not active for PID %d",
1233 curstate->roident, acquired_by)));
1234
1235 /*
1236 * Here, it is okay to have refcount > 0 as more than one process
1237 * can safely re-use the origin.
1238 */
1239 }
1240
1241 /* ok, found slot */
1243 break;
1244 }
1245
1247 {
1248 if (acquired_by != 0)
1249 ereport(ERROR,
1251 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1252 acquired_by, node)));
1253
1254 /* initialize new slot */
1255 if (free_slot == -1)
1256 ereport(ERROR,
1258 errmsg("could not find free replication state slot for replication origin with ID %d",
1259 node),
1260 errhint("Increase \"max_active_replication_origins\" and try again.")));
1261
1266 }
1267
1268
1270
1271 if (acquired_by == 0)
1272 {
1275 }
1276 else
1277 {
1278 /*
1279 * Sanity check: the origin must already be acquired by the process
1280 * passed as input, and at least one process must be using it.
1281 */
1284 }
1285
1287
1289
1290 /* probably this one is pointless */
1292}
void ConditionVariableBroadcast(ConditionVariable *cv)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1129
ConditionVariable origin_cv
Definition origin.c:141

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_xact_clear()

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )
extern

Definition at line 740 of file origin.c.

741{
742 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
743 int fd;
744 int readBytes;
746 int last_state = 0;
749
750 /* don't want to overwrite already existing state */
751#ifdef USE_ASSERT_CHECKING
752 static bool already_started = false;
753
755 already_started = true;
756#endif
757
759 return;
760
762
763 elog(DEBUG2, "starting up replication origin progress state");
764
766
767 /*
768 * might have had max_active_replication_origins == 0 last run, or we just
769 * brought up a standby.
770 */
771 if (fd < 0 && errno == ENOENT)
772 return;
773 else if (fd < 0)
776 errmsg("could not open file \"%s\": %m",
777 path)));
778
779 /* verify magic, that is written even if nothing was active */
780 readBytes = read(fd, &magic, sizeof(magic));
781 if (readBytes != sizeof(magic))
782 {
783 if (readBytes < 0)
786 errmsg("could not read file \"%s\": %m",
787 path)));
788 else
791 errmsg("could not read file \"%s\": read %d of %zu",
792 path, readBytes, sizeof(magic))));
793 }
794 COMP_CRC32C(crc, &magic, sizeof(magic));
795
796 if (magic != REPLICATION_STATE_MAGIC)
798 (errmsg("replication checkpoint has wrong magic %u instead of %u",
799 magic, REPLICATION_STATE_MAGIC)));
800
801 /* we can skip locking here, no other access is possible */
802
803 /* recover individual states, until there are no more to be found */
804 while (true)
805 {
807
808 readBytes = read(fd, &disk_state, sizeof(disk_state));
809
810 if (readBytes < 0)
811 {
814 errmsg("could not read file \"%s\": %m",
815 path)));
816 }
817
818 /* no further data */
819 if (readBytes == sizeof(crc))
820 {
821 memcpy(&file_crc, &disk_state, sizeof(file_crc));
822 break;
823 }
824
825 if (readBytes != sizeof(disk_state))
826 {
829 errmsg("could not read file \"%s\": read %d of %zu",
830 path, readBytes, sizeof(disk_state))));
831 }
832
834
838 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
839
840 /* copy data to shared memory */
843 last_state++;
844
845 ereport(LOG,
846 errmsg("recovered replication state of node %d to %X/%08X",
847 disk_state.roident,
848 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
849 }
850
851 /* now check checksum */
853 if (file_crc != crc)
856 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
857 crc, file_crc)));
858
859 if (CloseTransientFile(fd) != 0)
862 errmsg("could not close file \"%s\": %m",
863 path)));
864}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
#define LOG
Definition elog.h:32
#define DEBUG2
Definition elog.h:30
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, memcpy(), OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replorigin_xact_state