PostgreSQL Source Code  git master
origin.h File Reference
Include dependency graph for origin.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidRepOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 

Functions

RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (RepOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 

Variables

PGDLLIMPORT RepOriginId replorigin_session_origin
 
PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn
 
PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidRepOriginId

#define InvalidRepOriginId   0

Definition at line 33 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Typedef Documentation

◆ xl_replorigin_drop

◆ xl_replorigin_set

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 573 of file origin.c.

574 {
575  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
576  const char *path = "pg_logical/replorigin_checkpoint";
577  int tmpfd;
578  int i;
580  pg_crc32c crc;
581 
582  if (max_replication_slots == 0)
583  return;
584 
585  INIT_CRC32C(crc);
586 
587  /* make sure no old temp file is remaining */
588  if (unlink(tmppath) < 0 && errno != ENOENT)
589  ereport(PANIC,
591  errmsg("could not remove file \"%s\": %m",
592  tmppath)));
593 
594  /*
595  * no other backend can perform this at the same time; only one checkpoint
596  * can happen at a time.
597  */
598  tmpfd = OpenTransientFile(tmppath,
599  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
600  if (tmpfd < 0)
601  ereport(PANIC,
603  errmsg("could not create file \"%s\": %m",
604  tmppath)));
605 
606  /* write magic */
607  errno = 0;
608  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609  {
610  /* if write didn't set errno, assume problem is no disk space */
611  if (errno == 0)
612  errno = ENOSPC;
613  ereport(PANIC,
615  errmsg("could not write to file \"%s\": %m",
616  tmppath)));
617  }
618  COMP_CRC32C(crc, &magic, sizeof(magic));
619 
620  /* prevent concurrent creations/drops */
621  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622 
623  /* write actual data */
624  for (i = 0; i < max_replication_slots; i++)
625  {
626  ReplicationStateOnDisk disk_state;
627  ReplicationState *curstate = &replication_states[i];
628  XLogRecPtr local_lsn;
629 
630  if (curstate->roident == InvalidRepOriginId)
631  continue;
632 
633  /* zero, to avoid uninitialized padding bytes */
634  memset(&disk_state, 0, sizeof(disk_state));
635 
636  LWLockAcquire(&curstate->lock, LW_SHARED);
637 
638  disk_state.roident = curstate->roident;
639 
640  disk_state.remote_lsn = curstate->remote_lsn;
641  local_lsn = curstate->local_lsn;
642 
643  LWLockRelease(&curstate->lock);
644 
645  /* make sure we only write out a commit that's persistent */
646  XLogFlush(local_lsn);
647 
648  errno = 0;
649  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
650  sizeof(disk_state))
651  {
652  /* if write didn't set errno, assume problem is no disk space */
653  if (errno == 0)
654  errno = ENOSPC;
655  ereport(PANIC,
657  errmsg("could not write to file \"%s\": %m",
658  tmppath)));
659  }
660 
661  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
662  }
663 
664  LWLockRelease(ReplicationOriginLock);
665 
666  /* write out the CRC */
667  FIN_CRC32C(crc);
668  errno = 0;
669  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
670  {
671  /* if write didn't set errno, assume problem is no disk space */
672  if (errno == 0)
673  errno = ENOSPC;
674  ereport(PANIC,
676  errmsg("could not write to file \"%s\": %m",
677  tmppath)));
678  }
679 
680  if (CloseTransientFile(tmpfd) != 0)
681  ereport(PANIC,
683  errmsg("could not close file \"%s\": %m",
684  tmppath)));
685 
686  /* fsync, rename to permanent file, fsync file and directory */
687  durable_rename(tmppath, path, PANIC);
688 }
unsigned int uint32
Definition: c.h:506
#define PG_BINARY
Definition: c.h:1273
int errcode_for_file_access(void)
Definition: elog.c:880
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PANIC
Definition: elog.h:42
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:73
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_SHARED
Definition: lwlock.h:115
static ReplicationState * replication_states
Definition: origin.c:166
#define REPLICATION_STATE_MAGIC
Definition: origin.c:182
#define InvalidRepOriginId
Definition: origin.h:33
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
return crc
int max_replication_slots
Definition: slot.c:141
XLogRecPtr remote_lsn
Definition: origin.c:142
RepOriginId roident
Definition: origin.c:141
XLogRecPtr remote_lsn
Definition: origin.c:111
XLogRecPtr local_lsn
Definition: origin.c:118
RepOriginId roident
Definition: origin.c:106
LWLock lock
Definition: origin.c:133
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2791
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 526 of file origin.c.

