PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 
struct  StreamXidHash
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 
typedef struct StreamXidHash StreamXidHash
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_cleanup_files (Oid subid, TransactionId xid)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static void apply_dispatch (StringInfo s)
 
static void apply_handle_commit_internal (StringInfo s, LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
 
static void apply_handle_delete_internal (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
 
static bool FindReplTupleInLocalRel (EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static EStatecreate_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_stream_start (StringInfo s)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConnwrconn = NULL
 
SubscriptionMySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static HTABxidhash = NULL
 
static BufFilestream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 121 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

◆ ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SlotErrCallbackArg

◆ StreamXidHash

typedef struct StreamXidHash StreamXidHash

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 1900 of file worker.c.

References generate_unaccent_rules::action, apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_END, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_handle_stream_commit(), and LogicalRepApplyLoop().

1901 {
1903 
1904  switch (action)
1905  {
1906  case LOGICAL_REP_MSG_BEGIN:
1907  apply_handle_begin(s);
1908  return;
1909 
1912  return;
1913 
1916  return;
1917 
1920  return;
1921 
1924  return;
1925 
1928  return;
1929 
1932  return;
1933 
1934  case LOGICAL_REP_MSG_TYPE:
1935  apply_handle_type(s);
1936  return;
1937 
1940  return;
1941 
1944  return;
1945 
1948  return;
1949 
1952  return;
1953 
1956  return;
1957  }
1958 
1959  ereport(ERROR,
1960  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1961  errmsg("invalid logical replication message type \"%c\"", action)));
1962 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:1109
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1143
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:825
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:746
int errcode(int sqlerrcode)
Definition: elog.c:694
#define ERROR
Definition: elog.h:45
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:796
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1422
static void apply_handle_begin(StringInfo s)
Definition: worker.c:687
static void apply_handle_commit(StringInfo s)
Definition: worker.c:706
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:923
static void apply_handle_update(StringInfo s)
Definition: worker.c:1261
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1091
#define ereport(elevel,...)
Definition: elog.h:155
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_handle_origin(StringInfo s)
Definition: worker.c:728
LogicalRepMsgType
Definition: logicalproto.h:46
int errmsg(const char *fmt,...)
Definition: elog.c:905
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:1789

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 687 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

688 {
689  LogicalRepBeginData begin_data;
690 
691  logicalrep_read_begin(s, &begin_data);
692 
693  remote_final_lsn = begin_data.final_lsn;
694 
695  in_remote_transaction = true;
696 
698 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:165
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:59
bool in_remote_transaction
Definition: worker.c:164
XLogRecPtr final_lsn
Definition: logicalproto.h:112

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 706 of file worker.c.

References apply_handle_commit_internal(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, logicalrep_read_commit(), pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, and STATE_IDLE.

Referenced by apply_dispatch().

707 {
708  LogicalRepCommitData commit_data;
709 
710  logicalrep_read_commit(s, &commit_data);
711 
712  Assert(commit_data.commit_lsn == remote_final_lsn);
713 
714  apply_handle_commit_internal(s, &commit_data);
715 
716  /* Process any tables that are being synchronized in parallel. */
717  process_syncing_tables(commit_data.end_lsn);
718 
720 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:165
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:587
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
Definition: worker.c:1056
#define Assert(condition)
Definition: c.h:804
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:94

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( StringInfo  s,
LogicalRepCommitData commit_data 
)
static

Definition at line 1056 of file worker.c.

References AcceptInvalidationMessages(), LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and store_flush_position().

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

1057 {
1058  if (IsTransactionState())
1059  {
1060  /*
1061  * Update origin state so we can restart streaming from correct
1062  * position in case of crash.
1063  */
1064  replorigin_session_origin_lsn = commit_data->end_lsn;
1066 
1068  pgstat_report_stat(false);
1069 
1070  store_flush_position(commit_data->end_lsn);
1071  }
1072  else
1073  {
1074  /* Process any invalidation messages that might have accumulated. */
1077  }
1078 
1079  in_remote_transaction = false;
1080 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2022
void AcceptInvalidationMessages(void)
Definition: inval.c:687
void CommitTransactionCommand(void)
Definition: xact.c:2939
void pgstat_report_stat(bool disconnect)
Definition: pgstat.c:870
bool in_remote_transaction
Definition: worker.c:164
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
static void maybe_reread_subscription(void)
Definition: worker.c:2380
bool IsTransactionState(void)
Definition: xact.c:371
TimestampTz committime
Definition: logicalproto.h:121

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 1422 of file worker.c.

References AfterTriggerEndQuery(), apply_handle_delete_internal(), apply_handle_tuple_routing(), check_relation_updatable(), CMD_DELETE, CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_tupleTable, ExecInitExtraTupleSlot(), ExecResetTupleTable(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), handle_streamed_transaction(), InitResultRelInfo(), LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), makeNode, MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetDescr, LogicalRepRelMapEntry::remoterel, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), and TTSOpsVirtual.

Referenced by apply_dispatch().

1423 {
1424  ResultRelInfo *resultRelInfo;
1425  LogicalRepRelMapEntry *rel;
1426  LogicalRepTupleData oldtup;
1427  LogicalRepRelId relid;
1428  EState *estate;
1429  TupleTableSlot *remoteslot;
1430  MemoryContext oldctx;
1431 
1433  return;
1434 
1436 
1437  relid = logicalrep_read_delete(s, &oldtup);
1438  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1439  if (!should_apply_changes_for_rel(rel))
1440  {
1441  /*
1442  * The relation can't become interesting in the middle of the
1443  * transaction so it's safe to unlock it.
1444  */
1446  return;
1447  }
1448 
1449  /* Check if we can do the delete. */
1451 
1452  /* Initialize the executor state. */
1453  estate = create_estate_for_relation(rel);
1454  remoteslot = ExecInitExtraTupleSlot(estate,
1455  RelationGetDescr(rel->localrel),
1456  &TTSOpsVirtual);
1457  resultRelInfo = makeNode(ResultRelInfo);
1458  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
1459 
1461 
1462  /* Build the search tuple. */
1464  slot_store_data(remoteslot, rel, &oldtup);
1465  MemoryContextSwitchTo(oldctx);
1466 
1467  /* For a partitioned table, apply delete to correct partition. */
1468  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1469  apply_handle_tuple_routing(resultRelInfo, estate,
1470  remoteslot, NULL, rel, CMD_DELETE);
1471  else
1472  apply_handle_delete_internal(resultRelInfo, estate,
1473  remoteslot, &rel->remoterel);
1474 
1476 
1477  /* Handle queued AFTER triggers. */
1478  AfterTriggerEndQuery(estate);
1479 
1480  ExecResetTupleTable(estate->es_tupleTable, false);
1481  FreeExecutorState(estate);
1482 
1484 
1486 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:344
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:289
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:1490
#define RelationGetDescr(relation)
Definition: rel.h:483
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:447
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
Form_pg_class rd_rel
Definition: rel.h:110
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
static bool ensure_transaction(void)
Definition: worker.c:279
LogicalRepRelation remoterel
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
#define RowExclusiveLock
Definition: lockdefs.h:38
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:272
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1227
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1564
List * es_tupleTable
Definition: execnodes.h:574
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1021
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1182
#define makeNode(_type_)
Definition: nodes.h:581
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4535
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:465
uint32 LogicalRepRelId
Definition: logicalproto.h:84

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot,
LogicalRepRelation remoterel 
)
static

Definition at line 1490 of file worker.c.

References DEBUG1, elog, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, and ResultRelInfo::ri_RelationDesc.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

1493 {
1494  Relation localrel = relinfo->ri_RelationDesc;
1495  EPQState epqstate;
1496  TupleTableSlot *localslot;
1497  bool found;
1498 
1499  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1500  ExecOpenIndices(relinfo, false);
1501 
1502  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1503  remoteslot, &localslot);
1504 
1505  /* If found delete it. */
1506  if (found)
1507  {
1508  EvalPlanQualSetSlot(&epqstate, localslot);
1509 
1510  /* Do the actual delete. */
1511  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1512  }
1513  else
1514  {
1515  /* The tuple to be deleted could not be found. */
1516  elog(DEBUG1,
1517  "logical replication could not find row for delete "
1518  "in replication target relation \"%s\"",
1519  RelationGetRelationName(localrel));
1520  }
1521 
1522  /* Cleanup. */
1523  ExecCloseIndices(relinfo);
1524  EvalPlanQualEnd(&epqstate);
1525 }
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:415
#define DEBUG1
Definition: elog.h:25
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2801
#define RelationGetRelationName(relation)
Definition: rel.h:491
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2381
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1535
#define elog(elevel,...)
Definition: elog.h:227
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:217

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 1143 of file worker.c.

References AfterTriggerEndQuery(), apply_handle_insert_internal(), apply_handle_tuple_routing(), CMD_INSERT, CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_tupleTable, ExecInitExtraTupleSlot(), ExecResetTupleTable(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), handle_streamed_transaction(), InitResultRelInfo(), LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), makeNode, MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), and TTSOpsVirtual.

Referenced by apply_dispatch().

1144 {
1145  ResultRelInfo *resultRelInfo;
1146  LogicalRepRelMapEntry *rel;
1147  LogicalRepTupleData newtup;
1148  LogicalRepRelId relid;
1149  EState *estate;
1150  TupleTableSlot *remoteslot;
1151  MemoryContext oldctx;
1152 
1154  return;
1155 
1157 
1158  relid = logicalrep_read_insert(s, &newtup);
1159  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1160  if (!should_apply_changes_for_rel(rel))
1161  {
1162  /*
1163  * The relation can't become interesting in the middle of the
1164  * transaction so it's safe to unlock it.
1165  */
1167  return;
1168  }
1169 
1170  /* Initialize the executor state. */
1171  estate = create_estate_for_relation(rel);
1172  remoteslot = ExecInitExtraTupleSlot(estate,
1173  RelationGetDescr(rel->localrel),
1174  &TTSOpsVirtual);
1175  resultRelInfo = makeNode(ResultRelInfo);
1176  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
1177 
1178  /* Input functions may need an active snapshot, so get one */
1180 
1181  /* Process and store remote tuple in the slot */
1183  slot_store_data(remoteslot, rel, &newtup);
1184  slot_fill_defaults(rel, estate, remoteslot);
1185  MemoryContextSwitchTo(oldctx);
1186 
1187  /* For a partitioned table, insert the tuple into a partition. */
1188  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1189  apply_handle_tuple_routing(resultRelInfo, estate,
1190  remoteslot, NULL, rel, CMD_INSERT);
1191  else
1192  apply_handle_insert_internal(resultRelInfo, estate,
1193  remoteslot);
1194 
1196 
1197  /* Handle queued AFTER triggers. */
1198  AfterTriggerEndQuery(estate);
1199 
1200  ExecResetTupleTable(estate->es_tupleTable, false);
1201  FreeExecutorState(estate);
1202 
1204 
1206 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:344
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define RelationGetDescr(relation)
Definition: rel.h:483
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:447
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
Form_pg_class rd_rel
Definition: rel.h:110
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:374
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
static bool ensure_transaction(void)
Definition: worker.c:279
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
#define RowExclusiveLock
Definition: lockdefs.h:38
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:272
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1564
List * es_tupleTable
Definition: execnodes.h:574
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1021
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1182
#define makeNode(_type_)
Definition: nodes.h:581
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4535
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:465
uint32 LogicalRepRelId
Definition: logicalproto.h:84
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:1210

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot 
)
static

Definition at line 1210 of file worker.c.

References ExecCloseIndices(), ExecOpenIndices(), and ExecSimpleRelationInsert().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

1212 {
1213  ExecOpenIndices(relinfo, false);
1214 
1215  /* Do the insert. */
1216  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1217 
1218  /* Cleanup. */
1219  ExecCloseIndices(relinfo);
1220 }
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 728 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

729 {
730  /*
731  * ORIGIN message can only come inside streaming transaction or inside
732  * remote transaction and before any actual writes.
733  */
737  ereport(ERROR,
738  (errcode(ERRCODE_PROTOCOL_VIOLATION),
739  errmsg("ORIGIN message sent out of order")));
740 }
int errcode(int sqlerrcode)
Definition: elog.c:694
#define ERROR
Definition: elog.h:45
bool in_remote_transaction
Definition: worker.c:164
bool in_streamed_transaction
Definition: worker.c:168
static bool am_tablesync_worker(void)
#define ereport(elevel,...)
Definition: elog.h:155
bool IsTransactionState(void)
Definition: xact.c:371
int errmsg(const char *fmt,...)
Definition: elog.c:905

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 1091 of file worker.c.

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

1092 {
1093  LogicalRepRelation *rel;
1094 
1096  return;
1097 
1098  rel = logicalrep_read_rel(s);
1100 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:397
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:171
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 825 of file worker.c.

References Assert, BufFileClose(), BufFileOpenShared(), BufFileTruncateShared(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), ensure_transaction(), fd(), SubXactInfo::fileno, HASH_FIND, hash_search(), i, in_streamed_transaction, logicalrep_read_stream_abort(), MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, stream_cleanup_files(), StreamXidHash::stream_fileset, LogicalRepWorker::subid, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_dispatch().

826 {
827  TransactionId xid;
828  TransactionId subxid;
829 
831 
832  logicalrep_read_stream_abort(s, &xid, &subxid);
833 
834  /*
835  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
836  * just delete the files with serialized info.
837  */
838  if (xid == subxid)
840  else
841  {
842  /*
843  * OK, so it's a subxact. We need to read the subxact file for the
844  * toplevel transaction, determine the offset tracked for the subxact,
845  * and truncate the file with changes. We also remove the subxacts
846  * with higher offsets (or rather higher XIDs).
847  *
848  * We intentionally scan the array from the tail, because we're likely
849  * aborting a change for the most recent subtransactions.
850  *
851  * We can't use the binary search here as subxact XIDs won't
852  * necessarily arrive in sorted order, consider the case where we have
853  * released the savepoint for multiple subtransactions and then
854  * performed rollback to savepoint for one of the earlier
855  * sub-transaction.
856  */
857 
858  int64 i;
859  int64 subidx;
860  BufFile *fd;
861  bool found = false;
862  char path[MAXPGPATH];
863  StreamXidHash *ent;
864 
865  subidx = -1;
868 
869  for (i = subxact_data.nsubxacts; i > 0; i--)
870  {
871  if (subxact_data.subxacts[i - 1].xid == subxid)
872  {
873  subidx = (i - 1);
874  found = true;
875  break;
876  }
877  }
878 
879  /*
880  * If it's an empty sub-transaction then we will not find the subxid
881  * here so just cleanup the subxact info and return.
882  */
883  if (!found)
884  {
885  /* Cleanup the subxact info */
887 
889  return;
890  }
891 
892  Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
893 
895  (void *) &xid,
896  HASH_FIND,
897  &found);
898  Assert(found);
899 
900  /* open the changes file */
902  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
903 
904  /* OK, truncate the file at the right offset */
906  subxact_data.subxacts[subidx].offset);
907  BufFileClose(fd);
908 
909  /* discard the subxacts added later */
910  subxact_data.nsubxacts = subidx;
911 
912  /* write the updated subxact list */
914 
916  }
917 }
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:2571
int fileno
Definition: worker.c:184
uint32 TransactionId
Definition: c.h:587
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2726
void CommitTransactionCommand(void)
Definition: xact.c:2939
void BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:861
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
uint32 nsubxacts
Definition: worker.c:191
void BufFileClose(BufFile *file)
Definition: buffile.c:395
SubXactInfo * subxacts
Definition: worker.c:194
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:2739
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
static bool ensure_transaction(void)
Definition: worker.c:279
#define MAXPGPATH
bool in_streamed_transaction
Definition: worker.c:168
SharedFileSet * stream_fileset
Definition: worker.c:149
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
TransactionId xid
Definition: worker.c:183
static HTAB * xidhash
Definition: worker.c:176
#define Assert(condition)
Definition: c.h:804
off_t offset
Definition: worker.c:185
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2493
static ApplySubXactData subxact_data
Definition: worker.c:197
int i
static void cleanup_subxact_info(void)
Definition: worker.c:2905
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:866

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 923 of file worker.c.

References appendBinaryStringInfo(), apply_dispatch(), apply_handle_commit_internal(), Assert, BufFileClose(), BufFileOpenShared(), BufFileRead(), changes_filename(), CHECK_FOR_INTERRUPTS, LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog, LogicalRepCommitData::end_lsn, ensure_transaction(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_FIND, hash_search(), in_remote_transaction, in_streamed_transaction, initStringInfo(), logicalrep_read_stream_commit(), MAXPGPATH, MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pfree(), pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, repalloc(), resetStringInfo(), s2, STATE_IDLE, STATE_RUNNING, stream_cleanup_files(), StreamXidHash::stream_fileset, LogicalRepWorker::subid, and TopTransactionContext.

Referenced by apply_dispatch().

924 {
925  TransactionId xid;
927  int nchanges;
928  char path[MAXPGPATH];
929  char *buffer = NULL;
930  bool found;
931  LogicalRepCommitData commit_data;
932  StreamXidHash *ent;
933  MemoryContext oldcxt;
934  BufFile *fd;
935 
937 
938  xid = logicalrep_read_stream_commit(s, &commit_data);
939 
940  elog(DEBUG1, "received commit for streamed transaction %u", xid);
941 
943 
944  /*
945  * Allocate file handle and memory required to process all the messages in
946  * TopTransactionContext to avoid them getting reset after each message is
947  * processed.
948  */
950 
951  /* open the spool file for the committed transaction */
953  elog(DEBUG1, "replaying changes from file \"%s\"", path);
955  (void *) &xid,
956  HASH_FIND,
957  &found);
958  Assert(found);
959  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
960 
961  buffer = palloc(BLCKSZ);
962  initStringInfo(&s2);
963 
964  MemoryContextSwitchTo(oldcxt);
965 
966  remote_final_lsn = commit_data.commit_lsn;
967 
968  /*
969  * Make sure the handle apply_dispatch methods are aware we're in a remote
970  * transaction.
971  */
972  in_remote_transaction = true;
974 
975  /*
976  * Read the entries one by one and pass them through the same logic as in
977  * apply_dispatch.
978  */
979  nchanges = 0;
980  while (true)
981  {
982  int nbytes;
983  int len;
984 
986 
987  /* read length of the on-disk record */
988  nbytes = BufFileRead(fd, &len, sizeof(len));
989 
990  /* have we reached end of the file? */
991  if (nbytes == 0)
992  break;
993 
994  /* do we have a correct length? */
995  if (nbytes != sizeof(len))
996  ereport(ERROR,
998  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
999  path)));
1000 
1001  Assert(len > 0);
1002 
1003  /* make sure we have sufficiently large buffer */
1004  buffer = repalloc(buffer, len);
1005 
1006  /* and finally read the data into the buffer */
1007  if (BufFileRead(fd, buffer, len) != len)
1008  ereport(ERROR,
1010  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1011  path)));
1012 
1013  /* copy the buffer to the stringinfo and call apply_dispatch */
1014  resetStringInfo(&s2);
1015  appendBinaryStringInfo(&s2, buffer, len);
1016 
1017  /* Ensure we are reading the data into our memory context. */
1019 
1020  apply_dispatch(&s2);
1021 
1023 
1024  MemoryContextSwitchTo(oldcxt);
1025 
1026  nchanges++;
1027 
1028  if (nchanges % 1000 == 0)
1029  elog(DEBUG1, "replayed %d changes from file '%s'",
1030  nchanges, path);
1031  }
1032 
1033  BufFileClose(fd);
1034 
1035  pfree(buffer);
1036  pfree(s2.data);
1037 
1038  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1039  nchanges, path);
1040 
1041  apply_handle_commit_internal(s, &commit_data);
1042 
1043  /* unlink the files with serialized changes and subxact info */
1045 
1046  /* Process any tables that are being synchronized in parallel. */
1047  process_syncing_tables(commit_data.end_lsn);
1048 
1050 }
#define DEBUG1
Definition: elog.h:25
MemoryContext TopTransactionContext
Definition: mcxt.c:49
static XLogRecPtr remote_final_lsn
Definition: worker.c:165
uint32 TransactionId
Definition: c.h:587
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:824
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:587
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2726
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
Definition: worker.c:1056
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void pfree(void *pointer)
Definition: mcxt.c:1057
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:2739
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:45
static bool ensure_transaction(void)
Definition: worker.c:279
bool in_remote_transaction
Definition: worker.c:164
#define MAXPGPATH
bool in_streamed_transaction
Definition: worker.c:168
int errcode_for_file_access(void)
Definition: elog.c:717
SharedFileSet * stream_fileset
Definition: worker.c:149
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
char * s2
#define ereport(elevel,...)
Definition: elog.h:155
static HTAB * xidhash
Definition: worker.c:176
static void apply_dispatch(StringInfo s)
Definition: worker.c:1900
static MemoryContext ApplyMessageContext
Definition: worker.c:153
#define Assert(condition)
Definition: c.h:804
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:905
#define elog(elevel,...)
Definition: elog.h:227
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 746 of file worker.c.

References ApplyContext, Assert, ensure_transaction(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, in_streamed_transaction, HASHCTL::keysize, logicalrep_read_stream_start(), MyLogicalRepWorker, pgstat_report_activity(), STATE_RUNNING, stream_open_file(), stream_xid, LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_dispatch().

747 {
748  bool first_segment;
749  HASHCTL hash_ctl;
750 
752 
753  /*
754  * Start a transaction on stream start, this transaction will be committed
755  * on the stream stop unless it is a tablesync worker in which case it
756  * will be committed after processing all the messages. We need the
757  * transaction for handling the buffile, used for serializing the
758  * streaming data and subxact info.
759  */
761 
762  /* notify handle methods we're processing a remote transaction */
764 
765  /* extract XID of the top-level transaction */
766  stream_xid = logicalrep_read_stream_start(s, &first_segment);
767 
768  /*
769  * Initialize the xidhash table if we haven't yet. This will be used for
770  * the entire duration of the apply worker so create it in permanent
771  * context.
772  */
773  if (xidhash == NULL)
774  {
775  hash_ctl.keysize = sizeof(TransactionId);
776  hash_ctl.entrysize = sizeof(StreamXidHash);
777  hash_ctl.hcxt = ApplyContext;
778  xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
780  }
781 
782  /* open the spool file for this transaction */
784 
785  /* if this is not the first segment, open existing subxact file */
786  if (!first_segment)
788 
790 }
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:2571
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
Size entrysize
Definition: hsearch.h:76
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
static bool ensure_transaction(void)
Definition: worker.c:279
bool in_streamed_transaction
Definition: worker.c:168
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext ApplyContext
Definition: worker.c:154
#define HASH_BLOBS
Definition: hsearch.h:97
Size keysize
Definition: hsearch.h:75
static HTAB * xidhash
Definition: worker.c:176
#define Assert(condition)
Definition: c.h:804
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:774
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:2782
static TransactionId stream_xid
Definition: worker.c:170
struct StreamXidHash StreamXidHash

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 796 of file worker.c.

References Assert, CommitTransactionCommand(), in_streamed_transaction, IsTransactionState(), MemoryContextReset(), MyLogicalRepWorker, pgstat_report_activity(), STATE_IDLE, stream_close_file(), stream_xid, LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_dispatch().

797 {
799 
800  /*
801  * Close the file with serialized changes, and serialize information about
802  * subxacts for the toplevel transaction.
803  */
806 
807  /* We must be in a valid transaction state */
809 
810  /* Commit the per-stream transaction */
812 
813  in_streamed_transaction = false;
814 
815  /* Reset per-stream context */
817 
819 }
static void stream_close_file(void)
Definition: worker.c:2857
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
void CommitTransactionCommand(void)
Definition: xact.c:2939
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
bool in_streamed_transaction
Definition: worker.c:168
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
#define Assert(condition)
Definition: c.h:804
bool IsTransactionState(void)
Definition: xact.c:371
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2493
static TransactionId stream_xid
Definition: worker.c:170

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 1789 of file worker.c.

References CommandCounterIncrement(), DROP_RESTRICT, ensure_transaction(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, RowExclusiveLock, should_apply_changes_for_rel(), table_close(), and table_open().

Referenced by apply_dispatch().

1790 {
1791  bool cascade = false;
1792  bool restart_seqs = false;
1793  List *remote_relids = NIL;
1794  List *remote_rels = NIL;
1795  List *rels = NIL;
1796  List *part_rels = NIL;
1797  List *relids = NIL;
1798  List *relids_logged = NIL;
1799  ListCell *lc;
1800 
1802  return;
1803 
1805 
1806  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1807 
1808  foreach(lc, remote_relids)
1809  {
1810  LogicalRepRelId relid = lfirst_oid(lc);
1811  LogicalRepRelMapEntry *rel;
1812 
1813  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1814  if (!should_apply_changes_for_rel(rel))
1815  {
1816  /*
1817  * The relation can't become interesting in the middle of the
1818  * transaction so it's safe to unlock it.
1819  */
1821  continue;
1822  }
1823 
1824  remote_rels = lappend(remote_rels, rel);
1825  rels = lappend(rels, rel->localrel);
1826  relids = lappend_oid(relids, rel->localreloid);
1828  relids_logged = lappend_oid(relids_logged, rel->localreloid);
1829 
1830  /*
1831  * Truncate partitions if we got a message to truncate a partitioned
1832  * table.
1833  */
1834  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1835  {
1836  ListCell *child;
1837  List *children = find_all_inheritors(rel->localreloid,
1839  NULL);
1840 
1841  foreach(child, children)
1842  {
1843  Oid childrelid = lfirst_oid(child);
1844  Relation childrel;
1845 
1846  if (list_member_oid(relids, childrelid))
1847  continue;
1848 
1849  /* find_all_inheritors already got lock */
1850  childrel = table_open(childrelid, NoLock);
1851 
1852  /*
1853  * Ignore temp tables of other backends. See similar code in
1854  * ExecuteTruncate().
1855  */
1856  if (RELATION_IS_OTHER_TEMP(childrel))
1857  {
1858  table_close(childrel, RowExclusiveLock);
1859  continue;
1860  }
1861 
1862  rels = lappend(rels, childrel);
1863  part_rels = lappend(part_rels, childrel);
1864  relids = lappend_oid(relids, childrelid);
1865  /* Log this relation only if needed for logical decoding */
1866  if (RelationIsLogicallyLogged(childrel))
1867  relids_logged = lappend_oid(relids_logged, childrelid);
1868  }
1869  }
1870  }
1871 
1872  /*
1873  * Even if we used CASCADE on the upstream primary we explicitly default
1874  * to replaying changes without further cascading. This might be later
1875  * changeable with a user specified option.
1876  */
1877  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
1878 
1879  foreach(lc, remote_rels)
1880  {
1881  LogicalRepRelMapEntry *rel = lfirst(lc);
1882 
1884  }
1885  foreach(lc, part_rels)
1886  {
1887  Relation rel = lfirst(lc);
1888 
1889  table_close(rel, NoLock);
1890  }
1891 
1893 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:343
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:447
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:636
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
static bool ensure_transaction(void)
Definition: worker.c:279
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1684
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:272
List * lappend(List *list, void *datum)
Definition: list.c:336
void CommandCounterIncrement(void)
Definition: xact.c:1021
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
#define lfirst(lc)
Definition: pg_list.h:169
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:594
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
uint32 LogicalRepRelId
Definition: logicalproto.h:84
#define lfirst_oid(lc)
Definition: pg_list.h:171

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
LogicalRepRelMapEntry relmapentry,
CmdType  operation 
)
static

Definition at line 1564 of file worker.c.

References apply_handle_delete_internal(), apply_handle_insert_internal(), Assert, TupleConversionMap::attrMap, CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog, ERROR, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCleanupTupleRouting(), ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), NIL, ModifyTableState::operation, PlanState::plan, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_RootToPartitionMap, slot_getallattrs(), slot_modify_data(), PlanState::state, and table_slot_create().

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

1570 {
1571  Relation parentrel = relinfo->ri_RelationDesc;
1572  ModifyTableState *mtstate = NULL;
1573  PartitionTupleRouting *proute = NULL;
1574  ResultRelInfo *partrelinfo;
1575  Relation partrel;
1576  TupleTableSlot *remoteslot_part;
1577  TupleConversionMap *map;
1578  MemoryContext oldctx;
1579 
1580  /* ModifyTableState is needed for ExecFindPartition(). */
1581  mtstate = makeNode(ModifyTableState);
1582  mtstate->ps.plan = NULL;
1583  mtstate->ps.state = estate;
1584  mtstate->operation = operation;
1585  mtstate->resultRelInfo = relinfo;
1586  proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
1587 
1588  /*
1589  * Find the partition to which the "search tuple" belongs.
1590  */
1591  Assert(remoteslot != NULL);
1593  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1594  remoteslot, estate);
1595  Assert(partrelinfo != NULL);
1596  partrel = partrelinfo->ri_RelationDesc;
1597 
1598  /*
1599  * To perform any of the operations below, the tuple must match the
1600  * partition's rowtype. Convert if needed or just copy, using a dedicated
1601  * slot to store the tuple in any case.
1602  */
1603  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1604  if (remoteslot_part == NULL)
1605  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1606  map = partrelinfo->ri_RootToPartitionMap;
1607  if (map != NULL)
1608  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1609  remoteslot_part);
1610  else
1611  {
1612  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1613  slot_getallattrs(remoteslot_part);
1614  }
1615  MemoryContextSwitchTo(oldctx);
1616 
1617  switch (operation)
1618  {
1619  case CMD_INSERT:
1620  apply_handle_insert_internal(partrelinfo, estate,
1621  remoteslot_part);
1622  break;
1623 
1624  case CMD_DELETE:
1625  apply_handle_delete_internal(partrelinfo, estate,
1626  remoteslot_part,
1627  &relmapentry->remoterel);
1628  break;
1629 
1630  case CMD_UPDATE:
1631 
1632  /*
1633  * For UPDATE, depending on whether or not the updated tuple
1634  * satisfies the partition's constraint, perform a simple UPDATE
1635  * of the partition or move the updated tuple into a different
1636  * suitable partition.
1637  */
1638  {
1639  AttrMap *attrmap = map ? map->attrMap : NULL;
1640  LogicalRepRelMapEntry *part_entry;
1641  TupleTableSlot *localslot;
1642  ResultRelInfo *partrelinfo_new;
1643  bool found;
1644 
1645  part_entry = logicalrep_partition_open(relmapentry, partrel,
1646  attrmap);
1647 
1648  /* Get the matching local tuple from the partition. */
1649  found = FindReplTupleInLocalRel(estate, partrel,
1650  &part_entry->remoterel,
1651  remoteslot_part, &localslot);
1652 
1654  if (found)
1655  {
1656  /* Apply the update. */
1657  slot_modify_data(remoteslot_part, localslot,
1658  part_entry,
1659  newtup);
1660  MemoryContextSwitchTo(oldctx);
1661  }
1662  else
1663  {
1664  /*
1665  * The tuple to be updated could not be found.
1666  *
1667  * TODO what to do here, change the log level to LOG
1668  * perhaps?
1669  */
1670  elog(DEBUG1,
1671  "logical replication did not find row for update "
1672  "in replication target relation \"%s\"",
1673  RelationGetRelationName(partrel));
1674  }
1675 
1676  /*
1677  * Does the updated tuple still satisfy the current
1678  * partition's constraint?
1679  */
1680  if (!partrel->rd_rel->relispartition ||
1681  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1682  false))
1683  {
1684  /*
1685  * Yes, so simply UPDATE the partition. We don't call
1686  * apply_handle_update_internal() here, which would
1687  * normally do the following work, to avoid repeating some
1688  * work already done above to find the local tuple in the
1689  * partition.
1690  */
1691  EPQState epqstate;
1692 
1693  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1694  ExecOpenIndices(partrelinfo, false);
1695 
1696  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1697  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
1698  localslot, remoteslot_part);
1699  ExecCloseIndices(partrelinfo);
1700  EvalPlanQualEnd(&epqstate);
1701  }
1702  else
1703  {
1704  /* Move the tuple into the new partition. */
1705 
1706  /*
1707  * New partition will be found using tuple routing, which
1708  * can only occur via the parent table. We might need to
1709  * convert the tuple to the parent's rowtype. Note that
1710  * this is the tuple found in the partition, not the
1711  * original search tuple received by this function.
1712  */
1713  if (map)
1714  {
1715  TupleConversionMap *PartitionToRootMap =
1717  RelationGetDescr(parentrel));
1718 
1719  remoteslot =
1720  execute_attr_map_slot(PartitionToRootMap->attrMap,
1721  remoteslot_part, remoteslot);
1722  }
1723  else
1724  {
1725  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1726  slot_getallattrs(remoteslot);
1727  }
1728 
1729 
1730  /* Find the new partition. */
1732  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1733  proute, remoteslot,
1734  estate);
1735  MemoryContextSwitchTo(oldctx);
1736  Assert(partrelinfo_new != partrelinfo);
1737 
1738  /* DELETE old tuple found in the old partition. */
1739  apply_handle_delete_internal(partrelinfo, estate,
1740  localslot,
1741  &relmapentry->remoterel);
1742 
1743  /* INSERT new tuple into the new partition. */
1744 
1745  /*
1746  * Convert the replacement tuple to match the destination
1747  * partition rowtype.
1748  */
1750  partrel = partrelinfo_new->ri_RelationDesc;
1751  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1752  if (remoteslot_part == NULL)
1753  remoteslot_part = table_slot_create(partrel,
1754  &estate->es_tupleTable);
1755  map = partrelinfo_new->ri_RootToPartitionMap;
1756  if (map != NULL)
1757  {
1758  remoteslot_part = execute_attr_map_slot(map->attrMap,
1759  remoteslot,
1760  remoteslot_part);
1761  }
1762  else
1763  {
1764  remoteslot_part = ExecCopySlot(remoteslot_part,
1765  remoteslot);
1766  slot_getallattrs(remoteslot);
1767  }
1768  MemoryContextSwitchTo(oldctx);
1769  apply_handle_insert_internal(partrelinfo_new, estate,
1770  remoteslot_part);
1771  }
1772  }
1773  break;
1774 
1775  default:
1776  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1777  break;
1778  }
1779 
1780  ExecCleanupTupleRouting(mtstate, proute);
1781 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:415
#define DEBUG1
Definition: elog.h:25
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:1490
#define RelationGetDescr(relation)
Definition: rel.h:483
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1170
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
CmdType operation
Definition: execnodes.h:1162
EState * state
Definition: execnodes.h:943
Form_pg_class rd_rel
Definition: rel.h:110
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
Definition: attmap.h:34
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2801
#define ERROR
Definition: elog.h:45
PlanState ps
Definition: execnodes.h:1161
LogicalRepRelation remoterel
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:505
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:580
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define RelationGetRelationName(relation)
Definition: rel.h:491
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, ModifyTableState *mtstate, Relation rel)
AttrMap * attrMap
Definition: tupconvert.h:28
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2381
List * es_tupleTable
Definition: execnodes.h:574
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1535
Plan * plan
Definition: execnodes.h:941
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:629
#define makeNode(_type_)
Definition: nodes.h:581
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
#define Assert(condition)
Definition: c.h:804
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:504
#define elog(elevel,...)
Definition: elog.h:227
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1665
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:217
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:1210

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 1109 of file worker.c.

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

1110 {
1111  LogicalRepTyp typ;
1112 
1114  return;
1115 
1116  logicalrep_read_typ(s, &typ);
1118 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:453
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:467

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 1261 of file worker.c.

References AfterTriggerEndQuery(), apply_handle_tuple_routing(), apply_handle_update_internal(), Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_range_table, EState::es_tupleTable, ExecInitExtraTupleSlot(), ExecResetTupleTable(), fill_extraUpdatedCols(), FirstLowInvalidHeapAttributeNumber, FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), handle_streamed_transaction(), i, InitResultRelInfo(), list_nth(), LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), makeNode, MemoryContextSwitchTo(), TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RangeTblEntry::updatedCols.

Referenced by apply_dispatch().

1262 {
1263  ResultRelInfo *resultRelInfo;
1264  LogicalRepRelMapEntry *rel;
1265  LogicalRepRelId relid;
1266  EState *estate;
1267  LogicalRepTupleData oldtup;
1268  LogicalRepTupleData newtup;
1269  bool has_oldtup;
1270  TupleTableSlot *remoteslot;
1271  RangeTblEntry *target_rte;
1272  MemoryContext oldctx;
1273 
1275  return;
1276 
1278 
1279  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1280  &newtup);
1281  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1282  if (!should_apply_changes_for_rel(rel))
1283  {
1284  /*
1285  * The relation can't become interesting in the middle of the
1286  * transaction so it's safe to unlock it.
1287  */
1289  return;
1290  }
1291 
1292  /* Check if we can do the update. */
1294 
1295  /* Initialize the executor state. */
1296  estate = create_estate_for_relation(rel);
1297  remoteslot = ExecInitExtraTupleSlot(estate,
1298  RelationGetDescr(rel->localrel),
1299  &TTSOpsVirtual);
1300  resultRelInfo = makeNode(ResultRelInfo);
1301  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
1302 
1303  /*
1304  * Populate updatedCols so that per-column triggers can fire, and so
1305  * executor can correctly pass down indexUnchanged hint. This could
1306  * include more columns than were actually changed on the publisher
1307  * because the logical replication protocol doesn't contain that
1308  * information. But it would for example exclude columns that only exist
1309  * on the subscriber, since we are not touching those.
1310  */
1311  target_rte = list_nth(estate->es_range_table, 0);
1312  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1313  {
1315  int remoteattnum = rel->attrmap->attnums[i];
1316 
1317  if (!att->attisdropped && remoteattnum >= 0)
1318  {
1319  Assert(remoteattnum < newtup.ncols);
1320  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1321  target_rte->updatedCols =
1322  bms_add_member(target_rte->updatedCols,
1324  }
1325  }
1326 
1327  /* Also populate extraUpdatedCols, in case we have generated columns */
1328  fill_extraUpdatedCols(target_rte, rel->localrel);
1329 
1331 
1332  /* Build the search tuple. */
1334  slot_store_data(remoteslot, rel,
1335  has_oldtup ? &oldtup : &newtup);
1336  MemoryContextSwitchTo(oldctx);
1337 
1338  /* For a partitioned table, apply update to correct partition. */
1339  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1340  apply_handle_tuple_routing(resultRelInfo, estate,
1341  remoteslot, &newtup, rel, CMD_UPDATE);
1342  else
1343  apply_handle_update_internal(resultRelInfo, estate,
1344  remoteslot, &newtup, rel);
1345 
1347 
1348  /* Handle queued AFTER triggers. */
1349  AfterTriggerEndQuery(estate);
1350 
1351  ExecResetTupleTable(estate->es_tupleTable, false);
1352  FreeExecutorState(estate);
1353 
1355 
1357 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:344
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define RelationGetDescr(relation)
Definition: rel.h:483
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
Definition: worker.c:1361
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:447
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
List * es_range_table
Definition: execnodes.h:532
Form_pg_class rd_rel
Definition: rel.h:110
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:80
static bool ensure_transaction(void)
Definition: worker.c:279
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
#define RowExclusiveLock
Definition: lockdefs.h:38
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:272
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1227
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1564
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
List * es_tupleTable
Definition: execnodes.h:574
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void CommandCounterIncrement(void)
Definition: xact.c:1021
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1182
Bitmapset * updatedCols
Definition: parsenodes.h:1128
#define makeNode(_type_)
Definition: nodes.h:581
#define Assert(condition)
Definition: c.h:804
AttrNumber * attnums
Definition: attmap.h:36
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:218
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4535
int i
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:465
uint32 LogicalRepRelId
Definition: logicalproto.h:84

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ResultRelInfo relinfo,
EState estate,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
LogicalRepRelMapEntry relmapentry 
)
static

Definition at line 1361 of file worker.c.

References DEBUG1, elog, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, and slot_modify_data().

Referenced by apply_handle_update().

1365 {
1366  Relation localrel = relinfo->ri_RelationDesc;
1367  EPQState epqstate;
1368  TupleTableSlot *localslot;
1369  bool found;
1370  MemoryContext oldctx;
1371 
1372  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1373  ExecOpenIndices(relinfo, false);
1374 
1375  found = FindReplTupleInLocalRel(estate, localrel,
1376  &relmapentry->remoterel,
1377  remoteslot, &localslot);
1378  ExecClearTuple(remoteslot);
1379 
1380  /*
1381  * Tuple found.
1382  *
1383  * Note this will fail if there are other conflicting unique indexes.
1384  */
1385  if (found)
1386  {
1387  /* Process and store remote tuple in the slot */
1389  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1390  MemoryContextSwitchTo(oldctx);
1391 
1392  EvalPlanQualSetSlot(&epqstate, remoteslot);
1393 
1394  /* Do the actual update. */
1395  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1396  remoteslot);
1397  }
1398  else
1399  {
1400  /*
1401  * The tuple to be updated could not be found.
1402  *
1403  * TODO what to do here, change the log level to LOG perhaps?
1404  */
1405  elog(DEBUG1,
1406  "logical replication did not find row for update "
1407  "in replication target relation \"%s\"",
1408  RelationGetRelationName(localrel));
1409  }
1410 
1411  /* Cleanup. */
1412  ExecCloseIndices(relinfo);
1413  EvalPlanQualEnd(&epqstate);
1414 }
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:415
#define DEBUG1
Definition: elog.h:25
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2801
LogicalRepRelation remoterel
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:580
#define RelationGetRelationName(relation)
Definition: rel.h:491
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2381
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1535
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
#define elog(elevel,...)
Definition: elog.h:227
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:217

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 2918 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

2919 {
2920  int worker_slot = DatumGetInt32(main_arg);
2921  MemoryContext oldctx;
2922  char originname[NAMEDATALEN];
2923  XLogRecPtr origin_startpos;
2924  char *myslotname;
2926 
2927  /* Attach to slot */
2928  logicalrep_worker_attach(worker_slot);
2929 
2930  /* Setup signal handling */
2932  pqsignal(SIGTERM, die);
2934 
2935  /*
2936  * We don't currently need any ResourceOwner in a walreceiver process, but
2937  * if we did, we could call CreateAuxProcessResourceOwner here.
2938  */
2939 
2940  /* Initialise stats to a sanish value */
2943 
2944  /* Load the libpq-specific functions */
2945  load_file("libpqwalreceiver", false);
2946 
2947  /* Run as replica session replication role. */
2948  SetConfigOption("session_replication_role", "replica",
2950 
2951  /* Connect to our database. */
2954  0);
2955 
2956  /*
2957  * Set always-secure search path, so malicious users can't redirect user
2958  * code (e.g. pg_index.indexprs).
2959  */
2960  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
2961 
2962  /* Load the subscription into persistent memory context. */
2964  "ApplyContext",
2968 
2970  if (!MySubscription)
2971  {
2972  ereport(LOG,
2973  (errmsg("logical replication apply worker for subscription %u will not "
2974  "start because the subscription was removed during startup",
2976  proc_exit(0);
2977  }
2978 
2979  MySubscriptionValid = true;
2980  MemoryContextSwitchTo(oldctx);
2981 
2982  if (!MySubscription->enabled)
2983  {
2984  ereport(LOG,
2985  (errmsg("logical replication apply worker for subscription \"%s\" will not "
2986  "start because the subscription was disabled during startup",
2987  MySubscription->name)));
2988 
2989  proc_exit(0);
2990  }
2991 
2992  /* Setup synchronous commit according to the user's wishes */
2993  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2995 
2996  /* Keep us informed about subscription changes. */
2999  (Datum) 0);
3000 
3001  if (am_tablesync_worker())
3002  ereport(LOG,
3003  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3005  else
3006  ereport(LOG,
3007  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3008  MySubscription->name)));
3009 
3011 
3012  /* Connect to the origin and start the replication. */
3013  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3015 
3016  if (am_tablesync_worker())
3017  {
3018  char *syncslotname;
3019 
3020  /* This is table synchronization worker, call initial sync. */
3021  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3022 
3023  /* allocate slot name in long-lived context */
3024  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3025 
3026  pfree(syncslotname);
3027  }
3028  else
3029  {
3030  /* This is main apply worker */
3031  RepOriginId originid;
3032  TimeLineID startpointTLI;
3033  char *err;
3034 
3035  myslotname = MySubscription->slotname;
3036 
3037  /*
3038  * This shouldn't happen if the subscription is enabled, but guard
3039  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3040  * crash if slot is NULL.)
3041  */
3042  if (!myslotname)
3043  ereport(ERROR,
3044  (errmsg("subscription has no replication slot set")));
3045 
3046  /* Setup replication origin tracking. */
3048  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3049  originid = replorigin_by_name(originname, true);
3050  if (!OidIsValid(originid))
3051  originid = replorigin_create(originname);
3052  replorigin_session_setup(originid);
3053  replorigin_session_origin = originid;
3054  origin_startpos = replorigin_session_get_progress(false);
3056 
3058  &err);
3059  if (wrconn == NULL)
3060  ereport(ERROR,
3061  (errmsg("could not connect to the publisher: %s", err)));
3062 
3063  /*
3064  * We don't really use the output identify_system for anything but it
3065  * does some initializations on the upstream so let's still call it.
3066  */
3067  (void) walrcv_identify_system(wrconn, &startpointTLI);
3068  }
3069 
3070  /*
3071  * Setup callback for syscache so that we know when something changes in
3072  * the subscription relation state.
3073  */
3076  (Datum) 0);
3077 
3078  /* Build logical replication streaming options. */
3079  options.logical = true;
3080  options.startpoint = origin_startpos;
3081  options.slotname = myslotname;
3082  options.proto.logical.proto_version =
3083  walrcv_server_version(wrconn) >= 140000 ?
3085  options.proto.logical.publication_names = MySubscription->publications;
3086  options.proto.logical.binary = MySubscription->binary;
3087  options.proto.logical.streaming = MySubscription->stream;
3088 
3089  /* Start normal logical streaming replication. */
3090  walrcv_startstreaming(wrconn, &options);
3091 
3092  /* Run the main loop. */
3093  LogicalRepApplyLoop(origin_startpos);
3094 
3095  proc_exit(0);
3096 }
Subscription * MySubscription
Definition: worker.c:161
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:32
WalReceiverConn * wrconn
Definition: worker.c:159
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:59
#define DatumGetInt32(X)
Definition: postgres.h:472
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:407
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void CommitTransactionCommand(void)
Definition: xact.c:2939
#define walrcv_server_version(conn)
Definition: walreceiver.h:409
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:65
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:413
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1203
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1068
union WalRcvStreamOptions::@102 proto
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:913
#define OidIsValid(objectId)
Definition: c.h:710
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2478
void pfree(void *pointer)
Definition: mcxt.c:1057
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:45
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void logicalrep_worker_attach(int slot)
Definition: launcher.c:570
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7888
#define SIGHUP
Definition: win32_port.h:159
XLogRecPtr startpoint
Definition: walreceiver.h:168
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:154
static char ** options
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
uintptr_t Datum
Definition: postgres.h:367
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5682
#define ereport(elevel,...)
Definition: elog.h:155
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:154
void StartTransactionCommand(void)
Definition: xact.c:2838
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:905
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1174
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2057
#define elog(elevel,...)
Definition: elog.h:227
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
#define snprintf
Definition: port.h:216
#define die(msg)
Definition: pg_test_fsync.c:97
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:264
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5711
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:399

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 2726 of file worker.c.

References MAXPGPATH, and snprintf.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), stream_cleanup_files(), and stream_open_file().

2727 {
2728  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
2729 }
#define MAXPGPATH
#define snprintf
Definition: port.h:216

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 1227 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

1228 {
1229  /* Updatable, no error. */
1230  if (rel->updatable)
1231  return;
1232 
1233  /*
1234  * We are in error mode so it's fine this is somewhat slow. It's better to
1235  * give user correct error.
1236  */
1238  {
1239  ereport(ERROR,
1240  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1241  errmsg("publisher did not send replica identity column "
1242  "expected by the logical replication target relation \"%s.%s\"",
1243  rel->remoterel.nspname, rel->remoterel.relname)));
1244  }
1245 
1246  ereport(ERROR,
1247  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1248  errmsg("logical replication target relation \"%s.%s\" has "
1249  "neither REPLICA IDENTITY index nor PRIMARY "
1250  "KEY and published relation does not have "
1251  "REPLICA IDENTITY FULL",
1252  rel->remoterel.nspname, rel->remoterel.relname)));
1253 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1126
int errcode(int sqlerrcode)
Definition: elog.c:694
#define OidIsValid(objectId)
Definition: c.h:710
#define ERROR
Definition: elog.h:45
LogicalRepRelation remoterel
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg(const char *fmt,...)
Definition: elog.c:905

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

Definition at line 2905 of file worker.c.

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by apply_handle_stream_abort(), and subxact_info_write().

2906 {
2907  if (subxact_data.subxacts)
2909 
2910  subxact_data.subxacts = NULL;
2912  subxact_data.nsubxacts = 0;
2914 }
TransactionId subxact_last
Definition: worker.c:193
uint32 nsubxacts
Definition: worker.c:191
void pfree(void *pointer)
Definition: mcxt.c:1057
SubXactInfo * subxacts
Definition: worker.c:194
#define InvalidTransactionId
Definition: transam.h:31
static ApplySubXactData subxact_data
Definition: worker.c:197
uint32 nsubxacts_max
Definition: worker.c:192

◆ create_estate_for_relation()

static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 344 of file worker.c.

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

345 {
346  EState *estate;
347  RangeTblEntry *rte;
348 
349  estate = CreateExecutorState();
350 
351  rte = makeNode(RangeTblEntry);
352  rte->rtekind = RTE_RELATION;
353  rte->relid = RelationGetRelid(rel->localrel);
354  rte->relkind = rel->localrel->rd_rel->relkind;
356  ExecInitRangeTable(estate, list_make1(rte));
357 
358  estate->es_output_cid = GetCurrentCommandId(true);
359 
360  /* Prepare to catch AFTER triggers. */
362 
363  return estate;
364 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
CommandId es_output_cid
Definition: execnodes.h:544
#define AccessShareLock
Definition: lockdefs.h:36
Form_pg_class rd_rel
Definition: rel.h:110
#define list_make1(x1)
Definition: pg_list.h:206
EState * CreateExecutorState(void)
Definition: execUtils.c:90
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4515
#define makeNode(_type_)
Definition: nodes.h:581
RTEKind rtekind
Definition: parsenodes.h:981
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ ensure_transaction()

static bool ensure_transaction ( void  )
static

Definition at line 279 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), maybe_reread_subscription(), MemoryContextSwitchTo(), SetCurrentStatementStartTimestamp(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_start(), apply_handle_truncate(), and apply_handle_update().

280 {
281  if (IsTransactionState())
282  {
284 
287 
288  return false;
289  }
290 
293 
295 
297  return true;
298 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void maybe_reread_subscription(void)
Definition: worker.c:2380
static MemoryContext ApplyMessageContext
Definition: worker.c:153
void StartTransactionCommand(void)
Definition: xact.c:2838
bool IsTransactionState(void)
Definition: xact.c:371
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:833

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( EState estate,
Relation  localrel,
LogicalRepRelation remoterel,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 1535 of file worker.c.

References Assert, EState::es_tupleTable, GetRelationIdentityOrPK(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, and table_slot_create().

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

1539 {
1540  Oid idxoid;
1541  bool found;
1542 
1543  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1544 
1545  idxoid = GetRelationIdentityOrPK(localrel);
1546  Assert(OidIsValid(idxoid) ||
1547  (remoterel->replident == REPLICA_IDENTITY_FULL));
1548 
1549  if (OidIsValid(idxoid))
1550  found = RelationFindReplTupleByIndex(localrel, idxoid,
1552  remoteslot, *localslot);
1553  else
1554  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1555  remoteslot, *localslot);
1556 
1557  return found;
1558 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1126
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
List * es_tupleTable
Definition: execnodes.h:574
#define Assert(condition)
Definition: c.h:804
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 1978 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, FlushPosition::node, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

1980 {
1981  dlist_mutable_iter iter;
1982  XLogRecPtr local_flush = GetFlushRecPtr();
1983 
1985  *flush = InvalidXLogRecPtr;
1986 
1988  {
1989  FlushPosition *pos =
1990  dlist_container(FlushPosition, node, iter.cur);
1991 
1992  *write = pos->remote_end;
1993 
1994  if (pos->local_end <= local_flush)
1995  {
1996  *flush = pos->remote_end;
1997  dlist_delete(iter.cur);
1998  pfree(pos);
1999  }
2000  else
2001  {
2002  /*
2003  * Don't want to uselessly iterate over the rest of the list which
2004  * could potentially be long. Instead get the last element and
2005  * grab the write position from there.
2006  */
2007  pos = dlist_tail_element(FlushPosition, node,
2008  &lsn_mapping);
2009  *write = pos->remote_end;
2010  *have_pending_txes = true;
2011  return;
2012  }
2013  }
2014 
2015  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2016 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:130
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:127
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8484
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1057
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:126
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 1126 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by check_relation_updatable(), and FindReplTupleInLocalRel().

1127 {
1128  Oid idxoid;
1129 
1130  idxoid = RelationGetReplicaIndex(rel);
1131 
1132  if (!OidIsValid(idxoid))
1133  idxoid = RelationGetPrimaryKeyIndex(rel);
1134 
1135  return idxoid;
1136 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4735
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4714

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 309 of file worker.c.

References Assert, in_streamed_transaction, pq_getmsgint(), stream_write_change(), stream_xid, subxact_info_add(), and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

310 {
311  TransactionId xid;
312 
313  /* not in streaming mode */
315  return false;
316 
317  Assert(stream_fd != NULL);
319 
320  /*
321  * We should have received XID of the subxact as the first part of the
322  * message, so extract it.
323  */
324  xid = pq_getmsgint(s, 4);
325 
327 
328  /* Add the new subxact to the array (unless already there). */
329  subxact_info_add(xid);
330 
331  /* write the change to the current file */
333 
334  return true;
335 }
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:2878
static void subxact_info_add(TransactionId xid)
Definition: worker.c:2641
uint32 TransactionId
Definition: c.h:587
bool in_streamed_transaction
Definition: worker.c:168
#define Assert(condition)
Definition: c.h:804
static BufFile * stream_fd
Definition: worker.c:179
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static TransactionId stream_xid
Definition: worker.c:170

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 3102 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

3103 {
3104  return MyLogicalRepWorker != NULL;
3105 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 2057 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

2058 {
2059  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2060  bool ping_sent = false;
2061  TimeLineID tli;
2062 
2063  /*
2064  * Init the ApplyMessageContext which we clean up after each replication
2065  * protocol message.
2066  */
2068  "ApplyMessageContext",
2070 
2071  /*
2072  * This memory context is used for per-stream data when the streaming mode
2073  * is enabled. This context is reset on each stream stop.
2074  */
2076  "LogicalStreamingContext",
2078 
2079  /* mark as idle, before starting to loop */
2081 
2082  /* This outer loop iterates once per wait. */
2083  for (;;)
2084  {
2086  int rc;
2087  int len;
2088  char *buf = NULL;
2089  bool endofstream = false;
2090  long wait_time;
2091 
2093 
2095 
2096  len = walrcv_receive(wrconn, &buf, &fd);
2097 
2098  if (len != 0)
2099  {
2100  /* Loop to process all available data (without blocking). */
2101  for (;;)
2102  {
2104 
2105  if (len == 0)
2106  {
2107  break;
2108  }
2109  else if (len < 0)
2110  {
2111  ereport(LOG,
2112  (errmsg("data stream from publisher has ended")));
2113  endofstream = true;
2114  break;
2115  }
2116  else
2117  {
2118  int c;
2119  StringInfoData s;
2120 
2121  /* Reset timeout. */
2122  last_recv_timestamp = GetCurrentTimestamp();
2123  ping_sent = false;
2124 
2125  /* Ensure we are reading the data into our memory context. */
2127 
2128  s.data = buf;
2129  s.len = len;
2130  s.cursor = 0;
2131  s.maxlen = -1;
2132 
2133  c = pq_getmsgbyte(&s);
2134 
2135  if (c == 'w')
2136  {
2137  XLogRecPtr start_lsn;
2138  XLogRecPtr end_lsn;
2139  TimestampTz send_time;
2140 
2141  start_lsn = pq_getmsgint64(&s);
2142  end_lsn = pq_getmsgint64(&s);
2143  send_time = pq_getmsgint64(&s);
2144 
2145  if (last_received < start_lsn)
2146  last_received = start_lsn;
2147 
2148  if (last_received < end_lsn)
2149  last_received = end_lsn;
2150 
2151  UpdateWorkerStats(last_received, send_time, false);
2152 
2153  apply_dispatch(&s);
2154  }
2155  else if (c == 'k')
2156  {
2157  XLogRecPtr end_lsn;
2159  bool reply_requested;
2160 
2161  end_lsn = pq_getmsgint64(&s);
2162  timestamp = pq_getmsgint64(&s);
2163  reply_requested = pq_getmsgbyte(&s);
2164 
2165  if (last_received < end_lsn)
2166  last_received = end_lsn;
2167 
2168  send_feedback(last_received, reply_requested, false);
2169  UpdateWorkerStats(last_received, timestamp, true);
2170  }
2171  /* other message types are purposefully ignored */
2172 
2174  }
2175 
2176  len = walrcv_receive(wrconn, &buf, &fd);
2177  }
2178  }
2179 
2180  /* confirm all writes so far */
2181  send_feedback(last_received, false, false);
2182 
2184  {
2185  /*
2186  * If we didn't get any transactions for a while there might be
2187  * unconsumed invalidation messages in the queue, consume them
2188  * now.
2189  */
2192 
2193  /* Process any table synchronization changes. */
2194  process_syncing_tables(last_received);
2195  }
2196 
2197  /* Cleanup the memory. */
2200 
2201  /* Check if we need to exit the streaming loop. */
2202  if (endofstream)
2203  break;
2204 
2205  /*
2206  * Wait for more data or latch. If we have unflushed transactions,
2207  * wake up after WalWriterDelay to see if they've been flushed yet (in
2208  * which case we should send a feedback message). Otherwise, there's
2209  * no particular urgency about waking up unless we get data or a
2210  * signal.
2211  */
2212  if (!dlist_is_empty(&lsn_mapping))
2213  wait_time = WalWriterDelay;
2214  else
2215  wait_time = NAPTIME_PER_CYCLE;
2216 
2220  fd, wait_time,
2222 
2223  if (rc & WL_LATCH_SET)
2224  {
2227  }
2228 
2229  if (ConfigReloadPending)
2230  {
2231  ConfigReloadPending = false;
2233  }
2234 
2235  if (rc & WL_TIMEOUT)
2236  {
2237  /*
2238  * We didn't receive anything new. If we haven't heard anything
2239  * from the server for more than wal_receiver_timeout / 2, ping
2240  * the server. Also, if it's been longer than
2241  * wal_receiver_status_interval since the last update we sent,
2242  * send a status update to the primary anyway, to report any
2243  * progress in applying WAL.
2244  */
2245  bool requestReply = false;
2246 
2247  /*
2248  * Check if time since last receive from primary has reached the
2249  * configured limit.
2250  */
2251  if (wal_receiver_timeout > 0)
2252  {
2254  TimestampTz timeout;
2255 
2256  timeout =
2257  TimestampTzPlusMilliseconds(last_recv_timestamp,
2259 
2260  if (now >= timeout)
2261  ereport(ERROR,
2262  (errmsg("terminating logical replication worker due to timeout")));
2263 
2264  /* Check to see if it's time for a ping. */
2265  if (!ping_sent)
2266  {
2267  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2268  (wal_receiver_timeout / 2));
2269  if (now >= timeout)
2270  {
2271  requestReply = true;
2272  ping_sent = true;
2273  }
2274  }
2275  }
2276 
2277  send_feedback(last_received, requestReply, requestReply);
2278  }
2279  }
2280 
2281  /* All done */
2282  walrcv_endstreaming(wrconn, &tli);
2283 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2292
WalReceiverConn * wrconn
Definition: worker.c:159
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:415
uint32 TimeLineID
Definition: xlogdefs.h:59
void AcceptInvalidationMessages(void)
Definition: inval.c:687
#define WL_TIMEOUT
Definition: latch.h:128
void ProcessConfigFile(GucContext context)
static dlist_head lsn_mapping
Definition: worker.c:130
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:587
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2041
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:417
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define WL_SOCKET_READABLE
Definition: latch.h:126
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ResetLatch(Latch *latch)
Definition: latch.c:660
int wal_receiver_timeout
Definition: walreceiver.c:89
#define ERROR
Definition: elog.h:45
#define NAPTIME_PER_CYCLE
Definition: worker.c:121
bool in_remote_transaction
Definition: worker.c:164
bool in_streamed_transaction
Definition: worker.c:168
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * c
static char * buf
Definition: pg_test_fsync.c:68
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
int pgsocket
Definition: port.h:31
MemoryContext TopMemoryContext
Definition: mcxt.c:44
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:154
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define PGINVALID_SOCKET
Definition: port.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:155
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void apply_dispatch(StringInfo s)
Definition: worker.c:1900
static void maybe_reread_subscription(void)
Definition: worker.c:2380
static MemoryContext ApplyMessageContext
Definition: worker.c:153
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
int WalWriterDelay
Definition: walwriter.c:70
int errmsg(const char *fmt,...)
Definition: elog.c:905
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
#define WL_LATCH_SET
Definition: latch.h:125
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 2380 of file worker.c.

References Assert, Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, and Subscription::synccommit.

Referenced by apply_handle_commit_internal(), ensure_transaction(), and LogicalRepApplyLoop().

2381 {
2382  MemoryContext oldctx;
2384  bool started_tx = false;
2385 
2386  /* When cache state is valid there is nothing to do here. */
2387  if (MySubscriptionValid)
2388  return;
2389 
2390  /* This function might be called inside or outside of transaction. */
2391  if (!IsTransactionState())
2392  {
2394  started_tx = true;
2395  }
2396 
2397  /* Ensure allocations in permanent context. */
2399 
2400  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2401 
2402  /*
2403  * Exit if the subscription was removed. This normally should not happen
2404  * as the worker gets killed during DROP SUBSCRIPTION.
2405  */
2406  if (!newsub)
2407  {
2408  ereport(LOG,
2409  (errmsg("logical replication apply worker for subscription \"%s\" will "
2410  "stop because the subscription was removed",
2411  MySubscription->name)));
2412 
2413  proc_exit(0);
2414  }
2415 
2416  /*
2417  * Exit if the subscription was disabled. This normally should not happen
2418  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2419  */
2420  if (!newsub->enabled)
2421  {
2422  ereport(LOG,
2423  (errmsg("logical replication apply worker for subscription \"%s\" will "
2424  "stop because the subscription was disabled",
2425  MySubscription->name)));
2426 
2427  proc_exit(0);
2428  }
2429 
2430  /* !slotname should never happen when enabled is true. */
2431  Assert(newsub->slotname);
2432 
2433  /*
2434  * Exit if any parameter that affects the remote connection was changed.
2435  * The launcher will start a new worker.
2436  */
2437  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2438  strcmp(newsub->name, MySubscription->name) != 0 ||
2439  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2440  newsub->binary != MySubscription->binary ||
2441  newsub->stream != MySubscription->stream ||
2443  {
2444  ereport(LOG,
2445  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2446  MySubscription->name)));
2447 
2448  proc_exit(0);
2449  }
2450 
2451  /* Check for other changes that should never happen too. */
2452  if (newsub->dbid != MySubscription->dbid)
2453  {
2454  elog(ERROR, "subscription %u changed unexpectedly",
2456  }
2457 
2458  /* Clean old subscription info and switch to new one. */
2461 
2462  MemoryContextSwitchTo(oldctx);
2463 
2464  /* Change synchronous commit according to the user's wishes */
2465  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2467 
2468  if (started_tx)
2470 
2471  MySubscriptionValid = true;
2472 }
Subscription * MySubscription
Definition: worker.c:161
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3073
void CommitTransactionCommand(void)
Definition: xact.c:2939
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:104
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:45
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7888
List * publications
MemoryContext ApplyContext
Definition: worker.c:154
#define ereport(elevel,...)
Definition: elog.h:155
#define Assert(condition)
Definition: c.h:804
void StartTransactionCommand(void)
Definition: xact.c:2838
bool IsTransactionState(void)
Definition: xact.c:371
bool MySubscriptionValid
Definition: worker.c:162
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:905
#define elog(elevel,...)
Definition: elog.h:227
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 2292 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

2293 {
2294  static StringInfo reply_message = NULL;
2295  static TimestampTz send_time = 0;
2296 
2297  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2298  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2299  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2300 
2301  XLogRecPtr writepos;
2302  XLogRecPtr flushpos;
2303  TimestampTz now;
2304  bool have_pending_txes;
2305 
2306  /*
2307  * If the user doesn't want status to be reported to the publisher, be
2308  * sure to exit before doing anything at all.
2309  */
2310  if (!force && wal_receiver_status_interval <= 0)
2311  return;
2312 
2313  /* It's legal to not pass a recvpos */
2314  if (recvpos < last_recvpos)
2315  recvpos = last_recvpos;
2316 
2317  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2318 
2319  /*
2320  * No outstanding transactions to flush, we can report the latest received
2321  * position. This is important for synchronous replication.
2322  */
2323  if (!have_pending_txes)
2324  flushpos = writepos = recvpos;
2325 
2326  if (writepos < last_writepos)
2327  writepos = last_writepos;
2328 
2329  if (flushpos < last_flushpos)
2330  flushpos = last_flushpos;
2331 
2332  now = GetCurrentTimestamp();
2333 
2334  /* if we've already reported everything we're good */
2335  if (!force &&
2336  writepos == last_writepos &&
2337  flushpos == last_flushpos &&
2338  !TimestampDifferenceExceeds(send_time, now,
2340  return;
2341  send_time = now;
2342 
2343  if (!reply_message)
2344  {
2346 
2347  reply_message = makeStringInfo();
2348  MemoryContextSwitchTo(oldctx);
2349  }
2350  else
2351  resetStringInfo(reply_message);
2352 
2353  pq_sendbyte(reply_message, 'r');
2354  pq_sendint64(reply_message, recvpos); /* write */
2355  pq_sendint64(reply_message, flushpos); /* flush */
2356  pq_sendint64(reply_message, writepos); /* apply */
2357  pq_sendint64(reply_message, now); /* sendTime */
2358  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2359 
2360  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2361  force,
2362  LSN_FORMAT_ARGS(recvpos),
2363  LSN_FORMAT_ARGS(writepos),
2364  LSN_FORMAT_ARGS(flushpos));
2365 
2366  walrcv_send(wrconn, reply_message->data, reply_message->len);
2367 
2368  if (recvpos > last_recvpos)
2369  last_recvpos = recvpos;
2370  if (writepos > last_writepos)
2371  last_writepos = writepos;
2372  if (flushpos > last_flushpos)
2373  last_flushpos = flushpos;
2374 }
WalReceiverConn * wrconn
Definition: worker.c:159
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1709
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1978
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:117
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
MemoryContext ApplyContext
Definition: worker.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:419
#define elog(elevel,...)
Definition: elog.h:227
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 263 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

264 {
265  if (am_tablesync_worker())
266  return MyLogicalRepWorker->relid == rel->localreloid;
267  else
268  return (rel->state == SUBREL_STATE_READY ||
269  (rel->state == SUBREL_STATE_SYNCDONE &&
270  rel->statelsn <= remote_final_lsn));
271 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:165
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
static bool am_tablesync_worker(void)

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 374 of file worker.c.

References Assert, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

376 {
377  TupleDesc desc = RelationGetDescr(rel->localrel);
378  int num_phys_attrs = desc->natts;
379  int i;
380  int attnum,
381  num_defaults = 0;
382  int *defmap;
383  ExprState **defexprs;
384  ExprContext *econtext;
385 
386  econtext = GetPerTupleExprContext(estate);
387 
388  /* We got all the data via replication, no need to evaluate anything. */
389  if (num_phys_attrs == rel->remoterel.natts)
390  return;
391 
392  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
393  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
394 
395  Assert(rel->attrmap->maplen == num_phys_attrs);
396  for (attnum = 0; attnum < num_phys_attrs; attnum++)
397  {
398  Expr *defexpr;
399 
400  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
401  continue;
402 
403  if (rel->attrmap->attnums[attnum] >= 0)
404  continue;
405 
406  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
407 
408  if (defexpr != NULL)
409  {
410  /* Run the expression through planner */
411  defexpr = expression_planner(defexpr);
412 
413  /* Initialize executable expression in copycontext */
414  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
415  defmap[num_defaults] = attnum;
416  num_defaults++;
417  }
418 
419  }
420 
421  for (i = 0; i < num_defaults; i++)
422  slot->tts_values[defmap[i]] =
423  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
424 }
#define RelationGetDescr(relation)
Definition: rel.h:483
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
Expr * expression_planner(Expr *expr)
Definition: planner.c:6165
int maplen
Definition: attmap.h:37
Datum * tts_values
Definition: tuptable.h:126
#define GetPerTupleExprContext(estate)
Definition: executor.h:509
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:292
Node * build_column_default(Relation rel, int attrno)
int16 attnum
Definition: pg_attribute.h:83
#define Assert(condition)
Definition: c.h:804
AttrNumber * attnums
Definition: attmap.h:36
void * palloc(Size size)
Definition: mcxt.c:950
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 580 of file worker.c.

References ErrorContextCallback::arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, SlotErrCallbackArg::local_attnum, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

583 {
584  int natts = slot->tts_tupleDescriptor->natts;
585  int i;
586  SlotErrCallbackArg errarg;
587  ErrorContextCallback errcallback;
588 
589  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
590  ExecClearTuple(slot);
591 
592  /*
593  * Copy all the column data from srcslot, so that we'll have valid values
594  * for unreplaced columns.
595  */
596  Assert(natts == srcslot->tts_tupleDescriptor->natts);
597  slot_getallattrs(srcslot);
598  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
599  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
600 
601  /* For error reporting, push callback + info on the error context stack */
602  errarg.rel = rel;
603  errarg.local_attnum = -1;
604  errarg.remote_attnum = -1;
605  errcallback.callback = slot_store_error_callback;
606  errcallback.arg = (void *) &errarg;
607  errcallback.previous = error_context_stack;
608  error_context_stack = &errcallback;
609 
610  /* Call the "in" function for each replaced attribute */
611  Assert(natts == rel->attrmap->maplen);
612  for (i = 0; i < natts; i++)
613  {
615  int remoteattnum = rel->attrmap->attnums[i];
616 
617  if (remoteattnum < 0)
618  continue;
619 
620  Assert(remoteattnum < tupleData->ncols);
621 
622  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
623  {
624  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
625 
626  errarg.local_attnum = i;
627  errarg.remote_attnum = remoteattnum;
628 
629  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
630  {
631  Oid typinput;
632  Oid typioparam;
633 
634  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
635  slot->tts_values[i] =
636  OidInputFunctionCall(typinput, colvalue->data,
637  typioparam, att->atttypmod);
638  slot->tts_isnull[i] = false;
639  }
640  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
641  {
642  Oid typreceive;
643  Oid typioparam;
644 
645  /*
646  * In some code paths we may be asked to re-parse the same
647  * tuple data. Reset the StringInfo's cursor so that works.
648  */
649  colvalue->cursor = 0;
650 
651  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
652  slot->tts_values[i] =
653  OidReceiveFunctionCall(typreceive, colvalue,
654  typioparam, att->atttypmod);
655 
656  /* Trouble if it didn't eat the whole buffer */
657  if (colvalue->cursor != colvalue->len)
658  ereport(ERROR,
659  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
660  errmsg("incorrect binary data format in logical replication column %d",
661  remoteattnum + 1)));
662  slot->tts_isnull[i] = false;
663  }
664  else
665  {
666  /* must be LOGICALREP_COLUMN_NULL */
667  slot->tts_values[i] = (Datum) 0;
668  slot->tts_isnull[i] = true;
669  }
670 
671  errarg.local_attnum = -1;
672  errarg.remote_attnum = -1;
673  }
674  }
675 
676  /* Pop the error context stack */
677  error_context_stack = errcallback.previous;
678 
679  /* And finally, declare that "slot" contains a valid virtual tuple */
680  ExecStoreVirtualTuple(slot);
681 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:134
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
int maplen
Definition: attmap.h:37
int errcode(int sqlerrcode)
Definition: elog.c:694
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:81
Datum * tts_values
Definition: tuptable.h:126
StringInfoData * colvalues
Definition: logicalproto.h:70
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:242
struct ErrorContextCallback * previous
Definition: elog.h:241
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:82
ErrorContextCallback * error_context_stack
Definition: elog.c:93
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:80
#define ERROR
Definition: elog.h:45
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1665
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2887
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define ereport(elevel,...)
Definition: elog.h:155
#define Assert(condition)
Definition: c.h:804
AttrNumber * attnums
Definition: attmap.h:36
static void slot_store_error_callback(void *arg)
Definition: worker.c:430
int errmsg(const char *fmt,...)
Definition: elog.c:905
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1647
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 465 of file worker.c.

References ErrorContextCallback::arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ErrorContextCallback::callback, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, SlotErrCallbackArg::local_attnum, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, SlotErrCallbackArg::remote_attnum, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

467 {
468  int natts = slot->tts_tupleDescriptor->natts;
469  int i;
470  SlotErrCallbackArg errarg;
471  ErrorContextCallback errcallback;
472 
473  ExecClearTuple(slot);
474 
475  /* Push callback + info on the error context stack */
476  errarg.rel = rel;
477  errarg.local_attnum = -1;
478  errarg.remote_attnum = -1;
479  errcallback.callback = slot_store_error_callback;
480  errcallback.arg = (void *) &errarg;
481  errcallback.previous = error_context_stack;
482  error_context_stack = &errcallback;
483 
484  /* Call the "in" function for each non-dropped, non-null attribute */
485  Assert(natts == rel->attrmap->maplen);
486  for (i = 0; i < natts; i++)
487  {
489  int remoteattnum = rel->attrmap->attnums[i];
490 
491  if (!att->attisdropped && remoteattnum >= 0)
492  {
493  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
494 
495  Assert(remoteattnum < tupleData->ncols);
496 
497  errarg.local_attnum = i;
498  errarg.remote_attnum = remoteattnum;
499 
500  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
501  {
502  Oid typinput;
503  Oid typioparam;
504 
505  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
506  slot->tts_values[i] =
507  OidInputFunctionCall(typinput, colvalue->data,
508  typioparam, att->atttypmod);
509  slot->tts_isnull[i] = false;
510  }
511  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
512  {
513  Oid typreceive;
514  Oid typioparam;
515 
516  /*
517  * In some code paths we may be asked to re-parse the same
518  * tuple data. Reset the StringInfo's cursor so that works.
519  */
520  colvalue->cursor = 0;
521 
522  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
523  slot->tts_values[i] =
524  OidReceiveFunctionCall(typreceive, colvalue,
525  typioparam, att->atttypmod);
526 
527  /* Trouble if it didn't eat the whole buffer */
528  if (colvalue->cursor != colvalue->len)
529  ereport(ERROR,
530  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
531  errmsg("incorrect binary data format in logical replication column %d",
532  remoteattnum + 1)));
533  slot->tts_isnull[i] = false;
534  }
535  else
536  {
537  /*
538  * NULL value from remote. (We don't expect to see
539  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
540  * NULL.)
541  */
542  slot->tts_values[i] = (Datum) 0;
543  slot->tts_isnull[i] = true;
544  }
545 
546  errarg.local_attnum = -1;
547  errarg.remote_attnum = -1;
548  }
549  else
550  {
551  /*
552  * We assign NULL to dropped attributes and missing values
553  * (missing values should be later filled using
554  * slot_fill_defaults).
555  */
556  slot->tts_values[i] = (Datum) 0;
557  slot->tts_isnull[i] = true;
558  }
559  }
560 
561  /* Pop the error context stack */
562  error_context_stack = errcallback.previous;
563 
564  ExecStoreVirtualTuple(slot);
565 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
LogicalRepRelMapEntry * rel
Definition: worker.c:134
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
int maplen
Definition: attmap.h:37
int errcode(int sqlerrcode)
Definition: elog.c:694
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:81
Datum * tts_values
Definition: tuptable.h:126
StringInfoData * colvalues
Definition: logicalproto.h:70
unsigned int Oid
Definition: postgres_ext.h:31
void(* callback)(void *arg)
Definition: elog.h:242
struct ErrorContextCallback * previous
Definition: elog.h:241
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:82
ErrorContextCallback * error_context_stack
Definition: elog.c:93
#define ERROR
Definition: elog.h:45
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1665
bool * tts_isnull
Definition: tuptable.h:128
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2887
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
uintptr_t Datum
Definition: postgres.h:367
#define ereport(elevel,...)
Definition: elog.h:155
#define Assert(condition)
Definition: c.h:804
AttrNumber * attnums
Definition: attmap.h:36
static void slot_store_error_callback(void *arg)
Definition: worker.c:430
int errmsg(const char *fmt,...)
Definition: elog.c:905
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1647
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522

◆ slot_store_error_callback()

static void slot_store_error_callback ( void *  arg)
static

Definition at line 430 of file worker.c.

References LogicalRepRelation::attnames, LogicalRepRelation::atttyps, errcontext, format_type_be(), get_atttype(), SlotErrCallbackArg::local_attnum, LogicalRepRelMapEntry::localreloid, logicalrep_typmap_gettypname(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, LogicalRepRelation::relname, SlotErrCallbackArg::remote_attnum, and LogicalRepRelMapEntry::remoterel.

Referenced by slot_modify_data(), and slot_store_data().

431 {
434  char *remotetypname;
435  Oid remotetypoid,
436  localtypoid;
437 
438  /* Nothing to do if remote attribute number is not set */
439  if (errarg->remote_attnum < 0)
440  return;
441 
442  rel = errarg->rel;
443  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
444 
445  /* Fetch remote type name from the LogicalRepTypMap cache */
446  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
447 
448  /* Fetch local type OID from the local sys cache */
449  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
450 
451  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
452  "remote type %s, local type %s",
453  rel->remoterel.nspname, rel->remoterel.relname,
454  rel->remoterel.attnames[errarg->remote_attnum],
455  remotetypname,
456  format_type_be(localtypoid));
457 }
LogicalRepRelMapEntry * rel
Definition: worker.c:134
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
unsigned int Oid
Definition: postgres_ext.h:31
LogicalRepRelation remoterel
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:938
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:501
#define errcontext
Definition: elog.h:199
void * arg

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 2022 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit_internal().

2023 {
2024  FlushPosition *flushpos;
2025 
2026  /* Need to do this in permanent context */
2028 
2029  /* Track commit lsn */
2030  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2031  flushpos->local_end = XactLastCommitEnd;
2032  flushpos->remote_end = remote_lsn;
2033 
2034  dlist_push_tail(&lsn_mapping, &flushpos->node);
2036 }
static dlist_head lsn_mapping
Definition: worker.c:130
dlist_node node
Definition: worker.c:125
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:363
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:127
MemoryContext ApplyContext
Definition: worker.c:154
XLogRecPtr local_end
Definition: worker.c:126
static MemoryContext ApplyMessageContext
Definition: worker.c:153
void * palloc(Size size)
Definition: mcxt.c:950

◆ stream_cleanup_files()

static void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 2739 of file worker.c.

References Assert, changes_filename(), HASH_REMOVE, hash_search(), MAXPGPATH, pfree(), SharedFileSetDeleteAll(), StreamXidHash::stream_fileset, subxact_filename(), and StreamXidHash::subxact_fileset.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_commit().

2740 {
2741  char path[MAXPGPATH];
2742  StreamXidHash *ent;
2743 
2744  /* Remove the xid entry from the stream xid hash */
2745  ent = (StreamXidHash *) hash_search(xidhash,
2746  (void *) &xid,
2747  HASH_REMOVE,
2748  NULL);
2749  /* By this time we must have created the transaction entry */
2750  Assert(ent != NULL);
2751 
2752  /* Delete the change file and release the stream fileset memory */
2753  changes_filename(path, subid, xid);
2755  pfree(ent->stream_fileset);
2756  ent->stream_fileset = NULL;
2757 
2758  /* Delete the subxact file and release the memory, if it exist */
2759  if (ent->subxact_fileset)
2760  {
2761  subxact_filename(path, subid, xid);
2763  pfree(ent->subxact_fileset);
2764  ent->subxact_fileset = NULL;
2765  }
2766 }
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2726
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2719
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void pfree(void *pointer)
Definition: mcxt.c:1057
#define MAXPGPATH
SharedFileSet * stream_fileset
Definition: worker.c:149
SharedFileSet * subxact_fileset
Definition: worker.c:150
static HTAB * xidhash
Definition: worker.c:176
void SharedFileSetDeleteAll(SharedFileSet *fileset)
#define Assert(condition)
Definition: c.h:804

◆ stream_close_file()

static void stream_close_file ( void  )
static

Definition at line 2857 of file worker.c.

References Assert, BufFileClose(), in_streamed_transaction, InvalidTransactionId, stream_xid, and TransactionIdIsValid.

Referenced by apply_handle_stream_stop().

2858 {
2861  Assert(stream_fd != NULL);
2862 
2864 
2866  stream_fd = NULL;
2867 }
void BufFileClose(BufFile *file)
Definition: buffile.c:395
bool in_streamed_transaction
Definition: worker.c:168
#define InvalidTransactionId
Definition: transam.h:31
#define Assert(condition)
Definition: c.h:804
static BufFile * stream_fd
Definition: worker.c:179
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static TransactionId stream_xid
Definition: worker.c:170

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first 
)
static

Definition at line 2782 of file worker.c.

References Assert, BufFileCreateShared(), BufFileOpenShared(), BufFileSeek(), changes_filename(), DEBUG1, elog, HASH_ENTER, HASH_FIND, hash_search(), in_streamed_transaction, MAXPGPATH, MemoryContextSwitchTo(), OidIsValid, palloc(), SharedFileSetInit(), StreamXidHash::stream_fileset, StreamXidHash::subxact_fileset, TransactionIdIsValid, and StreamXidHash::xid.

Referenced by apply_handle_stream_start().

2783 {
2784  char path[MAXPGPATH];
2785  bool found;
2786  MemoryContext oldcxt;
2787  StreamXidHash *ent;
2788 
2790  Assert(OidIsValid(subid));
2792  Assert(stream_fd == NULL);
2793 
2794  /* create or find the xid entry in the xidhash */
2795  ent = (StreamXidHash *) hash_search(xidhash,
2796  (void *) &xid,
2798  &found);
2799  Assert(first_segment || found);
2800  changes_filename(path, subid, xid);
2801  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
2802 
2803  /*
2804  * Create/open the buffiles under the logical streaming context so that we
2805  * have those files until stream stop.
2806  */
2808 
2809  /*
2810  * If this is the first streamed segment, the file must not exist, so make
2811  * sure we're the ones creating it. Otherwise just open the file for
2812  * writing, in append mode.
2813  */
2814  if (first_segment)
2815  {
2816  MemoryContext savectx;
2817  SharedFileSet *fileset;
2818 
2819  /*
2820  * We need to maintain shared fileset across multiple stream
2821  * start/stop calls. So, need to allocate it in a persistent context.
2822  */
2824  fileset = palloc(sizeof(SharedFileSet));
2825 
2826  SharedFileSetInit(fileset, NULL);
2827  MemoryContextSwitchTo(savectx);
2828 
2829  stream_fd = BufFileCreateShared(fileset, path);
2830 
2831  /* Remember the fileset for the next stream of the same transaction */
2832  ent->xid = xid;
2833  ent->stream_fileset = fileset;
2834  ent->subxact_fileset = NULL;
2835  }
2836  else
2837  {
2838  /*
2839  * Open the file and seek to the end of the file because we always
2840  * append the changes file.
2841  */
2842  stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
2843  BufFileSeek(stream_fd, 0, 0, SEEK_END);
2844  }
2845 
2846  MemoryContextSwitchTo(oldcxt);
2847 }
#define DEBUG1
Definition: elog.h:25
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:64
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:650
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2726
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
#define OidIsValid(objectId)
Definition: c.h:710
#define MAXPGPATH
bool in_streamed_transaction
Definition: worker.c:168
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:262
SharedFileSet * stream_fileset
Definition: worker.c:149
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
MemoryContext ApplyContext
Definition: worker.c:154
SharedFileSet * subxact_fileset
Definition: worker.c:150
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
static HTAB * xidhash
Definition: worker.c:176
#define Assert(condition)
Definition: c.h:804
TransactionId xid
Definition: worker.c:148
void * palloc(Size size)
Definition: mcxt.c:950
static BufFile * stream_fd
Definition: worker.c:179
#define elog(elevel,...)
Definition: elog.h:227
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 2878 of file worker.c.

References Assert, BufFileWrite(), StringInfoData::cursor, StringInfoData::data, in_streamed_transaction, StringInfoData::len, stream_xid, and TransactionIdIsValid.

Referenced by handle_streamed_transaction().

2879 {
2880  int len;
2881 
2884  Assert(stream_fd != NULL);
2885 
2886  /* total on-disk size, including the action type character */
2887  len = (s->len - s->cursor) + sizeof(char);
2888 
2889  /* first write the size */
2890  BufFileWrite(stream_fd, &len, sizeof(len));
2891 
2892  /* then the action */
2893  BufFileWrite(stream_fd, &action, sizeof(action));
2894 
2895  /* and finally the remaining part of the buffer (after the XID) */
2896  len = (s->len - s->cursor);
2897 
2898  BufFileWrite(stream_fd, &s->data[s->cursor], len);
2899 }
bool in_streamed_transaction
Definition: worker.c:168
#define Assert(condition)
Definition: c.h:804
static BufFile * stream_fd
Definition: worker.c:179
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static TransactionId stream_xid
Definition: worker.c:170

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2478 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

2479 {
2480  MySubscriptionValid = false;
2481 }
bool MySubscriptionValid
Definition: worker.c:162

◆ subxact_filename()

static void subxact_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 2719 of file worker.c.

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

2720 {
2721  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
2722 }
#define MAXPGPATH
#define snprintf
Definition: port.h:216

◆ subxact_info_add()

static void subxact_info_add ( TransactionId  xid)
static

Definition at line 2641 of file worker.c.

References Assert, BufFileTell(), SubXactInfo::fileno, i, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, SubXactInfo::offset, palloc(), repalloc(), stream_xid, ApplySubXactData::subxact_last, ApplySubXactData::subxacts, TransactionIdIsValid, and SubXactInfo::xid.

Referenced by handle_streamed_transaction().

2642 {
2643  SubXactInfo *subxacts = subxact_data.subxacts;
2644  int64 i;
2645 
2646  /* We must have a valid top level stream xid and a stream fd. */
2648  Assert(stream_fd != NULL);
2649 
2650  /*
2651  * If the XID matches the toplevel transaction, we don't want to add it.
2652  */
2653  if (stream_xid == xid)
2654  return;
2655 
2656  /*
2657  * In most cases we're checking the same subxact as we've already seen in
2658  * the last call, so make sure to ignore it (this change comes later).
2659  */
2660  if (subxact_data.subxact_last == xid)
2661  return;
2662 
2663  /* OK, remember we're processing this XID. */
2664  subxact_data.subxact_last = xid;
2665 
2666  /*
2667  * Check if the transaction is already present in the array of subxact. We
2668  * intentionally scan the array from the tail, because we're likely adding
2669  * a change for the most recent subtransactions.
2670  *
2671  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
2672  * would allow us to use binary search here.
2673  */
2674  for (i = subxact_data.nsubxacts; i > 0; i--)
2675  {
2676  /* found, so we're done */
2677  if (subxacts[i - 1].xid == xid)
2678  return;
2679  }
2680 
2681  /* This is a new subxact, so we need to add it to the array. */
2682  if (subxact_data.nsubxacts == 0)
2683  {
2684  MemoryContext oldctx;
2685 
2687 
2688  /*
2689  * Allocate this memory for subxacts in per-stream context, see
2690  * subxact_info_read.
2691  */
2693  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2694  MemoryContextSwitchTo(oldctx);
2695  }
2697  {
2699  subxacts = repalloc(subxacts,
2701  }
2702 
2703  subxacts[subxact_data.nsubxacts].xid = xid;
2704 
2705  /*
2706  * Get the current offset of the stream file and store it as offset of
2707  * this subxact.
2708  */
2710  &subxacts[subxact_data.nsubxacts].fileno,
2711  &subxacts[subxact_data.nsubxacts].offset);
2712 
2714  subxact_data.subxacts = subxacts;
2715 }
int fileno
Definition: worker.c:184
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TransactionId subxact_last
Definition: worker.c:193
uint32 nsubxacts
Definition: worker.c:191
SubXactInfo * subxacts
Definition: worker.c:194
TransactionId xid
Definition: worker.c:183
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:743
#define Assert(condition)
Definition: c.h:804
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
off_t offset
Definition: worker.c:185
void * palloc(Size size)
Definition: mcxt.c:950
static ApplySubXactData subxact_data
Definition: worker.c:197
static BufFile * stream_fd
Definition: worker.c:179
uint32 nsubxacts_max
Definition: worker.c:192
int i
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static TransactionId stream_xid
Definition: worker.c:170

◆ subxact_info_read()

static void subxact_info_read ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 2571 of file worker.c.

References Assert, BufFileClose(), BufFileOpenShared(), BufFileRead(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_FIND, hash_search(), MAXPGPATH, MemoryContextSwitchTo(), my_log2(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, palloc(), subxact_filename(), StreamXidHash::subxact_fileset, ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_start().

2572 {
2573  char path[MAXPGPATH];
2574  bool found;
2575  Size len;
2576  BufFile *fd;
2577  StreamXidHash *ent;
2578  MemoryContext oldctx;
2579 
2584 
2585  /* Find the stream xid entry in the xidhash */
2586  ent = (StreamXidHash *) hash_search(xidhash,
2587  (void *) &xid,
2588  HASH_FIND,
2589  &found);
2590 
2591  /*
2592  * If subxact_fileset is not valid that mean we don't have any subxact
2593  * info
2594  */
2595  if (ent->subxact_fileset == NULL)
2596  return;
2597 
2598  subxact_filename(path, subid, xid);
2599 
2600  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
2601 
2602  /* read number of subxact items */
2604  sizeof(subxact_data.nsubxacts)) !=
2605  sizeof(subxact_data.nsubxacts))
2606  ereport(ERROR,
2608  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2609  path)));
2610 
2611  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2612 
2613  /* we keep the maximum as a power of 2 */
2615 
2616  /*
2617  * Allocate subxact information in the logical streaming context. We need
2618  * this information during the complete stream so that we can add the sub
2619  * transaction info to this. On stream stop we will flush this information
2620  * to the subxact file and reset the logical streaming context.
2621  */
2624  sizeof(SubXactInfo));
2625  MemoryContextSwitchTo(oldctx);
2626 
2627  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
2628  ereport(ERROR,
2630  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2631  path)));
2632 
2633  BufFileClose(fd);
2634 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2719
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
uint32 nsubxacts
Definition: worker.c:191
void BufFileClose(BufFile *file)
Definition: buffile.c:395
SubXactInfo * subxacts
Definition: worker.c:194
#define ERROR
Definition: elog.h:45
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:717
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
int my_log2(long num)
Definition: dynahash.c:1765
SharedFileSet * subxact_fileset
Definition: worker.c:150
#define ereport(elevel,...)
Definition: elog.h:155
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
struct SubXactInfo SubXactInfo
static HTAB * xidhash
Definition: worker.c:176
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:905
static ApplySubXactData subxact_data
Definition: worker.c:197
uint32 nsubxacts_max
Definition: worker.c:192
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ subxact_info_write()

static void subxact_info_write ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 2493 of file worker.c.

References Assert, BufFileClose(), BufFileCreateShared(), BufFileOpenShared(), BufFileWrite(), cleanup_subxact_info(), fd(), HASH_FIND, hash_search(), MAXPGPATH, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, palloc(), pfree(), SharedFileSetDeleteAll(), SharedFileSetInit(), subxact_filename(), StreamXidHash::subxact_fileset, ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

2494 {
2495  char path[MAXPGPATH];
2496  bool found;
2497  Size len;
2498  StreamXidHash *ent;
2499  BufFile *fd;
2500 
2502 
2503  /* find the xid entry in the xidhash */
2504  ent = (StreamXidHash *) hash_search(xidhash,
2505  (void *) &xid,
2506  HASH_FIND,
2507  &found);
2508  /* we must found the entry for its top transaction by this time */
2509  Assert(found);
2510 
2511  /*
2512  * If there is no subtransaction then nothing to do, but if already have
2513  * subxact file then delete that.
2514  */
2515  if (subxact_data.nsubxacts == 0)
2516  {
2517  if (ent->subxact_fileset)
2518  {
2521  pfree(ent->subxact_fileset);
2522  ent->subxact_fileset = NULL;
2523  }
2524  return;
2525  }
2526 
2527  subxact_filename(path, subid, xid);
2528 
2529  /*
2530  * Create the subxact file if it not already created, otherwise open the
2531  * existing file.
2532  */
2533  if (ent->subxact_fileset == NULL)
2534  {
2535  MemoryContext oldctx;
2536 
2537  /*
2538  * We need to maintain shared fileset across multiple stream
2539  * start/stop calls. So, need to allocate it in a persistent context.
2540  */
2542  ent->subxact_fileset = palloc(sizeof(SharedFileSet));
2543  SharedFileSetInit(ent->subxact_fileset, NULL);
2544  MemoryContextSwitchTo(oldctx);
2545 
2546  fd = BufFileCreateShared(ent->subxact_fileset, path);
2547  }
2548  else
2549  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
2550 
2551  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2552 
2553  /* Write the subxact count and subxact info */
2556 
2557  BufFileClose(fd);
2558 
2559  /* free the memory allocated for subxact info */
2561 }
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:64
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2719
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
uint32 nsubxacts
Definition: worker.c:191
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void pfree(void *pointer)
Definition: mcxt.c:1057
SubXactInfo * subxacts
Definition: worker.c:194
#define MAXPGPATH
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:262
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
MemoryContext ApplyContext
Definition: worker.c:154
SharedFileSet * subxact_fileset
Definition: worker.c:150
struct SubXactInfo SubXactInfo
static HTAB * xidhash
Definition: worker.c:176
void SharedFileSetDeleteAll(SharedFileSet *fileset)
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
void * palloc(Size size)
Definition: mcxt.c:950
static ApplySubXactData subxact_data
Definition: worker.c:197
static void cleanup_subxact_info(void)
Definition: worker.c:2905
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586