PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
origin.h File Reference
Include dependency graph for origin.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidRepOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 

Functions

RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (RepOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 

Variables

PGDLLIMPORT RepOriginId replorigin_session_origin
 
PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn
 
PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp
 
PGDLLIMPORT int max_active_replication_origins
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidRepOriginId

#define InvalidRepOriginId   0

Definition at line 33 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Typedef Documentation

◆ xl_replorigin_drop

◆ xl_replorigin_set

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 573 of file origin.c.

574{
575 const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
576 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
577 int tmpfd;
578 int i;
581
583 return;
584
586
587 /* make sure no old temp file is remaining */
588 if (unlink(tmppath) < 0 && errno != ENOENT)
591 errmsg("could not remove file \"%s\": %m",
592 tmppath)));
593
594 /*
595 * no other backend can perform this at the same time; only one checkpoint
596 * can happen at a time.
597 */
598 tmpfd = OpenTransientFile(tmppath,
599 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
600 if (tmpfd < 0)
603 errmsg("could not create file \"%s\": %m",
604 tmppath)));
605
606 /* write magic */
607 errno = 0;
608 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609 {
610 /* if write didn't set errno, assume problem is no disk space */
611 if (errno == 0)
612 errno = ENOSPC;
615 errmsg("could not write to file \"%s\": %m",
616 tmppath)));
617 }
618 COMP_CRC32C(crc, &magic, sizeof(magic));
619
620 /* prevent concurrent creations/drops */
621 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622
623 /* write actual data */
624 for (i = 0; i < max_active_replication_origins; i++)
625 {
626 ReplicationStateOnDisk disk_state;
628 XLogRecPtr local_lsn;
629
630 if (curstate->roident == InvalidRepOriginId)
631 continue;
632
633 /* zero, to avoid uninitialized padding bytes */
634 memset(&disk_state, 0, sizeof(disk_state));
635
636 LWLockAcquire(&curstate->lock, LW_SHARED);
637
638 disk_state.roident = curstate->roident;
639
640 disk_state.remote_lsn = curstate->remote_lsn;
641 local_lsn = curstate->local_lsn;
642
643 LWLockRelease(&curstate->lock);
644
645 /* make sure we only write out a commit that's persistent */
646 XLogFlush(local_lsn);
647
648 errno = 0;
649 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
650 sizeof(disk_state))
651 {
652 /* if write didn't set errno, assume problem is no disk space */
653 if (errno == 0)
654 errno = ENOSPC;
657 errmsg("could not write to file \"%s\": %m",
658 tmppath)));
659 }
660
661 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
662 }
663
664 LWLockRelease(ReplicationOriginLock);
665
666 /* write out the CRC */
668 errno = 0;
669 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
670 {
671 /* if write didn't set errno, assume problem is no disk space */
672 if (errno == 0)
673 errno = ENOSPC;
676 errmsg("could not write to file \"%s\": %m",
677 tmppath)));
678 }
679
680 if (CloseTransientFile(tmpfd) != 0)
683 errmsg("could not close file \"%s\": %m",
684 tmppath)));
685
686 /* fsync, rename to permanent file, fsync file and directory */
687 durable_rename(tmppath, path, PANIC);
688}
#define PG_BINARY
Definition: c.h:1244
uint32_t uint32
Definition: c.h:502
int errcode_for_file_access(void)
Definition: elog.c:877
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define PANIC
Definition: elog.h:42
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
int CloseTransientFile(int fd)
Definition: fd.c:2871
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2694
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:115
int max_active_replication_origins
Definition: origin.c:104
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition: origin.c:101
static ReplicationState * replication_states
Definition: origin.c:171
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition: origin.c:100
#define REPLICATION_STATE_MAGIC
Definition: origin.c:187
#define InvalidRepOriginId
Definition: origin.h:33
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
return crc
XLogRecPtr remote_lsn
Definition: origin.c:150
RepOriginId roident
Definition: origin.c:149
XLogRecPtr remote_lsn
Definition: origin.c:119
XLogRecPtr local_lsn
Definition: origin.c:126
RepOriginId roident
Definition: origin.c:114
LWLock lock
Definition: origin.c:141
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2923
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 526 of file origin.c.

527{
528 bool found;
529
531 return;
532
534 ShmemInitStruct("ReplicationOriginState",
536 &found);
538
539 if (!found)
540 {
541 int i;
542
544
546
547 for (i = 0; i < max_active_replication_origins; i++)
548 {
552 }
553 }
554}
#define MemSet(start, val, len)
Definition: c.h:991
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:721
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:192
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:176
Size ReplicationOriginShmemSize(void)
Definition: origin.c:511
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:159

References ConditionVariableInit(), i, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 511 of file origin.c.

512{
513 Size size = 0;
514
516 return size;
517
518 size = add_size(size, offsetof(ReplicationStateCtl, states));
519
520 size = add_size(size,
522 return size;
523}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 888 of file origin.c.

891{
892 int i;
893 ReplicationState *replication_state = NULL;
894 ReplicationState *free_state = NULL;
895
896 Assert(node != InvalidRepOriginId);
897
898 /* we don't track DoNotReplicateId */
899 if (node == DoNotReplicateId)
900 return;
901
902 /*
903 * XXX: For the case where this is called by WAL replay, it'd be more
904 * efficient to restore into a backend local hashtable and only dump into
905 * shmem after recovery is finished. Let's wait with implementing that
906 * till it's shown to be a measurable expense
907 */
908
909 /* Lock exclusively, as we may have to create a new table entry. */
910 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
911
912 /*
913 * Search for either an existing slot for the origin, or a free one we can
914 * use.
915 */
916 for (i = 0; i < max_active_replication_origins; i++)
917 {
919
920 /* remember where to insert if necessary */
921 if (curstate->roident == InvalidRepOriginId &&
922 free_state == NULL)
923 {
924 free_state = curstate;
925 continue;
926 }
927
928 /* not our slot */
929 if (curstate->roident != node)
930 {
931 continue;
932 }
933
934 /* ok, found slot */
935 replication_state = curstate;
936
937 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
938
939 /* Make sure it's not used by somebody else */
940 if (replication_state->acquired_by != 0)
941 {
943 (errcode(ERRCODE_OBJECT_IN_USE),
944 errmsg("replication origin with ID %d is already active for PID %d",
945 replication_state->roident,
946 replication_state->acquired_by)));
947 }
948
949 break;
950 }
951
952 if (replication_state == NULL && free_state == NULL)
954 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955 errmsg("could not find free replication state slot for replication origin with ID %d",
956 node),
957 errhint("Increase \"max_active_replication_origins\" and try again.")));
958
959 if (replication_state == NULL)
960 {
961 /* initialize new slot */
962 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
963 replication_state = free_state;
964 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
965 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
966 replication_state->roident = node;
967 }
968
969 Assert(replication_state->roident != InvalidRepOriginId);
970
971 /*
972 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
973 * and the standby gets the message. Primarily this will be called during
974 * WAL replay (of commit records) where no WAL logging is necessary.
975 */
976 if (wal_log)
977 {
978 xl_replorigin_set xlrec;
979
980 xlrec.remote_lsn = remote_commit;
981 xlrec.node_id = node;
982 xlrec.force = go_backward;
983
985 XLogRegisterData(&xlrec, sizeof(xlrec));
986
987 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
988 }
989
990 /*
991 * Due to - harmless - race conditions during a checkpoint we could see
992 * values here that are older than the ones we already have in memory. We
993 * could also see older values for prepared transactions when the prepare
994 * is sent at a later point of time along with commit prepared and there
995 * are other transactions commits between prepare and commit prepared. See
996 * ReorderBufferFinishPrepared. Don't overwrite those.
997 */
998 if (go_backward || replication_state->remote_lsn < remote_commit)
999 replication_state->remote_lsn = remote_commit;
1000 if (local_commit != InvalidXLogRecPtr &&
1001 (go_backward || replication_state->local_lsn < local_commit))
1002 replication_state->local_lsn = local_commit;
1003 LWLockRelease(&replication_state->lock);
1004
1005 /*
1006 * Release *after* changing the LSNs, slot isn't acquired and thus could
1007 * otherwise be dropped anytime.
1008 */
1009 LWLockRelease(ReplicationOriginLock);
1010}
int errhint(const char *fmt,...)
Definition: elog.c:1318
int errcode(int sqlerrcode)
Definition: elog.c:854
#define ERROR
Definition: elog.h:39
Assert(PointerIsAligned(start, uint64))
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define DoNotReplicateId
Definition: origin.h:34
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References ReplicationState::acquired_by, Assert(), DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::node_id, ReplicationState::remote_lsn, xl_replorigin_set::remote_lsn, replication_states, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 226 of file origin.c.