527 {
528  bool found;
529 
530  if (max_replication_slots == 0)
531  return;
532 
534  ShmemInitStruct("ReplicationOriginState",
536  &found);
538 
539  if (!found)
540  {
541  int i;
542 
544 
546 
547  for (i = 0; i < max_replication_slots; i++)
548  {
552  }
553  }
554 }
#define MemSet(start, val, len)
Definition: c.h:1020
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:709
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:188
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:171
Size ReplicationOriginShmemSize(void)
Definition: origin.c:506
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:151

References ConditionVariableInit(), i, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_replication_slots, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 506 of file origin.c.

507 {
508  Size size = 0;
509 
510  /*
511  * XXX: max_replication_slots is arguably the wrong thing to use, as here
512  * we keep the replay state of *remote* transactions. But for now it seems
513  * sufficient to reuse it, rather than introduce a separate GUC.
514  */
515  if (max_replication_slots == 0)
516  return size;
517 
518  size = add_size(size, offsetof(ReplicationStateCtl, states));
519 
520  size = add_size(size,
522  return size;
523 }
size_t Size
Definition: c.h:605
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_replication_slots, mul_size(), and size.

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 888 of file origin.c.

891 {
892  int i;
893  ReplicationState *replication_state = NULL;
894  ReplicationState *free_state = NULL;
895 
896  Assert(node != InvalidRepOriginId);
897 
898  /* we don't track DoNotReplicateId */
899  if (node == DoNotReplicateId)
900  return;
901 
902  /*
903  * XXX: For the case where this is called by WAL replay, it'd be more
904  * efficient to restore into a backend local hashtable and only dump into
905  * shmem after recovery is finished. Let's wait with implementing that
906  * till it's shown to be a measurable expense
907  */
908 
909  /* Lock exclusively, as we may have to create a new table entry. */
910  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
911 
912  /*
913  * Search for either an existing slot for the origin, or a free one we can
914  * use.
915  */
916  for (i = 0; i < max_replication_slots; i++)
917  {
918  ReplicationState *curstate = &replication_states[i];
919 
920  /* remember where to insert if necessary */
921  if (curstate->roident == InvalidRepOriginId &&
922  free_state == NULL)
923  {
924  free_state = curstate;
925  continue;
926  }
927 
928  /* not our slot */
929  if (curstate->roident != node)
930  {
931  continue;
932  }
933 
934  /* ok, found slot */
935  replication_state = curstate;
936 
937  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
938 
939  /* Make sure it's not used by somebody else */
940  if (replication_state->acquired_by != 0)
941  {
942  ereport(ERROR,
943  (errcode(ERRCODE_OBJECT_IN_USE),
944  errmsg("replication origin with ID %d is already active for PID %d",
945  replication_state->roident,
946  replication_state->acquired_by)));
947  }
948 
949  break;
950  }
951 
952  if (replication_state == NULL && free_state == NULL)
953  ereport(ERROR,
954  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955  errmsg("could not find free replication state slot for replication origin with ID %d",
956  node),
957  errhint("Increase \"max_replication_slots\" and try again.")));
958 
959  if (replication_state == NULL)
960  {
961  /* initialize new slot */
962  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
963  replication_state = free_state;
964  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
965  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
966  replication_state->roident = node;
967  }
968 
969  Assert(replication_state->roident != InvalidRepOriginId);
970 
971  /*
972  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
973  * and the standby gets the message. Primarily this will be called during
974  * WAL replay (of commit records) where no WAL logging is necessary.
975  */
976  if (wal_log)
977  {
978  xl_replorigin_set xlrec;
979 
980  xlrec.remote_lsn = remote_commit;
981  xlrec.node_id = node;
982  xlrec.force = go_backward;
983 
984  XLogBeginInsert();
985  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
986 
987  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
988  }
989 
990  /*
991  * Due to - harmless - race conditions during a checkpoint we could see
992  * values here that are older than the ones we already have in memory. We
993  * could also see older values for prepared transactions when the prepare
994  * is sent at a later point of time along with commit prepared and there
995  * are other transactions commits between prepare and commit prepared. See
996  * ReorderBufferFinishPrepared. Don't overwrite those.
997  */
998  if (go_backward || replication_state->remote_lsn < remote_commit)
999  replication_state->remote_lsn = remote_commit;
1000  if (local_commit != InvalidXLogRecPtr &&
1001  (go_backward || replication_state->local_lsn < local_commit))
1002  replication_state->local_lsn = local_commit;
1003  LWLockRelease(&replication_state->lock);
1004 
1005  /*
1006  * Release *after* changing the LSNs, slot isn't acquired and thus could
1007  * otherwise be dropped anytime.
1008  */
1009  LWLockRelease(ReplicationOriginLock);
1010 }
#define Assert(condition)
Definition: c.h:858
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:857
#define ERROR
Definition: elog.h:39
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define DoNotReplicateId
Definition: origin.h:34
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:364
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, ReplicationState::remote_lsn, xl_replorigin_set::remote_lsn, replication_states, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 221 of file origin.c.

