PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static EState * create_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
WalReceiverConn * wrconn = NULL
 
Subscription * MySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 80 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SlotErrCallbackArg

typedef struct SlotErrCallbackArg SlotErrCallbackArg

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 981 of file worker.c.

References generate_unaccent_rules::action, apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by LogicalRepApplyLoop().

982 {
983  char action = pq_getmsgbyte(s);
984 
985  switch (action)
986  {
987  /* BEGIN */
988  case 'B':
989  apply_handle_begin(s);
990  break;
991  /* COMMIT */
992  case 'C':
993  apply_handle_commit(s);
994  break;
995  /* INSERT */
996  case 'I':
997  apply_handle_insert(s);
998  break;
999  /* UPDATE */
1000  case 'U':
1001  apply_handle_update(s);
1002  break;
1003  /* DELETE */
1004  case 'D':
1005  apply_handle_delete(s);
1006  break;
1007  /* TRUNCATE */
1008  case 'T':
1009  apply_handle_truncate(s);
1010  break;
1011  /* RELATION */
1012  case 'R':
1013  apply_handle_relation(s);
1014  break;
1015  /* TYPE */
1016  case 'Y':
1017  apply_handle_type(s);
1018  break;
1019  /* ORIGIN */
1020  case 'O':
1021  apply_handle_origin(s);
1022  break;
1023  default:
1024  ereport(ERROR,
1025  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1026  errmsg("invalid logical replication message type \"%c\"", action)));
1027  }
1028 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:555
static void apply_handle_insert(StringInfo s)
Definition: worker.c:585
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:820
static void apply_handle_begin(StringInfo s)
Definition: worker.c:455
#define ereport(elevel, rest)
Definition: elog.h:141
static void apply_handle_commit(StringInfo s)
Definition: worker.c:474
static void apply_handle_update(StringInfo s)
Definition: worker.c:682
static void apply_handle_relation(StringInfo s)
Definition: worker.c:540
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_handle_origin(StringInfo s)
Definition: worker.c:518
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:921

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 455 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

456 {
457  LogicalRepBeginData begin_data;
458 
459  logicalrep_read_begin(s, &begin_data);
460 
461  remote_final_lsn = begin_data.final_lsn;
462 
463  in_remote_transaction = true;
464 
465  pgstat_report_activity(STATE_RUNNING, NULL);
466 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:107
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3114
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
bool in_remote_transaction
Definition: worker.c:106
XLogRecPtr final_lsn
Definition: logicalproto.h:66

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 474 of file worker.c.

References AcceptInvalidationMessages(), am_tablesync_worker(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), maybe_reread_subscription(), pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), remote_final_lsn, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

475 {
476  LogicalRepCommitData commit_data;
477 
478  logicalrep_read_commit(s, &commit_data);
479 
480  Assert(commit_data.commit_lsn == remote_final_lsn);
481 
482  /* The synchronization worker runs in single transaction. */
484  {
485  /*
486  * Update origin state so we can restart streaming from correct
487  * position in case of crash.
488  */
491 
493  pgstat_report_stat(false);
494 
495  store_flush_position(commit_data.end_lsn);
496  }
497  else
498  {
499  /* Process any invalidation messages that might have accumulated. */
502  }
503 
504  in_remote_transaction = false;
505 
506  /* Process any tables that are being synchronized in parallel. */
507  process_syncing_tables(commit_data.end_lsn);
508 
510 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1088
void AcceptInvalidationMessages(void)
Definition: inval.c:681
static XLogRecPtr remote_final_lsn
Definition: worker.c:107
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3114
void CommitTransactionCommand(void)
Definition: xact.c:2898
bool in_remote_transaction
Definition: worker.c:106
static bool am_tablesync_worker(void)
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
static void maybe_reread_subscription(void)
Definition: worker.c:1442
#define Assert(condition)
Definition: c.h:739
bool IsTransactionState(void)
Definition: xact.c:355
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
TimestampTz committime
Definition: logicalproto.h:75
void pgstat_report_stat(bool force)
Definition: pgstat.c:806

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 820 of file worker.c.

References AfterTriggerEndQuery(), Assert, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationDelete(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), table_slot_create(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

821 {
823  LogicalRepTupleData oldtup;
824  LogicalRepRelId relid;
825  Oid idxoid;
826  EState *estate;
827  EPQState epqstate;
828  TupleTableSlot *remoteslot;
829  TupleTableSlot *localslot;
830  bool found;
831  MemoryContext oldctx;
832 
834 
835  relid = logicalrep_read_delete(s, &oldtup);
838  {
839  /*
840  * The relation can't become interesting in the middle of the
841  * transaction so it's safe to unlock it.
842  */
844  return;
845  }
846 
847  /* Check if we can do the delete. */
849 
850  /* Initialize the executor state. */
851  estate = create_estate_for_relation(rel);
852  remoteslot = ExecInitExtraTupleSlot(estate,
854  &TTSOpsVirtual);
855  localslot = table_slot_create(rel->localrel,
856  &estate->es_tupleTable);
857  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
858 
861 
862  /* Find the tuple using the replica identity index. */
864  slot_store_cstrings(remoteslot, rel, oldtup.values);
865  MemoryContextSwitchTo(oldctx);
866 
867  /*
868  * Try to find tuple using either replica identity index, primary key or
869  * if needed, sequential scan.
870  */
871  idxoid = GetRelationIdentityOrPK(rel->localrel);
872  Assert(OidIsValid(idxoid) ||
873  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
874 
875  if (OidIsValid(idxoid))
876  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
878  remoteslot, localslot);
879  else
881  remoteslot, localslot);
882  /* If found delete it. */
883  if (found)
884  {
885  EvalPlanQualSetSlot(&epqstate, localslot);
886 
887  /* Do the actual delete. */
888  ExecSimpleRelationDelete(estate, &epqstate, localslot);
889  }
890  else
891  {
892  /* The tuple to be deleted could not be found. */
893  elog(DEBUG1,
894  "logical replication could not find row for delete "
895  "in replication target relation \"%s\"",
897  }
898 
899  /* Cleanup. */
902 
903  /* Handle queued AFTER triggers. */
904  AfterTriggerEndQuery(estate);
905 
906  EvalPlanQualEnd(&epqstate);
907  ExecResetTupleTable(estate->es_tupleTable, false);
908  FreeExecutorState(estate);
909 
911 
913 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:129
#define NIL
Definition: pg_list.h:65
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:174
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
#define DEBUG1
Definition: elog.h:25
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:276
#define RelationGetDescr(relation)
Definition: rel.h:454
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:381
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:645
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2944
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:145
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:303
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:462
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:648
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2484
List * es_tupleTable
Definition: execnodes.h:551
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define Assert(condition)
Definition: c.h:739
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:228
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 585 of file worker.c.

References AfterTriggerEndQuery(), CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationInsert(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_cstrings(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

586 {
588  LogicalRepTupleData newtup;
589  LogicalRepRelId relid;
590  EState *estate;
591  TupleTableSlot *remoteslot;
592  MemoryContext oldctx;
593 
595 
596  relid = logicalrep_read_insert(s, &newtup);
599  {
600  /*
601  * The relation can't become interesting in the middle of the
602  * transaction so it's safe to unlock it.
603  */
605  return;
606  }
607 
608  /* Initialize the executor state. */
609  estate = create_estate_for_relation(rel);
610  remoteslot = ExecInitExtraTupleSlot(estate,
612  &TTSOpsVirtual);
613 
614  /* Input functions may need an active snapshot, so get one */
616 
617  /* Process and store remote tuple in the slot */
619  slot_store_cstrings(remoteslot, rel, newtup.values);
620  slot_fill_defaults(rel, estate, remoteslot);
621  MemoryContextSwitchTo(oldctx);
622 
624 
625  /* Do the insert. */
626  ExecSimpleRelationInsert(estate, remoteslot);
627 
628  /* Cleanup. */
631 
632  /* Handle queued AFTER triggers. */
633  AfterTriggerEndQuery(estate);
634 
635  ExecResetTupleTable(estate->es_tupleTable, false);
636  FreeExecutorState(estate);
637 
639 
641 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:129
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:174
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define RelationGetDescr(relation)
Definition: rel.h:454
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:381
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:159
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:212
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:145
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:303
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * es_tupleTable
Definition: execnodes.h:551
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1005
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 518 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

519 {
520  /*
521  * ORIGIN message can only come inside remote transaction and before any
522  * actual writes.
523  */
524  if (!in_remote_transaction ||
525  (IsTransactionState() && !am_tablesync_worker()))
526  ereport(ERROR,
527  (errcode(ERRCODE_PROTOCOL_VIOLATION),
528  errmsg("ORIGIN message sent out of order")));
529 }
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:106
static bool am_tablesync_worker(void)
#define ereport(elevel, rest)
Definition: elog.h:141
bool IsTransactionState(void)
Definition: xact.c:355
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 540 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

541 {
542  LogicalRepRelation *rel;
543 
544  rel = logicalrep_read_rel(s);
545  logicalrep_relmap_update(rel);
546 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:375
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 921 of file worker.c.

References CommandCounterIncrement(), DROP_RESTRICT, ensure_transaction(), ExecuteTruncateGuts(), lappend(), lappend_oid(), lfirst, lfirst_oid, LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationIsLogicallyLogged, RowExclusiveLock, and should_apply_changes_for_rel().

Referenced by apply_dispatch().

922 {
923  bool cascade = false;
924  bool restart_seqs = false;
925  List *remote_relids = NIL;
926  List *remote_rels = NIL;
927  List *rels = NIL;
928  List *relids = NIL;
929  List *relids_logged = NIL;
930  ListCell *lc;
931 
933 
934  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
935 
936  foreach(lc, remote_relids)
937  {
938  LogicalRepRelId relid = lfirst_oid(lc);
940 
943  {
944  /*
945  * The relation can't become interesting in the middle of the
946  * transaction so it's safe to unlock it.
947  */
949  continue;
950  }
951 
952  remote_rels = lappend(remote_rels, rel);
953  rels = lappend(rels, rel->localrel);
954  relids = lappend_oid(relids, rel->localreloid);
956  relids_logged = lappend_oid(relids_logged, rel->localreloid);
957  }
958 
959  /*
960  * Even if we used CASCADE on the upstream master we explicitly default to
961  * replaying changes without further cascading. This might be later
962  * changeable with a user specified option.
963  */
964  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
965 
966  foreach(lc, remote_rels)
967  {
968  LogicalRepRelMapEntry *rel = lfirst(lc);
969 
971  }
972 
974 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:129
#define NIL
Definition: pg_list.h:65
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:325
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:381
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:600
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
static bool ensure_transaction(void)
Definition: worker.c:145
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1645
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
List * lappend(List *list, void *datum)
Definition: list.c:322
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define lfirst(lc)
Definition: pg_list.h:190
Definition: pg_list.h:50
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 555 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

556 {
557  LogicalRepTyp typ;
558 
559  logicalrep_read_typ(s, &typ);
560  logicalrep_typmap_update(&typ);
561 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:427
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:401

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 682 of file worker.c.

References AfterTriggerEndQuery(), Assert, bms_add_member(), LogicalRepTupleData::changed, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_range_table, EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSimpleRelationUpdate(), FirstLowInvalidHeapAttributeNumber, FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), i, list_nth(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), TupleDescData::natts, NIL, NoLock, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_modify_cstrings(), slot_store_cstrings(), table_slot_create(), TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, RangeTblEntry::updatedCols, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

683 {
685  LogicalRepRelId relid;
686  Oid idxoid;
687  EState *estate;
688  EPQState epqstate;
689  LogicalRepTupleData oldtup;
690  LogicalRepTupleData newtup;
691  bool has_oldtup;
692  TupleTableSlot *localslot;
693  TupleTableSlot *remoteslot;
694  RangeTblEntry *target_rte;
695  bool found;
696  MemoryContext oldctx;
697 
699 
700  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
701  &newtup);
704  {
705  /*
706  * The relation can't become interesting in the middle of the
707  * transaction so it's safe to unlock it.
708  */
710  return;
711  }
712 
713  /* Check if we can do the update. */
715 
716  /* Initialize the executor state. */
717  estate = create_estate_for_relation(rel);
718  remoteslot = ExecInitExtraTupleSlot(estate,
720  &TTSOpsVirtual);
721  localslot = table_slot_create(rel->localrel,
722  &estate->es_tupleTable);
723  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
724 
725  /*
726  * Populate updatedCols so that per-column triggers can fire. This could
727  * include more columns than were actually changed on the publisher
728  * because the logical replication protocol doesn't contain that
729  * information. But it would for example exclude columns that only exist
730  * on the subscriber, since we are not touching those.
731  */
732  target_rte = list_nth(estate->es_range_table, 0);
733  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
734  {
735  if (newtup.changed[i])
736  target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
738  }
739 
742 
743  /* Build the search tuple. */
745  slot_store_cstrings(remoteslot, rel,
746  has_oldtup ? oldtup.values : newtup.values);
747  MemoryContextSwitchTo(oldctx);
748 
749  /*
750  * Try to find tuple using either replica identity index, primary key or
751  * if needed, sequential scan.
752  */
753  idxoid = GetRelationIdentityOrPK(rel->localrel);
754  Assert(OidIsValid(idxoid) ||
755  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
756 
757  if (OidIsValid(idxoid))
758  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
760  remoteslot, localslot);
761  else
763  remoteslot, localslot);
764 
765  ExecClearTuple(remoteslot);
766 
767  /*
768  * Tuple found.
769  *
770  * Note this will fail if there are other conflicting unique indexes.
771  */
772  if (found)
773  {
774  /* Process and store remote tuple in the slot */
776  slot_modify_cstrings(remoteslot, localslot, rel,
777  newtup.values, newtup.changed);
778  MemoryContextSwitchTo(oldctx);
779 
780  EvalPlanQualSetSlot(&epqstate, remoteslot);
781 
782  /* Do the actual update. */
783  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
784  }
785  else
786  {
787  /*
788  * The tuple to be updated could not be found.
789  *
790  * TODO what to do here, change the log level to LOG perhaps?
791  */
792  elog(DEBUG1,
793  "logical replication did not find row for update "
794  "in replication target relation \"%s\"",
796  }
797 
798  /* Cleanup. */
801 
802  /* Handle queued AFTER triggers. */
803  AfterTriggerEndQuery(estate);
804 
805  EvalPlanQualEnd(&epqstate);
806  ExecResetTupleTable(estate->es_tupleTable, false);
807  FreeExecutorState(estate);
808 
810 
812 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:129
#define NIL
Definition: pg_list.h:65
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:174
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
#define DEBUG1
Definition: elog.h:25
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define RelationGetDescr(relation)
Definition: rel.h:454
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:381
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
List * es_range_table
Definition: execnodes.h:504
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:645
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2944
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
static bool ensure_transaction(void)
Definition: worker.c:145
LogicalRepRelation remoterel
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:303
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:462
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:648
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2484
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
List * es_tupleTable
Definition: execnodes.h:551
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1005
Bitmapset * updatedCols
Definition: parsenodes.h:1121
#define Assert(condition)
Definition: c.h:739
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:210
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:228
int i
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:377
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1585 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1586 {
1587  int worker_slot = DatumGetInt32(main_arg);
1588  MemoryContext oldctx;
1589  char originname[NAMEDATALEN];
1590  XLogRecPtr origin_startpos;
1591  char *myslotname;
1593 
1594  /* Attach to slot */
1595  logicalrep_worker_attach(worker_slot);
1596 
1597  /* Setup signal handling */
1599  pqsignal(SIGTERM, die);
1601 
1602  /*
1603  * We don't currently need any ResourceOwner in a walreceiver process, but
1604  * if we did, we could call CreateAuxProcessResourceOwner here.
1605  */
1606 
1607  /* Initialise stats to a sanish value */
1610 
1611  /* Load the libpq-specific functions */
1612  load_file("libpqwalreceiver", false);
1613 
1614  /* Run as replica session replication role. */
1615  SetConfigOption("session_replication_role", "replica",
1617 
1618  /* Connect to our database. */
1621  0);
1622 
1623  /* Load the subscription into persistent memory context. */
1625  "ApplyContext",
1629 
1631  if (!MySubscription)
1632  {
1633  ereport(LOG,
1634  (errmsg("logical replication apply worker for subscription %u will not "
1635  "start because the subscription was removed during startup",
1637  proc_exit(0);
1638  }
1639 
1640  MySubscriptionValid = true;
1641  MemoryContextSwitchTo(oldctx);
1642 
1643  if (!MySubscription->enabled)
1644  {
1645  ereport(LOG,
1646  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1647  "start because the subscription was disabled during startup",
1648  MySubscription->name)));
1649 
1650  proc_exit(0);
1651  }
1652 
1653  /* Setup synchronous commit according to the user's wishes */
1654  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1656 
1657  /* Keep us informed about subscription changes. */
1660  (Datum) 0);
1661 
1662  if (am_tablesync_worker())
1663  ereport(LOG,
1664  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1666  else
1667  ereport(LOG,
1668  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1669  MySubscription->name)));
1670 
1672 
1673  /* Connect to the origin and start the replication. */
1674  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1676 
1677  if (am_tablesync_worker())
1678  {
1679  char *syncslotname;
1680 
1681  /* This is table synchronization worker, call initial sync. */
1682  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1683 
1684  /* The slot name needs to be allocated in permanent memory context. */
1686  myslotname = pstrdup(syncslotname);
1687  MemoryContextSwitchTo(oldctx);
1688 
1689  pfree(syncslotname);
1690  }
1691  else
1692  {
1693  /* This is main apply worker */
1694  RepOriginId originid;
1695  TimeLineID startpointTLI;
1696  char *err;
1697 
1698  myslotname = MySubscription->slotname;
1699 
1700  /*
1701  * This shouldn't happen if the subscription is enabled, but guard
1702  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1703  * crash if slot is NULL.)
1704  */
1705  if (!myslotname)
1706  ereport(ERROR,
1707  (errmsg("subscription has no replication slot set")));
1708 
1709  /* Setup replication origin tracking. */
1711  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1712  originid = replorigin_by_name(originname, true);
1713  if (!OidIsValid(originid))
1714  originid = replorigin_create(originname);
1715  replorigin_session_setup(originid);
1716  replorigin_session_origin = originid;
1717  origin_startpos = replorigin_session_get_progress(false);
1719 
1721  &err);
1722  if (wrconn == NULL)
1723  ereport(ERROR,
1724  (errmsg("could not connect to the publisher: %s", err)));
1725 
1726  /*
1727  * We don't really use the output of identify_system for anything, but it
1728  * does some initializations on the upstream so let's still call it.
1729  */
1730  (void) walrcv_identify_system(wrconn, &startpointTLI);
1731 
1732  }
1733 
1734  /*
1735  * Setup callback for syscache so that we know when something changes in
1736  * the subscription relation state.
1737  */
1740  (Datum) 0);
1741 
1742  /* Build logical replication streaming options. */
1743  options.logical = true;
1744  options.startpoint = origin_startpos;
1745  options.slotname = myslotname;
1746  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1747  options.proto.logical.publication_names = MySubscription->publications;
1748 
1749  /* Start normal logical streaming replication. */
1750  walrcv_startstreaming(wrconn, &options);
1751 
1752  /* Run the main loop. */
1753  LogicalRepApplyLoop(origin_startpos);
1754 
1755  proc_exit(0);
1756 }
Subscription * MySubscription
Definition: worker.c:103
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:101
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:52
#define DatumGetInt32(X)
Definition: postgres.h:472
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:272
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2898
union WalRcvStreamOptions::@106 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:278
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1188
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1053
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:798
#define OidIsValid(objectId)
Definition: c.h:645
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1578
void pfree(void *pointer)
Definition: mcxt.c:1056
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void logicalrep_worker_attach(int slot)
Definition: launcher.c:625
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7563
#define SIGHUP
Definition: win32_port.h:154
XLogRecPtr startpoint
Definition: walreceiver.h:160
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:239
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:99
static char ** options
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
uintptr_t Datum
Definition: postgres.h:367
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5708
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:152
void StartTransactionCommand(void)
Definition: xact.c:2797
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:104
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1123
#define elog(elevel,...)
Definition: elog.h:228
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define snprintf
Definition: port.h:192
#define die(msg)
Definition: pg_test_fsync.c:96
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5737
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:264

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 648 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

649 {
650  /* Updatable, no error. */
651  if (rel->updatable)
652  return;
653 
654  /*
655  * We are in error mode, so it's fine that this is somewhat slow. It's better to
656  * give the user a correct error.
657  */
659  {
660  ereport(ERROR,
661  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
662  errmsg("publisher did not send replica identity column "
663  "expected by the logical replication target relation \"%s.%s\"",
664  rel->remoterel.nspname, rel->remoterel.relname)));
665  }
666 
667  ereport(ERROR,
668  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
669  errmsg("logical replication target relation \"%s.%s\" has "
670  "neither REPLICA IDENTITY index nor PRIMARY "
671  "KEY and published relation does not have "
672  "REPLICA IDENTITY FULL",
673  rel->remoterel.nspname, rel->remoterel.relname)));
674 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
int errcode(int sqlerrcode)
Definition: elog.c:608
#define OidIsValid(objectId)
Definition: c.h:645
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ create_estate_for_relation()

static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 174 of file worker.c.

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_num_result_relations, EState::es_output_cid, EState::es_result_relation_info, EState::es_result_relations, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

175 {
176  EState *estate;
177  ResultRelInfo *resultRelInfo;
178  RangeTblEntry *rte;
179 
180  estate = CreateExecutorState();
181 
182  rte = makeNode(RangeTblEntry);
183  rte->rtekind = RTE_RELATION;
184  rte->relid = RelationGetRelid(rel->localrel);
185  rte->relkind = rel->localrel->rd_rel->relkind;
187  ExecInitRangeTable(estate, list_make1(rte));
188 
189  resultRelInfo = makeNode(ResultRelInfo);
190  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
191 
192  estate->es_result_relations = resultRelInfo;
193  estate->es_num_result_relations = 1;
194  estate->es_result_relation_info = resultRelInfo;
195 
196  estate->es_output_cid = GetCurrentCommandId(true);
197 
198  /* Prepare to catch AFTER triggers. */
200 
201  return estate;
202 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:724
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
CommandId es_output_cid
Definition: execnodes.h:516
#define AccessShareLock
Definition: lockdefs.h:36
Form_pg_class rd_rel
Definition: rel.h:84
#define list_make1(x1)
Definition: pg_list.h:227
ResultRelInfo * es_result_relations
Definition: execnodes.h:519
EState * CreateExecutorState(void)
Definition: execUtils.c:88
int es_num_result_relations
Definition: execnodes.h:520
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4785
#define makeNode(_type_)
Definition: nodes.h:573
RTEKind rtekind
Definition: parsenodes.h:974
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:745
#define RelationGetRelid(relation)
Definition: rel.h:428
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521

◆ ensure_transaction()

static bool ensure_transaction ( void  )
static

Definition at line 145 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), maybe_reread_subscription(), MemoryContextSwitchTo(), SetCurrentStatementStartTimestamp(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

146 {
147  if (IsTransactionState())
148  {
150 
153 
154  return false;
155  }
156 
159 
161 
163  return true;
164 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void maybe_reread_subscription(void)
Definition: worker.c:1442
static MemoryContext ApplyMessageContext
Definition: worker.c:98
void StartTransactionCommand(void)
Definition: xact.c:2797
bool IsTransactionState(void)
Definition: xact.c:355
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:817

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 1044 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, FlushPosition::node, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

1046 {
1047  dlist_mutable_iter iter;
1048  XLogRecPtr local_flush = GetFlushRecPtr();
1049 
1051  *flush = InvalidXLogRecPtr;
1052 
1054  {
1055  FlushPosition *pos =
1056  dlist_container(FlushPosition, node, iter.cur);
1057 
1058  *write = pos->remote_end;
1059 
1060  if (pos->local_end <= local_flush)
1061  {
1062  *flush = pos->remote_end;
1063  dlist_delete(iter.cur);
1064  pfree(pos);
1065  }
1066  else
1067  {
1068  /*
1069  * Don't want to uselessly iterate over the rest of the list which
1070  * could potentially be long. Instead get the last element and
1071  * grab the write position from there.
1072  */
1073  pos = dlist_tail_element(FlushPosition, node,
1074  &lsn_mapping);
1075  *write = pos->remote_end;
1076  *have_pending_txes = true;
1077  return;
1078  }
1079  }
1080 
1081  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1082 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:89
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:86
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:85
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 569 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by apply_handle_delete(), apply_handle_update(), and check_relation_updatable().

570 {
571  Oid idxoid;
572 
573  idxoid = RelationGetReplicaIndex(rel);
574 
575  if (!OidIsValid(idxoid))
576  idxoid = RelationGetPrimaryKeyIndex(rel);
577 
578  return idxoid;
579 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4539
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:645
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4518

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 1762 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

1763 {
1764  return MyLogicalRepWorker != NULL;
1765 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 1123 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), in_remote_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

1124 {
1125  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1126 
1127  /*
1128  * Init the ApplyMessageContext which we clean up after each replication
1129  * protocol message.
1130  */
1132  "ApplyMessageContext",
1134 
1135  /* mark as idle, before starting to loop */
1137 
1138  for (;;)
1139  {
1141  int rc;
1142  int len;
1143  char *buf = NULL;
1144  bool endofstream = false;
1145  bool ping_sent = false;
1146  long wait_time;
1147 
1149 
1151 
1152  len = walrcv_receive(wrconn, &buf, &fd);
1153 
1154  if (len != 0)
1155  {
1156  /* Process the data */
1157  for (;;)
1158  {
1160 
1161  if (len == 0)
1162  {
1163  break;
1164  }
1165  else if (len < 0)
1166  {
1167  ereport(LOG,
1168  (errmsg("data stream from publisher has ended")));
1169  endofstream = true;
1170  break;
1171  }
1172  else
1173  {
1174  int c;
1175  StringInfoData s;
1176 
1177  /* Reset timeout. */
1178  last_recv_timestamp = GetCurrentTimestamp();
1179  ping_sent = false;
1180 
1181  /* Ensure we are reading the data into our memory context. */
1183 
1184  s.data = buf;
1185  s.len = len;
1186  s.cursor = 0;
1187  s.maxlen = -1;
1188 
1189  c = pq_getmsgbyte(&s);
1190 
1191  if (c == 'w')
1192  {
1193  XLogRecPtr start_lsn;
1194  XLogRecPtr end_lsn;
1195  TimestampTz send_time;
1196 
1197  start_lsn = pq_getmsgint64(&s);
1198  end_lsn = pq_getmsgint64(&s);
1199  send_time = pq_getmsgint64(&s);
1200 
1201  if (last_received < start_lsn)
1202  last_received = start_lsn;
1203 
1204  if (last_received < end_lsn)
1205  last_received = end_lsn;
1206 
1207  UpdateWorkerStats(last_received, send_time, false);
1208 
1209  apply_dispatch(&s);
1210  }
1211  else if (c == 'k')
1212  {
1213  XLogRecPtr end_lsn;
1215  bool reply_requested;
1216 
1217  end_lsn = pq_getmsgint64(&s);
1218  timestamp = pq_getmsgint64(&s);
1219  reply_requested = pq_getmsgbyte(&s);
1220 
1221  if (last_received < end_lsn)
1222  last_received = end_lsn;
1223 
1224  send_feedback(last_received, reply_requested, false);
1225  UpdateWorkerStats(last_received, timestamp, true);
1226  }
1227  /* other message types are purposefully ignored */
1228 
1230  }
1231 
1232  len = walrcv_receive(wrconn, &buf, &fd);
1233  }
1234  }
1235 
1236  /* confirm all writes so far */
1237  send_feedback(last_received, false, false);
1238 
1239  if (!in_remote_transaction)
1240  {
1241  /*
1242  * If we didn't get any transactions for a while there might be
1243  * unconsumed invalidation messages in the queue, consume them
1244  * now.
1245  */
1248 
1249  /* Process any table synchronization changes. */
1250  process_syncing_tables(last_received);
1251  }
1252 
1253  /* Cleanup the memory. */
1256 
1257  /* Check if we need to exit the streaming loop. */
1258  if (endofstream)
1259  {
1260  TimeLineID tli;
1261 
1262  walrcv_endstreaming(wrconn, &tli);
1263  break;
1264  }
1265 
1266  /*
1267  * Wait for more data or latch. If we have unflushed transactions,
1268  * wake up after WalWriterDelay to see if they've been flushed yet (in
1269  * which case we should send a feedback message). Otherwise, there's
1270  * no particular urgency about waking up unless we get data or a
1271  * signal.
1272  */
1273  if (!dlist_is_empty(&lsn_mapping))
1274  wait_time = WalWriterDelay;
1275  else
1276  wait_time = NAPTIME_PER_CYCLE;
1277 
1281  fd, wait_time,
1283 
1284  if (rc & WL_LATCH_SET)
1285  {
1288  }
1289 
1290  if (ConfigReloadPending)
1291  {
1292  ConfigReloadPending = false;
1294  }
1295 
1296  if (rc & WL_TIMEOUT)
1297  {
1298  /*
1299  * We didn't receive anything new. If we haven't heard anything
1300  * from the server for more than wal_receiver_timeout / 2, ping
1301  * the server. Also, if it's been longer than
1302  * wal_receiver_status_interval since the last update we sent,
1303  * send a status update to the master anyway, to report any
1304  * progress in applying WAL.
1305  */
1306  bool requestReply = false;
1307 
1308  /*
1309  * Check if time since last receive from the publisher has reached the
1310  * configured limit.
1311  */
1312  if (wal_receiver_timeout > 0)
1313  {
1315  TimestampTz timeout;
1316 
1317  timeout =
1318  TimestampTzPlusMilliseconds(last_recv_timestamp,
1320 
1321  if (now >= timeout)
1322  ereport(ERROR,
1323  (errmsg("terminating logical replication worker due to timeout")));
1324 
1325  /*
1326  * We didn't receive anything new for half of the receiver
1327  * timeout (wal_receiver_timeout / 2). Ping the server.
1328  */
1329  if (!ping_sent)
1330  {
1331  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1332  (wal_receiver_timeout / 2));
1333  if (now >= timeout)
1334  {
1335  requestReply = true;
1336  ping_sent = true;
1337  }
1338  }
1339  }
1340 
1341  send_feedback(last_received, requestReply, requestReply);
1342  }
1343  }
1344 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1353
WalReceiverConn * wrconn
Definition: worker.c:101
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:280
uint32 TimeLineID
Definition: xlogdefs.h:52
void AcceptInvalidationMessages(void)
Definition: inval.c:681
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static dlist_head lsn_mapping
Definition: worker.c:89
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3114
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1107
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:282
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ResetLatch(Latch *latch)
Definition: latch.c:519
int wal_receiver_timeout
Definition: walreceiver.c:78
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:80
bool in_remote_transaction
Definition: worker.c:106
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * c
static char * buf
Definition: pg_test_fsync.c:67
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
int pgsocket
Definition: port.h:31
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext TopMemoryContext
Definition: mcxt.c:44
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:99
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define PGINVALID_SOCKET
Definition: port.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_dispatch(StringInfo s)
Definition: worker.c:981
static void maybe_reread_subscription(void)
Definition: worker.c:1442
static MemoryContext ApplyMessageContext
Definition: worker.c:98
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
int WalWriterDelay
Definition: walwriter.c:70
int errmsg(const char *fmt,...)
Definition: elog.c:822
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 1442 of file worker.c.

References Assert, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), LogicalRepWorker::subid, and Subscription::synccommit.

Referenced by apply_handle_commit(), ensure_transaction(), and LogicalRepApplyLoop().

1443 {
1444  MemoryContext oldctx;
1445  Subscription *newsub;
1446  bool started_tx = false;
1447 
1448  /* When cache state is valid there is nothing to do here. */
1449  if (MySubscriptionValid)
1450  return;
1451 
1452  /* This function might be called inside or outside of transaction. */
1453  if (!IsTransactionState())
1454  {
1455  StartTransactionCommand();
1456  started_tx = true;
1457  }
1458 
1459  /* Ensure allocations in permanent context. */
1460  oldctx = MemoryContextSwitchTo(ApplyContext);
1461 
1462  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1463 
1464  /*
1465  * Exit if the subscription was removed. This normally should not happen
1466  * as the worker gets killed during DROP SUBSCRIPTION.
1467  */
1468  if (!newsub)
1469  {
1470  ereport(LOG,
1471  (errmsg("logical replication apply worker for subscription \"%s\" will "
1472  "stop because the subscription was removed",
1473  MySubscription->name)));
1474 
1475  proc_exit(0);
1476  }
1477 
1478  /*
1479  * Exit if the subscription was disabled. This normally should not happen
1480  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1481  */
1482  if (!newsub->enabled)
1483  {
1484  ereport(LOG,
1485  (errmsg("logical replication apply worker for subscription \"%s\" will "
1486  "stop because the subscription was disabled",
1487  MySubscription->name)));
1488 
1489  proc_exit(0);
1490  }
1491 
1492  /*
1493  * Exit if connection string was changed. The launcher will start new
1494  * worker.
1495  */
1496  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1497  {
1498  ereport(LOG,
1499  (errmsg("logical replication apply worker for subscription \"%s\" will "
1500  "restart because the connection information was changed",
1501  MySubscription->name)));
1502 
1503  proc_exit(0);
1504  }
1505 
1506  /*
1507  * Exit if subscription name was changed (it's used for
1508  * fallback_application_name). The launcher will start new worker.
1509  */
1510  if (strcmp(newsub->name, MySubscription->name) != 0)
1511  {
1512  ereport(LOG,
1513  (errmsg("logical replication apply worker for subscription \"%s\" will "
1514  "restart because subscription was renamed",
1515  MySubscription->name)));
1516 
1517  proc_exit(0);
1518  }
1519 
1520  /* !slotname should never happen when enabled is true. */
1521  Assert(newsub->slotname);
1522 
1523  /*
1524  * We need to make new connection to new slot if slot name has changed so
1525  * exit here as well if that's the case.
1526  */
1527  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1528  {
1529  ereport(LOG,
1530  (errmsg("logical replication apply worker for subscription \"%s\" will "
1531  "restart because the replication slot name was changed",
1532  MySubscription->name)));
1533 
1534  proc_exit(0);
1535  }
1536 
1537  /*
1538  * Exit if publication list was changed. The launcher will start new
1539  * worker.
1540  */
1541  if (!equal(newsub->publications, MySubscription->publications))
1542  {
1543  ereport(LOG,
1544  (errmsg("logical replication apply worker for subscription \"%s\" will "
1545  "restart because subscription's publications were changed",
1546  MySubscription->name)));
1547 
1548  proc_exit(0);
1549  }
1550 
1551  /* Check for other changes that should never happen too. */
1552  if (newsub->dbid != MySubscription->dbid)
1553  {
1554  elog(ERROR, "subscription %u changed unexpectedly",
1555  MyLogicalRepWorker->subid);
1556  }
1557 
1558  /* Clean old subscription info and switch to new one. */
1559  FreeSubscription(MySubscription);
1560  MySubscription = newsub;
1561 
1562  MemoryContextSwitchTo(oldctx);
1563 
1564  /* Change synchronous commit according to the user's wishes */
1565  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1566  PGC_BACKEND, PGC_S_OVERRIDE);
1567 
1568  if (started_tx)
1569  CommitTransactionCommand();
1570 
1571  MySubscriptionValid = true;
1572 }
Subscription * MySubscription
Definition: worker.c:103
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3018
void CommitTransactionCommand(void)
Definition: xact.c:2898
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7563
List * publications
#define ereport(elevel, rest)
Definition: elog.h:141
MemoryContext ApplyContext
Definition: worker.c:99
#define Assert(condition)
Definition: c.h:739
void StartTransactionCommand(void)
Definition: xact.c:2797
bool IsTransactionState(void)
Definition: xact.c:355
bool MySubscriptionValid
Definition: worker.c:104
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1353 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

1354 {
1355  static StringInfo reply_message = NULL;
1356  static TimestampTz send_time = 0;
1357 
1358  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1359  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1360  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1361 
1362  XLogRecPtr writepos;
1363  XLogRecPtr flushpos;
1364  TimestampTz now;
1365  bool have_pending_txes;
1366 
1367  /*
1368  * If the user doesn't want status to be reported to the publisher, be
1369  * sure to exit before doing anything at all.
1370  */
1371  if (!force && wal_receiver_status_interval <= 0)
1372  return;
1373 
1374  /* It's legal to not pass a recvpos */
1375  if (recvpos < last_recvpos)
1376  recvpos = last_recvpos;
1377 
1378  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1379 
1380  /*
1381  * No outstanding transactions to flush, we can report the latest received
1382  * position. This is important for synchronous replication.
1383  */
1384  if (!have_pending_txes)
1385  flushpos = writepos = recvpos;
1386 
1387  if (writepos < last_writepos)
1388  writepos = last_writepos;
1389 
1390  if (flushpos < last_flushpos)
1391  flushpos = last_flushpos;
1392 
1393  now = GetCurrentTimestamp();
1394 
1395  /* if we've already reported everything we're good */
1396  if (!force &&
1397  writepos == last_writepos &&
1398  flushpos == last_flushpos &&
1399  !TimestampDifferenceExceeds(send_time, now,
1400  wal_receiver_status_interval * 1000))
1401  return;
1402  send_time = now;
1403 
1404  if (!reply_message)
1405  {
1406  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1407 
1408  reply_message = makeStringInfo();
1409  MemoryContextSwitchTo(oldctx);
1410  }
1411  else
1412  resetStringInfo(reply_message);
1413 
1414  pq_sendbyte(reply_message, 'r');
1415  pq_sendint64(reply_message, recvpos); /* write */
1416  pq_sendint64(reply_message, flushpos); /* flush */
1417  pq_sendint64(reply_message, writepos); /* apply */
1418  pq_sendint64(reply_message, now); /* sendTime */
1419  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1420 
1421  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1422  force,
1423  (uint32) (recvpos >> 32), (uint32) recvpos,
1424  (uint32) (writepos >> 32), (uint32) writepos,
1425  (uint32) (flushpos >> 32), (uint32) flushpos
1426  );
1427 
1428  walrcv_send(wrconn, reply_message->data, reply_message->len);
1429 
1430  if (recvpos > last_recvpos)
1431  last_recvpos = recvpos;
1432  if (writepos > last_writepos)
1433  last_writepos = writepos;
1434  if (flushpos > last_flushpos)
1435  last_flushpos = flushpos;
1436 }
WalReceiverConn * wrconn
Definition: worker.c:101
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:77
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1044
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:114
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:359
MemoryContext ApplyContext
Definition: worker.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:284
#define elog(elevel,...)
Definition: elog.h:228
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry *  rel)
static

Definition at line 129 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

130 {
131  if (am_tablesync_worker())
132  return MyLogicalRepWorker->relid == rel->localreloid;
133  else
134  return (rel->state == SUBREL_STATE_READY ||
135  (rel->state == SUBREL_STATE_SYNCDONE &&
136  rel->statelsn <= remote_final_lsn));
137 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:107
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
static bool am_tablesync_worker(void)

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry *  rel,
EState *  estate,
TupleTableSlot *  slot 
)
static

Definition at line 212 of file worker.c.

References Assert, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, LogicalRepRelation::natts, TupleDescData::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

214 {
215  TupleDesc desc = RelationGetDescr(rel->localrel);
216  int num_phys_attrs = desc->natts;
217  int i;
218  int attnum,
219  num_defaults = 0;
220  int *defmap;
221  ExprState **defexprs;
222  ExprContext *econtext;
223 
224  econtext = GetPerTupleExprContext(estate);
225 
226  /* We got all the data via replication, no need to evaluate anything. */
227  if (num_phys_attrs == rel->remoterel.natts)
228  return;
229 
230  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
231  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
232 
233  Assert(rel->attrmap->maplen == num_phys_attrs);
234  for (attnum = 0; attnum < num_phys_attrs; attnum++)
235  {
236  Expr *defexpr;
237 
238  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
239  continue;
240 
241  if (rel->attrmap->attnums[attnum] >= 0)
242  continue;
243 
244  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
245 
246  if (defexpr != NULL)
247  {
248  /* Run the expression through planner */
249  defexpr = expression_planner(defexpr);
250 
251  /* Initialize executable expression in copycontext */
252  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
253  defmap[num_defaults] = attnum;
254  num_defaults++;
255  }
256 
257  }
258 
259  for (i = 0; i < num_defaults; i++)
260  slot->tts_values[defmap[i]] =
261  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
262 }
#define RelationGetDescr(relation)
Definition: rel.h:454
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Expr * expression_planner(Expr *expr)
Definition: planner.c:6051
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
#define GetPerTupleExprContext(estate)
Definition: executor.h:501
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
Node * build_column_default(Relation rel, int attrno)
int16 attnum
Definition: pg_attribute.h:79
#define Assert(condition)
Definition: c.h:739
AttrNumber * attnums
Definition: attmap.h:36
void * palloc(Size size)
Definition: mcxt.c:949
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:121

◆ slot_modify_cstrings()

static void slot_modify_cstrings ( TupleTableSlot *  slot,
TupleTableSlot *  srcslot,
LogicalRepRelMapEntry *  rel,
char **  values,
bool *  replaces 
)
static

Definition at line 377 of file worker.c.

References ErrorContextCallback::arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_update().

380 {
381  int natts = slot->tts_tupleDescriptor->natts;
382  int i;
383  SlotErrCallbackArg errarg;
384  ErrorContextCallback errcallback;
385 
386  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
387  ExecClearTuple(slot);
388 
389  /*
390  * Copy all the column data from srcslot, so that we'll have valid values
391  * for unreplaced columns.
392  */
393  Assert(natts == srcslot->tts_tupleDescriptor->natts);
394  slot_getallattrs(srcslot);
395  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
396  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
397 
398  /* For error reporting, push callback + info on the error context stack */
399  errarg.rel = rel;
400  errarg.local_attnum = -1;
401  errarg.remote_attnum = -1;
402  errcallback.callback = slot_store_error_callback;
403  errcallback.arg = (void *) &errarg;
404  errcallback.previous = error_context_stack;
405  error_context_stack = &errcallback;
406 
407  /* Call the "in" function for each replaced attribute */
408  Assert(natts == rel->attrmap->maplen);
409  for (i = 0; i < natts; i++)
410  {
411  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
412  int remoteattnum = rel->attrmap->attnums[i];
413 
414  if (remoteattnum < 0)
415  continue;
416 
417  if (!replaces[remoteattnum])
418  continue;
419 
420  if (values[remoteattnum] != NULL)
421  {
422  Oid typinput;
423  Oid typioparam;
424 
425  errarg.local_attnum = i;
426  errarg.remote_attnum = remoteattnum;
427 
428  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
429  slot->tts_values[i] =
430  OidInputFunctionCall(typinput, values[remoteattnum],
431  typioparam, att->atttypmod);
432  slot->tts_isnull[i] = false;
433 
434  errarg.local_attnum = -1;
435  errarg.remote_attnum = -1;
436  }
437  else
438  {
439  slot->tts_values[i] = (Datum) 0;
440  slot->tts_isnull[i] = true;
441  }
442  }
443 
444  /* Pop the error context stack */
445  error_context_stack = errcallback.previous;
446 
447  /* And finally, declare that "slot" contains a valid virtual tuple */
448  ExecStoreVirtualTuple(slot);
449 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:93
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:256
struct ErrorContextCallback * previous
Definition: elog.h:255
ErrorContextCallback * error_context_stack
Definition: elog.c:91
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define Assert(condition)
Definition: c.h:739
AttrNumber * attnums
Definition: attmap.h:36
static void slot_store_error_callback(void *arg)
Definition: worker.c:268
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_cstrings()

static void slot_store_cstrings ( TupleTableSlot *  slot,
LogicalRepRelMapEntry *  rel,
char **  values 
)
static

Definition at line 303 of file worker.c.

References ErrorContextCallback::arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

305 {
306  int natts = slot->tts_tupleDescriptor->natts;
307  int i;
308  SlotErrCallbackArg errarg;
309  ErrorContextCallback errcallback;
310 
311  ExecClearTuple(slot);
312 
313  /* Push callback + info on the error context stack */
314  errarg.rel = rel;
315  errarg.local_attnum = -1;
316  errarg.remote_attnum = -1;
317  errcallback.callback = slot_store_error_callback;
318  errcallback.arg = (void *) &errarg;
319  errcallback.previous = error_context_stack;
320  error_context_stack = &errcallback;
321 
322  /* Call the "in" function for each non-dropped attribute */
323  Assert(natts == rel->attrmap->maplen);
324  for (i = 0; i < natts; i++)
325  {
326  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
327  int remoteattnum = rel->attrmap->attnums[i];
328 
329  if (!att->attisdropped && remoteattnum >= 0 &&
330  values[remoteattnum] != NULL)
331  {
332  Oid typinput;
333  Oid typioparam;
334 
335  errarg.local_attnum = i;
336  errarg.remote_attnum = remoteattnum;
337 
338  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
339  slot->tts_values[i] =
340  OidInputFunctionCall(typinput, values[remoteattnum],
341  typioparam, att->atttypmod);
342  slot->tts_isnull[i] = false;
343 
344  errarg.local_attnum = -1;
345  errarg.remote_attnum = -1;
346  }
347  else
348  {
349  /*
350  * We assign NULL to dropped attributes, NULL values, and missing
351  * values (missing values should be later filled using
352  * slot_fill_defaults).
353  */
354  slot->tts_values[i] = (Datum) 0;
355  slot->tts_isnull[i] = true;
356  }
357  }
358 
359  /* Pop the error context stack */
360  error_context_stack = errcallback.previous;
361 
362  ExecStoreVirtualTuple(slot);
363 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:93
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:256
struct ErrorContextCallback * previous
Definition: elog.h:255
ErrorContextCallback * error_context_stack
Definition: elog.c:91
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define Assert(condition)
Definition: c.h:739
AttrNumber * attnums
Definition: attmap.h:36
static void slot_store_error_callback(void *arg)
Definition: worker.c:268
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_error_callback()

static void slot_store_error_callback ( void *  arg)
static

Definition at line 268 of file worker.c.

References LogicalRepRelation::attnames, LogicalRepRelation::atttyps, errcontext, format_type_be(), get_atttype(), SlotErrCallbackArg::local_attnum, LogicalRepRelMapEntry::localreloid, logicalrep_typmap_gettypname(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, LogicalRepRelation::relname, SlotErrCallbackArg::remote_attnum, and LogicalRepRelMapEntry::remoterel.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

269 {
270  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
271  LogicalRepRelMapEntry *rel;
272  char *remotetypname;
273  Oid remotetypoid,
274  localtypoid;
275 
276  /* Nothing to do if remote attribute number is not set */
277  if (errarg->remote_attnum < 0)
278  return;
279 
280  rel = errarg->rel;
281  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
282 
283  /* Fetch remote type name from the LogicalRepTypMap cache */
284  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
285 
286  /* Fetch local type OID from the local sys cache */
287  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
288 
289  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
290  "remote type %s, local type %s",
291  rel->remoterel.nspname, rel->remoterel.relname,
292  rel->remoterel.attnames[errarg->remote_attnum],
293  remotetypname,
294  format_type_be(localtypoid));
295 }
LogicalRepRelMapEntry * rel
Definition: worker.c:93
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
unsigned int Oid
Definition: postgres_ext.h:31
LogicalRepRelation remoterel
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:861
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:435
#define errcontext
Definition: elog.h:183
void * arg

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 1088 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

1089 {
1090  FlushPosition *flushpos;
1091 
1092  /* Need to do this in permanent context */
1093  MemoryContextSwitchTo(ApplyContext);
1094 
1095  /* Track commit lsn */
1096  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1097  flushpos->local_end = XactLastCommitEnd;
1098  flushpos->remote_end = remote_lsn;
1099 
1100  dlist_push_tail(&lsn_mapping, &flushpos->node);
1101  MemoryContextSwitchTo(ApplyMessageContext);
1102 }
static dlist_head lsn_mapping
Definition: worker.c:89
dlist_node node
Definition: worker.c:84
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:352
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:86
MemoryContext ApplyContext
Definition: worker.c:99
XLogRecPtr local_end
Definition: worker.c:85
static MemoryContext ApplyMessageContext
Definition: worker.c:98
void * palloc(Size size)
Definition: mcxt.c:949

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1578 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

1579 {
1580  MySubscriptionValid = false;
1581 }
bool MySubscriptionValid
Definition: worker.c:104

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 1107 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by LogicalRepApplyLoop().

1108 {
1109  MyLogicalRepWorker->last_lsn = last_lsn;
1110  MyLogicalRepWorker->last_send_time = send_time;
1111  MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1112  if (reply)
1113  {
1114  MyLogicalRepWorker->reply_lsn = last_lsn;
1115  MyLogicalRepWorker->reply_time = send_time;
1116  }
1117 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
TimestampTz last_send_time
XLogRecPtr last_lsn
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext = NULL

Definition at line 99 of file worker.c.

◆ ApplyMessageContext

MemoryContext ApplyMessageContext = NULL
static

Definition at line 98 of file worker.c.

◆ in_remote_transaction

bool in_remote_transaction = false

Definition at line 106 of file worker.c.

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 89 of file worker.c.

◆ MySubscription

Subscription * MySubscription = NULL

Definition at line 103 of file worker.c.

◆ MySubscriptionValid

bool MySubscriptionValid = false

Definition at line 104 of file worker.c.

Referenced by ApplyWorkerMain(), maybe_reread_subscription(), and subscription_change_cb().

◆ remote_final_lsn

XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
static

Definition at line 107 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), and should_apply_changes_for_rel().

◆ wrconn