PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/genam.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
#include "utils/usercontext.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void TwoPhaseTransactionGid (Oid subid, TransactionId xid, char *gid, int szgid)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionData * create_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void apply_worker_exit (void)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker ()
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallback * apply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConn * LogRepWorkerWalRcvConn = NULL
 
Subscription * MySubscription = NULL
 
static bool MySubscriptionValid = false
 
static List * on_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFile * stream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 361 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 220 of file worker.c.

Typedef Documentation

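◆ ApplyErrorCallbackArg

typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg

◆ ApplyExecutionData

typedef struct ApplyExecutionData ApplyExecutionData

◆ ApplySubXactData

typedef struct ApplySubXactData ApplySubXactData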

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 290 of file worker.c.

291 {
292  /* The action for non-streaming transactions. */
294 
295  /* Actions for streaming transactions. */
TransApplyAction
Definition: worker.c:291
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:296
@ TRANS_PARALLEL_APPLY
Definition: worker.c:299
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:297
@ TRANS_LEADER_APPLY
Definition: worker.c:293
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:298

Function Documentation

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3297 of file worker.c.

3298 {
3300  LogicalRepMsgType saved_command;
3301 
3302  /*
3303  * Set the current command being applied. Since this function can be
3304  * called recursively when applying spooled changes, save the current
3305  * command.
3306  */
3307  saved_command = apply_error_callback_arg.command;
3309 
3310  switch (action)
3311  {
3312  case LOGICAL_REP_MSG_BEGIN:
3313  apply_handle_begin(s);
3314  break;
3315 
3318  break;
3319 
3322  break;
3323 
3326  break;
3327 
3330  break;
3331 
3334  break;
3335 
3338  break;
3339 
3340  case LOGICAL_REP_MSG_TYPE:
3341  apply_handle_type(s);
3342  break;
3343 
3346  break;
3347 
3349 
3350  /*
3351  * Logical replication does not use generic logical messages yet.
3352  * Although, it could be used by other applications that use this
3353  * output plugin.
3354  */
3355  break;
3356 
3359  break;
3360 
3363  break;
3364 
3367  break;
3368 
3371  break;
3372 
3375  break;
3376 
3379  break;
3380 
3383  break;
3384 
3387  break;
3388 
3391  break;
3392 
3393  default:
3394  ereport(ERROR,
3395  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3396  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3397  }
3398 
3399  /* Reset the current command */
3400  apply_error_callback_arg.command = saved_command;
3401 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1296
static void apply_handle_type(StringInfo s)
Definition: worker.c:2349
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3169
static void apply_handle_update(StringInfo s)
Definition: worker.c:2545
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2156
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1194
ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:303
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2729
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1016
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1041
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1837
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2326
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1133
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1243
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1651
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1433
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1067
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1492
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2396
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
LogicalRepMsgType command
Definition: worker.c:246

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4918 of file worker.c.

4919 {
4921 
4923  return;
4924 
4925  Assert(errarg->origin_name);
4926 
4927  if (errarg->rel == NULL)
4928  {
4929  if (!TransactionIdIsValid(errarg->remote_xid))
4930  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4931  errarg->origin_name,
4932  logicalrep_message_type(errarg->command));
4933  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4934  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4935  errarg->origin_name,
4937  errarg->remote_xid);
4938  else
4939  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4940  errarg->origin_name,
4942  errarg->remote_xid,
4943  LSN_FORMAT_ARGS(errarg->finish_lsn));
4944  }
4945  else
4946  {
4947  if (errarg->remote_attnum < 0)
4948  {
4949  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4950  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4951  errarg->origin_name,
4953  errarg->rel->remoterel.nspname,
4954  errarg->rel->remoterel.relname,
4955  errarg->remote_xid);
4956  else
4957  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4958  errarg->origin_name,
4960  errarg->rel->remoterel.nspname,
4961  errarg->rel->remoterel.relname,
4962  errarg->remote_xid,
4963  LSN_FORMAT_ARGS(errarg->finish_lsn));
4964  }
4965  else
4966  {
4967  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4968  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4969  errarg->origin_name,
4971  errarg->rel->remoterel.nspname,
4972  errarg->rel->remoterel.relname,
4973  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4974  errarg->remote_xid);
4975  else
4976  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4977  errarg->origin_name,
4979  errarg->rel->remoterel.nspname,
4980  errarg->rel->remoterel.relname,
4981  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4982  errarg->remote_xid,
4983  LSN_FORMAT_ARGS(errarg->finish_lsn));
4984  }
4985  }
4986 }
#define errcontext
Definition: elog.h:196
Assert(fmt[strlen(fmt) - 1] !='\n')
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:251
XLogRecPtr finish_lsn
Definition: worker.c:252
LogicalRepRelMapEntry * rel
Definition: worker.c:247
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 1016 of file worker.c.

1017 {
1018  LogicalRepBeginData begin_data;
1019 
1020  /* There must not be an active streaming transaction. */
1022 
1023  logicalrep_read_begin(s, &begin_data);
1024  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1025 
1026  remote_final_lsn = begin_data.final_lsn;
1027 
1029 
1030  in_remote_transaction = true;
1031 
1033 }
bool in_remote_transaction
Definition: worker.c:328
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4990
static XLogRecPtr remote_final_lsn
Definition: worker.c:329
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4788
static TransactionId stream_xid
Definition: worker.c:334
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert(), LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1067 of file worker.c.

1068 {
1069  LogicalRepPreparedTxnData begin_data;
1070 
1071  /* Tablesync should never receive prepare. */
1072  if (am_tablesync_worker())
1073  ereport(ERROR,
1074  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1075  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1076 
1077  /* There must not be an active streaming transaction. */
1079 
1080  logicalrep_read_begin_prepare(s, &begin_data);
1081  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1082 
1083  remote_final_lsn = begin_data.prepare_lsn;
1084 
1086 
1087  in_remote_transaction = true;
1088 
1090 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1162
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1041 of file worker.c.

1042 {
1043  LogicalRepCommitData commit_data;
1044 
1045  logicalrep_read_commit(s, &commit_data);
1046 
1047  if (commit_data.commit_lsn != remote_final_lsn)
1048  ereport(ERROR,
1049  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1050  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1051  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1053 
1054  apply_handle_commit_internal(&commit_data);
1055 
1056  /* Process any tables that are being synchronized in parallel. */
1057  process_syncing_tables(commit_data.end_lsn);
1058 
1061 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2266
static void reset_apply_error_context_info(void)
Definition: worker.c:4998
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:660

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData *  commit_data)
static

Definition at line 2266 of file worker.c.

2267 {
2268  if (is_skipping_changes())
2269  {
2271 
2272  /*
2273  * Start a new transaction to clear the subskiplsn, if not started
2274  * yet.
2275  */
2276  if (!IsTransactionState())
2278  }
2279 
2280  if (IsTransactionState())
2281  {
2282  /*
2283  * The transaction is either non-empty or skipped, so we clear the
2284  * subskiplsn.
2285  */
2287 
2288  /*
2289  * Update origin state so we can restart streaming from correct
2290  * position in case of crash.
2291  */
2292  replorigin_session_origin_lsn = commit_data->end_lsn;
2294 
2296 
2297  if (IsTransactionBlock())
2298  {
2299  EndTransactionBlock(false);
2301  }
2302 
2303  pgstat_report_stat(false);
2304 
2306  }
2307  else
2308  {
2309  /* Process any invalidation messages that might have accumulated. */
2312  }
2313 
2314  in_remote_transaction = false;
2315 }
static void stop_skipping_changes(void)
Definition: worker.c:4815
#define is_skipping_changes()
Definition: worker.c:361
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4837
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3461
void maybe_reread_subscription(void)
Definition: worker.c:3887
void AcceptInvalidationMessages(void)
Definition: inval.c:807
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
long pgstat_report_stat(bool force)
Definition: pgstat.c:582
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:378
void StartTransactionCommand(void)
Definition: xact.c:2953
bool IsTransactionBlock(void)
Definition: xact.c:4832
void CommitTransactionCommand(void)
Definition: xact.c:3050
bool EndTransactionBlock(bool chain)
Definition: xact.c:3906
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:259

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1194 of file worker.c.

1195 {
1196  LogicalRepCommitPreparedTxnData prepare_data;
1197  char gid[GIDSIZE];
1198 
1199  logicalrep_read_commit_prepared(s, &prepare_data);
1200  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1201 
1202  /* Compute GID for two_phase transactions. */
1204  gid, sizeof(gid));
1205 
1206  /* There is no transaction when COMMIT PREPARED is called */
1208 
1209  /*
1210  * Update origin state so we can restart streaming from correct position
1211  * in case of crash.
1212  */
1213  replorigin_session_origin_lsn = prepare_data.end_lsn;
1215 
1216  FinishPreparedTransaction(gid, true);
1219  pgstat_report_stat(false);
1220 
1222  in_remote_transaction = false;
1223 
1224  /* Process any tables that are being synchronized in parallel. */
1225  process_syncing_tables(prepare_data.end_lsn);
1226 
1227  clear_subscription_skip_lsn(prepare_data.end_lsn);
1228 
1231 }
static void begin_replication_step(void)
Definition: worker.c:533
static void end_replication_step(void)
Definition: worker.c:556
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:4430
Subscription * MySubscription
Definition: worker.c:323
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1518
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 2729 of file worker.c.

2730 {
2731  LogicalRepRelMapEntry *rel;
2732  LogicalRepTupleData oldtup;
2733  LogicalRepRelId relid;
2734  UserContext ucxt;
2735  ApplyExecutionData *edata;
2736  EState *estate;
2737  TupleTableSlot *remoteslot;
2738  MemoryContext oldctx;
2739  bool run_as_owner;
2740 
2741  /*
2742  * Quick return if we are skipping data modification changes or handling
2743  * streamed transactions.
2744  */
2745  if (is_skipping_changes() ||
2747  return;
2748 
2750 
2751  relid = logicalrep_read_delete(s, &oldtup);
2752  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2753  if (!should_apply_changes_for_rel(rel))
2754  {
2755  /*
2756  * The relation can't become interesting in the middle of the
2757  * transaction so it's safe to unlock it.
2758  */
2761  return;
2762  }
2763 
2764  /* Set relation for error callback */
2766 
2767  /* Check if we can do the delete. */
2769 
2770  /*
2771  * Make sure that any user-supplied code runs as the table owner, unless
2772  * the user has opted out of that behavior.
2773  */
2774  run_as_owner = MySubscription->runasowner;
2775  if (!run_as_owner)
2776  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2777 
2778  /* Initialize the executor state. */
2779  edata = create_edata_for_relation(rel);
2780  estate = edata->estate;
2781  remoteslot = ExecInitExtraTupleSlot(estate,
2782  RelationGetDescr(rel->localrel),
2783  &TTSOpsVirtual);
2784 
2785  /* Build the search tuple. */
2787  slot_store_data(remoteslot, rel, &oldtup);
2788  MemoryContextSwitchTo(oldctx);
2789 
2790  /* For a partitioned table, apply delete to correct partition. */
2791  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2793  remoteslot, NULL, CMD_DELETE);
2794  else
2796  remoteslot, rel->localindexoid);
2797 
2798  finish_edata(edata);
2799 
2800  /* Reset relation for error callback */
2802 
2803  if (!run_as_owner)
2804  RestoreUserContext(&ucxt);
2805 
2807 
2809 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2504
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:677
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:493
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:584
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2918
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:822
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:734
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2817
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1830
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:554
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:258
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
#define RelationGetDescr(relation)
Definition: rel.h:530
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:328
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:474
ResultRelInfo * targetRelInfo
Definition: worker.c:236
EState * estate
Definition: worker.c:233
Form_pg_class rd_rel
Definition: rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot,
Oid  localindexoid 
)
static

Definition at line 2817 of file worker.c.

2821 {
2822  EState *estate = edata->estate;
2823  Relation localrel = relinfo->ri_RelationDesc;
2824  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2825  EPQState epqstate;
2826  TupleTableSlot *localslot;
2827  bool found;
2828 
2829  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2830  ExecOpenIndices(relinfo, false);
2831 
2832  found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2833  remoteslot, &localslot);
2834 
2835  /* If found delete it. */
2836  if (found)
2837  {
2838  EvalPlanQualSetSlot(&epqstate, localslot);
2839 
2840  /* Do the actual delete. */
2841  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
2842  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2843  }
2844  else
2845  {
2846  /*
2847  * The tuple to be deleted could not be found. Do nothing except for
2848  * emitting a log message.
2849  *
2850  * XXX should this be promoted to ereport(LOG) perhaps?
2851  */
2852  elog(DEBUG1,
2853  "logical replication did not find row to be deleted "
2854  "in replication target relation \"%s\"",
2855  RelationGetRelationName(localrel));
2856  }
2857 
2858  /* Cleanup. */
2859  ExecCloseIndices(relinfo);
2860  EvalPlanQualEnd(&epqstate);
2861 }
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2871
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2364
#define DEBUG1
Definition: elog.h:30
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2563
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3006
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:243
#define ACL_DELETE
Definition: parsenodes.h:79
#define NIL
Definition: pg_list.h:68
#define RelationGetRelationName(relation)
Definition: rel.h:538
LogicalRepRelMapEntry * targetRel
Definition: worker.c:235
Relation ri_RelationDesc
Definition: execnodes.h:456

References ACL_DELETE, DEBUG1, elog(), ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2396 of file worker.c.

2397 {
2398  LogicalRepRelMapEntry *rel;
2399  LogicalRepTupleData newtup;
2400  LogicalRepRelId relid;
2401  UserContext ucxt;
2402  ApplyExecutionData *edata;
2403  EState *estate;
2404  TupleTableSlot *remoteslot;
2405  MemoryContext oldctx;
2406  bool run_as_owner;
2407 
2408  /*
2409  * Quick return if we are skipping data modification changes or handling
2410  * streamed transactions.
2411  */
2412  if (is_skipping_changes() ||
2413  handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2414  return;
2415 
2416  begin_replication_step();
2417 
2418  relid = logicalrep_read_insert(s, &newtup);
2419  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2420  if (!should_apply_changes_for_rel(rel))
2421  {
2422  /*
2423  * The relation can't become interesting in the middle of the
2424  * transaction so it's safe to unlock it.
2425  */
2426  logicalrep_rel_close(rel, RowExclusiveLock);
2427  end_replication_step();
2428  return;
2429  }
2430 
2431  /*
2432  * Make sure that any user-supplied code runs as the table owner, unless
2433  * the user has opted out of that behavior.
2434  */
2435  run_as_owner = MySubscription->runasowner;
2436  if (!run_as_owner)
2437  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2438 
2439  /* Set relation for error callback */
2440  apply_error_callback_arg.rel = rel;
2441 
2442  /* Initialize the executor state. */
2443  edata = create_edata_for_relation(rel);
2444  estate = edata->estate;
2445  remoteslot = ExecInitExtraTupleSlot(estate,
2446  RelationGetDescr(rel->localrel),
2447  &TTSOpsVirtual);
2448 
2449  /* Process and store remote tuple in the slot */
2450  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2451  slot_store_data(remoteslot, rel, &newtup);
2452  slot_fill_defaults(rel, estate, remoteslot);
2453  MemoryContextSwitchTo(oldctx);
2454 
2455  /* For a partitioned table, insert the tuple into a partition. */
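2456  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2457  apply_handle_tuple_routing(edata,
2458  remoteslot, NULL, CMD_INSERT);
2459  else
2460  apply_handle_insert_internal(edata, edata->targetRelInfo,
2461  remoteslot);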
2462 
2463  finish_edata(edata);
2464 
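2465  /* Reset relation for error callback */
2466  apply_error_callback_arg.rel = NULL;
2467 
2468  if (!run_as_owner)
2469  RestoreUserContext(&ucxt);
2470 
2471  logicalrep_rel_close(rel, NoLock);
2472 
2473  end_replication_step();
2474 }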
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2482
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:765
@ CMD_INSERT
Definition: nodes.h:257
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot 
)
static

Definition at line 2482 of file worker.c.

2485 {
2486  EState *estate = edata->estate;
2487 
2488  /* We must open indexes here. */
2489  ExecOpenIndices(relinfo, false);
2490 
2491  /* Do the insert. */
2492  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2493  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2494 
2495  /* Cleanup. */
2496  ExecCloseIndices(relinfo);
2497 }
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:76

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1433 of file worker.c.

1434 {
1435  /*
1436  * ORIGIN message can only come inside streaming transaction or inside
1437  * remote transaction and before any actual writes.
1438  */
1439  if (!in_streamed_transaction &&
1440  (!in_remote_transaction ||
1441  (IsTransactionState() && !am_tablesync_worker())))
1442  ereport(ERROR,
1443  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1444  errmsg_internal("ORIGIN message sent out of order")));
1445 }
static bool in_streamed_transaction
Definition: worker.c:332

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1133 of file worker.c.

1134 {
1135  LogicalRepPreparedTxnData prepare_data;
1136 
1137  logicalrep_read_prepare(s, &prepare_data);
1138 
1139  if (prepare_data.prepare_lsn != remote_final_lsn)
1140  ereport(ERROR,
1141  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1142  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1143  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1144  LSN_FORMAT_ARGS(remote_final_lsn))));
1145 
1146  /*
1147  * Unlike commit, here, we always prepare the transaction even though no
1148  * change has happened in this transaction or all changes are skipped. It
1149  * is done this way because at commit prepared time, we won't know whether
1150  * we have skipped preparing a transaction because of those reasons.
1151  *
1152  * XXX, We can optimize such that at commit prepared time, we first check
1153  * whether we have prepared the transaction or not but that doesn't seem
1154  * worthwhile because such cases shouldn't be common.
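1155  */
1156  begin_replication_step();
1157 
1158  apply_handle_prepare_internal(&prepare_data);
1159 
1160  end_replication_step();
1161  CommitTransactionCommand();
1162  pgstat_report_stat(false);
1163 
1164  store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1165 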
1166  in_remote_transaction = false;
1167 
1168  /* Process any tables that are being synchronized in parallel. */
1169  process_syncing_tables(prepare_data.end_lsn);
1170 
1171  /*
1172  * Since we have already prepared the transaction, in a case where the
1173  * server crashes before clearing the subskiplsn, it will be left but the
1174  * transaction won't be resent. But that's okay because it's a rare case
1175  * and the subskiplsn will be cleared when finishing the next transaction.
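1176  */
1177  stop_skipping_changes();
1178  clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1179 
1180  pgstat_report_activity(STATE_IDLE, NULL);
1181  reset_apply_error_context_info();
1182 }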
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1096
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData * prepare_data)
static

Definition at line 1096 of file worker.c.

1097 {
1098  char gid[GIDSIZE];
1099 
1100  /*
1101  * Compute unique GID for two_phase transactions. We don't use GID of
1102  * prepared transaction sent by server as that can lead to deadlock when
1103  * we have multiple subscriptions from same node point to publications on
1104  * the same node. See comments atop worker.c
1105  */
1106  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1107  gid, sizeof(gid));
1108 
1109  /*
1110  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1111  * called within the PrepareTransactionBlock below.
1112  */
1113  if (!IsTransactionBlock())
1114  {
1115  BeginTransactionBlock();
1116  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1117  }
1118 
1119  /*
1120  * Update origin state so we can restart streaming from correct position
1121  * in case of crash.
1122  */
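1123  replorigin_session_origin_lsn = prepare_data->end_lsn;
1124  replorigin_session_origin_timestamp = prepare_data->prepare_time;
1125 
1126  PrepareTransactionBlock(gid);
1127 }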
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3854
void BeginTransactionBlock(void)
Definition: xact.c:3786

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2326 of file worker.c.

2327 {
2328  LogicalRepRelation *rel;
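2329 
2330  if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2331  return;
2332 
2333  rel = logicalrep_read_rel(s);
2334  logicalrep_relmap_update(rel);
2335 
2336  /* Also reset all entries in the partition map that refer to remoterel. */
2337  logicalrep_partmap_reset_relmap(rel);
2338 }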
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:541
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:165

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1243 of file worker.c.

1244 {
1245  LogicalRepRollbackPreparedTxnData rollback_data;
1246  char gid[GIDSIZE];
1247 
1248  logicalrep_read_rollback_prepared(s, &rollback_data);
1249  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1250 
1251  /* Compute GID for two_phase transactions. */
1252  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1253  gid, sizeof(gid));
1254 
1255  /*
1256  * It is possible that we haven't received prepare because it occurred
1257  * before walsender reached a consistent point or the two_phase was still
1258  * not enabled by that time, so in such cases, we need to skip rollback
1259  * prepared.
1260  */
1261  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1262  rollback_data.prepare_time))
1263  {
1264  /*
1265  * Update origin state so we can restart streaming from correct
1266  * position in case of crash.
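1267  */
1268  replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1269  replorigin_session_origin_timestamp = rollback_data.rollback_time;
1270 
1271  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1272  begin_replication_step();
1273  FinishPreparedTransaction(gid, false);
1274  end_replication_step();
1275  CommitTransactionCommand();
1276 
1277  clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1278  }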
1279 
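1280  pgstat_report_stat(false);
1281 
1282  store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
1283  in_remote_transaction = false;
1284 
1285  /* Process any tables that are being synchronized in parallel. */
1286  process_syncing_tables(rollback_data.rollback_end_lsn);
1287 
1288  pgstat_report_activity(STATE_IDLE, NULL);
1289  reset_apply_error_context_info();
1290 }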
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2649

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1837 of file worker.c.

1838 {
1839  TransactionId xid;
1840  TransactionId subxid;
1841  LogicalRepStreamAbortData abort_data;
1842  ParallelApplyWorkerInfo *winfo;
1843  TransApplyAction apply_action;
1844 
1845  /* Save the message before it is consumed. */
1846  StringInfoData original_msg = *s;
1847  bool toplevel_xact;
1848 
1849  if (in_streamed_transaction)
1850  ereport(ERROR,
1851  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1852  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1853 
1854  /* We receive abort information only when we can apply in parallel. */
1855  logicalrep_read_stream_abort(s, &abort_data,
1856  MyLogicalRepWorker->parallel_apply);
1857 
1858  xid = abort_data.xid;
1859  subxid = abort_data.subxid;
1860  toplevel_xact = (xid == subxid);
1861 
1862  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1863 
1864  apply_action = get_transaction_apply_action(xid, &winfo);
1865 
1866  switch (apply_action)
1867  {
1868  case TRANS_LEADER_APPLY:
1869 
1870  /*
1871  * We are in the leader apply worker and the transaction has been
1872  * serialized to file.
1873  */
1874  stream_abort_internal(xid, subxid);
1875 
1876  elog(DEBUG1, "finished processing the STREAM ABORT command");
1877  break;
1878 
1879  case TRANS_LEADER_SEND_TO_PARALLEL:
1880  Assert(winfo);
1881 
1882  /*
1883  * For the case of aborting the subtransaction, we increment the
1884  * number of streaming blocks and take the lock again before
1885  * sending the STREAM_ABORT to ensure that the parallel apply
1886  * worker will wait on the lock for the next set of changes after
1887  * processing the STREAM_ABORT message if it is not already
1888  * waiting for STREAM_STOP message.
1889  *
1890  * It is important to perform this locking before sending the
1891  * STREAM_ABORT message so that the leader can hold the lock first
1892  * and the parallel apply worker will wait for the leader to
1893  * release the lock. This is the same as what we do in
1894  * apply_handle_stream_stop. See Locking Considerations atop
1895  * applyparallelworker.c.
1896  */
1897  if (!toplevel_xact)
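1898  {
1899  pa_unlock_stream(xid, AccessExclusiveLock);
1900  pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1901  pa_lock_stream(xid, AccessExclusiveLock);
1902  }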
1903 
1904  if (pa_send_data(winfo, s->len, s->data))
1905  {
1906  /*
1907  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1908  * wait here for the parallel apply worker to finish as that
1909  * is not required to maintain the commit order and won't have
1910  * the risk of failures due to transaction dependencies and
1911  * deadlocks. However, it is possible that before the parallel
1912  * worker finishes and we clear the worker info, the xid
1913  * wraparound happens on the upstream and a new transaction
1914  * with the same xid can appear and that can lead to duplicate
1915  * entries in ParallelApplyTxnHash. Yet another problem could
1916  * be that we may have serialized the changes in partial
1917  * serialize mode and the file containing xact changes may
1918  * already exist, and after xid wraparound trying to create
1919  * the file for the same xid can lead to an error. To avoid
1920  * these problems, we decide to wait for the aborts to finish.
1921  *
1922  * Note, it is okay to not update the flush location position
1923  * for aborts as in worst case that means such a transaction
1924  * won't be sent again after restart.
1925  */
1926  if (toplevel_xact)
1927  pa_xact_finish(winfo, InvalidXLogRecPtr);
1928 
1929  break;
1930  }
1931 
1932  /*
1933  * Switch to serialize mode when we are not able to send the
1934  * change to parallel apply worker.
1935  */
1936  pa_switch_to_partial_serialize(winfo, true);
1937 
1938  /* fall through */
1939  case TRANS_LEADER_PARTIAL_SERIALIZE:
1940  Assert(winfo);
1941 
1942  /*
1943  * Parallel apply worker might have applied some changes, so write
1944  * the STREAM_ABORT message so that it can rollback the
1945  * subtransaction if needed.
1946  */
1947  stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1948  &original_msg);
1949 
1950  if (toplevel_xact)
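1951  {
1952  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1953  pa_xact_finish(winfo, InvalidXLogRecPtr);
1954  }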
1955  break;
1956 
1957  case TRANS_PARALLEL_APPLY:
1958 
1959  /*
1960  * If the parallel apply worker is applying spooled messages then
1961  * close the file before aborting.
1962  */
1963  if (toplevel_xact && stream_fd)
1964  stream_close_file();
1965 
1966  pa_stream_abort(&abort_data);
1967 
1968  /*
1969  * We need to wait after processing rollback to savepoint for the
1970  * next set of changes.
1971  *
1972  * We have a race condition here due to which we can start waiting
1973  * here when there are more chunk of streams in the queue. See
1974  * apply_handle_stream_stop.
1975  */
1976  if (!toplevel_xact)
1977  pa_decr_and_wait_stream_block();
1978 
1979  elog(DEBUG1, "finished processing the STREAM ABORT command");
1980  break;
1981 
1982  default:
1983  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1984  break;
1985  }
1986 
1987  reset_apply_error_context_info();
1988 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:381
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5075
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4347
static BufFile * stream_fd
Definition: worker.c:364
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1754
static void stream_close_file(void)
Definition: worker.c:4299
uint32 TransactionId
Definition: c.h:641
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1192
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2156 of file worker.c.

2157 {
2158  TransactionId xid;
2159  LogicalRepCommitData commit_data;
2160  ParallelApplyWorkerInfo *winfo;
2161  TransApplyAction apply_action;
2162 
2163  /* Save the message before it is consumed. */
2164  StringInfoData original_msg = *s;
2165 
2166  if (in_streamed_transaction)
2167  ereport(ERROR,
2168  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2169  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2170 
2171  xid = logicalrep_read_stream_commit(s, &commit_data);
2172  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2173 
2174  apply_action = get_transaction_apply_action(xid, &winfo);
2175 
2176  switch (apply_action)
2177  {
2178  case TRANS_LEADER_APPLY:
2179 
2180  /*
2181  * The transaction has been serialized to file, so replay all the
2182  * spooled operations.
2183  */
2184  apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2185  commit_data.commit_lsn);
2186 
2187  apply_handle_commit_internal(&commit_data);
2188 
2189  /* Unlink the files with serialized changes and subxact info. */
2190  stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2191 
2192  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2193  break;
2194 
2195  case TRANS_LEADER_SEND_TO_PARALLEL:
2196  Assert(winfo);
2197 
2198  if (pa_send_data(winfo, s->len, s->data))
2199  {
2200  /* Finish processing the streaming transaction. */
2201  pa_xact_finish(winfo, commit_data.end_lsn);
2202  break;
2203  }
2204 
2205  /*
2206  * Switch to serialize mode when we are not able to send the
2207  * change to parallel apply worker.
2208  */
2209  pa_switch_to_partial_serialize(winfo, true);
2210 
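2211  /* fall through */
2212  case TRANS_LEADER_PARTIAL_SERIALIZE:
2213  Assert(winfo);
2214 
2215  stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2216  &original_msg);
2217 
2218  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2219 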
2220  /* Finish processing the streaming transaction. */
2221  pa_xact_finish(winfo, commit_data.end_lsn);
2222  break;
2223 
2224  case TRANS_PARALLEL_APPLY:
2225 
2226  /*
2227  * If the parallel apply worker is applying spooled messages then
2228  * close the file before committing.
2229  */
2230  if (stream_fd)
2231  stream_close_file();
2232 
2233  apply_handle_commit_internal(&commit_data);
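2234 
2235  MyParallelShared->last_commit_end = XactLastCommitEnd;
2236 
2237  /*
2238  * It is important to set the transaction state as finished before
2239  * releasing the lock. See pa_wait_for_xact_finish.
2240  */
2241  pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2242  pa_unlock_transaction(xid, AccessExclusiveLock);
2243 
2244  pa_reset_subtrans();
2245 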
2246  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2247  break;
2248 
2249  default:
2250  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2251  break;
2252  }
2253 
2254  /* Process any tables that are being synchronized in parallel. */
2255  process_syncing_tables(commit_data.end_lsn);
2256 
2257  pgstat_report_activity(STATE_IDLE, NULL);
2258 
2259  reset_apply_error_context_info();
2260 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4230
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2026
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1137
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert(), LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog(), LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1296 of file worker.c.

1297 {
1298  LogicalRepPreparedTxnData prepare_data;
1299  ParallelApplyWorkerInfo *winfo;
1300  TransApplyAction apply_action;
1301 
1302  /* Save the message before it is consumed. */
1303  StringInfoData original_msg = *s;
1304 
1305  if (in_streamed_transaction)
1306  ereport(ERROR,
1307  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1308  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1309 
1310  /* Tablesync should never receive prepare. */
1311  if (am_tablesync_worker())
1312  ereport(ERROR,
1313  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1314  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1315 
1316  logicalrep_read_stream_prepare(s, &prepare_data);
1317  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1318 
1319  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1320 
1321  switch (apply_action)
1322  {
1323  case TRANS_LEADER_APPLY:
1324 
1325  /*
1326  * The transaction has been serialized to file, so replay all the
1327  * spooled operations.
1328  */
1329  apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1330  prepare_data.xid, prepare_data.prepare_lsn);
1331 
1332  /* Mark the transaction as prepared. */
1333  apply_handle_prepare_internal(&prepare_data);
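1334 
1335  CommitTransactionCommand();
1336 
1337  store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1338 
1339  in_remote_transaction = false;
1340 
1341  /* Unlink the files with serialized changes and subxact info. */
1342  stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1343 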
1344  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1345  break;
1346 
1347  case TRANS_LEADER_SEND_TO_PARALLEL:
1348  Assert(winfo);
1349 
1350  if (pa_send_data(winfo, s->len, s->data))
1351  {
1352  /* Finish processing the streaming transaction. */
1353  pa_xact_finish(winfo, prepare_data.end_lsn);
1354  break;
1355  }
1356 
1357  /*
1358  * Switch to serialize mode when we are not able to send the
1359  * change to parallel apply worker.
1360  */
1361  pa_switch_to_partial_serialize(winfo, true);
1362 
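1363  /* fall through */
1364  case TRANS_LEADER_PARTIAL_SERIALIZE:
1365  Assert(winfo);
1366 
1367  stream_open_and_write_change(prepare_data.xid,
1368  LOGICAL_REP_MSG_STREAM_PREPARE,
1369  &original_msg);
1370 
1371  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1372 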
1373  /* Finish processing the streaming transaction. */
1374  pa_xact_finish(winfo, prepare_data.end_lsn);
1375  break;
1376 
1377  case TRANS_PARALLEL_APPLY:
1378 
1379  /*
1380  * If the parallel apply worker is applying spooled messages then
1381  * close the file before preparing.
1382  */
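1383  if (stream_fd)
1384  stream_close_file();
1385 
1386  begin_replication_step();
1387 
1388  /* Mark the transaction as prepared. */
1389  apply_handle_prepare_internal(&prepare_data);
1390 
1391  end_replication_step();
1392 
1393  CommitTransactionCommand();
1394 
1395  MyParallelShared->last_commit_end = XactLastCommitEnd;
1396 
1397  pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1398  pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1399 
1400  pa_reset_subtrans();
1401 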
1402  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1403  break;
1404 
1405  default:
1406  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1407  break;
1408  }
1409 
1410  pgstat_report_stat(false);
1411 
1412  /* Process any tables that are being synchronized in parallel. */
1413  process_syncing_tables(prepare_data.end_lsn);
1414 
1415  /*
1416  * Similar to prepare case, the subskiplsn could be left in a case of
1417  * server crash but it's okay. See the comments in apply_handle_prepare().
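1418  */
1419  stop_skipping_changes();
1420  clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1421 
1422  pgstat_report_activity(STATE_IDLE, NULL);
1423 
1424  reset_apply_error_context_info();
1425 }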
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, XactLastCommitEnd, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1492 of file worker.c.

1493 {
1494  bool first_segment;
1495  ParallelApplyWorkerInfo *winfo;
1496  TransApplyAction apply_action;
1497 
1498  /* Save the message before it is consumed. */
1499  StringInfoData original_msg = *s;
1500 
1502  ereport(ERROR,
1503  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1504  errmsg_internal("duplicate STREAM START message")));
1505 
1506  /* There must not be an active streaming transaction. */
1508 
1509  /* notify handle methods we're processing a remote transaction */
1510  in_streamed_transaction = true;
1511 
1512  /* extract XID of the top-level transaction */
1513  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1514 
1516  ereport(ERROR,
1517  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1518  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1519 
1521 
1522  /* Try to allocate a worker for the streaming transaction. */
1523  if (first_segment)
1525 
1526  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1527 
1528  switch (apply_action)
1529  {
1531 
1532  /*
1533  * Function stream_start_internal starts a transaction. This
1534  * transaction will be committed on the stream stop unless it is a
1535  * tablesync worker in which case it will be committed after
1536  * processing all the messages. We need this transaction for
1537  * handling the BufFile, used for serializing the streaming data
1538  * and subxact info.
1539  */
1540  stream_start_internal(stream_xid, first_segment);
1541  break;
1542 
1544  Assert(winfo);
1545 
1546  /*
1547  * Once we start serializing the changes, the parallel apply
1548  * worker will wait for the leader to release the stream lock
1549  * until the end of the transaction. So, we don't need to release
1550  * the lock or increment the stream count in that case.
1551  */
1552  if (pa_send_data(winfo, s->len, s->data))
1553  {
1554  /*
1555  * Unlock the shared object lock so that the parallel apply
1556  * worker can continue to receive changes.
1557  */
1558  if (!first_segment)
1560 
1561  /*
1562  * Increment the number of streaming blocks waiting to be
1563  * processed by parallel apply worker.
1564  */
1566 
1567  /* Cache the parallel apply worker for this transaction. */
1569  break;
1570  }
1571 
1572  /*
1573  * Switch to serialize mode when we are not able to send the
1574  * change to parallel apply worker.
1575  */
1576  pa_switch_to_partial_serialize(winfo, !first_segment);
1577 
1578  /* fall through */
1580  Assert(winfo);
1581 
1582  /*
1583  * Open the spool file unless it was already opened when switching
1584  * to serialize mode. The transaction started in
1585  * stream_start_internal will be committed on the stream stop.
1586  */
1587  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1588  stream_start_internal(stream_xid, first_segment);
1589 
1591 
1592  /* Cache the parallel apply worker for this transaction. */
1594  break;
1595 
1596  case TRANS_PARALLEL_APPLY:
1597  if (first_segment)
1598  {
1599  /* Hold the lock until the end of the transaction. */
1602 
1603  /*
1604  * Signal the leader apply worker, as it may be waiting for
1605  * us.
1606  */
1608  }
1609 
1611  break;
1612 
1613  default:
1614  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1615  break;
1616  }
1617 
1619 }
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:340
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4317
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1454
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:682
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1087
@ PARALLEL_TRANS_STARTED

References AccessExclusiveLock, Assert(), StringInfoData::data, elog(), ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1651 of file worker.c.

1652 {
1653  ParallelApplyWorkerInfo *winfo;
1654  TransApplyAction apply_action;
1655 
1657  ereport(ERROR,
1658  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1659  errmsg_internal("STREAM STOP message without STREAM START")));
1660 
1661  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1662 
1663  switch (apply_action)
1664  {
1667  break;
1668 
1670  Assert(winfo);
1671 
1672  /*
1673  * Lock before sending the STREAM_STOP message so that the leader
1674  * can hold the lock first and the parallel apply worker will wait
1675  * for leader to release the lock. See Locking Considerations atop
1676  * applyparallelworker.c.
1677  */
1679 
1680  if (pa_send_data(winfo, s->len, s->data))
1681  {
1683  break;
1684  }
1685 
1686  /*
1687  * Switch to serialize mode when we are not able to send the
1688  * change to parallel apply worker.
1689  */
1690  pa_switch_to_partial_serialize(winfo, true);
1691 
1692  /* fall through */
1697  break;
1698 
1699  case TRANS_PARALLEL_APPLY:
1700  elog(DEBUG1, "applied %u changes in the streaming chunk",
1702 
1703  /*
1704  * By the time parallel apply worker is processing the changes in
1705  * the current streaming block, the leader apply worker may have
1706  * sent multiple streaming blocks. This can lead to the parallel apply
1707  * worker starting to wait even when there are more stream chunks
1708  * in the queue. So, try to lock only if there is no message left
1709  * in the queue. See Locking Considerations atop
1710  * applyparallelworker.c.
1711  *
1712  * Note that here we have a race condition where we can start
1713  * waiting even when there are pending streaming chunks. This can
1714  * happen if the leader sends another streaming block and acquires
1715  * the stream lock again after the parallel apply worker checks
1716  * that there is no pending streaming block and before it actually
1717  * starts waiting on a lock. We could handle this case by not
1718  * allowing the leader to increment the stream block count while
1719  * the parallel apply worker is acquiring the lock, but it is not
1720  * clear whether that is worth the complexity.
1721  *
1722  * Now, if this missed chunk contains rollback to savepoint, then
1723  * there is a risk of deadlock which probably shouldn't happen
1724  * after restart.
1725  */
1727  break;
1728 
1729  default:
1730  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1731  break;
1732  }
1733 
1734  in_streamed_transaction = false;
1736 
1737  /*
1738  * The parallel apply worker could be in a transaction in which case we
1739  * need to report the state as STATE_IDLEINTRANSACTION.
1740  */
1743  else
1745 
1747 }
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1628
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition: transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4850

References AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog(), ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3169 of file worker.c.

3170 {
3171  bool cascade = false;
3172  bool restart_seqs = false;
3173  List *remote_relids = NIL;
3174  List *remote_rels = NIL;
3175  List *rels = NIL;
3176  List *part_rels = NIL;
3177  List *relids = NIL;
3178  List *relids_logged = NIL;
3179  ListCell *lc;
3180  LOCKMODE lockmode = AccessExclusiveLock;
3181 
3182  /*
3183  * Quick return if we are skipping data modification changes or handling
3184  * streamed transactions.
3185  */
3186  if (is_skipping_changes() ||
3188  return;
3189 
3191 
3192  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3193 
3194  foreach(lc, remote_relids)
3195  {
3196  LogicalRepRelId relid = lfirst_oid(lc);
3197  LogicalRepRelMapEntry *rel;
3198 
3199  rel = logicalrep_rel_open(relid, lockmode);
3200  if (!should_apply_changes_for_rel(rel))
3201  {
3202  /*
3203  * The relation can't become interesting in the middle of the
3204  * transaction so it's safe to unlock it.
3205  */
3206  logicalrep_rel_close(rel, lockmode);
3207  continue;
3208  }
3209 
3210  remote_rels = lappend(remote_rels, rel);
3212  rels = lappend(rels, rel->localrel);
3213  relids = lappend_oid(relids, rel->localreloid);
3215  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3216 
3217  /*
3218  * Truncate partitions if we got a message to truncate a partitioned
3219  * table.
3220  */
3221  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3222  {
3223  ListCell *child;
3224  List *children = find_all_inheritors(rel->localreloid,
3225  lockmode,
3226  NULL);
3227 
3228  foreach(child, children)
3229  {
3230  Oid childrelid = lfirst_oid(child);
3231  Relation childrel;
3232 
3233  if (list_member_oid(relids, childrelid))
3234  continue;
3235 
3236  /* find_all_inheritors already got lock */
3237  childrel = table_open(childrelid, NoLock);
3238 
3239  /*
3240  * Ignore temp tables of other backends. See similar code in
3241  * ExecuteTruncate().
3242  */
3243  if (RELATION_IS_OTHER_TEMP(childrel))
3244  {
3245  table_close(childrel, lockmode);
3246  continue;
3247  }
3248 
3250  rels = lappend(rels, childrel);
3251  part_rels = lappend(part_rels, childrel);
3252  relids = lappend_oid(relids, childrelid);
3253  /* Log this relation only if needed for logical decoding */
3254  if (RelationIsLogicallyLogged(childrel))
3255  relids_logged = lappend_oid(relids_logged, childrelid);
3256  }
3257  }
3258  }
3259 
3260  /*
3261  * Even if we used CASCADE on the upstream primary we explicitly default
3262  * to replaying changes without further cascading. This might be later
3263  * changeable with a user specified option.
3264  *
3265  * MySubscription->runasowner tells us whether we want to execute
3266  * replication actions as the subscription owner; the last argument to
3267  * ExecuteTruncateGuts tells it whether we want to switch to the table owner.
3268  * Those are exactly opposite conditions.
3269  */
3270  ExecuteTruncateGuts(rels,
3271  relids,
3272  relids_logged,
3273  DROP_RESTRICT,
3274  restart_seqs,
3276  foreach(lc, remote_rels)
3277  {
3278  LogicalRepRelMapEntry *rel = lfirst(lc);
3279 
3281  }
3282  foreach(lc, part_rels)
3283  {
3284  Relation rel = lfirst(lc);
3285 
3286  table_close(rel, NoLock);
3287  }
3288 
3290 }
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2169
#define ACL_TRUNCATE
Definition: parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:700
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:657
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1920

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 2918 of file worker.c.

2922 {
2923  EState *estate = edata->estate;
2924  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2925  ResultRelInfo *relinfo = edata->targetRelInfo;
2926  Relation parentrel = relinfo->ri_RelationDesc;
2927  ModifyTableState *mtstate;
2928  PartitionTupleRouting *proute;
2929  ResultRelInfo *partrelinfo;
2930  Relation partrel;
2931  TupleTableSlot *remoteslot_part;
2932  TupleConversionMap *map;
2933  MemoryContext oldctx;
2934  LogicalRepRelMapEntry *part_entry = NULL;
2935  AttrMap *attrmap = NULL;
2936 
2937  /* ModifyTableState is needed for ExecFindPartition(). */
2938  edata->mtstate = mtstate = makeNode(ModifyTableState);
2939  mtstate->ps.plan = NULL;
2940  mtstate->ps.state = estate;
2941  mtstate->operation = operation;
2942  mtstate->resultRelInfo = relinfo;
2943 
2944  /* ... as is PartitionTupleRouting. */
2945  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2946 
2947  /*
2948  * Find the partition to which the "search tuple" belongs.
2949  */
2950  Assert(remoteslot != NULL);
2952  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2953  remoteslot, estate);
2954  Assert(partrelinfo != NULL);
2955  partrel = partrelinfo->ri_RelationDesc;
2956 
2957  /*
2958  * Check for supported relkind. We need this since partitions might be of
2959  * unsupported relkinds; and the set of partitions can change, so checking
2960  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2961  */
2962  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2964  RelationGetRelationName(partrel));
2965 
2966  /*
2967  * To perform any of the operations below, the tuple must match the
2968  * partition's rowtype. Convert if needed or just copy, using a dedicated
2969  * slot to store the tuple in any case.
2970  */
2971  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2972  if (remoteslot_part == NULL)
2973  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2974  map = ExecGetRootToChildMap(partrelinfo, estate);
2975  if (map != NULL)
2976  {
2977  attrmap = map->attrMap;
2978  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2979  remoteslot_part);
2980  }
2981  else
2982  {
2983  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2984  slot_getallattrs(remoteslot_part);
2985  }
2986  MemoryContextSwitchTo(oldctx);
2987 
2988  /* Check if we can do the update or delete on the leaf partition. */
2989  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2990  {
2991  part_entry = logicalrep_partition_open(relmapentry, partrel,
2992  attrmap);
2993  check_relation_updatable(part_entry);
2994  }
2995 
2996  switch (operation)
2997  {
2998  case CMD_INSERT:
2999  apply_handle_insert_internal(edata, partrelinfo,
3000  remoteslot_part);
3001  break;
3002 
3003  case CMD_DELETE:
3004  apply_handle_delete_internal(edata, partrelinfo,
3005  remoteslot_part,
3006  part_entry->localindexoid);
3007  break;
3008 
3009  case CMD_UPDATE:
3010 
3011  /*
3012  * For UPDATE, depending on whether or not the updated tuple
3013  * satisfies the partition's constraint, perform a simple UPDATE
3014  * of the partition or move the updated tuple into a different
3015  * suitable partition.
3016  */
3017  {
3018  TupleTableSlot *localslot;
3019  ResultRelInfo *partrelinfo_new;
3020  Relation partrel_new;
3021  bool found;
3022 
3023  /* Get the matching local tuple from the partition. */
3024  found = FindReplTupleInLocalRel(edata, partrel,
3025  &part_entry->remoterel,
3026  part_entry->localindexoid,
3027  remoteslot_part, &localslot);
3028  if (!found)
3029  {
3030  /*
3031  * The tuple to be updated could not be found. Do nothing
3032  * except for emitting a log message.
3033  *
3034  * XXX should this be promoted to ereport(LOG) perhaps?
3035  */
3036  elog(DEBUG1,
3037  "logical replication did not find row to be updated "
3038  "in replication target relation's partition \"%s\"",
3039  RelationGetRelationName(partrel));
3040  return;
3041  }
3042 
3043  /*
3044  * Apply the update to the local tuple, putting the result in
3045  * remoteslot_part.
3046  */
3048  slot_modify_data(remoteslot_part, localslot, part_entry,
3049  newtup);
3050  MemoryContextSwitchTo(oldctx);
3051 
3052  /*
3053  * Does the updated tuple still satisfy the current
3054  * partition's constraint?
3055  */
3056  if (!partrel->rd_rel->relispartition ||
3057  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3058  false))
3059  {
3060  /*
3061  * Yes, so simply UPDATE the partition. We don't call
3062  * apply_handle_update_internal() here, which would
3063  * normally do the following work, to avoid repeating some
3064  * work already done above to find the local tuple in the
3065  * partition.
3066  */
3067  EPQState epqstate;
3068 
3069  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3070  ExecOpenIndices(partrelinfo, false);
3071 
3072  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3074  ACL_UPDATE);
3075  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3076  localslot, remoteslot_part);
3077  ExecCloseIndices(partrelinfo);
3078  EvalPlanQualEnd(&epqstate);
3079  }
3080  else
3081  {
3082  /* Move the tuple into the new partition. */
3083 
3084  /*
3085  * New partition will be found using tuple routing, which
3086  * can only occur via the parent table. We might need to
3087  * convert the tuple to the parent's rowtype. Note that
3088  * this is the tuple found in the partition, not the
3089  * original search tuple received by this function.
3090  */
3091  if (map)
3092  {
3093  TupleConversionMap *PartitionToRootMap =
3095  RelationGetDescr(parentrel));
3096 
3097  remoteslot =
3098  execute_attr_map_slot(PartitionToRootMap->attrMap,
3099  remoteslot_part, remoteslot);
3100  }
3101  else
3102  {
3103  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3104  slot_getallattrs(remoteslot);
3105  }
3106 
3107  /* Find the new partition. */
3109  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3110  proute, remoteslot,
3111  estate);
3112  MemoryContextSwitchTo(oldctx);
3113  Assert(partrelinfo_new != partrelinfo);
3114  partrel_new = partrelinfo_new->ri_RelationDesc;
3115 
3116  /* Check that new partition also has supported relkind. */
3117  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3119  RelationGetRelationName(partrel_new));
3120 
3121  /* DELETE old tuple found in the old partition. */
3122  apply_handle_delete_internal(edata, partrelinfo,
3123  localslot,
3124  part_entry->localindexoid);
3125 
3126  /* INSERT new tuple into the new partition. */
3127 
3128  /*
3129  * Convert the replacement tuple to match the destination
3130  * partition rowtype.
3131  */
3133  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3134  if (remoteslot_part == NULL)
3135  remoteslot_part = table_slot_create(partrel_new,
3136  &estate->es_tupleTable);
3137  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3138  if (map != NULL)
3139  {
3140  remoteslot_part = execute_attr_map_slot(map->attrMap,
3141  remoteslot,
3142  remoteslot_part);
3143  }
3144  else
3145  {
3146  remoteslot_part = ExecCopySlot(remoteslot_part,
3147  remoteslot);
3148  slot_getallattrs(remoteslot);
3149  }
3150  MemoryContextSwitchTo(oldctx);
3151  apply_handle_insert_internal(edata, partrelinfo_new,
3152  remoteslot_part);
3153  }
3154  }
3155  break;
3156 
3157  default:
3158  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3159  break;
3160  }
3161 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:923
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1816
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1237
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3321
@ CMD_UPDATE
Definition: nodes.h:256
#define makeNode(_type_)
Definition: nodes.h:155
#define ACL_UPDATE
Definition: parsenodes.h:78
#define RelationGetNamespace(relation)
Definition: rel.h:545
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:603
PartitionTupleRouting * proute
Definition: worker.c:240
ModifyTableState * mtstate
Definition: worker.c:239
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:667
CmdType operation
Definition: execnodes.h:1282
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1286
PlanState ps
Definition: execnodes.h:1281
Plan * plan
Definition: execnodes.h:1043
EState * state
Definition: execnodes.h:1045
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:581
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:488
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362

References ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert(), TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog(), ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, LogicalRepRelMapEntry::localindexoid, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2349 of file worker.c.

2350 {
2351  LogicalRepTyp typ;
2352 
2354  return;
2355 
2356  logicalrep_read_typ(s, &typ);
2357 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2545 of file worker.c.

2546 {
2547  LogicalRepRelMapEntry *rel;
2548  LogicalRepRelId relid;
2549  UserContext ucxt;
2550  ApplyExecutionData *edata;
2551  EState *estate;
2552  LogicalRepTupleData oldtup;
2553  LogicalRepTupleData newtup;
2554  bool has_oldtup;
2555  TupleTableSlot *remoteslot;
2556  RTEPermissionInfo *target_perminfo;
2557  MemoryContext oldctx;
2558  bool run_as_owner;
2559 
2560  /*
2561  * Quick return if we are skipping data modification changes or handling
2562  * streamed transactions.
2563  */
2564  if (is_skipping_changes() ||
2566  return;
2567 
2569 
2570  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2571  &newtup);
2572  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2573  if (!should_apply_changes_for_rel(rel))
2574  {
2575  /*
2576  * The relation can't become interesting in the middle of the
2577  * transaction so it's safe to unlock it.
2578  */
2581  return;
2582  }
2583 
2584  /* Set relation for error callback */
2586 
2587  /* Check if we can do the update. */
2589 
2590  /*
2591  * Make sure that any user-supplied code runs as the table owner, unless
2592  * the user has opted out of that behavior.
2593  */
2594  run_as_owner = MySubscription->runasowner;
2595  if (!run_as_owner)
2596  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2597 
2598  /* Initialize the executor state. */
2599  edata = create_edata_for_relation(rel);
2600  estate = edata->estate;
2601  remoteslot = ExecInitExtraTupleSlot(estate,
2602  RelationGetDescr(rel->localrel),
2603  &TTSOpsVirtual);
2604 
2605  /*
2606  * Populate updatedCols so that per-column triggers can fire, and so
2607  * executor can correctly pass down indexUnchanged hint. This could
2608  * include more columns than were actually changed on the publisher
2609  * because the logical replication protocol doesn't contain that
2610  * information. But it would for example exclude columns that only exist
2611  * on the subscriber, since we are not touching those.
2612  */
2613  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2614  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2615  {
2617  int remoteattnum = rel->attrmap->attnums[i];
2618 
2619  if (!att->attisdropped && remoteattnum >= 0)
2620  {
2621  Assert(remoteattnum < newtup.ncols);
2622  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2623  target_perminfo->updatedCols =
2624  bms_add_member(target_perminfo->updatedCols,
2626  }
2627  }
2628 
2629  /* Build the search tuple. */
2631  slot_store_data(remoteslot, rel,
2632  has_oldtup ? &oldtup : &newtup);
2633  MemoryContextSwitchTo(oldctx);
2634 
2635  /* For a partitioned table, apply update to correct partition. */
2636  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2638  remoteslot, &newtup, CMD_UPDATE);
2639  else
2641  remoteslot, &newtup, rel->localindexoid);
2642 
2643  finish_edata(edata);
2644 
2645  /* Reset relation for error callback */
2647 
2648  if (!run_as_owner)
2649  RestoreUserContext(&ucxt);
2650 
2652 
2654 }
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2662
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:828
int i
Definition: isn.c:73
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:630
Bitmapset * updatedCols
Definition: parsenodes.h:1242
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2662 of file worker.c.

2667 {
2668  EState *estate = edata->estate;
2669  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2670  Relation localrel = relinfo->ri_RelationDesc;
2671  EPQState epqstate;
2672  TupleTableSlot *localslot;
2673  bool found;
2674  MemoryContext oldctx;
2675 
2676  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2677  ExecOpenIndices(relinfo, false);
2678 
2679  found = FindReplTupleInLocalRel(edata, localrel,
2680  &relmapentry->remoterel,
2681  localindexoid,
2682  remoteslot, &localslot);
2683  ExecClearTuple(remoteslot);
2684 
2685  /*
2686  * Tuple found.
2687  *
2688  * Note this will fail if there are other conflicting unique indexes.
2689  */
2690  if (found)
2691  {
2692  /* Process and store remote tuple in the slot */
2694  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2695  MemoryContextSwitchTo(oldctx);
2696 
2697  EvalPlanQualSetSlot(&epqstate, remoteslot);
2698 
2699  /* Do the actual update. */
2701  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2702  remoteslot);
2703  }
2704  else
2705  {
2706  /*
2707  * The tuple to be updated could not be found. Do nothing except for
2708  * emitting a log message.
2709  *
2710  * XXX should this be promoted to ereport(LOG) perhaps?
2711  */
2712  elog(DEBUG1,
2713  "logical replication did not find row to be updated "
2714  "in replication target relation \"%s\"",
2715  RelationGetRelationName(localrel));
2716  }
2717 
2718  /* Cleanup. */
2719  ExecCloseIndices(relinfo);
2720  EvalPlanQualEnd(&epqstate);
2721 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433

References ACL_UPDATE, DEBUG1, elog(), ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_data(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2026 of file worker.c.

2028 {
2029  int nchanges;
2030  char path[MAXPGPATH];
2031  char *buffer = NULL;
2032  MemoryContext oldcxt;
2033  ResourceOwner oldowner;
2034  int fileno;
2035  off_t offset;
2036 
2037  if (!am_parallel_apply_worker())
2039 
2040  /* Make sure we have an open transaction */
2042 
2043  /*
2044  * Allocate file handle and memory required to process all the messages in
2045  * TopTransactionContext to avoid them getting reset after each message is
2046  * processed.
2047  */
2049 
2050  /* Open the spool file for the committed/prepared transaction */
2052  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2053 
2054  /*
2055  * Make sure the file is owned by the toplevel transaction so that the
2056  * file will not be accidentally closed when aborting a subtransaction.
2057  */
2058  oldowner = CurrentResourceOwner;
2060 
2061  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2062 
2063  CurrentResourceOwner = oldowner;
2064 
2065  buffer = palloc(BLCKSZ);
2066 
2067  MemoryContextSwitchTo(oldcxt);
2068 
2069  remote_final_lsn = lsn;
2070 
2071  /*
2072  * Make sure the handle apply_dispatch methods are aware we're in a remote
2073  * transaction.
2074  */
2075  in_remote_transaction = true;
2077 
2079 
2080  /*
2081  * Read the entries one by one and pass them through the same logic as in
2082  * apply_dispatch.
2083  */
2084  nchanges = 0;
2085  while (true)
2086  {
2088  size_t nbytes;
2089  int len;
2090 
2092 
2093  /* read length of the on-disk record */
2094  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2095 
2096  /* have we reached end of the file? */
2097  if (nbytes == 0)
2098  break;
2099 
2100  /* do we have a correct length? */
2101  if (len <= 0)
2102  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2103  len, path);
2104 
2105  /* make sure we have sufficiently large buffer */
2106  buffer = repalloc(buffer, len);
2107 
2108  /* and finally read the data into the buffer */
2109  BufFileReadExact(stream_fd, buffer, len);
2110 
2111  BufFileTell(stream_fd, &fileno, &offset);
2112 
2113  /* init a stringinfo using the buffer and call apply_dispatch */
2114  initReadOnlyStringInfo(&s2, buffer, len);
2115 
2116  /* Ensure we are reading the data into our memory context. */
2118 
2119  apply_dispatch(&s2);
2120 
2122 
2123  MemoryContextSwitchTo(oldcxt);
2124 
2125  nchanges++;
2126 
2127  /*
2128  * It is possible the file has been closed because we have processed
2129  * the transaction end message like stream_commit in which case that
2130  * must be the last message.
2131  */
2132  if (!stream_fd)
2133  {
2134  ensure_last_message(stream_fileset, xid, fileno, offset);
2135  break;
2136  }
2137 
2138  if (nchanges % 1000 == 0)
2139  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2140  nchanges, path);
2141  }
2142 
2143  if (stream_fd)
2145 
2146  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2147  nchanges, path);
2148 
2149  return;
2150 }
MemoryContext ApplyMessageContext
Definition: worker.c:315
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4216
void apply_dispatch(StringInfo s)
Definition: worker.c:3297
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1994
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1451
void * palloc(Size size)
Definition: mcxt.c:1201
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:129
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog(), end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 3856 of file worker.c.

3857 {
3859  {
3860  /*
3861  * Don't stop the parallel apply worker as the leader will detect the
3862  * subscription parameter change and restart logical replication later
3863  * anyway. This also prevents the leader from reporting errors when
3864  * trying to communicate with a stopped parallel apply worker, which
3865  * would accidentally disable subscriptions if disable_on_error was
3866  * set.
3867  */
3868  return;
3869  }
3870 
3871  /*
3872  * Reset the last-start time for this apply worker so that the launcher
3873  * will restart it without waiting for wal_retrieve_retry_interval if the
3874  * subscription is still active, and so that we won't leak that hash table
3875  * entry if it isn't.
3876  */
3877  if (am_leader_apply_worker())
3879 
3880  proc_exit(0);
3881 }
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1081
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), and maybe_reread_subscription().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4710 of file worker.c.

4711 {
4712  int worker_slot = DatumGetInt32(main_arg);
4713 
4714  InitializingApplyWorker = true;
4715 
4716  SetupApplyOrSyncWorker(worker_slot);
4717 
4718  InitializingApplyWorker = false;
4719 
4720  run_apply_worker();
4721 
4722  proc_exit(0);
4723 }
bool InitializingApplyWorker
Definition: worker.c:343
static void run_apply_worker()
Definition: worker.c:4482
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4669
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 5028 of file worker.c.

5029 {
5030  if (isCommit && on_commit_wakeup_workers_subids != NIL)
5031  {
5032  ListCell *lc;
5033 
5034  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5035  foreach(lc, on_commit_wakeup_workers_subids)
5036  {
5037  Oid subid = lfirst_oid(lc);
5038  List *workers;
5039  ListCell *lc2;
5040 
5041  workers = logicalrep_workers_find(subid, true);
5042  foreach(lc2, workers)
5043  {
5044  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5045 
5047  }
5048  }
5049  LWLockRelease(LogicalRepWorkerLock);
5050  }
5051 
5052  /* The List storage will be reclaimed automatically in xact cleanup. */
5054 }
static List * on_commit_wakeup_workers_subids
Definition: worker.c:326
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:281
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:117

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 4216 of file worker.c.

4217 {
4218  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4219 }
#define snprintf
Definition: port.h:238

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2504 of file worker.c.

2505 {
2506  /*
2507  * For partitioned tables, we only need to care if the target partition is
2508  * updatable (aka has PK or RI defined for it).
2509  */
2510  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2511  return;
2512 
2513  /* Updatable, no error. */
2514  if (rel->updatable)
2515  return;
2516 
2517  /*
2518  * We are in error mode so it's fine this is somewhat slow. It's better to
2519  * give user correct error.
2520  */
2522  {
2523  ereport(ERROR,
2524  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2525  errmsg("publisher did not send replica identity column "
2526  "expected by the logical replication target relation \"%s.%s\"",
2527  rel->remoterel.nspname, rel->remoterel.relname)));
2528  }
2529 
2530  ereport(ERROR,
2531  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2532  errmsg("logical replication target relation \"%s.%s\" has "
2533  "neither REPLICA IDENTITY index nor PRIMARY "
2534  "KEY and published relation does not have "
2535  "REPLICA IDENTITY FULL",
2536  rel->remoterel.nspname, rel->remoterel.relname)));
2537 }
#define OidIsValid(objectId)
Definition: c.h:764
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:852

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

Definition at line 4413 of file worker.c.

4414 {
4415  if (subxact_data.subxacts)
4417 
4418  subxact_data.subxacts = NULL;
4420  subxact_data.nsubxacts = 0;
4422 }
static ApplySubXactData subxact_data
Definition: worker.c:382
void pfree(void *pointer)
Definition: mcxt.c:1431
uint32 nsubxacts
Definition: worker.c:376
uint32 nsubxacts_max
Definition: worker.c:377
SubXactInfo * subxacts
Definition: worker.c:379
TransactionId subxact_last
Definition: worker.c:378

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 4837 of file worker.c.

4838 {
4839  Relation rel;
4840  Form_pg_subscription subform;
4841  HeapTuple tup;
4842  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4843  bool started_tx = false;
4844 
4846  return;
4847 
4848  if (!IsTransactionState())
4849  {
4851  started_tx = true;
4852  }
4853 
4854  /*
4855  * Protect subskiplsn of pg_subscription from being concurrently updated
4856  * while clearing it.
4857  */
4858  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4859  AccessShareLock);
4860 
4861  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4862 
4863  /* Fetch the existing tuple. */
4864  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4866 
4867  if (!HeapTupleIsValid(tup))
4868  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4869 
4870  subform = (Form_pg_subscription) GETSTRUCT(tup);
4871 
4872  /*
4873  * Clear the subskiplsn. If the user has already changed subskiplsn before
4874  * clearing it we don't update the catalog and the replication origin
4875  * state won't get advanced. So in the worst case, if the server crashes
4876  * before sending an acknowledgment of the flush position the transaction
4877  * will be sent again and the user needs to set subskiplsn again. We can
4878  * reduce the possibility by logging a replication origin WAL record to
4879  * advance the origin LSN instead but there is no way to advance the
4880  * origin timestamp and it doesn't seem to be worth doing anything about
4881  * it since it's a very rare case.
4882  */
4883  if (subform->subskiplsn == myskiplsn)
4884  {
4885  bool nulls[Natts_pg_subscription];
4886  bool replaces[Natts_pg_subscription];
4887  Datum values[Natts_pg_subscription];
4888 
4889  memset(values, 0, sizeof(values));
4890  memset(nulls, false, sizeof(nulls));
4891  memset(replaces, false, sizeof(replaces));
4892 
4893  /* reset subskiplsn */
4894  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4895  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4896 
4897  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4898  replaces);
4899  CatalogTupleUpdate(rel, &tup->t_self, tup);
4900 
4901  if (myskiplsn != finish_lsn)
4902  ereport(WARNING,
4903  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4904  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4905  LSN_FORMAT_ARGS(finish_lsn),
4906  LSN_FORMAT_ARGS(myskiplsn)));
4907  }
4908 
4909  heap_freetuple(tup);
4910  table_close(rel, NoLock);
4911 
4912  if (started_tx)
4914 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define likely(x)
Definition: c.h:299
int errdetail(const char *fmt,...)
Definition: elog.c:1208
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
FormData_pg_subscription * Form_pg_subscription
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog(), ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 677 of file worker.c.

678 {
679  ApplyExecutionData *edata;
680  EState *estate;
681  RangeTblEntry *rte;
682  List *perminfos = NIL;
683  ResultRelInfo *resultRelInfo;
684 
685  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
686  edata->targetRel = rel;
687 
688  edata->estate = estate = CreateExecutorState();
689 
690  rte = makeNode(RangeTblEntry);
691  rte->rtekind = RTE_RELATION;
692  rte->relid = RelationGetRelid(rel->localrel);
693  rte->relkind = rel->localrel->rd_rel->relkind;
695 
696  addRTEPermissionInfo(&perminfos, rte);
697 
698  ExecInitRangeTable(estate, list_make1(rte), perminfos);
699 
700  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
701 
702  /*
703  * Use Relation opened by logicalrep_rel_open() instead of opening it
704  * again.
705  */
706  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
707 
708  /*
709  * We put the ResultRelInfo in the es_opened_result_relations list, even
710  * though we don't populate the es_result_relations array. That's a bit
711  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
712  *
713  * ExecOpenIndices() is not called here either, each execution path doing
714  * an apply operation being responsible for that.
715  */
717  lappend(estate->es_opened_result_relations, resultRelInfo);
718 
719  estate->es_output_cid = GetCurrentCommandId(true);
720 
721  /* Prepare to catch AFTER triggers. */
723 
724  /* other fields of edata remain NULL for now */
725 
726  return edata;
727 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1225
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:733
EState * CreateExecutorState(void)
Definition: execUtils.c:93
void * palloc0(Size size)
Definition: mcxt.c:1232
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1006
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:504
List * es_opened_result_relations
Definition: execnodes.h:643
CommandId es_output_cid
Definition: execnodes.h:637
RTEKind rtekind
Definition: parsenodes.h:1025
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5025
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:818

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 4730 of file worker.c.

4731 {
4732  /*
4733  * Emit the error message, and recover from the error state to an idle
4734  * state
4735  */
4736  HOLD_INTERRUPTS();
4737 
4738  EmitErrorReport();
4740  FlushErrorState();
4741 
4743 
4744  /* Report the worker failed during either table synchronization or apply */
4746  !am_tablesync_worker());
4747 
4748  /* Disable the subscription */
4752 
4753  /* Ensure we remove no-longer-useful entry for worker's start time */
4754  if (am_leader_apply_worker())
4756 
4757  /* Notify the subscription has been disabled and exit */
4758  ereport(LOG,
4759  errmsg("subscription \"%s\" has been disabled because of an error",
4760  MySubscription->name));
4761 
4762  proc_exit(0);
4763 }
void EmitErrorReport(void)
Definition: elog.c:1675
void FlushErrorState(void)
Definition: elog.c:1831
#define LOG
Definition: elog.h:31
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4728

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 1994 of file worker.c.

1996 {
1997  char path[MAXPGPATH];
1998  BufFile *fd;
1999  int last_fileno;
2000  off_t last_offset;
2001 
2003 
2005 
2007 
2008  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2009 
2010  BufFileSeek(fd, 0, 0, SEEK_END);
2011  BufFileTell(fd, &last_fileno, &last_offset);
2012 
2013  BufFileClose(fd);
2014 
2016 
2017  if (last_fileno != fileno || last_offset != offset)
2018  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2019  path);
2020 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert(), begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog(), end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData edata,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2871 of file worker.c.

2876 {
2877  EState *estate = edata->estate;
2878  bool found;
2879 
2880  /*
2881  * Regardless of the top-level operation, we're performing a read here, so
2882  * check for SELECT privileges.
2883  */
2884  TargetPrivilegesCheck(localrel, ACL_SELECT);
2885 
2886  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2887 
2888  Assert(OidIsValid(localidxoid) ||
2889  (remoterel->replident == REPLICA_IDENTITY_FULL));
2890 
2891  if (OidIsValid(localidxoid))
2892  {
2893 #ifdef USE_ASSERT_CHECKING
2894  Relation idxrel = index_open(localidxoid, AccessShareLock);
2895 
2896  /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2897  Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
2899  edata->targetRel->attrmap));
2900  index_close(idxrel, AccessShareLock);
2901 #endif
2902 
2903  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2905  remoteslot, *localslot);
2906  }
2907  else
2908  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2909  remoteslot, *localslot);
2910 
2911  return found;
2912 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2438
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo, AttrMap *attrmap)
Definition: relation.c:805

References AccessShareLock, ACL_SELECT, Assert(), LogicalRepRelMapEntry::attrmap, BuildIndexInfo(), EState::es_tupleTable, ApplyExecutionData::estate, GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 734 of file worker.c.

735 {
736  EState *estate = edata->estate;
737 
738  /* Handle any queued AFTER triggers. */
739  AfterTriggerEndQuery(estate);
740 
741  /* Shut down tuple routing, if any was done. */
742  if (edata->proute)
743  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
744 
745  /*
746  * Cleanup. It might seem that we should call ExecCloseResultRelations()
747  * here, but we intentionally don't. It would close the rel we added to
748  * es_opened_result_relations above, which is wrong because we took no
749  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
750  * any other relations opened during execution.
751  */
752  ExecResetTupleTable(estate->es_tupleTable, false);
753  FreeExecutorState(estate);
754  pfree(edata);
755 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1190
void FreeExecutorState(EState *estate)
Definition: execUtils.c:194
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5045

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 3417 of file worker.c.

3419 {
3420  dlist_mutable_iter iter;
3421  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3422 
3424  *flush = InvalidXLogRecPtr;
3425 
3427  {
3428  FlushPosition *pos =
3429  dlist_container(FlushPosition, node, iter.cur);
3430 
3431  *write = pos->remote_end;
3432 
3433  if (pos->local_end <= local_flush)
3434  {
3435  *flush = pos->remote_end;
3436  dlist_delete(iter.cur);
3437  pfree(pos);
3438  }
3439  else
3440  {
3441  /*
3442  * Don't want to uselessly iterate over the rest of the list which
3443  * could potentially be long. Instead get the last element and
3444  * grab the write position from there.
3445  */
3446  pos = dlist_tail_element(FlushPosition, node,
3447  &lsn_mapping);
3448  *write = pos->remote_end;
3449  *have_pending_txes = true;
3450  return;
3451  }
3452  }
3453 
3454  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3455 }
static dlist_head lsn_mapping
Definition: worker.c:229
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:226
XLogRecPtr local_end
Definition: worker.c:225
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6376

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 5075 of file worker.c.

5076 {
5077  *winfo = NULL;
5078 
5079  if (am_parallel_apply_worker())
5080  {
5081  return TRANS_PARALLEL_APPLY;
5082  }
5083 
5084  /*
5085  * If we are processing this transaction using a parallel apply worker
5086  * then either we send the changes to the parallel worker or if the worker
5087  * is busy then serialize the changes to the file which will later be
5088  * processed by the parallel worker.
5089  */
5090  *winfo = pa_find_worker(xid);
5091 
5092  if (*winfo && (*winfo)->serialize_changes)
5093  {
5094  return TRANS_LEADER_PARTIAL_SERIALIZE;
5095  }
5096  else if (*winfo)
5097  {
5098  return TRANS_LEADER_SEND_TO_PARALLEL;
5099  }
5100 
5101  /*
5102  * If there is no parallel worker involved to process this transaction
5103  * then we either directly apply the change or serialize it to a file
5104  * which will later be applied when the transaction finish message is
5105  * processed.
5106  */
5107  else if (in_streamed_transaction)
5108  {
5109  return TRANS_LEADER_SERIALIZE;
5110  }
5111  else
5112  {
5113  return TRANS_LEADER_APPLY;
5114  }
5115 }
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 584 of file worker.c.

585 {
586  TransactionId current_xid;
587  ParallelApplyWorkerInfo *winfo;
588  TransApplyAction apply_action;
589  StringInfoData original_msg;
590 
591  apply_action = get_transaction_apply_action(stream_xid, &winfo);
592 
593  /* not in streaming mode */
594  if (apply_action == TRANS_LEADER_APPLY)
595  return false;
596 
598 
599  /*
600  * The parallel apply worker needs the xid in this message to decide
601  * whether to define a savepoint, so save the original message that has
602  * not moved the cursor after the xid. We will serialize this message to a
603  * file in PARTIAL_SERIALIZE mode.
604  */
605  original_msg = *s;
606 
607  /*
608  * We should have received XID of the subxact as the first part of the
609  * message, so extract it.
610  */
611  current_xid = pq_getmsgint(s, 4);
612 
613  if (!TransactionIdIsValid(current_xid))
614  ereport(ERROR,
615  (errcode(ERRCODE_PROTOCOL_VIOLATION),
616  errmsg_internal("invalid transaction ID in streamed replication transaction")));
617 
618  switch (apply_action)
619  {
620  case TRANS_LEADER_SERIALIZE:
621  Assert(stream_fd);
622 
623  /* Add the new subxact to the array (unless already there). */
624  subxact_info_add(current_xid);
625 
626  /* Write the change to the current file */
627  stream_write_change(action, s);
628  return true;
629 
630  case TRANS_LEADER_SEND_TO_PARALLEL:
631  Assert(winfo);
632 
633  /*
634  * XXX The publisher side doesn't always send relation/type update
635  * messages after the streaming transaction, so also update the
636  * relation/type in leader apply worker. See function
637  * cleanup_rel_sync_cache.
638  */
639  if (pa_send_data(winfo, s->len, s->data))
640  return (action != LOGICAL_REP_MSG_RELATION &&
641  action != LOGICAL_REP_MSG_TYPE);
642 
643  /*
644  * Switch to serialize mode when we are not able to send the
645  * change to parallel apply worker.
646  */
647  pa_switch_to_partial_serialize(winfo, false);
648 
649  /* fall through */
650  case TRANS_LEADER_PARTIAL_SERIALIZE:
651  stream_write_change(action, &original_msg);
652 
653  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
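654  return (action != LOGICAL_REP_MSG_RELATION &&
655  action != LOGICAL_REP_MSG_TYPE);
656 
657  case TRANS_PARALLEL_APPLY:
658  parallel_stream_nchanges += 1;
659 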
660  /* Define a savepoint for a subxact if needed. */
661  pa_start_subtrans(current_xid, stream_xid);
662  return false;
663 
664  default:
665  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
666  return false; /* silence compiler warning */
667  }
668 }
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4131
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418

References generate_unaccent_rules::action, Assert(), StringInfoData::data, elog(), ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 4586 of file worker.c.

4587 {
4588  MemoryContext oldctx;
4589 
4590  /* Run as replica session replication role. */
4591  SetConfigOption("session_replication_role", "replica",
4593 
4594  /* Connect to our database. */
4597  0);
4598 
4599  /*
4600  * Set always-secure search path, so malicious users can't redirect user
4601  * code (e.g. pg_index.indexprs).
4602  */
4603  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4604 
4605  /* Load the subscription into persistent memory context. */
4607  "ApplyContext",
4611 
4613  if (!MySubscription)
4614  {
4615  ereport(LOG,
4616  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4618 
4619  /* Ensure we remove no-longer-useful entry for worker's start time */
4620  if (am_leader_apply_worker())
4622 
4623  proc_exit(0);
4624  }
4625 
4626  MySubscriptionValid = true;
4627  MemoryContextSwitchTo(oldctx);
4628 
4629  if (!MySubscription->enabled)
4630  {
4631  ereport(LOG,
4632  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4633  MySubscription->name)));
4634 
4636  }
4637 
4638  /* Setup synchronous commit according to the user's wishes */
4639  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4641 
4642  /*
4643  * Keep us informed about subscription or role changes. Note that the
4644  * role's superuser privilege can be revoked.
4645  */
4646  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4648  (Datum) 0);
4649 
4652  (Datum) 0);
4653 
4654  if (am_tablesync_worker())
4655  ereport(LOG,
4656  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4659  else
4660  ereport(LOG,
4661  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4662  MySubscription->name)));
4663 
4665 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:4016
static void apply_worker_exit(void)
Definition: worker.c:3856
MemoryContext ApplyContext
Definition: worker.c:316
static bool MySubscriptionValid
Definition: worker.c:324
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4256
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1517
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1905
MemoryContext TopMemoryContext
Definition: mcxt.c:141
#define AllocSetContextCreate
Definition: memutils.h:128
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
Subscription * GetSubscription(Oid subid, bool missing_ok)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5656

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4778 of file worker.c.

4779 {
4780  return IsLogicalWorker() && am_parallel_apply_worker();
4781 }
bool IsLogicalWorker(void)
Definition: worker.c:4769

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4769 of file worker.c.

4770 {
4771  return MyLogicalRepWorker != NULL;
4772 }

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3503 of file worker.c.

3504 {
3505  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3506  bool ping_sent = false;
3507  TimeLineID tli;
3508  ErrorContextCallback errcallback;
3509 
3510  /*
3511  * Init the ApplyMessageContext which we clean up after each replication
3512  * protocol message.
3513  */
3515  "ApplyMessageContext",
3517 
3518  /*
3519  * This memory context is used for per-stream data when the streaming mode
3520  * is enabled. This context is reset on each stream stop.
3521  */
3523  "LogicalStreamingContext",
3525 
3526  /* mark as idle, before starting to loop */
3528 
3529  /*
3530  * Push apply error context callback. Fields will be filled while applying
3531  * a change.
3532  */
3533  errcallback.callback = apply_error_callback;
3534  errcallback.previous = error_context_stack;
3535  error_context_stack = &errcallback;
3537 
3538  /* This outer loop iterates once per wait. */
3539  for (;;)
3540  {
3542  int rc;
3543  int len;
3544  char *buf = NULL;
3545  bool endofstream = false;
3546  long wait_time;
3547 
3549 
3551 
3553 
3554  if (len != 0)
3555  {
3556  /* Loop to process all available data (without blocking). */
3557  for (;;)
3558  {
3560 
3561  if (len == 0)
3562  {
3563  break;
3564  }
3565  else if (len < 0)
3566  {
3567  ereport(LOG,
3568  (errmsg("data stream from publisher has ended")));
3569  endofstream = true;
3570  break;
3571  }
3572  else
3573  {
3574  int c;
3575  StringInfoData s;
3576 
3577  if (ConfigReloadPending)
3578  {
3579  ConfigReloadPending = false;
3581  }
3582 
3583  /* Reset timeout. */
3584  last_recv_timestamp = GetCurrentTimestamp();
3585  ping_sent = false;
3586 
3587  /* Ensure we are reading the data into our memory context. */
3589 
3591 
3592  c = pq_getmsgbyte(&s);
3593 
3594  if (c == 'w')
3595  {
3596  XLogRecPtr start_lsn;
3597  XLogRecPtr end_lsn;
3598  TimestampTz send_time;
3599 
3600  start_lsn = pq_getmsgint64(&s);
3601  end_lsn = pq_getmsgint64(&s);
3602  send_time = pq_getmsgint64(&s);
3603 
3604  if (last_received < start_lsn)
3605  last_received = start_lsn;
3606 
3607  if (last_received < end_lsn)
3608  last_received = end_lsn;
3609 
3610  UpdateWorkerStats(last_received, send_time, false);
3611 
3612  apply_dispatch(&s);
3613  }
3614  else if (c == 'k')
3615  {
3616  XLogRecPtr end_lsn;
3618  bool reply_requested;
3619 
3620  end_lsn = pq_getmsgint64(&s);
3621  timestamp = pq_getmsgint64(&s);
3622  reply_requested = pq_getmsgbyte(&s);
3623 
3624  if (last_received < end_lsn)
3625  last_received = end_lsn;
3626 
3627  send_feedback(last_received, reply_requested, false);
3628  UpdateWorkerStats(last_received, timestamp, true);
3629  }
3630  /* other message types are purposefully ignored */
3631 
3633  }
3634 
3636  }
3637  }
3638 
3639  /* confirm all writes so far */
3640  send_feedback(last_received, false, false);
3641 
3643  {
3644  /*
3645  * If we didn't get any transactions for a while there might be
3646  * unconsumed invalidation messages in the queue, consume them
3647  * now.
3648  */
3651 
3652  /* Process any table synchronization changes. */
3653  process_syncing_tables(last_received);
3654  }
3655 
3656  /* Cleanup the memory. */
3659 
3660  /* Check if we need to exit the streaming loop. */
3661  if (endofstream)
3662  break;
3663 
3664  /*
3665  * Wait for more data or latch. If we have unflushed transactions,
3666  * wake up after WalWriterDelay to see if they've been flushed yet (in
3667  * which case we should send a feedback message). Otherwise, there's
3668  * no particular urgency about waking up unless we get data or a
3669  * signal.
3670  */
3671  if (!dlist_is_empty(&lsn_mapping))
3672  wait_time = WalWriterDelay;
3673  else
3674  wait_time = NAPTIME_PER_CYCLE;
3675 
3679  fd, wait_time,
3680  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3681 
3682  if (rc & WL_LATCH_SET)
3683  {
3686  }
3687 
3688  if (ConfigReloadPending)
3689  {
3690  ConfigReloadPending = false;
3692  }
3693 
3694  if (rc & WL_TIMEOUT)
3695  {
3696  /*
3697  * We didn't receive anything new. If we haven't heard anything
3698  * from the server for more than wal_receiver_timeout / 2, ping
3699  * the server. Also, if it's been longer than
3700  * wal_receiver_status_interval since the last update we sent,
3701  * send a status update to the primary anyway, to report any
3702  * progress in applying WAL.
3703  */
3704  bool requestReply = false;
3705 
3706  /*
3707  * Check if time since last receive from primary has reached the
3708  * configured limit.
3709  */
3710  if (wal_receiver_timeout > 0)
3711  {
3713  TimestampTz timeout;
3714 
3715  timeout =
3716  TimestampTzPlusMilliseconds(last_recv_timestamp,
3718 
3719  if (now >= timeout)
3720  ereport(ERROR,
3721  (errcode(ERRCODE_CONNECTION_FAILURE),
3722  errmsg("terminating logical replication worker due to timeout")));
3723 
3724  /* Check to see if it's time for a ping. */
3725  if (!ping_sent)
3726  {
3727  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3728  (wal_receiver_timeout / 2));
3729  if (now >= timeout)
3730  {
3731  requestReply = true;
3732  ping_sent = true;
3733  }
3734  }
3735  }
3736 
3737  send_feedback(last_received, requestReply, requestReply);
3738 
3739  /*
3740  * Force reporting to ensure long idle periods don't lead to
3741  * arbitrarily delayed stats. Stats can only be reported outside
3742  * of (implicit or explicit) transactions. That shouldn't lead to
3743  * stats being delayed for long, because transactions are either
3744  * sent as a whole on commit or streamed. Streamed transactions
3745  * are spilled to disk and applied on commit.
3746  */
3747  if (!IsTransactionState())
3748  pgstat_report_stat(true);
3749  }
3750  }
3751 
3752  /* Pop the error context stack */
3753  error_context_stack = errcallback.previous;
3755 
3756  /* All done */
3758 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3487
#define NAPTIME_PER_CYCLE
Definition: worker.c:220
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:313
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3767
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:321
void apply_error_callback(void *arg)
Definition: worker.c:4918
static MemoryContext LogicalStreamingContext
Definition: worker.c:319
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
int64 TimestampTz
Definition: timestamp.h:39
ErrorContextCallback * error_context_stack
Definition: elog.c:95
struct Latch * MyLatch
Definition: globals.c:59
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:566
void ResetLatch(Latch *latch)
Definition: latch.c:725
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static char * buf
Definition: pg_test_fsync.c:73
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
char * c
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_receiver_timeout
Definition: walreceiver.c:91
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:450
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:452
int WalWriterDelay
Definition: walwriter.c:70
uint32 TimeLineID
Definition: xlogdefs.h:59

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3887 of file worker.c.

3888 {
3889  MemoryContext oldctx;
3891  bool started_tx = false;
3892 
3893  /* When cache state is valid there is nothing to do here. */
3894  if (MySubscriptionValid)
3895  return;
3896 
3897  /* This function might be called inside or outside of transaction. */
3898  if (!IsTransactionState())
3899  {
3900  StartTransactionCommand();
3901  started_tx = true;
3902  }
3903 
3904  /* Ensure allocations in permanent context. */
3906 
3908 
3909  /*
3910  * Exit if the subscription was removed. This normally should not happen
3911  * as the worker gets killed during DROP SUBSCRIPTION.
3912  */
3913  if (!newsub)
3914  {
3915  ereport(LOG,
3916  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3917  MySubscription->name)));
3918 
3919  /* Ensure we remove no-longer-useful entry for worker's start time */
3920  if (am_leader_apply_worker())
3922 
3923  proc_exit(0);
3924  }
3925 
3926  /* Exit if the subscription was disabled. */
3927  if (!newsub->enabled)
3928  {
3929  ereport(LOG,
3930  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3931  MySubscription->name)));
3932 
3934  }
3935 
3936  /* !slotname should never happen when enabled is true. */
3937  Assert(newsub->slotname);
3938 
3939  /* two-phase should not be altered */
3940  Assert(newsub->twophasestate == MySubscription->twophasestate);
3941 
3942  /*
3943  * Exit if any parameter that affects the remote connection was changed.
3944  * The launcher will start a new worker but note that the parallel apply
3945  * worker won't restart if the streaming option's value is changed from
3946  * 'parallel' to any other value or the server decides not to stream the
3947  * in-progress transaction.
3948  */
3949  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3950  strcmp(newsub->name, MySubscription->name) != 0 ||
3951  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3952  newsub->binary != MySubscription->binary ||
3953  newsub->stream != MySubscription->stream ||
3954  newsub->passwordrequired != MySubscription->passwordrequired ||
3955  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3956  newsub->owner != MySubscription->owner ||
3957  !equal(newsub->publications, MySubscription->publications))
3958  {
3959  if (am_parallel_apply_worker())
3960  ereport(LOG,
3961  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3962  MySubscription->name)));
3963  else
3964  ereport(LOG,
3965  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3966  MySubscription->name)));
3967 
3969  }
3970 
3971  /*
3972  * Exit if the subscription owner's superuser privileges have been
3973  * revoked.
3974  */
3975  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3976  {
3977  if (am_parallel_apply_worker())
3978  ereport(LOG,
3979  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3980  MySubscription->name));
3981  else
3982  ereport(LOG,
3983  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3984  MySubscription->name));
3985 
3987  }
3988 
3989  /* Check for other changes that should never happen too. */
3990  if (newsub->dbid != MySubscription->dbid)
3991  {
3992  elog(ERROR, "subscription %u changed unexpectedly",
3994  }
3995 
3996  /* Clean old subscription info and switch to new one. */
3999 
4000  MemoryContextSwitchTo(oldctx);
4001 
4002  /* Change synchronous commit according to the user's wishes */
4003  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4005 
4006  if (started_tx)
4007  CommitTransactionCommand();
4008 
4009  MySubscriptionValid = true;
4010 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog(), equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 4788 of file worker.c.

4789 {
4793 
4794  /*
4795  * Quick return if it's not requested to skip this transaction. This
4796  * function is called for every remote transaction and we assume that
4797  * skipping the transaction is not used often.
4798  */
4800  MySubscription->skiplsn != finish_lsn))
4801  return;
4802 
4803  /* Start skipping all changes of this transaction */
4804  skip_xact_finish_lsn = finish_lsn;
4805 
4806  ereport(LOG,
4807  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4809 }
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:360

References Assert(), ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 453 of file worker.c.

455 {
456  if (OidIsValid(relid))
457  {
458  /* Replication origin name for tablesync workers. */
459  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
460  }
461  else
462  {
463  /* Replication origin name for non-tablesync workers. */
464  snprintf(originname, szoriginname, "pg_%u", suboid);
465  }
466 }

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), run_apply_worker(), and run_tablesync_worker().

◆ reset_apply_error_context_info()

◆ run_apply_worker()

static void run_apply_worker ( )
static

Definition at line 4482 of file worker.c.