222 {
224  Oid roident = InvalidOid;
225  HeapTuple tuple;
226  Datum roname_d;
227 
228  roname_d = CStringGetTextDatum(roname);
229 
230  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
231  if (HeapTupleIsValid(tuple))
232  {
234  roident = ident->roident;
235  ReleaseSysCache(tuple);
236  }
237  else if (!missing_ok)
238  ereport(ERROR,
239  (errcode(ERRCODE_UNDEFINED_OBJECT),
240  errmsg("replication origin \"%s\" does not exist",
241  roname)));
242 
243  return roident;
244 }
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define ident
Definition: indent_codes.h:47
FormData_pg_replication_origin * Form_pg_replication_origin
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:266
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:218

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 465 of file origin.c.

466 {
467  HeapTuple tuple;
469 
470  Assert(OidIsValid((Oid) roident));
471  Assert(roident != InvalidRepOriginId);
472  Assert(roident != DoNotReplicateId);
473 
474  tuple = SearchSysCache1(REPLORIGIDENT,
475  ObjectIdGetDatum((Oid) roident));
476 
477  if (HeapTupleIsValid(tuple))
478  {
479  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
480  *roname = text_to_cstring(&ric->roname);
481  ReleaseSysCache(tuple);
482 
483  return true;
484  }
485  else
486  {
487  *roname = NULL;
488 
489  if (!missing_ok)
490  ereport(ERROR,
491  (errcode(ERRCODE_UNDEFINED_OBJECT),
492  errmsg("replication origin with ID %d does not exist",
493  roident)));
494 
495  return false;
496  }
497 }
#define OidIsValid(objectId)
Definition: c.h:775
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_create()

RepOriginId replorigin_create ( const char *  roname)

Definition at line 252 of file origin.c.

253 {
254  Oid roident;
255  HeapTuple tuple = NULL;
256  Relation rel;
257  Datum roname_d;
258  SnapshotData SnapshotDirty;
259  SysScanDesc scan;
261 
262  roname_d = CStringGetTextDatum(roname);
263 
265 
266  /*
267  * We need the numeric replication origin to be 16bit wide, so we cannot
268  * rely on the normal oid allocation. Instead we simply scan
269  * pg_replication_origin for the first unused id. That's not particularly
270  * efficient, but this should be a fairly infrequent operation - we can
271  * easily spend a bit more code on this when it turns out it needs to be
272  * faster.
273  *
274  * We handle concurrency by taking an exclusive lock (allowing reads!)
275  * over the table for the duration of the search. Because we use a "dirty
276  * snapshot" we can read rows that other in-progress sessions have
277  * written, even though they would be invisible with normal snapshots. Due
278  * to the exclusive lock there's no danger that new rows can appear while
279  * we're checking.
280  */
281  InitDirtySnapshot(SnapshotDirty);
282 
283  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
284 
285  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
286  {
287  bool nulls[Natts_pg_replication_origin];
288  Datum values[Natts_pg_replication_origin];
289  bool collides;
290 
292 
293  ScanKeyInit(&key,
294  Anum_pg_replication_origin_roident,
295  BTEqualStrategyNumber, F_OIDEQ,
296  ObjectIdGetDatum(roident));
297 
298  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
299  true /* indexOK */ ,
300  &SnapshotDirty,
301  1, &key);
302 
303  collides = HeapTupleIsValid(systable_getnext(scan));
304 
305  systable_endscan(scan);
306 
307  if (!collides)
308  {
309  /*
310  * Ok, found an unused roident, insert the new row and do a CCI,
311  * so our callers can look it up if they want to.
312  */
313  memset(&nulls, 0, sizeof(nulls));
314 
315  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
316  values[Anum_pg_replication_origin_roname - 1] = roname_d;
317 
318  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
319  CatalogTupleInsert(rel, tuple);
321  break;
322  }
323  }
324 
325  /* now release lock again, */
327 
328  if (tuple == NULL)
329  ereport(ERROR,
330  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331  errmsg("could not find free replication origin ID")));
332 
333  heap_freetuple(tuple);
334  return roident;
335 }
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define PG_UINT16_MAX
Definition: c.h:587
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:596
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:503
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:384
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define ExclusiveLock
Definition: lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define RelationGetDescr(relation)
Definition: rel.h:531
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:40
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool IsTransactionState(void)
Definition: xact.c:384
void CommandCounterIncrement(void)
Definition: xact.c:1097

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum(), PG_UINT16_MAX, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState record 
)

Definition at line 19 of file replorigindesc.c.

20 {
21  char *rec = XLogRecGetData(record);
22  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
23 
24  switch (info)
25  {
27  {
28  xl_replorigin_set *xlrec;
29 
30  xlrec = (xl_replorigin_set *) rec;
31 
32  appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
33  xlrec->node_id,
35  xlrec->force);
36  break;
37  }
39  {
40  xl_replorigin_drop *xlrec;
41 
42  xlrec = (xl_replorigin_drop *) rec;
43 
44  appendStringInfo(buf, "drop %u", xlrec->node_id);
45  break;
46  }
47  }
48 }
unsigned char uint8
Definition: c.h:504
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
static char * buf
Definition: pg_test_fsync.c:73
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
RepOriginId node_id
Definition: origin.h:27
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References appendStringInfo(), buf, xl_replorigin_set::force, LSN_FORMAT_ARGS, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, xl_replorigin_set::remote_lsn, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 411 of file origin.c.

412 {
413  RepOriginId roident;
414  Relation rel;
415  HeapTuple tuple;
416 
418 
419  rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
420 
421  roident = replorigin_by_name(name, missing_ok);
422 
423  /* Lock the origin to prevent concurrent drops. */
424  LockSharedObject(ReplicationOriginRelationId, roident, 0,
426 
427  tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
428  if (!HeapTupleIsValid(tuple))
429  {
430  if (!missing_ok)
431  elog(ERROR, "cache lookup failed for replication origin with ID %d",
432  roident);
433 
434  /*
435  * We don't need to retain the locks if the origin is already dropped.
436  */
437  UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
440  return;
441  }
442 
443  replorigin_state_clear(roident, nowait);
444 
445  /*
446  * Now, we can delete the catalog entry.
447  */
448  CatalogTupleDelete(rel, &tuple->t_self);
449  ReleaseSysCache(tuple);
450 
452 
453  /* We keep the lock on pg_replication_origin until commit */
454  table_close(rel, NoLock);
455 }
#define elog(elevel,...)
Definition: elog.h:224
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1083
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1142
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:341
ItemPointerData t_self
Definition: htup.h:65
const char * name
uint16 RepOriginId
Definition: xlogdefs.h:65

References AccessExclusiveLock, Assert, CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1014 of file origin.c.

1015 {
1016  int i;
1017  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1019 
1020  /* prevent slots from being concurrently dropped */
1021  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022 
1023  for (i = 0; i < max_replication_slots; i++)
1024  {
1026 
1028 
1029  if (state->roident == node)
1030  {
1031  LWLockAcquire(&state->lock, LW_SHARED);
1032 
1033  remote_lsn = state->remote_lsn;
1034  local_lsn = state->local_lsn;
1035 
1036  LWLockRelease(&state->lock);
1037 
1038  break;
1039  }
1040  }
1041 
1042  LWLockRelease(ReplicationOriginLock);
1043 
1044  if (flush && local_lsn != InvalidXLogRecPtr)
1045  XLogFlush(local_lsn);
1046 
1047  return remote_lsn;
1048 }
Definition: regguts.h:323

References i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, replication_states, and XLogFlush().

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_identify()

const char* replorigin_identify ( uint8  info)

Definition at line 51 of file replorigindesc.c.

52 {
53  switch (info)
54  {
56  return "SET";
58  return "DROP";
59  default:
60  return NULL;
61  }
62 }

References XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 827 of file origin.c.

828 {
829  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
830 
831  switch (info)
832  {
833  case XLOG_REPLORIGIN_SET:
834  {
835  xl_replorigin_set *xlrec =
836  (xl_replorigin_set *) XLogRecGetData(record);
837 
839  xlrec->remote_lsn, record->EndRecPtr,
840  xlrec->force /* backward */ ,
841  false /* WAL log */ );
842  break;
843  }
845  {
846  xl_replorigin_drop *xlrec;
847  int i;
848 
849  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850 
851  for (i = 0; i < max_replication_slots; i++)
852  {
854 
855  /* found our slot */
856  if (state->roident == xlrec->node_id)
857  {
858  /* reset entry */
859  state->roident = InvalidRepOriginId;
860  state->remote_lsn = InvalidXLogRecPtr;
861  state->local_lsn = InvalidXLogRecPtr;
862  break;
863  }
864  }
865  break;
866  }
867  default:
868  elog(PANIC, "replorigin_redo: unknown op code %u", info);
869  }
870 }
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:888
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1237 of file origin.c.

1238 {
1239  XLogRecPtr remote_lsn;
1240  XLogRecPtr local_lsn;
1241 
1243 
1245  remote_lsn = session_replication_state->remote_lsn;
1246  local_lsn = session_replication_state->local_lsn;
1248 
1249  if (flush && local_lsn != InvalidXLogRecPtr)
1250  XLogFlush(local_lsn);
1251 
1252  return remote_lsn;
1253 }

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, session_replication_state, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), pg_replication_origin_session_progress(), and run_apply_worker().

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1190 of file origin.c.

1191 {
1192  ConditionVariable *cv;
1193 
1195 
1196  if (session_replication_state == NULL)
1197  ereport(ERROR,
1198  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1199  errmsg("no replication origin is configured")));
1200 
1201  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1202 
1206 
1207  LWLockRelease(ReplicationOriginLock);
1208 
1210 }
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition: origin.c:128

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::origin_cv, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and process_syncing_tables_for_sync().

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1097 of file origin.c.

1098 {
1099  static bool registered_cleanup;
1100  int i;
1101  int free_slot = -1;
1102 
1103  if (!registered_cleanup)
1104  {
1106  registered_cleanup = true;
1107  }
1108 
1110 
1111  if (session_replication_state != NULL)
1112  ereport(ERROR,
1113  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1114  errmsg("cannot setup replication origin when one is already setup")));
1115 
1116  /* Lock exclusively, as we may have to create a new table entry. */
1117  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1118 
1119  /*
1120  * Search for either an existing slot for the origin, or a free one we can
1121  * use.
1122  */
1123  for (i = 0; i < max_replication_slots; i++)
1124  {
1125  ReplicationState *curstate = &replication_states[i];
1126 
1127  /* remember where to insert if necessary */
1128  if (curstate->roident == InvalidRepOriginId &&
1129  free_slot == -1)
1130  {
1131  free_slot = i;
1132  continue;
1133  }
1134 
1135  /* not our slot */
1136  if (curstate->roident != node)
1137  continue;
1138 
1139  else if (curstate->acquired_by != 0 && acquired_by == 0)
1140  {
1141  ereport(ERROR,
1142  (errcode(ERRCODE_OBJECT_IN_USE),
1143  errmsg("replication origin with ID %d is already active for PID %d",
1144  curstate->roident, curstate->acquired_by)));
1145  }
1146 
1147  /* ok, found slot */
1148  session_replication_state = curstate;
1149  break;
1150  }
1151 
1152 
1153  if (session_replication_state == NULL && free_slot == -1)
1154  ereport(ERROR,
1155  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1156  errmsg("could not find free replication state slot for replication origin with ID %d",
1157  node),
1158  errhint("Increase \"max_replication_slots\" and try again.")));
1159  else if (session_replication_state == NULL)
1160  {
1161  /* initialize new slot */
1166  }
1167 
1168 
1170 
1171  if (acquired_by == 0)
1173  else if (session_replication_state->acquired_by != acquired_by)
1174  elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175  node, acquired_by);
1176 
1177  LWLockRelease(ReplicationOriginLock);
1178 
1179  /* probably this one is pointless */
1181 }
int MyProcPid
Definition: globals.c:45
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1055

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, and session_replication_state.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 699 of file origin.c.

700 {
701  const char *path = "pg_logical/replorigin_checkpoint";
702  int fd;
703  int readBytes;
705  int last_state = 0;
706  pg_crc32c file_crc;
707  pg_crc32c crc;
708 
709  /* don't want to overwrite already existing state */
710 #ifdef USE_ASSERT_CHECKING
711  static bool already_started = false;
712 
713  Assert(!already_started);
714  already_started = true;
715 #endif
716 
717  if (max_replication_slots == 0)
718  return;
719 
720  INIT_CRC32C(crc);
721 
722  elog(DEBUG2, "starting up replication origin progress state");
723 
724  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725 
726  /*
727  * might have had max_replication_slots == 0 last run, or we just brought
728  * up a standby.
729  */
730  if (fd < 0 && errno == ENOENT)
731  return;
732  else if (fd < 0)
733  ereport(PANIC,
735  errmsg("could not open file \"%s\": %m",
736  path)));
737 
738  /* verify magic, that is written even if nothing was active */
739  readBytes = read(fd, &magic, sizeof(magic));
740  if (readBytes != sizeof(magic))
741  {
742  if (readBytes < 0)
743  ereport(PANIC,
745  errmsg("could not read file \"%s\": %m",
746  path)));
747  else
748  ereport(PANIC,
750  errmsg("could not read file \"%s\": read %d of %zu",
751  path, readBytes, sizeof(magic))));
752  }
753  COMP_CRC32C(crc, &magic, sizeof(magic));
754 
755  if (magic != REPLICATION_STATE_MAGIC)
756  ereport(PANIC,
757  (errmsg("replication checkpoint has wrong magic %u instead of %u",
758  magic, REPLICATION_STATE_MAGIC)));
759 
760  /* we can skip locking here, no other access is possible */
761 
762  /* recover individual states, until there are no more to be found */
763  while (true)
764  {
765  ReplicationStateOnDisk disk_state;
766 
767  readBytes = read(fd, &disk_state, sizeof(disk_state));
768 
769  /* no further data */
770  if (readBytes == sizeof(crc))
771  {
772  /* not pretty, but simple ... */
773  file_crc = *(pg_crc32c *) &disk_state;
774  break;
775  }
776 
777  if (readBytes < 0)
778  {
779  ereport(PANIC,
781  errmsg("could not read file \"%s\": %m",
782  path)));
783  }
784 
785  if (readBytes != sizeof(disk_state))
786  {
787  ereport(PANIC,
789  errmsg("could not read file \"%s\": read %d of %zu",
790  path, readBytes, sizeof(disk_state))));
791  }
792 
793  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794 
795  if (last_state == max_replication_slots)
796  ereport(PANIC,
797  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798  errmsg("could not find free replication state, increase \"max_replication_slots\"")));
799 
800  /* copy data to shared memory */
801  replication_states[last_state].roident = disk_state.roident;
802  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
803  last_state++;
804 
805  ereport(LOG,
806  (errmsg("recovered replication state of node %d to %X/%X",
807  disk_state.roident,
808  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
809  }
810 
811  /* now check checksum */
812  FIN_CRC32C(crc);
813  if (file_crc != crc)
814  ereport(PANIC,
816  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
817  crc, file_crc)));
818 
819  if (CloseTransientFile(fd) != 0)
820  ereport(PANIC,
822  errmsg("could not close file \"%s\": %m",
823  path)));
824 }
#define LOG
Definition: elog.h:31
#define DEBUG2
Definition: elog.h:29
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert, CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp