PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static void apply_handle_insert_internal (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
 
static void apply_handle_delete_internal (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
 
static bool FindReplTupleInLocalRel (EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static EStatecreate_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
WalReceiverConnwrconn = NULL
 
SubscriptionMySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 84 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SlotErrCallbackArg

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 1342 of file worker.c.

References generate_unaccent_rules::action, apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by LogicalRepApplyLoop().

1343 {
1344  char action = pq_getmsgbyte(s);
1345 
1346  switch (action)
1347  {
1348  /* BEGIN */
1349  case 'B':
1350  apply_handle_begin(s);
1351  break;
1352  /* COMMIT */
1353  case 'C':
1355  break;
1356  /* INSERT */
1357  case 'I':
1359  break;
1360  /* UPDATE */
1361  case 'U':
1363  break;
1364  /* DELETE */
1365  case 'D':
1367  break;
1368  /* TRUNCATE */
1369  case 'T':
1371  break;
1372  /* RELATION */
1373  case 'R':
1375  break;
1376  /* TYPE */
1377  case 'Y':
1378  apply_handle_type(s);
1379  break;
1380  /* ORIGIN */
1381  case 'O':
1383  break;
1384  default:
1385  ereport(ERROR,
1386  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1387  errmsg("invalid logical replication message type \"%c\"", action)));
1388  }
1389 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:579
static void apply_handle_insert(StringInfo s)
Definition: worker.c:610
int errcode(int sqlerrcode)
Definition: elog.c:610
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:867
static void apply_handle_begin(StringInfo s)
Definition: worker.c:479
static void apply_handle_commit(StringInfo s)
Definition: worker.c:498
static void apply_handle_update(StringInfo s)
Definition: worker.c:722
static void apply_handle_relation(StringInfo s)
Definition: worker.c:564
#define ereport(elevel,...)
Definition: elog.h:144
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_handle_origin(StringInfo s)
Definition: worker.c:542
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:1234

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 479 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

480 {
481  LogicalRepBeginData begin_data;
482 
483  logicalrep_read_begin(s, &begin_data);
484 
485  remote_final_lsn = begin_data.final_lsn;
486 
487  in_remote_transaction = true;
488 
490 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:111
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
bool in_remote_transaction
Definition: worker.c:110
XLogRecPtr final_lsn
Definition: logicalproto.h:67

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 498 of file worker.c.

References AcceptInvalidationMessages(), am_tablesync_worker(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), maybe_reread_subscription(), pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), remote_final_lsn, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

499 {
500  LogicalRepCommitData commit_data;
501 
502  logicalrep_read_commit(s, &commit_data);
503 
504  Assert(commit_data.commit_lsn == remote_final_lsn);
505 
506  /* The synchronization worker runs in single transaction. */
508  {
509  /*
510  * Update origin state so we can restart streaming from correct
511  * position in case of crash.
512  */
515 
517  pgstat_report_stat(false);
518 
519  store_flush_position(commit_data.end_lsn);
520  }
521  else
522  {
523  /* Process any invalidation messages that might have accumulated. */
526  }
527 
528  in_remote_transaction = false;
529 
530  /* Process any tables that are being synchronized in parallel. */
531  process_syncing_tables(commit_data.end_lsn);
532 
534 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1449
void AcceptInvalidationMessages(void)
Definition: inval.c:681
static XLogRecPtr remote_final_lsn
Definition: worker.c:111
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
void CommitTransactionCommand(void)
Definition: xact.c:2919
bool in_remote_transaction
Definition: worker.c:110
static bool am_tablesync_worker(void)
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
static void maybe_reread_subscription(void)
Definition: worker.c:1803
#define Assert(condition)
Definition: c.h:745
bool IsTransactionState(void)
Definition: xact.c:356
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
TimestampTz committime
Definition: logicalproto.h:76
void pgstat_report_stat(bool force)
Definition: pgstat.c:839

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 867 of file worker.c.

References AfterTriggerEndQuery(), apply_handle_delete_internal(), apply_handle_tuple_routing(), check_relation_updatable(), CMD_DELETE, CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecInitExtraTupleSlot(), ExecResetTupleTable(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetDescr, LogicalRepRelMapEntry::remoterel, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

868 {
870  LogicalRepTupleData oldtup;
871  LogicalRepRelId relid;
872  EState *estate;
873  TupleTableSlot *remoteslot;
874  MemoryContext oldctx;
875 
877 
878  relid = logicalrep_read_delete(s, &oldtup);
881  {
882  /*
883  * The relation can't become interesting in the middle of the
884  * transaction so it's safe to unlock it.
885  */
887  return;
888  }
889 
890  /* Check if we can do the delete. */
892 
893  /* Initialize the executor state. */
894  estate = create_estate_for_relation(rel);
895  remoteslot = ExecInitExtraTupleSlot(estate,
897  &TTSOpsVirtual);
898 
900 
901  /* Build the search tuple. */
903  slot_store_cstrings(remoteslot, rel, oldtup.values);
904  MemoryContextSwitchTo(oldctx);
905 
906  /* For a partitioned table, apply delete to correct partition. */
907  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
909  remoteslot, NULL, rel, CMD_DELETE);
910  else
912  remoteslot, &rel->remoterel);
913 
915 
916  /* Handle queued AFTER triggers. */
917  AfterTriggerEndQuery(estate);
918 
919  ExecResetTupleTable(estate->es_tupleTable, false);
920  FreeExecutorState(estate);
921 
923 
925 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:153
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:198
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:276
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:929
#define RelationGetDescr(relation)
Definition: rel.h:482
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:399
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
Form_pg_class rd_rel
Definition: rel.h:109
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
void FreeExecutorState(EState *estate)
Definition: execUtils.c:191
static bool ensure_transaction(void)
Definition: worker.c:169
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:327
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:237
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:688
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1003
List * es_tupleTable
Definition: execnodes.h:557
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1006
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:512
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4452
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot,
LogicalRepRelation remoterel 
)
static

Definition at line 929 of file worker.c.

References DEBUG1, elog, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, and ResultRelInfo::ri_RelationDesc.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

932 {
933  Relation localrel = relinfo->ri_RelationDesc;
934  EPQState epqstate;
935  TupleTableSlot *localslot;
936  bool found;
937 
938  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
939  ExecOpenIndices(relinfo, false);
940 
941  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
942  remoteslot, &localslot);
943 
944  /* If found delete it. */
945  if (found)
946  {
947  EvalPlanQualSetSlot(&epqstate, localslot);
948 
949  /* Do the actual delete. */
950  ExecSimpleRelationDelete(estate, &epqstate, localslot);
951  }
952  else
953  {
954  /* The tuple to be deleted could not be found. */
955  elog(DEBUG1,
956  "logical replication could not find row for delete "
957  "in replication target relation \"%s\"",
958  RelationGetRelationName(localrel));
959  }
960 
961  /* Cleanup. */
962  ExecCloseIndices(relinfo);
963  EvalPlanQualEnd(&epqstate);
964 }
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:413
#define DEBUG1
Definition: elog.h:25
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2932
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RelationGetRelationName(relation)
Definition: rel.h:490
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2486
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:974
#define elog(elevel,...)
Definition: elog.h:214
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:215

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 610 of file worker.c.

References AfterTriggerEndQuery(), apply_handle_insert_internal(), apply_handle_tuple_routing(), CMD_INSERT, CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecInitExtraTupleSlot(), ExecResetTupleTable(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_cstrings(), TTSOpsVirtual, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

611 {
613  LogicalRepTupleData newtup;
614  LogicalRepRelId relid;
615  EState *estate;
616  TupleTableSlot *remoteslot;
617  MemoryContext oldctx;
618 
620 
621  relid = logicalrep_read_insert(s, &newtup);
624  {
625  /*
626  * The relation can't become interesting in the middle of the
627  * transaction so it's safe to unlock it.
628  */
630  return;
631  }
632 
633  /* Initialize the executor state. */
634  estate = create_estate_for_relation(rel);
635  remoteslot = ExecInitExtraTupleSlot(estate,
637  &TTSOpsVirtual);
638 
639  /* Input functions may need an active snapshot, so get one */
641 
642  /* Process and store remote tuple in the slot */
644  slot_store_cstrings(remoteslot, rel, newtup.values);
645  slot_fill_defaults(rel, estate, remoteslot);
646  MemoryContextSwitchTo(oldctx);
647 
648  /* For a partitioned table, insert the tuple into a partition. */
649  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
651  remoteslot, NULL, rel, CMD_INSERT);
652  else
654  remoteslot);
655 
657 
658  /* Handle queued AFTER triggers. */
659  AfterTriggerEndQuery(estate);
660 
661  ExecResetTupleTable(estate->es_tupleTable, false);
662  FreeExecutorState(estate);
663 
665 
667 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:153
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:198
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define RelationGetDescr(relation)
Definition: rel.h:482
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:399
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
Form_pg_class rd_rel
Definition: rel.h:109
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:159
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:236
void FreeExecutorState(EState *estate)
Definition: execUtils.c:191
static bool ensure_transaction(void)
Definition: worker.c:169
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:327
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:237
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1003
List * es_tupleTable
Definition: execnodes.h:557
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1006
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:512
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4452
uint32 LogicalRepRelId
Definition: logicalproto.h:39
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:671
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot 
)
static

Definition at line 671 of file worker.c.

References ExecCloseIndices(), ExecOpenIndices(), and ExecSimpleRelationInsert().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

673 {
674  ExecOpenIndices(relinfo, false);
675 
676  /* Do the insert. */
677  ExecSimpleRelationInsert(estate, remoteslot);
678 
679  /* Cleanup. */
680  ExecCloseIndices(relinfo);
681 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 542 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

543 {
544  /*
545  * ORIGIN message can only come inside remote transaction and before any
546  * actual writes.
547  */
548  if (!in_remote_transaction ||
550  ereport(ERROR,
551  (errcode(ERRCODE_PROTOCOL_VIOLATION),
552  errmsg("ORIGIN message sent out of order")));
553 }
int errcode(int sqlerrcode)
Definition: elog.c:610
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:110
static bool am_tablesync_worker(void)
#define ereport(elevel,...)
Definition: elog.h:144
bool IsTransactionState(void)
Definition: xact.c:356
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 564 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

565 {
566  LogicalRepRelation *rel;
567 
568  rel = logicalrep_read_rel(s);
570 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:375
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:172

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 1234 of file worker.c.

References CommandCounterIncrement(), DROP_RESTRICT, ensure_transaction(), ExecuteTruncateGuts(), find_all_inheritors(), lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, RowExclusiveLock, should_apply_changes_for_rel(), table_close(), and table_open().

Referenced by apply_dispatch().

1235 {
1236  bool cascade = false;
1237  bool restart_seqs = false;
1238  List *remote_relids = NIL;
1239  List *remote_rels = NIL;
1240  List *rels = NIL;
1241  List *part_rels = NIL;
1242  List *relids = NIL;
1243  List *relids_logged = NIL;
1244  ListCell *lc;
1245 
1247 
1248  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1249 
1250  foreach(lc, remote_relids)
1251  {
1252  LogicalRepRelId relid = lfirst_oid(lc);
1253  LogicalRepRelMapEntry *rel;
1254 
1255  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1256  if (!should_apply_changes_for_rel(rel))
1257  {
1258  /*
1259  * The relation can't become interesting in the middle of the
1260  * transaction so it's safe to unlock it.
1261  */
1263  continue;
1264  }
1265 
1266  remote_rels = lappend(remote_rels, rel);
1267  rels = lappend(rels, rel->localrel);
1268  relids = lappend_oid(relids, rel->localreloid);
1270  relids_logged = lappend_oid(relids_logged, rel->localreloid);
1271 
1272  /*
1273  * Truncate partitions if we got a message to truncate a partitioned
1274  * table.
1275  */
1276  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1277  {
1278  ListCell *child;
1279  List *children = find_all_inheritors(rel->localreloid,
1281  NULL);
1282 
1283  foreach(child, children)
1284  {
1285  Oid childrelid = lfirst_oid(child);
1286  Relation childrel;
1287 
1288  if (list_member_oid(relids, childrelid))
1289  continue;
1290 
1291  /* find_all_inheritors already got lock */
1292  childrel = table_open(childrelid, NoLock);
1293 
1294  /*
1295  * Ignore temp tables of other backends. See similar code in
1296  * ExecuteTruncate().
1297  */
1298  if (RELATION_IS_OTHER_TEMP(childrel))
1299  {
1300  table_close(childrel, RowExclusiveLock);
1301  continue;
1302  }
1303 
1304  rels = lappend(rels, childrel);
1305  part_rels = lappend(part_rels, childrel);
1306  relids = lappend_oid(relids, childrelid);
1307  /* Log this relation only if needed for logical decoding */
1308  if (RelationIsLogicallyLogged(childrel))
1309  relids_logged = lappend_oid(relids_logged, childrelid);
1310  }
1311  }
1312  }
1313 
1314  /*
1315  * Even if we used CASCADE on the upstream primary we explicitly default to
1316  * replaying changes without further cascading. This might be later
1317  * changeable with a user specified option.
1318  */
1319  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
1320 
1321  foreach(lc, remote_rels)
1322  {
1323  LogicalRepRelMapEntry *rel = lfirst(lc);
1324 
1326  }
1327  foreach(lc, part_rels)
1328  {
1329  Relation rel = lfirst(lc);
1330 
1331  table_close(rel, NoLock);
1332  }
1333 
1335 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:153
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:325
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:399
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
static bool ensure_transaction(void)
Definition: worker.c:169
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1671
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:237
List * lappend(List *list, void *datum)
Definition: list.c:321
void CommandCounterIncrement(void)
Definition: xact.c:1006
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
#define lfirst(lc)
Definition: pg_list.h:190
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:593
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
LogicalRepRelMapEntry relmapentry,
CmdType  operation 
)
static

Definition at line 1003 of file worker.c.

References apply_handle_delete_internal(), apply_handle_insert_internal(), Assert, TupleConversionMap::attrMap, LogicalRepTupleData::changed, CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog, ERROR, EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCleanupTupleRouting(), ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), NIL, ModifyTableState::operation, PartitionRoutingInfo::pi_PartitionTupleSlot, PartitionRoutingInfo::pi_RootToPartitionMap, PlanState::plan, ModifyTableState::ps, RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_PartitionInfo, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_cstrings(), PlanState::state, table_slot_create(), and LogicalRepTupleData::values.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

1009 {
1010  Relation parentrel = relinfo->ri_RelationDesc;
1011  ModifyTableState *mtstate = NULL;
1012  PartitionTupleRouting *proute = NULL;
1013  ResultRelInfo *partrelinfo;
1014  Relation partrel;
1015  TupleTableSlot *remoteslot_part;
1016  PartitionRoutingInfo *partinfo;
1017  TupleConversionMap *map;
1018  MemoryContext oldctx;
1019 
1020  /* ModifyTableState is needed for ExecFindPartition(). */
1021  mtstate = makeNode(ModifyTableState);
1022  mtstate->ps.plan = NULL;
1023  mtstate->ps.state = estate;
1024  mtstate->operation = operation;
1025  mtstate->resultRelInfo = relinfo;
1026  proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
1027 
1028  /*
1029  * Find the partition to which the "search tuple" belongs.
1030  */
1031  Assert(remoteslot != NULL);
1033  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1034  remoteslot, estate);
1035  Assert(partrelinfo != NULL);
1036  partrel = partrelinfo->ri_RelationDesc;
1037 
1038  /*
1039  * To perform any of the operations below, the tuple must match the
1040  * partition's rowtype. Convert if needed or just copy, using a dedicated
1041  * slot to store the tuple in any case.
1042  */
1043  partinfo = partrelinfo->ri_PartitionInfo;
1044  remoteslot_part = partinfo->pi_PartitionTupleSlot;
1045  if (remoteslot_part == NULL)
1046  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1047  map = partinfo->pi_RootToPartitionMap;
1048  if (map != NULL)
1049  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1050  remoteslot_part);
1051  else
1052  {
1053  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1054  slot_getallattrs(remoteslot_part);
1055  }
1056  MemoryContextSwitchTo(oldctx);
1057 
1058  estate->es_result_relation_info = partrelinfo;
1059  switch (operation)
1060  {
1061  case CMD_INSERT:
1062  apply_handle_insert_internal(partrelinfo, estate,
1063  remoteslot_part);
1064  break;
1065 
1066  case CMD_DELETE:
1067  apply_handle_delete_internal(partrelinfo, estate,
1068  remoteslot_part,
1069  &relmapentry->remoterel);
1070  break;
1071 
1072  case CMD_UPDATE:
1073 
1074  /*
1075  * For UPDATE, depending on whether or not the updated tuple
1076  * satisfies the partition's constraint, perform a simple UPDATE
1077  * of the partition or move the updated tuple into a different
1078  * suitable partition.
1079  */
1080  {
1081  AttrMap *attrmap = map ? map->attrMap : NULL;
1082  LogicalRepRelMapEntry *part_entry;
1083  TupleTableSlot *localslot;
1084  ResultRelInfo *partrelinfo_new;
1085  bool found;
1086 
1087  part_entry = logicalrep_partition_open(relmapentry, partrel,
1088  attrmap);
1089 
1090  /* Get the matching local tuple from the partition. */
1091  found = FindReplTupleInLocalRel(estate, partrel,
1092  &part_entry->remoterel,
1093  remoteslot_part, &localslot);
1094 
1096  if (found)
1097  {
1098  /* Apply the update. */
1099  slot_modify_cstrings(remoteslot_part, localslot,
1100  part_entry,
1101  newtup->values, newtup->changed);
1102  MemoryContextSwitchTo(oldctx);
1103  }
1104  else
1105  {
1106  /*
1107  * The tuple to be updated could not be found.
1108  *
1109  * TODO what to do here, change the log level to LOG
1110  * perhaps?
1111  */
1112  elog(DEBUG1,
1113  "logical replication did not find row for update "
1114  "in replication target relation \"%s\"",
1115  RelationGetRelationName(partrel));
1116  }
1117 
1118  /*
1119  * Does the updated tuple still satisfy the current
1120  * partition's constraint?
1121  */
1122  if (partrelinfo->ri_PartitionCheck == NULL ||
1123  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1124  false))
1125  {
1126  /*
1127  * Yes, so simply UPDATE the partition. We don't call
1128  * apply_handle_update_internal() here, which would
1129  * normally do the following work, to avoid repeating some
1130  * work already done above to find the local tuple in the
1131  * partition.
1132  */
1133  EPQState epqstate;
1134 
1135  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1136  ExecOpenIndices(partrelinfo, false);
1137 
1138  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1139  ExecSimpleRelationUpdate(estate, &epqstate, localslot,
1140  remoteslot_part);
1141  ExecCloseIndices(partrelinfo);
1142  EvalPlanQualEnd(&epqstate);
1143  }
1144  else
1145  {
1146  /* Move the tuple into the new partition. */
1147 
1148  /*
1149  * New partition will be found using tuple routing, which
1150  * can only occur via the parent table. We might need to
1151  * convert the tuple to the parent's rowtype. Note that
1152  * this is the tuple found in the partition, not the
1153  * original search tuple received by this function.
1154  */
1155  if (map)
1156  {
1157  TupleConversionMap *PartitionToRootMap =
1159  RelationGetDescr(parentrel));
1160 
1161  remoteslot =
1162  execute_attr_map_slot(PartitionToRootMap->attrMap,
1163  remoteslot_part, remoteslot);
1164  }
1165  else
1166  {
1167  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1168  slot_getallattrs(remoteslot);
1169  }
1170 
1171 
1172  /* Find the new partition. */
1174  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1175  proute, remoteslot,
1176  estate);
1177  MemoryContextSwitchTo(oldctx);
1178  Assert(partrelinfo_new != partrelinfo);
1179 
1180  /* DELETE old tuple found in the old partition. */
1181  estate->es_result_relation_info = partrelinfo;
1182  apply_handle_delete_internal(partrelinfo, estate,
1183  localslot,
1184  &relmapentry->remoterel);
1185 
1186  /* INSERT new tuple into the new partition. */
1187 
1188  /*
1189  * Convert the replacement tuple to match the destination
1190  * partition rowtype.
1191  */
1193  partrel = partrelinfo_new->ri_RelationDesc;
1194  partinfo = partrelinfo_new->ri_PartitionInfo;
1195  remoteslot_part = partinfo->pi_PartitionTupleSlot;
1196  if (remoteslot_part == NULL)
1197  remoteslot_part = table_slot_create(partrel,
1198  &estate->es_tupleTable);
1199  map = partinfo->pi_RootToPartitionMap;
1200  if (map != NULL)
1201  {
1202  remoteslot_part = execute_attr_map_slot(map->attrMap,
1203  remoteslot,
1204  remoteslot_part);
1205  }
1206  else
1207  {
1208  remoteslot_part = ExecCopySlot(remoteslot_part,
1209  remoteslot);
1210  slot_getallattrs(remoteslot);
1211  }
1212  MemoryContextSwitchTo(oldctx);
1213  estate->es_result_relation_info = partrelinfo_new;
1214  apply_handle_insert_internal(partrelinfo_new, estate,
1215  remoteslot_part);
1216  }
1217  }
1218  break;
1219 
1220  default:
1221  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1222  break;
1223  }
1224 
1225  ExecCleanupTupleRouting(mtstate, proute);
1226 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:413
#define DEBUG1
Definition: elog.h:25
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:929
#define RelationGetDescr(relation)
Definition: rel.h:482
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1174
struct PartitionRoutingInfo * ri_PartitionInfo
Definition: execnodes.h:490
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
CmdType operation
Definition: execnodes.h:1166
EState * state
Definition: execnodes.h:947
Definition: attmap.h:34
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2932
TupleConversionMap * pi_RootToPartitionMap
Definition: execPartition.h:37
#define ERROR
Definition: elog.h:43
PlanState ps
Definition: execnodes.h:1165
LogicalRepRelation remoterel
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RelationGetRelationName(relation)
Definition: rel.h:490
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, ModifyTableState *mtstate, Relation rel)
AttrMap * attrMap
Definition: tupconvert.h:27
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2486
List * es_tupleTable
Definition: execnodes.h:557
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:974
List * ri_PartitionCheck
Definition: execnodes.h:481
Plan * plan
Definition: execnodes.h:945
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:582
#define makeNode(_type_)
Definition: nodes.h:577
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
#define Assert(condition)
Definition: c.h:745
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:512
#define elog(elevel,...)
Definition: elog.h:214
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1783
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:401
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:215
TupleTableSlot * pi_PartitionTupleSlot
Definition: execPartition.h:49
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:671
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 579 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

580 {
581  LogicalRepTyp typ;
582 
583  logicalrep_read_typ(s, &typ);
585 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:427
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:419

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 722 of file worker.c.

References AfterTriggerEndQuery(), apply_handle_tuple_routing(), apply_handle_update_internal(), bms_add_member(), LogicalRepTupleData::changed, check_relation_updatable(), CMD_UPDATE, CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_range_table, EState::es_result_relation_info, EState::es_tupleTable, ExecInitExtraTupleSlot(), ExecResetTupleTable(), fill_extraUpdatedCols(), FirstLowInvalidHeapAttributeNumber, FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), i, list_nth(), LogicalRepRelMapEntry::localrel, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), TupleDescData::natts, NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, RangeTblEntry::updatedCols, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

723 {
725  LogicalRepRelId relid;
726  EState *estate;
727  LogicalRepTupleData oldtup;
728  LogicalRepTupleData newtup;
729  bool has_oldtup;
730  TupleTableSlot *remoteslot;
731  RangeTblEntry *target_rte;
732  MemoryContext oldctx;
733 
735 
736  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
737  &newtup);
740  {
741  /*
742  * The relation can't become interesting in the middle of the
743  * transaction so it's safe to unlock it.
744  */
746  return;
747  }
748 
749  /* Check if we can do the update. */
751 
752  /* Initialize the executor state. */
753  estate = create_estate_for_relation(rel);
754  remoteslot = ExecInitExtraTupleSlot(estate,
756  &TTSOpsVirtual);
757 
758  /*
759  * Populate updatedCols so that per-column triggers can fire. This could
760  * include more columns than were actually changed on the publisher
761  * because the logical replication protocol doesn't contain that
762  * information. But it would for example exclude columns that only exist
763  * on the subscriber, since we are not touching those.
764  */
765  target_rte = list_nth(estate->es_range_table, 0);
766  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
767  {
768  if (newtup.changed[i])
769  target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
771  }
772 
774 
776 
777  /* Build the search tuple. */
779  slot_store_cstrings(remoteslot, rel,
780  has_oldtup ? oldtup.values : newtup.values);
781  MemoryContextSwitchTo(oldctx);
782 
783  /* For a partitioned table, apply update to correct partition. */
784  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
786  remoteslot, &newtup, rel, CMD_UPDATE);
787  else
789  remoteslot, &newtup, rel);
790 
792 
793  /* Handle queued AFTER triggers. */
794  AfterTriggerEndQuery(estate);
795 
796  ExecResetTupleTable(estate->es_tupleTable, false);
797  FreeExecutorState(estate);
798 
800 
802 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:153
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:198
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define RelationGetDescr(relation)
Definition: rel.h:482
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
Definition: worker.c:806
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:399
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
List * es_range_table
Definition: execnodes.h:510
Form_pg_class rd_rel
Definition: rel.h:109
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
void FreeExecutorState(EState *estate)
Definition: execUtils.c:191
static bool ensure_transaction(void)
Definition: worker.c:169
void fill_extraUpdatedCols(RangeTblEntry *target_rte, TupleDesc tupdesc)
Definition: analyze.c:2368
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:327
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:237
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:688
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1003
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
List * es_tupleTable
Definition: execnodes.h:557
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1006
Bitmapset * updatedCols
Definition: parsenodes.h:1123
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:210
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:512
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4452
int i
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
LogicalRepRelMapEntry relmapentry 
)
static

