PostgreSQL Source Code  git master
origin.h File Reference
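Include dependency graph for origin.h: (graph image not reproduced in this text version)
This graph shows which files directly or indirectly include this file: (graph image not reproduced in this text version)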

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidRepOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 

Functions

RepOriginId replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId replorigin_create (const char *roname)
 
void replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (RepOriginId node, int acquired_by)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 

Variables

PGDLLIMPORT RepOriginId replorigin_session_origin
 
PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn
 
PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidRepOriginId

#define InvalidRepOriginId   0

Definition at line 33 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Typedef Documentation

◆ xl_replorigin_drop

◆ xl_replorigin_set

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 574 of file origin.c.

575 {
576  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
577  const char *path = "pg_logical/replorigin_checkpoint";
578  int tmpfd;
579  int i;
580  uint32 magic = REPLICATION_STATE_MAGIC;
581  pg_crc32c crc;
582 
583  if (max_replication_slots == 0)
584  return;
585 
586  INIT_CRC32C(crc);
587 
588  /* make sure no old temp file is remaining */
589  if (unlink(tmppath) < 0 && errno != ENOENT)
590  ereport(PANIC,
591  (errcode_for_file_access(),
592  errmsg("could not remove file \"%s\": %m",
593  tmppath)));
594 
595  /*
596  * no other backend can perform this at the same time; only one checkpoint
597  * can happen at a time.
598  */
599  tmpfd = OpenTransientFile(tmppath,
600  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
601  if (tmpfd < 0)
602  ereport(PANIC,
603  (errcode_for_file_access(),
604  errmsg("could not create file \"%s\": %m",
605  tmppath)));
606 
607  /* write magic */
608  errno = 0;
609  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
610  {
611  /* if write didn't set errno, assume problem is no disk space */
612  if (errno == 0)
613  errno = ENOSPC;
614  ereport(PANIC,
615  (errcode_for_file_access(),
616  errmsg("could not write to file \"%s\": %m",
617  tmppath)));
618  }
619  COMP_CRC32C(crc, &magic, sizeof(magic));
620 
621  /* prevent concurrent creations/drops */
622  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
623 
624  /* write actual data */
625  for (i = 0; i < max_replication_slots; i++)
626  {
627  ReplicationStateOnDisk disk_state;
628  ReplicationState *curstate = &replication_states[i];
629  XLogRecPtr local_lsn;
630 
631  if (curstate->roident == InvalidRepOriginId)
632  continue;
633 
634  /* zero, to avoid uninitialized padding bytes */
635  memset(&disk_state, 0, sizeof(disk_state));
636 
637  LWLockAcquire(&curstate->lock, LW_SHARED);
638 
639  disk_state.roident = curstate->roident;
640 
641  disk_state.remote_lsn = curstate->remote_lsn;
642  local_lsn = curstate->local_lsn;
643 
644  LWLockRelease(&curstate->lock);
645 
646  /* make sure we only write out a commit that's persistent */
647  XLogFlush(local_lsn);
648 
649  errno = 0;
650  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
651  sizeof(disk_state))
652  {
653  /* if write didn't set errno, assume problem is no disk space */
654  if (errno == 0)
655  errno = ENOSPC;
656  ereport(PANIC,
657  (errcode_for_file_access(),
658  errmsg("could not write to file \"%s\": %m",
659  tmppath)));
660  }
661 
662  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
663  }
664 
665  LWLockRelease(ReplicationOriginLock);
666 
667  /* write out the CRC */
668  FIN_CRC32C(crc);
669  errno = 0;
670  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
671  {
672  /* if write didn't set errno, assume problem is no disk space */
673  if (errno == 0)
674  errno = ENOSPC;
675  ereport(PANIC,
676  (errcode_for_file_access(),
677  errmsg("could not write to file \"%s\": %m",
678  tmppath)));
679  }
680 
681  if (CloseTransientFile(tmpfd) != 0)
682  ereport(PANIC,
683  (errcode_for_file_access(),
684  errmsg("could not close file \"%s\": %m",
685  tmppath)));
686 
687  /* fsync, rename to permanent file, fsync file and directory */
688  durable_rename(tmppath, path, PANIC);
689 }
unsigned int uint32
Definition: c.h:495
#define PG_BINARY
Definition: c.h:1262
int errcode_for_file_access(void)
Definition: elog.c:883
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define PANIC
Definition: elog.h:42
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:73
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:117
static ReplicationState * replication_states
Definition: origin.c:167
#define REPLICATION_STATE_MAGIC
Definition: origin.c:183
#define InvalidRepOriginId
Definition: origin.h:33
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
return crc
int max_replication_slots
Definition: slot.c:119
XLogRecPtr remote_lsn
Definition: origin.c:143
RepOriginId roident
Definition: origin.c:142
XLogRecPtr remote_lsn
Definition: origin.c:112
XLogRecPtr local_lsn
Definition: origin.c:119
RepOriginId roident
Definition: origin.c:107
LWLock lock
Definition: origin.c:134
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2733
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 527 of file origin.c.

528 {
529  bool found;
530 
531  if (max_replication_slots == 0)
532  return;
533 
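534  replication_states_ctl = (ReplicationStateCtl *)
535  ShmemInitStruct("ReplicationOriginState",
536  ReplicationOriginShmemSize(),
537  &found);
538  replication_states = replication_states_ctl->states;
539 
540  if (!found)
541  {
542  int i;
543 
544  MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
545 
546  replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
547 
548  for (i = 0; i < max_replication_slots; i++)
549  {
550  LWLockInitialize(&replication_states[i].lock,
551  replication_states_ctl->tranche_id);
552  ConditionVariableInit(&replication_states[i].origin_cv);
553  }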
554  }
555 }
#define MemSet(start, val, len)
Definition: c.h:1009
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:703
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:190
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:172
Size ReplicationOriginShmemSize(void)
Definition: origin.c:507
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:152

References ConditionVariableInit(), i, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_replication_slots, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 507 of file origin.c.

508 {
509  Size size = 0;
510 
511  /*
512  * XXX: max_replication_slots is arguably the wrong thing to use, as here
513  * we keep the replay state of *remote* transactions. But for now it seems
514  * sufficient to reuse it, rather than introduce a separate GUC.
515  */
516  if (max_replication_slots == 0)
517  return size;
518 
519  size = add_size(size, offsetof(ReplicationStateCtl, states));
520 
521  size = add_size(size,
522  mul_size(max_replication_slots, sizeof(ReplicationState)));
523  return size;
524 }
size_t Size
Definition: c.h:594
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 889 of file origin.c.

892 {
893  int i;
894  ReplicationState *replication_state = NULL;
895  ReplicationState *free_state = NULL;
896 
897  Assert(node != InvalidRepOriginId);
898 
899  /* we don't track DoNotReplicateId */
900  if (node == DoNotReplicateId)
901  return;
902 
903  /*
904  * XXX: For the case where this is called by WAL replay, it'd be more
905  * efficient to restore into a backend local hashtable and only dump into
906  * shmem after recovery is finished. Let's wait with implementing that
907  * till it's shown to be a measurable expense
908  */
909 
910  /* Lock exclusively, as we may have to create a new table entry. */
911  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
912 
913  /*
914  * Search for either an existing slot for the origin, or a free one we can
915  * use.
916  */
917  for (i = 0; i < max_replication_slots; i++)
918  {
919  ReplicationState *curstate = &replication_states[i];
920 
921  /* remember where to insert if necessary */
922  if (curstate->roident == InvalidRepOriginId &&
923  free_state == NULL)
924  {
925  free_state = curstate;
926  continue;
927  }
928 
929  /* not our slot */
930  if (curstate->roident != node)
931  {
932  continue;
933  }
934 
935  /* ok, found slot */
936  replication_state = curstate;
937 
938  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
939 
940  /* Make sure it's not used by somebody else */
941  if (replication_state->acquired_by != 0)
942  {
943  ereport(ERROR,
944  (errcode(ERRCODE_OBJECT_IN_USE),
945  errmsg("replication origin with ID %d is already active for PID %d",
946  replication_state->roident,
947  replication_state->acquired_by)));
948  }
949 
950  break;
951  }
952 
953  if (replication_state == NULL && free_state == NULL)
954  ereport(ERROR,
955  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
956  errmsg("could not find free replication state slot for replication origin with ID %d",
957  node),
958  errhint("Increase max_replication_slots and try again.")));
959 
960  if (replication_state == NULL)
961  {
962  /* initialize new slot */
963  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
964  replication_state = free_state;
965  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
966  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
967  replication_state->roident = node;
968  }
969 
970  Assert(replication_state->roident != InvalidRepOriginId);
971 
972  /*
973  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
974  * and the standby gets the message. Primarily this will be called during
975  * WAL replay (of commit records) where no WAL logging is necessary.
976  */
977  if (wal_log)
978  {
979  xl_replorigin_set xlrec;
980 
981  xlrec.remote_lsn = remote_commit;
982  xlrec.node_id = node;
983  xlrec.force = go_backward;
984 
985  XLogBeginInsert();
986  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
987 
988  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
989  }
990 
991  /*
992  * Due to - harmless - race conditions during a checkpoint we could see
993  * values here that are older than the ones we already have in memory. We
994  * could also see older values for prepared transactions when the prepare
995  * is sent at a later point of time along with commit prepared and there
996  * are other transactions commits between prepare and commit prepared. See
997  * ReorderBufferFinishPrepared. Don't overwrite those.
998  */
999  if (go_backward || replication_state->remote_lsn < remote_commit)
1000  replication_state->remote_lsn = remote_commit;
1001  if (local_commit != InvalidXLogRecPtr &&
1002  (go_backward || replication_state->local_lsn < local_commit))
1003  replication_state->local_lsn = local_commit;
1004  LWLockRelease(&replication_state->lock);
1005 
1006  /*
1007  * Release *after* changing the LSNs, slot isn't acquired and thus could
1008  * otherwise be dropped anytime.
1009  */
1010  LWLockRelease(ReplicationOriginLock);
1011 }
int errhint(const char *fmt,...)
Definition: elog.c:1322
int errcode(int sqlerrcode)
Definition: elog.c:860
#define ERROR
Definition: elog.h:39
Assert(fmt[strlen(fmt) - 1] !='\n')
@ LW_EXCLUSIVE
Definition: lwlock.h:116
#define DoNotReplicateId
Definition: origin.h:34
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:365
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:475
void XLogBeginInsert(void)
Definition: xloginsert.c:150

References ReplicationState::acquired_by, Assert(), DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, ReplicationState::remote_lsn, xl_replorigin_set::remote_lsn, replication_states, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 222 of file origin.c.

223 {
224  Form_pg_replication_origin ident;
225  Oid roident = InvalidOid;
226  HeapTuple tuple;
227  Datum roname_d;
228 
229  roname_d = CStringGetTextDatum(roname);
230 
231  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
232  if (HeapTupleIsValid(tuple))
233  {
234  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
235  roident = ident->roident;
236  ReleaseSysCache(tuple);
237  }
238  else if (!missing_ok)
239  ereport(ERROR,
240  (errcode(ERRCODE_UNDEFINED_OBJECT),
241  errmsg("replication origin \"%s\" does not exist",
242  roname)));
243 
244  return roident;
245 }
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define ident
Definition: indent_codes.h:47
FormData_pg_replication_origin * Form_pg_replication_origin
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:267
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:219

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 466 of file origin.c.

467 {
468  HeapTuple tuple;
469  Form_pg_replication_origin ric;
470 
471  Assert(OidIsValid((Oid) roident));
472  Assert(roident != InvalidRepOriginId);
473  Assert(roident != DoNotReplicateId);
474 
475  tuple = SearchSysCache1(REPLORIGIDENT,
476  ObjectIdGetDatum((Oid) roident));
477 
478  if (HeapTupleIsValid(tuple))
479  {
480  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
481  *roname = text_to_cstring(&ric->roname);
482  ReleaseSysCache(tuple);
483 
484  return true;
485  }
486  else
487  {
488  *roname = NULL;
489 
490  if (!missing_ok)
491  ereport(ERROR,
492  (errcode(ERRCODE_UNDEFINED_OBJECT),
493  errmsg("replication origin with ID %d does not exist",
494  roident)));
495 
496  return false;
497  }
498 }
#define OidIsValid(objectId)
Definition: c.h:764
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References Assert(), DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), and send_repl_origin().

◆ replorigin_create()

RepOriginId replorigin_create ( const char *  roname)

Definition at line 253 of file origin.c.

254 {
255  Oid roident;
256  HeapTuple tuple = NULL;
257  Relation rel;
258  Datum roname_d;
259  SnapshotData SnapshotDirty;
260  SysScanDesc scan;
261  ScanKeyData key;
262 
263  roname_d = CStringGetTextDatum(roname);
264 
265  Assert(IsTransactionState());
266 
267  /*
268  * We need the numeric replication origin to be 16bit wide, so we cannot
269  * rely on the normal oid allocation. Instead we simply scan
270  * pg_replication_origin for the first unused id. That's not particularly
271  * efficient, but this should be a fairly infrequent operation - we can
272  * easily spend a bit more code on this when it turns out it needs to be
273  * faster.
274  *
275  * We handle concurrency by taking an exclusive lock (allowing reads!)
276  * over the table for the duration of the search. Because we use a "dirty
277  * snapshot" we can read rows that other in-progress sessions have
278  * written, even though they would be invisible with normal snapshots. Due
279  * to the exclusive lock there's no danger that new rows can appear while
280  * we're checking.
281  */
282  InitDirtySnapshot(SnapshotDirty);
283 
284  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
285 
286  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
287  {
288  bool nulls[Natts_pg_replication_origin];
289  Datum values[Natts_pg_replication_origin];
290  bool collides;
291 
293 
294  ScanKeyInit(&key,
295  Anum_pg_replication_origin_roident,
296  BTEqualStrategyNumber, F_OIDEQ,
297  ObjectIdGetDatum(roident));
298 
299  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
300  true /* indexOK */ ,
301  &SnapshotDirty,
302  1, &key);
303 
304  collides = HeapTupleIsValid(systable_getnext(scan));
305 
306  systable_endscan(scan);
307 
308  if (!collides)
309  {
310  /*
311  * Ok, found an unused roident, insert the new row and do a CCI,
312  * so our callers can look it up if they want to.
313  */
314  memset(&nulls, 0, sizeof(nulls));
315 
316  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
317  values[Anum_pg_replication_origin_roname - 1] = roname_d;
318 
319  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
320  CatalogTupleInsert(rel, tuple);
321  CommandCounterIncrement();
322  break;
323  }
324  }
325 
326  /* now release lock again, */
327  table_close(rel, ExclusiveLock);
328 
329  if (tuple == NULL)
330  ereport(ERROR,
331  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
332  errmsg("could not find free replication origin ID")));
333 
334  heap_freetuple(tuple);
335  return roident;
336 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define PG_UINT16_MAX
Definition: c.h:576
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define ExclusiveLock
Definition: lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define RelationGetDescr(relation)
Definition: rel.h:530
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:40
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool IsTransactionState(void)
Definition: xact.c:378
void CommandCounterIncrement(void)
Definition: xact.c:1078

References Assert(), BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum(), PG_UINT16_MAX, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState record 
)

Definition at line 19 of file replorigindesc.c.

20 {
21  char *rec = XLogRecGetData(record);
22  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
23 
24  switch (info)
25  {
26  case XLOG_REPLORIGIN_SET:
27  {
28  xl_replorigin_set *xlrec;
29 
30  xlrec = (xl_replorigin_set *) rec;
31 
32  appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
33  xlrec->node_id,
34  LSN_FORMAT_ARGS(xlrec->remote_lsn),
35  xlrec->force);
36  break;
37  }
38  case XLOG_REPLORIGIN_DROP:
39  {
40  xl_replorigin_drop *xlrec;
41 
42  xlrec = (xl_replorigin_drop *) rec;
43 
44  appendStringInfo(buf, "drop %u", xlrec->node_id);
45  break;
46  }
47  }
48 }
unsigned char uint8
Definition: c.h:493
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
static char * buf
Definition: pg_test_fsync.c:73
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
RepOriginId node_id
Definition: origin.h:27
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References appendStringInfo(), buf, xl_replorigin_set::force, LSN_FORMAT_ARGS, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, xl_replorigin_set::remote_lsn, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

◆ replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 412 of file origin.c.

413 {
414  RepOriginId roident;
415  Relation rel;
416  HeapTuple tuple;
417 
418  Assert(IsTransactionState());
419 
420  rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
421 
422  roident = replorigin_by_name(name, missing_ok);
423 
424  /* Lock the origin to prevent concurrent drops. */
425  LockSharedObject(ReplicationOriginRelationId, roident, 0,
426  AccessExclusiveLock);
427 
428  tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
429  if (!HeapTupleIsValid(tuple))
430  {
431  if (!missing_ok)
432  elog(ERROR, "cache lookup failed for replication origin with ID %d",
433  roident);
434 
435  /*
436  * We don't need to retain the locks if the origin is already dropped.
437  */
438  UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
439  AccessExclusiveLock);
440  table_close(rel, RowExclusiveLock);
441  return;
442  }
443 
444  replorigin_state_clear(roident, nowait);
445 
446  /*
447  * Now, we can delete the catalog entry.
448  */
449  CatalogTupleDelete(rel, &tuple->t_self);
450  ReleaseSysCache(tuple);
451 
452  CommandCounterIncrement();
453 
454  /* We keep the lock on pg_replication_origin until commit */
455  table_close(rel, NoLock);
456 }
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1105
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:222
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:342
ItemPointerData t_self
Definition: htup.h:65
const char * name
uint16 RepOriginId
Definition: xlogdefs.h:65

References AccessExclusiveLock, Assert(), CatalogTupleDelete(), CommandCounterIncrement(), elog(), ERROR, HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1015 of file origin.c.

1016 {
1017  int i;
1018  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1019  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1020 
1021  /* prevent slots from being concurrently dropped */
1022  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1023 
1024  for (i = 0; i < max_replication_slots; i++)
1025  {
1026  ReplicationState *state;
1027 
1028  state = &replication_states[i];
1029 
1030  if (state->roident == node)
1031  {
1032  LWLockAcquire(&state->lock, LW_SHARED);
1033 
1034  remote_lsn = state->remote_lsn;
1035  local_lsn = state->local_lsn;
1036 
1037  LWLockRelease(&state->lock);
1038 
1039  break;
1040  }
1041  }
1042 
1043  LWLockRelease(ReplicationOriginLock);
1044 
1045  if (flush && local_lsn != InvalidXLogRecPtr)
1046  XLogFlush(local_lsn);
1047 
1048  return remote_lsn;
1049 }
Definition: regguts.h:323

References i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, replication_states, and XLogFlush().

Referenced by AlterSubscription(), and pg_replication_origin_progress().

◆ replorigin_identify()

const char* replorigin_identify ( uint8  info)

Definition at line 51 of file replorigindesc.c.

52 {
53  switch (info)
54  {
55  case XLOG_REPLORIGIN_SET:
56  return "SET";
57  case XLOG_REPLORIGIN_DROP:
58  return "DROP";
59  default:
60  return NULL;
61  }
62 }

References XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 828 of file origin.c.

829 {
830  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
831 
832  switch (info)
833  {
834  case XLOG_REPLORIGIN_SET:
835  {
836  xl_replorigin_set *xlrec =
837  (xl_replorigin_set *) XLogRecGetData(record);
838 
839  replorigin_advance(xlrec->node_id,
840  xlrec->remote_lsn, record->EndRecPtr,
841  xlrec->force /* backward */ ,
842  false /* WAL log */ );
843  break;
844  }
845  case XLOG_REPLORIGIN_DROP:
846  {
847  xl_replorigin_drop *xlrec;
848  int i;
849 
850  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
851 
852  for (i = 0; i < max_replication_slots; i++)
853  {
854  ReplicationState *state = &replication_states[i];
855 
856  /* found our slot */
857  if (state->roident == xlrec->node_id)
858  {
859  /* reset entry */
860  state->roident = InvalidRepOriginId;
861  state->remote_lsn = InvalidXLogRecPtr;
862  state->local_lsn = InvalidXLogRecPtr;
863  break;
864  }
865  }
866  break;
867  }
868  default:
869  elog(PANIC, "replorigin_redo: unknown op code %u", info);
870  }
871 }
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:889
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References elog(), XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

◆ replorigin_session_advance()

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1238 of file origin.c.

1239 {
1240  XLogRecPtr remote_lsn;
1241  XLogRecPtr local_lsn;
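1242 
1243  Assert(session_replication_state != NULL);
1244 
1245  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1246  remote_lsn = session_replication_state->remote_lsn;
1247  local_lsn = session_replication_state->local_lsn;
1248  LWLockRelease(&session_replication_state->lock);
1249 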
1250  if (flush && local_lsn != InvalidXLogRecPtr)
1251  XLogFlush(local_lsn);
1252 
1253  return remote_lsn;
1254 }

References Assert(), InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, session_replication_state, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), pg_replication_origin_session_progress(), and run_apply_worker().

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1191 of file origin.c.

1192 {
1193  ConditionVariable *cv;
1194 
1195  Assert(max_replication_slots != 0);
1196 
1197  if (session_replication_state == NULL)
1198  ereport(ERROR,
1199  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1200  errmsg("no replication origin is configured")));
1201 
1202  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
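1203 
1204  session_replication_state->acquired_by = 0;
1205  cv = &session_replication_state->origin_cv;
1206  session_replication_state = NULL;
1207 
1208  LWLockRelease(ReplicationOriginLock);
1209 
1210  ConditionVariableBroadcast(cv);
1211 }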
void ConditionVariableBroadcast(ConditionVariable *cv)
ConditionVariable origin_cv
Definition: origin.c:129

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::origin_cv, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and process_syncing_tables_for_sync().

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1098 of file origin.c.

1099 {
1100  static bool registered_cleanup;
1101  int i;
1102  int free_slot = -1;
1103 
1104  if (!registered_cleanup)
1105  {
1106  on_shmem_exit(ReplicationOriginExitCleanup, 0);
1107  registered_cleanup = true;
1108  }
1109 
1110  Assert(max_replication_slots > 0);
1111 
1112  if (session_replication_state != NULL)
1113  ereport(ERROR,
1114  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1115  errmsg("cannot setup replication origin when one is already setup")));
1116 
1117  /* Lock exclusively, as we may have to create a new table entry. */
1118  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1119 
1120  /*
1121  * Search for either an existing slot for the origin, or a free one we can
1122  * use.
1123  */
1124  for (i = 0; i < max_replication_slots; i++)
1125  {
1126  ReplicationState *curstate = &replication_states[i];
1127 
1128  /* remember where to insert if necessary */
1129  if (curstate->roident == InvalidRepOriginId &&
1130  free_slot == -1)
1131  {
1132  free_slot = i;
1133  continue;
1134  }
1135 
1136  /* not our slot */
1137  if (curstate->roident != node)
1138  continue;
1139 
1140  else if (curstate->acquired_by != 0 && acquired_by == 0)
1141  {
1142  ereport(ERROR,
1143  (errcode(ERRCODE_OBJECT_IN_USE),
1144  errmsg("replication origin with ID %d is already active for PID %d",
1145  curstate->roident, curstate->acquired_by)));
1146  }
1147 
1148  /* ok, found slot */
1149  session_replication_state = curstate;
1150  break;
1151  }
1152 
1153 
1154  if (session_replication_state == NULL && free_slot == -1)
1155  ereport(ERROR,
1156  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1157  errmsg("could not find free replication state slot for replication origin with ID %d",
1158  node),
1159  errhint("Increase max_replication_slots and try again.")));
1160  else if (session_replication_state == NULL)
1161  {
1162  /* initialize new slot */
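1163  session_replication_state = &replication_states[free_slot];
1164  Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1165  Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1166  session_replication_state->roident = node;
1167  }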
1168 
1169 
1170  Assert(session_replication_state->roident != InvalidRepOriginId);
1171 
1172  if (acquired_by == 0)
1173  session_replication_state->acquired_by = MyProcPid;
1174  else if (session_replication_state->acquired_by != acquired_by)
1175  elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1176  node, acquired_by);
1177 
1178  LWLockRelease(ReplicationOriginLock);
1179 
1180  /* probably this one is pointless */
1181  ConditionVariableBroadcast(&session_replication_state->origin_cv);
1182 }
int MyProcPid
Definition: globals.c:45
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1056

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), elog(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, and session_replication_state.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 700 of file origin.c.

701 {
702  const char *path = "pg_logical/replorigin_checkpoint";
703  int fd;
704  int readBytes;
706  int last_state = 0;
707  pg_crc32c file_crc;
708  pg_crc32c crc;
709 
710  /* don't want to overwrite already existing state */
711 #ifdef USE_ASSERT_CHECKING
712  static bool already_started = false;
713 
714  Assert(!already_started);
715  already_started = true;
716 #endif
717 
718  if (max_replication_slots == 0)
719  return;
720 
721  INIT_CRC32C(crc);
722 
723  elog(DEBUG2, "starting up replication origin progress state");
724 
725  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
726 
727  /*
728  * might have had max_replication_slots == 0 last run, or we just brought
729  * up a standby.
730  */
731  if (fd < 0 && errno == ENOENT)
732  return;
733  else if (fd < 0)
734  ereport(PANIC,
735  (errcode_for_file_access(),
736  errmsg("could not open file \"%s\": %m",
737  path)));
738 
739  /* verify magic, that is written even if nothing was active */
740  readBytes = read(fd, &magic, sizeof(magic));
741  if (readBytes != sizeof(magic))
742  {
743  if (readBytes < 0)
744  ereport(PANIC,
745  (errcode_for_file_access(),
746  errmsg("could not read file \"%s\": %m",
747  path)));
748  else
749  ereport(PANIC,
750  (errcode(ERRCODE_DATA_CORRUPTED),
751  errmsg("could not read file \"%s\": read %d of %zu",
752  path, readBytes, sizeof(magic))));
753  }
754  COMP_CRC32C(crc, &magic, sizeof(magic));
755 
756  if (magic != REPLICATION_STATE_MAGIC)
757  ereport(PANIC,
758  (errmsg("replication checkpoint has wrong magic %u instead of %u",
759  magic, REPLICATION_STATE_MAGIC)));
760 
761  /* we can skip locking here, no other access is possible */
762 
763  /* recover individual states, until there are no more to be found */
764  while (true)
765  {
766  ReplicationStateOnDisk disk_state;
767 
768  readBytes = read(fd, &disk_state, sizeof(disk_state));
769 
770  /* no further data */
771  if (readBytes == sizeof(crc))
772  {
773  /* not pretty, but simple ... */
774  file_crc = *(pg_crc32c *) &disk_state;
775  break;
776  }
777 
778  if (readBytes < 0)
779  {
780  ereport(PANIC,
781  (errcode_for_file_access(),
782  errmsg("could not read file \"%s\": %m",
783  path)));
784  }
785 
786  if (readBytes != sizeof(disk_state))
787  {
788  ereport(PANIC,
789  (errcode(ERRCODE_DATA_CORRUPTED),
790  errmsg("could not read file \"%s\": read %d of %zu",
791  path, readBytes, sizeof(disk_state))));
792  }
793 
794  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
795 
796  if (last_state == max_replication_slots)
797  ereport(PANIC,
798  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
799  errmsg("could not find free replication state, increase max_replication_slots")));
800 
801  /* copy data to shared memory */
802  replication_states[last_state].roident = disk_state.roident;
803  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
804  last_state++;
805 
806  ereport(LOG,
807  (errmsg("recovered replication state of node %d to %X/%X",
808  disk_state.roident,
809  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
810  }
811 
812  /* now check checksum */
813  FIN_CRC32C(crc);
814  if (file_crc != crc)
815  ereport(PANIC,
816  (errcode(ERRCODE_DATA_CORRUPTED),
817  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
818  crc, file_crc)));
819 
820  if (CloseTransientFile(fd) != 0)
821  ereport(PANIC,
822  (errcode_for_file_access(),
823  errmsg("could not close file \"%s\": %m",
824  path)));
825 }
#define LOG
Definition: elog.h:31
#define DEBUG2
Definition: elog.h:29
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert(), CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog(), ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

Variable Documentation

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp