PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.h File Reference
Include dependency graph for origin.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 
struct  ReplOriginXactState
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidReplOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 
#define MAX_RONAME_LEN   512
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 
typedef struct ReplOriginXactState ReplOriginXactState
 

Functions

ReplOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
ReplOriginId replorigin_create (const char *roname)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (ReplOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (ReplOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (ReplOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void replorigin_xact_clear (bool clear_origin)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 

Variables

PGDLLIMPORT ReplOriginXactState replorigin_xact_state
 
PGDLLIMPORT int max_active_replication_origins
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidReplOriginId

#define InvalidReplOriginId   0

Definition at line 33 of file origin.h.

◆ MAX_RONAME_LEN

#define MAX_RONAME_LEN   512

Definition at line 41 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Typedef Documentation

◆ ReplOriginXactState

◆ xl_replorigin_drop

◆ xl_replorigin_set

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )
extern

Definition at line 604 of file origin.c.

605{
607 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
608 int tmpfd;
609 int i;
612
614 return;
615
617
618 /* make sure no old temp file is remaining */
619 if (unlink(tmppath) < 0 && errno != ENOENT)
622 errmsg("could not remove file \"%s\": %m",
623 tmppath)));
624
625 /*
626 * no other backend can perform this at the same time; only one checkpoint
627 * can happen at a time.
628 */
631 if (tmpfd < 0)
634 errmsg("could not create file \"%s\": %m",
635 tmppath)));
636
637 /* write magic */
638 errno = 0;
639 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
640 {
641 /* if write didn't set errno, assume problem is no disk space */
642 if (errno == 0)
643 errno = ENOSPC;
646 errmsg("could not write to file \"%s\": %m",
647 tmppath)));
648 }
649 COMP_CRC32C(crc, &magic, sizeof(magic));
650
651 /* prevent concurrent creations/drops */
653
654 /* write actual data */
655 for (i = 0; i < max_active_replication_origins; i++)
656 {
659 XLogRecPtr local_lsn;
660
661 if (curstate->roident == InvalidReplOriginId)
662 continue;
663
664 /* zero, to avoid uninitialized padding bytes */
665 memset(&disk_state, 0, sizeof(disk_state));
666
668
669 disk_state.roident = curstate->roident;
670
671 disk_state.remote_lsn = curstate->remote_lsn;
672 local_lsn = curstate->local_lsn;
673
674 LWLockRelease(&curstate->lock);
675
676 /* make sure we only write out a commit that's persistent */
677 XLogFlush(local_lsn);
678
679 errno = 0;
680 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
681 sizeof(disk_state))
682 {
683 /* if write didn't set errno, assume problem is no disk space */
684 if (errno == 0)
685 errno = ENOSPC;
688 errmsg("could not write to file \"%s\": %m",
689 tmppath)));
690 }
691
693 }
694
696
697 /* write out the CRC */
699 errno = 0;
700 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
701 {
702 /* if write didn't set errno, assume problem is no disk space */
703 if (errno == 0)
704 errno = ENOSPC;
707 errmsg("could not write to file \"%s\": %m",
708 tmppath)));
709 }
710
711 if (CloseTransientFile(tmpfd) != 0)
714 errmsg("could not close file \"%s\": %m",
715 tmppath)));
716
717 /* fsync, rename to permanent file, fsync file and directory */
719}
#define PG_BINARY
Definition c.h:1287
uint32_t uint32
Definition c.h:546
int errcode_for_file_access(void)
Definition elog.c:886
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define PANIC
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:779
int CloseTransientFile(int fd)
Definition fd.c:2851
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2674
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
int max_active_replication_origins
Definition origin.c:104
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:101
static ReplicationState * replication_states
Definition origin.c:176
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:100
#define REPLICATION_STATE_MAGIC
Definition origin.c:192
#define InvalidReplOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )
extern

Definition at line 557 of file origin.c.

558{
559 bool found;
560
562 return;
563
565 ShmemInitStruct("ReplicationOriginState",
567 &found);
569
570 if (!found)
571 {
572 int i;
573
575
577
578 for (i = 0; i < max_active_replication_origins; i++)
579 {
583 }
584 }
585}
#define MemSet(start, val, len)
Definition c.h:1013
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:181
Size ReplicationOriginShmemSize(void)
Definition origin.c:542
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:162

References ConditionVariableInit(), fb(), i, LWLockInitialize(), max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )
extern

Definition at line 542 of file origin.c.

543{
544 Size size = 0;
545
547 return size;
548
549 size = add_size(size, offsetof(ReplicationStateCtl, states));
550
551 size = add_size(size,
553 return size;
554}
size_t Size
Definition c.h:619
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497

References add_size(), fb(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( ReplOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)
extern

Definition at line 918 of file origin.c.

921{
922 int i;
925
927
928 /* we don't track DoNotReplicateId */
929 if (node == DoNotReplicateId)
930 return;
931
932 /*
933 * XXX: For the case where this is called by WAL replay, it'd be more
934 * efficient to restore into a backend local hashtable and only dump into
935 * shmem after recovery is finished. Let's wait with implementing that
936 * till it's shown to be a measurable expense
937 */
938
939 /* Lock exclusively, as we may have to create a new table entry. */
941
942 /*
943 * Search for either an existing slot for the origin, or a free one we can
944 * use.
945 */
946 for (i = 0; i < max_active_replication_origins; i++)
947 {
949
950 /* remember where to insert if necessary */
951 if (curstate->roident == InvalidReplOriginId &&
952 free_state == NULL)
953 {
955 continue;
956 }
957
958 /* not our slot */
959 if (curstate->roident != node)
960 {
961 continue;
962 }
963
964 /* ok, found slot */
966
968
969 /* Make sure it's not used by somebody else */
970 if (replication_state->refcount > 0)
971 {
974 (replication_state->acquired_by != 0)
975 ? errmsg("replication origin with ID %d is already active for PID %d",
976 replication_state->roident,
977 replication_state->acquired_by)
978 : errmsg("replication origin with ID %d is already active in another process",
979 replication_state->roident)));
980 }
981
982 break;
983 }
984
988 errmsg("could not find free replication state slot for replication origin with ID %d",
989 node),
990 errhint("Increase \"max_active_replication_origins\" and try again.")));
991
992 if (replication_state == NULL)
993 {
994 /* initialize new slot */
999 replication_state->roident = node;
1000 }
1001
1003
1004 /*
1005 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1006 * and the standby gets the message. Primarily this will be called during
1007 * WAL replay (of commit records) where no WAL logging is necessary.
1008 */
1009 if (wal_log)
1010 {
1012
1014 xlrec.node_id = node;
1015 xlrec.force = go_backward;
1016
1018 XLogRegisterData(&xlrec, sizeof(xlrec));
1019
1021 }
1022
1023 /*
1024 * Due to - harmless - race conditions during a checkpoint we could see
1025 * values here that are older than the ones we already have in memory. We
1026 * could also see older values for prepared transactions when the prepare
1027 * is sent at a later point of time along with commit prepared and there
1028 * are other transactions commits between prepare and commit prepared. See
1029 * ReorderBufferFinishPrepared. Don't overwrite those.
1030 */
1031 if (go_backward || replication_state->remote_lsn < remote_commit)
1032 replication_state->remote_lsn = remote_commit;
1034 (go_backward || replication_state->local_lsn < local_commit))
1035 replication_state->local_lsn = local_commit;
1037
1038 /*
1039 * Release *after* changing the LSNs, slot isn't acquired and thus could
1040 * otherwise be dropped anytime.
1041 */
1043}
#define Assert(condition)
Definition c.h:873
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
#define ERROR
Definition elog.h:39
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, fb(), i, InvalidReplOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

ReplOriginId replorigin_by_name ( const char * roname,
bool  missing_ok 
)
extern

Definition at line 231 of file origin.c.

232{
234 Oid roident = InvalidOid;
235 HeapTuple tuple;
237
239
241 if (HeapTupleIsValid(tuple))
242 {
244 roident = ident->roident;
245 ReleaseSysCache(tuple);
246 }
247 else if (!missing_ok)
250 errmsg("replication origin \"%s\" does not exist",
251 roname)));
252
253 return roident;
254}
#define CStringGetTextDatum(s)
Definition builtins.h:97
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
FormData_pg_replication_origin * Form_pg_replication_origin
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition syscache.c:220

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( ReplOriginId  roident,
bool  missing_ok,
char **  roname 
)
extern

Definition at line 501 of file origin.c.

502{
503 HeapTuple tuple;
505
506 Assert(OidIsValid((Oid) roident));
507 Assert(roident != InvalidReplOriginId);
508 Assert(roident != DoNotReplicateId);
509
511 ObjectIdGetDatum((Oid) roident));
512
513 if (HeapTupleIsValid(tuple))
514 {
516 *roname = text_to_cstring(&ric->roname);
517 ReleaseSysCache(tuple);
518
519 return true;
520 }
521 else
522 {
523 *roname = NULL;
524
525 if (!missing_ok)
528 errmsg("replication origin with ID %d does not exist",
529 roident)));
530
531 return false;
532 }
533}
#define OidIsValid(objectId)
Definition c.h:788
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
char * text_to_cstring(const text *t)
Definition varlena.c:214

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, InvalidReplOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_create()