Definition at line 806 of file worker.c.

References LogicalRepTupleData::changed, DEBUG1, elog, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_handle_update().

810 {
811  Relation localrel = relinfo->ri_RelationDesc;
812  EPQState epqstate;
813  TupleTableSlot *localslot;
814  bool found;
815  MemoryContext oldctx;
816 
817  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
818  ExecOpenIndices(relinfo, false);
819 
820  found = FindReplTupleInLocalRel(estate, localrel,
821  &relmapentry->remoterel,
822  remoteslot, &localslot);
823  ExecClearTuple(remoteslot);
824 
825  /*
826  * Tuple found.
827  *
828  * Note this will fail if there are other conflicting unique indexes.
829  */
830  if (found)
831  {
832  /* Process and store remote tuple in the slot */
834  slot_modify_cstrings(remoteslot, localslot, relmapentry,
835  newtup->values, newtup->changed);
836  MemoryContextSwitchTo(oldctx);
837 
838  EvalPlanQualSetSlot(&epqstate, remoteslot);
839 
840  /* Do the actual update. */
841  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
842  }
843  else
844  {
845  /*
846  * The tuple to be updated could not be found.
847  *
848  * TODO what to do here, change the log level to LOG perhaps?
849  */
850  elog(DEBUG1,
851  "logical replication did not find row for update "
852  "in replication target relation \"%s\"",
853  RelationGetRelationName(localrel));
854  }
855 
856  /* Cleanup. */
857  ExecCloseIndices(relinfo);
858  EvalPlanQualEnd(&epqstate);
859 }
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:413
#define DEBUG1
Definition: elog.h:25
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2932
LogicalRepRelation remoterel
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RelationGetRelationName(relation)
Definition: rel.h:490
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2486
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:974
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:512
#define elog(elevel,...)
Definition: elog.h:214
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:401
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:215

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1946 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1947 {
1948  int worker_slot = DatumGetInt32(main_arg);
1949  MemoryContext oldctx;
1950  char originname[NAMEDATALEN];
1951  XLogRecPtr origin_startpos;
1952  char *myslotname;
1954 
1955  /* Attach to slot */
1956  logicalrep_worker_attach(worker_slot);
1957 
1958  /* Setup signal handling */
1960  pqsignal(SIGTERM, die);
1962 
1963  /*
1964  * We don't currently need any ResourceOwner in a walreceiver process, but
1965  * if we did, we could call CreateAuxProcessResourceOwner here.
1966  */
1967 
1968  /* Initialise stats to a sanish value */
1971 
1972  /* Load the libpq-specific functions */
1973  load_file("libpqwalreceiver", false);
1974 
1975  /* Run as replica session replication role. */
1976  SetConfigOption("session_replication_role", "replica",
1978 
1979  /* Connect to our database. */
1982  0);
1983 
1984  /* Load the subscription into persistent memory context. */
1986  "ApplyContext",
1990 
1992  if (!MySubscription)
1993  {
1994  ereport(LOG,
1995  (errmsg("logical replication apply worker for subscription %u will not "
1996  "start because the subscription was removed during startup",
1998  proc_exit(0);
1999  }
2000 
2001  MySubscriptionValid = true;
2002  MemoryContextSwitchTo(oldctx);
2003 
2004  if (!MySubscription->enabled)
2005  {
2006  ereport(LOG,
2007  (errmsg("logical replication apply worker for subscription \"%s\" will not "
2008  "start because the subscription was disabled during startup",
2009  MySubscription->name)));
2010 
2011  proc_exit(0);
2012  }
2013 
2014  /* Setup synchronous commit according to the user's wishes */
2015  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2017 
2018  /* Keep us informed about subscription changes. */
2021  (Datum) 0);
2022 
2023  if (am_tablesync_worker())
2024  ereport(LOG,
2025  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
2027  else
2028  ereport(LOG,
2029  (errmsg("logical replication apply worker for subscription \"%s\" has started",
2030  MySubscription->name)));
2031 
2033 
2034  /* Connect to the origin and start the replication. */
2035  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
2037 
2038  if (am_tablesync_worker())
2039  {
2040  char *syncslotname;
2041 
2042  /* This is table synchronization worker, call initial sync. */
2043  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
2044 
2045  /* The slot name needs to be allocated in permanent memory context. */
2047  myslotname = pstrdup(syncslotname);
2048  MemoryContextSwitchTo(oldctx);
2049 
2050  pfree(syncslotname);
2051  }
2052  else
2053  {
2054  /* This is main apply worker */
2055  RepOriginId originid;
2056  TimeLineID startpointTLI;
2057  char *err;
2058 
2059  myslotname = MySubscription->slotname;
2060 
2061  /*
2062  * This shouldn't happen if the subscription is enabled, but guard
2063  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
2064  * crash if slot is NULL.)
2065  */
2066  if (!myslotname)
2067  ereport(ERROR,
2068  (errmsg("subscription has no replication slot set")));
2069 
2070  /* Setup replication origin tracking. */
2072  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
2073  originid = replorigin_by_name(originname, true);
2074  if (!OidIsValid(originid))
2075  originid = replorigin_create(originname);
2076  replorigin_session_setup(originid);
2077  replorigin_session_origin = originid;
2078  origin_startpos = replorigin_session_get_progress(false);
2080 
2082  &err);
2083  if (wrconn == NULL)
2084  ereport(ERROR,
2085  (errmsg("could not connect to the publisher: %s", err)));
2086 
2087  /*
2088  * We don't really use the output identify_system for anything but it
2089  * does some initializations on the upstream so let's still call it.
2090  */
2091  (void) walrcv_identify_system(wrconn, &startpointTLI);
2092 
2093  }
2094 
2095  /*
2096  * Setup callback for syscache so that we know when something changes in
2097  * the subscription relation state.
2098  */
2101  (Datum) 0);
2102 
2103  /* Build logical replication streaming options. */
2104  options.logical = true;
2105  options.startpoint = origin_startpos;
2106  options.slotname = myslotname;
2107  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
2108  options.proto.logical.publication_names = MySubscription->publications;
2109 
2110  /* Start normal logical streaming replication. */
2111  walrcv_startstreaming(wrconn, &options);
2112 
2113  /* Run the main loop. */
2114  LogicalRepApplyLoop(origin_startpos);
2115 
2116  proc_exit(0);
2117 }
Subscription * MySubscription
Definition: worker.c:107
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:105
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:52
#define DatumGetInt32(X)
Definition: postgres.h:472
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:404
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2919
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:410
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1186
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1051
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:816
#define OidIsValid(objectId)
Definition: c.h:651
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1939
void pfree(void *pointer)
Definition: mcxt.c:1056
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void logicalrep_worker_attach(int slot)
Definition: launcher.c:625
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7685
#define SIGHUP
Definition: win32_port.h:153
XLogRecPtr startpoint
Definition: walreceiver.h:168
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:103
static char ** options
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
uintptr_t Datum
Definition: postgres.h:367
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5734
#define ereport(elevel,...)
Definition: elog.h:144
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:154
void StartTransactionCommand(void)
Definition: xact.c:2818
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:108
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1484
#define elog(elevel,...)
Definition: elog.h:214
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
#define snprintf
Definition: port.h:193
#define die(msg)
Definition: pg_test_fsync.c:96
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5763
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:396

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 688 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

689 {
690  /* Updatable, no error. */
691  if (rel->updatable)
692  return;
693 
694  /*
695  * We are in error mode so it's fine this is somewhat slow. It's better to
696  * give user correct error.
697  */
699  {
700  ereport(ERROR,
701  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
702  errmsg("publisher did not send replica identity column "
703  "expected by the logical replication target relation \"%s.%s\"",
704  rel->remoterel.nspname, rel->remoterel.relname)));
705  }
706 
707  ereport(ERROR,
708  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
709  errmsg("logical replication target relation \"%s.%s\" has "
710  "neither REPLICA IDENTITY index nor PRIMARY "
711  "KEY and published relation does not have "
712  "REPLICA IDENTITY FULL",
713  rel->remoterel.nspname, rel->remoterel.relname)));
714 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:593
int errcode(int sqlerrcode)
Definition: elog.c:610
#define OidIsValid(objectId)
Definition: c.h:651
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ create_estate_for_relation()

static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 198 of file worker.c.

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_num_result_relations, EState::es_output_cid, EState::es_result_relation_info, EState::es_result_relations, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

199 {
200  EState *estate;
201  ResultRelInfo *resultRelInfo;
202  RangeTblEntry *rte;
203 
204  estate = CreateExecutorState();
205 
206  rte = makeNode(RangeTblEntry);
207  rte->rtekind = RTE_RELATION;
208  rte->relid = RelationGetRelid(rel->localrel);
209  rte->relkind = rel->localrel->rd_rel->relkind;
211  ExecInitRangeTable(estate, list_make1(rte));
212 
213  resultRelInfo = makeNode(ResultRelInfo);
214  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
215 
216  estate->es_result_relations = resultRelInfo;
217  estate->es_num_result_relations = 1;
218  estate->es_result_relation_info = resultRelInfo;
219 
220  estate->es_output_cid = GetCurrentCommandId(true);
221 
222  /* Prepare to catch AFTER triggers. */
224 
225  return estate;
226 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:765
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
CommandId es_output_cid
Definition: execnodes.h:522
#define AccessShareLock
Definition: lockdefs.h:36
Form_pg_class rd_rel
Definition: rel.h:109
#define list_make1(x1)
Definition: pg_list.h:227
ResultRelInfo * es_result_relations
Definition: execnodes.h:525
EState * CreateExecutorState(void)
Definition: execUtils.c:89
int es_num_result_relations
Definition: execnodes.h:526
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4432
#define makeNode(_type_)
Definition: nodes.h:577
RTEKind rtekind
Definition: parsenodes.h:976
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:746
#define RelationGetRelid(relation)
Definition: rel.h:456
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ ensure_transaction()

static bool ensure_transaction ( void  )
static

Definition at line 169 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), maybe_reread_subscription(), MemoryContextSwitchTo(), SetCurrentStatementStartTimestamp(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

170 {
171  if (IsTransactionState())
172  {
174 
177 
178  return false;
179  }
180 
183 
185 
187  return true;
188 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void maybe_reread_subscription(void)
Definition: worker.c:1803
static MemoryContext ApplyMessageContext
Definition: worker.c:102
void StartTransactionCommand(void)
Definition: xact.c:2818
bool IsTransactionState(void)
Definition: xact.c:356
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:818

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( EState estate,
Relation  localrel,
LogicalRepRelation remoterel,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 974 of file worker.c.

References Assert, EState::es_tupleTable, GetRelationIdentityOrPK(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, and table_slot_create().

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

978 {
979  Oid idxoid;
980  bool found;
981 
982  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
983 
984  idxoid = GetRelationIdentityOrPK(localrel);
985  Assert(OidIsValid(idxoid) ||
986  (remoterel->replident == REPLICA_IDENTITY_FULL));
987 
988  if (OidIsValid(idxoid))
989  found = RelationFindReplTupleByIndex(localrel, idxoid,
991  remoteslot, *localslot);
992  else
994  remoteslot, *localslot);
995 
996  return found;
997 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:593
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:651
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
List * es_tupleTable
Definition: execnodes.h:557
#define Assert(condition)
Definition: c.h:745
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 1405 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, FlushPosition::node, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

1407 {
1408  dlist_mutable_iter iter;
1409  XLogRecPtr local_flush = GetFlushRecPtr();
1410 
1412  *flush = InvalidXLogRecPtr;
1413 
1415  {
1416  FlushPosition *pos =
1417  dlist_container(FlushPosition, node, iter.cur);
1418 
1419  *write = pos->remote_end;
1420 
1421  if (pos->local_end <= local_flush)
1422  {
1423  *flush = pos->remote_end;
1424  dlist_delete(iter.cur);
1425  pfree(pos);
1426  }
1427  else
1428  {
1429  /*
1430  * Don't want to uselessly iterate over the rest of the list which
1431  * could potentially be long. Instead get the last element and
1432  * grab the write position from there.
1433  */
1434  pos = dlist_tail_element(FlushPosition, node,
1435  &lsn_mapping);
1436  *write = pos->remote_end;
1437  *have_pending_txes = true;
1438  return;
1439  }
1440  }
1441 
1442  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1443 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:93
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:90
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8424
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:89
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 593 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by check_relation_updatable(), and FindReplTupleInLocalRel().

594 {
595  Oid idxoid;
596 
597  idxoid = RelationGetReplicaIndex(rel);
598 
599  if (!OidIsValid(idxoid))
600  idxoid = RelationGetPrimaryKeyIndex(rel);
601 
602  return idxoid;
603 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4716
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:651
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4695

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 2123 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

2124 {
2125  return MyLogicalRepWorker != NULL;
2126 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 1484 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), in_remote_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

1485 {
1486  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1487 
1488  /*
1489  * Init the ApplyMessageContext which we clean up after each replication
1490  * protocol message.
1491  */
1493  "ApplyMessageContext",
1495 
1496  /* mark as idle, before starting to loop */
1498 
1499  for (;;)
1500  {
1502  int rc;
1503  int len;
1504  char *buf = NULL;
1505  bool endofstream = false;
1506  bool ping_sent = false;
1507  long wait_time;
1508 
1510 
1512 
1513  len = walrcv_receive(wrconn, &buf, &fd);
1514 
1515  if (len != 0)
1516  {
1517  /* Process the data */
1518  for (;;)
1519  {
1521 
1522  if (len == 0)
1523  {
1524  break;
1525  }
1526  else if (len < 0)
1527  {
1528  ereport(LOG,
1529  (errmsg("data stream from publisher has ended")));
1530  endofstream = true;
1531  break;
1532  }
1533  else
1534  {
1535  int c;
1536  StringInfoData s;
1537 
1538  /* Reset timeout. */
1539  last_recv_timestamp = GetCurrentTimestamp();
1540  ping_sent = false;
1541 
1542  /* Ensure we are reading the data into our memory context. */
1544 
1545  s.data = buf;
1546  s.len = len;
1547  s.cursor = 0;
1548  s.maxlen = -1;
1549 
1550  c = pq_getmsgbyte(&s);
1551 
1552  if (c == 'w')
1553  {
1554  XLogRecPtr start_lsn;
1555  XLogRecPtr end_lsn;
1556  TimestampTz send_time;
1557 
1558  start_lsn = pq_getmsgint64(&s);
1559  end_lsn = pq_getmsgint64(&s);
1560  send_time = pq_getmsgint64(&s);
1561 
1562  if (last_received < start_lsn)
1563  last_received = start_lsn;
1564 
1565  if (last_received < end_lsn)
1566  last_received = end_lsn;
1567 
1568  UpdateWorkerStats(last_received, send_time, false);
1569 
1570  apply_dispatch(&s);
1571  }
1572  else if (c == 'k')
1573  {
1574  XLogRecPtr end_lsn;
1576  bool reply_requested;
1577 
1578  end_lsn = pq_getmsgint64(&s);
1579  timestamp = pq_getmsgint64(&s);
1580  reply_requested = pq_getmsgbyte(&s);
1581 
1582  if (last_received < end_lsn)
1583  last_received = end_lsn;
1584 
1585  send_feedback(last_received, reply_requested, false);
1586  UpdateWorkerStats(last_received, timestamp, true);
1587  }
1588  /* other message types are purposefully ignored */
1589 
1591  }
1592 
1593  len = walrcv_receive(wrconn, &buf, &fd);
1594  }
1595  }
1596 
1597  /* confirm all writes so far */
1598  send_feedback(last_received, false, false);
1599 
1600  if (!in_remote_transaction)
1601  {
1602  /*
1603  * If we didn't get any transactions for a while there might be
1604  * unconsumed invalidation messages in the queue, consume them
1605  * now.
1606  */
1609 
1610  /* Process any table synchronization changes. */
1611  process_syncing_tables(last_received);
1612  }
1613 
1614  /* Cleanup the memory. */
1617 
1618  /* Check if we need to exit the streaming loop. */
1619  if (endofstream)
1620  {
1621  TimeLineID tli;
1622 
1623  walrcv_endstreaming(wrconn, &tli);
1624  break;
1625  }
1626 
1627  /*
1628  * Wait for more data or latch. If we have unflushed transactions,
1629  * wake up after WalWriterDelay to see if they've been flushed yet (in
1630  * which case we should send a feedback message). Otherwise, there's
1631  * no particular urgency about waking up unless we get data or a
1632  * signal.
1633  */
1634  if (!dlist_is_empty(&lsn_mapping))
1635  wait_time = WalWriterDelay;
1636  else
1637  wait_time = NAPTIME_PER_CYCLE;
1638 
1642  fd, wait_time,
1644 
1645  if (rc & WL_LATCH_SET)
1646  {
1649  }
1650 
1651  if (ConfigReloadPending)
1652  {
1653  ConfigReloadPending = false;
1655  }
1656 
1657  if (rc & WL_TIMEOUT)
1658  {
1659  /*
1660  * We didn't receive anything new. If we haven't heard anything
1661  * from the server for more than wal_receiver_timeout / 2, ping
1662  * the server. Also, if it's been longer than
1663  * wal_receiver_status_interval since the last update we sent,
1664  * send a status update to the primary anyway, to report any
1665  * progress in applying WAL.
1666  */
1667  bool requestReply = false;
1668 
1669  /*
1670  * Check if time since last receive from standby has reached the
1671  * configured limit.
1672  */
1673  if (wal_receiver_timeout > 0)
1674  {
1676  TimestampTz timeout;
1677 
1678  timeout =
1679  TimestampTzPlusMilliseconds(last_recv_timestamp,
1681 
1682  if (now >= timeout)
1683  ereport(ERROR,
1684  (errmsg("terminating logical replication worker due to timeout")));
1685 
1686  /*
1687  * We didn't receive anything new, for half of receiver
1688  * replication timeout. Ping the server.
1689  */
1690  if (!ping_sent)
1691  {
1692  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1693  (wal_receiver_timeout / 2));
1694  if (now >= timeout)
1695  {
1696  requestReply = true;
1697  ping_sent = true;
1698  }
1699  }
1700  }
1701 
1702  send_feedback(last_received, requestReply, requestReply);
1703  }
1704  }
1705 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1714
WalReceiverConn * wrconn
Definition: worker.c:105
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:412
uint32 TimeLineID
Definition: xlogdefs.h:52
void AcceptInvalidationMessages(void)
Definition: inval.c:681
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static dlist_head lsn_mapping
Definition: worker.c:93
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1468
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:414
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ResetLatch(Latch *latch)
Definition: latch.c:540
int wal_receiver_timeout
Definition: walreceiver.c:89
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:84
bool in_remote_transaction
Definition: worker.c:110
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * c
static char * buf
Definition: pg_test_fsync.c:67
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
int pgsocket
Definition: port.h:31
MemoryContext TopMemoryContext
Definition: mcxt.c:44
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:103
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define PGINVALID_SOCKET
Definition: port.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_dispatch(StringInfo s)
Definition: worker.c:1342
static void maybe_reread_subscription(void)
Definition: worker.c:1803
static MemoryContext ApplyMessageContext
Definition: worker.c:102
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
int WalWriterDelay
Definition: walwriter.c:70
int errmsg(const char *fmt,...)
Definition: elog.c:824
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 1803 of file worker.c.

References Assert, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), LogicalRepWorker::subid, and Subscription::synccommit.

Referenced by apply_handle_commit(), ensure_transaction(), and LogicalRepApplyLoop().

1804 {
1805  MemoryContext oldctx;
1807  bool started_tx = false;
1808 
1809  /* When cache state is valid there is nothing to do here. */
1810  if (MySubscriptionValid)
1811  return;
1812 
1813  /* This function might be called inside or outside of transaction. */
1814  if (!IsTransactionState())
1815  {
1817  started_tx = true;
1818  }
1819 
1820  /* Ensure allocations in permanent context. */
1822 
1823  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1824 
1825  /*
1826  * Exit if the subscription was removed. This normally should not happen
1827  * as the worker gets killed during DROP SUBSCRIPTION.
1828  */
1829  if (!newsub)
1830  {
1831  ereport(LOG,
1832  (errmsg("logical replication apply worker for subscription \"%s\" will "
1833  "stop because the subscription was removed",
1834  MySubscription->name)));
1835 
1836  proc_exit(0);
1837  }
1838 
1839  /*
1840  * Exit if the subscription was disabled. This normally should not happen
1841  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1842  */
1843  if (!newsub->enabled)
1844  {
1845  ereport(LOG,
1846  (errmsg("logical replication apply worker for subscription \"%s\" will "
1847  "stop because the subscription was disabled",
1848  MySubscription->name)));
1849 
1850  proc_exit(0);
1851  }
1852 
1853  /*
1854  * Exit if connection string was changed. The launcher will start new
1855  * worker.
1856  */
1857  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1858  {
1859  ereport(LOG,
1860  (errmsg("logical replication apply worker for subscription \"%s\" will "
1861  "restart because the connection information was changed",
1862  MySubscription->name)));
1863 
1864  proc_exit(0);
1865  }
1866 
1867  /*
1868  * Exit if subscription name was changed (it's used for
1869  * fallback_application_name). The launcher will start new worker.
1870  */
1871  if (strcmp(newsub->name, MySubscription->name) != 0)
1872  {
1873  ereport(LOG,
1874  (errmsg("logical replication apply worker for subscription \"%s\" will "
1875  "restart because subscription was renamed",
1876  MySubscription->name)));
1877 
1878  proc_exit(0);
1879  }
1880 
1881  /* !slotname should never happen when enabled is true. */
1882  Assert(newsub->slotname);
1883 
1884  /*
1885  * We need to make new connection to new slot if slot name has changed so
1886  * exit here as well if that's the case.
1887  */
1888  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1889  {
1890  ereport(LOG,
1891  (errmsg("logical replication apply worker for subscription \"%s\" will "
1892  "restart because the replication slot name was changed",
1893  MySubscription->name)));
1894 
1895  proc_exit(0);
1896  }
1897 
1898  /*
1899  * Exit if publication list was changed. The launcher will start new
1900  * worker.
1901  */
1903  {
1904  ereport(LOG,
1905  (errmsg("logical replication apply worker for subscription \"%s\" will "
1906  "restart because subscription's publications were changed",
1907  MySubscription->name)));
1908 
1909  proc_exit(0);
1910  }
1911 
1912  /* Check for other changes that should never happen too. */
1913  if (newsub->dbid != MySubscription->dbid)
1914  {
1915  elog(ERROR, "subscription %u changed unexpectedly",
1917  }
1918 
1919  /* Clean old subscription info and switch to new one. */
1922 
1923  MemoryContextSwitchTo(oldctx);
1924 
1925  /* Change synchronous commit according to the user's wishes */
1926  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1928 
1929  if (started_tx)
1931 
1932  MySubscriptionValid = true;
1933 }
Subscription * MySubscription
Definition: worker.c:107
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3033
void CommitTransactionCommand(void)
Definition: xact.c:2919
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7685
List * publications
MemoryContext ApplyContext
Definition: worker.c:103
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
void StartTransactionCommand(void)
Definition: xact.c:2818
bool IsTransactionState(void)
Definition: xact.c:356
bool MySubscriptionValid
Definition: worker.c:108
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1714 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

1715 {
1716  static StringInfo reply_message = NULL;
1717  static TimestampTz send_time = 0;
1718 
1719  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1720  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1721  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1722 
1723  XLogRecPtr writepos;
1724  XLogRecPtr flushpos;
1725  TimestampTz now;
1726  bool have_pending_txes;
1727 
1728  /*
1729  * If the user doesn't want status to be reported to the publisher, be
1730  * sure to exit before doing anything at all.
1731  */
1732  if (!force && wal_receiver_status_interval <= 0)
1733  return;
1734 
1735  /* It's legal to not pass a recvpos */
1736  if (recvpos < last_recvpos)
1737  recvpos = last_recvpos;
1738 
1739  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1740 
1741  /*
1742  * No outstanding transactions to flush, we can report the latest received
1743  * position. This is important for synchronous replication.
1744  */
1745  if (!have_pending_txes)
1746  flushpos = writepos = recvpos;
1747 
1748  if (writepos < last_writepos)
1749  writepos = last_writepos;
1750 
1751  if (flushpos < last_flushpos)
1752  flushpos = last_flushpos;
1753 
1754  now = GetCurrentTimestamp();
1755 
1756  /* if we've already reported everything we're good */
1757  if (!force &&
1758  writepos == last_writepos &&
1759  flushpos == last_flushpos &&
1760  !TimestampDifferenceExceeds(send_time, now,
1762  return;
1763  send_time = now;
1764 
1765  if (!reply_message)
1766  {
1768 
1769  reply_message = makeStringInfo();
1770  MemoryContextSwitchTo(oldctx);
1771  }
1772  else
1773  resetStringInfo(reply_message);
1774 
1775  pq_sendbyte(reply_message, 'r');
1776  pq_sendint64(reply_message, recvpos); /* write */
1777  pq_sendint64(reply_message, flushpos); /* flush */
1778  pq_sendint64(reply_message, writepos); /* apply */
1779  pq_sendint64(reply_message, now); /* sendTime */
1780  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1781 
1782  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1783  force,
1784  (uint32) (recvpos >> 32), (uint32) recvpos,
1785  (uint32) (writepos >> 32), (uint32) writepos,
1786  (uint32) (flushpos >> 32), (uint32) flushpos
1787  );
1788 
1789  walrcv_send(wrconn, reply_message->data, reply_message->len);
1790 
1791  if (recvpos > last_recvpos)
1792  last_recvpos = recvpos;
1793  if (writepos > last_writepos)
1794  last_writepos = writepos;
1795  if (flushpos > last_flushpos)
1796  last_flushpos = flushpos;
1797 }
WalReceiverConn * wrconn
Definition: worker.c:105
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1405
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:124
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:374
MemoryContext ApplyContext
Definition: worker.c:103
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:416
#define elog(elevel,...)
Definition: elog.h:214
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 153 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

154 {
155  if (am_tablesync_worker())
156  return MyLogicalRepWorker->relid == rel->localreloid;
157  else
158  return (rel->state == SUBREL_STATE_READY ||
159  (rel->state == SUBREL_STATE_SYNCDONE &&
160  rel->statelsn <= remote_final_lsn));
161 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:111
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
static bool am_tablesync_worker(void)

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 236 of file worker.c.

References Assert, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, LogicalRepRelation::natts, TupleDescData::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

238 {
239  TupleDesc desc = RelationGetDescr(rel->localrel);
240  int num_phys_attrs = desc->natts;
241  int i;
242  int attnum,
243  num_defaults = 0;
244  int *defmap;
245  ExprState **defexprs;
246  ExprContext *econtext;
247 
248  econtext = GetPerTupleExprContext(estate);
249 
250  /* We got all the data via replication, no need to evaluate anything. */
251  if (num_phys_attrs == rel->remoterel.natts)
252  return;
253 
254  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
255  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
256 
257  Assert(rel->attrmap->maplen == num_phys_attrs);
258  for (attnum = 0; attnum < num_phys_attrs; attnum++)
259  {
260  Expr *defexpr;
261 
262  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
263  continue;
264 
265  if (rel->attrmap->attnums[attnum] >= 0)
266  continue;
267 
268  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
269 
270  if (defexpr != NULL)
271  {
272  /* Run the expression through planner */
273  defexpr = expression_planner(defexpr);
274 
275  /* Initialize executable expression in copycontext */
276  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
277  defmap[num_defaults] = attnum;
278  num_defaults++;
279  }
280 
281  }
282 
283  for (i = 0; i < num_defaults; i++)
284  slot->tts_values[defmap[i]] =
285  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
286 }
#define RelationGetDescr(relation)
Definition: rel.h:482
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Expr * expression_planner(Expr *expr)
Definition: planner.c:6156
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
#define GetPerTupleExprContext(estate)
Definition: executor.h:507
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
Node * build_column_default(Relation rel, int attrno)
int16 attnum
Definition: pg_attribute.h:79
#define Assert(condition)
Definition: c.h:745
AttrNumber * attnums
Definition: attmap.h:36
void * palloc(Size size)
Definition: mcxt.c:949
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123

◆ slot_modify_cstrings()

static void slot_modify_cstrings ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
char **  values,
bool replaces 
)
static

Definition at line 401 of file worker.c.

References ErrorContextCallback::arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

404 {
405  int natts = slot->tts_tupleDescriptor->natts;
406  int i;
407  SlotErrCallbackArg errarg;
408  ErrorContextCallback errcallback;
409 
410  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
411  ExecClearTuple(slot);
412 
413  /*
414  * Copy all the column data from srcslot, so that we'll have valid values
415  * for unreplaced columns.
416  */
417  Assert(natts == srcslot->tts_tupleDescriptor->natts);
418  slot_getallattrs(srcslot);
419  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
420  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
421 
422  /* For error reporting, push callback + info on the error context stack */
423  errarg.rel = rel;
424  errarg.local_attnum = -1;
425  errarg.remote_attnum = -1;
426  errcallback.callback = slot_store_error_callback;
427  errcallback.arg = (void *) &errarg;
428  errcallback.previous = error_context_stack;
429  error_context_stack = &errcallback;
430 
431  /* Call the "in" function for each replaced attribute */
432  Assert(natts == rel->attrmap->maplen);
433  for (i = 0; i < natts; i++)
434  {
436  int remoteattnum = rel->attrmap->attnums[i];
437 
438  if (remoteattnum < 0)
439  continue;
440 
441  if (!replaces[remoteattnum])
442  continue;
443 
444  if (values[remoteattnum] != NULL)
445  {
446  Oid typinput;
447  Oid typioparam;
448 
449  errarg.local_attnum = i;
450  errarg.remote_attnum = remoteattnum;
451 
452  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
453  slot->tts_values[i] =
454  OidInputFunctionCall(typinput, values[remoteattnum],
455  typioparam, att->atttypmod);
456  slot->tts_isnull[i] = false;
457 
458  errarg.local_attnum = -1;
459  errarg.remote_attnum = -1;
460  }
461  else
462  {
463  slot->tts_values[i] = (Datum) 0;
464  slot->tts_isnull[i] = true;
465  }
466  }
467 
468  /* Pop the error context stack */
469  error_context_stack = errcallback.previous;
470 
471  /* And finally, declare that "slot" contains a valid virtual tuple */
472  ExecStoreVirtualTuple(slot);
473 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:97
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:229
struct ErrorContextCallback * previous
Definition: elog.h:228
ErrorContextCallback * error_context_stack
Definition: elog.c:92
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2751
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define Assert(condition)
Definition: c.h:745
AttrNumber * attnums
Definition: attmap.h:36
static void slot_store_error_callback(void *arg)
Definition: worker.c:292
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_cstrings()

static void slot_store_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values 
)
static

Definition at line 327 of file worker.c.

References ErrorContextCallback::arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, SlotErrCallbackArg::local_attnum, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

329 {
330  int natts = slot->tts_tupleDescriptor->natts;
331  int i;
332  SlotErrCallbackArg errarg;
333  ErrorContextCallback errcallback;
334 
335  ExecClearTuple(slot);
336 
337  /* Push callback + info on the error context stack */
338  errarg.rel = rel;
339  errarg.local_attnum = -1;
340  errarg.remote_attnum = -1;
341  errcallback.callback = slot_store_error_callback;
342  errcallback.arg = (void *) &errarg;
343  errcallback.previous = error_context_stack;
344  error_context_stack = &errcallback;
345 
346  /* Call the "in" function for each non-dropped attribute */
347  Assert(natts == rel->attrmap->maplen);
348  for (i = 0; i < natts; i++)
349  {
351  int remoteattnum = rel->attrmap->attnums[i];
352 
353  if (!att->attisdropped && remoteattnum >= 0 &&
354  values[remoteattnum] != NULL)
355  {
356  Oid typinput;
357  Oid typioparam;
358 
359  errarg.local_attnum = i;
360  errarg.remote_attnum = remoteattnum;
361 
362  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
363  slot->tts_values[i] =
364  OidInputFunctionCall(typinput, values[remoteattnum],
365  typioparam, att->atttypmod);
366  slot->tts_isnull[i] = false;
367 
368  errarg.local_attnum = -1;
369  errarg.remote_attnum = -1;
370  }
371  else
372  {
373  /*
374  * We assign NULL to dropped attributes, NULL values, and missing
375  * values (missing values should be later filled using
376  * slot_fill_defaults).
377  */
378  slot->tts_values[i] = (Datum) 0;
379  slot->tts_isnull[i] = true;
380  }
381  }
382 
383  /* Pop the error context stack */
384  error_context_stack = errcallback.previous;
385 
386  ExecStoreVirtualTuple(slot);
387 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:97
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:229
struct ErrorContextCallback * previous
Definition: elog.h:228
ErrorContextCallback * error_context_stack
Definition: elog.c:92
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2751
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define Assert(condition)
Definition: c.h:745
AttrNumber * attnums
Definition: attmap.h:36
static void slot_store_error_callback(void *arg)
Definition: worker.c:292
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_error_callback()

static void slot_store_error_callback ( void *  arg)
static

Definition at line 292 of file worker.c.

References LogicalRepRelation::attnames, LogicalRepRelation::atttyps, errcontext, format_type_be(), get_atttype(), SlotErrCallbackArg::local_attnum, LogicalRepRelMapEntry::localreloid, logicalrep_typmap_gettypname(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, LogicalRepRelation::relname, SlotErrCallbackArg::remote_attnum, and LogicalRepRelMapEntry::remoterel.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

293 {
296  char *remotetypname;
297  Oid remotetypoid,
298  localtypoid;
299 
300  /* Nothing to do if remote attribute number is not set */
301  if (errarg->remote_attnum < 0)
302  return;
303 
304  rel = errarg->rel;
305  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
306 
307  /* Fetch remote type name from the LogicalRepTypMap cache */
308  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
309 
310  /* Fetch local type OID from the local sys cache */
311  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
312 
313  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
314  "remote type %s, local type %s",
315  rel->remoterel.nspname, rel->remoterel.relname,
316  rel->remoterel.attnames[errarg->remote_attnum],
317  remotetypname,
318  format_type_be(localtypoid));
319 }
LogicalRepRelMapEntry * rel
Definition: worker.c:97
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
unsigned int Oid
Definition: postgres_ext.h:31
LogicalRepRelation remoterel
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:911
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:453
#define errcontext
Definition: elog.h:185
void * arg

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 1449 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

1450 {
1451  FlushPosition *flushpos;
1452 
1453  /* Need to do this in permanent context */
1455 
1456  /* Track commit lsn */
1457  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1458  flushpos->local_end = XactLastCommitEnd;
1459  flushpos->remote_end = remote_lsn;
1460 
1461  dlist_push_tail(&lsn_mapping, &flushpos->node);
1463 }
static dlist_head lsn_mapping
Definition: worker.c:93
dlist_node node
Definition: worker.c:88
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:364
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:90
MemoryContext ApplyContext
Definition: worker.c:103
XLogRecPtr local_end
Definition: worker.c:89
static MemoryContext ApplyMessageContext
Definition: worker.c:102
void * palloc(Size size)
Definition: mcxt.c:949

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1939 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

1940 {
1941  MySubscriptionValid = false;
1942 }
bool MySubscriptionValid
Definition: worker.c:108

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 1468 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by LogicalRepApplyLoop().

1469 {
1470  MyLogicalRepWorker->last_lsn = last_lsn;
1471  MyLogicalRepWorker->last_send_time = send_time;
1473  if (reply)
1474  {
1475  MyLogicalRepWorker->reply_lsn = last_lsn;
1476  MyLogicalRepWorker->reply_time = send_time;
1477  }
1478 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
TimestampTz last_send_time
XLogRecPtr last_lsn
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext = NULL

Definition at line 103 of file worker.c.

◆ ApplyMessageContext

MemoryContext ApplyMessageContext = NULL
static

Definition at line 102 of file worker.c.

◆ in_remote_transaction

bool in_remote_transaction = false

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 93 of file worker.c.

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false

Definition at line 108 of file worker.c.

Referenced by ApplyWorkerMain(), maybe_reread_subscription(), and subscription_change_cb().

◆ remote_final_lsn

XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
static

Definition at line 111 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), and should_apply_changes_for_rel().

◆ wrconn