PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.h File Reference
Include dependency graph for origin.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 
struct  ReplOriginXactState
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidReplOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 
#define MAX_RONAME_LEN   512
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 
typedef struct ReplOriginXactState ReplOriginXactState
 

Functions

ReplOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
ReplOriginId replorigin_create (const char *roname)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (ReplOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (ReplOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (ReplOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void replorigin_xact_clear (bool clear_origin)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 

Variables

PGDLLIMPORT ReplOriginXactState replorigin_xact_state
 
PGDLLIMPORT int max_active_replication_origins
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidReplOriginId

#define InvalidReplOriginId   0

Definition at line 33 of file origin.h.

◆ MAX_RONAME_LEN

#define MAX_RONAME_LEN   512

Definition at line 41 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Typedef Documentation

◆ ReplOriginXactState

◆ xl_replorigin_drop

◆ xl_replorigin_set

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )
extern

Definition at line 605 of file origin.c.

606{
608 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
609 int tmpfd;
610 int i;
613
615 return;
616
618
619 /* make sure no old temp file is remaining */
620 if (unlink(tmppath) < 0 && errno != ENOENT)
623 errmsg("could not remove file \"%s\": %m",
624 tmppath)));
625
626 /*
627 * no other backend can perform this at the same time; only one checkpoint
628 * can happen at a time.
629 */
632 if (tmpfd < 0)
635 errmsg("could not create file \"%s\": %m",
636 tmppath)));
637
638 /* write magic */
639 errno = 0;
640 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
641 {
642 /* if write didn't set errno, assume problem is no disk space */
643 if (errno == 0)
644 errno = ENOSPC;
647 errmsg("could not write to file \"%s\": %m",
648 tmppath)));
649 }
650 COMP_CRC32C(crc, &magic, sizeof(magic));
651
652 /* prevent concurrent creations/drops */
654
655 /* write actual data */
656 for (i = 0; i < max_active_replication_origins; i++)
657 {
660 XLogRecPtr local_lsn;
661
662 if (curstate->roident == InvalidReplOriginId)
663 continue;
664
665 /* zero, to avoid uninitialized padding bytes */
666 memset(&disk_state, 0, sizeof(disk_state));
667
669
670 disk_state.roident = curstate->roident;
671
672 disk_state.remote_lsn = curstate->remote_lsn;
673 local_lsn = curstate->local_lsn;
674
675 LWLockRelease(&curstate->lock);
676
677 /* make sure we only write out a commit that's persistent */
678 XLogFlush(local_lsn);
679
680 errno = 0;
681 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
682 sizeof(disk_state))
683 {
684 /* if write didn't set errno, assume problem is no disk space */
685 if (errno == 0)
686 errno = ENOSPC;
689 errmsg("could not write to file \"%s\": %m",
690 tmppath)));
691 }
692
694 }
695
697
698 /* write out the CRC */
700 errno = 0;
701 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
702 {
703 /* if write didn't set errno, assume problem is no disk space */
704 if (errno == 0)
705 errno = ENOSPC;
708 errmsg("could not write to file \"%s\": %m",
709 tmppath)));
710 }
711
712 if (CloseTransientFile(tmpfd) != 0)
715 errmsg("could not close file \"%s\": %m",
716 tmppath)));
717
718 /* fsync, rename to permanent file, fsync file and directory */
720}
#define PG_BINARY
Definition c.h:1376
uint32_t uint32
Definition c.h:618
int errcode_for_file_access(void)
Definition elog.c:897
#define PANIC
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
static char * errmsg
int max_active_replication_origins
Definition origin.c:105
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:102
static ReplicationState * replication_states
Definition origin.c:177
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:101
#define REPLICATION_STATE_MAGIC
Definition origin.c:193
#define InvalidReplOriginId
Definition origin.h:33
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
static int fb(int x)
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2767
uint64 XLogRecPtr
Definition xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg, fb(), FIN_CRC32C, i, INIT_CRC32C, InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, REPLICATION_STATE_MAGIC, replication_states, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )
extern

Definition at line 558 of file origin.c.

559{
560 bool found;
561
563 return;
564
566 ShmemInitStruct("ReplicationOriginState",
568 &found);
570
571 if (!found)
572 {
573 int i;
574
576
578
579 for (i = 0; i < max_active_replication_origins; i++)
580 {
584 }
585 }
586}
#define MemSet(start, val, len)
Definition c.h:1109
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:699
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:182
Size ReplicationOriginShmemSize(void)
Definition origin.c:543
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:163

References ConditionVariableInit(), fb(), i, LWLockInitialize(), max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )
extern

Definition at line 543 of file origin.c.

544{
545 Size size = 0;
546
548 return size;
549
550 size = add_size(size, offsetof(ReplicationStateCtl, states));
551
552 size = add_size(size,
554 return size;
555}
size_t Size
Definition c.h:691
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500

References add_size(), fb(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( ReplOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)
extern

Definition at line 919 of file origin.c.

922{
923 int i;
926
928
929 /* we don't track DoNotReplicateId */
930 if (node == DoNotReplicateId)
931 return;
932
933 /*
934 * XXX: For the case where this is called by WAL replay, it'd be more
935 * efficient to restore into a backend local hashtable and only dump into
936 * shmem after recovery is finished. Let's wait with implementing that
937 * till it's shown to be a measurable expense
938 */
939
940 /* Lock exclusively, as we may have to create a new table entry. */
942
943 /*
944 * Search for either an existing slot for the origin, or a free one we can
945 * use.
946 */
947 for (i = 0; i < max_active_replication_origins; i++)
948 {
950
951 /* remember where to insert if necessary */
952 if (curstate->roident == InvalidReplOriginId &&
953 free_state == NULL)
954 {
956 continue;
957 }
958
959 /* not our slot */
960 if (curstate->roident != node)
961 {
962 continue;
963 }
964
965 /* ok, found slot */
967
969
970 /* Make sure it's not used by somebody else */
971 if (replication_state->refcount > 0)
972 {
975 (replication_state->acquired_by != 0)
976 ? errmsg("replication origin with ID %d is already active for PID %d",
977 replication_state->roident,
978 replication_state->acquired_by)
979 : errmsg("replication origin with ID %d is already active in another process",
980 replication_state->roident)));
981 }
982
983 break;
984 }
985
989 errmsg("could not find free replication state slot for replication origin with ID %d",
990 node),
991 errhint("Increase \"max_active_replication_origins\" and try again.")));
992
993 if (replication_state == NULL)
994 {
995 /* initialize new slot */
1000 replication_state->roident = node;
1001 }
1002
1004
1005 /*
1006 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1007 * and the standby gets the message. Primarily this will be called during
1008 * WAL replay (of commit records) where no WAL logging is necessary.
1009 */
1010 if (wal_log)
1011 {
1013
1015 xlrec.node_id = node;
1016 xlrec.force = go_backward;
1017
1019 XLogRegisterData(&xlrec, sizeof(xlrec));
1020
1022 }
1023
1024 /*
1025 * Due to - harmless - race conditions during a checkpoint we could see
1026 * values here that are older than the ones we already have in memory. We
1027 * could also see older values for prepared transactions when the prepare
1028 * is sent at a later point of time along with commit prepared and there
1029 * are other transactions commits between prepare and commit prepared. See
1030 * ReorderBufferFinishPrepared. Don't overwrite those.
1031 */
1032 if (go_backward || replication_state->remote_lsn < remote_commit)
1033 replication_state->remote_lsn = remote_commit;
1035 (go_backward || replication_state->local_lsn < local_commit))
1036 replication_state->local_lsn = local_commit;
1038
1039 /*
1040 * Release *after* changing the LSNs, slot isn't acquired and thus could
1041 * otherwise be dropped anytime.
1042 */
1044}
#define Assert(condition)
Definition c.h:945
int errcode(int sqlerrcode)
Definition elog.c:874
int errhint(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
XLogRecPtr remote_lsn
Definition origin.h:20
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:479
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:369
void XLogBeginInsert(void)
Definition xloginsert.c:153

References Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::remote_lsn, replication_states, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), XLogRecPtrIsValid, and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

ReplOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)
extern

Definition at line 232 of file origin.c.

233{
235 Oid roident = InvalidOid;
236 HeapTuple tuple;
238
240
242 if (HeapTupleIsValid(tuple))
243 {
245 roident = ident->roident;
246 ReleaseSysCache(tuple);
247 }
248 else if (!missing_ok)
251 errmsg("replication origin \"%s\" does not exist",
252 roname)));
253
254 return roident;
255}
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220

References CStringGetTextDatum, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( ReplOriginId  roident,
bool  missing_ok,
char **  roname 
)
extern

Definition at line 502 of file origin.c.

503{
504 HeapTuple tuple;
506
507 Assert(OidIsValid((Oid) roident));
508 Assert(roident != InvalidReplOriginId);
509 Assert(roident != DoNotReplicateId);
510
512 ObjectIdGetDatum((Oid) roident));
513
514 if (HeapTupleIsValid(tuple))
515 {
517 *roname = text_to_cstring(&ric->roname);
518 ReleaseSysCache(tuple);
519
520 return true;
521 }
522 else
523 {
524 *roname = NULL;
525
526 if (!missing_ok)
529 errmsg("replication origin with ID %d does not exist",
530 roident)));
531
532 return false;
533 }
534}
#define OidIsValid(objectId)
Definition c.h:860
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
char * text_to_cstring(const text *t)
Definition varlena.c:217

References Assert, DoNotReplicateId, ereport, errcode(), errmsg, ERROR, fb(), Form_pg_replication_origin, GETSTRUCT(), HeapTupleIsValid, InvalidReplOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_create()

ReplOriginId replorigin_create ( const char *  roname)
extern

Definition at line 263 of file origin.c.

264{
265 Oid roident;
266 HeapTuple tuple = NULL;
267 Relation rel;
270 SysScanDesc scan;
272
273 /*
274 * To avoid needing a TOAST table for pg_replication_origin, we limit
275 * replication origin names to 512 bytes. This should be more than enough
276 * for all practical use.
277 */
281 errmsg("replication origin name is too long"),
282 errdetail("Replication origin names must be no longer than %d bytes.",
284
286
288
289 /*
290 * We need the numeric replication origin to be 16bit wide, so we cannot
291 * rely on the normal oid allocation. Instead we simply scan
292 * pg_replication_origin for the first unused id. That's not particularly
293 * efficient, but this should be a fairly infrequent operation - we can
294 * easily spend a bit more code on this when it turns out it needs to be
295 * faster.
296 *
297 * We handle concurrency by taking an exclusive lock (allowing reads!)
298 * over the table for the duration of the search. Because we use a "dirty
299 * snapshot" we can read rows that other in-progress sessions have
300 * written, even though they would be invisible with normal snapshots. Due
301 * to the exclusive lock there's no danger that new rows can appear while
302 * we're checking.
303 */
305
307
308 /*
309 * We want to be able to access pg_replication_origin without setting up a
310 * snapshot. To make that safe, it needs to not have a TOAST table, since
311 * TOASTed data cannot be fetched without a snapshot. As of this writing,
312 * its only varlena column is roname, which we limit to 512 bytes to avoid
313 * needing out-of-line storage. If you add a TOAST table to this catalog,
314 * be sure to set up a snapshot everywhere it might be needed. For more
315 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
316 */
317 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
318
319 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
320 {
321 bool nulls[Natts_pg_replication_origin];
323 bool collides;
324
326
327 ScanKeyInit(&key,
330 ObjectIdGetDatum(roident));
331
333 true /* indexOK */ ,
335 1, &key);
336
338
339 systable_endscan(scan);
340
341 if (!collides)
342 {
343 /*
344 * Ok, found an unused roident, insert the new row and do a CCI,
345 * so our callers can look it up if they want to.
346 */
347 memset(&nulls, 0, sizeof(nulls));
348
351
352 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
353 CatalogTupleInsert(rel, tuple);
355 break;
356 }
357 }
358
359 /* now release lock again, */
361
362 if (tuple == NULL)
365 errmsg("could not find free replication origin ID")));
366
367 heap_freetuple(tuple);
368 return roident;
369}
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define PG_UINT16_MAX
Definition c.h:673
int errdetail(const char *fmt,...) pg_attribute_printf(1
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define ExclusiveLock
Definition lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define MAX_RONAME_LEN
Definition origin.h:41
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
Form_pg_class rd_rel
Definition rel.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1102

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg, ERROR, ExclusiveLock, fb(), heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState *  record 
)
extern

Definition at line 19 of file replorigindesc.c.

20{
21 char *rec = XLogRecGetData(record);
22 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
23
24 switch (info)
25 {
27 {
29
30 xlrec = (xl_replorigin_set *) rec;
31
32 appendStringInfo(buf, "set %u; lsn %X/%08X; force: %d",
33 xlrec->node_id,
34 LSN_FORMAT_ARGS(xlrec->remote_lsn),
35 xlrec->force);
36 break;
37 }
39 {
41
42 xlrec = (xl_replorigin_drop *) rec;
43
44 appendStringInfo(buf, "drop %u", xlrec->node_id);
45 break;
46 }
47 }
48}
uint8_t uint8
Definition c.h:616
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414

References appendStringInfo(), buf, fb(), LSN_FORMAT_ARGS, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)
extern

Definition at line 448 of file origin.c.

449{
450 ReplOriginId roident;
451 Relation rel;
452 HeapTuple tuple;
453
455
457
458 roident = replorigin_by_name(name, missing_ok);
459
460 /* Lock the origin to prevent concurrent drops. */
463
465 if (!HeapTupleIsValid(tuple))
466 {
467 if (!missing_ok)
468 elog(ERROR, "cache lookup failed for replication origin with ID %d",
469 roident);
470
471 /*
472 * We don't need to retain the locks if the origin is already dropped.
473 */
477 return;
478 }
479
480 replorigin_state_clear(roident, nowait);
481
482 /*
483 * Now, we can delete the catalog entry.
484 */
485 CatalogTupleDelete(rel, &tuple->t_self);
486 ReleaseSysCache(tuple);
487
489
490 /* We keep the lock on pg_replication_origin until commit */
491 table_close(rel, NoLock);
492}
#define elog(elevel,...)
Definition elog.h:226
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:375
ItemPointerData t_self
Definition htup.h:65
const char * name
uint16 ReplOriginId
Definition xlogdefs.h:69

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, fb(), HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), ProcessSyncingTablesForApply(), and ProcessSyncingTablesForSync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( ReplOriginId  node,
bool  flush 
)
extern

Definition at line 1048 of file origin.c.

1049{
1050 int i;
1051 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1052 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1053
1054 /* prevent slots from being concurrently dropped */
1056
1057 for (i = 0; i < max_active_replication_origins; i++)
1058 {
1060
1062
1063 if (state->roident == node)
1064 {
1065 LWLockAcquire(&state->lock, LW_SHARED);
1066
1067 remote_lsn = state->remote_lsn;
1068 local_lsn = state->local_lsn;
1069
1070 LWLockRelease(&state->lock);
1071
1072 break;
1073 }
1074 }
1075
1077
1078 if (flush && XLogRecPtrIsValid(local_lsn))
1079 XLogFlush(local_lsn);
1080
1081 return remote_lsn;
1082}
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References fb(), i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, XLogFlush(), and XLogRecPtrIsValid.

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_identify()

const char * replorigin_identify ( uint8  info)
extern

Definition at line 51 of file replorigindesc.c.

52{
53 switch (info)
54 {
56 return "SET";
58 return "DROP";
59 default:
60 return NULL;
61 }
62}

References fb(), XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState *  record)
extern

Definition at line 858 of file origin.c.

