PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "funcapi.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "nodes/makefuncs.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/timeout.h"
#include "utils/tqual.h"
#include "utils/syscache.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static EState * create_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void logicalrep_worker_sighup (SIGNAL_ARGS)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
WalReceiverConn * wrconn = NULL
 
Subscription * MySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static volatile sig_atomic_t got_SIGHUP = false
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 93 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

◆ FlushPosition

◆ SlotErrCallbackArg

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 960 of file worker.c.

References generate_unaccent_rules::action, apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by LogicalRepApplyLoop().

961 {
962  char action = pq_getmsgbyte(s);
963 
964  switch (action)
965  {
966  /* BEGIN */
967  case 'B':
968  apply_handle_begin(s);
969  break;
970  /* COMMIT */
971  case 'C':
972  apply_handle_commit(s);
973  break;
974  /* INSERT */
975  case 'I':
976  apply_handle_insert(s);
977  break;
978  /* UPDATE */
979  case 'U':
980  apply_handle_update(s);
981  break;
982  /* DELETE */
983  case 'D':
984  apply_handle_delete(s);
985  break;
986  /* TRUNCATE */
987  case 'T':
988  apply_handle_truncate(s);
989  break;
990  /* RELATION */
991  case 'R':
992  apply_handle_relation(s);
993  break;
994  /* TYPE */
995  case 'Y':
996  apply_handle_type(s);
997  break;
998  /* ORIGIN */
999  case 'O':
1000  apply_handle_origin(s);
1001  break;
1002  default:
1003  ereport(ERROR,
1004  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1005  errmsg("invalid logical replication message type \"%c\"", action)));
1006  }
1007 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:555
static void apply_handle_insert(StringInfo s)
Definition: worker.c:585
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:800
static void apply_handle_begin(StringInfo s)
Definition: worker.c:455
#define ereport(elevel, rest)
Definition: elog.h:122
static void apply_handle_commit(StringInfo s)
Definition: worker.c:474
static void apply_handle_update(StringInfo s)
Definition: worker.c:679
static void apply_handle_relation(StringInfo s)
Definition: worker.c:540
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_handle_origin(StringInfo s)
Definition: worker.c:518
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:900

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 455 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

456 {
457  LogicalRepBeginData begin_data;
458 
459  logicalrep_read_begin(s, &begin_data);
460 
461  remote_final_lsn = begin_data.final_lsn;
462 
463  in_remote_transaction = true;
464 
465  pgstat_report_activity(STATE_RUNNING, NULL);
466 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:120
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
bool in_remote_transaction
Definition: worker.c:119
XLogRecPtr final_lsn
Definition: logicalproto.h:66

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 474 of file worker.c.

References AcceptInvalidationMessages(), am_tablesync_worker(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), maybe_reread_subscription(), pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), remote_final_lsn, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

475 {
476  LogicalRepCommitData commit_data;
477 
478  logicalrep_read_commit(s, &commit_data);
479 
480  Assert(commit_data.commit_lsn == remote_final_lsn);
481 
482  /* The synchronization worker runs in single transaction. */
483  if (IsTransactionState() && !am_tablesync_worker())
484  {
485  /*
486  * Update origin state so we can restart streaming from correct
487  * position in case of crash.
488  */
489  replorigin_session_origin_lsn = commit_data.end_lsn;
490  replorigin_session_origin_timestamp = commit_data.committime;
491 
492  CommitTransactionCommand();
493  pgstat_report_stat(false);
494 
495  store_flush_position(commit_data.end_lsn);
496  }
497  else
498  {
499  /* Process any invalidation messages that might have accumulated. */
500  AcceptInvalidationMessages();
501  maybe_reread_subscription();
502  }
503 
504  in_remote_transaction = false;
505 
506  /* Process any tables that are being synchronized in parallel. */
507  process_syncing_tables(commit_data.end_lsn);
508 
509  pgstat_report_activity(STATE_IDLE, NULL);
510 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1067
void AcceptInvalidationMessages(void)
Definition: inval.c:679
static XLogRecPtr remote_final_lsn
Definition: worker.c:120
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:544
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
void CommitTransactionCommand(void)
Definition: xact.c:2744
bool in_remote_transaction
Definition: worker.c:119
static bool am_tablesync_worker(void)
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
static void maybe_reread_subscription(void)
Definition: worker.c:1424
#define Assert(condition)
Definition: c.h:699
bool IsTransactionState(void)
Definition: xact.c:350
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
TimestampTz committime
Definition: logicalproto.h:75
void pgstat_report_stat(bool force)
Definition: pgstat.c:812

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 800 of file worker.c.

References AfterTriggerEndQuery(), Assert, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, ensure_transaction(), ereport, errmsg(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationDelete(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_dispatch().

801 {
803  LogicalRepTupleData oldtup;
804  LogicalRepRelId relid;
805  Oid idxoid;
806  EState *estate;
807  EPQState epqstate;
808  TupleTableSlot *remoteslot;
809  TupleTableSlot *localslot;
810  bool found;
811  MemoryContext oldctx;
812 
814 
815  relid = logicalrep_read_delete(s, &oldtup);
818  {
819  /*
820  * The relation can't become interesting in the middle of the
821  * transaction so it's safe to unlock it.
822  */
824  return;
825  }
826 
827  /* Check if we can do the delete. */
829 
830  /* Initialize the executor state. */
831  estate = create_estate_for_relation(rel);
832  remoteslot = ExecInitExtraTupleSlot(estate,
833  RelationGetDescr(rel->localrel));
834  localslot = ExecInitExtraTupleSlot(estate,
835  RelationGetDescr(rel->localrel));
836  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
837 
840 
841  /* Find the tuple using the replica identity index. */
843  slot_store_cstrings(remoteslot, rel, oldtup.values);
844  MemoryContextSwitchTo(oldctx);
845 
846  /*
847  * Try to find tuple using either replica identity index, primary key or
848  * if needed, sequential scan.
849  */
850  idxoid = GetRelationIdentityOrPK(rel->localrel);
851  Assert(OidIsValid(idxoid) ||
852  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
853 
854  if (OidIsValid(idxoid))
855  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
857  remoteslot, localslot);
858  else
860  remoteslot, localslot);
861  /* If found delete it. */
862  if (found)
863  {
864  EvalPlanQualSetSlot(&epqstate, localslot);
865 
866  /* Do the actual delete. */
867  ExecSimpleRelationDelete(estate, &epqstate, localslot);
868  }
869  else
870  {
871  /* The tuple to be deleted could not be found. */
872  ereport(DEBUG1,
873  (errmsg("logical replication could not find row for delete "
874  "in replication target relation \"%s\"",
876  }
877 
878  /* Cleanup. */
881 
882  /* Handle queued AFTER triggers. */
883  AfterTriggerEndQuery(estate);
884 
885  EvalPlanQualEnd(&epqstate);
886  ExecResetTupleTable(estate->es_tupleTable, false);
887  FreeExecutorState(estate);
888 
890 
892 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:145
#define NIL
Definition: pg_list.h:69
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:190
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
#define DEBUG1
Definition: elog.h:25
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:280
#define RelationGetDescr(relation)
Definition: rel.h:433
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
#define OidIsValid(objectId)
Definition: c.h:605
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc)
Definition: execTuples.c:931
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3251
void FreeExecutorState(EState *estate)
Definition: execUtils.c:188
static bool ensure_transaction(void)
Definition: worker.c:161
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:321
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:441
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
#define ereport(elevel, rest)
Definition: elog.h:122
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:645
List * es_tupleTable
Definition: execnodes.h:525
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:186
void CommandCounterIncrement(void)
Definition: xact.c:914
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2828
#define Assert(condition)
Definition: c.h:699
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:494
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4785
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:208
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 585 of file worker.c.

References AfterTriggerEndQuery(), CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationInsert(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_dispatch().

586 {
588  LogicalRepTupleData newtup;
589  LogicalRepRelId relid;
590  EState *estate;
591  TupleTableSlot *remoteslot;
592  MemoryContext oldctx;
593 
595 
596  relid = logicalrep_read_insert(s, &newtup);
599  {
600  /*
601  * The relation can't become interesting in the middle of the
602  * transaction so it's safe to unlock it.
603  */
605  return;
606  }
607 
608  /* Initialize the executor state. */
609  estate = create_estate_for_relation(rel);
610  remoteslot = ExecInitExtraTupleSlot(estate,
611  RelationGetDescr(rel->localrel));
612 
613  /* Process and store remote tuple in the slot */
615  slot_store_cstrings(remoteslot, rel, newtup.values);
616  slot_fill_defaults(rel, estate, remoteslot);
617  MemoryContextSwitchTo(oldctx);
618 
621 
622  /* Do the insert. */
623  ExecSimpleRelationInsert(estate, remoteslot);
624 
625  /* Cleanup. */
628 
629  /* Handle queued AFTER triggers. */
630  AfterTriggerEndQuery(estate);
631 
632  ExecResetTupleTable(estate->es_tupleTable, false);
633  FreeExecutorState(estate);
634 
636 
638 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:145
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:190
#define RelationGetDescr(relation)
Definition: rel.h:433
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc)
Definition: execTuples.c:931
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:231
void FreeExecutorState(EState *estate)
Definition: execUtils.c:188
static bool ensure_transaction(void)
Definition: worker.c:161
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:321
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * es_tupleTable
Definition: execnodes.h:525
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:186
void CommandCounterIncrement(void)
Definition: xact.c:914
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:494
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4785
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 518 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

519 {
520  /*
521  * ORIGIN message can only come inside remote transaction and before any
522  * actual writes.
523  */
524  if (!in_remote_transaction ||
525  (IsTransactionState() && !am_tablesync_worker()))
526  ereport(ERROR,
527  (errcode(ERRCODE_PROTOCOL_VIOLATION),
528  errmsg("ORIGIN message sent out of order")));
529 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:119
static bool am_tablesync_worker(void)
#define ereport(elevel, rest)
Definition: elog.h:122
bool IsTransactionState(void)
Definition: xact.c:350
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 540 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

541 {
542  LogicalRepRelation *rel;
543 
544  rel = logicalrep_read_rel(s);
545  logicalrep_relmap_update(rel);
546 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:379
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 900 of file worker.c.

References CommandCounterIncrement(), DROP_RESTRICT, ensure_transaction(), ExecuteTruncateGuts(), lappend(), lappend_oid(), lfirst, lfirst_oid, LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationIsLogicallyLogged, RowExclusiveLock, and should_apply_changes_for_rel().

Referenced by apply_dispatch().

901 {
902  bool cascade = false;
903  bool restart_seqs = false;
904  List *remote_relids = NIL;
905  List *remote_rels = NIL;
906  List *rels = NIL;
907  List *relids = NIL;
908  List *relids_logged = NIL;
909  ListCell *lc;
910 
912 
913  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
914 
915  foreach(lc, remote_relids)
916  {
917  LogicalRepRelId relid = lfirst_oid(lc);
919 
922  {
923  /*
924  * The relation can't become interesting in the middle of the
925  * transaction so it's safe to unlock it.
926  */
928  continue;
929  }
930 
931  remote_rels = lappend(remote_rels, rel);
932  rels = lappend(rels, rel->localrel);
933  relids = lappend_oid(relids, rel->localreloid);
935  relids_logged = lappend_oid(relids_logged, rel->localreloid);
936  }
937 
938  /*
939  * Even if we used CASCADE on the upstream master we explicitly
940  * default to replaying changes without further cascading.
941  * This might be later changeable with a user specified option.
942  */
943  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
944 
945  foreach(lc, remote_rels)
946  {
947  LogicalRepRelMapEntry *rel = lfirst(lc);
948 
950  }
951 
953 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:145
#define NIL
Definition: pg_list.h:69
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:329
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:580
List * lappend_oid(List *list, Oid datum)
Definition: list.c:164
static bool ensure_transaction(void)
Definition: worker.c:161
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1411
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * lappend(List *list, void *datum)
Definition: list.c:128
void CommandCounterIncrement(void)
Definition: xact.c:914
#define lfirst(lc)
Definition: pg_list.h:106
Definition: pg_list.h:45
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define lfirst_oid(lc)
Definition: pg_list.h:108

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 555 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

556 {
557  LogicalRepTyp typ;
558 
559  logicalrep_read_typ(s, &typ);
560  logicalrep_typmap_update(&typ);
561 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:431
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:388

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 679 of file worker.c.

References AfterTriggerEndQuery(), Assert, LogicalRepTupleData::changed, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationUpdate(), ExecStoreTuple(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), InvalidBuffer, LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_modify_cstrings(), slot_store_cstrings(), TupleTableSlot::tts_tuple, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

680 {
682  LogicalRepRelId relid;
683  Oid idxoid;
684  EState *estate;
685  EPQState epqstate;
686  LogicalRepTupleData oldtup;
687  LogicalRepTupleData newtup;
688  bool has_oldtup;
689  TupleTableSlot *localslot;
690  TupleTableSlot *remoteslot;
691  bool found;
692  MemoryContext oldctx;
693 
695 
696  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
697  &newtup);
700  {
701  /*
702  * The relation can't become interesting in the middle of the
703  * transaction so it's safe to unlock it.
704  */
706  return;
707  }
708 
709  /* Check if we can do the update. */
711 
712  /* Initialize the executor state. */
713  estate = create_estate_for_relation(rel);
714  remoteslot = ExecInitExtraTupleSlot(estate,
715  RelationGetDescr(rel->localrel));
716  localslot = ExecInitExtraTupleSlot(estate,
717  RelationGetDescr(rel->localrel));
718  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
719 
722 
723  /* Build the search tuple. */
725  slot_store_cstrings(remoteslot, rel,
726  has_oldtup ? oldtup.values : newtup.values);
727  MemoryContextSwitchTo(oldctx);
728 
729  /*
730  * Try to find tuple using either replica identity index, primary key or
731  * if needed, sequential scan.
732  */
733  idxoid = GetRelationIdentityOrPK(rel->localrel);
734  Assert(OidIsValid(idxoid) ||
735  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
736 
737  if (OidIsValid(idxoid))
738  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
740  remoteslot, localslot);
741  else
743  remoteslot, localslot);
744 
745  ExecClearTuple(remoteslot);
746 
747  /*
748  * Tuple found.
749  *
750  * Note this will fail if there are other conflicting unique indexes.
751  */
752  if (found)
753  {
754  /* Process and store remote tuple in the slot */
756  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
757  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
758  MemoryContextSwitchTo(oldctx);
759 
760  EvalPlanQualSetSlot(&epqstate, remoteslot);
761 
762  /* Do the actual update. */
763  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
764  }
765  else
766  {
767  /*
768  * The tuple to be updated could not be found.
769  *
770  * TODO what to do here, change the log level to LOG perhaps?
771  */
772  elog(DEBUG1,
773  "logical replication did not find row for update "
774  "in replication target relation \"%s\"",
776  }
777 
778  /* Cleanup. */
781 
782  /* Handle queued AFTER triggers. */
783  AfterTriggerEndQuery(estate);
784 
785  EvalPlanQualEnd(&epqstate);
786  ExecResetTupleTable(estate->es_tupleTable, false);
787  FreeExecutorState(estate);
788 
790 
792 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:145
#define NIL
Definition: pg_list.h:69
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:356
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:190
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
#define DEBUG1
Definition: elog.h:25
#define RelationGetDescr(relation)
Definition: rel.h:433
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:475
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define InvalidBuffer
Definition: buf.h:25
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
#define OidIsValid(objectId)
Definition: c.h:605
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc)
Definition: execTuples.c:931
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3251
void FreeExecutorState(EState *estate)
Definition: execUtils.c:188
static bool ensure_transaction(void)
Definition: worker.c:161
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:321
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:441
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:645
List * es_tupleTable
Definition: execnodes.h:525
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:186
void CommandCounterIncrement(void)
Definition: xact.c:914
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2828
#define Assert(condition)
Definition: c.h:699
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:214
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:494
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4785
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:389
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
HeapTuple tts_tuple
Definition: tuptable.h:122
#define elog
Definition: elog.h:219
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:208
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1581 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), Assert, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, CurrentResourceOwner, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die(), elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sighup(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, ResourceOwnerCreate(), server_version, SetConfigOption(), SIGHUP, Subscription::slotname, WalRcvStreamOptions::slotname, snprintf(), WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1582 {
1583  int worker_slot = DatumGetInt32(main_arg);
1584  MemoryContext oldctx;
1585  char originname[NAMEDATALEN];
1586  XLogRecPtr origin_startpos;
1587  char *myslotname;
1589 
1590  /* Attach to slot */
1591  logicalrep_worker_attach(worker_slot);
1592 
1593  /* Setup signal handling */
1595  pqsignal(SIGTERM, die);
1597 
1598  /* Initialise stats to a sanish value */
1601 
1602  /* Load the libpq-specific functions */
1603  load_file("libpqwalreceiver", false);
1604 
1605  Assert(CurrentResourceOwner == NULL);
1607  "logical replication apply");
1608 
1609  /* Run as replica session replication role. */
1610  SetConfigOption("session_replication_role", "replica",
1612 
1613  /* Connect to our database. */
1616  0);
1617 
1618  /* Load the subscription into persistent memory context. */
1620  "ApplyContext",
1624 
1626  if (!MySubscription)
1627  {
1628  ereport(LOG,
1629  (errmsg("logical replication apply worker for subscription %u will not "
1630  "start because the subscription was removed during startup",
1632  proc_exit(0);
1633  }
1634 
1635  MySubscriptionValid = true;
1636  MemoryContextSwitchTo(oldctx);
1637 
1638  if (!MySubscription->enabled)
1639  {
1640  ereport(LOG,
1641  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1642  "start because the subscription was disabled during startup",
1643  MySubscription->name)));
1644 
1645  proc_exit(0);
1646  }
1647 
1648  /* Setup synchronous commit according to the user's wishes */
1649  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1651 
1652  /* Keep us informed about subscription changes. */
1655  (Datum) 0);
1656 
1657  if (am_tablesync_worker())
1658  ereport(LOG,
1659  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1661  else
1662  ereport(LOG,
1663  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1664  MySubscription->name)));
1665 
1667 
1668  /* Connect to the origin and start the replication. */
1669  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1671 
1672  if (am_tablesync_worker())
1673  {
1674  char *syncslotname;
1675 
1676  /* This is a table synchronization worker, call initial sync. */
1677  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1678 
1679  /* The slot name needs to be allocated in permanent memory context. */
1681  myslotname = pstrdup(syncslotname);
1682  MemoryContextSwitchTo(oldctx);
1683 
1684  pfree(syncslotname);
1685  }
1686  else
1687  {
1688  /* This is main apply worker */
1689  RepOriginId originid;
1690  TimeLineID startpointTLI;
1691  char *err;
1692  int server_version;
1693 
1694  myslotname = MySubscription->slotname;
1695 
1696  /*
1697  * This shouldn't happen if the subscription is enabled, but guard
1698  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1699  * crash if slot is NULL.)
1700  */
1701  if (!myslotname)
1702  ereport(ERROR,
1703  (errmsg("subscription has no replication slot set")));
1704 
1705  /* Setup replication origin tracking. */
1707  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1708  originid = replorigin_by_name(originname, true);
1709  if (!OidIsValid(originid))
1710  originid = replorigin_create(originname);
1711  replorigin_session_setup(originid);
1712  replorigin_session_origin = originid;
1713  origin_startpos = replorigin_session_get_progress(false);
1715 
1717  &err);
1718  if (wrconn == NULL)
1719  ereport(ERROR,
1720  (errmsg("could not connect to the publisher: %s", err)));
1721 
1722  /*
1723  * We don't really use the output of identify_system for anything, but
1724  * it does some initializations on the upstream so let's still call it.
1725  */
1726  (void) walrcv_identify_system(wrconn, &startpointTLI,
1727  &server_version);
1728 
1729  }
1730 
1731  /*
1732  * Setup callback for syscache so that we know when something changes in
1733  * the subscription relation state.
1734  */
1737  (Datum) 0);
1738 
1739  /* Build logical replication streaming options. */
1740  options.logical = true;
1741  options.startpoint = origin_startpos;
1742  options.slotname = myslotname;
1743  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1744  options.proto.logical.publication_names = MySubscription->publications;
1745 
1746  /* Start normal logical streaming replication. */
1747  walrcv_startstreaming(wrconn, &options);
1748 
1749  /* Run the main loop. */
1750  LogicalRepApplyLoop(origin_startpos);
1751 
1752  proc_exit(0);
1753 }
Subscription * MySubscription
Definition: worker.c:116
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:114
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:45
#define DatumGetInt32(X)
Definition: postgres.h:455
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
ResourceOwner CurrentResourceOwner
Definition: resowner.c:140
char * pstrdup(const char *in)
Definition: mcxt.c:1161
void CommitTransactionCommand(void)
Definition: xact.c:2744
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:51
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:267
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1165
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1030
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:815
#define OidIsValid(objectId)
Definition: c.h:605
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
union WalRcvStreamOptions::@107 proto
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1560
void pfree(void *pointer)
Definition: mcxt.c:1031
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:263
void logicalrep_worker_attach(int slot)
Definition: launcher.c:612
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6903
#define SIGHUP
Definition: win32_port.h:163
XLogRecPtr startpoint
Definition: walreceiver.h:154
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:242
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:112
static char ** options
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1567
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
uintptr_t Datum
Definition: postgres.h:365
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5560
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
RepOriginId replorigin_session_origin
Definition: origin.c:155
void StartTransactionCommand(void)
Definition: xact.c:2673
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:79
bool MySubscriptionValid
Definition: worker.c:117
int errmsg(const char *fmt,...)
Definition: elog.c:797
void die(SIGNAL_ARGS)
Definition: postgres.c:2657
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1102
#define elog
Definition: elog.h:219
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:418
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5589
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 645 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

646 {
647  /* Updatable, no error. */
648  if (rel->updatable)
649  return;
650 
651  /*
652  * We are in error mode so it's fine that this is somewhat slow. It's
653  * better to give the user a correct error.
654  */
656  {
657  ereport(ERROR,
658  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
659  errmsg("publisher did not send replica identity column "
660  "expected by the logical replication target relation \"%s.%s\"",
661  rel->remoterel.nspname, rel->remoterel.relname)));
662  }
663 
664  ereport(ERROR,
665  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
666  errmsg("logical replication target relation \"%s.%s\" has "
667  "neither REPLICA IDENTITY index nor PRIMARY "
668  "KEY and published relation does not have "
669  "REPLICA IDENTITY FULL",
670  rel->remoterel.nspname, rel->remoterel.relname)));
671 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
int errcode(int sqlerrcode)
Definition: elog.c:575
#define OidIsValid(objectId)
Definition: c.h:605
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ create_estate_for_relation()

static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 190 of file worker.c.

References AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_num_result_relations, EState::es_output_cid, EState::es_range_table, EState::es_result_relation_info, EState::es_result_relations, EState::es_trig_tuple_slot, ExecInitExtraTupleSlot(), GetCurrentCommandId(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, ResultRelInfo::ri_TrigDesc, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

191 {
192  EState *estate;
193  ResultRelInfo *resultRelInfo;
194  RangeTblEntry *rte;
195 
196  estate = CreateExecutorState();
197 
198  rte = makeNode(RangeTblEntry);
199  rte->rtekind = RTE_RELATION;
200  rte->relid = RelationGetRelid(rel->localrel);
201  rte->relkind = rel->localrel->rd_rel->relkind;
202  estate->es_range_table = list_make1(rte);
203 
204  resultRelInfo = makeNode(ResultRelInfo);
205  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
206 
207  estate->es_result_relations = resultRelInfo;
208  estate->es_num_result_relations = 1;
209  estate->es_result_relation_info = resultRelInfo;
210 
211  estate->es_output_cid = GetCurrentCommandId(true);
212 
213  /* Triggers might need a slot */
214  if (resultRelInfo->ri_TrigDesc)
215  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
216 
217  /* Prepare to catch AFTER triggers. */
219 
220  return estate;
221 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1305
CommandId es_output_cid
Definition: execnodes.h:487
List * es_range_table
Definition: execnodes.h:480
Form_pg_class rd_rel
Definition: rel.h:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc)
Definition: execTuples.c:931
#define list_make1(x1)
Definition: pg_list.h:139
ResultRelInfo * es_result_relations
Definition: execnodes.h:490
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:512
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:409
EState * CreateExecutorState(void)
Definition: execUtils.c:80
int es_num_result_relations
Definition: execnodes.h:491
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4765
#define makeNode(_type_)
Definition: nodes.h:565
RTEKind rtekind
Definition: parsenodes.h:962
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:679
#define RelationGetRelid(relation)
Definition: rel.h:407
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ ensure_transaction()

static bool ensure_transaction ( void  )
static

Definition at line 161 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), maybe_reread_subscription(), MemoryContextSwitchTo(), SetCurrentStatementStartTimestamp(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

162 {
163  if (IsTransactionState())
164  {
166 
169 
170  return false;
171  }
172 
175 
177 
179  return true;
180 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void maybe_reread_subscription(void)
Definition: worker.c:1424
static MemoryContext ApplyMessageContext
Definition: worker.c:111
void StartTransactionCommand(void)
Definition: xact.c:2673
bool IsTransactionState(void)
Definition: xact.c:350
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:732

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 1023 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, FlushPosition::node, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

1025 {
1026  dlist_mutable_iter iter;
1027  XLogRecPtr local_flush = GetFlushRecPtr();
1028 
1030  *flush = InvalidXLogRecPtr;
1031 
1033  {
1034  FlushPosition *pos =
1035  dlist_container(FlushPosition, node, iter.cur);
1036 
1037  *write = pos->remote_end;
1038 
1039  if (pos->local_end <= local_flush)
1040  {
1041  *flush = pos->remote_end;
1042  dlist_delete(iter.cur);
1043  pfree(pos);
1044  }
1045  else
1046  {
1047  /*
1048  * Don't want to uselessly iterate over the rest of the list which
1049  * could potentially be long. Instead get the last element and
1050  * grab the write position from there.
1051  */
1052  pos = dlist_tail_element(FlushPosition, node,
1053  &lsn_mapping);
1054  *write = pos->remote_end;
1055  *have_pending_txes = true;
1056  return;
1057  }
1058  }
1059 
1060  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1061 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:102
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:99
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8271
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:98
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 569 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by apply_handle_delete(), apply_handle_update(), and check_relation_updatable().

570 {
571  Oid idxoid;
572 
573  idxoid = RelationGetReplicaIndex(rel);
574 
575  if (!OidIsValid(idxoid))
576  idxoid = RelationGetPrimaryKeyIndex(rel);
577 
578  return idxoid;
579 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4521
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:605
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4500

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 1759 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

1760 {
1761  return MyLogicalRepWorker != NULL;
1762 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63

◆ logicalrep_worker_sighup()

static void logicalrep_worker_sighup ( SIGNAL_ARGS  )
static

Definition at line 1567 of file worker.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by ApplyWorkerMain().

1568 {
1569  int save_errno = errno;
1570 
1571  got_SIGHUP = true;
1572 
1573  /* Waken anything waiting on the process latch */
1574  SetLatch(MyLatch);
1575 
1576  errno = save_errno;
1577 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
struct Latch * MyLatch
Definition: globals.c:55
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:129

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 1102 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), got_SIGHUP, in_remote_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), proc_exit(), process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

1103 {
1104  /*
1105  * Init the ApplyMessageContext which we clean up after each replication
1106  * protocol message.
1107  */
1109  "ApplyMessageContext",
1111 
1112  /* mark as idle, before starting to loop */
1114 
1115  for (;;)
1116  {
1118  int rc;
1119  int len;
1120  char *buf = NULL;
1121  bool endofstream = false;
1122  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1123  bool ping_sent = false;
1124  long wait_time;
1125 
1127 
1129 
1130  len = walrcv_receive(wrconn, &buf, &fd);
1131 
1132  if (len != 0)
1133  {
1134  /* Process the data */
1135  for (;;)
1136  {
1138 
1139  if (len == 0)
1140  {
1141  break;
1142  }
1143  else if (len < 0)
1144  {
1145  ereport(LOG,
1146  (errmsg("data stream from publisher has ended")));
1147  endofstream = true;
1148  break;
1149  }
1150  else
1151  {
1152  int c;
1153  StringInfoData s;
1154 
1155  /* Reset timeout. */
1156  last_recv_timestamp = GetCurrentTimestamp();
1157  ping_sent = false;
1158 
1159  /* Ensure we are reading the data into our memory context. */
1161 
1162  s.data = buf;
1163  s.len = len;
1164  s.cursor = 0;
1165  s.maxlen = -1;
1166 
1167  c = pq_getmsgbyte(&s);
1168 
1169  if (c == 'w')
1170  {
1171  XLogRecPtr start_lsn;
1172  XLogRecPtr end_lsn;
1173  TimestampTz send_time;
1174 
1175  start_lsn = pq_getmsgint64(&s);
1176  end_lsn = pq_getmsgint64(&s);
1177  send_time = pq_getmsgint64(&s);
1178 
1179  if (last_received < start_lsn)
1180  last_received = start_lsn;
1181 
1182  if (last_received < end_lsn)
1183  last_received = end_lsn;
1184 
1185  UpdateWorkerStats(last_received, send_time, false);
1186 
1187  apply_dispatch(&s);
1188  }
1189  else if (c == 'k')
1190  {
1191  XLogRecPtr end_lsn;
1193  bool reply_requested;
1194 
1195  end_lsn = pq_getmsgint64(&s);
1196  timestamp = pq_getmsgint64(&s);
1197  reply_requested = pq_getmsgbyte(&s);
1198 
1199  if (last_received < end_lsn)
1200  last_received = end_lsn;
1201 
1202  send_feedback(last_received, reply_requested, false);
1203  UpdateWorkerStats(last_received, timestamp, true);
1204  }
1205  /* other message types are purposefully ignored */
1206 
1208  }
1209 
1210  len = walrcv_receive(wrconn, &buf, &fd);
1211  }
1212  }
1213 
1214  /* confirm all writes so far */
1215  send_feedback(last_received, false, false);
1216 
1217  if (!in_remote_transaction)
1218  {
1219  /*
1220  * If we didn't get any transactions for a while there might be
1221  * unconsumed invalidation messages in the queue, consume them
1222  * now.
1223  */
1226 
1227  /* Process any table synchronization changes. */
1228  process_syncing_tables(last_received);
1229  }
1230 
1231  /* Cleanup the memory. */
1234 
1235  /* Check if we need to exit the streaming loop. */
1236  if (endofstream)
1237  {
1238  TimeLineID tli;
1239 
1240  walrcv_endstreaming(wrconn, &tli);
1241  break;
1242  }
1243 
1244  /*
1245  * Wait for more data or latch. If we have unflushed transactions,
1246  * wake up after WalWriterDelay to see if they've been flushed yet (in
1247  * which case we should send a feedback message). Otherwise, there's
1248  * no particular urgency about waking up unless we get data or a
1249  * signal.
1250  */
1251  if (!dlist_is_empty(&lsn_mapping))
1252  wait_time = WalWriterDelay;
1253  else
1254  wait_time = NAPTIME_PER_CYCLE;
1255 
1259  fd, wait_time,
1261 
1262  /* Emergency bailout if postmaster has died */
1263  if (rc & WL_POSTMASTER_DEATH)
1264  proc_exit(1);
1265 
1266  if (rc & WL_LATCH_SET)
1267  {
1270  }
1271 
1272  if (got_SIGHUP)
1273  {
1274  got_SIGHUP = false;
1276  }
1277 
1278  if (rc & WL_TIMEOUT)
1279  {
1280  /*
1281  * We didn't receive anything new. If we haven't heard anything
1282  * from the server for more than wal_receiver_timeout / 2, ping
1283  * the server. Also, if it's been longer than
1284  * wal_receiver_status_interval since the last update we sent,
1285  * send a status update to the master anyway, to report any
1286  * progress in applying WAL.
1287  */
1288  bool requestReply = false;
1289 
1290  /*
1291  * Check if time since last receive from the publisher has reached
1292  * the configured limit.
1293  */
1294  if (wal_receiver_timeout > 0)
1295  {
1297  TimestampTz timeout;
1298 
1299  timeout =
1300  TimestampTzPlusMilliseconds(last_recv_timestamp,
1302 
1303  if (now >= timeout)
1304  ereport(ERROR,
1305  (errmsg("terminating logical replication worker due to timeout")));
1306 
1307  /*
1308  * We didn't receive anything new, for half of receiver
1309  * replication timeout. Ping the server.
1310  */
1311  if (!ping_sent)
1312  {
1313  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1314  (wal_receiver_timeout / 2));
1315  if (now >= timeout)
1316  {
1317  requestReply = true;
1318  ping_sent = true;
1319  }
1320  }
1321  }
1322 
1323  send_feedback(last_received, requestReply, requestReply);
1324  }
1325  }
1326 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1335
WalReceiverConn * wrconn
Definition: worker.c:114
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:269
uint32 TimeLineID
Definition: xlogdefs.h:45
void AcceptInvalidationMessages(void)
Definition: inval.c:679
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static dlist_head lsn_mapping
Definition: worker.c:102
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:544
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1086
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:271
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int wal_receiver_timeout
Definition: walreceiver.c:76
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:93
bool in_remote_transaction
Definition: worker.c:119
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * c
static char * buf
Definition: pg_test_fsync.c:67
int pgsocket
Definition: port.h:31
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:44
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:112
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define PGINVALID_SOCKET
Definition: port.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_dispatch(StringInfo s)
Definition: worker.c:960
static void maybe_reread_subscription(void)
Definition: worker.c:1424
static MemoryContext ApplyMessageContext
Definition: worker.c:111
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
int WalWriterDelay
Definition: walwriter.c:68
int errmsg(const char *fmt,...)
Definition: elog.c:797
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:129

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 1424 of file worker.c.

References Assert, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), LogicalRepWorker::subid, and Subscription::synccommit.

Referenced by apply_handle_commit(), ensure_transaction(), and LogicalRepApplyLoop().

1425 {
1426  MemoryContext oldctx;
1428  bool started_tx = false;
1429 
1430  /* When cache state is valid there is nothing to do here. */
1431  if (MySubscriptionValid)
1432  return;
1433 
1434  /* This function might be called inside or outside of transaction. */
1435  if (!IsTransactionState())
1436  {
1438  started_tx = true;
1439  }
1440 
1441  /* Ensure allocations in permanent context. */
1443 
1444  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1445 
1446  /*
1447  * Exit if the subscription was removed. This normally should not happen
1448  * as the worker gets killed during DROP SUBSCRIPTION.
1449  */
1450  if (!newsub)
1451  {
1452  ereport(LOG,
1453  (errmsg("logical replication apply worker for subscription \"%s\" will "
1454  "stop because the subscription was removed",
1455  MySubscription->name)));
1456 
1457  proc_exit(0);
1458  }
1459 
1460  /*
1461  * Exit if the subscription was disabled. This normally should not happen
1462  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1463  */
1464  if (!newsub->enabled)
1465  {
1466  ereport(LOG,
1467  (errmsg("logical replication apply worker for subscription \"%s\" will "
1468  "stop because the subscription was disabled",
1469  MySubscription->name)));
1470 
1471  proc_exit(0);
1472  }
1473 
1474  /*
1475  * Exit if the connection string was changed. The launcher will start
1476  * a new worker.
1477  */
1478  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1479  {
1480  ereport(LOG,
1481  (errmsg("logical replication apply worker for subscription \"%s\" will "
1482  "restart because the connection information was changed",
1483  MySubscription->name)));
1484 
1485  proc_exit(0);
1486  }
1487 
1488  /*
1489  * Exit if the subscription name was changed (it's used for
1490  * fallback_application_name). The launcher will start a new worker.
1491  */
1492  if (strcmp(newsub->name, MySubscription->name) != 0)
1493  {
1494  ereport(LOG,
1495  (errmsg("logical replication apply worker for subscription \"%s\" will "
1496  "restart because subscription was renamed",
1497  MySubscription->name)));
1498 
1499  proc_exit(0);
1500  }
1501 
1502  /* !slotname should never happen when enabled is true. */
1503  Assert(newsub->slotname);
1504 
1505  /*
1506  * We need to make a new connection to the new slot if the slot name has
1507  * changed, so exit here as well if that's the case.
1508  */
1509  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1510  {
1511  ereport(LOG,
1512  (errmsg("logical replication apply worker for subscription \"%s\" will "
1513  "restart because the replication slot name was changed",
1514  MySubscription->name)));
1515 
1516  proc_exit(0);
1517  }
1518 
1519  /*
1520  * Exit if the publication list was changed. The launcher will start a
1521  * new worker.
1522  */
1524  {
1525  ereport(LOG,
1526  (errmsg("logical replication apply worker for subscription \"%s\" will "
1527  "restart because subscription's publications were changed",
1528  MySubscription->name)));
1529 
1530  proc_exit(0);
1531  }
1532 
1533  /* Check for other changes that should never happen too. */
1534  if (newsub->dbid != MySubscription->dbid)
1535  {
1536  elog(ERROR, "subscription %u changed unexpectedly",
1538  }
1539 
1540  /* Clean old subscription info and switch to new one. */
1543 
1544  MemoryContextSwitchTo(oldctx);
1545 
1546  /* Change synchronous commit according to the user's wishes */
1547  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1549 
1550  if (started_tx)
1552 
1553  MySubscriptionValid = true;
1554 }
Subscription * MySubscription
Definition: worker.c:116
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2986
void CommitTransactionCommand(void)
Definition: xact.c:2744
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6903
List * publications
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext ApplyContext
Definition: worker.c:112
#define Assert(condition)
Definition: c.h:699
void StartTransactionCommand(void)
Definition: xact.c:2673
bool IsTransactionState(void)
Definition: xact.c:350
bool MySubscriptionValid
Definition: worker.c:117
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1335 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

1336 {
1337  static StringInfo reply_message = NULL;
1338  static TimestampTz send_time = 0;
1339 
1340  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1341  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1342  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1343 
1344  XLogRecPtr writepos;
1345  XLogRecPtr flushpos;
1346  TimestampTz now;
1347  bool have_pending_txes;
1348 
1349  /*
1350  * If the user doesn't want status to be reported to the publisher, be
1351  * sure to exit before doing anything at all.
1352  */
1353  if (!force && wal_receiver_status_interval <= 0)
1354  return;
1355 
1356  /* It's legal to not pass a recvpos */
1357  if (recvpos < last_recvpos)
1358  recvpos = last_recvpos;
1359 
1360  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1361 
1362  /*
1363  * No outstanding transactions to flush, we can report the latest received
1364  * position. This is important for synchronous replication.
1365  */
1366  if (!have_pending_txes)
1367  flushpos = writepos = recvpos;
1368 
1369  if (writepos < last_writepos)
1370  writepos = last_writepos;
1371 
1372  if (flushpos < last_flushpos)
1373  flushpos = last_flushpos;
1374 
1375  now = GetCurrentTimestamp();
1376 
1377  /* if we've already reported everything we're good */
1378  if (!force &&
1379  writepos == last_writepos &&
1380  flushpos == last_flushpos &&
1381  !TimestampDifferenceExceeds(send_time, now,
1382  wal_receiver_status_interval * 1000))
1383  return;
1384  send_time = now;
1385 
1386  if (!reply_message)
1387  {
1388  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1389 
1390  reply_message = makeStringInfo();
1391  MemoryContextSwitchTo(oldctx);
1392  }
1393  else
1394  resetStringInfo(reply_message);
1395 
1396  pq_sendbyte(reply_message, 'r');
1397  pq_sendint64(reply_message, recvpos); /* write */
1398  pq_sendint64(reply_message, flushpos); /* flush */
1399  pq_sendint64(reply_message, writepos); /* apply */
1400  pq_sendint64(reply_message, now); /* sendTime */
1401  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1402 
1403  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1404  force,
1405  (uint32) (recvpos >> 32), (uint32) recvpos,
1406  (uint32) (writepos >> 32), (uint32) writepos,
1407  (uint32) (flushpos >> 32), (uint32) flushpos
1408  );
1409 
1410  walrcv_send(wrconn, reply_message->data, reply_message->len);
1411 
1412  if (recvpos > last_recvpos)
1413  last_recvpos = recvpos;
1414  if (writepos > last_writepos)
1415  last_writepos = writepos;
1416  if (flushpos > last_flushpos)
1417  last_flushpos = flushpos;
1418 }
WalReceiverConn * wrconn
Definition: worker.c:114
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
int wal_receiver_status_interval
Definition: walreceiver.c:75
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1023
static StringInfoData reply_message
Definition: walreceiver.c:112
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:325
MemoryContext ApplyContext
Definition: worker.c:112
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:273
#define elog
Definition: elog.h:219
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 145 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

146 {
147  if (am_tablesync_worker())
148  return MyLogicalRepWorker->relid == rel->localreloid;
149  else
150  return (rel->state == SUBREL_STATE_READY ||
151  (rel->state == SUBREL_STATE_SYNCDONE &&
152  rel->statelsn <= remote_final_lsn));
153 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:120
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
static bool am_tablesync_worker(void)

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 231 of file worker.c.

References attnum, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, LogicalRepRelation::natts, tupleDesc::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

233 {
234  TupleDesc desc = RelationGetDescr(rel->localrel);
235  int num_phys_attrs = desc->natts;
236  int i;
237  int attnum,
238  num_defaults = 0;
239  int *defmap;
240  ExprState **defexprs;
241  ExprContext *econtext;
242 
243  econtext = GetPerTupleExprContext(estate);
244 
245  /* We got all the data via replication, no need to evaluate anything. */
246  if (num_phys_attrs == rel->remoterel.natts)
247  return;
248 
249  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
250  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
251 
252  for (attnum = 0; attnum < num_phys_attrs; attnum++)
253  {
254  Expr *defexpr;
255 
256  if (TupleDescAttr(desc, attnum)->attisdropped)
257  continue;
258 
259  if (rel->attrmap[attnum] >= 0)
260  continue;
261 
262  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
263 
264  if (defexpr != NULL)
265  {
266  /* Run the expression through planner */
267  defexpr = expression_planner(defexpr);
268 
269  /* Initialize executable expression in copycontext */
270  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
271  defmap[num_defaults] = attnum;
272  num_defaults++;
273  }
274 
275  }
276 
277  for (i = 0; i < num_defaults; i++)
278  slot->tts_values[defmap[i]] =
279  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
280 }
#define RelationGetDescr(relation)
Definition: rel.h:433
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
Expr * expression_planner(Expr *expr)
Definition: planner.c:5888
Datum * tts_values
Definition: tuptable.h:130
int natts
Definition: tupdesc.h:82
#define GetPerTupleExprContext(estate)
Definition: executor.h:489
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:132
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:281
Node * build_column_default(Relation rel, int attrno)
int16 attnum
Definition: pg_attribute.h:79
void * palloc(Size size)
Definition: mcxt.c:924
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:119

◆ slot_modify_cstrings()

static void slot_modify_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values,
bool replaces 
)
static

Definition at line 389 of file worker.c.

References ErrorContextCallback::arg, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, tupleDesc::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_update().

391 {
392  int natts = slot->tts_tupleDescriptor->natts;
393  int i;
394  SlotErrCallbackArg errarg;
395  ErrorContextCallback errcallback;
396 
397  slot_getallattrs(slot);
398  ExecClearTuple(slot);
399 
400  /* Push callback + info on the error context stack */
401  errarg.rel = rel;
402  errarg.local_attnum = -1;
403  errarg.remote_attnum = -1;
404  errcallback.callback = slot_store_error_callback;
405  errcallback.arg = (void *) &errarg;
406  errcallback.previous = error_context_stack;
407  error_context_stack = &errcallback;
408 
409  /* Call the "in" function for each replaced attribute */
410  for (i = 0; i < natts; i++)
411  {
412  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
413  int remoteattnum = rel->attrmap[i];
414 
415  if (remoteattnum < 0)
416  continue;
417 
418  if (!replaces[remoteattnum])
419  continue;
420 
421  if (values[remoteattnum] != NULL)
422  {
423  Oid typinput;
424  Oid typioparam;
425 
426  errarg.local_attnum = i;
427  errarg.remote_attnum = remoteattnum;
428 
429  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
430  slot->tts_values[i] =
431  OidInputFunctionCall(typinput, values[remoteattnum],
432  typioparam, att->atttypmod);
433  slot->tts_isnull[i] = false;
434 
435  errarg.local_attnum = -1;
436  errarg.remote_attnum = -1;
437  }
438  else
439  {
440  slot->tts_values[i] = (Datum) 0;
441  slot->tts_isnull[i] = true;
442  }
443  }
444 
445  /* Pop the error context stack */
446  error_context_stack = errcallback.previous;
447 
448  ExecStoreVirtualTuple(slot);
449 }
LogicalRepRelMapEntry * rel
Definition: worker.c:106
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:475
Datum * tts_values
Definition: tuptable.h:130
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:239
struct ErrorContextCallback * previous
Definition: elog.h:238
int natts
Definition: tupdesc.h:82
ErrorContextCallback * error_context_stack
Definition: elog.c:88
bool * tts_isnull
Definition: tuptable.h:132
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2617
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1612
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:365
static void slot_store_error_callback(void *arg)
Definition: worker.c:286
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1823
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:524

◆ slot_store_cstrings()

static void slot_store_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values 
)
static

Definition at line 321 of file worker.c.

References ErrorContextCallback::arg, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, tupleDesc::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

323 {
324  int natts = slot->tts_tupleDescriptor->natts;
325  int i;
326  SlotErrCallbackArg errarg;
327  ErrorContextCallback errcallback;
328 
329  ExecClearTuple(slot);
330 
331  /* Push callback + info on the error context stack */
332  errarg.rel = rel;
333  errarg.local_attnum = -1;
334  errarg.remote_attnum = -1;
335  errcallback.callback = slot_store_error_callback;
336  errcallback.arg = (void *) &errarg;
337  errcallback.previous = error_context_stack;
338  error_context_stack = &errcallback;
339 
340  /* Call the "in" function for each non-dropped attribute */
341  for (i = 0; i < natts; i++)
342  {
343  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
344  int remoteattnum = rel->attrmap[i];
345 
346  if (!att->attisdropped && remoteattnum >= 0 &&
347  values[remoteattnum] != NULL)
348  {
349  Oid typinput;
350  Oid typioparam;
351 
352  errarg.local_attnum = i;
353  errarg.remote_attnum = remoteattnum;
354 
355  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
356  slot->tts_values[i] =
357  OidInputFunctionCall(typinput, values[remoteattnum],
358  typioparam, att->atttypmod);
359  slot->tts_isnull[i] = false;
360 
361  errarg.local_attnum = -1;
362  errarg.remote_attnum = -1;
363  }
364  else
365  {
366  /*
367  * We assign NULL to dropped attributes, NULL values, and missing
368  * values (missing values should be later filled using
369  * slot_fill_defaults).
370  */
371  slot->tts_values[i] = (Datum) 0;
372  slot->tts_isnull[i] = true;
373  }
374  }
375 
376  /* Pop the error context stack */
377  error_context_stack = errcallback.previous;
378 
379  ExecStoreVirtualTuple(slot);
380 }
LogicalRepRelMapEntry * rel
Definition: worker.c:106
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:475
Datum * tts_values
Definition: tuptable.h:130
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:239
struct ErrorContextCallback * previous
Definition: elog.h:238
int natts
Definition: tupdesc.h:82
ErrorContextCallback * error_context_stack
Definition: elog.c:88
bool * tts_isnull
Definition: tuptable.h:132
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2617
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:365
static void slot_store_error_callback(void *arg)
Definition: worker.c:286
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1823
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:524

◆ slot_store_error_callback()

static void slot_store_error_callback ( void *  arg)
static

Definition at line 286 of file worker.c.

References LogicalRepRelation::attnames, LogicalRepRelation::atttyps, errcontext, format_type_be(), get_atttype(), SlotErrCallbackArg::local_attnum, LogicalRepRelMapEntry::localreloid, logicalrep_typmap_gettypname(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, LogicalRepRelation::relname, SlotErrCallbackArg::remote_attnum, and LogicalRepRelMapEntry::remoterel.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

287 {
288  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
289  LogicalRepRelMapEntry *rel;
290  char *remotetypname;
291  Oid remotetypoid,
292  localtypoid;
293 
294  /* Nothing to do if remote attribute number is not set */
295  if (errarg->remote_attnum < 0)
296  return;
297 
298  rel = errarg->rel;
299  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
300 
301  /* Fetch remote type name from the LogicalRepTypMap cache */
302  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
303 
304  /* Fetch local type OID from the local sys cache */
305  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
306 
307  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
308  "remote type %s, local type %s",
309  rel->remoterel.nspname, rel->remoterel.relname,
310  rel->remoterel.attnames[errarg->remote_attnum],
311  remotetypname,
312  format_type_be(localtypoid));
313 }
LogicalRepRelMapEntry * rel
Definition: worker.c:106
char * format_type_be(Oid type_oid)
Definition: format_type.c:328
unsigned int Oid
Definition: postgres_ext.h:31
LogicalRepRelation remoterel
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:863
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:422
#define errcontext
Definition: elog.h:164
void * arg

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 1067 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

1068 {
1069  FlushPosition *flushpos;
1070 
1071  /* Need to do this in permanent context */
1072  MemoryContextSwitchTo(ApplyContext);
1073 
1074  /* Track commit lsn */
1075  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1076  flushpos->local_end = XactLastCommitEnd;
1077  flushpos->remote_end = remote_lsn;
1078 
1079  dlist_push_tail(&lsn_mapping, &flushpos->node);
1080  MemoryContextSwitchTo(ApplyMessageContext);
1081 }
static dlist_head lsn_mapping
Definition: worker.c:102
dlist_node node
Definition: worker.c:97
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:340
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:99
MemoryContext ApplyContext
Definition: worker.c:112
XLogRecPtr local_end
Definition: worker.c:98
static MemoryContext ApplyMessageContext
Definition: worker.c:111
void * palloc(Size size)
Definition: mcxt.c:924

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1560 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

1561 {
1562  MySubscriptionValid = false;
1563 }
bool MySubscriptionValid
Definition: worker.c:117

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 1086 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by LogicalRepApplyLoop().

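1087 {
1088  MyLogicalRepWorker->last_lsn = last_lsn;
1089  MyLogicalRepWorker->last_send_time = send_time;
1090  MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1091  if (reply)
1092  {
1093  MyLogicalRepWorker->reply_lsn = last_lsn;
1094  MyLogicalRepWorker->reply_time = send_time;
1095  }
1096 }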
Definition: timestamp.c:1570
TimestampTz last_send_time
XLogRecPtr last_lsn
XLogRecPtr last_lsn
Definition: walsender.c:214
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext = NULL

Definition at line 112 of file worker.c.

◆ ApplyMessageContext

MemoryContext ApplyMessageContext = NULL
static

Definition at line 111 of file worker.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 129 of file worker.c.

Referenced by logicalrep_worker_sighup(), and LogicalRepApplyLoop().

◆ in_remote_transaction

bool in_remote_transaction = false

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 102 of file worker.c.

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false

Definition at line 117 of file worker.c.

Referenced by ApplyWorkerMain(), maybe_reread_subscription(), and subscription_change_cb().

◆ remote_final_lsn

XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
static

Definition at line 120 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), and should_apply_changes_for_rel().

◆ wrconn