ReplOriginId replorigin_create ( const char * roname)
extern

Definition at line 262 of file origin.c.

263{
264 Oid roident;
265 HeapTuple tuple = NULL;
266 Relation rel;
269 SysScanDesc scan;
271
272 /*
273 * To avoid needing a TOAST table for pg_replication_origin, we limit
274 * replication origin names to 512 bytes. This should be more than enough
275 * for all practical use.
276 */
280 errmsg("replication origin name is too long"),
281 errdetail("Replication origin names must be no longer than %d bytes.",
283
285
287
288 /*
289 * We need the numeric replication origin to be 16bit wide, so we cannot
290 * rely on the normal oid allocation. Instead we simply scan
291 * pg_replication_origin for the first unused id. That's not particularly
292 * efficient, but this should be a fairly infrequent operation - we can
293 * easily spend a bit more code on this when it turns out it needs to be
294 * faster.
295 *
296 * We handle concurrency by taking an exclusive lock (allowing reads!)
297 * over the table for the duration of the search. Because we use a "dirty
298 * snapshot" we can read rows that other in-progress sessions have
299 * written, even though they would be invisible with normal snapshots. Due
300 * to the exclusive lock there's no danger that new rows can appear while
301 * we're checking.
302 */
304
306
307 /*
308 * We want to be able to access pg_replication_origin without setting up a
309 * snapshot. To make that safe, it needs to not have a TOAST table, since
310 * TOASTed data cannot be fetched without a snapshot. As of this writing,
311 * its only varlena column is roname, which we limit to 512 bytes to avoid
312 * needing out-of-line storage. If you add a TOAST table to this catalog,
313 * be sure to set up a snapshot everywhere it might be needed. For more
314 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
315 */
316 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
317
318 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
319 {
320 bool nulls[Natts_pg_replication_origin];
322 bool collides;
323
325
326 ScanKeyInit(&key,
329 ObjectIdGetDatum(roident));
330
332 true /* indexOK */ ,
334 1, &key);
335
337
338 systable_endscan(scan);
339
340 if (!collides)
341 {
342 /*
343 * Ok, found an unused roident, insert the new row and do a CCI,
344 * so our callers can look it up if they want to.
345 */
346 memset(&nulls, 0, sizeof(nulls));
347
350
351 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
352 CatalogTupleInsert(rel, tuple);
354 break;
355 }
356 }
357
358 /* now release lock again, */
360
361 if (tuple == NULL)
364 errmsg("could not find free replication origin ID")));
365
366 heap_freetuple(tuple);
367 return roident;
368}
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define PG_UINT16_MAX
Definition c.h:601
int errdetail(const char *fmt,...)
Definition elog.c:1216
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:388
void CommandCounterIncrement(void)
Definition xact.c:1101

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg(), ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState * record 
)
extern

Definition at line 19 of file replorigindesc.c.

20{
21 char *rec = XLogRecGetData(record);
22 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
23
24 switch (info)
25 {
27 {
29
30 xlrec = (xl_replorigin_set *) rec;
31
32 appendStringInfo(buf, "set %u; lsn %X/%08X; force: %d",
33 xlrec->node_id,
34 LSN_FORMAT_ARGS(xlrec->remote_lsn),
35 xlrec->force);
36 break;
37 }
39 {
41
42 xlrec = (xl_replorigin_drop *) rec;
43
44 appendStringInfo(buf, "drop %u", xlrec->node_id);
45 break;
46 }
47 }
48}
uint8_t uint8
Definition c.h:544
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414

References appendStringInfo(), buf, fb(), LSN_FORMAT_ARGS, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char * name,
bool  missing_ok,
bool  nowait 
)
extern

Definition at line 447 of file origin.c.

448{
449 ReplOriginId roident;
450 Relation rel;
451 HeapTuple tuple;
452
454
456
457 roident = replorigin_by_name(name, missing_ok);
458
459 /* Lock the origin to prevent concurrent drops. */
462
464 if (!HeapTupleIsValid(tuple))
465 {
466 if (!missing_ok)
467 elog(ERROR, "cache lookup failed for replication origin with ID %d",
468 roident);
469
470 /*
471 * We don't need to retain the locks if the origin is already dropped.
472 */
476 return;
477 }
478
479 replorigin_state_clear(roident, nowait);
480
481 /*
482 * Now, we can delete the catalog entry.
483 */
484 CatalogTupleDelete(rel, &tuple->t_self);
485 ReleaseSysCache(tuple);
486
488
489 /* We keep the lock on pg_replication_origin until commit */
490 table_close(rel, NoLock);
491}
#define elog(elevel,...)
Definition elog.h:226
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:231
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:374
ItemPointerData t_self
Definition htup.h:65
const char * name
uint16 ReplOriginId
Definition xlogdefs.h:69

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( ReplOriginId  node,
bool  flush 
)
extern

Definition at line 1047 of file origin.c.

1048{
1049 int i;
1050 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1051 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1052
1053 /* prevent slots from being concurrently dropped */
1055
1056 for (i = 0; i < max_active_replication_origins; i++)
1057 {
1059
1061
1062 if (state->roident == node)
1063 {
1064 LWLockAcquire(&state->lock, LW_SHARED);
1065
1066 remote_lsn = state->remote_lsn;
1067 local_lsn = state->local_lsn;
1068
1069 LWLockRelease(&state->lock);
1070
1071 break;
1072 }
1073 }
1074
1076
1077 if (flush && XLogRecPtrIsValid(local_lsn))
1078 XLogFlush(local_lsn);
1079
1080 return remote_lsn;
1081}
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_identify()

const char * replorigin_identify ( uint8  info)
extern

Definition at line 51 of file replorigindesc.c.

52{
53 switch (info)
54 {
56 return "SET";
58 return "DROP";
59 default:
60 return NULL;
61 }
62}

References fb(), XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState * record)
extern

Definition at line 857 of file origin.c.

858{
859 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
860
861 switch (info)
862 {
864 {
867
868 replorigin_advance(xlrec->node_id,
869 xlrec->remote_lsn, record->EndRecPtr,
870 xlrec->force /* backward */ ,
871 false /* WAL log */ );
872 break;
873 }
875 {
877 int i;
878
880
881 for (i = 0; i < max_active_replication_origins; i++)
882 {
884
885 /* found our slot */
886 if (state->roident == xlrec->node_id)
887 {
888 /* reset entry */
889 state->roident = InvalidReplOriginId;
890 state->remote_lsn = InvalidXLogRecPtr;
891 state->local_lsn = InvalidXLogRecPtr;
892 break;
893 }
894 }
895 break;
896 }
897 default:
898 elog(PANIC, "replorigin_redo: unknown op code %u", info);
899 }
900}
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:918
XLogRecPtr EndRecPtr
Definition xlogreader.h:206

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)
extern

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )
extern

Definition at line 1276 of file origin.c.

1277{
1279
1281 ereport(ERROR,
1283 errmsg("no replication origin is configured")));
1284
1285 /*
1286 * Restrict explicit resetting of the replication origin if it was first
1287 * acquired by this process and others are still using it. While the
1288 * system handles this safely (as happens if the first session exits
1289 * without calling reset), it is best to avoid doing so.
1290 */
1293 ereport(ERROR,
1295 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1297 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1298 errhint("Reset the replication origin in all other processes before retrying.")));
1299
1301}
int MyProcPid
Definition globals.c:47
static void replorigin_session_reset_internal(void)
Definition origin.c:1085

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_setup()

void replorigin_session_setup ( ReplOriginId  node,
int  acquired_by 
)
extern

Definition at line 1146 of file origin.c.

1147{
1148 static bool registered_cleanup;
1149 int i;
1150 int free_slot = -1;
1151
1152 if (!registered_cleanup)
1153 {
1155 registered_cleanup = true;
1156 }
1157
1159
1161 ereport(ERROR,
1163 errmsg("cannot setup replication origin when one is already setup")));
1164
1165 /* Lock exclusively, as we may have to create a new table entry. */
1167
1168 /*
1169 * Search for either an existing slot for the origin, or a free one we can
1170 * use.
1171 */
1172 for (i = 0; i < max_active_replication_origins; i++)
1173 {
1175
1176 /* remember where to insert if necessary */
1177 if (curstate->roident == InvalidReplOriginId &&
1178 free_slot == -1)
1179 {
1180 free_slot = i;
1181 continue;
1182 }
1183
1184 /* not our slot */
1185 if (curstate->roident != node)
1186 continue;
1187
1188 else if (curstate->acquired_by != 0 && acquired_by == 0)
1189 {
1190 ereport(ERROR,
1192 errmsg("replication origin with ID %d is already active for PID %d",
1193 curstate->roident, curstate->acquired_by)));
1194 }
1195
1196 else if (curstate->acquired_by != acquired_by)
1197 {
1198 ereport(ERROR,
1200 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1201 node, acquired_by)));
1202 }
1203
1204 /*
1205 * The origin is in use, but PID is not recorded. This can happen if
1206 * the process that originally acquired the origin exited without
1207 * releasing it. To ensure correctness, other processes cannot acquire
1208 * the origin until all processes currently using it have released it.
1209 */
1210 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1211 ereport(ERROR,
1213 errmsg("replication origin with ID %d is already active in another process",
1214 curstate->roident)));
1215
1216 /* ok, found slot */
1218 break;
1219 }
1220
1221
1223 ereport(ERROR,
1225 errmsg("could not find free replication state slot for replication origin with ID %d",
1226 node),
1227 errhint("Increase \"max_active_replication_origins\" and try again.")));
1228 else if (session_replication_state == NULL)
1229 {
1230 if (acquired_by)
1231 ereport(ERROR,
1233 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1234 acquired_by, node)));
1235
1236 /* initialize new slot */
1241 }
1242
1243
1245
1246 if (acquired_by == 0)
1247 {
1250 }
1251 else
1252 {
1253 /*
1254 * Sanity check: the origin must already be acquired by the process
1255 * passed as input, and at least one process must be using it.
1256 */
1259 }
1260
1262
1264
1265 /* probably this one is pointless */
1267}
void ConditionVariableBroadcast(ConditionVariable *cv)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1119
ConditionVariable origin_cv
Definition origin.c:139

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, fb(), i, InvalidReplOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_xact_clear()

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )
extern

Definition at line 730 of file origin.c.

731{
732 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
733 int fd;
734 int readBytes;
736 int last_state = 0;
739
740 /* don't want to overwrite already existing state */
741#ifdef USE_ASSERT_CHECKING
742 static bool already_started = false;
743
745 already_started = true;
746#endif
747
749 return;
750
752
753 elog(DEBUG2, "starting up replication origin progress state");
754
756
757 /*
758 * might have had max_active_replication_origins == 0 last run, or we just
759 * brought up a standby.
760 */
761 if (fd < 0 && errno == ENOENT)
762 return;
763 else if (fd < 0)
766 errmsg("could not open file \"%s\": %m",
767 path)));
768
769 /* verify magic, that is written even if nothing was active */
770 readBytes = read(fd, &magic, sizeof(magic));
771 if (readBytes != sizeof(magic))
772 {
773 if (readBytes < 0)
776 errmsg("could not read file \"%s\": %m",
777 path)));
778 else
781 errmsg("could not read file \"%s\": read %d of %zu",
782 path, readBytes, sizeof(magic))));
783 }
784 COMP_CRC32C(crc, &magic, sizeof(magic));
785
786 if (magic != REPLICATION_STATE_MAGIC)
788 (errmsg("replication checkpoint has wrong magic %u instead of %u",
789 magic, REPLICATION_STATE_MAGIC)));
790
791 /* we can skip locking here, no other access is possible */
792
793 /* recover individual states, until there are no more to be found */
794 while (true)
795 {
797
798 readBytes = read(fd, &disk_state, sizeof(disk_state));
799
800 if (readBytes < 0)
801 {
804 errmsg("could not read file \"%s\": %m",
805 path)));
806 }
807
808 /* no further data */
809 if (readBytes == sizeof(crc))
810 {
811 memcpy(&file_crc, &disk_state, sizeof(file_crc));
812 break;
813 }
814
815 if (readBytes != sizeof(disk_state))
816 {
819 errmsg("could not read file \"%s\": read %d of %zu",
820 path, readBytes, sizeof(disk_state))));
821 }
822
824
828 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
829
830 /* copy data to shared memory */
833 last_state++;
834
835 ereport(LOG,
836 errmsg("recovered replication state of node %d to %X/%08X",
837 disk_state.roident,
838 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
839 }
840
841 /* now check checksum */
843 if (file_crc != crc)
846 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
847 crc, file_crc)));
848
849 if (CloseTransientFile(fd) != 0)
852 errmsg("could not close file \"%s\": %m",
853 path)));
854}
#define LOG
Definition elog.h:31
#define DEBUG2
Definition elog.h:29
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replorigin_xact_state