PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_cleanup_files (Oid subid, TransactionId xid)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static void DisableSubscriptionAndExit (void)
 
static void apply_dispatch (StringInfo s)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static bool FindReplTupleInLocalRel (EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void TwoPhaseTransactionGid (Oid subid, TransactionId xid, char *gid, int szgid)
 
static void apply_spooled_messages (TransactionId xid, XLogRecPtr lsn)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void apply_error_callback (void *arg)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionData * create_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_stream_start (StringInfo s)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **myslotname)
 
static void start_apply (XLogRecPtr origin_startpos)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConn * LogRepWorkerWalRcvConn = NULL
 
Subscription * MySubscription = NULL
 
static bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFile * stream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 277 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 199 of file worker.c.

Typedef Documentation

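◆ ApplyErrorCallbackArg

typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg

◆ ApplyExecutionData

typedef struct ApplyExecutionData ApplyExecutionData

◆ ApplySubXactData

typedef struct ApplySubXactData ApplySubXactData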

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 2510 of file worker.c.

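2511 {
2512  LogicalRepMsgType action = pq_getmsgbyte(s);
2513  LogicalRepMsgType saved_command;
2514 
2515  /*
2516  * Set the current command being applied. Since this function can be
2517  * called recursively when applying spooled changes, save the current
2518  * command.
2519  */
2520  saved_command = apply_error_callback_arg.command;
2521  apply_error_callback_arg.command = action;
2522 
2523  switch (action)
2524  {
2525  case LOGICAL_REP_MSG_BEGIN:
2526  apply_handle_begin(s);
2527  break;
2528 
2529  case LOGICAL_REP_MSG_COMMIT:
2530  apply_handle_commit(s);
2531  break;
2532 
2533  case LOGICAL_REP_MSG_INSERT:
2534  apply_handle_insert(s);
2535  break;
2536 
2537  case LOGICAL_REP_MSG_UPDATE:
2538  apply_handle_update(s);
2539  break;
2540 
2541  case LOGICAL_REP_MSG_DELETE:
2542  apply_handle_delete(s);
2543  break;
2544 
2545  case LOGICAL_REP_MSG_TRUNCATE:
2546  apply_handle_truncate(s);
2547  break;
2548 
2549  case LOGICAL_REP_MSG_RELATION:
2550  apply_handle_relation(s);
2551  break;
2552 
2553  case LOGICAL_REP_MSG_TYPE:
2554  apply_handle_type(s);
2555  break;
2556 
2557  case LOGICAL_REP_MSG_ORIGIN:
2558  apply_handle_origin(s);
2559  break;
2560 
2561  case LOGICAL_REP_MSG_MESSAGE:
2562 
2563  /*
2564  * Logical replication does not use generic logical messages yet.
2565  * Although, it could be used by other applications that use this
2566  * output plugin.
2567  */
2568  break;
2569 
2570  case LOGICAL_REP_MSG_STREAM_START:
2571  apply_handle_stream_start(s);
2572  break;
2573 
2574  case LOGICAL_REP_MSG_STREAM_STOP:
2575  apply_handle_stream_stop(s);
2576  break;
2577 
2578  case LOGICAL_REP_MSG_STREAM_ABORT:
2579  apply_handle_stream_abort(s);
2580  break;
2581 
2582  case LOGICAL_REP_MSG_STREAM_COMMIT:
2583  apply_handle_stream_commit(s);
2584  break;
2585 
2586  case LOGICAL_REP_MSG_BEGIN_PREPARE:
2587  apply_handle_begin_prepare(s);
2588  break;
2589 
2590  case LOGICAL_REP_MSG_PREPARE:
2591  apply_handle_prepare(s);
2592  break;
2593 
2594  case LOGICAL_REP_MSG_COMMIT_PREPARED:
2595  apply_handle_commit_prepared(s);
2596  break;
2597 
2598  case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
2599  apply_handle_rollback_prepared(s);
2600  break;
2601 
2602  case LOGICAL_REP_MSG_STREAM_PREPARE:
2603  apply_handle_stream_prepare(s);
2604  break;
2605 
2606  default:
2607  ereport(ERROR,
2608  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2609  errmsg("invalid logical replication message type \"%c\"", action)));
2610  }
2611 
2612  /* Reset the current command */
2613  apply_error_callback_arg.command = saved_command;
2614 }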
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1098
static void apply_handle_type(StringInfo s)
Definition: worker.c:1604
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:2388
static void apply_handle_update(StringInfo s)
Definition: worker.c:1805
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:1494
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:998
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:235
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1977
static void apply_handle_begin(StringInfo s)
Definition: worker.c:835
static void apply_handle_commit(StringInfo s)
Definition: worker.c:857
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1274
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1581
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:943
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1041
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1241
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1156
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:883
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1174
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1669
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
LogicalRepMsgType
Definition: logicalproto.h:53
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:57
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:54
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:55
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:63
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:56
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:58
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
LogicalRepMsgType command
Definition: worker.c:225

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), and LogicalRepApplyLoop().

◆ apply_error_callback()

static void apply_error_callback ( void *  arg)
static

Definition at line 4032 of file worker.c.

4033 {
4034  ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4035 
4036  if (apply_error_callback_arg.command == 0)
4037  return;
4038 
4039  Assert(errarg->origin_name);
4040 
4041  if (errarg->rel == NULL)
4042  {
4043  if (!TransactionIdIsValid(errarg->remote_xid))
4044  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4045  errarg->origin_name,
4046  logicalrep_message_type(errarg->command));
4047  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4048  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4049  errarg->origin_name,
4050  logicalrep_message_type(errarg->command),
4051  errarg->remote_xid);
4052  else
4053  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4054  errarg->origin_name,
4055  logicalrep_message_type(errarg->command),
4056  errarg->remote_xid,
4057  LSN_FORMAT_ARGS(errarg->finish_lsn));
4058  }
4059  else if (errarg->remote_attnum < 0)
4060  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4061  errarg->origin_name,
4062  logicalrep_message_type(errarg->command),
4063  errarg->rel->remoterel.nspname,
4064  errarg->rel->remoterel.relname,
4065  errarg->remote_xid,
4066  LSN_FORMAT_ARGS(errarg->finish_lsn));
4067  else
4068  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4069  errarg->origin_name,
4070  logicalrep_message_type(errarg->command),
4071  errarg->rel->remoterel.nspname,
4072  errarg->rel->remoterel.relname,
4073  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4074  errarg->remote_xid,
4075  LSN_FORMAT_ARGS(errarg->finish_lsn));
4076 }
#define errcontext
Definition: elog.h:192
Assert(fmt[strlen(fmt) - 1] !='\n')
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1198
TransactionId remote_xid
Definition: worker.c:230
XLogRecPtr finish_lsn
Definition: worker.c:231
LogicalRepRelMapEntry * rel
Definition: worker.c:226
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 835 of file worker.c.

836 {
837  LogicalRepBeginData begin_data;
838 
839  logicalrep_read_begin(s, &begin_data);
840  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
841 
842  remote_final_lsn = begin_data.final_lsn;
843 
844  maybe_start_skipping_changes(begin_data.final_lsn);
845 
846  in_remote_transaction = true;
847 
848  pgstat_report_activity(STATE_RUNNING, NULL);
849 }
bool in_remote_transaction
Definition: worker.c:256
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4080
static XLogRecPtr remote_final_lsn
Definition: worker.c:257
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:3902
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:124
TransactionId xid
Definition: logicalproto.h:126

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 883 of file worker.c.

884 {
885  LogicalRepPreparedTxnData begin_data;
886 
887  /* Tablesync should never receive prepare. */
888  if (am_tablesync_worker())
889  ereport(ERROR,
890  (errcode(ERRCODE_PROTOCOL_VIOLATION),
891  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
892 
893  logicalrep_read_begin_prepare(s, &begin_data);
894  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
895 
896  remote_final_lsn = begin_data.prepare_lsn;
897 
898  maybe_start_skipping_changes(begin_data.prepare_lsn);
899 
900  in_remote_transaction = true;
901 
902  pgstat_report_activity(STATE_RUNNING, NULL);
903 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:993
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 857 of file worker.c.

858 {
859  LogicalRepCommitData commit_data;
860 
861  logicalrep_read_commit(s, &commit_data);
862 
863  if (commit_data.commit_lsn != remote_final_lsn)
864  ereport(ERROR,
865  (errcode(ERRCODE_PROTOCOL_VIOLATION),
866  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
867  LSN_FORMAT_ARGS(commit_data.commit_lsn),
868  LSN_FORMAT_ARGS(remote_final_lsn))));
869 
870  apply_handle_commit_internal(&commit_data);
871 
872  /* Process any tables that are being synchronized in parallel. */
873  process_syncing_tables(commit_data.end_lsn);
874 
875  pgstat_report_activity(STATE_IDLE, NULL);
876  reset_apply_error_context_info();
877 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:1528
static void reset_apply_error_context_info(void)
Definition: worker.c:4088
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:631

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData *  commit_data)
static

Definition at line 1528 of file worker.c.

1529 {
1530  if (is_skipping_changes())
1531  {
1532  stop_skipping_changes();
1533 
1534  /*
1535  * Start a new transaction to clear the subskiplsn, if not started
1536  * yet.
1537  */
1538  if (!IsTransactionState())
1539  StartTransactionCommand();
1540  }
1541 
1542  if (IsTransactionState())
1543  {
1544  /*
1545  * The transaction is either non-empty or skipped, so we clear the
1546  * subskiplsn.
1547  */
1548  clear_subscription_skip_lsn(commit_data->commit_lsn);
1549 
1550  /*
1551  * Update origin state so we can restart streaming from correct
1552  * position in case of crash.
1553  */
1554  replorigin_session_origin_lsn = commit_data->end_lsn;
1555  replorigin_session_origin_timestamp = commit_data->committime;
1556 
1557  CommitTransactionCommand();
1558  pgstat_report_stat(false);
1559 
1560  store_flush_position(commit_data->end_lsn);
1561  }
1562  else
1563  {
1564  /* Process any invalidation messages that might have accumulated. */
1565  AcceptInvalidationMessages();
1566  maybe_reread_subscription();
1567  }
1568 
1569  in_remote_transaction = false;
1570 }
static void maybe_reread_subscription(void)
Definition: worker.c:3057
static void stop_skipping_changes(void)
Definition: worker.c:3929
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2674
#define is_skipping_changes()
Definition: worker.c:277
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:3951
void AcceptInvalidationMessages(void)
Definition: inval.c:746
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
long pgstat_report_stat(bool force)
Definition: pgstat.c:565
TimestampTz committime
Definition: logicalproto.h:133
bool IsTransactionState(void)
Definition: xact.c:377
void StartTransactionCommand(void)
Definition: xact.c:2925
void CommitTransactionCommand(void)
Definition: xact.c:3022

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, is_skipping_changes, IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), and store_flush_position().

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 998 of file worker.c.

999 {
1000  LogicalRepCommitPreparedTxnData prepare_data;
1001  char gid[GIDSIZE];
1002 
1003  logicalrep_read_commit_prepared(s, &prepare_data);
1004  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1005 
1006  /* Compute GID for two_phase transactions. */
1007  TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1008  gid, sizeof(gid));
1009 
1010  /* There is no transaction when COMMIT PREPARED is called */
1011  begin_replication_step();
1012 
1013  /*
1014  * Update origin state so we can restart streaming from correct position
1015  * in case of crash.
1016  */
1017  replorigin_session_origin_lsn = prepare_data.end_lsn;
1018  replorigin_session_origin_timestamp = prepare_data.commit_time;
1019 
1020  FinishPreparedTransaction(gid, true);
1021  end_replication_step();
1022  CommitTransactionCommand();
1023  pgstat_report_stat(false);
1024 
1025  store_flush_position(prepare_data.end_lsn);
1026  in_remote_transaction = false;
1027 
1028  /* Process any tables that are being synchronized in parallel. */
1029  process_syncing_tables(prepare_data.end_lsn);
1030 
1031  clear_subscription_skip_lsn(prepare_data.end_lsn);
1032 
1033  pgstat_report_activity(STATE_IDLE, NULL);
1034  reset_apply_error_context_info();
1035 }
static void begin_replication_step(void)
Definition: worker.c:423
static void end_replication_step(void)
Definition: worker.c:446
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:3521
Subscription * MySubscription
Definition: worker.c:253
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1480
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 1977 of file worker.c.

1978 {
1979  LogicalRepRelMapEntry *rel;
1980  LogicalRepTupleData oldtup;
1981  LogicalRepRelId relid;
1982  ApplyExecutionData *edata;
1983  EState *estate;
1984  TupleTableSlot *remoteslot;
1985  MemoryContext oldctx;
1986 
1987  /*
1988  * Quick return if we are skipping data modification changes or handling
1989  * streamed transactions.
1990  */
1991  if (is_skipping_changes() ||
1992  handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
1993  return;
1994 
1995  begin_replication_step();
1996 
1997  relid = logicalrep_read_delete(s, &oldtup);
1998  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1999  if (!should_apply_changes_for_rel(rel))
2000  {
2001  /*
2002  * The relation can't become interesting in the middle of the
2003  * transaction so it's safe to unlock it.
2004  */
2005  logicalrep_rel_close(rel, RowExclusiveLock);
2006  end_replication_step();
2007  return;
2008  }
2009 
2010  /* Set relation for error callback */
2011  apply_error_callback_arg.rel = rel;
2012 
2013  /* Check if we can do the delete. */
2014  check_relation_updatable(rel);
2015 
2016  /* Initialize the executor state. */
2017  edata = create_edata_for_relation(rel);
2018  estate = edata->estate;
2019  remoteslot = ExecInitExtraTupleSlot(estate,
2020  RelationGetDescr(rel->localrel),
2021  &TTSOpsVirtual);
2022 
2023  /* Build the search tuple. */
2024  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2025  slot_store_data(remoteslot, rel, &oldtup);
2026  MemoryContextSwitchTo(oldctx);
2027 
2028  /* For a partitioned table, apply delete to correct partition. */
2029  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2030  apply_handle_tuple_routing(edata,
2031  remoteslot, NULL, CMD_DELETE);
2032  else
2033  apply_handle_delete_internal(edata, edata->targetRelInfo,
2034  remoteslot);
2035 
2036  finish_edata(edata);
2037 
2038  /* Reset relation for error callback */
2039  apply_error_callback_arg.rel = NULL;
2040 
2041  logicalrep_rel_close(rel, NoLock);
2042 
2043  end_replication_step();
2044 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1764
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:500
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:405
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:462
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2140
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2052
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:641
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:553
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:540
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:96
@ CMD_DELETE
Definition: nodes.h:268
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
#define RelationGetDescr(relation)
Definition: rel.h:527
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:319
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:456
ResultRelInfo * targetRelInfo
Definition: worker.c:215
EState * estate
Definition: worker.c:212
Form_pg_class rd_rel
Definition: rel.h:110

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData *  edata,
ResultRelInfo *  relinfo,
TupleTableSlot *  remoteslot 
)
static

Definition at line 2052 of file worker.c.

2055 {
2056  EState *estate = edata->estate;
2057  Relation localrel = relinfo->ri_RelationDesc;
2058  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2059  EPQState epqstate;
2060  TupleTableSlot *localslot;
2061  bool found;
2062 
2063  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2064  ExecOpenIndices(relinfo, false);
2065 
2066  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
2067  remoteslot, &localslot);
2068 
2069  /* If found delete it. */
2070  if (found)
2071  {
2072  EvalPlanQualSetSlot(&epqstate, localslot);
2073 
2074  /* Do the actual delete. */
2075  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
2076  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2077  }
2078  else
2079  {
2080  /*
2081  * The tuple to be deleted could not be found. Do nothing except for
2082  * emitting a log message.
2083  *
2084  * XXX should this be promoted to ereport(LOG) perhaps?
2085  */
2086  elog(DEBUG1,
2087  "logical replication did not find row to be deleted "
2088  "in replication target relation \"%s\"",
2089  RelationGetRelationName(localrel));
2090  }
2091 
2092  /* Cleanup. */
2093  ExecCloseIndices(relinfo);
2094  EvalPlanQualEnd(&epqstate);
2095 }
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2105
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:1637
#define DEBUG1
Definition: elog.h:26
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2939
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2518
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:229
#define ACL_DELETE
Definition: parsenodes.h:86
#define NIL
Definition: pg_list.h:66
#define RelationGetRelationName(relation)
Definition: rel.h:535
LogicalRepRelMapEntry * targetRel
Definition: worker.c:214
Relation ri_RelationDesc
Definition: execnodes.h:448

References ACL_DELETE, DEBUG1, elog(), ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 1669 of file worker.c.

1670 {
1671  LogicalRepRelMapEntry *rel;
1672  LogicalRepTupleData newtup;
1673  LogicalRepRelId relid;
1674  ApplyExecutionData *edata;
1675  EState *estate;
1676  TupleTableSlot *remoteslot;
1677  MemoryContext oldctx;
1678 
1679  /*
1680  * Quick return if we are skipping data modification changes or handling
1681  * streamed transactions.
1682  */
1683  if (is_skipping_changes() ||
1684  handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
1685  return;
1686 
1687  begin_replication_step();
1688 
1689  relid = logicalrep_read_insert(s, &newtup);
1690  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1691  if (!should_apply_changes_for_rel(rel))
1692  {
1693  /*
1694  * The relation can't become interesting in the middle of the
1695  * transaction so it's safe to unlock it.
1696  */
1697  logicalrep_rel_close(rel, RowExclusiveLock);
1698  end_replication_step();
1699  return;
1700  }
1701 
1702  /* Set relation for error callback */
1703  apply_error_callback_arg.rel = rel;
1704 
1705  /* Initialize the executor state. */
1706  edata = create_edata_for_relation(rel);
1707  estate = edata->estate;
1708  remoteslot = ExecInitExtraTupleSlot(estate,
1709  RelationGetDescr(rel->localrel),
1710  &TTSOpsVirtual);
1711 
1712  /* Process and store remote tuple in the slot */
1713  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1714  slot_store_data(remoteslot, rel, &newtup);
1715  slot_fill_defaults(rel, estate, remoteslot);
1716  MemoryContextSwitchTo(oldctx);
1717 
1718  /* For a partitioned table, insert the tuple into a partition. */
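1719  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1720  apply_handle_tuple_routing(edata,
1721  remoteslot, NULL, CMD_INSERT);
1722  else
1723  apply_handle_insert_internal(edata, edata->targetRelInfo,
1724  remoteslot);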
1725 
1726  finish_edata(edata);
1727 
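1728  /* Reset relation for error callback */
1729  apply_error_callback_arg.rel = NULL;
1730 
1731  logicalrep_rel_close(rel, NoLock);
1732 
1733  end_replication_step();
1734 }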
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1742
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:584
@ CMD_INSERT
Definition: nodes.h:267
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot 
)
static

Definition at line 1742 of file worker.c.

1745 {
1746  EState *estate = edata->estate;
1747 
1748  /* We must open indexes here. */
1749  ExecOpenIndices(relinfo, false);
1750 
1751  /* Do the insert. */
1752  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
1753  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1754 
1755  /* Cleanup. */
1756  ExecCloseIndices(relinfo);
1757 }
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:83

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1156 of file worker.c.

1157 {
1158  /*
1159  * ORIGIN message can only come inside streaming transaction or inside
1160  * remote transaction and before any actual writes.
1161  */
1162  if (!in_streamed_transaction &&
1163  (!in_remote_transaction ||
1164  (IsTransactionState() && !am_tablesync_worker())))
1165  ereport(ERROR,
1166  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1167  errmsg_internal("ORIGIN message sent out of order")));
1168 }
static bool in_streamed_transaction
Definition: worker.c:260

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 943 of file worker.c.

944 {
945  LogicalRepPreparedTxnData prepare_data;
946 
947  logicalrep_read_prepare(s, &prepare_data);
948 
949  if (prepare_data.prepare_lsn != remote_final_lsn)
950  ereport(ERROR,
951  (errcode(ERRCODE_PROTOCOL_VIOLATION),
952  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
953  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
954  LSN_FORMAT_ARGS(remote_final_lsn))));
955 
956  /*
957  * Unlike commit, here, we always prepare the transaction even though no
958  * change has happened in this transaction or all changes are skipped. It
959  * is done this way because at commit prepared time, we won't know whether
960  * we have skipped preparing a transaction because of those reasons.
961  *
962  * XXX, We can optimize such that at commit prepared time, we first check
963  * whether we have prepared the transaction or not but that doesn't seem
964  * worthwhile because such cases shouldn't be common.
965  */
966  begin_replication_step();
967 
968  apply_handle_prepare_internal(&prepare_data);
969 
970  end_replication_step();
971  CommitTransactionCommand();
972  pgstat_report_stat(false);
973 
974  store_flush_position(prepare_data.end_lsn);
975 
976  in_remote_transaction = false;
977 
978  /* Process any tables that are being synchronized in parallel. */
979  process_syncing_tables(prepare_data.end_lsn);
980 
981  /*
982  * Since we have already prepared the transaction, in a case where the
983  * server crashes before clearing the subskiplsn, it will be left but the
984  * transaction won't be resent. But that's okay because it's a rare case
985  * and the subskiplsn will be cleared when finishing the next transaction.
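986  */
987  stop_skipping_changes();
988  clear_subscription_skip_lsn(prepare_data.prepare_lsn);
989 
990  pgstat_report_activity(STATE_IDLE, NULL);
991  reset_apply_error_context_info();
992 }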
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:909
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), and store_flush_position().

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData * prepare_data)
static

Definition at line 909 of file worker.c.

910 {
911  char gid[GIDSIZE];
912 
913  /*
914  * Compute unique GID for two_phase transactions. We don't use GID of
915  * prepared transaction sent by server as that can lead to deadlock when
916  * we have multiple subscriptions from same node point to publications on
917  * the same node. See comments atop worker.c
918  */
919  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
920  gid, sizeof(gid));
921 
922  /*
923  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
924  * called within the PrepareTransactionBlock below.
925  */
926  BeginTransactionBlock();
927  CommitTransactionCommand(); /* Completes the preceding Begin command. */
928 
929  /*
930  * Update origin state so we can restart streaming from correct position
931  * in case of crash.
932  */
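933  replorigin_session_origin_lsn = prepare_data->end_lsn;
934  replorigin_session_origin_timestamp = prepare_data->prepare_time;
935 
936  PrepareTransactionBlock(gid);
937 }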
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3818
void BeginTransactionBlock(void)
Definition: xact.c:3750

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 1581 of file worker.c.

1582 {
1583  LogicalRepRelation *rel;
1584 
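1585  if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
1586  return;
1587 
1588  rel = logicalrep_read_rel(s);
1589  logicalrep_relmap_update(rel);
1590 
1591  /* Also reset all entries in the partition map that refer to remoterel. */
1592  logicalrep_partmap_reset_relmap(rel);
1593 }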
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:523
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1041 of file worker.c.

1042 {
1043  LogicalRepRollbackPreparedTxnData rollback_data;
1044  char gid[GIDSIZE];
1045 
1046  logicalrep_read_rollback_prepared(s, &rollback_data);
1047  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1048 
1049  /* Compute GID for two_phase transactions. */
1050  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1051  gid, sizeof(gid));
1052 
1053  /*
1054  * It is possible that we haven't received prepare because it occurred
1055  * before walsender reached a consistent point or the two_phase was still
1056  * not enabled by that time, so in such cases, we need to skip rollback
1057  * prepared.
1058  */
1059  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1060  rollback_data.prepare_time))
1061  {
1062  /*
1063  * Update origin state so we can restart streaming from correct
1064  * position in case of crash.
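1065  */
1066  replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1067  replorigin_session_origin_timestamp = rollback_data.rollback_time;
1068 
1069  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1070  begin_replication_step();
1071  FinishPreparedTransaction(gid, false);
1072  end_replication_step();
1073  CommitTransactionCommand();
1074 
1075  clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1076  }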
1077 
1078  pgstat_report_stat(false);
1079 
1080  store_flush_position(rollback_data.rollback_end_lsn);
1081  in_remote_transaction = false;
1082 
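1083  /* Process any tables that are being synchronized in parallel. */
1084  process_syncing_tables(rollback_data.rollback_end_lsn);
1085 
1086  pgstat_report_activity(STATE_IDLE, NULL);
1087  reset_apply_error_context_info();
1088 }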
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2577

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1274 of file worker.c.

1275 {
1276  TransactionId xid;
1277  TransactionId subxid;
1278 
1279  if (in_streamed_transaction)
1280  ereport(ERROR,
1281  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1282  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1283 
1284  logicalrep_read_stream_abort(s, &xid, &subxid);
1285 
1286  /*
1287  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1288  * just delete the files with serialized info.
1289  */
1290  if (xid == subxid)
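1291  {
1292  set_apply_error_context_xact(xid, InvalidXLogRecPtr);
1293  stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1294  }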
1295  else
1296  {
1297  /*
1298  * OK, so it's a subxact. We need to read the subxact file for the
1299  * toplevel transaction, determine the offset tracked for the subxact,
1300  * and truncate the file with changes. We also remove the subxacts
1301  * with higher offsets (or rather higher XIDs).
1302  *
1303  * We intentionally scan the array from the tail, because we're likely
1304  * aborting a change for the most recent subtransactions.
1305  *
1306  * We can't use the binary search here as subxact XIDs won't
1307  * necessarily arrive in sorted order, consider the case where we have
1308  * released the savepoint for multiple subtransactions and then
1309  * performed rollback to savepoint for one of the earlier
1310  * sub-transaction.
1311  */
1312  int64 i;
1313  int64 subidx;
1314  BufFile *fd;
1315  bool found = false;
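1316  char path[MAXPGPATH];
1317 
1318  set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
1319 
1320  subidx = -1;
1321  begin_replication_step();
1322  subxact_info_read(MyLogicalRepWorker->subid, xid);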
1323 
1324  for (i = subxact_data.nsubxacts; i > 0; i--)
1325  {
1326  if (subxact_data.subxacts[i - 1].xid == subxid)
1327  {
1328  subidx = (i - 1);
1329  found = true;
1330  break;
1331  }
1332  }
1333 
1334  /*
1335  * If it's an empty sub-transaction then we will not find the subxid
1336  * here so just cleanup the subxact info and return.
1337  */
1338  if (!found)
1339  {
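1340  /* Cleanup the subxact info */
1341  cleanup_subxact_info();
1342  end_replication_step();
1343  CommitTransactionCommand();
1344  reset_apply_error_context_info();
1345  return;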
1346  }
1347 
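1348  /* open the changes file */
1349  changes_filename(path, MyLogicalRepWorker->subid, xid);
1350  fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1351  O_RDWR, false);
1352 
1353  /* OK, truncate the file at the right offset */
1354  BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1355  subxact_data.subxacts[subidx].offset);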
1356  BufFileClose(fd);
1357 
1358  /* discard the subxacts added later */
1359  subxact_data.nsubxacts = subidx;
1360 
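1361  /* write the updated subxact list */
1362  subxact_info_write(MyLogicalRepWorker->subid, xid);
1363 
1364  end_replication_step();
1365  CommitTransactionCommand();
1366  }
1367 
1368  reset_apply_error_context_info();
1369 }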
static void cleanup_subxact_info(void)
Definition: worker.c:3504
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:3380
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3366
static ApplySubXactData subxact_data
Definition: worker.c:298
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:3172
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:3221
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:900
void BufFileClose(BufFile *file)
Definition: buffile.c:407
uint32 TransactionId
Definition: c.h:588
int i
Definition: isn.c:73
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define MAXPGPATH
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1185
uint32 nsubxacts
Definition: worker.c:292
SubXactInfo * subxacts
Definition: worker.c:295
FileSet * stream_fileset
off_t offset
Definition: worker.c:286
TransactionId xid
Definition: worker.c:284
int fileno
Definition: worker.c:285
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, fd(), SubXactInfo::fileno, i, in_streamed_transaction, InvalidXLogRecPtr, logicalrep_read_stream_abort(), MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, reset_apply_error_context_info(), set_apply_error_context_xact(), stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 1494 of file worker.c.

1495 {
1496  TransactionId xid;
1497  LogicalRepCommitData commit_data;
1498 
1499  if (in_streamed_transaction)
1500  ereport(ERROR,
1501  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1502  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1503 
1504  xid = logicalrep_read_stream_commit(s, &commit_data);
1505  set_apply_error_context_xact(xid, commit_data.commit_lsn);
1506 
1507  elog(DEBUG1, "received commit for streamed transaction %u", xid);
1508 
1509  apply_spooled_messages(xid, commit_data.commit_lsn);
1510 
1511  apply_handle_commit_internal(&commit_data);
1512 
1513  /* unlink the files with serialized changes and subxact info */
1514  stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1515 
1516  /* Process any tables that are being synchronized in parallel. */
1517  process_syncing_tables(commit_data.end_lsn);
1518 
1519  pgstat_report_activity(STATE_IDLE, NULL);
1520 
1521  reset_apply_error_context_info();
1522 }
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:1375
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1143

References apply_handle_commit_internal(), apply_spooled_messages(), LogicalRepCommitData::commit_lsn, DEBUG1, elog(), LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, in_streamed_transaction, logicalrep_read_stream_commit(), MyLogicalRepWorker, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, stream_cleanup_files(), and LogicalRepWorker::subid.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1098 of file worker.c.

1099 {
1100  LogicalRepPreparedTxnData prepare_data;
1101 
1102  if (in_streamed_transaction)
1103  ereport(ERROR,
1104  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1105  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1106 
1107  /* Tablesync should never receive prepare. */
1108  if (am_tablesync_worker())
1109  ereport(ERROR,
1110  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1111  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1112 
1113  logicalrep_read_stream_prepare(s, &prepare_data);
1114  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1115 
1116  elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1117 
1118  /* Replay all the spooled operations. */
1119  apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1120 
1121  /* Mark the transaction as prepared. */
1122  apply_handle_prepare_internal(&prepare_data);
1123 
1124  CommitTransactionCommand();
1125 
1126  pgstat_report_stat(false);
1127 
1128  store_flush_position(prepare_data.end_lsn);
1129 
1130  in_remote_transaction = false;
1131 
1132  /* unlink the files with serialized changes and subxact info. */
1133  stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1134 
1135  /* Process any tables that are being synchronized in parallel. */
1136  process_syncing_tables(prepare_data.end_lsn);
1137 
1138  /*
1139  * Similar to prepare case, the subskiplsn could be left in a case of
1140  * server crash but it's okay. See the comments in apply_handle_prepare().
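1141  */
1142  stop_skipping_changes();
1143  clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1144 
1145  pgstat_report_activity(STATE_IDLE, NULL);
1146 
1147  reset_apply_error_context_info();
1148 }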
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), clear_subscription_skip_lsn(), CommitTransactionCommand(), DEBUG1, elog(), LogicalRepPreparedTxnData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, logicalrep_read_stream_prepare(), MyLogicalRepWorker, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), LogicalRepWorker::subid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1174 of file worker.c.

1175 {
1176  bool first_segment;
1177 
1178  if (in_streamed_transaction)
1179  ereport(ERROR,
1180  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1181  errmsg_internal("duplicate STREAM START message")));
1182 
1183  /*
1184  * Start a transaction on stream start, this transaction will be committed
1185  * on the stream stop unless it is a tablesync worker in which case it
1186  * will be committed after processing all the messages. We need the
1187  * transaction for handling the buffile, used for serializing the
1188  * streaming data and subxact info.
1189  */
1190  begin_replication_step();
1191 
1192  /* notify handle methods we're processing a remote transaction */
1193  in_streamed_transaction = true;
1194 
1195  /* extract XID of the top-level transaction */
1196  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1197 
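1198  if (!TransactionIdIsValid(stream_xid))
1199  ereport(ERROR,
1200  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1201  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1202 
1203  set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1204 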
1205  /*
1206  * Initialize the worker's stream_fileset if we haven't yet. This will be
1207  * used for the entire duration of the worker so create it in a permanent
1208  * context. We create this on the very first streaming message from any
1209  * transaction and then use it for this and other streaming transactions.
1210  * Now, we could create a fileset at the start of the worker as well but
1211  * then we won't be sure that it will ever be used.
1212  */
1213  if (MyLogicalRepWorker->stream_fileset == NULL)
1214  {
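1215  MemoryContext oldctx;
1216 
1217  oldctx = MemoryContextSwitchTo(ApplyContext);
1218 
1219  MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1220  FileSetInit(MyLogicalRepWorker->stream_fileset);
1221 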
1222  MemoryContextSwitchTo(oldctx);
1223  }
1224 
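1225  /* open the spool file for this transaction */
1226  stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
1227 
1228  /* if this is not the first segment, open existing subxact file */
1229  if (!first_segment)
1230  subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
1231 
1232  pgstat_report_activity(STATE_RUNNING, NULL);
1233 
1234  end_replication_step();
1235 }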
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:3407
MemoryContext ApplyContext
Definition: worker.c:246
static TransactionId stream_xid
Definition: worker.c:262
void FileSetInit(FileSet *fileset)
Definition: fileset.c:54
void * palloc(Size size)
Definition: mcxt.c:1199
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1093

References ApplyContext, begin_replication_step(), end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FileSetInit(), in_streamed_transaction, InvalidXLogRecPtr, logicalrep_read_stream_start(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), set_apply_error_context_xact(), STATE_RUNNING, LogicalRepWorker::stream_fileset, stream_open_file(), stream_xid, LogicalRepWorker::subid, subxact_info_read(), and TransactionIdIsValid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1241 of file worker.c.

1242 {
1243  if (!in_streamed_transaction)
1244  ereport(ERROR,
1245  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1246  errmsg_internal("STREAM STOP message without STREAM START")));
1247 
1248  /*
1249  * Close the file with serialized changes, and serialize information about
1250  * subxacts for the toplevel transaction.
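1251  */
1252  stream_close_file();
1253  subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
1254 
1255  /* We must be in a valid transaction state */
1256  Assert(IsTransactionState());
1257 
1258  /* Commit the per-stream transaction */
1259  CommitTransactionCommand();
1260 
1261  in_streamed_transaction = false;
1262 
1263  /* Reset per-stream context */
1264  MemoryContextReset(LogicalStreamingContext);
1265 
1266  pgstat_report_activity(STATE_IDLE, NULL);
1267  reset_apply_error_context_info();
1268 }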
static MemoryContext LogicalStreamingContext
Definition: worker.c:249
static void stream_close_file(void)
Definition: worker.c:3456
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:303

References Assert(), CommitTransactionCommand(), ereport, errcode(), errmsg_internal(), ERROR, in_streamed_transaction, IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, pgstat_report_activity(), reset_apply_error_context_info(), STATE_IDLE, stream_close_file(), stream_xid, LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 2388 of file worker.c.

2389 {
2390  bool cascade = false;
2391  bool restart_seqs = false;
2392  List *remote_relids = NIL;
2393  List *remote_rels = NIL;
2394  List *rels = NIL;
2395  List *part_rels = NIL;
2396  List *relids = NIL;
2397  List *relids_logged = NIL;
2398  ListCell *lc;
2399  LOCKMODE lockmode = AccessExclusiveLock;
2400 
2401  /*
2402  * Quick return if we are skipping data modification changes or handling
2403  * streamed transactions.
2404  */
2405  if (is_skipping_changes() ||
2406  handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
2407  return;
2408 
2409  begin_replication_step();
2410 
2411  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2412 
2413  foreach(lc, remote_relids)
2414  {
2415  LogicalRepRelId relid = lfirst_oid(lc);
2416  LogicalRepRelMapEntry *rel;
2417 
2418  rel = logicalrep_rel_open(relid, lockmode);
2419  if (!should_apply_changes_for_rel(rel))
2420  {
2421  /*
2422  * The relation can't become interesting in the middle of the
2423  * transaction so it's safe to unlock it.
2424  */
2425  logicalrep_rel_close(rel, lockmode);
2426  continue;
2427  }
2428 
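2429  remote_rels = lappend(remote_rels, rel);
2430  TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
2431  rels = lappend(rels, rel->localrel);
2432  relids = lappend_oid(relids, rel->localreloid);
2433  if (RelationIsLogicallyLogged(rel->localrel))
2434  relids_logged = lappend_oid(relids_logged, rel->localreloid);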
2435 
2436  /*
2437  * Truncate partitions if we got a message to truncate a partitioned
2438  * table.
2439  */
2440  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2441  {
2442  ListCell *child;
2443  List *children = find_all_inheritors(rel->localreloid,
2444  lockmode,
2445  NULL);
2446 
2447  foreach(child, children)
2448  {
2449  Oid childrelid = lfirst_oid(child);
2450  Relation childrel;
2451 
2452  if (list_member_oid(relids, childrelid))
2453  continue;
2454 
2455  /* find_all_inheritors already got lock */
2456  childrel = table_open(childrelid, NoLock);
2457 
2458  /*
2459  * Ignore temp tables of other backends. See similar code in
2460  * ExecuteTruncate().
2461  */
2462  if (RELATION_IS_OTHER_TEMP(childrel))
2463  {
2464  table_close(childrel, lockmode);
2465  continue;
2466  }
2467 
2468  TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
2469  rels = lappend(rels, childrel);
2470  part_rels = lappend(part_rels, childrel);
2471  relids = lappend_oid(relids, childrelid);
2472  /* Log this relation only if needed for logical decoding */
2473  if (RelationIsLogicallyLogged(childrel))
2474  relids_logged = lappend_oid(relids_logged, childrelid);
2475  }
2476  }
2477  }
2478 
2479  /*
2480  * Even if we used CASCADE on the upstream primary we explicitly default
2481  * to replaying changes without further cascading. This might be later
2482  * changeable with a user specified option.
2483  */
2484  ExecuteTruncateGuts(rels,
2485  relids,
2486  relids_logged,
2487  DROP_RESTRICT,
2488  restart_seqs);
2489  foreach(lc, remote_rels)
2490  {
2491  LogicalRepRelMapEntry *rel = lfirst(lc);
2492 
2493  logicalrep_rel_close(rel, NoLock);
2494  }
2495  foreach(lc, part_rels)
2496  {
2497  Relation rel = lfirst(lc);
2498 
2499  table_close(rel, NoLock);
2500  }
2501 
2502  end_replication_step();
2503 }
List * lappend(List *list, void *datum)
Definition: list.c:338
List * lappend_oid(List *list, Oid datum)
Definition: list.c:374
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:721
int LOCKMODE
Definition: lockdefs.h:26
#define AccessExclusiveLock
Definition: lockdefs.h:43
@ DROP_RESTRICT
Definition: parsenodes.h:1935
#define ACL_TRUNCATE
Definition: parsenodes.h:87
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:170
#define lfirst_oid(lc)
Definition: pg_list.h:172
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:699
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:656
Definition: pg_list.h:52
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1789

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData * edata,
TupleTableSlot * remoteslot,
LogicalRepTupleData * newtup,
CmdType  operation 
)
static

Definition at line 2140 of file worker.c.

2144 {
2145  EState *estate = edata->estate;
2146  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2147  ResultRelInfo *relinfo = edata->targetRelInfo;
2148  Relation parentrel = relinfo->ri_RelationDesc;
2149  ModifyTableState *mtstate;
2150  PartitionTupleRouting *proute;
2151  ResultRelInfo *partrelinfo;
2152  Relation partrel;
2153  TupleTableSlot *remoteslot_part;
2154  TupleConversionMap *map;
2155  MemoryContext oldctx;
2156  LogicalRepRelMapEntry *part_entry = NULL;
2157  AttrMap *attrmap = NULL;
2158 
2159  /* ModifyTableState is needed for ExecFindPartition(). */
2160  edata->mtstate = mtstate = makeNode(ModifyTableState);
2161  mtstate->ps.plan = NULL;
2162  mtstate->ps.state = estate;
2163  mtstate->operation = operation;
2164  mtstate->resultRelInfo = relinfo;
2165 
2166  /* ... as is PartitionTupleRouting. */
2167  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2168 
2169  /*
2170  * Find the partition to which the "search tuple" belongs.
2171  */
2172  Assert(remoteslot != NULL);
2173  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2174  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2175  remoteslot, estate);
2176  Assert(partrelinfo != NULL);
2177  partrel = partrelinfo->ri_RelationDesc;
2178 
2179  /*
2180  * Check for supported relkind. We need this since partitions might be of
2181  * unsupported relkinds; and the set of partitions can change, so checking
2182  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2183  */
2184  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2185  get_namespace_name(RelationGetNamespace(partrel)),
2186  RelationGetRelationName(partrel));
2187 
2188  /*
2189  * To perform any of the operations below, the tuple must match the
2190  * partition's rowtype. Convert if needed or just copy, using a dedicated
2191  * slot to store the tuple in any case.
2192  */
2193  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2194  if (remoteslot_part == NULL)
2195  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2196  map = ExecGetRootToChildMap(partrelinfo, estate);
2197  if (map != NULL)
2198  {
2199  attrmap = map->attrMap;
2200  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2201  remoteslot_part);
2202  }
2203  else
2204  {
2205  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2206  slot_getallattrs(remoteslot_part);
2207  }
2208  MemoryContextSwitchTo(oldctx);
2209 
2210  /* Check if we can do the update or delete on the leaf partition. */
2211  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2212  {
2213  part_entry = logicalrep_partition_open(relmapentry, partrel,
2214  attrmap);
2215  check_relation_updatable(part_entry);
2216  }
2217 
2218  switch (operation)
2219  {
2220  case CMD_INSERT:
2221  apply_handle_insert_internal(edata, partrelinfo,
2222  remoteslot_part);
2223  break;
2224 
2225  case CMD_DELETE:
2226  apply_handle_delete_internal(edata, partrelinfo,
2227  remoteslot_part);
2228  break;
2229 
2230  case CMD_UPDATE:
2231 
2232  /*
2233  * For UPDATE, depending on whether or not the updated tuple
2234  * satisfies the partition's constraint, perform a simple UPDATE
2235  * of the partition or move the updated tuple into a different
2236  * suitable partition.
2237  */
2238  {
2239  TupleTableSlot *localslot;
2240  ResultRelInfo *partrelinfo_new;
2241  Relation partrel_new;
2242  bool found;
2243 
2244  /* Get the matching local tuple from the partition. */
2245  found = FindReplTupleInLocalRel(estate, partrel,
2246  &part_entry->remoterel,
2247  remoteslot_part, &localslot);
2248  if (!found)
2249  {
2250  /*
2251  * The tuple to be updated could not be found. Do nothing
2252  * except for emitting a log message.
2253  *
2254  * XXX should this be promoted to ereport(LOG) perhaps?
2255  */
2256  elog(DEBUG1,
2257  "logical replication did not find row to be updated "
2258  "in replication target relation's partition \"%s\"",
2259  RelationGetRelationName(partrel));
2260  return;
2261  }
2262 
2263  /*
2264  * Apply the update to the local tuple, putting the result in
2265  * remoteslot_part.
2266  */
2267  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2268  slot_modify_data(remoteslot_part, localslot, part_entry,
2269  newtup);
2270  MemoryContextSwitchTo(oldctx);
2271 
2272  /*
2273  * Does the updated tuple still satisfy the current
2274  * partition's constraint?
2275  */
2276  if (!partrel->rd_rel->relispartition ||
2277  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2278  false))
2279  {
2280  /*
2281  * Yes, so simply UPDATE the partition. We don't call
2282  * apply_handle_update_internal() here, which would
2283  * normally do the following work, to avoid repeating some
2284  * work already done above to find the local tuple in the
2285  * partition.
2286  */
2287  EPQState epqstate;
2288 
2289  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2290  ExecOpenIndices(partrelinfo, false);
2291 
2292  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2293  TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
2294  ACL_UPDATE);
2295  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2296  localslot, remoteslot_part);
2297  ExecCloseIndices(partrelinfo);
2298  EvalPlanQualEnd(&epqstate);
2299  }
2300  else
2301  {
2302  /* Move the tuple into the new partition. */
2303 
2304  /*
2305  * New partition will be found using tuple routing, which
2306  * can only occur via the parent table. We might need to
2307  * convert the tuple to the parent's rowtype. Note that
2308  * this is the tuple found in the partition, not the
2309  * original search tuple received by this function.
2310  */
2311  if (map)
2312  {
2313  TupleConversionMap *PartitionToRootMap =
2314  convert_tuples_by_name(RelationGetDescr(partrel),
2315  RelationGetDescr(parentrel));
2316 
2317  remoteslot =
2318  execute_attr_map_slot(PartitionToRootMap->attrMap,
2319  remoteslot_part, remoteslot);
2320  }
2321  else
2322  {
2323  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2324  slot_getallattrs(remoteslot);
2325  }
2326 
2327  /* Find the new partition. */
2328  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2329  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2330  proute, remoteslot,
2331  estate);
2332  MemoryContextSwitchTo(oldctx);
2333  Assert(partrelinfo_new != partrelinfo);
2334  partrel_new = partrelinfo_new->ri_RelationDesc;
2335 
2336  /* Check that new partition also has supported relkind. */
2337  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
2338  get_namespace_name(RelationGetNamespace(partrel_new)),
2339  RelationGetRelationName(partrel_new));
2340 
2341  /* DELETE old tuple found in the old partition. */
2342  apply_handle_delete_internal(edata, partrelinfo,
2343  localslot);
2344 
2345  /* INSERT new tuple into the new partition. */
2346 
2347  /*
2348  * Convert the replacement tuple to match the destination
2349  * partition rowtype.
2350  */
2351  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2352  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2353  if (remoteslot_part == NULL)
2354  remoteslot_part = table_slot_create(partrel_new,
2355  &estate->es_tupleTable);
2356  map = ExecGetRootToChildMap(partrelinfo_new, estate);
2357  if (map != NULL)
2358  {
2359  remoteslot_part = execute_attr_map_slot(map->attrMap,
2360  remoteslot,
2361  remoteslot_part);
2362  }
2363  else
2364  {
2365  remoteslot_part = ExecCopySlot(remoteslot_part,
2366  remoteslot);
2367  slot_getallattrs(remoteslot);
2368  }
2369  MemoryContextSwitchTo(oldctx);
2370  apply_handle_insert_internal(edata, partrelinfo_new,
2371  remoteslot_part);
2372  }
2373  }
2374  break;
2375 
2376  default:
2377  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2378  break;
2379  }
2380 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:742
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1786
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1262
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
@ CMD_UPDATE
Definition: nodes.h:266
#define makeNode(_type_)
Definition: nodes.h:165
#define ACL_UPDATE
Definition: parsenodes.h:85
#define RelationGetNamespace(relation)
Definition: rel.h:542
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:585
PartitionTupleRouting * proute
Definition: worker.c:219
ModifyTableState * mtstate
Definition: worker.c:218
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:657
CmdType operation
Definition: execnodes.h:1257
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1261
PlanState ps
Definition: execnodes.h:1256
Plan * plan
Definition: execnodes.h:1029
EState * state
Definition: execnodes.h:1031
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:568
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:483
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362

References ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert(), TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog(), ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 1604 of file worker.c.

1605 {
1606  LogicalRepTyp typ;
1607 
1608  if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
1609  return;
1610 
1611  logicalrep_read_typ(s, &typ);
1612 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 1805 of file worker.c.

1806 {
1807  LogicalRepRelMapEntry *rel;
1808  LogicalRepRelId relid;
1809  ApplyExecutionData *edata;
1810  EState *estate;
1811  LogicalRepTupleData oldtup;
1812  LogicalRepTupleData newtup;
1813  bool has_oldtup;
1814  TupleTableSlot *remoteslot;
1815  RangeTblEntry *target_rte;
1816  MemoryContext oldctx;
1817 
1818  /*
1819  * Quick return if we are skipping data modification changes or handling
1820  * streamed transactions.
1821  */
1822  if (is_skipping_changes() ||
1823  handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
1824  return;
1825 
1826  begin_replication_step();
1827 
1828  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1829  &newtup);
1830  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1831  if (!should_apply_changes_for_rel(rel))
1832  {
1833  /*
1834  * The relation can't become interesting in the middle of the
1835  * transaction so it's safe to unlock it.
1836  */
1837  logicalrep_rel_close(rel, RowExclusiveLock);
1838  end_replication_step();
1839  return;
1840  }
1841 
1842  /* Set relation for error callback */
1843  apply_error_callback_arg.rel = rel;
1844 
1845  /* Check if we can do the update. */
1846  check_relation_updatable(rel);
1847 
1848  /* Initialize the executor state. */
1849  edata = create_edata_for_relation(rel);
1850  estate = edata->estate;
1851  remoteslot = ExecInitExtraTupleSlot(estate,
1852  RelationGetDescr(rel->localrel),
1853  &TTSOpsVirtual);
1854 
1855  /*
1856  * Populate updatedCols so that per-column triggers can fire, and so
1857  * executor can correctly pass down indexUnchanged hint. This could
1858  * include more columns than were actually changed on the publisher
1859  * because the logical replication protocol doesn't contain that
1860  * information. But it would for example exclude columns that only exist
1861  * on the subscriber, since we are not touching those.
1862  */
1863  target_rte = list_nth(estate->es_range_table, 0);
1864  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1865  {
1866  Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
1867  int remoteattnum = rel->attrmap->attnums[i];
1868 
1869  if (!att->attisdropped && remoteattnum >= 0)
1870  {
1871  Assert(remoteattnum < newtup.ncols);
1872  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1873  target_rte->updatedCols =
1874  bms_add_member(target_rte->updatedCols,
1875  i + 1 - FirstLowInvalidHeapAttributeNumber);
1876  }
1877  }
1878 
1879  /* Also populate extraUpdatedCols, in case we have generated columns */
1880  fill_extraUpdatedCols(target_rte, rel->localrel);
1881 
1882  /* Build the search tuple. */
1883  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1884  slot_store_data(remoteslot, rel,
1885  has_oldtup ? &oldtup : &newtup);
1886  MemoryContextSwitchTo(oldctx);
1887 
1888  /* For a partitioned table, apply update to correct partition. */
1889  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
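1890  apply_handle_tuple_routing(edata,
1891  remoteslot, &newtup, CMD_UPDATE);
1892  else
1893  apply_handle_update_internal(edata, edata->targetRelInfo,
1894  remoteslot, &newtup);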
1895 
1896  finish_edata(edata);
1897 
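1898  /* Reset relation for error callback */
1899  apply_error_callback_arg.rel = NULL;
1900 
1901  logicalrep_rel_close(rel, NoLock);
1902 
1903  end_replication_step();
1904 }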
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
Definition: worker.c:1912
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:739
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:92
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static void * list_nth(const List *list, int n)
Definition: pg_list.h:297
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
AttrNumber * attnums
Definition: attmap.h:36
List * es_range_table
Definition: execnodes.h:614
Bitmapset * updatedCols
Definition: parsenodes.h:1184
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_range_table, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), fill_extraUpdatedCols(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RangeTblEntry::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot,
LogicalRepTupleData * newtup 
)
static

Definition at line 1912 of file worker.c.

1916 {
1917  EState *estate = edata->estate;
1918  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1919  Relation localrel = relinfo->ri_RelationDesc;
1920  EPQState epqstate;
1921  TupleTableSlot *localslot;
1922  bool found;
1923  MemoryContext oldctx;
1924 
1925  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1926  ExecOpenIndices(relinfo, false);
1927 
1928  found = FindReplTupleInLocalRel(estate, localrel,
1929  &relmapentry->remoterel,
1930  remoteslot, &localslot);
1931  ExecClearTuple(remoteslot);
1932 
1933  /*
1934  * Tuple found.
1935  *
1936  * Note this will fail if there are other conflicting unique indexes.
1937  */
1938  if (found)
1939  {
1940  /* Process and store remote tuple in the slot */
1941  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1942  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1943  MemoryContextSwitchTo(oldctx);
1944 
1945  EvalPlanQualSetSlot(&epqstate, remoteslot);
1946 
1947  /* Do the actual update. */
1948  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
1949  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1950  remoteslot);
1951  }
1952  else
1953  {
1954  /*
1955  * The tuple to be updated could not be found. Do nothing except for
1956  * emitting a log message.
1957  *
1958  * XXX should this be promoted to ereport(LOG) perhaps?
1959  */
1960  elog(DEBUG1,
1961  "logical replication did not find row to be updated "
1962  "in replication target relation \"%s\"",
1963  RelationGetRelationName(localrel));
1964  }
1965 
1966  /* Cleanup. */
1967  ExecCloseIndices(relinfo);
1968  EvalPlanQualEnd(&epqstate);
1969 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433

References ACL_UPDATE, DEBUG1, elog(), ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_data(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

static void apply_spooled_messages ( TransactionId  xid,
XLogRecPtr  lsn 
)
static

Definition at line 1375 of file worker.c.

1376 {
1377  StringInfoData s2;
1378  int nchanges;
1379  char path[MAXPGPATH];
1380  char *buffer = NULL;
1381  MemoryContext oldcxt;
1382  BufFile *fd;
1383 
1384  maybe_start_skipping_changes(lsn);
1385 
1386  /* Make sure we have an open transaction */
1387  begin_replication_step();
1388 
1389  /*
1390  * Allocate file handle and memory required to process all the messages in
1391  * TopTransactionContext to avoid them getting reset after each message is
1392  * processed.
1393  */
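1394  oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1395 
1396  /* Open the spool file for the committed/prepared transaction */
1397  changes_filename(path, MyLogicalRepWorker->subid, xid);
1398  elog(DEBUG1, "replaying changes from file \"%s\"", path);
1399 
1400  fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
1401  false);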
1402 
1403  buffer = palloc(BLCKSZ);
1404  initStringInfo(&s2);
1405 
1406  MemoryContextSwitchTo(oldcxt);
1407 
1408  remote_final_lsn = lsn;
1409 
1410  /*
1411  * Make sure the handle apply_dispatch methods are aware we're in a remote
1412  * transaction.
1413  */
1414  in_remote_transaction = true;
1415  pgstat_report_activity(STATE_RUNNING, NULL);
1416 
1417  end_replication_step();
1418 
1419  /*
1420  * Read the entries one by one and pass them through the same logic as in
1421  * apply_dispatch.
1422  */
1423  nchanges = 0;
1424  while (true)
1425  {
1426  int nbytes;
1427  int len;
1428 
1429  CHECK_FOR_INTERRUPTS();
1430 
1431  /* read length of the on-disk record */
1432  nbytes = BufFileRead(fd, &len, sizeof(len));
1433 
1434  /* have we reached end of the file? */
1435  if (nbytes == 0)
1436  break;
1437 
1438  /* do we have a correct length? */
1439  if (nbytes != sizeof(len))
1440  ereport(ERROR,
1441  (errcode_for_file_access(),
1442  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1443  path)));
1444 
1445  if (len <= 0)
1446  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1447  len, path);
1448 
1449  /* make sure we have sufficiently large buffer */
1450  buffer = repalloc(buffer, len);
1451 
1452  /* and finally read the data into the buffer */
1453  if (BufFileRead(fd, buffer, len) != len)
1454  ereport(ERROR,
1455  (errcode_for_file_access(),
1456  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1457  path)));
1458 
1459  /* copy the buffer to the stringinfo and call apply_dispatch */
1460  resetStringInfo(&s2);
1461  appendBinaryStringInfo(&s2, buffer, len);
1462 
1463  /* Ensure we are reading the data into our memory context. */
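1464  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1465 
1466  apply_dispatch(&s2);
1467 
1468  MemoryContextReset(ApplyMessageContext);
1469 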
1470  MemoryContextSwitchTo(oldcxt);
1471 
1472  nchanges++;
1473 
1474  if (nchanges % 1000 == 0)
1475  elog(DEBUG1, "replayed %d changes from file \"%s\"",
1476  nchanges, path);
1477  }
1478 
1479  BufFileClose(fd);
1480 
1481  pfree(buffer);
1482  pfree(s2.data);
1483 
1484  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1485  nchanges, path);
1486 
1487  return;
1488 }
static MemoryContext ApplyMessageContext
Definition: worker.c:245
static void apply_dispatch(StringInfo s)
Definition: worker.c:2510
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:582
int errcode_for_file_access(void)
Definition: elog.c:718
MemoryContext TopTransactionContext
Definition: mcxt.c:135
void pfree(void *pointer)
Definition: mcxt.c:1306
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1321
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void size_t len
char * s2
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59

References appendBinaryStringInfo(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileRead(), changes_filename(), CHECK_FOR_INTERRUPTS, DEBUG1, elog(), end_replication_step(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), in_remote_transaction, initStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pfree(), pgstat_report_activity(), remote_final_lsn, repalloc(), resetStringInfo(), s2, STATE_RUNNING, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, and TopTransactionContext.

Referenced by apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 3613 of file worker.c.

3614 {
3615  int worker_slot = DatumGetInt32(main_arg);
3616  MemoryContext oldctx;
3617  char originname[NAMEDATALEN];
3618  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
3619  char *myslotname = NULL;
3620  WalRcvStreamOptions options;
3621  int server_version;
3622 
3623  /* Attach to slot */
3624  logicalrep_worker_attach(worker_slot);
3625 
3626  /* Setup signal handling */
3627  pqsignal(SIGHUP, SignalHandlerForConfigReload);
3628  pqsignal(SIGTERM, die);
3629  BackgroundWorkerUnblockSignals();
3630 
3631  /*
3632  * We don't currently need any ResourceOwner in a walreceiver process, but
3633  * if we did, we could call CreateAuxProcessResourceOwner here.
3634  */
3635 
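3636  /* Initialise stats to a sanish value */
3637  MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
3638  MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
3639 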
3640  /* Load the libpq-specific functions */
3641  load_file("libpqwalreceiver", false);
3642 
3643  /* Run as replica session replication role. */
3644  SetConfigOption("session_replication_role", "replica",
3645  PGC_SUSET, PGC_S_OVERRIDE);
3646 
3647  /* Connect to our database. */
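3648  BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
3649  MyLogicalRepWorker->userid,
3650  0);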
3651 
3652  /*
3653  * Set always-secure search path, so malicious users can't redirect user
3654  * code (e.g. pg_index.indexprs).
3655  */
3656  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3657 
3658  /* Load the subscription into persistent memory context. */
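3659  ApplyContext = AllocSetContextCreate(TopMemoryContext,
3660  "ApplyContext",
3661  ALLOCSET_DEFAULT_SIZES);
3662  StartTransactionCommand();
3663  oldctx = MemoryContextSwitchTo(ApplyContext);
3664 
3665  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
3666  if (!MySubscription)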
3667  {
3668  ereport(LOG,
3669  (errmsg("logical replication apply worker for subscription %u will not "
3670  "start because the subscription was removed during startup",
3671  MyLogicalRepWorker->subid)));
3672  proc_exit(0);
3673  }
3674 
3675  MySubscriptionValid = true;
3676  MemoryContextSwitchTo(oldctx);
3677 
3678  if (!MySubscription->enabled)
3679  {
3680  ereport(LOG,
3681  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3682  "start because the subscription was disabled during startup",
3683  MySubscription->name)));
3684 
3685  proc_exit(0);
3686  }
3687 
3688  /* Setup synchronous commit according to the user's wishes */
3689  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3690  PGC_BACKEND, PGC_S_OVERRIDE);
3691 
3692  /* Keep us informed about subscription changes. */
3693  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
3694  subscription_change_cb,
3695  (Datum) 0);
3696 
3697  if (am_tablesync_worker())
3698  ereport(LOG,
3699  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3700  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3701  else
3702  ereport(LOG,
3703  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3704  MySubscription->name)));
3705 
3706  CommitTransactionCommand();
3707 
3708  /* Connect to the origin and start the replication. */
3709  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3710  MySubscription->conninfo);
3711 
3712  if (am_tablesync_worker())
3713  {
3714  start_table_sync(&origin_startpos, &myslotname);
3715 
3716  /*
3717  * Allocate the origin name in long-lived context for error context
3718  * message.
3719  */
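3720  ReplicationOriginNameForLogicalRep(MySubscription->oid,
3721  MyLogicalRepWorker->relid,
3722  originname,
3723  sizeof(originname));
3724  apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
3725  originname);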
3726  }
3727  else
3728  {
3729  /* This is main apply worker */
3730  RepOriginId originid;
3731  TimeLineID startpointTLI;
3732  char *err;
3733 
3734  myslotname = MySubscription->slotname;
3735 
3736  /*
3737  * This shouldn't happen if the subscription is enabled, but guard
3738  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3739  * crash if slot is NULL.)
3740  */
3741  if (!myslotname)
3742  ereport(ERROR,
3743  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3744  errmsg("subscription has no replication slot set")));
3745 
3746  /* Setup replication origin tracking. */
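3747  StartTransactionCommand();
3748  ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
3749  originname, sizeof(originname));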
3750  originid = replorigin_by_name(originname, true);
3751  if (!OidIsValid(originid))
3752  originid = replorigin_create(originname);
3753  replorigin_session_setup(originid);
3754  replorigin_session_origin = originid;
3755  origin_startpos = replorigin_session_get_progress(false);
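3756  CommitTransactionCommand();
3757 
3758  LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
3759  MySubscription->name, &err);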
3760  if (LogRepWorkerWalRcvConn == NULL)
3761  ereport(ERROR,
3762  (errcode(ERRCODE_CONNECTION_FAILURE),
3763  errmsg("could not connect to the publisher: %s", err)));
3764 
3765  /*
3766  * We don't really use the output identify_system for anything but it
3767  * does some initializations on the upstream so let's still call it.
3768  */
3769  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3770 
3771  /*
3772  * Allocate the origin name in long-lived context for error context
3773  * message.
3774  */
3775  apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
3776  originname);
3777  }
3778 
3779  /*
3780  * Setup callback for syscache so that we know when something changes in
3781  * the subscription relation state.
3782  */
3783  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
3784  invalidate_syncing_table_states,
3785  (Datum) 0);
3786 
3787  /* Build logical replication streaming options. */
3788  options.logical = true;
3789  options.startpoint = origin_startpos;
3790  options.slotname = myslotname;
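3791 
3792  server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
3793  options.proto.logical.proto_version =
3794  server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
3795  server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
3796  LOGICALREP_PROTO_VERSION_NUM;
3797 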
3798  options.proto.logical.publication_names = MySubscription->publications;
3799  options.proto.logical.binary = MySubscription->binary;
3800  options.proto.logical.streaming = MySubscription->stream;
3801  options.proto.logical.twophase = false;
3802  options.proto.logical.origin = pstrdup(MySubscription->origin);
3803 
3804  if (!am_tablesync_worker())
3805  {
3806  /*
3807  * Even when the two_phase mode is requested by the user, it remains
3808  * as the tri-state PENDING until all tablesyncs have reached READY
3809  * state. Only then, can it become ENABLED.
3810  *
3811  * Note: If the subscription has no tables then leave the state as
3812  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3813  * work.
3814  */
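3815  if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
3816  AllTablesyncsReady())
3817  {
3818  /* Start streaming with two_phase enabled */
3819  options.proto.logical.twophase = true;
3820  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3821 
3822  StartTransactionCommand();
3823  UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
3824  MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
3825  CommitTransactionCommand();
3826  }
3827  else
3828  {
3829  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3830  }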
3831 
3832  ereport(DEBUG1,
3833  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
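3834  MySubscription->name,
3835  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
3836  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
3837  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
3838  "?")));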
3839  }
3840  else
3841  {
3842  /* Start normal logical streaming replication. */
3843  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3844  }
3845 
3846  /* Run the main loop. */
3847  start_apply(origin_startpos);
3848 
3849  proc_exit(0);
3850 }
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
Definition: worker.c:3542
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3157
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:376
static void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:3585
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
static bool MySubscriptionValid
Definition: worker.c:254
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1573
#define OidIsValid(objectId)
Definition: c.h:711
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
#define LOG
Definition: elog.h:27
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4158
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_attach(int slot)
Definition: launcher.c:565
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:39
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:37
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
char * pstrdup(const char *in)
Definition: mcxt.c:1483
MemoryContext TopMemoryContext
Definition: mcxt.c:130
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1470
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1083
RepOriginId replorigin_session_origin
Definition: origin.c:156
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1218
#define NAMEDATALEN
static int server_version
Definition: pg_dumpall.c:110
static char ** options
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
uintptr_t Datum
Definition: postgres.h:412
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:550
#define InvalidOid
Definition: postgres_ext.h:36
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5601
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5568
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
@ SUBSCRIPTIONOID
Definition: syscache.h:99
bool AllTablesyncsReady(void)
Definition: tablesync.c:1541
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1566
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:422
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
#define walrcv_server_version(conn)
Definition: walreceiver.h:418
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:416
#define SIGHUP
Definition: win32_port.h:176
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AllTablesyncsReady(), am_tablesync_worker(), apply_error_callback_arg, ApplyContext, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32(), LogicalRepWorker::dbid, DEBUG1, die, elog(), Subscription::enabled, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), InvalidOid, InvalidXLogRecPtr, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_attach(), LogRepWorkerWalRcvConn, MemoryContextStrdup(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::origin, ApplyErrorCallbackArg::origin_name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), pstrdup(), Subscription::publications, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, server_version, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, start_apply(), start_table_sync(), StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, Subscription::twophasestate, UpdateTwoPhaseState(), LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inline static

Definition at line 3366 of file worker.c.

3367 {
3368  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
3369 }
#define snprintf
Definition: port.h:238

References MAXPGPATH, and snprintf.

Referenced by apply_handle_stream_abort(), apply_spooled_messages(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry *  rel)
static

Definition at line 1764 of file worker.c.

1765 {
1766  /*
1767  * For partitioned tables, we only need to care if the target partition is
1768  * updatable (aka has PK or RI defined for it).
1769  */
1770  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1771  return;
1772 
1773  /* Updatable, no error. */
1774  if (rel->updatable)
1775  return;
1776 
1777  /*
1778  * We are in error mode so it's fine this is somewhat slow. It's better to
1779  * give user correct error.
1780  */
1781  if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
1782  {
1783  ereport(ERROR,
1784  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1785  errmsg("publisher did not send replica identity column "
1786  "expected by the logical replication target relation \"%s.%s\"",
1787  rel->remoterel.nspname, rel->remoterel.relname)));
1788  }
1789 
1790  ereport(ERROR,
1791  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1792  errmsg("logical replication target relation \"%s.%s\" has "
1793  "neither REPLICA IDENTITY index nor PRIMARY "
1794  "KEY and published relation does not have "
1795  "REPLICA IDENTITY FULL",
1796  rel->remoterel.nspname, rel->remoterel.relname)));
1797 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1620

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inline static

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 3951 of file worker.c.

3952 {
3953  Relation rel;
3954  Form_pg_subscription subform;
3955  HeapTuple tup;
3956  XLogRecPtr myskiplsn = MySubscription->skiplsn;
3957  bool started_tx = false;
3958 
3959  if (likely(XLogRecPtrIsInvalid(myskiplsn)))
3960  return;
3961 
3962  if (!IsTransactionState())
3963  {
3964  StartTransactionCommand();
3965  started_tx = true;
3966  }
3967 
3968  /*
3969  * Protect subskiplsn of pg_subscription from being concurrently updated
3970  * while clearing it.
3971  */
3972  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
3973  AccessShareLock);
3974 
3975  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
3976 
3977  /* Fetch the existing tuple. */
3978  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
3979  ObjectIdGetDatum(MySubscription->oid));
3980 
3981  if (!HeapTupleIsValid(tup))
3982  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
3983 
3984  subform = (Form_pg_subscription) GETSTRUCT(tup);
3985 
3986  /*
3987  * Clear the subskiplsn. If the user has already changed subskiplsn before
3988  * clearing it we don't update the catalog and the replication origin
3989  * state won't get advanced. So in the worst case, if the server crashes
3990  * before sending an acknowledgment of the flush position the transaction
3991  * will be sent again and the user needs to set subskiplsn again. We can
3992  * reduce the possibility by logging a replication origin WAL record to
3993  * advance the origin LSN instead but there is no way to advance the
3994  * origin timestamp and it doesn't seem to be worth doing anything about
3995  * it since it's a very rare case.
3996  */
3997  if (subform->subskiplsn == myskiplsn)
3998  {
3999  bool nulls[Natts_pg_subscription];
4000  bool replaces[Natts_pg_subscription];
4001  Datum values[Natts_pg_subscription];
4002 
4003  memset(values, 0, sizeof(values));
4004  memset(nulls, false, sizeof(nulls));
4005  memset(replaces, false, sizeof(replaces));
4006 
4007  /* reset subskiplsn */
4008  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4009  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4010 
4011  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4012  replaces);
4013  CatalogTupleUpdate(rel, &tup->t_self, tup);
4014 
4015  if (myskiplsn != finish_lsn)
4016  ereport(WARNING,
4017  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4018  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4019  LSN_FORMAT_ARGS(finish_lsn),
4020  LSN_FORMAT_ARGS(myskiplsn)));
4021  }
4022 
4023  heap_freetuple(tup);
4024  table_close(rel, NoLock);
4025 
4026  if (started_tx)
4027  CommitTransactionCommand();
4028 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define likely(x)
Definition: c.h:294
int errdetail(const char *fmt,...)
Definition: elog.c:1039
#define WARNING
Definition: elog.h:32
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
FormData_pg_subscription * Form_pg_subscription
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179

References AccessShareLock, CatalogTupleUpdate(), CommitTransactionCommand(), elog(), ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry *  rel)
static

Definition at line 500 of file worker.c.

501 {
502  ApplyExecutionData *edata;
503  EState *estate;
504  RangeTblEntry *rte;
505  ResultRelInfo *resultRelInfo;
506 
507  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
508  edata->targetRel = rel;
509 
510  edata->estate = estate = CreateExecutorState();
511 
512  rte = makeNode(RangeTblEntry);
513  rte->rtekind = RTE_RELATION;
514  rte->relid = RelationGetRelid(rel->localrel);
515  rte->relkind = rel->localrel->rd_rel->relkind;
516  rte->rellockmode = AccessShareLock;
517  ExecInitRangeTable(estate, list_make1(rte));
518 
519  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
520 
521  /*
522  * Use Relation opened by logicalrep_rel_open() instead of opening it
523  * again.
524  */
525  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
526 
527  /*
528  * We put the ResultRelInfo in the es_opened_result_relations list, even
529  * though we don't populate the es_result_relations array. That's a bit
530  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
531  *
532  * ExecOpenIndices() is not called here either, each execution path doing
533  * an apply operation being responsible for that.
534  */
535  estate->es_opened_result_relations =
536  lappend(estate->es_opened_result_relations, resultRelInfo);
537 
538  estate->es_output_cid = GetCurrentCommandId(true);
539 
540  /* Prepare to catch AFTER triggers. */
541  AfterTriggerBeginQuery();
542 
543  /* other fields of edata remain NULL for now */
544 
545  return edata;
546 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1196
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:753
EState * CreateExecutorState(void)
Definition: execUtils.c:90
void * palloc0(Size size)
Definition: mcxt.c:1230
@ RTE_RELATION
Definition: parsenodes.h:1011
#define list_make1(x1)
Definition: pg_list.h:210
#define RelationGetRelid(relation)
Definition: rel.h:501
List * es_opened_result_relations
Definition: execnodes.h:633
CommandId es_output_cid
Definition: execnodes.h:627
RTEKind rtekind
Definition: parsenodes.h:1030
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4954
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:817

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

static void DisableSubscriptionAndExit ( void  )
static

Definition at line 3857 of file worker.c.

3858 {
3859  /*
3860  * Emit the error message, and recover from the error state to an idle
3861  * state
3862  */
3863  HOLD_INTERRUPTS();
3864 
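3865  EmitErrorReport();
3866  AbortOutOfAnyTransaction();
3867  FlushErrorState();
3868 
3869  RESUME_INTERRUPTS();
3870 
3871  /* Report the worker failed during either table synchronization or apply */
3872  pgstat_report_subscription_error(MyLogicalRepWorker->subid,
3873  !am_tablesync_worker());
3874 
3875  /* Disable the subscription */
3876  StartTransactionCommand();
3877  DisableSubscription(MySubscription->oid);
3878  CommitTransactionCommand();
3879 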
3880  /* Notify the subscription has been disabled and exit */
3881  ereport(LOG,
3882  errmsg("subscription \"%s\" has been disabled because of an error",
3883  MySubscription->name));
3884 
3885  proc_exit(0);
3886 }
void EmitErrorReport(void)
Definition: elog.c:1506
void FlushErrorState(void)
Definition: elog.c:1651
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4692

References AbortOutOfAnyTransaction(), am_tablesync_worker(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( EState *  estate,
Relation  localrel,
LogicalRepRelation *  remoterel,
TupleTableSlot *  remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2105 of file worker.c.

2109 {
2110  Oid idxoid;
2111  bool found;
2112 
2113  /*
2114  * Regardless of the top-level operation, we're performing a read here, so
2115  * check for SELECT privileges.
2116  */
2117  TargetPrivilegesCheck(localrel, ACL_SELECT);
2118 
2119  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2120 
2121  idxoid = GetRelationIdentityOrPK(localrel);
2122  Assert(OidIsValid(idxoid) ||
2123  (remoterel->replident == REPLICA_IDENTITY_FULL));
2124 
2125  if (OidIsValid(idxoid))
2126  found = RelationFindReplTupleByIndex(localrel, idxoid,
2127  LockTupleExclusive,
2128  remoteslot, *localslot);
2129  else
2130  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2131  remoteslot, *localslot);
2132 
2133  return found;
2134 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:84

References ACL_SELECT, Assert(), EState::es_tupleTable, GetRelationIdentityOrPK(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), and TargetPrivilegesCheck().

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData *  edata)
static

Definition at line 553 of file worker.c.

554 {
555  EState *estate = edata->estate;
556 
557  /* Handle any queued AFTER triggers. */
558  AfterTriggerEndQuery(estate);
559 
560  /* Shut down tuple routing, if any was done. */
561  if (edata->proute)
562  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
563 
564  /*
565  * Cleanup. It might seem that we should call ExecCloseResultRelations()
566  * here, but we intentionally don't. It would close the rel we added to
567  * es_opened_result_relations above, which is wrong because we took no
568  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
569  * any other relations opened during execution.
570  */
571  ExecResetTupleTable(estate->es_tupleTable, false);
572  FreeExecutorState(estate);
573  pfree(edata);
574 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
void FreeExecutorState(EState *estate)
Definition: execUtils.c:188
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4974

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr *  write,
XLogRecPtr *  flush,
bool *  have_pending_txes 
)
static

Definition at line 2630 of file worker.c.

2632 {
2633  dlist_mutable_iter iter;
2634  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
2635 
2636  *write = InvalidXLogRecPtr;
2637  *flush = InvalidXLogRecPtr;
2638 
2639  dlist_foreach_modify(iter, &lsn_mapping)
2640  {
2641  FlushPosition *pos =
2642  dlist_container(FlushPosition, node, iter.cur);
2643 
2644  *write = pos->remote_end;
2645 
2646  if (pos->local_end <= local_flush)
2647  {
2648  *flush = pos->remote_end;
2649  dlist_delete(iter.cur);
2650  pfree(pos);
2651  }
2652  else
2653  {
2654  /*
2655  * Don't want to uselessly iterate over the rest of the list which
2656  * could potentially be long. Instead get the last element and
2657  * grab the write position from there.
2658  */
2659  pos = dlist_tail_element(FlushPosition, node,
2660  &lsn_mapping);
2661  *write = pos->remote_end;
2662  *have_pending_txes = true;
2663  return;
2664  }
2665  }
2666 
2667  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2668 }
static dlist_head lsn_mapping
Definition: worker.c:208
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:325
static void dlist_delete(dlist_node *node)
Definition: ilist.h:394
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:562
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:590
#define dlist_container(type, membername, ptr)
Definition: ilist.h:543
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:205
XLogRecPtr local_end
Definition: worker.c:204
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6077

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 1620 of file worker.c.

1621 {
1622  Oid idxoid;
1623 
1624  idxoid = RelationGetReplicaIndex(rel);
1625 
1626  if (!OidIsValid(idxoid))
1627  idxoid = RelationGetPrimaryKeyIndex(rel);
1628 
1629  return idxoid;
1630 }
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4926
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4947

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by check_relation_updatable(), and FindReplTupleInLocalRel().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 462 of file worker.c.

463 {
464  TransactionId xid;
465 
466  /* not in streaming mode */
467  if (!in_streamed_transaction)
468  return false;
469 
470  Assert(stream_fd != NULL);
471  Assert(TransactionIdIsValid(stream_xid));
472 
473  /*
474  * We should have received XID of the subxact as the first part of the
475  * message, so extract it.
476  */
477  xid = pq_getmsgint(s, 4);
478 
479  if (!TransactionIdIsValid(xid))
480  ereport(ERROR,
481  (errcode(ERRCODE_PROTOCOL_VIOLATION),
482  errmsg_internal("invalid transaction ID in streamed replication transaction")));
483 
484  /* Add the new subxact to the array (unless already there). */
485  subxact_info_add(xid);
486 
487  /* write the change to the current file */
488  stream_write_change(action, s);
489 
490  return true;
491 }
static void subxact_info_add(TransactionId xid)
Definition: worker.c:3281
static BufFile * stream_fd
Definition: worker.c:280
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:3477
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

References generate_unaccent_rules::action, Assert(), ereport, errcode(), errmsg_internal(), ERROR, in_streamed_transaction, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 3892 of file worker.c.

3893 {
3894  return MyLogicalRepWorker != NULL;
3895 }

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 2709 of file worker.c.

2710 {
2711  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2712  bool ping_sent = false;
2713  TimeLineID tli;
2714  ErrorContextCallback errcallback;
2715 
2716  /*
2717  * Init the ApplyMessageContext which we clean up after each replication
2718  * protocol message.
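2719  */
2720  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2721  "ApplyMessageContext",
2722  ALLOCSET_DEFAULT_SIZES);
2723 
2724  /*
2725  * This memory context is used for per-stream data when the streaming mode
2726  * is enabled. This context is reset on each stream stop.
2727  */
2728  LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2729  "LogicalStreamingContext",
2730  ALLOCSET_DEFAULT_SIZES);
2731 
2732  /* mark as idle, before starting to loop */
2733  pgstat_report_activity(STATE_IDLE, NULL);
2734 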
2735  /*
2736  * Push apply error context callback. Fields will be filled while applying
2737  * a change.
2738  */
2739  errcallback.callback = apply_error_callback;
2740  errcallback.previous = error_context_stack;
2741  error_context_stack = &errcallback;
2742 
2743  /* This outer loop iterates once per wait. */
2744  for (;;)
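2745  {
2746  pgsocket fd = PGINVALID_SOCKET;
2747  int rc;
2748  int len;
2749  char *buf = NULL;
2750  bool endofstream = false;
2751  long wait_time;
2752 
2753  CHECK_FOR_INTERRUPTS();
2754 
2755  MemoryContextSwitchTo(ApplyMessageContext);
2756 
2757  len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2758 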
2760  {
2761  /* Loop to process all available data (without blocking). */
2762  for (;;)
2763  {
2764  CHECK_FOR_INTERRUPTS();
2765 
2766  if (len == 0)
2767  {
2768  break;
2769  }
2770  else if (len < 0)
2771  {
2772  ereport(LOG,
2773  (errmsg("data stream from publisher has ended")));
2774  endofstream = true;
2775  break;
2776  }
2777  else
2778  {
2779  int c;
2780  StringInfoData s;
2781 
2782  /* Reset timeout. */
2783  last_recv_timestamp = GetCurrentTimestamp();
2784  ping_sent = false;
2785 
2786  /* Ensure we are reading the data into our memory context. */
2787  MemoryContextSwitchTo(ApplyMessageContext);
2788 
2789  s.data = buf;
2790  s.len = len;
2791  s.cursor = 0;
2792  s.maxlen = -1;
2793 
2794  c = pq_getmsgbyte(&s);
2795 
2796  if (c == 'w')
2797  {
2798  XLogRecPtr start_lsn;
2799  XLogRecPtr end_lsn;
2800  TimestampTz send_time;
2801 
2802  start_lsn = pq_getmsgint64(&s);
2803  end_lsn = pq_getmsgint64(&s);
2804  send_time = pq_getmsgint64(&s);
2805 
2806  if (last_received < start_lsn)
2807  last_received = start_lsn;
2808 
2809  if (last_received < end_lsn)
2810  last_received = end_lsn;
2811 
2812  UpdateWorkerStats(last_received, send_time, false);
2813 
2814  apply_dispatch(&s);
2815  }
2816  else if (c == 'k')
2817  {
2818  XLogRecPtr end_lsn;
2819  TimestampTz timestamp;
2820  bool reply_requested;
2821 
2822  end_lsn = pq_getmsgint64(&s);
2823  timestamp = pq_getmsgint64(&s);
2824  reply_requested = pq_getmsgbyte(&s);
2825 
2826  if (last_received < end_lsn)
2827  last_received = end_lsn;
2828 
2829  send_feedback(last_received, reply_requested, false);
2830  UpdateWorkerStats(last_received, timestamp, true);
2831  }
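2832  /* other message types are purposefully ignored */
2833 
2834  MemoryContextReset(ApplyMessageContext);
2835  }
2836 
2837  len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2838  }
2839  }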
2840 
2841  /* confirm all writes so far */
2842  send_feedback(last_received, false, false);
2843 
2844  if (!in_remote_transaction && !in_streamed_transaction)
2845  {
2846  /*
2847  * If we didn't get any transactions for a while there might be
2848  * unconsumed invalidation messages in the queue, consume them
2849  * now.
2850  */
2851  AcceptInvalidationMessages();
2852  maybe_reread_subscription();
2853 
2854  /* Process any table synchronization changes. */
2855  process_syncing_tables(last_received);
2856  }
2857 
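2858  /* Cleanup the memory. */
2859  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2860  MemoryContextSwitchTo(TopMemoryContext);
2861 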
2862  /* Check if we need to exit the streaming loop. */
2863  if (endofstream)
2864  break;
2865 
2866  /*
2867  * Wait for more data or latch. If we have unflushed transactions,
2868  * wake up after WalWriterDelay to see if they've been flushed yet (in
2869  * which case we should send a feedback message). Otherwise, there's
2870  * no particular urgency about waking up unless we get data or a
2871  * signal.
2872  */
2873  if (!dlist_is_empty(&lsn_mapping))
2874  wait_time = WalWriterDelay;
2875  else
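2876  wait_time = NAPTIME_PER_CYCLE;
2877 
2878  rc = WaitLatchOrSocket(MyLatch,
2879  WL_SOCKET_READABLE | WL_LATCH_SET |
2880  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
2881  fd, wait_time,
2882  WAIT_EVENT_LOGICAL_APPLY_MAIN);
2883 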
2884  if (rc & WL_LATCH_SET)
2885  {
2886  ResetLatch(MyLatch);
2887  CHECK_FOR_INTERRUPTS();
2888  }
2889 
2890  if (ConfigReloadPending)
2891  {
2892  ConfigReloadPending = false;
2893  ProcessConfigFile(PGC_SIGHUP);
2894  }
2895 
2896  if (rc & WL_TIMEOUT)
2897  {
2898  /*
2899  * We didn't receive anything new. If we haven't heard anything
2900  * from the server for more than wal_receiver_timeout / 2, ping
2901  * the server. Also, if it's been longer than
2902  * wal_receiver_status_interval since the last update we sent,
2903  * send a status update to the primary anyway, to report any
2904  * progress in applying WAL.
2905  */
2906  bool requestReply = false;
2907 
2908  /*
2909  * Check if time since last receive from primary has reached the
2910  * configured limit.
2911  */
2912  if (wal_receiver_timeout > 0)
2913  {
2914  TimestampTz now = GetCurrentTimestamp();
2915  TimestampTz timeout;
2916 
2917  timeout =
2918  TimestampTzPlusMilliseconds(last_recv_timestamp,
2919  wal_receiver_timeout);
2920 
2921  if (now >= timeout)
2922  ereport(ERROR,
2923  (errcode(ERRCODE_CONNECTION_FAILURE),
2924  errmsg("terminating logical replication worker due to timeout")));
2925 
2926  /* Check to see if it's time for a ping. */
2927  if (!ping_sent)
2928  {
2929  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2930  (wal_receiver_timeout / 2));
2931  if (now >= timeout)
2932  {
2933  requestReply = true;
2934  ping_sent = true;
2935  }
2936  }
2937  }
2938 
2939  send_feedback(last_received, requestReply, requestReply);
2940 
2941  /*
2942  * Force reporting to ensure long idle periods don't lead to
2943  * arbitrarily delayed stats. Stats can only be reported outside
2944  * of (implicit or explicit) transactions. That shouldn't lead to
2945  * stats being delayed for long, because transactions are either
2946  * sent as a whole on commit or streamed. Streamed transactions
2947  * are spilled to disk and applied on commit.
2948  */
2949  if (!IsTransactionState())
2950  pgstat_report_stat(true);
2951  }
2952  }
2953 
2954  /* Pop the error context stack */
2955  error_context_stack = errcallback.previous;
2956 
2957  /* All done */
2958  walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2959 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2693
#define NAPTIME_PER_CYCLE
Definition: worker.c:199
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2968
static void apply_error_callback(void *arg)
Definition: worker.c:4032
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1537
int64 TimestampTz
Definition: timestamp.h:39
ErrorContextCallback * error_context_stack
Definition: elog.c:94
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:70
static char * buf
Definition: pg_test_fsync.c:67
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
char * c
struct ErrorContextCallback * previous
Definition: elog.h:234
void(* callback)(void *arg)
Definition: elog.h:235
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:84
@ WAIT_EVENT_LOGICAL_APPLY_MAIN
Definition: wait_event.h:43
int wal_receiver_timeout
Definition: walreceiver.c:91
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:424
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:426
int WalWriterDelay
Definition: walwriter.c:70

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, IsTransactionState(), StringInfoData::len, len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 3057 of file worker.c.

3058 {
3059  MemoryContext oldctx;
3060  Subscription *newsub;
3061  bool started_tx = false;
3062 
3063  /* When cache state is valid there is nothing to do here. */
3064  if (MySubscriptionValid)
3065  return;
3066 
3067  /* This function might be called inside or outside of transaction. */
3068  if (!IsTransactionState())
3069  {
3070  StartTransactionCommand();
3071  started_tx = true;
3072  }
3073 
3074  /* Ensure allocations in permanent context. */
3075  oldctx = MemoryContextSwitchTo(ApplyContext);
3076 
3077  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
3078 
3079  /*
3080  * Exit if the subscription was removed. This normally should not happen
3081  * as the worker gets killed during DROP SUBSCRIPTION.
3082  */
3083  if (!newsub)
3084  {
3085  ereport(LOG,
3086  (errmsg("logical replication apply worker for subscription \"%s\" will "
3087  "stop because the subscription was removed",
3088  MySubscription->name)));
3089 
3090  proc_exit(0);
3091  }
3092 
3093  /* Exit if the subscription was disabled. */
3094  if (!newsub->enabled)
3095  {
3096  ereport(LOG,
3097  (errmsg("logical replication apply worker for subscription \"%s\" will "
3098  "stop because the subscription was disabled",
3099  MySubscription->name)));
3100 
3101  proc_exit(0);
3102  }
3103 
3104  /* !slotname should never happen when enabled is true. */
3105  Assert(newsub->slotname);
3106 
3107  /* two-phase should not be altered */
3108  Assert(newsub->twophasestate == MySubscription->twophasestate);
3109 
3110  /*
3111  * Exit if any parameter that affects the remote connection was changed.
3112  * The launcher will start a new worker.
3113  */
3114  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3115  strcmp(newsub->name, MySubscription->name) != 0 ||
3116  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3117  newsub->binary != MySubscription->binary ||
3118  newsub->stream != MySubscription->stream ||
3119  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3120  newsub->owner != MySubscription->owner ||
3121  !equal(newsub->publications, MySubscription->publications))
3122  {
3123  ereport(LOG,
3124  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
3125  MySubscription->name)));
3126 
3127  proc_exit(0);
3128  }
3129 
3130  /* Check for other changes that should never happen too. */
3131  if (newsub->dbid != MySubscription->dbid)
3132  {
3133  elog(ERROR, "subscription %u changed unexpectedly",
3134  MyLogicalRepWorker->subid);
3135  }
3136 
3137  /* Clean old subscription info and switch to new one. */
3138  FreeSubscription(MySubscription);
3139  MySubscription = newsub;
3140 
3141  MemoryContextSwitchTo(oldctx);
3142 
3143  /* Change synchronous commit according to the user's wishes */
3144  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3145  PGC_BACKEND, PGC_S_OVERRIDE);
3146 
3147  if (started_tx)
3148  CommitTransactionCommand();
3149 
3150  MySubscriptionValid = true;
3151 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:225
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References ApplyContext, Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog(), equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), and LogicalRepApplyLoop().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 3902 of file worker.c.

3903 {
3904  Assert(!is_skipping_changes());
3905  Assert(!in_remote_transaction);
3906  Assert(!in_streamed_transaction);
3907 
3908  /*
3909  * Quick return if it's not requested to skip this transaction. This
3910  * function is called for every remote transaction and we assume that
3911  * skipping the transaction is not used often.
3912  */
3913  if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
3914  MySubscription->skiplsn != finish_lsn))
3915  return;
3916 
3917  /* Start skipping all changes of this transaction */
3918  skip_xact_finish_lsn = finish_lsn;
3919 
3920  ereport(LOG,
3921  errmsg("logical replication starts skipping transaction at LSN %X/%X",
3922  LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
3923 }
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:276

References Assert(), ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 376 of file worker.c.

378 {
379  if (OidIsValid(relid))
380  {
381  /* Replication origin name for tablesync workers. */
382  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
383  }
384  else
385  {
386  /* Replication origin name for non-tablesync workers. */
387  snprintf(originname, szoriginname, "pg_%u", suboid);
388  }
389 }

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), ApplyWorkerMain(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ reset_apply_error_context_info()

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 2968 of file worker.c.

2969 {
2970  static StringInfo reply_message = NULL;
2971  static TimestampTz send_time = 0;
2972 
2973  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2974  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2975  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2976 
2977  XLogRecPtr writepos;
2978  XLogRecPtr flushpos;
2979  TimestampTz now;
2980  bool have_pending_txes;
2981 
2982  /*
2983  * If the user doesn't want status to be reported to the publisher, be
2984  * sure to exit before doing anything at all.
2985  */
2986  if (!force && wal_receiver_status_interval <= 0)
2987  return;
2988 
2989  /* It's legal to not pass a recvpos */
2990  if (recvpos < last_recvpos)
2991  recvpos = last_recvpos;
2992 
2993  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2994 
2995  /*
2996  * No outstanding transactions to flush, we can report the latest received
2997  * position. This is important for synchronous replication.
2998  */
2999  if (!have_pending_txes)
3000  flushpos = writepos = recvpos;
3001 
3002  if (writepos < last_writepos)
3003  writepos = last_writepos;
3004 
3005  if (flushpos < last_flushpos)
3006  flushpos = last_flushpos;
3007 
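3008  now = GetCurrentTimestamp();
3009 
3010  /* if we've already reported everything we're good */
3011  if (!force &&
3012  writepos == last_writepos &&
3013  flushpos == last_flushpos &&
3014  !TimestampDifferenceExceeds(send_time, now,
3015  wal_receiver_status_interval * 1000))
3016  return;
3017  send_time = now;
3018 
3019  if (!reply_message)
3020  {
3021  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3022 
3023  reply_message = makeStringInfo();
3024  MemoryContextSwitchTo(oldctx);
3025  }
3026  else
3027  resetStringInfo(reply_message);
3028 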
3029  pq_sendbyte(reply_message, 'r');
3030  pq_sendint64(reply_message, recvpos); /* write */
3031  pq_sendint64(reply_message, flushpos); /* flush */
3032  pq_sendint64(reply_message, writepos); /* apply */
3033  pq_sendint64(reply_message, now); /* sendTime */
3034  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3035 
3036  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3037  force,
3038  LSN_FORMAT_ARGS(recvpos),
3039  LSN_FORMAT_ARGS(writepos),
3040  LSN_FORMAT_ARGS(flushpos));
3041 
3042  walrcv_send(LogRepWorkerWalRcvConn,
3043  reply_message->data, reply_message->len);
3044 
3045  if (recvpos > last_recvpos)
3046  last_recvpos = recvpos;
3047  if (writepos > last_writepos)
3048  last_writepos = writepos;
3049  if (flushpos > last_flushpos)
3050  last_flushpos = flushpos;
3051 }
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:2630
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
#define DEBUG2
Definition: elog.h:25
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static StringInfoData reply_message
Definition: walreceiver.c:134
int wal_receiver_status_interval
Definition: walreceiver.c:90
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:428

References ApplyContext, StringInfoData::data, DEBUG2, elog(), get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_xact()

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 405 of file worker.c.

406 {
407  if (am_tablesync_worker())
408  return MyLogicalRepWorker->relid == rel->localreloid;
409  else
410  return (rel->state == SUBREL_STATE_READY ||
411  (rel->state == SUBREL_STATE_SYNCDONE &&
412  rel->statelsn <= remote_final_lsn));
413 }

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 584 of file worker.c.

586 {
587  TupleDesc desc = RelationGetDescr(rel->localrel);
588  int num_phys_attrs = desc->natts;
589  int i;
590  int attnum,
591  num_defaults = 0;
592  int *defmap;
593  ExprState **defexprs;
594  ExprContext *econtext;
595 
596  econtext = GetPerTupleExprContext(estate);
597 
598  /* We got all the data via replication, no need to evaluate anything. */
599  if (num_phys_attrs == rel->remoterel.natts)
600  return;
601 
602  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
603  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
604 
605  Assert(rel->attrmap->maplen == num_phys_attrs);
606  for (attnum = 0; attnum < num_phys_attrs; attnum++)
607  {
608  Expr *defexpr;
609 
610  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
611  continue;
612 
613  if (rel->attrmap->attnums[attnum] >= 0)
614  continue;
615 
616  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
617 
618  if (defexpr != NULL)
619  {
620  /* Run the expression through planner */
621  defexpr = expression_planner(defexpr);
622 
623  /* Initialize executable expression in copycontext */
624  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
625  defmap[num_defaults] = attnum;
626  num_defaults++;
627  }
628  }
629 
630  for (i = 0; i < num_defaults; i++)
631  slot->tts_values[defmap[i]] =
632  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
633 }
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:124
#define GetPerTupleExprContext(estate)
Definition: executor.h:535
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:318
int16 attnum
Definition: pg_attribute.h:83
Expr * expression_planner(Expr *expr)
Definition: planner.c:6147
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition: attmap.h:37
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126

References Assert(), attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 742 of file worker.c.

745 {
746  int natts = slot->tts_tupleDescriptor->natts;
747  int i;
748 
749  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
750  ExecClearTuple(slot);
751 
752  /*
753  * Copy all the column data from srcslot, so that we'll have valid values
754  * for unreplaced columns.
755  */
756  Assert(natts == srcslot->tts_tupleDescriptor->natts);
757  slot_getallattrs(srcslot);
758  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
759  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
760 
761  /* Call the "in" function for each replaced attribute */
762  Assert(natts == rel->attrmap->maplen);
763  for (i = 0; i < natts; i++)
764  {
765  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
766  int remoteattnum = rel->attrmap->attnums[i];
767 
768  if (remoteattnum < 0)
769  continue;
770 
771  Assert(remoteattnum < tupleData->ncols);
772 
773  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
774  {
775  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
776 
777  /* Set attnum for error callback */
778  apply_error_callback_arg.remote_attnum = remoteattnum;
779 
780  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
781  {
782  Oid typinput;
783  Oid typioparam;
784 
785  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
786  slot->tts_values[i] =
787  OidInputFunctionCall(typinput, colvalue->data,
788  typioparam, att->atttypmod);
789  slot->tts_isnull[i] = false;
790  }
791  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
792  {
793  Oid typreceive;
794  Oid typioparam;
795 
796  /*
797  * In some code paths we may be asked to re-parse the same
798  * tuple data. Reset the StringInfo's cursor so that works.
799  */
800  colvalue->cursor = 0;
801 
802  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
803  slot->tts_values[i] =
804  OidReceiveFunctionCall(typreceive, colvalue,
805  typioparam, att->atttypmod);
806 
807  /* Trouble if it didn't eat the whole buffer */
808  if (colvalue->cursor != colvalue->len)
809  ereport(ERROR,
810  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
811  errmsg("incorrect binary data format in logical replication column %d",
812  remoteattnum + 1)));
813  slot->tts_isnull[i] = false;
814  }
815  else
816  {
817  /* must be LOGICALREP_COLUMN_NULL */
818  slot->tts_values[i] = (Datum) 0;
819  slot->tts_isnull[i] = true;
820  }
821 
822  /* Reset attnum for error callback */
823  apply_error_callback_arg.remote_attnum = -1;
824  }
825  }
826 
827  /* And finally, declare that "slot" contains a valid virtual tuple */
828  ExecStoreVirtualTuple(slot);
829 }
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1630
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:94
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:93
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2832
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2898
StringInfoData * colvalues
Definition: logicalproto.h:82

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 641 of file worker.c.

643 {
644  int natts = slot->tts_tupleDescriptor->natts;
645  int i;
646 
647  ExecClearTuple(slot);
648 
649  /* Call the "in" function for each non-dropped, non-null attribute */
650  Assert(natts == rel->attrmap->maplen);
651  for (i = 0; i < natts; i++)
652  {
653  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
654  int remoteattnum = rel->attrmap->attnums[i];
655 
656  if (!att->attisdropped && remoteattnum >= 0)
657  {
658  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
659 
660  Assert(remoteattnum < tupleData->ncols);
661 
662  /* Set attnum for error callback */
663  apply_error_callback_arg.remote_attnum = remoteattnum;
664 
665  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
666  {
667  Oid typinput;
668  Oid typioparam;
669 
670  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
671  slot->tts_values[i] =
672  OidInputFunctionCall(typinput, colvalue->data,
673  typioparam, att->atttypmod);
674  slot->tts_isnull[i] = false;
675  }
676  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
677  {
678  Oid typreceive;
679  Oid typioparam;
680 
681  /*
682  * In some code paths we may be asked to re-parse the same
683  * tuple data. Reset the StringInfo's cursor so that works.
684  */
685  colvalue->cursor = 0;
686 
687  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
688  slot->tts_values[i] =
689  OidReceiveFunctionCall(typreceive, colvalue,
690  typioparam, att->atttypmod);
691 
692  /* Trouble if it didn't eat the whole buffer */
693  if (colvalue->cursor != colvalue->len)
694  ereport(ERROR,
695  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
696  errmsg("incorrect binary data format in logical replication column %d",
697  remoteattnum + 1)));
698  slot->tts_isnull[i] = false;
699  }
700  else
701  {
702  /*
703  * NULL value from remote. (We don't expect to see
704  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
705  * NULL.)
706  */
707  slot->tts_values[i] = (Datum) 0;
708  slot->tts_isnull[i] = true;
709  }
710 
711  /* Reset attnum for error callback */
712  apply_error_callback_arg.remote_attnum = -1;
713  }
714  else
715  {
716  /*
717  * We assign NULL to dropped attributes and missing values
718  * (missing values should be later filled using
719  * slot_fill_defaults).
720  */
721  slot->tts_values[i] = (Datum) 0;
722  slot->tts_isnull[i] = true;
723  }
724  }
725 
726  ExecStoreVirtualTuple(slot);
727 }

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ start_apply()

static void start_apply ( XLogRecPtr  origin_startpos)
static

Definition at line 3585 of file worker.c.

3586 {
3587  PG_TRY();
3588  {
3589  LogicalRepApplyLoop(origin_startpos);
3590  }
3591  PG_CATCH();
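3592  {
3593  if (MySubscription->disableonerr)
3594  DisableSubscriptionAndExit();
3595  else
3596  {
3597  /*
3598  * Report the worker failed while applying changes. Abort the
3599  * current transaction so that the stats message is sent in an
3600  * idle state.
3601  */
3602  AbortOutOfAnyTransaction();
3603  pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
3604 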
3605  PG_RE_THROW();
3606  }
3607  }
3608  PG_END_TRY();
3609 }
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2709
static void DisableSubscriptionAndExit(void)
Definition: worker.c:3857
#define PG_RE_THROW()
Definition: elog.h:350
#define PG_TRY(...)
Definition: elog.h:309
#define PG_END_TRY(...)
Definition: elog.h:334
#define PG_CATCH(...)
Definition: elog.h:319

References AbortOutOfAnyTransaction(), am_tablesync_worker(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by ApplyWorkerMain().

◆ start_table_sync()

static void start_table_sync ( XLogRecPtr origin_startpos,
char **  myslotname 
)
static

Definition at line 3542 of file worker.c.

3543 {
3544  char *syncslotname = NULL;
3545 
3546  Assert(am_tablesync_worker());
3547 
3548  PG_TRY();
3549  {
3550  /* Call initial sync. */
3551  syncslotname = LogicalRepSyncTableStart(origin_startpos);
3552  }
3553  PG_CATCH();
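3554  {
3555  if (MySubscription->disableonerr)
3556  DisableSubscriptionAndExit();
3557  else
3558  {
3559  /*
3560  * Report the worker failed during table synchronization. Abort
3561  * the current transaction so that the stats message is sent in an
3562  * idle state.
3563  */
3564  AbortOutOfAnyTransaction();
3565  pgstat_report_subscription_error(MySubscription->oid, false);
3566 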
3567  PG_RE_THROW();
3568  }
3569  }
3570  PG_END_TRY();
3571 
3572  /* allocate slot name in long-lived context */
3573  *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3574  pfree(syncslotname);
3575 }
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1205

References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by ApplyWorkerMain().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 3929 of file worker.c.

3930 {
3931  if (!is_skipping_changes())
3932  return;
3933 
3934  ereport(LOG,
3935  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
3936  LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
3937 
3938  /* Stop skipping changes */
3939  skip_xact_finish_lsn = InvalidXLogRecPtr;
3940 }

References ereport, errmsg(), InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 2674 of file worker.c.

2675 {
2676  FlushPosition *flushpos;
2677 
2678  /* Need to do this in permanent context */
2679  MemoryContextSwitchTo(ApplyContext);
2680 
2681  /* Track commit lsn */
2682  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2683  flushpos->local_end = XactLastCommitEnd;
2684  flushpos->remote_end = remote_lsn;
2685 
2686  dlist_push_tail(&lsn_mapping, &flushpos->node);
2687  MemoryContextSwitchTo(ApplyMessageContext);
2688 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:353
dlist_node node
Definition: worker.c:203
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:258

References ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ stream_cleanup_files()

static void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 3380 of file worker.c.

3381 {
3382  char path[MAXPGPATH];
3383 
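3384  /* Delete the changes file. */
3385  changes_filename(path, subid, xid);
3386  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
3387 
3388  /* Delete the subxact file, if it exists. */
3389  subxact_filename(path, subid, xid);
3390  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
3391 }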
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3359
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:359

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ stream_close_file()

static void stream_close_file ( void  )
static

Definition at line 3456 of file worker.c.

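3457 {
3458  Assert(in_streamed_transaction);
3459  Assert(TransactionIdIsValid(stream_xid));
3460  Assert(stream_fd != NULL);
3461 
3462  BufFileClose(stream_fd);
3463 
3464  stream_xid = InvalidTransactionId;
3465  stream_fd = NULL;
3466 }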

References Assert(), BufFileClose(), in_streamed_transaction, InvalidTransactionId, stream_fd, stream_xid, and TransactionIdIsValid.

Referenced by apply_handle_stream_stop().

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first_segment 
)
static

Definition at line 3407 of file worker.c.

3408 {
3409  char path[MAXPGPATH];
3410  MemoryContext oldcxt;
3411 
3412  Assert(in_streamed_transaction);
3413  Assert(OidIsValid(subid));
3414  Assert(TransactionIdIsValid(xid));
3415  Assert(stream_fd == NULL);
3416 
3417 
3418  changes_filename(path, subid, xid);
3419  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3420 
3421  /*
3422  * Create/open the buffiles under the logical streaming context so that we
3423  * have those files until stream stop.
3424  */
3425  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
3426 
3427  /*
3428  * If this is the first streamed segment, create the changes file.
3429  * Otherwise, just open the file for writing, in append mode.
3430  */
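3431  if (first_segment)
3432  stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
3433  path);
3434  else
3435  {
3436  /*
3437  * Open the file and seek to the end of the file because we always
3438  * append the changes file.
3439  */
3440  stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
3441  path, O_RDWR, false);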
3442  BufFileSeek(stream_fd, 0, 0, SEEK_END);
3443  }
3444 
3445  MemoryContextSwitchTo(oldcxt);
3446 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:689
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:262

References Assert(), BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog(), in_streamed_transaction, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by apply_handle_stream_start().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 3477 of file worker.c.

3478 {
3479  int len;
3480