859{
860 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
861
862 switch (info)
863 {
865 {
868
869 replorigin_advance(xlrec->node_id,
870 xlrec->remote_lsn, record->EndRecPtr,
871 xlrec->force /* backward */ ,
872 false /* WAL log */ );
873 break;
874 }
876 {
878 int i;
879
881
882 for (i = 0; i < max_active_replication_origins; i++)
883 {
885
886 /* found our slot */
887 if (state->roident == xlrec->node_id)
888 {
889 /* reset entry */
890 state->roident = InvalidReplOriginId;
891 state->remote_lsn = InvalidXLogRecPtr;
892 state->local_lsn = InvalidXLogRecPtr;
893 break;
894 }
895 }
896 break;
897 }
898 default:
899 elog(PANIC, "replorigin_redo: unknown op code %u", info);
900 }
901}
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:919
XLogRecPtr EndRecPtr
Definition xlogreader.h:206

References elog, XLogReaderState::EndRecPtr, fb(), i, InvalidReplOriginId, InvalidXLogRecPtr, max_active_replication_origins, PANIC, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)
extern

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )
extern

Definition at line 1277 of file origin.c.

1278{
1280
1282 ereport(ERROR,
1284 errmsg("no replication origin is configured")));
1285
1286 /*
1287 * Restrict explicit resetting of the replication origin if it was first
1288 * acquired by this process and others are still using it. While the
1289 * system handles this safely (as happens if the first session exits
1290 * without calling reset), it is best to avoid doing so.
1291 */
1294 ereport(ERROR,
1296 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1298 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1299 errhint("Reset the replication origin in all other processes before retrying.")));
1300
1302}
int MyProcPid
Definition globals.c:47
static void replorigin_session_reset_internal(void)
Definition origin.c:1086

References ReplicationState::acquired_by, Assert, ereport, errcode(), errdetail(), errhint(), errmsg, ERROR, fb(), max_active_replication_origins, MyProcPid, ReplicationState::refcount, replorigin_session_reset_internal(), ReplicationState::roident, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and ProcessSyncingTablesForSync().

◆ replorigin_session_setup()

void replorigin_session_setup ( ReplOriginId  node,
int  acquired_by 
)
extern

Definition at line 1147 of file origin.c.

1148{
1149 static bool registered_cleanup;
1150 int i;
1151 int free_slot = -1;
1152
1153 if (!registered_cleanup)
1154 {
1156 registered_cleanup = true;
1157 }
1158
1160
1162 ereport(ERROR,
1164 errmsg("cannot setup replication origin when one is already setup")));
1165
1166 /* Lock exclusively, as we may have to create a new table entry. */
1168
1169 /*
1170 * Search for either an existing slot for the origin, or a free one we can
1171 * use.
1172 */
1173 for (i = 0; i < max_active_replication_origins; i++)
1174 {
1176
1177 /* remember where to insert if necessary */
1178 if (curstate->roident == InvalidReplOriginId &&
1179 free_slot == -1)
1180 {
1181 free_slot = i;
1182 continue;
1183 }
1184
1185 /* not our slot */
1186 if (curstate->roident != node)
1187 continue;
1188
1189 else if (curstate->acquired_by != 0 && acquired_by == 0)
1190 {
1191 ereport(ERROR,
1193 errmsg("replication origin with ID %d is already active for PID %d",
1194 curstate->roident, curstate->acquired_by)));
1195 }
1196
1197 else if (curstate->acquired_by != acquired_by)
1198 {
1199 ereport(ERROR,
1201 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1202 node, acquired_by)));
1203 }
1204
1205 /*
1206 * The origin is in use, but PID is not recorded. This can happen if
1207 * the process that originally acquired the origin exited without
1208 * releasing it. To ensure correctness, other processes cannot acquire
1209 * the origin until all processes currently using it have released it.
1210 */
1211 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1212 ereport(ERROR,
1214 errmsg("replication origin with ID %d is already active in another process",
1215 curstate->roident)));
1216
1217 /* ok, found slot */
1219 break;
1220 }
1221
1222
1224 ereport(ERROR,
1226 errmsg("could not find free replication state slot for replication origin with ID %d",
1227 node),
1228 errhint("Increase \"max_active_replication_origins\" and try again.")));
1229 else if (session_replication_state == NULL)
1230 {
1231 if (acquired_by)
1232 ereport(ERROR,
1234 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1235 acquired_by, node)));
1236
1237 /* initialize new slot */
1242 }
1243
1244
1246
1247 if (acquired_by == 0)
1248 {
1251 }
1252 else
1253 {
1254 /*
1255 * Sanity check: the origin must already be acquired by the process
1256 * passed as input, and at least one process must be using it.
1257 */
1260 }
1261
1263
1265
1266 /* probably this one is pointless */
1268}
void ConditionVariableBroadcast(ConditionVariable *cv)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1120
ConditionVariable origin_cv
Definition origin.c:140

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg, ERROR, fb(), i, InvalidReplOriginId, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::refcount, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, session_replication_state, and XLogRecPtrIsValid.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ replorigin_xact_clear()

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )
extern

Definition at line 731 of file origin.c.

732{
733 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
734 int fd;
735 int readBytes;
737 int last_state = 0;
740
741 /* don't want to overwrite already existing state */
742#ifdef USE_ASSERT_CHECKING
743 static bool already_started = false;
744
746 already_started = true;
747#endif
748
750 return;
751
753
754 elog(DEBUG2, "starting up replication origin progress state");
755
757
758 /*
759 * might have had max_active_replication_origins == 0 last run, or we just
760 * brought up a standby.
761 */
762 if (fd < 0 && errno == ENOENT)
763 return;
764 else if (fd < 0)
767 errmsg("could not open file \"%s\": %m",
768 path)));
769
770 /* verify magic, that is written even if nothing was active */
771 readBytes = read(fd, &magic, sizeof(magic));
772 if (readBytes != sizeof(magic))
773 {
774 if (readBytes < 0)
777 errmsg("could not read file \"%s\": %m",
778 path)));
779 else
782 errmsg("could not read file \"%s\": read %d of %zu",
783 path, readBytes, sizeof(magic))));
784 }
785 COMP_CRC32C(crc, &magic, sizeof(magic));
786
787 if (magic != REPLICATION_STATE_MAGIC)
789 (errmsg("replication checkpoint has wrong magic %u instead of %u",
790 magic, REPLICATION_STATE_MAGIC)));
791
792 /* we can skip locking here, no other access is possible */
793
794 /* recover individual states, until there are no more to be found */
795 while (true)
796 {
798
799 readBytes = read(fd, &disk_state, sizeof(disk_state));
800
801 if (readBytes < 0)
802 {
805 errmsg("could not read file \"%s\": %m",
806 path)));
807 }
808
809 /* no further data */
810 if (readBytes == sizeof(crc))
811 {
812 memcpy(&file_crc, &disk_state, sizeof(file_crc));
813 break;
814 }
815
816 if (readBytes != sizeof(disk_state))
817 {
820 errmsg("could not read file \"%s\": read %d of %zu",
821 path, readBytes, sizeof(disk_state))));
822 }
823
825
829 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
830
831 /* copy data to shared memory */
834 last_state++;
835
836 ereport(LOG,
837 errmsg("recovered replication state of node %d to %X/%08X",
838 disk_state.roident,
839 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
840 }
841
842 /* now check checksum */
844 if (file_crc != crc)
847 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
848 crc, file_crc)));
849
850 if (CloseTransientFile(fd) != 0)
853 errmsg("could not close file \"%s\": %m",
854 path)));
855}
#define LOG
Definition elog.h:31
#define DEBUG2
Definition elog.h:29
#define read(a, b, c)
Definition win32.h:13
#define ERRCODE_DATA_CORRUPTED
static int fd(const char *x, int i)

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, fb(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, and ReplicationState::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ max_active_replication_origins

◆ replorigin_xact_state