PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static EState * create_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void logicalrep_worker_sighup (SIGNAL_ARGS)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
WalReceiverConn * wrconn = NULL
 
Subscription * MySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static volatile sig_atomic_t got_SIGHUP = false
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 79 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SlotErrCallbackArg

typedef struct SlotErrCallbackArg SlotErrCallbackArg

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 964 of file worker.c.

References generate_unaccent_rules::action, apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by LogicalRepApplyLoop().

965 {
966  char action = pq_getmsgbyte(s);
967 
968  switch (action)
969  {
970  /* BEGIN */
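971  case 'B':
972  apply_handle_begin(s);
973  break;
974  /* COMMIT */
975  case 'C':
976  apply_handle_commit(s);
977  break;
978  /* INSERT */
979  case 'I':
980  apply_handle_insert(s);
981  break;
982  /* UPDATE */
983  case 'U':
984  apply_handle_update(s);
985  break;
986  /* DELETE */
987  case 'D':
988  apply_handle_delete(s);
989  break;
990  /* TRUNCATE */
991  case 'T':
992  apply_handle_truncate(s);
993  break;
994  /* RELATION */
995  case 'R':
996  apply_handle_relation(s);
997  break;
998  /* TYPE */
999  case 'Y':
1000  apply_handle_type(s);
1001  break;
1002  /* ORIGIN */
1003  case 'O':
1004  apply_handle_origin(s);
1005  break;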
1006  default:
1007  ereport(ERROR,
1008  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1009  errmsg("invalid logical replication message type \"%c\"", action)));
1010  }
1011 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:554
static void apply_handle_insert(StringInfo s)
Definition: worker.c:584
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:803
static void apply_handle_begin(StringInfo s)
Definition: worker.c:454
#define ereport(elevel, rest)
Definition: elog.h:141
static void apply_handle_commit(StringInfo s)
Definition: worker.c:473
static void apply_handle_update(StringInfo s)
Definition: worker.c:681
static void apply_handle_relation(StringInfo s)
Definition: worker.c:539
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_handle_origin(StringInfo s)
Definition: worker.c:517
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:904

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 454 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

455 {
456  LogicalRepBeginData begin_data;
457 
458  logicalrep_read_begin(s, &begin_data);
459 
460  remote_final_lsn = begin_data.final_lsn;
461 
462  in_remote_transaction = true;
463 
464  pgstat_report_activity(STATE_RUNNING, NULL);
465 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3119
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
bool in_remote_transaction
Definition: worker.c:105
XLogRecPtr final_lsn
Definition: logicalproto.h:66

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 473 of file worker.c.

References AcceptInvalidationMessages(), am_tablesync_worker(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), maybe_reread_subscription(), pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), remote_final_lsn, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

474 {
475  LogicalRepCommitData commit_data;
476 
477  logicalrep_read_commit(s, &commit_data);
478 
479  Assert(commit_data.commit_lsn == remote_final_lsn);
480 
481  /* The synchronization worker runs in single transaction. */
483  {
484  /*
485  * Update origin state so we can restart streaming from correct
486  * position in case of crash.
487  */
490 
492  pgstat_report_stat(false);
493 
494  store_flush_position(commit_data.end_lsn);
495  }
496  else
497  {
498  /* Process any invalidation messages that might have accumulated. */
501  }
502 
503  in_remote_transaction = false;
504 
505  /* Process any tables that are being synchronized in parallel. */
506  process_syncing_tables(commit_data.end_lsn);
507 
509 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1071
void AcceptInvalidationMessages(void)
Definition: inval.c:681
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3119
void CommitTransactionCommand(void)
Definition: xact.c:2898
bool in_remote_transaction
Definition: worker.c:105
static bool am_tablesync_worker(void)
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
static void maybe_reread_subscription(void)
Definition: worker.c:1425
#define Assert(condition)
Definition: c.h:739
bool IsTransactionState(void)
Definition: xact.c:355
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
TimestampTz committime
Definition: logicalproto.h:75
void pgstat_report_stat(bool force)
Definition: pgstat.c:811

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 803 of file worker.c.

References AfterTriggerEndQuery(), Assert, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationDelete(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), table_slot_create(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

804 {
806  LogicalRepTupleData oldtup;
807  LogicalRepRelId relid;
808  Oid idxoid;
809  EState *estate;
810  EPQState epqstate;
811  TupleTableSlot *remoteslot;
812  TupleTableSlot *localslot;
813  bool found;
814  MemoryContext oldctx;
815 
817 
818  relid = logicalrep_read_delete(s, &oldtup);
821  {
822  /*
823  * The relation can't become interesting in the middle of the
824  * transaction so it's safe to unlock it.
825  */
827  return;
828  }
829 
830  /* Check if we can do the delete. */
832 
833  /* Initialize the executor state. */
834  estate = create_estate_for_relation(rel);
835  remoteslot = ExecInitExtraTupleSlot(estate,
837  &TTSOpsVirtual);
838  localslot = table_slot_create(rel->localrel,
839  &estate->es_tupleTable);
840  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
841 
844 
845  /* Find the tuple using the replica identity index. */
847  slot_store_cstrings(remoteslot, rel, oldtup.values);
848  MemoryContextSwitchTo(oldctx);
849 
850  /*
851  * Try to find tuple using either replica identity index, primary key or
852  * if needed, sequential scan.
853  */
854  idxoid = GetRelationIdentityOrPK(rel->localrel);
855  Assert(OidIsValid(idxoid) ||
856  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
857 
858  if (OidIsValid(idxoid))
859  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
861  remoteslot, localslot);
862  else
864  remoteslot, localslot);
865  /* If found delete it. */
866  if (found)
867  {
868  EvalPlanQualSetSlot(&epqstate, localslot);
869 
870  /* Do the actual delete. */
871  ExecSimpleRelationDelete(estate, &epqstate, localslot);
872  }
873  else
874  {
875  /* The tuple to be deleted could not be found. */
876  elog(DEBUG1,
877  "logical replication could not find row for delete "
878  "in replication target relation \"%s\"",
880  }
881 
882  /* Cleanup. */
885 
886  /* Handle queued AFTER triggers. */
887  AfterTriggerEndQuery(estate);
888 
889  EvalPlanQualEnd(&epqstate);
890  ExecResetTupleTable(estate->es_tupleTable, false);
891  FreeExecutorState(estate);
892 
894 
896 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:568
#define DEBUG1
Definition: elog.h:25
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:280
#define RelationGetDescr(relation)
Definition: rel.h:448
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:369
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:645
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2944
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:147
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:456
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:647
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2484
List * es_tupleTable
Definition: execnodes.h:551
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define Assert(condition)
Definition: c.h:739
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4812
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:228
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 584 of file worker.c.

References AfterTriggerEndQuery(), CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationInsert(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_cstrings(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

585 {
587  LogicalRepTupleData newtup;
588  LogicalRepRelId relid;
589  EState *estate;
590  TupleTableSlot *remoteslot;
591  MemoryContext oldctx;
592 
594 
595  relid = logicalrep_read_insert(s, &newtup);
598  {
599  /*
600  * The relation can't become interesting in the middle of the
601  * transaction so it's safe to unlock it.
602  */
604  return;
605  }
606 
607  /* Initialize the executor state. */
608  estate = create_estate_for_relation(rel);
609  remoteslot = ExecInitExtraTupleSlot(estate,
611  &TTSOpsVirtual);
612 
613  /* Input functions may need an active snapshot, so get one */
615 
616  /* Process and store remote tuple in the slot */
618  slot_store_cstrings(remoteslot, rel, newtup.values);
619  slot_fill_defaults(rel, estate, remoteslot);
620  MemoryContextSwitchTo(oldctx);
621 
623 
624  /* Do the insert. */
625  ExecSimpleRelationInsert(estate, remoteslot);
626 
627  /* Cleanup. */
630 
631  /* Handle queued AFTER triggers. */
632  AfterTriggerEndQuery(estate);
633 
634  ExecResetTupleTable(estate->es_tupleTable, false);
635  FreeExecutorState(estate);
636 
638 
640 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define RelationGetDescr(relation)
Definition: rel.h:448
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:369
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:214
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:147
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * es_tupleTable
Definition: execnodes.h:551
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1005
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4812
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 517 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

518 {
519  /*
520  * ORIGIN message can only come inside remote transaction and before any
521  * actual writes.
522  */
523  if (!in_remote_transaction ||
524  (IsTransactionState() && !am_tablesync_worker()))
525  ereport(ERROR,
526  (errcode(ERRCODE_PROTOCOL_VIOLATION),
527  errmsg("ORIGIN message sent out of order")));
528 }
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:105
static bool am_tablesync_worker(void)
#define ereport(elevel, rest)
Definition: elog.h:141
bool IsTransactionState(void)
Definition: xact.c:355
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 539 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

540 {
541  LogicalRepRelation *rel;
542 
543  rel = logicalrep_read_rel(s);
544  logicalrep_relmap_update(rel);
545 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:379
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 904 of file worker.c.

References CommandCounterIncrement(), DROP_RESTRICT, ensure_transaction(), ExecuteTruncateGuts(), lappend(), lappend_oid(), lfirst, lfirst_oid, LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationIsLogicallyLogged, RowExclusiveLock, and should_apply_changes_for_rel().

Referenced by apply_dispatch().

905 {
906  bool cascade = false;
907  bool restart_seqs = false;
908  List *remote_relids = NIL;
909  List *remote_rels = NIL;
910  List *rels = NIL;
911  List *relids = NIL;
912  List *relids_logged = NIL;
913  ListCell *lc;
914 
916 
917  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
918 
919  foreach(lc, remote_relids)
920  {
921  LogicalRepRelId relid = lfirst_oid(lc);
923 
926  {
927  /*
928  * The relation can't become interesting in the middle of the
929  * transaction so it's safe to unlock it.
930  */
932  continue;
933  }
934 
935  remote_rels = lappend(remote_rels, rel);
936  rels = lappend(rels, rel->localrel);
937  relids = lappend_oid(relids, rel->localreloid);
939  relids_logged = lappend_oid(relids_logged, rel->localreloid);
940  }
941 
942  /*
943  * Even if we used CASCADE on the upstream master we explicitly default to
944  * replaying changes without further cascading. This might be later
945  * changeable with a user specified option.
946  */
947  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
948 
949  foreach(lc, remote_rels)
950  {
951  LogicalRepRelMapEntry *rel = lfirst(lc);
952 
954  }
955 
957 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:329
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:369
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:594
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
static bool ensure_transaction(void)
Definition: worker.c:147
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1620
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * lappend(List *list, void *datum)
Definition: list.c:322
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define lfirst(lc)
Definition: pg_list.h:190
Definition: pg_list.h:50
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 554 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

555 {
556  LogicalRepTyp typ;
557 
558  logicalrep_read_typ(s, &typ);
559  logicalrep_typmap_update(&typ);
560 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:431
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:389

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 681 of file worker.c.

References AfterTriggerEndQuery(), Assert, LogicalRepTupleData::changed, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationUpdate(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_modify_cstrings(), slot_store_cstrings(), table_slot_create(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

682 {
684  LogicalRepRelId relid;
685  Oid idxoid;
686  EState *estate;
687  EPQState epqstate;
688  LogicalRepTupleData oldtup;
689  LogicalRepTupleData newtup;
690  bool has_oldtup;
691  TupleTableSlot *localslot;
692  TupleTableSlot *remoteslot;
693  bool found;
694  MemoryContext oldctx;
695 
697 
698  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
699  &newtup);
702  {
703  /*
704  * The relation can't become interesting in the middle of the
705  * transaction so it's safe to unlock it.
706  */
708  return;
709  }
710 
711  /* Check if we can do the update. */
713 
714  /* Initialize the executor state. */
715  estate = create_estate_for_relation(rel);
716  remoteslot = ExecInitExtraTupleSlot(estate,
718  &TTSOpsVirtual);
719  localslot = table_slot_create(rel->localrel,
720  &estate->es_tupleTable);
721  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
722 
725 
726  /* Build the search tuple. */
728  slot_store_cstrings(remoteslot, rel,
729  has_oldtup ? oldtup.values : newtup.values);
730  MemoryContextSwitchTo(oldctx);
731 
732  /*
733  * Try to find tuple using either replica identity index, primary key or
734  * if needed, sequential scan.
735  */
736  idxoid = GetRelationIdentityOrPK(rel->localrel);
737  Assert(OidIsValid(idxoid) ||
738  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
739 
740  if (OidIsValid(idxoid))
741  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
743  remoteslot, localslot);
744  else
746  remoteslot, localslot);
747 
748  ExecClearTuple(remoteslot);
749 
750  /*
751  * Tuple found.
752  *
753  * Note this will fail if there are other conflicting unique indexes.
754  */
755  if (found)
756  {
757  /* Process and store remote tuple in the slot */
759  slot_modify_cstrings(remoteslot, localslot, rel,
760  newtup.values, newtup.changed);
761  MemoryContextSwitchTo(oldctx);
762 
763  EvalPlanQualSetSlot(&epqstate, remoteslot);
764 
765  /* Do the actual update. */
766  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
767  }
768  else
769  {
770  /*
771  * The tuple to be updated could not be found.
772  *
773  * TODO what to do here, change the log level to LOG perhaps?
774  */
775  elog(DEBUG1,
776  "logical replication did not find row for update "
777  "in replication target relation \"%s\"",
779  }
780 
781  /* Cleanup. */
784 
785  /* Handle queued AFTER triggers. */
786  AfterTriggerEndQuery(estate);
787 
788  EvalPlanQualEnd(&epqstate);
789  ExecResetTupleTable(estate->es_tupleTable, false);
790  FreeExecutorState(estate);
791 
793 
795 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:568
#define DEBUG1
Definition: elog.h:25
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define RelationGetDescr(relation)
Definition: rel.h:448
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:369
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:645
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2944
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:147
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:456
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:647
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2484
List * es_tupleTable
Definition: execnodes.h:551
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define Assert(condition)
Definition: c.h:739
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:214
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4812
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:228
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:377
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1582 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sighup(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1583 {
1584  int worker_slot = DatumGetInt32(main_arg);
1585  MemoryContext oldctx;
1586  char originname[NAMEDATALEN];
1587  XLogRecPtr origin_startpos;
1588  char *myslotname;
1590 
1591  /* Attach to slot */
1592  logicalrep_worker_attach(worker_slot);
1593 
1594  /* Setup signal handling */
1596  pqsignal(SIGTERM, die);
1598 
1599  /*
1600  * We don't currently need any ResourceOwner in a walreceiver process, but
1601  * if we did, we could call CreateAuxProcessResourceOwner here.
1602  */
1603 
1604  /* Initialise stats to a sanish value */
1607 
1608  /* Load the libpq-specific functions */
1609  load_file("libpqwalreceiver", false);
1610 
1611  /* Run as replica session replication role. */
1612  SetConfigOption("session_replication_role", "replica",
1614 
1615  /* Connect to our database. */
1618  0);
1619 
1620  /* Load the subscription into persistent memory context. */
1622  "ApplyContext",
1626 
1628  if (!MySubscription)
1629  {
1630  ereport(LOG,
1631  (errmsg("logical replication apply worker for subscription %u will not "
1632  "start because the subscription was removed during startup",
1634  proc_exit(0);
1635  }
1636 
1637  MySubscriptionValid = true;
1638  MemoryContextSwitchTo(oldctx);
1639 
1640  if (!MySubscription->enabled)
1641  {
1642  ereport(LOG,
1643  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1644  "start because the subscription was disabled during startup",
1645  MySubscription->name)));
1646 
1647  proc_exit(0);
1648  }
1649 
1650  /* Setup synchronous commit according to the user's wishes */
1651  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1653 
1654  /* Keep us informed about subscription changes. */
1657  (Datum) 0);
1658 
1659  if (am_tablesync_worker())
1660  ereport(LOG,
1661  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1663  else
1664  ereport(LOG,
1665  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1666  MySubscription->name)));
1667 
1669 
1670  /* Connect to the origin and start the replication. */
1671  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1673 
1674  if (am_tablesync_worker())
1675  {
1676  char *syncslotname;
1677 
1678  /* This is table synchronization worker, call initial sync. */
1679  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1680 
1681  /* The slot name needs to be allocated in permanent memory context. */
1683  myslotname = pstrdup(syncslotname);
1684  MemoryContextSwitchTo(oldctx);
1685 
1686  pfree(syncslotname);
1687  }
1688  else
1689  {
1690  /* This is main apply worker */
1691  RepOriginId originid;
1692  TimeLineID startpointTLI;
1693  char *err;
1694 
1695  myslotname = MySubscription->slotname;
1696 
1697  /*
1698  * This shouldn't happen if the subscription is enabled, but guard
1699  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1700  * crash if slot is NULL.)
1701  */
1702  if (!myslotname)
1703  ereport(ERROR,
1704  (errmsg("subscription has no replication slot set")));
1705 
1706  /* Setup replication origin tracking. */
1708  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1709  originid = replorigin_by_name(originname, true);
1710  if (!OidIsValid(originid))
1711  originid = replorigin_create(originname);
1712  replorigin_session_setup(originid);
1713  replorigin_session_origin = originid;
1714  origin_startpos = replorigin_session_get_progress(false);
1716 
1718  &err);
1719  if (wrconn == NULL)
1720  ereport(ERROR,
1721  (errmsg("could not connect to the publisher: %s", err)));
1722 
1723  /*
1724  * We don't really use the output of identify_system for anything, but it
1725  * does some initialization on the upstream, so let's still call it.
1726  */
1727  (void) walrcv_identify_system(wrconn, &startpointTLI);
1728 
1729  }
1730 
1731  /*
1732  * Setup callback for syscache so that we know when something changes in
1733  * the subscription relation state.
1734  */
1737  (Datum) 0);
1738 
1739  /* Build logical replication streaming options. */
1740  options.logical = true;
1741  options.startpoint = origin_startpos;
1742  options.slotname = myslotname;
1743  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1744  options.proto.logical.publication_names = MySubscription->publications;
1745 
1746  /* Start normal logical streaming replication. */
1747  walrcv_startstreaming(wrconn, &options);
1748 
1749  /* Run the main loop. */
1750  LogicalRepApplyLoop(origin_startpos);
1751 
1752  proc_exit(0);
1753 }
Subscription * MySubscription
Definition: worker.c:102
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:100
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:52
#define DatumGetInt32(X)
Definition: postgres.h:472
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:263
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2898
union WalRcvStreamOptions::@106 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:269
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1188
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1053
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:798
#define OidIsValid(objectId)
Definition: c.h:645
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1561
void pfree(void *pointer)
Definition: mcxt.c:1056
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void logicalrep_worker_attach(int slot)
Definition: launcher.c:627
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7550
#define SIGHUP
Definition: win32_port.h:154
XLogRecPtr startpoint
Definition: walreceiver.h:153
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:239
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:98
static char ** options
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1568
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
uintptr_t Datum
Definition: postgres.h:367
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5708
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:152
void StartTransactionCommand(void)
Definition: xact.c:2797
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:103
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1106
#define elog(elevel,...)
Definition: elog.h:228
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define snprintf
Definition: port.h:192
#define die(msg)
Definition: pg_test_fsync.c:96
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5737
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 647 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

648 {
649  /* Updatable, no error. */
650  if (rel->updatable)
651  return;
652 
653  /*
654  * We are in error mode so it's fine that this is somewhat slow. It's better to
655  * give the user a correct error.
656  */
658  {
659  ereport(ERROR,
660  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
661  errmsg("publisher did not send replica identity column "
662  "expected by the logical replication target relation \"%s.%s\"",
663  rel->remoterel.nspname, rel->remoterel.relname)));
664  }
665 
666  ereport(ERROR,
667  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
668  errmsg("logical replication target relation \"%s.%s\" has "
669  "neither REPLICA IDENTITY index nor PRIMARY "
670  "KEY and published relation does not have "
671  "REPLICA IDENTITY FULL",
672  rel->remoterel.nspname, rel->remoterel.relname)));
673 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:568
int errcode(int sqlerrcode)
Definition: elog.c:608
#define OidIsValid(objectId)
Definition: c.h:645
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ create_estate_for_relation()

static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 176 of file worker.c.

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_num_result_relations, EState::es_output_cid, EState::es_result_relation_info, EState::es_result_relations, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

177 {
178  EState *estate;
179  ResultRelInfo *resultRelInfo;
180  RangeTblEntry *rte;
181 
182  estate = CreateExecutorState();
183 
184  rte = makeNode(RangeTblEntry);
185  rte->rtekind = RTE_RELATION;
186  rte->relid = RelationGetRelid(rel->localrel);
187  rte->relkind = rel->localrel->rd_rel->relkind;
189  ExecInitRangeTable(estate, list_make1(rte));
190 
191  resultRelInfo = makeNode(ResultRelInfo);
192  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
193 
194  estate->es_result_relations = resultRelInfo;
195  estate->es_num_result_relations = 1;
196  estate->es_result_relation_info = resultRelInfo;
197 
198  estate->es_output_cid = GetCurrentCommandId(true);
199 
200  /* Prepare to catch AFTER triggers. */
202 
203  return estate;
204 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:724
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
CommandId es_output_cid
Definition: execnodes.h:516
#define AccessShareLock
Definition: lockdefs.h:36
Form_pg_class rd_rel
Definition: rel.h:83
#define list_make1(x1)
Definition: pg_list.h:227
ResultRelInfo * es_result_relations
Definition: execnodes.h:519
EState * CreateExecutorState(void)
Definition: execUtils.c:88
int es_num_result_relations
Definition: execnodes.h:520
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4792
#define makeNode(_type_)
Definition: nodes.h:573
RTEKind rtekind
Definition: parsenodes.h:974
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:745
#define RelationGetRelid(relation)
Definition: rel.h:422
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ ensure_transaction()

static bool ensure_transaction ( void  )
static

Definition at line 147 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), maybe_reread_subscription(), MemoryContextSwitchTo(), SetCurrentStatementStartTimestamp(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

148 {
149  if (IsTransactionState())
150  {
152 
155 
156  return false;
157  }
158 
161 
163 
165  return true;
166 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void maybe_reread_subscription(void)
Definition: worker.c:1425
static MemoryContext ApplyMessageContext
Definition: worker.c:97
void StartTransactionCommand(void)
Definition: xact.c:2797
bool IsTransactionState(void)
Definition: xact.c:355
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:817

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 1027 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, FlushPosition::node, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

1029 {
1030  dlist_mutable_iter iter;
1031  XLogRecPtr local_flush = GetFlushRecPtr();
1032 
1034  *flush = InvalidXLogRecPtr;
1035 
1037  {
1038  FlushPosition *pos =
1039  dlist_container(FlushPosition, node, iter.cur);
1040 
1041  *write = pos->remote_end;
1042 
1043  if (pos->local_end <= local_flush)
1044  {
1045  *flush = pos->remote_end;
1046  dlist_delete(iter.cur);
1047  pfree(pos);
1048  }
1049  else
1050  {
1051  /*
1052  * Don't want to uselessly iterate over the rest of the list which
1053  * could potentially be long. Instead get the last element and
1054  * grab the write position from there.
1055  */
1056  pos = dlist_tail_element(FlushPosition, node,
1057  &lsn_mapping);
1058  *write = pos->remote_end;
1059  *have_pending_txes = true;
1060  return;
1061  }
1062  }
1063 
1064  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1065 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:88
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:85
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:84
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 568 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by apply_handle_delete(), apply_handle_update(), and check_relation_updatable().

569 {
570  Oid idxoid;
571 
572  idxoid = RelationGetReplicaIndex(rel);
573 
574  if (!OidIsValid(idxoid))
575  idxoid = RelationGetPrimaryKeyIndex(rel);
576 
577  return idxoid;
578 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4556
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:645
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4535

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 1759 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

1760 {
1761  return MyLogicalRepWorker != NULL;
1762 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56

◆ logicalrep_worker_sighup()

static void logicalrep_worker_sighup ( SIGNAL_ARGS  )
static

Definition at line 1568 of file worker.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by ApplyWorkerMain().

1569 {
1570  int save_errno = errno;
1571 
1572  got_SIGHUP = true;
1573 
1574  /* Waken anything waiting on the process latch */
1575  SetLatch(MyLatch);
1576 
1577  errno = save_errno;
1578 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:115

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 1106 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), got_SIGHUP, in_remote_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

1107 {
1108  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1109 
1110  /*
1111  * Init the ApplyMessageContext which we clean up after each replication
1112  * protocol message.
1113  */
1115  "ApplyMessageContext",
1117 
1118  /* mark as idle, before starting to loop */
1120 
1121  for (;;)
1122  {
1124  int rc;
1125  int len;
1126  char *buf = NULL;
1127  bool endofstream = false;
1128  bool ping_sent = false;
1129  long wait_time;
1130 
1132 
1134 
1135  len = walrcv_receive(wrconn, &buf, &fd);
1136 
1137  if (len != 0)
1138  {
1139  /* Process the data */
1140  for (;;)
1141  {
1143 
1144  if (len == 0)
1145  {
1146  break;
1147  }
1148  else if (len < 0)
1149  {
1150  ereport(LOG,
1151  (errmsg("data stream from publisher has ended")));
1152  endofstream = true;
1153  break;
1154  }
1155  else
1156  {
1157  int c;
1158  StringInfoData s;
1159 
1160  /* Reset timeout. */
1161  last_recv_timestamp = GetCurrentTimestamp();
1162  ping_sent = false;
1163 
1164  /* Ensure we are reading the data into our memory context. */
1166 
1167  s.data = buf;
1168  s.len = len;
1169  s.cursor = 0;
1170  s.maxlen = -1;
1171 
1172  c = pq_getmsgbyte(&s);
1173 
1174  if (c == 'w')
1175  {
1176  XLogRecPtr start_lsn;
1177  XLogRecPtr end_lsn;
1178  TimestampTz send_time;
1179 
1180  start_lsn = pq_getmsgint64(&s);
1181  end_lsn = pq_getmsgint64(&s);
1182  send_time = pq_getmsgint64(&s);
1183 
1184  if (last_received < start_lsn)
1185  last_received = start_lsn;
1186 
1187  if (last_received < end_lsn)
1188  last_received = end_lsn;
1189 
1190  UpdateWorkerStats(last_received, send_time, false);
1191 
1192  apply_dispatch(&s);
1193  }
1194  else if (c == 'k')
1195  {
1196  XLogRecPtr end_lsn;
1198  bool reply_requested;
1199 
1200  end_lsn = pq_getmsgint64(&s);
1201  timestamp = pq_getmsgint64(&s);
1202  reply_requested = pq_getmsgbyte(&s);
1203 
1204  if (last_received < end_lsn)
1205  last_received = end_lsn;
1206 
1207  send_feedback(last_received, reply_requested, false);
1208  UpdateWorkerStats(last_received, timestamp, true);
1209  }
1210  /* other message types are purposefully ignored */
1211 
1213  }
1214 
1215  len = walrcv_receive(wrconn, &buf, &fd);
1216  }
1217  }
1218 
1219  /* confirm all writes so far */
1220  send_feedback(last_received, false, false);
1221 
1222  if (!in_remote_transaction)
1223  {
1224  /*
1225  * If we didn't get any transactions for a while there might be
1226  * unconsumed invalidation messages in the queue, consume them
1227  * now.
1228  */
1231 
1232  /* Process any table synchronization changes. */
1233  process_syncing_tables(last_received);
1234  }
1235 
1236  /* Cleanup the memory. */
1239 
1240  /* Check if we need to exit the streaming loop. */
1241  if (endofstream)
1242  {
1243  TimeLineID tli;
1244 
1245  walrcv_endstreaming(wrconn, &tli);
1246  break;
1247  }
1248 
1249  /*
1250  * Wait for more data or latch. If we have unflushed transactions,
1251  * wake up after WalWriterDelay to see if they've been flushed yet (in
1252  * which case we should send a feedback message). Otherwise, there's
1253  * no particular urgency about waking up unless we get data or a
1254  * signal.
1255  */
1256  if (!dlist_is_empty(&lsn_mapping))
1257  wait_time = WalWriterDelay;
1258  else
1259  wait_time = NAPTIME_PER_CYCLE;
1260 
1264  fd, wait_time,
1266 
1267  if (rc & WL_LATCH_SET)
1268  {
1271  }
1272 
1273  if (got_SIGHUP)
1274  {
1275  got_SIGHUP = false;
1277  }
1278 
1279  if (rc & WL_TIMEOUT)
1280  {
1281  /*
1282  * We didn't receive anything new. If we haven't heard anything
1283  * from the server for more than wal_receiver_timeout / 2, ping
1284  * the server. Also, if it's been longer than
1285  * wal_receiver_status_interval since the last update we sent,
1286  * send a status update to the master anyway, to report any
1287  * progress in applying WAL.
1288  */
1289  bool requestReply = false;
1290 
1291  /*
1292  * Check if time since last receive from the publisher has reached the
1293  * configured limit.
1294  */
1295  if (wal_receiver_timeout > 0)
1296  {
1298  TimestampTz timeout;
1299 
1300  timeout =
1301  TimestampTzPlusMilliseconds(last_recv_timestamp,
1303 
1304  if (now >= timeout)
1305  ereport(ERROR,
1306  (errmsg("terminating logical replication worker due to timeout")));
1307 
1308  /*
1309  * We didn't receive anything new, for half of receiver
1310  * replication timeout. Ping the server.
1311  */
1312  if (!ping_sent)
1313  {
1314  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1315  (wal_receiver_timeout / 2));
1316  if (now >= timeout)
1317  {
1318  requestReply = true;
1319  ping_sent = true;
1320  }
1321  }
1322  }
1323 
1324  send_feedback(last_received, requestReply, requestReply);
1325  }
1326  }
1327 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1336
WalReceiverConn * wrconn
Definition: worker.c:100
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:271
uint32 TimeLineID
Definition: xlogdefs.h:52
void AcceptInvalidationMessages(void)
Definition: inval.c:681
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static dlist_head lsn_mapping
Definition: worker.c:88
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3119
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1090
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:273
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ResetLatch(Latch *latch)
Definition: latch.c:519
int wal_receiver_timeout
Definition: walreceiver.c:77
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:79
bool in_remote_transaction
Definition: worker.c:105
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * c
static char * buf
Definition: pg_test_fsync.c:67
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
int pgsocket
Definition: port.h:31
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext TopMemoryContext
Definition: mcxt.c:44
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:98
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define PGINVALID_SOCKET
Definition: port.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_dispatch(StringInfo s)
Definition: worker.c:964
static void maybe_reread_subscription(void)
Definition: worker.c:1425
static MemoryContext ApplyMessageContext
Definition: worker.c:97
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
int WalWriterDelay
Definition: walwriter.c:69
int errmsg(const char *fmt,...)
Definition: elog.c:822
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:115

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 1425 of file worker.c.

References Assert, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), LogicalRepWorker::subid, and Subscription::synccommit.

Referenced by apply_handle_commit(), ensure_transaction(), and LogicalRepApplyLoop().

1426 {
1427  MemoryContext oldctx;
1429  bool started_tx = false;
1430 
1431  /* When cache state is valid there is nothing to do here. */
1432  if (MySubscriptionValid)
1433  return;
1434 
1435  /* This function might be called inside or outside of transaction. */
1436  if (!IsTransactionState())
1437  {
1439  started_tx = true;
1440  }
1441 
1442  /* Ensure allocations in permanent context. */
1444 
1445  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1446 
1447  /*
1448  * Exit if the subscription was removed. This normally should not happen
1449  * as the worker gets killed during DROP SUBSCRIPTION.
1450  */
1451  if (!newsub)
1452  {
1453  ereport(LOG,
1454  (errmsg("logical replication apply worker for subscription \"%s\" will "
1455  "stop because the subscription was removed",
1456  MySubscription->name)));
1457 
1458  proc_exit(0);
1459  }
1460 
1461  /*
1462  * Exit if the subscription was disabled. This normally should not happen
1463  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1464  */
1465  if (!newsub->enabled)
1466  {
1467  ereport(LOG,
1468  (errmsg("logical replication apply worker for subscription \"%s\" will "
1469  "stop because the subscription was disabled",
1470  MySubscription->name)));
1471 
1472  proc_exit(0);
1473  }
1474 
1475  /*
1476  * Exit if the connection string was changed. The launcher will start a new
1477  * worker.
1478  */
1479  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1480  {
1481  ereport(LOG,
1482  (errmsg("logical replication apply worker for subscription \"%s\" will "
1483  "restart because the connection information was changed",
1484  MySubscription->name)));
1485 
1486  proc_exit(0);
1487  }
1488 
1489  /*
1490  * Exit if the subscription name was changed (it's used for
1491  * fallback_application_name). The launcher will start a new worker.
1492  */
1493  if (strcmp(newsub->name, MySubscription->name) != 0)
1494  {
1495  ereport(LOG,
1496  (errmsg("logical replication apply worker for subscription \"%s\" will "
1497  "restart because subscription was renamed",
1498  MySubscription->name)));
1499 
1500  proc_exit(0);
1501  }
1502 
1503  /* !slotname should never happen when enabled is true. */
1504  Assert(newsub->slotname);
1505 
1506  /*
1507  * We need to make a new connection to the new slot if the slot name has
1508  * changed, so exit here as well if that's the case.
1509  */
1510  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1511  {
1512  ereport(LOG,
1513  (errmsg("logical replication apply worker for subscription \"%s\" will "
1514  "restart because the replication slot name was changed",
1515  MySubscription->name)));
1516 
1517  proc_exit(0);
1518  }
1519 
1520  /*
1521  * Exit if the publication list was changed. The launcher will start a new
1522  * worker.
1523  */
1525  {
1526  ereport(LOG,
1527  (errmsg("logical replication apply worker for subscription \"%s\" will "
1528  "restart because subscription's publications were changed",
1529  MySubscription->name)));
1530 
1531  proc_exit(0);
1532  }
1533 
1534  /* Check for other changes that should never happen too. */
1535  if (newsub->dbid != MySubscription->dbid)
1536  {
1537  elog(ERROR, "subscription %u changed unexpectedly",
1539  }
1540 
1541  /* Clean old subscription info and switch to new one. */
1544 
1545  MemoryContextSwitchTo(oldctx);
1546 
1547  /* Change synchronous commit according to the user's wishes */
1548  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1550 
1551  if (started_tx)
1553 
1554  MySubscriptionValid = true;
1555 }
Subscription * MySubscription
Definition: worker.c:102
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3011
void CommitTransactionCommand(void)
Definition: xact.c:2898
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
#define ERROR
Definition: elog.h:43
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7550
List * publications
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext ApplyContext
Definition: worker.c:98
#define Assert(condition)
Definition: c.h:739
void StartTransactionCommand(void)
Definition: xact.c:2797
bool IsTransactionState(void)
Definition: xact.c:355
bool MySubscriptionValid
Definition: worker.c:103
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1336 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

/*
 * Build and send a feedback message (type byte 'r') to the publisher over
 * wrconn, reporting the received, written and flushed positions.
 *
 * recvpos      - latest received position; a value older than the one
 *                reported last is replaced by the last reported value.
 * force        - send even when status reporting is disabled or nothing
 *                changed since the previous message.
 * requestReply - copied verbatim into the message's replyRequested byte.
 *
 * The static last_* variables remember the highest positions sent so far,
 * so the reported positions never move backwards.
 */
1337 {
1338  static StringInfo reply_message = NULL;
1339  static TimestampTz send_time = 0;
1340 
1341  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1342  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1343  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1344 
1345  XLogRecPtr writepos;
1346  XLogRecPtr flushpos;
1347  TimestampTz now;
1348  bool have_pending_txes;
1349 
1350  /*
1351  * If the user doesn't want status to be reported to the publisher, be
1352  * sure to exit before doing anything at all.
1353  */
1354  if (!force && wal_receiver_status_interval <= 0)
1355  return;
1356 
1357  /* It's legal to not pass a recvpos */
1358  if (recvpos < last_recvpos)
1359  recvpos = last_recvpos;
1360 
1361  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1362 
1363  /*
1364  * No outstanding transactions to flush, we can report the latest received
1365  * position. This is important for synchronous replication.
1366  */
1367  if (!have_pending_txes)
1368  flushpos = writepos = recvpos;
1369 
      /* Clamp both positions so that they never go backwards. */
1370  if (writepos < last_writepos)
1371  writepos = last_writepos;
1372 
1373  if (flushpos < last_flushpos)
1374  flushpos = last_flushpos;
1375 
1376  now = GetCurrentTimestamp();
1377 
1378  /* if we've already reported everything we're good */
      /* TimestampDifferenceExceeds() takes milliseconds, hence the "* 1000". */
1379  if (!force &&
1380  writepos == last_writepos &&
1381  flushpos == last_flushpos &&
1382  !TimestampDifferenceExceeds(send_time, now,
1383  wal_receiver_status_interval * 1000))
1384  return;
1385  send_time = now;
1386 
      /*
       * The buffer is kept in a static pointer and reused by later calls, so
       * it is created while ApplyContext is current rather than in whatever
       * context the caller happens to be running in.
       */
1387  if (!reply_message)
1388  {
1389  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1390 
1391  reply_message = makeStringInfo();
1392  MemoryContextSwitchTo(oldctx);
1393  }
1394  else
1395  resetStringInfo(reply_message);
1396 
      /*
       * Note the field mapping: the message's write/flush/apply fields are
       * filled with recvpos/flushpos/writepos respectively.
       */
1397  pq_sendbyte(reply_message, 'r');
1398  pq_sendint64(reply_message, recvpos); /* write */
1399  pq_sendint64(reply_message, flushpos); /* flush */
1400  pq_sendint64(reply_message, writepos); /* apply */
1401  pq_sendint64(reply_message, now); /* sendTime */
1402  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1403 
1404  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1405  force,
1406  (uint32) (recvpos >> 32), (uint32) recvpos,
1407  (uint32) (writepos >> 32), (uint32) writepos,
1408  (uint32) (flushpos >> 32), (uint32) flushpos
1409  );
1410 
1411  walrcv_send(wrconn, reply_message->data, reply_message->len);
1412 
      /* Remember the highest positions reported so far. */
1413  if (recvpos > last_recvpos)
1414  last_recvpos = recvpos;
1415  if (writepos > last_writepos)
1416  last_writepos = writepos;
1417  if (flushpos > last_flushpos)
1418  last_flushpos = flushpos;
1419 }
WalReceiverConn * wrconn
Definition: worker.c:100
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:76
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1027
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:113
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:359
MemoryContext ApplyContext
Definition: worker.c:98
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
#define elog(elevel,...)
Definition: elog.h:228
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 131 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

/*
 * Decide whether this worker should apply a remote change to the given
 * relation map entry.
 *
 * A tablesync worker applies changes only for the relation it was started
 * for (MyLogicalRepWorker->relid).  Any other worker applies changes when
 * the relation is in READY state, or in SYNCDONE state with a state LSN
 * that is not ahead of remote_final_lsn.
 */
132 {
133  if (am_tablesync_worker())
134  return MyLogicalRepWorker->relid == rel->localreloid;
135  else
136  return (rel->state == SUBREL_STATE_READY ||
137  (rel->state == SUBREL_STATE_SYNCDONE &&
138  rel->statelsn <= remote_final_lsn));
139 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
static bool am_tablesync_worker(void)

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 214 of file worker.c.

References attnum, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, LogicalRepRelation::natts, TupleDescData::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

/*
 * Fill "slot" with default values for local columns that have no
 * counterpart in the remote relation.
 *
 * For every local attribute that is neither dropped nor generated and whose
 * attrmap entry is negative (no remote column), the column default
 * expression, if any, is planned, initialized and evaluated in the estate's
 * per-tuple expression context; the result is stored directly into
 * slot->tts_values / slot->tts_isnull.
 */
216 {
217  TupleDesc desc = RelationGetDescr(rel->localrel);
218  int num_phys_attrs = desc->natts;
219  int i;
220  int attnum,
221  num_defaults = 0;
222  int *defmap;
223  ExprState **defexprs;
224  ExprContext *econtext;
225 
226  econtext = GetPerTupleExprContext(estate);
227 
228  /* We got all the data via replication, no need to evaluate anything. */
229  if (num_phys_attrs == rel->remoterel.natts)
230  return;
231 
     /* Worst case: every local attribute needs a default. */
232  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
233  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
234 
235  for (attnum = 0; attnum < num_phys_attrs; attnum++)
236  {
237  Expr *defexpr;
238 
239  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
240  continue;
241 
     /* Column is supplied by the remote side; nothing to default. */
242  if (rel->attrmap[attnum] >= 0)
243  continue;
244 
     /* The loop index is zero-based; the attribute number passed is attnum + 1. */
245  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
246 
247  if (defexpr != NULL)
248  {
249  /* Run the expression through planner */
250  defexpr = expression_planner(defexpr);
251 
252  /* Initialize executable expression (no parent PlanState) */
253  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
254  defmap[num_defaults] = attnum;
255  num_defaults++;
256  }
257 
258  }
259 
     /* Evaluate the collected defaults straight into the slot's arrays. */
260  for (i = 0; i < num_defaults; i++)
261  slot->tts_values[defmap[i]] =
262  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
263 }
#define RelationGetDescr(relation)
Definition: rel.h:448
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Expr * expression_planner(Expr *expr)
Definition: planner.c:6051
Datum * tts_values
Definition: tuptable.h:126
#define GetPerTupleExprContext(estate)
Definition: executor.h:501
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
Node * build_column_default(Relation rel, int attrno)
int16 attnum
Definition: pg_attribute.h:79
void * palloc(Size size)
Definition: mcxt.c:949
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:121

◆ slot_modify_cstrings()

static void slot_modify_cstrings ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
char **  values,
bool replaces 
)
static

Definition at line 377 of file worker.c.

References ErrorContextCallback::arg, Assert, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_update().

/*
 * Build in "slot" a virtual tuple that is a copy of "srcslot" with selected
 * columns replaced by values converted from their text representation.
 *
 * values[] and replaces[] are indexed by remote attribute number; the local
 * attribute i maps to remote attribute rel->attrmap[i].  A local column is
 * overwritten only when it has a remote counterpart and replaces[] is set
 * for it; a NULL string in values[] stores a SQL NULL.  While an input
 * function runs, an error context callback identifies the column being
 * converted.
 */
380 {
381  int natts = slot->tts_tupleDescriptor->natts;
382  int i;
383  SlotErrCallbackArg errarg;
384  ErrorContextCallback errcallback;
385 
386  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
387  ExecClearTuple(slot);
388 
389  /*
390  * Copy all the column data from srcslot, so that we'll have valid values
391  * for unreplaced columns.
392  */
393  Assert(natts == srcslot->tts_tupleDescriptor->natts);
394  slot_getallattrs(srcslot);
395  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
396  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
397 
398  /* For error reporting, push callback + info on the error context stack */
399  errarg.rel = rel;
400  errarg.local_attnum = -1;
401  errarg.remote_attnum = -1;
402  errcallback.callback = slot_store_error_callback;
403  errcallback.arg = (void *) &errarg;
404  errcallback.previous = error_context_stack;
405  error_context_stack = &errcallback;
406 
407  /* Call the "in" function for each replaced attribute */
408  for (i = 0; i < natts; i++)
409  {
410  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
411  int remoteattnum = rel->attrmap[i];
412 
     /* No remote counterpart: keep the value copied from srcslot. */
413  if (remoteattnum < 0)
414  continue;
415 
416  if (!replaces[remoteattnum])
417  continue;
418 
419  if (values[remoteattnum] != NULL)
420  {
421  Oid typinput;
422  Oid typioparam;
423 
     /* Arm the error callback for the duration of the conversion. */
424  errarg.local_attnum = i;
425  errarg.remote_attnum = remoteattnum;
426 
427  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
428  slot->tts_values[i] =
429  OidInputFunctionCall(typinput, values[remoteattnum],
430  typioparam, att->atttypmod);
431  slot->tts_isnull[i] = false;
432 
     /* Disarm it again; the callback is a no-op when remote_attnum < 0. */
433  errarg.local_attnum = -1;
434  errarg.remote_attnum = -1;
435  }
436  else
437  {
438  slot->tts_values[i] = (Datum) 0;
439  slot->tts_isnull[i] = true;
440  }
441  }
442 
443  /* Pop the error context stack */
444  error_context_stack = errcallback.previous;
445 
446  /* And finally, declare that "slot" contains a valid virtual tuple */
447  ExecStoreVirtualTuple(slot);
448 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:92
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:256
struct ErrorContextCallback * previous
Definition: elog.h:255
ErrorContextCallback * error_context_stack
Definition: elog.c:91
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define Assert(condition)
Definition: c.h:739
static void slot_store_error_callback(void *arg)
Definition: worker.c:269
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_cstrings()

static void slot_store_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values 
)
static

Definition at line 304 of file worker.c.

References ErrorContextCallback::arg, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

/*
 * Store in "slot" a virtual tuple built from the text representation of the
 * remote column values.
 *
 * values[] is indexed by remote attribute number; the local attribute i maps
 * to remote attribute rel->attrmap[i].  Each non-dropped local column that
 * has a remote counterpart with a non-NULL string is converted with the
 * type's input function; every other column is set to NULL.  While an input
 * function runs, an error context callback identifies the column being
 * converted.
 */
306 {
307  int natts = slot->tts_tupleDescriptor->natts;
308  int i;
309  SlotErrCallbackArg errarg;
310  ErrorContextCallback errcallback;
311 
312  ExecClearTuple(slot);
313 
314  /* Push callback + info on the error context stack */
315  errarg.rel = rel;
316  errarg.local_attnum = -1;
317  errarg.remote_attnum = -1;
318  errcallback.callback = slot_store_error_callback;
319  errcallback.arg = (void *) &errarg;
320  errcallback.previous = error_context_stack;
321  error_context_stack = &errcallback;
322 
323  /* Call the "in" function for each non-dropped attribute */
324  for (i = 0; i < natts; i++)
325  {
326  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
327  int remoteattnum = rel->attrmap[i];
328 
329  if (!att->attisdropped && remoteattnum >= 0 &&
330  values[remoteattnum] != NULL)
331  {
332  Oid typinput;
333  Oid typioparam;
334 
     /* Arm the error callback for the duration of the conversion. */
335  errarg.local_attnum = i;
336  errarg.remote_attnum = remoteattnum;
337 
338  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
339  slot->tts_values[i] =
340  OidInputFunctionCall(typinput, values[remoteattnum],
341  typioparam, att->atttypmod);
342  slot->tts_isnull[i] = false;
343 
     /* Disarm it again; the callback is a no-op when remote_attnum < 0. */
344  errarg.local_attnum = -1;
345  errarg.remote_attnum = -1;
346  }
347  else
348  {
349  /*
350  * We assign NULL to dropped attributes, NULL values, and missing
351  * values (missing values should be later filled using
352  * slot_fill_defaults).
353  */
354  slot->tts_values[i] = (Datum) 0;
355  slot->tts_isnull[i] = true;
356  }
357  }
358 
359  /* Pop the error context stack */
360  error_context_stack = errcallback.previous;
361 
362  ExecStoreVirtualTuple(slot);
363 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:92
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:256
struct ErrorContextCallback * previous
Definition: elog.h:255
ErrorContextCallback * error_context_stack
Definition: elog.c:91
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
static void slot_store_error_callback(void *arg)
Definition: worker.c:269
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_error_callback()

static void slot_store_error_callback ( void *  arg)
static

Definition at line 269 of file worker.c.

References LogicalRepRelation::attnames, LogicalRepRelation::atttyps, errcontext, format_type_be(), get_atttype(), SlotErrCallbackArg::local_attnum, LogicalRepRelMapEntry::localreloid, logicalrep_typmap_gettypname(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, LogicalRepRelation::relname, SlotErrCallbackArg::remote_attnum, and LogicalRepRelMapEntry::remoterel.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

/*
 * Error context callback used while converting remote column data.
 *
 * "arg" points to a SlotErrCallbackArg.  When its remote_attnum is set
 * (>= 0), a context line is added naming the remote relation and column
 * together with the remote and local type names; otherwise nothing is
 * reported.
 */
270 {
271  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
272  LogicalRepRelMapEntry *rel;
273  char *remotetypname;
274  Oid remotetypoid,
275  localtypoid;
276 
277  /* Nothing to do if remote attribute number is not set */
278  if (errarg->remote_attnum < 0)
279  return;
280 
281  rel = errarg->rel;
282  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
283 
284  /* Fetch remote type name from the LogicalRepTypMap cache */
285  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
286 
287  /* Fetch local type OID from the local sys cache */
     /* local_attnum is a zero-based index; get_atttype() is passed local_attnum + 1. */
288  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
289 
290  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
291  "remote type %s, local type %s",
292  rel->remoterel.nspname, rel->remoterel.relname,
293  rel->remoterel.attnames[errarg->remote_attnum],
294  remotetypname,
295  format_type_be(localtypoid));
296 }
LogicalRepRelMapEntry * rel
Definition: worker.c:92
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
unsigned int Oid
Definition: postgres_ext.h:31
LogicalRepRelation remoterel
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:861
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:423
#define errcontext
Definition: elog.h:183
void * arg

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 1071 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

/*
 * Record the mapping between the end of the just-committed local
 * transaction (XactLastCommitEnd) and the given remote LSN by appending a
 * FlushPosition entry to lsn_mapping.
 *
 * The entry is allocated while ApplyContext is current; afterwards the
 * current context is set to ApplyMessageContext.
 */
1072 {
1073  FlushPosition *flushpos;
1074 
1075  /* Need to do this in permanent context */
1076  MemoryContextSwitchTo(ApplyContext);
1077 
1078  /* Track commit lsn */
1079  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1080  flushpos->local_end = XactLastCommitEnd;
1081  flushpos->remote_end = remote_lsn;
1082 
1083  dlist_push_tail(&lsn_mapping, &flushpos->node);
1084  MemoryContextSwitchTo(ApplyMessageContext);
1085 }
static dlist_head lsn_mapping
Definition: worker.c:88
dlist_node node
Definition: worker.c:83
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:353
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:85
MemoryContext ApplyContext
Definition: worker.c:98
XLogRecPtr local_end
Definition: worker.c:84
static MemoryContext ApplyMessageContext
Definition: worker.c:97
void * palloc(Size size)
Definition: mcxt.c:949

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1561 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

/*
 * Callback that marks the cached subscription as stale by clearing
 * MySubscriptionValid.  None of the arguments are used.
 */
1562 {
1563  MySubscriptionValid = false;
1564 }
bool MySubscriptionValid
Definition: worker.c:103

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 1090 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by LogicalRepApplyLoop().

/*
 * Update the statistics fields of MyLogicalRepWorker.
 *
 * last_lsn and send_time are stored as given, and last_recv_time is set to
 * the current timestamp.  When "reply" is true the same LSN and send time
 * are also recorded as reply_lsn / reply_time.
 */
1091 {
1092  MyLogicalRepWorker->last_lsn = last_lsn;
1093  MyLogicalRepWorker->last_send_time = send_time;
1094  MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1095  if (reply)
1096  {
1097  MyLogicalRepWorker->reply_lsn = last_lsn;
1098  MyLogicalRepWorker->reply_time = send_time;
1099  }
1100 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
TimestampTz last_send_time
XLogRecPtr last_lsn
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext = NULL

Definition at line 98 of file worker.c.

◆ ApplyMessageContext

MemoryContext ApplyMessageContext = NULL
static

Definition at line 97 of file worker.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 115 of file worker.c.

Referenced by logicalrep_worker_sighup(), and LogicalRepApplyLoop().

◆ in_remote_transaction

bool in_remote_transaction = false

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 88 of file worker.c.

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false

Definition at line 103 of file worker.c.

Referenced by ApplyWorkerMain(), maybe_reread_subscription(), and subscription_change_cb().

◆ remote_final_lsn

XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
static

Definition at line 106 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), and should_apply_changes_for_rel().

◆ wrconn