PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static EState * create_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void logicalrep_worker_sighup (SIGNAL_ARGS)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
WalReceiverConn * wrconn = NULL
 
Subscription * MySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static volatile sig_atomic_t got_SIGHUP = false
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 79 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SlotErrCallbackArg

typedef struct SlotErrCallbackArg SlotErrCallbackArg

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 948 of file worker.c.

References generate_unaccent_rules::action, apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by LogicalRepApplyLoop().

949 {
950  char action = pq_getmsgbyte(s);
951 
952  switch (action)
953  {
954  /* BEGIN */
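955  case 'B':
956  apply_handle_begin(s);
957  break;
958  /* COMMIT */
959  case 'C':
960  apply_handle_commit(s);
961  break;
962  /* INSERT */
963  case 'I':
964  apply_handle_insert(s);
965  break;
966  /* UPDATE */
967  case 'U':
968  apply_handle_update(s);
969  break;
970  /* DELETE */
971  case 'D':
972  apply_handle_delete(s);
973  break;
974  /* TRUNCATE */
975  case 'T':
976  apply_handle_truncate(s);
977  break;
978  /* RELATION */
979  case 'R':
980  apply_handle_relation(s);
981  break;
982  /* TYPE */
983  case 'Y':
984  apply_handle_type(s);
985  break;
986  /* ORIGIN */
987  case 'O':
988  apply_handle_origin(s);
989  break;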
990  default:
991  ereport(ERROR,
992  (errcode(ERRCODE_PROTOCOL_VIOLATION),
993  errmsg("invalid logical replication message type \"%c\"", action)));
994  }
995 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:538
static void apply_handle_insert(StringInfo s)
Definition: worker.c:568
int errcode(int sqlerrcode)
Definition: elog.c:570
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:787
static void apply_handle_begin(StringInfo s)
Definition: worker.c:438
#define ereport(elevel, rest)
Definition: elog.h:141
static void apply_handle_commit(StringInfo s)
Definition: worker.c:457
static void apply_handle_update(StringInfo s)
Definition: worker.c:665
static void apply_handle_relation(StringInfo s)
Definition: worker.c:523
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_handle_origin(StringInfo s)
Definition: worker.c:501
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:888

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 438 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

439 {
440  LogicalRepBeginData begin_data;
441 
442  logicalrep_read_begin(s, &begin_data);
443 
444  remote_final_lsn = begin_data.final_lsn;
445 
446  in_remote_transaction = true;
447 
449 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
bool in_remote_transaction
Definition: worker.c:105
XLogRecPtr final_lsn
Definition: logicalproto.h:66

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 457 of file worker.c.

References AcceptInvalidationMessages(), am_tablesync_worker(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), maybe_reread_subscription(), pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), remote_final_lsn, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

458 {
459  LogicalRepCommitData commit_data;
460 
461  logicalrep_read_commit(s, &commit_data);
462 
463  Assert(commit_data.commit_lsn == remote_final_lsn);
464 
465  /* The synchronization worker runs in single transaction. */
467  {
468  /*
469  * Update origin state so we can restart streaming from correct
470  * position in case of crash.
471  */
474 
476  pgstat_report_stat(false);
477 
478  store_flush_position(commit_data.end_lsn);
479  }
480  else
481  {
482  /* Process any invalidation messages that might have accumulated. */
485  }
486 
487  in_remote_transaction = false;
488 
489  /* Process any tables that are being synchronized in parallel. */
490  process_syncing_tables(commit_data.end_lsn);
491 
493 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1055
void AcceptInvalidationMessages(void)
Definition: inval.c:681
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:536
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
void CommitTransactionCommand(void)
Definition: xact.c:2895
bool in_remote_transaction
Definition: worker.c:105
static bool am_tablesync_worker(void)
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
static void maybe_reread_subscription(void)
Definition: worker.c:1409
#define Assert(condition)
Definition: c.h:732
bool IsTransactionState(void)
Definition: xact.c:356
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
TimestampTz committime
Definition: logicalproto.h:75
void pgstat_report_stat(bool force)
Definition: pgstat.c:813

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 787 of file worker.c.

References AfterTriggerEndQuery(), Assert, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationDelete(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), table_slot_create(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

788 {
790  LogicalRepTupleData oldtup;
791  LogicalRepRelId relid;
792  Oid idxoid;
793  EState *estate;
794  EPQState epqstate;
795  TupleTableSlot *remoteslot;
796  TupleTableSlot *localslot;
797  bool found;
798  MemoryContext oldctx;
799 
801 
802  relid = logicalrep_read_delete(s, &oldtup);
805  {
806  /*
807  * The relation can't become interesting in the middle of the
808  * transaction so it's safe to unlock it.
809  */
811  return;
812  }
813 
814  /* Check if we can do the delete. */
816 
817  /* Initialize the executor state. */
818  estate = create_estate_for_relation(rel);
819  remoteslot = ExecInitExtraTupleSlot(estate,
821  &TTSOpsVirtual);
822  localslot = table_slot_create(rel->localrel,
823  &estate->es_tupleTable);
824  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
825 
828 
829  /* Find the tuple using the replica identity index. */
831  slot_store_cstrings(remoteslot, rel, oldtup.values);
832  MemoryContextSwitchTo(oldctx);
833 
834  /*
835  * Try to find tuple using either replica identity index, primary key or
836  * if needed, sequential scan.
837  */
838  idxoid = GetRelationIdentityOrPK(rel->localrel);
839  Assert(OidIsValid(idxoid) ||
840  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
841 
842  if (OidIsValid(idxoid))
843  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
845  remoteslot, localslot);
846  else
848  remoteslot, localslot);
849  /* If found delete it. */
850  if (found)
851  {
852  EvalPlanQualSetSlot(&epqstate, localslot);
853 
854  /* Do the actual delete. */
855  ExecSimpleRelationDelete(estate, &epqstate, localslot);
856  }
857  else
858  {
859  /* The tuple to be deleted could not be found. */
860  elog(DEBUG1,
861  "logical replication could not find row for delete "
862  "in replication target relation \"%s\"",
864  }
865 
866  /* Cleanup. */
869 
870  /* Handle queued AFTER triggers. */
871  AfterTriggerEndQuery(estate);
872 
873  EvalPlanQualEnd(&epqstate);
874  ExecResetTupleTable(estate->es_tupleTable, false);
875  FreeExecutorState(estate);
876 
878 
880 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:552
#define DEBUG1
Definition: elog.h:25
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1796
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:280
#define RelationGetDescr(relation)
Definition: rel.h:445
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:638
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2953
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:147
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:453
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:631
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2493
List * es_tupleTable
Definition: execnodes.h:552
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1156
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define Assert(condition)
Definition: c.h:732
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:226
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:522

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 568 of file worker.c.

References AfterTriggerEndQuery(), CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationInsert(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_cstrings(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

569 {
571  LogicalRepTupleData newtup;
572  LogicalRepRelId relid;
573  EState *estate;
574  TupleTableSlot *remoteslot;
575  MemoryContext oldctx;
576 
578 
579  relid = logicalrep_read_insert(s, &newtup);
582  {
583  /*
584  * The relation can't become interesting in the middle of the
585  * transaction so it's safe to unlock it.
586  */
588  return;
589  }
590 
591  /* Initialize the executor state. */
592  estate = create_estate_for_relation(rel);
593  remoteslot = ExecInitExtraTupleSlot(estate,
595  &TTSOpsVirtual);
596 
597  /* Input functions may need an active snapshot, so get one */
599 
600  /* Process and store remote tuple in the slot */
602  slot_store_cstrings(remoteslot, rel, newtup.values);
603  slot_fill_defaults(rel, estate, remoteslot);
604  MemoryContextSwitchTo(oldctx);
605 
607 
608  /* Do the insert. */
609  ExecSimpleRelationInsert(estate, remoteslot);
610 
611  /* Cleanup. */
614 
615  /* Handle queued AFTER triggers. */
616  AfterTriggerEndQuery(estate);
617 
618  ExecResetTupleTable(estate->es_tupleTable, false);
619  FreeExecutorState(estate);
620 
622 
624 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1796
#define RelationGetDescr(relation)
Definition: rel.h:445
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:214
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:147
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * es_tupleTable
Definition: execnodes.h:552
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1156
void CommandCounterIncrement(void)
Definition: xact.c:1003
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:522

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 501 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

502 {
503  /*
504  * ORIGIN message can only come inside remote transaction and before any
505  * actual writes.
506  */
507  if (!in_remote_transaction ||
509  ereport(ERROR,
510  (errcode(ERRCODE_PROTOCOL_VIOLATION),
511  errmsg("ORIGIN message sent out of order")));
512 }
int errcode(int sqlerrcode)
Definition: elog.c:570
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:105
static bool am_tablesync_worker(void)
#define ereport(elevel, rest)
Definition: elog.h:141
bool IsTransactionState(void)
Definition: xact.c:356
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 523 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

524 {
525  LogicalRepRelation *rel;
526 
527  rel = logicalrep_read_rel(s);
528  logicalrep_relmap_update(rel);
529 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:379
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 888 of file worker.c.

References CommandCounterIncrement(), DROP_RESTRICT, ensure_transaction(), ExecuteTruncateGuts(), lappend(), lappend_oid(), lfirst, lfirst_oid, LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationIsLogicallyLogged, RowExclusiveLock, and should_apply_changes_for_rel().

Referenced by apply_dispatch().

889 {
890  bool cascade = false;
891  bool restart_seqs = false;
892  List *remote_relids = NIL;
893  List *remote_rels = NIL;
894  List *rels = NIL;
895  List *relids = NIL;
896  List *relids_logged = NIL;
897  ListCell *lc;
898 
900 
901  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
902 
903  foreach(lc, remote_relids)
904  {
905  LogicalRepRelId relid = lfirst_oid(lc);
907 
910  {
911  /*
912  * The relation can't become interesting in the middle of the
913  * transaction so it's safe to unlock it.
914  */
916  continue;
917  }
918 
919  remote_rels = lappend(remote_rels, rel);
920  rels = lappend(rels, rel->localrel);
921  relids = lappend_oid(relids, rel->localreloid);
923  relids_logged = lappend_oid(relids_logged, rel->localreloid);
924  }
925 
926  /*
927  * Even if we used CASCADE on the upstream master we explicitly default to
928  * replaying changes without further cascading. This might be later
929  * changeable with a user specified option.
930  */
931  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
932 
933  foreach(lc, remote_rels)
934  {
935  LogicalRepRelMapEntry *rel = lfirst(lc);
936 
938  }
939 
941 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:329
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:591
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
static bool ensure_transaction(void)
Definition: worker.c:147
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1617
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * lappend(List *list, void *datum)
Definition: list.c:322
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define lfirst(lc)
Definition: pg_list.h:190
Definition: pg_list.h:50
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 538 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

539 {
540  LogicalRepTyp typ;
541 
542  logicalrep_read_typ(s, &typ);
543  logicalrep_typmap_update(&typ);
544 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:431
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:388

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 665 of file worker.c.

References AfterTriggerEndQuery(), Assert, LogicalRepTupleData::changed, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecCopySlot(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationUpdate(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_modify_cstrings(), slot_store_cstrings(), table_slot_create(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

666 {
668  LogicalRepRelId relid;
669  Oid idxoid;
670  EState *estate;
671  EPQState epqstate;
672  LogicalRepTupleData oldtup;
673  LogicalRepTupleData newtup;
674  bool has_oldtup;
675  TupleTableSlot *localslot;
676  TupleTableSlot *remoteslot;
677  bool found;
678  MemoryContext oldctx;
679 
681 
682  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
683  &newtup);
686  {
687  /*
688  * The relation can't become interesting in the middle of the
689  * transaction so it's safe to unlock it.
690  */
692  return;
693  }
694 
695  /* Check if we can do the update. */
697 
698  /* Initialize the executor state. */
699  estate = create_estate_for_relation(rel);
700  remoteslot = ExecInitExtraTupleSlot(estate,
702  &TTSOpsVirtual);
703  localslot = table_slot_create(rel->localrel,
704  &estate->es_tupleTable);
705  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
706 
709 
710  /* Build the search tuple. */
712  slot_store_cstrings(remoteslot, rel,
713  has_oldtup ? oldtup.values : newtup.values);
714  MemoryContextSwitchTo(oldctx);
715 
716  /*
717  * Try to find tuple using either replica identity index, primary key or
718  * if needed, sequential scan.
719  */
720  idxoid = GetRelationIdentityOrPK(rel->localrel);
721  Assert(OidIsValid(idxoid) ||
722  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
723 
724  if (OidIsValid(idxoid))
725  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
727  remoteslot, localslot);
728  else
730  remoteslot, localslot);
731 
732  ExecClearTuple(remoteslot);
733 
734  /*
735  * Tuple found.
736  *
737  * Note this will fail if there are other conflicting unique indexes.
738  */
739  if (found)
740  {
741  /* Process and store remote tuple in the slot */
743  ExecCopySlot(remoteslot, localslot);
744  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
745  MemoryContextSwitchTo(oldctx);
746 
747  EvalPlanQualSetSlot(&epqstate, remoteslot);
748 
749  /* Do the actual update. */
750  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
751  }
752  else
753  {
754  /*
755  * The tuple to be updated could not be found.
756  *
757  * TODO what to do here, change the log level to LOG perhaps?
758  */
759  elog(DEBUG1,
760  "logical replication did not find row for update "
761  "in replication target relation \"%s\"",
763  }
764 
765  /* Cleanup. */
768 
769  /* Handle queued AFTER triggers. */
770  AfterTriggerEndQuery(estate);
771 
772  EvalPlanQualEnd(&epqstate);
773  ExecResetTupleTable(estate->es_tupleTable, false);
774  FreeExecutorState(estate);
775 
777 
779 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:476
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:552
#define DEBUG1
Definition: elog.h:25
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1796
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
#define RelationGetDescr(relation)
Definition: rel.h:445
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:638
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2953
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:147
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:453
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:631
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2493
List * es_tupleTable
Definition: execnodes.h:552
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1156
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define Assert(condition)
Definition: c.h:732
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:214
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:372
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:226
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:522

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1566 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sighup(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1567 {
1568  int worker_slot = DatumGetInt32(main_arg);
1569  MemoryContext oldctx;
1570  char originname[NAMEDATALEN];
1571  XLogRecPtr origin_startpos;
1572  char *myslotname;
1574 
1575  /* Attach to slot */
1576  logicalrep_worker_attach(worker_slot);
1577 
1578  /* Setup signal handling */
1580  pqsignal(SIGTERM, die);
1582 
1583  /*
1584  * We don't currently need any ResourceOwner in a walreceiver process, but
1585  * if we did, we could call CreateAuxProcessResourceOwner here.
1586  */
1587 
1588  /* Initialise stats to a sanish value */
1591 
1592  /* Load the libpq-specific functions */
1593  load_file("libpqwalreceiver", false);
1594 
1595  /* Run as replica session replication role. */
1596  SetConfigOption("session_replication_role", "replica",
1598 
1599  /* Connect to our database. */
1602  0);
1603 
1604  /* Load the subscription into persistent memory context. */
1606  "ApplyContext",
1610 
1612  if (!MySubscription)
1613  {
1614  ereport(LOG,
1615  (errmsg("logical replication apply worker for subscription %u will not "
1616  "start because the subscription was removed during startup",
1618  proc_exit(0);
1619  }
1620 
1621  MySubscriptionValid = true;
1622  MemoryContextSwitchTo(oldctx);
1623 
1624  if (!MySubscription->enabled)
1625  {
1626  ereport(LOG,
1627  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1628  "start because the subscription was disabled during startup",
1629  MySubscription->name)));
1630 
1631  proc_exit(0);
1632  }
1633 
1634  /* Setup synchronous commit according to the user's wishes */
1635  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1637 
1638  /* Keep us informed about subscription changes. */
1641  (Datum) 0);
1642 
1643  if (am_tablesync_worker())
1644  ereport(LOG,
1645  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1647  else
1648  ereport(LOG,
1649  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1650  MySubscription->name)));
1651 
1653 
1654  /* Connect to the origin and start the replication. */
1655  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1657 
1658  if (am_tablesync_worker())
1659  {
1660  char *syncslotname;
1661 
1662  /* This is table synchronization worker, call initial sync. */
1663  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1664 
1665  /* The slot name needs to be allocated in permanent memory context. */
1667  myslotname = pstrdup(syncslotname);
1668  MemoryContextSwitchTo(oldctx);
1669 
1670  pfree(syncslotname);
1671  }
1672  else
1673  {
1674  /* This is main apply worker */
1675  RepOriginId originid;
1676  TimeLineID startpointTLI;
1677  char *err;
1678 
1679  myslotname = MySubscription->slotname;
1680 
1681  /*
1682  * This shouldn't happen if the subscription is enabled, but guard
1683  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1684  * crash if slot is NULL.)
1685  */
1686  if (!myslotname)
1687  ereport(ERROR,
1688  (errmsg("subscription has no replication slot set")));
1689 
1690  /* Setup replication origin tracking. */
1692  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1693  originid = replorigin_by_name(originname, true);
1694  if (!OidIsValid(originid))
1695  originid = replorigin_create(originname);
1696  replorigin_session_setup(originid);
1697  replorigin_session_origin = originid;
1698  origin_startpos = replorigin_session_get_progress(false);
1700 
1702  &err);
1703  if (wrconn == NULL)
1704  ereport(ERROR,
1705  (errmsg("could not connect to the publisher: %s", err)));
1706 
1707  /*
1708  * We don't really use the output of identify_system for anything, but it
1709  * does some initializations on the upstream so let's still call it.
1710  */
1711  (void) walrcv_identify_system(wrconn, &startpointTLI);
1712 
1713  }
1714 
1715  /*
1716  * Setup callback for syscache so that we know when something changes in
1717  * the subscription relation state.
1718  */
1721  (Datum) 0);
1722 
1723  /* Build logical replication streaming options. */
1724  options.logical = true;
1725  options.startpoint = origin_startpos;
1726  options.slotname = myslotname;
1727  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1728  options.proto.logical.publication_names = MySubscription->publications;
1729 
1730  /* Start normal logical streaming replication. */
1731  walrcv_startstreaming(wrconn, &options);
1732 
1733  /* Run the main loop. */
1734  LogicalRepApplyLoop(origin_startpos);
1735 
1736  proc_exit(0);
1737 }
Subscription * MySubscription
Definition: worker.c:102
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:100
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:52
#define DatumGetInt32(X)
Definition: postgres.h:472
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:263
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2895
union WalRcvStreamOptions::@106 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:269
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1192
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1057
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:805
#define OidIsValid(objectId)
Definition: c.h:638
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1545
void pfree(void *pointer)
Definition: mcxt.c:1056
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void logicalrep_worker_attach(int slot)
Definition: launcher.c:637
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7487
#define SIGHUP
Definition: win32_port.h:154
XLogRecPtr startpoint
Definition: walreceiver.h:153
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:243
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:98
static char ** options
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1552
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
uintptr_t Datum
Definition: postgres.h:367
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5708
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:156
void StartTransactionCommand(void)
Definition: xact.c:2794
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:103
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1090
#define elog(elevel,...)
Definition: elog.h:226
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define snprintf
Definition: port.h:192
#define die(msg)
Definition: pg_test_fsync.c:97
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:263
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5737
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 631 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

632 {
633  /* Updatable, no error. */
634  if (rel->updatable)
635  return;
636 
637  /*
638  * We are in error mode so it's fine that this is somewhat slow. It's better
639  * to give the user a correct error.
640  */
642  {
643  ereport(ERROR,
644  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
645  errmsg("publisher did not send replica identity column "
646  "expected by the logical replication target relation \"%s.%s\"",
647  rel->remoterel.nspname, rel->remoterel.relname)));
648  }
649 
650  ereport(ERROR,
651  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652  errmsg("logical replication target relation \"%s.%s\" has "
653  "neither REPLICA IDENTITY index nor PRIMARY "
654  "KEY and published relation does not have "
655  "REPLICA IDENTITY FULL",
656  rel->remoterel.nspname, rel->remoterel.relname)));
657 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:552
int errcode(int sqlerrcode)
Definition: elog.c:570
#define OidIsValid(objectId)
Definition: c.h:638
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ create_estate_for_relation()

static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 176 of file worker.c.

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_num_result_relations, EState::es_output_cid, EState::es_result_relation_info, EState::es_result_relations, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

177 {
178  EState *estate;
179  ResultRelInfo *resultRelInfo;
180  RangeTblEntry *rte;
181 
182  estate = CreateExecutorState();
183 
184  rte = makeNode(RangeTblEntry);
185  rte->rtekind = RTE_RELATION;
186  rte->relid = RelationGetRelid(rel->localrel);
187  rte->relkind = rel->localrel->rd_rel->relkind;
189  ExecInitRangeTable(estate, list_make1(rte));
190 
191  resultRelInfo = makeNode(ResultRelInfo);
192  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
193 
194  estate->es_result_relations = resultRelInfo;
195  estate->es_num_result_relations = 1;
196  estate->es_result_relation_info = resultRelInfo;
197 
198  estate->es_output_cid = GetCurrentCommandId(true);
199 
200  /* Prepare to catch AFTER triggers. */
202 
203  return estate;
204 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:724
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
CommandId es_output_cid
Definition: execnodes.h:517
#define AccessShareLock
Definition: lockdefs.h:36
Form_pg_class rd_rel
Definition: rel.h:83
#define list_make1(x1)
Definition: pg_list.h:227
ResultRelInfo * es_result_relations
Definition: execnodes.h:520
EState * CreateExecutorState(void)
Definition: execUtils.c:88
int es_num_result_relations
Definition: execnodes.h:521
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4785
#define makeNode(_type_)
Definition: nodes.h:573
RTEKind rtekind
Definition: parsenodes.h:974
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:746
#define RelationGetRelid(relation)
Definition: rel.h:419
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:522

◆ ensure_transaction()

static bool ensure_transaction ( void  )
static

Definition at line 147 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), maybe_reread_subscription(), MemoryContextSwitchTo(), SetCurrentStatementStartTimestamp(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

148 {
149  if (IsTransactionState())
150  {
152 
155 
156  return false;
157  }
158 
161 
163 
165  return true;
166 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void maybe_reread_subscription(void)
Definition: worker.c:1409
static MemoryContext ApplyMessageContext
Definition: worker.c:97
void StartTransactionCommand(void)
Definition: xact.c:2794
bool IsTransactionState(void)
Definition: xact.c:356
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:818

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 1011 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, FlushPosition::node, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

1013 {
1014  dlist_mutable_iter iter;
1015  XLogRecPtr local_flush = GetFlushRecPtr();
1016 
1018  *flush = InvalidXLogRecPtr;
1019 
1021  {
1022  FlushPosition *pos =
1023  dlist_container(FlushPosition, node, iter.cur);
1024 
1025  *write = pos->remote_end;
1026 
1027  if (pos->local_end <= local_flush)
1028  {
1029  *flush = pos->remote_end;
1030  dlist_delete(iter.cur);
1031  pfree(pos);
1032  }
1033  else
1034  {
1035  /*
1036  * Don't want to uselessly iterate over the rest of the list which
1037  * could potentially be long. Instead get the last element and
1038  * grab the write position from there.
1039  */
1040  pos = dlist_tail_element(FlushPosition, node,
1041  &lsn_mapping);
1042  *write = pos->remote_end;
1043  *have_pending_txes = true;
1044  return;
1045  }
1046  }
1047 
1048  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1049 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:88
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:85
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8249
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:84
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 552 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by apply_handle_delete(), apply_handle_update(), and check_relation_updatable().

553 {
554  Oid idxoid;
555 
556  idxoid = RelationGetReplicaIndex(rel);
557 
558  if (!OidIsValid(idxoid))
559  idxoid = RelationGetPrimaryKeyIndex(rel);
560 
561  return idxoid;
562 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4557
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:638
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4536

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 1743 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

1744 {
1745  return MyLogicalRepWorker != NULL;
1746 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64

◆ logicalrep_worker_sighup()

static void logicalrep_worker_sighup ( SIGNAL_ARGS  )
static

Definition at line 1552 of file worker.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by ApplyWorkerMain().

1553 {
1554  int save_errno = errno;
1555 
1556  got_SIGHUP = true;
1557 
1558  /* Waken anything waiting on the process latch */
1559  SetLatch(MyLatch);
1560 
1561  errno = save_errno;
1562 }
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:115

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 1090 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), got_SIGHUP, in_remote_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

1091 {
1092  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1093 
1094  /*
1095  * Init the ApplyMessageContext which we clean up after each replication
1096  * protocol message.
1097  */
1099  "ApplyMessageContext",
1101 
1102  /* mark as idle, before starting to loop */
1104 
1105  for (;;)
1106  {
1108  int rc;
1109  int len;
1110  char *buf = NULL;
1111  bool endofstream = false;
1112  bool ping_sent = false;
1113  long wait_time;
1114 
1116 
1118 
1119  len = walrcv_receive(wrconn, &buf, &fd);
1120 
1121  if (len != 0)
1122  {
1123  /* Process the data */
1124  for (;;)
1125  {
1127 
1128  if (len == 0)
1129  {
1130  break;
1131  }
1132  else if (len < 0)
1133  {
1134  ereport(LOG,
1135  (errmsg("data stream from publisher has ended")));
1136  endofstream = true;
1137  break;
1138  }
1139  else
1140  {
1141  int c;
1142  StringInfoData s;
1143 
1144  /* Reset timeout. */
1145  last_recv_timestamp = GetCurrentTimestamp();
1146  ping_sent = false;
1147 
1148  /* Ensure we are reading the data into our memory context. */
1150 
1151  s.data = buf;
1152  s.len = len;
1153  s.cursor = 0;
1154  s.maxlen = -1;
1155 
1156  c = pq_getmsgbyte(&s);
1157 
1158  if (c == 'w')
1159  {
1160  XLogRecPtr start_lsn;
1161  XLogRecPtr end_lsn;
1162  TimestampTz send_time;
1163 
1164  start_lsn = pq_getmsgint64(&s);
1165  end_lsn = pq_getmsgint64(&s);
1166  send_time = pq_getmsgint64(&s);
1167 
1168  if (last_received < start_lsn)
1169  last_received = start_lsn;
1170 
1171  if (last_received < end_lsn)
1172  last_received = end_lsn;
1173 
1174  UpdateWorkerStats(last_received, send_time, false);
1175 
1176  apply_dispatch(&s);
1177  }
1178  else if (c == 'k')
1179  {
1180  XLogRecPtr end_lsn;
1182  bool reply_requested;
1183 
1184  end_lsn = pq_getmsgint64(&s);
1185  timestamp = pq_getmsgint64(&s);
1186  reply_requested = pq_getmsgbyte(&s);
1187 
1188  if (last_received < end_lsn)
1189  last_received = end_lsn;
1190 
1191  send_feedback(last_received, reply_requested, false);
1192  UpdateWorkerStats(last_received, timestamp, true);
1193  }
1194  /* other message types are purposefully ignored */
1195 
1197  }
1198 
1199  len = walrcv_receive(wrconn, &buf, &fd);
1200  }
1201  }
1202 
1203  /* confirm all writes so far */
1204  send_feedback(last_received, false, false);
1205 
1206  if (!in_remote_transaction)
1207  {
1208  /*
1209  * If we didn't get any transactions for a while there might be
1210  * unconsumed invalidation messages in the queue, consume them
1211  * now.
1212  */
1215 
1216  /* Process any table synchronization changes. */
1217  process_syncing_tables(last_received);
1218  }
1219 
1220  /* Cleanup the memory. */
1223 
1224  /* Check if we need to exit the streaming loop. */
1225  if (endofstream)
1226  {
1227  TimeLineID tli;
1228 
1229  walrcv_endstreaming(wrconn, &tli);
1230  break;
1231  }
1232 
1233  /*
1234  * Wait for more data or latch. If we have unflushed transactions,
1235  * wake up after WalWriterDelay to see if they've been flushed yet (in
1236  * which case we should send a feedback message). Otherwise, there's
1237  * no particular urgency about waking up unless we get data or a
1238  * signal.
1239  */
1240  if (!dlist_is_empty(&lsn_mapping))
1241  wait_time = WalWriterDelay;
1242  else
1243  wait_time = NAPTIME_PER_CYCLE;
1244 
1248  fd, wait_time,
1250 
1251  if (rc & WL_LATCH_SET)
1252  {
1255  }
1256 
1257  if (got_SIGHUP)
1258  {
1259  got_SIGHUP = false;
1261  }
1262 
1263  if (rc & WL_TIMEOUT)
1264  {
1265  /*
1266  * We didn't receive anything new. If we haven't heard anything
1267  * from the server for more than wal_receiver_timeout / 2, ping
1268  * the server. Also, if it's been longer than
1269  * wal_receiver_status_interval since the last update we sent,
1270  * send a status update to the master anyway, to report any
1271  * progress in applying WAL.
1272  */
1273  bool requestReply = false;
1274 
1275  /*
1276  * Check if time since last receive from the publisher has reached the
1277  * configured limit.
1278  */
1279  if (wal_receiver_timeout > 0)
1280  {
1282  TimestampTz timeout;
1283 
1284  timeout =
1285  TimestampTzPlusMilliseconds(last_recv_timestamp,
1287 
1288  if (now >= timeout)
1289  ereport(ERROR,
1290  (errmsg("terminating logical replication worker due to timeout")));
1291 
1292  /*
1293  * We didn't receive anything new for half of the receiver
1294  * timeout (wal_receiver_timeout). Ping the server.
1295  */
1296  if (!ping_sent)
1297  {
1298  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1299  (wal_receiver_timeout / 2));
1300  if (now >= timeout)
1301  {
1302  requestReply = true;
1303  ping_sent = true;
1304  }
1305  }
1306  }
1307 
1308  send_feedback(last_received, requestReply, requestReply);
1309  }
1310  }
1311 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1320
WalReceiverConn * wrconn
Definition: worker.c:100
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:271
uint32 TimeLineID
Definition: xlogdefs.h:52
void AcceptInvalidationMessages(void)
Definition: inval.c:681
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static dlist_head lsn_mapping
Definition: worker.c:88
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:536
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1074
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:273
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ResetLatch(Latch *latch)
Definition: latch.c:519
int wal_receiver_timeout
Definition: walreceiver.c:76
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:79
bool in_remote_transaction
Definition: worker.c:105
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * c
static char * buf
Definition: pg_test_fsync.c:68
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
int pgsocket
Definition: port.h:31
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext TopMemoryContext
Definition: mcxt.c:44
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:98
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define PGINVALID_SOCKET
Definition: port.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_dispatch(StringInfo s)
Definition: worker.c:948
static void maybe_reread_subscription(void)
Definition: worker.c:1409
static MemoryContext ApplyMessageContext
Definition: worker.c:97
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
int WalWriterDelay
Definition: walwriter.c:68
int errmsg(const char *fmt,...)
Definition: elog.c:784
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:115

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 1409 of file worker.c.

References Assert, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), LogicalRepWorker::subid, and Subscription::synccommit.

Referenced by apply_handle_commit(), ensure_transaction(), and LogicalRepApplyLoop().

1410 {
1411  MemoryContext oldctx;
1413  bool started_tx = false;
1414 
1415  /* When cache state is valid there is nothing to do here. */
1416  if (MySubscriptionValid)
1417  return;
1418 
1419  /* This function might be called inside or outside of transaction. */
1420  if (!IsTransactionState())
1421  {
1423  started_tx = true;
1424  }
1425 
1426  /* Ensure allocations in permanent context. */
1428 
1429  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1430 
1431  /*
1432  * Exit if the subscription was removed. This normally should not happen
1433  * as the worker gets killed during DROP SUBSCRIPTION.
1434  */
1435  if (!newsub)
1436  {
1437  ereport(LOG,
1438  (errmsg("logical replication apply worker for subscription \"%s\" will "
1439  "stop because the subscription was removed",
1440  MySubscription->name)));
1441 
1442  proc_exit(0);
1443  }
1444 
1445  /*
1446  * Exit if the subscription was disabled. This normally should not happen
1447  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1448  */
1449  if (!newsub->enabled)
1450  {
1451  ereport(LOG,
1452  (errmsg("logical replication apply worker for subscription \"%s\" will "
1453  "stop because the subscription was disabled",
1454  MySubscription->name)));
1455 
1456  proc_exit(0);
1457  }
1458 
1459  /*
1460  * Exit if connection string was changed. The launcher will start new
1461  * worker.
1462  */
1463  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1464  {
1465  ereport(LOG,
1466  (errmsg("logical replication apply worker for subscription \"%s\" will "
1467  "restart because the connection information was changed",
1468  MySubscription->name)));
1469 
1470  proc_exit(0);
1471  }
1472 
1473  /*
1474  * Exit if subscription name was changed (it's used for
1475  * fallback_application_name). The launcher will start new worker.
1476  */
1477  if (strcmp(newsub->name, MySubscription->name) != 0)
1478  {
1479  ereport(LOG,
1480  (errmsg("logical replication apply worker for subscription \"%s\" will "
1481  "restart because subscription was renamed",
1482  MySubscription->name)));
1483 
1484  proc_exit(0);
1485  }
1486 
1487  /* !slotname should never happen when enabled is true. */
1488  Assert(newsub->slotname);
1489 
1490  /*
1491  * We need to make a new connection to the new slot if the slot name has
1492  * changed, so exit here as well if that's the case.
1493  */
1494  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1495  {
1496  ereport(LOG,
1497  (errmsg("logical replication apply worker for subscription \"%s\" will "
1498  "restart because the replication slot name was changed",
1499  MySubscription->name)));
1500 
1501  proc_exit(0);
1502  }
1503 
1504  /*
1505  * Exit if publication list was changed. The launcher will start new
1506  * worker.
1507  */
1509  {
1510  ereport(LOG,
1511  (errmsg("logical replication apply worker for subscription \"%s\" will "
1512  "restart because subscription's publications were changed",
1513  MySubscription->name)));
1514 
1515  proc_exit(0);
1516  }
1517 
1518  /* Check for other changes that should never happen too. */
1519  if (newsub->dbid != MySubscription->dbid)
1520  {
1521  elog(ERROR, "subscription %u changed unexpectedly",
1523  }
1524 
1525  /* Clean old subscription info and switch to new one. */
1528 
1529  MemoryContextSwitchTo(oldctx);
1530 
1531  /* Change synchronous commit according to the user's wishes */
1532  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1534 
1535  if (started_tx)
1537 
1538  MySubscriptionValid = true;
1539 }
Subscription * MySubscription
Definition: worker.c:102
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3008
void CommitTransactionCommand(void)
Definition: xact.c:2895
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
#define ERROR
Definition: elog.h:43
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7487
List * publications
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext ApplyContext
Definition: worker.c:98
#define Assert(condition)
Definition: c.h:732
void StartTransactionCommand(void)
Definition: xact.c:2794
bool IsTransactionState(void)
Definition: xact.c:356
bool MySubscriptionValid
Definition: worker.c:103
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1320 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

1321 {
1322  static StringInfo reply_message = NULL;
1323  static TimestampTz send_time = 0;
1324 
1325  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1326  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1327  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1328 
1329  XLogRecPtr writepos;
1330  XLogRecPtr flushpos;
1331  TimestampTz now;
1332  bool have_pending_txes;
1333 
1334  /*
1335  * If the user doesn't want status to be reported to the publisher, be
1336  * sure to exit before doing anything at all.
1337  */
1338  if (!force && wal_receiver_status_interval <= 0)
1339  return;
1340 
1341  /* It's legal to not pass a recvpos */
1342  if (recvpos < last_recvpos)
1343  recvpos = last_recvpos;
1344 
1345  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1346 
1347  /*
1348  * No outstanding transactions to flush, we can report the latest received
1349  * position. This is important for synchronous replication.
1350  */
1351  if (!have_pending_txes)
1352  flushpos = writepos = recvpos;
1353 
1354  if (writepos < last_writepos)
1355  writepos = last_writepos;
1356 
1357  if (flushpos < last_flushpos)
1358  flushpos = last_flushpos;
1359 
1360  now = GetCurrentTimestamp();
1361 
1362  /* if we've already reported everything we're good */
1363  if (!force &&
1364  writepos == last_writepos &&
1365  flushpos == last_flushpos &&
1366  !TimestampDifferenceExceeds(send_time, now,
1367  wal_receiver_status_interval * 1000))
1368  return;
1369  send_time = now;
1370 
1371  if (!reply_message)
1372  {
1373  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1374 
1375  reply_message = makeStringInfo();
1376  MemoryContextSwitchTo(oldctx);
1377  }
1378  else
1379  resetStringInfo(reply_message);
1380 
1381  pq_sendbyte(reply_message, 'r');
1382  pq_sendint64(reply_message, recvpos); /* write */
1383  pq_sendint64(reply_message, flushpos); /* flush */
1384  pq_sendint64(reply_message, writepos); /* apply */
1385  pq_sendint64(reply_message, now); /* sendTime */
1386  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1387 
1388  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1389  force,
1390  (uint32) (recvpos >> 32), (uint32) recvpos,
1391  (uint32) (writepos >> 32), (uint32) writepos,
1392  (uint32) (flushpos >> 32), (uint32) flushpos
1393  );
1394 
1395  walrcv_send(wrconn, reply_message->data, reply_message->len);
1396 
1397  if (recvpos > last_recvpos)
1398  last_recvpos = recvpos;
1399  if (writepos > last_writepos)
1400  last_writepos = writepos;
1401  if (flushpos > last_flushpos)
1402  last_flushpos = flushpos;
1403 }
WalReceiverConn * wrconn
Definition: worker.c:100
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:75
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1011
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:112
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:358
MemoryContext ApplyContext
Definition: worker.c:98
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
#define elog(elevel,...)
Definition: elog.h:226
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry *  rel)
static

Definition at line 131 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

132 {
133  if (am_tablesync_worker())
134  return MyLogicalRepWorker->relid == rel->localreloid;
135  else
136  return (rel->state == SUBREL_STATE_READY ||
137  (rel->state == SUBREL_STATE_SYNCDONE &&
138  rel->statelsn <= remote_final_lsn));
139 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
static bool am_tablesync_worker(void)

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry *  rel,
EState *  estate,
TupleTableSlot *  slot 
)
static

Definition at line 214 of file worker.c.

References attnum, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, LogicalRepRelation::natts, TupleDescData::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

216 {
217  TupleDesc desc = RelationGetDescr(rel->localrel);
218  int num_phys_attrs = desc->natts;
219  int i;
220  int attnum,
221  num_defaults = 0;
222  int *defmap;
223  ExprState **defexprs;
224  ExprContext *econtext;
225 
226  econtext = GetPerTupleExprContext(estate);
227 
228  /* We got all the data via replication, no need to evaluate anything. */
229  if (num_phys_attrs == rel->remoterel.natts)
230  return;
231 
232  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
233  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
234 
235  for (attnum = 0; attnum < num_phys_attrs; attnum++)
236  {
237  Expr *defexpr;
238 
239  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
240  continue;
241 
242  if (rel->attrmap[attnum] >= 0)
243  continue;
244 
245  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
246 
247  if (defexpr != NULL)
248  {
249  /* Run the expression through planner */
250  defexpr = expression_planner(defexpr);
251 
252  /* Initialize executable expression in copycontext */
253  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
254  defmap[num_defaults] = attnum;
255  num_defaults++;
256  }
257 
258  }
259 
260  for (i = 0; i < num_defaults; i++)
261  slot->tts_values[defmap[i]] =
262  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
263 }
#define RelationGetDescr(relation)
Definition: rel.h:445
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Expr * expression_planner(Expr *expr)
Definition: planner.c:6046
Datum * tts_values
Definition: tuptable.h:126
#define GetPerTupleExprContext(estate)
Definition: executor.h:501
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
Node * build_column_default(Relation rel, int attrno)
int16 attnum
Definition: pg_attribute.h:79
void * palloc(Size size)
Definition: mcxt.c:949
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:121

◆ slot_modify_cstrings()

static void slot_modify_cstrings ( TupleTableSlot *  slot,
LogicalRepRelMapEntry *  rel,
char **  values,
bool *  replaces 
)
static

Definition at line 372 of file worker.c.

References ErrorContextCallback::arg, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_update().

374 {
375  int natts = slot->tts_tupleDescriptor->natts;
376  int i;
377  SlotErrCallbackArg errarg;
378  ErrorContextCallback errcallback;
379 
380  slot_getallattrs(slot);
381  ExecClearTuple(slot);
382 
383  /* Push callback + info on the error context stack */
384  errarg.rel = rel;
385  errarg.local_attnum = -1;
386  errarg.remote_attnum = -1;
387  errcallback.callback = slot_store_error_callback;
388  errcallback.arg = (void *) &errarg;
389  errcallback.previous = error_context_stack;
390  error_context_stack = &errcallback;
391 
392  /* Call the "in" function for each replaced attribute */
393  for (i = 0; i < natts; i++)
394  {
395  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
396  int remoteattnum = rel->attrmap[i];
397 
398  if (remoteattnum < 0)
399  continue;
400 
401  if (!replaces[remoteattnum])
402  continue;
403 
404  if (values[remoteattnum] != NULL)
405  {
406  Oid typinput;
407  Oid typioparam;
408 
409  errarg.local_attnum = i;
410  errarg.remote_attnum = remoteattnum;
411 
412  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
413  slot->tts_values[i] =
414  OidInputFunctionCall(typinput, values[remoteattnum],
415  typioparam, att->atttypmod);
416  slot->tts_isnull[i] = false;
417 
418  errarg.local_attnum = -1;
419  errarg.remote_attnum = -1;
420  }
421  else
422  {
423  slot->tts_values[i] = (Datum) 0;
424  slot->tts_isnull[i] = true;
425  }
426  }
427 
428  /* Pop the error context stack */
429  error_context_stack = errcallback.previous;
430 
431  ExecStoreVirtualTuple(slot);
432 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
LogicalRepRelMapEntry * rel
Definition: worker.c:92
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:254
struct ErrorContextCallback * previous
Definition: elog.h:253
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:355
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
static void slot_store_error_callback(void *arg)
Definition: worker.c:269
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1517

◆ slot_store_cstrings()

static void slot_store_cstrings ( TupleTableSlot *  slot,
LogicalRepRelMapEntry *  rel,
char **  values 
)
static

Definition at line 304 of file worker.c.

References ErrorContextCallback::arg, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

306 {
307  int natts = slot->tts_tupleDescriptor->natts;
308  int i;
309  SlotErrCallbackArg errarg;
310  ErrorContextCallback errcallback;
311 
312  ExecClearTuple(slot);
313 
314  /* Push callback + info on the error context stack */
315  errarg.rel = rel;
316  errarg.local_attnum = -1;
317  errarg.remote_attnum = -1;
318  errcallback.callback = slot_store_error_callback;
319  errcallback.arg = (void *) &errarg;
320  errcallback.previous = error_context_stack;
321  error_context_stack = &errcallback;
322 
323  /* Call the "in" function for each non-dropped attribute */
324  for (i = 0; i < natts; i++)
325  {
326  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
327  int remoteattnum = rel->attrmap[i];
328 
329  if (!att->attisdropped && remoteattnum >= 0 &&
330  values[remoteattnum] != NULL)
331  {
332  Oid typinput;
333  Oid typioparam;
334 
335  errarg.local_attnum = i;
336  errarg.remote_attnum = remoteattnum;
337 
338  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
339  slot->tts_values[i] =
340  OidInputFunctionCall(typinput, values[remoteattnum],
341  typioparam, att->atttypmod);
342  slot->tts_isnull[i] = false;
343 
344  errarg.local_attnum = -1;
345  errarg.remote_attnum = -1;
346  }
347  else
348  {
349  /*
350  * We assign NULL to dropped attributes, NULL values, and missing
351  * values (missing values should be later filled using
352  * slot_fill_defaults).
353  */
354  slot->tts_values[i] = (Datum) 0;
355  slot->tts_isnull[i] = true;
356  }
357  }
358 
359  /* Pop the error context stack */
360  error_context_stack = errcallback.previous;
361 
362  ExecStoreVirtualTuple(slot);
363 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
LogicalRepRelMapEntry * rel
Definition: worker.c:92
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:254
struct ErrorContextCallback * previous
Definition: elog.h:253
ErrorContextCallback * error_context_stack
Definition: elog.c:88
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
static void slot_store_error_callback(void *arg)
Definition: worker.c:269
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1517

◆ slot_store_error_callback()

static void slot_store_error_callback ( void *  arg)
static

Definition at line 269 of file worker.c.

References LogicalRepRelation::attnames, LogicalRepRelation::atttyps, errcontext, format_type_be(), get_atttype(), SlotErrCallbackArg::local_attnum, LogicalRepRelMapEntry::localreloid, logicalrep_typmap_gettypname(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, LogicalRepRelation::relname, SlotErrCallbackArg::remote_attnum, and LogicalRepRelMapEntry::remoterel.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

270 {
271  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
272  LogicalRepRelMapEntry *rel;
273  char *remotetypname;
274  Oid remotetypoid,
275  localtypoid;
276 
277  /* Nothing to do if remote attribute number is not set */
278  if (errarg->remote_attnum < 0)
279  return;
280 
281  rel = errarg->rel;
282  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
283 
284  /* Fetch remote type name from the LogicalRepTypMap cache */
285  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
286 
287  /* Fetch local type OID from the local sys cache */
288  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
289 
290  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
291  "remote type %s, local type %s",
292  rel->remoterel.nspname, rel->remoterel.relname,
293  rel->remoterel.attnames[errarg->remote_attnum],
294  remotetypname,
295  format_type_be(localtypoid));
296 }
LogicalRepRelMapEntry * rel
Definition: worker.c:92
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
unsigned int Oid
Definition: postgres_ext.h:31
LogicalRepRelation remoterel
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:861
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:422
#define errcontext
Definition: elog.h:183
void * arg

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 1055 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

1056 {
1057  FlushPosition *flushpos;
1058 
1059  /* Need to do this in permanent context */
1060  MemoryContextSwitchTo(ApplyContext);
1061 
1062  /* Track commit lsn */
1063  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1064  flushpos->local_end = XactLastCommitEnd;
1065  flushpos->remote_end = remote_lsn;
1066 
1067  dlist_push_tail(&lsn_mapping, &flushpos->node);
1068  MemoryContextSwitchTo(ApplyMessageContext);
1069 }
static dlist_head lsn_mapping
Definition: worker.c:88
dlist_node node
Definition: worker.c:83
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:353
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:85
MemoryContext ApplyContext
Definition: worker.c:98
XLogRecPtr local_end
Definition: worker.c:84
static MemoryContext ApplyMessageContext
Definition: worker.c:97
void * palloc(Size size)
Definition: mcxt.c:949

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1545 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

1546 {
1547  MySubscriptionValid = false;
1548 }
bool MySubscriptionValid
Definition: worker.c:103

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 1074 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by LogicalRepApplyLoop().

1075 {
1076  MyLogicalRepWorker->last_lsn = last_lsn;
1077  MyLogicalRepWorker->last_send_time = send_time;
1078  MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1079  if (reply)
1080  {
1081  MyLogicalRepWorker->reply_lsn = last_lsn;
1082  MyLogicalRepWorker->reply_time = send_time;
1083  }
1084 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
TimestampTz last_send_time
XLogRecPtr last_lsn
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext = NULL

Definition at line 98 of file worker.c.

◆ ApplyMessageContext

MemoryContext ApplyMessageContext = NULL
static

Definition at line 97 of file worker.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 115 of file worker.c.

Referenced by logicalrep_worker_sighup(), and LogicalRepApplyLoop().

◆ in_remote_transaction

bool in_remote_transaction = false

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 88 of file worker.c.

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false

Definition at line 103 of file worker.c.

Referenced by ApplyWorkerMain(), maybe_reread_subscription(), and subscription_change_cb().

◆ remote_final_lsn

XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
static

Definition at line 106 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), and should_apply_changes_for_rel().

◆ wrconn