227{
229 Oid roident = InvalidOid;
230 HeapTuple tuple;
231 Datum roname_d;
232
233 roname_d = CStringGetTextDatum(roname);
234
235 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 if (HeapTupleIsValid(tuple))
237 {
239 roident = ident->roident;
240 ReleaseSysCache(tuple);
241 }
242 else if (!missing_ok)
244 (errcode(ERRCODE_UNDEFINED_OBJECT),
245 errmsg("replication origin \"%s\" does not exist",
246 roname)));
247
248 return roident;
249}
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define ident
Definition: indent_codes.h:47
FormData_pg_replication_origin * Form_pg_replication_origin
uintptr_t Datum
Definition: postgres.h:69
#define InvalidOid
Definition: postgres_ext.h:35
unsigned int Oid
Definition: postgres_ext.h:30
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 470 of file origin.c.

471{
472 HeapTuple tuple;
474
475 Assert(OidIsValid((Oid) roident));
476 Assert(roident != InvalidRepOriginId);
477 Assert(roident != DoNotReplicateId);
478
479 tuple = SearchSysCache1(REPLORIGIDENT,
480 ObjectIdGetDatum((Oid) roident));
481
482 if (HeapTupleIsValid(tuple))
483 {
485 *roname = text_to_cstring(&ric->roname);
486 ReleaseSysCache(tuple);
487
488 return true;
489 }
490 else
491 {
492 *roname = NULL;
493
494 if (!missing_ok)
496 (errcode(ERRCODE_UNDEFINED_OBJECT),
497 errmsg("replication origin with ID %d does not exist",
498 roident)));
499
500 return false;
501 }
502}
#define OidIsValid(objectId)
Definition: c.h:746
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
char * text_to_cstring(const text *t)
Definition: varlena.c:225

References Assert(), DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_create()

RepOriginId replorigin_create ( const char *  roname)

Definition at line 257 of file origin.c.

258{
259 Oid roident;
260 HeapTuple tuple = NULL;
261 Relation rel;
262 Datum roname_d;
263 SnapshotData SnapshotDirty;
264 SysScanDesc scan;
266
267 roname_d = CStringGetTextDatum(roname);
268
270
271 /*
272 * We need the numeric replication origin to be 16bit wide, so we cannot
273 * rely on the normal oid allocation. Instead we simply scan
274 * pg_replication_origin for the first unused id. That's not particularly
275 * efficient, but this should be a fairly infrequent operation - we can
276 * easily spend a bit more code on this when it turns out it needs to be
277 * faster.
278 *
279 * We handle concurrency by taking an exclusive lock (allowing reads!)
280 * over the table for the duration of the search. Because we use a "dirty
281 * snapshot" we can read rows that other in-progress sessions have
282 * written, even though they would be invisible with normal snapshots. Due
283 * to the exclusive lock there's no danger that new rows can appear while
284 * we're checking.
285 */
286 InitDirtySnapshot(SnapshotDirty);
287
288 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
289
290 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
291 {
292 bool nulls[Natts_pg_replication_origin];
293 Datum values[Natts_pg_replication_origin];
294 bool collides;
295
297
299 Anum_pg_replication_origin_roident,
300 BTEqualStrategyNumber, F_OIDEQ,
301 ObjectIdGetDatum(roident));
302
303 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
304 true /* indexOK */ ,
305 &SnapshotDirty,
306 1, &key);
307
308 collides = HeapTupleIsValid(systable_getnext(scan));
309
310 systable_endscan(scan);
311
312 if (!collides)
313 {
314 /*
315 * Ok, found an unused roident, insert the new row and do a CCI,
316 * so our callers can look it up if they want to.
317 */
318 memset(&nulls, 0, sizeof(nulls));
319
320 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
321 values[Anum_pg_replication_origin_roname - 1] = roname_d;
322
323 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
324 CatalogTupleInsert(rel, tuple);
326 break;
327 }
328 }
329
330 /* now release lock again, */
332
333 if (tuple == NULL)
335 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
336 errmsg("could not find free replication origin ID")));
337
338 heap_freetuple(tuple);
339 return roident;
340}
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define PG_UINT16_MAX
Definition: c.h:558
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define ExclusiveLock
Definition: lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define RelationGetDescr(relation)
Definition: rel.h:542
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool IsTransactionState(void)
Definition: xact.c:387
void CommandCounterIncrement(void)
Definition: xact.c:1100

References Assert(), BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum(), PG_UINT16_MAX, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState record 
)

Definition at line 19 of file replorigindesc.c.

20{
21 char *rec = XLogRecGetData(record);
22 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
23
24 switch (info)
25 {
27 {
28 xl_replorigin_set *xlrec;
29
30 xlrec = (xl_replorigin_set *) rec;
31
32 appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
33 xlrec->node_id,
35 xlrec->force);
36 break;
37 }
39 {
40 xl_replorigin_drop *xlrec;
41
42 xlrec = (xl_replorigin_drop *) rec;
43
44 appendStringInfo(buf, "drop %u", xlrec->node_id);
45 break;
46 }
47 }
48}
uint8_t uint8
Definition: c.h:500
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
static char * buf
Definition: pg_test_fsync.c:72
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
RepOriginId node_id
Definition: origin.h:27
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415

References appendStringInfo(), buf, xl_replorigin_set::force, LSN_FORMAT_ARGS, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, xl_replorigin_set::remote_lsn, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 416 of file origin.c.

417{
418 RepOriginId roident;
419 Relation rel;
420 HeapTuple tuple;
421
423
424 rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
425
426 roident = replorigin_by_name(name, missing_ok);
427
428 /* Lock the origin to prevent concurrent drops. */
429 LockSharedObject(ReplicationOriginRelationId, roident, 0,
431
432 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
433 if (!HeapTupleIsValid(tuple))
434 {
435 if (!missing_ok)
436 elog(ERROR, "cache lookup failed for replication origin with ID %d",
437 roident);
438
439 /*
440 * We don't need to retain the locks if the origin is already dropped.
441 */
442 UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
445 return;
446 }
447
448 replorigin_state_clear(roident, nowait);
449
450 /*
451 * Now, we can delete the catalog entry.
452 */
453 CatalogTupleDelete(rel, &tuple->t_self);
454 ReleaseSysCache(tuple);
455
457
458 /* We keep the lock on pg_replication_origin until commit */
459 table_close(rel, NoLock);
460}
#define elog(elevel,...)
Definition: elog.h:226
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1082
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1142
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:346
ItemPointerData t_self
Definition: htup.h:65
const char * name
uint16 RepOriginId
Definition: xlogdefs.h:65

References AccessExclusiveLock, Assert(), CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1014 of file origin.c.

1015{
1016 int i;
1017 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1019
1020 /* prevent slots from being concurrently dropped */
1021 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022
1023 for (i = 0; i < max_active_replication_origins; i++)
1024 {
1026
1028
1029 if (state->roident == node)
1030 {
1031 LWLockAcquire(&state->lock, LW_SHARED);
1032
1033 remote_lsn = state->remote_lsn;
1034 local_lsn = state->local_lsn;
1035
1036 LWLockRelease(&state->lock);
1037
1038 break;
1039 }
1040 }
1041
1042 LWLockRelease(ReplicationOriginLock);
1043
1044 if (flush && local_lsn != InvalidXLogRecPtr)
1045 XLogFlush(local_lsn);
1046
1047 return remote_lsn;
1048}
Definition: regguts.h:323

References i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, and XLogFlush().

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_identify()

const char * replorigin_identify ( uint8  info)

Definition at line 51 of file replorigindesc.c.

52{
53 switch (info)
54 {
56 return "SET";
58 return "DROP";
59 default:
60 return NULL;
61 }
62}

References XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 827 of file origin.c.

828{
829 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
830
831 switch (info)
832 {
834 {
835 xl_replorigin_set *xlrec =
837
839 xlrec->remote_lsn, record->EndRecPtr,
840 xlrec->force /* backward */ ,
841 false /* WAL log */ );
842 break;
843 }
845 {
846 xl_replorigin_drop *xlrec;
847 int i;
848
849 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850
851 for (i = 0; i < max_active_replication_origins; i++)
852 {
854
855 /* found our slot */
856 if (state->roident == xlrec->node_id)
857 {
858 /* reset entry */
859 state->roident = InvalidRepOriginId;
860 state->remote_lsn = InvalidXLogRecPtr;
861 state->local_lsn = InvalidXLogRecPtr;
862 break;
863 }
864 }
865 break;
866 }
867 default:
868 elog(PANIC, "replorigin_redo: unknown op code %u", info);
869 }
870}
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:888
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, max_active_replication_origins, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1190 of file origin.c.

1191{
1193
1195
1196 if (session_replication_state == NULL)
1197 ereport(ERROR,
1198 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1199 errmsg("no replication origin is configured")));
1200
1201 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1202
1206
1207 LWLockRelease(ReplicationOriginLock);
1208
1210}
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition: origin.c:136

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ReplicationState::origin_cv, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and process_syncing_tables_for_sync().

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1097 of file origin.c.

1098{
1099 static bool registered_cleanup;
1100 int i;
1101 int free_slot = -1;
1102
1103 if (!registered_cleanup)
1104 {
1106 registered_cleanup = true;
1107 }
1108
1110
1111 if (session_replication_state != NULL)
1112 ereport(ERROR,
1113 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1114 errmsg("cannot setup replication origin when one is already setup")));
1115
1116 /* Lock exclusively, as we may have to create a new table entry. */
1117 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1118
1119 /*
1120 * Search for either an existing slot for the origin, or a free one we can
1121 * use.
1122 */
1123 for (i = 0; i < max_active_replication_origins; i++)
1124 {
1126
1127 /* remember where to insert if necessary */
1128 if (curstate->roident == InvalidRepOriginId &&
1129 free_slot == -1)
1130 {
1131 free_slot = i;
1132 continue;
1133 }
1134
1135 /* not our slot */
1136 if (curstate->roident != node)
1137 continue;
1138
1139 else if (curstate->acquired_by != 0 && acquired_by == 0)
1140 {
1141 ereport(ERROR,
1142 (errcode(ERRCODE_OBJECT_IN_USE),
1143 errmsg("replication origin with ID %d is already active for PID %d",
1144 curstate->roident, curstate->acquired_by)));
1145 }
1146
1147 /* ok, found slot */
1148 session_replication_state = curstate;
1149 break;
1150 }
1151
1152
1153 if (session_replication_state == NULL && free_slot == -1)
1154 ereport(ERROR,
1155 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1156 errmsg("could not find free replication state slot for replication origin with ID %d",
1157 node),
1158 errhint("Increase \"max_active_replication_origins\" and try again.")));
1159 else if (session_replication_state == NULL)
1160 {
1161 /* initialize new slot */
1166 }
1167
1168
1170
1171 if (acquired_by == 0)
1173 else if (session_replication_state->acquired_by != acquired_by)
1174 elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 node, acquired_by);
1176
1177 LWLockRelease(ReplicationOriginLock);
1178
1179 /* probably this one is pointless */
1181}
int MyProcPid
Definition: globals.c:48
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1055

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, and session_replication_state.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 699 of file origin.c.

700{
701 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
702 int fd;
703 int readBytes;
705 int last_state = 0;
706 pg_crc32c file_crc;
708
709 /* don't want to overwrite already existing state */
710#ifdef USE_ASSERT_CHECKING
711 static bool already_started = false;
712
713 Assert(!already_started);
714 already_started = true;
715#endif
716
718 return;
719
721
722 elog(DEBUG2, "starting up replication origin progress state");
723
724 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725
726 /*
727 * might have had max_active_replication_origins == 0 last run, or we just
728 * brought up a standby.
729 */
730 if (fd < 0 && errno == ENOENT)
731 return;
732 else if (fd < 0)
735 errmsg("could not open file \"%s\": %m",
736 path)));
737
738 /* verify magic, that is written even if nothing was active */
739 readBytes = read(fd, &magic, sizeof(magic));
740 if (readBytes != sizeof(magic))
741 {
742 if (readBytes < 0)
745 errmsg("could not read file \"%s\": %m",
746 path)));
747 else
750 errmsg("could not read file \"%s\": read %d of %zu",
751 path, readBytes, sizeof(magic))));
752 }
753 COMP_CRC32C(crc, &magic, sizeof(magic));
754
755 if (magic != REPLICATION_STATE_MAGIC)
757 (errmsg("replication checkpoint has wrong magic %u instead of %u",
758 magic, REPLICATION_STATE_MAGIC)));
759
760 /* we can skip locking here, no other access is possible */
761
762 /* recover individual states, until there are no more to be found */
763 while (true)
764 {
765 ReplicationStateOnDisk disk_state;
766
767 readBytes = read(fd, &disk_state, sizeof(disk_state));
768
769 /* no further data */
770 if (readBytes == sizeof(crc))
771 {
772 /* not pretty, but simple ... */
773 file_crc = *(pg_crc32c *) &disk_state;
774 break;
775 }
776
777 if (readBytes < 0)
778 {
781 errmsg("could not read file \"%s\": %m",
782 path)));
783 }
784
785 if (readBytes != sizeof(disk_state))
786 {
789 errmsg("could not read file \"%s\": read %d of %zu",
790 path, readBytes, sizeof(disk_state))));
791 }
792
793 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794
795 if (last_state == max_active_replication_origins)
797 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
799
800 /* copy data to shared memory */
801 replication_states[last_state].roident = disk_state.roident;
802 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
803 last_state++;
804
805 ereport(LOG,
806 (errmsg("recovered replication state of node %d to %X/%X",
807 disk_state.roident,
808 LSN_FORMAT_ARGS(disk_state.remote_lsn))));
809 }
810
811 /* now check checksum */
813 if (file_crc != crc)
816 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
817 crc, file_crc)));
818
819 if (CloseTransientFile(fd) != 0)
822 errmsg("could not close file \"%s\": %m",
823 path)));
824}
#define LOG
Definition: elog.h:31
#define DEBUG2
Definition: elog.h:29
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert(), CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp