PostgreSQL Source Code  git master
origin.h File Reference
Include dependency graph for origin.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  xl_replorigin_set
 
struct  xl_replorigin_drop
 

Macros

#define XLOG_REPLORIGIN_SET   0x00
 
#define XLOG_REPLORIGIN_DROP   0x10
 
#define InvalidRepOriginId   0
 
#define DoNotReplicateId   PG_UINT16_MAX
 

Typedefs

typedef struct xl_replorigin_set xl_replorigin_set
 
typedef struct xl_replorigin_drop xl_replorigin_drop
 

Functions

RepOriginId replorigin_by_name (char *name, bool missing_ok)
 
RepOriginId replorigin_create (char *name)
 
void replorigin_drop (RepOriginId roident, bool nowait)
 
bool replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
void replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
XLogRecPtr replorigin_get_progress (RepOriginId node, bool flush)
 
void replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
void replorigin_session_setup (RepOriginId node)
 
void replorigin_session_reset (void)
 
XLogRecPtr replorigin_session_get_progress (bool flush)
 
void CheckPointReplicationOrigin (void)
 
void StartupReplicationOrigin (void)
 
void replorigin_redo (XLogReaderState *record)
 
void replorigin_desc (StringInfo buf, XLogReaderState *record)
 
const char * replorigin_identify (uint8 info)
 
Size ReplicationOriginShmemSize (void)
 
void ReplicationOriginShmemInit (void)
 

Variables

PGDLLIMPORT RepOriginId replorigin_session_origin
 
PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn
 
PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp
 

Macro Definition Documentation

◆ DoNotReplicateId

#define DoNotReplicateId   PG_UINT16_MAX

Definition at line 34 of file origin.h.

◆ InvalidRepOriginId

#define InvalidRepOriginId   0

Definition at line 33 of file origin.h.

◆ XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_DROP   0x10

Definition at line 31 of file origin.h.

Referenced by replorigin_desc(), replorigin_drop(), replorigin_identify(), and replorigin_redo().

◆ XLOG_REPLORIGIN_SET

#define XLOG_REPLORIGIN_SET   0x00

Definition at line 30 of file origin.h.

Referenced by replorigin_advance(), replorigin_desc(), replorigin_identify(), and replorigin_redo().

Typedef Documentation

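◆ xl_replorigin_drop

typedef struct xl_replorigin_drop xl_replorigin_drop

◆ xl_replorigin_set

typedef struct xl_replorigin_set xl_replorigin_set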

Function Documentation

◆ CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 540 of file origin.c.

References CloseTransientFile(), COMP_CRC32C, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

541 {
542  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
543  const char *path = "pg_logical/replorigin_checkpoint";
544  int tmpfd;
545  int i;
547  pg_crc32c crc;
548 
549  if (max_replication_slots == 0)
550  return;
551 
552  INIT_CRC32C(crc);
553 
554  /* make sure no old temp file is remaining */
555  if (unlink(tmppath) < 0 && errno != ENOENT)
556  ereport(PANIC,
558  errmsg("could not remove file \"%s\": %m",
559  tmppath)));
560 
561  /*
562  * No other backend can perform this at the same time; we're protected by
563  * CheckpointLock.
564  */
565  tmpfd = OpenTransientFile(tmppath,
566  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
567  if (tmpfd < 0)
568  ereport(PANIC,
570  errmsg("could not create file \"%s\": %m",
571  tmppath)));
572 
573  /* write magic */
574  errno = 0;
575  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
576  {
577  /* if write didn't set errno, assume problem is no disk space */
578  if (errno == 0)
579  errno = ENOSPC;
580  ereport(PANIC,
582  errmsg("could not write to file \"%s\": %m",
583  tmppath)));
584  }
585  COMP_CRC32C(crc, &magic, sizeof(magic));
586 
587  /* prevent concurrent creations/drops */
588  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
589 
590  /* write actual data */
591  for (i = 0; i < max_replication_slots; i++)
592  {
593  ReplicationStateOnDisk disk_state;
594  ReplicationState *curstate = &replication_states[i];
595  XLogRecPtr local_lsn;
596 
597  if (curstate->roident == InvalidRepOriginId)
598  continue;
599 
600  /* zero, to avoid uninitialized padding bytes */
601  memset(&disk_state, 0, sizeof(disk_state));
602 
603  LWLockAcquire(&curstate->lock, LW_SHARED);
604 
605  disk_state.roident = curstate->roident;
606 
607  disk_state.remote_lsn = curstate->remote_lsn;
608  local_lsn = curstate->local_lsn;
609 
610  LWLockRelease(&curstate->lock);
611 
612  /* make sure we only write out a commit that's persistent */
613  XLogFlush(local_lsn);
614 
615  errno = 0;
616  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
617  sizeof(disk_state))
618  {
619  /* if write didn't set errno, assume problem is no disk space */
620  if (errno == 0)
621  errno = ENOSPC;
622  ereport(PANIC,
624  errmsg("could not write to file \"%s\": %m",
625  tmppath)));
626  }
627 
628  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
629  }
630 
631  LWLockRelease(ReplicationOriginLock);
632 
633  /* write out the CRC */
634  FIN_CRC32C(crc);
635  errno = 0;
636  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
637  {
638  /* if write didn't set errno, assume problem is no disk space */
639  if (errno == 0)
640  errno = ENOSPC;
641  ereport(PANIC,
643  errmsg("could not write to file \"%s\": %m",
644  tmppath)));
645  }
646 
647  if (CloseTransientFile(tmpfd) != 0)
648  ereport(PANIC,
650  errmsg("could not close file \"%s\": %m",
651  tmppath)));
652 
653  /* fsync, rename to permanent file, fsync file and directory */
654  durable_rename(tmppath, path, PANIC);
655 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr local_lsn
Definition: origin.c:117
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_lsn
Definition: origin.c:141
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:140
#define PANIC
Definition: elog.h:55
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2860
#define PG_BINARY
Definition: c.h:1259
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2404
LWLock lock
Definition: origin.c:132
int magic
Definition: regguts.h:464
int errcode_for_file_access(void)
Definition: elog.c:719
unsigned int uint32
Definition: c.h:429
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:687
int CloseTransientFile(int fd)
Definition: fd.c:2581
#define REPLICATION_STATE_MAGIC
Definition: origin.c:180
#define ereport(elevel,...)
Definition: elog.h:155
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:907
int i
static ReplicationState * replication_states
Definition: origin.c:165
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94

◆ ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 493 of file origin.c.

References ConditionVariableInit(), i, ReplicationState::lock, LWLockInitialize(), LWTRANCHE_REPLICATION_ORIGIN_STATE, max_replication_slots, MemSet, ReplicationState::origin_cv, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateSharedMemoryAndSemaphores().

494 {
495  bool found;
496 
497  if (max_replication_slots == 0)
498  return;
499 
501  ShmemInitStruct("ReplicationOriginState",
503  &found);
505 
506  if (!found)
507  {
508  int i;
509 
511 
513 
514  for (i = 0; i < max_replication_slots; i++)
515  {
519  }
520  }
521 }
#define MemSet(start, val, len)
Definition: c.h:996
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:743
Size ReplicationOriginShmemSize(void)
Definition: origin.c:473
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:170
int max_replication_slots
Definition: slot.c:99
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:150
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 473 of file origin.c.

References add_size(), max_replication_slots, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and ReplicationOriginShmemInit().

474 {
475  Size size = 0;
476 
477  /*
478  * XXX: max_replication_slots is arguably the wrong thing to use, as here
479  * we keep the replay state of *remote* transactions. But for now it seems
480  * sufficient to reuse it, rather than introduce a separate GUC.
481  */
482  if (max_replication_slots == 0)
483  return size;
484 
485  size = add_size(size, offsetof(ReplicationStateCtl, states));
486 
487  size = add_size(size,
489  return size;
490 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
int max_replication_slots
Definition: slot.c:99
size_t Size
Definition: c.h:528
#define offsetof(type, field)
Definition: c.h:715

◆ replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 856 of file origin.c.

References ReplicationState::acquired_by, Assert, DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), and xact_redo_commit().

859 {
860  int i;
861  ReplicationState *replication_state = NULL;
862  ReplicationState *free_state = NULL;
863 
864  Assert(node != InvalidRepOriginId);
865 
866  /* we don't track DoNotReplicateId */
867  if (node == DoNotReplicateId)
868  return;
869 
870  /*
871  * XXX: For the case where this is called by WAL replay, it'd be more
872  * efficient to restore into a backend local hashtable and only dump into
873  * shmem after recovery is finished. Let's hold off on implementing that
874  * until it's shown to be a measurable expense.
875  */
876 
877  /* Lock exclusively, as we may have to create a new table entry. */
878  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
879 
880  /*
881  * Search for either an existing slot for the origin, or a free one we can
882  * use.
883  */
884  for (i = 0; i < max_replication_slots; i++)
885  {
886  ReplicationState *curstate = &replication_states[i];
887 
888  /* remember where to insert if necessary */
889  if (curstate->roident == InvalidRepOriginId &&
890  free_state == NULL)
891  {
892  free_state = curstate;
893  continue;
894  }
895 
896  /* not our slot */
897  if (curstate->roident != node)
898  {
899  continue;
900  }
901 
902  /* ok, found slot */
903  replication_state = curstate;
904 
905  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
906 
907  /* Make sure it's not used by somebody else */
908  if (replication_state->acquired_by != 0)
909  {
910  ereport(ERROR,
911  (errcode(ERRCODE_OBJECT_IN_USE),
912  errmsg("replication origin with OID %d is already active for PID %d",
913  replication_state->roident,
914  replication_state->acquired_by)));
915  }
916 
917  break;
918  }
919 
920  if (replication_state == NULL && free_state == NULL)
921  ereport(ERROR,
922  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
923  errmsg("could not find free replication state slot for replication origin with OID %u",
924  node),
925  errhint("Increase max_replication_slots and try again.")));
926 
927  if (replication_state == NULL)
928  {
929  /* initialize new slot */
930  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
931  replication_state = free_state;
932  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
933  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
934  replication_state->roident = node;
935  }
936 
937  Assert(replication_state->roident != InvalidRepOriginId);
938 
939  /*
940  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
941  * and the standby gets the message. Primarily this will be called during
942  * WAL replay (of commit records) where no WAL logging is necessary.
943  */
944  if (wal_log)
945  {
946  xl_replorigin_set xlrec;
947 
948  xlrec.remote_lsn = remote_commit;
949  xlrec.node_id = node;
950  xlrec.force = go_backward;
951 
952  XLogBeginInsert();
953  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
954 
955  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
956  }
957 
958  /*
959  * Due to - harmless - race conditions during a checkpoint we could see
960  * values here that are older than the ones we already have in memory.
961  * Don't overwrite those.
962  */
963  if (go_backward || replication_state->remote_lsn < remote_commit)
964  replication_state->remote_lsn = remote_commit;
965  if (local_commit != InvalidXLogRecPtr &&
966  (go_backward || replication_state->local_lsn < local_commit))
967  replication_state->local_lsn = local_commit;
968  LWLockRelease(&replication_state->lock);
969 
970  /*
971  * Release *after* changing the LSNs; the slot isn't acquired and thus
972  * could otherwise be dropped at any time.
973  */
974  LWLockRelease(ReplicationOriginLock);
975 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int errhint(const char *fmt,...)
Definition: elog.c:1154
XLogRecPtr local_lsn
Definition: origin.c:117
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:696
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
#define ERROR
Definition: elog.h:45
LWLock lock
Definition: origin.c:132
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
#define ereport(elevel,...)
Definition: elog.h:155
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:792
RepOriginId node_id
Definition: origin.h:21
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:907
int i
static ReplicationState * replication_states
Definition: origin.c:165
void XLogBeginInsert(void)
Definition: xloginsert.c:123

◆ replorigin_by_name()

RepOriginId replorigin_by_name ( char *  name,
bool  missing_ok 
)

Definition at line 209 of file origin.c.

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidOid, ReleaseSysCache(), REPLORIGNAME, ReplicationState::roident, and SearchSysCache1().

Referenced by ApplyWorkerMain(), DropSubscription(), pg_replication_origin_advance(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), and pg_replication_origin_session_setup().

210 {
212  Oid roident = InvalidOid;
213  HeapTuple tuple;
214  Datum roname_d;
215 
216  roname_d = CStringGetTextDatum(roname);
217 
218  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
219  if (HeapTupleIsValid(tuple))
220  {
221  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
222  roident = ident->roident;
223  ReleaseSysCache(tuple);
224  }
225  else if (!missing_ok)
226  ereport(ERROR,
227  (errcode(ERRCODE_UNDEFINED_OBJECT),
228  errmsg("replication origin \"%s\" does not exist",
229  roname)));
230 
231  return roident;
232 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:696
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:45
FormData_pg_replication_origin * Form_pg_replication_origin
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:907
#define CStringGetTextDatum(s)
Definition: builtins.h:82

◆ replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 432 of file origin.c.

References Assert, DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum, OidIsValid, ReleaseSysCache(), REPLORIGIDENT, SearchSysCache1(), and text_to_cstring().

Referenced by pg_show_replication_origin_status(), pgoutput_begin_txn(), and pgoutput_stream_start().

433 {
434  HeapTuple tuple;
436 
437  Assert(OidIsValid((Oid) roident));
438  Assert(roident != InvalidRepOriginId);
439  Assert(roident != DoNotReplicateId);
440 
442  ObjectIdGetDatum((Oid) roident));
443 
444  if (HeapTupleIsValid(tuple))
445  {
446  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
447  *roname = text_to_cstring(&ric->roname);
448  ReleaseSysCache(tuple);
449 
450  return true;
451  }
452  else
453  {
454  *roname = NULL;
455 
456  if (!missing_ok)
457  ereport(ERROR,
458  (errcode(ERRCODE_UNDEFINED_OBJECT),
459  errmsg("replication origin with OID %u does not exist",
460  roident)));
461 
462  return false;
463  }
464 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define DoNotReplicateId
Definition: origin.h:34
int errcode(int sqlerrcode)
Definition: elog.c:696
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:698
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
FormData_pg_replication_origin * Form_pg_replication_origin
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define ereport(elevel,...)
Definition: elog.h:155
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:792
#define InvalidRepOriginId
Definition: origin.h:33
char * text_to_cstring(const text *t)
Definition: varlena.c:222
int errmsg(const char *fmt,...)
Definition: elog.c:907

◆ replorigin_create()

RepOriginId replorigin_create ( char *  name)

Definition at line 240 of file origin.c.

References Assert, BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, ObjectIdGetDatum, PG_UINT16_MAX, RelationGetDescr, ReplicationOriginIdentIndex, ReplicationState::roident, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), CreateSubscription(), and pg_replication_origin_create().

241 {
242  Oid roident;
243  HeapTuple tuple = NULL;
244  Relation rel;
245  Datum roname_d;
246  SnapshotData SnapshotDirty;
247  SysScanDesc scan;
249 
250  roname_d = CStringGetTextDatum(roname);
251 
253 
254  /*
255  * We need the numeric replication origin to be 16bit wide, so we cannot
256  * rely on the normal oid allocation. Instead we simply scan
257  * pg_replication_origin for the first unused id. That's not particularly
258  * efficient, but this should be a fairly infrequent operation - we can
259  * easily spend a bit more code on this when it turns out it needs to be
260  * faster.
261  *
262  * We handle concurrency by taking an exclusive lock (allowing reads!)
263  * over the table for the duration of the search. Because we use a "dirty
264  * snapshot" we can read rows that other in-progress sessions have
265  * written, even though they would be invisible with normal snapshots. Due
266  * to the exclusive lock there's no danger that new rows can appear while
267  * we're checking.
268  */
269  InitDirtySnapshot(SnapshotDirty);
270 
271  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
272 
273  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274  {
275  bool nulls[Natts_pg_replication_origin];
276  Datum values[Natts_pg_replication_origin];
277  bool collides;
278 
280 
281  ScanKeyInit(&key,
282  Anum_pg_replication_origin_roident,
283  BTEqualStrategyNumber, F_OIDEQ,
284  ObjectIdGetDatum(roident));
285 
287  true /* indexOK */ ,
288  &SnapshotDirty,
289  1, &key);
290 
291  collides = HeapTupleIsValid(systable_getnext(scan));
292 
293  systable_endscan(scan);
294 
295  if (!collides)
296  {
297  /*
298  * Ok, found an unused roident, insert the new row and do a CCI,
299  * so our callers can look it up if they want to.
300  */
301  memset(&nulls, 0, sizeof(nulls));
302 
303  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
304  values[Anum_pg_replication_origin_roname - 1] = roname_d;
305 
306  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307  CatalogTupleInsert(rel, tuple);
309  break;
310  }
311  }
312 
313  /* now release lock again */
315 
316  if (tuple == NULL)
317  ereport(ERROR,
318  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319  errmsg("could not find free replication origin OID")));
320 
321  heap_freetuple(tuple);
322  return roident;
323 }
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:75
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:593
#define RelationGetDescr(relation)
Definition: rel.h:483
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:696
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define ReplicationOriginIdentIndex
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:381
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:500
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
#define PG_UINT16_MAX
Definition: c.h:510
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:792
bool IsTransactionState(void)
Definition: xact.c:371
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:907
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:82
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:222
#define BTEqualStrategyNumber
Definition: stratnum.h:31

◆ replorigin_desc()

void replorigin_desc ( StringInfo  buf,
XLogReaderState record 
)

Definition at line 19 of file replorigindesc.c.

References appendStringInfo(), xl_replorigin_set::force, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, xl_replorigin_set::remote_lsn, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

20 {
21  char *rec = XLogRecGetData(record);
23 
24  switch (info)
25  {
27  {
28  xl_replorigin_set *xlrec;
29 
30  xlrec = (xl_replorigin_set *) rec;
31 
32  appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
33  xlrec->node_id,
34  (uint32) (xlrec->remote_lsn >> 32),
35  (uint32) xlrec->remote_lsn,
36  xlrec->force);
37  break;
38  }
40  {
41  xl_replorigin_drop *xlrec;
42 
43  xlrec = (xl_replorigin_drop *) rec;
44 
45  appendStringInfo(buf, "drop %u", xlrec->node_id);
46  break;
47  }
48  }
49 }
unsigned char uint8
Definition: c.h:427
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
unsigned int uint32
Definition: c.h:429
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
RepOriginId node_id
Definition: origin.h:27
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
long info
Definition: regguts.h:467

◆ replorigin_drop()

void replorigin_drop ( RepOriginId  roident,
bool  nowait 
)

Definition at line 332 of file origin.c.

References ReplicationState::acquired_by, Assert, CatalogTupleDelete(), CommandCounterIncrement(), ConditionVariableCancelSleep(), ConditionVariableSleep(), elog, ereport, errcode(), errmsg(), ERROR, ExclusiveLock, HeapTupleIsValid, i, InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, xl_replorigin_drop::node_id, ObjectIdGetDatum, ReplicationState::origin_cv, ReleaseSysCache(), ReplicationState::remote_lsn, REPLORIGIDENT, ReplicationState::roident, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), WAIT_EVENT_REPLICATION_ORIGIN_DROP, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by DropSubscription(), and pg_replication_origin_drop().

333 {
334  HeapTuple tuple;
335  Relation rel;
336  int i;
337 
339 
340  /*
341  * To interlock against concurrent drops, we hold ExclusiveLock on
342  * pg_replication_origin throughout this function.
343  */
344  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
345 
346  /*
347  * First, clean up the slot state info, if there is any matching slot.
348  */
349 restart:
350  tuple = NULL;
351  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
352 
353  for (i = 0; i < max_replication_slots; i++)
354  {
356 
357  if (state->roident == roident)
358  {
359  /* found our slot, is it busy? */
360  if (state->acquired_by != 0)
361  {
362  ConditionVariable *cv;
363 
364  if (nowait)
365  ereport(ERROR,
366  (errcode(ERRCODE_OBJECT_IN_USE),
367  errmsg("could not drop replication origin with OID %d, in use by PID %d",
368  state->roident,
369  state->acquired_by)));
370 
371  /*
372  * We must wait and then retry. Since we don't know which CV
373  * to wait on until here, we can't readily use
374  * ConditionVariablePrepareToSleep (calling it here would be
375  * wrong, since we could miss the signal if we did so); just
376  * use ConditionVariableSleep directly.
377  */
378  cv = &state->origin_cv;
379 
380  LWLockRelease(ReplicationOriginLock);
381 
383  goto restart;
384  }
385 
386  /* first make a WAL log entry */
387  {
388  xl_replorigin_drop xlrec;
389 
390  xlrec.node_id = roident;
391  XLogBeginInsert();
392  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
393  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
394  }
395 
396  /* then clear the in-memory slot */
397  state->roident = InvalidRepOriginId;
398  state->remote_lsn = InvalidXLogRecPtr;
399  state->local_lsn = InvalidXLogRecPtr;
400  break;
401  }
402  }
403  LWLockRelease(ReplicationOriginLock);
405 
406  /*
407  * Now, we can delete the catalog entry.
408  */
410  if (!HeapTupleIsValid(tuple))
411  elog(ERROR, "cache lookup failed for replication origin with oid %u",
412  roident);
413 
414  CatalogTupleDelete(rel, &tuple->t_self);
415  ReleaseSysCache(tuple);
416 
418 
419  /* now release lock again */
421 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
XLogRecPtr local_lsn
Definition: origin.c:117
#define ExclusiveLock
Definition: lockdefs.h:44
int errcode(int sqlerrcode)
Definition: elog.c:696
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:351
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
ItemPointerData t_self
Definition: htup.h:65
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
void CommandCounterIncrement(void)
Definition: xact.c:1021
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define ereport(elevel,...)
Definition: elog.h:155
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:110
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
RepOriginId node_id
Definition: origin.h:27
#define Assert(condition)
Definition: c.h:792
Definition: regguts.h:298
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
bool IsTransactionState(void)
Definition: xact.c:371
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:907
#define elog(elevel,...)
Definition: elog.h:228
int i
static ReplicationState * replication_states
Definition: origin.c:165
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void XLogBeginInsert(void)
Definition: xloginsert.c:123

◆ replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 979 of file origin.c.

References i, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationState::remote_lsn, ReplicationState::roident, and XLogFlush().

Referenced by pg_replication_origin_progress().

980 {
981  int i;
982  XLogRecPtr local_lsn = InvalidXLogRecPtr;
983  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
984 
985  /* prevent slots from being concurrently dropped */
986  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
987 
988  for (i = 0; i < max_replication_slots; i++)
989  {
991 
992  state = &replication_states[i];
993 
994  if (state->roident == node)
995  {
996  LWLockAcquire(&state->lock, LW_SHARED);
997 
998  remote_lsn = state->remote_lsn;
999  local_lsn = state->local_lsn;
1000 
1001  LWLockRelease(&state->lock);
1002 
1003  break;
1004  }
1005  }
1006 
1007  LWLockRelease(ReplicationOriginLock);
1008 
1009  if (flush && local_lsn != InvalidXLogRecPtr)
1010  XLogFlush(local_lsn);
1011 
1012  return remote_lsn;
1013 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2860
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
LWLock lock
Definition: origin.c:132
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ replorigin_identify()

const char* replorigin_identify ( uint8  info)

Definition at line 52 of file replorigindesc.c.

References XLOG_REPLORIGIN_DROP, and XLOG_REPLORIGIN_SET.

53 {
54  switch (info)
55  {
56  case XLOG_REPLORIGIN_SET:
57  return "SET";
58  case XLOG_REPLORIGIN_DROP:
59  return "DROP";
60  default:
61  return NULL;
62  }
63 }
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
long info
Definition: regguts.h:467

◆ replorigin_redo()

void replorigin_redo ( XLogReaderState record)

Definition at line 795 of file origin.c.

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, max_replication_slots, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, ReplicationState::remote_lsn, replorigin_advance(), ReplicationState::roident, XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

796 {
797  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
798 
799  switch (info)
800  {
801  case XLOG_REPLORIGIN_SET:
802  {
803  xl_replorigin_set *xlrec =
804  (xl_replorigin_set *) XLogRecGetData(record);
805 
806  replorigin_advance(xlrec->node_id,
807  xlrec->remote_lsn, record->EndRecPtr,
808  xlrec->force /* backward */ ,
809  false /* WAL log */ );
810  break;
811  }
812  case XLOG_REPLORIGIN_DROP:
813  {
814  xl_replorigin_drop *xlrec;
815  int i;
816 
817  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
818 
819  for (i = 0; i < max_replication_slots; i++)
820  {
821  ReplicationState *state = &replication_states[i];
822 
823  /* found our slot */
824  if (state->roident == xlrec->node_id)
825  {
826  /* reset entry */
827  state->roident = InvalidRepOriginId;
828  state->remote_lsn = InvalidXLogRecPtr;
829  state->local_lsn = InvalidXLogRecPtr;
830  break;
831  }
832  }
833  break;
834  }
835  default:
836  elog(PANIC, "replorigin_redo: unknown op code %u", info);
837  }
838 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
unsigned char uint8
Definition: c.h:427
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:856
#define PANIC
Definition: elog.h:55
RepOriginId roident
Definition: origin.c:105
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
RepOriginId node_id
Definition: origin.h:27
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
#define InvalidRepOriginId
Definition: origin.h:33
long info
Definition: regguts.h:467
#define elog(elevel,...)
Definition: elog.h:228
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ replorigin_session_advance()

void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1169 of file origin.c.

References Assert, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and ReplicationState::roident.

Referenced by EndPrepare(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

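1170 {
1171  Assert(session_replication_state != NULL);
1172  Assert(session_replication_state->roident != InvalidRepOriginId);
1173 
1174  LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1175  if (session_replication_state->local_lsn < local_commit)
1176  session_replication_state->local_lsn = local_commit;
1177  if (session_replication_state->remote_lsn < remote_commit)
1178  session_replication_state->remote_lsn = remote_commit;
1179  LWLockRelease(&session_replication_state->lock);
1180 }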
static ReplicationState * session_replication_state
Definition: origin.c:177
XLogRecPtr local_lsn
Definition: origin.c:117
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
LWLock lock
Definition: origin.c:132
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:792
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
#define InvalidRepOriginId
Definition: origin.h:33

◆ replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush)

Definition at line 1187 of file origin.c.

References Assert, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, and XLogFlush().

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_progress().

1188 {
1189  XLogRecPtr remote_lsn;
1190  XLogRecPtr local_lsn;
1191 
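1192  Assert(session_replication_state != NULL);
1193 
1194  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1195  remote_lsn = session_replication_state->remote_lsn;
1196  local_lsn = session_replication_state->local_lsn;
1197  LWLockRelease(&session_replication_state->lock);
1198 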
1199  if (flush && local_lsn != InvalidXLogRecPtr)
1200  XLogFlush(local_lsn);
1201 
1202  return remote_lsn;
1203 }
static ReplicationState * session_replication_state
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr local_lsn
Definition: origin.c:117
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2860
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
LWLock lock
Definition: origin.c:132
XLogRecPtr remote_lsn
Definition: origin.c:110
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:792
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206

◆ replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1140 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, and ReplicationState::origin_cv.

Referenced by pg_replication_origin_session_reset().

1141 {
1142  ConditionVariable *cv;
1143 
1144  Assert(max_replication_slots != 0);
1145 
1146  if (session_replication_state == NULL)
1147  ereport(ERROR,
1148  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1149  errmsg("no replication origin is configured")));
1150 
1151  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1152 
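1153  session_replication_state->acquired_by = 0;
1154  cv = &session_replication_state->origin_cv;
1155  session_replication_state = NULL;
1156 
1157  LWLockRelease(ReplicationOriginLock);
1158 
1159  ConditionVariableBroadcast(cv);
1160 }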
static ReplicationState * session_replication_state
Definition: origin.c:177
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:696
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
#define ERROR
Definition: elog.h:45
#define ereport(elevel,...)
Definition: elog.h:155
int max_replication_slots
Definition: slot.c:99
#define Assert(condition)
Definition: c.h:792
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
int errmsg(const char *fmt,...)
Definition: elog.c:907

◆ replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node)

Definition at line 1052 of file origin.c.

References ReplicationState::acquired_by, Assert, ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_replication_slots, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, ReplicationOriginExitCleanup(), and ReplicationState::roident.

Referenced by ApplyWorkerMain(), and pg_replication_origin_session_setup().

1053 {
1054  static bool registered_cleanup;
1055  int i;
1056  int free_slot = -1;
1057 
1058  if (!registered_cleanup)
1059  {
1060  on_shmem_exit(ReplicationOriginExitCleanup, 0);
1061  registered_cleanup = true;
1062  }
1063 
1064  Assert(max_replication_slots > 0);
1065 
1066  if (session_replication_state != NULL)
1067  ereport(ERROR,
1068  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1069  errmsg("cannot setup replication origin when one is already setup")));
1070 
1071  /* Lock exclusively, as we may have to create a new table entry. */
1072  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1073 
1074  /*
1075  * Search for either an existing slot for the origin, or a free one we can
1076  * use.
1077  */
1078  for (i = 0; i < max_replication_slots; i++)
1079  {
1080  ReplicationState *curstate = &replication_states[i];
1081 
1082  /* remember where to insert if necessary */
1083  if (curstate->roident == InvalidRepOriginId &&
1084  free_slot == -1)
1085  {
1086  free_slot = i;
1087  continue;
1088  }
1089 
1090  /* not our slot */
1091  if (curstate->roident != node)
1092  continue;
1093 
1094  else if (curstate->acquired_by != 0)
1095  {
1096  ereport(ERROR,
1097  (errcode(ERRCODE_OBJECT_IN_USE),
1098  errmsg("replication origin with OID %d is already active for PID %d",
1099  curstate->roident, curstate->acquired_by)));
1100  }
1101 
1102  /* ok, found slot */
1103  session_replication_state = curstate;
1104  }
1105 
1106 
1107  if (session_replication_state == NULL && free_slot == -1)
1108  ereport(ERROR,
1109  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1110  errmsg("could not find free replication state slot for replication origin with OID %u",
1111  node),
1112  errhint("Increase max_replication_slots and try again.")));
1113  else if (session_replication_state == NULL)
1114  {
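1115  /* initialize new slot */
1116  session_replication_state = &replication_states[free_slot];
1117  Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1118  Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1119  session_replication_state->roident = node;
1120  }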
1121 
1122 
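1123  Assert(session_replication_state->roident != InvalidRepOriginId);
1124 
1125  session_replication_state->acquired_by = MyProcPid;
1126 
1127  LWLockRelease(ReplicationOriginLock);
1128 
1129  /* probably this one is pointless */
1130  ConditionVariableBroadcast(&session_replication_state->origin_cv);
1131 }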
static ReplicationState * session_replication_state
Definition: origin.c:177
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:41
int errhint(const char *fmt,...)
Definition: elog.c:1154
XLogRecPtr local_lsn
Definition: origin.c:117
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1020
void ConditionVariableBroadcast(ConditionVariable *cv)
int errcode(int sqlerrcode)
Definition: elog.c:696
RepOriginId roident
Definition: origin.c:105
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1810
#define ERROR
Definition: elog.h:45
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
#define ereport(elevel,...)
Definition: elog.h:155
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:792
ConditionVariable origin_cv
Definition: origin.c:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
#define InvalidRepOriginId
Definition: origin.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:907
int i
static ReplicationState * replication_states
Definition: origin.c:165

◆ StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 666 of file origin.c.

References Assert, CloseTransientFile(), COMP_CRC32C, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, max_replication_slots, OpenTransientFile(), PANIC, PG_BINARY, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

667 {
668  const char *path = "pg_logical/replorigin_checkpoint";
669  int fd;
670  int readBytes;
671  uint32 magic;
672  int last_state = 0;
673  pg_crc32c file_crc;
674  pg_crc32c crc;
675 
676  /* don't want to overwrite already existing state */
677 #ifdef USE_ASSERT_CHECKING
678  static bool already_started = false;
679 
680  Assert(!already_started);
681  already_started = true;
682 #endif
683 
684  if (max_replication_slots == 0)
685  return;
686 
687  INIT_CRC32C(crc);
688 
689  elog(DEBUG2, "starting up replication origin progress state");
690 
691  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
692 
693  /*
694  * might have had max_replication_slots == 0 last run, or we just brought
695  * up a standby.
696  */
697  if (fd < 0 && errno == ENOENT)
698  return;
699  else if (fd < 0)
700  ereport(PANIC,
701  (errcode_for_file_access(),
702  errmsg("could not open file \"%s\": %m",
703  path)));
704 
705  /* verify magic, that is written even if nothing was active */
706  readBytes = read(fd, &magic, sizeof(magic));
707  if (readBytes != sizeof(magic))
708  {
709  if (readBytes < 0)
710  ereport(PANIC,
711  (errcode_for_file_access(),
712  errmsg("could not read file \"%s\": %m",
713  path)));
714  else
715  ereport(PANIC,
716  (errcode(ERRCODE_DATA_CORRUPTED),
717  errmsg("could not read file \"%s\": read %d of %zu",
718  path, readBytes, sizeof(magic))));
719  }
720  COMP_CRC32C(crc, &magic, sizeof(magic));
721 
722  if (magic != REPLICATION_STATE_MAGIC)
723  ereport(PANIC,
724  (errmsg("replication checkpoint has wrong magic %u instead of %u",
725  magic, REPLICATION_STATE_MAGIC)));
726 
727  /* we can skip locking here, no other access is possible */
728 
729  /* recover individual states, until there are no more to be found */
730  while (true)
731  {
732  ReplicationStateOnDisk disk_state;
733 
734  readBytes = read(fd, &disk_state, sizeof(disk_state));
735 
736  /* no further data */
737  if (readBytes == sizeof(crc))
738  {
739  /* not pretty, but simple ... */
740  file_crc = *(pg_crc32c *) &disk_state;
741  break;
742  }
743 
744  if (readBytes < 0)
745  {
746  ereport(PANIC,
747  (errcode_for_file_access(),
748  errmsg("could not read file \"%s\": %m",
749  path)));
750  }
751 
752  if (readBytes != sizeof(disk_state))
753  {
754  ereport(PANIC,
755  (errcode(ERRCODE_DATA_CORRUPTED),
756  errmsg("could not read file \"%s\": read %d of %zu",
757  path, readBytes, sizeof(disk_state))));
758  }
759 
760  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
761 
762  if (last_state == max_replication_slots)
763  ereport(PANIC,
764  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
765  errmsg("could not find free replication state, increase max_replication_slots")));
766 
767  /* copy data to shared memory */
768  replication_states[last_state].roident = disk_state.roident;
769  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
770  last_state++;
771 
772  ereport(LOG,
773  (errmsg("recovered replication state of node %u to %X/%X",
774  disk_state.roident,
775  (uint32) (disk_state.remote_lsn >> 32),
776  (uint32) disk_state.remote_lsn)));
777  }
778 
779  /* now check checksum */
780  FIN_CRC32C(crc);
781  if (file_crc != crc)
782  ereport(PANIC,
783  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
784  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
785  crc, file_crc)));
786 
787  if (CloseTransientFile(fd) != 0)
788  ereport(PANIC,
789  (errcode_for_file_access(),
790  errmsg("could not close file \"%s\": %m",
791  path)));
792 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
XLogRecPtr remote_lsn
Definition: origin.c:141
uint32 pg_crc32c
Definition: pg_crc32c.h:38
RepOriginId roident
Definition: origin.c:140
int errcode(int sqlerrcode)
Definition: elog.c:696
#define LOG
Definition: elog.h:26
#define PANIC
Definition: elog.h:55
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1259
RepOriginId roident
Definition: origin.c:105
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2404
int magic
Definition: regguts.h:464
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:719
unsigned int uint32
Definition: c.h:429
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2581
#define REPLICATION_STATE_MAGIC
Definition: origin.c:180
#define ereport(elevel,...)
Definition: elog.h:155
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:110
#define Assert(condition)
Definition: c.h:792
int errmsg(const char *fmt,...)
Definition: elog.c:907
#define elog(elevel,...)
Definition: elog.h:228
static ReplicationState * replication_states
Definition: origin.c:165
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
#define read(a, b, c)
Definition: win32.h:13

Variable Documentation

◆ replorigin_session_origin

◆ replorigin_session_origin_lsn

◆ replorigin_session_origin_timestamp